compacted restore: ignore some keys out of specific range #59112

Merged · 15 commits · Jan 24, 2025
8 changes: 6 additions & 2 deletions br/pkg/restore/log_client/client.go
@@ -270,7 +270,7 @@ func (rc *LogClient) Close(ctx context.Context) {
log.Info("Restore client closed")
}

func rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.RewriteRules, error) {
func (rc *LogClient) rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.RewriteRules, error) {
if r, ok := sst.(RewrittenSSTs); ok {
rewritten := r.RewrittenTo()
if rewritten != sst.TableID() {
@@ -287,6 +287,10 @@ func rewriteRulesFor(sst SSTs, rules *restoreutils.RewriteRules) (*restoreutils.
return rewriteRules, nil
}
}
// Need to set ts range for compacted sst to filter out irrelevant data.
if sst.Type() == CompactedSSTsType && !rules.HasSetTs() {
rules.SetTsRange(rc.shiftStartTS, rc.startTS, rc.restoreTS)
}
return rules, nil
}

@@ -318,7 +322,7 @@ func (rc *LogClient) RestoreSSTFiles(
log.Warn("[Compacted SST Restore] Skipping excluded table during restore.", zap.Int64("table_id", i.TableID()))
continue
}
newRules, err := rewriteRulesFor(i, rewriteRules)
newRules, err := rc.rewriteRulesFor(i, rewriteRules)
if err != nil {
return err
}
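What the new SetTsRange call buys: keys whose commit ts falls outside the restore window are dropped on download instead of being written back. A minimal, self-contained sketch of that per-version check — `keepVersion` and the inclusive bounds are illustrative, not the actual TiKV-side implementation:

```go
package main

import "fmt"

// keepVersion reports whether a key version written at commitTS should be
// restored, given the lower bound (the shifted start ts for the default CF,
// the start ts for the write CF) and the upper bound restoredTS. It mirrors
// the intent of IgnoreBeforeTimestamp / IgnoreAfterTimestamp on the rewrite
// rules; the exact boundary handling on the TiKV side may differ.
func keepVersion(commitTS, ignoreBeforeTS, restoredTS uint64) bool {
	return commitTS >= ignoreBeforeTS && commitTS <= restoredTS
}

func main() {
	fmt.Println(keepVersion(90, 100, 200))  // false: older than the restore window
	fmt.Println(keepVersion(150, 100, 200)) // true: inside the window
	fmt.Println(keepVersion(250, 100, 200)) // false: newer than restoredTS
}
```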
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client/compacted_file_strategy.go
@@ -51,7 +51,7 @@ func (cs *CompactedFileSplitStrategy) inspect(ssts SSTs) sstIdentity {
}
}

rule := utils.GetRewriteRuleOfTable(ssts.TableID(), r.RewrittenTo(), 0, map[int64]int64{}, false)
rule := utils.GetRewriteRuleOfTable(ssts.TableID(), r.RewrittenTo(), map[int64]int64{}, false)

return sstIdentity{
EffectiveID: r.RewrittenTo(),
9 changes: 7 additions & 2 deletions br/pkg/restore/log_client/log_file_manager.go
@@ -495,14 +495,19 @@ func (rc *LogFileManager) ReadAllEntries(
return kvEntries, nextKvEntries, nil
}

func Subcompactions(ctx context.Context, prefix string, s storage.ExternalStorage) SubCompactionIter {
func Subcompactions(ctx context.Context, prefix string, s storage.ExternalStorage, shiftStartTS, restoredTS uint64) SubCompactionIter {
return iter.FlatMap(storage.UnmarshalDir(
ctx,
&storage.WalkOption{SubDir: prefix},
s,
func(t *backuppb.LogFileSubcompactions, name string, b []byte) error { return t.Unmarshal(b) },
), func(subcs *backuppb.LogFileSubcompactions) iter.TryNextor[*backuppb.LogFileSubcompaction] {
return iter.FromSlice(subcs.Subcompactions)
return iter.MapFilter(iter.FromSlice(subcs.Subcompactions), func(subc *backuppb.LogFileSubcompaction) (*backuppb.LogFileSubcompaction, bool) {
if subc.Meta.InputMaxTs < shiftStartTS || subc.Meta.InputMinTs > restoredTS {
return nil, true
}
return subc, false
})
})
}

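Whole subcompactions can be pruned before any per-key filtering. A hedged sketch of just the pruning predicate used in the `iter.MapFilter` above, with the iterator plumbing stripped away (the function name is illustrative):

```go
// overlapsRestoreWindow reports whether a subcompaction whose input keys span
// [inputMinTS, inputMaxTS] can contain anything relevant to the restore window
// [shiftStartTS, restoredTS]. It is the negation of the skip condition above:
// InputMaxTs < shiftStartTS || InputMinTs > restoredTS.
func overlapsRestoreWindow(inputMinTS, inputMaxTS, shiftStartTS, restoredTS uint64) bool {
	return inputMaxTS >= shiftStartTS && inputMinTS <= restoredTS
}
```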
6 changes: 4 additions & 2 deletions br/pkg/restore/log_client/migration.go
@@ -170,6 +170,7 @@ func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigr
fullBackups: fullBackups,
restoredTS: builder.restoredTS,
startTS: builder.startTS,
shiftStartTS: builder.shiftStartTS,
}
return withMigrations
}
@@ -222,8 +223,9 @@ type WithMigrations struct {
skipmap metaSkipMap
compactionDirs []string
fullBackups []string
restoredTS uint64
shiftStartTS uint64
startTS uint64
restoredTS uint64
}

func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter {
@@ -249,7 +251,7 @@ func (wm *WithMigrations) Compactions(ctx context.Context, s storage.ExternalSto
compactionDirIter := iter.FromSlice(wm.compactionDirs)
return iter.FlatMap(compactionDirIter, func(name string) iter.TryNextor[*backuppb.LogFileSubcompaction] {
// name is the absolute path in external storage.
return Subcompactions(ctx, name, s)
return Subcompactions(ctx, name, s, wm.shiftStartTS, wm.restoredTS)
})
}

16 changes: 15 additions & 1 deletion br/pkg/restore/log_client/ssts.go
@@ -17,6 +17,11 @@ var (
_ RewrittenSSTs = &CopiedSST{}
)

const (
CompactedSSTsType = 1
CopiedSSTsType = 2
)

// RewrittenSSTs is an extension to the `SSTs` that needs extra key rewriting.
// This allows an SST to be restored "as if" it were in another table.
//
@@ -37,6 +42,7 @@ type RewrittenSSTs interface {
type SSTs interface {
fmt.Stringer

Type() int
// TableID returns the ID of the table associated with the SST files.
// This should be the same as the physical content's table ID.
TableID() int64
@@ -51,6 +57,10 @@ type CompactedSSTs struct {
*backuppb.LogFileSubcompaction
}

func (s *CompactedSSTs) Type() int {
return CompactedSSTsType
}

func (s *CompactedSSTs) String() string {
return fmt.Sprintf("CompactedSSTs: %s", s.Meta)
}
@@ -75,7 +85,11 @@ type CopiedSST struct {
}

func (s *CopiedSST) String() string {
return fmt.Sprintf("AddedSSTs: %s", s.File)
return fmt.Sprintf("CopiedSSTs: %s", s.File)
}

func (s *CopiedSST) Type() int {
return CopiedSSTsType
}

func (s *CopiedSST) TableID() int64 {
4 changes: 4 additions & 0 deletions br/pkg/restore/snap_client/import.go
@@ -585,6 +585,10 @@ func (importer *SnapFileImporter) buildDownloadRequest(
regionInfo *split.RegionInfo,
cipher *backuppb.CipherInfo,
) (*import_sstpb.DownloadRequest, import_sstpb.SSTMeta, error) {
err := rewriteRules.SetTimeRangeFilter(file.Cf)
if err != nil {
return nil, import_sstpb.SSTMeta{}, err
}
// Get the rewrite rule for the file.
fileRule := restoreutils.FindMatchedRewriteRule(file, rewriteRules)
if fileRule == nil {
2 changes: 1 addition & 1 deletion br/pkg/restore/utils/BUILD.bazel
@@ -35,7 +35,7 @@ go_test(
"rewrite_rule_test.go",
],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
":utils",
"//br/pkg/conn",
45 changes: 41 additions & 4 deletions br/pkg/restore/utils/rewrite_rule.go
@@ -48,11 +48,25 @@ type RewriteRules struct {
NewKeyspace []byte
// used to record checkpoint data
NewTableID int64

ShiftStartTs uint64
StartTs uint64
RestoredTs uint64
// used to record backup files to pitr.
// note: should NewTableID be merged with this?
TableIDRemapHint []TableIDRemap
}

func (r *RewriteRules) HasSetTs() bool {
return r.StartTs != 0 && r.RestoredTs != 0
}

func (r *RewriteRules) SetTsRange(shiftStartTs, startTs, restoredTs uint64) {
r.ShiftStartTs = shiftStartTs
r.StartTs = startTs
r.RestoredTs = restoredTs
}

func (r *RewriteRules) RewriteSourceTableID(from, to int64) (rewritten bool) {
toPrefix := tablecodec.EncodeTablePrefix(to)
fromPrefix := tablecodec.EncodeTablePrefix(from)
@@ -93,6 +107,33 @@ func (r *RewriteRules) Append(other RewriteRules) {
r.Data = append(r.Data, other.Data...)
}

func (r *RewriteRules) SetTimeRangeFilter(cfName string) error {
// For some SST files, such as the copied SSTs from a snapshot (db) restore, there is no need to set the time range filter.
if !r.HasSetTs() {
return nil
}

var ignoreBeforeTs uint64
switch {
case strings.Contains(cfName, DefaultCFName):
// for default cf, we need to check if shift start ts is greater than start ts
if r.ShiftStartTs > r.StartTs {
return errors.Errorf("shift start ts %d is greater than start ts %d", r.ShiftStartTs, r.StartTs)
}
ignoreBeforeTs = r.ShiftStartTs
case strings.Contains(cfName, WriteCFName):
ignoreBeforeTs = r.StartTs
default:
return errors.Errorf("unsupported column family type: %s", cfName)
}

for _, rule := range r.Data {
rule.IgnoreBeforeTimestamp = ignoreBeforeTs
rule.IgnoreAfterTimestamp = r.RestoredTs
}
return nil
}

// EmptyRewriteRulesMap makes a map of new, empty rewrite rules.
func EmptyRewriteRulesMap() map[int64]*RewriteRules {
return make(map[int64]*RewriteRules)
@@ -192,7 +233,6 @@ func GetRewriteRulesMap(
// GetRewriteRuleOfTable returns a rewrite rule from t_{oldID} to t_{newID}.
func GetRewriteRuleOfTable(
oldTableID, newTableID int64,
newTimeStamp uint64,
indexIDs map[int64]int64,
getDetailRule bool,
) *RewriteRules {
@@ -202,20 +242,17 @@
dataRules = append(dataRules, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.GenTableRecordPrefix(oldTableID),
NewKeyPrefix: tablecodec.GenTableRecordPrefix(newTableID),
NewTimestamp: newTimeStamp,
})
for oldIndexID, newIndexID := range indexIDs {
dataRules = append(dataRules, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexID),
NewKeyPrefix: tablecodec.EncodeTableIndexPrefix(newTableID, newIndexID),
NewTimestamp: newTimeStamp,
})
}
} else {
dataRules = append(dataRules, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID),
NewKeyPrefix: tablecodec.EncodeTablePrefix(newTableID),
NewTimestamp: newTimeStamp,
})
}

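A hedged usage sketch of SetTimeRangeFilter, assuming the rules were populated earlier via SetTsRange (as the log client now does for compacted SSTs). The wrapper function and example timestamps are illustrative; only the resulting bounds follow from the code above:

```go
// applyFilterForFile attaches the time-range filter for one SST file's column
// family. For rules built with SetTsRange(95, 100, 200):
//   cf containing "default" -> IgnoreBeforeTimestamp = 95  (shift start ts), IgnoreAfterTimestamp = 200
//   cf containing "write"   -> IgnoreBeforeTimestamp = 100 (start ts),       IgnoreAfterTimestamp = 200
// Rules without a ts range (HasSetTs() == false) are left untouched, which is
// what keeps snapshot-restore copied SSTs unaffected.
func applyFilterForFile(rules *RewriteRules, cf string) error {
	return rules.SetTimeRangeFilter(cf)
}
```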