Skip to content

Commit 95fae2d

Browse files
feat: recreate DB on schema change (#4697)
1 parent 615b7a5 commit 95fae2d

File tree

4 files changed

+193
-5
lines changed

4 files changed

+193
-5
lines changed

backend/provisioner/dev_provisioner.go

+82-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package provisioner
22

33
import (
44
"context"
5+
"database/sql"
6+
"errors"
57
"fmt"
68
"time"
79

@@ -38,6 +40,15 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
3840
dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(res.ResourceID())
3941

4042
logger.Infof("Provisioning mysql database: %s", dbName) //nolint
43+
db, ok := res.(*schema.Database)
44+
if !ok {
45+
return nil, fmt.Errorf("expected database, got %T", res)
46+
}
47+
migrationHash := ""
48+
for migration := range slices.FilterVariants[*schema.MetadataSQLMigration](db.Metadata) {
49+
migrationHash = migration.Digest
50+
break
51+
}
4152

4253
// We assume that the DB hsas already been started when running in dev mode
4354
mysqlDSN, err := dev.SetupMySQL(ctx, mysqlPort)
@@ -52,7 +63,7 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
5263
case <-timeout:
5364
return nil, fmt.Errorf("failed to query database: %w", err)
5465
case <-retry.C:
55-
event, err := establishMySQLDB(ctx, mysqlDSN, dbName, mysqlPort, recreate)
66+
event, err := establishMySQLDB(ctx, mysqlDSN, dbName, mysqlPort, recreate, migrationHash)
5667
if err != nil {
5768
logger.Debugf("failed to establish mysql database: %s", err.Error())
5869
continue
@@ -68,8 +79,9 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
6879
}
6980
}
7081

71-
func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysqlPort int, recreate bool) (*schema.DatabaseRuntimeConnections, error) {
82+
func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysqlPort int, recreate bool, migrationHash string) (*schema.DatabaseRuntimeConnections, error) {
7283
conn, err := otelsql.Open("mysql", mysqlDSN)
84+
logger := log.FromContext(ctx)
7385
if err != nil {
7486
return nil, fmt.Errorf("failed to connect to mysql: %w", err)
7587
}
@@ -82,6 +94,31 @@ func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysql
8294
defer res.Close()
8395

8496
exists := res.Next()
97+
98+
if migrationHash != "" {
99+
_, err := conn.Exec("CREATE TABLE IF NOT EXISTS migrations (db VARCHAR(255) PRIMARY KEY NOT NULL, migration VARCHAR(255) NOT NULL)")
100+
if err != nil {
101+
return nil, fmt.Errorf("failed to create migrations tracking table: %w", err)
102+
}
103+
if exists && !recreate {
104+
// We might still need to recreate the database if the schema has changed
105+
existing := ""
106+
err := conn.QueryRow("SELECT migration FROM migrations WHERE db=?", dbName).Scan(&existing)
107+
if err != nil {
108+
if !errors.Is(err, sql.ErrNoRows) {
109+
return nil, fmt.Errorf("failed to query migrations table: %w", err)
110+
}
111+
logger.Debugf("No existing migration found")
112+
} else {
113+
logger.Debugf("existing migration: %s , current migration %s", existing, migrationHash)
114+
if existing != migrationHash {
115+
logger.Infof("Recreating database %q due to schema change", dbName) //nolint
116+
recreate = true
117+
}
118+
}
119+
}
120+
}
121+
85122
if exists && recreate {
86123
_, err = conn.ExecContext(ctx, "DROP DATABASE "+dbName)
87124
if err != nil {
@@ -97,6 +134,13 @@ func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysql
97134

98135
dsn := dsn.MySQLDSN(dbName, dsn.Port(mysqlPort))
99136

137+
if migrationHash != "" {
138+
_, err := conn.Exec("INSERT INTO migrations (db, migration) VALUES (?, ?) ON DUPLICATE KEY UPDATE migration = ?", dbName, migrationHash, migrationHash)
139+
if err != nil {
140+
return nil, fmt.Errorf("failed to insert migration hash: %w", err)
141+
}
142+
}
143+
100144
return &schema.DatabaseRuntimeConnections{
101145
Write: &schema.DSNDatabaseConnector{DSN: dsn, Database: dbName},
102146
Read: &schema.DSNDatabaseConnector{DSN: dsn, Database: dbName},
@@ -123,9 +167,14 @@ func ProvisionMySQLForTest(ctx context.Context, moduleName string, id string) (s
123167

124168
}
125169

126-
func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisionerFn {
170+
func provisionPostgres(postgresPort int, alwaysRecreate bool) InMemResourceProvisionerFn {
127171
return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, resource schema.Provisioned) (*schema.RuntimeElement, error) {
172+
recreate := alwaysRecreate
128173
logger := log.FromContext(ctx).Deployment(deployment)
174+
db, ok := resource.(*schema.Database)
175+
if !ok {
176+
return nil, fmt.Errorf("expected database, got %T", resource)
177+
}
129178

130179
dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(resource.ResourceID())
131180
logger.Infof("Provisioning postgres database: %s", dbName) //nolint
@@ -150,6 +199,30 @@ func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisioner
150199
defer res.Close()
151200

152201
exists := res.Next()
202+
migrationHash := ""
203+
for migration := range slices.FilterVariants[*schema.MetadataSQLMigration](db.Metadata) {
204+
_, err := conn.Exec("CREATE TABLE IF NOT EXISTS migrations (db VARCHAR PRIMARY KEY NOT NULL, migration VARCHAR NOT NULL)")
205+
if err != nil {
206+
return nil, fmt.Errorf("failed to create migrations tracking table: %w", err)
207+
}
208+
migrationHash = migration.Digest
209+
if exists && !recreate {
210+
// We might still need to recreate the database if the schema has changed
211+
existing := ""
212+
err := conn.QueryRow("SELECT migration FROM migrations WHERE db=$1", dbName).Scan(&existing)
213+
if err != nil {
214+
if !errors.Is(err, sql.ErrNoRows) {
215+
return nil, fmt.Errorf("failed to query migrations table: %w", err)
216+
}
217+
} else {
218+
if existing != migrationHash {
219+
logger.Infof("Recreating database %q due to schema change", dbName) //nolint
220+
recreate = true
221+
}
222+
}
223+
}
224+
}
225+
153226
if exists && recreate {
154227
// Terminate any dangling connections.
155228
_, err = conn.ExecContext(ctx, `
@@ -172,6 +245,12 @@ func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisioner
172245
}
173246
}
174247

248+
if migrationHash != "" {
249+
_, err := conn.Exec("INSERT INTO migrations (db, migration) VALUES ($1, $2)ON CONFLICT (db) DO UPDATE SET migration = EXCLUDED.migration;", dbName, migrationHash)
250+
if err != nil {
251+
return nil, fmt.Errorf("failed to insert migration hash: %w", err)
252+
}
253+
}
175254
dsn := dsn.PostgresDSN(dbName, dsn.Port(postgresPort))
176255

177256
return &schema.RuntimeElement{

common/schema/database.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ func (d *Database) GetProvisioned() ResourceSet {
6060
migration, ok := slices.FindVariant[*MetadataSQLMigration](d.Metadata)
6161
if ok {
6262
result = append(result, &ProvisionedResource{
63-
Kind: ResourceTypeSQLMigration,
64-
Config: &Database{Type: d.Type, Metadata: []Metadata{migration}},
63+
Kind: ResourceTypeSQLMigration,
64+
Config: &Database{Type: d.Type, Metadata: []Metadata{migration}},
65+
DeploymentSpecific: true,
6566
})
6667
}
6768

go-runtime/hot_reload_test.go

+96
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,102 @@ import (
1414
in "github.com/block/ftl/internal/integration"
1515
)
1616

17+
func TestHotReloadDatabaseGo(t *testing.T) {
18+
in.Run(t,
19+
in.WithLanguages("go"),
20+
in.WithDevMode(),
21+
in.GitInit(),
22+
in.Exec("rm", "ftl-project.toml"),
23+
in.Exec("ftl", "init", "test", "."),
24+
// Create module with database
25+
in.Exec("ftl", "module", "new", "go", "users"),
26+
in.WaitWithTimeout("users", time.Minute),
27+
// Initialize MySQL database
28+
in.Exec("ftl", "mysql", "new", "users.userdb"),
29+
in.Sleep(time.Second*5),
30+
// Edit initial migration
31+
in.EditFiles("users", func(file string, content []byte) (bool, []byte) {
32+
in.Infof("users.userdb: %s", file)
33+
if !strings.Contains(file, "init") {
34+
return false, nil
35+
}
36+
return true, []byte(`-- migrate:up
37+
CREATE TABLE users (
38+
id INT AUTO_INCREMENT PRIMARY KEY,
39+
name VARCHAR(255) NOT NULL
40+
);
41+
42+
-- migrate:down
43+
DROP TABLE users;`)
44+
}, "db/mysql/userdb/schema/"),
45+
in.Sleep(time.Second*5),
46+
47+
// Add query file
48+
in.CreateFile("users", `-- name: CreateUser :exec
49+
INSERT INTO users (name) VALUES (?);
50+
`, "db/mysql/userdb/queries/queries.sql"),
51+
in.Sleep(time.Second*5),
52+
// Test initial schema works
53+
in.Call("users", "createUser", map[string]string{"name": "Alice"}, func(t testing.TB, response map[string]interface{}) {
54+
55+
}),
56+
in.Call("users", "createUser", map[string]string{"name": "Bob"}, func(t testing.TB, response map[string]interface{}) {
57+
58+
}),
59+
// Edit the query file, verify we keep our state
60+
in.EditFile("users", func(content []byte) []byte {
61+
return []byte(`-- name: CreateUser :exec
62+
INSERT INTO users (name) VALUES (?);
63+
64+
-- name: GetUsers :many
65+
SELECT id, name FROM users ORDER BY id;`)
66+
}, "db/mysql/userdb/queries/queries.sql"),
67+
in.Sleep(time.Second*5),
68+
in.Call("users", "getUsers", struct{}{}, func(t testing.TB, response []map[string]interface{}) {
69+
assert.Equal(t, 2, len(response))
70+
assert.Equal(t, "Alice", response[0]["name"])
71+
assert.Equal(t, "Bob", response[1]["name"])
72+
}),
73+
in.EditFiles("users", func(file string, content []byte) (bool, []byte) {
74+
if !strings.Contains(file, "init") {
75+
return false, nil
76+
}
77+
return true, []byte(`-- migrate:up
78+
CREATE TABLE users (
79+
id INT AUTO_INCREMENT PRIMARY KEY,
80+
name VARCHAR(255) NOT NULL,
81+
email VARCHAR(255) NOT NULL
82+
);
83+
84+
-- migrate:down
85+
DROP TABLE users;`)
86+
}, "db/mysql/userdb/schema/"),
87+
88+
// Update queries
89+
in.EditFile("users", func(content []byte) []byte {
90+
return []byte(`-- name: CreateUserNew :exec
91+
INSERT INTO users (name, email) VALUES (?, ?);
92+
93+
-- name: GetUsers :many
94+
SELECT id, name, email FROM users ORDER BY id;`)
95+
}, "db/mysql/userdb/queries/queries.sql"),
96+
in.Sleep(time.Second*5),
97+
98+
// Test updated schema
99+
// We change the verb name so we can't accidentally call the old verb before the new one is ready
100+
in.Call("users", "createUserNew", map[string]interface{}{
101+
"name": "Charlie",
102+
"email": "[email protected]",
103+
}, func(t testing.TB, response map[string]interface{}) {
104+
105+
}),
106+
in.Call("users", "getUsers", struct{}{}, func(t testing.TB, response []map[string]interface{}) {
107+
assert.Equal(t, "Charlie", response[0]["name"])
108+
assert.Equal(t, "[email protected]", response[0]["email"])
109+
}),
110+
)
111+
}
112+
17113
func TestHotReloadMultiModuleGo(t *testing.T) {
18114
var serviceDeployment, clientDeployment string
19115
in.Run(t,

internal/integration/actions.go

+12
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,18 @@ func WriteFile(path string, content []byte) Action {
402402
}
403403
}
404404

405+
// CreateFile creates a file in a module
406+
func CreateFile(module string, contents string, path ...string) Action {
407+
return func(t testing.TB, ic TestContext) {
408+
parts := []string{ic.workDir, module}
409+
parts = append(parts, path...)
410+
file := filepath.Join(parts...)
411+
Infof("Creating %s", file)
412+
err := os.WriteFile(file, []byte(contents), os.FileMode(0644)) //nolint:gosec
413+
assert.NoError(t, err)
414+
}
415+
}
416+
405417
// EditFile edits a file in a module
406418
func EditFile(module string, editFunc func([]byte) []byte, path ...string) Action {
407419
return func(t testing.TB, ic TestContext) {

0 commit comments

Comments
 (0)