Continue Refactoring Library Code

This commit is contained in:
Justin Hammond 2022-09-06 13:27:00 +08:00
parent f3ba7c0e73
commit 413562261b
28 changed files with 1674 additions and 529 deletions

1
go.mod
View file

@ -109,6 +109,7 @@ require (
go.mongodb.org/mongo-driver v1.9.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/image v0.0.0-20211028202545-6944b10bf410 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect

2
go.sum
View file

@ -769,6 +769,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e h1:+WEEuIdZHnUeJJmEUjyYC2gfUMj69yZXw17EnHg/otA=
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e/go.mod h1:Kr81I6Kryrl9sr8s2FK3vxD90NdsKWRuOIl2O4CvYbA=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20211028202545-6944b10bf410 h1:hTftEOvwiOq2+O8k2D5/Q7COC7k5Qcrgc2TFURJYnvQ=

View file

@ -22,7 +22,6 @@ func GetZapLogger() (*zap.Logger, error) {
return zapLog, nil
}
func InitLogger() logr.Logger {
var cfg zap.Config
var lvl zapcore.Level

View file

@ -6,8 +6,6 @@ import (
mouthpiece "github.com/Fishwaldo/mouthpiece/pkg"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/danielgtaylor/huma"
"github.com/danielgtaylor/huma/responses"
)
@ -22,12 +20,11 @@ func setupApps(res *huma.Resource, mps *mouthpiece.MouthPiece) error {
responses.OK().Headers("Set-Cookie"),
responses.OK().Model([]interfaces.AppDetails{}),
).Run(func(ctx huma.Context) {
apps := mps.GetAppService().GetApps(ctx)
var newapps []interfaces.AppDetails
for _, app := range apps {
newapps = append(newapps, app.GetDetails())
if apps, err := mps.GetAppService().GetApps(ctx); err != nil {
ctx.WriteError(http.StatusInternalServerError, "Internal Error", err)
} else {
ctx.WriteModel(http.StatusOK, apps)
}
ctx.WriteModel(http.StatusOK, newapps)
})
appapi.Put("create-app", "Create a Application",

View file

@ -69,7 +69,7 @@ func NewRestAPI(mps *mouthpiece.MouthPiece) *RestAPI {
OneMin rest.BenchmarkStats `json:"1min"`
FiveMin rest.BenchmarkStats `json:"5min"`
FifteenMin rest.BenchmarkStats `json:"15min"`
Hourly rest.BenchmarkStats `json:"hourly"`
Hourly rest.BenchmarkStats `json:"hourly"`
}{
bench.Stats(time.Minute),
bench.Stats(time.Minute * 5),

View file

@ -3,15 +3,15 @@ package restapi
import (
"context"
"net/http"
"strconv"
"path"
"strconv"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/danielgtaylor/huma"
"github.com/go-pkgz/auth/token"
"github.com/go-chi/chi"
"github.com/go-pkgz/auth/token"
)
// UpdateAuthContext defines interface adding extras or modifying UserInfo in request context
@ -78,4 +78,4 @@ func CleanPath(next http.Handler) http.Handler {
next.ServeHTTP(w, r)
})
}
}

View file

@ -14,4 +14,4 @@ func checkErrors(ctx huma.Context, err error) {
} else {
ctx.AddError(err)
}
}
}

16
main.go
View file

@ -25,6 +25,7 @@ SOFTWARE.
package main
import (
"context"
"embed"
"fmt"
@ -38,8 +39,9 @@ import (
// "github.com/Fishwaldo/mouthpiece/pkg/apps"
// "github.com/Fishwaldo/mouthpiece/pkg/db"
// "github.com/Fishwaldo/mouthpiece/pkg/filter"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
// "github.com/Fishwaldo/mouthpiece/pkg/message"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
// "github.com/Fishwaldo/mouthpiece/pkg/transport"
// "github.com/Fishwaldo/mouthpiece/pkg/users"
@ -117,7 +119,16 @@ func main() {
fmt.Println(bi.String())
mps := mouthpiece.NewMouthPiece(sqlite.Open("test.db"), mpserver.InitLogger())
ctx := interfaces.NewContext(context.Background())
db := sqlite.Open("test.db?cache=shared&mode=rwc")
mps := mouthpiece.NewMouthPiece(ctx, db, mpserver.InitLogger())
mps.Start()
mps.SeedMouthPieceApp(context.Background())
msg := msg.NewMessage("MouthPiece")
msg.Body.Message = "Hello World"
msg.ProcessMessage()
mps.RouteMessage(context.Background(), msg)
restapi := restapi.NewRestAPI(mps)
@ -127,7 +138,6 @@ func main() {
zl, _ := mpserver.GetZapLogger()
zl.Sync()
// Create a new router & CLI with default middleware.
// Server = mpserver{

View file

@ -1,43 +1,50 @@
package apps
import (
// "errors"
"context"
// "fmt"
"github.com/Fishwaldo/mouthpiece/pkg/db"
"github.com/Fishwaldo/mouthpiece/pkg/filter"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/message"
"github.com/Fishwaldo/mouthpiece/pkg/mperror"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
"github.com/Fishwaldo/mouthpiece/pkg/validate"
"gorm.io/gorm"
)
type ApplicationFilters struct {
gorm.Model `json:"-"`
AppID uint `json:"-"`
Name string
ID uint `doc:"Filter ID" gorm:"primary_key"`
AppID uint `json:"-"`
Name string
}
type App struct {
mpctx *interfaces.MPContext
interfaces.AppDetails
Filters []ApplicationFilters
Filters []AppFilter `json:"-" gorm:"many2many:app2flt;"`
}
func (app App) validateSaveField(val interfaces.AppDetails, fields...string) error {
if len(fields) > 0 {
type AppFilter struct {
ID uint `doc:"Filter ID" gorm:"primary_key"`
AppID uint `json:"-"`
FilterID uint `json:"-"`
}
// TableName Override the table name for the App table
// to ensure its consistant with the table name defined in the Interfaces Package
func (app *App) TableName() string {
return "apps"
}
func (app *App) validateSaveField(val interfaces.AppDetails, fields ...string) error {
if len(fields) > 0 {
if err := validate.Get().StructPartial(val, fields...); err != nil {
log.Log.Info("Validation Error", "Error", err)
return err;
return err
}
} else {
if err := validate.Get().Struct(val); err != nil {
log.Log.Info("Validation Error", "Error", err)
return err;
return err
}
}
app.AppDetails = val
@ -48,74 +55,138 @@ func (app App) validateSaveField(val interfaces.AppDetails, fields...string) err
return nil
}
func (app App) GetName() string {
func (app *App) SetSvcCtx(ctx *interfaces.MPContext) {
app.mpctx = ctx
}
func (app *App) GetName() string {
return app.AppName
}
func (app App) SetName(name string) error {
func (app *App) SetName(name string) error {
var details interfaces.AppDetails = app.AppDetails
details.AppName = name
return app.validateSaveField(details, "AppName")
}
func (app App) GetID() uint {
func (app *App) GetID() uint {
return app.ID
}
func (app App) GetDetails() interfaces.AppDetails {
func (app *App) GetDetails() interfaces.AppDetails {
return app.AppDetails
}
func (app App) SetDetails(details interfaces.AppDetails) error {
func (app *App) SetDetails(details interfaces.AppDetails) error {
return app.validateSaveField(details)
}
func (app App) GetDescription() string {
func (app *App) GetDescription() string {
return app.Description
}
func (app App) SetDescription(description string) error {
func (app *App) SetDescription(description string) error {
var details interfaces.AppDetails = app.GetDetails()
details.Description = description
return app.validateSaveField(details, "Description")
return app.validateSaveField(details, "Description")
}
func (app App) GetIcon() string {
func (app *App) GetIcon() string {
return app.Icon
}
func (app App) SetIcon(icon string) error {
func (app *App) SetIcon(icon string) error {
var details interfaces.AppDetails = app.AppDetails
details.Icon = icon
return app.validateSaveField(details, "Icon")
}
func (app App) GetURL() string {
func (app *App) GetURL() string {
return app.URL
}
func (app App) SetURL(icon string) error {
func (app *App) SetURL(icon string) error {
var details interfaces.AppDetails = app.AppDetails
details.Icon = icon
return app.validateSaveField(details, "URL")
}
func (app App) ProcessMessage(ctx context.Context, us interfaces.UserServicierI, msg *msg.Message) error {
func (app *App) ProcessMessage(ctx context.Context, us interfaces.UserServicierI, msg *msg.Message) error {
log.Log.V(1).Info("App Processing Message", "App", app.AppName, "MessageID", msg.ID)
/* populate Message Fields with App Data */
msg.Body.Fields["app_description"] = app.Description
msg.Body.Fields["app_icon"] = app.Icon
msg.Body.Fields["app_url"] = app.URL
// make sure our Filters are loaded
if len(app.Filters) == 0 {
log.Log.Info("Loading Filters?", "App", app.AppName)
if tx := db.Db.Debug().Model(&app).Association("Filters").Find(&app.Filters); tx != nil {
log.Log.Error(tx, "Error loading application filters", "AppName", app.AppName)
return tx
}
}
log.Log.V(1).Info("App Filters", "App", app.AppName, "Filters", app.Filters)
for _, appfilter := range app.Filters {
flt := filter.FindFilter(appfilter.Name)
if flt != nil {
log.Log.V(1).Info("App Processing Message with Filter", "Filter", appfilter)
ok, _ := flt.ProcessMessage(ctx, msg)
if !ok {
log.Log.Info("App Filter Blocked Message", "App", app.AppName, "Filter", appfilter, "Message", msg)
return nil
}
} else {
log.Log.Info("App Filter Not Found", "App", app.AppName, "Filter", appfilter)
flt := app.mpctx.GetFilterService().GetByID(ctx, appfilter.FilterID, interfaces.AppFilter)
log.Log.V(1).Info("App Processing Message with Filter", "Filter", flt.GetName())
if ok, err := flt.ProcessMessage(ctx, msg); err != nil {
log.Log.Error(err, "Error processing message with filter", "Filter", flt.GetName())
continue
} else if !ok {
log.Log.Info("App Filter Blocked Message", "App", app.AppName, "Filter", flt.GetName(), "Message", msg)
return nil
}
}
if err := app.mpctx.GetGroupService().SendMessageToUsers(ctx, msg, app); err != nil {
log.Log.Error(err, "Error sending message to users", "App", app.AppName)
}
if err := app.mpctx.GetGroupService().SendMessageToTransports(ctx, msg, app); err != nil {
log.Log.Error(err, "Error sending message to transports", "App", app.AppName)
}
return nil
}
func (app *App) AddFilter(ctx context.Context, filter interfaces.FilterI) error {
if app.ID == 0 {
log.Log.Error(nil, "Error adding filter to application, application not saved", "AppName", app.AppName)
return mperror.ErrAppNotFound
}
if filter.GetID() == 0 {
log.Log.Error(nil, "Error adding filter to application, filter not saved", "AppName", app.AppName, "Filter", filter.GetName())
return mperror.ErrFilterNotFound
}
appflt := AppFilter{AppID: app.ID, FilterID: filter.GetID()}
if tx := db.Db.Model(&app).Association("Filters").Append(&appflt); tx != nil {
log.Log.Error(tx, "Error adding filter to application", "AppName", app.AppName, "Filter", filter.GetName())
return tx
}
app.Filters = append(app.Filters, appflt)
return nil
}
func removeCopy(slice []AppFilter, i int) []AppFilter {
copy(slice[i:], slice[i+1:])
return slice[:len(slice)-1]
}
func (app *App) DelFilter(ctx context.Context, filter interfaces.FilterI) error {
if app.ID == 0 {
log.Log.Error(nil, "Error adding filter to application, application not saved", "AppName", app.AppName)
return mperror.ErrAppNotFound
}
if filter.GetID() == 0 {
log.Log.Error(nil, "Error adding filter to application, filter not saved", "AppName", app.AppName, "Filter", filter.GetName())
return mperror.ErrFilterNotFound
}
appflt := AppFilter{AppID: app.ID, FilterID: filter.GetID()}
if tx := db.Db.Model(&app).Association("Filters").Delete(&appflt); tx != nil {
log.Log.Error(tx, "Error deleting filter from application", "AppName", app.AppName, "Filter", filter.GetName())
return tx
}
for i, appfilter := range app.Filters {
if appfilter.ID == appflt.ID {
app.Filters = removeCopy(app.Filters, i)
break
}
}
return nil

View file

@ -5,10 +5,9 @@ import (
"errors"
"github.com/Fishwaldo/mouthpiece/pkg/db"
"github.com/Fishwaldo/mouthpiece/pkg/errors"
"github.com/Fishwaldo/mouthpiece/pkg/filter"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/mperror"
"github.com/Fishwaldo/mouthpiece/pkg/validate"
"github.com/go-playground/validator/v10"
@ -17,55 +16,65 @@ import (
"gorm.io/gorm"
)
type AppService struct {
ctx *interfaces.MPContext
}
func NewAppsService() *AppService {
db.Db.AutoMigrate(&App{})
db.Db.AutoMigrate(&ApplicationFilters{})
return &AppService{}
return &AppService{
}
}
func (a AppService) GetApps(ctx context.Context) map[uint]interfaces.AppI {
func (a *AppService) Start(ctx *interfaces.MPContext) error {
db.Db.AutoMigrate(&App{})
a.ctx = ctx
return nil
}
func (a *AppService) GetApps(ctx context.Context) ([]interfaces.AppDetails, error) {
var apps []App
appMap := make(map[uint]interfaces.AppI)
var appDetails []interfaces.AppDetails
if tx := db.Db.WithContext(ctx).Find(&apps); tx.Error != nil {
log.Log.Error(tx.Error, "Error Finding Apps")
return appDetails, tx.Error
}
for _, app := range apps {
appMap[app.ID] = app
appDetails = append(appDetails, app.AppDetails)
}
return appMap
return appDetails, nil
}
func (a AppService) GetAppByName(ctx context.Context, app_name string) (app interfaces.AppI, err error) {
func (a *AppService) GetAppByName(ctx context.Context, app_name string) (app interfaces.AppI, err error) {
var dbapp App
tx := db.Db.WithContext(ctx).Preload("Filters").First(&dbapp, "app_name = ?", app_name)
log.Log.V(1).Info("Finding App", "App", app_name, "Result", tx, "app", dbapp)
if tx.Error != nil {
log.Log.V(1).Error(tx.Error, "GetAppByName", "App", app_name)
}
dbapp.SetSvcCtx(a.ctx)
return &dbapp, tx.Error
}
func (a AppService) GetApp(ctx context.Context, appid uint) (app interfaces.AppI, err error) {
func (a *AppService) GetApp(ctx context.Context, appid uint) (app interfaces.AppI, err error) {
var dbApp App
tx := db.Db.WithContext(ctx).Preload("Filters").First(&dbApp, "id = ?", appid)
log.Log.V(1).Info("Finding App", "App", appid, "Result", tx, "app", dbApp)
if tx.Error != nil {
log.Log.V(1).Error(tx.Error, "GetApp", "App", appid)
}
dbApp.SetSvcCtx(a.ctx)
return &dbApp, tx.Error
}
func (a AppService) CreateApp(ctx context.Context, app interfaces.AppDetails) (newapp interfaces.AppI, err error) {
func (a *AppService) GetAppObj(ctx context.Context, appDet interfaces.AppDetails) (app interfaces.AppI, err error) {
return a.GetApp(ctx, appDet.ID)
}
func (a *AppService) CreateApp(ctx context.Context, app interfaces.AppDetails) (newapp interfaces.AppI, err error) {
newapp, err = a.GetAppByName(ctx, app.AppName)
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
log.Log.Info("Creating New App", "App", app)
var dbApp App
copier.Copy(&dbApp, &app)
if filter.FindFilter("CopyShortMessage") != nil {
dbApp.Filters = append(dbApp.Filters, ApplicationFilters{Name: "CopyShortMessage"})
}
if filter.FindFilter("FindSeverity") != nil {
dbApp.Filters = append(dbApp.Filters, ApplicationFilters{Name: "FindSeverity"})
}
if err := validate.Get().Struct(dbApp); err != nil {
for _, e := range err.(validator.ValidationErrors) {
log.Log.Info("CreateApp: Validation Error", "Error", e)
@ -74,7 +83,7 @@ func (a AppService) CreateApp(ctx context.Context, app interfaces.AppDetails) (n
}
result := db.Db.WithContext(ctx).Create(&dbApp)
if result.Error != nil {
return newapp, result.Error
return nil, result.Error
}
return a.GetApp(ctx, dbApp.ID)
}

View file

@ -1,11 +0,0 @@
package mperror
import (
"errors"
)
var (
ErrAppExists = errors.New("App Already Exists")
ErrAppNotFound = errors.New("App Not Found")
ErrUserNotFound = errors.New("User Not Found")
)

View file

@ -0,0 +1,208 @@
package evalfilter
import (
"context"
"fmt"
"github.com/Fishwaldo/mouthpiece/pkg/filter"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
"github.com/go-logr/logr"
"github.com/skx/evalfilter/v2"
"github.com/skx/evalfilter/v2/object"
"golang.org/x/exp/slices"
)
var llog logr.Logger
type EvalFilter struct {
ready bool
filter *evalfilter.Eval
processedMessage *msg.Message
config []filter.Filterconfig
}
var (
FilterImplDetails = filter.FilterImplDetails{Factory: NewEvalFilter}
)
func init() {
fmt.Println("Registering EvalFilter")
filter.RegisterFilterImpl("EvalFilter", FilterImplDetails)
}
func NewEvalFilter(ctx context.Context, config []filter.Filterconfig) (filter.FilterImplI, error) {
llog = log.Log.WithName("EvalFilter")
if idx := slices.IndexFunc(config, func(config filter.Filterconfig) bool { return config.Name == "name" }); idx == -1 {
return &EvalFilter{}, fmt.Errorf("EvalFilter: No name specified")
}
if idx := slices.IndexFunc(config, func(config filter.Filterconfig) bool { return config.Name == "script" }); idx == -1 {
return &EvalFilter{}, fmt.Errorf("EvalFilter: No Script specified")
}
ef := &EvalFilter{
config: config,
}
return ef, nil
}
func (ef *EvalFilter) FilterName() string {
return "EvalFilter"
}
func (ev *EvalFilter) Init() error {
idx := slices.IndexFunc(ev.config, func(config filter.Filterconfig) bool { return config.Name == "script" })
ev.filter = evalfilter.New(ev.config[idx].Value)
ev.filter.AddFunction("printf", ev.fnPrintf)
ev.filter.AddFunction("print", ev.fnPrint)
ev.filter.AddFunction("setfield", ev.fnSetField)
ev.filter.AddFunction("clearfield", ev.fnClearField)
ev.filter.AddFunction("setshortmessage", ev.fnSetShortMessage)
ev.filter.AddFunction("setseverity", ev.fnSetSeverity)
if err := ev.filter.Prepare(); err != nil {
llog.Error(err, "Compile Filter Script Failed", "filter", ev.getName())
ev.ready = false
return err
}
llog.V(1).Info("Compile Filter Script Success", "filter", ev.getName())
ev.ready = true
return nil
}
func (ev *EvalFilter) getName() string {
idx := slices.IndexFunc(ev.config, func(config filter.Filterconfig) bool { return config.Name == "name" })
return ev.config[idx].Value
}
func (ev *EvalFilter) fnPrintf(args []object.Object) object.Object {
// We expect 1+ arguments
if len(args) < 1 {
return &object.Null{}
}
// Type-check
if args[0].Type() != object.STRING {
return &object.Null{}
}
// Get the format-string.
fs := args[0].(*object.String).Value
// Convert the arguments to something go's sprintf
// code will understand.
argLen := len(args)
fmtArgs := make([]interface{}, argLen-1)
// Here we convert and assign.
for i, v := range args[1:] {
fmtArgs[i] = v.ToInterface()
}
// Call the helper
out := fmt.Sprintf(fs, fmtArgs...)
llog.Info("Filter Script Output", "filter", ev.getName(), "output", out)
return &object.Void{}
}
func (ev *EvalFilter) fnPrint(args []object.Object) object.Object {
for _, e := range args {
llog.Info("Filter Script Output", "filter", ev.getName(), "Output", e.Inspect())
}
return &object.Void{}
}
func (ev *EvalFilter) fnSetField(args []object.Object) object.Object {
// We expect 2 arguments
if len(args) != 2 {
return &object.Null{}
}
// Type-check
if args[0].Type() != object.STRING {
return &object.Null{}
}
// Get the field name.
fld := args[0].(*object.String).Value
arg := args[0].ToInterface()
llog.Info("Setting Field Value", "filter", ev.getName(), "field", fld, "value", arg)
ev.processedMessage.Body.Fields[fld] = arg
return &object.Void{}
}
func (ev *EvalFilter) fnClearField(args []object.Object) object.Object {
if len(args) != 1 {
return &object.Null{}
}
// Type-check
if args[0].Type() != object.STRING {
return &object.Null{}
}
fld := args[0].(*object.String).Value
llog.Info("Clearing Field Value", "filter", ev.getName(), "field", fld)
if _, ok := ev.processedMessage.Body.Fields[fld]; ok {
delete(ev.processedMessage.Body.Fields, fld)
} else {
llog.Info("Field Not Found", "filter", ev.getName(), "field", fld)
}
return &object.Void{}
}
func (ev *EvalFilter) fnSetShortMessage(arg []object.Object) object.Object {
if len(arg) != 1 {
return &object.Null{}
}
// Type-check
if arg[0].Type() != object.STRING {
return &object.Null{}
}
msg := arg[0].(*object.String).Value
llog.Info("Setting Short Message", "filter", ev.getName(), "message", msg)
ev.processedMessage.Body.ShortMsg = msg
return &object.Void{}
}
func (ev *EvalFilter) fnSetSeverity(arg []object.Object) object.Object {
if len(arg) != 1 {
return &object.Null{}
}
// Type-check
if arg[0].Type() != object.STRING {
return &object.Null{}
}
msg := arg[0].(*object.String).Value
llog.Info("Setting Severity", "filter", ev.getName(), "Severity", msg)
ev.processedMessage.Body.Severity = msg
return &object.Void{}
}
func (ev *EvalFilter) Process(ctx context.Context, msg *msg.Message) (bool, error) {
if !ev.ready {
if err := ev.Init(); err != nil {
llog.Error(err, "Filter Init Failed", "filter", ev.getName())
return true, err
}
}
defer func() {
if err := recover(); err != nil {
llog.Error(err.(error), "Filter Script Error", "filter", ev.getName())
}
}()
ev.processedMessage = msg
ev.filter.SetContext(ctx)
ok, err := ev.filter.Run(msg.Body)
ev.processedMessage = nil
if err != nil {
llog.Info("Filter Run Failed", "filter", ev.getName(), "result", ok, "Error", err)
return true, err
}
llog.V(1).Info("Filter Run Success", "filter", ev.getName(), "result", ok)
return ok, nil
}

View file

@ -2,52 +2,244 @@ package filter
import (
"context"
"embed"
"io/fs"
"path/filepath"
"strings"
//"io/ioutil"
"embed"
"fmt"
"sync"
"time"
"github.com/Fishwaldo/mouthpiece/pkg/db"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/message"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
"github.com/go-logr/logr"
"github.com/skx/evalfilter/v2"
"github.com/skx/evalfilter/v2/object"
"gorm.io/gorm"
// "gorm.io/datatypes"
)
//go:embed scripts
var ScriptFiles embed.FS
type FilterType int
var llog logr.Logger
const (
AppFilter = iota
UserFilter
TransportFilter
)
func (ft FilterType) String() string {
return [...]string{"AppFilter", "UserFilter", "TransportFilter"}[ft]
type FilterImplI interface {
FilterName() string
Init() error
Process(context.Context, *msg.Message) (bool, error)
}
type Filter struct {
gorm.Model `json:"-"`
Name string
Content string
Type FilterType
Enabled bool
script *evalfilter.Eval `gorm:"-" json:"-"`
ok bool `gorm:"-"`
processedMessage *msg.Message `gorm:"-" json:"-"`
ID uint `gorm:"primary_key"`
Name string
Type interfaces.FilterType
Enabled bool
FilterImplType string
Config []Filterconfig
lastUsed time.Time `gorm:"-"`
filterImpl FilterImplI `gorm:"-"`
}
var Filters []*Filter
type Filterconfig struct {
ID uint `gorm:"primary_key"`
FilterID uint
Name string
Value string
}
type FilterService struct {
ctx *interfaces.MPContext
fltMutex sync.Mutex
Filters map[string]*Filter
}
func NewFilterService() *FilterService {
llog = log.Log.WithName("filter")
llog.Info("Loading Filters")
fs := &FilterService{
Filters: make(map[string]*Filter),
}
return fs
}
func (fs *FilterService) Start(ctx *interfaces.MPContext) error {
fs.ctx = ctx
db.Db.AutoMigrate(&Filter{}, &Filterconfig{})
scripts := filterFiles("scripts/apps", ".scp")
fs.loadEvalScriptFiles(scripts, interfaces.AppFilter)
scripts = filterFiles("scripts/transports", ".scp")
fs.loadEvalScriptFiles(scripts, interfaces.TransportFilter)
go fs.expireFilters()
return nil
}
func (fs *FilterService) expireFilters() {
for {
select {
case <-fs.ctx.Done():
return
case <-time.After(time.Second * 60):
fs.fltMutex.Lock()
defer fs.fltMutex.Unlock()
for _, flt := range fs.Filters {
if flt.lastUsed.Add(time.Second * 60).Before(time.Now()) {
llog.Info("Expiring Filter", "name", flt.Name, "lastUsed", flt.lastUsed)
delete(fs.Filters, flt.Name)
}
}
}
}
}
func (fs *FilterService) Get(ctx context.Context, name string, scripttype interfaces.FilterType) interfaces.FilterI {
var flt Filter
fs.fltMutex.Lock()
defer fs.fltMutex.Unlock()
if flt, ok := fs.Filters[name]; ok {
if flt.Type == scripttype {
flt.lastUsed = time.Now()
return flt
}
}
tx := db.Db.WithContext(ctx).Preload("Config").Find(&flt, "name = ? and type = ?", name, scripttype)
if tx != nil && tx.Error != gorm.ErrRecordNotFound {
llog.Error(tx.Error, "Finding filter Error", "name", name)
return nil
} else if tx.Error == gorm.ErrRecordNotFound {
return nil
}
var err error
if flt.filterImpl, err = GetNewFilterImpl(ctx, flt.FilterImplType, flt.Config); err != nil {
llog.Error(err, "Failed to create Filter", "name", name)
return nil
}
flt.lastUsed = time.Now()
fs.Filters[name] = &flt
return flt
}
func (fs *FilterService) GetByID(ctx context.Context, id uint, scripttype interfaces.FilterType) interfaces.FilterI {
var flt Filter
fs.fltMutex.Lock()
defer fs.fltMutex.Unlock()
for _, v := range fs.Filters {
if v.ID == id {
v.lastUsed = time.Now()
return v
}
}
tx := db.Db.WithContext(ctx).Preload("Config").Find(&flt, "id = ? and type = ?", id, scripttype)
if tx.Error != nil && tx.Error != gorm.ErrRecordNotFound {
llog.Error(tx.Error, "Finding filter Error", "id", id)
return nil
} else if tx.Error == gorm.ErrRecordNotFound {
return nil
}
var err error
if flt.filterImpl, err = GetNewFilterImpl(ctx, flt.FilterImplType, flt.Config); err != nil {
llog.Error(err, "Failed to create Filter", "name", flt.Name)
return nil
}
flt.lastUsed = time.Now()
fs.Filters[flt.Name] = &flt
return flt
}
func (myfs *FilterService) loadEvalScriptFiles(files []string, scripttype interfaces.FilterType) {
for _, script := range files {
flt := &Filter{}
if tx := db.Db.Find(flt, "name = ? and type = ?", trimFileExtension(filepath.Base(script)), scripttype); tx.RowsAffected == 0 {
llog.V(1).Info("Reading Filter Script from Filesystem", "type", scripttype, "filter", trimFileExtension(filepath.Base(script)))
content, err := fs.ReadFile(ScriptFiles, script)
if err != nil {
llog.Error(err, "Failed to read Filter Script File", "filename", script)
continue
}
//
// Create an evalfilter, with the script inside it.
//
config := []Filterconfig{
{
Name: "script",
Value: string(content),
},
{
Name: "name",
Value: trimFileExtension(filepath.Base(script)),
},
}
if fltimpl, err := GetNewFilterImpl(context.Background(), "EvalFilter", config); err != nil {
llog.Error(err, "Failed to create EvalFilter", "filename", script)
continue
} else {
if err := fltimpl.Init(); err == nil {
llog.Info("Loaded Filter Script ", "type", scripttype, "filter", fltimpl.FilterName())
flt := &Filter{
Type: scripttype,
filterImpl: fltimpl,
FilterImplType: "EvalFilter",
Config: config,
lastUsed: time.Now(),
Enabled: true,
}
flt.Name = flt.GetConfig("name").(string)
myfs.fltMutex.Lock()
myfs.Filters[flt.Name] = flt
myfs.fltMutex.Unlock()
if tx := db.Db.Save(&flt); tx.Error != nil {
llog.Error(tx.Error, "Failed to save Filter into database", "name", flt.Name)
}
} else {
llog.Error(err, "Failed to load Filter Script into database", "type", scripttype, "filter", fltimpl.FilterName())
}
}
} else {
llog.Info("Filter Script already loaded into Database", "type", scripttype, "filter", trimFileExtension(filepath.Base(script)))
}
}
}
func (f Filter) GetID() uint {
return f.ID
}
func (f Filter) GetName() string {
return f.Name
}
func (f Filter) ProcessMessage(ctx context.Context, msg *msg.Message) (bool, error) {
if f.Config == nil {
if tx := db.Db.WithContext(ctx).Find(&f.Config, "filter_id = ? ", f.ID); tx.Error != nil {
return false, tx.Error
}
}
var err error
if f.filterImpl == nil {
if f.filterImpl, err = GetNewFilterImpl(ctx, f.FilterImplType, f.Config); err != nil {
return true, err
}
}
return f.filterImpl.Process(ctx, msg)
}
func (fs Filter) GetConfig(item string) interface{} {
for _, v := range fs.Config {
if strings.EqualFold(v.Name, item) {
return v.Value
}
}
return nil
}
func trimFileExtension(fileName string) string {
return strings.TrimSuffix(fileName, filepath.Ext(fileName))
}
func filterFiles(root, ext string) []string {
var a []string
@ -63,211 +255,3 @@ func filterFiles(root, ext string) []string {
})
return a
}
func trimFileExtension(fileName string) string {
return strings.TrimSuffix(fileName, filepath.Ext(fileName))
}
func loadScriptFiles(files []string, scripttype FilterType) {
for _, script := range files {
/* first see if this script exists in the Database first */
var flt *Filter
tx := db.Db.Where("name = ? and type = ?", trimFileExtension(filepath.Base(script)), scripttype).First(&flt)
if tx.RowsAffected == 0 {
llog.V(1).Info("Reading Filter Script from Filesystem", "type", scripttype, "filter", trimFileExtension(filepath.Base(script)))
content, err := fs.ReadFile(ScriptFiles, script)
if err != nil {
llog.Error(err, "Failed to read Filter Script File", "filename", script)
continue
}
//
// Create an evalfilter, with the script inside it.
//
flt = &Filter{
Name: trimFileExtension(filepath.Base(script)),
Content: string(content),
Type: scripttype,
}
} else {
llog.V(1).Info("Loading Filter Script from Databse", "type", scripttype, "filter", flt.Name)
}
if err := flt.SetupEvalFilter(); err == nil {
llog.Info("Loaded Filter Script ", "type", scripttype, "filter", flt.Name)
Filters = append(Filters, flt)
db.Db.Save(flt)
} else {
llog.Error(err, "Failed to load Filter Script", "type", scripttype, "filter", flt.Name)
}
}
}
func InitFilter() {
llog = log.Log.WithName("filter")
db.Db.AutoMigrate(&Filter{})
Filters = make([]*Filter, 0)
scripts := filterFiles("scripts/apps", ".scp")
loadScriptFiles(scripts, AppFilter)
scripts = filterFiles("scripts/transports", ".scp")
loadScriptFiles(scripts, TransportFilter)
}
func FindFilter(name string) *Filter {
for _, flt := range Filters {
if flt.Name == name {
return flt
}
}
return nil
}
func (ev *Filter) fnPrintf(args []object.Object) object.Object {
// We expect 1+ arguments
if len(args) < 1 {
return &object.Null{}
}
// Type-check
if args[0].Type() != object.STRING {
return &object.Null{}
}
// Get the format-string.
fs := args[0].(*object.String).Value
// Convert the arguments to something go's sprintf
// code will understand.
argLen := len(args)
fmtArgs := make([]interface{}, argLen-1)
// Here we convert and assign.
for i, v := range args[1:] {
fmtArgs[i] = v.ToInterface()
}
// Call the helper
out := fmt.Sprintf(fs, fmtArgs...)
llog.Info("Filter Script Output", "filter", ev.Name, "output", out)
return &object.Void{}
}
func (ev *Filter) fnPrint(args []object.Object) object.Object {
for _, e := range args {
llog.Info("Filter Script Output", "filter", ev.Name, "Output", e.Inspect())
}
return &object.Void{}
}
func (ev *Filter) fnSetField(args []object.Object) object.Object {
// We expect 2 arguments
if len(args) != 2 {
return &object.Null{}
}
// Type-check
if args[0].Type() != object.STRING {
return &object.Null{}
}
// Get the field name.
fld := args[0].(*object.String).Value
arg := args[0].ToInterface()
llog.Info("Setting Field Value", "filter", ev.Name, "field", fld, "value", arg)
ev.processedMessage.Body.Fields[fld] = arg
return &object.Void{}
}
func (ev *Filter) fnClearField(args []object.Object) object.Object {
if len(args) != 1 {
return &object.Null{}
}
// Type-check
if args[0].Type() != object.STRING {
return &object.Null{}
}
fld := args[0].(*object.String).Value
llog.Info("Clearing Field Value", "filter", ev.Name, "field", fld)
if _, ok := ev.processedMessage.Body.Fields[fld]; ok {
delete(ev.processedMessage.Body.Fields, fld)
} else {
llog.Info("Field Not Found", "filter", ev.Name, "field", fld)
}
return &object.Void{}
}
func (ev *Filter) fnSetShortMessage(arg []object.Object) object.Object {
if len(arg) != 1 {
return &object.Null{}
}
// Type-check
if arg[0].Type() != object.STRING {
return &object.Null{}
}
msg := arg[0].(*object.String).Value
llog.Info("Setting Short Message", "filter", ev.Name, "message", msg)
ev.processedMessage.Body.ShortMsg = msg
return &object.Void{}
}
func (ev *Filter) fnSetSeverity(arg []object.Object) object.Object {
if len(arg) != 1 {
return &object.Null{}
}
// Type-check
if arg[0].Type() != object.STRING {
return &object.Null{}
}
msg := arg[0].(*object.String).Value
llog.Info("Setting Severity", "filter", ev.Name, "Severity", msg)
ev.processedMessage.Body.Severity = msg
return &object.Void{}
}
func (ev *Filter) ProcessMessage(ctx context.Context, msg *msg.Message) (bool, error) {
if !ev.Enabled {
return true, nil
}
defer func() {
if err := recover(); err != nil {
llog.Error(err.(error), "Filter Script Error", "filter", ev.Name)
}
}()
if !ev.ok {
llog.Info("Filter Script Not ready", "filter", ev.Name)
return true, nil
}
ev.processedMessage = msg
ev.script.SetContext(ctx)
ok, err := ev.script.Run(msg.Body)
ev.processedMessage = nil
if err != nil {
llog.Info("Filter Run Failed", "filter", ev.Name, "result", ok, "Error", err)
return true, err
}
llog.V(1).Info("Filter Run Success", "filter", ev.Name, "result", ok)
return ok, nil
}
func (ev *Filter) SetupEvalFilter() error {
//
// Create an evaluator, with the script inside it.
//
ev.script = evalfilter.New(ev.Content)
llog.V(1).Info("Filter Script Content", "filter", ev.Name, "content", ev.Content)
ev.script.AddFunction("printf", ev.fnPrintf)
ev.script.AddFunction("print", ev.fnPrint)
ev.script.AddFunction("setfield", ev.fnSetField)
ev.script.AddFunction("clearfield", ev.fnClearField)
ev.script.AddFunction("setshortmessage", ev.fnSetShortMessage)
ev.script.AddFunction("setseverity", ev.fnSetSeverity)
if err := ev.script.Prepare(); err != nil {
llog.Error(err, "Compile Filter Script Failed", "filter", ev.Name)
ev.ok = false
return err
}
llog.V(1).Info("Compile Filter Script Success", "filter", ev.Name)
ev.ok = true
return nil
}

View file

@ -0,0 +1,40 @@
package filter
import (
"context"
"errors"
"fmt"
)
type FilterFactoryFN func(context.Context, []Filterconfig) (FilterImplI, error)
type FilterImplDetails struct {
Factory FilterFactoryFN
}
var (
FilterImpl map[string]FilterImplDetails
)
func RegisterFilterImpl(name string, factory FilterImplDetails) {
fmt.Println("RegisterFilterImpl", name)
if FilterImpl == nil {
FilterImpl = make(map[string]FilterImplDetails)
}
FilterImpl[name] = factory
}
func GetNewFilterImpl(ctx context.Context, name string, config []Filterconfig) (FilterImplI, error) {
if flt, ok := FilterImpl[name]; ok {
return flt.Factory(ctx, config)
}
return nil, errors.New("FilterImpl Not Found")
}
func GetFilterImpls(ctx context.Context) []string {
var a []string
for k := range FilterImpl {
a = append(a, k)
}
return a
}

View file

@ -8,11 +8,29 @@ import (
)
type Group struct {
ID uint `gorm:"primary_key"`
Name string `gorm:"not null"`
Apps []uint `gorm:"many2many:group_apps"`
Users []uint `gorm:"many2many:group_users"`
Transports []uint `gorm:"many2many:group_apps"`
ID uint `gorm:"primary_key"`
Name string `gorm:"not null"`
Apps []GroupAppMember `gorm:"many2many:group_apps"`
Users []GroupUserMember `gorm:"many2many:group_users"`
Transports []GroupTransportMember `gorm:"many2many:group_transports"`
}
type GroupAppMember struct {
ID uint `gorm:"primary_key"`
GroupID uint
AppID uint
}
type GroupUserMember struct {
ID uint `gorm:"primary_key"`
GroupID uint
UserID uint
}
type GroupTransportMember struct {
ID uint `gorm:"primary_key"`
GroupID uint
TransportID uint
}
func (g Group) GetID() uint {
@ -25,25 +43,48 @@ func (g Group) SetName(name string) error {
return db.Db.Model(&g).Update("Name", name).Error
}
func (g Group) GetApps() []uint {
return g.Apps
apps := []uint{}
for _, app := range g.Apps {
apps = append(apps, app.AppID)
}
return apps
}
func (g Group) GetUsers() []uint {
return g.Users
users := []uint{}
for _, user := range g.Users {
users = append(users, user.UserID)
}
return users
}
func (g Group) GetTransports() []uint {
return g.Transports
tps := []uint{}
for _, tp := range g.Transports {
tps = append(tps, tp.TransportID)
}
return tps
}
func (g Group) AddAppToGroup(ctx context.Context, app uint) bool {
tx := db.Db.Model(&g).Association("Apps").Append(app)
log.Log.Info("Adding App to Group", "app", app, "group", g.GetID())
member := &GroupAppMember{
GroupID: g.GetID(),
AppID: app,
}
tx := db.Db.Model(&g).Association("Apps").Append(member)
if tx != nil {
log.Log.Error(tx, "Error Adding App to Group", "app", app, "group", g.GetID())
return false
}
g.Apps = append(g.Apps, *member)
return true
}
func (g Group) DelAppFromGroup(ctx context.Context, app uint) bool {
tx := db.Db.Model(&g).Association("Apps").Delete(app)
log.Log.Info("Deleting App from Group", "app", app, "group", g.GetID())
member := &GroupAppMember{
GroupID: g.GetID(),
AppID: app,
}
tx := db.Db.Model(&g).Association("Apps").Delete(member)
if tx != nil {
log.Log.Error(tx, "Error Deleting App from Group", "app", app, "group", g.GetID())
return false
@ -52,16 +93,27 @@ func (g Group) DelAppFromGroup(ctx context.Context, app uint) bool {
}
func (g Group) AddUserToGroup(ctx context.Context, user uint) bool {
tx := db.Db.Model(&g).Association("Users").Append(user)
log.Log.Info("Adding User to Group", "user", user, "group", g.GetID())
member := &GroupUserMember{
GroupID: g.GetID(),
UserID: user,
}
tx := db.Db.Model(&g).Association("Users").Append(member)
if tx != nil {
log.Log.Error(tx, "Error Adding User to Group", "user", user, "group", g.GetID())
return false
}
g.Users = append(g.Users, *member)
return true
}
func (g Group) DelUserFromGroup(ctx context.Context, user uint) bool {
tx := db.Db.Model(&g).Association("Users").Delete(user)
log.Log.Info("Deleting User from Group", "user", user, "group", g.GetID())
member := &GroupUserMember{
GroupID: g.GetID(),
UserID: user,
}
tx := db.Db.Model(&g).Association("Users").Delete(member)
if tx != nil {
log.Log.Error(tx, "Error Deleting User from Group", "user", user, "group", g.GetID())
return false
@ -70,16 +122,27 @@ func (g Group) DelUserFromGroup(ctx context.Context, user uint) bool {
}
func (g Group) AddTransportToGroup(ctx context.Context, tid uint) bool {
tx := db.Db.Model(&g).Association("Transports").Append(tid)
log.Log.Info("Adding Transport to Group", "tid", tid, "group", g.GetID())
member := &GroupTransportMember{
GroupID: g.GetID(),
TransportID: tid,
}
tx := db.Db.Model(&g).Association("Transports").Append(member)
if tx != nil {
log.Log.Error(tx, "Error Adding Transport to Group", "tid", tid, "group", g.GetID())
return false
}
g.Transports = append(g.Transports, *member)
return true
}
func (g Group) DelTransportFromGroup(ctx context.Context, tid uint) bool {
tx := db.Db.Model(&g).Association("Transports").Delete(tid)
log.Log.Info("Deleting Transport from Group", "tid", tid, "group", g.GetID())
member := &GroupTransportMember{
GroupID: g.GetID(),
TransportID: tid,
}
tx := db.Db.Model(&g).Association("Transports").Delete(member)
if tx != nil {
log.Log.Error(tx, "Error Deleting Transport from Group", "tid", tid, "group", g.GetID())
return false

View file

@ -6,18 +6,28 @@ import (
"github.com/Fishwaldo/mouthpiece/pkg/db"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/message"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
"github.com/jinzhu/copier"
)
type GroupsService struct {
ctx *interfaces.MPContext
}
func NewGroupsService() *GroupsService {
db.Db.Debug().AutoMigrate(&Group{})
return &GroupsService{}
return &GroupsService{
}
}
func (gs *GroupsService) Start(ctx *interfaces.MPContext) error {
gs.ctx = ctx
db.Db.AutoMigrate(&Group{}, &GroupAppMember{}, &GroupUserMember{}, &GroupTransportMember{})
return nil
}
func (gs GroupsService) CreateGroup(ctx context.Context, name string) (interfaces.GroupI, error) {
log.Log.Info("Group: Creating Group", "name", name)
var group = Group{
Name: name,
}
@ -29,7 +39,8 @@ func (gs GroupsService) CreateGroup(ctx context.Context, name string) (interface
return group, tx.Error
}
func (gs GroupsService) DeleteGroup(ctx context.Context, group Group) error {
func (gs GroupsService) DeleteGroup(ctx context.Context, group interfaces.GroupI) error {
log.Log.Info("Group: Deleting Group", "name", group.GetName())
tx := db.Db.WithContext(ctx).First(&group, group.GetID())
if tx.Error != nil {
return tx.Error
@ -38,36 +49,56 @@ func (gs GroupsService) DeleteGroup(ctx context.Context, group Group) error {
return tx.Error
}
func (gs GroupsService) GetGroup(ctx context.Context, id uint) (interfaces.GroupI, error) {
func (gs GroupsService) GetGroupByID(ctx context.Context, id uint) (interfaces.GroupI, error) {
var group Group
tx := db.Db.WithContext(ctx).First(&group, id)
tx := db.Db.WithContext(ctx).Preload("Apps").Preload("Users").Preload("Transports").Find(&group, id)
return group, tx.Error
}
func (g GroupsService) GetGroupsForApp(ctx context.Context, app uint) ([]interfaces.GroupI, error) {
var groups []Group
tx := db.Db.WithContext(ctx).Where("apps = ?", app).Find(&groups)
var groupList []interfaces.GroupI
for _, group := range groups {
groupList = append(groupList, group)
}
return groupList, tx.Error
func (gs GroupsService) GetGroup(ctx context.Context, name string) (interfaces.GroupI, error) {
var group Group
tx := db.Db.WithContext(ctx).Preload("Apps").Preload("Users").Preload("Transports").First(&group, "name = ?", name)
return group, tx.Error
}
func (g GroupsService) GetGroupsForUser(ctx context.Context, id uint) ([]interfaces.GroupI, error) {
func (g GroupsService) GetGroupsForApp(ctx context.Context, app interfaces.AppI) ([]interfaces.GroupI, error) {
var groups []Group
tx := db.Db.WithContext(ctx).Where("users = ?", id).Find(&groups)
tx := db.Db.WithContext(ctx).Preload("Apps", "id = ?", app.GetID()).Find(&groups).Error
var groupList []interfaces.GroupI
for _, group := range groups {
groupList = append(groupList, group)
gp, _ := g.GetGroupByID(ctx, group.GetID())
groupList = append(groupList, gp)
}
return groupList, tx.Error
return groupList, tx
}
func (g GroupsService) SendMessageToUsers(ctx context.Context, msg msg.Message, appid uint, sender interfaces.UserSender) error {
func (g GroupsService) GetGroupsForUser(ctx context.Context, user interfaces.UserI) ([]interfaces.GroupI, error) {
var groups []Group
tx := db.Db.WithContext(ctx).Preload("Users", "id = ?", user.GetID()).Find(&groups).Error
var groupList []interfaces.GroupI
for _, group := range groups {
gp, _ := g.GetGroupByID(ctx, group.GetID())
groupList = append(groupList, gp)
}
return groupList, tx
}
func (g GroupsService) GetGroupsForTransport(ctx context.Context, tid uint) ([]interfaces.GroupI, error) {
var groups []Group
tx := db.Db.WithContext(ctx).Preload("Transports", "id = ?", tid).Find(&groups).Error
var groupList []interfaces.GroupI
for _, group := range groups {
gp, _ := g.GetGroupByID(ctx, group.GetID())
groupList = append(groupList, gp)
}
return groupList, tx
}
func (g GroupsService) SendMessageToUsers(ctx context.Context, sendmsg *msg.Message, app interfaces.AppI) error {
log.Log.V(1).Info("Group: Sending Message to Users", "app", app.GetName())
var sendto []uint
if groups, err := g.GetGroupsForApp(ctx, appid); err != nil {
log.Log.Error(err, "Error Getting Groups for App", "appid", appid)
if groups, err := g.GetGroupsForApp(ctx, app); err != nil {
log.Log.Error(err, "Error Getting Groups for App", "app", app.GetName())
return err
} else {
for _, group := range groups {
@ -75,18 +106,27 @@ func (g GroupsService) SendMessageToUsers(ctx context.Context, msg msg.Message,
}
}
sendto = removeDuplicate(sendto)
log.Log.V(1).Info("Group: Sending Message to Users", "app", app.GetName(), "userid", sendto)
for _, userid := range sendto {
if err := sender(ctx, msg, userid); err != nil {
log.Log.Error(err, "Error Sending Message to User", "userid", userid)
if user, err := g.ctx.GetUserService().GetUser(ctx, userid); err != nil {
log.Log.Error(err, "Error Getting User", "userid", userid)
} else {
var usrmsg msg.Message
usrmsg.Body.Fields = make(map[string]interface{})
copier.Copy(usrmsg, sendmsg)
if err := user.ProcessMessage(ctx, usrmsg); err != nil {
log.Log.Error(err, "Error Processing Message for User", "userid", userid)
}
}
}
return nil
}
func (g GroupsService) SendMessageToTransports(ctx context.Context, msg msg.Message, appid uint, sender interfaces.UserSender) error {
func (g GroupsService) SendMessageToTransports(ctx context.Context, sendmsg *msg.Message, app interfaces.AppI) error {
log.Log.V(1).Info("Group: Sending Message to Transports", "app", app.GetName())
var sendto []uint
if groups, err := g.GetGroupsForApp(ctx, appid); err != nil {
log.Log.Error(err, "Error Getting Groups for App", "appid", appid)
if groups, err := g.GetGroupsForApp(ctx, app); err != nil {
log.Log.Error(err, "Error Getting Groups for App", "app", app.GetName())
return err
} else {
for _, group := range groups {
@ -94,9 +134,17 @@ func (g GroupsService) SendMessageToTransports(ctx context.Context, msg msg.Mess
}
}
sendto = removeDuplicate(sendto)
for _, userid := range sendto {
if err := sender(ctx, msg, userid); err != nil {
log.Log.Error(err, "Error Sending Message to Transports", "userid", userid)
log.Log.V(1).Info("Group: Sending Message to Transports", "app", app.GetName(), "transportid", sendto)
for _, tid := range sendto {
if transport, err := g.ctx.GetTransportService().GetTransportReciepient(ctx, tid); err != nil {
log.Log.Error(err, "Error Getting Transport", "transportid", tid)
} else {
var usrmsg msg.Message
usrmsg.Body.Fields = make(map[string]interface{})
copier.Copy(usrmsg, sendmsg)
if err := transport.ProcessGroupMessage(ctx, usrmsg); err != nil {
log.Log.Error(err, "Error Processing Message for Group Transport", "transportid", tid)
}
}
}
return nil

View file

@ -2,11 +2,12 @@ package interfaces
import (
"context"
msg "github.com/Fishwaldo/mouthpiece/pkg/message"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
"github.com/go-logr/logr"
)
type AppI interface {
SetSvcCtx(*MPContext)
ProcessMessage(context.Context, UserServicierI, *msg.Message) error
GetName() string
SetName(string) error
@ -19,18 +20,22 @@ type AppI interface {
SetIcon(string) error
GetURL() string
SetURL(string) error
AddFilter(ctx context.Context, filter FilterI) error
DelFilter(ctx context.Context, filter FilterI) error
}
type AppServiceI interface {
GetApps(context.Context) map[uint]AppI
Start(*MPContext) error
GetApps(context.Context) ([]AppDetails, error)
GetApp(context.Context, uint) (AppI, error)
GetAppByName(context.Context, string) (AppI, error)
GetAppObj(context.Context, AppDetails) (AppI, error)
CreateApp(context.Context, AppDetails) (AppI, error)
}
type UserI interface {
GetID() uint
ProcessMessage(context.Context, *msg.Message) error
ProcessMessage(context.Context, msg.Message) error
CheckPassword(context.Context, string) bool
SetPassword(context.Context, string) error
AddRoleToUser(context.Context, string) bool
@ -45,6 +50,7 @@ type UserI interface {
}
type UserServicierI interface {
Start(*MPContext) error
GetUser(context.Context, uint) (UserI, error)
GetUserByEmail(context.Context, string) (UserI, error)
CreateUser(context.Context, UserDetails) (UserI, error)
@ -69,7 +75,70 @@ type GroupI interface {
}
type GroupServiceI interface {
GetGroup(context.Context, uint) (GroupI, error)
SendMessageToUsers(ctx context.Context, msg msg.Message, appid uint, sender UserSender) error
SendMessageToTransports(ctx context.Context, msg msg.Message, appid uint, sender UserSender) error
Start(*MPContext) error
CreateGroup(ctx context.Context, name string) (GroupI, error)
DeleteGroup(ctx context.Context, group GroupI) error
GetGroupByID(context.Context, uint) (GroupI, error)
GetGroup(context.Context, string) (GroupI, error)
SendMessageToUsers(ctx context.Context, msg *msg.Message, app AppI) error
SendMessageToTransports(ctx context.Context, msg *msg.Message, app AppI) error
}
type FilterI interface {
GetID() uint
GetName() string
ProcessMessage(context.Context, *msg.Message) (bool, error)
GetConfig(item string) interface{}
}
type FilterServiceI interface {
Start(*MPContext) error
Get(context.Context, string, FilterType) FilterI
GetByID(context.Context, uint, FilterType) FilterI
}
type TransportProvider interface {
Start(*MPContext, logr.Logger) error
GetName() string
CreateInstance(context.Context, any) (TransportInstance, error)
DeleteInstance(context.Context, TransportInstance) error
GetInstance(context.Context, uint) (TransportInstance, error)
GetInstanceByName(context.Context, string) (TransportInstance, error)
GetInstances(context.Context) ([]uint, error)
}
type TransportInstance interface {
Start(*MPContext) error
Stop (context.Context) error
GetName() string
GetID() uint
CreateGroupTransportRecipient(context.Context, uint, GroupI, any) (TransportRecipient, error)
CreateUserTransportRecipient(context.Context, uint, UserI, any) (TransportRecipient, error)
DeleteTransportRecipient(context.Context, TransportRecipient) error
GetTransportRecipientByTransportID(context.Context, uint) (TransportRecipient, error)
GetTransportReciepients(context.Context) ([]uint, error)
}
type TransportRecipient interface {
ProcessGroupMessage(context.Context, msg.Message) error
ProcessMessage(context.Context, msg.Message) error
GetID() uint
GetGroupID() uint
GetUserID() uint
}
type TransportServiceI interface {
Start(*MPContext) error
CreateTransportInstance(context.Context, string, any) (TransportInstance, error)
GetTransportInstances(context.Context) ([]uint, error)
GetTransportInstance(context.Context, uint) (TransportInstance, error)
GetTransportInstanceByName(context.Context, string) (TransportInstance, error)
DeleteTransportInstance(context.Context, TransportInstance) error
CreateGroupTransportRecipient(context.Context, TransportInstance, GroupI, any) (TransportRecipient, error)
CreateUserTransportRecipient(context.Context, TransportInstance, UserI, any) (TransportRecipient, error)
DeleteTransportRecipient(context.Context, TransportRecipient) error
GetTransportReciepientsForGroup(context.Context, GroupI) ([]uint, error)
GetTransportReciepientsForUser(context.Context, UserI) ([]uint, error)
GetTransportReciepients(context.Context) ([]uint, error)
GetTransportReciepient(context.Context, uint) (TransportRecipient, error)
}

View file

@ -1,5 +1,11 @@
package interfaces
import (
"context"
"github.com/Fishwaldo/mouthpiece/pkg/log"
)
//CtxUserValue Context Key to get token.User value from Context
type CtxUserValue struct{}
@ -12,6 +18,11 @@ type AppDetails struct {
URL string `doc:"URL of Application" validate:"url"`
}
// TableName overrides the table name used by UserDetails to `users`
func (AppDetails) TableName() string {
return "apps"
}
type UserDetails struct {
ID uint `doc:"User ID" gorm:"primary_key"`
Email string `doc:"Email" validate:"required,email"`
@ -19,3 +30,86 @@ type UserDetails struct {
LastName string `doc:"Last Name" validate:"required"`
Password string `doc:"Password" json:"-" writeOnly:"true" validate:"required"`
}
// TableName overrides the table name used by UserDetails to `users`
func (UserDetails) TableName() string {
return "users"
}
const (
InvalidFilter = iota
AppFilter
UserFilter
TransportFilter
)
type FilterType int
func (ft FilterType) String() string {
return [...]string{"InvalidFilter", "AppFilter", "UserFilter", "TransportFilter"}[ft]
}
type MPContext struct {
context.Context
}
type ctxKey struct {
key string
}
var (
userctxKey = ctxKey{key: "user"}
appctxKey = ctxKey{key: "app"}
groupctxKey = ctxKey{key: "group"}
filterctxKey = ctxKey{key: "filter"}
transportctxKey = ctxKey{key: "transport"}
)
func NewContext(ctx context.Context) *MPContext {
return &MPContext{ctx}
}
func (c *MPContext) SetUserService(usersvc UserServicierI) {
c.Context = context.WithValue(c.Context, userctxKey, usersvc)
}
func (c *MPContext) GetUserService() UserServicierI {
return c.Context.Value(userctxKey).(UserServicierI)
}
func (c *MPContext) SetAppService(appsvc AppServiceI) {
c.Context = context.WithValue(c.Context, appctxKey, appsvc)
}
func (c *MPContext) GetAppService() AppServiceI {
return c.Context.Value(appctxKey).(AppServiceI)
}
func (c *MPContext) SetGroupService(groupsvc GroupServiceI) {
c.Context = context.WithValue(c.Context, groupctxKey, groupsvc)
}
func (c *MPContext) GetGroupService() GroupServiceI {
return c.Context.Value(groupctxKey).(GroupServiceI)
}
func (c *MPContext) SetFilterService(filtersvc FilterServiceI) {
c.Context = context.WithValue(c.Context, filterctxKey, filtersvc)
}
func (c *MPContext) GetFilterService() FilterServiceI {
if c.Context.Value((filterctxKey)).(FilterServiceI) != nil {
return c.Context.Value((filterctxKey)).(FilterServiceI)
} else {
log.Log.Error(nil, "Filter service is nil")
return nil
}
}
func (c *MPContext) SetTransportService(tps TransportInstance) {
c.Context = context.WithValue(c.Context, transportctxKey, tps)
}
func (c *MPContext) GetTransportService() TransportServiceI {
return c.Context.Value(transportctxKey).(TransportServiceI)
}

View file

@ -1,40 +1,124 @@
package mouthpiece
import (
"context"
"github.com/Fishwaldo/mouthpiece/pkg/apps"
"github.com/Fishwaldo/mouthpiece/pkg/db"
"github.com/Fishwaldo/mouthpiece/pkg/filter"
"github.com/Fishwaldo/mouthpiece/pkg/groups"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
"github.com/Fishwaldo/mouthpiece/pkg/transport"
"github.com/Fishwaldo/mouthpiece/pkg/users"
_ "github.com/Fishwaldo/mouthpiece/pkg/filter/evalfilter"
"github.com/Fishwaldo/mouthpiece/pkg/transport/console"
"github.com/go-logr/logr"
"gorm.io/gorm"
)
type MouthPiece struct {
appService interfaces.AppServiceI
userService interfaces.UserServicierI
groupService interfaces.GroupServiceI
db *gorm.DB
log logr.Logger
serviceContext *interfaces.MPContext
appService interfaces.AppServiceI
userService interfaces.UserServicierI
groupService interfaces.GroupServiceI
filterService interfaces.FilterServiceI
transportService interfaces.TransportServiceI
db *gorm.DB
log logr.Logger
}
func NewMouthPiece(dbconn gorm.Dialector, logger logr.Logger) *MouthPiece {
func NewMouthPiece(svcctx *interfaces.MPContext, dbconn gorm.Dialector, logger logr.Logger) *MouthPiece {
mp := MouthPiece{
log: log.InitLogger(logger),
db: db.Initialize(dbconn),
userService: users.NewUsersService(),
appService: apps.NewAppsService(),
groupService: groups.NewGroupsService(),
serviceContext: svcctx,
log: log.InitLogger(logger),
db: db.Initialize(dbconn),
userService: users.NewUsersService(),
appService: apps.NewAppsService(),
groupService: groups.NewGroupsService(),
filterService: filter.NewFilterService(),
transportService: transport.NewTransportService(),
}
mp.serviceContext.SetAppService(mp.appService)
mp.serviceContext.SetUserService(mp.userService)
mp.serviceContext.SetGroupService(mp.groupService)
mp.serviceContext.SetFilterService(mp.filterService)
msg.InitializeMessage()
return &mp
}
func (mp MouthPiece) Start() error {
mp.userService.Start(mp.serviceContext)
mp.appService.Start(mp.serviceContext)
mp.groupService.Start(mp.serviceContext)
mp.filterService.Start(mp.serviceContext)
mp.transportService.Start(mp.serviceContext)
return nil
}
func (mp MouthPiece) GetAppService() interfaces.AppServiceI {
return mp.appService
}
func (mp MouthPiece) GetUserService() interfaces.UserServicierI {
return mp.userService
}
func (mp MouthPiece) SeedMouthPieceApp(ctx context.Context) error {
if _, err := mp.appService.GetAppByName(ctx, "MouthPiece"); err != nil {
log.Log.Info("Creating Default Users")
admind := interfaces.UserDetails{FirstName: "Admin", LastName: "User", Email: "admin@example.com", Password: "password"}
var admin interfaces.UserI
var user interfaces.UserI
if admin, err = mp.userService.CreateUser(context.Background(), admind); err == nil {
admin.AddRoleToUser(context.Background(), "admin")
log.Log.Info("Created Default Admin admin@example.com")
}
userd := interfaces.UserDetails{FirstName: "User", LastName: "User", Email: "user@example.com", Password: "password"}
if user, err = mp.userService.CreateUser(context.Background(), userd); err == nil {
log.Log.Info("Created Default User user@example.com")
}
/* create console output */
consoleconfig := console.ConsoleInstanceConfig{}
consoleconfig.Name = "StdOut"
consoleconfig.Description = "Standard Output"
consoleinstance, err := mp.transportService.CreateTransportInstance(context.Background(), "console", consoleconfig)
if err != nil {
log.Log.Error(err, "Error Creating StdOut Console Instance")
}
/* create console reciptient for admin */
app, _ := mp.appService.CreateApp(mp.serviceContext, interfaces.AppDetails{
AppName: "MouthPiece",
Status: "Enabled",
Description: "Internal MouthPiece App",
Icon: "https://www.google.com/images/branding/googlelogo/2x/googlelogo_color_272x92dp.png",
URL: "https://github.com/Fishwaldo/mouthpiece",
})
flt := mp.filterService.Get(ctx, "CopyShortMessage", interfaces.AppFilter)
app.AddFilter(mp.serviceContext, flt)
if grp, err := mp.groupService.CreateGroup(mp.serviceContext, "MouthPiece"); err != nil {
log.Log.Error(err, "Error creating MouthPiece group")
} else {
grp.AddAppToGroup(mp.serviceContext, app.GetID())
grp.AddUserToGroup(mp.serviceContext, admin.GetID())
grp.AddUserToGroup(mp.serviceContext, user.GetID())
reciptientconfig := &console.ConsoleRecipientConfig{}
reciptientconfig.UserID = admin.GetID()
reciptientconfig.Name = "Group Stdout"
reciptientconfig.Description = "Group Stdout Console Reciptient"
atr, err := mp.transportService.CreateGroupTransportRecipient(context.Background(),consoleinstance, grp, reciptientconfig)
if err != nil {
log.Log.Error(err, "Error Creating StdOut Console Reciptient")
}
grp.AddTransportToGroup(mp.serviceContext, atr.GetID())
}
}
return nil
}

14
pkg/mperror/errors.go Normal file
View file

@ -0,0 +1,14 @@
package mperror
import (
"errors"
)
var (
ErrAppExists = errors.New("App Already Exists")
ErrAppNotFound = errors.New("App Not Found")
ErrUserNotFound = errors.New("User Not Found")
ErrFilterNotFound = errors.New("Filter Not Found")
ErrTransportInstanceNotFound = errors.New("Transport Instance Not Found")
ErrTransportReciepiantNotFound = errors.New("Transport Recipient Not Found")
)

View file

@ -2,8 +2,8 @@ package msg
import (
"github.com/Fishwaldo/mouthpiece/pkg/db"
"github.com/Fishwaldo/mouthpiece/pkg/errors"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/mperror"
"gorm.io/gorm"
"time"
)
@ -32,6 +32,14 @@ type MessageResult struct {
Status string `json:"status" doc:"Status of Message"`
}
func NewMessage(appname string) *Message {
msg := &Message{
AppName: appname,
}
msg.Body.Fields = make(map[string]interface{})
return msg
}
func (msg *Message) ProcessMessage() (err error) {
if len(msg.AppName) == 0 {
return mperror.ErrAppNotFound

View file

@ -3,9 +3,10 @@ package mouthpiece
import (
"context"
"github.com/Fishwaldo/mouthpiece/pkg/errors"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/message"
"github.com/Fishwaldo/mouthpiece/pkg/mperror"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
)
func (mp MouthPiece) RouteMessage(ctx context.Context, msg *msg.Message) {
@ -14,33 +15,10 @@ func (mp MouthPiece) RouteMessage(ctx context.Context, msg *msg.Message) {
log.Log.Info("App Processing Message Failed", "App", app.GetName(), "Message", msg, "Error", err)
return
} else {
if err := mp.groupService.SendMessageToUsers(ctx, *msg, app.GetID(), mp.userSender); err != nil {
log.Log.Error(err, "Failed to send message to users", "App", app.GetName(), "Message", msg)
}
if err := mp.groupService.SendMessageToTransports(ctx, *msg, app.GetID(), mp.transportSender); err != nil {
log.Log.Error(err, "Failed to send message to Transports", "App", app.GetName(), "Message", msg)
}
log.Log.Info("App Processing Message Success", "App", app.GetName(), "Message", msg)
}
} else {
log.Log.Error(mperror.ErrAppNotFound, "App Not Found", "App", msg.AppName)
}
}
func (mp MouthPiece) transportSender(ctx context.Context, msg msg.Message, tid uint) error {
log.Log.Info("Sending Message to Transport", "Message", msg, "Transport", tid)
return nil
}
func (mp MouthPiece) userSender(ctx context.Context, msg msg.Message, uid uint) error {
if user, err := mp.userService.GetUser(ctx, uid); err != nil {
log.Log.Error(err, "Failed to get userID for userSender", "User", uid)
return err
} else {
if err := user.ProcessMessage(ctx, &msg); err != nil {
log.Log.Error(err, "Failed to process message for userSender", "User", user.GetID(), "Message", msg)
} else {
log.Log.Info("Sending Message to User", "Message", msg, "User", user.GetID())
}
}
return nil
}

View file

@ -0,0 +1,284 @@
package console
import (
"context"
"fmt"
"sync"
"github.com/Fishwaldo/mouthpiece/pkg/db"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
"github.com/Fishwaldo/mouthpiece/pkg/transport"
"github.com/Fishwaldo/mouthpiece/pkg/mperror"
"github.com/go-logr/logr"
)
type ConsoleTransportProvider struct {
svcctx *interfaces.MPContext
instancelock sync.Mutex
instances map[string]*ConsoleTransportInstance
log logr.Logger
}
type ConsoleInstanceConfig struct {
transport.TransportInstanceStdConfigFields
}
type ConsoleTransportInstance struct {
Config ConsoleInstanceConfig `gorm:"embedded"`
log logr.Logger
recptlock sync.Mutex
recipients map[uint]*ConsoleTransportRecipient
}
type ConsoleRecipientConfig struct {
transport.TransportRecipientConfigFields
}
type ConsoleTransportRecipient struct {
Config ConsoleRecipientConfig `gorm:"embedded"`
instance *ConsoleTransportInstance
}
func init() {
tp := NewStdoutTransportProvider()
transport.RegisterTransportProvider(tp)
}
func NewStdoutTransportProvider() interfaces.TransportProvider {
return &ConsoleTransportProvider{
instances: make(map[string]*ConsoleTransportInstance),
}
}
func (t *ConsoleTransportProvider) GetName() string {
return "console"
}
func (t *ConsoleTransportProvider) Start(ctx *interfaces.MPContext, log logr.Logger) error {
t.instancelock.Lock()
defer t.instancelock.Unlock()
t.svcctx = ctx
t.log = log.WithName("ConsoleTransportProvider")
db.Db.AutoMigrate(&ConsoleTransportInstance{}, &ConsoleTransportRecipient{})
var tpi []ConsoleTransportInstance
if tx := db.Db.Find(&tpi); tx.Error != nil {
t.log.Error(tx.Error, "Error Loading Transport Instances", "name", t.GetName())
return tx.Error
}
for _, v := range tpi {
v.log = t.log.WithName(v.GetName())
v.recipients = make(map[uint]*ConsoleTransportRecipient)
if err := v.Start(t.svcctx); err != nil {
t.log.Error(err, "Error Starting Transport Instance", "name", v.GetName())
} else {
t.instances[v.GetName()] = &v
}
}
t.log.Info("Transport Started", "name", t.GetName())
return nil
}
func (t *ConsoleTransportProvider) CreateInstance(ctx context.Context, config any) (interfaces.TransportInstance, error) {
t.log.Info("Creating Transport Instance", "name", t.GetName())
cticonfig, ok := config.(ConsoleInstanceConfig);
if ok == false {
return nil, fmt.Errorf("Invalid Config - Cannot Cast to ConsoleInstanceConfig")
}
tpi := &ConsoleTransportInstance{
Config: cticonfig,
log: t.log.WithName(cticonfig.Name),
recipients: make(map[uint]*ConsoleTransportRecipient),
}
if tx := db.Db.WithContext(ctx).Create(tpi); tx.Error != nil {
t.log.Error(tx.Error, "Error Creating Transport Instance", "name", tpi.GetName())
return nil, tx.Error
}
if err := tpi.Start(t.svcctx); err != nil {
t.log.Error(err, "Error Starting Transport Instance", "name", tpi.GetName())
return nil, err
}
t.instancelock.Lock()
defer t.instancelock.Unlock()
t.instances[tpi.GetName()] = tpi
return tpi, nil
}
func (t *ConsoleTransportProvider) DeleteInstance(ctx context.Context, tpi interfaces.TransportInstance) error {
t.log.Info("Deleting Transport Instance", "name", tpi.GetName())
if tx := db.Db.WithContext(ctx).Delete(tpi); tx.Error != nil {
t.log.Error(tx.Error, "Error Deleting Transport Instance", "name", tpi.GetName())
return tx.Error
}
if err := tpi.Stop(ctx); err != nil {
t.log.Error(err, "Error Stopping Transport Instance", "name", tpi.GetName())
}
t.instancelock.Lock()
defer t.instancelock.Unlock()
delete(t.instances, tpi.GetName())
return nil
}
func (t *ConsoleTransportProvider) GetInstance(ctx context.Context, ID uint) (interfaces.TransportInstance, error) {
t.instancelock.Lock()
defer t.instancelock.Unlock()
for _, v := range t.instances {
if v.GetID() == ID {
return v, nil
}
}
return nil, fmt.Errorf("transport Instance Not Found")
}
func (t *ConsoleTransportProvider) GetInstanceByName(ctx context.Context, name string) (interfaces.TransportInstance, error) {
t.instancelock.Lock()
defer t.instancelock.Unlock()
if v, ok := t.instances[name]; ok {
return v, nil
}
return nil, fmt.Errorf("transport Instance Not Found")
}
func (t *ConsoleTransportProvider) GetInstances(ctx context.Context) ([]uint, error) {
t.instancelock.Lock()
defer t.instancelock.Unlock()
var IDs []uint
for _, v := range t.instances {
IDs = append(IDs, v.GetID())
}
return IDs, nil
}
func (tpi *ConsoleTransportInstance) GetName() string {
return tpi.Config.Name
}
func (tpi *ConsoleTransportInstance) GetID() uint {
return tpi.Config.ID
}
func (tpi *ConsoleTransportInstance) Start(*interfaces.MPContext) error {
tpi.log.Info("Starting Transport Instance", "name", tpi.GetName())
var tprs []ConsoleTransportRecipient
if tx := db.Db.Find(&tprs); tx.Error != nil {
tpi.log.Error(tx.Error, "Error Loading Transport Recipients", "name", tpi.GetName())
return tx.Error
}
for _, v := range tprs {
v.instance = tpi
tpi.recipients[v.GetID()] = &v
}
return nil
}
func (tpi *ConsoleTransportInstance) Stop (context.Context) error {
return nil
}
func (tpi *ConsoleTransportInstance) CreateGroupTransportRecipient(ctx context.Context, assignedID uint, grp interfaces.GroupI, config any) (interfaces.TransportRecipient, error) {
tpi.log.Info("Creating Transport Reciepient", "name", tpi.GetName(), "group", grp.GetID())
return tpi.createTransportReciepient(ctx, assignedID, nil, grp, config)
}
func (tpi *ConsoleTransportInstance) CreateUserTransportRecipient(ctx context.Context, assignedID uint, user interfaces.UserI, config any) (interfaces.TransportRecipient, error) {
tpi.log.Info("Creating Transport Reciepient", "name", tpi.GetName(), "user", user.GetID())
return tpi.createTransportReciepient(ctx, assignedID, user, nil, config)
}
func (tpi *ConsoleTransportInstance) createTransportReciepient(ctx context.Context, assignedID uint, user interfaces.UserI, grp interfaces.GroupI, config any) (interfaces.TransportRecipient, error) {
if assignedID == 0 {
return nil, fmt.Errorf("assignedID Cannot Be Zero")
}
tpr := ConsoleTransportRecipient{
instance: tpi,
}
tpr.Config.ID = assignedID
if user != nil {
tpr.Config.UserID = user.GetID()
}
if grp != nil {
tpr.Config.GroupID = grp.GetID()
}
if tx := db.Db.WithContext(ctx).Create(&tpr); tx.Error != nil {
tpi.log.Error(tx.Error, "Error Creating Transport Recipient", "ID", tpr.GetID())
return nil, tx.Error
}
tpi.recptlock.Lock()
defer tpi.recptlock.Unlock()
tpi.recipients[tpr.GetID()] = &tpr
return tpr, nil
}
func (tpi *ConsoleTransportInstance) GetTransportReciepients(ctx context.Context) ([]uint, error) {
tpi.recptlock.Lock()
defer tpi.recptlock.Unlock()
var IDs []uint
for _, v := range tpi.recipients {
IDs = append(IDs, v.GetID())
}
return IDs, nil
}
func (tpi *ConsoleTransportInstance) GetTransportRecipientByTransportID(ctx context.Context, ID uint) (interfaces.TransportRecipient, error) {
tpi.recptlock.Lock()
defer tpi.recptlock.Unlock()
if tpr, ok := tpi.recipients[ID]; ok {
return tpr, nil
}
return nil, nil
}
func (tpi *ConsoleTransportInstance) DeleteTransportRecipient(ctx context.Context, tpr interfaces.TransportRecipient) error {
tpi.recptlock.Lock()
defer tpi.recptlock.Unlock()
for k, v := range tpi.recipients {
if v.GetID() == tpr.GetID() {
delete(tpi.recipients, k)
if tx := db.Db.WithContext(ctx).Delete(tpr); tx.Error != nil {
tpi.log.Error(tx.Error, "Error Deleting Transport Recipient", "ID", tpr.GetID())
return tx.Error
}
return nil
}
}
return mperror.ErrTransportReciepiantNotFound
}
func (tpr ConsoleTransportRecipient) GetID() uint {
return tpr.Config.ID
}
func (tpr ConsoleTransportRecipient) GetGroupID() uint {
return tpr.Config.GroupID
}
func (tpr ConsoleTransportRecipient) GetUserID() uint {
return tpr.Config.UserID
}
func (tpr ConsoleTransportRecipient) ProcessGroupMessage(ctx context.Context, msg msg.Message) error {
fmt.Println("=========================================================")
fmt.Printf("Group Message: %s\n", msg.Body.Message)
fmt.Println("=========================================================")
return nil
}
func (tpr ConsoleTransportRecipient) ProcessMessage(ctx context.Context, msg msg.Message) error {
fmt.Println("=========================================================")
fmt.Printf("Message: %s\n", msg.Body.Message)
fmt.Println("=========================================================")
return nil
}

View file

@ -1,45 +0,0 @@
package stdout
import (
"context"
"fmt"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/message"
"github.com/Fishwaldo/mouthpiece/pkg/transport"
)
type StdoutTransport struct {
}
func init() {
tp := NewStdoutTransport()
transport.RegisterTransport(tp)
}
func NewStdoutTransport() transport.ITransport {
return &StdoutTransport{}
}
func (t StdoutTransport) GetName() string {
return "stdout"
}
func (t StdoutTransport) SendMessage(ctx context.Context, config transport.TransportConfig, msg msg.Message) (err error) {
fmt.Println("=========================================================")
fmt.Printf("Message: %s\n", msg.Body.Message)
fmt.Println("=========================================================")
transport.UpdateTransportStatus(ctx, t, msg, "sent")
return nil
}
func (t StdoutTransport) Start() {
log.Log.Info("Transport Started", "name", t.GetName())
}
func (t StdoutTransport) NewTransportConfig(ctx context.Context) {
// user.TransportConfigs = append(user.TransportConfigs, mouthpiece.TransportConfig{
// Transport: t.GetName(),
// Config: user.Username,
// })
return
}

View file

@ -3,55 +3,75 @@ package transport
import (
"context"
"errors"
"sync"
"github.com/Fishwaldo/mouthpiece/pkg/db"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/message"
"gorm.io/gorm"
"github.com/Fishwaldo/mouthpiece/pkg/mperror"
// "github.com/Fishwaldo/mouthpiece/pkg/msg"
//"gorm.io/gorm"
"github.com/go-logr/logr"
)
type TransportConfig struct {
gorm.Model `json:"-"`
UserID uint `json:"-"`
Transport string
Config string
type TransportInstanceStdConfigFields struct {
ID uint `gorm:"primary_key"`
Name string
Description string
TransportProviderID uint
}
type ITransport interface {
GetName() string
Start()
SendMessage(ctx context.Context, config TransportConfig, message msg.Message) (err error)
NewTransportConfig(ctx context.Context)
type TransportRecipientConfigFields struct {
ID uint `gorm:"primary_key"`
Name string
Description string
UserID uint
GroupID uint
TransportInstanceID uint
TransportProviderID uint
}
var transports map[string]ITransport
type TransportService struct {
serviceContext *interfaces.MPContext
log logr.Logger
transportInstancesMutex sync.Mutex
transportInstances map[uint]interfaces.TransportInstance
}
func RegisterTransport(transport ITransport) {
type transportServiceRecieptientMap struct {
ID uint `gorm:"primary_key"`
TransportInstance uint
TransportRecipientID uint
}
// Map of the Available Transports
var transports map[string]interfaces.TransportProvider
// Mutex to protect the transports map
var transportsMutex sync.Mutex
func RegisterTransportProvider(transport interfaces.TransportProvider) {
transportsMutex.Lock()
defer transportsMutex.Unlock()
if transports == nil {
transports = make(map[string]ITransport)
transports = make(map[string]interfaces.TransportProvider)
}
transports[transport.GetName()] = transport
}
func InitializeTransports() {
db.Db.AutoMigrate(&TransportConfig{})
}
func StartTransports() {
for k, t := range transports {
log.Log.Info("Starting Transport", "transport", k)
t.Start()
}
}
func GetTransport(ctx context.Context, name string) (ITransport, error) {
func GetTransportProvider(ctx context.Context, name string) (interfaces.TransportProvider, error) {
transportsMutex.Lock()
defer transportsMutex.Unlock()
if t, ok := transports[name]; ok {
return t, nil
}
return nil, errors.New("Transport Not Found")
}
func GetTransports(ctx context.Context) []string {
func GetTransportProviders(ctx context.Context) []string {
transportsMutex.Lock()
defer transportsMutex.Unlock()
var a []string
for k := range transports {
a = append(a, k)
@ -59,6 +79,237 @@ func GetTransports(ctx context.Context) []string {
return a
}
func UpdateTransportStatus(ctx context.Context, t ITransport, m msg.Message, status string) {
log.Log.Info("Transport Status", "status", status, "MessageID", m.ID, "Transport", t.GetName())
func NewTransportService() *TransportService {
return &TransportService{
log: log.Log.WithName("transports"),
transportInstances: make(map[uint]interfaces.TransportInstance),
}
}
func (tps *TransportService) Start(svcctx *interfaces.MPContext) error {
db.Db.AutoMigrate(&transportServiceRecieptientMap{})
transportsMutex.Lock()
defer transportsMutex.Unlock()
tps.serviceContext = svcctx
for k, t := range transports {
tps.log.Info("Starting Transport", "transport", k)
if err := t.Start(svcctx, tps.log); err != nil {
tps.log.Error(err, "Error Starting Transport", "transport", k)
}
}
return nil
}
func (tps *TransportService) CreateTransportInstance(ctx context.Context, provider string, config any) (interfaces.TransportInstance, error) {
if t, ok := transports[provider]; ok {
if tpi, err := t.CreateInstance(ctx, config); err != nil {
tps.log.Error(err, "Error Creating Transport Instance", "transport", provider)
return nil, err
} else {
tps.transportInstancesMutex.Lock()
defer tps.transportInstancesMutex.Unlock()
tps.transportInstances[tpi.GetID()] = tpi
return tpi, nil
}
}
return nil, errors.New("transport Provider Not Found")
}
func (tps *TransportService) GetTransportInstances(context.Context) ([]uint, error) {
var tpis []uint
tps.transportInstancesMutex.Lock()
defer tps.transportInstancesMutex.Unlock()
for _, v := range tps.transportInstances {
tpis = append(tpis, v.GetID())
}
return tpis, nil
}
func (tps *TransportService) GetTransportInstance(ctx context.Context, id uint) (interfaces.TransportInstance, error) {
tps.transportInstancesMutex.Lock()
defer tps.transportInstancesMutex.Unlock()
if tpi, ok := tps.transportInstances[id]; ok {
return tpi, nil
}
return nil, mperror.ErrTransportInstanceNotFound
}
func (tps *TransportService) GetTransportInstanceByName(ctx context.Context, name string) (interfaces.TransportInstance, error) {
tps.transportInstancesMutex.Lock()
defer tps.transportInstancesMutex.Unlock()
for _, v := range tps.transportInstances {
if v.GetName() == name {
return v, nil
}
}
return nil, mperror.ErrTransportInstanceNotFound
}
func (tps *TransportService) DeleteTransportInstance(ctx context.Context, tpi interfaces.TransportInstance) error {
tps.transportInstancesMutex.Lock()
defer tps.transportInstancesMutex.Unlock()
delete(tps.transportInstances, tpi.GetID())
var tprmap []transportServiceRecieptientMap
if tx := db.Db.WithContext(ctx).Find(&tprmap, "transport_instance_id = ?", tpi.GetID()); tx.Error != nil {
log.Log.Error(tx.Error, "Error Deleting Transport Instance", "transport", tpi.GetName())
return tx.Error
} else {
for _, v := range tprmap {
if tpr, err := tpi.GetTransportRecipientByTransportID(ctx, v.TransportRecipientID); err != nil {
log.Log.Error(err, "Error Getting Transport Recieptients", "transport", tpi.GetName())
} else {
if err := tpi.DeleteTransportRecipient(ctx, tpr); err != nil {
log.Log.Error(err, "Error Deleting Transport Recipient", "transport", tpi.GetName())
}
if tx := db.Db.WithContext(ctx).Delete(&v); tx.Error != nil {
log.Log.Error(tx.Error, "Error Deleting Transport Recipient from Map", "transport", tpi.GetName())
}
}
}
}
return nil
}
func (tps *TransportService) CreateGroupTransportRecipient(ctx context.Context, tpi interfaces.TransportInstance, grp interfaces.GroupI, config any) (interfaces.TransportRecipient, error) {
if tpi == nil {
return nil, mperror.ErrTransportInstanceNotFound
}
tx := db.Db.Begin()
tprmap := &transportServiceRecieptientMap{
TransportInstance: tpi.GetID(),
}
if res := tx.Create(tprmap); res.Error != nil {
tx.Rollback()
return nil, res.Error
}
if tpr, err := tpi.CreateGroupTransportRecipient(ctx, tprmap.ID, grp, config); err != nil {
tx.Rollback()
return nil, err
} else {
tprmap.TransportRecipientID = tpr.GetID()
if res2 := tx.Save(tprmap); res2.Error != nil {
tx.Rollback()
return nil, res2.Error
}
tx.Commit()
return tpr, nil
}
}
func (tps *TransportService) CreateUserTransportRecipient(ctx context.Context, tpi interfaces.TransportInstance, user interfaces.UserI, config any) (interfaces.TransportRecipient, error) {
if tpi == nil {
return nil, mperror.ErrTransportInstanceNotFound
}
tx := db.Db.Begin()
tprmap := &transportServiceRecieptientMap{
TransportInstance: tpi.GetID(),
}
if res := tx.Create(tprmap); res.Error != nil {
tx.Rollback()
return nil, res.Error
}
if tpr, err := tpi.CreateUserTransportRecipient(ctx, tprmap.ID, user, config); err != nil {
tx.Rollback()
return nil, err
} else {
tprmap.TransportRecipientID = tpr.GetID()
if res2 := tx.Save(tprmap); res2.Error != nil {
tx.Rollback()
return nil, res2.Error
}
if res := tx.Commit(); res.Error != nil {
tx.Rollback()
return nil, res.Error
}
return tpr, nil
}
}
func (tps *TransportService) DeleteTransportRecipient(ctx context.Context, tpr interfaces.TransportRecipient) error {
var tprmap transportServiceRecieptientMap
if res := db.Db.WithContext(ctx).Find(&tprmap, "transport_recipient_id = ?", tpr.GetID()); res.Error != nil {
return res.Error
}
tx := db.Db.Begin()
if tpi, err := tps.GetTransportInstance(ctx, tprmap.TransportInstance); err != nil {
log.Log.Error(err, "Error Getting Transport Instance", "transport_instance", tprmap.TransportInstance)
tx.Rollback()
return err
} else {
if err := tpi.DeleteTransportRecipient(ctx, tpr); err != nil {
tx.Rollback()
return err
}
if res := tx.WithContext(ctx).Delete(&tprmap); res != nil {
tx.Rollback()
return res.Error
}
if res := tx.Commit(); res.Error != nil {
tx.Rollback()
return res.Error
}
return nil
}
}
func (tps *TransportService) GetTransportReciepientsForGroup(ctx context.Context, grp interfaces.GroupI) ([]uint, error) {
var res []uint
for _, v := range tps.transportInstances {
if tprs, err := v.GetTransportReciepients(ctx); err != nil {
log.Log.Error(err, "Error Getting Transport Recipients", "transport_instance", v.GetID())
} else {
for _, tpid := range tprs {
if tpr, err := v.GetTransportRecipientByTransportID(ctx, tpid); err != nil {
log.Log.Error(err, "Error Getting Transport Recipient", "transport_recipient", tpid)
} else {
if tpr.GetGroupID() == grp.GetID() {
res = append(res, tpr.GetID())
}
}
}
}
}
return res, nil
}
func (tps *TransportService) GetTransportReciepientsForUser(ctx context.Context, user interfaces.UserI) ([]uint, error) {
var res []uint
for _, v := range tps.transportInstances {
if tprs, err := v.GetTransportReciepients(ctx); err != nil {
log.Log.Error(err, "Error Getting Transport Recipients", "transport_instance", v.GetID())
} else {
for _, tpid := range tprs {
if tpr, err := v.GetTransportRecipientByTransportID(ctx, tpid); err != nil {
log.Log.Error(err, "Error Getting Transport Recipient", "transport_recipient", tpid)
} else {
if tpr.GetUserID() == user.GetID() {
res = append(res, tpr.GetID())
}
}
}
}
}
return res, nil
}
func (tps *TransportService) GetTransportReciepients(ctx context.Context) ([]uint, error) {
var res []uint
for _, v := range tps.transportInstances {
if tprs, err := v.GetTransportReciepients(ctx); err != nil {
log.Log.Error(err, "Error Getting Transport", "transport_instance", v.GetID())
} else {
res = append(res, tprs...)
}
}
return res, nil
}
func (tps *TransportService) GetTransportReciepient(ctx context.Context, tid uint) (interfaces.TransportRecipient, error) {
var tpr transportServiceRecieptientMap
if res := db.Db.WithContext(ctx).Find(&tpr, "id = ?", tid); res.Error != nil {
return nil, res.Error
}
if tpi, err := tps.GetTransportInstance(ctx, tpr.TransportInstance); err != nil {
log.Log.Error(err, "Error Getting Transport Instance", "transport_instance", tpr.TransportInstance)
return nil, err
} else {
return tpi.GetTransportRecipientByTransportID(ctx, tpr.TransportRecipientID)
}
}

View file

@ -9,14 +9,19 @@ import (
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/Fishwaldo/mouthpiece/pkg/message"
"github.com/Fishwaldo/mouthpiece/pkg/transport"
"github.com/Fishwaldo/mouthpiece/pkg/msg"
"github.com/go-playground/validator/v10"
)
type User struct {
interfaces.UserDetails
TransportConfigs []transport.TransportConfig `json:"transports,omitempty" gorm:"many2many:user_transports;" validate:"-"`
TransportRecipient []uint `gorm:"-"`
}
// TableName Override the table name for the Users table
// to ensure its consistant with the table name defined in the Interfaces Package
func (User) TableName() string {
return "users"
}
func (u User) AddRoleToUser(ctx context.Context, role string) bool {
@ -106,18 +111,18 @@ func (u User) SetLastName(lname string) error {
return u.SetDetails(details)
}
func (u User) ProcessMessage(ctx context.Context, msg *msg.Message) (err error) {
func (u User) ProcessMessage(ctx context.Context, msg msg.Message) (err error) {
/* add User Fields to Message */
msg.Body.Fields["first_name"] = u.FirstName
msg.Body.Fields["last_name"] = u.LastName
msg.Body.Fields["email"] = u.Email
log.Log.V(1).Info("User Processing Message", "Email", u.Email, "MessageID", msg.ID)
for _, tc := range u.TransportConfigs {
t, err := transport.GetTransport(ctx, tc.Transport)
if err != nil {
log.Log.Info("Cant find Transport", "Transport", tc.Transport)
}
go t.SendMessage(ctx, tc, *msg)
}
// for _, tc := range u.TransportConfigs {
// t, err := transport.GetTransport(ctx, tc.Transport)
// if err != nil {
// log.Log.Info("Cant find Transport", "Transport", tc.Transport)
// }
// go t.SendMessage(ctx, tc, *msg)
// }
return
}

View file

@ -4,41 +4,27 @@ import (
"context"
"github.com/Fishwaldo/mouthpiece/pkg/db"
mperror "github.com/Fishwaldo/mouthpiece/pkg/errors"
"github.com/Fishwaldo/mouthpiece/pkg/interfaces"
"github.com/Fishwaldo/mouthpiece/pkg/log"
"github.com/go-playground/validator/v10"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type UsersService struct {
ctx *interfaces.MPContext
}
func NewUsersService() *UsersService {
us := &UsersService{}
us := &UsersService{
}
return us
}
func (us UsersService) Init() {
db.Db.Debug().AutoMigrate(&User{})
var count int64
db.Db.Debug().Model(&User{}).Count(&count)
log.Log.V(1).Info("Initializing Users", "count", count)
if count == 0 {
log.Log.Info("Creating Default Users")
admin := interfaces.UserDetails{FirstName: "Admin", LastName: "User", Email: "admin@example.com", Password: "password"}
if admin, err := us.CreateUser(context.Background(), admin); err == nil {
admin.AddRoleToUser(context.Background(), "admin")
log.Log.Info("Created Default Admin admin@example.com")
}
user := interfaces.UserDetails{FirstName: "User", LastName: "User", Email: "user@example.com", Password: "password"}
if _, err := us.CreateUser(context.Background(), user); err == nil {
log.Log.Info("Created Default User user@example.com")
}
}
func (us UsersService) Start(ctx *interfaces.MPContext) error {
us.ctx = ctx
db.Db.AutoMigrate(&User{})
return nil
}
func (us UsersService) CreateUser(ctx context.Context, user interfaces.UserDetails) (interfaces.UserI, error) {
@ -66,6 +52,7 @@ func (us UsersService) CreateUser(ctx context.Context, user interfaces.UserDetai
if !dbuser.AddRoleToUser(ctx, "user") {
log.Log.Info("Error Adding User Role", "Error", err)
}
log.Log.Info("User Created", "User", dbuser)
return dbuser, nil
} else {
return nil, err
@ -83,16 +70,12 @@ func (u UsersService) GetUsers(ctx context.Context) map[uint]interfaces.UserI {
}
func (u UsersService) GetUserByEmail(ctx context.Context, email string) (user interfaces.UserI, err error) {
tx := db.Db.WithContext(ctx).Preload(clause.Associations).First(&user, "email = ?", email)
if tx.Error == gorm.ErrRecordNotFound {
return nil, mperror.ErrUserNotFound
}
return user, nil
var dbuser User
tx := db.Db.WithContext(ctx).Preload(clause.Associations).First(&dbuser, "email = ?", email)
return &dbuser, tx.Error
}
func (u UsersService) GetUser(ctx context.Context, id uint) (user interfaces.UserI, err error) {
tx := db.Db.WithContext(ctx).Preload(clause.Associations).First(&user, "ID = ?", id)
if tx.Error == gorm.ErrRecordNotFound {
return nil, mperror.ErrUserNotFound
}
return user, nil
var dbuser User
tx := db.Db.WithContext(ctx).Preload(clause.Associations).First(&dbuser, "id = ?", id)
return &dbuser, tx.Error
}

View file

@ -8,7 +8,6 @@ var (
Validate *validator.Validate
)
func Get() *validator.Validate {
if Validate == nil {
Validate = validator.New()