This commit is contained in:
Justin Hammond 2021-09-17 13:09:42 +08:00
commit 78c04b73fc
34 changed files with 3202 additions and 0 deletions

36
.gitignore vendored Normal file
View file

@ -0,0 +1,36 @@
### General
# Local build output and environment secrets
bin
.env
### JetBrains template
# User-specific stuff
.idea
### Windows template
# Windows thumbnail cache files
Thumbs.db
### macOS template
# General
.DS_Store
# Thumbnails
._*
### Go template
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
### VisualStudioCode template
.vscode/*

46
.golangci.yml Normal file
View file

@ -0,0 +1,46 @@
run:
  # Generated code is excluded from linting.
  skip-dirs:
    - .gen
  skip-files:
    - ".*_gen\\.go$"
linters-settings:
  gocyclo:
    min-complexity: 30
  goconst:
    min-len: 2
    min-occurrences: 2
  misspell:
    locale: US
  golint:
    min-confidence: 0.8
linters:
  disable-all: true
  # Fix: `golint` and `unconvert` were each listed twice; duplicates removed.
  enable:
    - goconst
    - gocyclo
    - golint
    - govet
    - gofmt
    - errcheck
    - staticcheck
    - unused
    - gosimple
    - structcheck
    - varcheck
    - ineffassign
    - deadcode
    - unconvert
    - gosec
    - bodyclose
    - misspell
    - unparam
    - dogsled
    - goimports
service:
  golangci-lint-version: 1.21.x

22
LICENSE Normal file
View file

@ -0,0 +1,22 @@
MIT License
Copyright (c) 2021 Sherif Abdel-Naby
Copyright (c) 2021 Justin Hammond
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

1
README.md Normal file
View file

@ -0,0 +1 @@
# Go TaskManager

View file

@ -0,0 +1,56 @@
# 4 Schedules in a Scheduler Manager
1. Cron Every Minute
2. Cron Every 5 Minutes
3. Fixed Interval Every 30 Secs
4. *Once* after 10 Secs from schedule start.
Started and Stopped using `StartAll()` and `StopAll()`
## Output
```text
2021-04-10T13:26:43.142+0200 INFO sched sched/schedule.go:96 Job Schedule Started {"id": "cronEveryMinute"}
2021-04-10T13:26:43.142+0200 INFO sched sched/schedule.go:96 Job Schedule Started {"id": "cronEvery5Minute"}
2021-04-10T13:26:43.142+0200 INFO sched sched/schedule.go:96 Job Schedule Started {"id": "fixedTimer30second"}
2021-04-10T13:26:43.142+0200 INFO sched sched/schedule.go:96 Job Schedule Started {"id": "onceAfter10s"}
2021-04-10T13:26:43.142+0200 INFO sched sched/schedule.go:168 Job Next Run Scheduled {"id": "cronEveryMinute", "After": "17s", "At": "2021-04-10T13:27:00+02:00"}
2021-04-10T13:26:43.143+0200 INFO sched sched/schedule.go:168 Job Next Run Scheduled {"id": "onceAfter10s", "After": "10s", "At": "2021-04-10T13:26:53+02:00"}
2021-04-10T13:26:43.142+0200 INFO sched sched/schedule.go:168 Job Next Run Scheduled {"id": "cronEvery5Minute", "After": "3m17s", "At": "2021-04-10T13:30:00+02:00"}
2021-04-10T13:26:43.143+0200 INFO sched sched/schedule.go:168 Job Next Run Scheduled {"id": "fixedTimer30second", "After": "30s", "At": "2021-04-10T13:27:13+02:00"}
2021-04-10T13:26:53.143+0200 INFO sched sched/schedule.go:162 No more Jobs will be scheduled {"id": "onceAfter10s"}
2021-04-10T13:26:53.143+0200 INFO sched sched/schedule.go:125 Stopping Schedule... {"id": "onceAfter10s"}
2021-04-10T13:26:53.143+0200 INFO sched sched/schedule.go:130 Waiting active jobs to finish... {"id": "onceAfter10s"}
2021-04-10T13:26:53.143+0200 INFO sched sched/schedule.go:193 Job Run Starting {"id": "onceAfter10s", "Instance": "47fe7d35-3494-43e0-8771-4282ecb80f3a"}
2021/04/10 13:26:53 onceAfter10s Doing some work...
2021/04/10 13:26:54 onceAfter10s Finished Work.
2021-04-10T13:26:54.148+0200 INFO sched sched/schedule.go:208 Job Finished {"id": "onceAfter10s", "Instance": "47fe7d35-3494-43e0-8771-4282ecb80f3a", "Duration": "1.005s", "State": "FINISHED"}
2021-04-10T13:26:54.148+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_actual_elapsed_time {"id": "onceAfter10s", "name": "sched.run_actual_elapsed_time", "interval": "1.004917899s", "tags": {"ID": "onceAfter10s"}}
2021-04-10T13:26:54.148+0200 INFO sched.metrics sched/metric.go:48 timer sched.run_total_elapsed_time {"id": "onceAfter10s", "name": "sched.run_total_elapsed_time", "interval": "1.004966378s", "tags": {"ID": "onceAfter10s"}}
2021-04-10T13:26:54.148+0200 INFO sched sched/schedule.go:133 Job Schedule Stopped {"id": "onceAfter10s"}
2021-04-10T13:26:54.148+0200 INFO sched sched/schedule.go:153 Job Schedule Finished {"id": "onceAfter10s"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:125 Stopping Schedule... {"id": "cronEveryMinute"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:125 Stopping Schedule... {"id": "fixedTimer30second"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:130 Waiting active jobs to finish... {"id": "cronEveryMinute"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:125 Stopping Schedule... {"id": "cronEvery5Minute"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:133 Job Schedule Stopped {"id": "cronEveryMinute"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:130 Waiting active jobs to finish... {"id": "cronEvery5Minute"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:171 Job Next Run Canceled {"id": "fixedTimer30second", "At": "2021-04-10T13:27:13+02:00"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:171 Job Next Run Canceled {"id": "cronEvery5Minute", "At": "2021-04-10T13:30:00+02:00"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:133 Job Schedule Stopped {"id": "cronEvery5Minute"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:130 Waiting active jobs to finish... {"id": "fixedTimer30second"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:133 Job Schedule Stopped {"id": "fixedTimer30second"}
2021-04-10T13:26:57.662+0200 INFO sched sched/schedule.go:171 Job Next Run Canceled {"id": "cronEveryMinute", "At": "2021-04-10T13:27:00+02:00"}
```

View file

@ -0,0 +1,146 @@
package main
import (
"context"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"github.com/Fishwaldo/go-taskmanager"
"github.com/Fishwaldo/go-taskmanager/job"
logruslog "github.com/Fishwaldo/go-taskmanager/loggers/logrus"
prometheusConfig "github.com/Fishwaldo/go-taskmanager/metrics/prometheus"
executionmiddleware "github.com/Fishwaldo/go-taskmanager/middleware/executation"
retrymiddleware "github.com/Fishwaldo/go-taskmanager/middleware/retry"
//"github.com/armon/go-metrics"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// main is a demo driver for the taskmanager package: it registers the
// taskmanager metric definitions with a Prometheus sink, exposes them
// (plus net/http/pprof, imported for side effects) on localhost:6060,
// schedules two jobs with retry/execution middleware, and shuts down
// cleanly on SIGINT/SIGTERM/SIGQUIT.
func main() {
	// Wire the taskmanager metric definitions into a Prometheus sink.
	cfg := prometheus.PrometheusOpts{
		GaugeDefinitions:   prometheusConfig.GetPrometicsGaugeConfig(),
		CounterDefinitions: prometheusConfig.GetPrometicsCounterConfig(),
		SummaryDefinitions: prometheusConfig.GetPrometicsSummaryConfig(),
	}
	sink, err := prometheus.NewPrometheusSinkFrom(cfg)
	if err != nil {
		// Fix: include the underlying error instead of discarding it.
		log.Fatalf("can't create Prometheus sink: %v", err)
	}
	defer prom.Unregister(sink)
	metrics.NewGlobal(metrics.DefaultConfig("test"), sink)

	// Serve /metrics and /debug/pprof on localhost:6060 in the background.
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// Middleware: exponential-backoff retry, tag gating, a
	// concurrent-job lock, and a 5-attempt retry cap.
	exbomw := retrymiddleware.NewDefaultRetryExponentialBackoff()
	thmw := executionmiddleware.NewTagHandler()
	thmw.SetRequiredTags("Hello")
	cjl := executionmiddleware.NewCJLock()
	mrt := retrymiddleware.NewRetryRetryCountLimit(5)

	// job1 returns a job that "works" for the given number of seconds,
	// toggling the "Hello" tag each run and honoring context cancellation.
	job1 := func(seconds time.Duration) func(context.Context) {
		return func(ctx context.Context) {
			jobrunner, _ := ctx.Value(job.JobCtxValue{}).(*job.Job)
			log.Println("Job ", jobrunner.ID(), " Duration: ", seconds*time.Second, "\t Doing some work...")
			if thmw.IsHaveTag("Hello") {
				thmw.DelHaveTags("Hello")
			} else {
				thmw.SetHaveTags("Hello")
			}
			select {
			case <-ctx.Done():
				log.Println("Job ", jobrunner.ID(), " Context Cancelled Job")
			case <-time.After(time.Second * seconds):
				log.Println("Job ", jobrunner.ID(), " Work Done")
			}
			//log.Panic("Job ", jobrunner.ID(), "Pannic Test")
			log.Println("Job ", jobrunner.ID(), "Duration: ", seconds*time.Second, "\t Finished Work.")
		}
	}
	// job2 returns a job that only reports whether it ran or was cancelled;
	// it is gated by the tag-handler middleware above.
	job2 := func(seconds time.Duration) func(context.Context) {
		return func(ctx context.Context) {
			jobrunner, _ := ctx.Value(job.JobCtxValue{}).(*job.Job)
			select {
			case <-ctx.Done():
				log.Println("NeedTagsJob ", jobrunner.ID(), " Context Cancelled Job")
			default:
				log.Println("NeedTagsJob ", jobrunner.ID(), "Is Running")
			}
		}
	}

	// Timers for the demo schedules.
	cronTimer, err := taskmanager.NewCron("* * * * *")
	if err != nil {
		panic(fmt.Sprintf("invalid cron expression: %s", err.Error()))
	}
	// NOTE(review): "*/1 * * * *" fires every minute, not every 5 —
	// confirm intent given the cronTimer5 name and the commented-out
	// "cronEvery5Minute" schedule below.
	cronTimer5, err := taskmanager.NewCron("*/1 * * * *")
	if err != nil {
		panic(fmt.Sprintf("invalid cron expression: %s", err.Error()))
	}
	fixedTimer10second, err := taskmanager.NewFixed(10 * time.Second)
	if err != nil {
		panic(fmt.Sprintf("invalid interval: %s", err.Error()))
	}
	onceAfter10s, err := taskmanager.NewOnce(10 * time.Second)
	if err != nil {
		panic(fmt.Sprintf("invalid delay: %s", err.Error()))
	}

	// logger := sched.DefaultLogger()
	// logger.SetLevel(sched.LOG_TRACE)
	logger := logruslog.LogrusDefaultLogger()

	// Create Schedule
	scheduler := taskmanager.NewScheduler(
		taskmanager.WithLogger(logger),
	)

	//ctx1, cancel1 := context.WithCancel(context.Background())
	ctx2, cancel2 := context.WithCancel(context.Background())
	// These timers are kept around for the commented-out experiments below.
	_ = cronTimer
	_ = onceAfter10s
	_ = cronTimer5
	//_ = scheduler.Add(ctx1, "cronEveryMinute", cronTimer, job1(12))
	//_ = scheduler.Add(ctx2, "cronEvery5Minute", cronTimer5, job1(8))
	//_ = scheduler.Add(ctx1, "fixedTimer10second", cronTimer, job1(1))
	_ = scheduler.Add(ctx2, "fixedTimer10second30SecondDuration", fixedTimer10second, job1(21), taskmanager.WithExecutationMiddleWare(cjl), taskmanager.WithRetryMiddleWare(mrt), taskmanager.WithRetryMiddleWare(exbomw))
	_ = scheduler.Add(ctx2, "TagHandler", fixedTimer10second, job2(5), taskmanager.WithExecutationMiddleWare(thmw))
	//_ = scheduler.Add(ctx2, "onceAfter10s", onceAfter10s, job1(12))
	scheduler.StartAll()
	//scheduler.Start("TagHandler")

	// Listen to CTRL + C
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
	// Fix: plain receive instead of the redundant `_ = <-signalChan`
	// (flagged by go vet / gosimple).
	<-signalChan

	// Send Cancel Signals to our Jobs
	//cancel1()
	cancel2()
	// Stop before shutting down.
	scheduler.StopAll()
}

24
go.mod Normal file
View file

@ -0,0 +1,24 @@
module github.com/Fishwaldo/go-taskmanager
go 1.13
require (
github.com/armon/go-metrics v0.3.9
github.com/cenkalti/backoff/v4 v4.1.0
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/m3db/prometheus_client_golang v0.8.1 // indirect
github.com/m3db/prometheus_client_model v0.1.0 // indirect
github.com/m3db/prometheus_common v0.1.0 // indirect
github.com/m3db/prometheus_procfs v0.8.1 // indirect
github.com/prometheus/client_golang v1.4.0
github.com/sasha-s/go-deadlock v0.3.1 // indirect
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.5.1
github.com/uber-go/tally v3.3.17+incompatible
go.uber.org/zap v1.16.0
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4 // indirect
golang.org/x/tools v0.1.0 // indirect
)

197
go.sum Normal file
View file

@ -0,0 +1,197 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/go-metrics v0.3.9 h1:O2sNqxBdvq8Eq5xmzljcYzAORli6RWCvEym4cJf9m18=
github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/m3db/prometheus_client_golang v0.8.1 h1:t7w/tcFws81JL1j5sqmpqcOyQOpH4RDOmIe3A3fdN3w=
github.com/m3db/prometheus_client_golang v0.8.1/go.mod h1:8R/f1xYhXWq59KD/mbRqoBulXejss7vYtYzWmruNUwI=
github.com/m3db/prometheus_client_model v0.1.0 h1:cg1+DiuyT6x8h9voibtarkH1KT6CmsewBSaBhe8wzLo=
github.com/m3db/prometheus_client_model v0.1.0/go.mod h1:Qfsxn+LypxzF+lNhak7cF7k0zxK7uB/ynGYoj80zcD4=
github.com/m3db/prometheus_common v0.1.0 h1:YJu6eCIV6MQlcwND24cRG/aRkZDX1jvYbsNNs1ZYr0w=
github.com/m3db/prometheus_common v0.1.0/go.mod h1:EBmDQaMAy4B8i+qsg1wMXAelLNVbp49i/JOeVszQ/rs=
github.com/m3db/prometheus_procfs v0.8.1 h1:LsxWzVELhDU9sLsZTaFLCeAwCn7bC7qecZcK4zobs/g=
github.com/m3db/prometheus_procfs v0.8.1/go.mod h1:N8lv8fLh3U3koZx1Bnisj60GYUMDpWb09x1R+dmMOJo=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.4.0 h1:YVIb/fVcOTMSqtqZWSKnHpSLBxu8DKgxq8z6RuBZwqI=
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.9.1 h1:KOMtN28tlbam3/7ZKEYKHhKoJZYYj3gMH4uc62x7X7U=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/uber-go/tally v3.3.17+incompatible h1:nFHIuW3VQ22wItiE9kPXic8dEgExWOsVOHwpmoIvsMw=
github.com/uber-go/tally v3.3.17+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4 h1:EZ2mChiOa8udjfp6rRmswTbtZN/QzUQp4ptM4rnjHvc=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=

130
job/job.go Normal file
View file

@ -0,0 +1,130 @@
package job
import (
"context"
"fmt"
"sync"
"time"
"github.com/Fishwaldo/go-taskmanager/joberrors"
"github.com/google/uuid"
)
// Job wraps a user-supplied job function and provides:
//  1. creation, start, and finish timestamps
//  2. recovery from panics raised inside the function (see run)
//  3. a guarded state machine (NEW -> RUNNING -> FINISHED/PANICKED)
type Job struct {
	id         string                    // caller-supplied or generated UUID; immutable after construction
	jobFunc    func(ctx context.Context) // the work to perform
	createTime time.Time                 // set by the constructor
	startTime  time.Time                 // zero until the job starts running
	finishTime time.Time                 // zero until the job finishes
	state      State                     // guarded by mx
	mx         sync.RWMutex              // protects state and the timestamps
	ctx        context.Context           // NOTE(review): ctx stored in a struct (passed to jobFunc at run time) — unconventional; confirm lifetime expectations
}

// JobCtxValue is the context key under which the running *Job is stored
// in the context handed to jobFunc (see run).
type JobCtxValue struct{}
// State returns the job's current state, read under the shared lock.
func (j *Job) State() State {
	j.mx.RLock()
	current := j.state
	j.mx.RUnlock()
	return current
}
// NewJobWithID creates a Job in state NEW with the supplied id. The
// context is retained and handed to jobFunc when the job runs; the
// start and finish timestamps remain at their zero values until then.
func NewJobWithID(ctx context.Context, id string, jobFunc func(context.Context)) *Job {
	j := &Job{
		id:         id,
		jobFunc:    jobFunc,
		createTime: time.Now(),
		state:      NEW,
		ctx:        ctx,
	}
	return j
}
// NewJob creates a Job whose id is a freshly generated UUID.
func NewJob(ctx context.Context, jobFunc func(context.Context)) *Job {
	generated := uuid.New().String()
	return NewJobWithID(ctx, generated, jobFunc)
}
//ID Return Job ID
func (j *Job) ID() string {
return j.id
}
//ActualElapsed Return the actual time of procession of Job.
// Return -1 if job hasn't started yet.
func (j *Job) ActualElapsed() time.Duration {
	j.mx.RLock()
	defer j.mx.RUnlock()
	switch {
	case j.startTime.IsZero():
		// Never started.
		return -1
	case j.finishTime.IsZero():
		// Still running: measure up to now.
		return time.Since(j.startTime)
	default:
		return j.finishTime.Sub(j.startTime)
	}
}
//TotalElapsed Returns the total time between creation of object and finishing processing its job.
// Return -1 if job hasn't started yet.
func (j *Job) TotalElapsed() time.Duration {
	j.mx.RLock()
	defer j.mx.RUnlock()
	switch {
	case j.startTime.IsZero():
		// Never started.
		return -1
	case j.finishTime.IsZero():
		// Still running: measure from creation to now.
		return time.Since(j.createTime)
	default:
		return j.finishTime.Sub(j.createTime)
	}
}
//Run Run the internal Job (synchronous)
func (j *Job) Run() error {
	return j.run()
}

//run executes jobFunc exactly once, guarding state transitions with j.mx
// and converting panics in jobFunc into a joberrors.FailedJobError.
// Returns a FailedJobError (Error_ConcurrentJob) if the job is not in
// state NEW, or (Error_Panic) if jobFunc panicked; nil otherwise.
func (j *Job) run() (err error) {
	j.mx.Lock()
	if j.state != NEW {
		if j.state == RUNNING {
			err = joberrors.FailedJobError{ErrorType: joberrors.Error_ConcurrentJob, Message: "job already started"}
		} else {
			err = joberrors.FailedJobError{ErrorType: joberrors.Error_ConcurrentJob, Message: "job finished execution"}
		}
		j.mx.Unlock()
		return err
	}
	// Handle Panics and set correct state
	defer func() {
		j.mx.Lock()
		// TODO handle panics
		if r := recover(); r != nil {
			// The deferred recover overwrites the named return err,
			// replacing the nil returned on the normal path below.
			err = joberrors.FailedJobError{ErrorType: joberrors.Error_Panic, Message: fmt.Sprintf("job panicked: %v", r)}
			j.state = PANICKED
		} else {
			j.state = FINISHED
		}
		j.finishTime = time.Now()
		j.mx.Unlock()
	}()
	j.state = RUNNING
	j.startTime = time.Now()
	// Unlock State
	j.mx.Unlock()
	// Run Job
	// The running *Job is made available to jobFunc via JobCtxValue{}.
	j.jobFunc(context.WithValue(j.ctx, JobCtxValue{}, j))
	return nil
}

112
job/job_test.go Normal file
View file

@ -0,0 +1,112 @@
//nolint
package job
import (
"context"
"errors"
"testing"
"time"
)
//TestErrorJobPanic_Error verifies ErrorJobPanic renders its message verbatim.
func TestErrorJobPanic_Error(t *testing.T) {
	want := "panic text"
	e := ErrorJobPanic{want}
	if got := e.Error(); got != want {
		t.Errorf("Error() = %v, want %v", got, want)
	}
}

//TestErrorJobStarted_Error — NOTE(review): despite the name, this constructs
// ErrorJobPanic and duplicates the test above; presumably it should exercise
// an ErrorJobStarted type instead — confirm against the package's error types.
func TestErrorJobStarted_Error(t *testing.T) {
	want := "panic text"
	e := ErrorJobPanic{want}
	if got := e.Error(); got != want {
		t.Errorf("Error() = %v, want %v", got, want)
	}
}

//TestJob_ActualElapsed checks ActualElapsed is ~1s for a job that sleeps 1s.
func TestJob_ActualElapsed(t *testing.T) {
	timeWait := 1 * time.Second
	ctx := context.Background()
	j := NewJob(ctx, func(context.Context) {
		time.Sleep(timeWait)
	})
	j.Run()
	want := timeWait
	got := j.ActualElapsed().Round(1 * time.Second)
	if got != want {
		t.Errorf("Actual Elapsed Time not accurate, want %v, got %v", want, got)
	}
}

//TestJob_TotalElapsed checks TotalElapsed counts from creation: 1s idle
// before Run plus a 1s job body should round to ~2s.
func TestJob_TotalElapsed(t *testing.T) {
	timeWait := 1 * time.Second
	ctx := context.Background()
	j := NewJob(ctx, func(context.Context) {
		time.Sleep(timeWait)
	})
	time.Sleep(timeWait)
	j.Run()
	want := timeWait * 2
	got := j.TotalElapsed().Round(1 * time.Second)
	if got != want {
		t.Errorf("Total Elapsed Time not accurate, want %v, got %v", want, got)
	}
}

//TestJob_ID verifies ID() returns the id the Job was constructed with.
func TestJob_ID(t *testing.T) {
	want := "idxxx"
	j := &Job{
		id: want,
	}
	if got := j.ID(); got != want {
		t.Errorf("ID() = %v, want %v", got, want)
	}
}
//TestJob_Run runs a job asynchronously and waits (max 5s) for it to signal
// completion through a channel.
func TestJob_Run(t *testing.T) {
	receiveChan := make(chan string)
	ctx := context.Background()
	receiveWant := "testx"
	j := NewJob(ctx, func(context.Context) {
		receiveChan <- receiveWant
	})
	go j.Run()
	select {
	case got := <-receiveChan:
		if got != receiveWant {
			t.Errorf("Job Run but got unexpcted result, want %v, got %v", receiveWant, got)
		}
	case <-time.After(5 * time.Second):
		t.Errorf("job didn't run [timeout]")
	}
}

//TestJob_RunPanicRecover checks a panicking job is recovered and surfaced
// as an error.
// NOTE(review): Job.run returns joberrors.FailedJobError, while this test
// matches ErrorJobPanic via errors.As — confirm these error types are
// still compatible.
func TestJob_RunPanicRecover(t *testing.T) {
	ctx := context.Background()
	j := NewJob(ctx, func(context.Context) {
		panic("panicked")
	})
	err := j.Run()
	if err == nil {
		t.Error("Job panicked and returned no error.")
		return
	}
	ref := ErrorJobPanic{"example error"}
	if !errors.As(err, &ref) {
		t.Error("Job panicked and handled but returned different error type.")
	}
}

30
job/state.go Normal file
View file

@ -0,0 +1,30 @@
package job
//State Indicate the state of the Job
type State int64

const (
	// NEW Job has just been created and hasn't started yet
	NEW State = iota
	// RUNNING Job started and is running.
	RUNNING
	// FINISHED Job started and finished processing.
	FINISHED
	// PANICKED Job started and finished but encountered a panic.
	PANICKED
)

//String returns the human-readable name of the state, or "UNKNOWN" for
// any value outside the defined set.
func (s State) String() string {
	names := map[State]string{
		NEW:      "NEW",
		RUNNING:  "RUNNING",
		FINISHED: "FINISHED",
		PANICKED: "PANICKED",
	}
	if name, ok := names[s]; ok {
		return name
	}
	return "UNKNOWN"
}

View file

@ -0,0 +1,26 @@
// Code generated by "stringer -type=Error_Type"; DO NOT EDIT.
package joberrors
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[Error_None-0]
_ = x[Error_Panic-1]
_ = x[Error_ConcurrentJob-2]
_ = x[Error_DeferedJob-3]
}
const _Error_Type_name = "Error_NoneError_PanicError_ConcurrentJobError_DeferedJob"
var _Error_Type_index = [...]uint8{0, 10, 21, 40, 56}
func (i Error_Type) String() string {
if i < 0 || i >= Error_Type(len(_Error_Type_index)-1) {
return "Error_Type(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _Error_Type_name[_Error_Type_index[i]:_Error_Type_index[i+1]]
}

52
joberrors/errors.go Normal file
View file

@ -0,0 +1,52 @@
package joberrors
import (
// "errors"
)
//go:generate stringer -type=Error_Type
type Error_Type int;
const (
Error_None Error_Type = iota
Error_Panic
Error_ConcurrentJob
Error_DeferedJob
Error_Middleware
)
type FailedJobError struct {
Message string
ErrorType Error_Type
}
func (e FailedJobError) Error() string {
return e.Message;
}
func (e FailedJobError) Is(target error) bool {
_, ok := target.(FailedJobError)
if ok == false {
return false
}
return true;
}
//ErrorScheduleNotFound Error When we can't find a Schedule
type ErrorScheduleNotFound struct {
	// Message describes which schedule could not be found.
	Message string
}

//Error implements the error interface by returning the stored message.
func (e ErrorScheduleNotFound) Error() string { return e.Message }
//ErrorScheduleExists Error When a schedule already exists
type ErrorScheduleExists struct {
	// Message describes the conflicting schedule.
	Message string
}

//Error implements the error interface by returning the stored message.
func (e ErrorScheduleExists) Error() string { return e.Message }

36
jobmap.go Normal file
View file

@ -0,0 +1,36 @@
package taskmanager
import (
"sync"
"github.com/Fishwaldo/go-taskmanager/job"
)
//jobMap tracks jobs by their ID, guarded by a read-write mutex.
type jobMap struct {
	jobs map[string]*job.Job
	mx   sync.RWMutex
}

//newJobMap returns an empty, ready-to-use jobMap.
func newJobMap() *jobMap {
	jm := jobMap{jobs: map[string]*job.Job{}}
	return &jm
}

//add registers j under its ID, replacing any existing entry with that ID.
func (jm *jobMap) add(j *job.Job) {
	jm.mx.Lock()
	jm.jobs[j.ID()] = j
	jm.mx.Unlock()
}

//delete removes the entry for j's ID, if present.
func (jm *jobMap) delete(j *job.Job) {
	jm.mx.Lock()
	delete(jm.jobs, j.ID())
	jm.mx.Unlock()
}

//len reports how many jobs are currently tracked.
func (jm *jobMap) len() int {
	jm.mx.RLock()
	defer jm.mx.RUnlock()
	return len(jm.jobs)
}

120
log.go Normal file
View file

@ -0,0 +1,120 @@
package taskmanager
import (
"encoding/json"
"fmt"
"log"
"sync"
"github.com/Fishwaldo/go-taskmanager/utils"
)
type Logger interface {
Trace(message string, params ...interface{})
Debug(message string, params ...interface{})
Info(message string, params ...interface{})
Warn(message string, params ...interface{})
Error(message string, params ...interface{})
Fatal(message string, params ...interface{})
Panic(message string, params ...interface{})
New(name string) (l Logger)
With(key string, value interface{}) (l Logger)
Sync()
}
//DefaultLogger uses Golang Standard Logging Libary.
// It returns a StdLogger writing to the default logger's destination with
// the default flags and the prefix "sched - ".
func DefaultLogger() (l *StdLogger) {
	stdlog := &StdLogger{keys: make(map[string]interface{})}
	// Configure the zero-value log.Logger in place instead of copying
	// *log.Default(): log.Logger contains a sync.Mutex, so the old copy
	// tripped `go vet` (copylocks) and duplicated a live lock.
	stdlog.Log.SetOutput(log.Default().Writer())
	stdlog.Log.SetFlags(log.Default().Flags())
	stdlog.Log.SetPrefix("sched - ")
	return stdlog
}
//StdLogger is a Logger backed by the standard library's log package.
type StdLogger struct {
	// Log is the underlying standard-library logger.
	Log log.Logger
	// keys holds structured key/value pairs appended (as JSON) to each line.
	keys map[string]interface{}
	// mx guards keys.
	mx sync.Mutex
	// level is the minimum Log_Level that will be emitted.
	level Log_Level
}

//Log_Level enumerates logging severities, most verbose first.
type Log_Level int

const (
	LOG_TRACE Log_Level = iota
	LOG_DEBUG
	LOG_INFO
	LOG_WARN
	LOG_ERROR
	LOG_FATAL
	LOG_PANIC
)
//logf emits one formatted line tagged with label when the logger's
// configured level permits severity lvl.
func (l *StdLogger) logf(lvl Log_Level, label, message string, params ...interface{}) {
	if l.level <= lvl {
		l.Log.Printf("%s: %s - %s", label, fmt.Sprintf(message, params...), l.getKeys())
	}
}

//Trace logs at TRACE severity.
func (l *StdLogger) Trace(message string, params ...interface{}) {
	l.logf(LOG_TRACE, "TRACE", message, params...)
}

//Debug logs at DEBUG severity.
func (l *StdLogger) Debug(message string, params ...interface{}) {
	l.logf(LOG_DEBUG, "DEBUG", message, params...)
}

//Info logs at INFO severity.
func (l *StdLogger) Info(message string, params ...interface{}) {
	l.logf(LOG_INFO, "INFO", message, params...)
}

//Warn logs at WARN severity.
func (l *StdLogger) Warn(message string, params ...interface{}) {
	l.logf(LOG_WARN, "WARN", message, params...)
}

//Error logs at ERROR severity.
func (l *StdLogger) Error(message string, params ...interface{}) {
	l.logf(LOG_ERROR, "ERROR", message, params...)
}
//Fatal logs the message at FATAL severity, then exits via log.Fatal.
func (l *StdLogger) Fatal(message string, params ...interface{}) {
	// fmt.Sprintf, not fmt.Printf: the old call printed the message to
	// stdout and passed Printf's (int, error) return values to Log.Fatal,
	// so the logger itself only recorded something like "33 <nil>".
	l.Log.Fatal(fmt.Sprintf("FATAL: %s - %s", fmt.Sprintf(message, params...), l.getKeys()))
}

//Panic logs the message at PANIC severity, then panics via log.Panic.
func (l *StdLogger) Panic(message string, params ...interface{}) {
	// Same fix as Fatal: format the string, don't print-and-log the counts.
	l.Log.Panic(fmt.Sprintf("PANIC: %s - %s", fmt.Sprintf(message, params...), l.getKeys()))
}
//New returns a child logger whose prefix is extended with name and which
// inherits this logger's level, flags, and output.
func (l *StdLogger) New(name string) Logger {
	// Initialize keys: the old code left it nil, which makes getKeys render
	// "null" instead of "{}" and risks a nil-map write in With (depending
	// on utils.CopyableMap's nil handling — TODO confirm).
	nl := &StdLogger{level: l.level, keys: make(map[string]interface{})}
	nl.Log.SetPrefix(fmt.Sprintf("%s.%s", l.Log.Prefix(), name))
	nl.Log.SetFlags(l.Log.Flags())
	nl.Log.SetOutput(l.Log.Writer())
	return nl
}
//With returns a copy of this logger carrying the extra structured
// key/value pair; the receiver is left unchanged.
func (l *StdLogger) With(key string, value interface{}) Logger {
	l.mx.Lock()
	defer l.mx.Unlock()
	// Deep-copy keys so parent and child never share the map.
	stdlog := &StdLogger{level: l.level, keys: utils.CopyableMap(l.keys).DeepCopy()}
	stdlog.Log.SetPrefix(l.Log.Prefix())
	stdlog.Log.SetFlags(l.Log.Flags())
	stdlog.Log.SetOutput(l.Log.Writer())
	stdlog.keys[key] = value
	return stdlog
}

//Sync is a no-op: the standard log package writes unbuffered.
func (l *StdLogger) Sync() {
	// do nothing
}

//getKeys renders the structured key/value pairs as a JSON object string;
// on marshal failure the error text is returned instead.
func (l *StdLogger) getKeys() (message string) {
	l.mx.Lock()
	defer l.mx.Unlock()
	msg, err := json.Marshal(l.keys)
	if err == nil {
		return string(msg)
	}
	return err.Error()
}

//SetLevel sets the minimum severity this logger emits.
// NOTE(review): level is read without holding mx elsewhere; confirm callers
// only set it during setup.
func (l *StdLogger) SetLevel(level Log_Level) {
	l.level = level
}

//GetLevel returns the minimum severity this logger emits.
func (l *StdLogger) GetLevel() (level Log_Level) {
	return l.level
}

115
log_test.go Normal file
View file

@ -0,0 +1,115 @@
package taskmanager
import (
"bytes"
"os"
"regexp"
"testing"
)
//TestDefaultLogger verifies SetLevel/GetLevel round-trip on the default logger.
func TestDefaultLogger(t *testing.T) {
	logger := DefaultLogger()
	logger.SetLevel(LOG_TRACE)
	if logger.level != LOG_TRACE {
		t.Errorf("Can't Set Logging Level")
	}
	if logger.GetLevel() != LOG_TRACE {
		t.Error("GetLevel Didn't return Correct Logging Level")
	}
}
//captureOutput redirects l's output to an in-memory buffer for the
// duration of f, restores stderr afterwards, and returns what was written.
func captureOutput(l *StdLogger, f func()) string {
	buf := new(bytes.Buffer)
	l.Log.SetOutput(buf)
	defer l.Log.SetOutput(os.Stderr)
	f()
	return buf.String()
}
//TestLogTrace checks the emitted line format "TRACE: <msg> - <keys>".
func TestLogTrace(t *testing.T) {
	logger := DefaultLogger()
	logger.SetLevel(LOG_TRACE)
	output := captureOutput(logger, func() {
		logger.Trace("Hello %s", "world")
	})
	validmsg := regexp.MustCompile(`^.* TRACE: Hello world \- {}`)
	if !validmsg.MatchString(output) {
		t.Errorf("Log Trace Failed: %s", output)
	}
}

//TestLogDebug checks the emitted line format "DEBUG: <msg> - <keys>".
func TestLogDebug(t *testing.T) {
	logger := DefaultLogger()
	logger.SetLevel(LOG_TRACE)
	output := captureOutput(logger, func() {
		logger.Debug("Hello %s", "world")
	})
	validmsg := regexp.MustCompile(`^.* DEBUG: Hello world \- {}`)
	if !validmsg.MatchString(output) {
		t.Errorf("Log Debug Failed: %s", output)
	}
}

//TestLogInfo checks the emitted line format "INFO: <msg> - <keys>".
func TestLogInfo(t *testing.T) {
	logger := DefaultLogger()
	logger.SetLevel(LOG_TRACE)
	output := captureOutput(logger, func() {
		logger.Info("Hello %s", "world")
	})
	validmsg := regexp.MustCompile(`^.* INFO: Hello world \- {}`)
	if !validmsg.MatchString(output) {
		t.Errorf("Log Info Failed: %s", output)
	}
}

//TestLogWarn checks the emitted line format "WARN: <msg> - <keys>".
func TestLogWarn(t *testing.T) {
	logger := DefaultLogger()
	logger.SetLevel(LOG_TRACE)
	output := captureOutput(logger, func() {
		logger.Warn("Hello %s", "world")
	})
	validmsg := regexp.MustCompile(`^.* WARN: Hello world \- {}`)
	if !validmsg.MatchString(output) {
		t.Errorf("Log Warn Failed: %s", output)
	}
}

//TestLogError checks the emitted line format "ERROR: <msg> - <keys>".
func TestLogError(t *testing.T) {
	logger := DefaultLogger()
	logger.SetLevel(LOG_TRACE)
	output := captureOutput(logger, func() {
		logger.Error("Hello %s", "world")
	})
	validmsg := regexp.MustCompile(`^.* ERROR: Hello world \- {}`)
	if !validmsg.MatchString(output) {
		t.Errorf("Log Error Failed: %s", output)
	}
}

//TestLogFatal is intentionally disabled: Fatal calls os.Exit, which would
// kill the test process before any assertion could run.
func TestLogFatal(t *testing.T) {
	//logger := DefaultLogger()
	//logger.SetLevel(LOG_TRACE)
	//output := captureOutput(logger, func() {
	//	logger.Fatal("Hello %s", "world")
	//})
	//validmsg := regexp.MustCompile(`^.* FATAL: Hello world \- {}`)
	//if !validmsg.MatchString(output) {
	//	t.Errorf("Log Fatal Failed: %s", output)
	//}
}

//TestLogPanic checks Panic both logs the formatted line and actually
// panics (recovered by the deferred handler).
func TestLogPanic(t *testing.T) {
	logger := DefaultLogger()
	logger.SetLevel(LOG_TRACE)
	defer func() {
		if err := recover(); err == nil {
			t.Errorf("Log Panic Recovery Failed")
		}
	}()
	output := captureOutput(logger, func() {
		logger.Panic("Hello %s", "world")
	})
	validmsg := regexp.MustCompile(`^.* PANIC: Hello world \- {}`)
	if !validmsg.MatchString(output) {
		t.Errorf("Log Panic Failed: %s", output)
	}
}

52
loggers/logrus/logrus.go Normal file
View file

@ -0,0 +1,52 @@
package logruslog
import (
"github.com/Fishwaldo/go-taskmanager"
"github.com/sirupsen/logrus"
)
// Compile-time check that LruLogger satisfies taskmanager.Logger.
var _ taskmanager.Logger = (*LruLogger)(nil)

//LruLogger adapts a logrus entry to the taskmanager.Logger interface.
type LruLogger struct {
	// Lru is the wrapped logrus entry; fields added via With accumulate here.
	Lru *logrus.Entry
}

//Debug logs a formatted message at logrus Debug level.
func (l LruLogger) Debug(msg string, keysAndValues ...interface{}) {
	l.Lru.Debugf(msg, keysAndValues...)
}

//Error logs a formatted message at logrus Error level.
func (l LruLogger) Error(msg string, keysAndValues ...interface{}) {
	l.Lru.Errorf(msg, keysAndValues...)
}

//Fatal logs a formatted message at logrus Fatal level (exits the process).
func (l LruLogger) Fatal(msg string, keysAndValues ...interface{}) {
	l.Lru.Fatalf(msg, keysAndValues...)
}

//Info logs a formatted message at logrus Info level.
func (l LruLogger) Info(msg string, keysAndValues ...interface{}) {
	l.Lru.Infof(msg, keysAndValues...)
}

//Panic logs a formatted message at logrus Panic level (then panics).
func (l LruLogger) Panic(msg string, keysAndValues ...interface{}) {
	l.Lru.Panicf(msg, keysAndValues...)
}

//Warn logs a formatted message at logrus Warn level.
func (l LruLogger) Warn(msg string, keysAndValues ...interface{}) {
	l.Lru.Warnf(msg, keysAndValues...)
}

//With returns a copy of this logger carrying an extra structured field.
func (l *LruLogger) With(key string, value interface{}) taskmanager.Logger {
	nl := &LruLogger{Lru: logrus.NewEntry(l.Lru.Logger)}
	nl.Lru = l.Lru.WithField(key, value)
	return nl
}

//Trace logs a formatted message at logrus Trace level.
func (l *LruLogger) Trace(key string, args ...interface{}) {
	l.Lru.Tracef(key, args...)
}

//Sync is a no-op for logrus.
func (l *LruLogger) Sync() {
}

//New returns a fresh logger sharing the same underlying logrus.Logger.
// NOTE(review): the name argument is ignored, so the child is
// indistinguishable from the parent; consider attaching it (for example
// via WithField) if named sub-loggers matter.
func (l *LruLogger) New(name string) taskmanager.Logger {
	nl := &LruLogger{Lru: logrus.NewEntry(l.Lru.Logger)}
	return nl
}

58
loggers/zap/log.go Normal file
View file

@ -0,0 +1,58 @@
package zaplog
import (
"github.com/Fishwaldo/go-taskmanager"
"go.uber.org/zap"
)
// Compile-time check that ZapLogger satisfies taskmanager.Logger.
var _ taskmanager.Logger = (*ZapLogger)(nil)

//ZapLogger adapts a zap sugared logger to the taskmanager.Logger interface.
type ZapLogger struct {
	// Zap is the wrapped sugared logger.
	Zap *zap.SugaredLogger
}

//With returns a copy of this logger carrying an extra structured field.
func (l *ZapLogger) With(key string, value interface{}) taskmanager.Logger {
	nl := &ZapLogger{Zap: l.Zap.With(key, value)}
	return nl
}

//Trace logs at zap Debug level (zap has no separate trace level).
func (l *ZapLogger) Trace(message string, params ...interface{}) {
	l.Zap.Debugf(message, params...)
}

//Debug logs a formatted message at zap Debug level.
func (l *ZapLogger) Debug(message string, params ...interface{}) {
	l.Zap.Debugf(message, params...)
}

//Info logs a formatted message at zap Info level.
func (l *ZapLogger) Info(message string, params ...interface{}) {
	l.Zap.Infof(message, params...)
}

//Warn logs a formatted message at zap Warn level.
func (l *ZapLogger) Warn(message string, params ...interface{}) {
	l.Zap.Warnf(message, params...)
}

//Error logs a formatted message at zap Error level.
func (l *ZapLogger) Error(message string, params ...interface{}) {
	l.Zap.Errorf(message, params...)
}

//Fatal logs a formatted message at zap Fatal level (exits the process).
func (l *ZapLogger) Fatal(message string, params ...interface{}) {
	l.Zap.Fatalf(message, params...)
}

//Panic logs a formatted message at zap Panic level (then panics).
func (l *ZapLogger) Panic(message string, params ...interface{}) {
	l.Zap.Panicf(message, params...)
}
//New returns a copy of this logger with name appended to its logger name.
func (l *ZapLogger) New(name string) (nl taskmanager.Logger) {
	// zap's Named returns a NEW logger; the old code discarded that return
	// value, so the logger it handed back was never actually named.
	return &ZapLogger{Zap: l.Zap.Named(name)}
}
//Sync is a no-op.
// NOTE(review): zap buffers output; consider flushing via l.Zap.Sync() here.
func (l *ZapLogger) Sync() {
}

//DefaultLogger Return Default Sched Logger based on Zap's sugared logger
func NewZapLogger() *ZapLogger {
	// TODO control verbosity
	// The construction error from NewDevelopment is deliberately discarded.
	loggerBase, _ := zap.NewDevelopment(zap.AddCallerSkip(1), zap.AddStacktrace(zap.ErrorLevel))
	sugarLogger := loggerBase.Sugar()
	return &ZapLogger{
		Zap: sugarLogger,
	}
}

237
metrics/metrics.go Normal file
View file

@ -0,0 +1,237 @@
package schedmetrics
import (
// "github.com/armon/go-metrics/prometheus"
)
// Gauge metric keys: indexes into the MetricsGauges() table.
// NOTE(review): "Guage" is a typo for "Gauge", but these identifiers are
// exported — renaming would break callers.
const (
	Metrics_Guage_Up = iota
	Metrics_Guage_Jobs
)

// Counter metric keys: indexes into the MetricsCounter() table.
const (
	Metrics_Counter_ContextCancels = iota
	Metrics_Counter_Stops
	Metrics_Counter_Reschedule
	Metrics_Counter_FailedJobs
	Metrics_Counter_PostExecutationFailedRuns
	Metrics_Counter_PostExecutationRetriesRequested
	Metrics_Counter_PostExecutationRetryRuns
	Metrics_Counter_PostExecutationRetries
	Metrics_Counter_PostExecutationRetryResets
	Metrics_Counter_PostExecutationSkips
	Metrics_Counter_PostExecucutionDefaultRetries
	Metrics_Counter_SucceededJobs
	Metrics_Counter_PostExecutationRuns
	Metrics_Counter_PreExecutationRuns
	Metrics_Counter_DeferredJobs
	Metrics_Counter_PreRetryRuns
	Metrics_Counter_PreRetryRetries
	Metrics_Counter_PreRetryResets
	Metrics_Counter_PreRetrySkips
	Metrics_Counter_PreRetryDefault
	Metrics_Counter_Runs
	Metrics_Counter_OverlappingRuns
	Metrics_Counter_Runerrors
	Metrics_Counter_MW_ConcurrentJob_Blocked
	Metrics_Counter_MW_HasTags_Blocked
	Metrics_Counter_MW_ConstantBackoff_Retries
	Metrics_Counter_MW_ExpBackoff_Retries
	Metrics_Counter_MW_RetryLimit_Hit
)

//GaugeValues describes one gauge: hierarchical name parts plus help text.
type GaugeValues struct {
	Name []string
	Help string
}

//CounterValues describes one counter: hierarchical name parts plus help text.
type CounterValues struct {
	Name []string
	Help string
}

//SummaryValues describes one summary: hierarchical name parts plus help text.
type SummaryValues struct {
	Name []string
	Help string
}
var MetricsGauges = func() map[int]GaugeValues {
return map[int]GaugeValues {
Metrics_Guage_Up:
{
Name: []string{"sched", "up"},
Help: "If the Job Scheduler is Active",
},
Metrics_Guage_Jobs:
{
Name: []string{"sched", "jobs"},
Help: "Number of Jobs Scheduled",
},
}
}
var MetricsCounter = func() map[int]CounterValues {
return map[int]CounterValues {
Metrics_Counter_ContextCancels:
{
Name: []string{"sched", "contextcancels"},
Help: "Number of Times a Job was canceled by a Context",
},
Metrics_Counter_Stops:
{
Name: []string{"sched", "stops"},
Help: "Number of Times a Job was Stopped",
},
Metrics_Counter_Reschedule:
{
Name: []string{"sched", "reschedules"},
Help: "Number of times a Job was rescheduled",
},
Metrics_Counter_FailedJobs:
{
Name: []string{"sched", "failedjobs"},
Help: "Number of Failed Job Runs",
},
Metrics_Counter_PostExecutationFailedRuns:
{
Name: []string{"sched", "postexecutationfailedmiddlewareruns"},
Help: "Number of Post Exececutation Failure Middlwares were executed",
},
Metrics_Counter_PostExecutationRetriesRequested:
{
Name: []string{"sched", "postexecutionfailureretriesrequested"},
Help: "Number of Post Exececution Retries Requested",
},
Metrics_Counter_PostExecutationRetryRuns:
{
Name: []string{"sched", "postretrymiddlewareruns"},
Help: "Number of Post Failure Retry Middlewwares executed",
},
Metrics_Counter_PostExecutationRetries:
{
Name: []string{"sched", "postretrymiddlewareretries"},
Help: "Number of Post Failure Retries executed with a Retry time set",
},
Metrics_Counter_PostExecutationRetryResets:
{
Name: []string{"sched", "postretrymiddlewareresets"},
Help: "Number of Post Failure Retry Middlewares that were Reset",
},
Metrics_Counter_PostExecutationSkips:
{
Name: []string{"sched", "postretrymiddlewareskips"},
Help: "Number of Post Failure Retry Middlware that skipped setting a duration",
},
Metrics_Counter_PostExecucutionDefaultRetries:
{
Name: []string{"sched", "postretrymiddlewaredefaultretry"},
Help: "Number of Default Retry Durations executed During Post Executation stage - Failures",
},
Metrics_Counter_SucceededJobs:
{
Name: []string{"sched", "succceededjobs"},
Help: "Number of Successful Job Runs",
},
Metrics_Counter_PostExecutationRuns:
{
Name: []string{"sched", "postexecutationsucceededmiddlewareruns"},
Help: "Number of Times a Post Executation Middlware was run after a successful return",
},
Metrics_Counter_PreExecutationRuns:
{
Name: []string{"sched", "preexecutationmiddlewareruns"},
Help: "Number of times Pre Executation Middlewares were run",
},
Metrics_Counter_DeferredJobs:
{
Name: []string{"sched", "deferredjobs"},
Help: "Number of times jobs were deferred",
},
Metrics_Counter_PreRetryRuns:
{
Name: []string{"sched", "preretrymiddlewareruns"},
Help: "Number of times Pre-Retry Middlewares were run",
},
Metrics_Counter_PreRetryRetries:
{
Name: []string{"sched", "preretrymiddlewareretries"},
Help: "Number of Pre Retry Middlewares that Set a Retry Duration",
},
Metrics_Counter_PreRetryResets:
{
Name: []string{"sched", "preretrymiddlewareresets"},
Help: "Number of Pre Retry Middlewares that were Reset",
},
Metrics_Counter_PreRetrySkips:
{
Name: []string{"sched", "preretrymiddlewareskips"},
Help: "Number of Pre Retry Middlewares that Skipped Setting a Retry Duration",
},
Metrics_Counter_PreRetryDefault:
{
Name: []string{"sched", "preretrymiddlewaredefaultretry"},
Help: "Number of Default Retry Durations executed During Pre-Executation stage",
},
Metrics_Counter_Runs:
{
Name: []string{"sched", "runs"},
Help: "Number of Times a Job has run",
},
Metrics_Counter_OverlappingRuns:
{
Name: []string{"sched", "overlappingRuns"},
Help: "Number of Times a Job has Overlapped with a previous Job",
},
Metrics_Counter_Runerrors:
{
Name: []string{"sched", "runerrors"},
Help: "Number of Times a Job Errored Out",
},
Metrics_Counter_MW_ConcurrentJob_Blocked:
{
Name: []string{"sched", "middleware", "concurrentjob", "blocked"},
Help: "Number of Times a Job was Blocked from running",
},
Metrics_Counter_MW_HasTags_Blocked:
{
Name: []string{"sched", "middleware", "hastags", "blocked"},
Help: "Number of times the Has Tags Middlware Blocked Executation",
},
Metrics_Counter_MW_ConstantBackoff_Retries:
{
Name: []string{"sched", "middleware", "constantbackoff", "retries"},
Help: "Number of times the ConstantBackoff Middleware set a Retry Time",
},
Metrics_Counter_MW_ExpBackoff_Retries:
{
Name: []string{"sched", "middleware", "exponentialbackoff", "retries"},
Help: "Number of times the Exponential Backoff Middleware set a Retry Time",
},
Metrics_Counter_MW_RetryLimit_Hit:
{
Name: []string{"sched", "middleware", "retrylimit", "hit"},
Help: "Number of times the Retry Limit Middleware Canceled a pending job",
},
}
}
var MetricsSummary = func() map[int]SummaryValues {
return map[int]SummaryValues {
}
}
//GetMetricsGaugeKey returns the name parts for the gauge metric at key.
func GetMetricsGaugeKey(key int) []string {
	return MetricsGauges()[key].Name
}

//GetMetricsCounterKey returns the name parts for the counter metric at key.
func GetMetricsCounterKey(key int) []string {
	return MetricsCounter()[key].Name
}

//GetMetricsSummaryKey returns the name parts for the summary metric at key.
func GetMetricsSummaryKey(key int) []string {
	return MetricsSummary()[key].Name
}

View file

@ -0,0 +1,34 @@
package prometheusConfig
import (
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
schedprom "github.com/armon/go-metrics/prometheus"
//"github.com/prometheus/client_golang/prometheus"
)
//GetPrometicsGaugeConfig converts the taskmanager gauge table into
// go-metrics prometheus GaugeDefinitions.
func GetPrometicsGaugeConfig() []schedprom.GaugeDefinition {
	var pgd []schedprom.GaugeDefinition
	for _, v := range schedmetrics.MetricsGauges() {
		pgi := schedprom.GaugeDefinition{Name: v.Name, Help: v.Help}
		pgd = append(pgd, pgi)
	}
	return pgd
}

//GetPrometicsCounterConfig converts the taskmanager counter table into
// go-metrics prometheus CounterDefinitions.
func GetPrometicsCounterConfig() []schedprom.CounterDefinition {
	var pcd []schedprom.CounterDefinition
	for _, v := range schedmetrics.MetricsCounter() {
		pci := schedprom.CounterDefinition{Name: v.Name, Help: v.Help}
		pcd = append(pcd, pci)
	}
	return pcd
}

//GetPrometicsSummaryConfig converts the taskmanager summary table into
// go-metrics prometheus SummaryDefinitions.
func GetPrometicsSummaryConfig() []schedprom.SummaryDefinition {
	var psd []schedprom.SummaryDefinition
	for _, v := range schedmetrics.MetricsSummary() {
		psi := schedprom.SummaryDefinition{Name: v.Name, Help: v.Help}
		psd = append(psd, psi)
	}
	return psd
}

View file

@ -0,0 +1,86 @@
package executionmiddleware
import (
"context"
"sync"
"github.com/Fishwaldo/go-taskmanager"
"github.com/Fishwaldo/go-taskmanager/joberrors"
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
"github.com/armon/go-metrics"
)
// Compile-time check that ConcurrentJobBlocker satisfies ExecutionMiddleWare.
var _ taskmanager.ExecutionMiddleWare = (*ConcurrentJobBlocker)(nil)

// hasCJBtxKey is the context key under which the per-task lock is stored.
type hasCJBtxKey struct{}

// ConcurrentJobBlocker is a Middleware that will defer a job if one is already running
type ConcurrentJobBlocker struct {
	// mx serializes access to the per-task lock state.
	mx sync.Mutex
}

// cjllock records whether the task's job is currently running.
type cjllock struct {
	running bool
}

// getTagCtx fetches the per-task lock from the task context, or nil when
// the middleware has not been initialized for this task.
func (hth *ConcurrentJobBlocker) getTagCtx(s *taskmanager.Task) *cjllock {
	cjl, ok := s.Ctx.Value(hasCJBtxKey{}).(*cjllock)
	if !ok {
		return nil
	}
	return cjl
}

// setTagCtx stores the per-task lock in the task context.
func (hth *ConcurrentJobBlocker) setTagCtx(s *taskmanager.Task, cjl *cjllock) {
	s.Ctx = context.WithValue(s.Ctx, hasCJBtxKey{}, cjl)
}
// PreHandler marks the task's job as running; if one is already running it
// defers the new run and bumps the blocked counter. Returns a Middleware
// error if Initilize was never called for this task.
func (hth *ConcurrentJobBlocker) PreHandler(s *taskmanager.Task) (taskmanager.MWResult, error) {
	hth.mx.Lock()
	defer hth.mx.Unlock()
	cjl := hth.getTagCtx(s)
	if cjl != nil {
		s.Logger.Debug("Concurrent Job Lock is %t", cjl.running)
		if cjl.running {
			metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_MW_ConcurrentJob_Blocked), 1, []metrics.Label{{Name: "id", Value: s.GetID()}})
			return taskmanager.MWResult{Result: taskmanager.MWResult_Defer}, joberrors.FailedJobError{Message: "Job Already Running", ErrorType: joberrors.Error_ConcurrentJob}
		} else {
			cjl.running = true
		}
	} else {
		// No lock in the context: Initilize was never called for this task.
		return taskmanager.MWResult{Result: taskmanager.MWResult_NextMW}, joberrors.FailedJobError{Message: "ConcurrentJobBlocker Not Initilzied", ErrorType: joberrors.Error_Middleware}
	}
	return taskmanager.MWResult{Result: taskmanager.MWResult_NextMW}, nil
}

// PostHandler clears the running flag after the job completes; it warns
// (but continues) if the flag was not set.
func (hth *ConcurrentJobBlocker) PostHandler(s *taskmanager.Task, err error) taskmanager.MWResult {
	hth.mx.Lock()
	defer hth.mx.Unlock()
	cjl := hth.getTagCtx(s)
	if cjl != nil {
		s.Logger.Debug("Concurrent Job Lock is %t", cjl.running)
		if !cjl.running {
			s.Logger.Warn("Job Was Not Locked")
			return taskmanager.MWResult{Result: taskmanager.MWResult_NextMW}
		} else {
			cjl.running = false
		}
	}
	return taskmanager.MWResult{Result: taskmanager.MWResult_NextMW}
}
// Initilize installs a fresh, unlocked per-task lock in the task context.
// Must be called before PreHandler runs for the task.
func (hth *ConcurrentJobBlocker) Initilize(s *taskmanager.Task) {
	hth.mx.Lock()
	defer hth.mx.Unlock()
	hth.setTagCtx(s, &cjllock{running: false})
}
// Reset clears the running flag so the next PreHandler call can proceed.
func (hth *ConcurrentJobBlocker) Reset(s *taskmanager.Task) {
	hth.mx.Lock()
	defer hth.mx.Unlock()
	cjl := hth.getTagCtx(s)
	if cjl == nil {
		// Guard against Reset being called before Initilize: the old code
		// dereferenced the nil lock and panicked. Install a fresh,
		// unlocked lock instead.
		hth.setTagCtx(s, &cjllock{running: false})
		return
	}
	cjl.running = false
}
// NewCJLock Create a new ConcurrentJob Lock Middleware.
// Call Initilize for each task before its first run.
func NewCJLock() *ConcurrentJobBlocker {
	return &ConcurrentJobBlocker{}
}

View file

@ -0,0 +1,113 @@
package executionmiddleware
import (
"sync"
"github.com/Fishwaldo/go-taskmanager"
"github.com/Fishwaldo/go-taskmanager/joberrors"
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
"github.com/armon/go-metrics"
)
// Compile-time check that HasTagHandler satisfies ExecutionMiddleWare.
var _ taskmanager.ExecutionMiddleWare = (*HasTagHandler)(nil)

// hasTagCtxKey is reserved as a context key for this middleware.
type hasTagCtxKey struct{}

// HasTagHandler is a Middleware that will Defer jobs if the Requirements are not meet
// Requirements are Specified as "Tags" (strings) and a Job has a list of Tags needed
// When a Job is about to be dispatched, the Jobs "Required" tags are compared against a
// list of "available" tags, and if the available tags does not match the required tags, the
// job is deferred (or canceled if there is no other Middleware after this one.)
type HasTagHandler struct {
	// mx guards both tag maps.
	mx sync.RWMutex
	// requiredTags is the set of tags a job needs before it may run.
	requiredTags map[string]bool
	// haveTags is the set of tags currently available.
	haveTags map[string]bool
}

// SetHaveTags Set a tag indicating that a resource is available. Before a job runs with a Matching
// Required tag, it checks if its present.
func (hth *HasTagHandler) SetHaveTags(tag string) {
	hth.mx.Lock()
	defer hth.mx.Unlock()
	hth.haveTags[tag] = true
}

// DelHaveTags Delete a tag indicating a resource is no longer available.
func (hth *HasTagHandler) DelHaveTags(tag string) {
	hth.mx.Lock()
	defer hth.mx.Unlock()
	delete(hth.haveTags, tag)
}

// IsHaveTag Test if a resource represented by tag is present
func (hth *HasTagHandler) IsHaveTag(tag string) bool {
	hth.mx.RLock()
	defer hth.mx.RUnlock()
	_, ok := hth.haveTags[tag]
	return ok
}

// SetRequiredTags add a tag that makes a job dependant upon a resource being available (set via SetHaveTag)
func (hth *HasTagHandler) SetRequiredTags(tag string) {
	hth.mx.Lock()
	defer hth.mx.Unlock()
	hth.requiredTags[tag] = true
}

// DelRequiredTags delete a tag that a job requires.
func (hth *HasTagHandler) DelRequiredTags(tag string) {
	hth.mx.Lock()
	defer hth.mx.Unlock()
	delete(hth.requiredTags, tag)
}

// IsRequiredTag test if a resource requirement represented by tag is available
func (hth *HasTagHandler) IsRequiredTag(tag string) bool {
	hth.mx.RLock()
	defer hth.mx.RUnlock()
	_, ok := hth.requiredTags[tag]
	return ok
}
// PreHandler Runs the Tag Handler before a job is dispatched.
// It defers the job (with a Error_DeferedJob error) if any required tag is
// not currently available, otherwise passes to the next middleware.
func (hth *HasTagHandler) PreHandler(s *taskmanager.Task) (taskmanager.MWResult, error) {
	// Hold the read lock for the whole scan: the old code ranged over
	// requiredTags (and logged haveTags) with no lock at all, racing with
	// SetHaveTags/SetRequiredTags. Membership is checked directly on the
	// map instead of via IsHaveTag so the RLock is not re-acquired while
	// already held (which can deadlock against a waiting writer).
	hth.mx.RLock()
	defer hth.mx.RUnlock()
	s.Logger.
		With("Present", hth.haveTags).
		With("Required", hth.requiredTags).
		Debug("Checking Tags")
	for k := range hth.requiredTags {
		if _, ok := hth.haveTags[k]; !ok {
			s.Logger.
				With("Tag", k).
				Warn("Missing Tag")
			metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_MW_HasTags_Blocked), 1, []metrics.Label{{Name: "id", Value: s.GetID()}})
			return taskmanager.MWResult{Result: taskmanager.MWResult_Defer}, joberrors.FailedJobError{Message: "Missing Tag", ErrorType: joberrors.Error_DeferedJob}
		}
		s.Logger.
			With("Tag", k).
			Info("Tag Present")
	}
	return taskmanager.MWResult{Result: taskmanager.MWResult_NextMW}, nil
}
// PostHandler is a no-op: tag checks only gate job dispatch.
func (hth *HasTagHandler) PostHandler(s *taskmanager.Task, err error) taskmanager.MWResult {
	return taskmanager.MWResult{Result: taskmanager.MWResult_NextMW}
}

// Initilize is a no-op: this middleware keeps no per-task state.
func (hth *HasTagHandler) Initilize(s *taskmanager.Task) {
}

// Reset is a no-op: this middleware keeps no per-task state.
func (hth *HasTagHandler) Reset(s *taskmanager.Task) {
}

// NewTagHandler Create a new Tag Handler Middleware with empty tag sets.
func NewTagHandler() *HasTagHandler {
	val := HasTagHandler{
		requiredTags: make(map[string]bool),
		haveTags:     make(map[string]bool),
	}
	return &val
}

View file

@ -0,0 +1,85 @@
package retrymiddleware
import (
"context"
"sync"
"time"
"github.com/Fishwaldo/go-taskmanager"
"github.com/Fishwaldo/go-taskmanager/joberrors"
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
"github.com/armon/go-metrics"
"github.com/cenkalti/backoff/v4"
)
// Compile-time check that RetryConstantBackoff satisfies RetryMiddleware.
var _ taskmanager.RetryMiddleware = (*RetryConstantBackoff)(nil)

// constantCtxKey is the context key under which the per-task backoff lives.
type constantCtxKey struct{}

// RetryConstantBackoff is a Middleware that will retry jobs after failures.
// By Default, it runs after Panics, Deferred Jobs (by other Middleware) or if OverLapped Jobs are prohibited.
// It uses a Constant Backoff Scheme and is implemented by github.com/cenkalti/backoff/v4
type RetryConstantBackoff struct {
	// mx guards access to the per-task backoff state.
	mx sync.RWMutex
	// interval is the fixed delay between retries.
	interval time.Duration
	RetryMiddlewareOptions
}

// getCtx fetches the per-task backoff from the task context; ok is false
// when Initilize/Reset has not yet installed one.
func (ebh *RetryConstantBackoff) getCtx(s *taskmanager.Task) (*backoff.ConstantBackOff, bool) {
	bo, ok := s.Ctx.Value(constantCtxKey{}).(*backoff.ConstantBackOff)
	return bo, ok
}

// setCtx stores the per-task backoff in the task context.
func (ebh *RetryConstantBackoff) setCtx(s *taskmanager.Task, bo *backoff.ConstantBackOff) {
	s.Ctx = context.WithValue(s.Ctx, constantCtxKey{}, bo)
}

//Handler Run the Contant Backoff Handler.
// When the error is one this middleware is configured to handle, it asks
// for a retry after the constant interval; otherwise it passes to the next
// middleware.
func (ebh *RetryConstantBackoff) Handler(s *taskmanager.Task, prerun bool, e error) (retry taskmanager.RetryResult, err error) {
	ebh.mx.Lock()
	defer ebh.mx.Unlock()
	bo, ok := ebh.getCtx(s)
	if !ok {
		s.Logger.Error("RetryConstantBackoff Not Reset/Initialzied")
		return taskmanager.RetryResult{Result: taskmanager.RetryResult_NextMW}, joberrors.FailedJobError{ErrorType: joberrors.Error_Middleware, Message: "RetryConstantBackoff Not Reset/Initialzied"}
	}
	if ebh.shouldHandleState(e) {
		next := bo.NextBackOff()
		s.Logger.Info("Constant BO Handler Retrying Job in %s", next)
		metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_MW_ConstantBackoff_Retries), 1, []metrics.Label{{Name: "id", Value: s.GetID()}})
		return taskmanager.RetryResult{Result: taskmanager.RetryResult_Retry, Delay: next}, nil
	}
	return taskmanager.RetryResult{Result: taskmanager.RetryResult_NextMW}, nil
}
//Reset re-arms the per-task backoff, creating one on first use.
// Always returns true.
func (ebh *RetryConstantBackoff) Reset(s *taskmanager.Task) (ok bool) {
	// Take the same lock Handler holds: the old code read and wrote the
	// task context unsynchronized, racing with Handler.
	ebh.mx.Lock()
	defer ebh.mx.Unlock()
	if bo, found := ebh.getCtx(s); found {
		bo.Reset()
		return true
	}
	ebh.setCtx(s, backoff.NewConstantBackOff(ebh.interval))
	return true
}
//Initilize installs the per-task backoff by delegating to Reset.
func (ebh *RetryConstantBackoff) Initilize(s *taskmanager.Task) {
	ebh.Reset(s)
}

//NewDefaultRetryConstantBackoff Create a Constant Backoff Handler with 1 second
func NewDefaultRetryConstantBackoff() *RetryConstantBackoff {
	val := NewRetryConstantBackoff(1 * time.Second)
	return val
}

//NewRetryConstantBackoff Create a Constant Backoff Handler with Specified Duration.
// By default it handles deferred jobs, overlapping runs, and panics.
func NewRetryConstantBackoff(interval time.Duration) *RetryConstantBackoff {
	val := RetryConstantBackoff{
		interval: interval,
	}
	val.handleDeferred = true
	val.handleOverlap = true
	val.handlePanic = true
	return &val
}

View file

@ -0,0 +1,89 @@
package retrymiddleware
import (
"context"
"sync"
"github.com/Fishwaldo/go-taskmanager"
"github.com/Fishwaldo/go-taskmanager/joberrors"
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
"github.com/armon/go-metrics"
"github.com/cenkalti/backoff/v4"
)
var _ taskmanager.RetryMiddleware = (*RetryExponentialBackoff)(nil)

// eboCtxKey is the context key under which the per-task backoff state is stored.
type eboCtxKey struct{}

// RetryExponentialBackoff will retry a job using an exponential backoff.
// By Default, it runs after Panics, Deferred Jobs (by other Middleware) or if OverLapped Jobs are prohibited.
// It uses an Exponential Backoff Scheme and is implemented by github.com/cenkalti/backoff/v4
type RetryExponentialBackoff struct {
	mx sync.RWMutex // serializes Handler across concurrent job runs
	bo *backoff.ExponentialBackOff // template settings copied into each task's own backoff state
	RetryMiddlewareOptions
}
// getCtx fetches the task's exponential-backoff state from its context.
// ok is false when Reset/Initilize has not yet stored one.
func (ebh *RetryExponentialBackoff) getCtx(s *taskmanager.Task) (*backoff.ExponentialBackOff, bool) {
	bo, ok := s.Ctx.Value(eboCtxKey{}).(*backoff.ExponentialBackOff)
	return bo, ok
}

// setCtx stores the backoff state on the task's context (replaces s.Ctx).
func (ebh *RetryExponentialBackoff) setCtx(s *taskmanager.Task, bo *backoff.ExponentialBackOff) {
	s.Ctx = context.WithValue(s.Ctx, eboCtxKey{}, bo)
}
//Handler Run the ExponentialBackoff.
//When the error class is one this middleware handles, it asks for a retry after
//the next (growing) backoff interval; otherwise it defers to the next retry middleware.
func (ebh *RetryExponentialBackoff) Handler(s *taskmanager.Task, prerun bool, e error) (retry taskmanager.RetryResult, err error) {
	ebh.mx.Lock()
	defer ebh.mx.Unlock()
	bo, ok := ebh.getCtx(s)
	if !ok {
		// No per-task state: Reset/Initilize was never called for this task.
		s.Logger.Error("RetryExponentialBackoff Not Reset/Initialzied")
		return taskmanager.RetryResult{Result: taskmanager.RetryResult_NextMW}, joberrors.FailedJobError{ErrorType: joberrors.Error_Middleware, Message: "RetryExponentialBackoff Not Reset/Initialzied"}
	}
	if ebh.shouldHandleState(e) {
		next := bo.NextBackOff()
		s.Logger.Info("Exponential BO Handler Retrying Job in %s", next)
		metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_MW_ExpBackoff_Retries), 1, []metrics.Label{{Name: "id", Value: s.GetID()}})
		return taskmanager.RetryResult{Result: taskmanager.RetryResult_Retry, Delay: next}, nil
	}
	// Error class not handled here; let the next retry middleware decide.
	return taskmanager.RetryResult{Result: taskmanager.RetryResult_NextMW}, nil
}
//Reset re-arms the exponential-backoff state stored on the task's context.
//On first use it clones the template settings supplied at construction time.
//Always reports true.
func (ebh *RetryExponentialBackoff) Reset(s *taskmanager.Task) (ok bool) {
	if existing, found := ebh.getCtx(s); found {
		existing.Reset()
		return true
	}
	fresh := backoff.NewExponentialBackOff()
	fresh.InitialInterval = ebh.bo.InitialInterval
	fresh.MaxElapsedTime = ebh.bo.MaxElapsedTime
	fresh.MaxInterval = ebh.bo.MaxInterval
	fresh.Multiplier = ebh.bo.Multiplier
	fresh.RandomizationFactor = ebh.bo.RandomizationFactor
	ebh.setCtx(s, fresh)
	return true
}
//Initilize prepares the middleware for the given task by delegating to Reset.
//(The misspelled name is kept: it satisfies the taskmanager.RetryMiddleware interface.)
func (ebh *RetryExponentialBackoff) Initilize(s *taskmanager.Task) {
	ebh.Reset(s)
}

//NewDefaultRetryExponentialBackoff Create an ExponentialBackoff Handler with the
//default settings of github.com/cenkalti/backoff/v4.
func NewDefaultRetryExponentialBackoff() *RetryExponentialBackoff {
	val := NewRetryExponentialBackoff(backoff.NewExponentialBackOff())
	return val
}
//NewRetryExponentialBackoff Create an ExponentialBackoff Handler with custom settings.
//Uses github.com/cenkalti/backoff/v4 as the Backoff Implementation.
//By default it handles Panicked, Overlapped and Deferred jobs.
func NewRetryExponentialBackoff(ebo *backoff.ExponentialBackOff) *RetryExponentialBackoff {
	return &RetryExponentialBackoff{
		bo: ebo,
		RetryMiddlewareOptions: RetryMiddlewareOptions{
			handlePanic:    true,
			handleOverlap:  true,
			handleDeferred: true,
		},
	}
}

View file

@ -0,0 +1,91 @@
package retrymiddleware
import (
"context"
"sync"
"github.com/Fishwaldo/go-taskmanager"
"github.com/Fishwaldo/go-taskmanager/joberrors"
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
"github.com/armon/go-metrics"
)
var _ taskmanager.RetryMiddleware = (*RetryCountLimit)(nil)

// retryCountCtxKey is the context key under which the per-task attempt counter is stored.
type retryCountCtxKey struct{}

// RetryCountLimit is a Middleware that limits how many times a failing job is retried.
// By Default, it runs after Panics, Deferred Jobs (by other Middleware) or if OverLapped Jobs are prohibited.
type RetryCountLimit struct {
	mx sync.RWMutex // serializes Handler across concurrent job runs
	max int // maximum number of attempts before further retries are refused
	RetryMiddlewareOptions
}

// retryCountCtx is the per-task mutable state: how many attempts have been made.
type retryCountCtx struct {
	attempts int
}

// getCtx fetches the task's attempt counter from its context.
// ok is false when Reset/Initilize has not yet stored one.
func (ebh *RetryCountLimit) getCtx(s *taskmanager.Task) (*retryCountCtx, bool) {
	bo, ok := s.Ctx.Value(retryCountCtxKey{}).(*retryCountCtx)
	return bo, ok
}

// setCtx stores the attempt counter on the task's context (replaces s.Ctx).
func (ebh *RetryCountLimit) setCtx(s *taskmanager.Task, bo *retryCountCtx) {
	s.Ctx = context.WithValue(s.Ctx, retryCountCtxKey{}, bo)
}
//Handler Run the Retry Count Limit Handler: once the per-task attempt count
//exceeds the configured limit, it vetoes any further retries; below the limit
//it passes the decision to the next retry middleware.
func (ebh *RetryCountLimit) Handler(s *taskmanager.Task, prerun bool, e error) (retry taskmanager.RetryResult, err error) {
	ebh.mx.Lock()
	defer ebh.mx.Unlock()
	counter, found := ebh.getCtx(s)
	if !found {
		// No per-task state: Reset/Initilize was never called for this task.
		s.Logger.Error("RetryCountLimit Not Reset/Initialzied")
		return taskmanager.RetryResult{Result: taskmanager.RetryResult_NextMW}, joberrors.FailedJobError{ErrorType: joberrors.Error_Middleware, Message: "RetryCountLimit Not Reset/Initialzied"}
	}
	if !ebh.shouldHandleState(e) {
		return taskmanager.RetryResult{Result: taskmanager.RetryResult_NextMW}, nil
	}
	counter.attempts++
	if counter.attempts > ebh.max {
		s.Logger.Warn("Exceeded Max Number of Attempts: %d (%d limit)", counter.attempts, ebh.max)
		metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_MW_RetryLimit_Hit), 1, []metrics.Label{{Name: "id", Value: s.GetID()}})
		return taskmanager.RetryResult{Result: taskmanager.RetryResult_NoRetry}, nil
	}
	s.Logger.Info("Retrying Job: Attempt %d - %d limit", counter.attempts, ebh.max)
	return taskmanager.RetryResult{Result: taskmanager.RetryResult_NextMW}, nil
}
//Reset zeroes the per-task attempt counter, creating it on first use.
//Always reports true.
func (ebh *RetryCountLimit) Reset(s *taskmanager.Task) (ok bool) {
	if counter, found := ebh.getCtx(s); found {
		counter.attempts = 0
		return true
	}
	ebh.setCtx(s, &retryCountCtx{attempts: 0})
	return true
}

//Initilize prepares the middleware for the given task by delegating to Reset.
//(The misspelled name is kept: it satisfies the taskmanager.RetryMiddleware interface.)
func (ebh *RetryCountLimit) Initilize(s *taskmanager.Task) {
	ebh.Reset(s)
}
//NewDefaultRetryCountLimit Create a Retry Count Limit Handler with a limit of 10 attempts.
func NewDefaultRetryCountLimit() *RetryCountLimit {
	return NewRetryRetryCountLimit(10)
}

//NewRetryRetryCountLimit Create a Retry Count Limit Handler with the specified attempt limit.
//By default it handles Panicked, Overlapped and Deferred jobs.
func NewRetryRetryCountLimit(limit int) *RetryCountLimit {
	return &RetryCountLimit{
		max: limit,
		RetryMiddlewareOptions: RetryMiddlewareOptions{
			handlePanic:    true,
			handleOverlap:  true,
			handleDeferred: true,
		},
	}
}

View file

@ -0,0 +1,49 @@
package retrymiddleware
import (
"errors"
"github.com/Fishwaldo/go-taskmanager/joberrors"
)
// RetryMiddlewareOptions holds the failure-class selection flags shared by the
// retry middlewares (constant/exponential backoff, count limit): which kinds of
// failed runs each middleware should act on.
type RetryMiddlewareOptions struct {
	handlePanic bool // act on jobs that panicked
	handleOverlap bool // act on jobs refused because an overlapping run was prohibited
	handleDeferred bool // act on jobs deferred by other middleware
}

// HandlePanic Enable/Disable the Retry Handler for Panics
func (retryOptions *RetryMiddlewareOptions) HandlePanic(val bool) {
	retryOptions.handlePanic = val
}

// HandleOverlap Enable/Disable the Retry Handler for Overlapped Jobs
func (retryOptions *RetryMiddlewareOptions) HandleOverlap(val bool) {
	retryOptions.handleOverlap = val
}

// HandleDeferred Enable/Disable the Retry Handler for Deferred Jobs
func (retryOptions *RetryMiddlewareOptions) HandleDeferred(val bool) {
	retryOptions.handleDeferred = val
}
// shouldHandleState reports whether this middleware is configured to act on the
// failure class carried by e. Errors that are not a joberrors.FailedJobError
// (including nil) are never handled.
func (retryOptions *RetryMiddlewareOptions) shouldHandleState(e error) bool {
	var failure joberrors.FailedJobError
	if !errors.As(e, &failure) {
		return false
	}
	switch failure.ErrorType {
	case joberrors.Error_Panic:
		return retryOptions.handlePanic
	case joberrors.Error_ConcurrentJob:
		return retryOptions.handleOverlap
	case joberrors.Error_DeferedJob:
		return retryOptions.handleDeferred
	default:
		return false
	}
}

70
options.go Normal file
View file

@ -0,0 +1,70 @@
package taskmanager
import (
)
// taskoptions collects the configurable pieces applied when building a Task or
// Scheduler: the logger plus the execution and retry middleware chains.
type taskoptions struct {
	logger Logger
	executationmiddlewares []ExecutionMiddleWare
	retryMiddlewares []RetryMiddleware
}
// defaultTaskOptions returns the options used when a Task is built without overrides:
// the default logger and no middleware.
func defaultTaskOptions() *taskoptions {
	return &taskoptions{
		logger: DefaultLogger(),
	}
}
// defaultSchedOptions returns the options used when a Scheduler is built without
// overrides: the default logger and no middleware.
func defaultSchedOptions() *taskoptions {
	return &taskoptions{
		logger: DefaultLogger(),
	}
}
// Option to customize schedule behavior, check the sched.With*() functions that implement Option interface for the
// available options
type Option interface {
	apply(*taskoptions)
}

// loggerOption carries a Logger to install via WithLogger.
type loggerOption struct {
	Logger Logger
}

// apply installs the carried logger into the options.
func (l loggerOption) apply(opts *taskoptions) {
	opts.logger = l.Logger
}

//WithLogger Use the supplied Logger as the logger.
func WithLogger(logger Logger) Option {
	return loggerOption{Logger: logger}
}
// executationMiddleWare carries an ExecutionMiddleWare to append via
// WithExecutationMiddleWare. (The existing spelling is kept for API stability.)
type executationMiddleWare struct {
	middleware ExecutionMiddleWare
}

// apply appends the carried middleware to the execution chain.
func (l executationMiddleWare) apply(opts *taskoptions) {
	opts.executationmiddlewares = append(opts.executationmiddlewares, l.middleware)
}

// WithExecutationMiddleWare adds an ExecutionMiddleWare to the chain run around each job.
func WithExecutationMiddleWare(handler ExecutionMiddleWare) Option {
	return executationMiddleWare{middleware: handler}
}

// retryMiddleware carries a RetryMiddleware to append via WithRetryMiddleWare.
type retryMiddleware struct {
	middleware RetryMiddleware
}

// apply appends the carried middleware to the retry chain.
func (l retryMiddleware) apply(opts *taskoptions) {
	opts.retryMiddlewares = append(opts.retryMiddlewares, l.middleware)
}

// WithRetryMiddleWare adds a RetryMiddleware to the chain consulted after failed runs.
func WithRetryMiddleWare(handler RetryMiddleware) Option {
	return retryMiddleware{middleware: handler}
}

72
options_test.go Normal file
View file

@ -0,0 +1,72 @@
package taskmanager
import (
"testing"
)
// TestOptionsLogger verifies that WithLogger installs the supplied logger.
func TestOptionsLogger(t *testing.T) {
	opts := defaultTaskOptions()
	logop := WithLogger(DefaultLogger())
	logop.apply(opts)
	if _, isStd := opts.logger.(*StdLogger); !isStd {
		t.Errorf("WithLogger Options Apply Failed")
	}
}
// testemw is a no-op ExecutionMiddleWare used to exercise the option plumbing.
type testemw struct {
}

// PreHandler implements ExecutionMiddleWare; always allows the run.
func (mw *testemw) PreHandler(s *Task) (MWResult,error) {
	return MWResult{}, nil
}

// PostHandler implements ExecutionMiddleWare; reports nothing.
func (mw *testemw) PostHandler(s *Task, err error) (MWResult) {
	return MWResult{}
}

// Reset implements ExecutionMiddleWare; no state to clear.
func (mw *testemw) Reset(s *Task) {
}

// Initilize implements ExecutionMiddleWare; no state to set up.
func (mw *testemw) Initilize(s *Task) {
}
// TestOptionsExecutionMW verifies that WithExecutationMiddleWare appends the middleware.
func TestOptionsExecutionMW(t *testing.T) {
	opts := defaultTaskOptions()
	mwop := WithExecutationMiddleWare(&testemw{})
	mwop.apply(opts)
	if _, isTest := opts.executationmiddlewares[0].(*testemw); !isTest {
		t.Errorf("WithExecutionMiddleware isn't testmw")
	}
}
// testrmw is a no-op RetryMiddleware used to exercise the option plumbing.
type testrmw struct {
}

// Handler implements RetryMiddleware; never requests a retry.
func (mw *testrmw) Handler(s *Task, prerun bool, e error) (retry RetryResult, err error) {
	return RetryResult{}, nil
}

// Reset implements RetryMiddleware; no state to clear.
func (mw *testrmw) Reset(s *Task) (ok bool) {
	return false
}

// Initilize implements RetryMiddleware; no state to set up.
func (mw *testrmw) Initilize(s *Task) {
}
// TestOptionsRetryMW verifies that WithRetryMiddleWare appends the middleware.
func TestOptionsRetryMW(t *testing.T) {
	opts := defaultTaskOptions()
	mwop := WithRetryMiddleWare(&testrmw{})
	mwop.apply(opts)
	if _, isTest := opts.retryMiddlewares[0].(*testrmw); !isTest {
		t.Errorf("WithRetryMiddleWare isn't testmw")
	}
}

249
scheduler.go Normal file
View file

@ -0,0 +1,249 @@
package taskmanager
import (
"context"
"sort"
"sync"
"time"
"github.com/sasha-s/go-deadlock"
"github.com/Fishwaldo/go-taskmanager/joberrors"
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
"github.com/armon/go-metrics"
)
// Scheduler manage one or more Schedule creating them using common options, enforcing unique IDs, and supply methods to
// Start / Stop all schedule(s).
type Scheduler struct {
	tasks map[string]*Task // all managed tasks keyed by id (guarded by mx)
	nextRun timeSlice // run queue, kept sorted by next-run time (guarded by tsmx)
	mx deadlock.RWMutex // guards tasks
	tsmx deadlock.RWMutex // guards nextRun
	log Logger
	updateScheduleChan chan updateSignalOp // tasks signal here when their schedule changes
	scheduleOpts []Option // options applied to every schedule this scheduler creates
}

// UpdateSignalOp_Type enumerates the operations a Task can request from the scheduler loop.
type UpdateSignalOp_Type int

const (
	// updateSignalOp_Reschedule asks the scheduler loop to re-sort the run queue.
	updateSignalOp_Reschedule UpdateSignalOp_Type = iota
)

// updateSignalOp is the message sent on updateScheduleChan.
type updateSignalOp struct {
	id string // id of the task that triggered the signal
	operation UpdateSignalOp_Type
}
// timeSlice orders tasks by their next scheduled run time (implements sort.Interface).
type timeSlice []*Task

// Len implements sort.Interface.
func (p timeSlice) Len() int {
	return len(p)
}

// Less implements sort.Interface. A task with a zero next-run time sorts last:
// a zero time marks a task that is dispatched or unscheduled.
func (p timeSlice) Less(i, j int) bool {
	if p[i].GetNextRun().IsZero() {
		return false
	}
	return p[i].GetNextRun().Before(p[j].GetNextRun())
}

// Swap implements sort.Interface.
func (p timeSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}
//NewScheduler Creates new Scheduler, opt Options are applied to *every* schedule added and created by this scheduler.
func NewScheduler(opts ...Option) *Scheduler {
	var options = defaultSchedOptions()
	// Apply Options
	for _, option := range opts {
		option.apply(options)
	}
	s := &Scheduler{
		tasks: make(map[string]*Task),
		nextRun: make(timeSlice, 0),
		updateScheduleChan: make(chan updateSignalOp, 100), // buffered so tasks rarely block while signaling
		scheduleOpts: opts,
		log: options.logger,
	}
	// NOTE(review): this goroutine has no stop signal and lives for the life of
	// the process — confirm that is intended.
	go s.scheduleLoop()
	return s
}
//Add Create a new Task for `jobFunc func()` that will run according to `timer Timer` with the []Options of the Scheduler.
//extraOpts are applied in addition to (and before) the Scheduler-wide options.
//Returns joberrors.ErrorScheduleExists when a task with this id is already managed.
func (s *Scheduler) Add(ctx context.Context, id string, timer Timer, job func(context.Context), extraOpts ...Option) error {
	s.mx.Lock()
	defer s.mx.Unlock()
	if _, ok := s.tasks[id]; ok {
		return joberrors.ErrorScheduleExists{Message: "job with this id already exists"}
	}
	// Build the combined option list in a fresh slice. Appending s.scheduleOpts
	// directly onto extraOpts (the previous code) could write into the caller's
	// backing array when the variadic slice had spare capacity.
	opts := make([]Option, 0, len(extraOpts)+len(s.scheduleOpts))
	opts = append(opts, extraOpts...)
	opts = append(opts, s.scheduleOpts...)
	// Create schedule
	schedule := NewSchedule(ctx, id, timer, job, opts...)
	schedule.updateSignal = s.updateScheduleChan
	// Add to managed schedules
	s.tasks[id] = schedule
	metrics.SetGauge(schedmetrics.GetMetricsGaugeKey(schedmetrics.Metrics_Guage_Jobs), float32(len(s.tasks)))
	s.log.Info("Added New Job %s", schedule.GetID())
	return nil
}
//Start Start the Schedule with the given id. Return error if no Schedule with the given id exist.
func (s *Scheduler) Start(id string) error {
	s.mx.Lock()
	// Find Schedule by id
	schedule, found := s.tasks[id]
	if !found {
		// BUGFIX: the lock was previously never released on this path, which
		// deadlocked every subsequent Scheduler method.
		s.mx.Unlock()
		return joberrors.ErrorScheduleNotFound{Message: "Schedule Not Found"}
	}
	// Start it ¯\_(ツ)_/¯
	schedule.Start()
	// Unlock before queueing: addScheduletoRunQueue takes tsmx and signals the
	// scheduler loop, and does not need mx.
	s.mx.Unlock()
	s.addScheduletoRunQueue(schedule)
	s.log.Info("Start Job %s", schedule.GetID())
	return nil
}
//StartAll Start All Schedules managed by the Scheduler
func (s *Scheduler) StartAll() {
	s.log.Info("StartAll Called")
	// Snapshot the ids under the read lock: ranging over s.tasks unlocked races
	// with concurrent Add(), and Start() itself takes the write lock.
	s.mx.RLock()
	ids := make([]string, 0, len(s.tasks))
	for id := range s.tasks {
		ids = append(ids, id)
	}
	s.mx.RUnlock()
	for _, id := range ids {
		// Start only fails for an id removed since the snapshot; log rather than
		// silently dropping the error (errcheck).
		if err := s.Start(id); err != nil {
			s.log.Warn("StartAll: could not start %s: %s", id, err.Error())
		}
	}
}
//Stop Stop the Schedule with the given id. Return error if no Schedule with the given id exist.
//The task is removed from the run queue before its Stop() is invoked.
func (s *Scheduler) Stop(id string) error {
	s.mx.Lock()
	defer s.mx.Unlock()
	schedule, found := s.tasks[id]
	if !found {
		return joberrors.ErrorScheduleNotFound{Message: "Schedule Not Found"}
	}
	s.tsmx.Lock()
	for pos, sched := range s.nextRun {
		if sched.id == id {
			// BUGFIX: was `append(s.nextRun[:pos], s.nextRun[pos+1])` (missing
			// `...`), which panicked with index-out-of-range when removing the
			// last entry and silently truncated the tail otherwise.
			s.nextRun = append(s.nextRun[:pos], s.nextRun[pos+1:]...)
			break
		}
	}
	s.tsmx.Unlock()
	schedule.Stop()
	return nil
}
//StopAll Stops All Schedules managed by the Scheduler concurrently, but will block until ALL of them have stopped.
//The run queue is cleared first so nothing new is dispatched while stopping.
func (s *Scheduler) StopAll() {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.tsmx.Lock()
	s.nextRun = nil
	s.tsmx.Unlock()
	var wg sync.WaitGroup
	for _, schedule := range s.tasks {
		wg.Add(1)
		go func(task *Task) {
			defer wg.Done()
			task.Stop()
		}(schedule)
	}
	wg.Wait()
}
//GetSchedule Returns a Schedule by ID from the Scheduler
func (s *Scheduler) GetSchedule(id string) (*Task, error) {
	// Read-only lookup: the read lock suffices and lets concurrent readers proceed.
	s.mx.RLock()
	defer s.mx.RUnlock()
	j, ok := s.tasks[id]
	if !ok {
		return nil, joberrors.ErrorScheduleNotFound{Message: "Schedule Not Found"}
	}
	return j, nil
}

//GetAllSchedules Returns all Schedule's in the Scheduler.
//NOTE(review): this returns the live internal map — callers must treat it as
//read-only; mutating it would race with the Scheduler.
func (s *Scheduler) GetAllSchedules() (map[string]*Task, error) {
	s.mx.RLock()
	defer s.mx.RUnlock()
	return s.tasks, nil
}
// getNextJob returns the first queued task with a non-zero next-run time.
// The run queue is kept sorted by updateNextRun, so that entry is the soonest
// due. Returns nil when nothing is runnable.
func (s *Scheduler) getNextJob() *Task {
	s.tsmx.RLock()
	defer s.tsmx.RUnlock()
	if s.nextRun.Len() == 0 {
		return nil
	}
	for _, sched := range s.nextRun {
		// A zero time marks a task that was just dispatched or is unscheduled; skip it.
		if sched.nextRun.Get().IsZero() {
			s.log.Info("NextRun for %s is Zero", sched.id)
			continue
		}
		return sched
	}
	return nil
}
// scheduleLoop is the scheduler's single dispatch goroutine: it sleeps until
// the soonest task is due, dispatches it, and re-plans whenever a task signals
// a schedule change.
// NOTE(review): there is no termination path for this loop — confirm intended.
func (s *Scheduler) scheduleLoop() {
	var nextRun time.Time
	var nextRunChan <-chan time.Time // nil until a job is queued; a nil channel blocks in select
	for {
		nextjob := s.getNextJob()
		if nextjob != nil {
			nextRun = nextjob.GetNextRun()
			s.log.Info("Next Scheduler Run is at %s for %s", time.Until(nextRun), nextjob.GetID())
			nextRunChan = time.After(time.Until(nextRun))
		} else {
			s.log.Info("No Jobs Scheduled")
		}
		select {
		case <-nextRunChan:
			if nextjob != nil {
				s.log.Info("Dispatching Job %s", nextjob.id)
				// Zero the next-run time so getNextJob skips this task until it
				// reschedules itself at the end of Run().
				nextjob.nextRun.Set(time.Time{})
				go nextjob.Run()
			} else {
				// A previously armed timer fired in an iteration with no job selected.
				s.log.Error("nextjob is Nil")
			}
		case op := <-s.updateScheduleChan:
			switch op.operation {
			case updateSignalOp_Reschedule:
				s.log.Info("recalcSchedule Triggered from %s", op.id)
				s.updateNextRun()
			default:
				s.log.Warn("Unhandled updateSignalOp Recieved")
			}
		}
	}
}
// updateNextRun re-sorts the run queue by next-run time and logs the new order.
func (s *Scheduler) updateNextRun() {
	s.tsmx.Lock()
	defer s.tsmx.Unlock()
	sort.Sort(s.nextRun)
	for _, job := range s.nextRun {
		s.log.Info("Next Run %s: %s", job.GetID(), job.GetNextRun().Format(time.RFC1123))
	}
}

// addScheduletoRunQueue appends a started task to the run queue and signals the
// scheduler loop to re-sort.
// NOTE(review): the signal is sent while tsmx is held and the loop's handler
// also takes tsmx — this relies on the channel's buffer (100) to avoid
// deadlock; confirm that is acceptable.
func (s *Scheduler) addScheduletoRunQueue(schedule *Task) {
	s.tsmx.Lock()
	defer s.tsmx.Unlock()
	s.nextRun = append(s.nextRun, schedule)
	s.log.Info("addScheduletoRunQueue %s", schedule.GetID())
	for _, job := range s.nextRun {
		s.log.Info("Job Run Queue: %s: %s", job.GetID(), job.GetNextRun().Format(time.RFC1123))
	}
	s.updateScheduleChan <- updateSignalOp{operation: updateSignalOp_Reschedule, id: schedule.id}
}

392
task.go Normal file
View file

@ -0,0 +1,392 @@
package taskmanager
import (
// "errors"
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/sasha-s/go-deadlock"
"github.com/Fishwaldo/go-taskmanager/job"
"github.com/Fishwaldo/go-taskmanager/joberrors"
schedmetrics "github.com/Fishwaldo/go-taskmanager/metrics"
)
// ExecutionMiddleWare Interface for developing new Middleware.
// PreHandler runs before a job executes and may cancel or defer the run;
// PostHandler runs after execution with the job's error.
// Reset and Initilize manage any per-task state the middleware keeps.
type ExecutionMiddleWare interface {
	PreHandler(s *Task) (MWResult, error)
	PostHandler(s *Task, err error) MWResult
	Reset(s *Task)
	Initilize(s *Task)
}

// RetryMiddleware decides whether (and after what delay) a failed or deferred
// job run should be retried. prerun is true when consulted before execution.
type RetryMiddleware interface {
	Handler(s *Task, prerun bool, e error) (retry RetryResult, err error)
	Reset(s *Task) (ok bool)
	Initilize(s *Task)
}

// RetryResult_Op enumerates a retry middleware's verdict.
type RetryResult_Op int

const (
	RetryResult_Retry RetryResult_Op = iota // retry the job after Delay
	RetryResult_NoRetry // stop: do not retry
	RetryResult_NextMW // no opinion; consult the next retry middleware
)

// RetryResult is a retry middleware's verdict plus the requested retry delay.
type RetryResult struct {
	Result RetryResult_Op
	Delay time.Duration
}

// MWResult_Op enumerates an execution middleware's verdict.
type MWResult_Op int

const (
	MWResult_Cancel MWResult_Op = iota // cancel this run
	MWResult_Defer // defer this run (retry middleware decides when)
	MWResult_NextMW // no opinion; consult the next middleware
)

// MWResult is an execution middleware's verdict.
type MWResult struct {
	Result MWResult_Op
}
// nextRuni is a concurrency-safe holder for a task's next scheduled run time.
type nextRuni struct {
	mx deadlock.RWMutex
	time time.Time
}

// Get returns the stored next-run time.
func (nr *nextRuni) Get() time.Time {
	nr.mx.RLock()
	defer nr.mx.RUnlock()
	return nr.time
}

// Set stores a new next-run time (a zero time means "not currently scheduled").
func (nr *nextRuni) Set(t time.Time) {
	nr.mx.Lock()
	defer nr.mx.Unlock()
	nr.time = t
}
// passing context to a function with variables: https://play.golang.org/p/SW7uoU_KjlR

// Task A Task is an object that wraps a Job (func(){}) and runs it on a schedule according to the supplied
// Timer; With the the ability to expose metrics, and write logs to indicate job health, state, and stats.
type Task struct {
	id string
	// Source function used to create job.Job
	jobSrcFunc func(ctx context.Context)
	// Timer used to trigger Jobs
	timer Timer
	// Next Scheduled Run (zero time = not scheduled)
	nextRun nextRuni
	// Signal Channel to Update Scheduler Class about changes
	updateSignal chan updateSignalOp
	// SignalChan for termination (created by Start, closed by Stop)
	stopScheduleSignal chan interface{}
	// Concurrent safe JobMap of currently-running job instances
	activeJobs jobMap
	// Wait-group tracking running job instances (Stop waits on it)
	wg sync.WaitGroup
	// Logging Interface
	Logger Logger
	// Lock the Schedule Structure for Modifications
	mx deadlock.RWMutex
	// Middleware to run around each execution
	executationMiddleWares []ExecutionMiddleWare
	// Retry Middleware consulted after failed/deferred runs
	retryMiddlewares []RetryMiddleware
	// Context for Jobs (also carries per-task retry-middleware state)
	Ctx context.Context
}
// NewSchedule Create a new schedule for `jobFunc func()` that will run according to `timer Timer` with the supplied []Options
func NewSchedule(ctx context.Context, id string, timer Timer, jobFunc func(context.Context), opts ...Option) *Task {
	var options = defaultTaskOptions()
	// Apply Options
	for _, option := range opts {
		option.apply(options)
	}
	s := &Task{
		id: id,
		jobSrcFunc: jobFunc,
		timer: timer,
		activeJobs: *newJobMap(),
		Logger: options.logger.With("id", id),
		executationMiddleWares: options.executationmiddlewares,
		retryMiddlewares: options.retryMiddlewares,
		Ctx: ctx,
	}
	// Seed the first run time. The timer's done flag is deliberately ignored:
	// an already-done timer yields a zero time, which the scheduler skips.
	t, _ := timer.Next()
	s.nextRun.Set(t)
	return s
}
// GetID returns the Task's identifier.
func (s *Task) GetID() string {
	// Read-only accessor: the read lock suffices and avoids serializing readers.
	s.mx.RLock()
	defer s.mx.RUnlock()
	return s.id
}
// Start Start the scheduler. Method is concurrent safe. Calling Start() have the following effects according to the
// scheduler state:
// 	1. NEW: Start the Schedule; running the defined Job on the first Timer's Next() time.
// 	2. QUEUED: No Effect (and prints warning)
// 	3. STOPPED: Restart the schedule
// 	4. FINISHED: No Effect (and prints warning)
func (s *Task) Start() {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.Logger.Info("Job Schedule Started")
	metrics.SetGaugeWithLabels(schedmetrics.GetMetricsGaugeKey(schedmetrics.Metrics_Guage_Up), 1, []metrics.Label{{Name: "id", Value: s.id}})
	// Create stopSchedule signal channel, buffer = 1 to allow non-blocking signaling.
	s.stopScheduleSignal = make(chan interface{}, 1)
	// Give every middleware a chance to set up its per-task state.
	for _, mw := range s.executationMiddleWares {
		s.Logger.Trace("Initilized %T Executation Middleware", mw)
		mw.Initilize(s)
	}
	for _, mw := range s.retryMiddlewares {
		s.Logger.Trace("Initilized %T Retry Middleware", mw)
		mw.Initilize(s)
	}
	//go s.scheduleLoop()
	//go func() {}()
}
// Stop stops the scheduler. Method is **Blocking** and concurrent safe. When called:
// 	1. Schedule will cancel all waiting scheduled jobs.
// 	2. Schedule will wait for all running jobs to finish.
// Calling Stop() has the following effects depending on the state of the schedule:
// 	1. NEW: No Effect
// 	2. QUEUED: Stop Schedule
// 	3. STOPPED: No Effect
// 	4. FINISHED: No Effect
// NOTE(review): calling Stop before Start blocks forever (send on a nil
// channel), and calling Stop twice panics (send on the closed channel) —
// confirm callers are guarded against both.
func (s *Task) Stop() {
	s.mx.Lock()
	defer s.mx.Unlock()
	// Stop control loop
	s.Logger.Info("Stopping Schedule...")
	s.stopScheduleSignal <- struct{}{}
	// Print No. of Active Jobs
	if noOfActiveJobs := s.activeJobs.len(); s.activeJobs.len() > 0 {
		s.Logger.Info("Waiting for '%d' active jobs still running...", noOfActiveJobs)
	}
	// Block until every in-flight job instance has finished.
	s.wg.Wait()
	s.Logger.Info("Job Schedule Stopped")
	metrics.SetGaugeWithLabels(schedmetrics.GetMetricsGaugeKey(schedmetrics.Metrics_Guage_Up), 0, []metrics.Label{{Name: "id", Value: s.id}})
	s.Logger.Sync()
	close(s.stopScheduleSignal)
}
// runPreExecutationMiddlware walks the execution middleware chain's PreHandlers
// before a run. It stops at the first Defer or Cancel verdict and returns it;
// a NextMW verdict moves on to the following middleware.
func (s *Task) runPreExecutationMiddlware() (MWResult, error) {
	for _, middleware := range s.executationMiddleWares {
		s.Logger.Trace("Running Handler %T", middleware)
		metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_PreExecutationRuns), 1, []metrics.Label{{Name: "id", Value: s.id}, {Name: "middleware", Value: fmt.Sprintf("%T", middleware)}})
		result, err := middleware.PreHandler(s)
		if err != nil {
			s.Logger.With("Result", result).Warn("Middleware %T Returned Error: %s", middleware, err.Error())
		} else {
			s.Logger.With("Result", result).Info("Middleware %T Returned No Error", middleware)
		}
		switch result.Result {
		case MWResult_Defer:
			metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_DeferredJobs), 1, []metrics.Label{{Name: "id", Value: s.id}})
			return result, err
		case MWResult_NextMW:
			continue
		case MWResult_Cancel:
			return result, err
		}
	}
	// Whole chain passed: allow the run.
	return MWResult{Result: MWResult_NextMW}, nil
}
// runRetryMiddleware consults every retry middleware about the failed/deferred
// run described by err. A Retry verdict reschedules the task's timer by the
// requested delay; NoRetry and NextMW only record metrics.
// NOTE(review): the error returned by Handler is discarded here — confirm intended.
func (s *Task) runRetryMiddleware(prerun bool, err error) {
	for _, retrymiddleware := range s.retryMiddlewares {
		s.Logger.Trace("Running Retry Middleware %T", retrymiddleware)
		metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_PreRetryRuns), 1, []metrics.Label{{Name: "id", Value: s.id}, {Name: "middleware", Value: fmt.Sprintf("%T", retrymiddleware)}, {Name: "Prerun", Value: strconv.FormatBool(prerun)}})
		retryops, _ := retrymiddleware.Handler(s, prerun, err)
		switch retryops.Result {
		case RetryResult_Retry:
			s.Logger.Debug("Retry Middleware %T Delayed Job %d", retrymiddleware, retryops.Delay)
			metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_PreRetryRetries), 1, []metrics.Label{{Name: "id", Value: s.id}, {Name: "middleware", Value: fmt.Sprintf("%T", retrymiddleware)}, {Name: "Prerun", Value: strconv.FormatBool(prerun)}})
			s.retryJob(retryops.Delay)
		case RetryResult_NoRetry:
			s.Logger.Debug("Retry Middleware %T Canceled Retries", retrymiddleware)
			metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_PreRetryResets), 1, []metrics.Label{{Name: "id", Value: s.id}, {Name: "middleware", Value: fmt.Sprintf("%T", retrymiddleware)}, {Name: "Prerun", Value: strconv.FormatBool(prerun)}})
		case RetryResult_NextMW:
			s.Logger.Debug("Retry Middleware %T Skipped", retrymiddleware)
			metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_PreRetrySkips), 1, []metrics.Label{{Name: "id", Value: s.id}, {Name: "middleware", Value: strconv.FormatBool(prerun)}})
		}
	}
}
// runPostExecutionHandler walks the execution middleware chain's PostHandlers
// after a run finished with err (nil on success). It stops at the first Defer
// or Cancel verdict; otherwise reports NextMW.
func (s *Task) runPostExecutionHandler(err error) MWResult {
	for _, postmiddleware := range s.executationMiddleWares {
		s.Logger.Trace("Running PostHandler Middlware %T", postmiddleware)
		metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_PostExecutationFailedRuns), 1, []metrics.Label{{Name: "id", Value: s.id}, {Name: "middleware", Value: fmt.Sprintf("%T", postmiddleware)}})
		verdict := postmiddleware.PostHandler(s, err)
		if verdict.Result == MWResult_Defer || verdict.Result == MWResult_Cancel {
			return MWResult{Result: verdict.Result}
		}
	}
	return MWResult{Result: MWResult_NextMW}
}
// runJobInstance synchronously executes one instance of the task's job function
// and delivers the run's error (or nil) on result. It tracks the instance in
// activeJobs and the task's wait-group so Stop can wait for it.
func (s *Task) runJobInstance(result chan interface{}) {
	s.wg.Add(1)
	defer s.wg.Done()
	// Create a new instance of s.jobSrcFunc
	jobInstance := job.NewJob(s.Ctx, s.jobSrcFunc)
	joblog := s.Logger.With("Instance", jobInstance.ID())
	joblog.Debug("Job Run Starting")
	// Add to active jobs map
	s.activeJobs.add(jobInstance)
	defer s.activeJobs.delete(jobInstance)
	// Logs and Metrics --------------------------------------
	// -------------------------------------------------------
	labels := []metrics.Label{{Name: "id", Value: s.id}}
	metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_Runs), 1, labels)
	// More than one live instance means this run overlaps a previous one.
	if s.activeJobs.len() > 1 {
		metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_OverlappingRuns), 1, labels)
	}
	// -------------------------------------------------------
	// Synchronously Run Job Instance
	lastError := jobInstance.Run()
	// -------------------------------------------------------
	// Logs and Metrics --------------------------------------
	if lastError != nil {
		joblog.
			With("Duration", jobInstance.ActualElapsed().Round(1*time.Millisecond)).
			With("State", jobInstance.State().String()).
			With("Error", lastError.Error()).
			Error("Job Error")
		metrics.IncrCounterWithLabels([]string{"sched", "runerrors"}, 1, labels)
		result <- lastError
	} else {
		joblog.
			With("Duration", jobInstance.ActualElapsed().Round(1*time.Millisecond)).
			With("State", jobInstance.State().String()).
			Info("Job Finished")
		result <- nil
	}
}
func negativeToZero(nextRunDuration time.Duration) time.Duration {
if nextRunDuration < 0 {
nextRunDuration = 0
}
return nextRunDuration
}
// retryJob pushes the task's timer back by `in`, so the next run fires after
// that delay instead of the timer's regular schedule.
func (s *Task) retryJob(in time.Duration) {
	s.Logger.
		With("Duration", in).
		Debug("Rescheduling Job")
	s.timer.Reschedule(in)
}

// GetNextRun returns the task's next scheduled run time (zero when unscheduled).
func (s *Task) GetNextRun() time.Time {
	return s.nextRun.Get()
}
// Run executes one scheduled run of the task: it consults the pre-execution
// middleware (which may cancel or defer the run), dispatches the job, waits for
// its result, runs the post-execution/retry middleware, and finally computes
// the next run time and signals the Scheduler to re-plan.
func (s *Task) Run() {
	jobResultSignal := make(chan interface{})
	defer close(jobResultSignal)
	s.Logger.Info("Checking Pre Execution Middleware")
	result, err := s.runPreExecutationMiddlware()
	switch result.Result {
	case MWResult_Cancel:
		// Run skipped entirely; schedule the next regular run.
		s.Logger.Warn("Scheduled Job run is Canceled")
		t, _ := s.timer.Next()
		s.nextRun.Set(t)
		s.sendUpdateSignal(updateSignalOp_Reschedule)
		return
	case MWResult_Defer:
		// Run deferred: let the retry middleware adjust the timer, then re-plan.
		s.Logger.Info("Scheduled Job will be Retried")
		s.runRetryMiddleware(true, err)
		t, _ := s.timer.Next()
		s.nextRun.Set(t)
		s.sendUpdateSignal(updateSignalOp_Reschedule)
		return
	case MWResult_NextMW:
		// Allowed: dispatch the job and immediately schedule the next run so
		// the scheduler loop keeps ticking while the job executes.
		s.Logger.Info("Dispatching Job")
		go s.runJobInstance(jobResultSignal)
		t, _ := s.timer.Next()
		s.nextRun.Set(t)
		s.sendUpdateSignal(updateSignalOp_Reschedule)
	}
	select {
	case result := <-jobResultSignal:
		s.Logger.
			With("Result", result).
			Info("Got Job Result", "result", result)
		// A FailedJobError carries the failure class the retry middleware keys on.
		err, ok := result.(joberrors.FailedJobError)
		if ok {
			s.Logger.Warn("Job Failed with %s", err.Error())
			metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_FailedJobs), 1, []metrics.Label{{Name: "id", Value: s.id}})
			mwresult := s.runPostExecutionHandler(err)
			if mwresult.Result == MWResult_Defer {
				/* run Retry Framework */
				s.Logger.Debug("Post Executation Middleware Retry Request")
				s.runRetryMiddleware(false, err)
			}
		} else {
			metrics.IncrCounterWithLabels(schedmetrics.GetMetricsCounterKey(schedmetrics.Metrics_Counter_SucceededJobs), 1, []metrics.Label{{Name: "id", Value: s.id}})
			s.runPostExecutionHandler(nil)
		}
	}
	// Recompute the next run (the retry middleware may have rescheduled the timer).
	t, _ := s.timer.Next()
	s.nextRun.Set(t)
	s.sendUpdateSignal(updateSignalOp_Reschedule)
}
// sendUpdateSignal notifies the owning Scheduler's loop of a schedule change
// (e.g. to re-sort the run queue). Blocks if the signal channel is full.
func (s *Task) sendUpdateSignal(op UpdateSignalOp_Type) {
	s.Logger.Trace("Sending Update Signal")
	s.updateSignal <- updateSignalOp{id: s.id, operation: op}
	s.Logger.Trace("Sent Update Signal")
}

130
timer.go Normal file
View file

@ -0,0 +1,130 @@
package taskmanager
import (
"fmt"
"time"
"github.com/gorhill/cronexpr"
)
//Timer is an Interface for a Timer object that is used by a Schedule to determine when to run the next run of a job.
// Timer need to implement the Next() method returning the time of the next Job run. Timer indicates that no jobs shall
// be scheduled anymore by returning done == true. The `next time.Time` returned with `done bool` == true IS IGNORED.
// Next() shall not return time in the past. Time in the past is reset to time.Now() at evaluation time in the scheduler.
// Reschedule(delay) overrides the next firing so it happens `delay` from now.
type Timer interface {
	Next() (next time.Time, done bool)
	Reschedule(delay time.Duration)
}
//Once A timer that run ONCE after an optional specific delay.
type Once struct {
delay time.Duration
done bool
}
//NewOnce Return a timer that trigger ONCE after `d` delay as soon as Timer is inquired for the next Run.
//Delay = 0 means the Timer return now(), aka as soon as time is inquired.
func NewOnce(d time.Duration) (*Once, error) {
if d < 0 {
return nil, fmt.Errorf("invalid d, must be >= 0")
}
return &Once{
delay: d,
}, nil
}
// NewOnceTime Return a timer that trigger ONCE at `t` time.Time.
//If `t` is in the past at inquery time, timer will NOT run.
func NewOnceTime(t time.Time) (*Once, error) {
remaining := time.Until(t)
if remaining < 0 {
return &Once{
delay: remaining,
done: true,
}, nil
}
return &Once{
delay: remaining,
}, nil
}
//Next Return Next Time OR a boolean indicating no more Next()(s)
func (o *Once) Next() (time.Time, bool) {
if !o.done {
o.done = true
return time.Now().Add(o.delay), false
}
return time.Time{}, o.done
}
func (o *Once) Reschedule(d time.Duration) {
o.delay = d
if o.done {
o.done = false
}
}
//Fixed A Timer that fires at a fixed duration intervals
type Fixed struct {
duration time.Duration
next time.Time
delay time.Duration
}
//NewFixed Returns Fixed Timer; A Timer that fires at a fixed duration intervals.
func NewFixed(duration time.Duration) (*Fixed, error) {
if duration < 0 {
return nil, fmt.Errorf("invalid duration, must be >= 0")
}
return &Fixed{
duration: duration,
next: time.Now().Add(duration),
}, nil
}
//Next Return Next fire time.
func (f *Fixed) Next() (time.Time, bool) {
now := time.Now()
if f.delay > 0 {
next := now.Add(f.delay)
f.delay = 0
return next, false
}
f.next = time.Now().Add(f.duration)
return f.next, false
}
func (f *Fixed) Reschedule(t time.Duration) {
f.delay = t
}
//Cron A Timer that fires according to a cron expression.
//All expressions supported by `https://github.com/gorhill/cronexpr` are supported.
type Cron struct {
	expression cronexpr.Expression
	delay      time.Duration // pending one-shot override set by Reschedule
}

//NewCron returns a Timer that fires according to a cron expression.
//All expressions supported by `https://github.com/gorhill/cronexpr` are supported.
func NewCron(cronExpression string) (*Cron, error) {
	parsed, err := cronexpr.Parse(cronExpression)
	if err != nil {
		return nil, fmt.Errorf("cron expression invalid: %w", err)
	}
	return &Cron{expression: *parsed}, nil
}

//Next Return Next fire time. A pending Reschedule delay (if any) is consumed
//first; otherwise the cron expression's next occurrence is used. Never done.
func (c *Cron) Next() (time.Time, bool) {
	if c.delay <= 0 {
		return c.expression.Next(time.Now()), false
	}
	override := time.Now().Add(c.delay)
	c.delay = 0
	return override, false
}

//Reschedule makes the next firing happen d from now, overriding the cron
//expression for one firing.
func (c *Cron) Reschedule(d time.Duration) {
	c.delay = d
}

140
timer_test.go Normal file
View file

@ -0,0 +1,140 @@
package taskmanager
import (
"testing"
"time"
)
// TestTimerOnce exercises the one-shot Once timer: first Next fires, second
// reports done, and Reschedule re-arms it for exactly one more firing.
func TestTimerOnce(t *testing.T) {
	timer, err := NewOnce(1 * time.Second)
	if err != nil {
		t.Errorf("NewOnce Timer Returned Error %s", err.Error())
	}
	next, run := timer.Next()
	if !next.Round(time.Second).Equal(time.Now().Add(1 * time.Second).Round(time.Second)) {
		t.Errorf("next != time.Now().Add(1 * time.Second) - %s - %s", next.Round(time.Second), time.Now().Add(1*time.Second).Round(time.Second))
	}
	if run {
		t.Errorf("Done is Not True")
	}
	// Only the done flag matters on the second call (the previous `next, run =`
	// assignment left `next` unused — flagged by ineffassign).
	_, run = timer.Next()
	if !run {
		t.Errorf("Done is Not True after second run")
	}
	timer.Reschedule(2 * time.Second)
	next, run = timer.Next()
	if !next.Round(time.Second).Equal(time.Now().Add(2 * time.Second).Round(time.Second)) {
		t.Errorf("Reschedule next != time.Now().Add(2 * time.Second) - %s - %s", next.Round(time.Second), time.Now().Add(2*time.Second).Round(time.Second))
	}
	if run {
		t.Errorf("Done is Not True")
	}
	_, run = timer.Next()
	if !run {
		t.Errorf("Done is Not True after second run")
	}
}
// TestTimerOnceInvalidDuration verifies NewOnce rejects a negative duration.
func TestTimerOnceInvalidDuration(t *testing.T) {
	if _, err := NewOnce(-1 * time.Second); err == nil {
		t.Errorf("NewOnce did not return an error for a negative duration")
	}
}
// TestTimerOnceTime verifies that a OnceTime timer fires at the supplied
// instant and reports done on the second call.
// NOTE(review): comparisons round to whole seconds and can flake at a
// second boundary.
func TestTimerOnceTime(t *testing.T) {
	timer, err := NewOnceTime(time.Now().Add(1 * time.Second))
	if err != nil {
		// Fatalf: continuing would dereference a nil timer below.
		t.Fatalf("NewOnceTime returned error: %s", err.Error())
	}
	next, run := timer.Next()
	if !next.Round(time.Second).Equal(time.Now().Add(1 * time.Second).Round(time.Second)) {
		t.Errorf("NewOnceTime next != time.Now().Add(1 * time.Second) - %s - %s", next.Round(time.Second), time.Now().Add(1*time.Second).Round(time.Second))
	}
	if run {
		t.Errorf("NewOnceTime run should be false on first call")
	}
	if _, run = timer.Next(); !run {
		t.Errorf("NewOnceTime run should be true on second call")
	}
}
// TestTimerOnceTimeInvalidDuration verifies that a OnceTime timer built from
// an instant in the past is immediately done and reports the zero time.
func TestTimerOnceTimeInvalidDuration(t *testing.T) {
	timer, err := NewOnceTime(time.Now().Add(-1 * time.Hour))
	if err != nil {
		// Fatalf: continuing would dereference a nil timer below.
		t.Fatalf("NewOnceTime returned error: %s", err.Error())
	}
	next, run := timer.Next()
	t.Logf("test %d", timer.delay)
	if !run {
		t.Errorf("NewOnceTime run should be true for a past instant")
	}
	if !next.Round(time.Second).Equal(time.Time{}.Round(time.Second)) {
		t.Errorf("NewOnceTime next != zero time - %s", next.Round(time.Second))
	}
}
// TestTimerFixed verifies that a Fixed timer schedules its first fire at
// now+duration and that Reschedule overrides the delay for the next fire.
// NOTE(review): comparisons round to whole seconds and can flake at a
// second boundary.
func TestTimerFixed(t *testing.T) {
	timer, err := NewFixed(10 * time.Second)
	if err != nil {
		// Fatalf: continuing would dereference a nil timer below.
		t.Fatalf("NewFixed returned error: %s", err.Error())
	}
	next, run := timer.Next()
	if !next.Round(time.Second).Equal(time.Now().Add(10 * time.Second).Round(time.Second)) {
		t.Errorf("FixedTimer next != time.Now().Add(10 * time.Second) - %s - %s", next.Round(time.Second), time.Now().Add(10*time.Second).Round(time.Second))
	}
	if run {
		t.Errorf("FixedTimer run should be false")
	}
	timer.Reschedule(2 * time.Second)
	next, run = timer.Next()
	// The original message printed the 10-second expectation here; it now
	// reports the 2-second value actually being checked.
	if !next.Round(time.Second).Equal(time.Now().Add(2 * time.Second).Round(time.Second)) {
		t.Errorf("FixedTimer rescheduled next != time.Now().Add(2 * time.Second) - %s - %s", next.Round(time.Second), time.Now().Add(2*time.Second).Round(time.Second))
	}
	if run {
		t.Errorf("FixedTimer run should be false after Reschedule")
	}
}
// TestTimerFixedInvalidDuration verifies NewFixed rejects a negative duration.
func TestTimerFixedInvalidDuration(t *testing.T) {
	// The original message blamed NewOnce; this test exercises NewFixed.
	if _, err := NewFixed(-1 * time.Second); err == nil {
		t.Errorf("NewFixed did not return an error for a negative duration")
	}
}
// TestTimerCron verifies that a Cron timer's first fire matches its
// expression and that Reschedule overrides the schedule for one fire.
//
// The original compared against the fixed instant 2021-12-01T04:05:00+08:00,
// which only holds in a +08:00 local zone and only before December 2021.
// Checking the fields of the returned time keeps the test portable.
func TestTimerCron(t *testing.T) {
	timer, err := NewCron("5 4 1 12 2")
	if err != nil {
		// Fatalf: continuing would dereference a nil timer below.
		t.Fatalf("NewCron returned error: %s", err.Error())
	}
	next, run := timer.Next()
	// "5 4 1 12 2" fires at 04:05 in December on the 1st or on Tuesdays
	// (cron ORs day-of-month and day-of-week when both are restricted —
	// TODO confirm against cronexpr's documented semantics).
	if next.Minute() != 5 || next.Hour() != 4 || next.Month() != time.December ||
		(next.Day() != 1 && next.Weekday() != time.Tuesday) {
		t.Errorf("Crontimer next fire time %s does not match expression", next)
	}
	if !next.After(time.Now()) {
		t.Errorf("Crontimer next fire time %s is not in the future", next)
	}
	if run {
		t.Errorf("Crontimer run should be false on first call")
	}
	timer.Reschedule(10 * time.Second)
	next, run = timer.Next()
	if !next.Round(time.Second).Equal(time.Now().Add(10 * time.Second).Round(time.Second)) {
		t.Errorf("Crontimer rescheduled next != time.Now().Add(10 * time.Second) - %s - %s", next.Round(time.Second), time.Now().Add(10*time.Second).Round(time.Second))
	}
	if run {
		t.Errorf("Crontimer run should be false after Reschedule")
	}
}
// TestTimerCronInvalidFormat verifies NewCron rejects an expression with an
// out-of-range month field (14).
func TestTimerCronInvalidFormat(t *testing.T) {
	// The original message blamed NewOnce; this test exercises NewCron.
	if _, err := NewCron("5 4 1 14 2"); err == nil {
		t.Errorf("NewCron did not return an error for an invalid expression")
	}
}

58
utils/copymap.go Normal file
View file

@ -0,0 +1,58 @@
package utils
// CopyableMap is a map[string]interface{} that knows how to deep-copy itself.
type CopyableMap map[string]interface{}

// CopyableSlice is a []interface{} that knows how to deep-copy itself.
type CopyableSlice []interface{}

// DeepCopy will create a deep copy of this map. The depth of this
// copy is all inclusive. Both maps and slices will be considered when
// making the copy; every other value is copied by plain assignment
// (so pointer-like values other than maps/slices are still shared).
func (m CopyableMap) DeepCopy() map[string]interface{} {
	// Pre-size the result: the copy has exactly len(m) keys.
	result := make(map[string]interface{}, len(m))
	for k, v := range m {
		switch value := v.(type) {
		case map[string]interface{}:
			result[k] = CopyableMap(value).DeepCopy()
		case []interface{}:
			result[k] = CopyableSlice(value).DeepCopy()
		default:
			result[k] = v
		}
	}
	return result
}

// DeepCopy will create a deep copy of this slice. The depth of this
// copy is all inclusive. Both maps and slices will be considered when
// making the copy; every other element is copied by plain assignment.
// The result is always non-nil, matching the original implementation.
func (s CopyableSlice) DeepCopy() []interface{} {
	// Pre-size the backing array: the copy has exactly len(s) elements.
	result := make([]interface{}, 0, len(s))
	for _, v := range s {
		switch value := v.(type) {
		case map[string]interface{}:
			result = append(result, CopyableMap(value).DeepCopy())
		case []interface{}:
			result = append(result, CopyableSlice(value).DeepCopy())
		default:
			result = append(result, v)
		}
	}
	return result
}

48
utils/copymap_test.go Normal file
View file

@ -0,0 +1,48 @@
package utils
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestCopyMap checks that DeepCopy yields a fully independent copy:
// mutating the original map, its nested map, and its nested slice must
// leave the copy holding the pre-mutation values.
func TestCopyMap(t *testing.T) {
	original := map[string]interface{}{
		"a": "bbb",
		"b": map[string]interface{}{
			"c": 123,
		},
		"c": []interface{}{
			"d", "e", map[string]interface{}{
				"f": "g",
			},
		},
	}
	copied := CopyableMap(original).DeepCopy()

	// Mutate every level of the original.
	original["a"] = "zzz"
	delete(original, "b")
	original["c"].([]interface{})[1] = "x"
	original["c"].([]interface{})[2].(map[string]interface{})["f"] = "h"

	// The original reflects the mutations...
	require.Equal(t, map[string]interface{}{
		"a": "zzz",
		"c": []interface{}{
			"d", "x", map[string]interface{}{
				"f": "h",
			},
		},
	}, original)

	// ...while the copy is untouched.
	require.Equal(t, map[string]interface{}{
		"a": "bbb",
		"b": map[string]interface{}{
			"c": 123,
		},
		"c": []interface{}{
			"d", "e", map[string]interface{}{
				"f": "g",
			},
		},
	}, copied)
}