
Commit f8b8acc

Issue 50 - Allow configuration of individual stores and support uncompressed stores (folbricht#62)

* Introduce new Chunk object in preparation for uncompressed store support
* Make store features more customizable by passing StoreOptions to all constructors and allowing the options to be read from config or command line
* Add skip-verify option to allow for efficient chaining of chunk stores
* Support for uncompressed stores and chunk-servers
* Add documentation
* Fix failing tests and simplify chunk object usage
* Fix out-of-bounds error dealing with index stores without path or URL component

1 parent 7efc19a commit f8b8acc


53 files changed: +1048 -521 lines

README.md (+33 -13)

````diff
@@ -12,7 +12,7 @@ Among the distinguishing factors:
 - Where the upstream command has chosen to optimize for storage efficiency (f/e, being able to use local files as "seeds", building temporary indexes into them), this command chooses to optimize for runtime performance (maintaining a local explicit chunk store, avoiding the need to reindex) at cost to storage efficiency.
 - Where the upstream command has chosen to take full advantage of Linux platform features, this client chooses to implement a minimum featureset and, while high-value platform-specific features (such as support for btrfs reflinks into a decompressed local chunk cache) might be added in the future, the ability to build without them on other platforms will be maintained.
 - SHA512/256 is currently the only supported hash function.
-- Only chunk store using zstd compression are supported at this point.
+- Only chunk stores using zstd compression as well as uncompressed stores are supported at this point.
 - Supports local stores as well as remote stores (as client) over SSH, SFTP and HTTP
 - Built-in HTTP(S) chunk server that can proxy multiple local or remote stores and also supports caching.
 - Drop-in replacement for casync on SSH servers when serving chunks read-only
@@ -22,6 +22,7 @@ Among the distinguishing factors:
 - Allows FUSE mounting of blob indexes
 - S3 protocol support to access chunk stores for read operations and some commands that write chunks
 - Stores and retrieves index files from remote index stores such as HTTP, SFTP and S3
+- Built-in HTTP(S) index server to read/write indexes
 - Reflinking matching blocks (rather than copying) from seed files if supported by the filesystem (currently only Btrfs and XFS)

 ## Parallel chunking
@@ -136,24 +137,37 @@ This is a store running on the local machine on port 9000 without SSL.
 s3+http://127.0.0.1:9000/store
 ```

-#### Previous S3 storage layout
-Before April 2018, chunks in S3 stores were kept in a flat layout, with the name being the checksum of the chunk. Since then, the layout was modified to match that of local stores: `<4-checksum-chars>/<checksum>.cacnk` This change allows the use of other tools to convert or copy stores between local and S3 stores. To convert an existing s3 store from the old format, a command `upgrade-s3` is available in the tool.
+### Compressed vs Uncompressed chunk stores
+By default, desync reads and writes chunks in compressed form to all supported stores. This is in line with upstream casync's goal of storing in the most efficient way. It is however possible to change this behavior by providing desync with a config file (see the Configuration section below). Disabling compression and storing chunks uncompressed may reduce latency in some use cases and improve performance. desync supports reading and writing uncompressed chunks to SFTP, S3, HTTP and local stores and caches. If more than one store is used, each of them can be configured independently; for example, it's possible to read compressed chunks from S3 while using a local uncompressed cache for best performance. However, care needs to be taken when using the `chunk-server` command and building chains of chunk store proxies to avoid shifting the decompression load onto the server (though it's possible this is actually desirable).
+
+In the setup below, a client reads chunks from an HTTP chunk server which itself gets chunks from S3.
+```
+<Client> ---> <HTTP chunk server> ---> <S3 store>
+```
+If the client configures the HTTP chunk server to be uncompressed (`chunk-server` needs to be started with the `-u` option), and the chunk server reads compressed chunks from S3, then the chunk server will have to decompress every chunk that's requested before responding to the client. If the chunk server was reading uncompressed chunks from S3, there would be no overhead.
+
+Compressed and uncompressed chunks can live in the same store and don't interfere with each other. A store that's configured client-side for compressed chunks will not see any uncompressed chunks that may be present. `prune` and `verify` likewise ignore chunks written in the other format. Both kinds of chunks can be accessed by multiple clients concurrently and independently.

 ### Configuration

-For most use cases, it is sufficient to use the tool's default configuration not requiring a config file. Having a config file `$HOME/.config/desync/config.json` allows for further customization of timeouts, error retry behaviour or credentials that can't be set via command-line options or environment variables. To view the current configuration, use `desync config`. If no config file is present, this will show the defaults. To create a config file allowing custom values, use `desync config -w` which will write the current configuration to the file, then edit the file.
+For most use cases, it is sufficient to use the tool's default configuration not requiring a config file. Having a config file `$HOME/.config/desync/config.json` allows for further customization of timeouts, error retry behaviour or credentials that can't be set via command-line options or environment variables. All values have sensible defaults if unconfigured; only add configuration for values that differ from the defaults. To view the current configuration, use `desync config`. If no config file is present, this will show the defaults. To create a config file allowing custom values, use `desync config -w` which will write the current configuration to the file, then edit the file.

 Available configuration values:
-- `http-timeout` - HTTP request timeout used in HTTP stores (not S3) in nanoseconds
-- `http-error-retry` - Number of times to retry failed chunk requests from HTTP stores
+- `http-timeout` *DEPRECATED, see `store-options.<Location>.timeout`* - HTTP request timeout used in HTTP stores (not S3) in nanoseconds
+- `http-error-retry` *DEPRECATED, see `store-options.<Location>.error-retry`* - Number of times to retry failed chunk requests from HTTP stores
 - `s3-credentials` - Defines credentials for use with S3 stores. Especially useful if more than one S3 store is used. The key in the config needs to be the URL scheme and host used for the store, excluding the path, but including the port number if used in the store URL. It is also possible to use a [standard aws credentials file](https://docs.aws.amazon.com/cli/latest/userguide/cli-config-files.html) in order to store s3 credentials.
+- `store-options` - Allows customization of chunk and index stores, for example compression settings, timeouts, retry behavior and keys. Not all options are applicable to every store; some, like `timeout`, are ignored for local stores. Some of these options, such as the client certificates, are overwritten with any values set on the command line. Note that the store location used on the command line needs to match the key under `store-options` exactly for these options to be used. Watch out for trailing `/` in URLs.
+  - `timeout` - Time limit for chunk read or write operations in nanoseconds. Default: 1 minute.
+  - `error-retry` - Number of times to retry failed chunk requests. Default: 0.
+  - `client-cert` - Certificate file to be used for stores where the server requires mutual SSL.
+  - `client-key` - Key file to be used for stores where the server requires mutual SSL.
+  - `skip-verify` - Disables data integrity verification when reading chunks to improve performance. Only recommended when chaining chunk stores with the `chunk-server` command using compressed stores.
+  - `uncompressed` - Reads and writes uncompressed chunks from/to this store. This can improve performance, especially for local stores or caches. Compressed and uncompressed chunks can coexist in the same store, but only one kind is read or written by one client.

 **Example config**

-```
+```json
 {
-  "http-timeout": 60000000000,
-  "http-error-retry": 0,
   "s3-credentials": {
     "http://localhost": {
       "access-key": "MYACCESSKEY",
@@ -171,6 +185,16 @@ Available configuration values:
       "aws-region": "us-west-2",
       "aws-profile": "profile_refreshable"
     }
+  },
+  "store-options": {
+    "https://192.168.1.1/store": {
+      "client-cert": "/path/to/crt",
+      "client-key": "/path/to/key",
+      "error-retry": 1
+    },
+    "/path/to/local/cache": {
+      "uncompressed": true
+    }
   }
 }
 ```
@@ -337,7 +361,3 @@ desync info -j -s /tmp/store -s s3+http://127.0.0.1:9000/store /path/to/index
 ## Links
 - casync - https://github.com/systemd/casync
 - GoDoc for desync library - https://godoc.org/github.com/folbricht/desync
-
-## TODOs
-- Pre-allocate the output file to avoid fragmentation when using extract command
-- Allow on-disk chunk cache to optionally be stored uncompressed, such that blocks can be directly reflinked (rather than copied) into files, when on a platform and filesystem where reflink support is available.
````

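The new `uncompressed` config key maps to the `StoreOptions` value that this commit threads through all store constructors. Below is a minimal sketch of how a library consumer might open an uncompressed local store. The Go field name `Uncompressed` is an assumption inferred from the config key; it is not shown in this diff.

```go
package main

import (
	"fmt"

	"github.com/folbricht/desync"
)

func main() {
	// Open a local store that reads and writes uncompressed chunks.
	// The Uncompressed field name is assumed to mirror the "uncompressed"
	// config key documented above.
	s, err := desync.NewLocalStore("/path/to/local/cache", desync.StoreOptions{Uncompressed: true})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("store ready: %v\n", s)
}
```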
assemble.go (+5 -18)

```diff
@@ -6,8 +6,6 @@ import (
 	"fmt"
 	"os"
 	"sync"
-
-	"github.com/pkg/errors"
 )

 // AssembleFile re-assembles a file based on a list of index chunks. It runs n
@@ -151,34 +149,23 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
 			// Record this chunk having been pulled from the store
 			stats.incChunksFromStore()
 			// Pull the (compressed) chunk from the store
-			b, err := s.GetChunk(c.ID)
+			chunk, err := s.GetChunk(c.ID)
 			if err != nil {
 				recordError(err)
 				continue
 			}
-			// Since we know how big the chunk is supposed to be, pre-allocate a
-			// slice to decompress into
-			var db []byte
-			db = make([]byte, c.Size)
-			// The the chunk is compressed. Decompress it here
-			db, err = Decompress(db, b)
+			b, err := chunk.Uncompressed()
 			if err != nil {
-				recordError(errors.Wrap(err, c.ID.String()))
-				continue
-			}
-			// Verify the checksum of the chunk matches the ID
-			sum := sha512.Sum512_256(db)
-			if sum != c.ID {
-				recordError(fmt.Errorf("unexpected sha512/256 %s for chunk id %s", sum, c.ID))
+				recordError(err)
 				continue
 			}
 			// Might as well verify the chunk size while we're at it
-			if c.Size != uint64(len(db)) {
+			if c.Size != uint64(len(b)) {
 				recordError(fmt.Errorf("unexpected size for chunk %s", c.ID))
 				continue
 			}
 			// Write the decompressed chunk into the file at the right position
-			if _, err = f.WriteAt(db, int64(c.Start)); err != nil {
+			if _, err = f.WriteAt(b, int64(c.Start)); err != nil {
 				recordError(err)
 				continue
 			}
```

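The change above replaces manual decompression and checksum handling with the `Chunk` object. Here is a minimal sketch of the new retrieval pattern in isolation, using only the API visible in this commit:

```go
package main

import (
	"os"

	"github.com/folbricht/desync"
)

// writeChunkAt mirrors the new AssembleFile loop: fetch a *Chunk from any
// store, ask for its uncompressed bytes (decompression happens on demand
// inside the Chunk), and write them at the chunk's offset in the output file.
func writeChunkAt(s desync.Store, id desync.ChunkID, f *os.File, offset uint64) error {
	chunk, err := s.GetChunk(id)
	if err != nil {
		return err
	}
	b, err := chunk.Uncompressed()
	if err != nil {
		return err
	}
	_, err = f.WriteAt(b, int64(offset))
	return err
}
```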
assemble_test.go (+3 -3)

```diff
@@ -53,7 +53,7 @@ func TestExtract(t *testing.T) {
 	}
 	defer os.RemoveAll(store)

-	s, err := NewLocalStore(store)
+	s, err := NewLocalStore(store, StoreOptions{})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -67,7 +67,7 @@ func TestExtract(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer os.RemoveAll(blankstore)
-	bs, err := NewLocalStore(blankstore)
+	bs, err := NewLocalStore(blankstore, StoreOptions{})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -162,7 +162,7 @@ func TestSeed(t *testing.T) {
 	}
 	defer os.RemoveAll(store)

-	s, err := NewLocalStore(store)
+	s, err := NewLocalStore(store, StoreOptions{})
 	if err != nil {
 		t.Fatal(err)
 	}
```

cache.go (+9 -9)

```diff
@@ -23,25 +23,25 @@ func NewCache(s Store, l WriteStore) Cache {

 // GetChunk first asks the local store for the chunk and then the remote one.
 // If we get a chunk from the remote, it's stored locally too.
-func (c Cache) GetChunk(id ChunkID) ([]byte, error) {
-	b, err := c.l.GetChunk(id)
+func (c Cache) GetChunk(id ChunkID) (*Chunk, error) {
+	chunk, err := c.l.GetChunk(id)
 	switch err.(type) {
 	case nil:
-		return b, nil
+		return chunk, nil
 	case ChunkMissing:
 	default:
-		return nil, err
+		return chunk, err
 	}
 	// At this point we failed to find it in the local cache. Ask the remote
-	b, err = c.s.GetChunk(id)
+	chunk, err = c.s.GetChunk(id)
 	if err != nil {
-		return nil, err
+		return chunk, err
 	}
 	// Got the chunk. Store it in the local cache for next time
-	if err = c.l.StoreChunk(id, b); err != nil {
-		return nil, errors.Wrap(err, "failed to store in local cache")
+	if err = c.l.StoreChunk(chunk); err != nil {
+		return chunk, errors.Wrap(err, "failed to store in local cache")
 	}
-	return b, nil
+	return chunk, nil
 }

 // HasChunk first checks the cache for the chunk, then the store.
```

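For context, a sketch of how the updated `Cache` is used: `GetChunk` now passes `*Chunk` objects through, so a compressed remote store and an uncompressed local cache can be combined without the cache recompressing data. The store paths are placeholders, the `NewLocalStore` signature is taken from the test changes in this commit, and the `Uncompressed` field name is the same assumption noted earlier.

```go
package main

import (
	"fmt"

	"github.com/folbricht/desync"
)

func main() {
	// A (possibly slow) store fronted by a local write store acting as cache.
	remote, err := desync.NewLocalStore("/path/to/remote/store", desync.StoreOptions{})
	if err != nil {
		panic(err)
	}
	local, err := desync.NewLocalStore("/path/to/local/cache", desync.StoreOptions{Uncompressed: true})
	if err != nil {
		panic(err)
	}
	cache := desync.NewCache(remote, local)

	// A miss on the local store falls through to the remote; the returned
	// chunk is stored locally so the next read is served from the cache.
	chunk, err := cache.GetChunk(desync.ChunkID{})
	if err != nil {
		fmt.Println(err) // e.g. ChunkMissing for an unknown ID
		return
	}
	fmt.Println(chunk.ID())
}
```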
chop.go (+4 -7)

```diff
@@ -2,7 +2,6 @@ package desync

 import (
 	"context"
-	"crypto/sha512"
 	"fmt"
 	"io"
 	"os"
@@ -66,15 +65,13 @@ func ChopFile(ctx context.Context, name string, chunks []IndexChunk, ws WriteSto
 			continue
 		}

-		// Calculate this chunks checksum and compare to what it's supposed to be
-		// according to the index
-		sum := sha512.Sum512_256(b)
-		if sum != c.ID {
-			recordError(fmt.Errorf("chunk %s checksum does not match", c.ID))
+		chunk, err := NewChunkWithID(c.ID, b, nil, false)
+		if err != nil {
+			recordError(err)
 			continue
 		}

-		if err := s.StoreChunk(c.ID, b); err != nil {
+		if err := s.StoreChunk(chunk); err != nil {
 			recordError(err)
 			continue
 		}
```

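The `NewChunkWithID` call above replaces the explicit SHA512/256 comparison. A short sketch of that ingest pattern on its own, using only functions and types from this commit:

```go
package main

import "github.com/folbricht/desync"

// storeVerified wraps freshly chunked bytes together with the ID the index
// expects. With skipVerify=false, NewChunkWithID hashes the data and rejects
// it on mismatch, so corrupt chunks never reach the store.
func storeVerified(ws desync.WriteStore, id desync.ChunkID, b []byte) error {
	chunk, err := desync.NewChunkWithID(id, b, nil, false)
	if err != nil {
		return err // an ID mismatch surfaces as a ChunkInvalid error
	}
	return ws.StoreChunk(chunk)
}
```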
chunk.go (+86)

```diff
@@ -0,0 +1,86 @@
+package desync
+
+import (
+	"crypto/sha512"
+	"errors"
+)
+
+// Chunk holds chunk data compressed, uncompressed, or both. If a chunk is created
+// from compressed data, such as read from a compressed chunk store, and later the
+// application requires the uncompressed data, it'll be decompressed on demand and
+// also stored in Chunk. The same happens when the Chunk is made from uncompressed
+// bytes and then to be stored in a compressed form.
+type Chunk struct {
+	compressed, uncompressed []byte
+	id                       ChunkID
+	idCalculated             bool
+}
+
+// NewChunkFromUncompressed creates a new chunk from uncompressed data.
+func NewChunkFromUncompressed(b []byte) *Chunk {
+	return &Chunk{uncompressed: b}
+}
+
+// NewChunkWithID creates a new chunk from either compressed or uncompressed data
+// (or both if available). It also expects an ID and validates that it matches
+// the uncompressed data unless skipVerify is true. If called with just compressed
+// data, it'll decompress it for the ID validation.
+func NewChunkWithID(id ChunkID, uncompressed, compressed []byte, skipVerify bool) (*Chunk, error) {
+	c := &Chunk{id: id, uncompressed: uncompressed, compressed: compressed}
+	if skipVerify {
+		c.idCalculated = true // Pretend this was calculated. No need to re-calc later
+		return c, nil
+	}
+	sum := c.ID()
+	if sum != id {
+		return nil, ChunkInvalid{ID: id, Sum: sum}
+	}
+	return c, nil
+}
+
+// Compressed returns the chunk data in compressed form. If the chunk was created
+// with uncompressed data only, it'll be compressed, stored and returned. The
+// caller must not modify the data in the returned slice.
+func (c *Chunk) Compressed() ([]byte, error) {
+	if len(c.compressed) > 0 {
+		return c.compressed, nil
+	}
+	if len(c.uncompressed) > 0 {
+		var err error
+		c.compressed, err = Compress(c.uncompressed)
+		return c.compressed, err
+	}
+	return nil, errors.New("no data in chunk")
+}
+
+// Uncompressed returns the chunk data in uncompressed form. If the chunk was created
+// with compressed data only, it'll be decompressed, stored and returned. The
+// caller must not modify the data in the returned slice.
+func (c *Chunk) Uncompressed() ([]byte, error) {
+	if len(c.uncompressed) > 0 {
+		return c.uncompressed, nil
+	}
+	if len(c.compressed) > 0 {
+		var err error
+		c.uncompressed, err = Decompress(nil, c.compressed)
+		return c.uncompressed, err
+	}
+	return nil, errors.New("no data in chunk")
+}
+
+// ID returns the checksum/ID of the uncompressed chunk data. The ID is stored
+// after the first call and doesn't need to be re-calculated. Note that calculating
+// the ID may mean decompressing the data first.
+func (c *Chunk) ID() ChunkID {
+
+	if c.idCalculated {
+		return c.id
+	}
+	b, err := c.Uncompressed()
+	if err != nil {
+		return ChunkID{}
+	}
+	c.id = sha512.Sum512_256(b)
+	c.idCalculated = true
+	return c.id
+}
```

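To make the lazy conversion behavior concrete, here is a small usage sketch of the new `Chunk` type, using only the constructors and methods defined above:

```go
package main

import (
	"fmt"

	"github.com/folbricht/desync"
)

func main() {
	// Build a chunk from plaintext. Its ID (SHA512/256 of the uncompressed
	// data) is calculated lazily on first use and then cached.
	c := desync.NewChunkFromUncompressed([]byte("hello desync"))
	fmt.Println(c.ID())

	// Request the compressed form; it is produced once, stored in the
	// Chunk, and returned as-is on subsequent calls.
	zb, err := c.Compressed()
	if err != nil {
		panic(err)
	}

	// Rebuild a chunk from compressed bytes only. skipVerify=false forces
	// decompression and an ID check, so corrupted data is rejected.
	c2, err := desync.NewChunkWithID(c.ID(), nil, zb, false)
	if err != nil {
		panic(err)
	}
	b, _ := c2.Uncompressed()
	fmt.Println(string(b)) // "hello desync"
}
```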
chunkstorage.go (+5 -37)

```diff
@@ -1,12 +1,7 @@
 package desync

 import (
-	"bytes"
-	"fmt"
-	"os"
 	"sync"
-
-	"github.com/pkg/errors"
 )

 // ChunkStorage stores chunks in a writable store. It can be safely used by multiple goroutines and
@@ -46,55 +41,28 @@ func (s *ChunkStorage) unmarkProcessed(id ChunkID) {
 }

 // StoreChunk stores a single chunk in a synchronous manner.
-func (s *ChunkStorage) StoreChunk(id ChunkID, b []byte) (err error) {
+func (s *ChunkStorage) StoreChunk(chunk *Chunk) (err error) {

 	// Mark this chunk as done so no other goroutine will attempt to store it
 	// at the same time. If this is the first time this chunk is marked, it'll
 	// return false and we need to continue processing/storing the chunk below.
-	if s.markProcessed(id) {
+	if s.markProcessed(chunk.ID()) {
 		return nil
 	}

 	// Skip this chunk if the store already has it
-	if s.ws.HasChunk(id) {
+	if s.ws.HasChunk(chunk.ID()) {
 		return nil
 	}

 	// The chunk was marked as "processed" above. If there's a problem to actually
 	// store it, we need to unmark it again.
 	defer func() {
 		if err != nil {
-			s.unmarkProcessed(id)
+			s.unmarkProcessed(chunk.ID())
 		}
 	}()

-	var retried bool
-retry:
-	// Compress the chunk
-	cb, err := Compress(b)
-	if err != nil {
-		return err
-	}
-
-	// The zstd library appears to fail to compress correctly in some cases, to
-	// avoid storing invalid chunks, verify the chunk again by decompressing
-	// and comparing. See https://github.com/folbricht/desync/issues/37.
-	// Ideally the code below should be removed once zstd library can be trusted
-	// again.
-	db, err := Decompress(nil, cb)
-	if err != nil {
-		return errors.Wrap(err, id.String())
-	}
-
-	if !bytes.Equal(b, db) {
-		if !retried {
-			fmt.Fprintln(os.Stderr, "zstd compression error detected, retrying")
-			retried = true
-			goto retry
-		}
-		return errors.New("too many zstd compression errors, aborting")
-	}
-
 	// Store the compressed chunk
-	return s.ws.StoreChunk(id, cb)
+	return s.ws.StoreChunk(chunk)
 }
```

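A sketch of how `ChunkStorage` might be driven concurrently after this change. Note that the `NewChunkStorage` constructor does not appear in this diff and is assumed here to take the target `WriteStore`; the dedup behavior shown in the comments is taken from the `markProcessed` logic above.

```go
package main

import (
	"sync"

	"github.com/folbricht/desync"
)

func main() {
	ws, err := desync.NewLocalStore("/tmp/store", desync.StoreOptions{})
	if err != nil {
		panic(err)
	}

	// Assumed constructor (not shown in this diff): wraps a WriteStore with
	// the markProcessed/unmarkProcessed bookkeeping seen above.
	storage := desync.NewChunkStorage(ws)

	chunk := desync.NewChunkFromUncompressed([]byte("some chunk data"))

	// Concurrent stores of the same chunk are safe; only one goroutine
	// actually writes it, the others return early.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = storage.StoreChunk(chunk)
		}()
	}
	wg.Wait()
}
```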
cmd/desync/cache.go (+1 -1)

```diff
@@ -56,7 +56,7 @@ func cache(ctx context.Context, args []string) error {
 	}

 	// Parse the store locations, open the stores and add a cache is requested
-	opts := storeOptions{
+	opts := cmdStoreOptions{
 		n:          n,
 		clientCert: clientCert,
 		clientKey:  clientKey,
```
