Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
aadityasondhi committed Feb 15, 2024
1 parent 89ea47d commit d91997a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
38 changes: 37 additions & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ const (
compactionKindRead
compactionKindRewrite
compactionKindIngestedFlushable
compactionKindBufferedFlush
)

func (k compactionKind) String() string {
Expand Down Expand Up @@ -1874,6 +1875,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
var n, inputs int
var inputBytes uint64
var ingest bool
var bufferedFlush = true // TODO(aaditya): loop this into a config setting
for ; n < len(d.mu.mem.queue)-1; n++ {
if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
if n == 0 {
Expand Down Expand Up @@ -1932,6 +1934,10 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
if err != nil {
return 0, err
}

if bufferedFlush && !ingest {
c.kind = compactionKindBufferedFlush
}
d.addInProgressCompaction(c)

jobID := d.mu.nextJobID
Expand All @@ -1946,7 +1952,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {

// Compactions always write directly to the database's object provider.
// Flushes may write to an in-memory object provider first.
var objCreator objectCreator = d.objProvider
var objCreator objectCreator
if c.kind == compactionKindBufferedFlush {
bufferedSSTs := &bufferedSSTables{}
// TODO(aaditya): pick a better size
bufferedSSTs.init(10)
objCreator = bufferedSSTs
} else {
objCreator = d.objProvider
}

var ve *manifest.VersionEdit
var pendingOutputs []physicalMeta
var stats compactStats
Expand All @@ -1963,6 +1978,27 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
// TODO(aadityas,jackson): If the buffered output sstables are too small,
// avoid linking them into the version and just update the flushable queue
// appropriately.
if c.kind == compactionKindBufferedFlush {
var metas []*fileMetadata
var fileNums []base.DiskFileNum
for _, file := range ve.NewFiles {
metas = append(metas, file.Meta)
fileNums = append(fileNums, file.BackingFileNum)
}

bufferedSST := objCreator.(*bufferedSSTables)
if bufferedSST.size < d.opts.MemTableSize /* TODO(aaditya): does this make sense? */ {
var f flushable
f, err = newFlushableBufferedSSTables(d.opts.Comparer, metas, sstable.ReaderOptions{}, bufferedSST)
fe := d.newFlushableEntry(f, fileNums[0], 0 /* TODO(aaditya): figure out what to put here */)
remaining := d.mu.mem.queue[n : len(d.mu.mem.queue)-2]
mutable := d.mu.mem.queue[len(d.mu.mem.queue)-1]
d.mu.mem.queue = append(remaining, fe, mutable)
return 0, err
}

// else convert to objProvider and write to disk
}

// Acquire logLock. This will be released either on an error, by way of
// logUnlock, or through a call to logAndApply if there is no error.
Expand Down
9 changes: 6 additions & 3 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,8 @@ type bufferedSSTables struct {
currFileNum base.DiskFileNum
// finished holds the set of previously written and finished sstables.
finished []bufferedSSTable
// cumulative size of the finished buffers
size uint64
// objectIsOpen is true if the bufferedSSTables is currently being used as a
// Writable.
objectIsOpen bool
Expand Down Expand Up @@ -609,7 +611,8 @@ func (b *bufferedSSTables) Remove(fileType base.FileType, FileNum base.DiskFileN

// Sync implements the objectCreator interface.
func (b *bufferedSSTables) Sync() error {
panic("TODO")
// BufferedSSTs store their data in memory and do not need to sync.
return nil
}

// Assert that bufferedSSTables implements objstorage.Writable.
Expand All @@ -619,10 +622,9 @@ func (b *bufferedSSTables) Sync() error {
// written by the flush.
var _ objstorage.Writable = (*bufferedSSTables)(nil)

// Finish implements objstorage.Writable.
// Write implements objstorage.Writable.
func (o *bufferedSSTables) Write(p []byte) error {

Check failure on line 626 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-invariants

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 626 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-no-cgo

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 626 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux-32bit

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 626 in flushable.go

View workflow job for this annotation

GitHub Actions / go-macos

receiver name o should be consistent with previous receiver name b for bufferedSSTables

Check failure on line 626 in flushable.go

View workflow job for this annotation

GitHub Actions / go-linux

receiver name o should be consistent with previous receiver name b for bufferedSSTables
_, err := o.curr.Write(p)
o.curr.Reset()
return err
}

Expand All @@ -635,6 +637,7 @@ func (o *bufferedSSTables) Finish() error {
fileNum: o.currFileNum,
buf: slices.Clone(o.curr.Bytes()),
})
o.size += uint64(o.curr.Len())
o.curr.Reset()
o.objectIsOpen = false
return nil
Expand Down
1 change: 1 addition & 0 deletions table_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ type loadInfo struct {
}

func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
// TODO(aaditya): Example of creating iter for SST
// Try opening the file first.
var f objstorage.Readable
var err error
Expand Down

0 comments on commit d91997a

Please sign in to comment.