compacted restore: ignore some keys out of specific range (#59112)
close #58238
3pointer authored Jan 24, 2025
1 parent 162c899 commit f5dc850
Showing 10 changed files with 246 additions and 18 deletions.
8 changes: 6 additions & 2 deletions br/pkg/restore/log_client/client.go
@@ -270,7 +270,7 @@ func (rc *LogClient) Close(ctx context.Context) {
log.Info("Restore client closed")
}

func rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.RewriteRules, error) {
func (rc *LogClient) rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.RewriteRules, error) {
if r, ok := sst.(RewrittenSSTs); ok {
rewritten := r.RewrittenTo()
if rewritten != sst.TableID() {
@@ -287,6 +287,10 @@ func rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.
return rewriteRules, nil
}
}
// Need to set ts range for compacted sst to filter out irrelevant data.
if sst.Type() == CompactedSSTsType && !rules.HasSetTs() {
rules.SetTsRange(rc.shiftStartTS, rc.startTS, rc.restoreTS)
}
return rules, nil
}
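
A minimal standalone sketch of the guard above, using simplified names rather than the real restoreutils.RewriteRules type: the timestamp window is attached only to compacted SSTs, and a rules object that already carries a window is left untouched, so rules shared by several SSTs are stamped at most once.

package main

import "fmt"

// rules is a simplified stand-in for the ts-range part of restoreutils.RewriteRules.
type rules struct{ shiftStartTs, startTs, restoredTs uint64 }

func (r *rules) hasSetTs() bool { return r.startTs != 0 && r.restoredTs != 0 }

func (r *rules) setTsRange(shift, start, restored uint64) {
	r.shiftStartTs, r.startTs, r.restoredTs = shift, start, restored
}

const (
	compactedSSTsType = 1
	copiedSSTsType    = 2
)

// applyTsRange mirrors the guard in rewriteRulesFor: only compacted SSTs get the
// window, and an already-stamped rules object is not overwritten.
func applyTsRange(sstType int, r *rules, shift, start, restored uint64) {
	if sstType == compactedSSTsType && !r.hasSetTs() {
		r.setTsRange(shift, start, restored)
	}
}

func main() {
	r := &rules{}
	applyTsRange(copiedSSTsType, r, 10, 20, 30)
	fmt.Println(*r) // {0 0 0}: copied SSTs are not filtered here
	applyTsRange(compactedSSTsType, r, 10, 20, 30)
	fmt.Println(*r) // {10 20 30}
	applyTsRange(compactedSSTsType, r, 99, 99, 99)
	fmt.Println(*r) // {10 20 30}: window already set, left as-is
}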

@@ -318,7 +322,7 @@ func (rc *LogClient) RestoreSSTFiles(
log.Warn("[Compacted SST Restore] Skipping excluded table during restore.", zap.Int64("table_id", i.TableID()))
continue
}
newRules, err := rewriteRulesFor(i, rewriteRules)
newRules, err := rc.rewriteRulesFor(i, rewriteRules)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client/compacted_file_strategy.go
@@ -51,7 +51,7 @@ func (cs *CompactedFileSplitStrategy) inspect(ssts SSTs) sstIdentity {
}
}

rule := utils.GetRewriteRuleOfTable(ssts.TableID(), r.RewrittenTo(), 0, map[int64]int64{}, false)
rule := utils.GetRewriteRuleOfTable(ssts.TableID(), r.RewrittenTo(), map[int64]int64{}, false)

return sstIdentity{
EffectiveID: r.RewrittenTo(),
9 changes: 7 additions & 2 deletions br/pkg/restore/log_client/log_file_manager.go
@@ -495,14 +495,19 @@ func (rc *LogFileManager) ReadAllEntries(
return kvEntries, nextKvEntries, nil
}

func Subcompactions(ctx context.Context, prefix string, s storage.ExternalStorage) SubCompactionIter {
func Subcompactions(ctx context.Context, prefix string, s storage.ExternalStorage, shiftStartTS, restoredTS uint64) SubCompactionIter {
return iter.FlatMap(storage.UnmarshalDir(
ctx,
&storage.WalkOption{SubDir: prefix},
s,
func(t *backuppb.LogFileSubcompactions, name string, b []byte) error { return t.Unmarshal(b) },
), func(subcs *backuppb.LogFileSubcompactions) iter.TryNextor[*backuppb.LogFileSubcompaction] {
return iter.FromSlice(subcs.Subcompactions)
return iter.MapFilter(iter.FromSlice(subcs.Subcompactions), func(subc *backuppb.LogFileSubcompaction) (*backuppb.LogFileSubcompaction, bool) {
if subc.Meta.InputMaxTs < shiftStartTS || subc.Meta.InputMinTs > restoredTS {
return nil, true
}
return subc, false
})
})
}
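
A standalone sketch of the pruning above, with a simplified meta type standing in for backuppb.LogFileSubcompactionMeta and a slice-based stand-in for iter.MapFilter: a subcompaction is kept only when its [InputMinTs, InputMaxTs] range overlaps the restore window [shiftStartTS, restoredTS]; returning true from the mapper marks an element as skipped.

package main

import "fmt"

// mapFilter is a simplified, slice-based stand-in for iter.MapFilter as used above:
// the mapper's second return value (true) means "skip this element".
func mapFilter[T, R any](in []T, mapper func(T) (R, bool)) []R {
	out := make([]R, 0, len(in))
	for _, v := range in {
		if r, skip := mapper(v); !skip {
			out = append(out, r)
		}
	}
	return out
}

// meta is a simplified stand-in for backuppb.LogFileSubcompactionMeta.
type meta struct{ InputMinTs, InputMaxTs uint64 }

func main() {
	shiftStartTS, restoredTS := uint64(10), uint64(20)
	metas := []meta{{1, 5}, {8, 15}, {25, 30}}
	kept := mapFilter(metas, func(m meta) (meta, bool) {
		// same predicate as above: drop subcompactions entirely outside the window
		if m.InputMaxTs < shiftStartTS || m.InputMinTs > restoredTS {
			return meta{}, true
		}
		return m, false
	})
	fmt.Println(kept) // [{8 15}]
}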

6 changes: 4 additions & 2 deletions br/pkg/restore/log_client/migration.go
@@ -170,6 +170,7 @@ func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigr
fullBackups: fullBackups,
restoredTS: builder.restoredTS,
startTS: builder.startTS,
shiftStartTS: builder.shiftStartTS,
}
return withMigrations
}
@@ -222,8 +223,9 @@ type WithMigrations struct {
skipmap metaSkipMap
compactionDirs []string
fullBackups []string
restoredTS uint64
shiftStartTS uint64
startTS uint64
restoredTS uint64
}

func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter {
@@ -249,7 +251,7 @@ func (wm *WithMigrations) Compactions(ctx context.Context, s storage.ExternalSto
compactionDirIter := iter.FromSlice(wm.compactionDirs)
return iter.FlatMap(compactionDirIter, func(name string) iter.TryNextor[*backuppb.LogFileSubcompaction] {
// name is the absolute path in external storage.
return Subcompactions(ctx, name, s)
return Subcompactions(ctx, name, s, wm.shiftStartTS, wm.restoredTS)
})
}

16 changes: 15 additions & 1 deletion br/pkg/restore/log_client/ssts.go
@@ -17,6 +17,11 @@ var (
_ RewrittenSSTs = &CopiedSST{}
)

const (
CompactedSSTsType = 1
CopiedSSTsType = 2
)

// RewrittenSSTs is an extension to the `SSTs` that needs extra key rewriting.
// This allows a SST being restored "as if" it in another table.
//
@@ -37,6 +42,7 @@ type RewrittenSSTs interface {
type SSTs interface {
fmt.Stringer

Type() int
// TableID returns the ID of the table associated with the SST files.
// This should be the same as the physical content's table ID.
TableID() int64
@@ -51,6 +57,10 @@ type CompactedSSTs struct {
*backuppb.LogFileSubcompaction
}

func (s *CompactedSSTs) Type() int {
return CompactedSSTsType
}

func (s *CompactedSSTs) String() string {
return fmt.Sprintf("CompactedSSTs: %s", s.Meta)
}
@@ -75,7 +85,11 @@ type CopiedSST struct {
}

func (s *CopiedSST) String() string {
return fmt.Sprintf("AddedSSTs: %s", s.File)
return fmt.Sprintf("CopiedSSTs: %s", s.File)
}

func (s *CopiedSST) Type() int {
return CopiedSSTsType
}

func (s *CopiedSST) TableID() int64 {
4 changes: 4 additions & 0 deletions br/pkg/restore/snap_client/import.go
@@ -585,6 +585,10 @@ func (importer *SnapFileImporter) buildDownloadRequest(
regionInfo *split.RegionInfo,
cipher *backuppb.CipherInfo,
) (*import_sstpb.DownloadRequest, import_sstpb.SSTMeta, error) {
err := rewriteRules.SetTimeRangeFilter(file.Cf)
if err != nil {
return nil, import_sstpb.SSTMeta{}, err
}
// Get the rewrite rule for the file.
fileRule := restoreutils.FindMatchedRewriteRule(file, rewriteRules)
if fileRule == nil {
2 changes: 1 addition & 1 deletion br/pkg/restore/utils/BUILD.bazel
@@ -35,7 +35,7 @@ go_test(
"rewrite_rule_test.go",
],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
":utils",
"//br/pkg/conn",
45 changes: 41 additions & 4 deletions br/pkg/restore/utils/rewrite_rule.go
@@ -48,11 +48,25 @@ type RewriteRules struct {
NewKeyspace []byte
// used to record checkpoint data
NewTableID int64

ShiftStartTs uint64
StartTs uint64
RestoredTs uint64
// used to record backup files to pitr.
// note: should NewTableID merged with this?
TableIDRemapHint []TableIDRemap
}

func (r *RewriteRules) HasSetTs() bool {
return r.StartTs != 0 && r.RestoredTs != 0
}

func (r *RewriteRules) SetTsRange(shiftStartTs, startTs, restoredTs uint64) {
r.ShiftStartTs = shiftStartTs
r.StartTs = startTs
r.RestoredTs = restoredTs
}

func (r *RewriteRules) RewriteSourceTableID(from, to int64) (rewritten bool) {
toPrefix := tablecodec.EncodeTablePrefix(to)
fromPrefix := tablecodec.EncodeTablePrefix(from)
@@ -93,6 +107,33 @@ func (r *RewriteRules) Append(other RewriteRules) {
r.Data = append(r.Data, other.Data...)
}

func (r *RewriteRules) SetTimeRangeFilter(cfName string) error {
// for some sst files like db restore copy ssts, we don't need to set the time range filter
if !r.HasSetTs() {
return nil
}

var ignoreBeforeTs uint64
switch {
case strings.Contains(cfName, DefaultCFName):
// for default cf, we need to check if shift start ts is greater than start ts
if r.ShiftStartTs > r.StartTs {
return errors.Errorf("shift start ts %d is greater than start ts %d", r.ShiftStartTs, r.StartTs)
}
ignoreBeforeTs = r.ShiftStartTs
case strings.Contains(cfName, WriteCFName):
ignoreBeforeTs = r.StartTs
default:
return errors.Errorf("unsupported column family type: %s", cfName)
}

for _, rule := range r.Data {
rule.IgnoreBeforeTimestamp = ignoreBeforeTs
rule.IgnoreAfterTimestamp = r.RestoredTs
}
return nil
}
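
A standalone sketch of the column-family dispatch above, using plain "default"/"write" strings in place of the DefaultCFName/WriteCFName constants and a tiny struct instead of import_sstpb.RewriteRule: default CF data is filtered starting from ShiftStartTs, write CF data from StartTs, and both are capped at RestoredTs.

package main

import (
	"fmt"
	"strings"
)

// tsFilter is a simplified stand-in for the ignore-timestamp fields stamped onto each rewrite rule.
type tsFilter struct{ ignoreBefore, ignoreAfter uint64 }

// pickIgnoreBefore mirrors the switch in SetTimeRangeFilter: default CF entries are
// filtered from shiftStartTs, write CF entries from startTs; other CFs are rejected.
func pickIgnoreBefore(cfName string, shiftStartTs, startTs uint64) (uint64, error) {
	switch {
	case strings.Contains(cfName, "default"):
		if shiftStartTs > startTs {
			return 0, fmt.Errorf("shift start ts %d is greater than start ts %d", shiftStartTs, startTs)
		}
		return shiftStartTs, nil
	case strings.Contains(cfName, "write"):
		return startTs, nil
	default:
		return 0, fmt.Errorf("unsupported column family type: %s", cfName)
	}
}

func main() {
	for _, cf := range []string{"default", "write"} {
		before, err := pickIgnoreBefore(cf, 100, 120)
		if err != nil {
			panic(err)
		}
		fmt.Println(cf, tsFilter{ignoreBefore: before, ignoreAfter: 200}) // RestoredTs caps the range
	}
}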

// EmptyRewriteRule make a map of new, empty rewrite rules.
func EmptyRewriteRulesMap() map[int64]*RewriteRules {
return make(map[int64]*RewriteRules)
@@ -192,7 +233,6 @@ func GetRewriteRulesMap(
// GetRewriteRuleOfTable returns a rewrite rule from t_{oldID} to t_{newID}.
func GetRewriteRuleOfTable(
oldTableID, newTableID int64,
newTimeStamp uint64,
indexIDs map[int64]int64,
getDetailRule bool,
) *RewriteRules {
@@ -202,20 +242,17 @@
dataRules = append(dataRules, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.GenTableRecordPrefix(oldTableID),
NewKeyPrefix: tablecodec.GenTableRecordPrefix(newTableID),
NewTimestamp: newTimeStamp,
})
for oldIndexID, newIndexID := range indexIDs {
dataRules = append(dataRules, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexID),
NewKeyPrefix: tablecodec.EncodeTableIndexPrefix(newTableID, newIndexID),
NewTimestamp: newTimeStamp,
})
}
} else {
dataRules = append(dataRules, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID),
NewKeyPrefix: tablecodec.EncodeTablePrefix(newTableID),
NewTimestamp: newTimeStamp,
})
}
