
make an asynchronous writer
Signed-off-by: hillium <[email protected]>
YuJuncen committed Dec 27, 2024
1 parent 9a3645c commit 1582a9b
Showing 2 changed files with 92 additions and 9 deletions.
73 changes: 71 additions & 2 deletions br/pkg/restore/snap_client/pitr_collector.go
@@ -27,6 +27,67 @@ import (
	"golang.org/x/sync/errgroup"
)

// writerCall is a single queued request to persist the extra backup meta,
// carrying the caller's context and a callback that receives the result.
type writerCall struct {
	cx context.Context
	cb func(error)
}

// writerRoutine is the handle to the background writer goroutine.
type writerRoutine struct {
	hnd chan<- writerCall
}

func (w writerRoutine) close() {
	close(w.hnd)
}

// write enqueues a persist request and blocks until the background writer
// reports the result for the batch containing it.
func (w writerRoutine) write(ctx context.Context) error {
	ch := make(chan error)
	w.hnd <- writerCall{
		cx: ctx,
		cb: func(err error) {
			select {
			case ch <- err:
			default:
			}
		},
	}
	return <-ch
}

// goWriter starts the background writer goroutine, which batches queued calls
// so that concurrent callers can share a single doPersistExtraBackupMeta run.
func (c *pitrCollector) goWriter() {
	hnd := make(chan writerCall, 2048)
	exhaust := func(f func(writerCall)) {
		select {
		case cb, ok := <-hnd:
			if !ok {
				log.Warn("Early channel close. Should not happen.")
				return
			}
			f(cb)
		default:
		}
	}

	go func() {
		for newCall := range hnd {
			cs := []writerCall{newCall}
			exhaust(func(newCall writerCall) {
				cs = append(cs, newCall)
			})

			err := c.doPersistExtraBackupMeta(cs[0].cx)

			for _, c := range cs {
				c.cb(err)
			}
		}
	}()

	c.writerRoutine = writerRoutine{
		hnd: hnd,
	}
}

type pitrCollector struct {
	// Immutable state.
	taskStorage storage.ExternalStorage
@@ -40,6 +101,8 @@ type pitrCollector struct {
	extraBackupMetaLock sync.Mutex
	putMigOnce          sync.Once

	writerRoutine writerRoutine

	// Delegates.
	tso            func(ctx context.Context) (uint64, error)
	restoreSuccess func() bool
@@ -78,8 +141,9 @@ func (c *pitrCollector) close() error {
	}
	log.Info("Log backup SSTs are committed.",
		zap.Uint64("commitTS", commitTS), zap.String("committedTo", c.outputPath()))
	return nil

	c.writerRoutine.close()
	return nil
}

func (c *pitrCollector) onBatch(ctx context.Context, fileSets restore.BatchBackupFileSet) (func() error, error) {
@@ -206,7 +270,7 @@ func (c *pitrCollector) putRewriteRule(_ context.Context, oldID int64, newID int
	return err
}

func (c *pitrCollector) persistExtraBackupMeta(ctx context.Context) (err error) {
func (c *pitrCollector) doPersistExtraBackupMeta(ctx context.Context) (err error) {
	c.extraBackupMetaLock.Lock()
	defer c.extraBackupMetaLock.Unlock()

@@ -223,6 +287,10 @@ func (c *pitrCollector) persistExtraBackupMeta(ctx context.Context) (err error)
	return nil
}

// persistExtraBackupMeta delegates the persist request to the background
// writer routine so concurrent callers can share one write.
func (c *pitrCollector) persistExtraBackupMeta(ctx context.Context) (err error) {
	return c.writerRoutine.write(ctx)
}

// Commit commits the collected SSTs to a migration.
func (c *pitrCollector) prepareMig(ctx context.Context) error {
	if !c.enabled {
@@ -322,6 +390,7 @@ func newPiTRColl(ctx context.Context, deps PiTRCollDep) (*pitrCollector, error)
	}
	coll.restoreStorage = restoreStrg
	coll.restoreSuccess = summary.Succeed
	coll.goWriter()
	coll.resetCommitting()
	return coll, nil
}
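
The heart of this change is a call-coalescing writer: callers of persistExtraBackupMeta enqueue a request and block, while a single background goroutine drains the queue, runs doPersistExtraBackupMeta once for the batch, and fans the shared result back out to every caller. The following standalone sketch illustrates that pattern under the assumption of a slow persist step; the names coalescingWriter, call, and persist are illustrative only and do not appear in the repository.

// A minimal, standalone sketch (not the project's code) of the call-coalescing
// pattern that goWriter implements above. The names coalescingWriter, call, and
// persist are illustrative assumptions, not identifiers from the repository.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type call struct {
	ctx context.Context
	cb  func(error)
}

type coalescingWriter struct {
	hnd chan call
}

// newCoalescingWriter starts a background goroutine that takes one queued call,
// drains whatever else is already waiting, runs persist once for that batch,
// and hands the shared result back to every caller.
func newCoalescingWriter(persist func(context.Context) error) *coalescingWriter {
	w := &coalescingWriter{hnd: make(chan call, 2048)}
	go func() {
		for first := range w.hnd {
			batch := []call{first}
		drain:
			for {
				select {
				case c, ok := <-w.hnd:
					if !ok {
						break drain
					}
					batch = append(batch, c)
				default:
					break drain
				}
			}
			err := persist(batch[0].ctx)
			for _, c := range batch {
				c.cb(err)
			}
		}
	}()
	return w
}

// write enqueues a request and blocks until the batch containing it is persisted.
func (w *coalescingWriter) write(ctx context.Context) error {
	done := make(chan error, 1)
	w.hnd <- call{ctx: ctx, cb: func(err error) { done <- err }}
	return <-done
}

func main() {
	var persisted int
	w := newCoalescingWriter(func(ctx context.Context) error {
		persisted++                       // only the writer goroutine touches this
		time.Sleep(10 * time.Millisecond) // stand-in for slow external-storage I/O
		return nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = w.write(context.Background())
		}()
	}
	wg.Wait()
	fmt.Println("persist calls:", persisted) // typically fewer than the 5 writes
}

Running this, the five concurrent write calls typically complete with fewer than five persist runs, which is the point of moving the writer off the caller's goroutine.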
28 changes: 21 additions & 7 deletions br/pkg/restore/snap_client/pitr_collector_test.go
@@ -31,7 +31,15 @@ type pitrCollectorT struct {
	cx context.Context
}

func (p pitrCollectorT) RestoreAFile(fs restore.BatchBackupFileSet) func() error {
// hackyCloseWriterWithoutPanic shuts down the writer routine but swallows the
// panic raised when the channel has already been closed, so tests can register
// it unconditionally as a cleanup.
func (p *pitrCollector) hackyCloseWriterWithoutPanic() {
	defer func() {
		_ = recover()
	}()

	p.writerRoutine.close()
}

func (p *pitrCollectorT) RestoreAFile(fs restore.BatchBackupFileSet) func() error {
	for _, b := range fs {
		for _, file := range b.SSTFiles {
			require.NoError(p.t, p.coll.restoreStorage.WriteFile(p.cx, file.Name, []byte("something")))
@@ -43,11 +51,11 @@ func (p pitrCollectorT) RestoreAFile(fs restore.BatchBackupFileSet) func() error
	return res
}

func (p pitrCollectorT) Done() {
func (p *pitrCollectorT) Done() {
	require.NoError(p.t, p.coll.close())
}

func (p pitrCollectorT) ExtFullBkups() []backuppb.ExtraFullBackup {
func (p *pitrCollectorT) ExtFullBkups() []backuppb.ExtraFullBackup {
	est := stream.MigrationExtension(p.coll.taskStorage)
	migs, err := est.Load(p.cx)
	require.NoError(p.t, err)
@@ -78,11 +86,13 @@ func (p *pitrCollectorT) Reopen() {
		tso:            p.coll.tso,
		restoreSuccess: p.coll.restoreSuccess,
	}
	p.t.Cleanup(newColl.hackyCloseWriterWithoutPanic)
	p.success.Store(false)
	newColl.goWriter()
	p.coll = newColl
}

func (p pitrCollectorT) RequireCopied(extBk backuppb.ExtraFullBackup, files ...string) {
func (p *pitrCollectorT) RequireCopied(extBk backuppb.ExtraFullBackup, files ...string) {
	extFiles := make([]string, 0)
	for _, f := range extBk.Files {
		extFiles = append(extFiles, f.Name)
@@ -96,7 +106,7 @@ func (p pitrCollectorT) RequireCopied(extBk backuppb.ExtraFullBackup, files ...s
	require.ElementsMatch(p.t, extFiles, locatedFiles)
}

func (p pitrCollectorT) RequireRewrite(extBk backuppb.ExtraFullBackup, rules ...utils.TableIDRemap) {
func (p *pitrCollectorT) RequireRewrite(extBk backuppb.ExtraFullBackup, rules ...utils.TableIDRemap) {
	rulesInExtBk := []utils.TableIDRemap{}
	for _, f := range extBk.RewrittenTables {
		rulesInExtBk = append(rulesInExtBk, utils.TableIDRemap{
@@ -107,7 +117,7 @@ func (p pitrCollectorT) RequireRewrite(extBk backuppb.ExtraFullBackup, rules ...
	require.ElementsMatch(p.t, rulesInExtBk, rules)
}

func newPiTRCollForTest(t *testing.T) pitrCollectorT {
func newPiTRCollForTest(t *testing.T) *pitrCollectorT {
	taskStorage := tmp(t)
	restoreStorage := tmp(t)

@@ -124,14 +134,18 @@ func newPiTRCollForTest(t *testing.T) pitrCollectorT {
		return tsoCnt.Add(1), nil
	}
	coll.restoreSuccess = restoreSuccess.Load
	coll.goWriter()
	t.Cleanup(coll.hackyCloseWriterWithoutPanic)

	return pitrCollectorT{
	p := &pitrCollectorT{
		t:       t,
		coll:    coll,
		tsoCnt:  tsoCnt,
		success: restoreSuccess,
		cx:      context.Background(),
	}

	return p
}

type backupFileSetOp func(*restore.BackupFileSet)
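One detail worth noting in the test changes: closing an already-closed Go channel panics, and the writer routine may be shut down both by the collector's close() and by the t.Cleanup registered in the tests, which is why hackyCloseWriterWithoutPanic wraps the close in a deferred recover. A minimal illustration of that behavior follows; closeWithoutPanic is an illustrative name, not taken from the repository.

// Illustrative sketch, not from the repository: a second close on the same
// channel panics, and a deferred recover absorbs that panic.
package main

import "fmt"

// closeWithoutPanic closes ch but absorbs the "close of closed channel" panic,
// so it is safe to call even when something else already closed the channel.
func closeWithoutPanic(ch chan struct{}) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
	}()
	close(ch)
}

func main() {
	ch := make(chan struct{})
	closeWithoutPanic(ch) // first close succeeds quietly
	closeWithoutPanic(ch) // second close panics inside, but the panic is recovered
}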
