diff --git a/go.mod b/go.mod index 2023ed12..d5c5bb0e 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/Azure/azure-sdk-for-go v55.6.0+incompatible github.com/Azure/go-autorest/autorest v0.11.19 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect - github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04 github.com/cenkalti/backoff/v4 v4.1.1 github.com/cespare/xxhash/v2 v2.1.1 github.com/dnaeon/go-vcr v1.2.0 // indirect @@ -19,6 +18,7 @@ require ( github.com/kurin/blazer v0.5.3 github.com/minio/minio-go/v7 v7.0.12 github.com/minio/sha256-simd v1.0.0 + github.com/nats-io/nats-server/v2 v2.6.1 // indirect github.com/nats-io/nats.go v1.12.3 github.com/ncw/swift/v2 v2.0.0 github.com/pkg/errors v0.9.1 @@ -35,9 +35,8 @@ require ( golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c golang.org/x/text v0.3.6 google.golang.org/api v0.50.0 + google.golang.org/protobuf v1.27.1 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) -//replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol - go 1.14 diff --git a/go.sum b/go.sum index 1e082e48..aa818a8c 100644 --- a/go.sum +++ b/go.sum @@ -64,8 +64,6 @@ github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUM github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04 h1:PNLAHZO3SXbjsUxZ7s5M+In48LQr7+NYXYcchFstMJs= -github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-20211006112721-85bf5a959b04/go.mod h1:URXej+zQBychnTp4/xwPd3nxnj0mK5EG1BPBppYoeKU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/internal/backend/nats/nats.go b/internal/backend/nats/nats.go index 8df56b80..b66dadbd 100644 --- a/internal/backend/nats/nats.go +++ b/internal/backend/nats/nats.go @@ -11,7 +11,7 @@ import ( "path/filepath" "time" - "github.com/Fishwaldo/restic-nats-server/protocol" + "github.com/restic/restic/internal/backend/nats/protocol" "github.com/nats-io/nats.go" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/debug" @@ -48,7 +48,13 @@ func connectNats(be *Backend) error { } var options []nats.Option - options = append(options, nats.UserCredentials(be.cfg.Credential)) + if len(be.cfg.Credential) > 0 { + options = append(options, nats.UserCredentials(be.cfg.Credential)) + } + if len(be.cfg.Server.User.Username()) > 0 { + pass, _ := be.cfg.Server.User.Password() + options = append(options, nats.UserInfo(be.cfg.Server.User.Username(), pass)) + } options = append(options, nats.ClosedHandler(natsClosedCB)) options = append(options, nats.DisconnectHandler(natsDisconnectedCB)) @@ -175,9 +181,14 @@ func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand /* check the size of the Data Field. If its close to our NATS max payload size * then we will chunk the transfer instead */ + log := func(msg string, args ...interface{}) { + fmt.Printf(msg, args...) 
+		fmt.Println()
+	}
+
 	var chunkedmsg *nats.Msg
-	chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log)
+	chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, log)
 	if err != nil {
 		return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data))
 	}
diff --git a/internal/backend/nats/protocol/protocol.go b/internal/backend/nats/protocol/protocol.go
new file mode 100644
index 00000000..85f35d74
--- /dev/null
+++ b/internal/backend/nats/protocol/protocol.go
@@ -0,0 +1,391 @@
+package protocol
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/nats-io/nats.go"
+	"github.com/pkg/errors"
+)
+
+type NatsCommand int
+
+const (
+	NatsOpenCmd NatsCommand = iota
+	NatsStatCmd
+	NatsMkdirCmd
+	NatsSaveCmd
+	NatsListCmd
+	NatsLoadCmd
+	NatsRemoveCmd
+)
+
+type OpenOp struct {
+	Bucket string `json:"bucket"`
+}
+type OpenResult struct {
+	Ok bool `json:"ok"`
+}
+
+type StatOp struct {
+	Bucket   string `json:"bucket"`
+	Filename string `json:"filename"`
+}
+
+type StatResult struct {
+	Ok   bool   `json:"ok"`
+	Size int64  `json:"size"`
+	Name string `json:"name"`
+}
+
+type MkdirOp struct {
+	Bucket string `json:"bucket"`
+	Dir    string `json:"dir"`
+}
+
+type MkdirResult struct {
+	Ok bool `json:"ok"`
+}
+
+type SaveOp struct {
+	Bucket     string `json:"bucket"`
+	Dir        string `json:"dir"`
+	Name       string `json:"name"`
+	Filesize   int64  `json:"size"`
+	PacketSize int    `json:"packet_size"`
+	Offset     int64  `json:"offset"`
+	Data       []byte `json:"data"`
+}
+
+type SaveResult struct {
+	Ok bool `json:"ok"`
+}
+
+type ListOp struct {
+	Bucket  string `json:"bucket"`
+	BaseDir string `json:"base_dir"`
+	SubDir  bool   `json:"sub_dir"`
+}
+
+type FileInfo struct {
+	Name string `json:"name"`
+	Size int64  `json:"size"`
+}
+
+type ListResult struct {
+	Ok bool       `json:"ok"`
+	FI []FileInfo `json:"fi"`
+}
+
+type LoadOp struct {
+	Bucket string `json:"bucket"`
+	Dir    string `json:"dir"`
+	Name   string `json:"name"`
+	Length int    `json:"length"`
+	Offset int64  `json:"offset"`
+}
+
+type LoadResult struct {
+	Ok   bool   `json:"ok"`
+	Data []byte `json:"data"`
+}
+
+type RemoveOp struct {
+	Bucket string `json:"bucket"`
+	Dir    string `json:"dir"`
+	Name   string `json:"name"`
+}
+
+type RemoveResult struct {
+	Ok bool `json:"ok"`
+}
+
+const (
+	msgHeaderID           string = "X-RNS-MSGID"
+	msgHeaderChunk        string = "X-RNS-CHUNKS"
+	msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT"
+	msgHeaderChunkSeq     string = "X-RNS-CHUNKS-SEQ"
+	msgHeaderNRI          string = "Nats-Request-Info"
+)
+
+// copyHeader carries the message ID and chunk-count headers over to a new message.
+func copyHeader(msg *nats.Msg) (hdr nats.Header) {
+	hdr = make(nats.Header)
+	hdr.Add(msgHeaderID, msg.Header.Get(msgHeaderID))
+	hdr.Add(msgHeaderChunk, msg.Header.Get(msgHeaderChunk))
+	return hdr
+}
+
+type nriT struct {
+	Acc string `json:"acc"`
+	Rtt int    `json:"rtt"`
+}
+
+// getNRI parses the Nats-Request-Info header, if present.
+func getNRI(msg *nats.Msg) (*nriT, bool) {
+	nri := msg.Header.Get(msgHeaderNRI)
+	if nri == "" {
+		return nil, false
+	}
+	var res nriT
+	if err := json.Unmarshal([]byte(nri), &res); err != nil {
+		return nil, false
+	}
+	return &res, true
+}
+
+// NewRNSMsg returns a new RNS message (one is created for each "transaction").
+func NewRNSMsg(subject string) *nats.Msg {
+	msg := nats.NewMsg(subject)
+	msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16))
+	return msg
+}
+
+const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+const (
+	letterIdxBits = 6                    // 6 bits to represent a letter index
+	letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
+	letterIdxMax  = 63 / letterIdxBits   // # of letter indices fitting in 63 bits
+)
+
+var src = rand.NewSource(time.Now().UnixNano())
+
+// randStringBytesMaskImprSrcSB generates a random alphanumeric string of length n.
+func randStringBytesMaskImprSrcSB(n int) string {
+	sb := strings.Builder{}
+	sb.Grow(n)
+	// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
+	for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
+		if remain == 0 {
+			cache, remain = src.Int63(), letterIdxMax
+		}
+		if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
+			sb.WriteByte(letterBytes[idx])
+			i--
+		}
+		cache >>= letterIdxBits
+		remain--
+	}
+
+	return sb.String()
+}
+
+// ChunkSendReplyMsgWithContext sends msg as the reply to replyto, splitting
+// payloads that approach the server's maximum message size into chunks.
+func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto *nats.Msg, msg *nats.Msg, log func(string, ...interface{})) error {
+	if len(msg.Header.Get(msgHeaderID)) == 0 {
+		return errors.New("MessageID Not Set")
+	}
+
+	/* leave a little headroom below the server's maximum payload size */
+	maxchunksize := int(0.95 * float64(conn.MaxPayload()))
+	datasize := len(msg.Data)
+	log("ChunkSendReplyMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
+
+	if len(msg.Data) < maxchunksize {
+		/* data is less than our maxchunksize, so we can just send it */
+		log("ChunkSendReplyMsgWithContext: Short Reply Message %s", msg.Header.Get(msgHeaderID))
+		err := replyto.RespondMsg(msg)
+		return errors.Wrap(err, "Short Reply Message Send Failure")
+	}
+
+	/* need to split the data into chunks.
+	 * we will end up sending pages + 1 messages,
+	 * as the initial message contains data as well
+	 */
+	pages := datasize / maxchunksize
+	initialchunk := nats.NewMsg(msg.Subject)
+	initialchunk.Header = copyHeader(msg)
+	initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages))
+	initialchunk.Data = msg.Data[:maxchunksize]
+	log("Chunking Reply Message %s (%s) - pages %d, len %d, First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
+	chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk)
+	if err != nil {
+		return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
+	}
+	/* the reply only carries a header with the subject we send the remaining chunks to */
+	chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject)
+	if chunkid == "" {
+		return errors.New("Chunked Reply Response didn't include ChunkID")
+	}
+	var chunksubject string
+	if nri, ok := getNRI(chunkchannelmsg); ok {
+		chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid)
+	} else {
+		chunksubject = fmt.Sprintf("chunk.send.%s", chunkid)
+	}
+
+	for i := 1; i <= pages; i++ {
+		chunkmsg := nats.NewMsg(chunksubject)
+		chunkmsg.Header = copyHeader(msg)
+		chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i))
+		start := maxchunksize * i
+		end := maxchunksize * (i + 1)
+		/* make sure we don't overrun our slice */
+		if end > len(msg.Data) {
+			end = len(msg.Data)
+		}
+		chunkmsg.Data = msg.Data[start:end]
+		log("Sending Reply Chunk %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, start, end)
+		if i < pages {
+			/* intermediate chunks are acknowledged before we continue */
+			chunkack, err := conn.RequestMsgWithContext(ctx, chunkmsg)
+			if err != nil {
+				return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
+			}
+			log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i)
+		} else {
+			/* the final chunk needs no acknowledgement */
+			if err := conn.PublishMsg(chunkmsg); err != nil {
+				return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
+			}
+		}
+	}
+	return nil
+}
+
+// ChunkSendRequestMsgWithContext sends a request, chunking the payload when it
+// approaches the server's maximum message size, and returns the (reassembled) reply.
+func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) {
+	if len(msg.Header.Get(msgHeaderID)) == 0 {
+		return nil, errors.New("MessageID Not Set")
+	}
+
+	/* leave a little headroom below the server's maximum payload size */
+	maxchunksize := int(0.95 * float64(conn.MaxPayload()))
+	datasize := len(msg.Data)
+	log("ChunkSendRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
+
+	if len(msg.Data) < maxchunksize {
+		/* data is less than our maxchunksize, so we can just send it */
+		log("Short SendRequest MsgID %s - %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
+		reply, err := conn.RequestMsgWithContext(ctx, msg)
+		if err != nil {
+			return nil, errors.Wrap(err, "Short Message Send Failure")
+		}
+		log("Short ReplyRequest MsgID %s Headers %s Size: %d", reply.Header.Get(msgHeaderID), reply.Header, len(reply.Data))
+		return ChunkReadRequestMsgWithContext(ctx, conn, reply, log)
+	}
+
+	/* need to split the data into chunks.
+	 * we will end up sending pages + 1 messages,
+	 * as the initial message contains data as well
+	 */
+	pages := datasize / maxchunksize
+
+	initialchunk := nats.NewMsg(msg.Subject)
+	initialchunk.Header = copyHeader(msg)
+	initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages))
+
+	initialchunk.Data = msg.Data[:maxchunksize]
+	log("Chunking Send Request MsgID %s - %s - pages %d, len %d, First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
+	chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk)
+	if err != nil {
+		return nil, errors.Wrap(err, "ChunkSendRequestMsgWithContext")
+	}
+	/* the reply only carries a header with the subject we send the remaining chunks to */
+	chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject)
+	if chunkid == "" {
+		return nil, errors.New("Chunked Reply Response didn't include ChunkID")
+	}
+	var chunksubject string
+	if nri, ok := getNRI(chunkchannelmsg); ok {
+		chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid)
+	} else {
+		chunksubject = fmt.Sprintf("chunk.send.%s", chunkid)
+	}
+
+	for i := 1; i <= pages; i++ {
+		chunkmsg := nats.NewMsg(chunksubject)
+		chunkmsg.Header = copyHeader(msg)
+		chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i))
+		start := maxchunksize * i
+		end := maxchunksize * (i + 1)
+		/* make sure we don't overrun our slice */
+		if end > len(msg.Data) {
+			end = len(msg.Data)
+		}
+		chunkmsg.Data = msg.Data[start:end]
+		log("Sending Request Chunk %s %s to %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header, chunkmsg.Subject, i, start, end)
+		chunkackorreply, err := conn.RequestMsgWithContext(ctx, chunkmsg)
+		if err != nil {
+			return nil, errors.Wrap(err, "Chunk Send")
+		}
+		log("got Result %s - %s", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header)
+		/* only the last chunk's reply contains the actual response from the other side */
+		if i == pages {
+			log("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data))
+			return ChunkReadRequestMsgWithContext(ctx, conn, chunkackorreply, log)
+		}
+	}
+	return nil, errors.New("chunked request finished without a reply")
+}
+
+// ChunkReadRequestMsgWithContext reassembles a possibly-chunked message,
+// receiving any remaining chunks on a dedicated transfer subject.
+func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) {
+	if len(msg.Header.Get(msgHeaderID)) == 0 {
+		return nil, errors.New("MessageID Not Set")
+	}
+	log("ChunkReadRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
+	chunked := msg.Header.Get(msgHeaderChunk)
+	if chunked != "" {
+		pages, err := strconv.Atoi(chunked)
+		if err != nil {
+			return nil, errors.Wrap(err, "Couldn't get Chunk Page Count")
+		}
+		log("Chunked Message Received: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages)
+		chunktransfer := randStringBytesMaskImprSrcSB(16)
+		chunkchan := make(chan *nats.Msg)
+		var chunktransfersubject string
+		if nri, ok := getNRI(msg); ok {
+			chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer)
+		} else {
+			chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer)
+		}
+		sub, err := conn.ChanSubscribe(chunktransfersubject, chunkchan)
+		if err != nil {
+			return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel")
+		}
+		log("Subscription: %+v", sub)
+		/* unsubscribe before closing the channel (defers run LIFO), so the
+		 * client can't deliver a late chunk to a closed channel */
+		defer close(chunkchan)
+		defer sub.Unsubscribe()
+		chunksubmsg := nats.NewMsg(msg.Reply)
+		chunksubmsg.Header = copyHeader(msg)
+		chunksubmsg.Header.Add(msgHeaderChunkSubject, chunktransfer)
+		if err := msg.RespondMsg(chunksubmsg); err != nil {
+			return nil, errors.Wrap(err, "Chunk Subject Respond Error")
+		}
+		/* the first chunk arrived in the original message; receive the remaining pages chunks here */
+		for i := 1; i <= pages; i++ {
+			log("Pending MsgID %s Chunk %d of %d on %s", chunksubmsg.Header.Get(msgHeaderID), i, pages, chunktransfersubject)
+			select {
+			case chunk := <-chunkchan:
+				seq, _ := strconv.Atoi(chunk.Header.Get(msgHeaderChunkSeq))
+				log("Got MsgID %s - %s Chunk seq %d (expected %d)", chunk.Header.Get(msgHeaderID), chunk.Header, seq, i)
+				msg.Data = append(msg.Data, chunk.Data...)
+				if i < pages {
+					/* acknowledge intermediate chunks so the sender keeps going */
+					ackChunk := nats.NewMsg(chunk.Subject)
+					ackChunk.Header = copyHeader(chunk)
+					err := chunk.RespondMsg(ackChunk)
+					if err != nil {
+						return nil, errors.Wrap(err, "Chunk Reply Error")
+					}
+				} else {
+					msg.Reply = chunk.Reply
+				}
+			case <-ctx.Done():
+				log("Context Canceled")
+				return nil, ctx.Err()
+			}
+		}
+		log("Chunked Messages Done - %s - %s Final Size %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
+	}
+	return msg, nil
+}
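---

Usage sketch for reviewers (not part of the patch): a minimal caller of the chunked request path added in internal/backend/nats/protocol. The subject "repo.commands", the bucket name, and the server URL are illustrative assumptions only; a responder (e.g. restic-nats-server) is assumed to be listening on that subject. The protocol functions and types are the ones introduced above.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"

	"github.com/restic/restic/internal/backend/nats/protocol"
)

func main() {
	// Assumption: a NATS server on the default local port.
	conn, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Every transaction gets a fresh X-RNS-MSGID via NewRNSMsg.
	msg := protocol.NewRNSMsg("repo.commands") // hypothetical subject
	msg.Data, _ = json.Marshal(protocol.OpenOp{Bucket: "restic"})

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// ChunkSendRequestMsgWithContext splits the payload into chunks if it
	// approaches conn.MaxPayload() and reassembles the (possibly chunked) reply.
	// log.Printf matches the expected func(string, ...interface{}) signature.
	reply, err := protocol.ChunkSendRequestMsgWithContext(ctx, conn, msg, log.Printf)
	if err != nil {
		log.Fatal(err)
	}

	var res protocol.OpenResult
	if err := json.Unmarshal(reply.Data, &res); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("open ok: %v\n", res.Ok)
}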