From a5d2db2a6a5a2efa113fb214e6c5918ac46f84a9 Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Thu, 14 Oct 2021 12:31:49 +0800 Subject: [PATCH] use new nats-restic module for Nats Communication --- go.mod | 9 +- go.sum | 23 +- internal/backend/nats/logger.go | 79 +++++++ internal/backend/nats/nats.go | 369 ++++++++++---------------------- 4 files changed, 212 insertions(+), 268 deletions(-) create mode 100644 internal/backend/nats/logger.go diff --git a/go.mod b/go.mod index e549fd3b..379b4a9b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ require ( github.com/Azure/azure-sdk-for-go v55.6.0+incompatible github.com/Azure/go-autorest/autorest v0.11.19 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect + github.com/Fishwaldo/go-logadapter v0.0.2 + github.com/Fishwaldo/restic-nats v0.0.1-rc1 github.com/cenkalti/backoff/v4 v4.1.1 github.com/cespare/xxhash/v2 v2.1.1 github.com/dnaeon/go-vcr v1.2.0 // indirect @@ -18,8 +20,6 @@ require ( github.com/kurin/blazer v0.5.3 github.com/minio/minio-go/v7 v7.0.14 github.com/minio/sha256-simd v1.0.0 - github.com/nats-io/nats-server/v2 v2.6.1 // indirect - github.com/nats-io/nats.go v1.12.3 github.com/ncw/swift/v2 v2.0.0 github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.6.0 @@ -32,11 +32,12 @@ require ( golang.org/x/net v0.0.0-20210614182718-04defd469f4e golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c + golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e golang.org/x/text v0.3.6 google.golang.org/api v0.50.0 - google.golang.org/protobuf v1.27.1 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) go 1.14 + +//replace github.com/Fishwaldo/restic-nats => ../restic-nats diff --git a/go.sum b/go.sum index d501be5d..9a046e12 100644 --- a/go.sum +++ b/go.sum @@ -64,10 +64,15 @@ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUM github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/Fishwaldo/go-logadapter v0.0.2 h1:RxFOr+bEDqQ1rPUmjUX5u8fGLCKY0Lyea+9AxDqzaW4= +github.com/Fishwaldo/go-logadapter v0.0.2/go.mod h1:aRbQ8rWdpeD0WWo241ctqgk/yRto8Axg09EkwWiVGK0= +github.com/Fishwaldo/restic-nats v0.0.1-rc1 h1:Byhk87ggM4AHPU+J1NV50zwZ1D/E7p33gEvl8cR83lo= +github.com/Fishwaldo/restic-nats v0.0.1-rc1/go.mod h1:GlR1upvCwuE0pTyxdU1Ztb8hxCJ63fNH4Gjxo4hG1nc= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= @@ -240,8 +245,9 @@ github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QH github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -283,8 +289,9 @@ github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM= github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo= -github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE= +github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= @@ -362,8 +369,10 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= +go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -535,8 +544,8 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -607,8 +616,9 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -736,8 +746,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= diff --git a/internal/backend/nats/logger.go b/internal/backend/nats/logger.go new file mode 100644 index 00000000..70ba6062 --- /dev/null +++ b/internal/backend/nats/logger.go @@ -0,0 +1,79 @@ +package nats + + +import ( + "os" + "fmt" + "github.com/restic/restic/internal/debug" + "github.com/Fishwaldo/go-logadapter" +) + +/* compile check against our logAdapter interface */ + +var _ logadapter.Logger = (*resticLogger)(nil) + +/* a custom logger implementation just for Restic */ +type resticLogger struct { + +} + +// Log a Trace Message +func (l *resticLogger) Trace(message string, params ...interface{}) { + debug.Log(message, params...) +} +// Log a Debug Message +func (l *resticLogger) Debug(message string, params ...interface{}) { + debug.Log(message, params...) +} +// Log a Info Message +func (l *resticLogger) Info(message string, params ...interface{}) { + debug.Log(message, params...) +} +// Log a Warn Message +func (l *resticLogger) Warn(message string, params ...interface{}) { + debug.Log(message, params...) +} +// Log a Error Message +func (l *resticLogger) Error(message string, params ...interface{}) { + fmt.Printf("Nats Error: %s\n", fmt.Sprintf(message, params...)) + debug.Log(message, params...) +} +// Log a Fatal Message (some implementations may call os.exit() here) +func (l *resticLogger) Fatal(message string, params ...interface{}) { + fmt.Printf("Nats Fatal: %s\n", fmt.Sprintf(message, params...)) + debug.Log(message, params...) + os.Exit(-1) +} +// Log a Panic Message (some implmentations may call Panic) +func (l *resticLogger) Panic(message string, params ...interface{}) { + fmt.Printf("Nats Panic: %s\n", fmt.Sprintf(message, params...)) + debug.Log(message, params...) + panic(fmt.Sprintf(message, params...)) +} +// Create a New Logger Instance with Name +func (l *resticLogger) New(name string) (logadapter.Logger) { + return l +} +// Add Key/Value Pairs for Structured Logging and return a new Logger +func (l *resticLogger) With(key string, value interface{}) ( logadapter.Logger) { + return l +} +// Set the Log Prefix +func (l *resticLogger) SetPrefix(name string) { +} +// Get the Log Prefix +func (l *resticLogger) GetPrefix() (string) { + return "" +} +// Set Logging Level +func (l *resticLogger) SetLevel(logadapter.Log_Level) { + +} +// Get Logging Level +func(l *resticLogger) GetLevel() (logadapter.Log_Level) { + return logadapter.LOG_TRACE +} +// Sync/Flush the Log Buffers +func (l *resticLogger) Sync() { + +} diff --git a/internal/backend/nats/nats.go b/internal/backend/nats/nats.go index 8d48fa92..ba50b208 100644 --- a/internal/backend/nats/nats.go +++ b/internal/backend/nats/nats.go @@ -9,195 +9,23 @@ import ( "os" "path" "path/filepath" - "time" - "github.com/nats-io/nats.go" "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/backend/nats/protocol" + "github.com/Fishwaldo/restic-nats" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" ) -// make sure the rest backend implements restic.Backend +// make sure the nats backend implements restic.Backend var _ restic.Backend = &Backend{} -const defaultLayout = "default" - -// Backend uses the REST protocol to access data stored on a server. +// Backend uses the nats protocol to access workers that interface with the actual repository on behalf of restic type Backend struct { sem *backend.Semaphore backend.Layout cfg Config - conn *nats.Conn - enc nats.Encoder -} - -func connectNats(be *Backend) error { - server := be.cfg.Server.Hostname() - port := be.cfg.Server.Port() - if port == "" { - port = "4222" - } - url := fmt.Sprintf("nats://%s:%s", server, port) - - /* Check Credential File Exists */ - _, err := os.Stat(be.cfg.Credential) - if err != nil { - return errors.Wrap(err, "credential file missing") - } - - var options []nats.Option - if len(be.cfg.Credential) > 0 { - options = append(options, nats.UserCredentials(be.cfg.Credential)) - } - if len(be.cfg.Server.User.Username()) > 0 { - pass, _ := be.cfg.Server.User.Password() - options = append(options, nats.UserInfo(be.cfg.Server.User.Username(), pass)) - } - options = append(options, nats.ClosedHandler(natsClosedCB)) - options = append(options, nats.DisconnectHandler(natsDisconnectedCB)) - - be.conn, err = nats.Connect(url, options...) - if err != nil { - return errors.Wrap(err, "nats connection failed") - } - - if size := be.conn.MaxPayload(); size < 8388608 { - return errors.New("NATS Server Max Payload Size is below 8Mb") - } - - if !be.conn.HeadersSupported() { - return errors.New("server does not support Headers") - } - - be.enc = nats.EncoderForType("gob") - if be.enc == nil { - return errors.New("Can't Load json Decoder") - } - be.sem, err = backend.NewSemaphore(be.cfg.Connections) - - fmt.Printf("Connected to Nats Server: %s (Cluster: %s)\n", be.conn.ConnectedServerName(), be.conn.ConnectedClusterName()) - - if err != nil { - return err - } - return nil -} - -func natsClosedCB(conn *nats.Conn) { - -} - -func natsDisconnectedCB(conn *nats.Conn) { - -} - -func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand, send interface{}, recv interface{}) error { - var operation string - be.sem.GetToken() - defer be.sem.ReleaseToken() - start := time.Now() - switch op { - case protocol.NatsOpenCmd: - if _, ok := send.(protocol.OpenOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a openOP: %T", send) - } - if _, ok := recv.(*protocol.OpenResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a openResult: %T", recv) - } - operation = "open" - case protocol.NatsStatCmd: - if _, ok := send.(protocol.StatOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a statOp: %T", send) - } - if _, ok := recv.(*protocol.StatResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a statResult %T", recv) - } - operation = "stat" - case protocol.NatsMkdirCmd: - if _, ok := send.(protocol.MkdirOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a mkdirOp: %T", send) - } - if _, ok := recv.(*protocol.MkdirResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a mkdirResult %T", recv) - } - operation = "mkdir" - case protocol.NatsSaveCmd: - if _, ok := send.(protocol.SaveOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a saveOp: %T", send) - } - if _, ok := recv.(*protocol.SaveResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a saveResult %T", recv) - } - operation = "save" - case protocol.NatsListCmd: - if _, ok := send.(protocol.ListOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a listOp: %T", send) - } - if _, ok := recv.(*protocol.ListResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a listResult %T", recv) - } - operation = "list" - case protocol.NatsLoadCmd: - if _, ok := send.(protocol.LoadOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a loadOp: %T", send) - } - if _, ok := recv.(*protocol.LoadResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a loadResult %T", recv) - } - operation = "load" - case protocol.NatsRemoveCmd: - if _, ok := send.(protocol.RemoveOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a removeOp: %T", send) - } - if _, ok := recv.(*protocol.RemoveResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a removeOp %T", recv) - } - operation = "remove" - - } - var err error - subject := fmt.Sprintf("repo.commands.%s", operation) - msg := protocol.NewRNSMsg(subject) - - defer func() { - if err == nil { - debug.Log("SendMsgWithReply (%s - %s) Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), time.Since(start), len(msg.Data)) - } else { - debug.Log("SendMsgWithREply (%s - %s) Failed: %s (Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), err, time.Since(start), len(msg.Data)) - } - }() - - msg.Header.Set("X-RNS-OP", operation) - - msg.Data, err = be.enc.Encode(subject, send) - if err != nil { - return errors.Wrap(err, "Encoding Failed") - } - msg.Reply = nats.NewInbox() - //debug.Log("Sending %s %d\n", msg.Subject, len(msg.Data)) - - /* check the size of the Data Field. If its close to our NATS max payload size - * then we will chunk the transfer instead - */ - // log := func(msg string, args ...interface{}) { - // fmt.Printf(msg, args...) - // fmt.Println() - // } - - var chunkedmsg *nats.Msg - - chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log) - if err != nil { - return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data)) - } - if len(chunkedmsg.Data) > 0 { - if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil { - return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data)) - } - } - return nil + rns *rns.ResticNatsClient } func Open(ctx context.Context, cfg Config) (*Backend, error) { @@ -211,27 +39,27 @@ func Open(ctx context.Context, cfg Config) (*Backend, error) { be := &Backend{ sem: sem, cfg: cfg, + Layout: &backend.DefaultLayout{Join: path.Join}, } - l, err := backend.ParseLayout(ctx, be, "default", defaultLayout, "") + host, _ := os.Hostname() + be.rns, err = rns.New(*be.cfg.Server, rns.WithName(host), rns.WithLogger(&resticLogger{})) if err != nil { return nil, err } + fmt.Printf("Connected to Nats Server: %s (Cluster: %s)\n", be.rns.Conn.ConnectedServerName(), be.rns.Conn.ConnectedClusterName()) - be.Layout = l + hostname, _ := os.Hostname() - err = connectNats(be) + result, err := be.rns.OpenRepo(ctx, hostname) if err != nil { - return nil, errors.Wrap(err, "open nats failed") + // Communication Error + return nil, err } - - co := protocol.OpenOp{Bucket: be.cfg.Repo} - var result protocol.OpenResult - if err := be.SendMsgWithReply(ctx, protocol.NatsOpenCmd, co, &result); err != nil { - return nil, errors.Wrap(err, "OpenOp Failed") + if !result.Ok { + // Backend Returned a Error + return nil, result.Err } - debug.Log("Open Result: %+v\n", result) - return be, nil } @@ -243,9 +71,9 @@ func Create(ctx context.Context, cfg Config) (*Backend, error) { if err != nil { return nil, errors.Wrap(err, "Create Repo") } - _, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile}) - if err == nil { - return nil, errors.New("config file already exists") + exist, _ := be.Test(ctx, restic.Handle{Type: restic.ConfigFile}) + if exist { + return nil, errors.Errorf("config file already exists") } for _, d := range be.Paths() { @@ -272,58 +100,60 @@ func (b *Backend) Hasher() hash.Hash { // Test a boolean value whether a File with the name and type exists. func (b *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { debug.Log("Test %s - %s", b.cfg.Server.String(), b.Filename(h)) - _, err := b.Stat(ctx, h) + res, err := b.Stat(ctx, h) if err != nil { - return false, nil + return false, err } - return true, nil + if res.Name == b.Filename(h) { + return true, nil + } + return false, nil } // Remove removes a File described by h. func (b *Backend) Remove(ctx context.Context, h restic.Handle) error { debug.Log("Remove %s - %s", b.cfg.Server.String(), b.Filename(h)) - ro := protocol.RemoveOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h))} - var result protocol.RemoveResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsRemoveCmd, ro, &result); err != nil { - return errors.Wrap(err, "Remove: SendMsgWithReply Failed") + result, err := b.rns.Remove(ctx, b.Dirname(h), filepath.Base(b.Filename(h))) + if err != nil { + //Communication Error + return errors.Wrap(err, "save") } - if result.Ok { - return nil + if !result.Ok { + //Backend returned a Error + return result.Err } - return errors.Errorf("Remove Failed") + return nil } // Close the backend func (b *Backend) Close() error { debug.Log("Close %s", b.cfg.Server.String()) + result, err := b.rns.Close(context.Background()) + if err != nil { + // Communication Error + return errors.Wrap(err, "close") + } + if !result.Ok { + // Backend Returned a Error + return result.Err + } return nil } // Save stores the data from rd under the given handle. func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { debug.Log("Save %s - %s", b.cfg.Server.String(), b.Filename(h)) - var so protocol.SaveOp - so.Dir = b.Dirname(h) - so.Name = filepath.Base(b.Filename(h)) - so.Filesize = rd.Length() - so.Bucket = b.cfg.Repo - var err error - so.Data, err = io.ReadAll(rd) - so.PacketSize = len(so.Data) + result, err := b.rns.Save(ctx, b.Dirname(h), filepath.Base(b.Filename(h)), rd) if err != nil { - return errors.Wrap(err, "Save") + // Communication Error + return errors.Wrap(err, "save") } - so.Offset = 0 - var result protocol.SaveResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsSaveCmd, so, &result); err != nil { - return errors.Wrap(err, "Save: SendMsgWithReply Failed") + if !result.Ok { + // Backend Returned a Error + return result.Err } - debug.Log("Save Result: %+v", result) - if result.Ok { - return nil - } - return errors.Errorf("Save Failed") + return nil } // Load runs fn with a reader that yields the contents of the file at h at the @@ -336,34 +166,39 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea // Implementations are encouraged to use backend.DefaultLoad func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { debug.Log("Load %s - %s (start %d length %d)", b.cfg.Server.String(), b.Filename(h), offset, length) - lo := protocol.LoadOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h)), Length: length, Offset: offset} - var result protocol.LoadResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsLoadCmd, lo, &result); err != nil { - return errors.Wrap(err, "Save: SendMsgWithReply Failed") + + result, err := b.rns.Load(ctx, b.Dirname(h), filepath.Base(b.Filename(h)), length, offset) + if err != nil { + //Communication Error + return errors.Wrap(err, "Load") } if !result.Ok { - return errors.Errorf("Load Failed") + // Backend Returned a Error + return result.Err } rd := bytes.NewReader(result.Data) if err := fn(rd); err != nil { - return errors.Wrap(err, "Load Read") + return err } return nil } // Stat returns information about the File identified by h. func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - debug.Log("Stat %s - %s", b.cfg.Server.String(), b.Filename(h)) - op := protocol.StatOp{Bucket: b.cfg.Repo, Filename: b.Filename(h)} - var result protocol.StatResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsStatCmd, op, &result); err != nil { - return restic.FileInfo{}, errors.Wrap(err, "statOp Failed") + fmt.Printf("Backend: %+v\n", b) + debug.Log("Stat %s", b.Filename(h)) + + result, err := b.rns.Stat(ctx, b.Dirname(h), b.Filename(h)) + + if err != nil { + //Communication Error + return restic.FileInfo{}, errors.Wrap(err, "Stat") } - if result.Ok { - return restic.FileInfo{Size: result.Size, Name: h.Name}, nil - } else { - return restic.FileInfo{}, errors.New("File does not exist") + if !result.Ok { + // Backend Returned a Error + return restic.FileInfo{}, result.Err } + return restic.FileInfo{Size: result.Size, Name: h.Name}, nil } // List runs fn for each file in the backend which has the type t. When an @@ -375,16 +210,19 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e // The function fn is called in the same Goroutine that List() is called // from. func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - basedir, subdirs := b.Basedir(t) - debug.Log("List %s - %s - Subdirs? %t", b.cfg.Server.String(), basedir, subdirs) - op := protocol.ListOp{Bucket: b.cfg.Repo, BaseDir: basedir, SubDir: subdirs} - var result protocol.ListResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsListCmd, op, &result); err != nil { - return errors.Wrap(err, "listOp Failed") + dir, recursive := b.Basedir(t) + debug.Log("List %s - %s - Subdirs? %t", b.cfg.Server.String(),dir, recursive) + result, err := b.rns.List(ctx, dir, recursive) + if err != nil { + //Communication Error + return errors.Wrap(err, "List") + } + if !result.Ok { + //Backend Returned a Error + return result.Err } for _, fi := range result.FI { rfi := restic.FileInfo{Name: fi.Name, Size: fi.Size} - if err := fn(rfi); err != nil { return err } @@ -395,37 +233,52 @@ func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.Fi // IsNotExist returns true if the error was caused by a non-existing file // in the backend. func (b *Backend) IsNotExist(err error) bool { - debug.Log("IsNotExist %s (TODO) - %s", b.cfg.Server.String(), err) + debug.Log("IsNotExist %s (TODO) - %T", b.cfg.Server.String(), err) fmt.Printf("IsNotExist Called\n") return false } +func (b *Backend) removeKeys(ctx context.Context, t restic.FileType) error { + return b.List(ctx, t, func(fi restic.FileInfo) error { + return b.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) + }) +} + // Delete removes all data in the backend. func (b *Backend) Delete(ctx context.Context) error { - debug.Log("Delete %s (TODO)", b.cfg.Server.String()) - return errors.Errorf("TODO Delete") -} + alltypes := []restic.FileType{ + restic.PackFile, + restic.KeyFile, + restic.LockFile, + restic.SnapshotFile, + restic.IndexFile} -func (b *Backend) Join(p ...string) string { - return path.Join(p...) -} + for _, t := range alltypes { + err := b.removeKeys(ctx, t) + if err != nil { + return nil + } + } -func (b *Backend) ReadDir(ctx context.Context, dir string) ([]os.FileInfo, error) { - debug.Log("ReadDir %s (TODO) - %s", b.cfg.Server.String(), dir) - return nil, errors.Errorf("TODO: ReadDir") + err := b.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + if err != nil && !b.IsNotExist(err) { + return err + } + + return nil } func (b *Backend) Mkdir(ctx context.Context, dir string) error { debug.Log("Mkdir %s - %s", b.cfg.Server.String(), dir) - op := protocol.MkdirOp{Bucket: b.cfg.Repo, Dir: dir} - var result protocol.MkdirResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsMkdirCmd, op, &result); err != nil { - return errors.Wrap(err, "natsMkdirCmd Failed") + result, err := b.rns.Mkdir(ctx, dir) + if err != nil { + //Communication Error + return errors.Wrap(err, "Mkdir") } - if result.Ok { - return nil - } else { - return errors.New("mkdir Failed") + if !result.Ok { + //Backend Returned a Error + return result.Err } + return nil }