From d729cd1c90b14a6a07bfef9630ca3c7ec7c06491 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Mon, 12 Feb 2024 12:19:54 -0500 Subject: [PATCH 1/4] wip --- compaction.go | 49 +++++++++++++++++++---- data_test.go | 2 +- flushable.go | 106 ++++++++++++++++++++++++++++++++++++++++++++++++++ open.go | 2 +- 4 files changed, 150 insertions(+), 9 deletions(-) diff --git a/compaction.go b/compaction.go index 7ca2bc8bac..536d7b2c1c 100644 --- a/compaction.go +++ b/compaction.go @@ -659,6 +659,33 @@ type compaction struct { pickerMetrics compactionPickerMetrics } +// objectCreator provides the subset of the objstorage.Provider interface +// necessary for compactions and flushes. It's typically satisfied by +// d.objProvider but may be satisfied by bufferedSSTables during flushes. +type objectCreator interface { + // Create creates a new object and opens it for writing. + // + // The object is not guaranteed to be durable (accessible in case of crashes) + // until Sync is called. + Create( + ctx context.Context, + fileType base.FileType, + FileNum base.DiskFileNum, + opts objstorage.CreateOptions, + ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) + // Path returns an internal, implementation-dependent path for the object. It is + // meant to be used for informational purposes (like logging). + Path(meta objstorage.ObjectMetadata) string + // Remove removes an object. + // + // The object is not guaranteed to be durably removed until Sync is called. + Remove(fileType base.FileType, FileNum base.DiskFileNum) error + // Sync flushes the metadata from creation or removal of objects since the + // last Sync. This includes objects that have been Created but for which + // Writable.Finish() has not yet been called. + Sync() error +} + func (c *compaction) makeInfo(jobID int) CompactionInfo { info := CompactionInfo{ JobID: jobID, @@ -1917,6 +1944,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { }) startTime := d.timeNow() + // Compactions always write directly to the database's object provider. + // Flushes may write to an in-memory object provider first. + var objCreator objectCreator = d.objProvider var ve *manifest.VersionEdit var pendingOutputs []physicalMeta var stats compactStats @@ -1927,9 +1957,13 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // runCompaction. For all other flush cases, we construct the VersionEdit // inside runCompaction. if c.kind != compactionKindIngestedFlushable { - ve, pendingOutputs, stats, err = d.runCompaction(jobID, c) + ve, pendingOutputs, stats, err = d.runCompaction(jobID, c, objCreator) } + // TODO(aadityas,jackson): If the buffered output sstables are too small, + // avoid linking them into the version and just update the flushable queue + // appropriately. + // Acquire logLock. This will be released either on an error, by way of // logUnlock, or through a call to logAndApply if there is no error. d.mu.versions.logLock() @@ -2634,7 +2668,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { d.opts.EventListener.CompactionBegin(info) startTime := d.timeNow() - ve, pendingOutputs, stats, err := d.runCompaction(jobID, c) + ve, pendingOutputs, stats, err := d.runCompaction(jobID, c, d.objProvider) info.Duration = d.timeNow().Sub(startTime) if err == nil { @@ -2800,7 +2834,7 @@ func (d *DB) runCopyCompaction( // d.mu must be held when calling this, but the mutex may be dropped and // re-acquired during the course of this method. 
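The objectCreator interface above is the seam this series relies on: compactions keep writing through the durable object provider, while a buffered flush can hand runCompaction an in-memory implementation instead. A minimal sketch of that choice, using a hypothetical helper name (the actual wiring lives in flush1; both return values satisfy objectCreator per the assertions and assignments in this series):

    // chooseObjectCreator is a hypothetical helper, not part of the patch; it
    // restates the decision flush1 makes. runCompaction is agnostic to which
    // implementation it receives.
    func chooseObjectCreator(d *DB, buffered *bufferedSSTables) objectCreator {
    	if buffered != nil {
    		// Keep flush output in memory; finished sstables accumulate in
    		// the buffer instead of being written to the filesystem.
    		return buffered
    	}
    	// Compactions (and unbuffered flushes) write through the durable
    	// object provider.
    	return d.objProvider
    }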
func (d *DB) runCompaction( - jobID int, c *compaction, + jobID int, c *compaction, objCreator objectCreator, ) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) { // As a sanity check, confirm that the smallest / largest keys for new and // deleted files in the new versionEdit pass a validation function before @@ -2968,7 +3002,7 @@ func (d *DB) runCompaction( } if retErr != nil { for _, fileNum := range createdFiles { - _ = d.objProvider.Remove(fileTypeTable, fileNum) + _ = objCreator.Remove(fileTypeTable, fileNum) } } for _, closer := range c.closers { @@ -3063,7 +3097,8 @@ func (d *DB) runCompaction( PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level), } diskFileNum := base.PhysicalTableDiskFileNum(fileNum) - writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts) + + writable, objMeta, err := objCreator.Create(ctx, fileTypeTable, diskFileNum, createOpts) if err != nil { return err } @@ -3075,7 +3110,7 @@ func (d *DB) runCompaction( d.opts.EventListener.TableCreated(TableCreateInfo{ JobID: jobID, Reason: reason, - Path: d.objProvider.Path(objMeta), + Path: objCreator.Path(objMeta), FileNum: diskFileNum, }) if c.kind != compactionKindFlush { @@ -3516,7 +3551,7 @@ func (d *DB) runCompaction( // compactStats. stats.countMissizedDels = iter.stats.countMissizedDels - if err := d.objProvider.Sync(); err != nil { + if err := objCreator.Sync(); err != nil { return nil, pendingOutputs, stats, err } diff --git a/data_test.go b/data_test.go index 2dd2cf686e..de5ff20fd8 100644 --- a/data_test.go +++ b/data_test.go @@ -869,7 +869,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { // to the user-defined boundaries. c.maxOutputFileSize = math.MaxUint64 - newVE, _, _, err := d.runCompaction(0, c) + newVE, _, _, err := d.runCompaction(0, c, d.objProvider) if err != nil { return err } diff --git a/flushable.go b/flushable.go index 630191f737..b0c768b7f3 100644 --- a/flushable.go +++ b/flushable.go @@ -15,6 +15,8 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/sstable" ) // flushable defines the interface for immutable memtables. @@ -78,6 +80,10 @@ type flushableEntry struct { delayedFlushForcedAt time.Time // logNum corresponds to the WAL that contains the records present in the // receiver. + // + // TODO(aadityas,jackson): We'll need to do something about this (and + // logSize) for entries corresponding to bufferedSSTables since there may be + // multiple associated log nums. logNum base.DiskFileNum // logSize is the size in bytes of the associated WAL. Protected by DB.mu. logSize uint64 @@ -313,6 +319,106 @@ func (s *ingestedFlushable) computePossibleOverlaps( } } +// bufferedSSTables holds a set of in-memory sstables produced by a flush. +// Buffering flushed state reduces write amplification by making it more likely +// that we're able to drop KVs before they reach disk. +type bufferedSSTables struct { + metas []*fileMetadata + readers []*sstable.Reader +} + +var ( + // Assert that *bufferedSSTables implements the flushable interface. + _ flushable = (*bufferedSSTables)(nil) + // Assert that *bufferedSSTables implements the objectCreator interface. + _ objectCreator = (*bufferedSSTables)(nil) +) + +// newIter is part of the flushable interface. 
+func (b *bufferedSSTables) newIter(o *IterOptions) internalIterator { + panic("TODO") +} + +// newFlushIter is part of the flushable interface. +func (b *bufferedSSTables) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator { + panic("TODO") +} + +func (b *bufferedSSTables) constructRangeDelIter( + file *manifest.FileMetadata, _ keyspan.SpanIterOptions, +) (keyspan.FragmentIterator, error) { + panic("TODO") +} + +// newRangeDelIter is part of the flushable interface. +// +// TODO(sumeer): *IterOptions are being ignored, so the index block load for +// the point iterator in constructRangeDeIter is not tracked. +func (b *bufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { + panic("TODO") +} + +// newRangeKeyIter is part of the flushable interface. +func (b *bufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { + if !b.containsRangeKeys() { + return nil + } + panic("TODO") +} + +// containsRangeKeys is part of the flushable interface. +func (b *bufferedSSTables) containsRangeKeys() bool { + panic("TODO") +} + +// inuseBytes is part of the flushable interface. +func (b *bufferedSSTables) inuseBytes() uint64 { + panic("TODO") +} + +// totalBytes is part of the flushable interface. +func (b *bufferedSSTables) totalBytes() uint64 { + panic("TODO") +} + +// readyForFlush is part of the flushable interface. +func (b *bufferedSSTables) readyForFlush() bool { + // Buffered sstables are always ready for flush; they're immutable. + return true +} + +// computePossibleOverlaps is part of the flushable interface. +func (b *bufferedSSTables) computePossibleOverlaps( + fn func(bounded) shouldContinue, bounded ...bounded, +) { + panic("TODO") +} + +// Create implements the objectCreator interface. +func (b *bufferedSSTables) Create( + ctx context.Context, + fileType base.FileType, + FileNum base.DiskFileNum, + opts objstorage.CreateOptions, +) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) { + panic("TODO") +} + +// Path implements the objectCreator interface. +func (b *bufferedSSTables) Path(meta objstorage.ObjectMetadata) string { + panic("TODO") +} + +// Remove implements the objectCreator interface. +func (b *bufferedSSTables) Remove(fileType base.FileType, FileNum base.DiskFileNum) error { + panic("TODO") +} + +// Sync implements the objectCreator interface. 
+func (b *bufferedSSTables) Sync() error { + panic("TODO") +} + // computePossibleOverlapsGenericImpl is an implemention of the flushable // interface's computePossibleOverlaps function for flushable implementations // with only in-memory state that do not have special requirements and should diff --git a/open.go b/open.go index 8f027562bc..b17acecaaf 100644 --- a/open.go +++ b/open.go @@ -803,7 +803,7 @@ func (d *DB) replayWAL( if err != nil { return err } - newVE, _, _, err := d.runCompaction(jobID, c) + newVE, _, _, err := d.runCompaction(jobID, c, d.objProvider) if err != nil { return errors.Wrapf(err, "running compaction during WAL replay") } From 0c880500e05735e3d2efe5b76d2a282de9851544 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Mon, 12 Feb 2024 15:08:53 -0500 Subject: [PATCH 2/4] wip --- flushable.go | 199 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 176 insertions(+), 23 deletions(-) diff --git a/flushable.go b/flushable.go index b0c768b7f3..466a0008d9 100644 --- a/flushable.go +++ b/flushable.go @@ -5,12 +5,15 @@ package pebble import ( + "bytes" "context" "fmt" "io" + "slices" "sync/atomic" "time" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl" @@ -319,32 +322,99 @@ func (s *ingestedFlushable) computePossibleOverlaps( } } -// bufferedSSTables holds a set of in-memory sstables produced by a flush. -// Buffering flushed state reduces write amplification by making it more likely -// that we're able to drop KVs before they reach disk. -type bufferedSSTables struct { - metas []*fileMetadata - readers []*sstable.Reader +// flushableBufferedSSTables holds a set of in-memory sstables produced by a +// flush. Buffering flushed state reduces write amplification by making it more +// likely that we're able to drop KVs before they reach disk. +type flushableBufferedSSTables struct { + comparer *base.Comparer + ls manifest.LevelSlice + metas []*fileMetadata + readers []*sstable.Reader } var ( - // Assert that *bufferedSSTables implements the flushable interface. - _ flushable = (*bufferedSSTables)(nil) - // Assert that *bufferedSSTables implements the objectCreator interface. - _ objectCreator = (*bufferedSSTables)(nil) + // Assert that *flushableBufferedSSTables implements the flushable + // interface. + _ flushable = (*flushableBufferedSSTables)(nil) ) +// newIters implements the tableNewIters function signature. Ordinarily this +// function is provided by the table cache. Flushable buffered sstables are not +// opened through the table cache since they're not present on the real +// filesystem and do not require use of file descriptors. Instead, the +// flushableBufferedSSTables keeps sstable.Readers for all the buffered sstables +// open, and this newIters func uses them to construct iterators. 
+func (b *flushableBufferedSSTables) newIters( + ctx context.Context, + file *manifest.FileMetadata, + opts *IterOptions, + internalOpts internalIterOpts, + kinds iterKinds, +) (iterSet, error) { + var r *sstable.Reader + for i := range b.metas { + if b.metas[i].FileNum == file.FileNum { + r = b.readers[i] + break + } + } + if r == nil { + return iterSet{}, errors.Newf("file %s not found among flushable buffered sstables", file.FileNum) + } + var iters iterSet + var err error + if kinds.RangeKey() && file.HasRangeKeys { + iters.rangeKey, err = r.NewRawRangeKeyIter() + } + if kinds.RangeDeletion() && file.HasPointKeys && err == nil { + iters.rangeDeletion, err = r.NewRawRangeDelIter() + } + if kinds.Point() && err == nil { + panic("TODO") + //var categoryAndQoS sstable.CategoryAndQoS + //var iter internalIterator + //if internalOpts.bytesIterated != nil { + //iter, err = r.NewCompactionIter( + //internalOpts.bytesIterated, categoryAndQoS, nil [> statsCollector <], rp, + //internalOpts.bufferPool) + //} else { + //iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc( + //ctx, opts.GetLowerBound(), opts.GetUpperBound(), + //nil [> filterer */, false /* hideObsoletePoints */, true, /* useFilter <] + //internalOpts.stats, categoryAndQoS, nil [> stats collector <], rp) + //} + } + if err != nil { + iters.CloseAll() + return iterSet{}, err + } + return iters, nil +} + // newIter is part of the flushable interface. -func (b *bufferedSSTables) newIter(o *IterOptions) internalIterator { - panic("TODO") +func (b *flushableBufferedSSTables) newIter(o *IterOptions) internalIterator { + var opts IterOptions + if o != nil { + opts = *o + } + // TODO(jackson): The manifest.Level in newLevelIter is only used for + // logging. Update the manifest.Level encoding to account for levels which + // aren't truly levels in the lsm. Right now, the encoding only supports + // L0 sublevels, and the rest of the levels in the lsm. + return newLevelIter( + context.Background(), opts, b.comparer, b.newIters, b.ls.Iter(), manifest.Level(0), + internalIterOpts{}, + ) } // newFlushIter is part of the flushable interface. -func (b *bufferedSSTables) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator { +func (b *flushableBufferedSSTables) newFlushIter( + o *IterOptions, bytesFlushed *uint64, +) internalIterator { panic("TODO") } -func (b *bufferedSSTables) constructRangeDelIter( +func (b *flushableBufferedSSTables) constructRangeDelIter( file *manifest.FileMetadata, _ keyspan.SpanIterOptions, ) (keyspan.FragmentIterator, error) { panic("TODO") @@ -354,12 +424,12 @@ func (b *bufferedSSTables) constructRangeDelIter( // // TODO(sumeer): *IterOptions are being ignored, so the index block load for // the point iterator in constructRangeDeIter is not tracked. -func (b *bufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { +func (b *flushableBufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { panic("TODO") } // newRangeKeyIter is part of the flushable interface. -func (b *bufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { +func (b *flushableBufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { if !b.containsRangeKeys() { return nil } @@ -367,41 +437,90 @@ func (b *bufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentItera } // containsRangeKeys is part of the flushable interface. 
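The comment on newIters above states that it matches the tableNewIters function signature normally supplied by the table cache. Assuming that signature is exactly the one shown in newIters (an editor's assumption, not confirmed elsewhere in the patch), the claim can be pinned down with a compile-time check:

    // Hypothetical compile-time check, not part of the patch: the nil receiver
    // is never invoked; only the function types are compared.
    var _ tableNewIters = (*flushableBufferedSSTables)(nil).newIters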
-func (b *bufferedSSTables) containsRangeKeys() bool { +func (b *flushableBufferedSSTables) containsRangeKeys() bool { panic("TODO") } // inuseBytes is part of the flushable interface. -func (b *bufferedSSTables) inuseBytes() uint64 { +func (b *flushableBufferedSSTables) inuseBytes() uint64 { panic("TODO") } // totalBytes is part of the flushable interface. -func (b *bufferedSSTables) totalBytes() uint64 { +func (b *flushableBufferedSSTables) totalBytes() uint64 { panic("TODO") } // readyForFlush is part of the flushable interface. -func (b *bufferedSSTables) readyForFlush() bool { +func (b *flushableBufferedSSTables) readyForFlush() bool { // Buffered sstables are always ready for flush; they're immutable. return true } // computePossibleOverlaps is part of the flushable interface. -func (b *bufferedSSTables) computePossibleOverlaps( +func (b *flushableBufferedSSTables) computePossibleOverlaps( fn func(bounded) shouldContinue, bounded ...bounded, ) { panic("TODO") } +// bufferedSSTables implements the objectCreator interface and is used by a +// flush to buffer sstables into memory. When the flush is complete, the +// buffered sstables are either flushed to durable storage or moved into a +// flushableBufferedSSTables that's linked into the flushable queue. +// +// The bufferedSSTables implementation of objectCreator requires that only one +// created object may be open at a time. If violated, Create will panic. +type bufferedSSTables struct { + // curr is a byte buffer used to accumulate the writes of the current + // sstable when *bufferedSSTables is used as a writable. + curr bytes.Buffer + // currFileNum holds the file number assigned to the sstable being + // constructed in curr. + currFileNum base.DiskFileNum + // finished holds the set of previously written and finished sstables. + finished []bufferedSSTable + // objectIsOpen is true if the bufferedSSTables is currently being used as a + // Writable. + objectIsOpen bool +} + +// A bufferedSSTable holds a single, serialized sstable and a corresponding file +// number. +type bufferedSSTable struct { + fileNum base.DiskFileNum + buf []byte +} + +// init initializes the bufferedSSTables. +func (b *bufferedSSTables) init(targetFileSize int) { + b.curr.Grow(targetFileSize) +} + +// Assert that *bufferedSSTables implements the objectCreator interface. +var _ objectCreator = (*bufferedSSTables)(nil) + // Create implements the objectCreator interface. func (b *bufferedSSTables) Create( ctx context.Context, fileType base.FileType, - FileNum base.DiskFileNum, + fileNum base.DiskFileNum, opts objstorage.CreateOptions, ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) { - panic("TODO") + // The bufferedSSTables implementation depends on only one writable being + // open at a time. The *bufferedSSTables itself is used as the + // implementation of both the objectCreator interface and the + // objstorage.Writable interface. We guard against misuse by verifying that + // there is no object currently open. + if b.objectIsOpen { + panic("bufferedSSTables used with concurrent open files") + } + b.objectIsOpen = true + b.currFileNum = fileNum + return b, objstorage.ObjectMetadata{ + DiskFileNum: fileNum, + FileType: fileType, + }, nil } // Path implements the objectCreator interface. @@ -419,6 +538,40 @@ func (b *bufferedSSTables) Sync() error { panic("TODO") } +// Assert that bufferedSSTables implements objstorage.Writable. 
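Together with the Writable implementation that follows, the intended calling pattern is roughly the sketch below (an editor's illustration with a hypothetical helper name, not patch code): exactly one object is open at a time, its bytes accumulate in curr, and Finish snapshots them into finished. Opening a second object before Finish or Abort trips the panic in Create above.

    // bufferOneTable illustrates one Create/Write/Finish cycle for a single
    // flush output; "serialized" stands in for the bytes an sstable writer
    // would stream into the Writable.
    func bufferOneTable(
    	ctx context.Context, b *bufferedSSTables, fileNum base.DiskFileNum, serialized []byte,
    ) error {
    	w, _, err := b.Create(ctx, fileTypeTable, fileNum, objstorage.CreateOptions{})
    	if err != nil {
    		return err
    	}
    	if err := w.Write(serialized); err != nil {
    		// Abort drops the partial table and releases the open-object slot.
    		w.Abort()
    		return err
    	}
    	// Finish clones curr into b.finished and clears objectIsOpen, allowing
    	// the next Create call.
    	return w.Finish()
    }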
+// +// A flush writes files sequentially, so the bufferedSSTables type implements +// Writable directly, serving as the destination for writes across all sstables +// written by the flush. +var _ objstorage.Writable = (*bufferedSSTables)(nil) + +// Finish implements objstorage.Writable. +func (o *bufferedSSTables) Write(p []byte) error { + _, err := o.curr.Write(p) + o.curr.Reset() + return err +} + +// Finish implements objstorage.Writable. +func (o *bufferedSSTables) Finish() error { + if !o.objectIsOpen { + panic("bufferedSSTables.Finish() invoked when no object is open") + } + o.finished = append(o.finished, bufferedSSTable{ + fileNum: o.currFileNum, + buf: slices.Clone(o.curr.Bytes()), + }) + o.curr.Reset() + o.objectIsOpen = false + return nil +} + +// Abort implements objstorage.Writable. +func (o *bufferedSSTables) Abort() { + o.curr.Reset() + o.objectIsOpen = false +} + // computePossibleOverlapsGenericImpl is an implemention of the flushable // interface's computePossibleOverlaps function for flushable implementations // with only in-memory state that do not have special requirements and should From 89ea47df49e6f171a5b1bc3068a841568e0d23a4 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Wed, 14 Feb 2024 15:47:10 -0500 Subject: [PATCH 3/4] wip --- flushable.go | 153 ++++++++++++++---- .../objstorageprovider/bytes_readable.go | 40 +++++ 2 files changed, 162 insertions(+), 31 deletions(-) create mode 100644 objstorage/objstorageprovider/bytes_readable.go diff --git a/flushable.go b/flushable.go index 466a0008d9..50fb9fba21 100644 --- a/flushable.go +++ b/flushable.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/sstable" ) @@ -259,7 +260,6 @@ func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIter if !s.containsRangeKeys() { return nil } - return keyspanimpl.NewLevelIter( keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters, s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange, @@ -322,6 +322,39 @@ func (s *ingestedFlushable) computePossibleOverlaps( } } +func newFlushableBufferedSSTables( + comparer *base.Comparer, + metas []*fileMetadata, + ro sstable.ReaderOptions, + b *bufferedSSTables, + extraReaderOpts ...sstable.ReaderOption, +) (*flushableBufferedSSTables, error) { + if len(metas) != len(b.finished) { + panic(errors.AssertionFailedf("metadata for %d files provided, but buffered %d files", len(metas), len(b.finished))) + } + f := &flushableBufferedSSTables{ + comparer: comparer, + ls: manifest.NewLevelSliceKeySorted(comparer.Compare, metas), + metas: metas, + readers: make([]*sstable.Reader, len(b.finished)), + } + // NB: metas and b.finished are parallel slices. + for i := range b.finished { + if b.finished[i].fileNum != base.PhysicalTableDiskFileNum(metas[i].FileNum) { + panic(errors.AssertionFailedf("file metas and file buffers in mismatched order")) + } + f.anyRangeKeys = f.anyRangeKeys || metas[i].HasRangeKeys + f.size += metas[i].Size + readable := objstorageprovider.BytesReadable(b.finished[i].buf) + var err error + f.readers[i], err = sstable.NewReader(readable, ro, extraReaderOpts...) + if err != nil { + return nil, err + } + } + return f, nil +} + // flushableBufferedSSTables holds a set of in-memory sstables produced by a // flush. 
Buffering flushed state reduces write amplification by making it more // likely that we're able to drop KVs before they reach disk. @@ -330,6 +363,11 @@ type flushableBufferedSSTables struct { ls manifest.LevelSlice metas []*fileMetadata readers []*sstable.Reader + // size is the size, in bytes, of all the buffered sstables. + size uint64 + // anyRangeKeys indicates whether any of the sstables contain any range + // keys. + anyRangeKeys bool } var ( @@ -370,19 +408,33 @@ func (b *flushableBufferedSSTables) newIters( iters.rangeDeletion, err = r.NewRawRangeDelIter() } if kinds.Point() && err == nil { - panic("TODO") - //var categoryAndQoS sstable.CategoryAndQoS - //var iter internalIterator - //if internalOpts.bytesIterated != nil { - //iter, err = r.NewCompactionIter( - //internalOpts.bytesIterated, categoryAndQoS, nil [> statsCollector <], rp, - //internalOpts.bufferPool) - //} else { - //iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc( - //ctx, opts.GetLowerBound(), opts.GetUpperBound(), - //nil [> filterer */, false /* hideObsoletePoints */, true, /* useFilter <] - //internalOpts.stats, categoryAndQoS, nil [> stats collector <], rp) - //} + // TODO(aadityas,jackson): We could support block-property filtering + // within the in-memory sstables, but it's unclear it's worth the code + // complexity. We expect these blocks to be hit frequently, and the cost + // of loading them is less since they're already in main memory. + tableFormat, err := r.TableFormat() + if err != nil { + return iterSet{}, err + } + var rp sstable.ReaderProvider + if tableFormat >= sstable.TableFormatPebblev3 && r.Properties.NumValueBlocks > 0 { + // NB: We can return a fixedReaderProvider because the Readers for + // these in-memory sstables are guaranteed to be open until the + // readState is obsolete which will only occur when all iterators + // have closed. + rp = &fixedReaderProvider{r} + } + var categoryAndQoS sstable.CategoryAndQoS + if internalOpts.bytesIterated != nil { + iters.point, err = r.NewCompactionIter( + internalOpts.bytesIterated, categoryAndQoS, nil /* statsCollector */, rp, + internalOpts.bufferPool) + } else { + iters.point, err = r.NewIterWithBlockPropertyFiltersAndContextEtc( + ctx, opts.GetLowerBound(), opts.GetUpperBound(), + nil /* filterer */, false /* hideObsoletePoints */, true, /* useFilter */ + internalOpts.stats, categoryAndQoS, nil /* stats collector */, rp) + } } if err != nil { iters.CloseAll() @@ -411,21 +463,46 @@ func (b *flushableBufferedSSTables) newIter(o *IterOptions) internalIterator { func (b *flushableBufferedSSTables) newFlushIter( o *IterOptions, bytesFlushed *uint64, ) internalIterator { - panic("TODO") + var opts IterOptions + if o != nil { + opts = *o + } + // TODO(jackson): The manifest.Level in newLevelIter is only used for + // logging. Update the manifest.Level encoding to account for levels which + // aren't truly levels in the lsm. Right now, the encoding only supports + // L0 sublevels, and the rest of the levels in the lsm. + return newLevelIter( + context.Background(), opts, b.comparer, b.newIters, b.ls.Iter(), manifest.Level(0), + internalIterOpts{bytesIterated: bytesFlushed}, + ) } +// constructRangeDelIter implements keyspanimpl.TableNewSpanIter. 
func (b *flushableBufferedSSTables) constructRangeDelIter( file *manifest.FileMetadata, _ keyspan.SpanIterOptions, ) (keyspan.FragmentIterator, error) { - panic("TODO") + iters, err := b.newIters(context.Background(), file, nil, internalIterOpts{}, iterRangeDeletions) + return iters.RangeDeletion(), err } // newRangeDelIter is part of the flushable interface. -// -// TODO(sumeer): *IterOptions are being ignored, so the index block load for -// the point iterator in constructRangeDeIter is not tracked. func (b *flushableBufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { - panic("TODO") + return keyspanimpl.NewLevelIter( + keyspan.SpanIterOptions{}, b.comparer.Compare, + b.constructRangeDelIter, b.ls.Iter(), manifest.Level(0), + manifest.KeyTypePoint, + ) +} + +// constructRangeKeyIter implements keyspanimpl.TableNewSpanIter. +func (b *flushableBufferedSSTables) constructRangeKeyIter( + file *manifest.FileMetadata, _ keyspan.SpanIterOptions, +) (keyspan.FragmentIterator, error) { + iters, err := b.newIters(context.Background(), file, nil, internalIterOpts{}, iterRangeKeys) + if err != nil { + return nil, err + } + return iters.RangeKey(), nil } // newRangeKeyIter is part of the flushable interface. @@ -433,23 +510,20 @@ func (b *flushableBufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.Frag if !b.containsRangeKeys() { return nil } - panic("TODO") + return keyspanimpl.NewLevelIter( + keyspan.SpanIterOptions{}, b.comparer.Compare, b.constructRangeKeyIter, + b.ls.Iter(), manifest.Level(0), manifest.KeyTypeRange, + ) } // containsRangeKeys is part of the flushable interface. -func (b *flushableBufferedSSTables) containsRangeKeys() bool { - panic("TODO") -} +func (b *flushableBufferedSSTables) containsRangeKeys() bool { return b.anyRangeKeys } // inuseBytes is part of the flushable interface. -func (b *flushableBufferedSSTables) inuseBytes() uint64 { - panic("TODO") -} +func (b *flushableBufferedSSTables) inuseBytes() uint64 { return b.size } // totalBytes is part of the flushable interface. -func (b *flushableBufferedSSTables) totalBytes() uint64 { - panic("TODO") -} +func (b *flushableBufferedSSTables) totalBytes() uint64 { return b.size } // readyForFlush is part of the flushable interface. func (b *flushableBufferedSSTables) readyForFlush() bool { @@ -461,7 +535,7 @@ func (b *flushableBufferedSSTables) readyForFlush() bool { func (b *flushableBufferedSSTables) computePossibleOverlaps( fn func(bounded) shouldContinue, bounded ...bounded, ) { - panic("TODO") + computePossibleOverlapsGenericImpl[*flushableBufferedSSTables](b, b.comparer.Compare, fn, bounded) } // bufferedSSTables implements the objectCreator interface and is used by a @@ -605,3 +679,20 @@ func computePossibleOverlapsGenericImpl[F flushable]( } } } + +type fixedReaderProvider struct { + *sstable.Reader +} + +var _ sstable.ReaderProvider = (*fixedReaderProvider)(nil) + +// GetReader implements sstable.ReaderProvider. +// +// Note that currently the Reader returned here is only used to read value +// blocks. +func (p *fixedReaderProvider) GetReader() (*sstable.Reader, error) { + return p.Reader, nil +} + +// Close implements sstable.ReaderProvider. +func (p *fixedReaderProvider) Close() {} diff --git a/objstorage/objstorageprovider/bytes_readable.go b/objstorage/objstorageprovider/bytes_readable.go new file mode 100644 index 0000000000..d110c73146 --- /dev/null +++ b/objstorage/objstorageprovider/bytes_readable.go @@ -0,0 +1,40 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. 
All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package objstorageprovider + +import ( + "context" + + "github.com/cockroachdb/pebble/internal/invariants" + "github.com/cockroachdb/pebble/objstorage" +) + +// BytesReadable implements objstorage.Readable for a byte slice. +type BytesReadable []byte + +var _ objstorage.Readable = (BytesReadable)(nil) + +// ReadAt is part of the objstorage.Readable interface. +func (r BytesReadable) ReadAt(_ context.Context, p []byte, off int64) error { + if n := copy(p, r[off:]); invariants.Enabled && n != len(p) { + panic("short read") + } + return nil +} + +// Close is part of the objstorage.Readable interface. +func (r BytesReadable) Close() error { return nil } + +// Size is part of the objstorage.Readable interface. +func (r BytesReadable) Size() int64 { return int64(len(r)) } + +// NewReadHandle is part of the objstorage.Readable interface. +func (r BytesReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle { return r } + +// SetupForCompaction is part of the objstorage.ReadHandle interface. +func (r BytesReadable) SetupForCompaction() {} + +// RecordCacheHit is part of the objstorage.ReadHandle interface. +func (r BytesReadable) RecordCacheHit(ctx context.Context, offset, size int64) {} From d91997a0b9e930c823d64252a82adb233b6c7268 Mon Sep 17 00:00:00 2001 From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> Date: Thu, 15 Feb 2024 16:28:40 -0500 Subject: [PATCH 4/4] wip --- compaction.go | 38 +++++++++++++++++++++++++++++++++++++- flushable.go | 9 ++++++--- table_cache.go | 1 + 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/compaction.go b/compaction.go index 536d7b2c1c..35cdd30068 100644 --- a/compaction.go +++ b/compaction.go @@ -462,6 +462,7 @@ const ( compactionKindRead compactionKindRewrite compactionKindIngestedFlushable + compactionKindBufferedFlush ) func (k compactionKind) String() string { @@ -1874,6 +1875,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { var n, inputs int var inputBytes uint64 var ingest bool + var bufferedFlush = true // TODO(aaditya): loop this into a config setting for ; n < len(d.mu.mem.queue)-1; n++ { if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok { if n == 0 { @@ -1932,6 +1934,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { if err != nil { return 0, err } + + if bufferedFlush && !ingest { + c.kind = compactionKindBufferedFlush + } d.addInProgressCompaction(c) jobID := d.mu.nextJobID @@ -1946,7 +1952,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // Compactions always write directly to the database's object provider. // Flushes may write to an in-memory object provider first. - var objCreator objectCreator = d.objProvider + var objCreator objectCreator + if c.kind == compactionKindBufferedFlush { + bufferedSSTs := &bufferedSSTables{} + // TODO(aaditya): pick a better size + bufferedSSTs.init(10) + objCreator = bufferedSSTs + } else { + objCreator = d.objProvider + } + var ve *manifest.VersionEdit var pendingOutputs []physicalMeta var stats compactStats @@ -1963,6 +1978,27 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { // TODO(aadityas,jackson): If the buffered output sstables are too small, // avoid linking them into the version and just update the flushable queue // appropriately. 
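The branch below re-opens the buffered flush output through newFlushableBufferedSSTables, which wraps each finished buffer in the BytesReadable type added in the previous commit and opens an sstable.Reader directly over it. Stripped down to a single table, that step looks roughly like this (editor's sketch with a hypothetical function name, not patch code):

    // openBufferedTable opens an in-memory, serialized sstable for reading
    // without touching the filesystem. The caller owns the returned Reader and
    // must Close it once no iterators remain open on it.
    func openBufferedTable(buf []byte) (*sstable.Reader, error) {
    	readable := objstorageprovider.BytesReadable(buf)
    	return sstable.NewReader(readable, sstable.ReaderOptions{})
    }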
+ if c.kind == compactionKindBufferedFlush { + var metas []*fileMetadata + var fileNums []base.DiskFileNum + for _, file := range ve.NewFiles { + metas = append(metas, file.Meta) + fileNums = append(fileNums, file.BackingFileNum) + } + + bufferedSST := objCreator.(*bufferedSSTables) + if bufferedSST.size < d.opts.MemTableSize /* TODO(aaditya): does this make sense? */ { + var f flushable + f, err = newFlushableBufferedSSTables(d.opts.Comparer, metas, sstable.ReaderOptions{}, bufferedSST) + fe := d.newFlushableEntry(f, fileNums[0], 0 /* TODO(aaditya): figure out what to put here */) + remaining := d.mu.mem.queue[n : len(d.mu.mem.queue)-2] + mutable := d.mu.mem.queue[len(d.mu.mem.queue)-1] + d.mu.mem.queue = append(remaining, fe, mutable) + return 0, err + } + + // else convert to objProvider and write to disk + } // Acquire logLock. This will be released either on an error, by way of // logUnlock, or through a call to logAndApply if there is no error. diff --git a/flushable.go b/flushable.go index 50fb9fba21..07faac3b86 100644 --- a/flushable.go +++ b/flushable.go @@ -554,6 +554,8 @@ type bufferedSSTables struct { currFileNum base.DiskFileNum // finished holds the set of previously written and finished sstables. finished []bufferedSSTable + // cumulative size of the finished buffers + size uint64 // objectIsOpen is true if the bufferedSSTables is currently being used as a // Writable. objectIsOpen bool @@ -609,7 +611,8 @@ func (b *bufferedSSTables) Remove(fileType base.FileType, FileNum base.DiskFileN // Sync implements the objectCreator interface. func (b *bufferedSSTables) Sync() error { - panic("TODO") + // BufferedSSTs store their data in memory and do not need to sync. + return nil } // Assert that bufferedSSTables implements objstorage.Writable. @@ -619,10 +622,9 @@ func (b *bufferedSSTables) Sync() error { // written by the flush. var _ objstorage.Writable = (*bufferedSSTables)(nil) -// Finish implements objstorage.Writable. +// Write implements objstorage.Writable. func (o *bufferedSSTables) Write(p []byte) error { _, err := o.curr.Write(p) - o.curr.Reset() return err } @@ -635,6 +637,7 @@ func (o *bufferedSSTables) Finish() error { fileNum: o.currFileNum, buf: slices.Clone(o.curr.Bytes()), }) + o.size += uint64(o.curr.Len()) o.curr.Reset() o.objectIsOpen = false return nil diff --git a/table_cache.go b/table_cache.go index 14f2492e1c..a7f6145256 100644 --- a/table_cache.go +++ b/table_cache.go @@ -1180,6 +1180,7 @@ type loadInfo struct { } func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) { + // TODO(aaditya): Example of creating iter for SST // Try opening the file first. var f objstorage.Readable var err error
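Condensing the flush1 changes in this last commit: once runCompaction has written its outputs into a bufferedSSTables, the flush either keeps them in memory as a flushable or falls back to durable storage. A distilled sketch of that decision (editor's illustration with a hypothetical helper and threshold parameter; the fallback path is still a TODO in the patch):

    // maybeBufferFlushOutput returns a flushable wrapping the buffered output
    // when it is small enough to keep in memory, and nil when the caller should
    // instead write the tables through the durable object provider.
    func maybeBufferFlushOutput(
    	comparer *base.Comparer, metas []*fileMetadata, buffered *bufferedSSTables, maxSize uint64,
    ) (flushable, error) {
    	if buffered.size >= maxSize {
    		// Too large to buffer; fall back to the on-disk path (the
    		// unhandled "else" branch noted in flush1).
    		return nil, nil
    	}
    	// newFlushableBufferedSSTables re-opens each finished buffer via an
    	// sstable.Reader over a BytesReadable and sorts the metadata into a
    	// level slice usable by levelIter.
    	return newFlushableBufferedSSTables(comparer, metas, sstable.ReaderOptions{}, buffered)
    }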