mirror of
https://github.com/Fishwaldo/huginn.git
synced 2025-03-15 19:31:26 +00:00
switch to class methods and ids to shorten the payload size of serialized delayed_jobs
This commit is contained in:
parent
15b03fc40c
commit
eead335705
5 changed files with 104 additions and 89 deletions
|
@ -9,8 +9,7 @@ class AgentsController < ApplicationController
|
|||
end
|
||||
|
||||
def run
|
||||
@agent = current_user.agents.find(params[:id])
|
||||
@agent.async_check
|
||||
Agent.async_check(current_user.agents.find(params[:id]).id)
|
||||
redirect_to agents_path, notice: "Agent run queued"
|
||||
end
|
||||
|
||||
|
|
|
@ -100,20 +100,6 @@ class Agent < ActiveRecord::Base
|
|||
@memoized_last_event_at ||= events.select(:created_at).first.try(:created_at)
|
||||
end
|
||||
|
||||
def async_check
|
||||
check
|
||||
self.last_check_at = Time.now
|
||||
save!
|
||||
end
|
||||
handle_asynchronously :async_check #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now }
|
||||
|
||||
def async_receive(event_ids)
|
||||
receive(Event.where(:id => event_ids))
|
||||
self.last_receive_at = Time.now
|
||||
save!
|
||||
end
|
||||
handle_asynchronously :async_receive #, :priority => 10, :run_at => Proc.new { 5.minutes.from_now }
|
||||
|
||||
def default_schedule
|
||||
self.class.default_schedule
|
||||
end
|
||||
|
@ -135,67 +121,93 @@ class Agent < ActiveRecord::Base
|
|||
end
|
||||
|
||||
# Class Methods
|
||||
|
||||
def self.cannot_be_scheduled!
|
||||
@cannot_be_scheduled = true
|
||||
end
|
||||
|
||||
def self.cannot_be_scheduled?
|
||||
!!@cannot_be_scheduled
|
||||
end
|
||||
|
||||
def self.default_schedule(schedule = nil)
|
||||
@default_schedule = schedule unless schedule.nil?
|
||||
@default_schedule
|
||||
end
|
||||
|
||||
def self.cannot_receive_events!
|
||||
@cannot_receive_events = true
|
||||
end
|
||||
|
||||
def self.cannot_receive_events?
|
||||
!!@cannot_receive_events
|
||||
end
|
||||
|
||||
def self.receive!
|
||||
sql = Agent.
|
||||
select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
|
||||
joins("JOIN links ON (links.receiver_id = agents.id)").
|
||||
joins("JOIN agents AS sources ON (links.source_id = sources.id)").
|
||||
joins("JOIN events ON (events.agent_id = sources.id)").
|
||||
where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
|
||||
|
||||
agents_to_events = {}
|
||||
Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
|
||||
agents_to_events[receiver_agent_id] ||= []
|
||||
agents_to_events[receiver_agent_id] << event_id
|
||||
class << self
|
||||
def cannot_be_scheduled!
|
||||
@cannot_be_scheduled = true
|
||||
end
|
||||
|
||||
event_ids = agents_to_events.values.flatten.uniq.compact
|
||||
|
||||
Agent.where(:id => agents_to_events.keys).each do |agent|
|
||||
agent.update_attribute :last_checked_event_id, event_ids.max
|
||||
agent.async_receive(agents_to_events[agent.id].uniq)
|
||||
def cannot_be_scheduled?
|
||||
!!@cannot_be_scheduled
|
||||
end
|
||||
|
||||
{
|
||||
:agent_count => agents_to_events.keys.length,
|
||||
:event_count => event_ids.length
|
||||
}
|
||||
end
|
||||
|
||||
def self.run_schedule(schedule)
|
||||
types = where(:schedule => schedule).group(:type).pluck(:type)
|
||||
types.each do |type|
|
||||
type.constantize.bulk_check(schedule)
|
||||
def default_schedule(schedule = nil)
|
||||
@default_schedule = schedule unless schedule.nil?
|
||||
@default_schedule
|
||||
end
|
||||
end
|
||||
|
||||
# You can override this to define a custom bulk_check for your type of Agent.
|
||||
def self.bulk_check(schedule)
|
||||
raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent
|
||||
where(:schedule => schedule).find_each do |agent|
|
||||
agent.async_check
|
||||
def cannot_receive_events!
|
||||
@cannot_receive_events = true
|
||||
end
|
||||
|
||||
def cannot_receive_events?
|
||||
!!@cannot_receive_events
|
||||
end
|
||||
|
||||
def receive!
|
||||
sql = Agent.
|
||||
select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
|
||||
joins("JOIN links ON (links.receiver_id = agents.id)").
|
||||
joins("JOIN agents AS sources ON (links.source_id = sources.id)").
|
||||
joins("JOIN events ON (events.agent_id = sources.id)").
|
||||
where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
|
||||
|
||||
agents_to_events = {}
|
||||
Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
|
||||
agents_to_events[receiver_agent_id] ||= []
|
||||
agents_to_events[receiver_agent_id] << event_id
|
||||
end
|
||||
|
||||
event_ids = agents_to_events.values.flatten.uniq.compact
|
||||
|
||||
Agent.where(:id => agents_to_events.keys).each do |agent|
|
||||
agent.update_attribute :last_checked_event_id, event_ids.max
|
||||
Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
|
||||
end
|
||||
|
||||
{
|
||||
:agent_count => agents_to_events.keys.length,
|
||||
:event_count => event_ids.length
|
||||
}
|
||||
end
|
||||
|
||||
# Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then
|
||||
# save it with an updated _last_receive_at_ timestamp.
|
||||
#
|
||||
# This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job. It accepts Agent
|
||||
# and Event ids instead of a literal ActiveRecord models because it is preferable to serialize delayed_jobs with ids.
|
||||
def async_receive(agent_id, event_ids)
|
||||
agent = Agent.find(agent_id)
|
||||
agent.receive(Event.where(:id => event_ids))
|
||||
agent.last_receive_at = Time.now
|
||||
agent.save!
|
||||
end
|
||||
handle_asynchronously :async_receive
|
||||
|
||||
def run_schedule(schedule)
|
||||
types = where(:schedule => schedule).group(:type).pluck(:type)
|
||||
types.each do |type|
|
||||
type.constantize.bulk_check(schedule)
|
||||
end
|
||||
end
|
||||
|
||||
# You can override this to define a custom bulk_check for your type of Agent.
|
||||
def bulk_check(schedule)
|
||||
raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent
|
||||
where(:schedule => schedule).pluck("agents.id").each do |agent_id|
|
||||
async_check(agent_id)
|
||||
end
|
||||
end
|
||||
|
||||
# Given an Agent id, load the Agent, call #check on it, and then save it with an updated _last_check_at_ timestamp.
|
||||
#
|
||||
# This method is tagged with _handle_asynchronously_ and will be delayed and run with delayed_job. It accepts an Agent
|
||||
# id instead of a literal Agent because it is preferable to serialize delayed_jobs with ids.
|
||||
def async_check(agent_id)
|
||||
agent = Agent.find(agent_id)
|
||||
agent.check
|
||||
agent.last_check_at = Time.now
|
||||
agent.save!
|
||||
end
|
||||
handle_asynchronously :async_check
|
||||
end
|
||||
end
|
||||
|
|
|
@ -8,9 +8,11 @@ describe Agent do
|
|||
end
|
||||
|
||||
it "runs agents with the given schedule" do
|
||||
mock.any_instance_of(Agents::WeatherAgent).async_check.twice
|
||||
mock.any_instance_of(Agents::WebsiteAgent).async_check.once
|
||||
weather_agent_ids = [agents(:bob_weather_agent), agents(:jane_weather_agent)].map(&:id)
|
||||
stub(Agents::WeatherAgent).async_check(anything) {|agent_id| weather_agent_ids.delete(agent_id) }
|
||||
stub(Agents::WebsiteAgent).async_check(agents(:bob_website_agent).id)
|
||||
Agent.run_schedule("midnight")
|
||||
weather_agent_ids.should be_empty
|
||||
end
|
||||
|
||||
it "groups agents by type" do
|
||||
|
@ -20,7 +22,7 @@ describe Agent do
|
|||
end
|
||||
|
||||
it "only runs agents with the given schedule" do
|
||||
do_not_allow.any_instance_of(Agents::WebsiteAgent).async_check
|
||||
do_not_allow(Agents::WebsiteAgent).async_check
|
||||
Agent.run_schedule("blah")
|
||||
end
|
||||
end
|
||||
|
@ -116,19 +118,21 @@ describe Agent do
|
|||
end
|
||||
end
|
||||
|
||||
describe "#async_check" do
|
||||
it "records last_check_at and calls check" do
|
||||
describe ".async_check" do
|
||||
it "records last_check_at and calls check on the given Agent" do
|
||||
@checker = Agents::SomethingSource.new(:name => "something")
|
||||
@checker.user = users(:bob)
|
||||
@checker.save!
|
||||
|
||||
@checker.options[:new] = true
|
||||
mock(@checker).check.once
|
||||
mock(@checker).check.once {
|
||||
@checker.options[:new] = true
|
||||
}
|
||||
|
||||
mock(Agent).find(@checker.id) { @checker }
|
||||
|
||||
@checker.last_check_at.should be_nil
|
||||
@checker.async_check
|
||||
@checker.last_check_at.should be_within(2).of(Time.now)
|
||||
|
||||
Agents::SomethingSource.async_check(@checker.id)
|
||||
@checker.reload.last_check_at.should be_within(2).of(Time.now)
|
||||
@checker.reload.options[:new].should be_true # Show that we save options
|
||||
end
|
||||
end
|
||||
|
@ -141,13 +145,13 @@ describe Agent do
|
|||
|
||||
it "should use available events" do
|
||||
mock.any_instance_of(Agents::TriggerAgent).receive(anything).once
|
||||
agents(:bob_weather_agent).async_check
|
||||
Agent.async_check(agents(:bob_weather_agent).id)
|
||||
Agent.receive!
|
||||
end
|
||||
|
||||
it "should track when events have been seen and not see them again" do
|
||||
mock.any_instance_of(Agents::TriggerAgent).receive(anything).once
|
||||
agents(:bob_weather_agent).async_check
|
||||
Agent.async_check(agents(:bob_weather_agent).id)
|
||||
Agent.receive!
|
||||
Agent.receive!
|
||||
end
|
||||
|
@ -161,8 +165,8 @@ describe Agent do
|
|||
mock.any_instance_of(Agents::TriggerAgent).receive(anything).twice { |events|
|
||||
events.map(&:user).map(&:username).uniq.length.should == 1
|
||||
}
|
||||
agents(:bob_weather_agent).async_check
|
||||
agents(:jane_weather_agent).async_check
|
||||
Agent.async_check(agents(:bob_weather_agent).id)
|
||||
Agent.async_check(agents(:jane_weather_agent).id)
|
||||
Agent.receive!
|
||||
end
|
||||
end
|
||||
|
|
|
@ -23,20 +23,20 @@ describe Agents::DigestEmailAgent do
|
|||
event2.payload = "Something else you should know about"
|
||||
event2.save!
|
||||
|
||||
@checker.async_receive([event1.id, event2.id])
|
||||
Agents::DigestEmailAgent.async_receive(@checker.id, [event1.id, event2.id])
|
||||
@checker.reload.memory[:queue].should == ["Something you should know about", "Something else you should know about"]
|
||||
end
|
||||
end
|
||||
|
||||
describe "#check" do
|
||||
it "should send an email" do
|
||||
@checker.async_check
|
||||
Agents::DigestEmailAgent.async_check(@checker.id)
|
||||
ActionMailer::Base.deliveries.should == []
|
||||
|
||||
@checker.memory[:queue] = ["Something you should know about", { :title => "Foo", :url => "http://google.com", :bar => 2 }, { "message" => "hi", :woah => "there" }]
|
||||
@checker.save!
|
||||
|
||||
@checker.async_check
|
||||
Agents::DigestEmailAgent.async_check(@checker.id)
|
||||
ActionMailer::Base.deliveries.last.to.should == ["bob@example.com"]
|
||||
ActionMailer::Base.deliveries.last.subject.should == "something interesting"
|
||||
get_message_part(ActionMailer::Base.deliveries.last, /plain/).strip.should == "Something you should know about\n\nFoo (bar: 2 and url: http://google.com)\n\nhi (woah: there)"
|
||||
|
|
|
@ -50,7 +50,7 @@ describe Agents::TriggerAgent do
|
|||
@event.save!
|
||||
|
||||
@checker.should_not be_working # no events have ever been received
|
||||
@checker.async_receive([@event.id])
|
||||
Agents::TriggerAgent.async_receive(@checker.id, [@event.id])
|
||||
@checker.reload.should be_working # no events have ever been received
|
||||
three_days_from_now = 3.days.from_now
|
||||
stub(Time).now { three_days_from_now }
|
||||
|
|
Loading…
Add table
Reference in a new issue