Add SchedulerAgent, which periodically runs other agents.

This commit is contained in:
Akinori MUSHA 2014-08-22 00:17:52 +09:00
parent 0ae0f1a302
commit e9fa1f2032
6 changed files with 247 additions and 2 deletions

View file

@ -0,0 +1,89 @@
require 'rufus-scheduler'
module Agents
class SchedulerAgent < Agent
cannot_be_scheduled!
cannot_receive_events!
cannot_create_events!
can_run_other_agents!
description <<-MD
This agent periodically triggers a run of each target Agent according to a user-defined schedule.
Select target Agents and set a cron-style schedule to `schedule`.
In the traditional cron format, a schedule part consists of these five columns: `minute hour day-of-month month day-of-week`.
* `0 22 * * 1-5`: every day of the week at 22:00 (10pm)
In this variant, you can also specify seconds:
* `30 0 22 * * 1-5`: every day of the week at 22:00:30
And timezones:
* `0 22 * * 1-5 Europe/Paris`: every day of the week when it's 22:00 in Paris
* `0 22 * * 1-5 Etc/GMT+2`: every day of the week when it's 22:00 in GMT+2
There's also a way to specify "last day of month":
* `0 22 L * *`: every month on the last day at 22:00
And "monthdays":
* `0 22 * * sun#1,sun#2`: every first and second sunday of the month, at 22:00
* `0 22 * * sun#L1`: every last sunday of the month, at 22:00
MD
def default_options
{ 'schedule' => '0 * * * *' }
end
def working?
true
end
def check!
targets.active.each { |target|
log "Agent run queued for '#{target.name}'"
Agent.async_check(target.id)
}
end
def validate_options
if (spec = options['schedule']).present?
begin
Rufus::Scheduler::CronLine.new(spec)
rescue ArgumentError
errors.add(:base, "invalid schedule")
end
else
errors.add(:base, "schedule is missing")
end
end
before_save do
self.memory.delete('scheduled_at') if self.options_changed?
end
def scheduler_tag
'%s#%d' % [self.class.name, id]
end
class << self
def scheduler_tag_to_id(tag)
case tag
when /\A#{Regexp.quote(self.name)}\#(\d+)\z/o
$1.to_i
end
end
def from_scheduler_tag(tag)
if id = scheduler_tag_to_id
find_by(id: id)
end
end
end
end
end

View file

@ -40,6 +40,20 @@
</div>
</div>
<div class="schedule-region" data-can-be-scheduled="<%= @agent.can_be_scheduled? %>">
<div class="can-be-scheduled">
<div class="form-group">
<%= f.label :runners %>
<span class="glyphicon glyphicon-question-sign hover-help" data-content="Other than the system-defined schedule above, this agent may be run by user-defined ScheduleAgents."></span>
<% eventRunners = current_user.agents.select(&:can_run_other_agents?) %>
<%= f.select(:runner_ids,
options_for_select(eventRunners.map {|s| [s.name, s.id] },
@agent.runner_ids),
{}, { multiple: true, size: 5, class: 'select2 form-control' }) %>
</div>
</div>
</div>
<div class="chain-region" data-can-run-other-agents="<%= @agent.can_run_other_agents? %>">
<div class="can-run-other-agents">
<div class="form-group">

View file

@ -25,7 +25,7 @@
</td>
<td class='<%= "agent-disabled" if agent.disabled? %>'>
<% if agent.can_be_scheduled? %>
<%= agent.schedule.to_s.humanize.titleize %>
<%= agent_schedule(agent, ',<br/>') %>
<% else %>
<span class='not-applicable'></span>
<% end %>

View file

@ -72,7 +72,7 @@
<% if @agent.can_be_scheduled? %>
<p>
<b>Schedule:</b>
<%= (@agent.schedule || "n/a").humanize.titleize %>
<%= agent_schedule(@agent) %>
</p>
<p>

View file

@ -1,5 +1,87 @@
require 'rufus/scheduler'
class Rufus::Scheduler
SCHEDULER_AGENT_TAG = Agents::SchedulerAgent.name
class Job
# Extract an ID of SchedulerAgent if a matching tag is found.
def scheduler_agent_id
tags.each { |tag|
if agent_id = Agents::SchedulerAgent.scheduler_tag_to_id(tag)
return agent_id
end
}
end
# Return a SchedulerAgent tied to this job. Return nil if it is
# not found or disabled.
def scheduler_agent
agent_id = scheduler_agent_id or return nil
Agent.of_type(Agents::SchedulerAgent).active.find_by(id: agent_id)
end
end
# Get all jobs tied to any SchedulerAgent
def scheduler_agent_jobs
jobs(tag: SCHEDULER_AGENT_TAG)
end
# Get a job tied to a given SchedulerAgent
def scheduler_agent_job(agent)
jobs(tags: [SCHEDULER_AGENT_TAG, agent.scheduler_tag]).first
end
# Schedule or reschedule a job for a given SchedulerAgent and return
# the running job. Return nil if unscheduled.
def schedule_scheduler_agent(agent)
job = scheduler_agent_job(agent)
if agent.disabled?
if job
puts "Unscheduling SchedulerAgent##{agent.id} (disabled)"
job.unschedule
end
nil
else
if job
return job if agent.memory['scheduled_at'] == job.scheduled_at.to_i
puts "Rescheduling SchedulerAgent##{agent.id}"
job.unschedule
else
puts "Scheduling SchedulerAgent##{agent.id}"
end
job = schedule_cron agent.options['schedule'], tags: [SCHEDULER_AGENT_TAG, agent.scheduler_tag] do |job|
if scheduler_agent = job.scheduler_agent
scheduler_agent.check!
else
puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (disabled or deleted)"
job.unschedule
end
end
agent.memory['scheduled_at'] = job.scheduled_at.to_i
agent.save
job
end
end
# Schedule or reschedule jobs for all SchedulerAgents and unschedule
# orphaned jobs if any.
def schedule_scheduler_agents
scheduled_jobs = Agent.of_type(Agents::SchedulerAgent).map { |scheduler_agent|
schedule_scheduler_agent(scheduler_agent)
}.compact
(scheduler_agent_jobs - scheduled_jobs).each { |job|
puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (orphaned)"
job.unschedule
}
end
end
class HuginnScheduler
attr_accessor :mutex
@ -82,6 +164,12 @@ class HuginnScheduler
end
end
# Schedule Scheduler Agents
@rufus_scheduler.every '1m' do
@rufus_scheduler.schedule_scheduler_agents
end
@rufus_scheduler.join
end
end

View file

@ -0,0 +1,54 @@
require 'spec_helper'
describe Agents::SchedulerAgent do
before do
@agent = Agents::SchedulerAgent.new(name: 'Example', options: { 'schedule' => '0 * * * *' })
@agent.user = users(:bob)
end
describe "validation" do
it "should validate schedule" do
@agent.should be_valid
@agent.options.delete('schedule')
@agent.should_not be_valid
@agent.options['schedule'] = nil
@agent.should_not be_valid
@agent.options['schedule'] = ''
@agent.should_not be_valid
@agent.options['schedule'] = '0'
@agent.should_not be_valid
@agent.options['schedule'] = '*/15 * * * * * *'
@agent.should_not be_valid
@agent.options['schedule'] = '*/15 * * * * *'
@agent.should be_valid
@agent.options['schedule'] = '*/1 * * * *'
@agent.should be_valid
@agent.options['schedule'] = '*/1 * * *'
@agent.should_not be_valid
end
end
describe "check!" do
it "should run targets" do
targets = [agents(:bob_website_agent), agents(:bob_weather_agent)]
@agent.targets = targets
@agent.save!
target_ids = targets.map(&:id)
stub(Agent).async_check(anything) { |id|
target_ids.delete(id)
}
@agent.check!
target_ids.should be_empty
end
end
end