Skip to content

Commit

Permalink
chore: change execute progressive rollout condition
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Yuichi Okimoto <[email protected]>
  • Loading branch information
cre8ivejp committed Feb 2, 2024
1 parent 675e56b commit 42c5cca
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 180 deletions.
4 changes: 4 additions & 0 deletions pkg/autoops/api/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ var (
codes.Internal,
"autoops: internal error occurs for a progressive rollout",
)
statusProgressiveRolloutAlreadyStopped = gstatus.New(
codes.Internal,
"autoops: progressive rollout is already stopped",
)
statusProgressiveRolloutClauseVariationIDRequired = gstatus.New(
codes.InvalidArgument,
"autoops: clause variation id must be specified for a progressive rollout",
Expand Down
59 changes: 56 additions & 3 deletions pkg/autoops/api/progressive_rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/bucketeer-io/bucketeer/pkg/autoops/command"
"github.com/bucketeer-io/bucketeer/pkg/autoops/domain"
v2as "github.com/bucketeer-io/bucketeer/pkg/autoops/storage/v2"
ftstorage "github.com/bucketeer-io/bucketeer/pkg/feature/storage/v2"
"github.com/bucketeer-io/bucketeer/pkg/locale"
"github.com/bucketeer-io/bucketeer/pkg/log"
"github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql"
Expand Down Expand Up @@ -435,6 +436,23 @@ func (s *AutoOpsService) ExecuteProgressiveRollout(
if err != nil {
return err
}
ftStorage := ftstorage.NewFeatureStorage(tx)
feature, err := ftStorage.GetFeature(ctx, progressiveRollout.FeatureId, req.EnvironmentNamespace)
if err != nil {
return err
}
if err := s.checkStopStatus(progressiveRollout, localizer); err != nil {
s.logger.Error(
"Failed to execute progressive rollout",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("environmentNamespace", req.EnvironmentNamespace),
zap.String("id", progressiveRollout.Id),
zap.String("featureId", progressiveRollout.FeatureId),
)...,
)
return err
}
triggered, err := s.checkAlreadyTriggered(
req.ChangeProgressiveRolloutTriggeredAtCommand,
progressiveRollout,
Expand All @@ -453,6 +471,12 @@ func (s *AutoOpsService) ExecuteProgressiveRollout(
)
return nil
}
// Enable the flag if it is disabled and it is the first rollout execution
if !feature.Enabled && progressiveRollout.Status == autoopsproto.ProgressiveRollout_WAITING {
if err := feature.Enable(); err != nil {
return err
}
}
handler := command.NewProgressiveRolloutCommandHandler(
editor,
progressiveRollout,
Expand All @@ -465,13 +489,28 @@ func (s *AutoOpsService) ExecuteProgressiveRollout(
if err := storage.UpdateProgressiveRollout(ctx, progressiveRollout, req.EnvironmentNamespace); err != nil {
return err
}
return ExecuteProgressiveRolloutOperation(
if err := ExecuteProgressiveRolloutOperation(
ctx,
progressiveRollout,
s.featureClient,
feature,
req.ChangeProgressiveRolloutTriggeredAtCommand.ScheduleId,
req.EnvironmentNamespace,
)
); err != nil {
return err
}
if err := ftStorage.UpdateFeature(ctx, feature, req.EnvironmentNamespace); err != nil {
s.logger.Error(
"Failed to update feature flag",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("environmentNamespace", req.EnvironmentNamespace),
zap.String("id", progressiveRollout.Id),
zap.String("featureId", progressiveRollout.FeatureId),
)...,
)
return err
}
return nil
})
if err != nil {
s.logger.Error(
Expand Down Expand Up @@ -503,6 +542,20 @@ func (s *AutoOpsService) ExecuteProgressiveRollout(
return &autoopsproto.ExecuteProgressiveRolloutResponse{}, nil
}

func (s *AutoOpsService) checkStopStatus(p *domain.ProgressiveRollout, localizer locale.Localizer) error {
if p.IsStopped() {
dt, err := statusProgressiveRolloutAlreadyStopped.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return statusProgressiveRolloutInternal.Err()
}
return dt.Err()
}
return nil
}

func (s *AutoOpsService) checkAlreadyTriggered(
cmd *autoopsproto.ChangeProgressiveRolloutScheduleTriggeredAtCommand,
p *domain.ProgressiveRollout,
Expand Down
148 changes: 38 additions & 110 deletions pkg/autoops/api/progressive_rollout_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/golang/protobuf/ptypes"

"github.com/bucketeer-io/bucketeer/pkg/autoops/domain"
featureclient "github.com/bucketeer-io/bucketeer/pkg/feature/client"
ftdomain "github.com/bucketeer-io/bucketeer/pkg/feature/domain"
autoopsproto "github.com/bucketeer-io/bucketeer/proto/autoops"
featureproto "github.com/bucketeer-io/bucketeer/proto/feature"
)
Expand All @@ -33,159 +33,87 @@ const totalVariationWeight = int32(100000)
func ExecuteProgressiveRolloutOperation(
ctx context.Context,
progressiveRollout *domain.ProgressiveRollout,
featureClient featureclient.Client,
feature *ftdomain.Feature,
scheduleID, environmentNamespace string,
) error {
var variationID string
var weight int32
switch progressiveRollout.Type {
case autoopsproto.ProgressiveRollout_MANUAL_SCHEDULE:
c := &autoopsproto.ProgressiveRolloutManualScheduleClause{}
if err := ptypes.UnmarshalAny(progressiveRollout.Clause, c); err != nil {
return err
}
s, err := getTargetSchedule(c.Schedules, scheduleID)
variationID = c.VariationId
var err error
weight, err = getTargetWeight(c.Schedules, scheduleID)
if err != nil {
return err
}
if err := updateRolloutStrategy(
ctx,
s,
featureClient,
c.VariationId,
progressiveRollout.FeatureId,
environmentNamespace,
); err != nil {
return err
}
case autoopsproto.ProgressiveRollout_TEMPLATE_SCHEDULE:
c := &autoopsproto.ProgressiveRolloutTemplateScheduleClause{}
if err := ptypes.UnmarshalAny(progressiveRollout.Clause, c); err != nil {
return err
}
s, err := getTargetSchedule(c.Schedules, scheduleID)
variationID = c.VariationId
var err error
weight, err = getTargetWeight(c.Schedules, scheduleID)
if err != nil {
return err
}
if err := updateRolloutStrategy(
ctx,
s,
featureClient,
c.VariationId,
progressiveRollout.FeatureId,
environmentNamespace,
); err != nil {
return err
}
default:
return domain.ErrProgressiveRolloutInvalidType
}
if err := updateRolloutStrategy(
ctx,
weight,
feature,
variationID,
progressiveRollout.FeatureId,
environmentNamespace,
); err != nil {
return err
}
return nil
}

func getTargetSchedule(
func getTargetWeight(
schedules []*autoopsproto.ProgressiveRolloutSchedule,
scheduleID string,
) (*autoopsproto.ProgressiveRolloutSchedule, error) {
) (int32, error) {
for _, s := range schedules {
if s.ScheduleId == scheduleID {
return s, nil
return s.Weight, nil
}
}
return nil, domain.ErrProgressiveRolloutScheduleNotFound
return 0, domain.ErrProgressiveRolloutScheduleNotFound
}

func updateRolloutStrategy(
ctx context.Context,
schedule *autoopsproto.ProgressiveRolloutSchedule,
featureClient featureclient.Client,
weight int32,
feature *ftdomain.Feature,
targetVariationID, featureID, environmentNamespace string,
) error {
f, err := fetchFeature(ctx, featureClient, featureID, environmentNamespace)
variations, err := getRolloutStrategyVariations(feature, weight, targetVariationID)
if err != nil {
return err
}
if err := updateFeatureTargeting(
ctx,
schedule,
featureClient,
f,
targetVariationID,
environmentNamespace,
); err != nil {
return err
}
return nil
}

func fetchFeature(
ctx context.Context,
featureClient featureclient.Client,
featureID, environmentNamespace string,
) (*featureproto.Feature, error) {
resp, err := featureClient.GetFeature(ctx, &featureproto.GetFeatureRequest{
EnvironmentNamespace: environmentNamespace,
Id: featureID,
})
if err != nil {
return nil, err
}
return resp.Feature, nil
}

func updateFeatureTargeting(
ctx context.Context,
schedule *autoopsproto.ProgressiveRolloutSchedule,
featureClient featureclient.Client,
feature *featureproto.Feature,
targetVariationID, environmentNamespace string,
) error {
c, err := getChangeDefaultStrategyCmd(feature, schedule, targetVariationID)
if err != nil {
return err
}
_, err = featureClient.UpdateFeatureTargeting(
ctx,
&featureproto.UpdateFeatureTargetingRequest{
EnvironmentNamespace: environmentNamespace,
Id: feature.Id,
Commands: []*featureproto.Command{c},
From: featureproto.UpdateFeatureTargetingRequest_OPS,
strategy := &featureproto.Strategy{
Type: featureproto.Strategy_ROLLOUT,
RolloutStrategy: &featureproto.RolloutStrategy{
Variations: variations,
},
)
if err != nil {
}
if err := feature.ChangeDefaultStrategy(strategy); err != nil {
return err
}
return nil
}

func getChangeDefaultStrategyCmd(
feature *featureproto.Feature,
schedule *autoopsproto.ProgressiveRolloutSchedule,
targetVariationID string,
) (*featureproto.Command, error) {
variations, err := getRolloutStrategyVariations(feature, schedule, targetVariationID)
if err != nil {
return nil, err
}
c := &featureproto.ChangeDefaultStrategyCommand{
Strategy: &featureproto.Strategy{
Type: featureproto.Strategy_ROLLOUT,
RolloutStrategy: &featureproto.RolloutStrategy{
Variations: variations,
},
},
}
ac, err := ptypes.MarshalAny(c)
if err != nil {
return nil, err
}
return &featureproto.Command{
Command: ac,
}, nil
}

func getRolloutStrategyVariations(
feature *featureproto.Feature,
schedule *autoopsproto.ProgressiveRolloutSchedule,
feature *ftdomain.Feature,
weight int32,
targetVariationID string,
) ([]*featureproto.RolloutStrategy_Variation, error) {
nonTargetVariationID, err := findNonTargetVariationID(feature, targetVariationID)
Expand All @@ -195,17 +123,17 @@ func getRolloutStrategyVariations(
return []*featureproto.RolloutStrategy_Variation{
{
Variation: targetVariationID,
Weight: schedule.Weight,
Weight: weight,
},
{
Variation: nonTargetVariationID,
Weight: totalVariationWeight - schedule.Weight,
Weight: totalVariationWeight - weight,
},
}, nil
}

func findNonTargetVariationID(
feature *featureproto.Feature,
feature *ftdomain.Feature,
variationID string,
) (string, error) {
for _, v := range feature.Variations {
Expand Down
Loading

0 comments on commit 42c5cca

Please sign in to comment.