mirror of
https://github.com/Fishwaldo/restic.git
synced 2025-03-15 19:41:38 +00:00
fix a few bugs with subscriptions and multitenant setup
This commit is contained in:
parent
980f1a5326
commit
b3dc3615b1
2 changed files with 38 additions and 32 deletions
|
@ -11,9 +11,9 @@ import (
|
|||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/backend/nats/protocol"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/nats/protocol"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -181,19 +181,21 @@ func (be *Backend) SendMsgWithReply(ctx context.Context, op protocol.NatsCommand
|
|||
/* check the size of the Data Field. If its close to our NATS max payload size
|
||||
* then we will chunk the transfer instead
|
||||
*/
|
||||
log := func(msg string, args ...interface{}) {
|
||||
fmt.Printf(msg, args...)
|
||||
fmt.Println()
|
||||
}
|
||||
// log := func(msg string, args ...interface{}) {
|
||||
// fmt.Printf(msg, args...)
|
||||
// fmt.Println()
|
||||
// }
|
||||
|
||||
var chunkedmsg *nats.Msg
|
||||
|
||||
chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, log)
|
||||
chunkedmsg, err = protocol.ChunkSendRequestMsgWithContext(ctx, be.conn, msg, debug.Log)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data))
|
||||
}
|
||||
if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil {
|
||||
return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data))
|
||||
if len(chunkedmsg.Data) > 0 {
|
||||
if err := be.enc.Decode(chunkedmsg.Subject, chunkedmsg.Data, recv); err != nil {
|
||||
return errors.Wrapf(err, "Decode Failed %s %s %d", chunkedmsg.Header.Get("X-RNS-MSGID"), chunkedmsg.Header, len(chunkedmsg.Data))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -317,6 +319,7 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea
|
|||
if err := b.SendMsgWithReply(context.Background(), protocol.NatsSaveCmd, so, &result); err != nil {
|
||||
return errors.Wrap(err, "Save: SendMsgWithReply Failed")
|
||||
}
|
||||
debug.Log("Save Result: %+v", result)
|
||||
if result.Ok {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -106,17 +106,19 @@ type RemoveResult struct {
|
|||
}
|
||||
|
||||
const (
|
||||
msgHeaderID string = "X-RNS-MSGID"
|
||||
msgHeaderChunk string = "X-RNS-CHUNKS"
|
||||
msgHeaderID string = "X-RNS-MSGID"
|
||||
msgHeaderChunk string = "X-RNS-CHUNKS"
|
||||
msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT"
|
||||
msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ"
|
||||
msgHeaderNRI string = "Nats-Request-Info"
|
||||
msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ"
|
||||
msgHeaderOperation string = "X-RNS-OP"
|
||||
msgHeaderNRI string = "Nats-Request-Info"
|
||||
)
|
||||
|
||||
func copyHeader(msg *nats.Msg) (hdr nats.Header) {
|
||||
hdr = make(nats.Header)
|
||||
hdr.Add(msgHeaderID, msg.Header.Get(msgHeaderID))
|
||||
hdr.Add(msgHeaderChunk, msg.Header.Get(msgHeaderChunk))
|
||||
hdr.Add(msgHeaderOperation, msg.Header.Get(msgHeaderOperation))
|
||||
return hdr
|
||||
}
|
||||
|
||||
|
@ -141,7 +143,7 @@ func getNRI(msg *nats.Msg) (*nriT, bool) {
|
|||
func NewRNSMsg(subject string) *nats.Msg {
|
||||
msg := nats.NewMsg(subject)
|
||||
msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16))
|
||||
return msg;
|
||||
return msg
|
||||
}
|
||||
|
||||
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
|
@ -172,7 +174,6 @@ func randStringBytesMaskImprSrcSB(n int) string {
|
|||
return sb.String()
|
||||
}
|
||||
|
||||
|
||||
func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto *nats.Msg, msg *nats.Msg, log func(string, ...interface{})) error {
|
||||
if len(msg.Header.Get(msgHeaderID)) == 0 {
|
||||
return errors.New("MessageID Not Set")
|
||||
|
@ -202,7 +203,7 @@ func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto
|
|||
maxchunksize = len(msg.Data)
|
||||
}
|
||||
initialchunk.Data = msg.Data[:maxchunksize]
|
||||
log("Chunking Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
|
||||
log("Chunking Initial Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
|
||||
chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
|
||||
|
@ -213,14 +214,12 @@ func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto
|
|||
return errors.New("Chunked Reply Response didn't include ChunkID")
|
||||
}
|
||||
var chunksubject string
|
||||
if nri, ok := getNRI(chunkchannelmsg); ok {
|
||||
chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid )
|
||||
if nri, ok := getNRI(replyto); ok {
|
||||
chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid)
|
||||
} else {
|
||||
chunksubject = fmt.Sprintf("chunk.send.%s", chunkid )
|
||||
chunksubject = fmt.Sprintf("chunk.send.%s", chunkid)
|
||||
}
|
||||
|
||||
|
||||
|
||||
log("Chunk Reply Subject %s", chunksubject)
|
||||
for i := 1; i <= pages; i++ {
|
||||
chunkmsg := nats.NewMsg(chunksubject)
|
||||
chunkmsg.Header = copyHeader(msg)
|
||||
|
@ -232,14 +231,15 @@ func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto
|
|||
end = len(msg.Data)
|
||||
}
|
||||
chunkmsg.Data = msg.Data[start:end]
|
||||
log("Sending Reply Chunk %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, start, end)
|
||||
log("Sending Reply Chunk %s - Page: %d of %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, pages, start, end)
|
||||
var chunkack *nats.Msg
|
||||
if i < pages {
|
||||
log("Sending Chunk to %s", chunkmsg.Subject)
|
||||
chunkack, err = conn.RequestMsgWithContext(ctx, chunkmsg)
|
||||
log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
|
||||
}
|
||||
log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i)
|
||||
} else {
|
||||
err := conn.PublishMsg(chunkmsg)
|
||||
if err != nil {
|
||||
|
@ -249,7 +249,7 @@ func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto
|
|||
|
||||
/* all chunkackorreply */
|
||||
if i == pages {
|
||||
return nil
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.New("Failed")
|
||||
|
@ -280,7 +280,7 @@ func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n
|
|||
* we will end up sending pages + 1 messages
|
||||
* as the initial message contains data as well
|
||||
*/
|
||||
pages := datasize / maxchunksize
|
||||
pages := datasize / maxchunksize
|
||||
|
||||
initialchunk := nats.NewMsg(msg.Subject)
|
||||
initialchunk.Header = copyHeader(msg)
|
||||
|
@ -299,9 +299,9 @@ func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n
|
|||
}
|
||||
var chunksubject string
|
||||
if nri, ok := getNRI(chunkchannelmsg); ok {
|
||||
chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid )
|
||||
chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid)
|
||||
} else {
|
||||
chunksubject = fmt.Sprintf("chunk.send.%s", chunkid )
|
||||
chunksubject = fmt.Sprintf("chunk.send.%s", chunkid)
|
||||
}
|
||||
|
||||
for i := 1; i <= pages; i++ {
|
||||
|
@ -323,7 +323,7 @@ func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n
|
|||
}
|
||||
log("got Result %s - %s", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header)
|
||||
/* only the last Chunk Reply will contain the actual Response from the other side */
|
||||
if i == pages {
|
||||
if i == pages {
|
||||
log("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data))
|
||||
return ChunkReadRequestMsgWithContext(ctx, conn, chunkackorreply, log)
|
||||
}
|
||||
|
@ -344,17 +344,18 @@ func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n
|
|||
}
|
||||
log("Chunked Message Recieved: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages)
|
||||
chunktransfer := randStringBytesMaskImprSrcSB(16)
|
||||
chunkchan := make(chan *nats.Msg)
|
||||
chunkchan := make(chan *nats.Msg, 10)
|
||||
var chunktransfersubject string
|
||||
if nri, ok := getNRI(msg); ok {
|
||||
chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer )
|
||||
chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer)
|
||||
} else {
|
||||
chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer )
|
||||
chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer)
|
||||
}
|
||||
sub, err := conn.ChanSubscribe(chunktransfersubject, chunkchan)
|
||||
sub, err := conn.QueueSubscribeSyncWithChan(chunktransfersubject, chunktransfer, chunkchan)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel")
|
||||
}
|
||||
sub.SetPendingLimits(1000, 64*1024*1024)
|
||||
log("Subscription: %+v", sub)
|
||||
defer sub.Unsubscribe()
|
||||
defer close(chunkchan)
|
||||
|
@ -373,11 +374,13 @@ func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *n
|
|||
if i < pages {
|
||||
ackChunk := nats.NewMsg(chunk.Subject)
|
||||
ackChunk.Header = copyHeader(chunk)
|
||||
log("sending ack %d %d", i, pages)
|
||||
err := chunk.RespondMsg(ackChunk)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Chunk Reply Error")
|
||||
}
|
||||
} else {
|
||||
log("Chunked Messages.... %d - %d", i, pages)
|
||||
msg.Reply = chunk.Reply
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
Loading…
Add table
Reference in a new issue