From 8c5e23474fdb10aa36d00b76dbcc5b5ece1589c5 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 9 Jul 2025 16:10:53 +0800 Subject: [PATCH 1/7] Update code --- pkg/vm/engine/tae/catalog/object.go | 33 +++++++++++++++++++++++++ pkg/vm/engine/tae/logtail/ckp_writer.go | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/pkg/vm/engine/tae/catalog/object.go b/pkg/vm/engine/tae/catalog/object.go index be8e033ffe114..d51f926ff688a 100644 --- a/pkg/vm/engine/tae/catalog/object.go +++ b/pkg/vm/engine/tae/catalog/object.go @@ -728,3 +728,36 @@ func (entry *ObjectEntry) ForeachMVCCNodeInRange(start, end types.TS, f func(*tx } return nil } + +func (entry *ObjectEntry) ForeachMVCCNodeInRange2(start, end types.TS, f func(*txnbase.TxnMVCCNode) error) error { + needWait, txn := entry.GetLastMVCCNode().NeedWaitCommitting(end.Next()) + if needWait { + txn.GetTxnState(true) + } + + var createIn, deleteIn bool + createIn, _ = entry.CreateNode.PreparedIn(start, end) + if createIn { + if err := f(&entry.CreateNode); err != nil { + return err + } + } + deleteIn, _ = entry.DeleteNode.PreparedIn(start, end) + if !deleteIn { + if !entry.IsAppendable() { + return nil + } + if !createIn { + return nil + } + if !entry.DeleteNode.IsCommitted() { + return nil + } + } + logutil.Infof("ForeachMVCCNodeInRange2 start name %v - %v -%v; start %v, end %v", + entry.ObjectName().String(), entry.CreateNode.String(), entry.DeleteNode.String(), start.ToString(), end.ToString()) + if err := f(&entry.DeleteNode); err != nil { + return err + } + return nil +} diff --git a/pkg/vm/engine/tae/logtail/ckp_writer.go b/pkg/vm/engine/tae/logtail/ckp_writer.go index 7053fd7e0974e..04f75974c5e2b 100644 --- a/pkg/vm/engine/tae/logtail/ckp_writer.go +++ b/pkg/vm/engine/tae/logtail/ckp_writer.go @@ -90,7 +90,7 @@ func (collector *BaseCollector_V2) Collect(c *catalog.Catalog) (err error) { } func (collector *BaseCollector_V2) visitObject(entry *catalog.ObjectEntry) error { - return entry.ForeachMVCCNodeInRange(collector.start, collector.end, func(node *txnbase.TxnMVCCNode) error { + return entry.ForeachMVCCNodeInRange2(collector.start, collector.end, func(node *txnbase.TxnMVCCNode) error { if node.IsAborted() { return nil } From 7ef51696d3cc9d96aff0b9907a33f306c6e46677 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 9 Jul 2025 16:42:37 +0800 Subject: [PATCH 2/7] Update code --- pkg/vm/engine/tae/catalog/object.go | 6 +++--- pkg/vm/engine/tae/logtail/ckp_writer.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/object.go b/pkg/vm/engine/tae/catalog/object.go index d51f926ff688a..b22ce4ab7627f 100644 --- a/pkg/vm/engine/tae/catalog/object.go +++ b/pkg/vm/engine/tae/catalog/object.go @@ -729,7 +729,9 @@ func (entry *ObjectEntry) ForeachMVCCNodeInRange(start, end types.TS, f func(*tx return nil } -func (entry *ObjectEntry) ForeachMVCCNodeInRange2(start, end types.TS, f func(*txnbase.TxnMVCCNode) error) error { +// ForeachMVCCSpecificNodeInRange is used by "do checkpoint". +// The purpose is to ensure that the checkpoint contains all data before the end timestamp. +func (entry *ObjectEntry) ForeachMVCCSpecificNodeInRange(start, end types.TS, f func(*txnbase.TxnMVCCNode) error) error { needWait, txn := entry.GetLastMVCCNode().NeedWaitCommitting(end.Next()) if needWait { txn.GetTxnState(true) @@ -754,8 +756,6 @@ func (entry *ObjectEntry) ForeachMVCCNodeInRange2(start, end types.TS, f func(*t return nil } } - logutil.Infof("ForeachMVCCNodeInRange2 start name %v - %v -%v; start %v, end %v", - entry.ObjectName().String(), entry.CreateNode.String(), entry.DeleteNode.String(), start.ToString(), end.ToString()) if err := f(&entry.DeleteNode); err != nil { return err } diff --git a/pkg/vm/engine/tae/logtail/ckp_writer.go b/pkg/vm/engine/tae/logtail/ckp_writer.go index 04f75974c5e2b..6c2237b050a85 100644 --- a/pkg/vm/engine/tae/logtail/ckp_writer.go +++ b/pkg/vm/engine/tae/logtail/ckp_writer.go @@ -90,7 +90,7 @@ func (collector *BaseCollector_V2) Collect(c *catalog.Catalog) (err error) { } func (collector *BaseCollector_V2) visitObject(entry *catalog.ObjectEntry) error { - return entry.ForeachMVCCNodeInRange2(collector.start, collector.end, func(node *txnbase.TxnMVCCNode) error { + return entry.ForeachMVCCSpecificNodeInRange(collector.start, collector.end, func(node *txnbase.TxnMVCCNode) error { if node.IsAborted() { return nil } From 00553e688db876669e29a25d733b39f55abbea2c Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 9 Jul 2025 16:44:21 +0800 Subject: [PATCH 3/7] Update code --- pkg/vm/engine/disttae/db.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/vm/engine/disttae/db.go b/pkg/vm/engine/disttae/db.go index 8c6efa32e7231..2709000e2c889 100644 --- a/pkg/vm/engine/disttae/db.go +++ b/pkg/vm/engine/disttae/db.go @@ -434,13 +434,13 @@ func (e *Engine) getOrCreateSnapPartBy( ckpsCanServe := func() bool { // The checkpoint entry required by SnapshotRead must meet two or more checkpoints, // otherwise the latest partition can meet this SnapshotRead request - if len(checkpointEntries) < 2 { + if len(checkpointEntries) < 1 { return false } // The end time of the penultimate checkpoint must not be less than the ts of the snapshot, // because the data of the snapshot may exist in the wal and be collected to the next checkpoint - end := checkpointEntries[len(checkpointEntries)-2].GetEnd() + end := checkpointEntries[len(checkpointEntries)-1].GetEnd() return !end.LT(&ts) } From 2da3f432fa021f852bdebc55f7abddf346dfdb33 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Wed, 9 Jul 2025 17:09:09 +0800 Subject: [PATCH 4/7] Revert "Update code" This reverts commit 00553e688db876669e29a25d733b39f55abbea2c. --- pkg/vm/engine/disttae/db.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/vm/engine/disttae/db.go b/pkg/vm/engine/disttae/db.go index 2709000e2c889..8c6efa32e7231 100644 --- a/pkg/vm/engine/disttae/db.go +++ b/pkg/vm/engine/disttae/db.go @@ -434,13 +434,13 @@ func (e *Engine) getOrCreateSnapPartBy( ckpsCanServe := func() bool { // The checkpoint entry required by SnapshotRead must meet two or more checkpoints, // otherwise the latest partition can meet this SnapshotRead request - if len(checkpointEntries) < 1 { + if len(checkpointEntries) < 2 { return false } // The end time of the penultimate checkpoint must not be less than the ts of the snapshot, // because the data of the snapshot may exist in the wal and be collected to the next checkpoint - end := checkpointEntries[len(checkpointEntries)-1].GetEnd() + end := checkpointEntries[len(checkpointEntries)-2].GetEnd() return !end.LT(&ts) } From 44c6e641df0bb1a8198569aeac7e6a5b956e9bf5 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Thu, 10 Jul 2025 15:30:46 +0800 Subject: [PATCH 5/7] Update code --- pkg/datasync/consumer_test.go | 2 +- pkg/vm/engine/tae/db/db.go | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/datasync/consumer_test.go b/pkg/datasync/consumer_test.go index 1d8a4184a5fe3..f91b8309f7eb8 100644 --- a/pkg/datasync/consumer_test.go +++ b/pkg/datasync/consumer_test.go @@ -304,7 +304,7 @@ func TestParseCheckpointLocations(t *testing.T) { assert.NotNil(t, ckp) locations, err := c.parseCheckpointLocations(ctx, ckp.Location) assert.NoError(t, err) - assert.Equal(t, 35, len(locations)) + assert.Equal(t, 65, len(locations)) }) }) } diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index 5fd372e02c49a..9400ee6be9d4b 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -236,8 +236,6 @@ func (db *DB) ForceCheckpointForBackup( ts types.TS, ) (location string, err error) { t0 := time.Now() - err = db.ForceCheckpoint(ctx, ts) - t1 := time.Now() defer func() { logger := logutil.Info @@ -247,8 +245,7 @@ func (db *DB) ForceCheckpointForBackup( logger( "Force-Backup-CKP", zap.Duration("total-cost", time.Since(t0)), - zap.Duration("force-ickp-cost", t1.Sub(t0)), - zap.Duration("create-backup-cost", time.Since(t1)), + zap.Duration("create-backup-cost", time.Since(t0)), zap.String("location", location), zap.Error(err), ) From 751e87349c2b7298c8c9d00c723836292a16ffea Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Thu, 10 Jul 2025 16:43:39 +0800 Subject: [PATCH 6/7] Update code --- pkg/vm/engine/tae/db/db.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index 9400ee6be9d4b..5fd372e02c49a 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -236,6 +236,8 @@ func (db *DB) ForceCheckpointForBackup( ts types.TS, ) (location string, err error) { t0 := time.Now() + err = db.ForceCheckpoint(ctx, ts) + t1 := time.Now() defer func() { logger := logutil.Info @@ -245,7 +247,8 @@ func (db *DB) ForceCheckpointForBackup( logger( "Force-Backup-CKP", zap.Duration("total-cost", time.Since(t0)), - zap.Duration("create-backup-cost", time.Since(t0)), + zap.Duration("force-ickp-cost", t1.Sub(t0)), + zap.Duration("create-backup-cost", time.Since(t1)), zap.String("location", location), zap.Error(err), ) From d61510f01719bb9e247dda0ad7474a20ad30e985 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Tue, 15 Jul 2025 10:53:59 +0800 Subject: [PATCH 7/7] Update log --- pkg/vm/engine/tae/db/gc/v3/checkpoint.go | 18 +++++++++--------- pkg/vm/engine/tae/db/gc/v3/diskcleaner.go | 8 +++++++- pkg/vm/engine/tae/db/gc/v3/exec_v1.go | 2 +- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go index 67cc929030219..b7817e4959e3c 100644 --- a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go @@ -828,8 +828,8 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( if ckpMaxEnd.GT(checkpointLowWaterMark) { logutil.Warn("GC-PANIC-MERGE-FILES", zap.String("task", c.TaskNameLocked()), - zap.String("ckpMaxEnd", ckpMaxEnd.ToString()), - zap.String("checkpointLowWaterMark", checkpointLowWaterMark.ToString()), + zap.String("ckp-max", ckpMaxEnd.ToString()), + zap.String("ckp-low", checkpointLowWaterMark.ToString()), ) return } @@ -848,7 +848,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( window := c.GetScannedWindowLocked() if ckpMaxEnd.GT(&window.tsRange.end) { logutil.Warn("GC-PANIC-MERGE-FILES", - zap.String("ckpMaxEnd", ckpMaxEnd.ToString()), + zap.String("ckp-max", ckpMaxEnd.ToString()), zap.String("window-end", window.tsRange.end.ToString()), ) return @@ -901,7 +901,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( logutil.Error( "GC-TRACE-MERGE-CHECKPOINT-FILES", zap.String("task", c.TaskNameLocked()), - zap.Int("file len", len(newFiles)), + zap.Int("file-len", len(newFiles)), zap.Error(err), ) return @@ -947,7 +947,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( logutil.Info( "GC-TRACE-DELETE-CHECKPOINT-FILE", zap.String("task", c.TaskNameLocked()), - zap.String("delete file", deleteFile), + zap.String("delete-file", deleteFile), ) c.checkpointCli.RemoveCheckpointMetaFile(decodedFile.GetName()) } @@ -1140,7 +1140,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( if scanMark.IsEmpty() { logutil.Warn("GC-PANIC-SCANMARK-EMPTY", zap.String("task", c.TaskNameLocked()), - zap.String("mergeMark", mergeMark.ToString())) + zap.String("merge-mark", mergeMark.ToString())) return nil } if waterMark.GT(&scanMark) { @@ -1573,7 +1573,7 @@ func (c *checkpointCleaner) tryScanLocked( cpt := c.checkpointCli.GetCompacted() if cpt == nil { logutil.Info("GC-PANIC-REBUILD-TABLE", - zap.String("max gCkp", maxGCkp.String()), + zap.String("max-gckp", maxGCkp.String()), zap.String("start", start.ToString())) } else { candidates = append(candidates, cpt) @@ -1582,8 +1582,8 @@ func (c *checkpointCleaner) tryScanLocked( gcWaterMark := c.GetGCWaterMark() if gcWaterMark != nil { logutil.Warn("GC-PANIC-REBUILD-GC-WATER-MARK", - zap.String("max gCkp", maxGCkp.String()), - zap.String("gcWaterMark", gcWaterMark.String())) + zap.String("max-gckp", maxGCkp.String()), + zap.String("gc-water-mark", gcWaterMark.String())) } c.updateGCWaterMark(maxGCkp) tryGC = false diff --git a/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go b/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go index dab5d1ee35b2c..dc2b5d7c0e22b 100644 --- a/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go +++ b/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go @@ -346,6 +346,12 @@ func (cleaner *DiskCleaner) Start() { cleaner.onceStart.Do(func() { cleaner.processQueue.Start() step := cleaner.step.Load() + defer func() { + logutil.Info( + "GC-DISK-CLEANER-START", + zap.Uint32("step", cleaner.step.Load()), + ) + }() switch step { case StateStep_Write: if err := cleaner.forceScheduleJob(JT_GCReplayAndExecute); err != nil { @@ -366,7 +372,7 @@ func (cleaner *DiskCleaner) Stop() { cleaner.processQueue.Stop() cleaner.cleaner.Stop() logutil.Info( - "GC-DiskCleaner-Started", + "GC-DISK-CLEANER-STOP", zap.Uint32("step", cleaner.step.Load()), ) }) diff --git a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go index c00792d728efe..7c2423f5c6c32 100644 --- a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go +++ b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go @@ -336,7 +336,7 @@ func MakeSnapshotAndPitrFineFilter( if deleteTS.IsEmpty() { logutil.Warn("GC-PANIC-TS-EMPTY", zap.String("name", name), - zap.String("createTS", createTS.ToString())) + zap.String("create-ts", createTS.ToString())) continue } if !logtail.ObjectIsSnapshotRefers(