Skip to content

Commit

Permalink
handle last compact
Browse files Browse the repository at this point in the history
  • Loading branch information
RidRisR committed Jan 13, 2025
1 parent 6802b19 commit 1903be3
Showing 1 changed file with 88 additions and 86 deletions.
174 changes: 88 additions & 86 deletions pkg/backup/backupschedule/backup_schedule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (bm *backupScheduleManager) doCompact(bs *v1alpha1.BackupSchedule, startTim
func calEndTs(bs *v1alpha1.BackupSchedule, startTs time.Time, interval time.Duration, scheduleTime *time.Time) time.Time {
lastBackup := bs.Status.LastBackupTime
nextCompact := bs.Status.NextCompactTime
var endTime time.Time
var endTs time.Time

fastCompactLimit := startTs.Add(3 * interval)

Expand All @@ -75,28 +75,26 @@ func calEndTs(bs *v1alpha1.BackupSchedule, startTs time.Time, interval time.Dura
}

for _, target := range targets {
if target == nil || target.Time.Before(startTs) {
if target == nil || target.Time.Compare(startTs) <= 0 {
continue
}

klog.Infof("backupSchedule %s/%s target time is %v", bs.GetNamespace(), bs.GetName(), target.Time)

if target.After(fastCompactLimit) {
endTime = fastCompactLimit
endTs = fastCompactLimit
nextCompact = target
break
} else {
endTime = target.Time
endTs = target.Time
nextCompact = nil
break
}
}
bs.Status.NextCompactTime = nextCompact

if endTime.IsZero() {
endTime = startTs.Add(interval)
if endTs.IsZero() {
endTs = startTs.Add(interval)
}
return endTime
return endTs
}

func (bm *backupScheduleManager) performCompact(bs *v1alpha1.BackupSchedule, scheduleTime *time.Time, nowFn nowFn) error {
Expand All @@ -121,11 +119,11 @@ func (bm *backupScheduleManager) performCompact(bs *v1alpha1.BackupSchedule, sch
}
now := nowFn()

endTime := calEndTs(bs, startTs, interval, scheduleTime)
klog.Infof("backupSchedule %s/%s endTime is %v", bs.GetNamespace(), bs.GetName(), endTime)
endTs = calEndTs(bs, startTs, interval, scheduleTime)
klog.Infof("backupSchedule %s/%s endTs is %v", bs.GetNamespace(), bs.GetName(), endTs)

if endTime.After(now) {
klog.Infof("backupSchedule %s/%s next compact time is not reached: %v", bs.GetNamespace(), bs.GetName(), endTime)
if endTs.After(now) {
klog.Infof("backupSchedule %s/%s next compact time is not reached: %v", bs.GetNamespace(), bs.GetName(), endTs)
return nil
}

Expand Down Expand Up @@ -290,84 +288,88 @@ func (bm *backupScheduleManager) deleteLastcompactJob(bs *v1alpha1.BackupSchedul
return bm.deps.JobControl.DeleteJob(compact, job)
}

func (bm *backupScheduleManager) canPerformNextCompact(bs *v1alpha1.BackupSchedule) error {
ns := bs.GetNamespace()
bsName := bs.GetName()

// If this backup schedule has specified label of backup schedule group, then we need to check the last backup of the group.
// Otherwise, check its own last backup.
bsGroupName := bs.GetLabels()[label.BackupScheduleGroupLabelKey]

if bsGroupName == "" {
compact, err := bm.deps.CompactBackupLister.CompactBackups(ns).Get(bs.Status.LastCompact)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("backup schedule %s/%s, get backup %s failed, err: %v", ns, bsName, bs.Status.LastBackup, err)
}

var timestamp string
switch compact.Status.State {
case string(v1alpha1.BackupComplete):
timestamp = compact.Spec.EndTs
case string(v1alpha1.BackupFailed):
timestamp = compact.Spec.StartTs
default:
// skip this sync round of the backup schedule and waiting the last backup.
return controller.RequeueErrorf("backup schedule %s/%s, the last backup %s is still running", ns, bsName, bs.Status.LastBackup)
}

t, err := config.ParseTSStringToGoTime(timestamp)
if err != nil {
return perrors.AddStack(err)
}
bs.Status.LastCompactTime = &metav1.Time{Time: t}
return nil
}

// Check the last backup of the group
backupScheduleGroupLabels := label.NewBackupScheduleGroup(bsGroupName)
selector, err := backupScheduleGroupLabels.Selector()
if err != nil {
return fmt.Errorf("generate backup schedule group %s label selector failed, err: %v", bsGroupName, err)
}

bss, err := bm.deps.BackupScheduleLister.BackupSchedules(ns).List(selector)
if err != nil {
return fmt.Errorf("backup schedule %s/%s, list backup schedules failed, err: %v", ns, bsName, err)
}

for _, bsMember := range bss {
// The check is not safe in fact since we don't have strict serialization
compact, err := bm.deps.CompactBackupLister.CompactBackups(ns).Get(bs.Status.LastCompact)
if err != nil {
if errors.IsNotFound(err) {
continue
}
return fmt.Errorf("backup schedule %s/%s, get backup %s failed, err: %v", ns, bsName, bsMember.Status.LastBackup, err)
}
// handleLastCompact handles the compact backup processing logic.
// It returns a controller.RequeueError if the backup is still running,
// otherwise it updates the LastCompactTime or returns any encountered error.
func (bm *backupScheduleManager) handleLastCompact(ns, bsName, lastCompact string, bs *v1alpha1.BackupSchedule) error {
if lastCompact == "" {
return nil
}

compact, err := bm.deps.CompactBackupLister.CompactBackups(ns).Get(lastCompact)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("backup schedule %s/%s: get compact backup %s failed: %w", ns, bsName, lastCompact, err)
}

var timestamp string
switch compact.Status.State {
case string(v1alpha1.BackupComplete):
timestamp = compact.Spec.EndTs
case string(v1alpha1.BackupFailed):
timestamp = compact.Spec.StartTs
default:
return controller.RequeueErrorf("backup schedule %s/%s: compact backup %s is still running", ns, bsName, lastCompact)
}

t, err := config.ParseTSStringToGoTime(timestamp)
if err != nil {
return perrors.AddStack(err)
}

if err := bm.deps.CompactControl.DeleteCompactBackup(compact); err != nil {
return fmt.Errorf("backup schedule %s/%s: delete compact backup %s failed: %w", ns, bsName, lastCompact, err)
}

if !t.IsZero() && t.Before(bs.Status.LogBackupStartTs.Time) {
return fmt.Errorf("backupSchedule %s/%s last compact time can't rollback (from %v to %v)", bs.GetNamespace(), bs.GetName(), bs.Status.LogBackupStartTs, t)
}
if bs.Status.LastCompactTime != nil && t.Before(bs.Status.LastCompactTime.Time) {
return fmt.Errorf("backupSchedule %s/%s last compact time can't rollback (from %v to %v)", bs.GetNamespace(), bs.GetName(), bs.Status.LastCompactTime, t)
}
bs.Status.LastCompactTime = &metav1.Time{Time: t}
return nil
}

var timestamp string
switch compact.Status.State {
case string(v1alpha1.BackupComplete):
timestamp = compact.Spec.EndTs
case string(v1alpha1.BackupFailed):
timestamp = compact.Spec.StartTs
default:
// skip this sync round of the backup schedule and waiting the last backup.
return controller.RequeueErrorf("backup schedule %s/%s, the last backup %s is still running", ns, bsName, bs.Status.LastBackup)
}
func (bm *backupScheduleManager) canPerformNextCompact(bs *v1alpha1.BackupSchedule) error {
ns := bs.GetNamespace()
bsName := bs.GetName()

t, err := config.ParseTSStringToGoTime(timestamp)
if err != nil {
return perrors.AddStack(err)
}
bs.Status.LastCompactTime = &metav1.Time{Time: t}
if bs.Status.LogBackupStartTs == nil {
return nil
}

return nil
// Determine if the backup schedule is part of a group
bsGroupName := bs.GetLabels()[label.BackupScheduleGroupLabelKey]
if bsGroupName == "" {
return bm.handleLastCompact(ns, bsName, bs.Status.LastCompact, bs)
}

backupScheduleGroupLabels := label.NewBackupScheduleGroup(bsGroupName)
selector, err := backupScheduleGroupLabels.Selector()
if err != nil {
return fmt.Errorf("generate backup schedule group %s label selector failed: %w", bsGroupName, err)
}

bss, err := bm.deps.BackupScheduleLister.BackupSchedules(ns).List(selector)
if err != nil {
return fmt.Errorf("backup schedule %s/%s: list backup schedules failed: %w", ns, bsName, err)
}

for _, bsMember := range bss {
if err := bm.handleLastCompact(ns, bsName, bsMember.Status.LastCompact, bsMember); err != nil {
// If it's a requeue error, propagate it
if controller.IsRequeueError(err) {
return err
}
// For other errors, return immediately
return fmt.Errorf("backup schedule %s/%s: processing compact failed: %w", ns, bsName, err)
}
}

return nil
}

func (bm *backupScheduleManager) performLogBackupIfNeeded(bs *v1alpha1.BackupSchedule) error {
Expand Down

0 comments on commit 1903be3

Please sign in to comment.