Initial Commit of restic-nats-server

This commit is contained in:
Justin Hammond 2021-10-06 16:16:18 +08:00
parent 0ab1d805e3
commit e4fe2a3dd4
11 changed files with 1306 additions and 2 deletions

1
.gitignore vendored
View file

@ -13,3 +13,4 @@
# Dependency directories (remove the comment below to include it) # Dependency directories (remove the comment below to include it)
# vendor/ # vendor/
repo/

15
.vscode/launch.json vendored Normal file
View file

@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "."
}
]
}

57
cache/cache.go vendored Normal file
View file

@ -0,0 +1,57 @@
package cache
import (
"context"
"log"
"time"
"github.com/buraksezer/olric"
"github.com/buraksezer/olric/config"
)
var CacheDM *olric.DMap
var db *olric.Olric
func init() {
// Deployment scenario: embedded-member
// This creates a single-node Olric cluster. It's good enough for experimenting.
// config.New returns a new config.Config with sane defaults. Available values for env:
// local, lan, wan
c := config.New("lan")
// Callback function. It's called when this node is ready to accept connections.
ctx, cancel := context.WithCancel(context.Background())
c.Started = func() {
defer cancel()
log.Println("[INFO] Olric is ready to accept connections")
}
var err error
db, err = olric.New(c)
if err != nil {
log.Fatalf("Failed to create Olric instance: %v", err)
}
go func() {
// Call Start at background. It's a blocker call.
err = db.Start()
if err != nil {
log.Fatalf("olric.Start returned an error: %v", err)
}
}()
<-ctx.Done()
CacheDM, err = db.NewDMap("bucket-of-arbitrary-items")
if err != nil {
log.Fatalf("olric.NewDMap returned an error: %v", err)
}
}
func ShutdownCache() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := db.Shutdown(ctx)
if err != nil {
log.Printf("Failed to shutdown Olric: %v", err)
}
cancel()
}

47
go.mod Normal file → Executable file
View file

@ -1,3 +1,46 @@
module github.com/Fishwaldo/go-template module github.com/Fishwaldo/restic-nats-server
go 1.16 go 1.17
require (
github.com/Fishwaldo/restic-nats-server/protocol v0.0.0-00010101000000-000000000000
github.com/buraksezer/olric v0.4.0
github.com/nats-io/nats.go v1.12.3
github.com/pkg/errors v0.9.1
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
)
require (
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/buraksezer/connpool v0.4.0 // indirect
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.5.3 // indirect
github.com/hashicorp/go-multierror v1.0.0 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/logutils v1.0.0 // indirect
github.com/hashicorp/memberlist v0.1.5 // indirect
github.com/klauspost/compress v1.13.4 // indirect
github.com/miekg/dns v1.1.31 // indirect
github.com/minio/highwayhash v1.0.1 // indirect
github.com/nats-io/jwt/v2 v2.0.3 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b // indirect
golang.org/x/sync v0.0.0-20200930132711-30421366ff76 // indirect
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
)
replace github.com/Fishwaldo/restic-nats-server/protocol => /home/fish/restic-nats-server/protocol

161
go.sum Normal file
View file

@ -0,0 +1,161 @@
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/buraksezer/connpool v0.4.0 h1:fNLvWu0FOtJxL7Sqetm6+040mhZWVs8c6vrke14SEKw=
github.com/buraksezer/connpool v0.4.0/go.mod h1:qPiG7gKXo+EjrwG/yqn2StZM4ek6gcYnnGgFIVKN6b0=
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA=
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg=
github.com/buraksezer/olric v0.4.0 h1:qxIpVmvRBIfgqNPeIFXzcnhUj2XwKLDY+XVDro057q4=
github.com/buraksezer/olric v0.4.0/go.mod h1:xNt+/QiiVuqioJzgTMuiV2LynTNu87L1Tp5289XuU78=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc=
github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/memberlist v0.1.5 h1:AYBsgJOW9gab/toO5tEB8lWetVgDKZycqkebJ8xxpqM=
github.com/hashicorp/memberlist v0.1.5/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.31 h1:sJFOl9BgwbYAWOGEwr61FU28pqsBNdpRBnhGXtO06Oo=
github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM=
github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo=
github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201001193750-eb9a90e9f9cb/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200930145003-4acb6c075d10/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b h1:SXy8Ld8oKlcogOvUAh0J5Pm5RKzgYBMMxLxt6n5XW50=
golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200930132711-30421366ff76 h1:JnxiSYT3Nm0BT2a8CyvYyM6cnrWpidecD1UuSYbhKm0=
golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

17
protocol/go.mod Normal file
View file

@ -0,0 +1,17 @@
module github.com/Fishwaldo/restic-nats-server/protocol
go 1.17
require (
github.com/nats-io/nats.go v1.12.3
github.com/pkg/errors v0.9.1
)
require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/nats-io/nats-server/v2 v2.6.1 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
google.golang.org/protobuf v1.27.1 // indirect
)

63
protocol/go.sum Normal file
View file

@ -0,0 +1,63 @@
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.6.1 h1:cJy+ia7/4EaJL+ZYDmIy2rD1mDWTfckhtPBU0GYo8xM=
github.com/nats-io/nats-server/v2 v2.6.1/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo=
github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=

343
protocol/protocol.go Executable file
View file

@ -0,0 +1,343 @@
package protocol
import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
)
type NatsCommand int
const (
NatsOpenCmd NatsCommand = iota
NatsStatCmd
NatsMkdirCmd
NatsSaveCmd
NatsListCmd
NatsLoadCmd
NatsRemoveCmd
)
type OpenOp struct {
Bucket string `json:"bucket"`
}
type OpenResult struct {
Ok bool `json:"ok"`
}
type StatOp struct {
Bucket string `json:"bucket"`
Filename string `json:"filename"`
}
type StatResult struct {
Ok bool `json:"ok"`
Size int64 `json:"size"`
Name string `json:"name"`
}
type MkdirOp struct {
Bucket string `json:"bucket"`
Dir string `json:"dir"`
}
type MkdirResult struct {
Ok bool `json:"ok"`
}
type SaveOp struct {
Bucket string `json:"bucket"`
Dir string `json:"dir"`
Name string `json:"name"`
Filesize int64 `json:"size"`
PacketSize int `json:"packet_size"`
Offset int64 `json:"offset"`
Data []byte `json:"data"`
}
type SaveResult struct {
Ok bool `json:"ok"`
}
type ListOp struct {
Bucket string `json:"bucket"`
BaseDir string `json:"base_dir"`
SubDir bool `json:"sub_dir"`
}
type FileInfo struct {
Name string `json:"name"`
Size int64 `json:"size"`
}
type ListResult struct {
Ok bool `json:"ok"`
FI []FileInfo `json:"fi"`
}
type LoadOp struct {
Bucket string `json:"bucket"`
Dir string `json:"dir"`
Name string `json:"name"`
Length int `json:"length"`
Offset int64 `json:"offset"`
}
type LoadResult struct {
Ok bool `json:"ok"`
Data []byte `json:"data"`
}
type RemoveOp struct {
Bucket string `json:"bucket"`
Dir string `json:"dir"`
Name string `json:"name"`
}
type RemoveResult struct {
Ok bool `json:"ok"`
}
const (
msgHeaderID string = "X-RNS-MSGID"
msgHeaderChunk string = "X-RNS-CHUNKS"
msgHeaderChunkSubject string = "X-RNS-CHUNK-SUBJECT"
msgHeaderChunkSeq string = "X-RNS-CHUNKS-SEQ"
)
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
var src = rand.NewSource(time.Now().UnixNano())
func randStringBytesMaskImprSrcSB(n int) string {
sb := strings.Builder{}
sb.Grow(n)
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
sb.WriteByte(letterBytes[idx])
i--
}
cache >>= letterIdxBits
remain--
}
return sb.String()
}
// NewRNSMSG Returns a New RNS Message (for each "Transaction")
func NewRNSMsg(subject string) *nats.Msg {
msg := nats.NewMsg(subject)
msg.Header.Set(msgHeaderID, randStringBytesMaskImprSrcSB(16))
return msg;
}
func ChunkSendReplyMsgWithContext(ctx context.Context, conn *nats.Conn, replyto *nats.Msg, msg *nats.Msg, log func(string, ...interface{})) error {
if len(msg.Header.Get(msgHeaderID)) == 0 {
return errors.New("MessageID Not Set")
}
var maxchunksize int = int(0.95 * float32(conn.MaxPayload()))
//maxchunksize = 1024000 * 0.95
datasize := len(msg.Data)
log("ChunkSendReplyMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
if len(msg.Data) < maxchunksize {
/* data is less then our maxchunksize, so we can just send it */
log("ChunkSendReplyMsgWithContext: Short Reply Message %s", msg.Header.Get(msgHeaderID))
err := replyto.RespondMsg(msg)
return errors.Wrap(err, "Short Reply Message Send Failure")
}
/* need to Split the Data into Chunks
* we will end up sending pages + 1 messages
* as the initial message contains data as well
*/
pages := datasize / maxchunksize
initialchunk := nats.NewMsg(msg.Subject)
initialchunk.Header = msg.Header
initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages))
if len(msg.Data) < maxchunksize {
maxchunksize = len(msg.Data)
}
initialchunk.Data = msg.Data[:maxchunksize]
log("Chunking Reply Message %s (%s)- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk)
if err != nil {
return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
}
/* Reply Message just has a header with the subject we send the rest of the chunks to */
chunksubject := chunkchannelmsg.Header.Get(msgHeaderChunkSubject)
if chunksubject == "" {
return errors.New("Chunked Reply Response didn't include Subject")
}
for i := 1; i <= pages; i++ {
chunkmsg := nats.NewMsg(chunksubject)
chunkmsg.Header = msg.Header
chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i))
start := maxchunksize * i
end := maxchunksize * (i + 1)
/* make sure we don't overrun our slice */
if end > len(msg.Data) {
end = len(msg.Data)
}
chunkmsg.Data = msg.Data[start:end]
log("Sending Reply Chunk %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), i, start, end)
var chunkack *nats.Msg
if i < pages {
chunkack, err = conn.RequestMsgWithContext(ctx, chunkmsg)
log("Chunk Ack Reply: %s %s - Page %d", chunkack.Header.Get(msgHeaderID), chunkack.Header, i)
if err != nil {
return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
}
} else {
err := conn.PublishMsg(chunkmsg)
if err != nil {
return errors.Wrap(err, "ChunkSendReplyMsgWithContext")
}
}
/* all chunkackorreply */
if i == pages {
return nil
}
}
return errors.New("Failed")
}
func ChunkSendRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) {
if len(msg.Header.Get(msgHeaderID)) == 0 {
return nil, errors.New("MessageID Not Set")
}
var maxchunksize int = int(0.95 * float32(conn.MaxPayload()))
//maxchunksize = 1024000 * 0.95
datasize := len(msg.Data)
log("ChunkSendRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
if len(msg.Data) < maxchunksize {
/* data is less then our maxchunksize, so we can just send it */
log("Short SendRequest MsgID %s - %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
reply, err := conn.RequestMsgWithContext(ctx, msg)
if err != nil {
return nil, errors.Wrap(err, "Short Message Send Failure")
}
log("Short ReplyRequest MsgID %s Headers %s Size: %d", reply.Header.Get(msgHeaderID), reply.Header, len(reply.Data))
return ChunkReadRequestMsgWithContext(ctx, conn, reply, log)
}
/* need to Split the Data into Chunks
* we will end up sending pages + 1 messages
* as the initial message contains data as well
*/
pages := datasize / maxchunksize
initialchunk := nats.NewMsg(msg.Subject)
initialchunk.Header = msg.Header
initialchunk.Header.Set(msgHeaderChunk, fmt.Sprintf("%d", pages))
initialchunk.Data = msg.Data[:maxchunksize]
log("Chunking Send Request MsgID %s - %s- pages %d, len %d First Chunk %d", initialchunk.Header.Get(msgHeaderID), initialchunk.Header, pages, len(msg.Data), len(initialchunk.Data))
chunkchannelmsg, err := conn.RequestMsgWithContext(ctx, initialchunk)
if err != nil {
return nil, errors.Wrap(err, "chunkRequestMsgWithContext")
}
/* Reply Message just has a header with the subject we send the rest of the chunks to */
chunksubject := chunkchannelmsg.Header.Get(msgHeaderChunkSubject)
if chunksubject == "" {
return nil, errors.New("Chunked Response didn't include Subject")
}
for i := 1; i <= pages; i++ {
chunkmsg := nats.NewMsg(chunksubject)
chunkmsg.Header = msg.Header
chunkmsg.Header.Set(msgHeaderChunkSeq, fmt.Sprintf("%d", i))
start := maxchunksize * i
end := maxchunksize * (i + 1)
/* make sure we don't overrun our slice */
if end > len(msg.Data) {
end = len(msg.Data)
}
chunkmsg.Data = msg.Data[start:end]
log("Sending Request Chunk %s %s - Page: %d (%d:%d)", chunkmsg.Header.Get(msgHeaderID), chunkmsg.Header, i, start, end)
var chunkackorreply *nats.Msg
chunkackorreply, err = conn.RequestMsgWithContext(ctx, chunkmsg)
if err != nil {
return nil, errors.Wrap(err, "chunkRequestMsgWithContext")
}
/* only the last Chunk Reply will contain the actual Response from the other side */
if i == pages {
log("SendRequest Chunk Reply: MsgID %s Headers %s Size: %d", chunkackorreply.Header.Get(msgHeaderID), chunkackorreply.Header, len(chunkackorreply.Data))
return ChunkReadRequestMsgWithContext(ctx, conn, chunkackorreply, log)
}
}
return nil, errors.New("Failed")
}
func ChunkReadRequestMsgWithContext(ctx context.Context, conn *nats.Conn, msg *nats.Msg, log func(string, ...interface{})) (*nats.Msg, error) {
if len(msg.Header.Get(msgHeaderID)) == 0 {
return nil, errors.New("MessageID Not Set")
}
log("ChunkReadRequestMsgWithContext: MsgID %s - Headers %s Size: %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
chunked := msg.Header.Get(msgHeaderChunk)
if chunked != "" {
pages, err := strconv.Atoi(chunked)
if err != nil {
return nil, errors.Wrap(err, "Couldn't get Chunk Page Count")
}
log("Chunked Message Recieved: %s - %s - %d pages", msg.Header.Get(msgHeaderID), msg.Header, pages)
chunktransfersubject := conn.NewRespInbox()
chunkchan := make(chan *nats.Msg)
sub, err := conn.ChanSubscribe(chunktransfersubject, chunkchan)
if err != nil {
return nil, errors.Wrap(err, "Couldn't Subscribe to Chunk Channel")
}
defer sub.Unsubscribe()
defer close(chunkchan)
chunksubmsg := nats.NewMsg(msg.Reply)
chunksubmsg.Header = msg.Header
chunksubmsg.Header.Add(msgHeaderChunkSubject, chunktransfersubject)
msg.RespondMsg(chunksubmsg)
/* pages - 1 because we got first Chunk in original message */
for i := 1; i <= pages; i++ {
log("Pending MsgID %s Chunk %d of %d on %s", chunksubmsg.Header.Get(msgHeaderID), i, pages, chunktransfersubject)
select {
case chunk := <-chunkchan:
seq, _ := strconv.Atoi(chunk.Header.Get(msgHeaderChunkSeq))
log("Got MsgID %s - %s Chunk %d %d", chunk.Header.Get(msgHeaderID), chunk.Header, seq, i)
msg.Data = append(msg.Data, chunk.Data...)
if i < pages {
ackChunk := nats.NewMsg(chunk.Subject)
ackChunk.Header = chunk.Header
err := chunk.RespondMsg(ackChunk)
if err != nil {
return nil, errors.Wrap(err, "Chunk Reply Error")
}
} else {
msg.Reply = chunk.Reply
}
case <-ctx.Done():
log("Context Canceled")
return nil, context.DeadlineExceeded
}
}
log("Chunked Messages Done - %s - %s Final Size %d", msg.Header.Get(msgHeaderID), msg.Header, len(msg.Data))
}
return msg, nil
}

106
repo.go Executable file
View file

@ -0,0 +1,106 @@
package main
import (
"io/fs"
"io/ioutil"
"os"
"path"
"path/filepath"
"github.com/Fishwaldo/restic-nats-server/protocol"
"github.com/pkg/errors"
)
func FSStat(repo string) (fs.FileInfo, error) {
pwd, _ := os.Getwd()
fs, err := os.Stat(path.Join(pwd, "repo", repo))
if err != nil {
return nil, errors.Wrap(err, "Stat on Repo Failed")
}
return fs, nil
}
func FSMkDir(dir string) (error) {
pwd, _ := os.Getwd()
return os.MkdirAll(path.Join(pwd, "repo", dir), 0700)
}
func FSSave(file string, data *[]byte, offset int64) (int, error) {
pwd, _ := os.Getwd()
filename := path.Join(pwd, "repo", file)
tmpname := filepath.Base(filename) + "-tmp-"
f, err := ioutil.TempFile(filepath.Dir(filename), tmpname)
if err != nil {
return 0, errors.Wrap(err, "Tempfile")
}
defer func(f *os.File) {
if err != nil {
_ = f.Close() // Double Close is harmless.
// Remove after Rename is harmless: we embed the final name in the
// temporary's name and no other goroutine will get the same data to
// Save, so the temporary name should never be reused by another
// goroutine.
_ = os.Remove(f.Name())
}
}(f)
len, err := f.Write(*data)
if err != nil {
return 0, errors.Wrap(err, "Write Failed")
}
if err := f.Close(); err != nil {
return 0, errors.Wrap(err, "Close")
}
if err := os.Rename(f.Name(), filename); err != nil {
return 0, errors.Wrap(err, "Rename")
}
return len, nil
}
func FSListFiles(dir string, recursive bool) ([]protocol.FileInfo, error) {
pwd, _ := os.Getwd()
finaldir := path.Join(pwd, "repo", dir)
d, err := os.Open(finaldir)
if err != nil {
return nil, err
}
defer d.Close()
sub, err := d.Readdir(-1)
if err != nil {
return nil, err
}
var result []protocol.FileInfo
for _, fi := range sub {
if fi.IsDir() {
/* dont' recursive more than 1 level */
test, err := FSListFiles(path.Join(dir, fi.Name()), false)
if err != nil {
return nil, err
}
result = append(result, test...)
}
if fi.Name() != "" {
result = append(result, protocol.FileInfo{Name: fi.Name(), Size: fi.Size()})
}
}
return result, err
}
func FSLoadFile(filename string) (*os.File, error) {
pwd, _ := os.Getwd()
finalname := path.Join(pwd, "repo", filename)
fs, err := os.Open(finalname)
if err != nil {
return nil, errors.Wrap(err, "LoadFile")
}
return fs, nil
}
func FSRemove(filename string) error {
pwd, _ := os.Getwd()
finalname := path.Join(pwd, "repo", filename)
return os.Remove(finalname)
}

185
rns.go Executable file
View file

@ -0,0 +1,185 @@
package main
import (
"fmt"
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/Fishwaldo/restic-nats-server/cache"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"gopkg.in/tomb.v2"
)
type Config struct {
Server *url.URL
Credential string `option:"credentialfile" help:"Path to the NatsIO Credential File"`
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
Repo string
}
type Backend struct {
cfg Config
conn *nats.Conn
// js nats.JetStreamContext
// buStream *nats.StreamInfo
buCommands *nats.Subscription
mx sync.Mutex
t tomb.Tomb
enc nats.Encoder
}
func createStreamConfig(name string) *nats.StreamConfig {
var subjects []string
subjects = append(subjects, "repo.commands.>")
sc := nats.StreamConfig{
Storage: nats.FileStorage,
// Description: fmt.Sprintf("Stream for %s", name),
Retention: nats.WorkQueuePolicy,
Name: name,
Subjects: subjects,
}
return &sc
}
// func createConsumerBUConfig() *nats.ConsumerConfig {
// cc := nats.ConsumerConfig {
// Description: fmt.Sprintf("Consumer for me"),
// //DeliverGroup: "workers",
// DeliverPolicy: nats.DeliverAllPolicy,
// AckPolicy: nats.AckExplicitPolicy,
// FilterSubject: "repo.commands.open",
// //Durable: "Workers",
// }
// return &cc
// }
func connectNats(be *Backend) error {
be.mx.Lock()
defer be.mx.Unlock()
server := be.cfg.Server.Hostname()
port := be.cfg.Server.Port()
if port == "" {
port = "4222"
}
url := fmt.Sprintf("nats://%s:%s", server, port)
/* Check Credential File Exists */
_, err := os.Stat(be.cfg.Credential)
if err != nil {
return errors.Wrap(err, "credential file missing")
}
var options []nats.Option
options = append(options, nats.UserCredentials(be.cfg.Credential))
options = append(options, nats.ClosedHandler(natsClosedCB))
options = append(options, nats.DisconnectHandler(natsDisconnectedCB))
be.conn, err = nats.Connect(url, options...)
if err != nil {
return errors.Wrap(err, "nats connection failed")
}
if size := be.conn.MaxPayload(); size < 8388608 {
return errors.New("NATS Server Max Payload Size is below 8Mb")
}
fmt.Printf("Connected to %s (%s)\n", be.conn.ConnectedClusterName(), be.conn.ConnectedServerName())
fmt.Printf("Nats Message Size: %d\n", be.conn.MaxPayload())
be.enc = nats.EncoderForType("gob")
// be.js, err = be.conn.JetStream()
// if err != nil {
// return errors.Wrap(err, "can't get nats jetstream context")
// }
// fmt.Printf("Stream Name: %s\n", be.cfg.Repo)
// be.buStream, err = be.js.StreamInfo(be.cfg.Repo)
// if err != nil {
// if err == nats.ErrStreamNotFound {
// /* create the Stream */
// sc := createStreamConfig(be.cfg.Repo)
// fmt.Printf("Creating Stream %s\n", sc.Name)
// be.buStream, err = be.js.AddStream(sc)
// if err != nil {
// return errors.Wrapf(err, "Can't Create Stream %#v", sc)
// }
// } else {
// return errors.Wrap(err, "Error getting StreamInfo")
// }
// }
// var subOps []nats.SubOpt
// //subOps = append(subOps, nats.Durable("commands"))
// subOps = append(subOps, nats.AckExplicit())
// subOps = append(subOps, nats.BindStream(be.buStream.Config.Name))
// subOps = append(subOps, nats.MaxAckPending(5))
be.buCommands, err = be.conn.QueueSubscribeSync("repo.commands.*", "opencommandworkers")
if err != nil {
return errors.Wrap(err, "Can't Add Consumer")
}
// fmt.Printf("Stream %#v\n", be.buStream)
return nil
}
func natsClosedCB(conn *nats.Conn) {
fmt.Printf("Connection Closed: %s", conn.LastError())
}
func natsDisconnectedCB(conn *nats.Conn) {
fmt.Printf("Connection Disconnected: %s", conn.LastError())
}
func main() {
url, _ := url.Parse("nats://gw-docker-1.dmz.dynam.ac:4222/backup")
cfg := Config{
Server: url,
Credential: "/home/fish/.nkeys/creds/Operator/Backup/restic.creds",
}
var repo string
if cfg.Server.Path[0] == '/' {
repo = cfg.Server.Path[1:]
}
if repo[len(repo)-1] == '/' {
repo = repo[0 : len(repo)-1]
}
// replace any further slashes with . to specify a nested queue
repo = strings.Replace(repo, "/", ".", -1)
cfg.Repo = repo
be := &Backend{
cfg: cfg,
}
if err := connectNats(be); err != nil {
fmt.Printf("Error Connecting to Nats: %s\n", err)
os.Exit(0)
}
for i :=0; i < 50; i++ {
wd := WorkerData{id: i, be: be}
be.t.Go(wd.Run);
}
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
s := <-signalChan
be.t.Kill(errors.New("Shutting Down"))
if err := be.t.Wait(); err != nil {
fmt.Printf("Workers Reported Error: %s", err)
}
fmt.Printf("Got Shutdown Signal %s", s)
cache.ShutdownCache()
os.Exit(0)
}

313
rnsworker.go Executable file
View file

@ -0,0 +1,313 @@
package main
import (
"context"
"fmt"
"io"
"io/fs"
"path"
"time"
"github.com/Fishwaldo/restic-nats-server/protocol"
// "github.com/nats-io/nats.go"
"github.com/pkg/errors"
)
type WorkerData struct {
id int
be *Backend
cancel context.CancelFunc
}
func (wd *WorkerData) Run() error {
for {
var ctx context.Context
ctx, wd.cancel = context.WithCancel(context.Background())
netctx, _ := context.WithTimeout(ctx, 1*time.Second)
select {
case <-wd.be.t.Dying():
fmt.Printf("Killing Worker %d\n", wd.id)
wd.cancel()
return nil
default:
}
//wd.be.mx.Lock()
msg, err := wd.be.buCommands.NextMsgWithContext(netctx)
//wd.be.mx.Unlock()
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
fmt.Printf("id: %d Error: %s %t\n", wd.id, err, err)
return err
}
}
if msg != nil {
jobctx, _ := context.WithTimeout(ctx, 120 * time.Second)
fmt.Printf("id: %d Message: %s %s\n", wd.id, msg.Subject, msg.Sub.Queue)
start := time.Now()
log := func(msg string, args ...interface{}) {
fmt.Printf("ID: %d - %s\n", wd.id, fmt.Sprintf(msg, args...))
}
msg, err := protocol.ChunkReadRequestMsgWithContext(jobctx, wd.be.conn, msg, log)
if err != nil {
fmt.Printf("ID: %d - ChunkedRead Failed: %s\n", wd.id, err)
continue
}
operation := msg.Header.Get("X-RNS-OP")
switch operation {
case "open":
var oo protocol.OpenOp
or := protocol.OpenResult{Ok: false}
if err := wd.be.enc.Decode(msg.Subject, msg.Data, &oo); err != nil {
fmt.Printf("ID: %d - Decode Failed: %s\n", wd.id, err)
}
or.Ok, err = wd.OpenRepo(oo)
if err != nil {
or.Ok = false
fmt.Printf("ID: %d - OpenOp: Error: %s\n", wd.id, err)
}
replymsg := protocol.NewRNSMsg(msg.Reply)
replymsg.Data, err = wd.be.enc.Encode(msg.Reply, or)
if err != nil {
fmt.Printf("ID %d - Encode Failed: %s\n", wd.id, err)
}
if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil {
fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err)
}
case "stat":
var so protocol.StatOp
if err := wd.be.enc.Decode(msg.Subject, msg.Data, &so); err != nil {
return errors.Wrap(err, "Decode Failed")
}
fi, err := wd.Stat(so)
var sr protocol.StatResult
if err != nil {
fmt.Printf("Stat: Error: %s\n", err)
sr.Ok = false
} else {
sr.Ok = true
sr.Size = fi.Size()
sr.Name = fi.Name()
}
replymsg := protocol.NewRNSMsg(msg.Reply)
replymsg.Data, err = wd.be.enc.Encode(msg.Reply, sr)
if err != nil {
return errors.Wrap(err, "Encode Failed")
}
if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil {
fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err)
}
case "mkdir":
var mo protocol.MkdirOp
if err := wd.be.enc.Decode(msg.Subject, msg.Data, &mo); err != nil {
return errors.Wrap(err, "Decode Failed")
}
var mr protocol.MkdirResult
err := wd.Mkdir(mo)
if err != nil {
fmt.Printf("Mkdir: Error: %s\n", err)
mr.Ok = false
} else {
mr.Ok = true
}
replymsg := protocol.NewRNSMsg(msg.Reply)
replymsg.Data, err = wd.be.enc.Encode(msg.Reply, mr)
if err != nil {
return errors.Wrap(err, "Encode Failed")
}
if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil {
fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err)
}
case "save":
var so protocol.SaveOp
if err := wd.be.enc.Decode(msg.Subject, msg.Data, &so); err != nil {
return errors.Wrap(err, "Decode Failed")
}
var sr protocol.SaveResult
err := wd.Save(so)
if err != nil {
fmt.Printf("Save: Error: %s\n", err)
sr.Ok = false
} else {
sr.Ok = true
}
replymsg := protocol.NewRNSMsg(msg.Reply)
replymsg.Data, err = wd.be.enc.Encode(msg.Reply, sr)
if err != nil {
return errors.Wrap(err, "Encode Failed")
}
if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil {
fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err)
}
case "list":
var lo protocol.ListOp
if err := wd.be.enc.Decode(msg.Subject, msg.Data, &lo); err != nil {
return errors.Wrap(err, "Decode Failed")
}
var lr protocol.ListResult
lr, err = wd.List(lo)
if err != nil {
fmt.Printf("List: Error: %s\n", err)
lr.Ok = false
} else {
lr.Ok = true
}
replymsg := protocol.NewRNSMsg(msg.Reply)
replymsg.Data, err = wd.be.enc.Encode(msg.Reply, lr)
if err != nil {
return errors.Wrap(err, "Encode Failed")
}
if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil {
fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err)
}
case "load":
var lo protocol.LoadOp
if err := wd.be.enc.Decode(msg.Subject, msg.Data, &lo); err != nil {
return errors.Wrap(err, "Decode Failed")
}
var lr protocol.LoadResult
lr, err = wd.Load(lo)
if err != nil {
fmt.Printf("List: Error: %s\n", err)
lr.Ok = false
} else {
lr.Ok = true
}
replymsg := protocol.NewRNSMsg(msg.Reply)
replymsg.Data, err = wd.be.enc.Encode(msg.Reply, lr)
if err != nil {
return errors.Wrap(err, "Encode Failed")
}
if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil {
fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err)
}
case "remove":
var ro protocol.RemoveOp
if err := wd.be.enc.Decode(msg.Subject, msg.Data, &ro); err != nil {
return errors.Wrap(err, "Decode Failed")
}
var rr protocol.RemoveResult
rr, err = wd.Remove(ro)
if err != nil {
fmt.Printf("List: Error: %s\n", err)
rr.Ok = false
} else {
rr.Ok = true
}
replymsg := protocol.NewRNSMsg(msg.Reply)
replymsg.Data, err = wd.be.enc.Encode(msg.Reply, rr)
if err != nil {
return errors.Wrap(err, "Encode Failed")
}
if err = protocol.ChunkSendReplyMsgWithContext(ctx, wd.be.conn, msg, replymsg, log); err != nil {
fmt.Printf("ID %d - ChunkReplyRequestMsgWithContext Failed: %s\n", wd.id, err)
}
}
fmt.Printf("id: %d Command %s Took %s\n\n", wd.id, operation, time.Since(start))
}
}
return nil
}
func (wd *WorkerData) OpenRepo(oo protocol.OpenOp) (bool, error) {
fs, err := FSStat(oo.Bucket)
if err != nil {
return false, errors.New("Failed to Open Repository")
}
return fs.IsDir(), nil
}
func (wd *WorkerData) Stat(so protocol.StatOp) (fs.FileInfo, error) {
fs, err := FSStat(path.Join(so.Bucket, so.Filename))
if err != nil {
return nil, errors.Wrap(err, "Stat")
}
return fs, nil
}
func (wd *WorkerData) Mkdir(mo protocol.MkdirOp) error {
path := path.Join(mo.Bucket, mo.Dir)
if err := FSMkDir(path); err != nil {
fmt.Printf("Mkdir Failed: %s", err)
return err
}
return nil
}
func (wd *WorkerData) Save(so protocol.SaveOp) error {
path := path.Join(so.Bucket, so.Dir, so.Name)
len, err := FSSave(path, &so.Data, so.Offset)
if err != nil {
fmt.Printf("Save Failed: %s", err)
return err
}
if len != so.PacketSize {
fmt.Printf("Packetsize != writtensize")
return errors.New("Packetsize != Writtensize")
}
return nil
}
func (wd *WorkerData) List(lo protocol.ListOp) (protocol.ListResult, error) {
var result protocol.ListResult
fi, err := FSListFiles(path.Join(lo.Bucket, lo.BaseDir), lo.SubDir)
if err != nil {
return protocol.ListResult{Ok: false}, errors.Wrap(err, "ListFiles")
}
result.Ok = true
result.FI = fi
return result, nil
}
func (wd *WorkerData) Load(lo protocol.LoadOp) (protocol.LoadResult, error) {
var result protocol.LoadResult
rd, err := FSLoadFile(path.Join(lo.Bucket, lo.Dir, lo.Name))
if err != nil {
return protocol.LoadResult{Ok: false}, errors.Wrap(err, "LoadFile")
}
defer rd.Close()
if lo.Offset > 0 {
_, err = rd.Seek(lo.Offset, 0)
if err != nil {
return protocol.LoadResult{Ok: false}, errors.Wrap(err, "Seek")
}
}
if lo.Length > 0 {
result.Data = make([]byte, lo.Length)
len, err := rd.Read(result.Data)
if err != nil {
return protocol.LoadResult{Ok: false}, errors.Wrap(err, "Read")
}
if len != lo.Length {
return protocol.LoadResult{Ok: false}, errors.Errorf("Requested Length %d != Actual Length %d", lo.Length, len)
}
} else {
result.Data, err = io.ReadAll(rd)
if err != nil {
return protocol.LoadResult{Ok: false}, errors.Wrap(err, "ReadAll")
}
}
//fmt.Printf("%+v\n", result)
return result, nil
}
func (wd *WorkerData) Remove(ro protocol.RemoveOp) (protocol.RemoveResult, error) {
var result protocol.RemoveResult
if err := FSRemove(path.Join(ro.Bucket, ro.Dir, ro.Name)); err != nil {
return protocol.RemoveResult{Ok: false}, errors.Wrap(err, "FSRemove")
}
result.Ok = true
return result, nil
}