Skip to content

Commit

Permalink
add isnew + await flush
Browse files Browse the repository at this point in the history
  • Loading branch information
mh0lt committed Jul 17, 2024
1 parent 636e8d6 commit fd84feb
Show file tree
Hide file tree
Showing 24 changed files with 110 additions and 41 deletions.
6 changes: 5 additions & 1 deletion bad_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ func (p badStoragePiece) Completion() storage.Completion {
return storage.Completion{Complete: true, Ok: true}
}

func (p badStoragePiece) MarkComplete() error {
func (p badStoragePiece) MarkComplete(awaitFlush bool) error {
return errors.New("psyyyyyyyche")
}

func (p badStoragePiece) MarkNotComplete() error {
return errors.New("psyyyyyyyche")
}

func (p badStoragePiece) IsNew() bool {
return false
}

func (p badStoragePiece) randomlyTruncatedDataString() string {
return testutil.GreetingFileContents[:rand.Intn(14)]
}
Expand Down
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf
for i := 0; i < info.NumPieces(); i++ {
p := info.Piece(i)
if alreadyCompleted {
require.NoError(t, greetingData.Piece(p).MarkComplete())
require.NoError(t, greetingData.Piece(p).MarkComplete(false))
}
}
cfg := TestingConfig(t)
Expand Down
1 change: 1 addition & 0 deletions mmap_span/mmap_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type MMapSpan struct {
mMaps []Mmap
dirtyPieces roaring.Bitmap
segmentLocater segments.Index
Created bool
InfoHash infohash.T
flushTimer *time.Timer
FlushTime time.Duration
Expand Down
6 changes: 5 additions & 1 deletion peerconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,18 @@ func (me *torrentStorage) Completion() storage.Completion {
return storage.Completion{}
}

func (me *torrentStorage) MarkComplete() error {
func (me *torrentStorage) MarkComplete(awaitFlush bool) error {
return nil
}

func (me *torrentStorage) MarkNotComplete() error {
return nil
}

func (me *torrentStorage) IsNew() bool {
return false
}

func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
panic("shouldn't be called")
}
Expand Down
4 changes: 2 additions & 2 deletions piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ func (p *Piece) bytesLeft() (ret pp.Integer) {

// Forces the piece data to be rehashed.
func (p *Piece) VerifyData() {
p.t.cl.lock()
defer p.t.cl.unlock()
p.t.mu.Lock()
defer p.t.mu.Unlock()

p.mu.RLock()
target := p.numVerifies + 1
Expand Down
6 changes: 5 additions & 1 deletion request-strategy-impls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s storagePiece) WriteAt(p []byte, off int64) (n int, err error) {
panic("implement me")
}

func (s storagePiece) MarkComplete() error {
func (s storagePiece) MarkComplete(awaitFlush bool) error {
//TODO implement me
panic("implement me")
}
Expand All @@ -55,6 +55,10 @@ func (s storagePiece) MarkNotComplete() error {
panic("implement me")
}

func (s storagePiece) IsNew() bool {
return false
}

func (s storagePiece) Completion() storage.Completion {
return storage.Completion{Ok: true, Complete: s.complete}
}
Expand Down
2 changes: 1 addition & 1 deletion storage/bolt-piece-completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (me boltPieceCompletion) Get(pk metainfo.PieceKey) (cn Completion, err erro
return
}

func (me boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
func (me boltPieceCompletion) Set(pk metainfo.PieceKey, b bool, awaitFlush bool) error {
if c, err := me.Get(pk); err == nil && c.Ok && c.Complete == b {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions storage/bolt-piece-completion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ func TestBoltPieceCompletion(t *testing.T) {
require.NoError(t, err)
assert.False(t, b.Ok)

require.NoError(t, pc.Set(pk, false))
require.NoError(t, pc.Set(pk, false, false))

b, err = pc.Get(pk)
require.NoError(t, err)
assert.Equal(t, Completion{Complete: false, Ok: true}, b)

require.NoError(t, pc.Set(pk, true))
require.NoError(t, pc.Set(pk, true, false))

b, err = pc.Get(pk)
require.NoError(t, err)
Expand Down
10 changes: 7 additions & 3 deletions storage/bolt-piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ func (me *boltPiece) Completion() Completion {
return c
}

func (me *boltPiece) MarkComplete() error {
return me.pc().Set(me.pk(), true)
func (me *boltPiece) MarkComplete(awaitFlush bool) error {
return me.pc().Set(me.pk(), true, awaitFlush)
}

func (me *boltPiece) MarkNotComplete() error {
return me.pc().Set(me.pk(), false)
return me.pc().Set(me.pk(), false, false)
}

func (me *boltPiece) ReadAt(b []byte, off int64) (n int, err error) {
Expand Down Expand Up @@ -110,3 +110,7 @@ func (me *boltPiece) WriteAt(b []byte, off int64) (n int, err error) {
})
return
}

func (me *boltPiece) IsNew() bool {
return false
}
6 changes: 5 additions & 1 deletion storage/disabled/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (Piece) WriteAt(p []byte, off int64) (n int, err error) {
return
}

func (Piece) MarkComplete() error {
func (Piece) MarkComplete(awaitFlush bool) error {
return errors.New("disabled")
}

Expand All @@ -50,3 +50,7 @@ func (Piece) Completion() storage.Completion {
Ok: true,
}
}

func (Piece) IsNew() bool {
return false
}
12 changes: 8 additions & 4 deletions storage/file-piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@ func (fs *filePieceImpl) Completion() Completion {
if !verified {
// The completion was wrong, fix it.
c.Complete = false
fs.completion.Set(fs.pieceKey(), false)
fs.completion.Set(fs.pieceKey(), false, false)
}

return c
}

func (fs *filePieceImpl) MarkComplete() error {
return fs.completion.Set(fs.pieceKey(), true)
func (fs *filePieceImpl) MarkComplete(awaitFlush bool) error {
return fs.completion.Set(fs.pieceKey(), true, awaitFlush)
}

func (fs *filePieceImpl) MarkNotComplete() error {
return fs.completion.Set(fs.pieceKey(), false)
return fs.completion.Set(fs.pieceKey(), false, false)
}

func (fs *filePieceImpl) IsNew() bool {
return fs.created
}
4 changes: 4 additions & 0 deletions storage/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (fs fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash
dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash)
upvertedFiles := info.UpvertedFiles()
files := make([]file, 0, len(upvertedFiles))
exists := true
for i, fileInfo := range upvertedFiles {
filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{
Info: info,
Expand All @@ -73,6 +74,7 @@ func (fs fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash
length: fileInfo.Length,
}
if f.length == 0 {
exists = false
err = CreateNativeZeroLengthFile(f.path)
if err != nil {
err = fmt.Errorf("creating zero length file: %w", err)
Expand All @@ -83,6 +85,7 @@ func (fs fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash
}
t := &fileTorrentImpl{
files,
!exists,
segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
infoHash,
fs.opts.PieceCompletion,
Expand All @@ -101,6 +104,7 @@ type file struct {

type fileTorrentImpl struct {
files []file
created bool
segmentLocater segments.Index
infoHash metainfo.Hash
completion PieceCompletion
Expand Down
3 changes: 2 additions & 1 deletion storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ type PieceImpl interface {
// Called when the client believes the piece data will pass a hash check.
// The storage can move or mark the piece data as read-only as it sees
// fit.
MarkComplete() error
MarkComplete(awaitFlush bool) error
MarkNotComplete() error
// Returns true if the piece is complete.
Completion() Completion
IsNew() bool
}

type Completion struct {
Expand Down
2 changes: 1 addition & 1 deletion storage/issue96_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImplCl
ts, err := cs.OpenTorrent(info, metainfo.Hash{})
require.NoError(t, err)
p := ts.Piece(info.Piece(0))
require.NoError(t, p.MarkComplete())
require.NoError(t, p.MarkComplete(false))
// require.False(t, p.GetIsComplete())
n, err := p.ReadAt(make([]byte, 1), 0)
require.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion storage/map-piece-completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error
return
}

func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool, awaitFlush bool) error {
me.m.Store(pk, b)
return nil
}
46 changes: 35 additions & 11 deletions storage/mmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ func (ts *mmapTorrentStorage) Flush() error {
}

type mmapStoragePiece struct {
pc PieceCompletionGetSetter
p metainfo.Piece
ih metainfo.Hash
pc PieceCompletionGetSetter
p metainfo.Piece
ih metainfo.Hash
created bool
io.ReaderAt
io.WriterAt
}
Expand All @@ -118,16 +119,20 @@ func (sp mmapStoragePiece) Completion() Completion {
return c
}

func (sp mmapStoragePiece) MarkComplete() error {
sp.pc.Set(sp.pieceKey(), true)
func (sp mmapStoragePiece) MarkComplete(awaitFlush bool) error {
sp.pc.Set(sp.pieceKey(), true, awaitFlush)
return nil
}

func (sp mmapStoragePiece) MarkNotComplete() error {
sp.pc.Set(sp.pieceKey(), false)
sp.pc.Set(sp.pieceKey(), false, false)
return nil
}

func (sp mmapStoragePiece) IsNew() bool {
return sp.created
}

func mMapTorrent(md *metainfo.Info, infoHash metainfo.Hash, location string, flushTime time.Duration, flushedCallback mmap_span.FlushedCallback) (mms *mmap_span.MMapSpan, err error) {
mms = &mmap_span.MMapSpan{
InfoHash: infoHash,
Expand All @@ -140,6 +145,7 @@ func mMapTorrent(md *metainfo.Info, infoHash metainfo.Hash, location string, flu
mms.Close()
}
}()

for _, miFile := range md.UpvertedFiles() {
var safeName string
safeName, err = ToSafeFilePath(append([]string{md.Name}, miFile.Path...)...)
Expand All @@ -148,24 +154,33 @@ func mMapTorrent(md *metainfo.Info, infoHash metainfo.Hash, location string, flu
}
fileName := filepath.Join(location, safeName)
var mm FileMapping
mm, err = mmapFile(fileName, miFile.Length)
var exists bool
mm, exists, err = mmapFile(fileName, miFile.Length)
if err != nil {
err = fmt.Errorf("file %q: %s", miFile.DisplayPath(md), err)
return
}
mms.Append(mm)
if !exists {
mms.Created = true
}
}
mms.InitIndex()
return
}

func mmapFile(name string, size int64) (_ FileMapping, err error) {
func mmapFile(name string, size int64) (_ FileMapping, exists bool, err error) {
dir := filepath.Dir(name)
err = os.MkdirAll(dir, 0o750)
if err != nil {
err = fmt.Errorf("making directory %q: %s", dir, err)
return
}

_, err = os.Stat(name)

exists = err == nil || !errors.Is(err, os.ErrNotExist)

var file *os.File
file, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o666)
if err != nil {
Expand All @@ -188,9 +203,13 @@ func mmapFile(name string, size int64) (_ FileMapping, err error) {
if err != nil {
return
}
// if the file needs to be resized assume we're creating it
exists = false
}
return func() (ret mmapWithFile, err error) {
return func() (ret mmapWithFile, ex bool, err error) {
ret.f = file
ex = exists

if size == 0 {
// Can't mmap() regions with length 0.
return
Expand Down Expand Up @@ -226,14 +245,19 @@ type FileMapping = mmap_span.Mmap
// Handles closing the mmap's file handle (needed for Windows). Could be implemented differently by
// OS.
type mmapWithFile struct {
f *os.File
mmap mmap.MMap
f *os.File
created bool
mmap mmap.MMap
}

func (m mmapWithFile) Flush() error {
return m.mmap.Flush()
}

func (m mmapWithFile) IsLocal() bool {
return !m.created
}

func (m mmapWithFile) Unmap() (err error) {
if m.mmap != nil {
err = m.mmap.Unmap()
Expand Down
2 changes: 1 addition & 1 deletion storage/piece-completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type PieceCompletionGetSetter interface {
Get(metainfo.PieceKey) (Completion, error)
Set(_ metainfo.PieceKey, complete bool) error
Set(_ metainfo.PieceKey, complete bool, awaitFlush bool) error
}

// Implementations track the completion of pieces. It must be concurrent-safe.
Expand Down
6 changes: 5 additions & 1 deletion storage/piece-resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,15 @@ func (s piecePerResourcePiece) Completion() Completion {
}
}

func (s piecePerResourcePiece) IsNew() bool {
return false
}

type SizedPutter interface {
PutSized(io.Reader, int64) error
}

func (s piecePerResourcePiece) MarkComplete() error {
func (s piecePerResourcePiece) MarkComplete(awaitFlush bool) error {
s.mu.Lock()
defer s.mu.Unlock()
incompleteChunks := s.getChunks()
Expand Down
Loading

0 comments on commit fd84feb

Please sign in to comment.