use new nats-restic module for Nats Communication

This commit is contained in:
Justin Hammond 2021-10-14 12:31:49 +08:00
parent 7e4101d6c3
commit a5d2db2a6a
4 changed files with 212 additions and 268 deletions

9
go.mod
View file

@ -6,6 +6,8 @@ require (
github.com/Azure/azure-sdk-for-go v55.6.0+incompatible
github.com/Azure/go-autorest/autorest v0.11.19 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Fishwaldo/go-logadapter v0.0.2
github.com/Fishwaldo/restic-nats v0.0.1-rc1
github.com/cenkalti/backoff/v4 v4.1.1
github.com/cespare/xxhash/v2 v2.1.1
github.com/dnaeon/go-vcr v1.2.0 // indirect
@ -18,8 +20,6 @@ require (
github.com/kurin/blazer v0.5.3
github.com/minio/minio-go/v7 v7.0.14
github.com/minio/sha256-simd v1.0.0
github.com/nats-io/nats-server/v2 v2.6.1 // indirect
github.com/nats-io/nats.go v1.12.3
github.com/ncw/swift/v2 v2.0.0
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.6.0
@ -32,11 +32,12 @@ require (
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e
golang.org/x/text v0.3.6
google.golang.org/api v0.50.0
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
)
go 1.14
//replace github.com/Fishwaldo/restic-nats => ../restic-nats

23
go.sum
View file

@ -64,10 +64,15 @@ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUM
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Fishwaldo/go-logadapter v0.0.2 h1:RxFOr+bEDqQ1rPUmjUX5u8fGLCKY0Lyea+9AxDqzaW4=
github.com/Fishwaldo/go-logadapter v0.0.2/go.mod h1:aRbQ8rWdpeD0WWo241ctqgk/yRto8Axg09EkwWiVGK0=
github.com/Fishwaldo/restic-nats v0.0.1-rc1 h1:Byhk87ggM4AHPU+J1NV50zwZ1D/E7p33gEvl8cR83lo=
github.com/Fishwaldo/restic-nats v0.0.1-rc1/go.mod h1:GlR1upvCwuE0pTyxdU1Ztb8hxCJ63fNH4Gjxo4hG1nc=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
@ -240,8 +245,9 @@ github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QH
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@ -283,8 +289,9 @@ github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM=
github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo=
github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
@ -362,8 +369,10 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -535,8 +544,8 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@ -607,8 +616,9 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -736,8 +746,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=

View file

@ -0,0 +1,79 @@
package nats
import (
"os"
"fmt"
"github.com/restic/restic/internal/debug"
"github.com/Fishwaldo/go-logadapter"
)
/* compile check against our logAdapter interface */
var _ logadapter.Logger = (*resticLogger)(nil)
/* a custom logger implementation just for Restic */
type resticLogger struct {
}
// Log a Trace Message
func (l *resticLogger) Trace(message string, params ...interface{}) {
debug.Log(message, params...)
}
// Log a Debug Message
func (l *resticLogger) Debug(message string, params ...interface{}) {
debug.Log(message, params...)
}
// Log a Info Message
func (l *resticLogger) Info(message string, params ...interface{}) {
debug.Log(message, params...)
}
// Log a Warn Message
func (l *resticLogger) Warn(message string, params ...interface{}) {
debug.Log(message, params...)
}
// Log a Error Message
func (l *resticLogger) Error(message string, params ...interface{}) {
fmt.Printf("Nats Error: %s\n", fmt.Sprintf(message, params...))
debug.Log(message, params...)
}
// Log a Fatal Message (some implementations may call os.exit() here)
func (l *resticLogger) Fatal(message string, params ...interface{}) {
fmt.Printf("Nats Fatal: %s\n", fmt.Sprintf(message, params...))
debug.Log(message, params...)
os.Exit(-1)
}
// Log a Panic Message (some implmentations may call Panic)
func (l *resticLogger) Panic(message string, params ...interface{}) {
fmt.Printf("Nats Panic: %s\n", fmt.Sprintf(message, params...))
debug.Log(message, params...)
panic(fmt.Sprintf(message, params...))
}
// Create a New Logger Instance with Name
func (l *resticLogger) New(name string) (logadapter.Logger) {
return l
}
// Add Key/Value Pairs for Structured Logging and return a new Logger
func (l *resticLogger) With(key string, value interface{}) ( logadapter.Logger) {
return l
}
// Set the Log Prefix
func (l *resticLogger) SetPrefix(name string) {
}
// Get the Log Prefix
func (l *resticLogger) GetPrefix() (string) {
return ""
}
// Set Logging Level
func (l *resticLogger) SetLevel(logadapter.Log_Level) {
}
// Get Logging Level
func(l *resticLogger) GetLevel() (logadapter.Log_Level) {
return logadapter.LOG_TRACE
}
// Sync/Flush the Log Buffers
func (l *resticLogger) Sync() {
}

View file

@ -9,195 +9,23 @@ import (
"os"
"path"
"path/filepath"
"time"
"github.com/nats-io/nats.go"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/nats/protocol"
"github.com/Fishwaldo/restic-nats"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
// make sure the rest backend implements restic.Backend
// make sure the nats backend implements restic.Backend
var _ restic.Backend = &Backend{}
const defaultLayout = "default"
// Backend uses the REST protocol to access data stored on a server.
// Backend uses the nats protocol to access workers that interface with the actual repository on behalf of restic
type Backend struct {
sem *backend.Semaphore
backend.Layout
cfg Config
conn *nats.Conn
enc nats.Encoder
}
func connectNats(be *Backend) error {
server := be.cfg.Server.Hostname()
port := be.cfg.Server.Port()
if port == "" {
port = "4222"
}
url := fmt.Sprintf("nats://%s:%s", server, port)
/* Check Credential File Exists */
_, err := os.Stat(be.cfg.Credential)
if err != nil {
return errors.Wrap(err, "credential file missing")
}
var options []nats.Option
if len(be.cfg.Credential) > 0 {
options = append(options, nats.UserCredentials(be.cfg.Credential))
}
if len(be.cfg.Server.User.Username()) > 0 {
pass, _ := be.cfg.Server.User.Password()
options = append(options, nats.UserInfo(be.cfg.Server.User.Username(), pass))
}
options = append(options, nats.ClosedHandler(natsClosedCB))
options = append(options, nats.DisconnectHandler(natsDisconnectedCB))
be.conn, err = nats.Connect(url, options...)
if err != nil {
return errors.Wrap(err, "nats connection failed")
}
if size := be.conn.MaxPayload(); size < 8388608 {
return errors.New("NATS Server Max Payload Size is below 8Mb")
}
if !be.conn.HeadersSupported() {
return errors.New("server does not support Headers")
}
be.enc = nats.EncoderForType("gob")
if be.enc == nil {
return errors.New("Can't Load json Decoder")
}
be.sem, err = backend.NewSemaphore(be.cfg.Connections)
fmt.Printf("Connected to Nats Server: %s (Cluster: %s)\n", be.conn.ConnectedServerName(), be.conn.ConnectedClusterName())
if err != nil {
return err
}
return nil
}
func natsClosedCB(conn *nats.Conn) {
}
func natsDisconnectedCB(conn *nats.Conn) {
}
func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand, send interface{}, recv interface{}) error {
var operation string
be.sem.GetToken()
defer be.sem.ReleaseToken()
start := time.Now()
switch op {
case protocol.NatsOpenCmd:
if _, ok := send.(protocol.OpenOp); !ok {
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a openOP: %T", send)
}
if _, ok := recv.(*protocol.OpenResult); !ok {
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a openResult: %T", recv)
}
operation = "open"
case protocol.NatsStatCmd:
if _, ok := send.(protocol.StatOp); !ok {
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a statOp: %T", send)
}
if _, ok := recv.(*protocol.StatResult); !ok {
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a statResult %T", recv)
}
operation = "stat"
case protocol.NatsMkdirCmd:
if _, ok := send.(protocol.MkdirOp); !ok {
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a mkdirOp: %T", send)
}
if _, ok := recv.(*protocol.MkdirResult); !ok {
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a mkdirResult %T", recv)
}
operation = "mkdir"
case protocol.NatsSaveCmd:
if _, ok := send.(protocol.SaveOp); !ok {
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a saveOp: %T", send)
}
if _, ok := recv.(*protocol.SaveResult); !ok {
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a saveResult %T", recv)
}
operation = "save"
case protocol.NatsListCmd:
if _, ok := send.(protocol.ListOp); !ok {
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a listOp: %T", send)
}
if _, ok := recv.(*protocol.ListResult); !ok {
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a listResult %T", recv)
}
operation = "list"
case protocol.NatsLoadCmd:
if _, ok := send.(protocol.LoadOp); !ok {
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a loadOp: %T", send)
}
if _, ok := recv.(*protocol.LoadResult); !ok {
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a loadResult %T", recv)
}
operation = "load"
case protocol.NatsRemoveCmd:
if _, ok := send.(protocol.RemoveOp); !ok {
return errors.Errorf("Struct Supplied to SendMsgWithReply is not a removeOp: %T", send)
}
if _, ok := recv.(*protocol.RemoveResult); !ok {
return errors.Errorf("Recv Struct supplied to SendMsgWithReply is not a removeOp %T", recv)
}
operation = "remove"
}
var err error
subject := fmt.Sprintf("repo.commands.%s", operation)
msg := protocol.NewRNSMsg(subject)
defer func() {
if err == nil {
debug.Log("SendMsgWithReply (%s - %s) Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), time.Since(start), len(msg.Data))
} else {
debug.Log("SendMsgWithREply (%s - %s) Failed: %s (Took %s - %d Bytes", operation, msg.Header.Get("X-RNS-MSGID"), err, time.Since(start), len(msg.Data))
}
}()
msg.Header.Set("X-RNS-OP", operation)
msg.Data, err = be.enc.Encode(subject, send)
if err != nil {
return errors.Wrap(err, "Encoding Failed")
}
msg.Reply = nats.NewInbox()
//debug.Log("Sending %s %d\n", msg.Subject, len(msg.Data))
/* check the size of the Data Field. If its close to our NATS max payload size
* then we will chunk the transfer instead
*/
// log := func(msg string, args ...interface{}) {
// fmt.Printf(msg, args...)
// fmt.Println()
// }
var chunkedmsg *nats.Msg
chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log)
if err != nil {
return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data))
}
if len(chunkedmsg.Data) > 0 {
if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil {
return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data))
}
}
return nil
rns *rns.ResticNatsClient
}
func Open(ctx context.Context, cfg Config) (*Backend, error) {
@ -211,27 +39,27 @@ func Open(ctx context.Context, cfg Config) (*Backend, error) {
be := &Backend{
sem: sem,
cfg: cfg,
Layout: &backend.DefaultLayout{Join: path.Join},
}
l, err := backend.ParseLayout(ctx, be, "default", defaultLayout, "")
host, _ := os.Hostname()
be.rns, err = rns.New(*be.cfg.Server, rns.WithName(host), rns.WithLogger(&resticLogger{}))
if err != nil {
return nil, err
}
fmt.Printf("Connected to Nats Server: %s (Cluster: %s)\n", be.rns.Conn.ConnectedServerName(), be.rns.Conn.ConnectedClusterName())
be.Layout = l
hostname, _ := os.Hostname()
err = connectNats(be)
result, err := be.rns.OpenRepo(ctx, hostname)
if err != nil {
return nil, errors.Wrap(err, "open nats failed")
// Communication Error
return nil, err
}
co := protocol.OpenOp{Bucket: be.cfg.Repo}
var result protocol.OpenResult
if err := be.SendMsgWithReply(ctx, protocol.NatsOpenCmd, co, &result); err != nil {
return nil, errors.Wrap(err, "OpenOp Failed")
if !result.Ok {
// Backend Returned a Error
return nil, result.Err
}
debug.Log("Open Result: %+v\n", result)
return be, nil
}
@ -243,9 +71,9 @@ func Create(ctx context.Context, cfg Config) (*Backend, error) {
if err != nil {
return nil, errors.Wrap(err, "Create Repo")
}
_, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile})
if err == nil {
return nil, errors.New("config file already exists")
exist, _ := be.Test(ctx, restic.Handle{Type: restic.ConfigFile})
if exist {
return nil, errors.Errorf("config file already exists")
}
for _, d := range be.Paths() {
@ -272,58 +100,60 @@ func (b *Backend) Hasher() hash.Hash {
// Test a boolean value whether a File with the name and type exists.
func (b *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
debug.Log("Test %s - %s", b.cfg.Server.String(), b.Filename(h))
_, err := b.Stat(ctx, h)
res, err := b.Stat(ctx, h)
if err != nil {
return false, nil
return false, err
}
return true, nil
if res.Name == b.Filename(h) {
return true, nil
}
return false, nil
}
// Remove removes a File described by h.
func (b *Backend) Remove(ctx context.Context, h restic.Handle) error {
debug.Log("Remove %s - %s", b.cfg.Server.String(), b.Filename(h))
ro := protocol.RemoveOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h))}
var result protocol.RemoveResult
if err := b.SendMsgWithReply(context.Background(), protocol.NatsRemoveCmd, ro, &result); err != nil {
return errors.Wrap(err, "Remove: SendMsgWithReply Failed")
result, err := b.rns.Remove(ctx, b.Dirname(h), filepath.Base(b.Filename(h)))
if err != nil {
//Communication Error
return errors.Wrap(err, "save")
}
if result.Ok {
return nil
if !result.Ok {
//Backend returned a Error
return result.Err
}
return errors.Errorf("Remove Failed")
return nil
}
// Close the backend
func (b *Backend) Close() error {
debug.Log("Close %s", b.cfg.Server.String())
result, err := b.rns.Close(context.Background())
if err != nil {
// Communication Error
return errors.Wrap(err, "close")
}
if !result.Ok {
// Backend Returned a Error
return result.Err
}
return nil
}
// Save stores the data from rd under the given handle.
func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
debug.Log("Save %s - %s", b.cfg.Server.String(), b.Filename(h))
var so protocol.SaveOp
so.Dir = b.Dirname(h)
so.Name = filepath.Base(b.Filename(h))
so.Filesize = rd.Length()
so.Bucket = b.cfg.Repo
var err error
so.Data, err = io.ReadAll(rd)
so.PacketSize = len(so.Data)
result, err := b.rns.Save(ctx, b.Dirname(h), filepath.Base(b.Filename(h)), rd)
if err != nil {
return errors.Wrap(err, "Save")
// Communication Error
return errors.Wrap(err, "save")
}
so.Offset = 0
var result protocol.SaveResult
if err := b.SendMsgWithReply(context.Background(), protocol.NatsSaveCmd, so, &result); err != nil {
return errors.Wrap(err, "Save: SendMsgWithReply Failed")
if !result.Ok {
// Backend Returned a Error
return result.Err
}
debug.Log("Save Result: %+v", result)
if result.Ok {
return nil
}
return errors.Errorf("Save Failed")
return nil
}
// Load runs fn with a reader that yields the contents of the file at h at the
@ -336,34 +166,39 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea
// Implementations are encouraged to use backend.DefaultLoad
func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
debug.Log("Load %s - %s (start %d length %d)", b.cfg.Server.String(), b.Filename(h), offset, length)
lo := protocol.LoadOp{Bucket: b.cfg.Repo, Dir: b.Dirname(h), Name: filepath.Base(b.Filename(h)), Length: length, Offset: offset}
var result protocol.LoadResult
if err := b.SendMsgWithReply(context.Background(), protocol.NatsLoadCmd, lo, &result); err != nil {
return errors.Wrap(err, "Save: SendMsgWithReply Failed")
result, err := b.rns.Load(ctx, b.Dirname(h), filepath.Base(b.Filename(h)), length, offset)
if err != nil {
//Communication Error
return errors.Wrap(err, "Load")
}
if !result.Ok {
return errors.Errorf("Load Failed")
// Backend Returned a Error
return result.Err
}
rd := bytes.NewReader(result.Data)
if err := fn(rd); err != nil {
return errors.Wrap(err, "Load Read")
return err
}
return nil
}
// Stat returns information about the File identified by h.
func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
debug.Log("Stat %s - %s", b.cfg.Server.String(), b.Filename(h))
op := protocol.StatOp{Bucket: b.cfg.Repo, Filename: b.Filename(h)}
var result protocol.StatResult
if err := b.SendMsgWithReply(context.Background(), protocol.NatsStatCmd, op, &result); err != nil {
return restic.FileInfo{}, errors.Wrap(err, "statOp Failed")
fmt.Printf("Backend: %+v\n", b)
debug.Log("Stat %s", b.Filename(h))
result, err := b.rns.Stat(ctx, b.Dirname(h), b.Filename(h))
if err != nil {
//Communication Error
return restic.FileInfo{}, errors.Wrap(err, "Stat")
}
if result.Ok {
return restic.FileInfo{Size: result.Size, Name: h.Name}, nil
} else {
return restic.FileInfo{}, errors.New("File does not exist")
if !result.Ok {
// Backend Returned a Error
return restic.FileInfo{}, result.Err
}
return restic.FileInfo{Size: result.Size, Name: h.Name}, nil
}
// List runs fn for each file in the backend which has the type t. When an
@ -375,16 +210,19 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e
// The function fn is called in the same Goroutine that List() is called
// from.
func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
basedir, subdirs := b.Basedir(t)
debug.Log("List %s - %s - Subdirs? %t", b.cfg.Server.String(), basedir, subdirs)
op := protocol.ListOp{Bucket: b.cfg.Repo, BaseDir: basedir, SubDir: subdirs}
var result protocol.ListResult
if err := b.SendMsgWithReply(context.Background(), protocol.NatsListCmd, op, &result); err != nil {
return errors.Wrap(err, "listOp Failed")
dir, recursive := b.Basedir(t)
debug.Log("List %s - %s - Subdirs? %t", b.cfg.Server.String(),dir, recursive)
result, err := b.rns.List(ctx, dir, recursive)
if err != nil {
//Communication Error
return errors.Wrap(err, "List")
}
if !result.Ok {
//Backend Returned a Error
return result.Err
}
for _, fi := range result.FI {
rfi := restic.FileInfo{Name: fi.Name, Size: fi.Size}
if err := fn(rfi); err != nil {
return err
}
@ -395,37 +233,52 @@ func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.Fi
// IsNotExist returns true if the error was caused by a non-existing file
// in the backend.
func (b *Backend) IsNotExist(err error) bool {
debug.Log("IsNotExist %s (TODO) - %s", b.cfg.Server.String(), err)
debug.Log("IsNotExist %s (TODO) - %T", b.cfg.Server.String(), err)
fmt.Printf("IsNotExist Called\n")
return false
}
func (b *Backend) removeKeys(ctx context.Context, t restic.FileType) error {
return b.List(ctx, t, func(fi restic.FileInfo) error {
return b.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
})
}
// Delete removes all data in the backend.
func (b *Backend) Delete(ctx context.Context) error {
debug.Log("Delete %s (TODO)", b.cfg.Server.String())
return errors.Errorf("TODO Delete")
}
alltypes := []restic.FileType{
restic.PackFile,
restic.KeyFile,
restic.LockFile,
restic.SnapshotFile,
restic.IndexFile}
func (b *Backend) Join(p ...string) string {
return path.Join(p...)
}
for _, t := range alltypes {
err := b.removeKeys(ctx, t)
if err != nil {
return nil
}
}
func (b *Backend) ReadDir(ctx context.Context, dir string) ([]os.FileInfo, error) {
debug.Log("ReadDir %s (TODO) - %s", b.cfg.Server.String(), dir)
return nil, errors.Errorf("TODO: ReadDir")
err := b.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
if err != nil && !b.IsNotExist(err) {
return err
}
return nil
}
func (b *Backend) Mkdir(ctx context.Context, dir string) error {
debug.Log("Mkdir %s - %s", b.cfg.Server.String(), dir)
op := protocol.MkdirOp{Bucket: b.cfg.Repo, Dir: dir}
var result protocol.MkdirResult
if err := b.SendMsgWithReply(context.Background(), protocol.NatsMkdirCmd, op, &result); err != nil {
return errors.Wrap(err, "natsMkdirCmd Failed")
result, err := b.rns.Mkdir(ctx, dir)
if err != nil {
//Communication Error
return errors.Wrap(err, "Mkdir")
}
if result.Ok {
return nil
} else {
return errors.New("mkdir Failed")
if !result.Ok {
//Backend Returned a Error
return result.Err
}
return nil
}