mirror of
https://github.com/Fishwaldo/sched.git
synced 2025-03-15 19:41:46 +00:00
386 lines
10 KiB
Go
386 lines
10 KiB
Go
package sched
|
|
|
|
import (
|
|
// "errors"
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sherifabdlnaby/sched/job"
|
|
"github.com/uber-go/tally"
|
|
)
|
|
|
|
// passing context to a function with variables: https://play.golang.org/p/SW7uoU_KjlR
|
|
|
|
// Schedule A Schedule is an object that wraps a Job (func(){}) and runs it on a schedule according to the supplied
|
|
// Timer; With the the ability to expose metrics, and write logs to indicate job health, state, and stats.
|
|
type Schedule struct {
|
|
id string
|
|
|
|
// Source function used to create job.Job
|
|
jobSrcFunc func(ctx context.Context)
|
|
|
|
// Timer used to trigger Jobs
|
|
timer Timer
|
|
|
|
// SignalChan for termination
|
|
stopScheduleSignal chan interface{}
|
|
|
|
// Signal to Reschedule Job
|
|
retryScheduleSignal chan interface{}
|
|
|
|
// Concurrent safe JobMap
|
|
activeJobs jobMap
|
|
|
|
// Wait-group
|
|
wg sync.WaitGroup
|
|
|
|
// Logging Interface
|
|
logger Logger
|
|
|
|
// Logging Interface
|
|
mx sync.RWMutex
|
|
|
|
// State
|
|
state State
|
|
|
|
// metrics
|
|
metrics metrics
|
|
|
|
// expected runtime
|
|
expectedRuntime time.Duration
|
|
|
|
// Middleware to run
|
|
middlewares []MiddleWarehandler
|
|
|
|
// Disallow Overlapping Jobs
|
|
disallowOverlappingJobs bool
|
|
|
|
// Last Job Error Return
|
|
lastError error
|
|
|
|
// Context for Jobs
|
|
ctx context.Context
|
|
|
|
// Retry time to retry the job
|
|
retry time.Duration
|
|
|
|
// Max Number of Attempts to run a job
|
|
maxRetries int
|
|
|
|
// current number of attemps to run the job
|
|
attempts int
|
|
}
|
|
|
|
// NewSchedule Create a new schedule for` jobFunc func()` that will run according to `timer Timer` with the supplied []Options
|
|
func NewSchedule(ctx context.Context, id string, timer Timer, jobFunc func(context.Context), opts ...Option) *Schedule {
|
|
var options = defaultOptions()
|
|
|
|
// Apply Options
|
|
for _, option := range opts {
|
|
option.apply(options)
|
|
}
|
|
|
|
// Set Logger
|
|
logger := options.logger.With("id", id)
|
|
|
|
// Set Metrics
|
|
// // Init Default Scope if true, ignore io.closer on purpose.
|
|
if options.initDefaultScope {
|
|
options.metricsScope, _ = tally.NewRootScope(tally.ScopeOptions{
|
|
Reporter: newConsoleStatsReporter(logger.Named("metrics")),
|
|
}, options.defaultScopePrintEvery)
|
|
}
|
|
metrics := *newMetrics(id, options.metricsScope)
|
|
|
|
s := &Schedule{
|
|
id: id,
|
|
state: INIT,
|
|
jobSrcFunc: jobFunc,
|
|
timer: timer,
|
|
activeJobs: *newJobMap(),
|
|
logger: logger,
|
|
metrics: metrics,
|
|
expectedRuntime: options.expectedRunDuration,
|
|
middlewares: options.middlewares,
|
|
disallowOverlappingJobs: options.disallowOverlapping,
|
|
maxRetries: options.maxRetries,
|
|
ctx: ctx,
|
|
}
|
|
s.transitionState(NEW)
|
|
return s
|
|
}
|
|
|
|
// Start Start the scheduler. Method is concurrent safe. Calling Start() have the following effects according to the
|
|
// scheduler state:
|
|
// 1. NEW: Start the Schedule; running the defined Job on the first Timer's Next() time.
|
|
// 2. QUEUED: No Effect (and prints warning)
|
|
// 3. STOPPED: Restart the schedule
|
|
// 4. FINISHED: No Effect (and prints warning)
|
|
func (s *Schedule) Start() {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if s.state == FINISHED {
|
|
s.logger.Warnw("Attempting to start a finished schedule")
|
|
return
|
|
}
|
|
|
|
if s.state == QUEUED {
|
|
s.logger.Warnw("Attempting to start an already QUEUED schedule")
|
|
return
|
|
}
|
|
|
|
s.logger.Infow("Job Schedule QUEUED")
|
|
s.transitionState(QUEUED)
|
|
s.metrics.up.Update(1)
|
|
|
|
// Create stopSchedule signal channel, buffer = 1 to allow non-blocking signaling.
|
|
s.stopScheduleSignal = make(chan interface{}, 1)
|
|
|
|
// Create retrySchedule signal channel, buffer 1 to allow non-blocking Signalling.
|
|
s.retryScheduleSignal = make(chan interface{}, 1)
|
|
|
|
go s.scheduleLoop()
|
|
go func() {}()
|
|
}
|
|
|
|
// Stop stops the scheduler. Method is **Blocking** and concurrent safe. When called:
|
|
// 1. Schedule will cancel all waiting scheduled jobs.
|
|
// 2. Schedule will wait for all running jobs to finish.
|
|
// Calling Stop() has the following effects depending on the state of the schedule:
|
|
// 1. NEW: No Effect
|
|
// 2. QUEUED: Stop Schedule
|
|
// 3. STOPPED: No Effect
|
|
// 4. FINISHED: No Effect
|
|
func (s *Schedule) Stop() {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if s.state == STOPPED || s.state == FINISHED || s.state == NEW {
|
|
return
|
|
}
|
|
s.transitionState(STOPPING)
|
|
|
|
// Stop control loop
|
|
s.logger.Infow("Stopping Schedule...")
|
|
s.stopScheduleSignal <- struct{}{}
|
|
close(s.stopScheduleSignal)
|
|
|
|
// Print No. of Active Jobs
|
|
if noOfActiveJobs := s.activeJobs.len(); s.activeJobs.len() > 0 {
|
|
s.logger.Infow(fmt.Sprintf("Waiting for '%d' active jobs still running...", noOfActiveJobs))
|
|
}
|
|
|
|
s.wg.Wait()
|
|
s.transitionState(STOPPED)
|
|
s.logger.Infow("Job Schedule Stopped")
|
|
s.metrics.up.Update(0)
|
|
_ = s.logger.Sync()
|
|
}
|
|
|
|
// Finish stops the scheduler and put it FINISHED state so that schedule cannot re-start again. Finish() is called
|
|
// automatically if Schedule Timer returned `done bool` == true.
|
|
// Method is **Blocking** and concurrent safe.
|
|
func (s *Schedule) Finish() {
|
|
// Stop First
|
|
s.Stop()
|
|
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if s.state == FINISHED {
|
|
return
|
|
}
|
|
|
|
s.transitionState(FINISHED)
|
|
s.logger.Infow("Job Schedule Finished")
|
|
}
|
|
|
|
// scheduleLoop scheduler control loop
|
|
func (s *Schedule) scheduleLoop() {
|
|
// Main Loop
|
|
main:
|
|
for {
|
|
var nextRun time.Time
|
|
var done bool = false
|
|
if s.retry > 0 {
|
|
nextRun = time.Now().Add(s.retry)
|
|
s.logger.Infow("Job Retry Set")
|
|
s.retry = 0
|
|
} else {
|
|
nextRun, done = s.timer.Next()
|
|
if done {
|
|
s.logger.Infow("No more Jobs will be scheduled")
|
|
break main
|
|
}
|
|
s.logger.Infow("Job Will Run according to Schedule")
|
|
}
|
|
nextRunDuration := time.Until(nextRun)
|
|
nextRunDuration = negativeToZero(nextRunDuration)
|
|
nextRunChan := time.After(nextRunDuration)
|
|
s.logger.Infow("Job Next Run Scheduled", "After", nextRunDuration.Round(1*time.Millisecond).String(), "At", nextRun.Format(time.RFC3339))
|
|
select {
|
|
case <-s.ctx.Done():
|
|
s.logger.Infow("Job Cancelled by Context")
|
|
break main
|
|
case <-s.stopScheduleSignal:
|
|
s.logger.Infow("Job Next Run Canceled", "At", nextRun.Format(time.RFC3339))
|
|
break main
|
|
case <-s.retryScheduleSignal:
|
|
s.logger.Infow("Rescheduling Job Run")
|
|
case <-nextRunChan:
|
|
// Run job
|
|
s.logger.Infow("Dispating Job")
|
|
go s.runJobInstance()
|
|
s.transitionState(QUEUED)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Schedule) runJobInstance() {
|
|
s.wg.Add(1)
|
|
defer s.wg.Done()
|
|
|
|
// If we don't allow Overlapping Jobs... bail out.
|
|
if s.disallowOverlappingJobs && (s.activeJobs.len() > 0) {
|
|
s.logger.Warnw("Job is already running and Overlapping Jobs is Disabled")
|
|
s.transitionState(OVERLAPPINGJOB)
|
|
return
|
|
}
|
|
|
|
if !s.transitionState(DISPATCHED) {
|
|
s.logger.Warnw("Job Transition Was Cancelled")
|
|
return;
|
|
}
|
|
// Create a new instance of s.jobSrcFunc
|
|
jobInstance := job.NewJob(s.ctx, s.jobSrcFunc)
|
|
|
|
s.logger.Infow("Job Run Starting", "Instance", jobInstance.ID())
|
|
|
|
// Add to active jobs map
|
|
s.activeJobs.add(jobInstance)
|
|
defer s.activeJobs.delete(jobInstance)
|
|
|
|
// Logs and Metrics --------------------------------------
|
|
// -------------------------------------------------------
|
|
s.metrics.runs.Inc(1)
|
|
if s.activeJobs.len() > 1 {
|
|
s.metrics.overlappingCount.Inc(1)
|
|
}
|
|
if s.expectedRuntime > 0 {
|
|
time.AfterFunc(s.expectedRuntime, func() {
|
|
if jobInstance.State() == job.RUNNING {
|
|
s.logger.Warnw("Job Run Exceeded Expected Time", "Instance", jobInstance.ID(),
|
|
"Expected", s.expectedRuntime.Round(1000*time.Millisecond))
|
|
s.metrics.runExceedExpected.Inc(1)
|
|
}
|
|
})
|
|
}
|
|
// -------------------------------------------------------
|
|
|
|
// Synchronously Run Job Instance
|
|
s.lastError = jobInstance.Run()
|
|
|
|
// -------------------------------------------------------
|
|
// Logs and Metrics --------------------------------------
|
|
if s.lastError != nil {
|
|
s.logger.Errorw("Job Error", "Instance", jobInstance.ID(),
|
|
"Duration", jobInstance.ActualElapsed().Round(1*time.Millisecond),
|
|
"State", jobInstance.State().String(), "error", s.lastError.Error())
|
|
s.metrics.runErrors.Inc(1)
|
|
switch s.lastError.(type) {
|
|
case job.ErrorJobPanic:
|
|
s.transitionState(PANICED)
|
|
case job.ErrorJobStarted:
|
|
s.transitionState(OVERLAPPINGJOB)
|
|
}
|
|
} else {
|
|
s.logger.Infow("Job Finished", "Instance", jobInstance.ID(),
|
|
"Duration", jobInstance.ActualElapsed().Round(1*time.Millisecond),
|
|
"State", jobInstance.State().String())
|
|
s.transitionState(COMPLETED)
|
|
}
|
|
s.metrics.runActualElapsed.Record(jobInstance.ActualElapsed())
|
|
s.metrics.runTotalElapsed.Record(jobInstance.TotalElapsed())
|
|
s.lastError = nil
|
|
}
|
|
|
|
func negativeToZero(nextRunDuration time.Duration) time.Duration {
|
|
if nextRunDuration < 0 {
|
|
nextRunDuration = 0
|
|
}
|
|
return nextRunDuration
|
|
}
|
|
|
|
// State Returns the current State of the Schedule
|
|
func (s *Schedule) State() State {
|
|
return s.state
|
|
}
|
|
|
|
func (s *Schedule) transitionState(newstate State) (ok bool) {
|
|
if s.state == newstate {
|
|
return true
|
|
}
|
|
s.logger.Infow(fmt.Sprintf("Middleware Transition: %s -> %s", s.state.String(), newstate.String()))
|
|
for _, middleware := range s.middlewares {
|
|
stop, err := middleware.Handler(s, newstate)
|
|
if stop {
|
|
s.logger.Infow("Job will be Deferred")
|
|
newstate = DEFERRED
|
|
}
|
|
if err != nil {
|
|
s.logger.Infow("Middleware Blocked Transition: ", "Error", err)
|
|
return false
|
|
}
|
|
s.logger.Infow(fmt.Sprintf("Handler Finished %T: %v", middleware, stop))
|
|
}
|
|
if !(s.state == STOPPING || s.state == STOPPED || s.state == FINISHED) {
|
|
switch newstate {
|
|
case PANICED:
|
|
s.state = QUEUED
|
|
ok = false;
|
|
case OVERLAPPINGJOB:
|
|
s.state = QUEUED
|
|
ok = false;
|
|
case COMPLETED:
|
|
s.state = QUEUED
|
|
ok = false;
|
|
case DEFERRED:
|
|
s.state = QUEUED
|
|
ok = false;
|
|
default:
|
|
s.state = newstate
|
|
ok = true;
|
|
}
|
|
}
|
|
s.logger.Infow(fmt.Sprintf("Final Middleware Transition: %s <- %s", s.state.String(), newstate.String()))
|
|
return ok
|
|
}
|
|
|
|
func (s *Schedule) RetryJob(in time.Duration) bool {
|
|
if s.state == STOPPING || s.state == STOPPED || s.state == FINISHED {
|
|
return false
|
|
}
|
|
if s.maxRetries > 0 && s.attempts >= s.maxRetries {
|
|
s.logger.Infow("Max Retries Exceeded", "Attempts", s.attempts, "MaxRetries", s.maxRetries)
|
|
s.state = QUEUED
|
|
s.attempts = 0
|
|
s.retry = 0
|
|
return false
|
|
} else {
|
|
s.retry = in
|
|
if s.retry > 0 {
|
|
s.logger.Infow("Rescheduling Job", "Attempt", s.attempts)
|
|
select {
|
|
case s.retryScheduleSignal <- struct{}{}:
|
|
s.logger.Infow("Job Rescheduled")
|
|
default:
|
|
s.logger.Infow("Job Rescheduled")
|
|
}
|
|
s.attempts++
|
|
}
|
|
}
|
|
return true
|
|
}
|