Add Context to Job, update examples, Limit Retries of Jobs

This commit is contained in:
Justin Hammond 2021-04-24 23:39:23 +08:00
parent cf767ae59c
commit 1d8093471d
21 changed files with 576 additions and 150 deletions

View file

@ -1,32 +1,76 @@
package main
import (
"context"
"fmt"
"github.com/sherifabdlnaby/sched"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"github.com/sherifabdlnaby/sched"
"github.com/sherifabdlnaby/sched/job"
)
func testMiddleware(s *sched.Schedule, newstate sched.State) (err error) {
log.Printf("Running Middleware with State %s and NewState %s", s.State().String(), newstate.String())
return nil
type test1mw struct {
}
func testMiddleware2(s *sched.Schedule, newstate sched.State) (err error) {
log.Printf("Running Middleware2 with State %s and NewState %s", s.State().String(), newstate.String())
return nil
func (mw *test1mw) Handler(s *sched.Schedule, newstate sched.State) (bool, error) {
//log.Printf("Running Middleware with State %s and NewState %s", s.State().String(), newstate.String())
return false, nil
}
type test2mw struct {
}
func (mw *test2mw) Handler(s *sched.Schedule, newstate sched.State) (bool, error) {
//log.Printf("Running Middleware2 with State %s and NewState %s", s.State().String(), newstate.String())
return false, nil
}
func main() {
job := func(id string) func() {
return func() {
log.Println(id + "\t Doing some work...")
time.Sleep(1 * time.Second)
log.Println(id + "\t Finished Work.")
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
exbomw := sched.NewDefaultExponentialBackoffMW()
thmw := sched.NewTagHandlerMW()
thmw.SetWantTags("Hello")
job1 := func(seconds time.Duration) func(context.Context) {
return func(ctx context.Context) {
jobrunner, _ := ctx.Value(job.JobCtxValue{}).(*job.Job)
log.Println("Job ", jobrunner.ID(), " Duration: ", seconds*time.Second, "\t Doing some work...")
if thmw.IsHaveTag("Hello") {
thmw.DelHaveTags("Hello")
} else {
thmw.SetHaveTags("Hello")
}
select {
case <-ctx.Done():
log.Println("Job ", jobrunner.ID(), " Context Cancelled Job")
case <-time.After(time.Second * seconds):
log.Println("Job ", jobrunner.ID(), " Work Done")
}
//log.Panic("Job ", job.ID(), "Pannic Test")
log.Println("Job ", jobrunner.ID(), "Duration: ", seconds*time.Second, "\t Finished Work.")
}
}
job2 := func(seconds time.Duration) func(context.Context) {
return func(ctx context.Context) {
jobrunner, _ := ctx.Value(job.JobCtxValue{}).(*job.Job)
select {
case <-ctx.Done():
log.Println("NeedTagsJob ", jobrunner.ID(), " Context Cancelled Job")
default:
log.Println("NeedTagsJob ", jobrunner.ID(), "Is Running")
}
}
}
@ -54,24 +98,38 @@ func main() {
scheduler := sched.NewScheduler(
sched.WithLogger(sched.DefaultLogger()),
//sched.WithConsoleMetrics(1*time.Minute),
sched.WithMiddleWare(testMiddleware),
sched.WithMiddleWare(testMiddleware2),
sched.DisallowOverlappingJobsOption(true),
sched.WithMiddleWare(&test1mw{}),
sched.WithMiddleWare(&test2mw{}),
sched.WithMiddleWare(exbomw),
//sched.SetMaxJobRetriesOption(2),
)
ctx1, cancel1 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancel(context.Background())
_ = cronTimer
_ = onceAfter10s
_ = cronTimer5
//_ = scheduler.Add("cronEveryMinute", cronTimer, job("every-minute-cron"))
//_ = scheduler.Add("cronEvery5Minute", cronTimer5, job("every-five-minute-cron"))
_ = scheduler.Add("fixedTimer30second", fixedTimer10second, job("fixedEvery10Second"))
//_ = scheduler.Add("onceAfter10s", onceAfter10s, job("onceAfter10s"))
_ = scheduler.Add(ctx1, "cronEveryMinute", cronTimer, job1(12))
_ = scheduler.Add(ctx2, "cronEvery5Minute", cronTimer5, job1(8))
_ = scheduler.Add(ctx1, "fixedTimer10second", cronTimer, job1(1))
_ = scheduler.Add(ctx2, "fixedTimer10second30SecondDuration", fixedTimer10second, job1(7))
_ = scheduler.Add(ctx2, "TagHandler", fixedTimer10second, job2(5), sched.WithMiddleWare(thmw))
_ = scheduler.Add(ctx2, "onceAfter10s", onceAfter10s, job1(12))
scheduler.StartAll()
//scheduler.Start("TagHandler")
// Listen to CTRL + C
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
_ = <-signalChan
// Send Cancel Signals to our Jobs
cancel1()
cancel2()
// Stop before shutting down.
scheduler.StopAll()