From eead335705fdcbb3310adb47ecf798574ab093f6 Mon Sep 17 00:00:00 2001 From: Andrew Cantino Date: Sat, 23 Mar 2013 15:12:39 -0700 Subject: [PATCH] switch to class methods and ids to shorten the payload size of serialized delayed_jobs --- app/controllers/agents_controller.rb | 3 +- app/models/agent.rb | 150 ++++++++++-------- spec/models/agent_spec.rb | 32 ++-- spec/models/agents/digest_email_agent_spec.rb | 6 +- spec/models/agents/trigger_agent_spec.rb | 2 +- 5 files changed, 104 insertions(+), 89 deletions(-) diff --git a/app/controllers/agents_controller.rb b/app/controllers/agents_controller.rb index 0aa1c449..e51414b1 100644 --- a/app/controllers/agents_controller.rb +++ b/app/controllers/agents_controller.rb @@ -9,8 +9,7 @@ class AgentsController < ApplicationController end def run - @agent = current_user.agents.find(params[:id]) - @agent.async_check + Agent.async_check(current_user.agents.find(params[:id]).id) redirect_to agents_path, notice: "Agent run queued" end diff --git a/app/models/agent.rb b/app/models/agent.rb index abbd46c6..c5e7570a 100644 --- a/app/models/agent.rb +++ b/app/models/agent.rb @@ -100,20 +100,6 @@ class Agent < ActiveRecord::Base @memoized_last_event_at ||= events.select(:created_at).first.try(:created_at) end - def async_check - check - self.last_check_at = Time.now - save! - end - handle_asynchronously :async_check #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now } - - def async_receive(event_ids) - receive(Event.where(:id => event_ids)) - self.last_receive_at = Time.now - save! - end - handle_asynchronously :async_receive #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now } - def default_schedule self.class.default_schedule end @@ -135,67 +121,93 @@ class Agent < ActiveRecord::Base end # Class Methods - - def self.cannot_be_scheduled! - @cannot_be_scheduled = true - end - - def self.cannot_be_scheduled? - !!@cannot_be_scheduled - end - - def self.default_schedule(schedule = nil) - @default_schedule = schedule unless schedule.nil? - @default_schedule - end - - def self.cannot_receive_events! - @cannot_receive_events = true - end - - def self.cannot_receive_events? - !!@cannot_receive_events - end - - def self.receive! - sql = Agent. - select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id"). - joins("JOIN links ON (links.receiver_id = agents.id)"). - joins("JOIN agents AS sources ON (links.source_id = sources.id)"). - joins("JOIN events ON (events.agent_id = sources.id)"). - where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql - - agents_to_events = {} - Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id| - agents_to_events[receiver_agent_id] ||= [] - agents_to_events[receiver_agent_id] << event_id + class << self + def cannot_be_scheduled! + @cannot_be_scheduled = true end - event_ids = agents_to_events.values.flatten.uniq.compact - - Agent.where(:id => agents_to_events.keys).each do |agent| - agent.update_attribute :last_checked_event_id, event_ids.max - agent.async_receive(agents_to_events[agent.id].uniq) + def cannot_be_scheduled? + !!@cannot_be_scheduled end - { - :agent_count => agents_to_events.keys.length, - :event_count => event_ids.length - } - end - - def self.run_schedule(schedule) - types = where(:schedule => schedule).group(:type).pluck(:type) - types.each do |type| - type.constantize.bulk_check(schedule) + def default_schedule(schedule = nil) + @default_schedule = schedule unless schedule.nil? + @default_schedule end - end - # You can override this to define a custom bulk_check for your type of Agent. - def self.bulk_check(schedule) - raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent - where(:schedule => schedule).find_each do |agent| - agent.async_check + def cannot_receive_events! + @cannot_receive_events = true end + + def cannot_receive_events? + !!@cannot_receive_events + end + + def receive! + sql = Agent. + select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id"). + joins("JOIN links ON (links.receiver_id = agents.id)"). + joins("JOIN agents AS sources ON (links.source_id = sources.id)"). + joins("JOIN events ON (events.agent_id = sources.id)"). + where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql + + agents_to_events = {} + Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id| + agents_to_events[receiver_agent_id] ||= [] + agents_to_events[receiver_agent_id] << event_id + end + + event_ids = agents_to_events.values.flatten.uniq.compact + + Agent.where(:id => agents_to_events.keys).each do |agent| + agent.update_attribute :last_checked_event_id, event_ids.max + Agent.async_receive(agent.id, agents_to_events[agent.id].uniq) + end + + { + :agent_count => agents_to_events.keys.length, + :event_count => event_ids.length + } + end + + # Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then + # save it with an updated _last_receive_at_ timestamp. + # + # This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job. It accepts Agent + # and Event ids instead of a literal ActiveRecord models because it is preferable to serialize delayed_jobs with ids. + def async_receive(agent_id, event_ids) + agent = Agent.find(agent_id) + agent.receive(Event.where(:id => event_ids)) + agent.last_receive_at = Time.now + agent.save! + end + handle_asynchronously :async_receive + + def run_schedule(schedule) + types = where(:schedule => schedule).group(:type).pluck(:type) + types.each do |type| + type.constantize.bulk_check(schedule) + end + end + + # You can override this to define a custom bulk_check for your type of Agent. + def bulk_check(schedule) + raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent + where(:schedule => schedule).pluck("agents.id").each do |agent_id| + async_check(agent_id) + end + end + + # Given an Agent id, load the Agent, call #check on it, and then save it with an updated _last_check_at_ timestamp. + # + # This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job. It accepts an Agent + # id instead of a literal Agent because it is preferable to serialize delayed_jobs with ids. + def async_check(agent_id) + agent = Agent.find(agent_id) + agent.check + agent.last_check_at = Time.now + agent.save! + end + handle_asynchronously :async_check end end diff --git a/spec/models/agent_spec.rb b/spec/models/agent_spec.rb index 36906d34..ebd8a5b6 100644 --- a/spec/models/agent_spec.rb +++ b/spec/models/agent_spec.rb @@ -8,9 +8,11 @@ describe Agent do end it "runs agents with the given schedule" do - mock.any_instance_of(Agents::WeatherAgent).async_check.twice - mock.any_instance_of(Agents::WebsiteAgent).async_check.once + weather_agent_ids = [agents(:bob_weather_agent), agents(:jane_weather_agent)].map(&:id) + stub(Agents::WeatherAgent).async_check(anything) {|agent_id| weather_agent_ids.delete(agent_id) } + stub(Agents::WebsiteAgent).async_check(agents(:bob_website_agent).id) Agent.run_schedule("midnight") + weather_agent_ids.should be_empty end it "groups agents by type" do @@ -20,7 +22,7 @@ describe Agent do end it "only runs agents with the given schedule" do - do_not_allow.any_instance_of(Agents::WebsiteAgent).async_check + do_not_allow(Agents::WebsiteAgent).async_check Agent.run_schedule("blah") end end @@ -116,19 +118,21 @@ describe Agent do end end - describe "#async_check" do - it "records last_check_at and calls check" do + describe ".async_check" do + it "records last_check_at and calls check on the given Agent" do @checker = Agents::SomethingSource.new(:name => "something") @checker.user = users(:bob) @checker.save! - @checker.options[:new] = true - mock(@checker).check.once + mock(@checker).check.once { + @checker.options[:new] = true + } + + mock(Agent).find(@checker.id) { @checker } @checker.last_check_at.should be_nil - @checker.async_check - @checker.last_check_at.should be_within(2).of(Time.now) - + Agents::SomethingSource.async_check(@checker.id) + @checker.reload.last_check_at.should be_within(2).of(Time.now) @checker.reload.options[:new].should be_true # Show that we save options end end @@ -141,13 +145,13 @@ describe Agent do it "should use available events" do mock.any_instance_of(Agents::TriggerAgent).receive(anything).once - agents(:bob_weather_agent).async_check + Agent.async_check(agents(:bob_weather_agent).id) Agent.receive! end it "should track when events have been seen and not see them again" do mock.any_instance_of(Agents::TriggerAgent).receive(anything).once - agents(:bob_weather_agent).async_check + Agent.async_check(agents(:bob_weather_agent).id) Agent.receive! Agent.receive! end @@ -161,8 +165,8 @@ describe Agent do mock.any_instance_of(Agents::TriggerAgent).receive(anything).twice { |events| events.map(&:user).map(&:username).uniq.length.should == 1 } - agents(:bob_weather_agent).async_check - agents(:jane_weather_agent).async_check + Agent.async_check(agents(:bob_weather_agent).id) + Agent.async_check(agents(:jane_weather_agent).id) Agent.receive! end end diff --git a/spec/models/agents/digest_email_agent_spec.rb b/spec/models/agents/digest_email_agent_spec.rb index 6be2693b..94a0a57b 100644 --- a/spec/models/agents/digest_email_agent_spec.rb +++ b/spec/models/agents/digest_email_agent_spec.rb @@ -23,20 +23,20 @@ describe Agents::DigestEmailAgent do event2.payload = "Something else you should know about" event2.save! - @checker.async_receive([event1.id, event2.id]) + Agents::DigestEmailAgent.async_receive(@checker.id, [event1.id, event2.id]) @checker.reload.memory[:queue].should == ["Something you should know about", "Something else you should know about"] end end describe "#check" do it "should send an email" do - @checker.async_check + Agents::DigestEmailAgent.async_check(@checker.id) ActionMailer::Base.deliveries.should == [] @checker.memory[:queue] = ["Something you should know about", { :title => "Foo", :url => "http://google.com", :bar => 2 }, { "message" => "hi", :woah => "there" }] @checker.save! - @checker.async_check + Agents::DigestEmailAgent.async_check(@checker.id) ActionMailer::Base.deliveries.last.to.should == ["bob@example.com"] ActionMailer::Base.deliveries.last.subject.should == "something interesting" get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip.should == "Something you should know about\n\nFoo (bar: 2 and url: http://google.com)\n\nhi (woah: there)" diff --git a/spec/models/agents/trigger_agent_spec.rb b/spec/models/agents/trigger_agent_spec.rb index aeb61859..661e9553 100644 --- a/spec/models/agents/trigger_agent_spec.rb +++ b/spec/models/agents/trigger_agent_spec.rb @@ -50,7 +50,7 @@ describe Agents::TriggerAgent do @event.save! @checker.should_not be_working # no events have ever been received - @checker.async_receive([@event.id]) + Agents::TriggerAgent.async_receive(@checker.id, [@event.id]) @checker.reload.should be_working # no events have ever been received three_days_from_now = 3.days.from_now stub(Time).now { three_days_from_now }