mirror of
https://github.com/Fishwaldo/restic-nats.git
synced 2025-03-15 11:31:42 +00:00
Initial Version
This commit is contained in:
parent
9225aba816
commit
a6001d7edc
8 changed files with 1196 additions and 2 deletions
144
clientcommands.go
Normal file
144
clientcommands.go
Normal file
|
@ -0,0 +1,144 @@
|
|||
package rns
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
)
|
||||
|
||||
func (rns *ResticNatsClient) OpenRepo(ctx context.Context, hostname string) (OpenRepoResult, error) {
|
||||
var err error
|
||||
op := OpenRepoOp{Bucket: rns.bucket, Client: rns.Conn.Opts.Name, Hostname: hostname}
|
||||
msg := NewRNSClientMsg(NatsOpenCmd)
|
||||
|
||||
var result OpenRepoResult
|
||||
if err = rns.sendOperation(ctx, msg, op, &result); err != nil {
|
||||
return OpenRepoResult{}, err
|
||||
}
|
||||
|
||||
if !result.Ok {
|
||||
rns.logger.Error("Open Repository Failed: %s", result.Err)
|
||||
} else {
|
||||
rns.clientid = result.ClientID
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (rns *ResticNatsClient) Stat(ctx context.Context, dir, filename string) (StatResult, error) {
|
||||
var err error
|
||||
op := StatOp{Directory: dir, Filename: filename}
|
||||
msg := NewRNSClientMsg(NatsStatCmd)
|
||||
|
||||
var result StatResult
|
||||
if err = rns.sendOperation(ctx, msg, op, &result); err != nil {
|
||||
return StatResult{}, err
|
||||
}
|
||||
|
||||
if !result.Ok {
|
||||
rns.logger.Error("Stat File Failed: %s", result.Err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (rns *ResticNatsClient) List(ctx context.Context, dir string, recursive bool) (ListResult, error) {
|
||||
var err error
|
||||
op := ListOp{BaseDir: dir, Recurse: recursive}
|
||||
msg := NewRNSClientMsg(NatsListCmd)
|
||||
|
||||
var result ListResult
|
||||
if err = rns.sendOperation(ctx, msg, op, &result); err != nil {
|
||||
return ListResult{}, err
|
||||
}
|
||||
|
||||
if !result.Ok {
|
||||
rns.logger.Error("List Dir Failed: %s", result.Err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (rns *ResticNatsClient) Load(ctx context.Context, dir string, filename string, length int, offset int64) (LoadResult, error) {
|
||||
var err error
|
||||
op := LoadOp{Dir: dir, Name: filename, Length: length, Offset: offset}
|
||||
msg := NewRNSClientMsg(NatsLoadCmd)
|
||||
|
||||
var result LoadResult
|
||||
if err = rns.sendOperation(ctx, msg, op, &result); err != nil {
|
||||
return LoadResult{}, err
|
||||
}
|
||||
if !result.Ok {
|
||||
rns.logger.Error("Load Failed: %s", result.Err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (rns *ResticNatsClient) Save(ctx context.Context, dir string, filename string, rd io.Reader) (SaveResult, error) {
|
||||
var err error
|
||||
op := SaveOp{Dir: dir, Name: filename}
|
||||
op.Data, err = io.ReadAll(rd)
|
||||
if err != nil {
|
||||
rns.logger.Error("ReadAll Failed: %s", err)
|
||||
return SaveResult{}, err
|
||||
}
|
||||
op.Filesize = len(op.Data)
|
||||
msg := NewRNSClientMsg(NatsSaveCmd)
|
||||
|
||||
var result SaveResult
|
||||
if err = rns.sendOperation(ctx, msg, op, &result); err != nil {
|
||||
return SaveResult{}, err
|
||||
}
|
||||
|
||||
if !result.Ok {
|
||||
rns.logger.Error("Save Failed: %s", result.Err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (rns *ResticNatsClient) Remove(ctx context.Context, dir string, filename string) (RemoveResult, error) {
|
||||
var err error
|
||||
op := RemoveOp{Dir: dir, Name: filename}
|
||||
msg := NewRNSClientMsg(NatsRemoveCmd)
|
||||
|
||||
var result RemoveResult
|
||||
if err = rns.sendOperation(ctx, msg, op, &result); err != nil {
|
||||
return RemoveResult{}, err
|
||||
}
|
||||
|
||||
if !result.Ok {
|
||||
rns.logger.Error("Remove File Failed: %s", result.Err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (rns *ResticNatsClient) Mkdir(ctx context.Context, dir string) (MkdirResult, error) {
|
||||
var err error
|
||||
op := MkdirOp{Dir: dir}
|
||||
msg := NewRNSClientMsg(NatsMkdirCmd)
|
||||
|
||||
var result MkdirResult
|
||||
if err = rns.sendOperation(ctx, msg, op, &result); err != nil {
|
||||
return MkdirResult{}, err
|
||||
}
|
||||
|
||||
if !result.Ok {
|
||||
rns.logger.Error("Mkdir Failed: %s", result.Err)
|
||||
}
|
||||
return result, nil
|
||||
|
||||
}
|
||||
|
||||
func (rns *ResticNatsClient) Close(ctx context.Context) (CloseResult, error) {
|
||||
var err error
|
||||
op := CloseOp{}
|
||||
rns.clientid = ""
|
||||
msg := NewRNSClientMsg(NatsCloseCmd)
|
||||
|
||||
var result CloseResult
|
||||
if err = rns.sendOperation(ctx, msg, op, &result); err != nil {
|
||||
return CloseResult{}, err
|
||||
}
|
||||
|
||||
if !result.Ok {
|
||||
rns.logger.Error("Close Failed: %s", result.Err)
|
||||
}
|
||||
return result, nil
|
||||
|
||||
}
|
173
commands.go
Normal file
173
commands.go
Normal file
|
@ -0,0 +1,173 @@
|
|||
package rns
|
||||
|
||||
//NatsCommand is the Command the packet represents.
|
||||
type NatsCommand string
|
||||
|
||||
const (
|
||||
//NatsOpenCmd - Open a Repository
|
||||
NatsOpenCmd NatsCommand = "open"
|
||||
//NatsStatCmd - Stat a file on the Repository
|
||||
NatsStatCmd NatsCommand = "stat"
|
||||
//NatsMkdirCmd - Mkdir on the Repository
|
||||
NatsMkdirCmd NatsCommand = "mkdir"
|
||||
//NatsSaveCmd - Save a file to the respository
|
||||
NatsSaveCmd NatsCommand = "save"
|
||||
//NatsListCmd - List files in a Repository
|
||||
NatsListCmd NatsCommand = "list"
|
||||
//NatsLoadCmd - Load a file from a respository
|
||||
NatsLoadCmd NatsCommand = "load"
|
||||
//NatsRemoveCmd - Remove a file from a Repository
|
||||
NatsRemoveCmd NatsCommand = "remove"
|
||||
//NatsCloseCmd - Close a Repository
|
||||
NatsCloseCmd NatsCommand = "close"
|
||||
)
|
||||
|
||||
//OpenRepoOp - Open a Repository
|
||||
type OpenRepoOp struct {
|
||||
//Bucket the Repository to open
|
||||
Bucket string `json:"bucket"`
|
||||
//Client Name
|
||||
Client string `json:"client"`
|
||||
//Host name
|
||||
Hostname string `json:"hostname"`
|
||||
}
|
||||
|
||||
//OpenRepoResult - The result of Opening a Repository
|
||||
type OpenRepoResult struct {
|
||||
// Ok - If the Repository is successfully opened
|
||||
Ok bool `json:"ok"`
|
||||
// Err, if any
|
||||
Err error `json:"err"`
|
||||
//ClientID - The ClientID
|
||||
ClientID string `json:"clientid"`
|
||||
}
|
||||
|
||||
//StatOp - Stat a file in the repository Bucket
|
||||
type StatOp struct {
|
||||
//Directory the Directory the file lives in
|
||||
Directory string `json:"directory"`
|
||||
//Filename the Filename to Stat
|
||||
Filename string `json:"filename"`
|
||||
}
|
||||
|
||||
//StatResult - the result of Stating a file
|
||||
type StatResult struct {
|
||||
//Ok - If the Stat Command was successful
|
||||
Ok bool `json:"ok"`
|
||||
//Size - The Size of the file
|
||||
Size int64 `json:"size"`
|
||||
//Name - The Name of the file
|
||||
Name string `json:"name"`
|
||||
//Err - Error
|
||||
Err error
|
||||
}
|
||||
|
||||
//MkdirOp - Make a Directory in the Repository
|
||||
type MkdirOp struct {
|
||||
//Dir the name of the directory to create
|
||||
Dir string `json:"dir"`
|
||||
}
|
||||
|
||||
//MkdirResult - THe result of Making a Directory
|
||||
type MkdirResult struct {
|
||||
//Ok - if the Mkdir Command was successful
|
||||
Ok bool `json:"ok"`
|
||||
//Err, if any
|
||||
Err error
|
||||
}
|
||||
|
||||
//SaveOp - Save a file to the respository
|
||||
type SaveOp struct {
|
||||
//Dir - The Directory to save the file in
|
||||
Dir string `json:"dir"`
|
||||
//Name - The Name of the file to save
|
||||
Name string `json:"name"`
|
||||
//FileSize - the size of the entire file
|
||||
Filesize int `json:"size"`
|
||||
//Data - The actual file data
|
||||
Data []byte `json:"data"`
|
||||
}
|
||||
|
||||
//SaveResult - The result of saving a file
|
||||
type SaveResult struct {
|
||||
//Ok - of the save command was successful
|
||||
Ok bool `json:"ok"`
|
||||
//Err - Error, if any
|
||||
Err error
|
||||
}
|
||||
|
||||
//ListOp - List files in a directory (and optionally, subdirectories)
|
||||
type ListOp struct {
|
||||
//Basedir - the Base Directory to list
|
||||
BaseDir string `json:"base_dir"`
|
||||
//Subdir - If we would recurse into subsdirectories
|
||||
Recurse bool `json:"recurse"`
|
||||
}
|
||||
|
||||
//FileInfo - File Information returned for each file in a ListOp Command
|
||||
type FileInfo struct {
|
||||
//Name - The name fo the file
|
||||
Name string `json:"name"`
|
||||
//Size - The size of the file
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
//ListResult - The result of listing files
|
||||
type ListResult struct {
|
||||
//Ok - If the command was succesful
|
||||
Ok bool `json:"ok"`
|
||||
//FI - Slice of FileInfo for all files found
|
||||
FI []FileInfo `json:"fi"`
|
||||
//Err - Error, if any
|
||||
Err error
|
||||
}
|
||||
|
||||
//LoadOp - Read a file from a respository
|
||||
type LoadOp struct {
|
||||
//Dir - The Directory to read the file from
|
||||
Dir string `json:"dir"`
|
||||
//Name - The name of the file to Load
|
||||
Name string `json:"name"`
|
||||
//Length - How much data to load from the file
|
||||
Length int `json:"length"`
|
||||
//Offset - The offset where we should start reading the file from
|
||||
Offset int64 `json:"offset"`
|
||||
}
|
||||
|
||||
//LoadResult - The result of loading a file
|
||||
type LoadResult struct {
|
||||
//Ok - if the command was successful
|
||||
Ok bool `json:"ok"`
|
||||
//Data - slice of bytes contianing the file contents
|
||||
Data []byte `json:"data"`
|
||||
//Err - Error, if any
|
||||
Err error
|
||||
}
|
||||
|
||||
//RemoveOp - Remove a file from the Repository
|
||||
type RemoveOp struct {
|
||||
//Dir - The Name of the directory where the file resides
|
||||
Dir string `json:"dir"`
|
||||
//Name - Name of the file to remove
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
//RemoveResult - The result of removing a file
|
||||
type RemoveResult struct {
|
||||
//Ok - if the command was scuccessful
|
||||
Ok bool `json:"ok"`
|
||||
//Err - Error, if any
|
||||
Err error
|
||||
}
|
||||
|
||||
//CloseOp - Close a Repository
|
||||
type CloseOp struct {
|
||||
}
|
||||
|
||||
//CloseResult - The result of Closing a Repository
|
||||
type CloseResult struct {
|
||||
//Ok - if the command was scuccessful
|
||||
Ok bool `json:"ok"`
|
||||
//Err - Error, if any
|
||||
Err error
|
||||
}
|
19
go.mod
19
go.mod
|
@ -1,3 +1,18 @@
|
|||
module github.com/Fishwaldo/go-template
|
||||
module github.com/Fishwaldo/restic-nats
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/Fishwaldo/go-logadapter v0.0.2
|
||||
github.com/nats-io/nats.go v1.13.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.6.1 // indirect
|
||||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
)
|
||||
|
|
113
go.sum
Normal file
113
go.sum
Normal file
|
@ -0,0 +1,113 @@
|
|||
github.com/Fishwaldo/go-logadapter v0.0.2 h1:RxFOr+bEDqQ1rPUmjUX5u8fGLCKY0Lyea+9AxDqzaW4=
|
||||
github.com/Fishwaldo/go-logadapter v0.0.2/go.mod h1:aRbQ8rWdpeD0WWo241ctqgk/yRto8Axg09EkwWiVGK0=
|
||||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
|
||||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
|
||||
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
|
||||
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
|
||||
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
|
||||
github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM=
|
||||
github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo=
|
||||
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
|
||||
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
|
||||
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
|
||||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
65
options.go
Normal file
65
options.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package rns
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/Fishwaldo/go-logadapter"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type RNSOptions func (*ResticNatsClient) error
|
||||
|
||||
func WithCredentials(credfile string) RNSOptions {
|
||||
return func (rns *ResticNatsClient) error {
|
||||
f, err := os.Open(credfile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.Close()
|
||||
rns.natsoptions = append(rns.natsoptions, nats.UserCredentials(credfile))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithUserPass(username, password string) RNSOptions {
|
||||
return func(rns *ResticNatsClient) error {
|
||||
rns.natsoptions = append(rns.natsoptions, nats.UserInfo(username, password))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithName(name string) RNSOptions {
|
||||
return func(rns *ResticNatsClient) error {
|
||||
rns.natsoptions = append(rns.natsoptions, nats.Name(name))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithPingInterval(t time.Duration) RNSOptions {
|
||||
return func(rns *ResticNatsClient) error {
|
||||
rns.natsoptions = append(rns.natsoptions, nats.PingInterval(t))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithTLSOptions(tls *tls.Config) RNSOptions {
|
||||
return func(rns *ResticNatsClient) error {
|
||||
rns.natsoptions = append(rns.natsoptions, nats.Secure(tls))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(log logadapter.Logger) RNSOptions {
|
||||
return func(rns *ResticNatsClient) error {
|
||||
rns.logger = log
|
||||
return nil
|
||||
}
|
||||
}
|
||||
func WithServer() RNSOptions {
|
||||
return func(rns *ResticNatsClient) error {
|
||||
rns.server = true
|
||||
return nil
|
||||
}
|
||||
}
|
448
protocol.go
Normal file
448
protocol.go
Normal file
|
@ -0,0 +1,448 @@
|
|||
package rns
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/Fishwaldo/go-logadapter"
|
||||
stdlogger "github.com/Fishwaldo/go-logadapter/loggers/std"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type ResticNatsClient struct {
|
||||
Conn *nats.Conn
|
||||
natsoptions []nats.Option
|
||||
Encoder nats.Encoder
|
||||
bucket string
|
||||
logger logadapter.Logger
|
||||
server bool
|
||||
clientid string
|
||||
}
|
||||
|
||||
//header Key Constant Strings for our messages
|
||||
const (
|
||||
msgHeaderID string = "X-RNS-MSGID"
|
||||
msgHeaderChunk string = "X-RNS-CHUNKS"
|
||||
msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT"
|
||||
msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ"
|
||||
msgHeaderOperation string = "X-RNS-OP"
|
||||
msgHeaderVersion string = "X-RNS-VERSION"
|
||||
msgHeaderClientID string = "X-RNS-CLIENTID"
|
||||
msgHeaderNRI string = "Nats-Request-Info"
|
||||
)
|
||||
|
||||
//copyHeader copies the relevent header firles from our source to message
|
||||
//to the destination message
|
||||
func copyHeader(msg *nats.Msg) (hdr nats.Header) {
|
||||
hdr = make(nats.Header)
|
||||
hdr.Add(msgHeaderID, msg.Header.Get(msgHeaderID))
|
||||
hdr.Add(msgHeaderChunk, msg.Header.Get(msgHeaderChunk))
|
||||
hdr.Add(msgHeaderOperation, msg.Header.Get(msgHeaderOperation))
|
||||
return hdr
|
||||
}
|
||||
|
||||
//nriT is the Nats-Request-Info header fields. Used to detect which
|
||||
//account sent this message
|
||||
type nriT struct {
|
||||
//Acc The Account this message come from
|
||||
Acc string `json:"acc"`
|
||||
//Round Trip Time
|
||||
Rtt int `json:"rtt"`
|
||||
}
|
||||
|
||||
//getNRI gets the nriT from the Nats-Request-Info Header
|
||||
func getNRI(msg *nats.Msg) (*nriT, bool) {
|
||||
nri := msg.Header.Get(msgHeaderNRI)
|
||||
if nri == "" {
|
||||
return nil, false
|
||||
}
|
||||
var res nriT
|
||||
if err := json.Unmarshal([]byte(nri), &res); err != nil {
|
||||
return nil, false
|
||||
}
|
||||
return &res, true
|
||||
}
|
||||
|
||||
// NewRNSClientMsg Returns a New RNS Client Message (for each "Transaction")
|
||||
func NewRNSClientMsg(operation NatsCommand) *nats.Msg {
|
||||
msg := nats.NewMsg(fmt.Sprintf("repo.commands.%s", operation))
|
||||
msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16))
|
||||
msg.Header.Set(msgHeaderOperation, string(operation))
|
||||
msg.Header.Set(msgHeaderVersion, "1")
|
||||
return msg
|
||||
}
|
||||
|
||||
// NewRNSClientMsg Returns a New RNS Client Message (for each "Transaction")
|
||||
func NewRNSReplyMsg(replyto *nats.Msg) *nats.Msg {
|
||||
msg := nats.NewMsg(replyto.Reply)
|
||||
msg.Header.Set(msgHeaderID, replyto.Header.Get(msgHeaderID))
|
||||
msg.Header.Set(msgHeaderOperation, replyto.Header.Get(msgHeaderOperation))
|
||||
msg.Header.Set(msgHeaderVersion, "1")
|
||||
return msg
|
||||
}
|
||||
|
||||
|
||||
|
||||
func New(server url.URL, opts ...RNSOptions) (*ResticNatsClient, error) {
|
||||
var rns *ResticNatsClient
|
||||
var err error
|
||||
rns = &ResticNatsClient{logger: stdlogger.DefaultLogger(), natsoptions: make([]nats.Option, 0)}
|
||||
|
||||
if len(server.User.Username()) > 0 {
|
||||
if pass, ok := server.User.Password(); ok {
|
||||
opts = append(opts, WithUserPass(server.User.Username(), pass))
|
||||
} else {
|
||||
return nil, errors.New("No Password Supplied")
|
||||
}
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
if err := opt(rns); err != nil {
|
||||
return nil, errors.Wrap(err, "Open Failed")
|
||||
}
|
||||
}
|
||||
|
||||
if !rns.server {
|
||||
if len(server.Path) == 0 {
|
||||
return nil, errors.New("No Bucket Specified")
|
||||
} else {
|
||||
parts := strings.Split(server.Path, "/")
|
||||
if parts[1] == "" {
|
||||
return nil, errors.New("Invalid Bucket Specified")
|
||||
}
|
||||
rns.bucket = parts[1]
|
||||
}
|
||||
}
|
||||
|
||||
if server.IsAbs() {
|
||||
server.Scheme = "nats"
|
||||
}
|
||||
if server.Port() == "" {
|
||||
return nil, errors.New("No Port Specified")
|
||||
}
|
||||
|
||||
rns.Conn, err = nats.Connect(server.String(), rns.natsoptions...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if size := rns.Conn.MaxPayload(); size < 8388608 {
|
||||
return nil, errors.New("NATS Server Max Payload Size is below 8Mb")
|
||||
}
|
||||
|
||||
if !rns.Conn.HeadersSupported() {
|
||||
return nil, errors.New("server does not support Headers")
|
||||
}
|
||||
|
||||
rns.Encoder = nats.EncoderForType("gob")
|
||||
if rns.Encoder == nil {
|
||||
return nil, errors.New("Can't Load Nats Encoder")
|
||||
}
|
||||
return rns, nil
|
||||
}
|
||||
|
||||
func (rns *ResticNatsClient) sendOperation(ctx context.Context, msg *nats.Msg, send interface{}, result interface{}) error {
|
||||
var err error
|
||||
if msg.Data, err = rns.Encoder.Encode(msg.Subject, send); err != nil {
|
||||
return errors.Wrap(err, "Encode Failed")
|
||||
}
|
||||
msg.Header.Add(msgHeaderClientID, rns.clientid)
|
||||
msg.Reply = nats.NewInbox()
|
||||
|
||||
var reply *nats.Msg
|
||||
if reply, err = rns.ChunkSendRequestMsgWithContext(ctx, msg); err != nil {
|
||||
return errors.Wrapf(err, "ChunkRequestMsgWithContext Error: %d", len(msg.Data))
|
||||
}
|
||||
|
||||
if len(reply.Data) > 0 {
|
||||
if err := rns.Encoder.Decode(reply.Subject, reply.Data, result); err != nil {
|
||||
return errors.Wrapf(err, "Decode Failed")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//ChunkSendReplyMsgWithContext - send a reply to a message, chunking this reply if its Data exceeds the NATS server max payload length
|
||||
//ctx - Context
|
||||
//replyto The Message we are replying to
|
||||
//msg the actual message we want to send
|
||||
//log Custom Logger
|
||||
func (rns *ResticNatsClient) ChunkSendReplyMsgWithContext(ctx context.Context, replyto *nats.Msg, msg *nats.Msg) error {
|
||||
if len(msg.Header.Get(msgHeaderID)) == 0 {
|
||||
return errors.New("MessageID Not Set")
|
||||
}
|
||||
|
||||
//Get the max payload size and use 95% as our limit (to account for any additioanl Meta data)
|
||||
var maxchunksize int = int(0.95 * float32(rns.Conn.MaxPayload()))
|
||||
datasize := len(msg.Data)
|
||||
rns.logger.Debug("ChunkSendReplyMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
|
||||
|
||||
//if the Data is smaller than our payload limit, we can just send it over without chunking
|
||||
if len(msg.Data) < maxchunksize {
|
||||
/* data is less then our maxchunksize, so we can just send it */
|
||||
rns.logger.Trace("ChunkSendReplyMsgWithContext: Short Reply Message %s", msg.Header.Get(msgHeaderID))
|
||||
// as this is a reply, we don't have anything coming back...
|
||||
err := replyto.RespondMsg(msg)
|
||||
return errors.Wrap(err, "Short Reply Message Send Failure")
|
||||
}
|
||||
|
||||
/* We need to Split the Data into Chunks
|
||||
* The first Chunk will be sent to the replyto Subject and include a Header
|
||||
* indicating this is a chunked message.
|
||||
*/
|
||||
pages := datasize / maxchunksize
|
||||
initialchunk := nats.NewMsg(msg.Subject)
|
||||
initialchunk.Header = copyHeader(msg)
|
||||
initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages))
|
||||
// Copy only the max chunk size from the original message into this first chunk
|
||||
initialchunk.Data = msg.Data[:maxchunksize]
|
||||
|
||||
rns.logger.Debug("Chunking Initial Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
|
||||
chunkchannelmsg, err := rns.Conn.RequestMsgWithContext(ctx, initialchunk)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
|
||||
}
|
||||
/* Reply Message just has a header with the subject we send the rest of the chunks to */
|
||||
chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject)
|
||||
if chunkid == "" {
|
||||
return errors.New("Chunked Reply Response didn't include ChunkID")
|
||||
}
|
||||
var chunksubject string
|
||||
/* The subject we reply to for subsequent chunks might be
|
||||
* coming from another Account, so check the Nats-Request-Info header (set by the NATS server)
|
||||
* to see, and if so, alter the subject we send to chunks to
|
||||
*/
|
||||
if nri, ok := getNRI(replyto); ok {
|
||||
chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid)
|
||||
} else {
|
||||
chunksubject = fmt.Sprintf("chunk.send.%s", chunkid)
|
||||
}
|
||||
rns.logger.Trace("Chunk Reply Subject %s", chunksubject)
|
||||
/* now, start sending each remaining chunk to the subject, and wait for a reply acknowledging
|
||||
* its reciept.
|
||||
*/
|
||||
for i := 1; i <= pages; i++ {
|
||||
chunkmsg := nats.NewMsg(chunksubject)
|
||||
chunkmsg.Header = copyHeader(msg)
|
||||
chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i))
|
||||
start := maxchunksize * i
|
||||
end := maxchunksize * (i + 1)
|
||||
/* make sure we don't overrun our slice */
|
||||
if end > len(msg.Data) {
|
||||
end = len(msg.Data)
|
||||
}
|
||||
chunkmsg.Data = msg.Data[start:end]
|
||||
rns.logger.Debug("Sending Reply Chunk %s - Page: %d of %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, pages, start, end)
|
||||
var chunkack *nats.Msg
|
||||
|
||||
/* If this chunk is not the last chunk, we expect a reply
|
||||
* acknowledging the reciept
|
||||
*/
|
||||
if i < pages {
|
||||
rns.logger.Trace("Sending Chunk to %s", chunkmsg.Subject)
|
||||
chunkack, err = rns.Conn.RequestMsgWithContext(ctx, chunkmsg)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
|
||||
}
|
||||
/* XXX TODO: Check Success */
|
||||
rns.logger.Trace("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i)
|
||||
} else {
|
||||
/* if its the last chunk, then just send as we wont get a Ack from the
|
||||
* reciever
|
||||
*/
|
||||
err := rns.Conn.PublishMsg(chunkmsg)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
|
||||
}
|
||||
}
|
||||
|
||||
/* once we have sent everything, return */
|
||||
if i == pages {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
/* Sending the Chunks failed for some reason. */
|
||||
return errors.New("Failed")
|
||||
}
|
||||
|
||||
//ChunkSendRequestMsgWithContext send a message and expect a reply back from the Reciever
|
||||
//ctx - The Context to use
|
||||
//conn - the Nats Client Connection
|
||||
//msg - the message to send
|
||||
//log - Custom Logger
|
||||
func (rns *ResticNatsClient) ChunkSendRequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error) {
|
||||
if len(msg.Header.Get(msgHeaderID)) == 0 {
|
||||
return nil, errors.New("MessageID Not Set")
|
||||
}
|
||||
|
||||
//Get the max payload size and use 95% as our limit (to account for any additioanl Meta data)
|
||||
var maxchunksize int = int(0.95 * float32(rns.Conn.MaxPayload()))
|
||||
datasize := len(msg.Data)
|
||||
rns.logger.Debug("ChunkSendRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
|
||||
|
||||
// If the data is less then our max payload size, just send it without chunking
|
||||
if len(msg.Data) < maxchunksize {
|
||||
rns.logger.Trace("Short SendRequest MsgID %s - %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
|
||||
|
||||
reply, err := rns.Conn.RequestMsgWithContext(ctx, msg)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Short Message Send Failure")
|
||||
}
|
||||
rns.logger.Trace("Short ReplyRequest MsgID %s Headers %s Size: %d", reply.Header.Get(msgHeaderID), reply.Header, len(reply.Data))
|
||||
//The reply that came back in may be chunked so parse it and return the final respose
|
||||
return rns.ChunkReadRequestMsgWithContext(ctx, reply)
|
||||
}
|
||||
|
||||
/* We need to Split the Data into Chunks
|
||||
* The first Chunk will be sent to the replyto Subject and include a Header
|
||||
* indicating this is a chunked message.
|
||||
*/
|
||||
pages := datasize / maxchunksize
|
||||
initialchunk := nats.NewMsg(msg.Subject)
|
||||
initialchunk.Header = copyHeader(msg)
|
||||
initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages))
|
||||
|
||||
initialchunk.Data = msg.Data[:maxchunksize]
|
||||
rns.logger.Debug("Chunking Send Request MsgID %s - %s- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
|
||||
chunkchannelmsg, err := rns.Conn.RequestMsgWithContext(ctx, initialchunk)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "chunkRequestMsgWithContext")
|
||||
}
|
||||
/* Reply Message just has a header with the subject we send the rest of the chunks to */
|
||||
chunkid := chunkchannelmsg.Header.Get(msgHeaderChunkSubject)
|
||||
if chunkid == "" {
|
||||
return nil, errors.New("Chunked Reply Response didn't include ChunkID")
|
||||
}
|
||||
/* The subject we reply to for subsequent chunks might be
|
||||
* coming from another Account, so check the Nats-Request-Info header (set by the NATS server)
|
||||
* to see, and if so, alter the subject we send to chunks to
|
||||
*/
|
||||
var chunksubject string
|
||||
if nri, ok := getNRI(chunkchannelmsg); ok {
|
||||
chunksubject = fmt.Sprintf("chunk.%s.send.%s", nri.Acc, chunkid)
|
||||
} else {
|
||||
chunksubject = fmt.Sprintf("chunk.send.%s", chunkid)
|
||||
}
|
||||
/* now send each Chunk */
|
||||
for i := 1; i <= pages; i++ {
|
||||
chunkmsg := nats.NewMsg(chunksubject)
|
||||
chunkmsg.Header = copyHeader(msg)
|
||||
chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i))
|
||||
start := maxchunksize * i
|
||||
end := maxchunksize * (i + 1)
|
||||
/* make sure we don't overrun our slice */
|
||||
if end > len(msg.Data) {
|
||||
end = len(msg.Data)
|
||||
}
|
||||
chunkmsg.Data = msg.Data[start:end]
|
||||
rns.logger.Debug("Sending Request Chunk %s %s to %s- Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header, chunkmsg.Subject, i, start, end)
|
||||
var chunkackorreply *nats.Msg
|
||||
chunkackorreply, err = rns.Conn.RequestMsgWithContext(ctx, chunkmsg)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Chunk Send")
|
||||
}
|
||||
rns.logger.Trace("got Result %s - %s", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header)
|
||||
/* only the last Chunk Reply will contain the actual Response from the other side
|
||||
* (the other messages were just acks for each Chunk)
|
||||
*/
|
||||
if i == pages {
|
||||
rns.logger.Debug("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data))
|
||||
//The reply might be chunked, so read it and return the final reply
|
||||
return rns.ChunkReadRequestMsgWithContext(ctx, chunkackorreply)
|
||||
}
|
||||
}
|
||||
// Chunking Failed for some reason. Die...
|
||||
return nil, errors.New("Failed")
|
||||
}
|
||||
|
||||
/* ChunkReadRequestMsgWithContext - Read a message from the Nats Client and if its chunked,
|
||||
* get the remaining chunks and reconstruct the message
|
||||
* ctx - Context
|
||||
* msg - The message we got
|
||||
* returns the actual reconstructed message, or a error
|
||||
*/
|
||||
func (rns *ResticNatsClient) ChunkReadRequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error) {
|
||||
if len(msg.Header.Get(msgHeaderID)) == 0 {
|
||||
return nil, errors.New("MessageID Not Set")
|
||||
}
|
||||
rns.logger.Debug("ChunkReadRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
|
||||
chunked := msg.Header.Get(msgHeaderChunk)
|
||||
/* if we have the Chunked Header then this message is chunks
|
||||
* so we need to reconstruct it */
|
||||
if chunked != "" {
|
||||
/* how many Chunks are needed? */
|
||||
pages, err := strconv.Atoi(chunked)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Couldn't get Chunk Page Count")
|
||||
}
|
||||
rns.logger.Debug("Chunked Message Recieved: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages)
|
||||
/* create a random subject to recieve the rest of the chunks from */
|
||||
chunktransfer := randStringBytesMaskImprSrcSB(16)
|
||||
var chunktransfersubject string
|
||||
/* if the message come from another account, we need to read the Nats-Request-Info to get that
|
||||
* account and create the subject appropriately */
|
||||
if nri, ok := getNRI(msg); ok {
|
||||
chunktransfersubject = fmt.Sprintf("chunk.%s.recieve.%s", nri.Acc, chunktransfer)
|
||||
} else {
|
||||
chunktransfersubject = fmt.Sprintf("chunk.recieve.%s", chunktransfer)
|
||||
}
|
||||
/* Subscribe to that Subject with a buffered Channel */
|
||||
chunkchan := make(chan *nats.Msg, 10)
|
||||
sub, err := rns.Conn.QueueSubscribeSyncWithChan(chunktransfersubject, chunktransfer, chunkchan)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel")
|
||||
}
|
||||
/* increase our limits just in case (helps with Slow Consumer issues*/
|
||||
sub.SetPendingLimits(1000, 64*1024*1024)
|
||||
rns.logger.Trace("Subscription: %+v", sub)
|
||||
/* make sure we unsubscribe and close our channels when done */
|
||||
defer sub.Unsubscribe()
|
||||
defer close(chunkchan)
|
||||
|
||||
/* send back the Channel we will recieve the Chunks on to the sender.
|
||||
* we send back to the original subject */
|
||||
chunksubmsg := nats.NewMsg(msg.Reply)
|
||||
chunksubmsg.Header = copyHeader(msg)
|
||||
chunksubmsg.Header.Add(msgHeaderChunkSubject, chunktransfer)
|
||||
if err := msg.RespondMsg(chunksubmsg); err != nil {
|
||||
return nil, errors.Wrap(err, "Respond to initial Chunk")
|
||||
}
|
||||
/* now start recieving our chunks from the sender on the Subject we sent over */
|
||||
for i := 1; i <= pages; i++ {
|
||||
rns.logger.Debug("Pending MsgID %s Chunk %d of %d on %s", chunksubmsg.Header.Get(msgHeaderID), i, pages, chunktransfersubject)
|
||||
select {
|
||||
case chunk := <-chunkchan:
|
||||
/* got another chunk from the Sending */
|
||||
seq, _ := strconv.Atoi(chunk.Header.Get(msgHeaderChunkSeq))
|
||||
rns.logger.Debug("Got MsgID %s - %s Chunk %d %d", chunk.Header.Get(msgHeaderID), chunk.Header, seq, i)
|
||||
/* this chunk contains Data, Append it to our original message */
|
||||
msg.Data = append(msg.Data, chunk.Data...)
|
||||
if i < pages {
|
||||
/* Everything but the last chunk, we need to Ack back to to the sender */
|
||||
ackChunk := nats.NewMsg(chunk.Subject)
|
||||
ackChunk.Header = copyHeader(chunk)
|
||||
rns.logger.Trace("sending ack %d %d", i, pages)
|
||||
err := chunk.RespondMsg(ackChunk)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Chunk Reply Error")
|
||||
}
|
||||
} else {
|
||||
/* Last chunk doesn't need a Ack */
|
||||
rns.logger.Trace("Chunked Messages.... %d - %d", i, pages)
|
||||
msg.Reply = chunk.Reply
|
||||
}
|
||||
case <-ctx.Done():
|
||||
rns.logger.Debug("Context Canceled")
|
||||
return nil, context.DeadlineExceeded
|
||||
}
|
||||
}
|
||||
rns.logger.Debug("Chunked Messages Done - %s - %s Final Size %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
|
||||
}
|
||||
/* return the final message back */
|
||||
return msg, nil
|
||||
}
|
198
servercommands.go
Normal file
198
servercommands.go
Normal file
|
@ -0,0 +1,198 @@
|
|||
package rns
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/Fishwaldo/go-logadapter"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
ClientID string
|
||||
Bucket string
|
||||
}
|
||||
|
||||
type RNSServerImpl interface {
|
||||
LookupClient(string) (Client, error)
|
||||
Open(context.Context, OpenRepoOp) (OpenRepoResult, Client, error)
|
||||
Stat(context.Context, Client, StatOp) (StatResult, error)
|
||||
Mkdir(context.Context, Client, MkdirOp) (MkdirResult, error)
|
||||
Save(context.Context, Client, SaveOp) (SaveResult, error)
|
||||
List(context.Context, Client, ListOp) (ListResult, error)
|
||||
Load(context.Context, Client, LoadOp) (LoadResult, error)
|
||||
Remove(context.Context, Client, RemoveOp) (RemoveResult, error)
|
||||
Close(context.Context, Client, CloseOp) (CloseResult, error)
|
||||
}
|
||||
|
||||
type RNSServer struct {
|
||||
impl RNSServerImpl
|
||||
log logadapter.Logger
|
||||
client *ResticNatsClient
|
||||
}
|
||||
|
||||
var (
|
||||
errRNSServerError = errors.New("Server Failure")
|
||||
)
|
||||
|
||||
func NewRNSServer(impl RNSServerImpl, rnsclient *ResticNatsClient, log logadapter.Logger) (RNSServer, error) {
|
||||
ret := RNSServer{
|
||||
impl: impl,
|
||||
client: rnsclient,
|
||||
log: log,
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (rns *RNSServer) decodeMsg(msg *nats.Msg, vptr interface{}) error {
|
||||
if err := rns.client.Encoder.Decode(msg.Subject, msg.Data, vptr); err != nil {
|
||||
rns.log.Warn("Decode Failed: %s", err)
|
||||
return errRNSServerError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rns *RNSServer) replyResult(ctx context.Context, replyto *nats.Msg, result interface{}) error {
|
||||
var err error
|
||||
reply := NewRNSReplyMsg(replyto)
|
||||
reply.Data, err = rns.client.Encoder.Encode(reply.Reply, result)
|
||||
if err != nil {
|
||||
rns.log.Warn("Encode Failed: %s", err)
|
||||
return errRNSServerError
|
||||
}
|
||||
/* send it */
|
||||
if err = rns.client.ChunkSendReplyMsgWithContext(ctx, replyto, reply); err != nil {
|
||||
rns.log.Warn("Send Reply Failed: %s", err)
|
||||
return errRNSServerError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rns *RNSServer) ProcessServerMsg(ctx context.Context, msg *nats.Msg) error {
|
||||
rns.log.Info("Message: %s", msg.Subject)
|
||||
|
||||
/* get the entire message, if its chunked */
|
||||
msg, err := rns.client.ChunkReadRequestMsgWithContext(ctx, msg)
|
||||
if err != nil {
|
||||
rns.log.Warn("Chunk Read Failed: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
var cmd NatsCommand = NatsCommand(msg.Header.Get(msgHeaderOperation))
|
||||
|
||||
var client Client
|
||||
/* first case, if its a open command */
|
||||
if cmd == NatsOpenCmd {
|
||||
var oo OpenRepoOp
|
||||
err := rns.decodeMsg(msg, &oo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var or OpenRepoResult
|
||||
or, client, err = rns.impl.Open(ctx, oo)
|
||||
if err != nil {
|
||||
/* its a server specific error */
|
||||
rns.log.Warn("Server Open Failed: %s", err)
|
||||
/* make sure our result contains a generic error */
|
||||
or.Err = errRNSServerError
|
||||
}
|
||||
return rns.replyResult(ctx, msg, or)
|
||||
} else {
|
||||
/* any other command should have our CID */
|
||||
client, err = rns.impl.LookupClient(msg.Header.Get(msgHeaderClientID))
|
||||
if err != nil {
|
||||
rns.log.Warn("Client Lookup Failed: %s", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
var cmdResult interface{}
|
||||
|
||||
/* now we can process the rest of our commands */
|
||||
switch cmd {
|
||||
case NatsStatCmd:
|
||||
var so StatOp
|
||||
err := rns.decodeMsg(msg, &so)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdResult, err = rns.impl.Stat(ctx, client, so)
|
||||
if err != nil {
|
||||
sr := cmdResult.(StatResult)
|
||||
sr.Err = errRNSServerError
|
||||
}
|
||||
case NatsMkdirCmd:
|
||||
var mo MkdirOp
|
||||
err := rns.decodeMsg(msg, &mo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdResult, err = rns.impl.Mkdir(ctx, client, mo)
|
||||
if err != nil {
|
||||
sr := cmdResult.(MkdirResult)
|
||||
sr.Err = errRNSServerError
|
||||
}
|
||||
case NatsSaveCmd:
|
||||
var so SaveOp
|
||||
err := rns.decodeMsg(msg, &so)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdResult, err = rns.impl.Save(ctx, client, so)
|
||||
if err != nil {
|
||||
sr := cmdResult.(SaveResult)
|
||||
sr.Err = errRNSServerError
|
||||
}
|
||||
case NatsListCmd:
|
||||
var lo ListOp
|
||||
err := rns.decodeMsg(msg, &lo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdResult, err = rns.impl.List(ctx, client, lo)
|
||||
if err != nil {
|
||||
sr := cmdResult.(ListResult)
|
||||
sr.Err = errRNSServerError
|
||||
}
|
||||
case NatsLoadCmd:
|
||||
var lo LoadOp
|
||||
err := rns.decodeMsg(msg, &lo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdResult, err = rns.impl.Load(ctx, client, lo)
|
||||
if err != nil {
|
||||
sr := cmdResult.(LoadResult)
|
||||
sr.Err = errRNSServerError
|
||||
}
|
||||
case NatsRemoveCmd:
|
||||
var ro RemoveOp
|
||||
err := rns.decodeMsg(msg, &ro)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdResult, err = rns.impl.Remove(ctx, client, ro)
|
||||
if err != nil {
|
||||
sr := cmdResult.(RemoveResult)
|
||||
sr.Err = errRNSServerError
|
||||
}
|
||||
case NatsCloseCmd:
|
||||
var co CloseOp
|
||||
err := rns.decodeMsg(msg, &co)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdResult, err = rns.impl.Close(ctx, client, co)
|
||||
if err != nil {
|
||||
sr := cmdResult.(CloseResult)
|
||||
sr.Err = errRNSServerError
|
||||
}
|
||||
default:
|
||||
rns.log.Warn("Got Unknown Command %s", cmd)
|
||||
return errRNSServerError
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
rns.log.Warn("Server %s Failed: %s", cmd, err)
|
||||
}
|
||||
return rns.replyResult(ctx, msg, cmdResult)
|
||||
}
|
38
utils.go
Normal file
38
utils.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package rns
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
)
|
||||
|
||||
|
||||
//the followng creates a random string
|
||||
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
const (
|
||||
letterIdxBits = 6 // 6 bits to represent a letter index
|
||||
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
|
||||
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
|
||||
)
|
||||
|
||||
var src = rand.NewSource(time.Now().UnixNano())
|
||||
|
||||
func randStringBytesMaskImprSrcSB(n int) string {
|
||||
sb := strings.Builder{}
|
||||
sb.Grow(n)
|
||||
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
|
||||
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
|
||||
if remain == 0 {
|
||||
cache, remain = src.Int63(), letterIdxMax
|
||||
}
|
||||
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
|
||||
sb.WriteByte(letterBytes[idx])
|
||||
i--
|
||||
}
|
||||
cache >>= letterIdxBits
|
||||
remain--
|
||||
}
|
||||
|
||||
return sb.String()
|
||||
}
|
Loading…
Add table
Reference in a new issue