Skip to content

Commit 4cd06a5

Browse files
committed
limit functions reaper to own orgs
1 parent df70695 commit 4cd06a5

File tree

8 files changed

+134
-15
lines changed

8 files changed

+134
-15
lines changed

server/internal/background/activities.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,6 @@ func (a *Activities) ValidateDeployment(ctx context.Context, projectID uuid.UUID
147147
return a.validateDeployment.Do(ctx, projectID, deploymentID)
148148
}
149149

150-
func (a *Activities) ReapFlyApps(ctx context.Context) (*activities.ReapFlyAppsResult, error) {
151-
return a.reapFlyApps.Do(ctx)
150+
func (a *Activities) ReapFlyApps(ctx context.Context, req activities.ReapFlyAppsRequest) (*activities.ReapFlyAppsResult, error) {
151+
return a.reapFlyApps.Do(ctx, req)
152152
}

server/internal/background/activities/reap_functions.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,24 @@ import (
44
"context"
55
"log/slog"
66

7+
"github.com/google/uuid"
78
"github.com/jackc/pgx/v5/pgtype"
89
"github.com/jackc/pgx/v5/pgxpool"
910
"go.opentelemetry.io/otel/metric"
11+
"go.temporal.io/sdk/temporal"
1012

1113
"github.com/speakeasy-api/gram/server/internal/attr"
1214
"github.com/speakeasy-api/gram/server/internal/functions"
1315
funcrepo "github.com/speakeasy-api/gram/server/internal/functions/repo"
1416
"github.com/speakeasy-api/gram/server/internal/oops"
1517
)
1618

19+
type ReapFlyAppsRequest struct {
20+
Scope FunctionsReaperScope
21+
22+
ProjectID uuid.NullUUID
23+
}
24+
1725
type ReapFlyAppsResult struct {
1826
Reaped int
1927
Errors int
@@ -43,9 +51,22 @@ func NewReapFlyApps(
4351
}
4452
}
4553

46-
func (r *ReapFlyApps) Do(ctx context.Context) (*ReapFlyAppsResult, error) {
54+
func (r *ReapFlyApps) Do(ctx context.Context, req ReapFlyAppsRequest) (*ReapFlyAppsResult, error) {
4755
logger := r.logger
4856

57+
switch {
58+
case req.Scope == FunctionsReaperScopeProject && req.ProjectID.UUID == uuid.Nil:
59+
return nil, temporal.NewApplicationErrorWithOptions("project ID must be set for project-scoped reaper", "reaper_error", temporal.ApplicationErrorOptions{
60+
NonRetryable: true,
61+
Cause: nil,
62+
})
63+
case req.Scope == FunctionsReaperScopeGlobal && req.ProjectID.UUID != uuid.Nil:
64+
return nil, temporal.NewApplicationErrorWithOptions("project ID must not be set for global reaper", "reaper_error", temporal.ApplicationErrorOptions{
65+
NonRetryable: true,
66+
Cause: nil,
67+
})
68+
}
69+
4970
repo := funcrepo.New(r.db)
5071

5172
// Get all apps that should be reaped (keeping only the most recent N per project)
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package activities
2+
3+
type FunctionsReaperScope string
4+
5+
const (
6+
FunctionsReaperScopeGlobal FunctionsReaperScope = "global"
7+
FunctionsReaperScopeProject FunctionsReaperScope = "project"
8+
)

server/internal/background/deployments.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -184,18 +184,23 @@ func ProcessDeploymentWorkflow(ctx workflow.Context, params ProcessDeploymentWor
184184
)
185185
}
186186

187-
// Trigger functions reaper workflow after successful deployment
187+
// Trigger project-scoped functions reaper workflow after successful deployment
188188
// This runs asynchronously and doesn't block the deployment workflow
189189
if finalStatus == "completed" {
190-
_ = workflow.ExecuteChildWorkflow(
191-
workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
192-
WorkflowID: "v1:functions-reaper-triggered",
193-
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
194-
}),
195-
FunctionsReaperWorkflow,
196-
FunctionsReaperWorkflowParams{},
197-
).GetChildWorkflowExecution()
198-
// We don't wait for the reaper to complete - it runs independently
190+
logger.Info(
191+
"starting project-scoped function reaper",
192+
string(attr.ProjectIDKey), params.ProjectID,
193+
string(attr.DeploymentIDKey), params.DeploymentID,
194+
)
195+
_, err = ExecuteProjectFunctionsReaperChildWorkflow(ctx, params.ProjectID)
196+
if err != nil {
197+
logger.Error(
198+
"failed to start project-scoped functions reaper workflow",
199+
"error", err.Error(),
200+
string(attr.ProjectIDKey), params.ProjectID,
201+
string(attr.DeploymentIDKey), params.DeploymentID,
202+
)
203+
}
199204
}
200205

201206
return &ProcessDeploymentWorkflowResult{

server/internal/background/functions_reaper.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,34 @@ import (
1010
"go.temporal.io/sdk/temporal"
1111
"go.temporal.io/sdk/workflow"
1212

13+
"github.com/google/uuid"
1314
"github.com/speakeasy-api/gram/server/internal/background/activities"
1415
)
1516

1617
type FunctionsReaperWorkflowParams struct {
17-
// No parameters needed - reaper runs globally across all projects
18+
Scope activities.FunctionsReaperScope
19+
20+
ProjectID uuid.NullUUID
1821
}
1922

2023
type FunctionsReaperWorkflowResult struct {
2124
AppsReaped int
2225
Errors int
2326
}
2427

28+
func ExecuteProjectFunctionsReaperChildWorkflow(ctx workflow.Context, projectID uuid.UUID) (workflow.ChildWorkflowFuture, error) {
29+
return workflow.ExecuteChildWorkflow(ctx, client.StartWorkflowOptions{
30+
ID: "v1:functions-reaper:" + projectID.String(),
31+
TaskQueue: string(TaskQueueMain),
32+
WorkflowIDConflictPolicy: enums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
33+
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
34+
WorkflowRunTimeout: time.Minute * 5,
35+
}, FunctionsReaperWorkflow, FunctionsReaperWorkflowParams{
36+
Scope: activities.FunctionsReaperScopeProject,
37+
ProjectID: uuid.NullUUID{UUID: projectID, Valid: projectID != uuid.Nil},
38+
}), nil
39+
}
40+
2541
func ExecuteFunctionsReaperWorkflow(ctx context.Context, temporalClient client.Client, params FunctionsReaperWorkflowParams) (client.WorkflowRun, error) {
2642
// Use a fixed workflow ID so that only one reaper workflow can run at a time
2743
return temporalClient.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
@@ -54,6 +70,10 @@ func FunctionsReaperWorkflow(ctx workflow.Context, params FunctionsReaperWorkflo
5470
err := workflow.ExecuteActivity(
5571
ctx,
5672
a.ReapFlyApps,
73+
activities.ReapFlyAppsRequest{
74+
Scope: params.Scope,
75+
ProjectID: params.ProjectID,
76+
},
5777
).Get(ctx, &result)
5878
if err != nil {
5979
return nil, fmt.Errorf("failed to reap functions: %w", err)

server/internal/functions/deploy_fly.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,13 +536,51 @@ func (f *FlyRunner) reap(ctx context.Context, logger *slog.Logger, appsRepo *rep
536536
return fmt.Errorf("get existing app name: %w", err)
537537
}
538538

539+
enabled, err := appsRepo.IsReapingEnabledForProject(ctx, repo.IsReapingEnabledForProjectParams{
540+
ProjectID: req.ProjectID,
541+
OrganizationIds: []string{"5ad61845-b72e-4f0d-9dde-e0bdcf98e374", "5a25158b-24dc-4d49-b03d-e85acfbea59c"},
542+
})
543+
if err != nil && !errors.Is(err, sql.ErrNoRows) {
544+
return fmt.Errorf("check reaping enabled for project: %w", err)
545+
}
546+
if !enabled {
547+
logger.InfoContext(ctx, "reaping is not enabled for project")
548+
return nil
549+
}
550+
539551
logger = logger.With(
540552
attr.SlogFlyAppName(app.AppName),
541553
attr.SlogFlyOrgSlug(app.FlyOrgSlug),
542554
)
543555

544556
logger.InfoContext(ctx, fmt.Sprintf("deleting existing app: %s", app.AppName))
545557

558+
if app.AppName == "" {
559+
logger.InfoContext(ctx, "app name is empty, skipping reap")
560+
return nil
561+
}
562+
563+
deleteRequest, err := http.NewRequestWithContext(
564+
ctx,
565+
http.MethodDelete,
566+
fmt.Sprintf("%s/v1/apps/%s", f.machinesAPIBase, app.AppName),
567+
nil,
568+
)
569+
if err != nil {
570+
return fmt.Errorf("create delete app request: %w", err)
571+
}
572+
573+
res, err := f.machinesClient.Do(deleteRequest)
574+
if err != nil {
575+
return fmt.Errorf("send delete app request: %w", err)
576+
}
577+
defer o11y.LogDefer(ctx, logger, func() error { return res.Body.Close() })
578+
579+
if res.StatusCode == http.StatusNotFound {
580+
logger.InfoContext(ctx, "app not found during delete, assuming already deleted")
581+
return nil
582+
}
583+
546584
reapErrorMessage := ""
547585
if err := f.client.DeleteApp(ctx, app.AppName); err != nil {
548586
// Log error but continue - we still want to mark it as reaped in the database

server/internal/functions/queries.sql

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,11 @@ WHERE
157157
AND status = 'ready'
158158
AND reaped_at IS NULL
159159
ORDER BY created_at DESC
160-
LIMIT 1;
160+
LIMIT 1;
161+
162+
-- name: IsReapingEnabledForProject :one
163+
SELECT true AS enabled
164+
FROM projects
165+
WHERE
166+
id = @project_id
167+
AND organization_id = ANY(@organization_ids);

server/internal/functions/repo/queries.sql.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)