Skip to content

Commit

Permalink
feat: add tag deleter (#1469)
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Yuichi Okimoto <[email protected]>
  • Loading branch information
cre8ivejp authored Jan 22, 2025
1 parent 6b2ae5f commit 6558bd0
Show file tree
Hide file tree
Showing 15 changed files with 569 additions and 4 deletions.
3 changes: 3 additions & 0 deletions manifests/bucketeer/charts/batch/values.dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ cronjob:
- name: mau-partition-creator
jobId: MauPartitionCreator
schedule: "0 2 2 * *"
- name: tag-deleter
jobId: TagDeleter
schedule: "* * * * *"

subscribers:
# This is the processor's name. It must match the same name defined in the
Expand Down
3 changes: 3 additions & 0 deletions manifests/bucketeer/charts/batch/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,6 @@ cronjob:
- name: auto-ops-rules-cacher
jobId: AutoOpsRulesCacher
schedule: "* * * * *"
- name: tag-deleter
jobId: TagDeleter
schedule: "0 0 * * *"
2 changes: 1 addition & 1 deletion manifests/bucketeer/charts/web/values.yaml

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions manifests/bucketeer/values.dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ batch-server:
- name: auto-ops-rules-cacher
jobId: AutoOpsRulesCacher
schedule: "* * * * *"
- name: tag-deleter
jobId: TagDeleter
schedule: "* * * * *"

subscriber:
env:
Expand Down
6 changes: 5 additions & 1 deletion pkg/batch/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type batchService struct {
apiKeyCacher jobs.Job
experimentCacher jobs.Job
autoOpsRulesCacher jobs.Job
tagDeleter jobs.Job
logger *zap.Logger
}

Expand All @@ -59,7 +60,7 @@ func NewBatchService(
redisCounterDeleter, experimentCalculator,
mauSummarizer, mauPartitionDeleter, mauPartitionCreator,
featureFlagCacher, segmentUserCacher, apiKeyCacher,
experimentCacher, autoOpsRulesCacher jobs.Job,
experimentCacher, autoOpsRulesCacher, tagDeleter jobs.Job,
logger *zap.Logger,
) *batchService {
return &batchService{
Expand All @@ -80,6 +81,7 @@ func NewBatchService(
apiKeyCacher: apiKeyCacher,
experimentCacher: experimentCacher,
autoOpsRulesCacher: autoOpsRulesCacher,
tagDeleter: tagDeleter,
logger: logger.Named("batch-service"),
}
}
Expand Down Expand Up @@ -122,6 +124,8 @@ func (s *batchService) ExecuteBatchJob(
err = s.experimentCacher.Run(ctx)
case batch.BatchJob_AutoOpsRulesCacher:
err = s.autoOpsRulesCacher.Run(ctx)
case batch.BatchJob_TagDeleter:
err = s.tagDeleter.Run(ctx)
default:
s.logger.Error("Unknown job",
log.FieldsFromImcomingContext(ctx).AddFields(
Expand Down
2 changes: 2 additions & 0 deletions pkg/batch/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
autoopsdomain "github.com/bucketeer-io/bucketeer/pkg/autoops/domain"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
cacher "github.com/bucketeer-io/bucketeer/pkg/batch/jobs/cacher"
deleter "github.com/bucketeer-io/bucketeer/pkg/batch/jobs/deleter"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/experiment"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/mau"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/notification"
Expand Down Expand Up @@ -843,6 +844,7 @@ func newBatchService(t *testing.T,
autoOpsRulesMockClient,
redisMockClient,
),
deleter.NewTagDeleter(mysqlMockClient),
logger,
)
return service
Expand Down
6 changes: 6 additions & 0 deletions pkg/batch/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
cacher "github.com/bucketeer-io/bucketeer/pkg/batch/jobs/cacher"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/calculator"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/deleter"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/experiment"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/mau"
"github.com/bucketeer-io/bucketeer/pkg/batch/jobs/notification"
Expand Down Expand Up @@ -559,6 +560,11 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
cachev3.NewRedisCache(persistentRedisClient),
jobs.WithLogger(logger),
),
deleter.NewTagDeleter(
mysqlClient,
jobs.WithTimeout(5*time.Minute),
jobs.WithLogger(logger),
),
logger,
)

Expand Down
176 changes: 176 additions & 0 deletions pkg/batch/jobs/deleter/tag_deleter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2025 The Bucketeer Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deleter

import (
"context"
"errors"
"time"

"go.uber.org/zap"

"github.com/bucketeer-io/bucketeer/pkg/batch/jobs"
ftstorage "github.com/bucketeer-io/bucketeer/pkg/feature/storage/v2"
"github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql"
tagstorage "github.com/bucketeer-io/bucketeer/pkg/tag/storage"
ftproto "github.com/bucketeer-io/bucketeer/proto/feature"
tagproto "github.com/bucketeer-io/bucketeer/proto/tag"
)

var (
errInternal = errors.New("batch: internal error")
)

type tagDeleter struct {
tagStorage tagstorage.TagStorage
ftStorage ftstorage.FeatureStorage
opts *jobs.Options
logger *zap.Logger
}

func NewTagDeleter(
mysqlClient mysql.Client,
opts ...jobs.Option) jobs.Job {

dopts := &jobs.Options{
Timeout: 1 * time.Minute,
Logger: zap.NewNop(),
}
for _, opt := range opts {
opt(dopts)
}
return &tagDeleter{
tagStorage: tagstorage.NewTagStorage(mysqlClient),
ftStorage: ftstorage.NewFeatureStorage(mysqlClient),
opts: dopts,
logger: dopts.Logger.Named("tag-deleter"),
}
}

func (td *tagDeleter) Run(ctx context.Context) (lastErr error) {
ctx, cancel := context.WithTimeout(ctx, td.opts.Timeout)
defer cancel()

td.logger.Info("Starting to delete unused tags")
startTime := time.Now()
// List all the tags by environment
envTags, err := td.tagStorage.ListAllEnvironmentTags(ctx)
if err != nil {
td.logger.Error("Failed to list all environment tags", zap.Error(err))
return errInternal
}
// List all the features by environment
envFts, err := td.ftStorage.ListAllEnvironmentFeatures(ctx)
if err != nil {
td.logger.Error("Failed to list all environment features", zap.Error(err))
return errInternal
}

var deletedSize int
for _, envTag := range envTags {
// Delete all the tags when there are no flags
if len(envFts) == 0 {
for _, tag := range envTag.Tags {
// Delete unused tag
if err := td.deleteTag(ctx, tag); err != nil {
lastErr = err
continue
}
deletedSize++
}
} else {
// Check if the tags are in use in all the flags
for _, envFt := range envFts {
if envFt.EnvironmentId == envTag.EnvironmentId {
deletedCount, err := td.deleteUnusedTags(ctx, envTag.Tags, envFt.Features)
if err != nil {
lastErr = err
continue
}
deletedSize += deletedCount
}
}
}
}

if lastErr != nil {
td.logger.Error("Finished deleting unused tags with errors",
zap.Error(lastErr),
zap.Duration("elapsedTime", time.Since(startTime)),
zap.Int("deletedSize", deletedSize),
)
} else {
td.logger.Info("Finished deleting unused tags",
zap.Duration("elapsedTime", time.Since(startTime)),
zap.Int("deletedSize", deletedSize),
)
}
return
}

func (td *tagDeleter) deleteUnusedTags(
ctx context.Context,
tags []*tagproto.Tag,
fts []*ftproto.Feature,
) (int, error) {
var deletedCount int
for _, tag := range tags {
var inUse bool
for _, ft := range fts {
// Check if the tag is in use
if td.contains(tag.Name, ft.Tags) {
inUse = true
break
}
}
// Skip deleting if the tag is in use
if inUse {
continue
}
// Delete unused tag
if err := td.deleteTag(ctx, tag); err != nil {
return deletedCount, err
}
deletedCount++
}
return deletedCount, nil
}

func (td *tagDeleter) deleteTag(ctx context.Context, tag *tagproto.Tag) error {
if err := td.tagStorage.DeleteTag(ctx, tag.Id, tag.EnvironmentId); err != nil {
td.logger.Error("Failed to delete the tag",
zap.Error(err),
zap.String("tagId", tag.Id),
zap.String("tagName", tag.Name),
zap.String("environmentId", tag.EnvironmentId),
)
return errInternal
}
td.logger.Debug("Deleted tag successfully",
zap.String("tagId", tag.Id),
zap.String("tagName", tag.Name),
zap.String("environmentId", tag.EnvironmentId),
)
return nil
}

func (td *tagDeleter) contains(needle string, haystack []string) bool {
for i := range haystack {
if haystack[i] == needle {
return true
}
}
return false
}
Loading

0 comments on commit 6558bd0

Please sign in to comment.