2014-05-12 00:04:43 +02:00
|
|
|
require 'cgi'
|
|
|
|
require 'json'
|
|
|
|
require 'em-http-request'
|
|
|
|
require 'pp'
|
|
|
|
|
|
|
|
class TwitterStream
|
|
|
|
def initialize
|
|
|
|
@running = true
|
|
|
|
end
|
|
|
|
|
|
|
|
def stop
|
|
|
|
@running = false
|
|
|
|
end
|
|
|
|
|
|
|
|
def stream!(filters, agent, &block)
|
|
|
|
stream = Twitter::JSONStream.connect(
|
|
|
|
:path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
|
|
|
|
:ssl => true,
|
|
|
|
:oauth => {
|
|
|
|
:consumer_key => agent.twitter_consumer_key,
|
|
|
|
:consumer_secret => agent.twitter_consumer_secret,
|
|
|
|
:access_key => agent.twitter_oauth_token,
|
|
|
|
:access_secret => agent.twitter_oauth_token_secret
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
stream.each_item do |status|
|
|
|
|
status = JSON.parse(status) if status.is_a?(String)
|
|
|
|
next unless status
|
|
|
|
next if status.has_key?('delete')
|
|
|
|
next unless status['text']
|
|
|
|
status['text'] = status['text'].gsub(/</, "<").gsub(/>/, ">").gsub(/[\t\n\r]/, ' ')
|
|
|
|
block.call(status)
|
|
|
|
end
|
|
|
|
|
|
|
|
stream.on_error do |message|
|
|
|
|
STDERR.puts " --> Twitter error: #{message} <--"
|
|
|
|
end
|
|
|
|
|
|
|
|
stream.on_no_data do |message|
|
|
|
|
STDERR.puts " --> Got no data for awhile; trying to reconnect."
|
|
|
|
EventMachine::stop_event_loop
|
|
|
|
end
|
|
|
|
|
|
|
|
stream.on_max_reconnects do |timeout, retries|
|
|
|
|
STDERR.puts " --> Oops, tried too many times! <--"
|
|
|
|
EventMachine::stop_event_loop
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
def load_and_run(agents)
|
|
|
|
agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
|
|
|
|
filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
|
|
|
|
|
|
|
|
agents.each do |agent|
|
|
|
|
agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
|
|
|
|
filter_to_agent_map[filter] << agent
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
recent_tweets = []
|
|
|
|
|
|
|
|
stream!(filter_to_agent_map.keys, agents.first) do |status|
|
|
|
|
if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
|
|
|
|
puts "Skipping retweet: #{status["text"]}"
|
|
|
|
elsif recent_tweets.include?(status["id_str"])
|
|
|
|
puts "Skipping duplicate tweet: #{status["text"]}"
|
|
|
|
else
|
|
|
|
recent_tweets << status["id_str"]
|
|
|
|
recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
|
|
|
|
puts status["text"]
|
|
|
|
filter_to_agent_map.keys.each do |filter|
|
|
|
|
if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
|
|
|
|
filter_to_agent_map[filter].each do |agent|
|
|
|
|
puts " -> #{agent.name}"
|
|
|
|
agent.process_tweet(filter, status)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
RELOAD_TIMEOUT = 10.minutes
|
|
|
|
DUPLICATE_DETECTION_LENGTH = 1000
|
|
|
|
SEPARATOR = /[^\w_\-]+/
|
|
|
|
|
|
|
|
def run
|
2014-09-23 21:22:03 -07:00
|
|
|
if Agents::TwitterStreamAgent.dependencies_missing?
|
|
|
|
STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
|
|
|
|
STDERR.flush
|
|
|
|
return
|
|
|
|
end
|
|
|
|
|
|
|
|
require 'twitter/json_stream'
|
|
|
|
|
2014-05-12 00:04:43 +02:00
|
|
|
while @running
|
|
|
|
begin
|
2014-08-17 15:22:49 +02:00
|
|
|
agents = Agents::TwitterStreamAgent.active.all
|
2014-05-12 00:04:43 +02:00
|
|
|
|
|
|
|
EventMachine::run do
|
2014-08-17 15:22:49 +02:00
|
|
|
EventMachine.add_periodic_timer(1) {
|
|
|
|
EventMachine::stop_event_loop if !@running
|
|
|
|
}
|
|
|
|
|
2014-05-12 00:04:43 +02:00
|
|
|
EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
|
|
|
|
puts "Reloading EventMachine and all Agents..."
|
|
|
|
EventMachine::stop_event_loop
|
|
|
|
}
|
|
|
|
|
|
|
|
if agents.length == 0
|
|
|
|
puts "No agents found. Will look again in a minute."
|
2014-08-17 15:22:49 +02:00
|
|
|
EventMachine.add_timer(60) {
|
|
|
|
EventMachine::stop_event_loop
|
|
|
|
}
|
2014-05-12 00:04:43 +02:00
|
|
|
else
|
|
|
|
puts "Found #{agents.length} agent(s). Loading them now..."
|
|
|
|
load_and_run agents
|
|
|
|
end
|
|
|
|
end
|
|
|
|
rescue SignalException, SystemExit
|
|
|
|
@running = false
|
|
|
|
EventMachine::stop_event_loop if EventMachine.reactor_running?
|
|
|
|
rescue StandardError => e
|
|
|
|
STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
|
|
|
|
STDERR.puts "Waiting for a couple of minutes..."
|
|
|
|
sleep 120
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|