Provide an optional threaded background worker

Due to the three background workers (scheduler, twitter stream and delayed job)
Huginn needs a lot of memory to run (about 520MB on my dev machine). This PR
introduces an optional threaded background worker which combines the three
currently separate processes into one (reducing the memory footprint to ~260MB).
Since just one instance of every background processor is running at a time
there should not be any threading related issues.
The main gotcha is that it is most likely not possible to run multiple
delayed job workers concurrently. The ultimate solution would probably be switching
to sidekiq with sidetiq as scheduler, but that is a different task :)

When running on MRI the GIL should not be an issue because it is released for most IO
bound operations (waiting for the database/website/sleeping).
This commit is contained in:
Dominik Sander 2014-05-12 00:04:43 +02:00
parent ead506ccac
commit 8a51dbc4bd
7 changed files with 275 additions and 194 deletions

View file

@ -4,6 +4,10 @@ schedule: bundle exec rails runner bin/schedule.rb
twitter: bundle exec rails runner bin/twitter_stream.rb
dj: bundle exec script/delayed_job run
# Procfile for the experimental threaded scheduler, twitter stream and delayed job
#web: bundle exec rails server
#jobs: bundle exec rails runner bin/threaded.rb
# Possible Procfile configuration for production:
# web: bundle exec unicorn -c config/unicorn/production.rb
# schedule: bundle exec rails runner bin/schedule.rb

View file

@ -11,87 +11,5 @@ unless defined?(Rails)
exit 1
end
require 'rufus/scheduler'
# Legacy standalone scheduler (bin/schedule.rb). Registers recurring
# rufus-scheduler jobs that queue Huginn's background work through
# Delayed::Job, serialized by a mutex.
class HuginnScheduler
  attr_accessor :mutex

  # Queue execution of all agents on the given named schedule.
  def run_schedule(time)
    with_mutex do
      puts "Queuing schedule for #{time}"
      Agent.delay.run_schedule(time)
    end
  end

  # Queue propagation of pending events to their receiving agents.
  def propagate!
    with_mutex do
      puts "Queuing event propagation"
      Agent.delay.receive!
    end
  end

  # Queue deletion of expired events.
  def cleanup_expired_events!
    with_mutex do
      puts "Running event cleanup"
      Event.delay.cleanup_expired!
    end
  end

  # Yield while holding the mutex and a checked-out database connection.
  def with_mutex
    ActiveRecord::Base.connection_pool.with_connection do
      mutex.synchronize { yield }
    end
  end

  # Register all recurring jobs, then block until the scheduler stops.
  def run!
    self.mutex = Mutex.new
    scheduler = Rufus::Scheduler.new

    # TIMEZONE (or the PST default) is translated to its tzinfo name.
    zone_name = ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"
    tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[zone_name]

    # Schedule event propagation.
    scheduler.every('1m') { propagate! }

    # Schedule event cleanup.
    scheduler.cron("0 0 * * * " + tzinfo_friendly_timezone) { cleanup_expired_events! }

    # Schedule repeating events.
    %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |interval|
      scheduler.every(interval) { run_schedule "every_#{interval}" }
    end

    # Schedule events for specific times.
    # Times are assumed to be in PST for now. Can store a user#timezone later.
    24.times do |hour|
      scheduler.cron("0 #{hour} * * * " + tzinfo_friendly_timezone) do
        label =
          case hour
          when 0     then "midnight"
          when 1..11 then "#{hour}am"
          when 12    then "noon"
          else            "#{hour - 12}pm"
          end
        run_schedule label
      end
    end

    scheduler.join
  end
end
# Boot the standalone scheduler; run! blocks until the process exits.
HuginnScheduler.new.run!

57
bin/threaded.rb Normal file
View file

@ -0,0 +1,57 @@
require 'thread'
# Signal all background workers to shut down.
#
# Each worker is nil-guarded: the signal traps that invoke this method
# are installed only ~0.5s after the worker threads start, so a signal
# (or a crash in one thread via `safely`) can arrive before every
# thread has assigned its instance variable.
def stop
  puts 'Exiting...'
  @scheduler.stop if @scheduler
  @dj.stop if @dj
  @stream.stop if @stream
end
# Run the given block; if it raises a StandardError, report the
# exception on stderr and shut the whole process down via #stop.
def safely(&block)
  yield block
rescue StandardError => e
  STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
  STDERR.puts "Terminating myself ..."
  stop
end
# Start the three background workers, one per thread. Each worker
# stores itself in an instance variable so the signal traps can stop it.
threads = []

threads << Thread.new do
  safely do
    @stream = TwitterStream.new
    @stream.run
    puts "Twitter stream stopped ..."
  end
end

threads << Thread.new do
  safely do
    @scheduler = HuginnScheduler.new
    @scheduler.run!
    puts "Scheduler stopped ..."
  end
end

threads << Thread.new do
  safely do
    require 'delayed/command'
    @dj = Delayed::Worker.new
    @dj.start
    puts "Delayed job stopped ..."
  end
end

# We need to wait a bit to let delayed_job set its traps so we can override them
sleep 0.5

trap('TERM') do
  stop
end

trap('INT') do
  stop
end

# Wait for every worker thread to finish before exiting.
threads.each(&:join)

View file

@ -12,115 +12,4 @@ unless defined?(Rails)
exit 1
end
require 'cgi'
require 'json'
require 'twitter/json_stream'
require 'em-http-request'
require 'pp'
# Connect to the Twitter streaming API and yield each usable status.
#
# Uses the "filter" endpoint (tracking the given filters) when any
# filters are present, otherwise the "sample" endpoint. OAuth
# credentials are read from the given agent's twitter_* attributes.
def stream!(filters, agent, &block)
  stream = Twitter::JSONStream.connect(
    :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
    :ssl => true,
    :oauth => {
      :consumer_key => agent.twitter_consumer_key,
      :consumer_secret => agent.twitter_consumer_secret,
      :access_key => agent.twitter_oauth_token,
      :access_secret => agent.twitter_oauth_token_secret
    }
  )
  stream.each_item do |status|
    # Items may arrive as raw JSON strings; parse before inspecting.
    status = JSON.parse(status) if status.is_a?(String)
    next unless status
    next if status.has_key?('delete')   # deletion notices carry no text
    next unless status['text']
    # Unescape <> entities and normalize control whitespace to spaces.
    status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ')
    block.call(status)
  end
  stream.on_error do |message|
    STDERR.puts " --> Twitter error: #{message} <--"
  end
  stream.on_no_data do |message|
    # Stopping the reactor causes the outer run loop to reconnect.
    STDERR.puts " --> Got no data for awhile; trying to reconnect."
    EventMachine::stop_event_loop
  end
  stream.on_max_reconnects do |timeout, retries|
    STDERR.puts " --> Oops, tried too many times! <--"
    EventMachine::stop_event_loop
  end
end
# Group agents by OAuth token (one stream per token), build a map of
# filter string -> interested agents, then stream statuses and dispatch
# each match via Agent#process_tweet.
def load_and_run(agents)
  agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
    # All distinct, stripped filters across this token's agents, each
    # mapped to an (initially empty) list of agents watching it.
    filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
    agents.each do |agent|
      agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
        filter_to_agent_map[filter] << agent
      end
    end
    recent_tweets = []   # bounded list of recent tweet ids used for dedup
    stream!(filter_to_agent_map.keys, agents.first) do |status|
      if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
        puts "Skipping retweet: #{status["text"]}"
      elsif recent_tweets.include?(status["id_str"])
        puts "Skipping duplicate tweet: #{status["text"]}"
      else
        recent_tweets << status["id_str"]
        recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
        puts status["text"]
        filter_to_agent_map.keys.each do |filter|
          # A filter matches when every word in it appears in the tweet.
          if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
            filter_to_agent_map[filter].each do |agent|
              puts " -> #{agent.name}"
              agent.process_tweet(filter, status)
            end
          end
        end
      end
    end
  end
end
RELOAD_TIMEOUT = 10.minutes           # reactor restart interval (agent reload)
DUPLICATE_DETECTION_LENGTH = 1000     # how many recent tweet ids to remember
SEPARATOR = /[^\w_\-]+/               # token separator for filter matching

# Main loop: (re)load TwitterStreamAgents and run the EventMachine
# reactor; the periodic timer stops the reactor so agents get reloaded.
while true
  begin
    agents = Agents::TwitterStreamAgent.all
    EventMachine::run do
      EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
        puts "Reloading EventMachine and all Agents..."
        EventMachine::stop_event_loop
      }
      if agents.length == 0
        puts "No agents found. Will look again in a minute."
        sleep 60
        EventMachine::stop_event_loop
      else
        puts "Found #{agents.length} agent(s). Loading them now..."
        load_and_run agents
      end
    end
    print "Pausing..."; STDOUT.flush
    sleep 1
    puts "done."
  rescue SignalException, SystemExit
    # Shut the reactor down cleanly on Ctrl-C / kill, then exit.
    EventMachine::stop_event_loop if EventMachine.reactor_running?
    exit
  rescue StandardError => e
    STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
    STDERR.puts "Waiting for a couple of minutes..."
    sleep 120
  end
end
# Start the stream worker; run blocks until the process is stopped.
TwitterStream.new.run

View file

@ -1,6 +1,7 @@
# Delayed::Job worker configuration (see the delayed_job README for the
# exact semantics of each setting).
Delayed::Worker.destroy_failed_jobs = true      # delete jobs that exhaust their attempts
Delayed::Worker.max_attempts = 5                # retries before a job counts as failed
Delayed::Worker.max_run_time = 20.minutes       # kill jobs running longer than this
Delayed::Worker.read_ahead = 5                  # candidate jobs fetched per reserve query
Delayed::Worker.default_priority = 10
Delayed::Worker.delay_jobs = !Rails.env.test?   # run jobs inline in the test environment

87
lib/huginn_scheduler.rb Normal file
View file

@ -0,0 +1,87 @@
require 'rufus/scheduler'
# Schedules Huginn's recurring background work with rufus-scheduler,
# queuing the actual work through Delayed::Job. Runs standalone
# (bin/schedule.rb) or as one thread of the combined worker
# (bin/threaded.rb), which uses #stop for shutdown.
class HuginnScheduler
  attr_accessor :mutex

  def initialize
    @rufus_scheduler = Rufus::Scheduler.new
    # Create the mutex here rather than in run!, so the queuing methods
    # below cannot hit a nil mutex if they are invoked before run!.
    self.mutex = Mutex.new
  end

  # Shut down the underlying rufus-scheduler.
  def stop
    @rufus_scheduler.stop
  end

  # Queue execution of all agents on the given named schedule.
  def run_schedule(time)
    with_mutex do
      puts "Queuing schedule for #{time}"
      Agent.delay.run_schedule(time)
    end
  end

  # Queue propagation of pending events to their receiving agents.
  def propagate!
    with_mutex do
      puts "Queuing event propagation"
      Agent.delay.receive!
    end
  end

  # Queue deletion of expired events.
  def cleanup_expired_events!
    with_mutex do
      puts "Running event cleanup"
      Event.delay.cleanup_expired!
    end
  end

  # Yield while holding the mutex and a checked-out AR connection, so
  # concurrently-firing rufus jobs cannot interleave their queuing work.
  def with_mutex
    ActiveRecord::Base.connection_pool.with_connection do
      mutex.synchronize do
        yield
      end
    end
  end

  # Register all recurring jobs, then block until the scheduler stops.
  def run!
    # TIMEZONE (or the PST default) translated to its tzinfo name.
    tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]

    # Schedule event propagation.
    @rufus_scheduler.every '1m' do
      propagate!
    end

    # Schedule event cleanup.
    @rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
      cleanup_expired_events!
    end

    # Schedule repeating events.
    %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
      @rufus_scheduler.every schedule do
        run_schedule "every_#{schedule}"
      end
    end

    # Schedule events for specific times.
    # Times are assumed to be in PST for now. Can store a user#timezone later.
    24.times do |hour|
      @rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
        if hour == 0
          run_schedule "midnight"
        elsif hour < 12
          run_schedule "#{hour}am"
        elsif hour == 12
          run_schedule "noon"
        else
          run_schedule "#{hour - 12}pm"
        end
      end
    end

    @rufus_scheduler.join
  end
end

125
lib/twitter_stream.rb Normal file
View file

@ -0,0 +1,125 @@
require 'cgi'
require 'json'
require 'twitter/json_stream'
require 'em-http-request'
require 'pp'
# Streams tweets for all Agents::TwitterStreamAgent records using
# EventMachine, dispatching matching statuses to the agents. The
# reactor is restarted periodically so agent changes are picked up.
class TwitterStream
  def initialize
    @running = true   # cleared by #stop to end the run loop
  end

  # Request shutdown: end the run loop and halt the reactor if active.
  def stop
    @running = false
    EventMachine::stop_event_loop if EventMachine.reactor_running?
  end

  # Connect to the Twitter streaming API ("filter" when filters are
  # given, otherwise "sample") using the agent's OAuth credentials, and
  # yield each usable status to the block.
  def stream!(filters, agent, &block)
    stream = Twitter::JSONStream.connect(
      :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
      :ssl => true,
      :oauth => {
        :consumer_key => agent.twitter_consumer_key,
        :consumer_secret => agent.twitter_consumer_secret,
        :access_key => agent.twitter_oauth_token,
        :access_secret => agent.twitter_oauth_token_secret
      }
    )
    stream.each_item do |status|
      # Items may arrive as raw JSON strings; parse before inspecting.
      status = JSON.parse(status) if status.is_a?(String)
      next unless status
      next if status.has_key?('delete')   # deletion notices carry no text
      next unless status['text']
      # Unescape <> entities and normalize control whitespace to spaces.
      status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ')
      block.call(status)
    end
    stream.on_error do |message|
      STDERR.puts " --> Twitter error: #{message} <--"
    end
    stream.on_no_data do |message|
      # Stopping the reactor causes the outer run loop to reconnect.
      STDERR.puts " --> Got no data for awhile; trying to reconnect."
      EventMachine::stop_event_loop
    end
    stream.on_max_reconnects do |timeout, retries|
      STDERR.puts " --> Oops, tried too many times! <--"
      EventMachine::stop_event_loop
    end
  end

  # Group agents by OAuth token (one stream per token), build a map of
  # filter string -> interested agents, then stream statuses and
  # dispatch each match via Agent#process_tweet.
  def load_and_run(agents)
    agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
      # All distinct, stripped filters across this token's agents, each
      # mapped to an (initially empty) list of agents watching it.
      filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
      agents.each do |agent|
        agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
          filter_to_agent_map[filter] << agent
        end
      end
      recent_tweets = []   # bounded list of recent tweet ids used for dedup
      stream!(filter_to_agent_map.keys, agents.first) do |status|
        if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
          puts "Skipping retweet: #{status["text"]}"
        elsif recent_tweets.include?(status["id_str"])
          puts "Skipping duplicate tweet: #{status["text"]}"
        else
          recent_tweets << status["id_str"]
          recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
          puts status["text"]
          filter_to_agent_map.keys.each do |filter|
            # A filter matches when every word in it appears in the tweet.
            if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
              filter_to_agent_map[filter].each do |agent|
                puts " -> #{agent.name}"
                agent.process_tweet(filter, status)
              end
            end
          end
        end
      end
    end
  end

  RELOAD_TIMEOUT = 10.minutes           # reactor restart interval (agent reload)
  DUPLICATE_DETECTION_LENGTH = 1000     # how many recent tweet ids to remember
  SEPARATOR = /[^\w_\-]+/               # token separator for filter matching

  # Main loop: load agents and run the reactor until #stop is called.
  # The periodic timer stops the reactor so agents are reloaded.
  def run
    while @running
      begin
        agents = Agents::TwitterStreamAgent.all
        EventMachine::run do
          EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
            puts "Reloading EventMachine and all Agents..."
            EventMachine::stop_event_loop
          }
          if agents.length == 0
            puts "No agents found. Will look again in a minute."
            sleep 60
            EventMachine::stop_event_loop
          else
            puts "Found #{agents.length} agent(s). Loading them now..."
            load_and_run agents
          end
        end
        print "Pausing..."; STDOUT.flush
        sleep 1
        puts "done."
      rescue SignalException, SystemExit
        # Signals end the loop; also halt the reactor if still running.
        @running = false
        EventMachine::stop_event_loop if EventMachine.reactor_running?
      rescue StandardError => e
        STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
        STDERR.puts "Waiting for a couple of minutes..."
        sleep 120
      end
    end
  end
end