Skip to content

Commit

Permalink
db: perform a snapshot if WAL contains persisted entries (#569)
Browse files Browse the repository at this point in the history
* db: set uncompressed insert size after recovery

Previously, the uncompressed insert size would not be initialized correctly
after recovery, this could cause L0 size to grow

* db: perform a snapshot if WAL contains persisted entries

When rotating or persisting a block, a TableBlockPersisted entry is logged in
the WAL and an async snapshot is performed. If on recovery, the entry still
exists in the WAL, it means that a snapshot was not performed, since it would
have truncated the WAL. This commit ensures the WAL is cleaned up in these
cases, since we have seen that this could lead the WAL to grow indefinitely.
  • Loading branch information
asubiotto authored Oct 24, 2023
1 parent 92877c7 commit 558ded3
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 11 deletions.
48 changes: 44 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,11 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
return err
}

// performSnapshot is set to true if a snapshot should be performed after
// replay. This is set in cases where there could be "dead bytes" in the
// WAL (i.e. entries that occupy space on disk but are useless).
performSnapshot := false

// Writes are performed concurrently to speed up replay.
var writeWg errgroup.Group
writeWg.SetLimit(runtime.GOMAXPROCS(0))
Expand Down Expand Up @@ -755,6 +760,11 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {

// If we get to this point it means a block was finished but did
// not get persisted.
level.Info(db.logger).Log(
"msg", "writing unfinished block in recovery",
"table", tableName,
"tx", tx,
)
table.pendingBlocks[table.active] = struct{}{}
go table.writeBlock(table.active, db.columnStore.manualBlockRotation, false)

Expand Down Expand Up @@ -824,6 +834,10 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
})

case *walpb.Entry_TableBlockPersisted_:
// If a block was persisted but the entry still exists in the WAL,
// a snapshot was not performed after persisting the block. Perform
// one now to clean up the WAL.
performSnapshot = true
return nil
case *walpb.Entry_Snapshot_:
return nil
Expand All @@ -843,7 +857,28 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
if lastTx.TxnID > resetTxn.TxnID {
resetTxn = lastTx
}

db.mtx.Lock()
for _, table := range db.tables {
block := table.ActiveBlock()
block.uncompressedInsertsSize.Store(block.Index().LevelSize(index.L0))
}
db.mtx.Unlock()

db.resetToTxn(resetTxn, nil)
if performSnapshot && db.columnStore.snapshotTriggerSize != 0 {
level.Info(db.logger).Log(
"msg", "performing snapshot after recovery",
)
db.snapshot(ctx, false, func() {
if err := db.reclaimDiskSpace(ctx, wal); err != nil {
level.Error(db.logger).Log(
"msg", "failed to reclaim disk space after snapshot during recovery",
"err", err,
)
}
})
}
level.Info(db.logger).Log(
append(
[]any{
Expand Down Expand Up @@ -894,7 +929,7 @@ func (db *DB) Close(options ...CloseOption) error {
start := time.Now()
db.snapshot(context.Background(), false, func() {
level.Info(db.logger).Log("msg", "snapshot on close completed", "duration", time.Since(start))
if err := db.reclaimDiskSpace(context.Background()); err != nil {
if err := db.reclaimDiskSpace(context.Background(), nil); err != nil {
level.Error(db.logger).Log(
"msg", "failed to reclaim disk space after snapshot",
"err", err,
Expand Down Expand Up @@ -938,8 +973,10 @@ func (db *DB) maintainWAL() {
}

// reclaimDiskSpace attempts to read the latest valid snapshot txn and removes
// any snapshots/wal entries that are older than the snapshot tx.
func (db *DB) reclaimDiskSpace(ctx context.Context) error {
// any snapshots/wal entries that are older than the snapshot tx. Since this can
// be called before db.wal is set, the caller may optionally pass in a WAL to
// truncate.
func (db *DB) reclaimDiskSpace(ctx context.Context, wal WAL) error {
if db.columnStore.testingOptions.disableReclaimDiskSpaceOnSnapshot {
return nil
}
Expand All @@ -953,7 +990,10 @@ func (db *DB) reclaimDiskSpace(ctx context.Context) error {
if err := db.cleanupSnapshotDir(ctx, validSnapshotTxn); err != nil {
return err
}
return db.wal.Truncate(validSnapshotTxn)
if wal == nil {
wal = db.wal
}
return wal.Truncate(validSnapshotTxn)
}

func (db *DB) getMinTXPersisted() uint64 {
Expand Down
76 changes: 76 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/polarsignals/frostdb/dynparquet"
schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
walpb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/wal/v1alpha1"
"github.com/polarsignals/frostdb/query"
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/polarsignals/frostdb/query/physicalplan"
Expand Down Expand Up @@ -1630,6 +1631,81 @@ func TestDBRecover(t *testing.T) {
require.NoError(t, err)
newWriteAndExpectWALRecord(t, db, table)
})

// SnapshotOnRecovery verifies that a snapshot is taken on recovery if the
// WAL indicates that a block was rotated but no snapshot was taken.
t.Run("SnapshotOnRecovery", func(t *testing.T) {
dir := setup(t, false)
c, err := New(
WithLogger(newTestLogger(t)),
WithStoragePath(dir),
WithWAL(),
// This option will disable snapshots on block rotation.
WithSnapshotTriggerSize(0),
)
require.NoError(t, err)

snapshotsPath := filepath.Join(dir, "databases", dbAndTableName, "snapshots")
snapshots, err := os.ReadDir(snapshotsPath)
require.NoError(t, err)

seenSnapshots := make(map[string]struct{})
for _, s := range snapshots {
seenSnapshots[s.Name()] = struct{}{}
}

db, err := c.DB(ctx, dbAndTableName)
require.NoError(t, err)
table, err := db.Table(dbAndTableName, nil)
require.NoError(t, err)

require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))

rec, err := dynparquet.NewTestSamples().ToRecord()
require.NoError(t, err)

insertTx, err := table.InsertRecord(ctx, rec)
require.NoError(t, err)

// RotateBlock again, this should log a couple of persisted block WAL
// entries.
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
require.NoError(t, c.Close())

c, err = New(
WithLogger(newTestLogger(t)),
WithStoragePath(dir),
WithWAL(),
// Enable snapshots.
WithSnapshotTriggerSize(1),
)
require.NoError(t, err)
defer c.Close()

snapshots, err = os.ReadDir(snapshotsPath)
require.NoError(t, err)

for _, s := range snapshots {
tx, err := getTxFromSnapshotFileName(s.Name())
require.NoError(t, err)
require.GreaterOrEqual(t, tx, insertTx, "expected only snapshots after insert txn")
}
db, err = c.DB(ctx, dbAndTableName)
require.NoError(t, err)

require.Eventually(t, func() bool {
numBlockPersists := 0
require.NoError(t, db.wal.Replay(0, func(tx uint64, entry *walpb.Record) error {
if _, ok := entry.Entry.EntryType.(*walpb.Entry_TableBlockPersisted_); ok {
numBlockPersists++
}
return nil
}))
return numBlockPersists <= 1
}, 1*time.Second, 10*time.Millisecond,
"expected at most one block persist entry; the others should have been snapshot and truncated",
)
})
}

func Test_DB_WalReplayTableConfig(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions index/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func (l *LSM) Size() int64 {
return size
}

// LevelSize returns the size of a specific level in bytes.
func (l *LSM) LevelSize(t SentinelType) int64 {
return l.sizes[t].Load()
}

func validateLevels(levels []*LevelConfig) error {
for i, l := range levels {
if int(l.Level) != i {
Expand Down
14 changes: 9 additions & 5 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,18 @@ func (db *DB) snapshotAtTX(ctx context.Context, tx uint64, writeSnapshot func(co
}
return nil
}(); err != nil {
db.metrics.snapshotMetrics.snapshotsTotal.WithLabelValues("false").Inc()
if db.metrics != nil {
db.metrics.snapshotMetrics.snapshotsTotal.WithLabelValues("false").Inc()
}
return err
}
db.metrics.snapshotMetrics.snapshotsTotal.WithLabelValues("true").Inc()
if fileSize > 0 {
db.metrics.snapshotMetrics.snapshotFileSizeBytes.Set(float64(fileSize))
if db.metrics != nil {
db.metrics.snapshotMetrics.snapshotsTotal.WithLabelValues("true").Inc()
if fileSize > 0 {
db.metrics.snapshotMetrics.snapshotFileSizeBytes.Set(float64(fileSize))
}
db.metrics.snapshotMetrics.snapshotDurationHistogram.Observe(time.Since(start).Seconds())
}
db.metrics.snapshotMetrics.snapshotDurationHistogram.Observe(time.Since(start).Seconds())
// TODO(asubiotto): If snapshot file sizes become too large, investigate
// adding compression.
return nil
Expand Down
4 changes: 2 additions & 2 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (t *Table) writeBlock(block *TableBlock, skipPersist, snapshotDB bool) {
"err", err,
)
}
if err := t.db.reclaimDiskSpace(ctx); err != nil {
if err := t.db.reclaimDiskSpace(ctx, nil); err != nil {
level.Error(t.logger).Log(
"msg", "failed to reclaim disk space after snapshot on block rotation",
"err", err,
Expand Down Expand Up @@ -608,7 +608,7 @@ func (t *Table) appender(ctx context.Context) (*TableBlock, func(), error) {
"last_snapshot_size", humanize.IBytes(uint64(block.lastSnapshotSize.Load())),
)
block.lastSnapshotSize.Store(uncompressedInsertsSize)
if err := t.db.reclaimDiskSpace(context.Background()); err != nil {
if err := t.db.reclaimDiskSpace(context.Background(), nil); err != nil {
level.Error(t.logger).Log(
"msg", "failed to reclaim disk space after snapshot",
"err", err,
Expand Down

0 comments on commit 558ded3

Please sign in to comment.