mirror of
https://github.com/Fishwaldo/huginn.git
synced 2025-03-15 19:31:26 +00:00
Merge remote-tracking branch 'origin/master' into scenarios
This commit is contained in:
commit
f3ffb33a7d
18 changed files with 650 additions and 218 deletions
2
Gemfile
2
Gemfile
|
@ -74,6 +74,8 @@ gem 'slack-notifier', '~> 0.5.0'
|
|||
|
||||
gem 'therubyracer', '~> 0.12.1'
|
||||
|
||||
gem 'mqtt'
|
||||
|
||||
group :development do
|
||||
gem 'binding_of_caller'
|
||||
gem 'better_errors'
|
||||
|
|
|
@ -160,6 +160,7 @@ GEM
|
|||
mime-types (1.25.1)
|
||||
mini_portile (0.5.3)
|
||||
minitest (5.3.3)
|
||||
mqtt (0.2.0)
|
||||
multi_json (1.9.3)
|
||||
multi_xml (0.5.5)
|
||||
multipart-post (2.0.0)
|
||||
|
@ -341,6 +342,7 @@ DEPENDENCIES
|
|||
kaminari (~> 0.15.1)
|
||||
kramdown (~> 1.3.3)
|
||||
liquid (~> 2.6.1)
|
||||
mqtt
|
||||
mysql2 (~> 0.3.15)
|
||||
nokogiri (~> 1.6.1)
|
||||
protected_attributes (~> 1.0.7)
|
||||
|
|
16
Procfile
16
Procfile
|
@ -1,11 +1,13 @@
|
|||
# Procfile for development:
|
||||
# Procfile for development using the new threaded worker (scheduler, twitter stream and delayed job)
|
||||
web: bundle exec rails server
|
||||
schedule: bundle exec rails runner bin/schedule.rb
|
||||
twitter: bundle exec rails runner bin/twitter_stream.rb
|
||||
dj: bundle exec script/delayed_job run
|
||||
jobs: bundle exec rails runner bin/threaded.rb
|
||||
|
||||
# Possible Profile configuration for production:
|
||||
# web: bundle exec unicorn -c config/unicorn/production.rb
|
||||
# schedule: bundle exec rails runner bin/schedule.rb
|
||||
# twitter: bundle exec rails runner bin/twitter_stream.rb
|
||||
# dj: bundle exec script/delayed_job run
|
||||
# jobs: bundle exec rails runner bin/threaded.rb
|
||||
|
||||
# Old version with seperate processes (use this if you have issues with the threaded version)
|
||||
#web: bundle exec rails server
|
||||
#schedule: bundle exec rails runner bin/schedule.rb
|
||||
#twitter: bundle exec rails runner bin/twitter_stream.rb
|
||||
#dj: bundle exec script/delayed_job run
|
|
@ -1,5 +1,5 @@
|
|||
module Agents
|
||||
class DigestEmailAgent < Agent
|
||||
class EmailDigestAgent < Agent
|
||||
include EmailConcern
|
||||
|
||||
default_schedule "5am"
|
||||
|
@ -7,7 +7,7 @@ module Agents
|
|||
cannot_create_events!
|
||||
|
||||
description <<-MD
|
||||
The DigestEmailAgent collects any Events sent to it and sends them all via email when run.
|
||||
The EmailDigestAgent collects any Events sent to it and sends them all via email when run.
|
||||
The email will be sent to your account's address and will have a `subject` and an optional `headline` before
|
||||
listing the Events. If the Events' payloads contain a `message`, that will be highlighted, otherwise everything in
|
||||
their payloads will be shown.
|
138
app/models/agents/mqtt_agent.rb
Normal file
138
app/models/agents/mqtt_agent.rb
Normal file
|
@ -0,0 +1,138 @@
|
|||
# encoding: utf-8
|
||||
require "mqtt"
|
||||
require "json"
|
||||
|
||||
module Agents
|
||||
class MqttAgent < Agent
|
||||
description <<-MD
|
||||
The MQTT agent allows both publication and subscription to an MQTT topic.
|
||||
|
||||
MQTT is a generic transport protocol for machine to machine communication.
|
||||
|
||||
You can do things like:
|
||||
|
||||
* Publish to [RabbitMQ](http://www.rabbitmq.com/mqtt.html)
|
||||
* Run [OwnTracks, a location tracking tool](http://owntracks.org/) for iOS and Android
|
||||
* Subscribe to your home automation setup like [Ninjablocks](http://forums.ninjablocks.com/index.php?p=/discussion/661/today-i-learned-about-mqtt/p1) or [TheThingSystem](http://thethingsystem.com/dev/supported-things.html)
|
||||
|
||||
Simply choose a topic (think email subject line) to publish/listen to, and configure your service.
|
||||
|
||||
It's easy to setup your own [broker](http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a [cloud service](www.cloudmqtt.com)
|
||||
|
||||
Hints:
|
||||
Many services run mqtts (mqtt over SSL) often with a custom certificate.
|
||||
|
||||
You'll want to download their cert and install it locally, specifying the ```certificate_path``` configuration.
|
||||
|
||||
|
||||
Example configuration:
|
||||
|
||||
<pre><code>{
|
||||
'uri' => 'mqtts://user:pass@locahost:8883'
|
||||
'ssl' => :TLSv1,
|
||||
'ca_file' => './ca.pem',
|
||||
'cert_file' => './client.crt',
|
||||
'key_file' => './client.key',
|
||||
'topic' => 'huginn'
|
||||
}
|
||||
</code></pre>
|
||||
|
||||
Subscribe to CloCkWeRX's TheThingSystem instance (thethingsystem.com), where
|
||||
temperature and other events are being published.
|
||||
|
||||
<pre><code>{
|
||||
'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
|
||||
'topic' => 'the_thing_system/demo'
|
||||
}
|
||||
</code></pre>
|
||||
|
||||
Subscribe to all topics
|
||||
<pre><code>{
|
||||
'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
|
||||
'topic' => '/#'
|
||||
}
|
||||
</code></pre>
|
||||
|
||||
Find out more detail on [subscription wildcards](http://www.eclipse.org/paho/files/mqttdoc/Cclient/wildcard.html)
|
||||
MD
|
||||
|
||||
event_description <<-MD
|
||||
Events are simply nested MQTT payloads. For example, an MQTT payload for Owntracks
|
||||
|
||||
<pre><code>{
|
||||
"topic": "owntracks/kcqlmkgx/Dan",
|
||||
"message": {"_type": "location", "lat": "-34.8493644", "lon": "138.5218119", "tst": "1401771049", "acc": "50.0", "batt": "31", "desc": "Home", "event": "enter"},
|
||||
"time": 1401771051
|
||||
}</code></pre>
|
||||
MD
|
||||
|
||||
def validate_options
|
||||
unless options['uri'].present? &&
|
||||
options['topic'].present?
|
||||
errors.add(:base, "topic and uri are required")
|
||||
end
|
||||
end
|
||||
|
||||
def working?
|
||||
event_created_within?(options['expected_update_period_in_days']) && !recent_error_logs?
|
||||
end
|
||||
|
||||
def default_options
|
||||
{
|
||||
'uri' => 'mqtts://user:pass@locahost:8883',
|
||||
'ssl' => :TLSv1,
|
||||
'ca_file' => './ca.pem',
|
||||
'cert_file' => './client.crt',
|
||||
'key_file' => './client.key',
|
||||
'topic' => 'huginn',
|
||||
'max_read_time' => '10'
|
||||
}
|
||||
end
|
||||
|
||||
def mqtt_client
|
||||
@client ||= MQTT::Client.new(options['uri'])
|
||||
|
||||
if options['ssl']
|
||||
@client.ssl = options['ssl'].to_sym
|
||||
@client.ca_file = options['ca_file']
|
||||
@client.cert_file = options['cert_file']
|
||||
@client.key_file = options['key_file']
|
||||
end
|
||||
|
||||
@client
|
||||
end
|
||||
|
||||
def receive(incoming_events)
|
||||
mqtt_client.connect do |c|
|
||||
incoming_events.each do |event|
|
||||
c.publish(options['topic'], payload)
|
||||
end
|
||||
|
||||
c.disconnect
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def check
|
||||
mqtt_client.connect do |c|
|
||||
|
||||
Timeout::timeout((options['max_read_time'].presence || 15).to_i) {
|
||||
c.get(options['topic']) do |topic, message|
|
||||
|
||||
# A lot of services generate JSON. Try that first
|
||||
payload = JSON.parse(message) rescue message
|
||||
|
||||
create_event :payload => {
|
||||
'topic' => topic,
|
||||
'message' => payload,
|
||||
'time' => Time.now.to_i
|
||||
}
|
||||
end
|
||||
} rescue TimeoutError
|
||||
|
||||
c.disconnect
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
|
@ -19,6 +19,8 @@ module Agents
|
|||
You must setup an [API key for Wunderground](http://www.wunderground.com/weather/api/) in order to use this Agent with Wunderground.
|
||||
|
||||
You must setup an [API key for Forecast](https://developer.forecast.io/) in order to use this Agent with ForecastIO.
|
||||
|
||||
Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.
|
||||
MD
|
||||
|
||||
event_description <<-MD
|
||||
|
@ -49,7 +51,7 @@ module Agents
|
|||
default_schedule "8pm"
|
||||
|
||||
def working?
|
||||
event_created_within?(2) && !recent_error_logs?
|
||||
event_created_within?((options['expected_update_period_in_days'].presence || 2).to_i) && !recent_error_logs?
|
||||
end
|
||||
|
||||
def key_setup?
|
||||
|
@ -61,7 +63,8 @@ module Agents
|
|||
'service' => 'wunderground',
|
||||
'api_key' => 'your-key',
|
||||
'location' => '94103',
|
||||
'which_day' => '1'
|
||||
'which_day' => '1',
|
||||
'expected_update_period_in_days' => '2'
|
||||
}
|
||||
end
|
||||
|
||||
|
@ -163,7 +166,7 @@ module Agents
|
|||
'ozone' => value.ozone.to_s
|
||||
}
|
||||
return day
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -11,87 +11,5 @@ unless defined?(Rails)
|
|||
exit 1
|
||||
end
|
||||
|
||||
require 'rufus/scheduler'
|
||||
|
||||
class HuginnScheduler
|
||||
attr_accessor :mutex
|
||||
|
||||
def run_schedule(time)
|
||||
with_mutex do
|
||||
puts "Queuing schedule for #{time}"
|
||||
Agent.delay.run_schedule(time)
|
||||
end
|
||||
end
|
||||
|
||||
def propagate!
|
||||
with_mutex do
|
||||
puts "Queuing event propagation"
|
||||
Agent.delay.receive!
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup_expired_events!
|
||||
with_mutex do
|
||||
puts "Running event cleanup"
|
||||
Event.delay.cleanup_expired!
|
||||
end
|
||||
end
|
||||
|
||||
def with_mutex
|
||||
ActiveRecord::Base.connection_pool.with_connection do
|
||||
mutex.synchronize do
|
||||
yield
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def run!
|
||||
self.mutex = Mutex.new
|
||||
|
||||
rufus_scheduler = Rufus::Scheduler.new
|
||||
|
||||
tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]
|
||||
|
||||
# Schedule event propagation.
|
||||
|
||||
rufus_scheduler.every '1m' do
|
||||
propagate!
|
||||
end
|
||||
|
||||
# Schedule event cleanup.
|
||||
|
||||
rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
|
||||
cleanup_expired_events!
|
||||
end
|
||||
|
||||
# Schedule repeating events.
|
||||
|
||||
%w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
|
||||
rufus_scheduler.every schedule do
|
||||
run_schedule "every_#{schedule}"
|
||||
end
|
||||
end
|
||||
|
||||
# Schedule events for specific times.
|
||||
|
||||
# Times are assumed to be in PST for now. Can store a user#timezone later.
|
||||
24.times do |hour|
|
||||
rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
|
||||
if hour == 0
|
||||
run_schedule "midnight"
|
||||
elsif hour < 12
|
||||
run_schedule "#{hour}am"
|
||||
elsif hour == 12
|
||||
run_schedule "noon"
|
||||
else
|
||||
run_schedule "#{hour - 12}pm"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
rufus_scheduler.join
|
||||
end
|
||||
end
|
||||
|
||||
scheduler = HuginnScheduler.new
|
||||
scheduler.run!
|
57
bin/threaded.rb
Normal file
57
bin/threaded.rb
Normal file
|
@ -0,0 +1,57 @@
|
|||
require 'thread'
|
||||
|
||||
def stop
|
||||
puts 'Exiting...'
|
||||
@scheduler.stop
|
||||
@dj.stop
|
||||
@stream.stop
|
||||
end
|
||||
|
||||
def safely(&block)
|
||||
begin
|
||||
yield block
|
||||
rescue StandardError => e
|
||||
STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
|
||||
STDERR.puts "Terminating myself ..."
|
||||
stop
|
||||
end
|
||||
end
|
||||
|
||||
threads = []
|
||||
threads << Thread.new do
|
||||
safely do
|
||||
@stream = TwitterStream.new
|
||||
@stream.run
|
||||
puts "Twitter stream stopped ..."
|
||||
end
|
||||
end
|
||||
|
||||
threads << Thread.new do
|
||||
safely do
|
||||
@scheduler = HuginnScheduler.new
|
||||
@scheduler.run!
|
||||
puts "Scheduler stopped ..."
|
||||
end
|
||||
end
|
||||
|
||||
threads << Thread.new do
|
||||
safely do
|
||||
require 'delayed/command'
|
||||
@dj = Delayed::Worker.new
|
||||
@dj.start
|
||||
puts "Delayed job stopped ..."
|
||||
end
|
||||
end
|
||||
|
||||
# We need to wait a bit to let delayed_job set it's traps so we can override them
|
||||
sleep 0.5
|
||||
|
||||
trap('TERM') do
|
||||
stop
|
||||
end
|
||||
|
||||
trap('INT') do
|
||||
stop
|
||||
end
|
||||
|
||||
threads.collect { |t| t.join }
|
|
@ -12,115 +12,4 @@ unless defined?(Rails)
|
|||
exit 1
|
||||
end
|
||||
|
||||
require 'cgi'
|
||||
require 'json'
|
||||
require 'twitter/json_stream'
|
||||
require 'em-http-request'
|
||||
require 'pp'
|
||||
|
||||
def stream!(filters, agent, &block)
|
||||
stream = Twitter::JSONStream.connect(
|
||||
:path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
|
||||
:ssl => true,
|
||||
:oauth => {
|
||||
:consumer_key => agent.twitter_consumer_key,
|
||||
:consumer_secret => agent.twitter_consumer_secret,
|
||||
:access_key => agent.twitter_oauth_token,
|
||||
:access_secret => agent.twitter_oauth_token_secret
|
||||
}
|
||||
)
|
||||
|
||||
stream.each_item do |status|
|
||||
status = JSON.parse(status) if status.is_a?(String)
|
||||
next unless status
|
||||
next if status.has_key?('delete')
|
||||
next unless status['text']
|
||||
status['text'] = status['text'].gsub(/</, "<").gsub(/>/, ">").gsub(/[\t\n\r]/, ' ')
|
||||
block.call(status)
|
||||
end
|
||||
|
||||
stream.on_error do |message|
|
||||
STDERR.puts " --> Twitter error: #{message} <--"
|
||||
end
|
||||
|
||||
stream.on_no_data do |message|
|
||||
STDERR.puts " --> Got no data for awhile; trying to reconnect."
|
||||
EventMachine::stop_event_loop
|
||||
end
|
||||
|
||||
stream.on_max_reconnects do |timeout, retries|
|
||||
STDERR.puts " --> Oops, tried too many times! <--"
|
||||
EventMachine::stop_event_loop
|
||||
end
|
||||
end
|
||||
|
||||
def load_and_run(agents)
|
||||
agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
|
||||
filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
|
||||
|
||||
agents.each do |agent|
|
||||
agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
|
||||
filter_to_agent_map[filter] << agent
|
||||
end
|
||||
end
|
||||
|
||||
recent_tweets = []
|
||||
|
||||
stream!(filter_to_agent_map.keys, agents.first) do |status|
|
||||
if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
|
||||
puts "Skipping retweet: #{status["text"]}"
|
||||
elsif recent_tweets.include?(status["id_str"])
|
||||
puts "Skipping duplicate tweet: #{status["text"]}"
|
||||
else
|
||||
recent_tweets << status["id_str"]
|
||||
recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
|
||||
puts status["text"]
|
||||
filter_to_agent_map.keys.each do |filter|
|
||||
if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
|
||||
filter_to_agent_map[filter].each do |agent|
|
||||
puts " -> #{agent.name}"
|
||||
agent.process_tweet(filter, status)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
RELOAD_TIMEOUT = 10.minutes
|
||||
DUPLICATE_DETECTION_LENGTH = 1000
|
||||
SEPARATOR = /[^\w_\-]+/
|
||||
|
||||
while true
|
||||
begin
|
||||
agents = Agents::TwitterStreamAgent.all
|
||||
|
||||
EventMachine::run do
|
||||
EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
|
||||
puts "Reloading EventMachine and all Agents..."
|
||||
EventMachine::stop_event_loop
|
||||
}
|
||||
|
||||
if agents.length == 0
|
||||
puts "No agents found. Will look again in a minute."
|
||||
sleep 60
|
||||
EventMachine::stop_event_loop
|
||||
else
|
||||
puts "Found #{agents.length} agent(s). Loading them now..."
|
||||
load_and_run agents
|
||||
end
|
||||
end
|
||||
|
||||
print "Pausing..."; STDOUT.flush
|
||||
sleep 1
|
||||
puts "done."
|
||||
rescue SignalException, SystemExit
|
||||
EventMachine::stop_event_loop if EventMachine.reactor_running?
|
||||
exit
|
||||
rescue StandardError => e
|
||||
STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
|
||||
STDERR.puts "Waiting for a couple of minutes..."
|
||||
sleep 120
|
||||
end
|
||||
end
|
||||
TwitterStream.new.run
|
|
@ -1,6 +1,7 @@
|
|||
Delayed::Worker.destroy_failed_jobs = true
|
||||
Delayed::Worker.max_attempts = 5
|
||||
Delayed::Worker.max_run_time = 20.minutes
|
||||
Delayed::Worker.read_ahead = 5
|
||||
Delayed::Worker.default_priority = 10
|
||||
Delayed::Worker.delay_jobs = !Rails.env.test?
|
||||
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
class RenameDigestEmailToEmailDigest < ActiveRecord::Migration
|
||||
def up
|
||||
sql = <<-SQL
|
||||
UPDATE #{ActiveRecord::Base.connection.quote_table_name('agents')}
|
||||
SET #{ActiveRecord::Base.connection.quote_column_name('type')} = "Agents::EmailDigestAgent"
|
||||
WHERE #{ActiveRecord::Base.connection.quote_column_name('type')} = "Agents::DigestEmailAgent"
|
||||
SQL
|
||||
|
||||
execute sql
|
||||
end
|
||||
|
||||
def down
|
||||
sql = <<-SQL
|
||||
UPDATE #{ActiveRecord::Base.connection.quote_table_name('agents')}
|
||||
SET #{ActiveRecord::Base.connection.quote_column_name('type')} = "Agents::DigestEmailAgent"
|
||||
WHERE #{ActiveRecord::Base.connection.quote_column_name('type')} = "Agents::EmailDigestAgent"
|
||||
SQL
|
||||
|
||||
execute sql
|
||||
end
|
||||
end
|
|
@ -69,7 +69,7 @@ unless user.agents.where(:name => "Rain Notifier").exists?
|
|||
end
|
||||
|
||||
unless user.agents.where(:name => "Morning Digest").exists?
|
||||
Agent.build_for_type("Agents::DigestEmailAgent", user,
|
||||
Agent.build_for_type("Agents::EmailDigestAgent", user,
|
||||
:name => "Morning Digest",
|
||||
:schedule => "6am",
|
||||
:options => { 'subject' => "Your Morning Digest", 'expected_receive_period_in_days' => "30" },
|
||||
|
@ -77,7 +77,7 @@ unless user.agents.where(:name => "Morning Digest").exists?
|
|||
end
|
||||
|
||||
unless user.agents.where(:name => "Afternoon Digest").exists?
|
||||
Agent.build_for_type("Agents::DigestEmailAgent", user,
|
||||
Agent.build_for_type("Agents::EmailDigestAgent", user,
|
||||
:name => "Afternoon Digest",
|
||||
:schedule => "5pm",
|
||||
:options => { 'subject' => "Your Afternoon Digest", 'expected_receive_period_in_days' => "7" },
|
||||
|
|
|
@ -1,4 +1,2 @@
|
|||
web: sudo bundle exec unicorn_rails -c config/unicorn.rb -E production
|
||||
schedule: sudo RAILS_ENV=production bundle exec rails runner bin/schedule.rb
|
||||
twitter: sudo RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb
|
||||
dj: sudo RAILS_ENV=production bundle exec script/delayed_job run
|
||||
jobs: sudo RAILS_ENV=production bundle exec rails runner bin/threaded.rb
|
87
lib/huginn_scheduler.rb
Normal file
87
lib/huginn_scheduler.rb
Normal file
|
@ -0,0 +1,87 @@
|
|||
require 'rufus/scheduler'
|
||||
|
||||
class HuginnScheduler
|
||||
attr_accessor :mutex
|
||||
|
||||
def initialize
|
||||
@rufus_scheduler = Rufus::Scheduler.new
|
||||
end
|
||||
|
||||
def stop
|
||||
@rufus_scheduler.stop
|
||||
end
|
||||
|
||||
def run_schedule(time)
|
||||
with_mutex do
|
||||
puts "Queuing schedule for #{time}"
|
||||
Agent.delay.run_schedule(time)
|
||||
end
|
||||
end
|
||||
|
||||
def propagate!
|
||||
with_mutex do
|
||||
puts "Queuing event propagation"
|
||||
Agent.delay.receive!
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup_expired_events!
|
||||
with_mutex do
|
||||
puts "Running event cleanup"
|
||||
Event.delay.cleanup_expired!
|
||||
end
|
||||
end
|
||||
|
||||
def with_mutex
|
||||
ActiveRecord::Base.connection_pool.with_connection do
|
||||
mutex.synchronize do
|
||||
yield
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def run!
|
||||
self.mutex = Mutex.new
|
||||
|
||||
tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]
|
||||
|
||||
# Schedule event propagation.
|
||||
|
||||
@rufus_scheduler.every '1m' do
|
||||
propagate!
|
||||
end
|
||||
|
||||
# Schedule event cleanup.
|
||||
|
||||
@rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
|
||||
cleanup_expired_events!
|
||||
end
|
||||
|
||||
# Schedule repeating events.
|
||||
|
||||
%w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
|
||||
@rufus_scheduler.every schedule do
|
||||
run_schedule "every_#{schedule}"
|
||||
end
|
||||
end
|
||||
|
||||
# Schedule events for specific times.
|
||||
|
||||
# Times are assumed to be in PST for now. Can store a user#timezone later.
|
||||
24.times do |hour|
|
||||
@rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
|
||||
if hour == 0
|
||||
run_schedule "midnight"
|
||||
elsif hour < 12
|
||||
run_schedule "#{hour}am"
|
||||
elsif hour == 12
|
||||
run_schedule "noon"
|
||||
else
|
||||
run_schedule "#{hour - 12}pm"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@rufus_scheduler.join
|
||||
end
|
||||
end
|
125
lib/twitter_stream.rb
Normal file
125
lib/twitter_stream.rb
Normal file
|
@ -0,0 +1,125 @@
|
|||
require 'cgi'
|
||||
require 'json'
|
||||
require 'twitter/json_stream'
|
||||
require 'em-http-request'
|
||||
require 'pp'
|
||||
|
||||
class TwitterStream
|
||||
def initialize
|
||||
@running = true
|
||||
end
|
||||
|
||||
def stop
|
||||
@running = false
|
||||
EventMachine::stop_event_loop if EventMachine.reactor_running?
|
||||
end
|
||||
|
||||
def stream!(filters, agent, &block)
|
||||
stream = Twitter::JSONStream.connect(
|
||||
:path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
|
||||
:ssl => true,
|
||||
:oauth => {
|
||||
:consumer_key => agent.twitter_consumer_key,
|
||||
:consumer_secret => agent.twitter_consumer_secret,
|
||||
:access_key => agent.twitter_oauth_token,
|
||||
:access_secret => agent.twitter_oauth_token_secret
|
||||
}
|
||||
)
|
||||
|
||||
stream.each_item do |status|
|
||||
status = JSON.parse(status) if status.is_a?(String)
|
||||
next unless status
|
||||
next if status.has_key?('delete')
|
||||
next unless status['text']
|
||||
status['text'] = status['text'].gsub(/</, "<").gsub(/>/, ">").gsub(/[\t\n\r]/, ' ')
|
||||
block.call(status)
|
||||
end
|
||||
|
||||
stream.on_error do |message|
|
||||
STDERR.puts " --> Twitter error: #{message} <--"
|
||||
end
|
||||
|
||||
stream.on_no_data do |message|
|
||||
STDERR.puts " --> Got no data for awhile; trying to reconnect."
|
||||
EventMachine::stop_event_loop
|
||||
end
|
||||
|
||||
stream.on_max_reconnects do |timeout, retries|
|
||||
STDERR.puts " --> Oops, tried too many times! <--"
|
||||
EventMachine::stop_event_loop
|
||||
end
|
||||
end
|
||||
|
||||
def load_and_run(agents)
|
||||
agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
|
||||
filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
|
||||
|
||||
agents.each do |agent|
|
||||
agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
|
||||
filter_to_agent_map[filter] << agent
|
||||
end
|
||||
end
|
||||
|
||||
recent_tweets = []
|
||||
|
||||
stream!(filter_to_agent_map.keys, agents.first) do |status|
|
||||
if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
|
||||
puts "Skipping retweet: #{status["text"]}"
|
||||
elsif recent_tweets.include?(status["id_str"])
|
||||
puts "Skipping duplicate tweet: #{status["text"]}"
|
||||
else
|
||||
recent_tweets << status["id_str"]
|
||||
recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
|
||||
puts status["text"]
|
||||
filter_to_agent_map.keys.each do |filter|
|
||||
if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
|
||||
filter_to_agent_map[filter].each do |agent|
|
||||
puts " -> #{agent.name}"
|
||||
agent.process_tweet(filter, status)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
RELOAD_TIMEOUT = 10.minutes
|
||||
DUPLICATE_DETECTION_LENGTH = 1000
|
||||
SEPARATOR = /[^\w_\-]+/
|
||||
|
||||
def run
|
||||
while @running
|
||||
begin
|
||||
agents = Agents::TwitterStreamAgent.all
|
||||
|
||||
EventMachine::run do
|
||||
EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
|
||||
puts "Reloading EventMachine and all Agents..."
|
||||
EventMachine::stop_event_loop
|
||||
}
|
||||
|
||||
if agents.length == 0
|
||||
puts "No agents found. Will look again in a minute."
|
||||
sleep 60
|
||||
EventMachine::stop_event_loop
|
||||
else
|
||||
puts "Found #{agents.length} agent(s). Loading them now..."
|
||||
load_and_run agents
|
||||
end
|
||||
end
|
||||
|
||||
print "Pausing..."; STDOUT.flush
|
||||
sleep 1
|
||||
puts "done."
|
||||
rescue SignalException, SystemExit
|
||||
@running = false
|
||||
EventMachine::stop_event_loop if EventMachine.reactor_running?
|
||||
rescue StandardError => e
|
||||
STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
|
||||
STDERR.puts "Waiting for a couple of minutes..."
|
||||
sleep 120
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,12 +1,12 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe Agents::DigestEmailAgent do
|
||||
describe Agents::EmailDigestAgent do
|
||||
def get_message_part(mail, content_type)
|
||||
mail.body.parts.find { |p| p.content_type.match content_type }.body.raw_source
|
||||
end
|
||||
|
||||
before do
|
||||
@checker = Agents::DigestEmailAgent.new(:name => "something", :options => { :expected_receive_period_in_days => 2, :subject => "something interesting" })
|
||||
@checker = Agents::EmailDigestAgent.new(:name => "something", :options => { :expected_receive_period_in_days => 2, :subject => "something interesting" })
|
||||
@checker.user = users(:bob)
|
||||
@checker.save!
|
||||
end
|
||||
|
@ -27,14 +27,14 @@ describe Agents::DigestEmailAgent do
|
|||
event2.payload = { :data => "Something else you should know about" }
|
||||
event2.save!
|
||||
|
||||
Agents::DigestEmailAgent.async_receive(@checker.id, [event1.id, event2.id])
|
||||
Agents::EmailDigestAgent.async_receive(@checker.id, [event1.id, event2.id])
|
||||
@checker.reload.memory[:queue].should == [{ 'data' => "Something you should know about" }, { 'data' => "Something else you should know about" }]
|
||||
end
|
||||
end
|
||||
|
||||
describe "#check" do
|
||||
it "should send an email" do
|
||||
Agents::DigestEmailAgent.async_check(@checker.id)
|
||||
Agents::EmailDigestAgent.async_check(@checker.id)
|
||||
ActionMailer::Base.deliveries.should == []
|
||||
|
||||
@checker.memory[:queue] = [{ :data => "Something you should know about" },
|
||||
|
@ -44,7 +44,7 @@ describe Agents::DigestEmailAgent do
|
|||
@checker.memory[:events] = [1,2,3,4]
|
||||
@checker.save!
|
||||
|
||||
Agents::DigestEmailAgent.async_check(@checker.id)
|
||||
Agents::EmailDigestAgent.async_check(@checker.id)
|
||||
ActionMailer::Base.deliveries.last.to.should == ["bob@example.com"]
|
||||
ActionMailer::Base.deliveries.last.subject.should == "something interesting"
|
||||
get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip.should == "Event\n data: Something you should know about\n\nFoo\n bar: 2\n url: http://google.com\n\nhi\n woah: there\n\nEvent\n test: 2"
|
||||
|
@ -61,7 +61,7 @@ describe Agents::DigestEmailAgent do
|
|||
Agent.receive!
|
||||
@checker.reload.memory[:queue].should_not be_empty
|
||||
|
||||
Agents::DigestEmailAgent.async_check(@checker.id)
|
||||
Agents::EmailDigestAgent.async_check(@checker.id)
|
||||
|
||||
plain_email_text = get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip
|
||||
html_email_text = get_message_part(ActionMailer::Base.deliveries.last, /html/).strip
|
||||
|
@ -72,4 +72,4 @@ describe Agents::DigestEmailAgent do
|
|||
@checker.reload.memory[:queue].should be_empty
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
52
spec/models/agents/mqtt_agent_spec.rb
Normal file
52
spec/models/agents/mqtt_agent_spec.rb
Normal file
|
@ -0,0 +1,52 @@
|
|||
require 'spec_helper'
|
||||
require 'mqtt'
|
||||
require './spec/support/fake_mqtt_server'
|
||||
|
||||
describe Agents::MqttAgent do
|
||||
|
||||
before :each do
|
||||
@error_log = StringIO.new
|
||||
|
||||
@server = MQTT::FakeServer.new(41234, '127.0.0.1')
|
||||
@server.just_one = true
|
||||
@server.logger = Logger.new(@error_log)
|
||||
@server.logger.level = Logger::DEBUG
|
||||
@server.start
|
||||
|
||||
@valid_params = {
|
||||
'uri' => "mqtt://#{@server.address}:#{@server.port}",
|
||||
'topic' => '/#',
|
||||
'max_read_time' => '1',
|
||||
'expected_update_period_in_days' => "2"
|
||||
}
|
||||
|
||||
@checker = Agents::MqttAgent.new(
|
||||
:name => "somename",
|
||||
:options => @valid_params,
|
||||
:schedule => "midnight",
|
||||
)
|
||||
@checker.user = users(:jane)
|
||||
@checker.save!
|
||||
end
|
||||
|
||||
after :each do
|
||||
@server.stop
|
||||
end
|
||||
|
||||
describe "#check" do
|
||||
it "should check that initial run creates an event" do
|
||||
expect { @checker.check }.to change { Event.count }.by(2)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#working?" do
|
||||
it "checks if its generating events as scheduled" do
|
||||
@checker.should_not be_working
|
||||
@checker.check
|
||||
@checker.reload.should be_working
|
||||
three_days_from_now = 3.days.from_now
|
||||
stub(Time).now { three_days_from_now }
|
||||
@checker.should_not be_working
|
||||
end
|
||||
end
|
||||
end
|
137
spec/support/fake_mqtt_server.rb
Normal file
137
spec/support/fake_mqtt_server.rb
Normal file
|
@ -0,0 +1,137 @@
|
|||
#!/usr/bin/env ruby
|
||||
#
|
||||
# This is a 'fake' MQTT server to help with testing client implementations
|
||||
#
|
||||
# See https://github.com/njh/ruby-mqtt/blob/master/spec/fake_server.rb
|
||||
#
|
||||
# It behaves in the following ways:
|
||||
# * Responses to CONNECT with a successful CONACK
|
||||
# * Responses to PUBLISH by echoing the packet back
|
||||
# * Responses to SUBSCRIBE with SUBACK and a PUBLISH to the topic
|
||||
# * Responses to PINGREQ with PINGRESP
|
||||
# * Responses to DISCONNECT by closing the socket
|
||||
#
|
||||
# It has the following restrictions
|
||||
# * Doesn't deal with timeouts
|
||||
# * Only handles a single connection at a time
|
||||
#
|
||||
|
||||
$:.unshift File.dirname(__FILE__)+'/../lib'
|
||||
|
||||
require 'logger'
|
||||
require 'socket'
|
||||
require 'mqtt'
|
||||
|
||||
|
||||
class MQTT::FakeServer
|
||||
attr_reader :address, :port
|
||||
attr_reader :last_publish
|
||||
attr_reader :thread
|
||||
attr_reader :pings_received
|
||||
attr_accessor :just_one
|
||||
attr_accessor :logger
|
||||
|
||||
# Create a new fake MQTT server
|
||||
#
|
||||
# If no port is given, bind to a random port number
|
||||
# If no bind address is given, bind to localhost
|
||||
def initialize(port=nil, bind_address='127.0.0.1')
|
||||
@port = port
|
||||
@address = bind_address
|
||||
end
|
||||
|
||||
# Get the logger used by the server
|
||||
def logger
|
||||
@logger ||= Logger.new(STDOUT)
|
||||
end
|
||||
|
||||
# Start the thread and open the socket that will process client connections
|
||||
def start
|
||||
@socket ||= TCPServer.new(@address, @port)
|
||||
@address = @socket.addr[3]
|
||||
@port = @socket.addr[1]
|
||||
@thread ||= Thread.new do
|
||||
logger.info "Started a fake MQTT server on #{@address}:#{@port}"
|
||||
loop do
|
||||
# Wait for a client to connect
|
||||
client = @socket.accept
|
||||
@pings_received = 0
|
||||
handle_client(client)
|
||||
break if just_one
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Stop the thread and close the socket
|
||||
def stop
|
||||
logger.info "Stopping fake MQTT server"
|
||||
@socket.close unless @socket.nil?
|
||||
@socket = nil
|
||||
|
||||
@thread.kill if @thread and @thread.alive?
|
||||
@thread = nil
|
||||
end
|
||||
|
||||
# Start the server thread and wait for it to finish (possibly never)
|
||||
def run
|
||||
start
|
||||
begin
|
||||
@thread.join
|
||||
rescue Interrupt
|
||||
stop
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
protected
|
||||
|
||||
# Given a client socket, process MQTT packets from the client
|
||||
def handle_client(client)
|
||||
loop do
|
||||
packet = MQTT::Packet.read(client)
|
||||
logger.debug packet.inspect
|
||||
|
||||
case packet
|
||||
when MQTT::Packet::Connect
|
||||
client.write MQTT::Packet::Connack.new(:return_code => 0)
|
||||
when MQTT::Packet::Publish
|
||||
client.write packet
|
||||
@last_publish = packet
|
||||
when MQTT::Packet::Subscribe
|
||||
client.write MQTT::Packet::Suback.new(
|
||||
:message_id => packet.message_id,
|
||||
:granted_qos => 0
|
||||
)
|
||||
topic = packet.topics[0][0]
|
||||
client.write MQTT::Packet::Publish.new(
|
||||
:topic => topic,
|
||||
:payload => "hello #{topic}",
|
||||
:retain => true
|
||||
)
|
||||
client.write MQTT::Packet::Publish.new(
|
||||
:topic => topic,
|
||||
:payload => "did you know about #{topic}",
|
||||
:retain => true
|
||||
)
|
||||
|
||||
when MQTT::Packet::Pingreq
|
||||
client.write MQTT::Packet::Pingresp.new
|
||||
@pings_received += 1
|
||||
when MQTT::Packet::Disconnect
|
||||
client.close
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
rescue MQTT::ProtocolException => e
|
||||
logger.warn "Protocol error, closing connection: #{e}"
|
||||
client.close
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
if __FILE__ == $0
|
||||
server = MQTT::FakeServer.new(MQTT::DEFAULT_PORT)
|
||||
server.logger.level = Logger::DEBUG
|
||||
server.run
|
||||
end
|
Loading…
Add table
Reference in a new issue