mirror of
https://github.com/Fishwaldo/restic.git
synced 2025-03-15 19:41:38 +00:00
Initial Version of a nats.io storage backend.
This commit is contained in:
parent
24088f8307
commit
487f2dce5d
6 changed files with 516 additions and 4 deletions
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/restic/restic/internal/backend/gs"
|
||||
"github.com/restic/restic/internal/backend/local"
|
||||
"github.com/restic/restic/internal/backend/location"
|
||||
"github.com/restic/restic/internal/backend/nats"
|
||||
"github.com/restic/restic/internal/backend/rclone"
|
||||
"github.com/restic/restic/internal/backend/rest"
|
||||
"github.com/restic/restic/internal/backend/s3"
|
||||
|
@ -650,8 +651,14 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro
|
|||
|
||||
debug.Log("opening rest repository at %#v", cfg)
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
case "nats":
|
||||
cfg := loc.Config.(nats.Config)
|
||||
if err := opts.Apply(loc.Scheme, &cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
debug.Log("opening nats Repository at %#v", cfg)
|
||||
return cfg, nil
|
||||
}
|
||||
return nil, errors.Fatalf("invalid backend: %q", loc.Scheme)
|
||||
}
|
||||
|
||||
|
@ -703,7 +710,8 @@ func open(s string, gopts GlobalOptions, opts options.Options) (restic.Backend,
|
|||
be, err = rest.Open(cfg.(rest.Config), rt)
|
||||
case "rclone":
|
||||
be, err = rclone.Open(cfg.(rclone.Config), lim)
|
||||
|
||||
case "nats":
|
||||
be, err = nats.Open(globalOptions.ctx, cfg.(nats.Config))
|
||||
default:
|
||||
return nil, errors.Fatalf("invalid backend: %q", loc.Scheme)
|
||||
}
|
||||
|
@ -780,6 +788,8 @@ func create(s string, opts options.Options) (restic.Backend, error) {
|
|||
return rest.Create(globalOptions.ctx, cfg.(rest.Config), rt)
|
||||
case "rclone":
|
||||
return rclone.Create(globalOptions.ctx, cfg.(rclone.Config))
|
||||
case "nats":
|
||||
return nats.Create(globalOptions.ctx, cfg.(nats.Config))
|
||||
}
|
||||
|
||||
debug.Log("invalid repository scheme: %v", s)
|
||||
|
|
4
go.mod
4
go.mod
|
@ -6,6 +6,7 @@ require (
|
|||
github.com/Azure/azure-sdk-for-go v55.6.0+incompatible
|
||||
github.com/Azure/go-autorest/autorest v0.11.19 // indirect
|
||||
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
|
||||
github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-00010101000000-000000000000
|
||||
github.com/cenkalti/backoff/v4 v4.1.1
|
||||
github.com/cespare/xxhash/v2 v2.1.1
|
||||
github.com/dnaeon/go-vcr v1.2.0 // indirect
|
||||
|
@ -18,6 +19,7 @@ require (
|
|||
github.com/kurin/blazer v0.5.3
|
||||
github.com/minio/minio-go/v7 v7.0.12
|
||||
github.com/minio/sha256-simd v1.0.0
|
||||
github.com/nats-io/nats.go v1.12.3
|
||||
github.com/ncw/swift/v2 v2.0.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/pkg/profile v1.6.0
|
||||
|
@ -36,4 +38,6 @@ require (
|
|||
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
|
||||
)
|
||||
|
||||
replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol
|
||||
|
||||
go 1.14
|
||||
|
|
25
go.sum
25
go.sum
|
@ -231,6 +231,8 @@ github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
|
|||
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
|
||||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
|
||||
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
|
||||
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
|
||||
|
@ -249,6 +251,8 @@ github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPK
|
|||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
|
||||
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
|
||||
github.com/minio/minio-go/v7 v7.0.12 h1:/4pxUdwn9w0QEryNkrrWaodIESPRX+NxpO0Q6hVdaAA=
|
||||
|
@ -273,6 +277,19 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
|
|||
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
|
||||
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8=
|
||||
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
|
||||
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
|
||||
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
|
||||
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
|
||||
github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM=
|
||||
github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo=
|
||||
github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E=
|
||||
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/ncw/swift/v2 v2.0.0 h1:Q1jkMe/yhCkx7yAKq4bBZ/Th3NR+ejRcwbVK8Pi1i/0=
|
||||
github.com/ncw/swift/v2 v2.0.0/go.mod h1:z0A9RVdYPjNjXVo2pDOPxZ4eu3oarO1P91fTItcb+Kg=
|
||||
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
|
@ -353,9 +370,11 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
|
|||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
|
@ -466,6 +485,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
@ -532,6 +552,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
|||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
|
@ -710,8 +732,9 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
|
|||
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
|
||||
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
|
||||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/restic/restic/internal/backend/b2"
|
||||
"github.com/restic/restic/internal/backend/gs"
|
||||
"github.com/restic/restic/internal/backend/local"
|
||||
"github.com/restic/restic/internal/backend/nats"
|
||||
"github.com/restic/restic/internal/backend/rclone"
|
||||
"github.com/restic/restic/internal/backend/rest"
|
||||
"github.com/restic/restic/internal/backend/s3"
|
||||
|
@ -41,6 +42,7 @@ var parsers = []parser{
|
|||
{"swift", swift.ParseConfig, noPassword},
|
||||
{"rest", rest.ParseConfig, rest.StripPassword},
|
||||
{"rclone", rclone.ParseConfig, noPassword},
|
||||
{"nats", nats.ParseConfig, noPassword},
|
||||
}
|
||||
|
||||
// noPassword returns the repository location unchanged (there's no sensitive information there)
|
||||
|
|
56
internal/backend/nats/config.go
Normal file
56
internal/backend/nats/config.go
Normal file
|
@ -0,0 +1,56 @@
|
|||
package nats
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/options"
|
||||
)
|
||||
|
||||
// Config contains all configuration necessary to connect to a REST server.
|
||||
type Config struct {
|
||||
Server *url.URL
|
||||
Credential string `option:"credentialfile" help:"Path to the NatsIO Credential File"`
|
||||
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
|
||||
Repo string
|
||||
}
|
||||
|
||||
func init() {
|
||||
options.Register("natsio", Config{})
|
||||
}
|
||||
|
||||
// NewConfig returns a new Config with the default values filled in.
|
||||
func NewConfig() Config {
|
||||
return Config{
|
||||
Connections: 5,
|
||||
}
|
||||
}
|
||||
|
||||
// ParseConfig parses the string s and extracts the REST server URL.
|
||||
func ParseConfig(s string) (interface{}, error) {
|
||||
if !strings.HasPrefix(s, "nats:") {
|
||||
return nil, errors.New("invalid REST backend specification")
|
||||
}
|
||||
|
||||
u, err := url.Parse(s)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "url.Parse")
|
||||
}
|
||||
|
||||
cfg := NewConfig()
|
||||
cfg.Server = u
|
||||
var repo string
|
||||
if cfg.Server.Path[0] == '/' {
|
||||
repo = cfg.Server.Path[1:]
|
||||
}
|
||||
if repo[len(repo)-1] == '/' {
|
||||
repo = repo[0:len(repo)-1]
|
||||
}
|
||||
// replace any further slashes with . to specify a nested queue
|
||||
repo = strings.Replace(repo, "/", ".", -1)
|
||||
|
||||
cfg.Repo = repo
|
||||
return cfg, nil
|
||||
}
|
417
internal/backend/nats/nats.go
Normal file
417
internal/backend/nats/nats.go
Normal file
|
@ -0,0 +1,417 @@
|
|||
package nats
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/Fishwaldo/restic-nats-server/protocol"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// make sure the rest backend implements restic.Backend
|
||||
var _ restic.Backend = &Backend{}
|
||||
|
||||
const defaultLayout = "default"
|
||||
|
||||
// Backend uses the REST protocol to access data stored on a server.
|
||||
type Backend struct {
|
||||
sem *backend.Semaphore
|
||||
backend.Layout
|
||||
cfg Config
|
||||
conn *nats.Conn
|
||||
enc nats.Encoder
|
||||
}
|
||||
|
||||
func connectNats(be *Backend) error {
|
||||
server := be.cfg.Server.Hostname()
|
||||
port := be.cfg.Server.Port()
|
||||
if port == "" {
|
||||
port = "4222"
|
||||
}
|
||||
url := fmt.Sprintf("nats://%s:%s", server, port)
|
||||
|
||||
/* Check Credential File Exists */
|
||||
_, err := os.Stat(be.cfg.Credential)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "credential file missing")
|
||||
}
|
||||
|
||||
var options []nats.Option
|
||||
options = append(options, nats.UserCredentials(be.cfg.Credential))
|
||||
options = append(options, nats.ClosedHandler(natsClosedCB))
|
||||
options = append(options, nats.DisconnectHandler(natsDisconnectedCB))
|
||||
|
||||
be.conn, err = nats.Connect(url, options...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "nats connection failed")
|
||||
}
|
||||
|
||||
if size := be.conn.MaxPayload(); size < 8388608 {
|
||||
return errors.New("NATS Server Max Payload Size is below 8Mb")
|
||||
}
|
||||
|
||||
if !be.conn.HeadersSupported() {
|
||||
return errors.New("server does not support Headers")
|
||||
}
|
||||
|
||||
be.enc = nats.EncoderForType("gob")
|
||||
if be.enc == nil {
|
||||
return errors.New("Can't Load json Decoder")
|
||||
}
|
||||
be.sem, err = backend.NewSemaphore(be.cfg.Connections)
|
||||
|
||||
fmt.Printf("Connected to Nats Server: %s (Cluster: %s)\n", be.conn.ConnectedServerName(), be.conn.ConnectedClusterName())
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func natsClosedCB(conn *nats.Conn) {
|
||||
|
||||
}
|
||||
|
||||
func natsDisconnectedCB(conn *nats.Conn) {
|
||||
|
||||
}
|
||||
|
||||
func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand, send interface{}, recv interface{}) error {
|
||||
var operation string
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
start := time.Now()
|
||||
switch op {
|
||||
case protocol.NatsOpenCmd:
|
||||
if _, ok := send.(protocol.OpenOp); !ok {
|
||||
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a openOP: %T", send)
|
||||
}
|
||||
if _, ok := recv.(*protocol.OpenResult); !ok {
|
||||
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a openResult: %T", recv)
|
||||
}
|
||||
operation = "open"
|
||||
case protocol.NatsStatCmd:
|
||||
if _, ok := send.(protocol.StatOp); !ok {
|
||||
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a statOp: %T", send)
|
||||
}
|
||||
if _, ok := recv.(*protocol.StatResult); !ok {
|
||||
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a statResult %T", recv)
|
||||
}
|
||||
operation = "stat"
|
||||
case protocol.NatsMkdirCmd:
|
||||
if _, ok := send.(protocol.MkdirOp); !ok {
|
||||
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a mkdirOp: %T", send)
|
||||
}
|
||||
if _, ok := recv.(*protocol.MkdirResult); !ok {
|
||||
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a mkdirResult %T", recv)
|
||||
}
|
||||
operation = "mkdir"
|
||||
case protocol.NatsSaveCmd:
|
||||
if _, ok := send.(protocol.SaveOp); !ok {
|
||||
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a saveOp: %T", send)
|
||||
}
|
||||
if _, ok := recv.(*protocol.SaveResult); !ok {
|
||||
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a saveResult %T", recv)
|
||||
}
|
||||
operation = "save"
|
||||
case protocol.NatsListCmd:
|
||||
if _, ok := send.(protocol.ListOp); !ok {
|
||||
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a listOp: %T", send)
|
||||
}
|
||||
if _, ok := recv.(*protocol.ListResult); !ok {
|
||||
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a listResult %T", recv)
|
||||
}
|
||||
operation = "list"
|
||||
case protocol.NatsLoadCmd:
|
||||
if _, ok := send.(protocol.LoadOp); !ok {
|
||||
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a loadOp: %T", send)
|
||||
}
|
||||
if _, ok := recv.(*protocol.LoadResult); !ok {
|
||||
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a loadResult %T", recv)
|
||||
}
|
||||
operation = "load"
|
||||
case protocol.NatsRemoveCmd:
|
||||
if _, ok := send.(protocol.RemoveOp); !ok {
|
||||
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a removeOp: %T", send)
|
||||
}
|
||||
if _, ok := recv.(*protocol.RemoveResult); !ok {
|
||||
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a removeOp %T", recv)
|
||||
}
|
||||
operation = "remove"
|
||||
|
||||
}
|
||||
var err error
|
||||
subject := fmt.Sprintf("repo.commands.%s", operation)
|
||||
msg := protocol.NewRNSMsg(subject)
|
||||
|
||||
defer func() {
|
||||
if err == nil {
|
||||
debug.Log("SendMsgWithReply (%s - %s) Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), time.Since(start), len(msg.Data))
|
||||
} else {
|
||||
debug.Log("SendMsgWithREply (%s - %s) Failed: %s (Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), err, time.Since(start), len(msg.Data))
|
||||
}
|
||||
}()
|
||||
|
||||
msg.Header.Set("X-RNS-OP", operation)
|
||||
|
||||
msg.Data, err = be.enc.Encode(subject, send)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Encoding Failed")
|
||||
}
|
||||
msg.Reply = nats.NewInbox()
|
||||
//debug.Log("Sending %s %d\n", msg.Subject, len(msg.Data))
|
||||
|
||||
/* check the size of the Data Field. If its close to our NATS max payload size
|
||||
* then we will chunk the transfer instead
|
||||
*/
|
||||
var chunkedmsg *nats.Msg
|
||||
|
||||
chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data))
|
||||
}
|
||||
if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil {
|
||||
return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Open(ctx context.Context, cfg Config) (*Backend, error) {
|
||||
debug.Log("open nats backend at %s", cfg.Server.String())
|
||||
|
||||
sem, err := backend.NewSemaphore(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be := &Backend{
|
||||
sem: sem,
|
||||
cfg: cfg,
|
||||
}
|
||||
|
||||
l, err := backend.ParseLayout(ctx, be, "default", defaultLayout, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be.Layout = l
|
||||
|
||||
err = connectNats(be)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "open nats failed")
|
||||
}
|
||||
|
||||
co := protocol.OpenOp{Bucket: be.cfg.Repo}
|
||||
var result protocol.OpenResult
|
||||
if err := be.SendMsgWithReply(ctx, protocol.NatsOpenCmd, co, &result); err != nil {
|
||||
return nil, errors.Wrap(err, "OpenOp Failed")
|
||||
}
|
||||
debug.Log("Open Result: %+v\n", result)
|
||||
|
||||
return be, nil
|
||||
}
|
||||
|
||||
// Create creates all the necessary files and directories for a new local
|
||||
// backend at dir. Afterwards a new config blob should be created.
|
||||
func Create(ctx context.Context, cfg Config) (*Backend, error) {
|
||||
debug.Log("create nats backend at %s Repo %s", cfg.Server.String(), cfg.Repo)
|
||||
be, err := Open(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Create Repo")
|
||||
}
|
||||
_, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
if err == nil {
|
||||
return nil, errors.New("config file already exists")
|
||||
}
|
||||
|
||||
for _, d := range be.Paths() {
|
||||
err := be.Mkdir(ctx, d)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return be, nil
|
||||
}
|
||||
|
||||
// Location returns a string that describes the type and location of the
|
||||
// repository.
|
||||
func (b *Backend) Location() string {
|
||||
return b.cfg.Server.String()
|
||||
}
|
||||
|
||||
// Hasher may return a hash function for calculating a content hash for the backend
|
||||
func (b *Backend) Hasher() hash.Hash {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Test a boolean value whether a File with the name and type exists.
|
||||
func (b *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
|
||||
debug.Log("Test %s - %s", b.cfg.Server.String(), b.Filename(h))
|
||||
_, err := b.Stat(ctx, h)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Remove removes a File described by h.
|
||||
func (b *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
debug.Log("Remove %s - %s", b.cfg.Server.String(), b.Filename(h))
|
||||
ro := protocol.RemoveOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h))}
|
||||
var result protocol.RemoveResult
|
||||
if err := b.SendMsgWithReply(context.Background(), protocol.NatsRemoveCmd, ro, &result); err != nil {
|
||||
return errors.Wrap(err, "Remove: SendMsgWithReply Failed")
|
||||
}
|
||||
if result.Ok {
|
||||
return nil
|
||||
}
|
||||
return errors.Errorf("Remove Failed")
|
||||
}
|
||||
|
||||
// Close the backend
|
||||
func (b *Backend) Close() error {
|
||||
debug.Log("Close %s", b.cfg.Server.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
// Save stores the data from rd under the given handle.
|
||||
func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
debug.Log("Save %s - %s", b.cfg.Server.String(), b.Filename(h))
|
||||
var so protocol.SaveOp
|
||||
so.Dir = b.Dirname(h)
|
||||
so.Name = filepath.Base(b.Filename(h))
|
||||
so.Filesize = rd.Length()
|
||||
so.Bucket = b.cfg.Repo
|
||||
|
||||
var err error
|
||||
so.Data, err = io.ReadAll(rd)
|
||||
so.PacketSize = len(so.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Save")
|
||||
}
|
||||
so.Offset = 0
|
||||
var result protocol.SaveResult
|
||||
if err := b.SendMsgWithReply(context.Background(), protocol.NatsSaveCmd, so, &result); err != nil {
|
||||
return errors.Wrap(err, "Save: SendMsgWithReply Failed")
|
||||
}
|
||||
if result.Ok {
|
||||
return nil
|
||||
}
|
||||
return errors.Errorf("Save Failed")
|
||||
}
|
||||
|
||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset. If length is larger than zero, only a portion of the file
|
||||
// is read.
|
||||
//
|
||||
// The function fn may be called multiple times during the same Load invocation
|
||||
// and therefore must be idempotent.
|
||||
//
|
||||
// Implementations are encouraged to use backend.DefaultLoad
|
||||
func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
debug.Log("Load %s - %s (start %d length %d)", b.cfg.Server.String(), b.Filename(h), offset, length)
|
||||
lo := protocol.LoadOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h)), Length: length, Offset: offset}
|
||||
var result protocol.LoadResult
|
||||
if err := b.SendMsgWithReply(context.Background(), protocol.NatsLoadCmd, lo, &result); err != nil {
|
||||
return errors.Wrap(err, "Save: SendMsgWithReply Failed")
|
||||
}
|
||||
if !result.Ok {
|
||||
return errors.Errorf("Load Failed")
|
||||
}
|
||||
rd := bytes.NewReader(result.Data)
|
||||
if err := fn(rd); err != nil {
|
||||
return errors.Wrap(err, "Load Read")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stat returns information about the File identified by h.
|
||||
func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
debug.Log("Stat %s - %s", b.cfg.Server.String(), b.Filename(h))
|
||||
op := protocol.StatOp{Bucket: b.cfg.Repo, Filename: b.Filename(h)}
|
||||
var result protocol.StatResult
|
||||
if err := b.SendMsgWithReply(context.Background(), protocol.NatsStatCmd, op, &result); err != nil {
|
||||
return restic.FileInfo{}, errors.Wrap(err, "statOp Failed")
|
||||
}
|
||||
if result.Ok {
|
||||
return restic.FileInfo{Size: result.Size, Name: h.Name}, nil
|
||||
} else {
|
||||
return restic.FileInfo{}, errors.New("File does not exist")
|
||||
}
|
||||
}
|
||||
|
||||
// List runs fn for each file in the backend which has the type t. When an
|
||||
// error occurs (or fn returns an error), List stops and returns it.
|
||||
//
|
||||
// The function fn is called exactly once for each file during successful
|
||||
// execution and at most once in case of an error.
|
||||
//
|
||||
// The function fn is called in the same Goroutine that List() is called
|
||||
// from.
|
||||
func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
basedir, subdirs := b.Basedir(t)
|
||||
debug.Log("List %s - %s - Subdirs? %t", b.cfg.Server.String(), basedir, subdirs)
|
||||
op := protocol.ListOp{Bucket: b.cfg.Repo, BaseDir: basedir, SubDir: subdirs}
|
||||
var result protocol.ListResult
|
||||
if err := b.SendMsgWithReply(context.Background(), protocol.NatsListCmd, op, &result); err != nil {
|
||||
return errors.Wrap(err, "listOp Failed")
|
||||
}
|
||||
for _, fi := range result.FI {
|
||||
rfi := restic.FileInfo{Name: fi.Name, Size: fi.Size}
|
||||
|
||||
if err := fn(rfi); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsNotExist returns true if the error was caused by a non-existing file
|
||||
// in the backend.
|
||||
func (b *Backend) IsNotExist(err error) bool {
|
||||
debug.Log("IsNotExist %s (TODO) - %s", b.cfg.Server.String(), err)
|
||||
|
||||
fmt.Printf("IsNotExist Called\n")
|
||||
return false
|
||||
}
|
||||
|
||||
// Delete removes all data in the backend.
|
||||
func (b *Backend) Delete(ctx context.Context) error {
|
||||
debug.Log("Delete %s (TODO)", b.cfg.Server.String())
|
||||
return errors.Errorf("TODO Delete")
|
||||
}
|
||||
|
||||
func (b *Backend) Join(p ...string) string {
|
||||
return path.Join(p...)
|
||||
}
|
||||
|
||||
func (b *Backend) ReadDir(ctx context.Context, dir string) ([]os.FileInfo, error) {
|
||||
debug.Log("ReadDir %s (TODO) - %s", b.cfg.Server.String(), dir)
|
||||
return nil, errors.Errorf("TODO: ReadDir")
|
||||
}
|
||||
|
||||
func (b *Backend) Mkdir(ctx context.Context, dir string) error {
|
||||
debug.Log("Mkdir %s - %s", b.cfg.Server.String(), dir)
|
||||
op := protocol.MkdirOp{Bucket: b.cfg.Repo, Dir: dir}
|
||||
var result protocol.MkdirResult
|
||||
if err := b.SendMsgWithReply(context.Background(), protocol.NatsMkdirCmd, op, &result); err != nil {
|
||||
return errors.Wrap(err, "natsMkdirCmd Failed")
|
||||
}
|
||||
if result.Ok {
|
||||
return nil
|
||||
} else {
|
||||
return errors.New("mkdir Failed")
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue