Mirror of https://github.com/Fishwaldo/restic-nats-server.git (synced 2025-03-15 19:41:35 +00:00)
Update to the restic-nats module and tidy up
This commit is contained in:
parent 23f435abc7
commit 326eddf64a

12 changed files with 240 additions and 785 deletions
@@ -5,7 +5,7 @@
## Sub Packages

* [cmd](./cmd)
* [cache](./cache)
* [protocol](./protocol)
@@ -1,5 +1,5 @@
 {
-    "loglevel": 2,
+    "loglevel": 0,
     "authtype": "username",
     "nats": {
         "workers": [
@@ -28,7 +28,7 @@
         ]
     },
     "worker": {
-        "number": 50,
+        "number": 5,
         "connecturl": "nats://localhost:4222/",
         "nkey": "asdfasdf",
         "credfile": "/home/fish/.nkeys/creds/Operator/Backup/restic.creds",
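The two config hunks above drop the log level and shrink the worker pool from 50 to 5. For reference, a minimal, self-contained sketch of how such a file can be consumed with spf13/viper (already a dependency, see go.mod below); the file name "config.json" is an assumption for illustration, not the repo's actual config path:

package main

import (
    "fmt"

    "github.com/spf13/viper"
)

func main() {
    viper.SetDefault("worker.number", 10) // defaults, as in the worker package's init()
    viper.SetConfigFile("config.json")    // hypothetical path
    if err := viper.ReadInConfig(); err != nil {
        fmt.Println("no config file, using defaults:", err)
    }
    fmt.Println("loglevel:", viper.GetInt("loglevel"))
    fmt.Println("workers:", viper.GetInt("worker.number"))
    fmt.Println("connect:", viper.GetString("worker.connecturl"))
}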
go.mod (5 changes)
@@ -4,9 +4,10 @@ go 1.17

 require (
 	github.com/Fishwaldo/go-logadapter v0.0.2
+	github.com/Fishwaldo/restic-nats v0.0.1-rc1
 	github.com/buraksezer/olric v0.4.0
 	github.com/nats-io/nats-server/v2 v2.6.1
-	github.com/nats-io/nats.go v1.12.3
+	github.com/nats-io/nats.go v1.13.0
 	github.com/pkg/errors v0.9.1
 	github.com/spf13/cobra v1.2.1
 	github.com/spf13/viper v1.9.0
@@ -60,4 +61,4 @@ require (
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 )

-replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol
+//replace github.com/Fishwaldo/restic-nats => ../restic-nats
go.sum (5 changes)
@@ -47,6 +47,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/Fishwaldo/go-logadapter v0.0.2 h1:RxFOr+bEDqQ1rPUmjUX5u8fGLCKY0Lyea+9AxDqzaW4=
 github.com/Fishwaldo/go-logadapter v0.0.2/go.mod h1:aRbQ8rWdpeD0WWo241ctqgk/yRto8Axg09EkwWiVGK0=
+github.com/Fishwaldo/restic-nats v0.0.1-rc1 h1:Byhk87ggM4AHPU+J1NV50zwZ1D/E7p33gEvl8cR83lo=
+github.com/Fishwaldo/restic-nats v0.0.1-rc1/go.mod h1:GlR1upvCwuE0pTyxdU1Ztb8hxCJ63fNH4Gjxo4hG1nc=
 github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@@ -272,8 +274,9 @@ github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
 github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
 github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM=
 github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo=
-github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E=
-github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
+github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
+github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
 github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
 github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
 github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
@@ -7,7 +7,7 @@ import (
 	"path"
 	"path/filepath"

-	"github.com/Fishwaldo/restic-nats-server/protocol"
+	"github.com/Fishwaldo/restic-nats"

 	"github.com/pkg/errors"
 )
@@ -26,7 +26,7 @@ func FSMkDir(dir string) (error) {
 	return os.MkdirAll(path.Join(pwd, "repo", dir), 0700)
 }

-func FSSave(file string, data *[]byte, offset int64) (int, error) {
+func FSSave(file string, data *[]byte) (int, error) {
 	pwd, _ := os.Getwd()
 	filename := path.Join(pwd, "repo", file)
 	tmpname := filepath.Base(filename) + "-tmp-"
@@ -58,7 +58,7 @@ func FSSave(file string, data *[]byte, offset int64) (int, error) {
 	return len, nil
 }

-func FSListFiles(dir string, recursive bool) ([]protocol.FileInfo, error) {
+func FSListFiles(dir string, recursive bool) ([]rns.FileInfo, error) {
 	pwd, _ := os.Getwd()
 	finaldir := path.Join(pwd, "repo", dir)

@@ -73,7 +73,7 @@ func FSListFiles(dir string, recursive bool) ([]protocol.FileInfo, error) {
 		return nil, err
 	}

-	var result []protocol.FileInfo
+	var result []rns.FileInfo
 	for _, fi := range sub {
 		if fi.IsDir() {
 			/* dont' recursive more than 1 level */
@@ -84,10 +84,10 @@ func FSListFiles(dir string, recursive bool) ([]protocol.FileInfo, error) {
 			result = append(result, test...)
 		}
 		if fi.Name() != "" {
-			result = append(result, protocol.FileInfo{Name: fi.Name(), Size: fi.Size()})
+			result = append(result, rns.FileInfo{Name: fi.Name(), Size: fi.Size()})
 		}
 	}
-	return result, err
+	return result, nil
 }

 func FSLoadFile(filename string) (*os.File, error) {
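FSSave's new signature drops the unused offset argument. Its full body is not part of this diff, but the visible context (a filepath.Base(filename) + "-tmp-" prefix) suggests the common write-to-temp-then-rename pattern; a self-contained sketch of that pattern under that assumption:

package main

import (
    "os"
    "path/filepath"
)

func atomicSave(filename string, data []byte) (int, error) {
    // create the temp file in the destination directory so the rename
    // stays on one filesystem
    tmp, err := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)+"-tmp-")
    if err != nil {
        return 0, err
    }
    n, err := tmp.Write(data)
    if err != nil {
        tmp.Close()
        os.Remove(tmp.Name())
        return n, err
    }
    if err := tmp.Close(); err != nil {
        return n, err
    }
    // rename is atomic on POSIX filesystems within the same directory
    return n, os.Rename(tmp.Name(), filename)
}

func main() {
    if _, err := atomicSave("example.bin", []byte("hello")); err != nil {
        panic(err)
    }
}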
internal/client/client.go (new file, 35 lines)
@@ -0,0 +1,35 @@
+package client
+
+import (
+	"sync"
+
+	"github.com/Fishwaldo/restic-nats"
+	"github.com/Fishwaldo/restic-nats-server/internal"
+	"github.com/pkg/errors"
+)
+
+var clientList sync.Map
+
+func Create(or rns.OpenRepoOp) (rns.Client, error) {
+	client := rns.Client{ClientID: internal.RandString(16), Bucket: or.Bucket}
+	clientList.Store(client.ClientID, client)
+	return client, nil
+}
+
+func Find(clientid string) (rns.Client, error) {
+	client, found := clientList.Load(clientid)
+	if !found {
+		return rns.Client{}, errors.New("Client Not Found")
+	}
+	return client.(rns.Client), nil
+}
+
+func Remove(clientid string) (error) {
+	_, found := clientList.Load(clientid)
+	if found {
+		clientList.Delete(clientid)
+		return nil
+	} else {
+		return errors.New("Client Not Found")
+	}
+}
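The new internal/client package is a small registry: Create stores an rns.Client under a random 16-character ID, Find type-asserts it back out, and Remove deletes it. A self-contained sketch of the same sync.Map pattern, with a local Client type standing in for rns.Client from the external restic-nats module:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// Client stands in for rns.Client.
type Client struct {
    ClientID string
    Bucket   string
}

var clientList sync.Map

func Find(clientid string) (Client, error) {
    c, found := clientList.Load(clientid)
    if !found {
        return Client{}, errors.New("Client Not Found")
    }
    // the assertion is safe because only Client values are ever stored
    return c.(Client), nil
}

func main() {
    c := Client{ClientID: "abcdefghijklmnop", Bucket: "myhost"}
    clientList.Store(c.ClientID, c)
    got, err := Find(c.ClientID)
    fmt.Println(got.Bucket, err) // myhost <nil>
}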
@@ -4,22 +4,32 @@ import (
 	"net/url"
 	"sync"

 	rns "github.com/Fishwaldo/restic-nats"
 	"github.com/nats-io/nats.go"
 	"gopkg.in/tomb.v2"
 )

-type Config struct {
-	Server     *url.URL
-	Credential string
+type WorkerConfigT struct {
+	NumWorkers  int
+	Connections uint
+	Repo        string
 }

-type Backend struct {
-	Cfg        Config
-	Conn       *nats.Conn
-	BuCommands *nats.Subscription
-	Mx         sync.Mutex
-	T          tomb.Tomb
-	Enc        nats.Encoder
+type NatsConfigT struct {
+	MaxNatsConnections int
+	NatsURL            *url.URL
+	NatsNKey           string
+	NatsCredfile       string
 }
+
+type GlobalStateT struct {
+	WorkerConfig              WorkerConfigT
+	NatsConfig                NatsConfigT
+	Conn                      *rns.ResticNatsClient
+	ClientCommand             chan *nats.Msg
+	ClientCommandSubscription *nats.Subscription
+	Mx                        sync.Mutex
+	T                         tomb.Tomb
+}
+
+var GlobalState GlobalStateT
@@ -10,7 +10,7 @@ var Log logadapter.Logger

 func init() {
 	Log = logrus.LogrusDefaultLogger()
-	viper.SetDefault("loglevel", 2)
+	viper.SetDefault("loglevel", logadapter.LOG_DEBUG)
 }

 func StartLogger() {
@@ -72,7 +72,7 @@ func GetInternalWorkerURL() (path *url.URL, err error) {
 	if internalWorkerCred.Username == "" || internalWorkerCred.Password == "" {
 		return nil, errors.New("Internal User Credentials are empty?")
 	}
-	return url.Parse(fmt.Sprintf("nats://%s:%s@localhost:%d/backup/", internalWorkerCred.Username, internalWorkerCred.Password, 4222))
+	return url.Parse(fmt.Sprintf("nats://%s:%s@localhost:%d/", internalWorkerCred.Username, internalWorkerCred.Password, 4222))
 }

 func Start() {
@@ -1,70 +0,0 @@
-package worker
-
-import (
-	"fmt"
-	"os"
-
-	"github.com/Fishwaldo/restic-nats-server/internal"
-
-	"github.com/nats-io/nats.go"
-	"github.com/pkg/errors"
-)
-
-func connectNats(be *internal.Backend) error {
-	be.Mx.Lock()
-	defer be.Mx.Unlock()
-	server := be.Cfg.Server.Hostname()
-	port := be.Cfg.Server.Port()
-	if port == "" {
-		port = "4222"
-	}
-	url := fmt.Sprintf("nats://%s:%s", server, port)
-
-	var options []nats.Option
-
-	if len(be.Cfg.Credential) > 0 {
-		/* Check Credential File Exists */
-		_, err := os.Stat(be.Cfg.Credential)
-		if err != nil {
-			return errors.Wrap(err, "credential file missing")
-		}
-		options = append(options, nats.UserCredentials(be.Cfg.Credential))
-	} else if len(be.Cfg.Server.User.Username()) > 0 {
-		pass, _ := be.Cfg.Server.User.Password()
-		options = append(options, nats.UserInfo(be.Cfg.Server.User.Username(), pass))
-	}
-
-	options = append(options, nats.ClosedHandler(natsClosedCB))
-	options = append(options, nats.DisconnectHandler(natsDisconnectedCB))
-	var err error
-	be.Conn, err = nats.Connect(url, options...)
-	if err != nil {
-		return errors.Wrap(err, "nats connection failed")
-	}
-	if size := be.Conn.MaxPayload(); size < 8388608 {
-		return errors.New("NATS Server Max Payload Size is below 8Mb")
-	}
-
-	internal.Log.Info("Connected to %s (%s)", be.Conn.ConnectedClusterName(), be.Conn.ConnectedServerName())
-	internal.Log.Info("Nats Message Size: %d", be.Conn.MaxPayload())
-
-	be.Enc = nats.EncoderForType("gob")
-
-	be.BuCommands, err = be.Conn.QueueSubscribeSync("repo.Hosts.commands.*", "opencommandworkers")
-	if err != nil {
-		return errors.Wrap(err, "Can't Add Consumer")
-	}
-
-	return nil
-}
-
-func natsClosedCB(conn *nats.Conn) {
-	internal.Log.Info("Connection Closed: %s", conn.LastError())
-}
-
-func natsDisconnectedCB(conn *nats.Conn) {
-	internal.Log.Info("Connection Disconnected: %s", conn.LastError())
-}
@@ -3,44 +3,33 @@ package worker
 import (
 	"context"
 	"io"
-	"io/fs"
 	"net/url"
 	"os"
 	"os/signal"
 	"path"
-	"strings"
 	"syscall"
 	"time"

-	"github.com/Fishwaldo/go-logadapter"
 	"github.com/Fishwaldo/restic-nats-server/internal"
-	"github.com/Fishwaldo/restic-nats-server/internal/cache"
-	"github.com/Fishwaldo/restic-nats-server/internal/natsserver"

 	"github.com/Fishwaldo/restic-nats-server/internal/backend/localfs"
-	"github.com/Fishwaldo/restic-nats-server/protocol"
+	"github.com/Fishwaldo/restic-nats-server/internal/cache"
+	"github.com/Fishwaldo/restic-nats-server/internal/client"
+	"github.com/Fishwaldo/restic-nats-server/internal/natsserver"
 	"github.com/nats-io/nats.go"

+	"github.com/Fishwaldo/go-logadapter"
 	rns "github.com/Fishwaldo/restic-nats"
 	"github.com/pkg/errors"
 	"github.com/spf13/viper"
 )

-type WorkerData struct {
+type Worker struct {
 	ID     int
-	Be     *internal.Backend
 	cancel context.CancelFunc
 	Log    logadapter.Logger
+	Conn   *rns.ResticNatsClient
 }

-type workerCfgT struct {
-	NumWorkers int
-	ConnectURL string
-	NKey       string
-	Credfile   string
-	Backends   []string
-}
-
-var WorkerCfg workerCfgT

 func init() {
 	internal.ConfigRegister("worker", parseConfig, validateConfig)
 	viper.SetDefault("worker.number", 10)
@@ -48,363 +37,244 @@ func init() {
 	viper.SetDefault("worker.handles", "*")
 }

-func parseConfig(cfg *viper.Viper) (error) {
-	WorkerCfg.NumWorkers = cfg.GetInt("number")
-	WorkerCfg.ConnectURL = cfg.GetString("connecturl")
-	WorkerCfg.NKey = cfg.GetString("nkey")
-	WorkerCfg.Credfile = cfg.GetString("credfile")
-	WorkerCfg.Backends = cfg.GetStringSlice("handles")
+func parseConfig(cfg *viper.Viper) error {
+	var err error
+	internal.GlobalState.WorkerConfig.NumWorkers = cfg.GetInt("number")
+	internal.GlobalState.NatsConfig.NatsURL, err = url.Parse(cfg.GetString("connecturl"))
+	if err != nil {
+		return err
+	}
+	internal.GlobalState.NatsConfig.NatsNKey = cfg.GetString("nkey")
+	internal.GlobalState.NatsConfig.NatsCredfile = cfg.GetString("credfile")
+	//WorkerCfg.Backends = cfg.GetStringSlice("handles")
 	return nil
 }
-func validateConfig() (warnings []error, errors error) {
-	internal.Log.Info("%+v\n", WorkerCfg)
-	return nil, nil
+func validateConfig() (warnings []error, err error) {
+	if viper.GetBool("start-nats-server") &&
+		internal.GlobalState.NatsConfig.NatsURL.String() != "" {
+		warnings = append(warnings, errors.New("Using Internal Nats Server. Ignoring Nats Credentials/URL"))
+		url, _ := natsserver.GetInternalWorkerURL()
+		internal.GlobalState.NatsConfig.NatsURL = url
+	} else if viper.GetBool("start-nats-server") {
+		url, _ := natsserver.GetInternalWorkerURL()
+		internal.GlobalState.NatsConfig.NatsURL = url
+	} else {
+		if internal.GlobalState.NatsConfig.NatsURL.User.Username() != "" && internal.GlobalState.NatsConfig.NatsNKey != "" {
+			return nil, errors.New("Cannot Set a Username and Nkey at the same time")
+		}
+		if internal.GlobalState.NatsConfig.NatsURL.User.Username() != "" && internal.GlobalState.NatsConfig.NatsCredfile != "" {
+			return nil, errors.New("Cannot Set a Username and Credential file at the same time")
+		}
+		/* stat the Creds File if it exists */
+		if internal.GlobalState.NatsConfig.NatsCredfile != "" {
+			f, err := os.Open(internal.GlobalState.NatsConfig.NatsCredfile)
+			if err != nil {
+				return nil, errors.Wrap(err, "Cannot find Credential File")
+			}
+			f.Close()
+		}
+	}
+	return warnings, nil
 }

 func StartWorker() {
-	var url *url.URL
-	if viper.GetBool("start-nats-server") {
-		url, _ = natsserver.GetInternalWorkerURL()
-	} else {
-		url, _ = url.Parse(WorkerCfg.ConnectURL)
-	}
+	var options []rns.RNSOptions

-	cfg := internal.Config{
-		Server: url,
-		//Credential: "/home/fish/.nkeys/creds/Operator/Backup/restic.creds",
+	if internal.GlobalState.NatsConfig.NatsCredfile != "" {
+		options = append(options, rns.WithCredentials(internal.GlobalState.NatsConfig.NatsCredfile))
+	} else if internal.GlobalState.NatsConfig.NatsNKey != "" {
+		//XXX TODO
+		internal.Log.Fatal("NKey Authentication TODO")
 	}
-	var repo string
-	if cfg.Server.Path[0] == '/' {
-		repo = cfg.Server.Path[1:]
-	}
-	if repo[len(repo)-1] == '/' {
-		repo = repo[0 : len(repo)-1]
-	}
-	// replace any further slashes with . to specify a nested queue
-	repo = strings.Replace(repo, "/", ".", -1)
+	options = append(options, rns.WithLogger(internal.Log.New("RNSClient")))
+	host, _ := os.Hostname()
+	options = append(options, rns.WithName(host))
+	options = append(options, rns.WithServer())

-	cfg.Repo = repo
+	internal.Log.Debug("Connecting to %s", internal.GlobalState.NatsConfig.NatsURL)

-	be := &internal.Backend{
-		Cfg: cfg,
-	}
-	if err := connectNats(be); err != nil {
-		internal.Log.Fatal("Error Connecting to Nats: %s", err)
-		os.Exit(0)
+	conn, err := rns.New(*internal.GlobalState.NatsConfig.NatsURL, options...)
+	if err != nil {
+		internal.Log.Fatal("Cannot Create a new RNS Connection: %s", err)
 	}
+	internal.GlobalState.Conn = conn

-	for i := 0; i < WorkerCfg.NumWorkers; i++ {
-		wd := WorkerData{ID: i, Be: be, Log: internal.Log.New("worker").With("ID", i)}
-		be.T.Go(wd.Run)
+	internal.Log.Debug("Connected to Nats Server %s (%s)", conn.Conn.ConnectedServerName(), conn.Conn.ConnectedClusterName())
+
+	/* setup our Subscription for Client Commands */
+	internal.GlobalState.ClientCommand = make(chan *nats.Msg, 5)
+	sub, err := internal.GlobalState.Conn.Conn.ChanQueueSubscribe("repo.Hosts.commands.*", "workerqueue", internal.GlobalState.ClientCommand)
+	if err != nil {
+		internal.Log.Fatal("Cant Setup Client Command Subscription: %s", err)
+		return
+	}
+	internal.GlobalState.ClientCommandSubscription = sub
+
+	for i := 0; i < internal.GlobalState.WorkerConfig.NumWorkers; i++ {
+		wd := Worker{ID: i,
+			Log:  internal.Log.New("worker").With("ID", i),
+			Conn: internal.GlobalState.Conn}
+		internal.GlobalState.T.Go(wd.Run)
 	}

 	signalChan := make(chan os.Signal, 1)
 	signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)

 	s := <-signalChan
-	be.T.Kill(nil)
-	if err := be.T.Wait(); err != nil {
+	internal.GlobalState.T.Kill(nil)
+	if err := internal.GlobalState.T.Wait(); err != nil {
 		internal.Log.Warn("Workers Reported Error: %s", err)
 	}

 	internal.Log.Warn("Got Shutdown Signal %s", s)
 	cache.Shutdown()
 	natsserver.Shutdown()
 	os.Exit(0)
 }

+func (wd *Worker) Run() error {
+	wd.Log.Trace("Worker Started")
+	rnsServer, err := rns.NewRNSServer(wd, wd.Conn, wd.Log.New("RNSServer"))
+	if err != nil {
+		wd.Log.Warn("NewRNSServer Failed: %s", err)
+		return err
+	}
-func (wd *WorkerData) Run() error {
 	for {
 		var ctx context.Context
 		ctx, wd.cancel = context.WithCancel(context.Background())
-		netctx, _ := context.WithTimeout(ctx, 1*time.Second)
+		var msg *nats.Msg
 		select {
-		case <-wd.Be.T.Dying():
+		case <-internal.GlobalState.T.Dying():
 			wd.Log.Warn("Killing Worker")
 			wd.cancel()
 			return nil
-		default:
+		case msg = <-internal.GlobalState.ClientCommand:
 		}
-		//wd.Be.Mx.Lock()
-		msg, err := wd.Be.BuCommands.NextMsgWithContext(netctx)
-		//wd.Be.Mx.Unlock()
-		if err != nil {
-			if !errors.Is(err, context.DeadlineExceeded) {
-				wd.Log.Error("NextMsg Failed: Error: %s %t", err, err)
-				return err
-			}
-		}
+		jobctx, _ := context.WithTimeout(ctx, 120*time.Second)
+		start := time.Now()
+
+		if err := rnsServer.ProcessServerMsg(jobctx, msg); err != nil {
+			wd.Log.Warn("Process Client Message Failed: %s", err)
+			continue
+		}
-		if msg != nil {
-			jobctx, _ := context.WithTimeout(ctx, 120*time.Second)
-			wd.Log.Info("Message: %s %s", msg.Subject, msg.Sub.Queue)
-			start := time.Now()
-
-			log := func(msg string, args ...interface{}) {
-				wd.Log.Info(msg, args...)
-			}
-
-			msg, err := protocol.ChunkReadRequestMsgWithContext(jobctx, wd.Be.Conn, msg, log)
-			if err != nil {
-				wd.Log.Warn("ChunkedRead Failed: %s", err)
-				continue
-			}
-
-			operation := msg.Header.Get("X-RNS-OP")
-			switch operation {
-			case "open":
-				var oo protocol.OpenOp
-				or := protocol.OpenResult{Ok: false}
-
-				if err := wd.Be.Enc.Decode(msg.Subject, msg.Data, &oo); err != nil {
-					wd.Log.Warn("Decode Failed: %s", err)
-				}
-
-				or.Ok, err = wd.OpenRepo(oo)
-				if err != nil {
-					or.Ok = false
-					wd.Log.Warn("OpenOp: Error: %s", err)
-				}
-
-				replymsg := protocol.NewRNSMsg(msg.Reply)
-				replymsg.Data, err = wd.Be.Enc.Encode(msg.Reply, or)
-				if err != nil {
-					wd.Log.Warn("Encode Failed: %s", wd.ID, err)
-				}
-				if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.Be.Conn, msg, replymsg, log); err != nil {
-					wd.Log.Warn("ChunkReplyRequestMsgWithContext Failed: %s", err)
-				}
-			case "stat":
-				var so protocol.StatOp
-				if err := wd.Be.Enc.Decode(msg.Subject, msg.Data, &so); err != nil {
-					return errors.Wrap(err, "Decode Failed")
-				}
-
-				fi, err := wd.Stat(so)
-				var sr protocol.StatResult
-				if err != nil {
-					wd.Log.Warn("Stat: Error: %s\n", err)
-					sr.Ok = false
-				} else {
-					sr.Ok = true
-					sr.Size = fi.Size()
-					sr.Name = fi.Name()
-				}
-				replymsg := protocol.NewRNSMsg(msg.Reply)
-				replymsg.Data, err = wd.Be.Enc.Encode(msg.Reply, sr)
-				if err != nil {
-					return errors.Wrap(err, "Encode Failed")
-				}
-				if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.Be.Conn, msg, replymsg, log); err != nil {
-					wd.Log.Warn("ChunkReplyRequestMsgWithContext Failed: %s", err)
-				}
-			case "mkdir":
-
-				var mo protocol.MkdirOp
-				if err := wd.Be.Enc.Decode(msg.Subject, msg.Data, &mo); err != nil {
-					return errors.Wrap(err, "Decode Failed")
-				}
-
-				var mr protocol.MkdirResult
-				err := wd.Mkdir(mo)
-				if err != nil {
-					wd.Log.Warn("Mkdir: Error: %s", err)
-					mr.Ok = false
-				} else {
-					mr.Ok = true
-				}
-				replymsg := protocol.NewRNSMsg(msg.Reply)
-				replymsg.Data, err = wd.Be.Enc.Encode(msg.Reply, mr)
-				if err != nil {
-					return errors.Wrap(err, "Encode Failed")
-				}
-				if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.Be.Conn, msg, replymsg, log); err != nil {
-					wd.Log.Warn("ChunkReplyRequestMsgWithContext Failed: %s", err)
-				}
-			case "save":
-				var so protocol.SaveOp
-				if err := wd.Be.Enc.Decode(msg.Subject, msg.Data, &so); err != nil {
-					return errors.Wrap(err, "Decode Failed")
-				}
-
-				var sr protocol.SaveResult
-				err := wd.Save(so)
-				if err != nil {
-					wd.Log.Warn("Save: Error: %s", err)
-					sr.Ok = false
-				} else {
-					sr.Ok = true
-				}
-				wd.Log.Warn("Save Result: %+v", sr)
-				replymsg := protocol.NewRNSMsg(msg.Reply)
-				replymsg.Data, err = wd.Be.Enc.Encode(msg.Reply, sr)
-				wd.Log.Warn("Reply is %d", len(replymsg.Data))
-				if err != nil {
-					return errors.Wrap(err, "Encode Failed")
-				}
-				if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.Be.Conn, msg, replymsg, log); err != nil {
-					wd.Log.Warn("ChunkSendReplyMsgWithContext Failed: %s", err)
-				}
-			case "list":
-				var lo protocol.ListOp
-				if err := wd.Be.Enc.Decode(msg.Subject, msg.Data, &lo); err != nil {
-					return errors.Wrap(err, "Decode Failed")
-				}
-
-				var lr protocol.ListResult
-				lr, err = wd.List(lo)
-				if err != nil {
-					wd.Log.Warn("List: Error: %s", err)
-					lr.Ok = false
-				} else {
-					lr.Ok = true
-				}
-				replymsg := protocol.NewRNSMsg(msg.Reply)
-				replymsg.Data, err = wd.Be.Enc.Encode(msg.Reply, lr)
-				if err != nil {
-					return errors.Wrap(err, "Encode Failed")
-				}
-				if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.Be.Conn, msg, replymsg, log); err != nil {
-					wd.Log.Warn("ChunkReplyRequestMsgWithContext Failed: %s", err)
-				}
-			case "load":
-				var lo protocol.LoadOp
-				if err := wd.Be.Enc.Decode(msg.Subject, msg.Data, &lo); err != nil {
-					return errors.Wrap(err, "Decode Failed")
-				}
-
-				var lr protocol.LoadResult
-				lr, err = wd.Load(lo)
-				if err != nil {
-					wd.Log.Warn("List: Error: %s", err)
-					lr.Ok = false
-				} else {
-					lr.Ok = true
-				}
-				replymsg := protocol.NewRNSMsg(msg.Reply)
-				replymsg.Data, err = wd.Be.Enc.Encode(msg.Reply, lr)
-				if err != nil {
-					return errors.Wrap(err, "Encode Failed")
-				}
-				if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.Be.Conn, msg, replymsg, log); err != nil {
-					wd.Log.Warn("ChunkReplyRequestMsgWithContext Failed: %s", err)
-				}
-			case "remove":
-				var ro protocol.RemoveOp
-				if err := wd.Be.Enc.Decode(msg.Subject, msg.Data, &ro); err != nil {
-					return errors.Wrap(err, "Decode Failed")
-				}
-
-				var rr protocol.RemoveResult
-				rr, err = wd.Remove(ro)
-				if err != nil {
-					wd.Log.Warn("List: Error: %s", err)
-					rr.Ok = false
-				} else {
-					rr.Ok = true
-				}
-				replymsg := protocol.NewRNSMsg(msg.Reply)
-				replymsg.Data, err = wd.Be.Enc.Encode(msg.Reply, rr)
-				if err != nil {
-					return errors.Wrap(err, "Encode Failed")
-				}
-				if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.Be.Conn, msg, replymsg, log); err != nil {
-					wd.Log.Warn("ChunkReplyRequestMsgWithContext Failed: %s", err)
-				}
-			}
-			wd.Log.Info("Command %s Took %s", operation, time.Since(start))
-		}
+		wd.Log.Info("Command Took %s", time.Since(start))
 	}
 	return nil
 }

-func (wd *WorkerData) OpenRepo(oo protocol.OpenOp) (bool, error) {
-	fs, err := localfs.FSStat(oo.Bucket)
-	if err != nil {
-		return false, errors.New("Failed to Open Repository")
-	}
-	return fs.IsDir(), nil
+func (wd *Worker) LookupClient(clientid string) (rns.Client, error) {
+	return client.Find(clientid)
 }

-func (wd *WorkerData) Stat(so protocol.StatOp) (fs.FileInfo, error) {
-	fs, err := localfs.FSStat(path.Join(so.Bucket, so.Filename))
+func (wd *Worker) Open(ctx context.Context, oo rns.OpenRepoOp) (rns.OpenRepoResult, rns.Client, error) {
+	_, err := localfs.FSStat(oo.Bucket)
+	or := rns.OpenRepoResult{}
 	if err != nil {
-		return nil, errors.Wrap(err, "Stat")
+		or.Err = errors.New("Repository Not Found")
+		return or, rns.Client{}, errors.New("Failed to Open Repository")
 	}
-	return fs, nil
+
+	/* create a new Client */
+	rnsclient, err := client.Create(oo)
+	if err != nil {
+		return or, rns.Client{}, errors.Wrap(err, "ClientCreate")
+	}
+	or.Ok = true
+	or.ClientID = rnsclient.ClientID
+
+	return or, rnsclient, nil
 }

-func (wd *WorkerData) Mkdir(mo protocol.MkdirOp) error {
-	path := path.Join(mo.Bucket, mo.Dir)
+func (wd *Worker) Stat(ctx context.Context, rnsclient rns.Client, so rns.StatOp) (rns.StatResult, error) {
+	fs, err := localfs.FSStat(path.Join(rnsclient.Bucket, so.Filename))
+	if err != nil {
+		return rns.StatResult{Ok: false}, errors.Wrap(err, "Stat")
+	}
+	sr := rns.StatResult{
+		Ok:   true,
+		Name: fs.Name(),
+		Size: fs.Size(),
+	}
+	return sr, nil
+}
+
+func (wd *Worker) Mkdir(ctx context.Context, rnsclient rns.Client, mo rns.MkdirOp) (rns.MkdirResult, error) {
+	path := path.Join(rnsclient.Bucket, mo.Dir)
 	if err := localfs.FSMkDir(path); err != nil {
-		return err
+		return rns.MkdirResult{Ok: false}, errors.Wrap(err, "Mkdir")
 	}
-	return nil
+	return rns.MkdirResult{Ok: true}, nil
 }

-func (wd *WorkerData) Save(so protocol.SaveOp) error {
-	path := path.Join(so.Bucket, so.Dir, so.Name)
-	len, err := localfs.FSSave(path, &so.Data, so.Offset)
+func (wd *Worker) Save(ctx context.Context, rnsclient rns.Client, so rns.SaveOp) (rns.SaveResult, error) {
+	path := path.Join(rnsclient.Bucket, so.Dir, so.Name)
+	len, err := localfs.FSSave(path, &so.Data)
 	if err != nil {
-		return err
+		return rns.SaveResult{Ok: false}, errors.Wrap(err, "Save")
 	}
-	if len != so.PacketSize {
-		return errors.New("Packetsize != Writtensize")
+	if len != int(so.Filesize) {
+		return rns.SaveResult{Ok: false}, errors.New("Packetsize != Writtensize")
 	}
-	return nil
+	return rns.SaveResult{Ok: true}, nil
 }

-func (wd *WorkerData) List(lo protocol.ListOp) (protocol.ListResult, error) {
-	var result protocol.ListResult
-	fi, err := localfs.FSListFiles(path.Join(lo.Bucket, lo.BaseDir), lo.SubDir)
+func (wd *Worker) List(ctx context.Context, rnsclient rns.Client, lo rns.ListOp) (rns.ListResult, error) {
+	var result rns.ListResult
+	fi, err := localfs.FSListFiles(path.Join(rnsclient.Bucket, lo.BaseDir), lo.Recurse)
 	if err != nil {
-		return protocol.ListResult{Ok: false}, errors.Wrap(err, "ListFiles")
+		return rns.ListResult{Ok: false}, errors.Wrap(err, "List")
 	}
 	result.Ok = true
 	result.FI = fi
 	return result, nil
 }

-func (wd *WorkerData) Load(lo protocol.LoadOp) (protocol.LoadResult, error) {
-	var result protocol.LoadResult
-	rd, err := localfs.FSLoadFile(path.Join(lo.Bucket, lo.Dir, lo.Name))
+func (wd *Worker) Load(ctx context.Context, rnsclient rns.Client, lo rns.LoadOp) (rns.LoadResult, error) {
+	var result rns.LoadResult
+	rd, err := localfs.FSLoadFile(path.Join(rnsclient.Bucket, lo.Dir, lo.Name))
 	if err != nil {
-		return protocol.LoadResult{Ok: false}, errors.Wrap(err, "LoadFile")
+		return rns.LoadResult{Ok: false}, errors.Wrap(err, "Load")
 	}
 	defer rd.Close()
 	if lo.Offset > 0 {
 		_, err = rd.Seek(lo.Offset, 0)
 		if err != nil {
-			return protocol.LoadResult{Ok: false}, errors.Wrap(err, "Seek")
+			return rns.LoadResult{Ok: false}, errors.Wrap(err, "Seek")
 		}
 	}
 	if lo.Length > 0 {
 		result.Data = make([]byte, lo.Length)
 		len, err := rd.Read(result.Data)
 		if err != nil {
-			return protocol.LoadResult{Ok: false}, errors.Wrap(err, "Read")
+			return rns.LoadResult{Ok: false}, errors.Wrap(err, "Read")
 		}
 		if len != lo.Length {
-			return protocol.LoadResult{Ok: false}, errors.Errorf("Requested Length %d != Actual Length %d", lo.Length, len)
+			return rns.LoadResult{Ok: false}, errors.Errorf("Requested Length %d != Actual Length %d", lo.Length, len)
 		}
 	} else {
 		result.Data, err = io.ReadAll(rd)
 		if err != nil {
-			return protocol.LoadResult{Ok: false}, errors.Wrap(err, "ReadAll")
+			return rns.LoadResult{Ok: false}, errors.Wrap(err, "ReadAll")
 		}
 	}
 	//fmt.Printf("%+v\n", result)
 	return result, nil
 }

-func (wd *WorkerData) Remove(ro protocol.RemoveOp) (protocol.RemoveResult, error) {
-	var result protocol.RemoveResult
-	if err := localfs.FSRemove(path.Join(ro.Bucket, ro.Dir, ro.Name)); err != nil {
-		return protocol.RemoveResult{Ok: false}, errors.Wrap(err, "FSRemove")
-	}
-	result.Ok = true
-	return result, nil
-}
-
+func (wd *Worker) Remove(ctx context.Context, rnsclient rns.Client, ro rns.RemoveOp) (rns.RemoveResult, error) {
+	var result rns.RemoveResult
+	if err := localfs.FSRemove(path.Join(rnsclient.Bucket, ro.Dir, ro.Name)); err != nil {
+		return rns.RemoveResult{Ok: false}, errors.Wrap(err, "Remove")
+	}
+	result.Ok = true
+	return result, nil
+}
+
+func (wd *Worker) Close(ctx context.Context, rnsclient rns.Client, co rns.CloseOp) (rns.CloseResult, error) {
+	if err := client.Remove(rnsclient.ClientID); err != nil {
+		wd.Log.Warn("Can't Find Client %s", rnsclient.ClientID)
+	}
+	/* always return success */
+	return rns.CloseResult{Ok: true}, nil
+}
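The worker rewrite above replaces per-worker NextMsgWithContext polling with one ChanQueueSubscribe feeding a shared channel that every worker selects on, alongside the tomb's Dying channel; each message is handed to rnsServer.ProcessServerMsg. A minimal, self-contained sketch of that dispatch shape (pool size and shutdown handling are illustrative, and Respond stands in for the real message processing):

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        panic(err)
    }
    defer nc.Close()

    // one buffered channel shared by the pool, as in StartWorker
    msgs := make(chan *nats.Msg, 5)
    // a queue group ensures each message is delivered to exactly one worker
    sub, err := nc.ChanQueueSubscribe("repo.Hosts.commands.*", "workerqueue", msgs)
    if err != nil {
        panic(err)
    }
    defer sub.Unsubscribe()

    done := make(chan struct{})
    for i := 0; i < 5; i++ {
        go func(id int) {
            for {
                select {
                case <-done: // stands in for the tomb's Dying() channel
                    return
                case m := <-msgs:
                    fmt.Printf("worker %d got %s\n", id, m.Subject)
                    m.Respond([]byte("ok")) // the real code calls ProcessServerMsg here
                }
            }
        }(i)
    }

    // block until a shutdown signal, as StartWorker does
    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT)
    <-sigc
    close(done)
}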
@@ -1,394 +0,0 @@
-package protocol
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"math/rand"
-	"strconv"
-	"strings"
-	"time"
-
-	"github.com/nats-io/nats.go"
-	"github.com/pkg/errors"
-)
-
-type NatsCommand int
-
-const (
-	NatsOpenCmd NatsCommand = iota
-	NatsStatCmd
-	NatsMkdirCmd
-	NatsSaveCmd
-	NatsListCmd
-	NatsLoadCmd
-	NatsRemoveCmd
-)
-
-type OpenOp struct {
-	Bucket string `json:"bucket"`
-}
-type OpenResult struct {
-	Ok bool `json:"ok"`
-}
-
-type StatOp struct {
-	Bucket   string `json:"bucket"`
-	Filename string `json:"filename"`
-}
-
-type StatResult struct {
-	Ok   bool   `json:"ok"`
-	Size int64  `json:"size"`
-	Name string `json:"name"`
-}
-
-type MkdirOp struct {
-	Bucket string `json:"bucket"`
-	Dir    string `json:"dir"`
-}
-
-type MkdirResult struct {
-	Ok bool `json:"ok"`
-}
-
-type SaveOp struct {
-	Bucket     string `json:"bucket"`
-	Dir        string `json:"dir"`
-	Name       string `json:"name"`
-	Filesize   int64  `json:"size"`
-	PacketSize int    `json:"packet_size"`
-	Offset     int64  `json:"offset"`
-	Data       []byte `json:"data"`
-}
-
-type SaveResult struct {
-	Ok bool `json:"ok"`
-}
-
-type ListOp struct {
-	Bucket  string `json:"bucket"`
-	BaseDir string `json:"base_dir"`
-	SubDir  bool   `json:"sub_dir"`
-}
-
-type FileInfo struct {
-	Name string `json:"name"`
-	Size int64  `json:"size"`
-}
-
-type ListResult struct {
-	Ok bool       `json:"ok"`
-	FI []FileInfo `json:"fi"`
-}
-
-type LoadOp struct {
-	Bucket string `json:"bucket"`
-	Dir    string `json:"dir"`
-	Name   string `json:"name"`
-	Length int    `json:"length"`
-	Offset int64  `json:"offset"`
-}
-
-type LoadResult struct {
-	Ok   bool   `json:"ok"`
-	Data []byte `json:"data"`
-}
-
-type RemoveOp struct {
-	Bucket string `json:"bucket"`
-	Dir    string `json:"dir"`
-	Name   string `json:"name"`
-}
-
-type RemoveResult struct {
-	Ok bool `json:"ok"`
-}
-
-const (
-	msgHeaderID           string = "X-RNS-MSGID"
-	msgHeaderChunk        string = "X-RNS-CHUNKS"
-	msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT"
-	msgHeaderChunkSeq     string = "X-RNS-CHUNKS-SEQ"
-	msgHeaderOperation    string = "X-RNS-OP"
-	msgHeaderNRI          string = "Nats-Request-Info"
-)
-
-func copyHeader(msg *nats.Msg) (hdr nats.Header) {
-	hdr = make(nats.Header)
-	hdr.Add(msgHeaderID, msg.Header.Get(msgHeaderID))
-	hdr.Add(msgHeaderChunk, msg.Header.Get(msgHeaderChunk))
-	hdr.Add(msgHeaderOperation, msg.Header.Get(msgHeaderOperation))
-	return hdr
-}
-
-type nriT struct {
-	Acc string `json:"acc"`
-	Rtt int    `json:"rtt"`
-}
-
-func getNRI(msg *nats.Msg) (*nriT, bool) {
-	nri := msg.Header.Get(msgHeaderNRI)
-	if nri == "" {
-		return nil, false
-	}
-	var res nriT
-	if err := json.Unmarshal([]byte(nri), &res); err != nil {
-		return nil, false
-	}
-	return &res, true
-}
-
-// NewRNSMSG Returns a New RNS Message (for each "Transaction")
-func NewRNSMsg(subject string) *nats.Msg {
-	msg := nats.NewMsg(subject)
-	msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16))
-	return msg
-}
-
-const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
-const (
-	letterIdxBits = 6                    // 6 bits to represent a letter index
-	letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
-	letterIdxMax  = 63 / letterIdxBits   // # of letter indices fitting in 63 bits
-)
-
-var src = rand.NewSource(time.Now().UnixNano())
-
-func randStringBytesMaskImprSrcSB(n int) string {
-	sb := strings.Builder{}
-	sb.Grow(n)
-	// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
-	for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
-		if remain == 0 {
-			cache, remain = src.Int63(), letterIdxMax
-		}
-		if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
-			sb.WriteByte(letterBytes[idx])
-			i--
-		}
-		cache >>= letterIdxBits
-		remain--
-	}
-
-	return sb.String()
-}
-
-func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto *nats.Msg, msg *nats.Msg, log func(string, ...interface{})) error {
-	if len(msg.Header.Get(msgHeaderID)) == 0 {
-		return errors.New("MessageID Not Set")
-	}
-
-	var maxchunksize int = int(0.95 * float32(conn.MaxPayload()))
-	maxchunksize = 1024000 * 0.95
-	datasize := len(msg.Data)
-	log("ChunkSendReplyMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
-
-	if len(msg.Data) < maxchunksize {
-		/* data is less then our maxchunksize, so we can just send it */
-		log("ChunkSendReplyMsgWithContext: Short Reply Message %s", msg.Header.Get(msgHeaderID))
-		err := replyto.RespondMsg(msg)
-		return errors.Wrap(err, "Short Reply Message Send Failure")
-	}
-
-	/* need to Split the Data into Chunks
-	 * we will end up sending pages + 1 messages
-	 * as the initial message contains data as well
-	 */
-	pages := datasize / maxchunksize
-	initialchunk := nats.NewMsg(msg.Subject)
-	initialchunk.Header = copyHeader(msg)
-	initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages))
-	if len(msg.Data) < maxchunksize {
-		maxchunksize = len(msg.Data)
-	}
-	initialchunk.Data = msg.Data[:maxchunksize]
-	log("Chunking Initial Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
-	chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk)
-	if err != nil {
-		return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
-	}
-	/* Reply Message just has a header with the subject we send the rest of the chunks to */
-	chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject)
-	if chunkid == "" {
-		return errors.New("Chunked Reply Response didn't include ChunkID")
-	}
-	var chunksubject string
-	if nri, ok := getNRI(replyto); ok {
-		chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid)
-	} else {
-		chunksubject = fmt.Sprintf("chunk.send.%s", chunkid)
-	}
-	log("Chunk Reply Subject %s", chunksubject)
-	for i := 1; i <= pages; i++ {
-		chunkmsg := nats.NewMsg(chunksubject)
-		chunkmsg.Header = copyHeader(msg)
-		chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i))
-		start := maxchunksize * i
-		end := maxchunksize * (i + 1)
-		/* make sure we don't overrun our slice */
-		if end > len(msg.Data) {
-			end = len(msg.Data)
-		}
-		chunkmsg.Data = msg.Data[start:end]
-		log("Sending Reply Chunk %s - Page: %d of %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, pages, start, end)
-		var chunkack *nats.Msg
-		if i < pages {
-			log("Sending Chunk to %s", chunkmsg.Subject)
-			chunkack, err = conn.RequestMsgWithContext(ctx, chunkmsg)
-			if err != nil {
-				return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
-			}
-			log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i)
-		} else {
-			err := conn.PublishMsg(chunkmsg)
-			if err != nil {
-				return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
-			}
-		}
-
-		/* all chunkackorreply */
-		if i == pages {
-			return nil
-		}
-	}
-	return errors.New("Failed")
-}
-
-func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) {
-	if len(msg.Header.Get(msgHeaderID)) == 0 {
-		return nil, errors.New("MessageID Not Set")
-	}
-
-	var maxchunksize int = int(0.95 * float32(conn.MaxPayload()))
-	maxchunksize = 1024000 * 0.95
-	datasize := len(msg.Data)
-	log("ChunkSendRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
-
-	if len(msg.Data) < maxchunksize {
-		/* data is less then our maxchunksize, so we can just send it */
-		log("Short SendRequest MsgID %s - %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
-		reply, err := conn.RequestMsgWithContext(ctx, msg)
-		if err != nil {
-			return nil, errors.Wrap(err, "Short Message Send Failure")
-		}
-		log("Short ReplyRequest MsgID %s Headers %s Size: %d", reply.Header.Get(msgHeaderID), reply.Header, len(reply.Data))
-		return ChunkReadRequestMsgWithContext(ctx, conn, reply, log)
-	}
-
-	/* need to Split the Data into Chunks
-	 * we will end up sending pages + 1 messages
-	 * as the initial message contains data as well
-	 */
-	pages := datasize / maxchunksize
-
-	initialchunk := nats.NewMsg(msg.Subject)
-	initialchunk.Header = copyHeader(msg)
-	initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages))
-
-	initialchunk.Data = msg.Data[:maxchunksize]
-	log("Chunking Send Request MsgID %s - %s- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
-	chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk)
-	if err != nil {
-		return nil, errors.Wrap(err, "chunkRequestMsgWithContext")
-	}
-	/* Reply Message just has a header with the subject we send the rest of the chunks to */
-	chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject)
-	if chunkid == "" {
-		return nil, errors.New("Chunked Reply Response didn't include ChunkID")
-	}
-	var chunksubject string
-	if nri, ok := getNRI(chunkchannelmsg); ok {
-		chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid)
-	} else {
-		chunksubject = fmt.Sprintf("chunk.send.%s", chunkid)
-	}
-
-	for i := 1; i <= pages; i++ {
-		chunkmsg := nats.NewMsg(chunksubject)
-		chunkmsg.Header = copyHeader(msg)
-		chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i))
-		start := maxchunksize * i
-		end := maxchunksize * (i + 1)
-		/* make sure we don't overrun our slice */
-		if end > len(msg.Data) {
-			end = len(msg.Data)
-		}
-		chunkmsg.Data = msg.Data[start:end]
-		log("Sending Request Chunk %s %s to %s- Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header, chunkmsg.Subject, i, start, end)
-		var chunkackorreply *nats.Msg
-		chunkackorreply, err = conn.RequestMsgWithContext(ctx, chunkmsg)
-		if err != nil {
-			return nil, errors.Wrap(err, "Chunk Send")
-		}
-		log("got Result %s - %s", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header)
-		/* only the last Chunk Reply will contain the actual Response from the other side */
-		if i == pages {
-			log("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data))
-			return ChunkReadRequestMsgWithContext(ctx, conn, chunkackorreply, log)
-		}
-	}
-	return nil, errors.New("Failed")
-}
-
-func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) {
-	if len(msg.Header.Get(msgHeaderID)) == 0 {
-		return nil, errors.New("MessageID Not Set")
-	}
-	log("ChunkReadRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
-	chunked := msg.Header.Get(msgHeaderChunk)
-	if chunked != "" {
-		pages, err := strconv.Atoi(chunked)
-		if err != nil {
-			return nil, errors.Wrap(err, "Couldn't get Chunk Page Count")
-		}
-		log("Chunked Message Recieved: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages)
-		chunktransfer := randStringBytesMaskImprSrcSB(16)
-		chunkchan := make(chan *nats.Msg, 10)
-		var chunktransfersubject string
-		if nri, ok := getNRI(msg); ok {
-			chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer)
-		} else {
-			chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer)
-		}
-		sub, err := conn.QueueSubscribeSyncWithChan(chunktransfersubject, chunktransfer, chunkchan)
-		if err != nil {
-			return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel")
-		}
-		sub.SetPendingLimits(1000, 64*1024*1024)
-		log("Subscription: %+v", sub)
-		defer sub.Unsubscribe()
-		defer close(chunkchan)
-		chunksubmsg := nats.NewMsg(msg.Reply)
-		chunksubmsg.Header = copyHeader(msg)
-		chunksubmsg.Header.Add(msgHeaderChunkSubject, chunktransfer)
-		msg.RespondMsg(chunksubmsg)
-		/* pages - 1 because we got first Chunk in original message */
-		for i := 1; i <= pages; i++ {
-			log("Pending MsgID %s Chunk %d of %d on %s", chunksubmsg.Header.Get(msgHeaderID), i, pages, chunktransfersubject)
-			select {
-			case chunk := <-chunkchan:
-				seq, _ := strconv.Atoi(chunk.Header.Get(msgHeaderChunkSeq))
-				log("Got MsgID %s - %s Chunk %d %d", chunk.Header.Get(msgHeaderID), chunk.Header, seq, i)
-				msg.Data = append(msg.Data, chunk.Data...)
-				if i < pages {
-					ackChunk := nats.NewMsg(chunk.Subject)
-					ackChunk.Header = copyHeader(chunk)
-					log("sending ack %d %d", i, pages)
-					err := chunk.RespondMsg(ackChunk)
-					if err != nil {
-						return nil, errors.Wrap(err, "Chunk Reply Error")
-					}
-				} else {
-					log("Chunked Messages.... %d - %d", i, pages)
-					msg.Reply = chunk.Reply
-				}
-			case <-ctx.Done():
-				log("Context Canceled")
-				return nil, context.DeadlineExceeded
-			}
-		}
-		log("Chunked Messages Done - %s - %s Final Size %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
-	}
-	return msg, nil
-}
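The deleted protocol package implemented chunked transfers by hand: a payload larger than maxchunksize went out as an initial message carrying the first chunk, followed by `pages` follow-up chunks; this responsibility now moves to the external restic-nats module. A self-contained sketch of just the slicing arithmetic the old code used:

package main

import "fmt"

// chunks mirrors the old arithmetic: pages := len(data) / maxchunksize
// follow-up chunks after an initial message that already carries the first
// maxchunksize bytes. Note that an exact multiple of maxchunksize yields
// one empty trailing chunk, just as the original loop would.
func chunks(data []byte, maxchunksize int) [][]byte {
    if len(data) < maxchunksize {
        return [][]byte{data} // short message, no chunking needed
    }
    pages := len(data) / maxchunksize
    out := [][]byte{data[:maxchunksize]} // initial message payload
    for i := 1; i <= pages; i++ {
        start := maxchunksize * i
        end := maxchunksize * (i + 1)
        if end > len(data) { // make sure we don't overrun the slice
            end = len(data)
        }
        out = append(out, data[start:end])
    }
    return out
}

func main() {
    for _, c := range chunks(make([]byte, 25), 10) {
        fmt.Println(len(c)) // 10, 10, 5
    }
}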