Skip to content

Commit 2aa65a6

Browse files
committed
fix: reap unused fly apps from past deployments
This change implements a background worker that periodically checks for and reaps unused Fly.io applications associated with function deployments that are no longer active. For now, we will retain all apps created in the past 3 deployments of every Gram project.
1 parent 3d85b22 commit 2aa65a6

File tree

14 files changed

+548
-2
lines changed

14 files changed

+548
-2
lines changed

server/database/schema.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ CREATE TABLE IF NOT EXISTS fly_apps (
428428
status TEXT NOT NULL, -- pending, ready, failed
429429

430430
reaped_at timestamptz, -- when we deleted an app and its resources on fly.io
431+
reap_error TEXT, -- error message if reaping failed
431432

432433
created_at timestamptz NOT NULL DEFAULT clock_timestamp(),
433434
updated_at timestamptz NOT NULL DEFAULT clock_timestamp(),
@@ -439,7 +440,8 @@ CREATE TABLE IF NOT EXISTS fly_apps (
439440
CONSTRAINT fly_apps_access_id_fkey FOREIGN KEY (access_id) REFERENCES functions_access (id) ON DELETE CASCADE
440441
);
441442

442-
CREATE UNIQUE INDEX IF NOT EXISTS fly_apps_project_deployment_function_key ON fly_apps (project_id, deployment_id, function_id, status);
443+
CREATE UNIQUE INDEX IF NOT EXISTS fly_apps_project_deployment_function_active_key ON fly_apps (project_id, deployment_id, function_id) WHERE reaped_at IS NULL;
444+
CREATE INDEX IF NOT EXISTS fly_apps_reaper_idx ON fly_apps (project_id, created_at DESC) WHERE status = 'ready' AND reaped_at IS NULL;
443445
CREATE UNIQUE INDEX IF NOT EXISTS fly_apps_seq_key ON fly_apps (seq DESC);
444446
CREATE INDEX IF NOT EXISTS fly_apps_access_id_idx ON fly_apps (access_id DESC);
445447

server/database/sqlc.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ sql:
325325
- schema: schema.sql
326326
queries: ../internal/functions/queries.sql
327327
engine: postgresql
328+
database:
329+
managed: true
328330
gen:
329331
go:
330332
package: "repo"

server/internal/background/activities.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type Activities struct {
3434
processDeployment *activities.ProcessDeployment
3535
provisionFunctionsAccess *activities.ProvisionFunctionsAccess
3636
deployFunctionRunners *activities.DeployFunctionRunners
37+
reapFlyApps *activities.ReapFlyApps
3738
refreshBillingUsage *activities.RefreshBillingUsage
3839
refreshOpenRouterKey *activities.RefreshOpenRouterKey
3940
slackChatCompletion *activities.SlackChatCompletion
@@ -72,6 +73,7 @@ func NewActivities(
7273
processDeployment: activities.NewProcessDeployment(logger, tracerProvider, meterProvider, db, features, assetStorage, billingRepo),
7374
provisionFunctionsAccess: activities.NewProvisionFunctionsAccess(logger, db, encryption),
7475
deployFunctionRunners: activities.NewDeployFunctionRunners(logger, db, functionsDeployer, functionsVersion, encryption),
76+
reapFlyApps: activities.NewReapFlyApps(logger, meterProvider, db, functionsDeployer, 3),
7577
refreshBillingUsage: activities.NewRefreshBillingUsage(logger, db, billingRepo),
7678
refreshOpenRouterKey: activities.NewRefreshOpenRouterKey(logger, db, openrouter),
7779
slackChatCompletion: activities.NewSlackChatCompletionActivity(logger, slackClient, chatClient),
@@ -144,3 +146,7 @@ func (a *Activities) DeployFunctionRunners(ctx context.Context, req activities.D
144146
func (a *Activities) ValidateDeployment(ctx context.Context, projectID uuid.UUID, deploymentID uuid.UUID) error {
145147
return a.validateDeployment.Do(ctx, projectID, deploymentID)
146148
}
149+
150+
func (a *Activities) ReapFlyApps(ctx context.Context) (*activities.ReapFlyAppsResult, error) {
151+
return a.reapFlyApps.Do(ctx)
152+
}

server/internal/background/activities/metrics.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
meterFunctionsToolsSkipped = "functions.tools.skipped"
2727
meterFunctionsToolsCounter = "functions.tools.count"
2828
meterFunctionsProcessedDuration = "functions.processed.duration"
29+
30+
meterFlyAppReaperReapCount = "flyapp_reaper.reap.count"
2931
)
3032

3133
type metrics struct {
@@ -40,6 +42,8 @@ type metrics struct {
4042
functionsToolsSkipped metric.Int64Counter
4143
functionsToolsCounter metric.Int64Counter
4244
functionsProcessedDuration metric.Float64Histogram
45+
46+
flyAppReaperReapCount metric.Int64Counter
4347
}
4448

4549
func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
@@ -120,6 +124,15 @@ func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
120124
logger.ErrorContext(ctx, "failed to create metric", attr.SlogMetricName(meterFunctionsProcessedDuration), attr.SlogError(err))
121125
}
122126

127+
flyAppReaperReapCount, err := meter.Int64Counter(
128+
meterFlyAppReaperReapCount,
129+
metric.WithDescription("Number of fly apps reaped by the reaper workflow"),
130+
metric.WithUnit("{app}"),
131+
)
132+
if err != nil {
133+
logger.ErrorContext(ctx, "failed to create metric", attr.SlogMetricName(meterFlyAppReaperReapCount), attr.SlogError(err))
134+
}
135+
123136
return &metrics{
124137
opSkipped: opSkipped,
125138
openAPIUpgradeCounter: openAPIUpgradeCounter,
@@ -129,6 +142,7 @@ func newMetrics(meter metric.Meter, logger *slog.Logger) *metrics {
129142
functionsToolsSkipped: functionsToolsSkipped,
130143
functionsToolsCounter: functionsToolsCounter,
131144
functionsProcessedDuration: functionsProcessedDuration,
145+
flyAppReaperReapCount: flyAppReaperReapCount,
132146
}
133147
}
134148

@@ -214,3 +228,14 @@ func (m *metrics) RecordFunctionsProcessed(
214228
))
215229
}
216230
}
231+
232+
func (m *metrics) RecordFlyAppReaperReapCount(ctx context.Context, success int64, fail int64) {
233+
if counter := m.flyAppReaperReapCount; counter != nil {
234+
counter.Add(ctx, success, metric.WithAttributes(
235+
attr.Outcome(o11y.OutcomeSuccess),
236+
))
237+
counter.Add(ctx, fail, metric.WithAttributes(
238+
attr.Outcome(o11y.OutcomeFailure),
239+
))
240+
}
241+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package activities
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
7+
"github.com/jackc/pgx/v5/pgtype"
8+
"github.com/jackc/pgx/v5/pgxpool"
9+
"go.opentelemetry.io/otel/metric"
10+
11+
"github.com/speakeasy-api/gram/server/internal/attr"
12+
"github.com/speakeasy-api/gram/server/internal/functions"
13+
funcrepo "github.com/speakeasy-api/gram/server/internal/functions/repo"
14+
"github.com/speakeasy-api/gram/server/internal/oops"
15+
)
16+
17+
type ReapFlyAppsResult struct {
18+
Reaped int
19+
Errors int
20+
}
21+
22+
type ReapFlyApps struct {
23+
logger *slog.Logger
24+
metrics *metrics
25+
db *pgxpool.Pool
26+
deployer functions.Deployer
27+
keepCount int64
28+
}
29+
30+
func NewReapFlyApps(
31+
logger *slog.Logger,
32+
meterProvider metric.MeterProvider,
33+
db *pgxpool.Pool,
34+
deployer functions.Deployer,
35+
keepCount int64,
36+
) *ReapFlyApps {
37+
return &ReapFlyApps{
38+
logger: logger.With(attr.SlogComponent("flyio-reaper")),
39+
metrics: newMetrics(newMeter(meterProvider), logger),
40+
db: db,
41+
deployer: deployer,
42+
keepCount: keepCount,
43+
}
44+
}
45+
46+
func (r *ReapFlyApps) Do(ctx context.Context) (*ReapFlyAppsResult, error) {
47+
logger := r.logger
48+
49+
repo := funcrepo.New(r.db)
50+
51+
// Get all apps that should be reaped (keeping only the most recent N per project)
52+
appsToReap, err := repo.GetFlyAppsToReap(ctx, pgtype.Int8{Int64: r.keepCount, Valid: true})
53+
if err != nil {
54+
return nil, oops.E(oops.CodeUnexpected, err, "failed to query apps to reap").Log(ctx, logger)
55+
}
56+
57+
if len(appsToReap) == 0 {
58+
logger.InfoContext(ctx, "no apps to reap")
59+
return &ReapFlyAppsResult{
60+
Reaped: 0,
61+
Errors: 0,
62+
}, nil
63+
}
64+
65+
result := &ReapFlyAppsResult{
66+
Reaped: 0,
67+
Errors: 0,
68+
}
69+
70+
for _, app := range appsToReap {
71+
appLogger := logger.With(
72+
attr.SlogFlyAppInternalID(app.ID.String()),
73+
attr.SlogFlyAppName(app.AppName),
74+
attr.SlogFlyOrgSlug(app.FlyOrgSlug),
75+
attr.SlogProjectID(app.ProjectID.String()),
76+
attr.SlogDeploymentID(app.DeploymentID.String()),
77+
attr.SlogDeploymentFunctionsID(app.FunctionID.String()),
78+
)
79+
80+
appLogger.InfoContext(ctx, "reaping fly app")
81+
82+
if err := r.deployer.Reap(ctx, functions.ReapRequest{
83+
ProjectID: app.ProjectID,
84+
DeploymentID: app.DeploymentID,
85+
FunctionID: app.FunctionID,
86+
}); err != nil {
87+
appLogger.ErrorContext(ctx, "failed to reap app", attr.SlogError(err))
88+
result.Errors++
89+
continue
90+
}
91+
92+
result.Reaped++
93+
appLogger.InfoContext(ctx, "successfully reaped fly app")
94+
}
95+
96+
r.metrics.RecordFlyAppReaperReapCount(ctx, int64(result.Reaped), int64(result.Errors))
97+
98+
return result, nil
99+
}

server/internal/background/deployments.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,20 @@ func ProcessDeploymentWorkflow(ctx workflow.Context, params ProcessDeploymentWor
184184
)
185185
}
186186

187+
// Trigger functions reaper workflow after successful deployment
188+
// This runs asynchronously and doesn't block the deployment workflow
189+
if finalStatus == "completed" {
190+
_ = workflow.ExecuteChildWorkflow(
191+
workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
192+
WorkflowID: "v1:functions-reaper-triggered",
193+
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
194+
}),
195+
FunctionsReaperWorkflow,
196+
FunctionsReaperWorkflowParams{},
197+
).GetChildWorkflowExecution()
198+
// We don't wait for the reaper to complete - it runs independently
199+
}
200+
187201
return &ProcessDeploymentWorkflowResult{
188202
ProjectID: params.ProjectID,
189203
DeploymentID: params.DeploymentID,
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package background
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"go.temporal.io/api/enums/v1"
9+
"go.temporal.io/sdk/client"
10+
"go.temporal.io/sdk/temporal"
11+
"go.temporal.io/sdk/workflow"
12+
13+
"github.com/speakeasy-api/gram/server/internal/background/activities"
14+
)
15+
16+
type FunctionsReaperWorkflowParams struct {
17+
// No parameters needed - reaper runs globally across all projects
18+
}
19+
20+
type FunctionsReaperWorkflowResult struct {
21+
AppsReaped int
22+
Errors int
23+
}
24+
25+
func ExecuteFunctionsReaperWorkflow(ctx context.Context, temporalClient client.Client, params FunctionsReaperWorkflowParams) (client.WorkflowRun, error) {
26+
// Use a fixed workflow ID so that only one reaper workflow can run at a time
27+
return temporalClient.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
28+
ID: "v1:functions-reaper",
29+
TaskQueue: string(TaskQueueMain),
30+
WorkflowIDConflictPolicy: enums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
31+
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
32+
WorkflowRunTimeout: time.Minute * 10,
33+
}, FunctionsReaperWorkflow, params)
34+
}
35+
36+
func FunctionsReaperWorkflow(ctx workflow.Context, params FunctionsReaperWorkflowParams) (*FunctionsReaperWorkflowResult, error) {
37+
// This can stay nil/unassigned. Temporal just uses this to get activity names.
38+
// The actual activities are registered in the CLI layer (cmd/gram/worker.go).
39+
var a *Activities
40+
41+
logger := workflow.GetLogger(ctx)
42+
43+
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
44+
StartToCloseTimeout: 5 * time.Minute,
45+
RetryPolicy: &temporal.RetryPolicy{
46+
InitialInterval: time.Second,
47+
MaximumInterval: time.Minute,
48+
BackoffCoefficient: 2,
49+
MaximumAttempts: 3,
50+
},
51+
})
52+
53+
var result activities.ReapFlyAppsResult
54+
err := workflow.ExecuteActivity(
55+
ctx,
56+
a.ReapFlyApps,
57+
).Get(ctx, &result)
58+
if err != nil {
59+
return nil, fmt.Errorf("failed to reap functions: %w", err)
60+
}
61+
62+
logger.Info("functions reaper completed",
63+
"apps_reaped", result.Reaped,
64+
"errors", result.Errors,
65+
)
66+
67+
return &FunctionsReaperWorkflowResult{
68+
AppsReaped: result.Reaped,
69+
Errors: result.Errors,
70+
}, nil
71+
}

server/internal/background/worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func NewTemporalWorker(
150150
temporalWorker.RegisterActivity(activities.TransitionDeployment)
151151
temporalWorker.RegisterActivity(activities.ProvisionFunctionsAccess)
152152
temporalWorker.RegisterActivity(activities.DeployFunctionRunners)
153+
temporalWorker.RegisterActivity(activities.ReapFlyApps)
153154
temporalWorker.RegisterActivity(activities.GetSlackProjectContext)
154155
temporalWorker.RegisterActivity(activities.PostSlackMessage)
155156
temporalWorker.RegisterActivity(activities.SlackChatCompletion)
@@ -164,6 +165,7 @@ func NewTemporalWorker(
164165
temporalWorker.RegisterActivity(activities.ValidateDeployment)
165166

166167
temporalWorker.RegisterWorkflow(ProcessDeploymentWorkflow)
168+
temporalWorker.RegisterWorkflow(FunctionsReaperWorkflow)
167169
temporalWorker.RegisterWorkflow(SlackEventWorkflow)
168170
temporalWorker.RegisterWorkflow(OpenrouterKeyRefreshWorkflow)
169171
temporalWorker.RegisterWorkflow(CustomDomainRegistrationWorkflow)

server/internal/database/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/internal/functions/deploy.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Orchestrator interface {
1919

2020
type Deployer interface {
2121
Deploy(context.Context, RunnerDeployRequest) (*RunnerDeployResult, error)
22+
Reap(context.Context, ReapRequest) error
2223
}
2324

2425
type ToolCaller interface {
@@ -102,3 +103,9 @@ type RunnerResourceReadRequest struct {
102103
ResourceURI string
103104
ResourceName string
104105
}
106+
107+
type ReapRequest struct {
108+
ProjectID uuid.UUID
109+
DeploymentID uuid.UUID
110+
FunctionID uuid.UUID
111+
}

0 commit comments

Comments
 (0)