mirror of
https://github.com/Fishwaldo/sched.git
synced 2025-07-05 20:49:42 +00:00
feat: Introduce Context and a few sample middleware - Exponential Backoff, Constant Backoff, HaveTags. Also put a retry counter and limit
This commit is contained in:
parent
7f2a45f37e
commit
cf767ae59c
1 changed files with 309 additions and 0 deletions
309
middleware.go
Normal file
309
middleware.go
Normal file
|
@ -0,0 +1,309 @@
|
|||
package sched
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
)
|
||||
|
||||
type MiddleWarehandler interface {
|
||||
Handler(s *Schedule, newstate State) (stop bool, err error)
|
||||
}
|
||||
|
||||
type eboCtxKey struct{}
|
||||
|
||||
type ExponentialBackoffHandler struct {
|
||||
mx sync.RWMutex
|
||||
bo *backoff.ExponentialBackOff
|
||||
handlePanic bool
|
||||
handleOverlap bool
|
||||
handleDeferred bool
|
||||
}
|
||||
|
||||
func (ebh *ExponentialBackoffHandler) HandlePanic(val bool) {
|
||||
ebh.handlePanic = val
|
||||
}
|
||||
|
||||
func (ebh *ExponentialBackoffHandler) HandleOverlap(val bool) {
|
||||
ebh.handleOverlap = val
|
||||
}
|
||||
func (ebh *ExponentialBackoffHandler) HandleDeferred(val bool) {
|
||||
ebh.handleDeferred = val
|
||||
}
|
||||
|
||||
func (ebh *ExponentialBackoffHandler) shouldHandleState(state State) bool {
|
||||
if ebh.handlePanic && state == PANICED {
|
||||
return true
|
||||
}
|
||||
if ebh.handleOverlap && state == OVERLAPPINGJOB {
|
||||
return true
|
||||
}
|
||||
if ebh.handleDeferred && state == DEFERRED {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (ebh *ExponentialBackoffHandler) getBackOffCtx(s *Schedule) *backoff.ExponentialBackOff {
|
||||
bo, ok := s.ctx.Value(eboCtxKey{}).(*backoff.ExponentialBackOff)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return bo
|
||||
}
|
||||
|
||||
func (ebh *ExponentialBackoffHandler) setBackoffCtx(s *Schedule, bo *backoff.ExponentialBackOff) {
|
||||
s.ctx = context.WithValue(s.ctx, eboCtxKey{}, bo)
|
||||
}
|
||||
|
||||
func (ebh *ExponentialBackoffHandler) Handler(s *Schedule, newstate State) (bool, error) {
|
||||
ebh.mx.Lock()
|
||||
defer ebh.mx.Unlock()
|
||||
bo := ebh.getBackOffCtx(s)
|
||||
if newstate == NEW {
|
||||
s.logger.Infow("Creating New Exponential Backoff for Schedule")
|
||||
if bo == nil {
|
||||
bo := backoff.NewExponentialBackOff()
|
||||
bo.InitialInterval = ebh.bo.InitialInterval
|
||||
bo.MaxElapsedTime = ebh.bo.MaxElapsedTime
|
||||
bo.MaxInterval = ebh.bo.MaxInterval
|
||||
bo.Multiplier = ebh.bo.Multiplier
|
||||
bo.RandomizationFactor = ebh.bo.RandomizationFactor
|
||||
ebh.setBackoffCtx(s, bo)
|
||||
}
|
||||
}
|
||||
if newstate == COMPLETED {
|
||||
bo.Reset()
|
||||
s.RetryJob(0)
|
||||
return false, nil
|
||||
}
|
||||
s.logger.Infow("JOb State: ", "state", newstate.String())
|
||||
if ebh.shouldHandleState(newstate) {
|
||||
next := bo.NextBackOff()
|
||||
if next != bo.Stop {
|
||||
if s.RetryJob(next) {
|
||||
s.logger.Infow(fmt.Sprintf("Exponential BO Handler Retrying %s Job in %s", newstate.String(), next))
|
||||
}
|
||||
} else {
|
||||
return false, fmt.Errorf("max Elapsed Time Exceeded")
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func NewDefaultExponentialBackoffMW() *ExponentialBackoffHandler {
|
||||
val := NewExponentialBackoffMW(backoff.NewExponentialBackOff())
|
||||
return val
|
||||
}
|
||||
|
||||
func NewExponentialBackoffMW(ebo *backoff.ExponentialBackOff) *ExponentialBackoffHandler {
|
||||
val := ExponentialBackoffHandler{
|
||||
bo: ebo,
|
||||
handlePanic: true,
|
||||
handleOverlap: true,
|
||||
handleDeferred: true,
|
||||
}
|
||||
return &val
|
||||
}
|
||||
|
||||
type constantCtxKey struct{}
|
||||
|
||||
type ConstantBackoffHandler struct {
|
||||
mx sync.RWMutex
|
||||
interval time.Duration
|
||||
handlePanic bool
|
||||
handleOverlap bool
|
||||
handleDeferred bool
|
||||
}
|
||||
|
||||
func (ebh *ConstantBackoffHandler) HandlePanic(val bool) {
|
||||
ebh.handlePanic = val
|
||||
}
|
||||
|
||||
func (ebh *ConstantBackoffHandler) HandleOverlap(val bool) {
|
||||
ebh.handleOverlap = val
|
||||
}
|
||||
func (ebh *ConstantBackoffHandler) HandleDeferred(val bool) {
|
||||
ebh.handleDeferred = val
|
||||
}
|
||||
|
||||
func (ebh *ConstantBackoffHandler) shouldHandleState(state State) bool {
|
||||
if ebh.handlePanic && state == PANICED {
|
||||
return true
|
||||
}
|
||||
if ebh.handleOverlap && state == OVERLAPPINGJOB {
|
||||
return true
|
||||
}
|
||||
if ebh.handleDeferred && state == DEFERRED {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (ebh *ConstantBackoffHandler) getBackOffCtx(s *Schedule) *backoff.ConstantBackOff {
|
||||
bo, ok := s.ctx.Value(constantCtxKey{}).(*backoff.ConstantBackOff)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return bo
|
||||
}
|
||||
|
||||
func (ebh *ConstantBackoffHandler) setBackoffCtx(s *Schedule, bo *backoff.ConstantBackOff) {
|
||||
s.ctx = context.WithValue(s.ctx, constantCtxKey{}, bo)
|
||||
}
|
||||
|
||||
func (ebh *ConstantBackoffHandler) Handler(s *Schedule, newstate State) (bool, error) {
|
||||
ebh.mx.Lock()
|
||||
defer ebh.mx.Unlock()
|
||||
bo := ebh.getBackOffCtx(s)
|
||||
if newstate == NEW {
|
||||
s.logger.Infow("Creating New Constant Backoff for Schedule")
|
||||
if bo == nil {
|
||||
bo := backoff.NewConstantBackOff(ebh.interval)
|
||||
ebh.setBackoffCtx(s, bo)
|
||||
}
|
||||
}
|
||||
if newstate == COMPLETED {
|
||||
bo.Reset()
|
||||
s.RetryJob(0)
|
||||
return false, nil
|
||||
}
|
||||
if ebh.shouldHandleState(newstate) {
|
||||
next := bo.NextBackOff()
|
||||
if next != backoff.Stop {
|
||||
if s.RetryJob(next) {
|
||||
s.logger.Infow(fmt.Sprintf("Constant BO Handler Retrying %s Job in %s", newstate.String(), next))
|
||||
}
|
||||
} else {
|
||||
return false, fmt.Errorf("max Elapsed Time Exceeded")
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func NewDefaultConstandBackoffMW() *ConstantBackoffHandler {
|
||||
val := NewConstandBackoffMW(1 * time.Second)
|
||||
return val
|
||||
}
|
||||
|
||||
func NewConstandBackoffMW(interval time.Duration) *ConstantBackoffHandler {
|
||||
val := ConstantBackoffHandler{
|
||||
interval: interval,
|
||||
handlePanic: true,
|
||||
handleOverlap: true,
|
||||
handleDeferred: true,
|
||||
}
|
||||
return &val
|
||||
}
|
||||
|
||||
type hasTagCtxKey struct{}
|
||||
|
||||
type HasTagHandler struct {
|
||||
mx sync.RWMutex
|
||||
wantTags map[string]interface{}
|
||||
haveTags map[string]interface{}
|
||||
handlePanic bool
|
||||
handleOverlap bool
|
||||
handleDeferred bool
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) HandlePanic(val bool) {
|
||||
hth.handlePanic = val
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) HandleOverlap(val bool) {
|
||||
hth.handleOverlap = val
|
||||
}
|
||||
func (hth *HasTagHandler) HandleDeferred(val bool) {
|
||||
hth.handleDeferred = val
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) SetHaveTags(tag string) {
|
||||
hth.mx.Lock()
|
||||
defer hth.mx.Unlock()
|
||||
hth.haveTags[tag] = nil
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) DelHaveTags(tag string) {
|
||||
hth.mx.Lock()
|
||||
defer hth.mx.Unlock()
|
||||
delete(hth.haveTags, tag)
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) IsHaveTag(tag string) bool {
|
||||
hth.mx.RLock()
|
||||
defer hth.mx.RUnlock()
|
||||
_, ok := hth.haveTags[tag]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) SetWantTags(tag string) {
|
||||
hth.mx.Lock()
|
||||
defer hth.mx.Unlock()
|
||||
hth.wantTags[tag] = nil
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) DelWantTags(tag string) {
|
||||
hth.mx.Lock()
|
||||
defer hth.mx.Unlock()
|
||||
delete(hth.wantTags, tag)
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) IsWantTag(tag string) bool {
|
||||
hth.mx.RLock()
|
||||
defer hth.mx.RUnlock()
|
||||
_, ok := hth.wantTags[tag]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) shouldHandleState(state State) bool {
|
||||
if hth.handlePanic && state == PANICED {
|
||||
return true
|
||||
}
|
||||
if hth.handleOverlap && state == OVERLAPPINGJOB {
|
||||
return true
|
||||
}
|
||||
if hth.handleDeferred && state == DEFERRED {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) getTagCtx(s *Schedule) *backoff.ConstantBackOff {
|
||||
bo, ok := s.ctx.Value(hasTagCtxKey{}).(*backoff.ConstantBackOff)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return bo
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) setTagCtx(s *Schedule, bo *backoff.ConstantBackOff) {
|
||||
s.ctx = context.WithValue(s.ctx, hasTagCtxKey{}, bo)
|
||||
}
|
||||
|
||||
func (hth *HasTagHandler) Handler(s *Schedule, newstate State) (bool, error) {
|
||||
if newstate == DISPATCHED {
|
||||
s.logger.Infow("Checking Tags", "have", hth.haveTags, "Want", hth.wantTags)
|
||||
for k := range hth.wantTags {
|
||||
if !(hth.IsHaveTag(k)) {
|
||||
s.logger.Infow("Missing Tag", "tag", k)
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func NewTagHandlerMW() *HasTagHandler {
|
||||
val := HasTagHandler{
|
||||
wantTags: make(map[string]interface{}),
|
||||
haveTags: make(map[string]interface{}),
|
||||
handlePanic: true,
|
||||
handleOverlap: true,
|
||||
handleDeferred: true,
|
||||
}
|
||||
return &val
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue