From 487f2dce5d3776fe6e33b983adc5cf4db46d3a2d Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Wed, 6 Oct 2021 16:06:00 +0800 Subject: [PATCH 1/9] Initial Version of a nats.io storage backend. --- cmd/restic/global.go | 16 +- go.mod | 4 + go.sum | 25 +- internal/backend/location/location.go | 2 + internal/backend/nats/config.go | 56 ++++ internal/backend/nats/nats.go | 417 ++++++++++++++++++++++++++ 6 files changed, 516 insertions(+), 4 deletions(-) create mode 100644 internal/backend/nats/config.go create mode 100644 internal/backend/nats/nats.go diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 360789ea..e86fa78e 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -18,6 +18,7 @@ import ( "github.com/restic/restic/internal/backend/gs" "github.com/restic/restic/internal/backend/local" "github.com/restic/restic/internal/backend/location" + "github.com/restic/restic/internal/backend/nats" "github.com/restic/restic/internal/backend/rclone" "github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/s3" @@ -650,8 +651,14 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro debug.Log("opening rest repository at %#v", cfg) return cfg, nil - } - + case "nats": + cfg := loc.Config.(nats.Config) + if err := opts.Apply(loc.Scheme, &cfg); err != nil { + return nil, err + } + debug.Log("opening nats Repository at %#v", cfg) + return cfg, nil + } return nil, errors.Fatalf("invalid backend: %q", loc.Scheme) } @@ -703,7 +710,8 @@ func open(s string, gopts GlobalOptions, opts options.Options) (restic.Backend, be, err = rest.Open(cfg.(rest.Config), rt) case "rclone": be, err = rclone.Open(cfg.(rclone.Config), lim) - + case "nats": + be, err = nats.Open(globalOptions.ctx, cfg.(nats.Config)) default: return nil, errors.Fatalf("invalid backend: %q", loc.Scheme) } @@ -780,6 +788,8 @@ func create(s string, opts options.Options) (restic.Backend, error) { return rest.Create(globalOptions.ctx, cfg.(rest.Config), rt) case "rclone": return rclone.Create(globalOptions.ctx, cfg.(rclone.Config)) + case "nats": + return nats.Create(globalOptions.ctx, cfg.(nats.Config)) } debug.Log("invalid repository scheme: %v", s) diff --git a/go.mod b/go.mod index 6a5f17d5..1d7162e5 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Azure/azure-sdk-for-go v55.6.0+incompatible github.com/Azure/go-autorest/autorest v0.11.19 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect + github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-00010101000000-000000000000 github.com/cenkalti/backoff/v4 v4.1.1 github.com/cespare/xxhash/v2 v2.1.1 github.com/dnaeon/go-vcr v1.2.0 // indirect @@ -18,6 +19,7 @@ require ( github.com/kurin/blazer v0.5.3 github.com/minio/minio-go/v7 v7.0.12 github.com/minio/sha256-simd v1.0.0 + github.com/nats-io/nats.go v1.12.3 github.com/ncw/swift/v2 v2.0.0 github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.6.0 @@ -36,4 +38,6 @@ require ( gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) +replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol + go 1.14 diff --git a/go.sum b/go.sum index f85dafd2..aa818a8c 100644 --- a/go.sum +++ b/go.sum @@ -231,6 +231,8 @@ github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY= github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= +github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= @@ -249,6 +251,8 @@ github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPK github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= github.com/minio/minio-go/v7 v7.0.12 h1:/4pxUdwn9w0QEryNkrrWaodIESPRX+NxpO0Q6hVdaAA= @@ -273,6 +277,19 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= +github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= +github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= +github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM= +github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo= +github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= +github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ncw/swift/v2 v2.0.0 h1:Q1jkMe/yhCkx7yAKq4bBZ/Th3NR+ejRcwbVK8Pi1i/0= github.com/ncw/swift/v2 v2.0.0/go.mod h1:z0A9RVdYPjNjXVo2pDOPxZ4eu3oarO1P91fTItcb+Kg= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -353,9 +370,11 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -466,6 +485,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -532,6 +552,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -710,8 +732,9 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/backend/location/location.go b/internal/backend/location/location.go index 43d6fa5a..53fc4856 100644 --- a/internal/backend/location/location.go +++ b/internal/backend/location/location.go @@ -8,6 +8,7 @@ import ( "github.com/restic/restic/internal/backend/b2" "github.com/restic/restic/internal/backend/gs" "github.com/restic/restic/internal/backend/local" + "github.com/restic/restic/internal/backend/nats" "github.com/restic/restic/internal/backend/rclone" "github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/s3" @@ -41,6 +42,7 @@ var parsers = []parser{ {"swift", swift.ParseConfig, noPassword}, {"rest", rest.ParseConfig, rest.StripPassword}, {"rclone", rclone.ParseConfig, noPassword}, + {"nats", nats.ParseConfig, noPassword}, } // noPassword returns the repository location unchanged (there's no sensitive information there) diff --git a/internal/backend/nats/config.go b/internal/backend/nats/config.go new file mode 100644 index 00000000..c9cc60d8 --- /dev/null +++ b/internal/backend/nats/config.go @@ -0,0 +1,56 @@ +package nats + +import ( + "net/url" + "strings" + + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/options" +) + +// Config contains all configuration necessary to connect to a REST server. +type Config struct { + Server *url.URL + Credential string `option:"credentialfile" help:"Path to the NatsIO Credential File"` + Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` + Repo string +} + +func init() { + options.Register("natsio", Config{}) +} + +// NewConfig returns a new Config with the default values filled in. +func NewConfig() Config { + return Config{ + Connections: 5, + } +} + +// ParseConfig parses the string s and extracts the REST server URL. +func ParseConfig(s string) (interface{}, error) { + if !strings.HasPrefix(s, "nats:") { + return nil, errors.New("invalid REST backend specification") + } + + u, err := url.Parse(s) + + if err != nil { + return nil, errors.Wrap(err, "url.Parse") + } + + cfg := NewConfig() + cfg.Server = u + var repo string + if cfg.Server.Path[0] == '/' { + repo = cfg.Server.Path[1:] + } + if repo[len(repo)-1] == '/' { + repo = repo[0:len(repo)-1] + } + // replace any further slashes with . to specify a nested queue + repo = strings.Replace(repo, "/", ".", -1) + + cfg.Repo = repo + return cfg, nil +} \ No newline at end of file diff --git a/internal/backend/nats/nats.go b/internal/backend/nats/nats.go new file mode 100644 index 00000000..dc82844d --- /dev/null +++ b/internal/backend/nats/nats.go @@ -0,0 +1,417 @@ +package nats + +import ( + "bytes" + "context" + "fmt" + "hash" + "io" + "os" + "path" + "path/filepath" + "time" + + "github.com/nats-io/nats.go" + "github.com/restic/restic/internal/backend" + "github.com/Fishwaldo/restic-nats-server/protocol" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" +) + +// make sure the rest backend implements restic.Backend +var _ restic.Backend = &Backend{} + +const defaultLayout = "default" + +// Backend uses the REST protocol to access data stored on a server. +type Backend struct { + sem *backend.Semaphore + backend.Layout + cfg Config + conn *nats.Conn + enc nats.Encoder +} + +func connectNats(be *Backend) error { + server := be.cfg.Server.Hostname() + port := be.cfg.Server.Port() + if port == "" { + port = "4222" + } + url := fmt.Sprintf("nats://%s:%s", server, port) + + /* Check Credential File Exists */ + _, err := os.Stat(be.cfg.Credential) + if err != nil { + return errors.Wrap(err, "credential file missing") + } + + var options []nats.Option + options = append(options, nats.UserCredentials(be.cfg.Credential)) + options = append(options, nats.ClosedHandler(natsClosedCB)) + options = append(options, nats.DisconnectHandler(natsDisconnectedCB)) + + be.conn, err = nats.Connect(url, options...) + if err != nil { + return errors.Wrap(err, "nats connection failed") + } + + if size := be.conn.MaxPayload(); size < 8388608 { + return errors.New("NATS Server Max Payload Size is below 8Mb") + } + + if !be.conn.HeadersSupported() { + return errors.New("server does not support Headers") + } + + be.enc = nats.EncoderForType("gob") + if be.enc == nil { + return errors.New("Can't Load json Decoder") + } + be.sem, err = backend.NewSemaphore(be.cfg.Connections) + + fmt.Printf("Connected to Nats Server: %s (Cluster: %s)\n", be.conn.ConnectedServerName(), be.conn.ConnectedClusterName()) + + if err != nil { + return err + } + return nil +} + +func natsClosedCB(conn *nats.Conn) { + +} + +func natsDisconnectedCB(conn *nats.Conn) { + +} + +func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand, send interface{}, recv interface{}) error { + var operation string + be.sem.GetToken() + defer be.sem.ReleaseToken() + start := time.Now() + switch op { + case protocol.NatsOpenCmd: + if _, ok := send.(protocol.OpenOp); !ok { + return errors.Errorf("Struct Supplied to SendMsgWithReply is not a openOP: %T", send) + } + if _, ok := recv.(*protocol.OpenResult); !ok { + return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a openResult: %T", recv) + } + operation = "open" + case protocol.NatsStatCmd: + if _, ok := send.(protocol.StatOp); !ok { + return errors.Errorf("Struct Supplied to SendMsgWithReply is not a statOp: %T", send) + } + if _, ok := recv.(*protocol.StatResult); !ok { + return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a statResult %T", recv) + } + operation = "stat" + case protocol.NatsMkdirCmd: + if _, ok := send.(protocol.MkdirOp); !ok { + return errors.Errorf("Struct Supplied to SendMsgWithReply is not a mkdirOp: %T", send) + } + if _, ok := recv.(*protocol.MkdirResult); !ok { + return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a mkdirResult %T", recv) + } + operation = "mkdir" + case protocol.NatsSaveCmd: + if _, ok := send.(protocol.SaveOp); !ok { + return errors.Errorf("Struct Supplied to SendMsgWithReply is not a saveOp: %T", send) + } + if _, ok := recv.(*protocol.SaveResult); !ok { + return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a saveResult %T", recv) + } + operation = "save" + case protocol.NatsListCmd: + if _, ok := send.(protocol.ListOp); !ok { + return errors.Errorf("Struct Supplied to SendMsgWithReply is not a listOp: %T", send) + } + if _, ok := recv.(*protocol.ListResult); !ok { + return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a listResult %T", recv) + } + operation = "list" + case protocol.NatsLoadCmd: + if _, ok := send.(protocol.LoadOp); !ok { + return errors.Errorf("Struct Supplied to SendMsgWithReply is not a loadOp: %T", send) + } + if _, ok := recv.(*protocol.LoadResult); !ok { + return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a loadResult %T", recv) + } + operation = "load" + case protocol.NatsRemoveCmd: + if _, ok := send.(protocol.RemoveOp); !ok { + return errors.Errorf("Struct Supplied to SendMsgWithReply is not a removeOp: %T", send) + } + if _, ok := recv.(*protocol.RemoveResult); !ok { + return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a removeOp %T", recv) + } + operation = "remove" + + } + var err error + subject := fmt.Sprintf("repo.commands.%s", operation) + msg := protocol.NewRNSMsg(subject) + + defer func() { + if err == nil { + debug.Log("SendMsgWithReply (%s - %s) Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), time.Since(start), len(msg.Data)) + } else { + debug.Log("SendMsgWithREply (%s - %s) Failed: %s (Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), err, time.Since(start), len(msg.Data)) + } + }() + + msg.Header.Set("X-RNS-OP", operation) + + msg.Data, err = be.enc.Encode(subject, send) + if err != nil { + return errors.Wrap(err, "Encoding Failed") + } + msg.Reply = nats.NewInbox() + //debug.Log("Sending %s %d\n", msg.Subject, len(msg.Data)) + + /* check the size of the Data Field. If its close to our NATS max payload size + * then we will chunk the transfer instead + */ + var chunkedmsg *nats.Msg + + chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log) + if err != nil { + return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data)) + } + if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil { + return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data)) + } + return nil +} + +func Open(ctx context.Context, cfg Config) (*Backend, error) { + debug.Log("open nats backend at %s", cfg.Server.String()) + + sem, err := backend.NewSemaphore(cfg.Connections) + if err != nil { + return nil, err + } + + be := &Backend{ + sem: sem, + cfg: cfg, + } + + l, err := backend.ParseLayout(ctx, be, "default", defaultLayout, "") + if err != nil { + return nil, err + } + + be.Layout = l + + err = connectNats(be) + if err != nil { + return nil, errors.Wrap(err, "open nats failed") + } + + co := protocol.OpenOp{Bucket: be.cfg.Repo} + var result protocol.OpenResult + if err := be.SendMsgWithReply(ctx, protocol.NatsOpenCmd, co, &result); err != nil { + return nil, errors.Wrap(err, "OpenOp Failed") + } + debug.Log("Open Result: %+v\n", result) + + return be, nil +} + +// Create creates all the necessary files and directories for a new local +// backend at dir. Afterwards a new config blob should be created. +func Create(ctx context.Context, cfg Config) (*Backend, error) { + debug.Log("create nats backend at %s Repo %s", cfg.Server.String(), cfg.Repo) + be, err := Open(ctx, cfg) + if err != nil { + return nil, errors.Wrap(err, "Create Repo") + } + _, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile}) + if err == nil { + return nil, errors.New("config file already exists") + } + + for _, d := range be.Paths() { + err := be.Mkdir(ctx, d) + if err != nil { + return nil, err + } + } + + return be, nil +} + +// Location returns a string that describes the type and location of the +// repository. +func (b *Backend) Location() string { + return b.cfg.Server.String() +} + +// Hasher may return a hash function for calculating a content hash for the backend +func (b *Backend) Hasher() hash.Hash { + return nil +} + +// Test a boolean value whether a File with the name and type exists. +func (b *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { + debug.Log("Test %s - %s", b.cfg.Server.String(), b.Filename(h)) + _, err := b.Stat(ctx, h) + if err != nil { + return false, nil + } + return true, nil +} + +// Remove removes a File described by h. +func (b *Backend) Remove(ctx context.Context, h restic.Handle) error { + debug.Log("Remove %s - %s", b.cfg.Server.String(), b.Filename(h)) + ro := protocol.RemoveOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h))} + var result protocol.RemoveResult + if err := b.SendMsgWithReply(context.Background(), protocol.NatsRemoveCmd, ro, &result); err != nil { + return errors.Wrap(err, "Remove: SendMsgWithReply Failed") + } + if result.Ok { + return nil + } + return errors.Errorf("Remove Failed") +} + +// Close the backend +func (b *Backend) Close() error { + debug.Log("Close %s", b.cfg.Server.String()) + return nil +} + +// Save stores the data from rd under the given handle. +func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + debug.Log("Save %s - %s", b.cfg.Server.String(), b.Filename(h)) + var so protocol.SaveOp + so.Dir = b.Dirname(h) + so.Name = filepath.Base(b.Filename(h)) + so.Filesize = rd.Length() + so.Bucket = b.cfg.Repo + + var err error + so.Data, err = io.ReadAll(rd) + so.PacketSize = len(so.Data) + if err != nil { + return errors.Wrap(err, "Save") + } + so.Offset = 0 + var result protocol.SaveResult + if err := b.SendMsgWithReply(context.Background(), protocol.NatsSaveCmd, so, &result); err != nil { + return errors.Wrap(err, "Save: SendMsgWithReply Failed") + } + if result.Ok { + return nil + } + return errors.Errorf("Save Failed") +} + +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. If length is larger than zero, only a portion of the file +// is read. +// +// The function fn may be called multiple times during the same Load invocation +// and therefore must be idempotent. +// +// Implementations are encouraged to use backend.DefaultLoad +func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + debug.Log("Load %s - %s (start %d length %d)", b.cfg.Server.String(), b.Filename(h), offset, length) + lo := protocol.LoadOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h)), Length: length, Offset: offset} + var result protocol.LoadResult + if err := b.SendMsgWithReply(context.Background(), protocol.NatsLoadCmd, lo, &result); err != nil { + return errors.Wrap(err, "Save: SendMsgWithReply Failed") + } + if !result.Ok { + return errors.Errorf("Load Failed") + } + rd := bytes.NewReader(result.Data) + if err := fn(rd); err != nil { + return errors.Wrap(err, "Load Read") + } + return nil +} + +// Stat returns information about the File identified by h. +func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + debug.Log("Stat %s - %s", b.cfg.Server.String(), b.Filename(h)) + op := protocol.StatOp{Bucket: b.cfg.Repo, Filename: b.Filename(h)} + var result protocol.StatResult + if err := b.SendMsgWithReply(context.Background(), protocol.NatsStatCmd, op, &result); err != nil { + return restic.FileInfo{}, errors.Wrap(err, "statOp Failed") + } + if result.Ok { + return restic.FileInfo{Size: result.Size, Name: h.Name}, nil + } else { + return restic.FileInfo{}, errors.New("File does not exist") + } +} + +// List runs fn for each file in the backend which has the type t. When an +// error occurs (or fn returns an error), List stops and returns it. +// +// The function fn is called exactly once for each file during successful +// execution and at most once in case of an error. +// +// The function fn is called in the same Goroutine that List() is called +// from. +func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { + basedir, subdirs := b.Basedir(t) + debug.Log("List %s - %s - Subdirs? %t", b.cfg.Server.String(), basedir, subdirs) + op := protocol.ListOp{Bucket: b.cfg.Repo, BaseDir: basedir, SubDir: subdirs} + var result protocol.ListResult + if err := b.SendMsgWithReply(context.Background(), protocol.NatsListCmd, op, &result); err != nil { + return errors.Wrap(err, "listOp Failed") + } + for _, fi := range result.FI { + rfi := restic.FileInfo{Name: fi.Name, Size: fi.Size} + + if err := fn(rfi); err != nil { + return err + } + } + return nil +} + +// IsNotExist returns true if the error was caused by a non-existing file +// in the backend. +func (b *Backend) IsNotExist(err error) bool { + debug.Log("IsNotExist %s (TODO) - %s", b.cfg.Server.String(), err) + + fmt.Printf("IsNotExist Called\n") + return false +} + +// Delete removes all data in the backend. +func (b *Backend) Delete(ctx context.Context) error { + debug.Log("Delete %s (TODO)", b.cfg.Server.String()) + return errors.Errorf("TODO Delete") +} + +func (b *Backend) Join(p ...string) string { + return path.Join(p...) +} + +func (b *Backend) ReadDir(ctx context.Context, dir string) ([]os.FileInfo, error) { + debug.Log("ReadDir %s (TODO) - %s", b.cfg.Server.String(), dir) + return nil, errors.Errorf("TODO: ReadDir") +} + +func (b *Backend) Mkdir(ctx context.Context, dir string) error { + debug.Log("Mkdir %s - %s", b.cfg.Server.String(), dir) + op := protocol.MkdirOp{Bucket: b.cfg.Repo, Dir: dir} + var result protocol.MkdirResult + if err := b.SendMsgWithReply(context.Background(), protocol.NatsMkdirCmd, op, &result); err != nil { + return errors.Wrap(err, "natsMkdirCmd Failed") + } + if result.Ok { + return nil + } else { + return errors.New("mkdir Failed") + } +} From 3c01734ddf8c7a14f0f340911c2c072578c3141a Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Wed, 6 Oct 2021 23:30:15 +0800 Subject: [PATCH 2/9] Update go.mod --- go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 1d7162e5..eebda9bd 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/Azure/azure-sdk-for-go v55.6.0+incompatible github.com/Azure/go-autorest/autorest v0.11.19 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect - github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-00010101000000-000000000000 + github.com/Fishwaldo/restic-nats-server/protocol github.com/cenkalti/backoff/v4 v4.1.1 github.com/cespare/xxhash/v2 v2.1.1 github.com/dnaeon/go-vcr v1.2.0 // indirect @@ -38,6 +38,6 @@ require ( gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) -replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol +//replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol go 1.14 From fdbc9af83c2cc6a1d82c6ab015c8150c08741e77 Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Thu, 7 Oct 2021 13:55:12 +0800 Subject: [PATCH 3/9] Fix up go.mod --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index eebda9bd..2023ed12 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/Azure/azure-sdk-for-go v55.6.0+incompatible github.com/Azure/go-autorest/autorest v0.11.19 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect - github.com/Fishwaldo/restic-nats-server/protocol + github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04 github.com/cenkalti/backoff/v4 v4.1.1 github.com/cespare/xxhash/v2 v2.1.1 github.com/dnaeon/go-vcr v1.2.0 // indirect diff --git a/go.sum b/go.sum index aa818a8c..1e082e48 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUM github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04 h1:PNLAHZO3SXbjsUxZ7s5M+In48LQr7+NYXYcchFstMJs= +github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04/go.mod h1:URXej+zQBychnTp4/xwPd3nxnj0mK5EG1BPBppYoeKU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= From 43a1914da890d39cbd1c7193b27467c9593161ab Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Thu, 7 Oct 2021 14:30:02 +0800 Subject: [PATCH 4/9] Fix up Formatting --- internal/backend/nats/config.go | 12 ++++++------ internal/backend/nats/nats.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/backend/nats/config.go b/internal/backend/nats/config.go index c9cc60d8..5bb8d324 100644 --- a/internal/backend/nats/config.go +++ b/internal/backend/nats/config.go @@ -10,10 +10,10 @@ import ( // Config contains all configuration necessary to connect to a REST server. type Config struct { - Server *url.URL - Credential string `option:"credentialfile" help:"Path to the NatsIO Credential File"` - Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` - Repo string + Server *url.URL + Credential string `option:"credentialfile" help:"Path to the NatsIO Credential File"` + Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` + Repo string } func init() { @@ -46,11 +46,11 @@ func ParseConfig(s string) (interface{}, error) { repo = cfg.Server.Path[1:] } if repo[len(repo)-1] == '/' { - repo = repo[0:len(repo)-1] + repo = repo[0 : len(repo)-1] } // replace any further slashes with . to specify a nested queue repo = strings.Replace(repo, "/", ".", -1) cfg.Repo = repo return cfg, nil -} \ No newline at end of file +} diff --git a/internal/backend/nats/nats.go b/internal/backend/nats/nats.go index dc82844d..8df56b80 100644 --- a/internal/backend/nats/nats.go +++ b/internal/backend/nats/nats.go @@ -11,9 +11,9 @@ import ( "path/filepath" "time" + "github.com/Fishwaldo/restic-nats-server/protocol" "github.com/nats-io/nats.go" "github.com/restic/restic/internal/backend" - "github.com/Fishwaldo/restic-nats-server/protocol" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" From 980f1a5326c8a54f66d5fe84ced2f138c1aa94e4 Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Fri, 8 Oct 2021 19:09:24 +0800 Subject: [PATCH 5/9] Work with Nats Tennants --- go.mod | 5 +- go.sum | 2 - internal/backend/nats/nats.go | 17 +- internal/backend/nats/protocol/protocol.go | 391 +++++++++++++++++++++ 4 files changed, 407 insertions(+), 8 deletions(-) create mode 100644 internal/backend/nats/protocol/protocol.go diff --git a/go.mod b/go.mod index 2023ed12..d5c5bb0e 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/Azure/azure-sdk-for-go v55.6.0+incompatible github.com/Azure/go-autorest/autorest v0.11.19 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect - github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04 github.com/cenkalti/backoff/v4 v4.1.1 github.com/cespare/xxhash/v2 v2.1.1 github.com/dnaeon/go-vcr v1.2.0 // indirect @@ -19,6 +18,7 @@ require ( github.com/kurin/blazer v0.5.3 github.com/minio/minio-go/v7 v7.0.12 github.com/minio/sha256-simd v1.0.0 + github.com/nats-io/nats-server/v2 v2.6.1 // indirect github.com/nats-io/nats.go v1.12.3 github.com/ncw/swift/v2 v2.0.0 github.com/pkg/errors v0.9.1 @@ -35,9 +35,8 @@ require ( golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c golang.org/x/text v0.3.6 google.golang.org/api v0.50.0 + google.golang.org/protobuf v1.27.1 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) -//replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol - go 1.14 diff --git a/go.sum b/go.sum index 1e082e48..aa818a8c 100644 --- a/go.sum +++ b/go.sum @@ -64,8 +64,6 @@ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUM github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04 h1:PNLAHZO3SXbjsUxZ7s5M+In48LQr7+NYXYcchFstMJs= -github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04/go.mod h1:URXej+zQBychnTp4/xwPd3nxnj0mK5EG1BPBppYoeKU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/internal/backend/nats/nats.go b/internal/backend/nats/nats.go index 8df56b80..b66dadbd 100644 --- a/internal/backend/nats/nats.go +++ b/internal/backend/nats/nats.go @@ -11,7 +11,7 @@ import ( "path/filepath" "time" - "github.com/Fishwaldo/restic-nats-server/protocol" + "github.com/restic/restic/internal/backend/nats/protocol" "github.com/nats-io/nats.go" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/debug" @@ -48,7 +48,13 @@ func connectNats(be *Backend) error { } var options []nats.Option - options = append(options, nats.UserCredentials(be.cfg.Credential)) + if len(be.cfg.Credential) > 0 { + options = append(options, nats.UserCredentials(be.cfg.Credential)) + } + if len(be.cfg.Server.User.Username()) > 0 { + pass, _ := be.cfg.Server.User.Password() + options = append(options, nats.UserInfo(be.cfg.Server.User.Username(), pass)) + } options = append(options, nats.ClosedHandler(natsClosedCB)) options = append(options, nats.DisconnectHandler(natsDisconnectedCB)) @@ -175,9 +181,14 @@ func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand /* check the size of the Data Field. If its close to our NATS max payload size * then we will chunk the transfer instead */ + log := func(msg string, args ...interface{}) { + fmt.Printf(msg, args...) + fmt.Println() + } + var chunkedmsg *nats.Msg - chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log) + chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, log) if err != nil { return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data)) } diff --git a/internal/backend/nats/protocol/protocol.go b/internal/backend/nats/protocol/protocol.go new file mode 100644 index 00000000..85f35d74 --- /dev/null +++ b/internal/backend/nats/protocol/protocol.go @@ -0,0 +1,391 @@ +package protocol + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "strconv" + "strings" + "time" + + "github.com/nats-io/nats.go" + "github.com/pkg/errors" +) + +type NatsCommand int + +const ( + NatsOpenCmd NatsCommand = iota + NatsStatCmd + NatsMkdirCmd + NatsSaveCmd + NatsListCmd + NatsLoadCmd + NatsRemoveCmd +) + +type OpenOp struct { + Bucket string `json:"bucket"` +} +type OpenResult struct { + Ok bool `json:"ok"` +} + +type StatOp struct { + Bucket string `json:"bucket"` + Filename string `json:"filename"` +} + +type StatResult struct { + Ok bool `json:"ok"` + Size int64 `json:"size"` + Name string `json:"name"` +} + +type MkdirOp struct { + Bucket string `json:"bucket"` + Dir string `json:"dir"` +} + +type MkdirResult struct { + Ok bool `json:"ok"` +} + +type SaveOp struct { + Bucket string `json:"bucket"` + Dir string `json:"dir"` + Name string `json:"name"` + Filesize int64 `json:"size"` + PacketSize int `json:"packet_size"` + Offset int64 `json:"offset"` + Data []byte `json:"data"` +} + +type SaveResult struct { + Ok bool `json:"ok"` +} + +type ListOp struct { + Bucket string `json:"bucket"` + BaseDir string `json:"base_dir"` + SubDir bool `json:"sub_dir"` +} + +type FileInfo struct { + Name string `json:"name"` + Size int64 `json:"size"` +} + +type ListResult struct { + Ok bool `json:"ok"` + FI []FileInfo `json:"fi"` +} + +type LoadOp struct { + Bucket string `json:"bucket"` + Dir string `json:"dir"` + Name string `json:"name"` + Length int `json:"length"` + Offset int64 `json:"offset"` +} + +type LoadResult struct { + Ok bool `json:"ok"` + Data []byte `json:"data"` +} + +type RemoveOp struct { + Bucket string `json:"bucket"` + Dir string `json:"dir"` + Name string `json:"name"` +} + +type RemoveResult struct { + Ok bool `json:"ok"` +} + +const ( + msgHeaderID string = "X-RNS-MSGID" + msgHeaderChunk string = "X-RNS-CHUNKS" + msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT" + msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ" + msgHeaderNRI string = "Nats-Request-Info" +) + +func copyHeader(msg *nats.Msg) (hdr nats.Header) { + hdr = make(nats.Header) + hdr.Add(msgHeaderID, msg.Header.Get(msgHeaderID)) + hdr.Add(msgHeaderChunk, msg.Header.Get(msgHeaderChunk)) + return hdr +} + +type nriT struct { + Acc string `json:"acc"` + Rtt int `json:"rtt"` +} + +func getNRI(msg *nats.Msg) (*nriT, bool) { + nri := msg.Header.Get(msgHeaderNRI) + if nri == "" { + return nil, false + } + var res nriT + if err := json.Unmarshal([]byte(nri), &res); err != nil { + return nil, false + } + return &res, true +} + +// NewRNSMSG Returns a New RNS Message (for each "Transaction") +func NewRNSMsg(subject string) *nats.Msg { + msg := nats.NewMsg(subject) + msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16)) + return msg; +} + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" +const ( + letterIdxBits = 6 // 6 bits to represent a letter index + letterIdxMask = 1<= 0; { + if remain == 0 { + cache, remain = src.Int63(), letterIdxMax + } + if idx := int(cache & letterIdxMask); idx < len(letterBytes) { + sb.WriteByte(letterBytes[idx]) + i-- + } + cache >>= letterIdxBits + remain-- + } + + return sb.String() +} + + +func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto *nats.Msg, msg *nats.Msg, log func(string, ...interface{})) error { + if len(msg.Header.Get(msgHeaderID)) == 0 { + return errors.New("MessageID Not Set") + } + + var maxchunksize int = int(0.95 * float32(conn.MaxPayload())) + maxchunksize = 1024000 * 0.95 + datasize := len(msg.Data) + log("ChunkSendReplyMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + + if len(msg.Data) < maxchunksize { + /* data is less then our maxchunksize, so we can just send it */ + log("ChunkSendReplyMsgWithContext: Short Reply Message %s", msg.Header.Get(msgHeaderID)) + err := replyto.RespondMsg(msg) + return errors.Wrap(err, "Short Reply Message Send Failure") + } + + /* need to Split the Data into Chunks + * we will end up sending pages + 1 messages + * as the initial message contains data as well + */ + pages := datasize / maxchunksize + initialchunk := nats.NewMsg(msg.Subject) + initialchunk.Header = copyHeader(msg) + initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages)) + if len(msg.Data) < maxchunksize { + maxchunksize = len(msg.Data) + } + initialchunk.Data = msg.Data[:maxchunksize] + log("Chunking Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data)) + chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk) + if err != nil { + return errors.Wrap(err, "ChunkSendReplyMsgWithContext") + } + /* Reply Message just has a header with the subject we send the rest of the chunks to */ + chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject) + if chunkid == "" { + return errors.New("Chunked Reply Response didn't include ChunkID") + } + var chunksubject string + if nri, ok := getNRI(chunkchannelmsg); ok { + chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid ) + } else { + chunksubject = fmt.Sprintf("chunk.send.%s", chunkid ) + } + + + + for i := 1; i <= pages; i++ { + chunkmsg := nats.NewMsg(chunksubject) + chunkmsg.Header = copyHeader(msg) + chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i)) + start := maxchunksize * i + end := maxchunksize * (i + 1) + /* make sure we don't overrun our slice */ + if end > len(msg.Data) { + end = len(msg.Data) + } + chunkmsg.Data = msg.Data[start:end] + log("Sending Reply Chunk %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, start, end) + var chunkack *nats.Msg + if i < pages { + chunkack, err = conn.RequestMsgWithContext(ctx, chunkmsg) + log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i) + if err != nil { + return errors.Wrap(err, "ChunkSendReplyMsgWithContext") + } + } else { + err := conn.PublishMsg(chunkmsg) + if err != nil { + return errors.Wrap(err, "ChunkSendReplyMsgWithContext") + } + } + + /* all chunkackorreply */ + if i == pages { + return nil + } + } + return errors.New("Failed") +} + +func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) { + if len(msg.Header.Get(msgHeaderID)) == 0 { + return nil, errors.New("MessageID Not Set") + } + + var maxchunksize int = int(0.95 * float32(conn.MaxPayload())) + maxchunksize = 1024000 * 0.95 + datasize := len(msg.Data) + log("ChunkSendRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + + if len(msg.Data) < maxchunksize { + /* data is less then our maxchunksize, so we can just send it */ + log("Short SendRequest MsgID %s - %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + reply, err := conn.RequestMsgWithContext(ctx, msg) + if err != nil { + return nil, errors.Wrap(err, "Short Message Send Failure") + } + log("Short ReplyRequest MsgID %s Headers %s Size: %d", reply.Header.Get(msgHeaderID), reply.Header, len(reply.Data)) + return ChunkReadRequestMsgWithContext(ctx, conn, reply, log) + } + + /* need to Split the Data into Chunks + * we will end up sending pages + 1 messages + * as the initial message contains data as well + */ + pages := datasize / maxchunksize + + initialchunk := nats.NewMsg(msg.Subject) + initialchunk.Header = copyHeader(msg) + initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages)) + + initialchunk.Data = msg.Data[:maxchunksize] + log("Chunking Send Request MsgID %s - %s- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data)) + chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk) + if err != nil { + return nil, errors.Wrap(err, "chunkRequestMsgWithContext") + } + /* Reply Message just has a header with the subject we send the rest of the chunks to */ + chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject) + if chunkid == "" { + return nil, errors.New("Chunked Reply Response didn't include ChunkID") + } + var chunksubject string + if nri, ok := getNRI(chunkchannelmsg); ok { + chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid ) + } else { + chunksubject = fmt.Sprintf("chunk.send.%s", chunkid ) + } + + for i := 1; i <= pages; i++ { + chunkmsg := nats.NewMsg(chunksubject) + chunkmsg.Header = copyHeader(msg) + chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i)) + start := maxchunksize * i + end := maxchunksize * (i + 1) + /* make sure we don't overrun our slice */ + if end > len(msg.Data) { + end = len(msg.Data) + } + chunkmsg.Data = msg.Data[start:end] + log("Sending Request Chunk %s %s to %s- Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header, chunkmsg.Subject, i, start, end) + var chunkackorreply *nats.Msg + chunkackorreply, err = conn.RequestMsgWithContext(ctx, chunkmsg) + if err != nil { + return nil, errors.Wrap(err, "Chunk Send") + } + log("got Result %s - %s", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header) + /* only the last Chunk Reply will contain the actual Response from the other side */ + if i == pages { + log("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data)) + return ChunkReadRequestMsgWithContext(ctx, conn, chunkackorreply, log) + } + } + return nil, errors.New("Failed") +} + +func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) { + if len(msg.Header.Get(msgHeaderID)) == 0 { + return nil, errors.New("MessageID Not Set") + } + log("ChunkReadRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + chunked := msg.Header.Get(msgHeaderChunk) + if chunked != "" { + pages, err := strconv.Atoi(chunked) + if err != nil { + return nil, errors.Wrap(err, "Couldn't get Chunk Page Count") + } + log("Chunked Message Recieved: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages) + chunktransfer := randStringBytesMaskImprSrcSB(16) + chunkchan := make(chan *nats.Msg) + var chunktransfersubject string + if nri, ok := getNRI(msg); ok { + chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer ) + } else { + chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer ) + } + sub, err := conn.ChanSubscribe(chunktransfersubject, chunkchan) + if err != nil { + return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel") + } + log("Subscription: %+v", sub) + defer sub.Unsubscribe() + defer close(chunkchan) + chunksubmsg := nats.NewMsg(msg.Reply) + chunksubmsg.Header = copyHeader(msg) + chunksubmsg.Header.Add(msgHeaderChunkSubject, chunktransfer) + msg.RespondMsg(chunksubmsg) + /* pages - 1 because we got first Chunk in original message */ + for i := 1; i <= pages; i++ { + log("Pending MsgID %s Chunk %d of %d on %s", chunksubmsg.Header.Get(msgHeaderID), i, pages, chunktransfersubject) + select { + case chunk := <-chunkchan: + seq, _ := strconv.Atoi(chunk.Header.Get(msgHeaderChunkSeq)) + log("Got MsgID %s - %s Chunk %d %d", chunk.Header.Get(msgHeaderID), chunk.Header, seq, i) + msg.Data = append(msg.Data, chunk.Data...) + if i < pages { + ackChunk := nats.NewMsg(chunk.Subject) + ackChunk.Header = copyHeader(chunk) + err := chunk.RespondMsg(ackChunk) + if err != nil { + return nil, errors.Wrap(err, "Chunk Reply Error") + } + } else { + msg.Reply = chunk.Reply + } + case <-ctx.Done(): + log("Context Canceled") + return nil, context.DeadlineExceeded + } + } + log("Chunked Messages Done - %s - %s Final Size %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + } + return msg, nil +} From b3dc3615b1cec60f659c35652b34fdcae9f45d61 Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Fri, 8 Oct 2021 23:17:12 +0800 Subject: [PATCH 6/9] fix a few bugs with subscriptions and multitenant setup --- internal/backend/nats/nats.go | 19 ++++---- internal/backend/nats/protocol/protocol.go | 51 ++++++++++++---------- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/internal/backend/nats/nats.go b/internal/backend/nats/nats.go index b66dadbd..8d48fa92 100644 --- a/internal/backend/nats/nats.go +++ b/internal/backend/nats/nats.go @@ -11,9 +11,9 @@ import ( "path/filepath" "time" - "github.com/restic/restic/internal/backend/nats/protocol" "github.com/nats-io/nats.go" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/nats/protocol" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -181,19 +181,21 @@ func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand /* check the size of the Data Field. If its close to our NATS max payload size * then we will chunk the transfer instead */ - log := func(msg string, args ...interface{}) { - fmt.Printf(msg, args...) - fmt.Println() - } + // log := func(msg string, args ...interface{}) { + // fmt.Printf(msg, args...) + // fmt.Println() + // } var chunkedmsg *nats.Msg - chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, log) + chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log) if err != nil { return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data)) } - if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil { - return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data)) + if len(chunkedmsg.Data) > 0 { + if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil { + return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data)) + } } return nil } @@ -317,6 +319,7 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea if err := b.SendMsgWithReply(context.Background(), protocol.NatsSaveCmd, so, &result); err != nil { return errors.Wrap(err, "Save: SendMsgWithReply Failed") } + debug.Log("Save Result: %+v", result) if result.Ok { return nil } diff --git a/internal/backend/nats/protocol/protocol.go b/internal/backend/nats/protocol/protocol.go index 85f35d74..192136ce 100644 --- a/internal/backend/nats/protocol/protocol.go +++ b/internal/backend/nats/protocol/protocol.go @@ -106,17 +106,19 @@ type RemoveResult struct { } const ( - msgHeaderID string = "X-RNS-MSGID" - msgHeaderChunk string = "X-RNS-CHUNKS" + msgHeaderID string = "X-RNS-MSGID" + msgHeaderChunk string = "X-RNS-CHUNKS" msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT" - msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ" - msgHeaderNRI string = "Nats-Request-Info" + msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ" + msgHeaderOperation string = "X-RNS-OP" + msgHeaderNRI string = "Nats-Request-Info" ) func copyHeader(msg *nats.Msg) (hdr nats.Header) { hdr = make(nats.Header) hdr.Add(msgHeaderID, msg.Header.Get(msgHeaderID)) hdr.Add(msgHeaderChunk, msg.Header.Get(msgHeaderChunk)) + hdr.Add(msgHeaderOperation, msg.Header.Get(msgHeaderOperation)) return hdr } @@ -141,7 +143,7 @@ func getNRI(msg *nats.Msg) (*nriT, bool) { func NewRNSMsg(subject string) *nats.Msg { msg := nats.NewMsg(subject) msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16)) - return msg; + return msg } const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" @@ -172,7 +174,6 @@ func randStringBytesMaskImprSrcSB(n int) string { return sb.String() } - func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto *nats.Msg, msg *nats.Msg, log func(string, ...interface{})) error { if len(msg.Header.Get(msgHeaderID)) == 0 { return errors.New("MessageID Not Set") @@ -202,7 +203,7 @@ func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto maxchunksize = len(msg.Data) } initialchunk.Data = msg.Data[:maxchunksize] - log("Chunking Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data)) + log("Chunking Initial Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data)) chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk) if err != nil { return errors.Wrap(err, "ChunkSendReplyMsgWithContext") @@ -213,14 +214,12 @@ func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto return errors.New("Chunked Reply Response didn't include ChunkID") } var chunksubject string - if nri, ok := getNRI(chunkchannelmsg); ok { - chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid ) + if nri, ok := getNRI(replyto); ok { + chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid) } else { - chunksubject = fmt.Sprintf("chunk.send.%s", chunkid ) + chunksubject = fmt.Sprintf("chunk.send.%s", chunkid) } - - - + log("Chunk Reply Subject %s", chunksubject) for i := 1; i <= pages; i++ { chunkmsg := nats.NewMsg(chunksubject) chunkmsg.Header = copyHeader(msg) @@ -232,14 +231,15 @@ func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto end = len(msg.Data) } chunkmsg.Data = msg.Data[start:end] - log("Sending Reply Chunk %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, start, end) + log("Sending Reply Chunk %s - Page: %d of %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, pages, start, end) var chunkack *nats.Msg if i < pages { + log("Sending Chunk to %s", chunkmsg.Subject) chunkack, err = conn.RequestMsgWithContext(ctx, chunkmsg) - log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i) if err != nil { return errors.Wrap(err, "ChunkSendReplyMsgWithContext") } + log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i) } else { err := conn.PublishMsg(chunkmsg) if err != nil { @@ -249,7 +249,7 @@ func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto /* all chunkackorreply */ if i == pages { - return nil + return nil } } return errors.New("Failed") @@ -280,7 +280,7 @@ func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n * we will end up sending pages + 1 messages * as the initial message contains data as well */ - pages := datasize / maxchunksize + pages := datasize / maxchunksize initialchunk := nats.NewMsg(msg.Subject) initialchunk.Header = copyHeader(msg) @@ -299,9 +299,9 @@ func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n } var chunksubject string if nri, ok := getNRI(chunkchannelmsg); ok { - chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid ) + chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid) } else { - chunksubject = fmt.Sprintf("chunk.send.%s", chunkid ) + chunksubject = fmt.Sprintf("chunk.send.%s", chunkid) } for i := 1; i <= pages; i++ { @@ -323,7 +323,7 @@ func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n } log("got Result %s - %s", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header) /* only the last Chunk Reply will contain the actual Response from the other side */ - if i == pages { + if i == pages { log("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data)) return ChunkReadRequestMsgWithContext(ctx, conn, chunkackorreply, log) } @@ -344,17 +344,18 @@ func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n } log("Chunked Message Recieved: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages) chunktransfer := randStringBytesMaskImprSrcSB(16) - chunkchan := make(chan *nats.Msg) + chunkchan := make(chan *nats.Msg, 10) var chunktransfersubject string if nri, ok := getNRI(msg); ok { - chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer ) + chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer) } else { - chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer ) + chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer) } - sub, err := conn.ChanSubscribe(chunktransfersubject, chunkchan) + sub, err := conn.QueueSubscribeSyncWithChan(chunktransfersubject, chunktransfer, chunkchan) if err != nil { return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel") } + sub.SetPendingLimits(1000, 64*1024*1024) log("Subscription: %+v", sub) defer sub.Unsubscribe() defer close(chunkchan) @@ -373,11 +374,13 @@ func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n if i < pages { ackChunk := nats.NewMsg(chunk.Subject) ackChunk.Header = copyHeader(chunk) + log("sending ack %d %d", i, pages) err := chunk.RespondMsg(ackChunk) if err != nil { return nil, errors.Wrap(err, "Chunk Reply Error") } } else { + log("Chunked Messages.... %d - %d", i, pages) msg.Reply = chunk.Reply } case <-ctx.Done(): From a5d2db2a6a5a2efa113fb214e6c5918ac46f84a9 Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Thu, 14 Oct 2021 12:31:49 +0800 Subject: [PATCH 7/9] use new nats-restic module for Nats Communication --- go.mod | 9 +- go.sum | 23 +- internal/backend/nats/logger.go | 79 +++++++ internal/backend/nats/nats.go | 369 ++++++++++---------------------- 4 files changed, 212 insertions(+), 268 deletions(-) create mode 100644 internal/backend/nats/logger.go diff --git a/go.mod b/go.mod index e549fd3b..379b4a9b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ require ( github.com/Azure/azure-sdk-for-go v55.6.0+incompatible github.com/Azure/go-autorest/autorest v0.11.19 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect + github.com/Fishwaldo/go-logadapter v0.0.2 + github.com/Fishwaldo/restic-nats v0.0.1-rc1 github.com/cenkalti/backoff/v4 v4.1.1 github.com/cespare/xxhash/v2 v2.1.1 github.com/dnaeon/go-vcr v1.2.0 // indirect @@ -18,8 +20,6 @@ require ( github.com/kurin/blazer v0.5.3 github.com/minio/minio-go/v7 v7.0.14 github.com/minio/sha256-simd v1.0.0 - github.com/nats-io/nats-server/v2 v2.6.1 // indirect - github.com/nats-io/nats.go v1.12.3 github.com/ncw/swift/v2 v2.0.0 github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.6.0 @@ -32,11 +32,12 @@ require ( golang.org/x/net v0.0.0-20210614182718-04defd469f4e golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c + golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e golang.org/x/text v0.3.6 google.golang.org/api v0.50.0 - google.golang.org/protobuf v1.27.1 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) go 1.14 + +//replace github.com/Fishwaldo/restic-nats => ../restic-nats diff --git a/go.sum b/go.sum index d501be5d..9a046e12 100644 --- a/go.sum +++ b/go.sum @@ -64,10 +64,15 @@ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUM github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/Fishwaldo/go-logadapter v0.0.2 h1:RxFOr+bEDqQ1rPUmjUX5u8fGLCKY0Lyea+9AxDqzaW4= +github.com/Fishwaldo/go-logadapter v0.0.2/go.mod h1:aRbQ8rWdpeD0WWo241ctqgk/yRto8Axg09EkwWiVGK0= +github.com/Fishwaldo/restic-nats v0.0.1-rc1 h1:Byhk87ggM4AHPU+J1NV50zwZ1D/E7p33gEvl8cR83lo= +github.com/Fishwaldo/restic-nats v0.0.1-rc1/go.mod h1:GlR1upvCwuE0pTyxdU1Ztb8hxCJ63fNH4Gjxo4hG1nc= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= @@ -240,8 +245,9 @@ github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QH github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -283,8 +289,9 @@ github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM= github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo= -github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE= +github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= @@ -362,8 +369,10 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= +go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -535,8 +544,8 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -607,8 +616,9 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -736,8 +746,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= diff --git a/internal/backend/nats/logger.go b/internal/backend/nats/logger.go new file mode 100644 index 00000000..70ba6062 --- /dev/null +++ b/internal/backend/nats/logger.go @@ -0,0 +1,79 @@ +package nats + + +import ( + "os" + "fmt" + "github.com/restic/restic/internal/debug" + "github.com/Fishwaldo/go-logadapter" +) + +/* compile check against our logAdapter interface */ + +var _ logadapter.Logger = (*resticLogger)(nil) + +/* a custom logger implementation just for Restic */ +type resticLogger struct { + +} + +// Log a Trace Message +func (l *resticLogger) Trace(message string, params ...interface{}) { + debug.Log(message, params...) +} +// Log a Debug Message +func (l *resticLogger) Debug(message string, params ...interface{}) { + debug.Log(message, params...) +} +// Log a Info Message +func (l *resticLogger) Info(message string, params ...interface{}) { + debug.Log(message, params...) +} +// Log a Warn Message +func (l *resticLogger) Warn(message string, params ...interface{}) { + debug.Log(message, params...) +} +// Log a Error Message +func (l *resticLogger) Error(message string, params ...interface{}) { + fmt.Printf("Nats Error: %s\n", fmt.Sprintf(message, params...)) + debug.Log(message, params...) +} +// Log a Fatal Message (some implementations may call os.exit() here) +func (l *resticLogger) Fatal(message string, params ...interface{}) { + fmt.Printf("Nats Fatal: %s\n", fmt.Sprintf(message, params...)) + debug.Log(message, params...) + os.Exit(-1) +} +// Log a Panic Message (some implmentations may call Panic) +func (l *resticLogger) Panic(message string, params ...interface{}) { + fmt.Printf("Nats Panic: %s\n", fmt.Sprintf(message, params...)) + debug.Log(message, params...) + panic(fmt.Sprintf(message, params...)) +} +// Create a New Logger Instance with Name +func (l *resticLogger) New(name string) (logadapter.Logger) { + return l +} +// Add Key/Value Pairs for Structured Logging and return a new Logger +func (l *resticLogger) With(key string, value interface{}) ( logadapter.Logger) { + return l +} +// Set the Log Prefix +func (l *resticLogger) SetPrefix(name string) { +} +// Get the Log Prefix +func (l *resticLogger) GetPrefix() (string) { + return "" +} +// Set Logging Level +func (l *resticLogger) SetLevel(logadapter.Log_Level) { + +} +// Get Logging Level +func(l *resticLogger) GetLevel() (logadapter.Log_Level) { + return logadapter.LOG_TRACE +} +// Sync/Flush the Log Buffers +func (l *resticLogger) Sync() { + +} diff --git a/internal/backend/nats/nats.go b/internal/backend/nats/nats.go index 8d48fa92..ba50b208 100644 --- a/internal/backend/nats/nats.go +++ b/internal/backend/nats/nats.go @@ -9,195 +9,23 @@ import ( "os" "path" "path/filepath" - "time" - "github.com/nats-io/nats.go" "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/backend/nats/protocol" + "github.com/Fishwaldo/restic-nats" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" ) -// make sure the rest backend implements restic.Backend +// make sure the nats backend implements restic.Backend var _ restic.Backend = &Backend{} -const defaultLayout = "default" - -// Backend uses the REST protocol to access data stored on a server. +// Backend uses the nats protocol to access workers that interface with the actual repository on behalf of restic type Backend struct { sem *backend.Semaphore backend.Layout cfg Config - conn *nats.Conn - enc nats.Encoder -} - -func connectNats(be *Backend) error { - server := be.cfg.Server.Hostname() - port := be.cfg.Server.Port() - if port == "" { - port = "4222" - } - url := fmt.Sprintf("nats://%s:%s", server, port) - - /* Check Credential File Exists */ - _, err := os.Stat(be.cfg.Credential) - if err != nil { - return errors.Wrap(err, "credential file missing") - } - - var options []nats.Option - if len(be.cfg.Credential) > 0 { - options = append(options, nats.UserCredentials(be.cfg.Credential)) - } - if len(be.cfg.Server.User.Username()) > 0 { - pass, _ := be.cfg.Server.User.Password() - options = append(options, nats.UserInfo(be.cfg.Server.User.Username(), pass)) - } - options = append(options, nats.ClosedHandler(natsClosedCB)) - options = append(options, nats.DisconnectHandler(natsDisconnectedCB)) - - be.conn, err = nats.Connect(url, options...) - if err != nil { - return errors.Wrap(err, "nats connection failed") - } - - if size := be.conn.MaxPayload(); size < 8388608 { - return errors.New("NATS Server Max Payload Size is below 8Mb") - } - - if !be.conn.HeadersSupported() { - return errors.New("server does not support Headers") - } - - be.enc = nats.EncoderForType("gob") - if be.enc == nil { - return errors.New("Can't Load json Decoder") - } - be.sem, err = backend.NewSemaphore(be.cfg.Connections) - - fmt.Printf("Connected to Nats Server: %s (Cluster: %s)\n", be.conn.ConnectedServerName(), be.conn.ConnectedClusterName()) - - if err != nil { - return err - } - return nil -} - -func natsClosedCB(conn *nats.Conn) { - -} - -func natsDisconnectedCB(conn *nats.Conn) { - -} - -func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand, send interface{}, recv interface{}) error { - var operation string - be.sem.GetToken() - defer be.sem.ReleaseToken() - start := time.Now() - switch op { - case protocol.NatsOpenCmd: - if _, ok := send.(protocol.OpenOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a openOP: %T", send) - } - if _, ok := recv.(*protocol.OpenResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a openResult: %T", recv) - } - operation = "open" - case protocol.NatsStatCmd: - if _, ok := send.(protocol.StatOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a statOp: %T", send) - } - if _, ok := recv.(*protocol.StatResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a statResult %T", recv) - } - operation = "stat" - case protocol.NatsMkdirCmd: - if _, ok := send.(protocol.MkdirOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a mkdirOp: %T", send) - } - if _, ok := recv.(*protocol.MkdirResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a mkdirResult %T", recv) - } - operation = "mkdir" - case protocol.NatsSaveCmd: - if _, ok := send.(protocol.SaveOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a saveOp: %T", send) - } - if _, ok := recv.(*protocol.SaveResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a saveResult %T", recv) - } - operation = "save" - case protocol.NatsListCmd: - if _, ok := send.(protocol.ListOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a listOp: %T", send) - } - if _, ok := recv.(*protocol.ListResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a listResult %T", recv) - } - operation = "list" - case protocol.NatsLoadCmd: - if _, ok := send.(protocol.LoadOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a loadOp: %T", send) - } - if _, ok := recv.(*protocol.LoadResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a loadResult %T", recv) - } - operation = "load" - case protocol.NatsRemoveCmd: - if _, ok := send.(protocol.RemoveOp); !ok { - return errors.Errorf("Struct Supplied to SendMsgWithReply is not a removeOp: %T", send) - } - if _, ok := recv.(*protocol.RemoveResult); !ok { - return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a removeOp %T", recv) - } - operation = "remove" - - } - var err error - subject := fmt.Sprintf("repo.commands.%s", operation) - msg := protocol.NewRNSMsg(subject) - - defer func() { - if err == nil { - debug.Log("SendMsgWithReply (%s - %s) Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), time.Since(start), len(msg.Data)) - } else { - debug.Log("SendMsgWithREply (%s - %s) Failed: %s (Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), err, time.Since(start), len(msg.Data)) - } - }() - - msg.Header.Set("X-RNS-OP", operation) - - msg.Data, err = be.enc.Encode(subject, send) - if err != nil { - return errors.Wrap(err, "Encoding Failed") - } - msg.Reply = nats.NewInbox() - //debug.Log("Sending %s %d\n", msg.Subject, len(msg.Data)) - - /* check the size of the Data Field. If its close to our NATS max payload size - * then we will chunk the transfer instead - */ - // log := func(msg string, args ...interface{}) { - // fmt.Printf(msg, args...) - // fmt.Println() - // } - - var chunkedmsg *nats.Msg - - chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log) - if err != nil { - return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data)) - } - if len(chunkedmsg.Data) > 0 { - if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil { - return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data)) - } - } - return nil + rns *rns.ResticNatsClient } func Open(ctx context.Context, cfg Config) (*Backend, error) { @@ -211,27 +39,27 @@ func Open(ctx context.Context, cfg Config) (*Backend, error) { be := &Backend{ sem: sem, cfg: cfg, + Layout: &backend.DefaultLayout{Join: path.Join}, } - l, err := backend.ParseLayout(ctx, be, "default", defaultLayout, "") + host, _ := os.Hostname() + be.rns, err = rns.New(*be.cfg.Server, rns.WithName(host), rns.WithLogger(&resticLogger{})) if err != nil { return nil, err } + fmt.Printf("Connected to Nats Server: %s (Cluster: %s)\n", be.rns.Conn.ConnectedServerName(), be.rns.Conn.ConnectedClusterName()) - be.Layout = l + hostname, _ := os.Hostname() - err = connectNats(be) + result, err := be.rns.OpenRepo(ctx, hostname) if err != nil { - return nil, errors.Wrap(err, "open nats failed") + // Communication Error + return nil, err } - - co := protocol.OpenOp{Bucket: be.cfg.Repo} - var result protocol.OpenResult - if err := be.SendMsgWithReply(ctx, protocol.NatsOpenCmd, co, &result); err != nil { - return nil, errors.Wrap(err, "OpenOp Failed") + if !result.Ok { + // Backend Returned a Error + return nil, result.Err } - debug.Log("Open Result: %+v\n", result) - return be, nil } @@ -243,9 +71,9 @@ func Create(ctx context.Context, cfg Config) (*Backend, error) { if err != nil { return nil, errors.Wrap(err, "Create Repo") } - _, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile}) - if err == nil { - return nil, errors.New("config file already exists") + exist, _ := be.Test(ctx, restic.Handle{Type: restic.ConfigFile}) + if exist { + return nil, errors.Errorf("config file already exists") } for _, d := range be.Paths() { @@ -272,58 +100,60 @@ func (b *Backend) Hasher() hash.Hash { // Test a boolean value whether a File with the name and type exists. func (b *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { debug.Log("Test %s - %s", b.cfg.Server.String(), b.Filename(h)) - _, err := b.Stat(ctx, h) + res, err := b.Stat(ctx, h) if err != nil { - return false, nil + return false, err } - return true, nil + if res.Name == b.Filename(h) { + return true, nil + } + return false, nil } // Remove removes a File described by h. func (b *Backend) Remove(ctx context.Context, h restic.Handle) error { debug.Log("Remove %s - %s", b.cfg.Server.String(), b.Filename(h)) - ro := protocol.RemoveOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h))} - var result protocol.RemoveResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsRemoveCmd, ro, &result); err != nil { - return errors.Wrap(err, "Remove: SendMsgWithReply Failed") + result, err := b.rns.Remove(ctx, b.Dirname(h), filepath.Base(b.Filename(h))) + if err != nil { + //Communication Error + return errors.Wrap(err, "save") } - if result.Ok { - return nil + if !result.Ok { + //Backend returned a Error + return result.Err } - return errors.Errorf("Remove Failed") + return nil } // Close the backend func (b *Backend) Close() error { debug.Log("Close %s", b.cfg.Server.String()) + result, err := b.rns.Close(context.Background()) + if err != nil { + // Communication Error + return errors.Wrap(err, "close") + } + if !result.Ok { + // Backend Returned a Error + return result.Err + } return nil } // Save stores the data from rd under the given handle. func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { debug.Log("Save %s - %s", b.cfg.Server.String(), b.Filename(h)) - var so protocol.SaveOp - so.Dir = b.Dirname(h) - so.Name = filepath.Base(b.Filename(h)) - so.Filesize = rd.Length() - so.Bucket = b.cfg.Repo - var err error - so.Data, err = io.ReadAll(rd) - so.PacketSize = len(so.Data) + result, err := b.rns.Save(ctx, b.Dirname(h), filepath.Base(b.Filename(h)), rd) if err != nil { - return errors.Wrap(err, "Save") + // Communication Error + return errors.Wrap(err, "save") } - so.Offset = 0 - var result protocol.SaveResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsSaveCmd, so, &result); err != nil { - return errors.Wrap(err, "Save: SendMsgWithReply Failed") + if !result.Ok { + // Backend Returned a Error + return result.Err } - debug.Log("Save Result: %+v", result) - if result.Ok { - return nil - } - return errors.Errorf("Save Failed") + return nil } // Load runs fn with a reader that yields the contents of the file at h at the @@ -336,34 +166,39 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea // Implementations are encouraged to use backend.DefaultLoad func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { debug.Log("Load %s - %s (start %d length %d)", b.cfg.Server.String(), b.Filename(h), offset, length) - lo := protocol.LoadOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h)), Length: length, Offset: offset} - var result protocol.LoadResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsLoadCmd, lo, &result); err != nil { - return errors.Wrap(err, "Save: SendMsgWithReply Failed") + + result, err := b.rns.Load(ctx, b.Dirname(h), filepath.Base(b.Filename(h)), length, offset) + if err != nil { + //Communication Error + return errors.Wrap(err, "Load") } if !result.Ok { - return errors.Errorf("Load Failed") + // Backend Returned a Error + return result.Err } rd := bytes.NewReader(result.Data) if err := fn(rd); err != nil { - return errors.Wrap(err, "Load Read") + return err } return nil } // Stat returns information about the File identified by h. func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - debug.Log("Stat %s - %s", b.cfg.Server.String(), b.Filename(h)) - op := protocol.StatOp{Bucket: b.cfg.Repo, Filename: b.Filename(h)} - var result protocol.StatResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsStatCmd, op, &result); err != nil { - return restic.FileInfo{}, errors.Wrap(err, "statOp Failed") + fmt.Printf("Backend: %+v\n", b) + debug.Log("Stat %s", b.Filename(h)) + + result, err := b.rns.Stat(ctx, b.Dirname(h), b.Filename(h)) + + if err != nil { + //Communication Error + return restic.FileInfo{}, errors.Wrap(err, "Stat") } - if result.Ok { - return restic.FileInfo{Size: result.Size, Name: h.Name}, nil - } else { - return restic.FileInfo{}, errors.New("File does not exist") + if !result.Ok { + // Backend Returned a Error + return restic.FileInfo{}, result.Err } + return restic.FileInfo{Size: result.Size, Name: h.Name}, nil } // List runs fn for each file in the backend which has the type t. When an @@ -375,16 +210,19 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e // The function fn is called in the same Goroutine that List() is called // from. func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - basedir, subdirs := b.Basedir(t) - debug.Log("List %s - %s - Subdirs? %t", b.cfg.Server.String(), basedir, subdirs) - op := protocol.ListOp{Bucket: b.cfg.Repo, BaseDir: basedir, SubDir: subdirs} - var result protocol.ListResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsListCmd, op, &result); err != nil { - return errors.Wrap(err, "listOp Failed") + dir, recursive := b.Basedir(t) + debug.Log("List %s - %s - Subdirs? %t", b.cfg.Server.String(),dir, recursive) + result, err := b.rns.List(ctx, dir, recursive) + if err != nil { + //Communication Error + return errors.Wrap(err, "List") + } + if !result.Ok { + //Backend Returned a Error + return result.Err } for _, fi := range result.FI { rfi := restic.FileInfo{Name: fi.Name, Size: fi.Size} - if err := fn(rfi); err != nil { return err } @@ -395,37 +233,52 @@ func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.Fi // IsNotExist returns true if the error was caused by a non-existing file // in the backend. func (b *Backend) IsNotExist(err error) bool { - debug.Log("IsNotExist %s (TODO) - %s", b.cfg.Server.String(), err) + debug.Log("IsNotExist %s (TODO) - %T", b.cfg.Server.String(), err) fmt.Printf("IsNotExist Called\n") return false } +func (b *Backend) removeKeys(ctx context.Context, t restic.FileType) error { + return b.List(ctx, t, func(fi restic.FileInfo) error { + return b.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) + }) +} + // Delete removes all data in the backend. func (b *Backend) Delete(ctx context.Context) error { - debug.Log("Delete %s (TODO)", b.cfg.Server.String()) - return errors.Errorf("TODO Delete") -} + alltypes := []restic.FileType{ + restic.PackFile, + restic.KeyFile, + restic.LockFile, + restic.SnapshotFile, + restic.IndexFile} -func (b *Backend) Join(p ...string) string { - return path.Join(p...) -} + for _, t := range alltypes { + err := b.removeKeys(ctx, t) + if err != nil { + return nil + } + } -func (b *Backend) ReadDir(ctx context.Context, dir string) ([]os.FileInfo, error) { - debug.Log("ReadDir %s (TODO) - %s", b.cfg.Server.String(), dir) - return nil, errors.Errorf("TODO: ReadDir") + err := b.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + if err != nil && !b.IsNotExist(err) { + return err + } + + return nil } func (b *Backend) Mkdir(ctx context.Context, dir string) error { debug.Log("Mkdir %s - %s", b.cfg.Server.String(), dir) - op := protocol.MkdirOp{Bucket: b.cfg.Repo, Dir: dir} - var result protocol.MkdirResult - if err := b.SendMsgWithReply(context.Background(), protocol.NatsMkdirCmd, op, &result); err != nil { - return errors.Wrap(err, "natsMkdirCmd Failed") + result, err := b.rns.Mkdir(ctx, dir) + if err != nil { + //Communication Error + return errors.Wrap(err, "Mkdir") } - if result.Ok { - return nil - } else { - return errors.New("mkdir Failed") + if !result.Ok { + //Backend Returned a Error + return result.Err } + return nil } From 82fca6d76aa12e8265ef7252e8bf9702a49c38dd Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Thu, 14 Oct 2021 12:37:56 +0800 Subject: [PATCH 8/9] remove protocol files --- internal/backend/nats/protocol/protocol.go | 394 --------------------- 1 file changed, 394 deletions(-) delete mode 100644 internal/backend/nats/protocol/protocol.go diff --git a/internal/backend/nats/protocol/protocol.go b/internal/backend/nats/protocol/protocol.go deleted file mode 100644 index 192136ce..00000000 --- a/internal/backend/nats/protocol/protocol.go +++ /dev/null @@ -1,394 +0,0 @@ -package protocol - -import ( - "context" - "encoding/json" - "fmt" - "math/rand" - "strconv" - "strings" - "time" - - "github.com/nats-io/nats.go" - "github.com/pkg/errors" -) - -type NatsCommand int - -const ( - NatsOpenCmd NatsCommand = iota - NatsStatCmd - NatsMkdirCmd - NatsSaveCmd - NatsListCmd - NatsLoadCmd - NatsRemoveCmd -) - -type OpenOp struct { - Bucket string `json:"bucket"` -} -type OpenResult struct { - Ok bool `json:"ok"` -} - -type StatOp struct { - Bucket string `json:"bucket"` - Filename string `json:"filename"` -} - -type StatResult struct { - Ok bool `json:"ok"` - Size int64 `json:"size"` - Name string `json:"name"` -} - -type MkdirOp struct { - Bucket string `json:"bucket"` - Dir string `json:"dir"` -} - -type MkdirResult struct { - Ok bool `json:"ok"` -} - -type SaveOp struct { - Bucket string `json:"bucket"` - Dir string `json:"dir"` - Name string `json:"name"` - Filesize int64 `json:"size"` - PacketSize int `json:"packet_size"` - Offset int64 `json:"offset"` - Data []byte `json:"data"` -} - -type SaveResult struct { - Ok bool `json:"ok"` -} - -type ListOp struct { - Bucket string `json:"bucket"` - BaseDir string `json:"base_dir"` - SubDir bool `json:"sub_dir"` -} - -type FileInfo struct { - Name string `json:"name"` - Size int64 `json:"size"` -} - -type ListResult struct { - Ok bool `json:"ok"` - FI []FileInfo `json:"fi"` -} - -type LoadOp struct { - Bucket string `json:"bucket"` - Dir string `json:"dir"` - Name string `json:"name"` - Length int `json:"length"` - Offset int64 `json:"offset"` -} - -type LoadResult struct { - Ok bool `json:"ok"` - Data []byte `json:"data"` -} - -type RemoveOp struct { - Bucket string `json:"bucket"` - Dir string `json:"dir"` - Name string `json:"name"` -} - -type RemoveResult struct { - Ok bool `json:"ok"` -} - -const ( - msgHeaderID string = "X-RNS-MSGID" - msgHeaderChunk string = "X-RNS-CHUNKS" - msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT" - msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ" - msgHeaderOperation string = "X-RNS-OP" - msgHeaderNRI string = "Nats-Request-Info" -) - -func copyHeader(msg *nats.Msg) (hdr nats.Header) { - hdr = make(nats.Header) - hdr.Add(msgHeaderID, msg.Header.Get(msgHeaderID)) - hdr.Add(msgHeaderChunk, msg.Header.Get(msgHeaderChunk)) - hdr.Add(msgHeaderOperation, msg.Header.Get(msgHeaderOperation)) - return hdr -} - -type nriT struct { - Acc string `json:"acc"` - Rtt int `json:"rtt"` -} - -func getNRI(msg *nats.Msg) (*nriT, bool) { - nri := msg.Header.Get(msgHeaderNRI) - if nri == "" { - return nil, false - } - var res nriT - if err := json.Unmarshal([]byte(nri), &res); err != nil { - return nil, false - } - return &res, true -} - -// NewRNSMSG Returns a New RNS Message (for each "Transaction") -func NewRNSMsg(subject string) *nats.Msg { - msg := nats.NewMsg(subject) - msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16)) - return msg -} - -const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" -const ( - letterIdxBits = 6 // 6 bits to represent a letter index - letterIdxMask = 1<= 0; { - if remain == 0 { - cache, remain = src.Int63(), letterIdxMax - } - if idx := int(cache & letterIdxMask); idx < len(letterBytes) { - sb.WriteByte(letterBytes[idx]) - i-- - } - cache >>= letterIdxBits - remain-- - } - - return sb.String() -} - -func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto *nats.Msg, msg *nats.Msg, log func(string, ...interface{})) error { - if len(msg.Header.Get(msgHeaderID)) == 0 { - return errors.New("MessageID Not Set") - } - - var maxchunksize int = int(0.95 * float32(conn.MaxPayload())) - maxchunksize = 1024000 * 0.95 - datasize := len(msg.Data) - log("ChunkSendReplyMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) - - if len(msg.Data) < maxchunksize { - /* data is less then our maxchunksize, so we can just send it */ - log("ChunkSendReplyMsgWithContext: Short Reply Message %s", msg.Header.Get(msgHeaderID)) - err := replyto.RespondMsg(msg) - return errors.Wrap(err, "Short Reply Message Send Failure") - } - - /* need to Split the Data into Chunks - * we will end up sending pages + 1 messages - * as the initial message contains data as well - */ - pages := datasize / maxchunksize - initialchunk := nats.NewMsg(msg.Subject) - initialchunk.Header = copyHeader(msg) - initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages)) - if len(msg.Data) < maxchunksize { - maxchunksize = len(msg.Data) - } - initialchunk.Data = msg.Data[:maxchunksize] - log("Chunking Initial Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data)) - chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk) - if err != nil { - return errors.Wrap(err, "ChunkSendReplyMsgWithContext") - } - /* Reply Message just has a header with the subject we send the rest of the chunks to */ - chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject) - if chunkid == "" { - return errors.New("Chunked Reply Response didn't include ChunkID") - } - var chunksubject string - if nri, ok := getNRI(replyto); ok { - chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid) - } else { - chunksubject = fmt.Sprintf("chunk.send.%s", chunkid) - } - log("Chunk Reply Subject %s", chunksubject) - for i := 1; i <= pages; i++ { - chunkmsg := nats.NewMsg(chunksubject) - chunkmsg.Header = copyHeader(msg) - chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i)) - start := maxchunksize * i - end := maxchunksize * (i + 1) - /* make sure we don't overrun our slice */ - if end > len(msg.Data) { - end = len(msg.Data) - } - chunkmsg.Data = msg.Data[start:end] - log("Sending Reply Chunk %s - Page: %d of %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, pages, start, end) - var chunkack *nats.Msg - if i < pages { - log("Sending Chunk to %s", chunkmsg.Subject) - chunkack, err = conn.RequestMsgWithContext(ctx, chunkmsg) - if err != nil { - return errors.Wrap(err, "ChunkSendReplyMsgWithContext") - } - log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i) - } else { - err := conn.PublishMsg(chunkmsg) - if err != nil { - return errors.Wrap(err, "ChunkSendReplyMsgWithContext") - } - } - - /* all chunkackorreply */ - if i == pages { - return nil - } - } - return errors.New("Failed") -} - -func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) { - if len(msg.Header.Get(msgHeaderID)) == 0 { - return nil, errors.New("MessageID Not Set") - } - - var maxchunksize int = int(0.95 * float32(conn.MaxPayload())) - maxchunksize = 1024000 * 0.95 - datasize := len(msg.Data) - log("ChunkSendRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) - - if len(msg.Data) < maxchunksize { - /* data is less then our maxchunksize, so we can just send it */ - log("Short SendRequest MsgID %s - %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) - reply, err := conn.RequestMsgWithContext(ctx, msg) - if err != nil { - return nil, errors.Wrap(err, "Short Message Send Failure") - } - log("Short ReplyRequest MsgID %s Headers %s Size: %d", reply.Header.Get(msgHeaderID), reply.Header, len(reply.Data)) - return ChunkReadRequestMsgWithContext(ctx, conn, reply, log) - } - - /* need to Split the Data into Chunks - * we will end up sending pages + 1 messages - * as the initial message contains data as well - */ - pages := datasize / maxchunksize - - initialchunk := nats.NewMsg(msg.Subject) - initialchunk.Header = copyHeader(msg) - initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages)) - - initialchunk.Data = msg.Data[:maxchunksize] - log("Chunking Send Request MsgID %s - %s- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data)) - chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk) - if err != nil { - return nil, errors.Wrap(err, "chunkRequestMsgWithContext") - } - /* Reply Message just has a header with the subject we send the rest of the chunks to */ - chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject) - if chunkid == "" { - return nil, errors.New("Chunked Reply Response didn't include ChunkID") - } - var chunksubject string - if nri, ok := getNRI(chunkchannelmsg); ok { - chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid) - } else { - chunksubject = fmt.Sprintf("chunk.send.%s", chunkid) - } - - for i := 1; i <= pages; i++ { - chunkmsg := nats.NewMsg(chunksubject) - chunkmsg.Header = copyHeader(msg) - chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i)) - start := maxchunksize * i - end := maxchunksize * (i + 1) - /* make sure we don't overrun our slice */ - if end > len(msg.Data) { - end = len(msg.Data) - } - chunkmsg.Data = msg.Data[start:end] - log("Sending Request Chunk %s %s to %s- Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header, chunkmsg.Subject, i, start, end) - var chunkackorreply *nats.Msg - chunkackorreply, err = conn.RequestMsgWithContext(ctx, chunkmsg) - if err != nil { - return nil, errors.Wrap(err, "Chunk Send") - } - log("got Result %s - %s", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header) - /* only the last Chunk Reply will contain the actual Response from the other side */ - if i == pages { - log("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data)) - return ChunkReadRequestMsgWithContext(ctx, conn, chunkackorreply, log) - } - } - return nil, errors.New("Failed") -} - -func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) { - if len(msg.Header.Get(msgHeaderID)) == 0 { - return nil, errors.New("MessageID Not Set") - } - log("ChunkReadRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) - chunked := msg.Header.Get(msgHeaderChunk) - if chunked != "" { - pages, err := strconv.Atoi(chunked) - if err != nil { - return nil, errors.Wrap(err, "Couldn't get Chunk Page Count") - } - log("Chunked Message Recieved: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages) - chunktransfer := randStringBytesMaskImprSrcSB(16) - chunkchan := make(chan *nats.Msg, 10) - var chunktransfersubject string - if nri, ok := getNRI(msg); ok { - chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer) - } else { - chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer) - } - sub, err := conn.QueueSubscribeSyncWithChan(chunktransfersubject, chunktransfer, chunkchan) - if err != nil { - return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel") - } - sub.SetPendingLimits(1000, 64*1024*1024) - log("Subscription: %+v", sub) - defer sub.Unsubscribe() - defer close(chunkchan) - chunksubmsg := nats.NewMsg(msg.Reply) - chunksubmsg.Header = copyHeader(msg) - chunksubmsg.Header.Add(msgHeaderChunkSubject, chunktransfer) - msg.RespondMsg(chunksubmsg) - /* pages - 1 because we got first Chunk in original message */ - for i := 1; i <= pages; i++ { - log("Pending MsgID %s Chunk %d of %d on %s", chunksubmsg.Header.Get(msgHeaderID), i, pages, chunktransfersubject) - select { - case chunk := <-chunkchan: - seq, _ := strconv.Atoi(chunk.Header.Get(msgHeaderChunkSeq)) - log("Got MsgID %s - %s Chunk %d %d", chunk.Header.Get(msgHeaderID), chunk.Header, seq, i) - msg.Data = append(msg.Data, chunk.Data...) - if i < pages { - ackChunk := nats.NewMsg(chunk.Subject) - ackChunk.Header = copyHeader(chunk) - log("sending ack %d %d", i, pages) - err := chunk.RespondMsg(ackChunk) - if err != nil { - return nil, errors.Wrap(err, "Chunk Reply Error") - } - } else { - log("Chunked Messages.... %d - %d", i, pages) - msg.Reply = chunk.Reply - } - case <-ctx.Done(): - log("Context Canceled") - return nil, context.DeadlineExceeded - } - } - log("Chunked Messages Done - %s - %s Final Size %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) - } - return msg, nil -} From 740df5861f0dbefae88c23ef0e37e9cfe682080c Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Fri, 15 Oct 2021 00:35:20 +0800 Subject: [PATCH 9/9] upgrade the restic-nats module --- go.mod | 4 ++-- go.sum | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 379b4a9b..b93ddb35 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/Azure/go-autorest/autorest v0.11.19 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/Fishwaldo/go-logadapter v0.0.2 - github.com/Fishwaldo/restic-nats v0.0.1-rc1 + github.com/Fishwaldo/restic-nats v0.0.1-rc2 github.com/cenkalti/backoff/v4 v4.1.1 github.com/cespare/xxhash/v2 v2.1.1 github.com/dnaeon/go-vcr v1.2.0 // indirect @@ -28,7 +28,7 @@ require ( github.com/restic/chunker v0.4.0 github.com/spf13/cobra v1.2.1 github.com/spf13/pflag v1.0.5 - golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e + golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 golang.org/x/net v0.0.0-20210614182718-04defd469f4e golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c diff --git a/go.sum b/go.sum index 9a046e12..69fd33da 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/Fishwaldo/go-logadapter v0.0.2 h1:RxFOr+bEDqQ1rPUmjUX5u8fGLCKY0Lyea+9 github.com/Fishwaldo/go-logadapter v0.0.2/go.mod h1:aRbQ8rWdpeD0WWo241ctqgk/yRto8Axg09EkwWiVGK0= github.com/Fishwaldo/restic-nats v0.0.1-rc1 h1:Byhk87ggM4AHPU+J1NV50zwZ1D/E7p33gEvl8cR83lo= github.com/Fishwaldo/restic-nats v0.0.1-rc1/go.mod h1:GlR1upvCwuE0pTyxdU1Ztb8hxCJ63fNH4Gjxo4hG1nc= +github.com/Fishwaldo/restic-nats v0.0.1-rc2 h1:qQBRQgCzlwz/kFXr3YSW6Vm5Q4W1tiUz+LqYiPOi8u4= +github.com/Fishwaldo/restic-nats v0.0.1-rc2/go.mod h1:GlR1upvCwuE0pTyxdU1Ztb8hxCJ63fNH4Gjxo4hG1nc= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -387,6 +389,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=