Skip to content

Commit 183e370

Browse files
committed
Maintain a pool of SFTP connections rather than sharing one (folbricht#69)
1 parent 50ca041 commit 183e370

File tree

1 file changed

+59
-28
lines changed

1 file changed

+59
-28
lines changed

sftp.go

+59-28
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ type SFTPStoreBase struct {
3030

3131
// SFTPStore is a chunk store that uses SFTP over SSH.
3232
type SFTPStore struct {
33-
*SFTPStoreBase
33+
pool chan *SFTPStoreBase
34+
location *url.URL
35+
n int
3436
}
3537

3638
// Creates a base sftp client
@@ -117,19 +119,37 @@ func (s *SFTPStoreBase) String() string {
117119
return s.location.String()
118120
}
119121

122+
// Returns the path for a chunk
123+
func (s *SFTPStoreBase) nameFromID(id ChunkID) string {
124+
sID := id.String()
125+
name := s.path + sID[0:4] + "/" + sID
126+
if s.opt.Uncompressed {
127+
name += UncompressedChunkExt
128+
} else {
129+
name += CompressedChunkExt
130+
}
131+
return name
132+
}
133+
120134
// NewSFTPStore initializes a chunk store using SFTP over SSH.
121135
func NewSFTPStore(location *url.URL, opt StoreOptions) (*SFTPStore, error) {
122-
b, err := newSFTPStoreBase(location, opt)
123-
if err != nil {
124-
return nil, err
136+
s := &SFTPStore{make(chan *SFTPStoreBase, opt.N), location, opt.N}
137+
for i := 0; i < opt.N; i++ {
138+
c, err := newSFTPStoreBase(location, opt)
139+
if err != nil {
140+
return nil, err
141+
}
142+
s.pool <- c
125143
}
126-
return &SFTPStore{b}, nil
144+
return s, nil
127145
}
128146

129147
// GetChunk returns a chunk from an SFTP store, returns ChunkMissing if the file does not exist
130148
func (s *SFTPStore) GetChunk(id ChunkID) (*Chunk, error) {
131-
name := s.nameFromID(id)
132-
f, err := s.client.Open(name)
149+
c := <-s.pool
150+
defer func() { s.pool <- c }()
151+
name := c.nameFromID(id)
152+
f, err := c.client.Open(name)
133153
if err != nil {
134154
if os.IsNotExist(err) {
135155
err = ChunkMissing{id}
@@ -141,51 +161,59 @@ func (s *SFTPStore) GetChunk(id ChunkID) (*Chunk, error) {
141161
if err != nil {
142162
return nil, errors.Wrapf(err, "unable to read from %s", name)
143163
}
144-
if s.opt.Uncompressed {
145-
return NewChunkWithID(id, b, nil, s.opt.SkipVerify)
164+
if c.opt.Uncompressed {
165+
return NewChunkWithID(id, b, nil, c.opt.SkipVerify)
146166
}
147-
return NewChunkWithID(id, nil, b, s.opt.SkipVerify)
167+
return NewChunkWithID(id, nil, b, c.opt.SkipVerify)
148168
}
149169

150170
// RemoveChunk deletes a chunk, typically an invalid one, from the filesystem.
151171
// Used when verifying and repairing caches.
152172
func (s *SFTPStore) RemoveChunk(id ChunkID) error {
153-
name := s.nameFromID(id)
154-
if _, err := s.client.Stat(name); err != nil {
173+
c := <-s.pool
174+
defer func() { s.pool <- c }()
175+
name := c.nameFromID(id)
176+
if _, err := c.client.Stat(name); err != nil {
155177
return ChunkMissing{id}
156178
}
157-
return s.client.Remove(name)
179+
return c.client.Remove(name)
158180
}
159181

160182
// StoreChunk adds a new chunk to the store
161183
func (s *SFTPStore) StoreChunk(chunk *Chunk) error {
162-
name := s.nameFromID(chunk.ID())
184+
c := <-s.pool
185+
defer func() { s.pool <- c }()
186+
name := c.nameFromID(chunk.ID())
163187
var (
164188
b []byte
165189
err error
166190
)
167-
if s.opt.Uncompressed {
191+
if c.opt.Uncompressed {
168192
b, err = chunk.Uncompressed()
169193
} else {
170194
b, err = chunk.Compressed()
171195
}
172196
if err != nil {
173197
return err
174198
}
175-
return s.StoreObject(name, bytes.NewReader(b))
199+
return c.StoreObject(name, bytes.NewReader(b))
176200
}
177201

178202
// HasChunk returns true if the chunk is in the store
179203
func (s *SFTPStore) HasChunk(id ChunkID) bool {
180-
name := s.nameFromID(id)
181-
_, err := s.client.Stat(name)
204+
c := <-s.pool
205+
defer func() { s.pool <- c }()
206+
name := c.nameFromID(id)
207+
_, err := c.client.Stat(name)
182208
return err == nil
183209
}
184210

185211
// Prune removes any chunks from the store that are not contained in a list
186212
// of chunks
187213
func (s *SFTPStore) Prune(ctx context.Context, ids map[ChunkID]struct{}) error {
188-
walker := s.client.Walk(s.path)
214+
c := <-s.pool
215+
defer func() { s.pool <- c }()
216+
walker := c.client.Walk(c.path)
189217

190218
for walker.Step() {
191219
// See if we're meant to stop
@@ -207,7 +235,7 @@ func (s *SFTPStore) Prune(ctx context.Context, ids map[ChunkID]struct{}) error {
207235
}
208236
// Skip compressed chunks if this is running in uncompressed mode and vice-versa
209237
var sID string
210-
if s.opt.Uncompressed {
238+
if c.opt.Uncompressed {
211239
if !strings.HasSuffix(path, UncompressedChunkExt) {
212240
return nil
213241
}
@@ -235,13 +263,16 @@ func (s *SFTPStore) Prune(ctx context.Context, ids map[ChunkID]struct{}) error {
235263
return nil
236264
}
237265

238-
func (s *SFTPStore) nameFromID(id ChunkID) string {
239-
sID := id.String()
240-
name := s.path + sID[0:4] + "/" + sID
241-
if s.opt.Uncompressed {
242-
name += UncompressedChunkExt
243-
} else {
244-
name += CompressedChunkExt
266+
// Close terminates all client connections
267+
func (s *SFTPStore) Close() error {
268+
var err error
269+
for i := 0; i < s.n; i++ {
270+
c := <-s.pool
271+
err = c.Close()
245272
}
246-
return name
273+
return err
274+
}
275+
276+
func (s *SFTPStore) String() string {
277+
return s.location.String()
247278
}

0 commit comments

Comments
 (0)