diff --git a/examples/main.go b/examples/main.go deleted file mode 100644 index 43fed79..0000000 --- a/examples/main.go +++ /dev/null @@ -1,71 +0,0 @@ -package main - -import ( - "github.com/sherifabdlnaby/sched" - "github.com/uber-go/tally" - promreporter "github.com/uber-go/tally/prometheus" - "log" - "net/http" - "os" - "os/signal" - "syscall" - "time" -) - -func main() { - - defer time.Sleep(2 * time.Second) - - r := promreporter.NewReporter(promreporter.Options{}) - - // Note: `promreporter.DefaultSeparator` is "_". - // Prometheus doesnt like metrics with "." or "-" in them. - promScope, closer := tally.NewRootScope(tally.ScopeOptions{ - Tags: map[string]string{}, - CachedReporter: r, - Separator: promreporter.DefaultSeparator, - }, 1*time.Second) - defer closer.Close() - - fixed, _ := sched.NewFixed(1 * time.Second) - once, _ := sched.NewOnce(1 * time.Second) - fixed2, _ := sched.NewFixed(5 * time.Second) - - schedler1 := sched.NewSchedule("one", once, func() { - log.Println("Once Hello World") - time.Sleep(1 * time.Second) - }, sched.WithLogger(sched.DefaultLogger()), sched.WithConsoleMetrics(10*time.Second)) - - schedler2 := sched.NewSchedule("ovrlp", fixed, func() { - log.Println("Cron Hello World") - time.Sleep(1 * time.Second) - }, sched.WithLogger(sched.DefaultLogger()), sched.WithMetrics(promScope)) - - schedler3 := sched.NewSchedule("three", fixed2, func() { - log.Println("panicking") - panic("I panicked :(") - }, sched.WithLogger(sched.DefaultLogger()), sched.WithConsoleMetrics(10*time.Second)) - - http.Handle("/metrics", r.HTTPHandler()) - go http.ListenAndServe(":8080", nil) - - schedler1.Start() - defer schedler1.Stop() - schedler2.Start() - defer schedler2.Stop() - schedler3.Start() - defer schedler3.Stop() - - time.AfterFunc(5*time.Second, func() { - schedler1.Stop() - }) - - // Listen to Signals - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) - - // Termination - _ = <-signalChan - - return -} diff --git a/examples/schedule-warn-expected/README.md b/examples/schedule-warn-expected/README.md new file mode 100644 index 0000000..64963ad --- /dev/null +++ b/examples/schedule-warn-expected/README.md @@ -0,0 +1,45 @@ +# Output with a job that run every 2s, takes 1s to run, and expected is 500ms. + +Notice the WARN logs. + +## Output + +```json +2021-04-10T13:57:59.558+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "fixed2s"} +2021-04-10T13:57:59.558+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:01+02:00"} +2021-04-10T13:58:01.558+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:03+02:00"} +2021-04-10T13:58:01.558+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixed2s", "Instance": "4cb6b448-5b93-4689-a129-5448297e727e"} +2021/04/10 13:58:01 Doing some work... +2021-04-10T13:58:02.060+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixed2s", "Instance": "4cb6b448-5b93-4689-a129-5448297e727e", "Expected": "1s"} +github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1 +/Users/sherifabdlnaby/code/projects/sched/schedule.go:211 +2021/04/10 13:58:02 Finished Work. +2021-04-10T13:58:02.560+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixed2s", "Instance": "4cb6b448-5b93-4689-a129-5448297e727e", "Duration": "1.002s", "State": "FINISHED"} +2021-04-10T13:58:03.561+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:05+02:00"} +2021-04-10T13:58:03.561+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixed2s", "Instance": "3d47d01b-f2d3-4bf3-830c-1815308e3b1f"} +2021/04/10 13:58:03 Doing some work... +2021-04-10T13:58:04.062+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixed2s", "Instance": "3d47d01b-f2d3-4bf3-830c-1815308e3b1f", "Expected": "1s"} +github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1 +/Users/sherifabdlnaby/code/projects/sched/schedule.go:211 +2021/04/10 13:58:04 Finished Work. +2021-04-10T13:58:04.564+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixed2s", "Instance": "3d47d01b-f2d3-4bf3-830c-1815308e3b1f", "Duration": "1.003s", "State": "FINISHED"} +2021-04-10T13:58:05.561+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:07+02:00"} +2021-04-10T13:58:05.561+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixed2s", "Instance": "78059970-4fc9-4e85-b310-15a3aef08602"} +2021/04/10 13:58:05 Doing some work... +2021-04-10T13:58:06.066+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixed2s", "Instance": "78059970-4fc9-4e85-b310-15a3aef08602", "Expected": "1s"} +github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1 +/Users/sherifabdlnaby/code/projects/sched/schedule.go:211 +2021/04/10 13:58:06 Finished Work. +2021-04-10T13:58:06.563+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixed2s", "Instance": "78059970-4fc9-4e85-b310-15a3aef08602", "Duration": "1.002s", "State": "FINISHED"} +2021-04-10T13:58:07.561+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:09+02:00"} +2021-04-10T13:58:07.561+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixed2s", "Instance": "2e846c92-e0d6-4028-bca4-d930caded0ce"} +2021/04/10 13:58:07 Doing some work... +2021-04-10T13:58:08.066+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixed2s", "Instance": "2e846c92-e0d6-4028-bca4-d930caded0ce", "Expected": "1s"} +github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1 +/Users/sherifabdlnaby/code/projects/sched/schedule.go:211 +2021/04/10 13:58:08 Finished Work. +2021-04-10T13:58:08.561+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixed2s", "Instance": "2e846c92-e0d6-4028-bca4-d930caded0ce", "Duration": "1s", "State": "FINISHED"} +2021-04-10T13:58:09.245+0200 INFO sched sched/schedule.go:130 Stopping Schedule... {"id": "fixed2s"} +2021-04-10T13:58:09.245+0200 INFO sched sched/schedule.go:141 Job Schedule Stopped {"id": "fixed2s"} + +``` diff --git a/examples/schedule-warn-expected/main.go b/examples/schedule-warn-expected/main.go new file mode 100644 index 0000000..0432b0c --- /dev/null +++ b/examples/schedule-warn-expected/main.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "github.com/sherifabdlnaby/sched" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + + fixed2s, err := sched.NewFixed(2 * time.Second) + if err != nil { + panic(fmt.Sprintf("invalid interval: %s", err.Error())) + } + + job := func() { + log.Println("Doing some work...") + time.Sleep(1 * time.Second) + log.Println("Finished Work.") + } + + // Create Schedule + schedule := sched.NewSchedule("fixed2s", fixed2s, job, sched.WithLogger(sched.DefaultLogger()), sched.WithExpectedRunTime(500*time.Millisecond)) + + // Start Schedule + schedule.Start() + + // Listen to CTRL + C And indefintly wait shutdown. + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + _ = <-signalChan + + // Stop before shutting down. + schedule.Stop() + + return +} diff --git a/examples/scheduler-extra-opts/README.md b/examples/scheduler-extra-opts/README.md new file mode 100644 index 0000000..cfd5be5 --- /dev/null +++ b/examples/scheduler-extra-opts/README.md @@ -0,0 +1,48 @@ +# 4 Schedules in a Scheduler Manager + +1. Cron Every Minute +2. Cron Every 5 Minutes +3. Fixed Interval Every 30 Secs +4. *Once* after 10 Secs from schedule start. + +Started and Stopped using `StartAll()` and `StopAll()` + +## Extra Options + +While all schedules inherit the same opts passed to their scheduler. The `fixedTimer30second` schedule has an extra +Option passed to it. Extra Options override inherited options. + +## Output + +```json +2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "cronEveryMinute"} +2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "cronEvery5Minute"} +2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "fixedTimer30second"} +2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "onceAfter10s"} +2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "cronEveryMinute", "After": "47s", "At": "2021-04-10T14:03:00+02:00"} +2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixedTimer30second", "After": "30s", "At": "2021-04-10T14:02:42+02:00"} +2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "cronEvery5Minute", "After": "2m47s", "At": "2021-04-10T14:05:00+02:00"} +2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "onceAfter10s", "After": "10s", "At": "2021-04-10T14:02:22+02:00"} +2021-04-10T14:02:22.917+0200 INFO sched sched/schedule.go:170 No more Jobs will be scheduled {"id": "onceAfter10s"} +2021-04-10T14:02:22.917+0200 INFO sched sched/schedule.go:130 Stopping Schedule... {"id": "onceAfter10s"} +2021-04-10T14:02:22.917+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "onceAfter10s", "Instance": "9d70dff1-a120-446c-937c-64fae1c5922e"} +2021/04/10 14:02:22 onceAfter10s Doing some work... +2021/04/10 14:02:23 onceAfter10s Finished Work. +2021-04-10T14:02:23.921+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "onceAfter10s", "Instance": "9d70dff1-a120-446c-937c-64fae1c5922e", "Duration": "1.003s", "State": "FINISHED"} +2021-04-10T14:02:23.921+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_actual_elapsed_time {"id": "onceAfter10s", "name": "sched.run_actual_elapsed_time", "interval": "1.00318182s", "tags": {"ID":"onceAfter10s"}} +2021-04-10T14:02:23.921+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_total_elapsed_time {"id": "onceAfter10s", "name": "sched.run_total_elapsed_time", "interval": "1.003226808s", "tags": {"ID":"onceAfter10s"}} +2021-04-10T14:02:23.921+0200 INFO sched sched/schedule.go:141 Job Schedule Stopped {"id": "onceAfter10s"} +2021-04-10T14:02:23.921+0200 INFO sched sched/schedule.go:161 Job Schedule Finished {"id": "onceAfter10s"} +2021-04-10T14:02:42.916+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixedTimer30second", "After": "30s", "At": "2021-04-10T14:03:12+02:00"} +2021-04-10T14:02:42.916+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixedTimer30second", "Instance": "2a02ec70-d141-48bd-885f-14bc89329ea6"} +2021/04/10 14:02:42 fixedEvery30Second Doing some work... +2021-04-10T14:02:43.419+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixedTimer30second", "Instance": "2a02ec70-d141-48bd-885f-14bc89329ea6", "Expected": "1s"} +github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1 +/Users/sherifabdlnaby/code/projects/sched/schedule.go:211 +2021/04/10 14:02:43 fixedEvery30Second Finished Work. +2021-04-10T14:02:43.921+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixedTimer30second", "Instance": "2a02ec70-d141-48bd-885f-14bc89329ea6", "Duration": "1.005s", "State": "FINISHED"} +2021-04-10T14:02:43.921+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_actual_elapsed_time {"id": "fixedTimer30second", "name": "sched.run_actual_elapsed_time", "interval": "1.004632668s", "tags": {"ID":"fixedTimer30second"}} +2021-04-10T14:02:43.921+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_total_elapsed_time {"id": "fixedTimer30second", "name": "sched.run_total_elapsed_time", "interval": "1.004697199s", "tags": {"ID":"fixedTimer30second"}} + +``` + diff --git a/examples/scheduler-extra-opts/main.go b/examples/scheduler-extra-opts/main.go new file mode 100644 index 0000000..110cd34 --- /dev/null +++ b/examples/scheduler-extra-opts/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "fmt" + "github.com/sherifabdlnaby/sched" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + + job := func(id string) func() { + return func() { + log.Println(id + "\t Doing some work...") + time.Sleep(1 * time.Second) + log.Println(id + "\t Finished Work.") + } + } + + cronTimer, err := sched.NewCron("* * * * *") + if err != nil { + panic(fmt.Sprintf("invalid cron expression: %s", err.Error())) + } + + cronTimer5, err := sched.NewCron("*/5 * * * *") + if err != nil { + panic(fmt.Sprintf("invalid cron expression: %s", err.Error())) + } + + fixedTimer30second, err := sched.NewFixed(30 * time.Second) + if err != nil { + panic(fmt.Sprintf("invalid interval: %s", err.Error())) + } + + onceAfter10s, err := sched.NewOnce(10 * time.Second) + if err != nil { + panic(fmt.Sprintf("invalid delay: %s", err.Error())) + } + + // Create Schedule + scheduler := sched.NewScheduler(sched.WithLogger(sched.DefaultLogger()), + sched.WithConsoleMetrics(1*time.Minute)) + + _ = scheduler.Add("cronEveryMinute", cronTimer, job("every-minute-cron")) + _ = scheduler.Add("cronEvery5Minute", cronTimer5, job("every-five-minute-cron")) + _ = scheduler.Add("fixedTimer30second", fixedTimer30second, job("fixedEvery30Second"), sched.WithExpectedRunTime(500*time.Millisecond)) + _ = scheduler.Add("onceAfter10s", onceAfter10s, job("onceAfter10s")) + + scheduler.StartAll() + + // Listen to CTRL + C + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + _ = <-signalChan + + // Stop before shutting down. + scheduler.StopAll() + + return +} diff --git a/metric.go b/metric.go index e6642a0..c75fe13 100644 --- a/metric.go +++ b/metric.go @@ -8,23 +8,25 @@ import ( ) type metrics struct { - up tally.Gauge - runs tally.Counter - overlappingCount tally.Counter - runActualElapsed tally.Timer - runTotalElapsed tally.Timer - runErrors tally.Counter + up tally.Gauge + runs tally.Counter + overlappingCount tally.Counter + runActualElapsed tally.Timer + runTotalElapsed tally.Timer + runErrors tally.Counter + runExceedExpected tally.Counter } func newMetrics(name string, metricsScope tally.Scope) *metrics { subScope := metricsScope.SubScope("sched") return &metrics{ - up: subScope.Tagged(map[string]string{"ID": name}).Gauge("up"), - runs: subScope.Tagged(map[string]string{"ID": name}).Counter("runs"), - overlappingCount: subScope.Tagged(map[string]string{"ID": name}).Counter("runs_overlapping"), - runActualElapsed: subScope.Tagged(map[string]string{"ID": name}).Timer("run_actual_elapsed_time"), - runTotalElapsed: subScope.Tagged(map[string]string{"ID": name}).Timer("run_total_elapsed_time"), - runErrors: subScope.Tagged(map[string]string{"ID": name}).Counter("run_errors"), + up: subScope.Tagged(map[string]string{"ID": name}).Gauge("up"), + runs: subScope.Tagged(map[string]string{"ID": name}).Counter("runs"), + overlappingCount: subScope.Tagged(map[string]string{"ID": name}).Counter("runs_overlapping"), + runActualElapsed: subScope.Tagged(map[string]string{"ID": name}).Timer("run_actual_elapsed_time"), + runTotalElapsed: subScope.Tagged(map[string]string{"ID": name}).Timer("run_total_elapsed_time"), + runErrors: subScope.Tagged(map[string]string{"ID": name}).Counter("run_errors"), + runExceedExpected: subScope.Tagged(map[string]string{"ID": name}).Counter("run_exceed_expected_time"), } } diff --git a/options.go b/options.go index 56f76a3..e3bb3d7 100644 --- a/options.go +++ b/options.go @@ -7,8 +7,9 @@ import ( ) type options struct { - logger Logger - metricsScope tally.Scope + logger Logger + metricsScope tally.Scope + expectedRunDuration time.Duration // ------------------ initDefaultScope bool defaultScopePrintEvery time.Duration @@ -72,3 +73,16 @@ func WithMetrics(metricsScope tally.Scope) Option { func WithConsoleMetrics(printEvery time.Duration) Option { return metricsOption{metricsScope: nil, initConsoleMetrics: true, defaultScopePrintEvery: printEvery} } + +type expectedRunTime struct { + duration time.Duration +} + +func (l expectedRunTime) apply(opts *options) { + opts.expectedRunDuration = l.duration +} + +//WithExpectedRunTime Use to indicate the expected Runtime ( Logs a warning and adds in metrics when it exceeds ) +func WithExpectedRunTime(d time.Duration) Option { + return expectedRunTime{duration: d} +} diff --git a/schedule.go b/schedule.go index 8215cb6..ca3cb53 100644 --- a/schedule.go +++ b/schedule.go @@ -40,6 +40,9 @@ type Schedule struct { // metrics metrics metrics + + // State + expectedRuntime time.Duration } // NewSchedule Create a new schedule for` jobFunc func()` that will run according to `timer Timer` with the supplied []Options @@ -64,13 +67,14 @@ func NewSchedule(id string, timer Timer, jobFunc func(), opts ...Option) *Schedu metrics := *newMetrics(id, options.metricsScope) return &Schedule{ - ID: id, - state: NEW, - jobSrcFunc: jobFunc, - timer: timer, - activeJobs: *newJobMap(), - logger: logger, - metrics: metrics, + ID: id, + state: NEW, + jobSrcFunc: jobFunc, + timer: timer, + activeJobs: *newJobMap(), + logger: logger, + metrics: metrics, + expectedRuntime: options.expectedRunDuration, } } @@ -194,15 +198,29 @@ func (s *Schedule) runJobInstance() { s.activeJobs.add(jobInstance) defer s.activeJobs.delete(jobInstance) + // Logs and Metrics -------------------------------------- + // ------------------------------------------------------- s.logger.Infow("Job Run Starting", "Instance", jobInstance.ID()) s.metrics.runs.Inc(1) if s.activeJobs.len() > 1 { s.metrics.overlappingCount.Inc(1) } + if s.expectedRuntime > 0 { + time.AfterFunc(s.expectedRuntime, func() { + if jobInstance.State() == job.RUNNING { + s.logger.Warnw("Job Run Exceeded Expected Time", "Instance", jobInstance.ID(), + "Expected", s.expectedRuntime.Round(1000*time.Millisecond)) + s.metrics.runExceedExpected.Inc(1) + } + }) + } + // ------------------------------------------------------- // Synchronously Run Job Instance err := jobInstance.Run() + // ------------------------------------------------------- + // Logs and Metrics -------------------------------------- if err != nil { s.logger.Errorw("Job Error", "Instance", jobInstance.ID(), "Duration", jobInstance.ActualElapsed().Round(1*time.Millisecond), diff --git a/scheduler.go b/scheduler.go index 2fa2f43..f8253ab 100644 --- a/scheduler.go +++ b/scheduler.go @@ -22,7 +22,7 @@ func NewScheduler(opts ...Option) *Scheduler { } //Add Create a new schedule for` jobFunc func()` that will run according to `timer Timer` with the []Options of the Scheduler. -func (s *Scheduler) Add(id string, timer Timer, job func()) error { +func (s *Scheduler) Add(id string, timer Timer, job func(), extraOpts ...Option) error { s.mx.Lock() defer s.mx.Unlock() @@ -31,7 +31,7 @@ func (s *Scheduler) Add(id string, timer Timer, job func()) error { } // Create schedule - schedule := NewSchedule(id, timer, job, s.scheduleOpts...) + schedule := NewSchedule(id, timer, job, append(s.scheduleOpts, extraOpts...)...) // Add to managed schedules s.schedules[id] = schedule