From 8a51dbc4bde76733362c36ddef632351324e62d0 Mon Sep 17 00:00:00 2001 From: Dominik Sander Date: Mon, 12 May 2014 00:04:43 +0200 Subject: [PATCH 01/23] Provide an optional threaded background worker Due to the three background workers (scheduler, twitter stream and delayed job) huginn needs a lot of memory to run (about 520MB on my dev machine). This PR introduces an optional threaded background worker which combines the three current separated processed into one (reducing the memory footprint to ~260MB). Since just one instance of the of every background processor is running at a time there should not be any threading related issues. The main gotcha of this is, that it's most likely not possible to run multiple delayed job workers concurrently. The ultimate solution would probably be switching to sidekiq with sidetiq as scheduler, but that is a different task :) When running on MRI the GIL should not be an issue because it is released for most IO bound operations (waiting for the database/website/sleeping). --- Procfile | 4 + bin/schedule.rb | 82 ------------------- bin/threaded.rb | 57 +++++++++++++ bin/twitter_stream.rb | 113 +------------------------- config/initializers/delayed_job.rb | 1 + lib/huginn_scheduler.rb | 87 ++++++++++++++++++++ lib/twitter_stream.rb | 125 +++++++++++++++++++++++++++++ 7 files changed, 275 insertions(+), 194 deletions(-) create mode 100644 bin/threaded.rb create mode 100644 lib/huginn_scheduler.rb create mode 100644 lib/twitter_stream.rb diff --git a/Procfile b/Procfile index 86fee247..a6391525 100644 --- a/Procfile +++ b/Procfile @@ -4,6 +4,10 @@ schedule: bundle exec rails runner bin/schedule.rb twitter: bundle exec rails runner bin/twitter_stream.rb dj: bundle exec script/delayed_job run +# Procfile for the exprimental threaded scheduler, twitter stream and delayed job +#web: bundle exec rails server +#jobs: bundle exec rails runner bin/threaded.rb + # Possible Profile configuration for production: # web: bundle exec unicorn -c config/unicorn/production.rb # schedule: bundle exec rails runner bin/schedule.rb diff --git a/bin/schedule.rb b/bin/schedule.rb index 43e06d0c..7a29fd4c 100755 --- a/bin/schedule.rb +++ b/bin/schedule.rb @@ -11,87 +11,5 @@ unless defined?(Rails) exit 1 end -require 'rufus/scheduler' - -class HuginnScheduler - attr_accessor :mutex - - def run_schedule(time) - with_mutex do - puts "Queuing schedule for #{time}" - Agent.delay.run_schedule(time) - end - end - - def propagate! - with_mutex do - puts "Queuing event propagation" - Agent.delay.receive! - end - end - - def cleanup_expired_events! - with_mutex do - puts "Running event cleanup" - Event.delay.cleanup_expired! - end - end - - def with_mutex - ActiveRecord::Base.connection_pool.with_connection do - mutex.synchronize do - yield - end - end - end - - def run! - self.mutex = Mutex.new - - rufus_scheduler = Rufus::Scheduler.new - - tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"] - - # Schedule event propagation. - - rufus_scheduler.every '1m' do - propagate! - end - - # Schedule event cleanup. - - rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do - cleanup_expired_events! - end - - # Schedule repeating events. - - %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule| - rufus_scheduler.every schedule do - run_schedule "every_#{schedule}" - end - end - - # Schedule events for specific times. - - # Times are assumed to be in PST for now. Can store a user#timezone later. - 24.times do |hour| - rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do - if hour == 0 - run_schedule "midnight" - elsif hour < 12 - run_schedule "#{hour}am" - elsif hour == 12 - run_schedule "noon" - else - run_schedule "#{hour - 12}pm" - end - end - end - - rufus_scheduler.join - end -end - scheduler = HuginnScheduler.new scheduler.run! \ No newline at end of file diff --git a/bin/threaded.rb b/bin/threaded.rb new file mode 100644 index 00000000..8d44dd72 --- /dev/null +++ b/bin/threaded.rb @@ -0,0 +1,57 @@ +require 'thread' + +def stop + puts 'Exiting...' + @scheduler.stop + @dj.stop + @stream.stop +end + +def safely(&block) + begin + yield block + rescue StandardError => e + STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n" + STDERR.puts "Terminating myself ..." + stop + end +end + +threads = [] +threads << Thread.new do + safely do + @stream = TwitterStream.new + @stream.run + puts "Twitter stream stopped ..." + end +end + +threads << Thread.new do + safely do + @scheduler = HuginnScheduler.new + @scheduler.run! + puts "Scheduler stopped ..." + end +end + +threads << Thread.new do + safely do + require 'delayed/command' + @dj = Delayed::Worker.new + @dj.start + puts "Delayed job stopped ..." + end +end + +# We need to wait a bit to let delayed_job set it's traps so we can override them +sleep 0.5 + +trap('TERM') do + stop +end + +trap('INT') do + stop +end + +threads.collect { |t| t.join } diff --git a/bin/twitter_stream.rb b/bin/twitter_stream.rb index 945f4a32..504ce97a 100755 --- a/bin/twitter_stream.rb +++ b/bin/twitter_stream.rb @@ -12,115 +12,4 @@ unless defined?(Rails) exit 1 end -require 'cgi' -require 'json' -require 'twitter/json_stream' -require 'em-http-request' -require 'pp' - -def stream!(filters, agent, &block) - stream = Twitter::JSONStream.connect( - :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}", - :ssl => true, - :oauth => { - :consumer_key => agent.twitter_consumer_key, - :consumer_secret => agent.twitter_consumer_secret, - :access_key => agent.twitter_oauth_token, - :access_secret => agent.twitter_oauth_token_secret - } - ) - - stream.each_item do |status| - status = JSON.parse(status) if status.is_a?(String) - next unless status - next if status.has_key?('delete') - next unless status['text'] - status['text'] = status['text'].gsub(/</, "<").gsub(/>/, ">").gsub(/[\t\n\r]/, ' ') - block.call(status) - end - - stream.on_error do |message| - STDERR.puts " --> Twitter error: #{message} <--" - end - - stream.on_no_data do |message| - STDERR.puts " --> Got no data for awhile; trying to reconnect." - EventMachine::stop_event_loop - end - - stream.on_max_reconnects do |timeout, retries| - STDERR.puts " --> Oops, tried too many times! <--" - EventMachine::stop_event_loop - end -end - -def load_and_run(agents) - agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents| - filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m } - - agents.each do |agent| - agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter| - filter_to_agent_map[filter] << agent - end - end - - recent_tweets = [] - - stream!(filter_to_agent_map.keys, agents.first) do |status| - if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash) - puts "Skipping retweet: #{status["text"]}" - elsif recent_tweets.include?(status["id_str"]) - puts "Skipping duplicate tweet: #{status["text"]}" - else - recent_tweets << status["id_str"] - recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH - puts status["text"] - filter_to_agent_map.keys.each do |filter| - if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson - filter_to_agent_map[filter].each do |agent| - puts " -> #{agent.name}" - agent.process_tweet(filter, status) - end - end - end - end - end - end -end - -RELOAD_TIMEOUT = 10.minutes -DUPLICATE_DETECTION_LENGTH = 1000 -SEPARATOR = /[^\w_\-]+/ - -while true - begin - agents = Agents::TwitterStreamAgent.all - - EventMachine::run do - EventMachine.add_periodic_timer(RELOAD_TIMEOUT) { - puts "Reloading EventMachine and all Agents..." - EventMachine::stop_event_loop - } - - if agents.length == 0 - puts "No agents found. Will look again in a minute." - sleep 60 - EventMachine::stop_event_loop - else - puts "Found #{agents.length} agent(s). Loading them now..." - load_and_run agents - end - end - - print "Pausing..."; STDOUT.flush - sleep 1 - puts "done." - rescue SignalException, SystemExit - EventMachine::stop_event_loop if EventMachine.reactor_running? - exit - rescue StandardError => e - STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n" - STDERR.puts "Waiting for a couple of minutes..." - sleep 120 - end -end \ No newline at end of file +TwitterStream.new.run \ No newline at end of file diff --git a/config/initializers/delayed_job.rb b/config/initializers/delayed_job.rb index e9560a59..084d4b93 100644 --- a/config/initializers/delayed_job.rb +++ b/config/initializers/delayed_job.rb @@ -1,6 +1,7 @@ Delayed::Worker.destroy_failed_jobs = true Delayed::Worker.max_attempts = 5 Delayed::Worker.max_run_time = 20.minutes +Delayed::Worker.read_ahead = 5 Delayed::Worker.default_priority = 10 Delayed::Worker.delay_jobs = !Rails.env.test? diff --git a/lib/huginn_scheduler.rb b/lib/huginn_scheduler.rb new file mode 100644 index 00000000..fa72b7ea --- /dev/null +++ b/lib/huginn_scheduler.rb @@ -0,0 +1,87 @@ +require 'rufus/scheduler' + +class HuginnScheduler + attr_accessor :mutex + + def initialize + @rufus_scheduler = Rufus::Scheduler.new + end + + def stop + @rufus_scheduler.stop + end + + def run_schedule(time) + with_mutex do + puts "Queuing schedule for #{time}" + Agent.delay.run_schedule(time) + end + end + + def propagate! + with_mutex do + puts "Queuing event propagation" + Agent.delay.receive! + end + end + + def cleanup_expired_events! + with_mutex do + puts "Running event cleanup" + Event.delay.cleanup_expired! + end + end + + def with_mutex + ActiveRecord::Base.connection_pool.with_connection do + mutex.synchronize do + yield + end + end + end + + def run! + self.mutex = Mutex.new + + tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"] + + # Schedule event propagation. + + @rufus_scheduler.every '1m' do + propagate! + end + + # Schedule event cleanup. + + @rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do + cleanup_expired_events! + end + + # Schedule repeating events. + + %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule| + @rufus_scheduler.every schedule do + run_schedule "every_#{schedule}" + end + end + + # Schedule events for specific times. + + # Times are assumed to be in PST for now. Can store a user#timezone later. + 24.times do |hour| + @rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do + if hour == 0 + run_schedule "midnight" + elsif hour < 12 + run_schedule "#{hour}am" + elsif hour == 12 + run_schedule "noon" + else + run_schedule "#{hour - 12}pm" + end + end + end + + @rufus_scheduler.join + end +end diff --git a/lib/twitter_stream.rb b/lib/twitter_stream.rb new file mode 100644 index 00000000..3aac876a --- /dev/null +++ b/lib/twitter_stream.rb @@ -0,0 +1,125 @@ +require 'cgi' +require 'json' +require 'twitter/json_stream' +require 'em-http-request' +require 'pp' + +class TwitterStream + def initialize + @running = true + end + + def stop + @running = false + EventMachine::stop_event_loop if EventMachine.reactor_running? + end + + def stream!(filters, agent, &block) + stream = Twitter::JSONStream.connect( + :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}", + :ssl => true, + :oauth => { + :consumer_key => agent.twitter_consumer_key, + :consumer_secret => agent.twitter_consumer_secret, + :access_key => agent.twitter_oauth_token, + :access_secret => agent.twitter_oauth_token_secret + } + ) + + stream.each_item do |status| + status = JSON.parse(status) if status.is_a?(String) + next unless status + next if status.has_key?('delete') + next unless status['text'] + status['text'] = status['text'].gsub(/</, "<").gsub(/>/, ">").gsub(/[\t\n\r]/, ' ') + block.call(status) + end + + stream.on_error do |message| + STDERR.puts " --> Twitter error: #{message} <--" + end + + stream.on_no_data do |message| + STDERR.puts " --> Got no data for awhile; trying to reconnect." + EventMachine::stop_event_loop + end + + stream.on_max_reconnects do |timeout, retries| + STDERR.puts " --> Oops, tried too many times! <--" + EventMachine::stop_event_loop + end + end + + def load_and_run(agents) + agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents| + filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m } + + agents.each do |agent| + agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter| + filter_to_agent_map[filter] << agent + end + end + + recent_tweets = [] + + stream!(filter_to_agent_map.keys, agents.first) do |status| + if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash) + puts "Skipping retweet: #{status["text"]}" + elsif recent_tweets.include?(status["id_str"]) + puts "Skipping duplicate tweet: #{status["text"]}" + else + recent_tweets << status["id_str"] + recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH + puts status["text"] + filter_to_agent_map.keys.each do |filter| + if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson + filter_to_agent_map[filter].each do |agent| + puts " -> #{agent.name}" + agent.process_tweet(filter, status) + end + end + end + end + end + end + end + + RELOAD_TIMEOUT = 10.minutes + DUPLICATE_DETECTION_LENGTH = 1000 + SEPARATOR = /[^\w_\-]+/ + + def run + while @running + begin + agents = Agents::TwitterStreamAgent.all + + EventMachine::run do + EventMachine.add_periodic_timer(RELOAD_TIMEOUT) { + puts "Reloading EventMachine and all Agents..." + EventMachine::stop_event_loop + } + + if agents.length == 0 + puts "No agents found. Will look again in a minute." + sleep 60 + EventMachine::stop_event_loop + else + puts "Found #{agents.length} agent(s). Loading them now..." + load_and_run agents + end + end + + print "Pausing..."; STDOUT.flush + sleep 1 + puts "done." + rescue SignalException, SystemExit + @running = false + EventMachine::stop_event_loop if EventMachine.reactor_running? + rescue StandardError => e + STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n" + STDERR.puts "Waiting for a couple of minutes..." + sleep 120 + end + end + end +end \ No newline at end of file From ec0ec300c5a81067763c3bba8919eee599339c0b Mon Sep 17 00:00:00 2001 From: Dominik Sander Date: Mon, 12 May 2014 23:16:41 +0200 Subject: [PATCH 02/23] Use threaded workers in the chef production Procfile --- .../site-cookbooks/huginn_production/files/default/Procfile | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/deployment/site-cookbooks/huginn_production/files/default/Procfile b/deployment/site-cookbooks/huginn_production/files/default/Procfile index 295bbe44..fcc42611 100644 --- a/deployment/site-cookbooks/huginn_production/files/default/Procfile +++ b/deployment/site-cookbooks/huginn_production/files/default/Procfile @@ -1,4 +1,2 @@ web: sudo bundle exec unicorn_rails -c config/unicorn.rb -E production -schedule: sudo RAILS_ENV=production bundle exec rails runner bin/schedule.rb -twitter: sudo RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb -dj: sudo RAILS_ENV=production bundle exec script/delayed_job run +jobs: sudo RAILS_ENV=production bundle exec rails runner bin/threaded.rb \ No newline at end of file From df9032272c526697cbd68a9acbcd8ba84de8cf8a Mon Sep 17 00:00:00 2001 From: Dominik Sander Date: Sun, 1 Jun 2014 12:15:50 +0200 Subject: [PATCH 03/23] Threaded background worker now is the default --- Procfile | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/Procfile b/Procfile index a6391525..aeb2ac4c 100644 --- a/Procfile +++ b/Procfile @@ -1,15 +1,13 @@ -# Procfile for development: +# Procfile for development using the new threaded worker (scheduler, twitter stream and delayed job) web: bundle exec rails server -schedule: bundle exec rails runner bin/schedule.rb -twitter: bundle exec rails runner bin/twitter_stream.rb -dj: bundle exec script/delayed_job run - -# Procfile for the exprimental threaded scheduler, twitter stream and delayed job -#web: bundle exec rails server -#jobs: bundle exec rails runner bin/threaded.rb +jobs: bundle exec rails runner bin/threaded.rb # Possible Profile configuration for production: # web: bundle exec unicorn -c config/unicorn/production.rb -# schedule: bundle exec rails runner bin/schedule.rb -# twitter: bundle exec rails runner bin/twitter_stream.rb -# dj: bundle exec script/delayed_job run +# jobs: bundle exec rails runner bin/threaded.rb + +# Old version with seperate processes (use this if you have issues with the threaded version) +#web: bundle exec rails server +#schedule: bundle exec rails runner bin/schedule.rb +#twitter: bundle exec rails runner bin/twitter_stream.rb +#dj: bundle exec script/delayed_job run \ No newline at end of file From bb407f50c1db658646acf0ebe552308fcc1ebd66 Mon Sep 17 00:00:00 2001 From: Glenn 'devalias' Grant Date: Mon, 2 Jun 2014 20:33:46 +1000 Subject: [PATCH 04/23] Solves #354 --- ...digest_email_agent.rb => email_digest_agent.rb} | 4 ++-- db/seeds.rb | 4 ++-- ...il_agent_spec.rb => email_digest_agent_spec.rb} | 14 +++++++------- 3 files changed, 11 insertions(+), 11 deletions(-) rename app/models/agents/{digest_email_agent.rb => email_digest_agent.rb} (94%) rename spec/models/agents/{digest_email_agent_spec.rb => email_digest_agent_spec.rb} (88%) diff --git a/app/models/agents/digest_email_agent.rb b/app/models/agents/email_digest_agent.rb similarity index 94% rename from app/models/agents/digest_email_agent.rb rename to app/models/agents/email_digest_agent.rb index 62204d68..fa86fab6 100644 --- a/app/models/agents/digest_email_agent.rb +++ b/app/models/agents/email_digest_agent.rb @@ -1,5 +1,5 @@ module Agents - class DigestEmailAgent < Agent + class EmailDigestAgent < Agent include EmailConcern default_schedule "5am" @@ -7,7 +7,7 @@ module Agents cannot_create_events! description <<-MD - The DigestEmailAgent collects any Events sent to it and sends them all via email when run. + The EmailDigestAgent collects any Events sent to it and sends them all via email when run. The email will be sent to your account's address and will have a `subject` and an optional `headline` before listing the Events. If the Events' payloads contain a `message`, that will be highlighted, otherwise everything in their payloads will be shown. diff --git a/db/seeds.rb b/db/seeds.rb index 185c080a..81fbd2af 100644 --- a/db/seeds.rb +++ b/db/seeds.rb @@ -69,7 +69,7 @@ unless user.agents.where(:name => "Rain Notifier").exists? end unless user.agents.where(:name => "Morning Digest").exists? - Agent.build_for_type("Agents::DigestEmailAgent", user, + Agent.build_for_type("Agents::EmailDigestAgent", user, :name => "Morning Digest", :schedule => "6am", :options => { 'subject' => "Your Morning Digest", 'expected_receive_period_in_days' => "30" }, @@ -77,7 +77,7 @@ unless user.agents.where(:name => "Morning Digest").exists? end unless user.agents.where(:name => "Afternoon Digest").exists? - Agent.build_for_type("Agents::DigestEmailAgent", user, + Agent.build_for_type("Agents::EmailDigestAgent", user, :name => "Afternoon Digest", :schedule => "5pm", :options => { 'subject' => "Your Afternoon Digest", 'expected_receive_period_in_days' => "7" }, diff --git a/spec/models/agents/digest_email_agent_spec.rb b/spec/models/agents/email_digest_agent_spec.rb similarity index 88% rename from spec/models/agents/digest_email_agent_spec.rb rename to spec/models/agents/email_digest_agent_spec.rb index 68e4b9b3..c2f0941f 100644 --- a/spec/models/agents/digest_email_agent_spec.rb +++ b/spec/models/agents/email_digest_agent_spec.rb @@ -1,12 +1,12 @@ require 'spec_helper' -describe Agents::DigestEmailAgent do +describe Agents::EmailDigestAgent do def get_message_part(mail, content_type) mail.body.parts.find { |p| p.content_type.match content_type }.body.raw_source end before do - @checker = Agents::DigestEmailAgent.new(:name => "something", :options => { :expected_receive_period_in_days => 2, :subject => "something interesting" }) + @checker = Agents::EmailDigestAgent.new(:name => "something", :options => { :expected_receive_period_in_days => 2, :subject => "something interesting" }) @checker.user = users(:bob) @checker.save! end @@ -27,14 +27,14 @@ describe Agents::DigestEmailAgent do event2.payload = { :data => "Something else you should know about" } event2.save! - Agents::DigestEmailAgent.async_receive(@checker.id, [event1.id, event2.id]) + Agents::EmailDigestAgent.async_receive(@checker.id, [event1.id, event2.id]) @checker.reload.memory[:queue].should == [{ 'data' => "Something you should know about" }, { 'data' => "Something else you should know about" }] end end describe "#check" do it "should send an email" do - Agents::DigestEmailAgent.async_check(@checker.id) + Agents::EmailDigestAgent.async_check(@checker.id) ActionMailer::Base.deliveries.should == [] @checker.memory[:queue] = [{ :data => "Something you should know about" }, @@ -44,7 +44,7 @@ describe Agents::DigestEmailAgent do @checker.memory[:events] = [1,2,3,4] @checker.save! - Agents::DigestEmailAgent.async_check(@checker.id) + Agents::EmailDigestAgent.async_check(@checker.id) ActionMailer::Base.deliveries.last.to.should == ["bob@example.com"] ActionMailer::Base.deliveries.last.subject.should == "something interesting" get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip.should == "Event\n data: Something you should know about\n\nFoo\n bar: 2\n url: http://google.com\n\nhi\n woah: there\n\nEvent\n test: 2" @@ -61,7 +61,7 @@ describe Agents::DigestEmailAgent do Agent.receive! @checker.reload.memory[:queue].should_not be_empty - Agents::DigestEmailAgent.async_check(@checker.id) + Agents::EmailDigestAgent.async_check(@checker.id) plain_email_text = get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip html_email_text = get_message_part(ActionMailer::Base.deliveries.last, /html/).strip @@ -72,4 +72,4 @@ describe Agents::DigestEmailAgent do @checker.reload.memory[:queue].should be_empty end end -end \ No newline at end of file +end From 4c267aba3fe79a9896495919e694ad4f4c379771 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Tue, 3 Jun 2014 11:56:20 +0930 Subject: [PATCH 05/23] Add simple MQTT support to subscribe to, publish messages. --- Gemfile | 2 + Gemfile.lock | 2 + app/models/agents/mqtt_agent.rb | 112 ++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 app/models/agents/mqtt_agent.rb diff --git a/Gemfile b/Gemfile index cf69d89d..ff6aa6d3 100644 --- a/Gemfile +++ b/Gemfile @@ -74,6 +74,8 @@ gem 'slack-notifier', '~> 0.5.0' gem 'therubyracer', '~> 0.12.1' +gem 'mqtt' + group :development do gem 'binding_of_caller' gem 'better_errors' diff --git a/Gemfile.lock b/Gemfile.lock index 4c5488db..b74c580a 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -160,6 +160,7 @@ GEM mime-types (1.25.1) mini_portile (0.5.3) minitest (5.3.3) + mqtt (0.2.0) multi_json (1.9.3) multi_xml (0.5.5) multipart-post (2.0.0) @@ -341,6 +342,7 @@ DEPENDENCIES kaminari (~> 0.15.1) kramdown (~> 1.3.3) liquid (~> 2.6.1) + mqtt mysql2 (~> 0.3.15) nokogiri (~> 1.6.1) protected_attributes (~> 1.0.7) diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb new file mode 100644 index 00000000..97e42786 --- /dev/null +++ b/app/models/agents/mqtt_agent.rb @@ -0,0 +1,112 @@ +# encoding: utf-8 +require "mqtt" + +module Agents + class MqttAgent < Agent + description <<-MD + The MQTT agent allows both publication to an MQTT topic and subscription to an MQTT topic. + + Setup your own broker (http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a cloud service (www.cloudmqtt.com). + + MQTT is a generic transport protocol for machine to machine communication. + + You can do things like: + + * Publish to [RabbitMQ](http://www.rabbitmq.com/mqtt.html) + * Run [OwnTracks, a location tracking tool](http://owntracks.org/) for iOS and Android + * Subscribe to your home automation setup like [Ninjablocks](http://forums.ninjablocks.com/index.php?p=/discussion/661/today-i-learned-about-mqtt/p1) or [TheThingSystem](http://thethingsystem.com/dev/supported-things.html) + + Simply choose a topic (think email subject line) to publish/listen to, and configure your service. + + Hints: + Many services run mqtts (mqtt over SSL) often with a custom certificate. + + You'll want to download their cert and install it locally, specifying the ```certificate_path``` configuration. + + + Example configuration: + +
{
+      'uri' => 'mqtts://user:pass@locahost:8883'
+      'ssl' => :TLSv1,
+      'ca_file' => './ca.pem',
+      'cert_file' => './client.crt',
+      'key_file' => './client.key',
+      'topic' => 'huginn'
+    }
+    
+ + Subscribe to CloCkWeRX's TheThingSystem instance (thethingsystem.com), where + temperature and other events are being published. + +
{
+      'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
+      'topic' => 'the_thing_system/demo'
+    }
+    
+ + Subscribe to all topics +
{
+      'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
+      'topic' => '/#'
+    }
+    
+ + Find out more detail on [subscription wildcards](http://www.eclipse.org/paho/files/mqttdoc/Cclient/wildcard.html) + + MD + + def validate_options + # unless options['uid'].present? && + # options['expected_update_period_in_days'].present? + # errors.add(:base, "expected_update_period_in_days and uid are required") + # end + end + + def working? + !recent_error_logs? + end + + def default_options + { + 'uri' => 'mqtts://user:pass@locahost:8883', + 'ssl' => :TLSv1, + 'ca_file' => './ca.pem', + 'cert_file' => './client.crt', + 'key_file' => './client.key', + 'topic' => 'huginn' + } + end + + def mqtt_client + client = MQTT::Client.new(options['uri']) + + if options['ssl'] + client.ssl = options['ssl'].to_sym + client.ca_file = options['ca_file'] + client.cert_file = options['cert_file'] + client.key_file = options['key_file'] + end + + client + end + + def receive(incoming_events) + mqtt_client.connect do |c| + incoming_events.each do |event| + c.publish(options['topic'], payload) + end + end + end + + + def check + mqtt_client.connect do |c| + c.get(options['topic']) do |topic,message| + create_event :payload => { 'topic' => topic, 'message' => message, 'time' => Time.now.to_i } + end + end + end + + end +end \ No newline at end of file From 6824eb63d3ddaea3e499cec627f4078d3c48dc40 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Tue, 3 Jun 2014 12:19:54 +0930 Subject: [PATCH 06/23] Tweak wording --- app/models/agents/mqtt_agent.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb index 97e42786..845d7f68 100644 --- a/app/models/agents/mqtt_agent.rb +++ b/app/models/agents/mqtt_agent.rb @@ -4,9 +4,7 @@ require "mqtt" module Agents class MqttAgent < Agent description <<-MD - The MQTT agent allows both publication to an MQTT topic and subscription to an MQTT topic. - - Setup your own broker (http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a cloud service (www.cloudmqtt.com). + The MQTT agent allows both publication and subscription to an MQTT topic. MQTT is a generic transport protocol for machine to machine communication. @@ -18,6 +16,8 @@ module Agents Simply choose a topic (think email subject line) to publish/listen to, and configure your service. + It's easy to setup your own [broker](http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a [cloud service](www.cloudmqtt.com) + Hints: Many services run mqtts (mqtt over SSL) often with a custom certificate. From 6a4779be9dcaec8a6c6e91932661dc237c2bb3f7 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Tue, 3 Jun 2014 14:35:04 +0930 Subject: [PATCH 07/23] Assume the payload is going to be JSON --- app/models/agents/mqtt_agent.rb | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb index 845d7f68..22fd7287 100644 --- a/app/models/agents/mqtt_agent.rb +++ b/app/models/agents/mqtt_agent.rb @@ -1,5 +1,6 @@ # encoding: utf-8 require "mqtt" +require "json" module Agents class MqttAgent < Agent @@ -56,6 +57,27 @@ module Agents MD + event_description <<-MD + Events are simply nested MQTT payloads. For example, an MQTT payload for Owntracks + +
{
+        "topic": "owntracks/kcqlmkgx/Dan",
+        "message": {
+  "topic": "owntracks/kcqlmkgx/Galaxy S3 Dan",
+  "message": {
+    "_type": "location",
+    "lat": "-34.849373",
+    "lon": "138.5218449",
+    "tst": "1401761484",
+    "acc": "10.0",
+    "batt": "71"
+  },
+  "time": 1401771825
+},
+        "time": 1401771051
+      }
+ MD + def validate_options # unless options['uid'].present? && # options['expected_update_period_in_days'].present? @@ -103,7 +125,7 @@ module Agents def check mqtt_client.connect do |c| c.get(options['topic']) do |topic,message| - create_event :payload => { 'topic' => topic, 'message' => message, 'time' => Time.now.to_i } + create_event :payload => { 'topic' => topic, 'message' => JSON.parse(message), 'time' => Time.now.to_i } end end end From b642fa38dc87db1a14ae149cde1cb81e9d0769f8 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Tue, 3 Jun 2014 14:49:46 +0930 Subject: [PATCH 08/23] Indent description --- app/models/agents/mqtt_agent.rb | 84 ++++++++++++++------------------- 1 file changed, 36 insertions(+), 48 deletions(-) diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb index 22fd7287..7433def3 100644 --- a/app/models/agents/mqtt_agent.rb +++ b/app/models/agents/mqtt_agent.rb @@ -5,56 +5,55 @@ require "json" module Agents class MqttAgent < Agent description <<-MD - The MQTT agent allows both publication and subscription to an MQTT topic. + The MQTT agent allows both publication and subscription to an MQTT topic. - MQTT is a generic transport protocol for machine to machine communication. + MQTT is a generic transport protocol for machine to machine communication. - You can do things like: + You can do things like: - * Publish to [RabbitMQ](http://www.rabbitmq.com/mqtt.html) - * Run [OwnTracks, a location tracking tool](http://owntracks.org/) for iOS and Android - * Subscribe to your home automation setup like [Ninjablocks](http://forums.ninjablocks.com/index.php?p=/discussion/661/today-i-learned-about-mqtt/p1) or [TheThingSystem](http://thethingsystem.com/dev/supported-things.html) + * Publish to [RabbitMQ](http://www.rabbitmq.com/mqtt.html) + * Run [OwnTracks, a location tracking tool](http://owntracks.org/) for iOS and Android + * Subscribe to your home automation setup like [Ninjablocks](http://forums.ninjablocks.com/index.php?p=/discussion/661/today-i-learned-about-mqtt/p1) or [TheThingSystem](http://thethingsystem.com/dev/supported-things.html) - Simply choose a topic (think email subject line) to publish/listen to, and configure your service. + Simply choose a topic (think email subject line) to publish/listen to, and configure your service. - It's easy to setup your own [broker](http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a [cloud service](www.cloudmqtt.com) + It's easy to setup your own [broker](http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a [cloud service](www.cloudmqtt.com) - Hints: - Many services run mqtts (mqtt over SSL) often with a custom certificate. + Hints: + Many services run mqtts (mqtt over SSL) often with a custom certificate. - You'll want to download their cert and install it locally, specifying the ```certificate_path``` configuration. + You'll want to download their cert and install it locally, specifying the ```certificate_path``` configuration. - Example configuration: + Example configuration: -
{
-      'uri' => 'mqtts://user:pass@locahost:8883'
-      'ssl' => :TLSv1,
-      'ca_file' => './ca.pem',
-      'cert_file' => './client.crt',
-      'key_file' => './client.key',
-      'topic' => 'huginn'
-    }
-    
+
{
+        'uri' => 'mqtts://user:pass@locahost:8883'
+        'ssl' => :TLSv1,
+        'ca_file' => './ca.pem',
+        'cert_file' => './client.crt',
+        'key_file' => './client.key',
+        'topic' => 'huginn'
+      }
+      
- Subscribe to CloCkWeRX's TheThingSystem instance (thethingsystem.com), where - temperature and other events are being published. + Subscribe to CloCkWeRX's TheThingSystem instance (thethingsystem.com), where + temperature and other events are being published. -
{
-      'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
-      'topic' => 'the_thing_system/demo'
-    }
-    
+
{
+        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
+        'topic' => 'the_thing_system/demo'
+      }
+      
- Subscribe to all topics -
{
-      'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
-      'topic' => '/#'
-    }
-    
- - Find out more detail on [subscription wildcards](http://www.eclipse.org/paho/files/mqttdoc/Cclient/wildcard.html) + Subscribe to all topics +
{
+        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
+        'topic' => '/#'
+      }
+      
+ Find out more detail on [subscription wildcards](http://www.eclipse.org/paho/files/mqttdoc/Cclient/wildcard.html) MD event_description <<-MD @@ -62,18 +61,7 @@ module Agents
{
         "topic": "owntracks/kcqlmkgx/Dan",
-        "message": {
-  "topic": "owntracks/kcqlmkgx/Galaxy S3 Dan",
-  "message": {
-    "_type": "location",
-    "lat": "-34.849373",
-    "lon": "138.5218449",
-    "tst": "1401761484",
-    "acc": "10.0",
-    "batt": "71"
-  },
-  "time": 1401771825
-},
+        "message": {"_type": "location", "lat": "-34.8493644", "lon": "138.5218119", "tst": "1401771049", "acc": "50.0", "batt": "31", "desc": "Home", "event": "enter"},
         "time": 1401771051
       }
MD From 2ec6c618d88c38fcd1c764f190e9806468ed54d7 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Tue, 3 Jun 2014 14:52:06 +0930 Subject: [PATCH 09/23] Add presence validation --- app/models/agents/mqtt_agent.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb index 7433def3..cd2e4729 100644 --- a/app/models/agents/mqtt_agent.rb +++ b/app/models/agents/mqtt_agent.rb @@ -67,10 +67,10 @@ module Agents MD def validate_options - # unless options['uid'].present? && - # options['expected_update_period_in_days'].present? - # errors.add(:base, "expected_update_period_in_days and uid are required") - # end + unless options['uri'].present? && + options['topic'].present? + errors.add(:base, "topic and uri are required") + end end def working? From 7209b2ad8c4c7ccce0926763a412ee5e9f9ed20c Mon Sep 17 00:00:00 2001 From: Glenn 'devalias' Grant Date: Tue, 3 Jun 2014 20:46:06 +1000 Subject: [PATCH 10/23] Add db migration --- ...211_rename_digest_email_to_email_digest.rb | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 db/migrate/20140603104211_rename_digest_email_to_email_digest.rb diff --git a/db/migrate/20140603104211_rename_digest_email_to_email_digest.rb b/db/migrate/20140603104211_rename_digest_email_to_email_digest.rb new file mode 100644 index 00000000..81ae575e --- /dev/null +++ b/db/migrate/20140603104211_rename_digest_email_to_email_digest.rb @@ -0,0 +1,21 @@ +class RenameDigestEmailToEmailDigest < ActiveRecord::Migration + def up + sql = <<-SQL + UPDATE #{ActiveRecord::Base.connection.quote_table_name('agents')} + SET #{ActiveRecord::Base.connection.quote_column_name('type')} = "EmailDigestAgent" + WHERE #{ActiveRecord::Base.connection.quote_column_name('type')} = "DigestEmailAgent" + SQL + + execute sql + end + + def down + sql = <<-SQL + UPDATE #{ActiveRecord::Base.connection.quote_table_name('agents')} + SET #{ActiveRecord::Base.connection.quote_column_name('type')} = "DigestEmailAgent" + WHERE #{ActiveRecord::Base.connection.quote_column_name('type')} = "EmailDigestAgent" + SQL + + execute sql + end +end From e42d54f8d13ff73c12cb3a9a84ca560435e7b3ca Mon Sep 17 00:00:00 2001 From: Glenn 'devalias' Grant Date: Tue, 3 Jun 2014 21:45:12 +1000 Subject: [PATCH 11/23] Made expected_update_period_in_days an option --- app/models/agents/weather_agent.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/app/models/agents/weather_agent.rb b/app/models/agents/weather_agent.rb index 074182b8..d1716feb 100644 --- a/app/models/agents/weather_agent.rb +++ b/app/models/agents/weather_agent.rb @@ -19,6 +19,8 @@ module Agents You must setup an [API key for Wunderground](http://www.wunderground.com/weather/api/) in order to use this Agent with Wunderground. You must setup an [API key for Forecast](https://developer.forecast.io/) in order to use this Agent with ForecastIO. + + Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent. MD event_description <<-MD @@ -49,7 +51,7 @@ module Agents default_schedule "8pm" def working? - event_created_within?(2) && !recent_error_logs? + event_created_within?(options['expected_update_period_in_days']) && !recent_error_logs? end def key_setup? @@ -61,7 +63,8 @@ module Agents 'service' => 'wunderground', 'api_key' => 'your-key', 'location' => '94103', - 'which_day' => '1' + 'which_day' => '1', + 'expected_update_period_in_days' => '2' } end @@ -83,6 +86,7 @@ module Agents errors.add(:base, "location is required") unless location.present? errors.add(:base, "api_key is required") unless key_setup? errors.add(:base, "which_day selection is required") unless which_day.present? + errors.add(:base, "expected_update_period_in_days is required") unless options['expected_update_period_in_days'].present? end def wunderground @@ -163,7 +167,7 @@ module Agents 'ozone' => value.ozone.to_s } return day - end + end end end end From e7f078ae93935d7d12f138005601adc14fc46e0f Mon Sep 17 00:00:00 2001 From: Glenn 'devalias' Grant Date: Tue, 3 Jun 2014 21:51:37 +1000 Subject: [PATCH 12/23] Default the update period if not set --- app/models/agents/weather_agent.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/agents/weather_agent.rb b/app/models/agents/weather_agent.rb index d1716feb..ef119a12 100644 --- a/app/models/agents/weather_agent.rb +++ b/app/models/agents/weather_agent.rb @@ -51,7 +51,7 @@ module Agents default_schedule "8pm" def working? - event_created_within?(options['expected_update_period_in_days']) && !recent_error_logs? + event_created_within?(options['expected_update_period_in_days'].presence || 2) && !recent_error_logs? end def key_setup? From f69617f6aea5a8918a9759b0f8e8b22272d4bf10 Mon Sep 17 00:00:00 2001 From: Glenn 'devalias' Grant Date: Tue, 3 Jun 2014 22:33:55 +1000 Subject: [PATCH 13/23] Remove requirement for expected_update_period_in_days (since we default it if it doesn't exist) --- app/models/agents/weather_agent.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/app/models/agents/weather_agent.rb b/app/models/agents/weather_agent.rb index ef119a12..7399e87a 100644 --- a/app/models/agents/weather_agent.rb +++ b/app/models/agents/weather_agent.rb @@ -86,7 +86,6 @@ module Agents errors.add(:base, "location is required") unless location.present? errors.add(:base, "api_key is required") unless key_setup? errors.add(:base, "which_day selection is required") unless which_day.present? - errors.add(:base, "expected_update_period_in_days is required") unless options['expected_update_period_in_days'].present? end def wunderground From 325d2eace694910dfd3f046705d7bc26fb8b77d6 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Wed, 4 Jun 2014 16:45:32 +0930 Subject: [PATCH 14/23] Add fake MQTT server from ruby-mqtt gem --- spec/support/fake_mqtt_server.rb | 131 +++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 spec/support/fake_mqtt_server.rb diff --git a/spec/support/fake_mqtt_server.rb b/spec/support/fake_mqtt_server.rb new file mode 100644 index 00000000..c259c7ba --- /dev/null +++ b/spec/support/fake_mqtt_server.rb @@ -0,0 +1,131 @@ +#!/usr/bin/env ruby +# +# This is a 'fake' MQTT server to help with testing client implementations +# +# See https://github.com/njh/ruby-mqtt/blob/master/spec/fake_server.rb +# +# It behaves in the following ways: +# * Responses to CONNECT with a successful CONACK +# * Responses to PUBLISH by echoing the packet back +# * Responses to SUBSCRIBE with SUBACK and a PUBLISH to the topic +# * Responses to PINGREQ with PINGRESP +# * Responses to DISCONNECT by closing the socket +# +# It has the following restrictions +# * Doesn't deal with timeouts +# * Only handles a single connection at a time +# + +$:.unshift File.dirname(__FILE__)+'/../lib' + +require 'logger' +require 'socket' +require 'mqtt' + + +class MQTT::FakeServer + attr_reader :address, :port + attr_reader :last_publish + attr_reader :thread + attr_reader :pings_received + attr_accessor :just_one + attr_accessor :logger + + # Create a new fake MQTT server + # + # If no port is given, bind to a random port number + # If no bind address is given, bind to localhost + def initialize(port=nil, bind_address='127.0.0.1') + @port = port + @address = bind_address + end + + # Get the logger used by the server + def logger + @logger ||= Logger.new(STDOUT) + end + + # Start the thread and open the socket that will process client connections + def start + @socket ||= TCPServer.new(@address, @port) + @address = @socket.addr[3] + @port = @socket.addr[1] + @thread ||= Thread.new do + logger.info "Started a fake MQTT server on #{@address}:#{@port}" + loop do + # Wait for a client to connect + client = @socket.accept + @pings_received = 0 + handle_client(client) + break if just_one + end + end + end + + # Stop the thread and close the socket + def stop + logger.info "Stopping fake MQTT server" + @socket.close unless @socket.nil? + @socket = nil + + @thread.kill if @thread and @thread.alive? + @thread = nil + end + + # Start the server thread and wait for it to finish (possibly never) + def run + start + begin + @thread.join + rescue Interrupt + stop + end + end + + + protected + + # Given a client socket, process MQTT packets from the client + def handle_client(client) + loop do + packet = MQTT::Packet.read(client) + logger.debug packet.inspect + + case packet + when MQTT::Packet::Connect + client.write MQTT::Packet::Connack.new(:return_code => 0) + when MQTT::Packet::Publish + client.write packet + @last_publish = packet + when MQTT::Packet::Subscribe + client.write MQTT::Packet::Suback.new( + :message_id => packet.message_id, + :granted_qos => 0 + ) + topic = packet.topics[0][0] + client.write MQTT::Packet::Publish.new( + :topic => topic, + :payload => "hello #{topic}", + :retain => true + ) + when MQTT::Packet::Pingreq + client.write MQTT::Packet::Pingresp.new + @pings_received += 1 + when MQTT::Packet::Disconnect + client.close + break + end + end + + rescue MQTT::ProtocolException => e + logger.warn "Protocol error, closing connection: #{e}" + client.close + end + +end + +if __FILE__ == $0 + server = MQTT::FakeServer.new(MQTT::DEFAULT_PORT) + server.logger.level = Logger::DEBUG + server.run +end \ No newline at end of file From d841b3e4b591b146957e265ecbdfd2e10c9ad5a9 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Wed, 4 Jun 2014 17:42:07 +0930 Subject: [PATCH 15/23] Introduce a max_read_time --- app/models/agents/mqtt_agent.rb | 42 +++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb index cd2e4729..82d9b3c9 100644 --- a/app/models/agents/mqtt_agent.rb +++ b/app/models/agents/mqtt_agent.rb @@ -41,14 +41,14 @@ module Agents temperature and other events are being published.
{
-        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
+        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
         'topic' => 'the_thing_system/demo'
       }
       
Subscribe to all topics
{
-        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858'
+        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
         'topic' => '/#'
       }
       
@@ -81,24 +81,25 @@ module Agents { 'uri' => 'mqtts://user:pass@locahost:8883', 'ssl' => :TLSv1, - 'ca_file' => './ca.pem', + 'ca_file' => './ca.pem', 'cert_file' => './client.crt', 'key_file' => './client.key', - 'topic' => 'huginn' + 'topic' => 'huginn', + 'max_read_time' => '10' } end def mqtt_client - client = MQTT::Client.new(options['uri']) + @client ||= MQTT::Client.new(options['uri']) if options['ssl'] - client.ssl = options['ssl'].to_sym - client.ca_file = options['ca_file'] - client.cert_file = options['cert_file'] - client.key_file = options['key_file'] + @client.ssl = options['ssl'].to_sym + @client.ca_file = options['ca_file'] + @client.cert_file = options['cert_file'] + @client.key_file = options['key_file'] end - client + @client end def receive(incoming_events) @@ -106,15 +107,30 @@ module Agents incoming_events.each do |event| c.publish(options['topic'], payload) end + + c.disconnect end end def check mqtt_client.connect do |c| - c.get(options['topic']) do |topic,message| - create_event :payload => { 'topic' => topic, 'message' => JSON.parse(message), 'time' => Time.now.to_i } - end + + Timeout::timeout(options['max_read_time']) { + c.get(options['topic']) do |topic, message| + + # A lot of services generate JSON. Try that first + payload = JSON.parse(message) rescue message + + create_event :payload => { + 'topic' => topic, + 'message' => payload, + 'time' => Time.now.to_i + } + end + } rescue TimeoutError + + c.disconnect end end From b8604777b621fd0f4414c85f616a7d2401af5a42 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Wed, 4 Jun 2014 17:42:48 +0930 Subject: [PATCH 16/23] Add initial specs --- spec/models/agents/mqtt_agent_spec.rb | 53 +++++++++++++++++++++++++++ spec/support/fake_mqtt_server.rb | 6 +++ 2 files changed, 59 insertions(+) create mode 100644 spec/models/agents/mqtt_agent_spec.rb diff --git a/spec/models/agents/mqtt_agent_spec.rb b/spec/models/agents/mqtt_agent_spec.rb new file mode 100644 index 00000000..8933a1dc --- /dev/null +++ b/spec/models/agents/mqtt_agent_spec.rb @@ -0,0 +1,53 @@ +require 'spec_helper' +require 'mqtt' +require './spec/support/fake_mqtt_server' + +require 'pry' + +describe Agents::MqttAgent do + before :each do + # stub_request(:get, /parse/).to_return(:body => File.read(Rails.root.join("spec/data_fixtures/adioso_parse.json")), :status => 200, :headers => {"Content-Type" => "text/json"}) + # stub_request(:get, /fares/).to_return(:body => File.read(Rails.root.join("spec/data_fixtures/adioso_fare.json")), :status => 200, :headers => {"Content-Type" => "text/json"}) + @error_log = StringIO.new + @server = MQTT::FakeServer.new(1234, '127.0.0.1') + @server.just_one = true + @server.logger = Logger.new(@error_log) + @server.logger.level = Logger::DEBUG + @server.start + + @valid_params = { + 'uri' => "mqtt://#{@server.address}:#{@server.port}", + 'topic' => '/#', + 'max_read_time' => 1 + } + + @checker = Agents::MqttAgent.new( + :name => "somename", + :options => @valid_params, + :schedule => "midnight", + ) + @checker.user = users(:jane) + @checker.save! + end + + after :each do + @server.stop + end + + describe "#check" do + it "should check that initial run creates an event" do + expect { @checker.check }.to change { Event.count }.by(2) + end + end + + describe "#working?" do + it "checks if its generating events as scheduled" do + @checker.should_not be_working + @checker.check + @checker.reload.should be_working + three_days_from_now = 3.days.from_now + stub(Time).now { three_days_from_now } + @checker.should_not be_working + end + end +end diff --git a/spec/support/fake_mqtt_server.rb b/spec/support/fake_mqtt_server.rb index c259c7ba..39f8db74 100644 --- a/spec/support/fake_mqtt_server.rb +++ b/spec/support/fake_mqtt_server.rb @@ -108,6 +108,12 @@ class MQTT::FakeServer :payload => "hello #{topic}", :retain => true ) + client.write MQTT::Packet::Publish.new( + :topic => topic, + :payload => "did you know about #{topic}", + :retain => true + ) + when MQTT::Packet::Pingreq client.write MQTT::Packet::Pingresp.new @pings_received += 1 From efcd63ea96919f537c6221f2a16137eefc6d20c1 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Wed, 4 Jun 2014 17:45:14 +0930 Subject: [PATCH 17/23] Make specs pass --- app/models/agents/mqtt_agent.rb | 2 +- spec/models/agents/mqtt_agent_spec.rb | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb index 82d9b3c9..fa8f8f02 100644 --- a/app/models/agents/mqtt_agent.rb +++ b/app/models/agents/mqtt_agent.rb @@ -74,7 +74,7 @@ module Agents end def working? - !recent_error_logs? + event_created_within?(options['expected_update_period_in_days']) && !recent_error_logs? end def default_options diff --git a/spec/models/agents/mqtt_agent_spec.rb b/spec/models/agents/mqtt_agent_spec.rb index 8933a1dc..7100212e 100644 --- a/spec/models/agents/mqtt_agent_spec.rb +++ b/spec/models/agents/mqtt_agent_spec.rb @@ -18,7 +18,8 @@ describe Agents::MqttAgent do @valid_params = { 'uri' => "mqtt://#{@server.address}:#{@server.port}", 'topic' => '/#', - 'max_read_time' => 1 + 'max_read_time' => 1, + 'expected_update_period_in_days' => "2" } @checker = Agents::MqttAgent.new( From 057496b9c22605421a4dd3141d733770b33b373a Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Thu, 5 Jun 2014 12:35:36 +0930 Subject: [PATCH 18/23] Mixed whitespace :( --- spec/models/agents/mqtt_agent_spec.rb | 42 +++++++++++++-------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/spec/models/agents/mqtt_agent_spec.rb b/spec/models/agents/mqtt_agent_spec.rb index 7100212e..65298d5f 100644 --- a/spec/models/agents/mqtt_agent_spec.rb +++ b/spec/models/agents/mqtt_agent_spec.rb @@ -2,10 +2,8 @@ require 'spec_helper' require 'mqtt' require './spec/support/fake_mqtt_server' -require 'pry' - describe Agents::MqttAgent do - before :each do + before :each do # stub_request(:get, /parse/).to_return(:body => File.read(Rails.root.join("spec/data_fixtures/adioso_parse.json")), :status => 200, :headers => {"Content-Type" => "text/json"}) # stub_request(:get, /fares/).to_return(:body => File.read(Rails.root.join("spec/data_fixtures/adioso_fare.json")), :status => 200, :headers => {"Content-Type" => "text/json"}) @error_log = StringIO.new @@ -15,40 +13,40 @@ describe Agents::MqttAgent do @server.logger.level = Logger::DEBUG @server.start - @valid_params = { + @valid_params = { 'uri' => "mqtt://#{@server.address}:#{@server.port}", 'topic' => '/#', 'max_read_time' => 1, 'expected_update_period_in_days' => "2" } - @checker = Agents::MqttAgent.new( + @checker = Agents::MqttAgent.new( :name => "somename", :options => @valid_params, :schedule => "midnight", ) - @checker.user = users(:jane) - @checker.save! - end + @checker.user = users(:jane) + @checker.save! + end after :each do @server.stop end - describe "#check" do - it "should check that initial run creates an event" do + describe "#check" do + it "should check that initial run creates an event" do expect { @checker.check }.to change { Event.count }.by(2) - end - end + end + end - describe "#working?" do - it "checks if its generating events as scheduled" do - @checker.should_not be_working - @checker.check - @checker.reload.should be_working - three_days_from_now = 3.days.from_now - stub(Time).now { three_days_from_now } - @checker.should_not be_working - end - end + describe "#working?" do + it "checks if its generating events as scheduled" do + @checker.should_not be_working + @checker.check + @checker.reload.should be_working + three_days_from_now = 3.days.from_now + stub(Time).now { three_days_from_now } + @checker.should_not be_working + end + end end From d7eeec1b101fa738e892e2074b2d0c70e81e7b1e Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Thu, 5 Jun 2014 12:38:14 +0930 Subject: [PATCH 19/23] Update to cast to int --- app/models/agents/mqtt_agent.rb | 2 +- spec/models/agents/mqtt_agent_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb index fa8f8f02..fd86be34 100644 --- a/app/models/agents/mqtt_agent.rb +++ b/app/models/agents/mqtt_agent.rb @@ -116,7 +116,7 @@ module Agents def check mqtt_client.connect do |c| - Timeout::timeout(options['max_read_time']) { + Timeout::timeout((options['max_read_time'].presence || 15).to_i) { c.get(options['topic']) do |topic, message| # A lot of services generate JSON. Try that first diff --git a/spec/models/agents/mqtt_agent_spec.rb b/spec/models/agents/mqtt_agent_spec.rb index 65298d5f..ebabc1e6 100644 --- a/spec/models/agents/mqtt_agent_spec.rb +++ b/spec/models/agents/mqtt_agent_spec.rb @@ -16,7 +16,7 @@ describe Agents::MqttAgent do @valid_params = { 'uri' => "mqtt://#{@server.address}:#{@server.port}", 'topic' => '/#', - 'max_read_time' => 1, + 'max_read_time' => '1', 'expected_update_period_in_days' => "2" } From 116f8b3fdef6e177af56a22239df8db8a7022fc2 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Thu, 5 Jun 2014 13:16:42 +0930 Subject: [PATCH 20/23] Dead code --- spec/models/agents/mqtt_agent_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/models/agents/mqtt_agent_spec.rb b/spec/models/agents/mqtt_agent_spec.rb index ebabc1e6..98a500a9 100644 --- a/spec/models/agents/mqtt_agent_spec.rb +++ b/spec/models/agents/mqtt_agent_spec.rb @@ -3,10 +3,10 @@ require 'mqtt' require './spec/support/fake_mqtt_server' describe Agents::MqttAgent do + before :each do - # stub_request(:get, /parse/).to_return(:body => File.read(Rails.root.join("spec/data_fixtures/adioso_parse.json")), :status => 200, :headers => {"Content-Type" => "text/json"}) - # stub_request(:get, /fares/).to_return(:body => File.read(Rails.root.join("spec/data_fixtures/adioso_fare.json")), :status => 200, :headers => {"Content-Type" => "text/json"}) @error_log = StringIO.new + @server = MQTT::FakeServer.new(1234, '127.0.0.1') @server.just_one = true @server.logger = Logger.new(@error_log) From 8b089a64d02a55498077d36e033afe2b8d5f4339 Mon Sep 17 00:00:00 2001 From: Daniel O'Connor Date: Thu, 5 Jun 2014 13:17:41 +0930 Subject: [PATCH 21/23] Pick a higher port --- spec/models/agents/mqtt_agent_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/models/agents/mqtt_agent_spec.rb b/spec/models/agents/mqtt_agent_spec.rb index 98a500a9..f2049d60 100644 --- a/spec/models/agents/mqtt_agent_spec.rb +++ b/spec/models/agents/mqtt_agent_spec.rb @@ -6,8 +6,8 @@ describe Agents::MqttAgent do before :each do @error_log = StringIO.new - - @server = MQTT::FakeServer.new(1234, '127.0.0.1') + + @server = MQTT::FakeServer.new(41234, '127.0.0.1') @server.just_one = true @server.logger = Logger.new(@error_log) @server.logger.level = Logger::DEBUG From ca54ef68ee06aa04092c07da03797209728b4a1d Mon Sep 17 00:00:00 2001 From: Glenn 'devalias' Grant Date: Thu, 5 Jun 2014 15:21:21 +1000 Subject: [PATCH 22/23] Add to_i --- app/models/agents/weather_agent.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/agents/weather_agent.rb b/app/models/agents/weather_agent.rb index 7399e87a..3f286ed7 100644 --- a/app/models/agents/weather_agent.rb +++ b/app/models/agents/weather_agent.rb @@ -51,7 +51,7 @@ module Agents default_schedule "8pm" def working? - event_created_within?(options['expected_update_period_in_days'].presence || 2) && !recent_error_logs? + event_created_within?((options['expected_update_period_in_days'].presence || 2).to_i) && !recent_error_logs? end def key_setup? From b2fbf70723b6ab00744d59f3dd1e1d552d208e99 Mon Sep 17 00:00:00 2001 From: Glenn 'devalias' Grant Date: Thu, 5 Jun 2014 15:26:32 +1000 Subject: [PATCH 23/23] Fix migration to actually do the right thing --- .../20140603104211_rename_digest_email_to_email_digest.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/db/migrate/20140603104211_rename_digest_email_to_email_digest.rb b/db/migrate/20140603104211_rename_digest_email_to_email_digest.rb index 81ae575e..cc20d6d6 100644 --- a/db/migrate/20140603104211_rename_digest_email_to_email_digest.rb +++ b/db/migrate/20140603104211_rename_digest_email_to_email_digest.rb @@ -2,8 +2,8 @@ class RenameDigestEmailToEmailDigest < ActiveRecord::Migration def up sql = <<-SQL UPDATE #{ActiveRecord::Base.connection.quote_table_name('agents')} - SET #{ActiveRecord::Base.connection.quote_column_name('type')} = "EmailDigestAgent" - WHERE #{ActiveRecord::Base.connection.quote_column_name('type')} = "DigestEmailAgent" + SET #{ActiveRecord::Base.connection.quote_column_name('type')} = "Agents::EmailDigestAgent" + WHERE #{ActiveRecord::Base.connection.quote_column_name('type')} = "Agents::DigestEmailAgent" SQL execute sql @@ -12,8 +12,8 @@ class RenameDigestEmailToEmailDigest < ActiveRecord::Migration def down sql = <<-SQL UPDATE #{ActiveRecord::Base.connection.quote_table_name('agents')} - SET #{ActiveRecord::Base.connection.quote_column_name('type')} = "DigestEmailAgent" - WHERE #{ActiveRecord::Base.connection.quote_column_name('type')} = "EmailDigestAgent" + SET #{ActiveRecord::Base.connection.quote_column_name('type')} = "Agents::DigestEmailAgent" + WHERE #{ActiveRecord::Base.connection.quote_column_name('type')} = "Agents::EmailDigestAgent" SQL execute sql