-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpgmigrate.go
325 lines (312 loc) · 11.5 KB
/
pgmigrate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
package pgmigrate
import (
"context"
"database/sql"
"fmt"
"io/fs"
"strings"
)
// Load walks a filesystem from its root and extracts all files ending in `.sql`
// as Migrations, with the filename (without extension) being the ID and the
// file's contents being the SQL.
//
// From disk:
//
//	// the migration files will be read at run time
//	fs := os.DirFS("./path/to/migrations/directory")
//
// From an embedded fs:
//
//	// the migration files will be embedded at compile time
//	//go:embed path/to/migrations/directory/*.sql
//	var fs embed.FS
//
// Directories are never treated as migrations, even when their name ends in
// `.sql`; they are only descended into while walking.
//
// Load returns the migrations after sorting them with SortByID. Any error hit
// while walking the filesystem or reading a file aborts the walk and is
// returned wrapped with a "load: " prefix, alongside a nil slice.
func Load(filesystem fs.FS) ([]Migration, error) {
	var migrations []Migration

	visit := func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			// The walk itself failed for this entry; d may be nil here, so
			// bail out before touching it.
			return err
		}
		// Skip directories explicitly: without this check a directory named
		// e.g. "archive.sql" would be handed to ReadFile and fail the whole
		// load. Returning nil (not fs.SkipDir) keeps walking its contents.
		if d.IsDir() || !strings.HasSuffix(path, ".sql") {
			return nil
		}
		data, err := fs.ReadFile(filesystem, path)
		if err != nil {
			return err
		}
		migrations = append(migrations, Migration{
			ID:  IDFromFilename(d.Name()),
			SQL: string(data),
		})
		return nil
	}

	if err := fs.WalkDir(filesystem, ".", visit); err != nil {
		return nil, fmt.Errorf("load: %w", err)
	}
	SortByID(migrations)
	return migrations, nil
}
// Migrate applies any migrations found in dir that have not yet been applied.
// It stores metadata in the [DefaultTableName] table, with the following
// schema:
//
//   - id: text not null
//   - checksum: text not null
//   - execution_time_in_millis: integer not null
//   - applied_at: timestamp with time zone not null
//
// It does the following things:
//
// First, acquire an advisory lock to prevent conflicts with other instances
// that may be running in parallel. This way only one migrator will attempt to
// run the migrations at any point in time.
//
// Then, calculate a plan of migrations to apply. The plan will be a list of
// migrations that have not yet been marked as applied in the migrations table.
// The migrations in the plan will be ordered by their IDs, in ascending
// lexicographical order.
//
// For each migration in the plan,
//
//   - Begin a transaction
//   - Run the migration
//   - Create a record in the migrations table saying that the migration was applied
//   - Commit the transaction
//
// If a migration fails at any point, the transaction will roll back. A failed
// migration results in NO record for that migration in the migrations table,
// which means that future attempts to run the migrations will include it in
// their plan.
//
// Migrate() will immediately return the error related to a failed migration,
// and will NOT attempt to run any further migrations. Any migrations applied
// before the failure will remain applied. Any migrations not yet applied will
// not be attempted.
//
// If all the migrations in the plan are applied successfully, then call Verify()
// to double-check that all known migrations have been marked as applied in the
// migrations table.
//
// Finally, the advisory lock is released.
//
// Migrate is a convenience wrapper: it reads the migrations from dir with
// [Load], builds a migrator with [NewMigrator], attaches logger to it, and
// delegates to that migrator's Migrate method. If loading fails, the load
// error is returned unchanged and the database is not touched.
func Migrate(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger) ([]VerificationError, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.Migrate(ctx, db)
}
// Verify returns a list of [VerificationError]s with warnings for any migrations that:
//
//   - Are marked as applied in the database table but do not exist in the
//     migrations directory.
//   - Have a different checksum in the database than the current file hash.
//
// These warnings usually signify that the schema described by the migrations no
// longer matches the schema in the database. Usually the cause is
// removing/editing a migration without realizing that it was already applied to
// a database.
//
// The most common cause of a warning is in the case that a new
// release/deployment contains migrations, the migrations are applied
// successfully, but the release is then rolled back due to other issues. In
// this case the warning is just that, a warning, and should not be a long-term
// problem.
//
// These warnings should not prevent your application from starting, but are
// worth showing to a human devops/db-admin/sre-type person for them to
// investigate.
//
// Verify is a convenience wrapper: it reads the migrations from dir with
// [Load], builds a migrator with [NewMigrator], attaches logger to it, and
// delegates to that migrator's Verify method. If loading fails, the load error
// is returned unchanged.
func Verify(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger) ([]VerificationError, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.Verify(ctx, db)
}
// Plan shows which migrations, if any, would be applied, in the order that they
// would be applied in.
//
// The plan will be a list of [Migration]s that are present in the migrations
// directory that have not yet been marked as applied in the migrations table.
//
// The migrations in the plan will be ordered by their IDs, in ascending
// lexicographical order. This is the same order that you see if you use "ls".
// This is also the same order that they will be applied in.
//
// The ID of a migration is its filename without the ".sql" suffix.
//
// A migration will only ever be applied once. Editing the contents of the
// migration file will NOT result in it being re-applied. Instead, you will see a
// verification error warning that the contents of the migration differ from its
// contents when it was previously applied.
//
// Migrations can be applied "out of order". For instance, if there were three
// migrations that had been applied:
//
//   - 001_initial
//   - 002_create_users
//   - 003_create_viewers
//
// And a new migration "002_create_companies" is merged:
//
//   - 001_initial
//   - 002_create_companies
//   - 002_create_users
//   - 003_create_viewers
//
// Running "pgmigrate plan" will show:
//
//   - 002_create_companies
//
// Because the other migrations have already been applied. This is by design; most
// of the time, when you're working with your coworkers, you will not write
// migrations that conflict with each other. As long as you use a migration
// name/number higher than that of any dependencies, you will not have any
// problems.
//
// Plan is a convenience wrapper: it reads the migrations from dir with [Load],
// builds a migrator with [NewMigrator], attaches logger to it, and delegates to
// that migrator's Plan method. If loading fails, the load error is returned
// unchanged.
func Plan(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger) ([]Migration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.Plan(ctx, db)
}
// Applied returns a list of [AppliedMigration]s in the order that they were
// applied in (applied_at ASC, id ASC).
//
// If there are no applied migrations, or the specified table does not exist,
// this will return an empty list without an error.
//
// Applied is a convenience wrapper: it reads the migrations from dir with
// [Load], builds a migrator with [NewMigrator], attaches logger to it, and
// delegates to that migrator's Applied method. Because dir is loaded first, a
// failure to load the migration files is returned as an error before the
// database is queried.
func Applied(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger) ([]AppliedMigration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.Applied(ctx, db)
}
// MarkApplied (⚠️ danger) is a manual operation that marks specific migrations
// as applied without running them.
//
// You should NOT use this as part of normal operations, it exists to help
// devops/db-admin/sres interact with migration state.
//
// It returns a list of the [AppliedMigration]s that have been marked as
// applied.
//
// MarkApplied is a convenience wrapper: it reads the migrations from dir with
// [Load], builds a migrator with [NewMigrator], attaches logger to it, and
// forwards ids to that migrator's MarkApplied method. If loading fails, the
// load error is returned unchanged.
func MarkApplied(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger, ids ...string) ([]AppliedMigration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.MarkApplied(ctx, db, ids...)
}
// MarkAllApplied (⚠️ danger) is a manual operation that marks all known
// migrations as applied without running them.
//
// You should NOT use this as part of normal operations, it exists to help
// devops/db-admin/sres interact with migration state.
//
// It returns a list of the [AppliedMigration]s that have been marked as
// applied.
//
// MarkAllApplied is a convenience wrapper: it reads the migrations from dir
// with [Load], builds a migrator with [NewMigrator], attaches logger to it, and
// delegates to that migrator's MarkAllApplied method. If loading fails, the
// load error is returned unchanged.
func MarkAllApplied(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger) ([]AppliedMigration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.MarkAllApplied(ctx, db)
}
// MarkUnapplied (⚠️ danger) is a manual operation that marks specific
// migrations as unapplied (not having been run) by removing their records from
// the migrations table.
//
// You should NOT use this as part of normal operations, it exists to help
// devops/db-admin/sres interact with migration state.
//
// It returns a list of the [AppliedMigration]s that have been marked as
// unapplied.
//
// MarkUnapplied is a convenience wrapper: it reads the migrations from dir with
// [Load], builds a migrator with [NewMigrator], attaches logger to it, and
// forwards ids to that migrator's MarkUnapplied method. If loading fails, the
// load error is returned unchanged.
func MarkUnapplied(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger, ids ...string) ([]AppliedMigration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.MarkUnapplied(ctx, db, ids...)
}
// MarkAllUnapplied (⚠️ danger) is a manual operation that marks all known
// migrations as unapplied (not having been run) by removing their records from
// the migrations table.
//
// You should NOT use this as part of normal operations, it exists to help
// devops/db-admin/sres interact with migration state.
//
// It returns a list of the [AppliedMigration]s that have been marked as
// unapplied.
//
// MarkAllUnapplied is a convenience wrapper: it reads the migrations from dir
// with [Load], builds a migrator with [NewMigrator], attaches logger to it, and
// delegates to that migrator's MarkAllUnapplied method. If loading fails, the
// load error is returned unchanged.
func MarkAllUnapplied(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger) ([]AppliedMigration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.MarkAllUnapplied(ctx, db)
}
// SetChecksums (⚠️ danger) is a manual operation that explicitly sets the
// recorded checksum of applied migrations in the migrations table.
//
// You should NOT use this as part of normal operations, it exists to help
// devops/db-admin/sres interact with migration state.
//
// It returns a list of the [AppliedMigration]s whose checksums have been
// updated.
//
// SetChecksums is a convenience wrapper: it reads the migrations from dir with
// [Load], builds a migrator with [NewMigrator], attaches logger to it, and
// forwards updates to that migrator's SetChecksums method. If loading fails,
// the load error is returned unchanged.
func SetChecksums(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger, updates ...ChecksumUpdate) ([]AppliedMigration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.SetChecksums(ctx, db, updates...)
}
// RecalculateChecksums (⚠️ danger) is a manual operation that explicitly
// recalculates the checksums of the specified migrations and updates their
// records in the migrations table to have the calculated checksum.
//
// You should NOT use this as part of normal operations, it exists to help
// devops/db-admin/sres interact with migration state.
//
// It returns a list of the [AppliedMigration]s whose checksums have been
// recalculated.
//
// RecalculateChecksums is a convenience wrapper: it reads the migrations from
// dir with [Load], builds a migrator with [NewMigrator], attaches logger to it,
// and forwards ids to that migrator's RecalculateChecksums method. If loading
// fails, the load error is returned unchanged.
func RecalculateChecksums(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger, ids ...string) ([]AppliedMigration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.RecalculateChecksums(ctx, db, ids...)
}
// RecalculateAllChecksums (⚠️ danger) is a manual operation that explicitly
// recalculates the checksums of all known migrations and updates their records
// in the migrations table to have the calculated checksum.
//
// You should NOT use this as part of normal operations, it exists to help
// devops/db-admin/sres interact with migration state.
//
// It returns a list of the [AppliedMigration]s whose checksums have been
// recalculated.
//
// RecalculateAllChecksums is a convenience wrapper: it reads the migrations
// from dir with [Load], builds a migrator with [NewMigrator], attaches logger
// to it, and delegates to that migrator's RecalculateAllChecksums method. If
// loading fails, the load error is returned unchanged.
func RecalculateAllChecksums(ctx context.Context, db *sql.DB, dir fs.FS, logger Logger) ([]AppliedMigration, error) {
	loaded, loadErr := Load(dir)
	if loadErr != nil {
		return nil, loadErr
	}

	m := NewMigrator(loaded)
	m.Logger = logger
	return m.RecalculateAllChecksums(ctx, db)
}