mirror of
https://github.com/Fishwaldo/huginn.git
synced 2025-03-27 17:21:49 +00:00
196 lines
4.8 KiB
Ruby
196 lines
4.8 KiB
Ruby
require 'rufus/scheduler'
|
|
|
|
class Rufus::Scheduler
|
|
SCHEDULER_AGENT_TAG = Agents::SchedulerAgent.name
|
|
|
|
class Job
|
|
# Store an ID of SchedulerAgent in this job.
|
|
def scheduler_agent_id=(id)
|
|
self[:scheduler_agent_id] = id
|
|
end
|
|
|
|
# Extract an ID of SchedulerAgent if any.
|
|
def scheduler_agent_id
|
|
self[:scheduler_agent_id]
|
|
end
|
|
|
|
# Return a SchedulerAgent tied to this job. Return nil if it is
|
|
# not found or disabled.
|
|
def scheduler_agent
|
|
agent_id = scheduler_agent_id or return nil
|
|
|
|
Agent.of_type(Agents::SchedulerAgent).active.find_by(id: agent_id)
|
|
end
|
|
end
|
|
|
|
# Get all jobs tied to any SchedulerAgent
|
|
def scheduler_agent_jobs
|
|
jobs(tag: SCHEDULER_AGENT_TAG)
|
|
end
|
|
|
|
# Get a job tied to a given SchedulerAgent
|
|
def scheduler_agent_job(agent)
|
|
scheduler_agent_jobs.find { |job|
|
|
job.scheduler_agent_id == agent.id
|
|
}
|
|
end
|
|
|
|
# Schedule or reschedule a job for a given SchedulerAgent and return
|
|
# the running job. Return nil if unscheduled.
|
|
def schedule_scheduler_agent(agent)
|
|
job = scheduler_agent_job(agent)
|
|
|
|
if agent.unavailable?
|
|
if job
|
|
puts "Unscheduling SchedulerAgent##{agent.id} (disabled)"
|
|
job.unschedule
|
|
end
|
|
nil
|
|
else
|
|
if job
|
|
return job if agent.memory['scheduled_at'] == job.scheduled_at.to_i
|
|
puts "Rescheduling SchedulerAgent##{agent.id}"
|
|
job.unschedule
|
|
else
|
|
puts "Scheduling SchedulerAgent##{agent.id}"
|
|
end
|
|
|
|
agent_id = agent.id
|
|
|
|
job = schedule_cron agent.options['schedule'], tag: SCHEDULER_AGENT_TAG do |job|
|
|
job.scheduler_agent_id = agent_id
|
|
|
|
if scheduler_agent = job.scheduler_agent
|
|
scheduler_agent.check!
|
|
else
|
|
puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (disabled or deleted)"
|
|
job.unschedule
|
|
end
|
|
end
|
|
# Make sure the job is associated with a SchedulerAgent before
|
|
# it is triggered.
|
|
job.scheduler_agent_id = agent_id
|
|
|
|
agent.memory['scheduled_at'] = job.scheduled_at.to_i
|
|
agent.save
|
|
|
|
job
|
|
end
|
|
end
|
|
|
|
# Schedule or reschedule jobs for all SchedulerAgents and unschedule
|
|
# orphaned jobs if any.
|
|
def schedule_scheduler_agents
|
|
scheduled_jobs = Agent.of_type(Agents::SchedulerAgent).map { |scheduler_agent|
|
|
schedule_scheduler_agent(scheduler_agent)
|
|
}.compact
|
|
|
|
(scheduler_agent_jobs - scheduled_jobs).each { |job|
|
|
puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (orphaned)"
|
|
job.unschedule
|
|
}
|
|
end
|
|
end
|
|
|
|
class HuginnScheduler
|
|
FAILED_JOBS_TO_KEEP = 100
|
|
attr_accessor :mutex
|
|
|
|
def initialize(options = {})
|
|
@rufus_scheduler = Rufus::Scheduler.new(options)
|
|
self.mutex = Mutex.new
|
|
end
|
|
|
|
def stop
|
|
@rufus_scheduler.stop
|
|
end
|
|
|
|
def run!
|
|
tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].presence || "Pacific Time (US & Canada)"]
|
|
|
|
# Schedule event propagation.
|
|
@rufus_scheduler.every '1m' do
|
|
propagate!
|
|
end
|
|
|
|
# Schedule event cleanup.
|
|
@rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
|
|
cleanup_expired_events!
|
|
end
|
|
|
|
# Schedule failed job cleanup.
|
|
@rufus_scheduler.every '1h' do
|
|
cleanup_failed_jobs!
|
|
end
|
|
|
|
# Schedule repeating events.
|
|
%w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
|
|
@rufus_scheduler.every schedule do
|
|
run_schedule "every_#{schedule}"
|
|
end
|
|
end
|
|
|
|
# Schedule events for specific times.
|
|
24.times do |hour|
|
|
@rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
|
|
run_schedule hour_to_schedule_name(hour)
|
|
end
|
|
end
|
|
|
|
# Schedule Scheduler Agents
|
|
|
|
@rufus_scheduler.every '1m' do
|
|
@rufus_scheduler.schedule_scheduler_agents
|
|
end
|
|
|
|
@rufus_scheduler.join
|
|
end
|
|
|
|
private
|
|
def run_schedule(time)
|
|
with_mutex do
|
|
puts "Queuing schedule for #{time}"
|
|
Agent.delay.run_schedule(time)
|
|
end
|
|
end
|
|
|
|
def propagate!
|
|
with_mutex do
|
|
puts "Queuing event propagation"
|
|
Agent.delay.receive!
|
|
end
|
|
end
|
|
|
|
def cleanup_expired_events!
|
|
with_mutex do
|
|
puts "Running event cleanup"
|
|
Event.delay.cleanup_expired!
|
|
end
|
|
end
|
|
|
|
def cleanup_failed_jobs!
|
|
num_to_keep = (ENV['FAILED_JOBS_TO_KEEP'].presence || FAILED_JOBS_TO_KEEP).to_i
|
|
first_to_delete = Delayed::Job.where.not(failed_at: nil).order("failed_at DESC").offset(num_to_keep).limit(1).pluck(:failed_at).first
|
|
Delayed::Job.where(["failed_at <= ?", first_to_delete]).delete_all if first_to_delete.present?
|
|
end
|
|
|
|
def hour_to_schedule_name(hour)
|
|
if hour == 0
|
|
"midnight"
|
|
elsif hour < 12
|
|
"#{hour}am"
|
|
elsif hour == 12
|
|
"noon"
|
|
else
|
|
"#{hour - 12}pm"
|
|
end
|
|
end
|
|
|
|
def with_mutex
|
|
ActiveRecord::Base.connection_pool.with_connection do
|
|
mutex.synchronize do
|
|
yield
|
|
end
|
|
end
|
|
end
|
|
end
|