Added Extra Opts + Example + ExceedExpected Option

Signed-off-by: Sherif Abdel-Naby <sherifabdlnaby@gmail.com>
This commit is contained in:
Sherif Abdel-Naby 2021-04-10 14:06:34 +02:00
parent 8fc48d852e
commit 7deb8c4c48
9 changed files with 254 additions and 94 deletions

View file

@ -1,71 +0,0 @@
package main
import (
"github.com/sherifabdlnaby/sched"
"github.com/uber-go/tally"
promreporter "github.com/uber-go/tally/prometheus"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
defer time.Sleep(2 * time.Second)
r := promreporter.NewReporter(promreporter.Options{})
// Note: `promreporter.DefaultSeparator` is "_".
// Prometheus doesnt like metrics with "." or "-" in them.
promScope, closer := tally.NewRootScope(tally.ScopeOptions{
Tags: map[string]string{},
CachedReporter: r,
Separator: promreporter.DefaultSeparator,
}, 1*time.Second)
defer closer.Close()
fixed, _ := sched.NewFixed(1 * time.Second)
once, _ := sched.NewOnce(1 * time.Second)
fixed2, _ := sched.NewFixed(5 * time.Second)
schedler1 := sched.NewSchedule("one", once, func() {
log.Println("Once Hello World")
time.Sleep(1 * time.Second)
}, sched.WithLogger(sched.DefaultLogger()), sched.WithConsoleMetrics(10*time.Second))
schedler2 := sched.NewSchedule("ovrlp", fixed, func() {
log.Println("Cron Hello World")
time.Sleep(1 * time.Second)
}, sched.WithLogger(sched.DefaultLogger()), sched.WithMetrics(promScope))
schedler3 := sched.NewSchedule("three", fixed2, func() {
log.Println("panicking")
panic("I panicked :(")
}, sched.WithLogger(sched.DefaultLogger()), sched.WithConsoleMetrics(10*time.Second))
http.Handle("/metrics", r.HTTPHandler())
go http.ListenAndServe(":8080", nil)
schedler1.Start()
defer schedler1.Stop()
schedler2.Start()
defer schedler2.Stop()
schedler3.Start()
defer schedler3.Stop()
time.AfterFunc(5*time.Second, func() {
schedler1.Stop()
})
// Listen to Signals
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
// Termination
_ = <-signalChan
return
}

View file

@ -0,0 +1,45 @@
# Output with a job that run every 2s, takes 1s to run, and expected is 500ms.
Notice the WARN logs.
## Output
```json
2021-04-10T13:57:59.558+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "fixed2s"}
2021-04-10T13:57:59.558+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:01+02:00"}
2021-04-10T13:58:01.558+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:03+02:00"}
2021-04-10T13:58:01.558+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixed2s", "Instance": "4cb6b448-5b93-4689-a129-5448297e727e"}
2021/04/10 13:58:01 Doing some work...
2021-04-10T13:58:02.060+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixed2s", "Instance": "4cb6b448-5b93-4689-a129-5448297e727e", "Expected": "1s"}
github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1
/Users/sherifabdlnaby/code/projects/sched/schedule.go:211
2021/04/10 13:58:02 Finished Work.
2021-04-10T13:58:02.560+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixed2s", "Instance": "4cb6b448-5b93-4689-a129-5448297e727e", "Duration": "1.002s", "State": "FINISHED"}
2021-04-10T13:58:03.561+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:05+02:00"}
2021-04-10T13:58:03.561+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixed2s", "Instance": "3d47d01b-f2d3-4bf3-830c-1815308e3b1f"}
2021/04/10 13:58:03 Doing some work...
2021-04-10T13:58:04.062+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixed2s", "Instance": "3d47d01b-f2d3-4bf3-830c-1815308e3b1f", "Expected": "1s"}
github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1
/Users/sherifabdlnaby/code/projects/sched/schedule.go:211
2021/04/10 13:58:04 Finished Work.
2021-04-10T13:58:04.564+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixed2s", "Instance": "3d47d01b-f2d3-4bf3-830c-1815308e3b1f", "Duration": "1.003s", "State": "FINISHED"}
2021-04-10T13:58:05.561+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:07+02:00"}
2021-04-10T13:58:05.561+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixed2s", "Instance": "78059970-4fc9-4e85-b310-15a3aef08602"}
2021/04/10 13:58:05 Doing some work...
2021-04-10T13:58:06.066+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixed2s", "Instance": "78059970-4fc9-4e85-b310-15a3aef08602", "Expected": "1s"}
github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1
/Users/sherifabdlnaby/code/projects/sched/schedule.go:211
2021/04/10 13:58:06 Finished Work.
2021-04-10T13:58:06.563+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixed2s", "Instance": "78059970-4fc9-4e85-b310-15a3aef08602", "Duration": "1.002s", "State": "FINISHED"}
2021-04-10T13:58:07.561+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixed2s", "After": "2s", "At": "2021-04-10T13:58:09+02:00"}
2021-04-10T13:58:07.561+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixed2s", "Instance": "2e846c92-e0d6-4028-bca4-d930caded0ce"}
2021/04/10 13:58:07 Doing some work...
2021-04-10T13:58:08.066+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixed2s", "Instance": "2e846c92-e0d6-4028-bca4-d930caded0ce", "Expected": "1s"}
github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1
/Users/sherifabdlnaby/code/projects/sched/schedule.go:211
2021/04/10 13:58:08 Finished Work.
2021-04-10T13:58:08.561+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixed2s", "Instance": "2e846c92-e0d6-4028-bca4-d930caded0ce", "Duration": "1s", "State": "FINISHED"}
2021-04-10T13:58:09.245+0200 INFO sched sched/schedule.go:130 Stopping Schedule... {"id": "fixed2s"}
2021-04-10T13:58:09.245+0200 INFO sched sched/schedule.go:141 Job Schedule Stopped {"id": "fixed2s"}
```

View file

@ -0,0 +1,41 @@
package main
import (
"fmt"
"github.com/sherifabdlnaby/sched"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
fixed2s, err := sched.NewFixed(2 * time.Second)
if err != nil {
panic(fmt.Sprintf("invalid interval: %s", err.Error()))
}
job := func() {
log.Println("Doing some work...")
time.Sleep(1 * time.Second)
log.Println("Finished Work.")
}
// Create Schedule
schedule := sched.NewSchedule("fixed2s", fixed2s, job, sched.WithLogger(sched.DefaultLogger()), sched.WithExpectedRunTime(500*time.Millisecond))
// Start Schedule
schedule.Start()
// Listen to CTRL + C And indefintly wait shutdown.
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
_ = <-signalChan
// Stop before shutting down.
schedule.Stop()
return
}

View file

@ -0,0 +1,48 @@
# 4 Schedules in a Scheduler Manager
1. Cron Every Minute
2. Cron Every 5 Minutes
3. Fixed Interval Every 30 Secs
4. *Once* after 10 Secs from schedule start.
Started and Stopped using `StartAll()` and `StopAll()`
## Extra Options
While all schedules inherit the same opts passed to their scheduler. The `fixedTimer30second` schedule has an extra
Option passed to it. Extra Options override inherited options.
## Output
```json
2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "cronEveryMinute"}
2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "cronEvery5Minute"}
2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "fixedTimer30second"}
2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:101 Job Schedule Started {"id": "onceAfter10s"}
2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "cronEveryMinute", "After": "47s", "At": "2021-04-10T14:03:00+02:00"}
2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixedTimer30second", "After": "30s", "At": "2021-04-10T14:02:42+02:00"}
2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "cronEvery5Minute", "After": "2m47s", "At": "2021-04-10T14:05:00+02:00"}
2021-04-10T14:02:12.912+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "onceAfter10s", "After": "10s", "At": "2021-04-10T14:02:22+02:00"}
2021-04-10T14:02:22.917+0200 INFO sched sched/schedule.go:170 No more Jobs will be scheduled {"id": "onceAfter10s"}
2021-04-10T14:02:22.917+0200 INFO sched sched/schedule.go:130 Stopping Schedule... {"id": "onceAfter10s"}
2021-04-10T14:02:22.917+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "onceAfter10s", "Instance": "9d70dff1-a120-446c-937c-64fae1c5922e"}
2021/04/10 14:02:22 onceAfter10s Doing some work...
2021/04/10 14:02:23 onceAfter10s Finished Work.
2021-04-10T14:02:23.921+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "onceAfter10s", "Instance": "9d70dff1-a120-446c-937c-64fae1c5922e", "Duration": "1.003s", "State": "FINISHED"}
2021-04-10T14:02:23.921+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_actual_elapsed_time {"id": "onceAfter10s", "name": "sched.run_actual_elapsed_time", "interval": "1.00318182s", "tags": {"ID":"onceAfter10s"}}
2021-04-10T14:02:23.921+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_total_elapsed_time {"id": "onceAfter10s", "name": "sched.run_total_elapsed_time", "interval": "1.003226808s", "tags": {"ID":"onceAfter10s"}}
2021-04-10T14:02:23.921+0200 INFO sched sched/schedule.go:141 Job Schedule Stopped {"id": "onceAfter10s"}
2021-04-10T14:02:23.921+0200 INFO sched sched/schedule.go:161 Job Schedule Finished {"id": "onceAfter10s"}
2021-04-10T14:02:42.916+0200 INFO sched sched/schedule.go:176 Job Next Run Scheduled {"id": "fixedTimer30second", "After": "30s", "At": "2021-04-10T14:03:12+02:00"}
2021-04-10T14:02:42.916+0200 INFO sched sched/schedule.go:203 Job Run Starting {"id": "fixedTimer30second", "Instance": "2a02ec70-d141-48bd-885f-14bc89329ea6"}
2021/04/10 14:02:42 fixedEvery30Second Doing some work...
2021-04-10T14:02:43.419+0200 WARN sched sched/schedule.go:211 Job Run Exceeded Expected Time {"id": "fixedTimer30second", "Instance": "2a02ec70-d141-48bd-885f-14bc89329ea6", "Expected": "1s"}
github.com/sherifabdlnaby/sched.(*Schedule).runJobInstance.func1
/Users/sherifabdlnaby/code/projects/sched/schedule.go:211
2021/04/10 14:02:43 fixedEvery30Second Finished Work.
2021-04-10T14:02:43.921+0200 INFO sched sched/schedule.go:229 Job Finished {"id": "fixedTimer30second", "Instance": "2a02ec70-d141-48bd-885f-14bc89329ea6", "Duration": "1.005s", "State": "FINISHED"}
2021-04-10T14:02:43.921+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_actual_elapsed_time {"id": "fixedTimer30second", "name": "sched.run_actual_elapsed_time", "interval": "1.004632668s", "tags": {"ID":"fixedTimer30second"}}
2021-04-10T14:02:43.921+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_total_elapsed_time {"id": "fixedTimer30second", "name": "sched.run_total_elapsed_time", "interval": "1.004697199s", "tags": {"ID":"fixedTimer30second"}}
```

View file

@ -0,0 +1,63 @@
package main
import (
"fmt"
"github.com/sherifabdlnaby/sched"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
job := func(id string) func() {
return func() {
log.Println(id + "\t Doing some work...")
time.Sleep(1 * time.Second)
log.Println(id + "\t Finished Work.")
}
}
cronTimer, err := sched.NewCron("* * * * *")
if err != nil {
panic(fmt.Sprintf("invalid cron expression: %s", err.Error()))
}
cronTimer5, err := sched.NewCron("*/5 * * * *")
if err != nil {
panic(fmt.Sprintf("invalid cron expression: %s", err.Error()))
}
fixedTimer30second, err := sched.NewFixed(30 * time.Second)
if err != nil {
panic(fmt.Sprintf("invalid interval: %s", err.Error()))
}
onceAfter10s, err := sched.NewOnce(10 * time.Second)
if err != nil {
panic(fmt.Sprintf("invalid delay: %s", err.Error()))
}
// Create Schedule
scheduler := sched.NewScheduler(sched.WithLogger(sched.DefaultLogger()),
sched.WithConsoleMetrics(1*time.Minute))
_ = scheduler.Add("cronEveryMinute", cronTimer, job("every-minute-cron"))
_ = scheduler.Add("cronEvery5Minute", cronTimer5, job("every-five-minute-cron"))
_ = scheduler.Add("fixedTimer30second", fixedTimer30second, job("fixedEvery30Second"), sched.WithExpectedRunTime(500*time.Millisecond))
_ = scheduler.Add("onceAfter10s", onceAfter10s, job("onceAfter10s"))
scheduler.StartAll()
// Listen to CTRL + C
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
_ = <-signalChan
// Stop before shutting down.
scheduler.StopAll()
return
}

View file

@ -8,23 +8,25 @@ import (
)
type metrics struct {
up tally.Gauge
runs tally.Counter
overlappingCount tally.Counter
runActualElapsed tally.Timer
runTotalElapsed tally.Timer
runErrors tally.Counter
up tally.Gauge
runs tally.Counter
overlappingCount tally.Counter
runActualElapsed tally.Timer
runTotalElapsed tally.Timer
runErrors tally.Counter
runExceedExpected tally.Counter
}
func newMetrics(name string, metricsScope tally.Scope) *metrics {
subScope := metricsScope.SubScope("sched")
return &metrics{
up: subScope.Tagged(map[string]string{"ID": name}).Gauge("up"),
runs: subScope.Tagged(map[string]string{"ID": name}).Counter("runs"),
overlappingCount: subScope.Tagged(map[string]string{"ID": name}).Counter("runs_overlapping"),
runActualElapsed: subScope.Tagged(map[string]string{"ID": name}).Timer("run_actual_elapsed_time"),
runTotalElapsed: subScope.Tagged(map[string]string{"ID": name}).Timer("run_total_elapsed_time"),
runErrors: subScope.Tagged(map[string]string{"ID": name}).Counter("run_errors"),
up: subScope.Tagged(map[string]string{"ID": name}).Gauge("up"),
runs: subScope.Tagged(map[string]string{"ID": name}).Counter("runs"),
overlappingCount: subScope.Tagged(map[string]string{"ID": name}).Counter("runs_overlapping"),
runActualElapsed: subScope.Tagged(map[string]string{"ID": name}).Timer("run_actual_elapsed_time"),
runTotalElapsed: subScope.Tagged(map[string]string{"ID": name}).Timer("run_total_elapsed_time"),
runErrors: subScope.Tagged(map[string]string{"ID": name}).Counter("run_errors"),
runExceedExpected: subScope.Tagged(map[string]string{"ID": name}).Counter("run_exceed_expected_time"),
}
}

View file

@ -7,8 +7,9 @@ import (
)
type options struct {
logger Logger
metricsScope tally.Scope
logger Logger
metricsScope tally.Scope
expectedRunDuration time.Duration
// ------------------
initDefaultScope bool
defaultScopePrintEvery time.Duration
@ -72,3 +73,16 @@ func WithMetrics(metricsScope tally.Scope) Option {
func WithConsoleMetrics(printEvery time.Duration) Option {
return metricsOption{metricsScope: nil, initConsoleMetrics: true, defaultScopePrintEvery: printEvery}
}
type expectedRunTime struct {
duration time.Duration
}
func (l expectedRunTime) apply(opts *options) {
opts.expectedRunDuration = l.duration
}
//WithExpectedRunTime Use to indicate the expected Runtime ( Logs a warning and adds in metrics when it exceeds )
func WithExpectedRunTime(d time.Duration) Option {
return expectedRunTime{duration: d}
}

View file

@ -40,6 +40,9 @@ type Schedule struct {
// metrics
metrics metrics
// State
expectedRuntime time.Duration
}
// NewSchedule Create a new schedule for` jobFunc func()` that will run according to `timer Timer` with the supplied []Options
@ -64,13 +67,14 @@ func NewSchedule(id string, timer Timer, jobFunc func(), opts ...Option) *Schedu
metrics := *newMetrics(id, options.metricsScope)
return &Schedule{
ID: id,
state: NEW,
jobSrcFunc: jobFunc,
timer: timer,
activeJobs: *newJobMap(),
logger: logger,
metrics: metrics,
ID: id,
state: NEW,
jobSrcFunc: jobFunc,
timer: timer,
activeJobs: *newJobMap(),
logger: logger,
metrics: metrics,
expectedRuntime: options.expectedRunDuration,
}
}
@ -194,15 +198,29 @@ func (s *Schedule) runJobInstance() {
s.activeJobs.add(jobInstance)
defer s.activeJobs.delete(jobInstance)
// Logs and Metrics --------------------------------------
// -------------------------------------------------------
s.logger.Infow("Job Run Starting", "Instance", jobInstance.ID())
s.metrics.runs.Inc(1)
if s.activeJobs.len() > 1 {
s.metrics.overlappingCount.Inc(1)
}
if s.expectedRuntime > 0 {
time.AfterFunc(s.expectedRuntime, func() {
if jobInstance.State() == job.RUNNING {
s.logger.Warnw("Job Run Exceeded Expected Time", "Instance", jobInstance.ID(),
"Expected", s.expectedRuntime.Round(1000*time.Millisecond))
s.metrics.runExceedExpected.Inc(1)
}
})
}
// -------------------------------------------------------
// Synchronously Run Job Instance
err := jobInstance.Run()
// -------------------------------------------------------
// Logs and Metrics --------------------------------------
if err != nil {
s.logger.Errorw("Job Error", "Instance", jobInstance.ID(),
"Duration", jobInstance.ActualElapsed().Round(1*time.Millisecond),

View file

@ -22,7 +22,7 @@ func NewScheduler(opts ...Option) *Scheduler {
}
//Add Create a new schedule for` jobFunc func()` that will run according to `timer Timer` with the []Options of the Scheduler.
func (s *Scheduler) Add(id string, timer Timer, job func()) error {
func (s *Scheduler) Add(id string, timer Timer, job func(), extraOpts ...Option) error {
s.mx.Lock()
defer s.mx.Unlock()
@ -31,7 +31,7 @@ func (s *Scheduler) Add(id string, timer Timer, job func()) error {
}
// Create schedule
schedule := NewSchedule(id, timer, job, s.scheduleOpts...)
schedule := NewSchedule(id, timer, job, append(s.scheduleOpts, extraOpts...)...)
// Add to managed schedules
s.schedules[id] = schedule