diff --git a/compaction.go b/compaction.go
index 7ca2bc8bac..35cdd30068 100644
--- a/compaction.go
+++ b/compaction.go
@@ -462,6 +462,7 @@ const (
 	compactionKindRead
 	compactionKindRewrite
 	compactionKindIngestedFlushable
+	compactionKindBufferedFlush
 )
 
 func (k compactionKind) String() string {
@@ -659,6 +660,33 @@ type compaction struct {
 	pickerMetrics compactionPickerMetrics
 }
 
+// objectCreator provides the subset of the objstorage.Provider interface
+// necessary for compactions and flushes. It's typically satisfied by
+// d.objProvider but may be satisfied by bufferedSSTables during flushes.
+type objectCreator interface {
+	// Create creates a new object and opens it for writing.
+	//
+	// The object is not guaranteed to be durable (accessible in case of crashes)
+	// until Sync is called.
+	Create(
+		ctx context.Context,
+		fileType base.FileType,
+		FileNum base.DiskFileNum,
+		opts objstorage.CreateOptions,
+	) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error)
+	// Path returns an internal, implementation-dependent path for the object. It is
+	// meant to be used for informational purposes (like logging).
+	Path(meta objstorage.ObjectMetadata) string
+	// Remove removes an object.
+	//
+	// The object is not guaranteed to be durably removed until Sync is called.
+	Remove(fileType base.FileType, FileNum base.DiskFileNum) error
+	// Sync flushes the metadata from creation or removal of objects since the
+	// last Sync. This includes objects that have been Created but for which
+	// Writable.Finish() has not yet been called.
+	Sync() error
+}
+
 func (c *compaction) makeInfo(jobID int) CompactionInfo {
 	info := CompactionInfo{
 		JobID: jobID,
@@ -1847,6 +1875,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	var n, inputs int
 	var inputBytes uint64
 	var ingest bool
+	bufferedFlush := true // TODO(aaditya): hook this up to a config setting
 	for ; n < len(d.mu.mem.queue)-1; n++ {
 		if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
 			if n == 0 {
@@ -1905,6 +1934,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	if err != nil {
 		return 0, err
 	}
+
+	if bufferedFlush && !ingest {
+		c.kind = compactionKindBufferedFlush
+	}
 	d.addInProgressCompaction(c)
 
 	jobID := d.mu.nextJobID
@@ -1917,6 +1950,18 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	})
 	startTime := d.timeNow()
 
+	// Compactions always write directly to the database's object provider.
+	// Flushes may write to an in-memory object provider first.
+	var objCreator objectCreator
+	if c.kind == compactionKindBufferedFlush {
+		bufferedSSTs := &bufferedSSTables{}
+		// TODO(aaditya): pick a better size
+		bufferedSSTs.init(10)
+		objCreator = bufferedSSTs
+	} else {
+		objCreator = d.objProvider
+	}
+
 	var ve *manifest.VersionEdit
 	var pendingOutputs []physicalMeta
 	var stats compactStats
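An aside on the seam this hunk introduces: nothing downstream depends on flush or compaction outputs reaching a filesystem, only on the four `objectCreator` methods declared above. As a minimal sketch (not part of the patch; `discardingCreator` and `discardingWritable` are hypothetical names), any type with this method set could be swapped in for `d.objProvider`:

```go
// A hypothetical alternative objectCreator. All names here are invented
// for illustration; only the method set matters.
type discardingCreator struct{}

// discardingWritable swallows all bytes written to it.
type discardingWritable struct{}

func (discardingWritable) Write(p []byte) error { return nil }
func (discardingWritable) Finish() error        { return nil }
func (discardingWritable) Abort()               {}

// Create hands back a Writable that drops everything on the floor.
func (discardingCreator) Create(
	ctx context.Context,
	fileType base.FileType,
	fileNum base.DiskFileNum,
	opts objstorage.CreateOptions,
) (objstorage.Writable, objstorage.ObjectMetadata, error) {
	return discardingWritable{}, objstorage.ObjectMetadata{
		DiskFileNum: fileNum,
		FileType:    fileType,
	}, nil
}

// Path only needs to be good enough for log messages.
func (discardingCreator) Path(meta objstorage.ObjectMetadata) string {
	return "discard://" + meta.DiskFileNum.String()
}

func (discardingCreator) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
	return nil
}

// Sync is a no-op: nothing was durable to begin with.
func (discardingCreator) Sync() error { return nil }

// Compile-time assertion, mirroring the patch's own style.
var _ objectCreator = discardingCreator{}
```

The real in-memory implementation, `bufferedSSTables`, appears in the flushable.go portion of this patch below.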
@@ -1927,7 +1972,37 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	// runCompaction. For all other flush cases, we construct the VersionEdit
 	// inside runCompaction.
 	if c.kind != compactionKindIngestedFlushable {
-		ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
+		ve, pendingOutputs, stats, err = d.runCompaction(jobID, c, objCreator)
+	}
+
+	// TODO(aadityas,jackson): If the buffered output sstables are too small,
+	// avoid linking them into the version and just update the flushable queue
+	// appropriately.
+	if err == nil && c.kind == compactionKindBufferedFlush {
+		var metas []*fileMetadata
+		var fileNums []base.DiskFileNum
+		for _, file := range ve.NewFiles {
+			metas = append(metas, file.Meta)
+			fileNums = append(fileNums, file.BackingFileNum)
+		}
+
+		bufferedSST := objCreator.(*bufferedSSTables)
+		if bufferedSST.size < d.opts.MemTableSize /* TODO(aaditya): does this make sense? */ {
+			var f flushable
+			f, err = newFlushableBufferedSSTables(d.opts.Comparer, metas, sstable.ReaderOptions{}, bufferedSST)
+			if err != nil {
+				return 0, err
+			}
+			fe := d.newFlushableEntry(f, fileNums[0], 0 /* TODO(aaditya): figure out what to put here */)
+			// The new entry replaces the flushed entries d.mu.mem.queue[:n], so
+			// it must precede the remaining entries, including the mutable
+			// memtable at the tail of the queue.
+			d.mu.mem.queue = append([]*flushableEntry{fe}, d.mu.mem.queue[n:]...)
+			return 0, nil
+		}
+
+		// TODO(aaditya): Otherwise, hand the buffered sstables to
+		// d.objProvider and write them out to disk.
 	}
 
 	// Acquire logLock. This will be released either on an error, by way of
@@ -2634,7 +2704,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
 	d.opts.EventListener.CompactionBegin(info)
 	startTime := d.timeNow()
 
-	ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)
+	ve, pendingOutputs, stats, err := d.runCompaction(jobID, c, d.objProvider)
 
 	info.Duration = d.timeNow().Sub(startTime)
 	if err == nil {
@@ -2800,7 +2870,7 @@ func (d *DB) runCopyCompaction(
 // d.mu must be held when calling this, but the mutex may be dropped and
 // re-acquired during the course of this method.
 func (d *DB) runCompaction(
-	jobID int, c *compaction,
+	jobID int, c *compaction, objCreator objectCreator,
) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) {
 	// As a sanity check, confirm that the smallest / largest keys for new and
 	// deleted files in the new versionEdit pass a validation function before
@@ -2968,7 +3038,7 @@ func (d *DB) runCompaction(
 	}
 	if retErr != nil {
 		for _, fileNum := range createdFiles {
-			_ = d.objProvider.Remove(fileTypeTable, fileNum)
+			_ = objCreator.Remove(fileTypeTable, fileNum)
 		}
 	}
 	for _, closer := range c.closers {
@@ -3063,7 +3133,8 @@ func (d *DB) runCompaction(
 			PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
 		}
 		diskFileNum := base.PhysicalTableDiskFileNum(fileNum)
-		writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
+
+		writable, objMeta, err := objCreator.Create(ctx, fileTypeTable, diskFileNum, createOpts)
 		if err != nil {
 			return err
 		}
@@ -3075,7 +3146,7 @@ func (d *DB) runCompaction(
 		d.opts.EventListener.TableCreated(TableCreateInfo{
 			JobID:   jobID,
 			Reason:  reason,
-			Path:    d.objProvider.Path(objMeta),
+			Path:    objCreator.Path(objMeta),
 			FileNum: diskFileNum,
 		})
 		if c.kind != compactionKindFlush {
@@ -3516,7 +3587,7 @@ func (d *DB) runCompaction(
 	// compactStats.
 	stats.countMissizedDels = iter.stats.countMissizedDels
 
-	if err := d.objProvider.Sync(); err != nil {
+	if err := objCreator.Sync(); err != nil {
 		return nil, pendingOutputs, stats, err
 	}
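Before moving on to the test changes: the TODO branch in `flush1` above ("hand the buffered sstables to d.objProvider") is left unimplemented. One plausible shape for it, sketched against the patch's own interfaces (hedged: default `CreateOptions` are assumed, and cleanup of partially written objects is elided), is to replay each finished buffer through the durable provider:

```go
// Hedged sketch of the missing branch: replay each finished buffer through
// the durable provider. Because the buffered sstables keep their assigned
// file numbers, the version edit built by runCompaction stays valid.
for _, buffered := range bufferedSST.finished {
	w, _, err := d.objProvider.Create(
		context.Background(), fileTypeTable, buffered.fileNum,
		objstorage.CreateOptions{}, // assumption: default options suffice here
	)
	if err != nil {
		return 0, err
	}
	if err := w.Write(buffered.buf); err != nil {
		w.Abort()
		return 0, err
	}
	if err := w.Finish(); err != nil {
		return 0, err
	}
}
// Make the new objects' metadata durable before publishing the version edit.
if err := d.objProvider.Sync(); err != nil {
	return 0, err
}
```

Preserving the file numbers is what makes this branch cheap: only the bytes move, while all of the manifest bookkeeping computed during the flush is reused as-is.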
diff --git a/data_test.go b/data_test.go
index 2dd2cf686e..de5ff20fd8 100644
--- a/data_test.go
+++ b/data_test.go
@@ -869,7 +869,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
 		// to the user-defined boundaries.
 		c.maxOutputFileSize = math.MaxUint64
-		newVE, _, _, err := d.runCompaction(0, c)
+		newVE, _, _, err := d.runCompaction(0, c, d.objProvider)
 		if err != nil {
 			return err
 		}
diff --git a/flushable.go b/flushable.go
index 630191f737..07faac3b86 100644
--- a/flushable.go
+++ b/flushable.go
@@ -5,16 +5,22 @@
 package pebble
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
+	"slices"
 	"sync/atomic"
 	"time"
 
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/keyspan"
 	"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
 	"github.com/cockroachdb/pebble/internal/manifest"
+	"github.com/cockroachdb/pebble/objstorage"
+	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
+	"github.com/cockroachdb/pebble/sstable"
 )
 
 // flushable defines the interface for immutable memtables.
@@ -78,6 +84,10 @@ type flushableEntry struct {
 	delayedFlushForcedAt time.Time
 	// logNum corresponds to the WAL that contains the records present in the
 	// receiver.
+	//
+	// TODO(aadityas,jackson): We'll need to do something about this (and
+	// logSize) for entries corresponding to bufferedSSTables since there may be
+	// multiple associated log nums.
 	logNum base.DiskFileNum
 	// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
 	logSize uint64
@@ -250,7 +260,6 @@ func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIter
 	if !s.containsRangeKeys() {
 		return nil
 	}
-
 	return keyspanimpl.NewLevelIter(
 		keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
 		s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
@@ -313,6 +322,333 @@ func (s *ingestedFlushable) computePossibleOverlaps(
 	}
 }
 
+func newFlushableBufferedSSTables(
+	comparer *base.Comparer,
+	metas []*fileMetadata,
+	ro sstable.ReaderOptions,
+	b *bufferedSSTables,
+	extraReaderOpts ...sstable.ReaderOption,
+) (*flushableBufferedSSTables, error) {
+	if len(metas) != len(b.finished) {
+		panic(errors.AssertionFailedf("metadata for %d files provided, but buffered %d files", len(metas), len(b.finished)))
+	}
+	f := &flushableBufferedSSTables{
+		comparer: comparer,
+		ls:       manifest.NewLevelSliceKeySorted(comparer.Compare, metas),
+		metas:    metas,
+		readers:  make([]*sstable.Reader, len(b.finished)),
+	}
+	// NB: metas and b.finished are parallel slices.
+	for i := range b.finished {
+		if b.finished[i].fileNum != base.PhysicalTableDiskFileNum(metas[i].FileNum) {
+			panic(errors.AssertionFailedf("file metas and file buffers in mismatched order"))
+		}
+		f.anyRangeKeys = f.anyRangeKeys || metas[i].HasRangeKeys
+		f.size += metas[i].Size
+		readable := objstorageprovider.BytesReadable(b.finished[i].buf)
+		var err error
+		f.readers[i], err = sstable.NewReader(readable, ro, extraReaderOpts...)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return f, nil
+}
+
+// flushableBufferedSSTables holds a set of in-memory sstables produced by a
+// flush. Buffering flushed state reduces write amplification by making it more
+// likely that we're able to drop KVs before they reach disk.
+type flushableBufferedSSTables struct {
+	comparer *base.Comparer
+	ls       manifest.LevelSlice
+	metas    []*fileMetadata
+	readers  []*sstable.Reader
+	// size is the size, in bytes, of all the buffered sstables.
+	size uint64
+	// anyRangeKeys indicates whether any of the sstables contain any range
+	// keys.
+	anyRangeKeys bool
+}
+
+var (
+	// Assert that *flushableBufferedSSTables implements the flushable
+	// interface.
+	_ flushable = (*flushableBufferedSSTables)(nil)
+)
+
+// newIters implements the tableNewIters function signature. Ordinarily this
+// function is provided by the table cache. Flushable buffered sstables are not
+// opened through the table cache since they're not present on the real
+// filesystem and do not require use of file descriptors. Instead, the
+// flushableBufferedSSTables keeps sstable.Readers for all the buffered sstables
+// open, and this newIters func uses them to construct iterators.
+func (b *flushableBufferedSSTables) newIters(
+	ctx context.Context,
+	file *manifest.FileMetadata,
+	opts *IterOptions,
+	internalOpts internalIterOpts,
+	kinds iterKinds,
+) (iterSet, error) {
+	var r *sstable.Reader
+	for i := range b.metas {
+		if b.metas[i].FileNum == file.FileNum {
+			r = b.readers[i]
+			break
+		}
+	}
+	if r == nil {
+		return iterSet{}, errors.Newf("file %s not found among flushable buffered sstables", file.FileNum)
+	}
+	var iters iterSet
+	var err error
+	if kinds.RangeKey() && file.HasRangeKeys {
+		iters.rangeKey, err = r.NewRawRangeKeyIter()
+	}
+	if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
+		iters.rangeDeletion, err = r.NewRawRangeDelIter()
+	}
+	if kinds.Point() && err == nil {
+		// TODO(aadityas,jackson): We could support block-property filtering
+		// within the in-memory sstables, but it's unclear it's worth the code
+		// complexity. We expect these blocks to be hit frequently, and the cost
+		// of loading them is less since they're already in main memory.
+		tableFormat, tfErr := r.TableFormat()
+		if tfErr != nil {
+			return iterSet{}, tfErr
+		}
+		var rp sstable.ReaderProvider
+		if tableFormat >= sstable.TableFormatPebblev3 && r.Properties.NumValueBlocks > 0 {
+			// NB: We can return a fixedReaderProvider because the Readers for
+			// these in-memory sstables are guaranteed to be open until the
+			// readState is obsolete which will only occur when all iterators
+			// have closed.
+			rp = &fixedReaderProvider{r}
+		}
+		var categoryAndQoS sstable.CategoryAndQoS
+		if internalOpts.bytesIterated != nil {
+			iters.point, err = r.NewCompactionIter(
+				internalOpts.bytesIterated, categoryAndQoS, nil /* statsCollector */, rp,
+				internalOpts.bufferPool)
+		} else {
+			iters.point, err = r.NewIterWithBlockPropertyFiltersAndContextEtc(
+				ctx, opts.GetLowerBound(), opts.GetUpperBound(),
+				nil /* filterer */, false /* hideObsoletePoints */, true, /* useFilter */
+				internalOpts.stats, categoryAndQoS, nil /* stats collector */, rp)
+		}
+	}
+	if err != nil {
+		iters.CloseAll()
+		return iterSet{}, err
+	}
+	return iters, nil
+}
+
+// newIter is part of the flushable interface.
+func (b *flushableBufferedSSTables) newIter(o *IterOptions) internalIterator {
+	var opts IterOptions
+	if o != nil {
+		opts = *o
+	}
+	// TODO(jackson): The manifest.Level in newLevelIter is only used for
+	// logging. Update the manifest.Level encoding to account for levels which
+	// aren't truly levels in the lsm. Right now, the encoding only supports
+	// L0 sublevels, and the rest of the levels in the lsm.
+	return newLevelIter(
+		context.Background(), opts, b.comparer, b.newIters, b.ls.Iter(), manifest.Level(0),
+		internalIterOpts{},
+	)
+}
+
+// newFlushIter is part of the flushable interface.
+func (b *flushableBufferedSSTables) newFlushIter(
+	o *IterOptions, bytesFlushed *uint64,
+) internalIterator {
+	var opts IterOptions
+	if o != nil {
+		opts = *o
+	}
+	// TODO(jackson): The manifest.Level in newLevelIter is only used for
+	// logging. Update the manifest.Level encoding to account for levels which
+	// aren't truly levels in the lsm. Right now, the encoding only supports
+	// L0 sublevels, and the rest of the levels in the lsm.
+	return newLevelIter(
+		context.Background(), opts, b.comparer, b.newIters, b.ls.Iter(), manifest.Level(0),
+		internalIterOpts{bytesIterated: bytesFlushed},
+	)
+}
+
+// constructRangeDelIter implements keyspanimpl.TableNewSpanIter.
+func (b *flushableBufferedSSTables) constructRangeDelIter(
+	file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
+) (keyspan.FragmentIterator, error) {
+	iters, err := b.newIters(context.Background(), file, nil, internalIterOpts{}, iterRangeDeletions)
+	return iters.RangeDeletion(), err
+}
+
+// newRangeDelIter is part of the flushable interface.
+func (b *flushableBufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
+	return keyspanimpl.NewLevelIter(
+		keyspan.SpanIterOptions{}, b.comparer.Compare,
+		b.constructRangeDelIter, b.ls.Iter(), manifest.Level(0),
+		manifest.KeyTypePoint,
+	)
+}
+
+// constructRangeKeyIter implements keyspanimpl.TableNewSpanIter.
+func (b *flushableBufferedSSTables) constructRangeKeyIter(
+	file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
+) (keyspan.FragmentIterator, error) {
+	iters, err := b.newIters(context.Background(), file, nil, internalIterOpts{}, iterRangeKeys)
+	if err != nil {
+		return nil, err
+	}
+	return iters.RangeKey(), nil
+}
+
+// newRangeKeyIter is part of the flushable interface.
+func (b *flushableBufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
+	if !b.containsRangeKeys() {
+		return nil
+	}
+	return keyspanimpl.NewLevelIter(
+		keyspan.SpanIterOptions{}, b.comparer.Compare, b.constructRangeKeyIter,
+		b.ls.Iter(), manifest.Level(0), manifest.KeyTypeRange,
+	)
+}
+
+// containsRangeKeys is part of the flushable interface.
+func (b *flushableBufferedSSTables) containsRangeKeys() bool { return b.anyRangeKeys }
+
+// inuseBytes is part of the flushable interface.
+func (b *flushableBufferedSSTables) inuseBytes() uint64 { return b.size }
+
+// totalBytes is part of the flushable interface.
+func (b *flushableBufferedSSTables) totalBytes() uint64 { return b.size }
+
+// readyForFlush is part of the flushable interface.
+func (b *flushableBufferedSSTables) readyForFlush() bool {
+	// Buffered sstables are always ready for flush; they're immutable.
+	return true
+}
+
+// computePossibleOverlaps is part of the flushable interface.
+func (b *flushableBufferedSSTables) computePossibleOverlaps(
+	fn func(bounded) shouldContinue, bounded ...bounded,
+) {
+	computePossibleOverlapsGenericImpl[*flushableBufferedSSTables](b, b.comparer.Compare, fn, bounded)
+}
+
+// bufferedSSTables implements the objectCreator interface and is used by a
+// flush to buffer sstables into memory. When the flush is complete, the
+// buffered sstables are either flushed to durable storage or moved into a
+// flushableBufferedSSTables that's linked into the flushable queue.
+//
+// The bufferedSSTables implementation of objectCreator requires that only one
+// created object may be open at a time. If violated, Create will panic.
+type bufferedSSTables struct {
+	// curr is a byte buffer used to accumulate the writes of the current
+	// sstable when *bufferedSSTables is used as a writable.
+	curr bytes.Buffer
+	// currFileNum holds the file number assigned to the sstable being
+	// constructed in curr.
+	currFileNum base.DiskFileNum
+	// finished holds the set of previously written and finished sstables.
+	finished []bufferedSSTable
+	// size is the cumulative size, in bytes, of the finished buffers.
+	size uint64
+	// objectIsOpen is true if the bufferedSSTables is currently being used as a
+	// Writable.
+	objectIsOpen bool
+}
+
+// A bufferedSSTable holds a single, serialized sstable and a corresponding file
+// number.
+type bufferedSSTable struct {
+	fileNum base.DiskFileNum
+	buf     []byte
+}
+
+// init initializes the bufferedSSTables.
+func (b *bufferedSSTables) init(targetFileSize int) {
+	b.curr.Grow(targetFileSize)
+}
+
+// Assert that *bufferedSSTables implements the objectCreator interface.
+var _ objectCreator = (*bufferedSSTables)(nil)
+
+// Create implements the objectCreator interface.
+func (b *bufferedSSTables) Create(
+	ctx context.Context,
+	fileType base.FileType,
+	fileNum base.DiskFileNum,
+	opts objstorage.CreateOptions,
+) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
+	// The bufferedSSTables implementation depends on only one writable being
+	// open at a time. The *bufferedSSTables itself is used as the
+	// implementation of both the objectCreator interface and the
+	// objstorage.Writable interface. We guard against misuse by verifying that
+	// there is no object currently open.
+	if b.objectIsOpen {
+		panic("bufferedSSTables used with concurrent open files")
+	}
+	b.objectIsOpen = true
+	b.currFileNum = fileNum
+	return b, objstorage.ObjectMetadata{
+		DiskFileNum: fileNum,
+		FileType:    fileType,
+	}, nil
+}
+
+// Path implements the objectCreator interface.
+func (b *bufferedSSTables) Path(meta objstorage.ObjectMetadata) string {
+	panic("TODO")
+}
+
+// Remove implements the objectCreator interface.
+func (b *bufferedSSTables) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
+	panic("TODO")
+}
+
+// Sync implements the objectCreator interface.
+func (b *bufferedSSTables) Sync() error {
+	// Buffered sstables live entirely in memory, so there is nothing to sync.
+	return nil
+}
+
+// Assert that bufferedSSTables implements objstorage.Writable.
+//
+// A flush writes files sequentially, so the bufferedSSTables type implements
+// Writable directly, serving as the destination for writes across all sstables
+// written by the flush.
+var _ objstorage.Writable = (*bufferedSSTables)(nil)
+
+// Write implements objstorage.Writable.
+func (b *bufferedSSTables) Write(p []byte) error {
+	_, err := b.curr.Write(p)
+	return err
+}
+
+// Finish implements objstorage.Writable.
+func (b *bufferedSSTables) Finish() error {
+	if !b.objectIsOpen {
+		panic("bufferedSSTables.Finish() invoked when no object is open")
+	}
+	b.finished = append(b.finished, bufferedSSTable{
+		fileNum: b.currFileNum,
+		buf:     slices.Clone(b.curr.Bytes()),
+	})
+	b.size += uint64(b.curr.Len())
+	b.curr.Reset()
+	b.objectIsOpen = false
+	return nil
+}
+
+// Abort implements objstorage.Writable.
+func (b *bufferedSSTables) Abort() {
+	b.curr.Reset()
+	b.objectIsOpen = false
+}
+
 // computePossibleOverlapsGenericImpl is an implemention of the flushable
 // interface's computePossibleOverlaps function for flushable implementations
 // with only in-memory state that do not have special requirements and should
@@ -346,3 +682,20 @@ func computePossibleOverlapsGenericImpl[F flushable](
 		}
 	}
 }
+
+type fixedReaderProvider struct {
+	*sstable.Reader
+}
+
+var _ sstable.ReaderProvider = (*fixedReaderProvider)(nil)
+
+// GetReader implements sstable.ReaderProvider.
+// +// Note that currently the Reader returned here is only used to read value +// blocks. +func (p *fixedReaderProvider) GetReader() (*sstable.Reader, error) { + return p.Reader, nil +} + +// Close implements sstable.ReaderProvider. +func (p *fixedReaderProvider) Close() {} diff --git a/objstorage/objstorageprovider/bytes_readable.go b/objstorage/objstorageprovider/bytes_readable.go new file mode 100644 index 0000000000..d110c73146 --- /dev/null +++ b/objstorage/objstorageprovider/bytes_readable.go @@ -0,0 +1,40 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package objstorageprovider + +import ( + "context" + + "github.com/cockroachdb/pebble/internal/invariants" + "github.com/cockroachdb/pebble/objstorage" +) + +// BytesReadable implements objstorage.Readable for a byte slice. +type BytesReadable []byte + +var _ objstorage.Readable = (BytesReadable)(nil) + +// ReadAt is part of the objstorage.Readable interface. +func (r BytesReadable) ReadAt(_ context.Context, p []byte, off int64) error { + if n := copy(p, r[off:]); invariants.Enabled && n != len(p) { + panic("short read") + } + return nil +} + +// Close is part of the objstorage.Readable interface. +func (r BytesReadable) Close() error { return nil } + +// Size is part of the objstorage.Readable interface. +func (r BytesReadable) Size() int64 { return int64(len(r)) } + +// NewReadHandle is part of the objstorage.Readable interface. +func (r BytesReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle { return r } + +// SetupForCompaction is part of the objstorage.ReadHandle interface. +func (r BytesReadable) SetupForCompaction() {} + +// RecordCacheHit is part of the objstorage.ReadHandle interface. +func (r BytesReadable) RecordCacheHit(ctx context.Context, offset, size int64) {} diff --git a/open.go b/open.go index 8f027562bc..b17acecaaf 100644 --- a/open.go +++ b/open.go @@ -803,7 +803,7 @@ func (d *DB) replayWAL( if err != nil { return err } - newVE, _, _, err := d.runCompaction(jobID, c) + newVE, _, _, err := d.runCompaction(jobID, c, d.objProvider) if err != nil { return errors.Wrapf(err, "running compaction during WAL replay") } diff --git a/table_cache.go b/table_cache.go index 14f2492e1c..a7f6145256 100644 --- a/table_cache.go +++ b/table_cache.go @@ -1180,6 +1180,7 @@ type loadInfo struct { } func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) { + // TODO(aaditya): Example of creating iter for SST // Try opening the file first. var f objstorage.Readable var err error