From e9fa1f2032fcedd4b7f0cc6ca0109752d0804d03 Mon Sep 17 00:00:00 2001 From: Akinori MUSHA Date: Fri, 22 Aug 2014 00:17:52 +0900 Subject: [PATCH] Add SchedulerAgent, which periodically runs other agents. --- app/models/agents/scheduler_agent.rb | 89 ++++++++++++++++++++++ app/views/agents/_form.html.erb | 14 ++++ app/views/agents/_table.html.erb | 2 +- app/views/agents/show.html.erb | 2 +- lib/huginn_scheduler.rb | 88 +++++++++++++++++++++ spec/models/agents/scheduler_agent_spec.rb | 54 +++++++++++++ 6 files changed, 247 insertions(+), 2 deletions(-) create mode 100644 app/models/agents/scheduler_agent.rb create mode 100644 spec/models/agents/scheduler_agent_spec.rb diff --git a/app/models/agents/scheduler_agent.rb b/app/models/agents/scheduler_agent.rb new file mode 100644 index 00000000..929fa376 --- /dev/null +++ b/app/models/agents/scheduler_agent.rb @@ -0,0 +1,89 @@ +require 'rufus-scheduler' + +module Agents + class SchedulerAgent < Agent + cannot_be_scheduled! + cannot_receive_events! + cannot_create_events! + can_run_other_agents! + + description <<-MD + This agent periodically triggers a run of each target Agent according to a user-defined schedule. + + Select target Agents and set a cron-style schedule to `schedule`. + In the traditional cron format, a schedule part consists of these five columns: `minute hour day-of-month month day-of-week`. + + * `0 22 * * 1-5`: every day of the week at 22:00 (10pm) + + In this variant, you can also specify seconds: + + * `30 0 22 * * 1-5`: every day of the week at 22:00:30 + + And timezones: + + * `0 22 * * 1-5 Europe/Paris`: every day of the week when it's 22:00 in Paris + + * `0 22 * * 1-5 Etc/GMT+2`: every day of the week when it's 22:00 in GMT+2 + + There's also a way to specify "last day of month": + + * `0 22 L * *`: every month on the last day at 22:00 + + And "monthdays": + + * `0 22 * * sun#1,sun#2`: every first and second sunday of the month, at 22:00 + + * `0 22 * * sun#L1`: every last sunday of the month, at 22:00 + MD + + def default_options + { 'schedule' => '0 * * * *' } + end + + def working? + true + end + + def check! + targets.active.each { |target| + log "Agent run queued for '#{target.name}'" + Agent.async_check(target.id) + } + end + + def validate_options + if (spec = options['schedule']).present? + begin + Rufus::Scheduler::CronLine.new(spec) + rescue ArgumentError + errors.add(:base, "invalid schedule") + end + else + errors.add(:base, "schedule is missing") + end + end + + before_save do + self.memory.delete('scheduled_at') if self.options_changed? + end + + def scheduler_tag + '%s#%d' % [self.class.name, id] + end + + class << self + def scheduler_tag_to_id(tag) + case tag + when /\A#{Regexp.quote(self.name)}\#(\d+)\z/o + $1.to_i + end + end + + def from_scheduler_tag(tag) + if id = scheduler_tag_to_id + find_by(id: id) + end + end + end + end +end diff --git a/app/views/agents/_form.html.erb b/app/views/agents/_form.html.erb index 20b73a1e..03ebc430 100644 --- a/app/views/agents/_form.html.erb +++ b/app/views/agents/_form.html.erb @@ -40,6 +40,20 @@ +
+
+
+ <%= f.label :runners %> + + <% eventRunners = current_user.agents.select(&:can_run_other_agents?) %> + <%= f.select(:runner_ids, + options_for_select(eventRunners.map {|s| [s.name, s.id] }, + @agent.runner_ids), + {}, { multiple: true, size: 5, class: 'select2 form-control' }) %> +
+
+
+
diff --git a/app/views/agents/_table.html.erb b/app/views/agents/_table.html.erb index 1f6c72f4..5bdea3f5 100644 --- a/app/views/agents/_table.html.erb +++ b/app/views/agents/_table.html.erb @@ -25,7 +25,7 @@ <% if agent.can_be_scheduled? %> - <%= agent.schedule.to_s.humanize.titleize %> + <%= agent_schedule(agent, ',
') %> <% else %> <% end %> diff --git a/app/views/agents/show.html.erb b/app/views/agents/show.html.erb index c14ce943..682cf0f5 100644 --- a/app/views/agents/show.html.erb +++ b/app/views/agents/show.html.erb @@ -72,7 +72,7 @@ <% if @agent.can_be_scheduled? %>

Schedule: - <%= (@agent.schedule || "n/a").humanize.titleize %> + <%= agent_schedule(@agent) %>

diff --git a/lib/huginn_scheduler.rb b/lib/huginn_scheduler.rb index fa72b7ea..f1e136c0 100644 --- a/lib/huginn_scheduler.rb +++ b/lib/huginn_scheduler.rb @@ -1,5 +1,87 @@ require 'rufus/scheduler' +class Rufus::Scheduler + SCHEDULER_AGENT_TAG = Agents::SchedulerAgent.name + + class Job + # Extract an ID of SchedulerAgent if a matching tag is found. + def scheduler_agent_id + tags.each { |tag| + if agent_id = Agents::SchedulerAgent.scheduler_tag_to_id(tag) + return agent_id + end + } + end + + # Return a SchedulerAgent tied to this job. Return nil if it is + # not found or disabled. + def scheduler_agent + agent_id = scheduler_agent_id or return nil + + Agent.of_type(Agents::SchedulerAgent).active.find_by(id: agent_id) + end + end + + # Get all jobs tied to any SchedulerAgent + def scheduler_agent_jobs + jobs(tag: SCHEDULER_AGENT_TAG) + end + + # Get a job tied to a given SchedulerAgent + def scheduler_agent_job(agent) + jobs(tags: [SCHEDULER_AGENT_TAG, agent.scheduler_tag]).first + end + + # Schedule or reschedule a job for a given SchedulerAgent and return + # the running job. Return nil if unscheduled. + def schedule_scheduler_agent(agent) + job = scheduler_agent_job(agent) + + if agent.disabled? + if job + puts "Unscheduling SchedulerAgent##{agent.id} (disabled)" + job.unschedule + end + nil + else + if job + return job if agent.memory['scheduled_at'] == job.scheduled_at.to_i + puts "Rescheduling SchedulerAgent##{agent.id}" + job.unschedule + else + puts "Scheduling SchedulerAgent##{agent.id}" + end + + job = schedule_cron agent.options['schedule'], tags: [SCHEDULER_AGENT_TAG, agent.scheduler_tag] do |job| + if scheduler_agent = job.scheduler_agent + scheduler_agent.check! + else + puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (disabled or deleted)" + job.unschedule + end + end + + agent.memory['scheduled_at'] = job.scheduled_at.to_i + agent.save + + job + end + end + + # Schedule or reschedule jobs for all SchedulerAgents and unschedule + # orphaned jobs if any. + def schedule_scheduler_agents + scheduled_jobs = Agent.of_type(Agents::SchedulerAgent).map { |scheduler_agent| + schedule_scheduler_agent(scheduler_agent) + }.compact + + (scheduler_agent_jobs - scheduled_jobs).each { |job| + puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (orphaned)" + job.unschedule + } + end +end + class HuginnScheduler attr_accessor :mutex @@ -82,6 +164,12 @@ class HuginnScheduler end end + # Schedule Scheduler Agents + + @rufus_scheduler.every '1m' do + @rufus_scheduler.schedule_scheduler_agents + end + @rufus_scheduler.join end end diff --git a/spec/models/agents/scheduler_agent_spec.rb b/spec/models/agents/scheduler_agent_spec.rb new file mode 100644 index 00000000..96f662d1 --- /dev/null +++ b/spec/models/agents/scheduler_agent_spec.rb @@ -0,0 +1,54 @@ +require 'spec_helper' + +describe Agents::SchedulerAgent do + before do + @agent = Agents::SchedulerAgent.new(name: 'Example', options: { 'schedule' => '0 * * * *' }) + @agent.user = users(:bob) + end + + describe "validation" do + it "should validate schedule" do + @agent.should be_valid + + @agent.options.delete('schedule') + @agent.should_not be_valid + + @agent.options['schedule'] = nil + @agent.should_not be_valid + + @agent.options['schedule'] = '' + @agent.should_not be_valid + + @agent.options['schedule'] = '0' + @agent.should_not be_valid + + @agent.options['schedule'] = '*/15 * * * * * *' + @agent.should_not be_valid + + @agent.options['schedule'] = '*/15 * * * * *' + @agent.should be_valid + + @agent.options['schedule'] = '*/1 * * * *' + @agent.should be_valid + + @agent.options['schedule'] = '*/1 * * *' + @agent.should_not be_valid + end + end + + describe "check!" do + it "should run targets" do + targets = [agents(:bob_website_agent), agents(:bob_weather_agent)] + @agent.targets = targets + @agent.save! + + target_ids = targets.map(&:id) + stub(Agent).async_check(anything) { |id| + target_ids.delete(id) + } + + @agent.check! + target_ids.should be_empty + end + end +end