diff --git a/Cargo.lock b/Cargo.lock index c1e9791eb..fd874b2c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3657,6 +3657,7 @@ version = "1.7.0" dependencies = [ "async-trait", "chrono", + "crc", "futures-util", "mas-data-model", "mas-iana", @@ -3673,6 +3674,7 @@ dependencies = [ "sha2", "sqlx", "thiserror 2.0.17", + "tokio", "tracing", "ulid", "url", diff --git a/crates/cli/src/commands/config.rs b/crates/cli/src/commands/config.rs index 034f84b4a..ae4d329ff 100644 --- a/crates/cli/src/commands/config.rs +++ b/crates/cli/src/commands/config.rs @@ -12,10 +12,9 @@ use clap::Parser; use figment::Figment; use mas_config::{ConfigurationSection, RootConfig, SyncConfig}; use mas_data_model::{Clock as _, SystemClock}; -use mas_storage_pg::MIGRATOR; use rand::SeedableRng; use tokio::io::AsyncWriteExt; -use tracing::{Instrument, info, info_span}; +use tracing::{info, info_span}; use crate::util::database_connection_from_config; @@ -129,9 +128,7 @@ impl Options { // Grab a connection to the database let mut conn = database_connection_from_config(&config.database).await?; - MIGRATOR - .run(&mut conn) - .instrument(info_span!("db.migrate")) + mas_storage_pg::migrate(&mut conn) .await .context("could not run migrations")?; diff --git a/crates/cli/src/commands/database.rs b/crates/cli/src/commands/database.rs index 519536fff..7acc6830e 100644 --- a/crates/cli/src/commands/database.rs +++ b/crates/cli/src/commands/database.rs @@ -10,8 +10,7 @@ use anyhow::Context; use clap::Parser; use figment::Figment; use mas_config::{ConfigurationSectionExt, DatabaseConfig}; -use mas_storage_pg::MIGRATOR; -use tracing::{Instrument, info_span}; +use tracing::info_span; use crate::util::database_connection_from_config; @@ -35,9 +34,7 @@ impl Options { let mut conn = database_connection_from_config(&config).await?; // Run pending migrations - MIGRATOR - .run(&mut conn) - .instrument(info_span!("db.migrate")) + mas_storage_pg::migrate(&mut conn) .await .context("could not run migrations")?; diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index d64cd26a3..b72d48111 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -4,7 +4,7 @@ // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial // Please see LICENSE files in the repository root for full details. -use std::{collections::BTreeSet, process::ExitCode, sync::Arc, time::Duration}; +use std::{process::ExitCode, sync::Arc, time::Duration}; use anyhow::Context; use clap::Parser; @@ -18,9 +18,8 @@ use mas_data_model::SystemClock; use mas_handlers::{ActivityTracker, CookieManager, Limiter, MetadataCache}; use mas_listener::server::Server; use mas_router::UrlBuilder; -use mas_storage_pg::{MIGRATOR, PgRepositoryFactory}; -use sqlx::migrate::Migrate; -use tracing::{Instrument, info, info_span, warn}; +use mas_storage_pg::PgRepositoryFactory; +use tracing::{info, info_span, warn}; use crate::{ app_state::AppState, @@ -73,24 +72,20 @@ impl Options { let pool = database_pool_from_config(&config.database).await?; if self.no_migrate { - // Check that we applied all the migrations let mut conn = pool.acquire().await?; - let applied = conn.list_applied_migrations().await?; - let applied: BTreeSet<_> = applied.into_iter().map(|m| m.version).collect(); - let has_missing_migrations = MIGRATOR.iter().any(|m| !applied.contains(&m.version)); - if has_missing_migrations { + let pending_migrations = mas_storage_pg::pending_migrations(&mut conn).await?; + if !pending_migrations.is_empty() { // Refuse to start if there are pending migrations return Err(anyhow::anyhow!( - "The server is running with `--no-migrate` but there are pending. Please run them first with `mas-cli database migrate`, or omit the `--no-migrate` flag to apply them automatically on startup." + "The server is running with `--no-migrate` but there are pending migrations. Please run them first with `mas-cli database migrate`, or omit the `--no-migrate` flag to apply them automatically on startup." )); } } else { info!("Running pending database migrations"); - MIGRATOR - .run(&pool) - .instrument(info_span!("db.migrate")) + let mut conn = pool.acquire().await?; + mas_storage_pg::migrate(&mut conn) .await - .context("could not run database migrations")?; + .context("could not run migrations")?; } let encrypter = config.secrets.encrypter().await?; diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs index c28935af5..dfa835b95 100644 --- a/crates/cli/src/commands/syn2mas.rs +++ b/crates/cli/src/commands/syn2mas.rs @@ -14,13 +14,12 @@ use mas_config::{ UpstreamOAuth2Config, }; use mas_data_model::SystemClock; -use mas_storage_pg::MIGRATOR; use rand::thread_rng; use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::Uuid}; use syn2mas::{ LockedMasDatabase, MasWriter, Progress, ProgressStage, SynapseReader, synapse_config, }; -use tracing::{Instrument, error, info, info_span}; +use tracing::{Instrument, error, info}; use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options}; @@ -122,9 +121,7 @@ impl Options { ) .await?; - MIGRATOR - .run(&mut mas_connection) - .instrument(info_span!("db.migrate")) + mas_storage_pg::migrate(&mut mas_connection) .await .context("could not run migrations")?; diff --git a/crates/storage-pg/.sqlx/query-2f66991d7b9ba58f011d9aef0eb6a38f3b244c2f46444c0ab345de7feff54aba.json b/crates/storage-pg/.sqlx/query-2f66991d7b9ba58f011d9aef0eb6a38f3b244c2f46444c0ab345de7feff54aba.json new file mode 100644 index 000000000..7fb8be867 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-2f66991d7b9ba58f011d9aef0eb6a38f3b244c2f46444c0ab345de7feff54aba.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT current_database() as \"current_database!\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "current_database!", + "type_info": "Name" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "2f66991d7b9ba58f011d9aef0eb6a38f3b244c2f46444c0ab345de7feff54aba" +} diff --git a/crates/storage-pg/.sqlx/query-fbf926f630df5d588df4f1c9c0dc0f594332be5829d5d7c6b66183ac25b3d166.json b/crates/storage-pg/.sqlx/query-fbf926f630df5d588df4f1c9c0dc0f594332be5829d5d7c6b66183ac25b3d166.json new file mode 100644 index 000000000..d41b1dccd --- /dev/null +++ b/crates/storage-pg/.sqlx/query-fbf926f630df5d588df4f1c9c0dc0f594332be5829d5d7c6b66183ac25b3d166.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT EXISTS (\n SELECT 1\n FROM information_schema.tables\n WHERE table_name = '_sqlx_migrations'\n ) AS \"exists!\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "fbf926f630df5d588df4f1c9c0dc0f594332be5829d5d7c6b66183ac25b3d166" +} diff --git a/crates/storage-pg/Cargo.toml b/crates/storage-pg/Cargo.toml index 8710ead70..c058c3c14 100644 --- a/crates/storage-pg/Cargo.toml +++ b/crates/storage-pg/Cargo.toml @@ -19,6 +19,7 @@ workspace = true [dependencies] async-trait.workspace = true chrono.workspace = true +crc.workspace = true futures-util.workspace = true opentelemetry-semantic-conventions.workspace = true opentelemetry.workspace = true @@ -31,6 +32,7 @@ sha2.workspace = true sqlx.workspace = true thiserror.workspace = true tracing.workspace = true +tokio.workspace = true ulid.workspace = true url.workspace = true uuid.workspace = true diff --git a/crates/storage-pg/src/lib.rs b/crates/storage-pg/src/lib.rs index 207235667..ea2029475 100644 --- a/crates/storage-pg/src/lib.rs +++ b/crates/storage-pg/src/lib.rs @@ -160,7 +160,15 @@ #![deny(clippy::future_not_send, missing_docs)] #![allow(clippy::module_name_repetitions, clippy::blocks_in_conditions)] -use sqlx::migrate::Migrator; +use std::collections::{BTreeMap, BTreeSet, HashSet}; + +use ::tracing::{Instrument, debug, info, info_span, warn}; +use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT; +use sqlx::{ + Either, PgConnection, + migrate::{AppliedMigration, Migrate, MigrateError, Migration, Migrator}, + postgres::{PgAdvisoryLock, PgAdvisoryLockKey}, +}; pub mod app_session; pub mod compat; @@ -186,14 +194,290 @@ pub use self::{ tracing::ExecuteExt, }; -/// Embedded migrations, allowing them to run on startup -pub static MIGRATOR: Migrator = { - // XXX: The macro does not let us ignore missing migrations, so we have to do it - // like this. See https://github.com/launchbadge/sqlx/issues/1788 - let mut m = sqlx::migrate!(); +/// Embedded migrations in the binary +pub static MIGRATOR: Migrator = sqlx::migrate!(); - // We manually removed some migrations because they made us depend on the - // `pgcrypto` extension. See: https://github.com/matrix-org/matrix-authentication-service/issues/1557 - m.ignore_missing = true; - m -}; +fn available_migrations() -> BTreeMap { + MIGRATOR.iter().map(|m| (m.version, m)).collect() +} + +/// This is the list of migrations we've removed from the migration history but +/// might have been applied in the past +#[allow(clippy::inconsistent_digit_grouping)] +const ALLOWED_MISSING_MIGRATIONS: &[i64] = &[ + // https://github.com/matrix-org/matrix-authentication-service/pull/1585 + 20220709_210445, + 20230330_210841, + 20230408_110421, +]; + +fn allowed_missing_migrations() -> BTreeSet { + ALLOWED_MISSING_MIGRATIONS.iter().copied().collect() +} + +/// This is a list of possible additional checksums from previous versions of +/// migrations. The checksum we store in the database is 48 bytes long. We're +/// not really concerned with partial hash collisions, and to avoid this file to +/// be completely unreadable, we only store the upper 16 bytes of that hash. +#[allow(clippy::inconsistent_digit_grouping)] +const ALLOWED_ALTERNATE_CHECKSUMS: &[(i64, u128)] = &[ + // https://github.com/element-hq/matrix-authentication-service/pull/5300 + (20250410_000000, 0x8811_c3ef_dbee_8c00_5b49_25da_5d55_9c3f), + (20250410_000001, 0x7990_37b3_2193_8a5d_c72f_bccd_95fd_82e5), + (20250410_000002, 0xf2b8_f120_deae_27e7_60d0_79a3_0b77_eea3), + (20250410_000003, 0x06be_fc2b_cedc_acf4_b981_02c7_b40c_c469), + (20250410_000004, 0x0a90_9c6a_dba7_545c_10d9_60eb_6d30_2f50), + (20250410_000006, 0xcc7f_5152_6497_5729_d94b_be0d_9c95_8316), + (20250410_000007, 0x12e7_cfab_a017_a5a5_4f2c_18fa_541c_ce62), + (20250410_000008, 0x171d_62e5_ee1a_f0d9_3639_6c5a_277c_54cd), + (20250410_000009, 0xb1a0_93c7_6645_92ad_df45_b395_57bb_a281), + (20250410_000010, 0x8089_86ac_7cff_8d86_2850_d287_cdb1_2b57), + (20250410_000011, 0x8d9d_3fae_02c9_3d3f_81e4_6242_2b39_b5b8), + (20250410_000012, 0x9805_1372_41aa_d5b0_ebe1_ba9d_28c7_faf6), + (20250410_000013, 0x7291_9a97_e4d1_0d45_1791_6e8c_3f2d_e34d), + (20250410_000014, 0x811d_f965_8127_e168_4aa2_f177_a4e6_f077), + (20250410_000015, 0xa639_0780_aab7_d60d_5fcb_771d_13ed_73ee), + (20250410_000016, 0x22b6_e909_6de4_39e3_b2b9_c684_7417_fe07), + (20250410_000017, 0x9dfe_b6d3_89e4_e509_651b_2793_8d8d_cd32), + (20250410_000018, 0x638f_bdbc_2276_5094_020b_cec1_ab95_c07f), + (20250410_000019, 0xa283_84bc_5fd5_7cbd_b5fb_b5fe_0255_6845), + (20250410_000020, 0x17d1_54b1_7c6e_fc48_61dd_da3d_f8a5_9546), + (20250410_000022, 0xbc36_af82_994a_6f93_8aca_a46b_fc3c_ffde), + (20250410_000023, 0x54ec_3b07_ac79_443b_9e18_a2b3_2d17_5ab9), + (20250410_000024, 0x8ab4_4f80_00b6_58b2_d757_c40f_bc72_3d87), + (20250410_000025, 0x5dc4_2ff3_3042_2f45_046d_10af_ab3a_b583), + (20250410_000026, 0x5263_c547_0b64_6425_5729_48b2_ce84_7cad), + (20250410_000027, 0x0aad_cb50_1d6a_7794_9017_d24d_55e7_1b9d), + (20250410_000028, 0x8fc1_92f8_68df_ca4e_3e2b_cddf_bc12_cffe), + (20250410_000029, 0x416c_9446_b6a3_1b49_2940_a8ac_c1c2_665a), + (20250410_000030, 0x83a5_e51e_25a6_77fb_2b79_6ea5_db1e_364f), + (20250410_000031, 0xfa18_a707_9438_dbc7_2cde_b5f1_ee21_5c7e), + (20250410_000032, 0xd669_662e_8930_838a_b142_c3fa_7b39_d2a0), + (20250410_000033, 0x4019_1053_cabc_191c_c02e_9aa9_407c_0de5), + (20250410_000034, 0xdd59_e595_24e6_4dad_c5f7_fef2_90b8_df57), + (20250410_000035, 0x09b4_ea53_2da4_9c39_eb10_db33_6a6d_608b), + (20250410_000036, 0x3ca5_9c78_8480_e342_d729_907c_d293_2049), + (20250410_000037, 0xc857_2a10_450b_0612_822c_2b86_535a_ea7d), + (20250410_000038, 0x1642_39da_9c3b_d9fd_b1e1_72b1_db78_b978), + (20250410_000039, 0xdd70_b211_6016_bb84_0d84_f04e_eb8a_59d9), + (20250410_000040, 0xe435_ead6_c363_a0b6_e048_dd85_0ecb_9499), + (20250410_000041, 0xe9f3_122f_70d4_9839_c818_4b18_0192_ae26), + (20250410_000043, 0xec5e_1400_483d_c4bf_6014_aba4_ffc3_6236), + (20250410_000044, 0x4750_5eba_4095_6664_78d0_27f9_64bf_64f4), + (20250410_000045, 0x9a53_bd70_4cad_2bf1_61d4_f143_0c82_681d), + (20250410_121612, 0x25f0_9d20_a897_df18_162d_1c47_b68e_81bd), + (20250602_212101, 0xd1a8_782c_b3f0_5045_3f46_49a0_bab0_822b), + (20250708_155857, 0xb78e_6957_a588_c16a_d292_a0c7_cae9_f290), + (20250915_092635, 0x6854_d58b_99d7_3ac5_82f8_25e5_b1c3_cc0b), + (20251127_145951, 0x3bcd_d92e_8391_2a2c_8a18_1d76_354f_96c6), +]; + +fn alternate_checksums_map() -> BTreeMap> { + let mut map = BTreeMap::new(); + for (version, checksum) in ALLOWED_ALTERNATE_CHECKSUMS { + map.entry(*version) + .or_insert_with(HashSet::new) + .insert(*checksum); + } + map +} + +/// Load the list of applied migrations into a map. +/// +/// It's important to use a [`BTreeMap`] so that the migrations are naturally +/// ordered by version. +async fn applied_migrations_map( + conn: &mut PgConnection, +) -> Result, MigrateError> { + let applied_migrations = conn + .list_applied_migrations() + .await? + .into_iter() + .map(|m| (m.version, m)) + .collect(); + + Ok(applied_migrations) +} + +/// Checks if the migration table exists +async fn migration_table_exists(conn: &mut PgConnection) -> Result { + sqlx::query_scalar!( + r#" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_name = '_sqlx_migrations' + ) AS "exists!" + "#, + ) + .fetch_one(conn) + .await +} + +/// Run the migrations on the given connection +/// +/// This function acquires an advisory lock on the database to ensure that only +/// one migrator is running at a time. +/// +/// # Errors +/// +/// This function returns an error if the migration fails. +#[::tracing::instrument(name = "db.migrate", skip_all, err)] +pub async fn migrate(conn: &mut PgConnection) -> Result<(), MigrateError> { + // Get the database name and use it to derive an advisory lock key. This + // is the same lock key used by SQLx default migrator, so that it works even + // with older versions of MAS, and when running through `cargo sqlx migrate run` + let database_name = sqlx::query_scalar!(r#"SELECT current_database() as "current_database!""#) + .fetch_one(&mut *conn) + .await + .map_err(MigrateError::from)?; + + let lock = + PgAdvisoryLock::with_key(PgAdvisoryLockKey::BigInt(generate_lock_id(&database_name))); + + // Try to acquire the migration lock in a loop. + // + // The reason we do that with a `try_acquire` is because in Postgres, `CREATE + // INDEX CONCURRENTLY` will *not* complete whilst an advisory lock is being + // acquired on another connection. This then means that if we run two + // migration process at the same time, one of them will go through and block + // on concurrent index creations, because the other will get stuck trying to + // acquire this lock. + // + // To avoid this, we use `try_acquire`/`pg_advisory_lock_try` in a loop, which + // will fail immediately if the lock is held by another connection, allowing + // potential 'CREATE INDEX CONCURRENTLY' statements to complete. + let mut backoff = std::time::Duration::from_millis(250); + let mut conn = conn; + let mut locked_connection = loop { + match lock.try_acquire(conn).await? { + Either::Left(guard) => break guard, + Either::Right(conn_) => { + warn!( + "Another process is already running migrations on the database, waiting {duration}s and trying again…", + duration = backoff.as_secs_f32() + ); + tokio::time::sleep(backoff).await; + backoff = std::cmp::min(backoff * 2, std::time::Duration::from_secs(5)); + conn = conn_; + } + } + }; + + // Creates the migration table if missing + // We check if the table exists before calling `ensure_migrations_table` to + // avoid the pesky 'relation "_sqlx_migrations" already exists, skipping' notice + if !migration_table_exists(locked_connection.as_mut()).await? { + locked_connection.as_mut().ensure_migrations_table().await?; + } + + for migration in pending_migrations(locked_connection.as_mut()).await? { + info!( + "Applying migration {version}: {description}", + version = migration.version, + description = migration.description + ); + locked_connection + .as_mut() + .apply(migration) + .instrument(info_span!( + "db.migrate.run_migration", + db.migration.version = migration.version, + db.migration.description = &*migration.description, + { DB_QUERY_TEXT } = &*migration.sql, + )) + .await?; + } + + locked_connection.release_now().await?; + + Ok(()) +} + +/// Get the list of pending migrations +/// +/// # Errors +/// +/// This function returns an error if there is a problem checking the applied +/// migrations +pub async fn pending_migrations( + conn: &mut PgConnection, +) -> Result, MigrateError> { + // Load the maps of available migrations, applied migrations, migrations that + // are allowed to be missing, alternate checksums for migrations that changed + let available_migrations = available_migrations(); + let allowed_missing = allowed_missing_migrations(); + let alternate_checksums = alternate_checksums_map(); + let applied_migrations = if migration_table_exists(&mut *conn).await? { + applied_migrations_map(&mut *conn).await? + } else { + BTreeMap::new() + }; + + // Check that all applied migrations are still valid + for applied_migration in applied_migrations.values() { + // Check that we know about the applied migration + if let Some(migration) = available_migrations.get(&applied_migration.version) { + // Check the migration checksum + if applied_migration.checksum != migration.checksum { + // The checksum we have in the database doesn't match the one we + // have embedded. This might be because a migration was + // intentionally changed, so we check the alternate checksums + if let Some(alternates) = alternate_checksums.get(&applied_migration.version) { + // This converts the first 16 bytes of the checksum into a u128 + let Some(applied_checksum_prefix) = applied_migration + .checksum + .get(..16) + .and_then(|bytes| bytes.try_into().ok()) + .map(u128::from_be_bytes) + else { + return Err(MigrateError::ExecuteMigration( + sqlx::Error::InvalidArgument( + "checksum stored in database is invalid".to_owned(), + ), + applied_migration.version, + )); + }; + + if !alternates.contains(&applied_checksum_prefix) { + warn!( + "The database has a migration applied ({version}) which has known alternative checksums {alternates:x?}, but none of them matched {applied_checksum_prefix:x}", + version = applied_migration.version, + ); + return Err(MigrateError::VersionMismatch(applied_migration.version)); + } + } else { + return Err(MigrateError::VersionMismatch(applied_migration.version)); + } + } + } else if allowed_missing.contains(&applied_migration.version) { + // The migration is missing, but allowed to be missing + debug!( + "The database has a migration applied ({version}) that doesn't exist anymore, but it was intentionally removed", + version = applied_migration.version + ); + } else { + // The migration is missing, warn about it + warn!( + "The database has a migration applied ({version}) that doesn't exist anymore! This should not happen, unless rolling back to an older version of MAS.", + version = applied_migration.version + ); + } + } + + Ok(available_migrations + .values() + .copied() + .filter(|migration| { + !migration.migration_type.is_down_migration() + && !applied_migrations.contains_key(&migration.version) + }) + .collect()) +} + +// Copied from the sqlx source code, so that we generate the same lock ID +fn generate_lock_id(database_name: &str) -> i64 { + const CRC_IEEE: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); + // 0x3d32ad9e chosen by fair dice roll + 0x3d32_ad9e * i64::from(CRC_IEEE.checksum(database_name.as_bytes())) +} diff --git a/docs/development/database.md b/docs/development/database.md index 5ffe8a5a8..2deafce2b 100644 --- a/docs/development/database.md +++ b/docs/development/database.md @@ -40,7 +40,7 @@ cargo sqlx prepare ## Migrations -Migration files live in the `migrations` folder in the `mas-core` crate. +Migration files live in the `migrations` folder in the `mas-storage-pg` crate. ```sh cd crates/storage-pg/ # Again, in the mas-storage-pg crate folder @@ -50,3 +50,29 @@ cargo sqlx migrate add [description] # Add new migration files ``` Note that migrations are embedded in the final binary and can be run from the service CLI tool. + +### Removing migrations + +For various reasons, we may want to delete migrations. +In case we do, we *must* declare that migration version as allowed to be missing. +This is because on startup, MAS will validate that all the applied migrations are known, and warn if some are missing. + +To do so, get the migration version and add it to the `ALLOWED_MISSING_MIGRATIONS` array in the `mas-storage-pg` crate. + +### Modifying existing migrations + +We may want to modify existing migrations to fix mistakes. +In case we do, we *must* save the hash of the original migration file so that MAS can validate it on startup. + +To do so, extract the first 16 bytes of the existing applied migration and append it to the `ALLOWED_ALTERNATE_CHECKSUMS` array in the `mas-storage-pg` crate. + +```sql +SELECT version, ENCODE(SUBSTRING(checksum FOR 16), 'hex') AS short_checksum +FROM _sqlx_migrations +WHERE version = 20250410000002; +``` +``` + version | short_checksum +----------------+---------------------------------- + 20250410000002 | f2b8f120deae27e760d079a30b77eea3 +```