From e4fe2a3dd4c49e23f8a901242bd6777e631e87dd Mon Sep 17 00:00:00 2001 From: Justin Hammond Date: Wed, 6 Oct 2021 16:16:18 +0800 Subject: [PATCH] Initial Commit of restic-nats-server --- .gitignore | 1 + .vscode/launch.json | 15 ++ cache/cache.go | 57 +++++++ go.mod | 47 +++++- go.sum | 161 ++++++++++++++++++++ protocol/go.mod | 17 +++ protocol/go.sum | 63 ++++++++ protocol/protocol.go | 343 +++++++++++++++++++++++++++++++++++++++++++ repo.go | 106 +++++++++++++ rns.go | 185 +++++++++++++++++++++++ rnsworker.go | 313 +++++++++++++++++++++++++++++++++++++++ 11 files changed, 1306 insertions(+), 2 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 cache/cache.go mode change 100644 => 100755 go.mod create mode 100644 go.sum create mode 100644 protocol/go.mod create mode 100644 protocol/go.sum create mode 100755 protocol/protocol.go create mode 100755 repo.go create mode 100755 rns.go create mode 100755 rnsworker.go diff --git a/.gitignore b/.gitignore index 66fd13c..d70e89a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ # Dependency directories (remove the comment below to include it) # vendor/ +repo/ \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..f944eb2 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Package", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "." + } + ] +} \ No newline at end of file diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 0000000..86bd472 --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,57 @@ +package cache + +import ( + "context" + "log" + "time" + + "github.com/buraksezer/olric" + "github.com/buraksezer/olric/config" +) + +var CacheDM *olric.DMap +var db *olric.Olric + +func init() { + // Deployment scenario: embedded-member + // This creates a single-node Olric cluster. It's good enough for experimenting. + + // config.New returns a new config.Config with sane defaults. Available values for env: + // local, lan, wan + c := config.New("lan") + + // Callback function. It's called when this node is ready to accept connections. + ctx, cancel := context.WithCancel(context.Background()) + c.Started = func() { + defer cancel() + log.Println("[INFO] Olric is ready to accept connections") + } + var err error + db, err = olric.New(c) + if err != nil { + log.Fatalf("Failed to create Olric instance: %v", err) + } + + go func() { + // Call Start at background. It's a blocker call. + err = db.Start() + if err != nil { + log.Fatalf("olric.Start returned an error: %v", err) + } + }() + <-ctx.Done() + + CacheDM, err = db.NewDMap("bucket-of-arbitrary-items") + if err != nil { + log.Fatalf("olric.NewDMap returned an error: %v", err) + } +} + +func ShutdownCache() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err := db.Shutdown(ctx) + if err != nil { + log.Printf("Failed to shutdown Olric: %v", err) + } + cancel() +} \ No newline at end of file diff --git a/go.mod b/go.mod old mode 100644 new mode 100755 index 0cb89af..96b9695 --- a/go.mod +++ b/go.mod @@ -1,3 +1,46 @@ -module github.com/Fishwaldo/go-template +module github.com/Fishwaldo/restic-nats-server -go 1.16 +go 1.17 + +require ( + github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-00010101000000-000000000000 + github.com/buraksezer/olric v0.4.0 + github.com/nats-io/nats.go v1.12.3 + github.com/pkg/errors v0.9.1 + gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 +) + +require ( + github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect + github.com/buraksezer/connpool v0.4.0 // indirect + github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 // indirect + github.com/cespare/xxhash v1.1.0 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-immutable-radix v1.0.0 // indirect + github.com/hashicorp/go-msgpack v0.5.3 // indirect + github.com/hashicorp/go-multierror v1.0.0 // indirect + github.com/hashicorp/go-sockaddr v1.0.2 // indirect + github.com/hashicorp/golang-lru v0.5.0 // indirect + github.com/hashicorp/logutils v1.0.0 // indirect + github.com/hashicorp/memberlist v0.1.5 // indirect + github.com/klauspost/compress v1.13.4 // indirect + github.com/miekg/dns v1.1.31 // indirect + github.com/minio/highwayhash v1.0.1 // indirect + github.com/nats-io/jwt/v2 v2.0.3 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect + github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect + golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect + golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b // indirect + golang.org/x/sync v0.0.0-20200930132711-30421366ff76 // indirect + golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect + golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect + google.golang.org/appengine v1.6.7 // indirect + google.golang.org/protobuf v1.27.1 // indirect + gopkg.in/yaml.v2 v2.3.0 // indirect +) + +replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..895b336 --- /dev/null +++ b/go.sum @@ -0,0 +1,161 @@ +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= +github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= +github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/buraksezer/connpool v0.4.0 h1:fNLvWu0FOtJxL7Sqetm6+040mhZWVs8c6vrke14SEKw= +github.com/buraksezer/connpool v0.4.0/go.mod h1:qPiG7gKXo+EjrwG/yqn2StZM4ek6gcYnnGgFIVKN6b0= +github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA= +github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg= +github.com/buraksezer/olric v0.4.0 h1:qxIpVmvRBIfgqNPeIFXzcnhUj2XwKLDY+XVDro057q4= +github.com/buraksezer/olric v0.4.0/go.mod h1:xNt+/QiiVuqioJzgTMuiV2LynTNu87L1Tp5289XuU78= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4= +github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= +github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc= +github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A= +github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= +github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= +github.com/hashicorp/memberlist v0.1.5 h1:AYBsgJOW9gab/toO5tEB8lWetVgDKZycqkebJ8xxpqM= +github.com/hashicorp/memberlist v0.1.5/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= +github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= +github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/miekg/dns v1.1.31 h1:sJFOl9BgwbYAWOGEwr61FU28pqsBNdpRBnhGXtO06Oo= +github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= +github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= +github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= +github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM= +github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo= +github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= +github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs= +github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= +github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= +golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201001193750-eb9a90e9f9cb/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200930145003-4acb6c075d10/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b h1:SXy8Ld8oKlcogOvUAh0J5Pm5RKzgYBMMxLxt6n5XW50= +golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200930132711-30421366ff76 h1:JnxiSYT3Nm0BT2a8CyvYyM6cnrWpidecD1UuSYbhKm0= +golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/protocol/go.mod b/protocol/go.mod new file mode 100644 index 0000000..d15fa66 --- /dev/null +++ b/protocol/go.mod @@ -0,0 +1,17 @@ +module github.com/Fishwaldo/restic-nats-server/protocol + +go 1.17 + +require ( + github.com/nats-io/nats.go v1.12.3 + github.com/pkg/errors v0.9.1 +) + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/nats-io/nats-server/v2 v2.6.1 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect + google.golang.org/protobuf v1.27.1 // indirect +) diff --git a/protocol/go.sum b/protocol/go.sum new file mode 100644 index 0000000..fff460e --- /dev/null +++ b/protocol/go.sum @@ -0,0 +1,63 @@ +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= +github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= +github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= +github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM= +github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo= +github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= +github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/protocol/protocol.go b/protocol/protocol.go new file mode 100755 index 0000000..eb2413f --- /dev/null +++ b/protocol/protocol.go @@ -0,0 +1,343 @@ +package protocol + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "strings" + "time" + + "github.com/nats-io/nats.go" + "github.com/pkg/errors" +) + +type NatsCommand int + +const ( + NatsOpenCmd NatsCommand = iota + NatsStatCmd + NatsMkdirCmd + NatsSaveCmd + NatsListCmd + NatsLoadCmd + NatsRemoveCmd +) + +type OpenOp struct { + Bucket string `json:"bucket"` +} +type OpenResult struct { + Ok bool `json:"ok"` +} + +type StatOp struct { + Bucket string `json:"bucket"` + Filename string `json:"filename"` +} + +type StatResult struct { + Ok bool `json:"ok"` + Size int64 `json:"size"` + Name string `json:"name"` +} + +type MkdirOp struct { + Bucket string `json:"bucket"` + Dir string `json:"dir"` +} + +type MkdirResult struct { + Ok bool `json:"ok"` +} + +type SaveOp struct { + Bucket string `json:"bucket"` + Dir string `json:"dir"` + Name string `json:"name"` + Filesize int64 `json:"size"` + PacketSize int `json:"packet_size"` + Offset int64 `json:"offset"` + Data []byte `json:"data"` +} + +type SaveResult struct { + Ok bool `json:"ok"` +} + +type ListOp struct { + Bucket string `json:"bucket"` + BaseDir string `json:"base_dir"` + SubDir bool `json:"sub_dir"` +} + +type FileInfo struct { + Name string `json:"name"` + Size int64 `json:"size"` +} + +type ListResult struct { + Ok bool `json:"ok"` + FI []FileInfo `json:"fi"` +} + +type LoadOp struct { + Bucket string `json:"bucket"` + Dir string `json:"dir"` + Name string `json:"name"` + Length int `json:"length"` + Offset int64 `json:"offset"` +} + +type LoadResult struct { + Ok bool `json:"ok"` + Data []byte `json:"data"` +} + +type RemoveOp struct { + Bucket string `json:"bucket"` + Dir string `json:"dir"` + Name string `json:"name"` +} + +type RemoveResult struct { + Ok bool `json:"ok"` +} + +const ( + msgHeaderID string = "X-RNS-MSGID" + msgHeaderChunk string = "X-RNS-CHUNKS" + msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT" + msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ" +) + + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" +const ( + letterIdxBits = 6 // 6 bits to represent a letter index + letterIdxMask = 1<= 0; { + if remain == 0 { + cache, remain = src.Int63(), letterIdxMax + } + if idx := int(cache & letterIdxMask); idx < len(letterBytes) { + sb.WriteByte(letterBytes[idx]) + i-- + } + cache >>= letterIdxBits + remain-- + } + + return sb.String() +} +// NewRNSMSG Returns a New RNS Message (for each "Transaction") +func NewRNSMsg(subject string) *nats.Msg { + msg := nats.NewMsg(subject) + msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16)) + return msg; +} + +func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto *nats.Msg, msg *nats.Msg, log func(string, ...interface{})) error { + if len(msg.Header.Get(msgHeaderID)) == 0 { + return errors.New("MessageID Not Set") + } + + var maxchunksize int = int(0.95 * float32(conn.MaxPayload())) + //maxchunksize = 1024000 * 0.95 + datasize := len(msg.Data) + log("ChunkSendReplyMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + + if len(msg.Data) < maxchunksize { + /* data is less then our maxchunksize, so we can just send it */ + log("ChunkSendReplyMsgWithContext: Short Reply Message %s", msg.Header.Get(msgHeaderID)) + err := replyto.RespondMsg(msg) + return errors.Wrap(err, "Short Reply Message Send Failure") + } + + /* need to Split the Data into Chunks + * we will end up sending pages + 1 messages + * as the initial message contains data as well + */ + pages := datasize / maxchunksize + initialchunk := nats.NewMsg(msg.Subject) + initialchunk.Header = msg.Header + initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages)) + if len(msg.Data) < maxchunksize { + maxchunksize = len(msg.Data) + } + initialchunk.Data = msg.Data[:maxchunksize] + log("Chunking Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data)) + chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk) + if err != nil { + return errors.Wrap(err, "ChunkSendReplyMsgWithContext") + } + /* Reply Message just has a header with the subject we send the rest of the chunks to */ + chunksubject := chunkchannelmsg.Header.Get(msgHeaderChunkSubject) + if chunksubject == "" { + return errors.New("Chunked Reply Response didn't include Subject") + } + + for i := 1; i <= pages; i++ { + chunkmsg := nats.NewMsg(chunksubject) + chunkmsg.Header = msg.Header + chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i)) + start := maxchunksize * i + end := maxchunksize * (i + 1) + /* make sure we don't overrun our slice */ + if end > len(msg.Data) { + end = len(msg.Data) + } + chunkmsg.Data = msg.Data[start:end] + log("Sending Reply Chunk %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, start, end) + var chunkack *nats.Msg + if i < pages { + chunkack, err = conn.RequestMsgWithContext(ctx, chunkmsg) + log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i) + if err != nil { + return errors.Wrap(err, "ChunkSendReplyMsgWithContext") + } + } else { + err := conn.PublishMsg(chunkmsg) + if err != nil { + return errors.Wrap(err, "ChunkSendReplyMsgWithContext") + } + } + + /* all chunkackorreply */ + if i == pages { + return nil + } + } + return errors.New("Failed") +} + +func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) { + if len(msg.Header.Get(msgHeaderID)) == 0 { + return nil, errors.New("MessageID Not Set") + } + + var maxchunksize int = int(0.95 * float32(conn.MaxPayload())) + //maxchunksize = 1024000 * 0.95 + datasize := len(msg.Data) + log("ChunkSendRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + + if len(msg.Data) < maxchunksize { + /* data is less then our maxchunksize, so we can just send it */ + log("Short SendRequest MsgID %s - %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + reply, err := conn.RequestMsgWithContext(ctx, msg) + if err != nil { + return nil, errors.Wrap(err, "Short Message Send Failure") + } + log("Short ReplyRequest MsgID %s Headers %s Size: %d", reply.Header.Get(msgHeaderID), reply.Header, len(reply.Data)) + return ChunkReadRequestMsgWithContext(ctx, conn, reply, log) + } + + /* need to Split the Data into Chunks + * we will end up sending pages + 1 messages + * as the initial message contains data as well + */ + pages := datasize / maxchunksize + + initialchunk := nats.NewMsg(msg.Subject) + initialchunk.Header = msg.Header + initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages)) + + initialchunk.Data = msg.Data[:maxchunksize] + log("Chunking Send Request MsgID %s - %s- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data)) + chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk) + if err != nil { + return nil, errors.Wrap(err, "chunkRequestMsgWithContext") + } + /* Reply Message just has a header with the subject we send the rest of the chunks to */ + chunksubject := chunkchannelmsg.Header.Get(msgHeaderChunkSubject) + if chunksubject == "" { + return nil, errors.New("Chunked Response didn't include Subject") + } + + for i := 1; i <= pages; i++ { + chunkmsg := nats.NewMsg(chunksubject) + chunkmsg.Header = msg.Header + chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i)) + start := maxchunksize * i + end := maxchunksize * (i + 1) + /* make sure we don't overrun our slice */ + if end > len(msg.Data) { + end = len(msg.Data) + } + chunkmsg.Data = msg.Data[start:end] + log("Sending Request Chunk %s %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header, i, start, end) + var chunkackorreply *nats.Msg + chunkackorreply, err = conn.RequestMsgWithContext(ctx, chunkmsg) + if err != nil { + return nil, errors.Wrap(err, "chunkRequestMsgWithContext") + } + + /* only the last Chunk Reply will contain the actual Response from the other side */ + if i == pages { + log("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data)) + return ChunkReadRequestMsgWithContext(ctx, conn, chunkackorreply, log) + } + } + return nil, errors.New("Failed") +} + +func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) { + if len(msg.Header.Get(msgHeaderID)) == 0 { + return nil, errors.New("MessageID Not Set") + } + log("ChunkReadRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + chunked := msg.Header.Get(msgHeaderChunk) + if chunked != "" { + pages, err := strconv.Atoi(chunked) + if err != nil { + return nil, errors.Wrap(err, "Couldn't get Chunk Page Count") + } + log("Chunked Message Recieved: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages) + chunktransfersubject := conn.NewRespInbox() + chunkchan := make(chan *nats.Msg) + sub, err := conn.ChanSubscribe(chunktransfersubject, chunkchan) + if err != nil { + return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel") + } + defer sub.Unsubscribe() + defer close(chunkchan) + chunksubmsg := nats.NewMsg(msg.Reply) + chunksubmsg.Header = msg.Header + chunksubmsg.Header.Add(msgHeaderChunkSubject, chunktransfersubject) + msg.RespondMsg(chunksubmsg) + /* pages - 1 because we got first Chunk in original message */ + for i := 1; i <= pages; i++ { + log("Pending MsgID %s Chunk %d of %d on %s", chunksubmsg.Header.Get(msgHeaderID), i, pages, chunktransfersubject) + select { + case chunk := <-chunkchan: + seq, _ := strconv.Atoi(chunk.Header.Get(msgHeaderChunkSeq)) + log("Got MsgID %s - %s Chunk %d %d", chunk.Header.Get(msgHeaderID), chunk.Header, seq, i) + msg.Data = append(msg.Data, chunk.Data...) + if i < pages { + ackChunk := nats.NewMsg(chunk.Subject) + ackChunk.Header = chunk.Header + err := chunk.RespondMsg(ackChunk) + if err != nil { + return nil, errors.Wrap(err, "Chunk Reply Error") + } + } else { + msg.Reply = chunk.Reply + } + case <-ctx.Done(): + log("Context Canceled") + return nil, context.DeadlineExceeded + } + } + log("Chunked Messages Done - %s - %s Final Size %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data)) + } + return msg, nil +} diff --git a/repo.go b/repo.go new file mode 100755 index 0000000..28950dc --- /dev/null +++ b/repo.go @@ -0,0 +1,106 @@ +package main + +import ( + "io/fs" + "io/ioutil" + "os" + "path" + "path/filepath" + + "github.com/Fishwaldo/restic-nats-server/protocol" + + "github.com/pkg/errors" +) + +func FSStat(repo string) (fs.FileInfo, error) { + pwd, _ := os.Getwd() + fs, err := os.Stat(path.Join(pwd, "repo", repo)) + if err != nil { + return nil, errors.Wrap(err, "Stat on Repo Failed") + } + return fs, nil +} + +func FSMkDir(dir string) (error) { + pwd, _ := os.Getwd() + return os.MkdirAll(path.Join(pwd, "repo", dir), 0700) +} + +func FSSave(file string, data *[]byte, offset int64) (int, error) { + pwd, _ := os.Getwd() + filename := path.Join(pwd, "repo", file) + tmpname := filepath.Base(filename) + "-tmp-" + f, err := ioutil.TempFile(filepath.Dir(filename), tmpname) + if err != nil { + return 0, errors.Wrap(err, "Tempfile") + } + defer func(f *os.File) { + if err != nil { + _ = f.Close() // Double Close is harmless. + // Remove after Rename is harmless: we embed the final name in the + // temporary's name and no other goroutine will get the same data to + // Save, so the temporary name should never be reused by another + // goroutine. + _ = os.Remove(f.Name()) + } + }(f) + + len, err := f.Write(*data) + if err != nil { + return 0, errors.Wrap(err, "Write Failed") + } + if err := f.Close(); err != nil { + return 0, errors.Wrap(err, "Close") + } + if err := os.Rename(f.Name(), filename); err != nil { + return 0, errors.Wrap(err, "Rename") + } + return len, nil +} + +func FSListFiles(dir string, recursive bool) ([]protocol.FileInfo, error) { + pwd, _ := os.Getwd() + finaldir := path.Join(pwd, "repo", dir) + + d, err := os.Open(finaldir) + if err != nil { + return nil, err + } + defer d.Close() + + sub, err := d.Readdir(-1) + if err != nil { + return nil, err + } + + var result []protocol.FileInfo + for _, fi := range sub { + if fi.IsDir() { + /* dont' recursive more than 1 level */ + test, err := FSListFiles(path.Join(dir, fi.Name()), false) + if err != nil { + return nil, err + } + result = append(result, test...) + } + if fi.Name() != "" { + result = append(result, protocol.FileInfo{Name: fi.Name(), Size: fi.Size()}) + } + } + return result, err +} + +func FSLoadFile(filename string) (*os.File, error) { + pwd, _ := os.Getwd() + finalname := path.Join(pwd, "repo", filename) + fs, err := os.Open(finalname) + if err != nil { + return nil, errors.Wrap(err, "LoadFile") + } + return fs, nil +} +func FSRemove(filename string) error { + pwd, _ := os.Getwd() + finalname := path.Join(pwd, "repo", filename) + return os.Remove(finalname) +} \ No newline at end of file diff --git a/rns.go b/rns.go new file mode 100755 index 0000000..cecd562 --- /dev/null +++ b/rns.go @@ -0,0 +1,185 @@ +package main + +import ( + "fmt" + "net/url" + "os" + "os/signal" + "strings" + "sync" + "syscall" + + "github.com/Fishwaldo/restic-nats-server/cache" + "github.com/nats-io/nats.go" + "github.com/pkg/errors" + "gopkg.in/tomb.v2" +) + +type Config struct { + Server *url.URL + Credential string `option:"credentialfile" help:"Path to the NatsIO Credential File"` + Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` + Repo string +} + +type Backend struct { + cfg Config + conn *nats.Conn + // js nats.JetStreamContext + // buStream *nats.StreamInfo + buCommands *nats.Subscription + mx sync.Mutex + t tomb.Tomb + enc nats.Encoder +} + + + + +func createStreamConfig(name string) *nats.StreamConfig { + var subjects []string + subjects = append(subjects, "repo.commands.>") + sc := nats.StreamConfig{ + Storage: nats.FileStorage, +// Description: fmt.Sprintf("Stream for %s", name), + Retention: nats.WorkQueuePolicy, + Name: name, + Subjects: subjects, + } + return &sc +} + +// func createConsumerBUConfig() *nats.ConsumerConfig { +// cc := nats.ConsumerConfig { +// Description: fmt.Sprintf("Consumer for me"), +// //DeliverGroup: "workers", +// DeliverPolicy: nats.DeliverAllPolicy, +// AckPolicy: nats.AckExplicitPolicy, +// FilterSubject: "repo.commands.open", +// //Durable: "Workers", +// } +// return &cc +// } + +func connectNats(be *Backend) error { + be.mx.Lock() + defer be.mx.Unlock() + server := be.cfg.Server.Hostname() + port := be.cfg.Server.Port() + if port == "" { + port = "4222" + } + url := fmt.Sprintf("nats://%s:%s", server, port) + + /* Check Credential File Exists */ + _, err := os.Stat(be.cfg.Credential) + if err != nil { + return errors.Wrap(err, "credential file missing") + } + + var options []nats.Option + options = append(options, nats.UserCredentials(be.cfg.Credential)) + options = append(options, nats.ClosedHandler(natsClosedCB)) + options = append(options, nats.DisconnectHandler(natsDisconnectedCB)) + + be.conn, err = nats.Connect(url, options...) + if err != nil { + return errors.Wrap(err, "nats connection failed") + } + if size := be.conn.MaxPayload(); size < 8388608 { + return errors.New("NATS Server Max Payload Size is below 8Mb") + } + + fmt.Printf("Connected to %s (%s)\n", be.conn.ConnectedClusterName(), be.conn.ConnectedServerName()) + + fmt.Printf("Nats Message Size: %d\n", be.conn.MaxPayload()) + + be.enc = nats.EncoderForType("gob") + + // be.js, err = be.conn.JetStream() + // if err != nil { + // return errors.Wrap(err, "can't get nats jetstream context") + // } + // fmt.Printf("Stream Name: %s\n", be.cfg.Repo) + // be.buStream, err = be.js.StreamInfo(be.cfg.Repo) + // if err != nil { + // if err == nats.ErrStreamNotFound { + // /* create the Stream */ + // sc := createStreamConfig(be.cfg.Repo) + // fmt.Printf("Creating Stream %s\n", sc.Name) + // be.buStream, err = be.js.AddStream(sc) + // if err != nil { + // return errors.Wrapf(err, "Can't Create Stream %#v", sc) + // } + // } else { + // return errors.Wrap(err, "Error getting StreamInfo") + // } + // } + // var subOps []nats.SubOpt + // //subOps = append(subOps, nats.Durable("commands")) + // subOps = append(subOps, nats.AckExplicit()) + // subOps = append(subOps, nats.BindStream(be.buStream.Config.Name)) + // subOps = append(subOps, nats.MaxAckPending(5)) + be.buCommands, err = be.conn.QueueSubscribeSync("repo.commands.*", "opencommandworkers") + if err != nil { + return errors.Wrap(err, "Can't Add Consumer") + } +// fmt.Printf("Stream %#v\n", be.buStream) + return nil +} + +func natsClosedCB(conn *nats.Conn) { + fmt.Printf("Connection Closed: %s", conn.LastError()) +} + +func natsDisconnectedCB(conn *nats.Conn) { + fmt.Printf("Connection Disconnected: %s", conn.LastError()) +} + +func main() { + + url, _ := url.Parse("nats://gw-docker-1.dmz.dynam.ac:4222/backup") + + cfg := Config{ + Server: url, + Credential: "/home/fish/.nkeys/creds/Operator/Backup/restic.creds", + } + var repo string + if cfg.Server.Path[0] == '/' { + repo = cfg.Server.Path[1:] + } + if repo[len(repo)-1] == '/' { + repo = repo[0 : len(repo)-1] + } + // replace any further slashes with . to specify a nested queue + repo = strings.Replace(repo, "/", ".", -1) + + cfg.Repo = repo + + be := &Backend{ + cfg: cfg, + } + if err := connectNats(be); err != nil { + fmt.Printf("Error Connecting to Nats: %s\n", err) + os.Exit(0) + } + + for i :=0; i < 50; i++ { + wd := WorkerData{id: i, be: be} + be.t.Go(wd.Run); + } + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + + s := <-signalChan + be.t.Kill(errors.New("Shutting Down")) + if err := be.t.Wait(); err != nil { + fmt.Printf("Workers Reported Error: %s", err) + } + + fmt.Printf("Got Shutdown Signal %s", s) + cache.ShutdownCache() + os.Exit(0) + +} diff --git a/rnsworker.go b/rnsworker.go new file mode 100755 index 0000000..ecf5cb2 --- /dev/null +++ b/rnsworker.go @@ -0,0 +1,313 @@ +package main + +import ( + "context" + "fmt" + "io" + "io/fs" + "path" + "time" + + "github.com/Fishwaldo/restic-nats-server/protocol" +// "github.com/nats-io/nats.go" + "github.com/pkg/errors" +) + +type WorkerData struct { + id int + be *Backend + cancel context.CancelFunc +} + +func (wd *WorkerData) Run() error { + for { + var ctx context.Context + ctx, wd.cancel = context.WithCancel(context.Background()) + netctx, _ := context.WithTimeout(ctx, 1*time.Second) + select { + case <-wd.be.t.Dying(): + fmt.Printf("Killing Worker %d\n", wd.id) + wd.cancel() + return nil + default: + } + //wd.be.mx.Lock() + msg, err := wd.be.buCommands.NextMsgWithContext(netctx) + //wd.be.mx.Unlock() + if err != nil { + if !errors.Is(err, context.DeadlineExceeded) { + fmt.Printf("id: %d Error: %s %t\n", wd.id, err, err) + return err + } + } + if msg != nil { + jobctx, _ := context.WithTimeout(ctx, 120 * time.Second) + fmt.Printf("id: %d Message: %s %s\n", wd.id, msg.Subject, msg.Sub.Queue) + start := time.Now() + + log := func(msg string, args ...interface{}) { + fmt.Printf("ID: %d - %s\n", wd.id, fmt.Sprintf(msg, args...)) + } + + msg, err := protocol.ChunkReadRequestMsgWithContext(jobctx, wd.be.conn, msg, log) + if err != nil { + fmt.Printf("ID: %d - ChunkedRead Failed: %s\n", wd.id, err) + continue + } + + operation := msg.Header.Get("X-RNS-OP") + switch operation { + case "open": + var oo protocol.OpenOp + or := protocol.OpenResult{Ok: false} + + if err := wd.be.enc.Decode(msg.Subject, msg.Data, &oo); err != nil { + fmt.Printf("ID: %d - Decode Failed: %s\n", wd.id, err) + } + + or.Ok, err = wd.OpenRepo(oo) + if err != nil { + or.Ok = false + fmt.Printf("ID: %d - OpenOp: Error: %s\n", wd.id, err) + } + + replymsg := protocol.NewRNSMsg(msg.Reply) + + replymsg.Data, err = wd.be.enc.Encode(msg.Reply, or) + if err != nil { + fmt.Printf("ID %d - Encode Failed: %s\n", wd.id, err) + } + if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil { + fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err) + } + case "stat": + var so protocol.StatOp + if err := wd.be.enc.Decode(msg.Subject, msg.Data, &so); err != nil { + return errors.Wrap(err, "Decode Failed") + } + + fi, err := wd.Stat(so) + var sr protocol.StatResult + if err != nil { + fmt.Printf("Stat: Error: %s\n", err) + sr.Ok = false + } else { + sr.Ok = true + sr.Size = fi.Size() + sr.Name = fi.Name() + } + replymsg := protocol.NewRNSMsg(msg.Reply) + replymsg.Data, err = wd.be.enc.Encode(msg.Reply, sr) + if err != nil { + return errors.Wrap(err, "Encode Failed") + } + if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil { + fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err) + } + case "mkdir": + + var mo protocol.MkdirOp + if err := wd.be.enc.Decode(msg.Subject, msg.Data, &mo); err != nil { + return errors.Wrap(err, "Decode Failed") + } + + var mr protocol.MkdirResult + err := wd.Mkdir(mo) + if err != nil { + fmt.Printf("Mkdir: Error: %s\n", err) + mr.Ok = false + } else { + mr.Ok = true + } + replymsg := protocol.NewRNSMsg(msg.Reply) + replymsg.Data, err = wd.be.enc.Encode(msg.Reply, mr) + if err != nil { + return errors.Wrap(err, "Encode Failed") + } + if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil { + fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err) + } + case "save": + var so protocol.SaveOp + if err := wd.be.enc.Decode(msg.Subject, msg.Data, &so); err != nil { + return errors.Wrap(err, "Decode Failed") + } + + var sr protocol.SaveResult + err := wd.Save(so) + if err != nil { + fmt.Printf("Save: Error: %s\n", err) + sr.Ok = false + } else { + sr.Ok = true + } + replymsg := protocol.NewRNSMsg(msg.Reply) + replymsg.Data, err = wd.be.enc.Encode(msg.Reply, sr) + if err != nil { + return errors.Wrap(err, "Encode Failed") + } + if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil { + fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err) + } + case "list": + var lo protocol.ListOp + if err := wd.be.enc.Decode(msg.Subject, msg.Data, &lo); err != nil { + return errors.Wrap(err, "Decode Failed") + } + + var lr protocol.ListResult + lr, err = wd.List(lo) + if err != nil { + fmt.Printf("List: Error: %s\n", err) + lr.Ok = false + } else { + lr.Ok = true + } + replymsg := protocol.NewRNSMsg(msg.Reply) + replymsg.Data, err = wd.be.enc.Encode(msg.Reply, lr) + if err != nil { + return errors.Wrap(err, "Encode Failed") + } + if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil { + fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err) + } + case "load": + var lo protocol.LoadOp + if err := wd.be.enc.Decode(msg.Subject, msg.Data, &lo); err != nil { + return errors.Wrap(err, "Decode Failed") + } + + var lr protocol.LoadResult + lr, err = wd.Load(lo) + if err != nil { + fmt.Printf("List: Error: %s\n", err) + lr.Ok = false + } else { + lr.Ok = true + } + replymsg := protocol.NewRNSMsg(msg.Reply) + replymsg.Data, err = wd.be.enc.Encode(msg.Reply, lr) + if err != nil { + return errors.Wrap(err, "Encode Failed") + } + if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil { + fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err) + } + case "remove": + var ro protocol.RemoveOp + if err := wd.be.enc.Decode(msg.Subject, msg.Data, &ro); err != nil { + return errors.Wrap(err, "Decode Failed") + } + + var rr protocol.RemoveResult + rr, err = wd.Remove(ro) + if err != nil { + fmt.Printf("List: Error: %s\n", err) + rr.Ok = false + } else { + rr.Ok = true + } + replymsg := protocol.NewRNSMsg(msg.Reply) + replymsg.Data, err = wd.be.enc.Encode(msg.Reply, rr) + if err != nil { + return errors.Wrap(err, "Encode Failed") + } + if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil { + fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err) + } + } + fmt.Printf("id: %d Command %s Took %s\n\n", wd.id, operation, time.Since(start)) + } + } + return nil +} + +func (wd *WorkerData) OpenRepo(oo protocol.OpenOp) (bool, error) { + fs, err := FSStat(oo.Bucket) + if err != nil { + return false, errors.New("Failed to Open Repository") + } + return fs.IsDir(), nil +} + +func (wd *WorkerData) Stat(so protocol.StatOp) (fs.FileInfo, error) { + fs, err := FSStat(path.Join(so.Bucket, so.Filename)) + if err != nil { + return nil, errors.Wrap(err, "Stat") + } + return fs, nil +} +func (wd *WorkerData) Mkdir(mo protocol.MkdirOp) error { + path := path.Join(mo.Bucket, mo.Dir) + if err := FSMkDir(path); err != nil { + fmt.Printf("Mkdir Failed: %s", err) + return err + } + return nil +} + +func (wd *WorkerData) Save(so protocol.SaveOp) error { + path := path.Join(so.Bucket, so.Dir, so.Name) + len, err := FSSave(path, &so.Data, so.Offset) + if err != nil { + fmt.Printf("Save Failed: %s", err) + return err + } + if len != so.PacketSize { + fmt.Printf("Packetsize != writtensize") + return errors.New("Packetsize != Writtensize") + } + return nil +} + +func (wd *WorkerData) List(lo protocol.ListOp) (protocol.ListResult, error) { + var result protocol.ListResult + fi, err := FSListFiles(path.Join(lo.Bucket, lo.BaseDir), lo.SubDir) + if err != nil { + return protocol.ListResult{Ok: false}, errors.Wrap(err, "ListFiles") + } + result.Ok = true + result.FI = fi + return result, nil +} + +func (wd *WorkerData) Load(lo protocol.LoadOp) (protocol.LoadResult, error) { + var result protocol.LoadResult + rd, err := FSLoadFile(path.Join(lo.Bucket, lo.Dir, lo.Name)) + if err != nil { + return protocol.LoadResult{Ok: false}, errors.Wrap(err, "LoadFile") + } + defer rd.Close() + if lo.Offset > 0 { + _, err = rd.Seek(lo.Offset, 0) + if err != nil { + return protocol.LoadResult{Ok: false}, errors.Wrap(err, "Seek") + } + } + if lo.Length > 0 { + result.Data = make([]byte, lo.Length) + len, err := rd.Read(result.Data) + if err != nil { + return protocol.LoadResult{Ok: false}, errors.Wrap(err, "Read") + } + if len != lo.Length { + return protocol.LoadResult{Ok: false}, errors.Errorf("Requested Length %d != Actual Length %d", lo.Length, len) + } + } else { + result.Data, err = io.ReadAll(rd) + if err != nil { + return protocol.LoadResult{Ok: false}, errors.Wrap(err, "ReadAll") + } + } + //fmt.Printf("%+v\n", result) + return result, nil +} + +func (wd *WorkerData) Remove(ro protocol.RemoveOp) (protocol.RemoveResult, error) { + var result protocol.RemoveResult + if err := FSRemove(path.Join(ro.Bucket, ro.Dir, ro.Name)); err != nil { + return protocol.RemoveResult{Ok: false}, errors.Wrap(err, "FSRemove") + } + result.Ok = true + return result, nil +}