sched/middleware.go

309 lines
7.1 KiB
Go

package sched
import (
"context"
"fmt"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
)
type MiddleWarehandler interface {
Handler(s *Schedule, newstate State) (stop bool, err error)
}
type eboCtxKey struct{}
type ExponentialBackoffHandler struct {
mx sync.RWMutex
bo *backoff.ExponentialBackOff
handlePanic bool
handleOverlap bool
handleDeferred bool
}
func (ebh *ExponentialBackoffHandler) HandlePanic(val bool) {
ebh.handlePanic = val
}
func (ebh *ExponentialBackoffHandler) HandleOverlap(val bool) {
ebh.handleOverlap = val
}
func (ebh *ExponentialBackoffHandler) HandleDeferred(val bool) {
ebh.handleDeferred = val
}
func (ebh *ExponentialBackoffHandler) shouldHandleState(state State) bool {
if ebh.handlePanic && state == PANICED {
return true
}
if ebh.handleOverlap && state == OVERLAPPINGJOB {
return true
}
if ebh.handleDeferred && state == DEFERRED {
return true
}
return false
}
func (ebh *ExponentialBackoffHandler) getBackOffCtx(s *Schedule) *backoff.ExponentialBackOff {
bo, ok := s.ctx.Value(eboCtxKey{}).(*backoff.ExponentialBackOff)
if !ok {
return nil
}
return bo
}
func (ebh *ExponentialBackoffHandler) setBackoffCtx(s *Schedule, bo *backoff.ExponentialBackOff) {
s.ctx = context.WithValue(s.ctx, eboCtxKey{}, bo)
}
func (ebh *ExponentialBackoffHandler) Handler(s *Schedule, newstate State) (bool, error) {
ebh.mx.Lock()
defer ebh.mx.Unlock()
bo := ebh.getBackOffCtx(s)
if newstate == NEW {
s.logger.Infow("Creating New Exponential Backoff for Schedule")
if bo == nil {
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = ebh.bo.InitialInterval
bo.MaxElapsedTime = ebh.bo.MaxElapsedTime
bo.MaxInterval = ebh.bo.MaxInterval
bo.Multiplier = ebh.bo.Multiplier
bo.RandomizationFactor = ebh.bo.RandomizationFactor
ebh.setBackoffCtx(s, bo)
}
}
if newstate == COMPLETED {
bo.Reset()
s.RetryJob(0)
return false, nil
}
s.logger.Infow("JOb State: ", "state", newstate.String())
if ebh.shouldHandleState(newstate) {
next := bo.NextBackOff()
if next != bo.Stop {
if s.RetryJob(next) {
s.logger.Infow(fmt.Sprintf("Exponential BO Handler Retrying %s Job in %s", newstate.String(), next))
}
} else {
return false, fmt.Errorf("max Elapsed Time Exceeded")
}
}
return false, nil
}
func NewDefaultExponentialBackoffMW() *ExponentialBackoffHandler {
val := NewExponentialBackoffMW(backoff.NewExponentialBackOff())
return val
}
func NewExponentialBackoffMW(ebo *backoff.ExponentialBackOff) *ExponentialBackoffHandler {
val := ExponentialBackoffHandler{
bo: ebo,
handlePanic: true,
handleOverlap: true,
handleDeferred: true,
}
return &val
}
type constantCtxKey struct{}
type ConstantBackoffHandler struct {
mx sync.RWMutex
interval time.Duration
handlePanic bool
handleOverlap bool
handleDeferred bool
}
func (ebh *ConstantBackoffHandler) HandlePanic(val bool) {
ebh.handlePanic = val
}
func (ebh *ConstantBackoffHandler) HandleOverlap(val bool) {
ebh.handleOverlap = val
}
func (ebh *ConstantBackoffHandler) HandleDeferred(val bool) {
ebh.handleDeferred = val
}
func (ebh *ConstantBackoffHandler) shouldHandleState(state State) bool {
if ebh.handlePanic && state == PANICED {
return true
}
if ebh.handleOverlap && state == OVERLAPPINGJOB {
return true
}
if ebh.handleDeferred && state == DEFERRED {
return true
}
return false
}
func (ebh *ConstantBackoffHandler) getBackOffCtx(s *Schedule) *backoff.ConstantBackOff {
bo, ok := s.ctx.Value(constantCtxKey{}).(*backoff.ConstantBackOff)
if !ok {
return nil
}
return bo
}
func (ebh *ConstantBackoffHandler) setBackoffCtx(s *Schedule, bo *backoff.ConstantBackOff) {
s.ctx = context.WithValue(s.ctx, constantCtxKey{}, bo)
}
func (ebh *ConstantBackoffHandler) Handler(s *Schedule, newstate State) (bool, error) {
ebh.mx.Lock()
defer ebh.mx.Unlock()
bo := ebh.getBackOffCtx(s)
if newstate == NEW {
s.logger.Infow("Creating New Constant Backoff for Schedule")
if bo == nil {
bo := backoff.NewConstantBackOff(ebh.interval)
ebh.setBackoffCtx(s, bo)
}
}
if newstate == COMPLETED {
bo.Reset()
s.RetryJob(0)
return false, nil
}
if ebh.shouldHandleState(newstate) {
next := bo.NextBackOff()
if next != backoff.Stop {
if s.RetryJob(next) {
s.logger.Infow(fmt.Sprintf("Constant BO Handler Retrying %s Job in %s", newstate.String(), next))
}
} else {
return false, fmt.Errorf("max Elapsed Time Exceeded")
}
}
return false, nil
}
func NewDefaultConstandBackoffMW() *ConstantBackoffHandler {
val := NewConstandBackoffMW(1 * time.Second)
return val
}
func NewConstandBackoffMW(interval time.Duration) *ConstantBackoffHandler {
val := ConstantBackoffHandler{
interval: interval,
handlePanic: true,
handleOverlap: true,
handleDeferred: true,
}
return &val
}
type hasTagCtxKey struct{}
type HasTagHandler struct {
mx sync.RWMutex
wantTags map[string]interface{}
haveTags map[string]interface{}
handlePanic bool
handleOverlap bool
handleDeferred bool
}
func (hth *HasTagHandler) HandlePanic(val bool) {
hth.handlePanic = val
}
func (hth *HasTagHandler) HandleOverlap(val bool) {
hth.handleOverlap = val
}
func (hth *HasTagHandler) HandleDeferred(val bool) {
hth.handleDeferred = val
}
func (hth *HasTagHandler) SetHaveTags(tag string) {
hth.mx.Lock()
defer hth.mx.Unlock()
hth.haveTags[tag] = nil
}
func (hth *HasTagHandler) DelHaveTags(tag string) {
hth.mx.Lock()
defer hth.mx.Unlock()
delete(hth.haveTags, tag)
}
func (hth *HasTagHandler) IsHaveTag(tag string) bool {
hth.mx.RLock()
defer hth.mx.RUnlock()
_, ok := hth.haveTags[tag]
return ok
}
func (hth *HasTagHandler) SetWantTags(tag string) {
hth.mx.Lock()
defer hth.mx.Unlock()
hth.wantTags[tag] = nil
}
func (hth *HasTagHandler) DelWantTags(tag string) {
hth.mx.Lock()
defer hth.mx.Unlock()
delete(hth.wantTags, tag)
}
func (hth *HasTagHandler) IsWantTag(tag string) bool {
hth.mx.RLock()
defer hth.mx.RUnlock()
_, ok := hth.wantTags[tag]
return ok
}
func (hth *HasTagHandler) shouldHandleState(state State) bool {
if hth.handlePanic && state == PANICED {
return true
}
if hth.handleOverlap && state == OVERLAPPINGJOB {
return true
}
if hth.handleDeferred && state == DEFERRED {
return true
}
return false
}
func (hth *HasTagHandler) getTagCtx(s *Schedule) *backoff.ConstantBackOff {
bo, ok := s.ctx.Value(hasTagCtxKey{}).(*backoff.ConstantBackOff)
if !ok {
return nil
}
return bo
}
func (hth *HasTagHandler) setTagCtx(s *Schedule, bo *backoff.ConstantBackOff) {
s.ctx = context.WithValue(s.ctx, hasTagCtxKey{}, bo)
}
func (hth *HasTagHandler) Handler(s *Schedule, newstate State) (bool, error) {
if newstate == DISPATCHED {
s.logger.Infow("Checking Tags", "have", hth.haveTags, "Want", hth.wantTags)
for k := range hth.wantTags {
if !(hth.IsHaveTag(k)) {
s.logger.Infow("Missing Tag", "tag", k)
return true, nil
}
}
}
return false, nil
}
func NewTagHandlerMW() *HasTagHandler {
val := HasTagHandler{
wantTags: make(map[string]interface{}),
haveTags: make(map[string]interface{}),
handlePanic: true,
handleOverlap: true,
handleDeferred: true,
}
return &val
}