mirror of
https://github.com/Fishwaldo/go-taskmanager.git
synced 2025-03-15 11:21:28 +00:00
249 lines
6.2 KiB
Go
249 lines
6.2 KiB
Go
package taskmanager
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sasha-s/go-deadlock"
|
|
"github.com/go-logr/logr"
|
|
"github.com/Fishwaldo/go-taskmanager/joberrors"
|
|
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
|
|
"github.com/armon/go-metrics"
|
|
)
|
|
|
|
// Scheduler manage one or more Schedule creating them using common options, enforcing unique IDs, and supply methods to
|
|
// Start / Stop all schedule(s).
|
|
type Scheduler struct {
|
|
tasks map[string]*Task
|
|
nextRun timeSlice
|
|
mx deadlock.RWMutex
|
|
tsmx deadlock.RWMutex
|
|
log logr.Logger
|
|
updateScheduleChan chan updateSignalOp
|
|
scheduleOpts []Option
|
|
}
|
|
|
|
type UpdateSignalOp_Type int
|
|
|
|
const (
|
|
updateSignalOp_Reschedule UpdateSignalOp_Type = iota
|
|
)
|
|
|
|
type updateSignalOp struct {
|
|
id string
|
|
operation UpdateSignalOp_Type
|
|
}
|
|
|
|
type timeSlice []*Task
|
|
|
|
func (p timeSlice) Len() int {
|
|
return len(p)
|
|
}
|
|
|
|
func (p timeSlice) Less(i, j int) bool {
|
|
if p[i].GetNextRun().IsZero() {
|
|
return false
|
|
}
|
|
return p[i].GetNextRun().Before(p[j].GetNextRun())
|
|
}
|
|
|
|
func (p timeSlice) Swap(i, j int) {
|
|
p[i], p[j] = p[j], p[i]
|
|
}
|
|
|
|
//NewScheduler Creates new Scheduler, opt Options are applied to *every* schedule added and created by this scheduler.
|
|
func NewScheduler(opts ...Option) *Scheduler {
|
|
var options = defaultSchedOptions()
|
|
|
|
// Apply Options
|
|
for _, option := range opts {
|
|
option.apply(options)
|
|
}
|
|
|
|
s := &Scheduler{
|
|
tasks: make(map[string]*Task),
|
|
nextRun: make(timeSlice, 0),
|
|
updateScheduleChan: make(chan updateSignalOp, 100),
|
|
scheduleOpts: opts,
|
|
log: options.logger,
|
|
}
|
|
|
|
go s.scheduleLoop()
|
|
return s
|
|
}
|
|
|
|
//Add Create a new Task for` jobFunc func()` that will run according to `timer Timer` with the []Options of the Scheduler.
|
|
func (s *Scheduler) Add(ctx context.Context, id string, timer Timer, job func(context.Context), extraOpts ...Option) error {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if _, ok := s.tasks[id]; ok {
|
|
return joberrors.ErrorScheduleExists{Message: "job with this id already exists"}
|
|
}
|
|
|
|
// Create schedule
|
|
opts := append(extraOpts, s.scheduleOpts...)
|
|
schedule := NewSchedule(ctx, id, timer, job, opts...)
|
|
schedule.updateSignal = s.updateScheduleChan
|
|
// Add to managed schedules
|
|
s.tasks[id] = schedule
|
|
metrics.SetGauge(schedmetrics.GetMetricsGaugeKey(schedmetrics.Metrics_Guage_Jobs), float32(len(s.tasks)))
|
|
|
|
s.log.Info("Added New Job", "jobid", schedule.GetID())
|
|
return nil
|
|
}
|
|
|
|
//Start Start the Schedule with the given id. Return error if no Schedule with the given id exist.
|
|
func (s *Scheduler) Start(id string) error {
|
|
s.mx.Lock()
|
|
|
|
// Find Schedule by id
|
|
schedule, found := s.tasks[id]
|
|
if !found {
|
|
return joberrors.ErrorScheduleNotFound{Message: "Schedule Not Found"}
|
|
}
|
|
|
|
// Start it ¯\_(ツ)_/¯
|
|
schedule.Start()
|
|
s.mx.Unlock()
|
|
s.addScheduletoRunQueue(schedule)
|
|
s.log.Info("Start Job", "jobid", schedule.GetID())
|
|
return nil
|
|
}
|
|
|
|
//StartAll Start All Schedules managed by the Scheduler
|
|
func (s *Scheduler) StartAll() {
|
|
s.log.Info("StartAll Called")
|
|
for _, schedule := range s.tasks {
|
|
s.Start(schedule.id)
|
|
}
|
|
}
|
|
|
|
//Stop Stop the Schedule with the given id. Return error if no Schedule with the given id exist.
|
|
func (s *Scheduler) Stop(id string) error {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
schedule, found := s.tasks[id]
|
|
if !found {
|
|
return joberrors.ErrorScheduleNotFound{Message: "Schedule Not Found"}
|
|
}
|
|
s.tsmx.Lock()
|
|
for pos, sched := range s.nextRun {
|
|
if sched.id == id {
|
|
s.nextRun = append(s.nextRun[:pos], s.nextRun[pos+1])
|
|
break
|
|
}
|
|
}
|
|
s.tsmx.Unlock()
|
|
schedule.Stop()
|
|
return nil
|
|
}
|
|
|
|
//StopAll Stops All Schedules managed by the Scheduler concurrently, but will block until ALL of them have stopped.
|
|
func (s *Scheduler) StopAll() {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
s.tsmx.Lock()
|
|
s.nextRun = nil
|
|
s.tsmx.Unlock()
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(len(s.tasks))
|
|
for _, schedule := range s.tasks {
|
|
go func(scheduleCpy *Task) {
|
|
scheduleCpy.Stop()
|
|
wg.Done()
|
|
}(schedule)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
//GetSchedule Returns a Schedule by ID from the Scheduler
|
|
func (s *Scheduler) GetSchedule(id string) (*Task, error) {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
j, ok := s.tasks[id]
|
|
if !ok {
|
|
return nil, joberrors.ErrorScheduleNotFound{Message: "Schedule Not Found"}
|
|
}
|
|
return j, nil
|
|
}
|
|
|
|
//GetAllSchedules Returns all Schedule's in the Scheduler
|
|
func (s *Scheduler) GetAllSchedules() (map[string]*Task, error) {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
return s.tasks, nil
|
|
}
|
|
|
|
func (s *Scheduler) getNextJob() *Task {
|
|
s.tsmx.RLock()
|
|
defer s.tsmx.RUnlock()
|
|
if s.nextRun.Len() == 0 {
|
|
return nil
|
|
}
|
|
for _, sched := range s.nextRun {
|
|
if sched.nextRun.Get().IsZero() {
|
|
s.log.Info("NextRun is Zero", "jobid", sched.GetID())
|
|
continue
|
|
}
|
|
return sched
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Scheduler) scheduleLoop() {
|
|
var nextRun time.Time
|
|
var nextRunChan <-chan time.Time
|
|
for {
|
|
nextjob := s.getNextJob()
|
|
if nextjob != nil {
|
|
nextRun = nextjob.GetNextRun()
|
|
s.log.Info("Next Scheduler Run", "next", time.Until(nextRun), "jobid", nextjob.GetID())
|
|
nextRunChan = time.After(time.Until(nextRun))
|
|
} else {
|
|
s.log.Info("No Jobs Scheduled")
|
|
}
|
|
|
|
select {
|
|
case <-nextRunChan:
|
|
if nextjob != nil {
|
|
s.log.Info("Dispatching Job", "jobid", nextjob.id)
|
|
nextjob.nextRun.Set(time.Time{})
|
|
go nextjob.Run()
|
|
} else {
|
|
s.log.Error(nil, "nextjob is Nil")
|
|
}
|
|
case op := <-s.updateScheduleChan:
|
|
switch op.operation {
|
|
case updateSignalOp_Reschedule:
|
|
s.log.Info("recalcSchedule Triggered", "operation", op.id)
|
|
s.updateNextRun()
|
|
default:
|
|
s.log.Error(nil, "Unhandled updateSignalOp Recieved")
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (s *Scheduler) updateNextRun() {
|
|
s.tsmx.Lock()
|
|
defer s.tsmx.Unlock()
|
|
sort.Sort(s.nextRun)
|
|
for _, job := range s.nextRun {
|
|
s.log.Info("Next Run", "jobid", job.GetID(), "when", job.GetNextRun().Format(time.RFC1123))
|
|
}
|
|
}
|
|
|
|
func (s *Scheduler) addScheduletoRunQueue(schedule *Task) {
|
|
s.tsmx.Lock()
|
|
defer s.tsmx.Unlock()
|
|
s.nextRun = append(s.nextRun, schedule)
|
|
s.log.Info("addScheduletoRunQueue", "jobid", schedule.GetID())
|
|
for _, job := range s.nextRun {
|
|
s.log.Info("Job Run Queue", "jobid", job.GetID(), "when", job.GetNextRun().Format(time.RFC1123))
|
|
}
|
|
s.updateScheduleChan <- updateSignalOp{operation: updateSignalOp_Reschedule, id: schedule.id}
|
|
}
|