From 8ddd068480c4754a86c6e73857204dffe1bb0232 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 10:21:48 -0700 Subject: [PATCH 01/13] graph: Make metrics registration less noisy --- graph/src/components/metrics/registry.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/graph/src/components/metrics/registry.rs b/graph/src/components/metrics/registry.rs index e010d3a89fa..93cf51b3bd1 100644 --- a/graph/src/components/metrics/registry.rs +++ b/graph/src/components/metrics/registry.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, RwLock}; use prometheus::IntGauge; use prometheus::{labels, Histogram, IntCounterVec}; -use slog::info; +use slog::debug; use crate::components::metrics::{counter_with_labels, gauge_with_labels}; use crate::prelude::Collector; @@ -133,7 +133,7 @@ impl MetricsRegistry { let mut result = self.registry.register(collector.clone()); if matches!(result, Err(PrometheusError::AlreadyReg)) { - info!(logger, "Resolving duplicate metric registration"); + debug!(logger, "Resolving duplicate metric registration"); // Since the current metric is a duplicate, // we can use it to unregister the previous registration. @@ -144,7 +144,6 @@ impl MetricsRegistry { match result { Ok(()) => { - info!(logger, "Successfully registered a new metric"); self.registered_metrics.inc(); } Err(err) => { From 13cce5330b19dc9488458342fb3c8a45c3dcb37f Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 3 Apr 2025 09:53:41 -0700 Subject: [PATCH 02/13] store: Allow creating special Namespaces --- store/postgres/src/primary.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 5ec81dcbd61..f329ae4bba2 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -266,6 +266,13 @@ impl Namespace { Namespace(format!("prune{id}")) } + /// A namespace that is not a deployment namespace. This is used for + /// special namespaces we use. No checking is done on `s` and the caller + /// must ensure it's a valid namespace name + pub fn special(s: impl Into) -> Self { + Namespace(s.into()) + } + pub fn as_str(&self) -> &str { &self.0 } From 3eac30c4e594875b8c136ec4cf27fab0faf5c0e9 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 3 Apr 2025 14:45:41 -0700 Subject: [PATCH 03/13] store: Factor the locale check into a method --- store/postgres/src/connection_pool.rs | 35 +++++++++++++++++---------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index ace5cddd719..05868815c02 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -1169,24 +1169,33 @@ impl PoolInner { .and_then(|()| pool.create_cross_shard_views(coord.servers.as_ref())); result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err)); - // Locale check - if let Err(msg) = catalog::Locale::load(&mut conn)?.suitable() { - if &self.shard == &*PRIMARY_SHARD && primary::is_empty(&mut conn)? { - die( - &pool.logger, - "Database does not use C locale. \ - Please check the graph-node documentation for how to set up the database locale", - &msg, - ); - } else { - warn!(pool.logger, "{}.\nPlease check the graph-node documentation for how to set up the database locale", msg); - } - } + self.locale_check(&pool.logger, conn)?; debug!(&pool.logger, "Setup finished"; "setup_time_s" => start.elapsed().as_secs()); Ok(()) } + fn locale_check( + &self, + logger: &Logger, + mut conn: PooledConnection>, + ) -> Result<(), StoreError> { + Ok( + if let Err(msg) = catalog::Locale::load(&mut conn)?.suitable() { + if &self.shard == &*PRIMARY_SHARD && primary::is_empty(&mut conn)? { + const MSG: &str = + "Database does not use C locale. \ + Please check the graph-node documentation for how to set up the database locale"; + + crit!(logger, "{}: {}", MSG, msg); + panic!("{}: {}", MSG, msg); + } else { + warn!(logger, "{}.\nPlease check the graph-node documentation for how to set up the database locale", msg); + } + }, + ) + } + pub(crate) async fn query_permit(&self) -> tokio::sync::OwnedSemaphorePermit { let start = Instant::now(); let permit = self.query_semaphore.cheap_clone().acquire_owned().await; From 034cba57a833fd4db619f5ca7120cd5d4e957e78 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 10:22:18 -0700 Subject: [PATCH 04/13] graph: Allow returning values from task_spawn --- graph/src/task_spawn.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/graph/src/task_spawn.rs b/graph/src/task_spawn.rs index 09055ad5381..dd1477bb1c8 100644 --- a/graph/src/task_spawn.rs +++ b/graph/src/task_spawn.rs @@ -57,10 +57,11 @@ pub fn block_on(f: impl Future03) -> T { } /// Spawns a thread with access to the tokio runtime. Panics if the thread cannot be spawned. -pub fn spawn_thread( - name: impl Into, - f: impl 'static + FnOnce() + Send, -) -> std::thread::JoinHandle<()> { +pub fn spawn_thread(name: impl Into, f: F) -> std::thread::JoinHandle +where + F: 'static + FnOnce() -> R + Send, + R: 'static + Send, +{ let conf = std::thread::Builder::new().name(name.into()); let runtime = tokio::runtime::Handle::current(); conf.spawn(move || { From cbbd4e1680ec8e5089df6234dc8364400ad2ff31 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 10:37:06 -0700 Subject: [PATCH 05/13] store: Remove dead code from connection_pool --- store/postgres/src/connection_pool.rs | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 05868815c02..7b4bc1bafb6 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -580,16 +580,6 @@ impl ConnectionPool { .ignore_timeout(|| inner.try_get_fdw(logger, timeout)) } - pub fn connection_detail(&self) -> Result { - let pool = self.get_ready()?; - ForeignServer::new(pool.shard.clone(), &pool.postgres_url).map_err(|e| e.into()) - } - - /// Check that we can connect to the database - pub fn check(&self) -> bool { - true - } - /// Setup the database for this pool. This includes configuring foreign /// data wrappers for cross-shard communication, and running any pending /// schema migrations for this database. @@ -1027,20 +1017,6 @@ impl PoolInner { self.pool.get().map_err(|_| StoreError::DatabaseUnavailable) } - pub fn get_with_timeout_warning( - &self, - logger: &Logger, - ) -> Result>, StoreError> { - loop { - match self.pool.get_timeout(ENV_VARS.store.connection_timeout) { - Ok(conn) => return Ok(conn), - Err(e) => error!(logger, "Error checking out connection, retrying"; - "error" => brief_error_msg(&e), - ), - } - } - } - /// Get the pool for fdw connections. It is an error if none is configured fn fdw_pool( &self, From f162e2f97bbeaeba8a43de3b012e29e51bd27568 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 21:29:21 -0700 Subject: [PATCH 06/13] store: Do not manage anything about pg_stat_statements It should be up to the operator if they use it or not, and when they want to reset it --- store/postgres/src/connection_pool.rs | 5 ----- tests/src/config.rs | 1 - 2 files changed, 6 deletions(-) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 7b4bc1bafb6..e00071ef138 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -1364,11 +1364,6 @@ fn migrate_schema(logger: &Logger, conn: &mut PgConnection) -> Result Date: Thu, 3 Apr 2025 09:31:51 -0700 Subject: [PATCH 07/13] store: Change db setup strategy to guard better against races The current database setup code was inerently racy when several nodes were starting up as it relied on piecemeal locking of individual steps. This change completely revamps the strategy we use: setup now takes a lock on the primary, so that only one node at a time will run the setup code. --- graph/src/components/store/err.rs | 2 +- node/src/store_builder.rs | 7 +- store/postgres/src/advisory_lock.rs | 34 ++- store/postgres/src/connection_pool.rs | 291 ++++++++++++++++++-------- 4 files changed, 227 insertions(+), 107 deletions(-) diff --git a/graph/src/components/store/err.rs b/graph/src/components/store/err.rs index 6af676f8e52..76be7c311ce 100644 --- a/graph/src/components/store/err.rs +++ b/graph/src/components/store/err.rs @@ -141,7 +141,7 @@ impl Clone for StoreError { } impl StoreError { - fn from_diesel_error(e: &DieselError) -> Option { + pub fn from_diesel_error(e: &DieselError) -> Option { const CONN_CLOSE: &str = "server closed the connection unexpectedly"; const STMT_TIMEOUT: &str = "canceling statement due to statement timeout"; let DieselError::DatabaseError(_, info) = e else { diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 7fadf6b92c2..abaf59471fd 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -1,7 +1,6 @@ use std::iter::FromIterator; use std::{collections::HashMap, sync::Arc}; -use graph::futures03::future::join_all; use graph::prelude::{o, MetricsRegistry, NodeId}; use graph::url::Url; use graph::{ @@ -62,7 +61,7 @@ impl StoreBuilder { // attempt doesn't work for all of them because the database is // unavailable, they will try again later in the normal course of // using the pool - join_all(pools.values().map(|pool| pool.setup())).await; + coord.setup_all(logger).await; let chains = HashMap::from_iter(config.chains.chains.iter().map(|(name, chain)| { let shard = ShardName::new(chain.shard.to_string()) @@ -196,8 +195,8 @@ impl StoreBuilder { Arc::new(DieselStore::new(subgraph_store, block_store)) } - /// Create a connection pool for the main database of the primary shard - /// without connecting to all the other configured databases + /// Create a connection pool for the main (non-replica) database of a + /// shard pub fn main_pool( logger: &Logger, node: &NodeId, diff --git a/store/postgres/src/advisory_lock.rs b/store/postgres/src/advisory_lock.rs index bd60d34c634..85e2cf5a4ae 100644 --- a/store/postgres/src/advisory_lock.rs +++ b/store/postgres/src/advisory_lock.rs @@ -6,7 +6,7 @@ //! has more details on advisory locks. //! //! We use the following 64 bit locks: -//! * 1,2: to synchronize on migratons +//! * 1: to synchronize on migratons //! //! We use the following 2x 32-bit locks //! * 1, n: to lock copying of the deployment with id n in the destination @@ -69,17 +69,31 @@ const COPY: Scope = Scope { id: 1 }; const WRITE: Scope = Scope { id: 2 }; const PRUNE: Scope = Scope { id: 3 }; -/// Get a lock for running migrations. Blocks until we get the lock. -pub(crate) fn lock_migration(conn: &mut PgConnection) -> Result<(), StoreError> { - sql_query("select pg_advisory_lock(1)").execute(conn)?; +/// Block until we can get the migration lock, then run `f` and unlock when +/// it is done. This is used to make sure that only one node runs setup at a +/// time. +pub(crate) async fn with_migration_lock( + conn: &mut PgConnection, + f: F, +) -> Result +where + F: FnOnce(&mut PgConnection) -> Fut, + Fut: std::future::Future>, +{ + fn execute(conn: &mut PgConnection, query: &str, msg: &str) -> Result<(), StoreError> { + sql_query(query).execute(conn).map(|_| ()).map_err(|e| { + StoreError::from_diesel_error(&e) + .unwrap_or_else(|| StoreError::Unknown(anyhow::anyhow!("{}: {}", msg, e))) + }) + } - Ok(()) -} + const LOCK: &str = "select pg_advisory_lock(1)"; + const UNLOCK: &str = "select pg_advisory_unlock(1)"; -/// Release the migration lock. -pub(crate) fn unlock_migration(conn: &mut PgConnection) -> Result<(), StoreError> { - sql_query("select pg_advisory_unlock(1)").execute(conn)?; - Ok(()) + execute(conn, LOCK, "failed to acquire migration lock")?; + let res = f(conn).await; + execute(conn, UNLOCK, "failed to release migration lock")?; + res } /// Take the lock used to keep two copy operations to run simultaneously on diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index e00071ef138..171fb8dbbb6 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -10,6 +10,8 @@ use diesel_migrations::{EmbeddedMigrations, HarnessWithOutput}; use graph::cheap_clone::CheapClone; use graph::components::store::QueryPermit; use graph::constraint_violation; +use graph::futures03::future::join_all; +use graph::futures03::FutureExt as _; use graph::prelude::tokio::time::Instant; use graph::prelude::{tokio, MetricsRegistry}; use graph::slog::warn; @@ -33,8 +35,9 @@ use std::{collections::HashMap, sync::RwLock}; use postgres::config::{Config, Host}; -use crate::primary::{self, Mirror, NAMESPACE_PUBLIC}; -use crate::{advisory_lock, catalog}; +use crate::advisory_lock::with_migration_lock; +use crate::catalog; +use crate::primary::{self, Mirror, Namespace, NAMESPACE_PUBLIC}; use crate::{Shard, PRIMARY_SHARD}; /// Tables that we map from the primary into `primary_public` in each shard @@ -479,12 +482,17 @@ impl ConnectionPool { } match &*guard { - PoolState::Created(pool, servers) => { - pool.setup(servers.clone())?; - let pool2 = pool.clone(); - *guard = PoolState::Ready(pool.clone()); - self.state_tracker.mark_available(); - Ok(pool2) + PoolState::Created(pool, coord) => { + let migrated = coord.cheap_clone().setup_bg(pool.cheap_clone())?; + + if migrated { + let pool2 = pool.clone(); + *guard = PoolState::Ready(pool.clone()); + self.state_tracker.mark_available(); + Ok(pool2) + } else { + Err(StoreError::DatabaseUnavailable) + } } PoolState::Ready(pool) => Ok(pool.clone()), PoolState::Disabled => Err(StoreError::DatabaseDisabled), @@ -580,23 +588,6 @@ impl ConnectionPool { .ignore_timeout(|| inner.try_get_fdw(logger, timeout)) } - /// Setup the database for this pool. This includes configuring foreign - /// data wrappers for cross-shard communication, and running any pending - /// schema migrations for this database. - /// - /// # Panics - /// - /// If any errors happen during the migration, the process panics - pub async fn setup(&self) { - let pool = self.clone(); - graph::spawn_blocking_allow_panic(move || { - pool.get_ready().ok(); - }) - .await - // propagate panics - .unwrap(); - } - pub(crate) async fn query_permit(&self) -> Result { let pool = match &*self.inner.lock(&self.logger) { PoolState::Created(pool, _) | PoolState::Ready(pool) => pool.clone(), @@ -1096,61 +1087,6 @@ impl PoolInner { .unwrap_or(false) } - /// Setup the database for this pool. This includes configuring foreign - /// data wrappers for cross-shard communication, and running any pending - /// schema migrations for this database. - /// - /// Returns `StoreError::DatabaseUnavailable` if we can't connect to the - /// database. Any other error causes a panic. - /// - /// # Panics - /// - /// If any errors happen during the migration, the process panics - fn setup(&self, coord: Arc) -> Result<(), StoreError> { - fn die(logger: &Logger, msg: &'static str, err: &dyn std::fmt::Display) -> ! { - crit!(logger, "{}", msg; "error" => format!("{:#}", err)); - panic!("{}: {}", msg, err); - } - - let pool = self.clone(); - let mut conn = self.get().map_err(|_| StoreError::DatabaseUnavailable)?; - - let start = Instant::now(); - - advisory_lock::lock_migration(&mut conn) - .unwrap_or_else(|err| die(&pool.logger, "failed to get migration lock", &err)); - // This code can cause a race in database setup: if pool A has had - // schema changes and pool B then tries to map tables from pool A, - // but does so before the concurrent thread running this code for - // pool B has at least finished `configure_fdw`, mapping tables will - // fail. In that case, the node must be restarted. The restart is - // guaranteed because this failure will lead to a panic in the setup - // for pool A - // - // This code can also leave the table mappings in a state where they - // have not been updated if the process is killed after migrating - // the schema but before finishing remapping in all shards. - // Addressing that would require keeping track of the need to remap - // in the database instead of just in memory - let result = pool - .configure_fdw(coord.servers.as_ref()) - .and_then(|()| pool.drop_cross_shard_views()) - .and_then(|()| migrate_schema(&pool.logger, &mut conn)); - debug!(&pool.logger, "Release migration lock"); - advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| { - die(&pool.logger, "failed to release migration lock", &err); - }); - let result = result - .and_then(|count| coord.propagate(&pool, count)) - .and_then(|()| pool.create_cross_shard_views(coord.servers.as_ref())); - result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err)); - - self.locale_check(&pool.logger, conn)?; - - debug!(&pool.logger, "Setup finished"; "setup_time_s" => start.elapsed().as_secs()); - Ok(()) - } - fn locale_check( &self, logger: &Logger, @@ -1199,6 +1135,28 @@ impl PoolInner { }) } + /// Do the part of database setup that only affects this pool. Those + /// steps are + /// 1. Configuring foreign servers and user mappings for talking to the + /// other shards + /// 2. Migrating the schema to the latest version + /// 3. Checking that the locale is set to C + async fn migrate( + self: Arc, + servers: &[ForeignServer], + ) -> Result<(Arc, MigrationCount), StoreError> { + self.configure_fdw(servers)?; + let mut conn = self.get()?; + let (this, count) = conn.transaction(|conn| -> Result<_, StoreError> { + let count = migrate_schema(&self.logger, conn)?; + Ok((self, count)) + })?; + + this.locale_check(&this.logger, conn)?; + + Ok((this, count)) + } + /// If this is the primary shard, drop the namespace `CROSS_SHARD_NSP` fn drop_cross_shard_views(&self) -> Result<(), StoreError> { if self.shard != *PRIMARY_SHARD { @@ -1242,14 +1200,17 @@ impl PoolInner { return Ok(()); } - info!(&self.logger, "Creating cross-shard views"); let mut conn = self.get()?; + let sharded = Namespace::special(ForeignServer::CROSS_SHARD_NSP); + if catalog::has_namespace(&mut conn, &sharded)? { + // We dropped the namespace before, but another node must have + // recreated it in the meantime so we don't need to do anything + return Ok(()); + } + info!(&self.logger, "Creating cross-shard views"); conn.transaction(|conn| { - let query = format!( - "create schema if not exists {}", - ForeignServer::CROSS_SHARD_NSP - ); + let query = format!("create schema {}", ForeignServer::CROSS_SHARD_NSP); conn.batch_execute(&query)?; for (src_nsp, src_tables) in SHARDED_TABLES { // Pairs of (shard, nsp) for all servers @@ -1458,13 +1419,7 @@ impl PoolCoordinator { if count.had_migrations() { let server = self.server(&pool.shard)?; for pool in self.pools.lock().unwrap().values() { - let mut conn = pool.get()?; - let remap_res = { - advisory_lock::lock_migration(&mut conn)?; - let res = pool.remap(server); - advisory_lock::unlock_migration(&mut conn)?; - res - }; + let remap_res = pool.remap(server); if let Err(e) = remap_res { error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string()); return Err(e); @@ -1488,4 +1443,156 @@ impl PoolCoordinator { .find(|server| &server.shard == shard) .ok_or_else(|| constraint_violation!("unknown shard {shard}")) } + + fn primary(&self) -> Result, StoreError> { + self.pools + .lock() + .unwrap() + .get(&*PRIMARY_SHARD) + .cloned() + .ok_or_else(|| { + constraint_violation!("internal error: primary shard not found in pool coordinator") + }) + } + + /// Setup all pools the coordinator knows about and return the number of + /// pools that were successfully set up. + /// + /// # Panics + /// + /// If any errors besides a database not being available happen during + /// the migration, the process panics + pub async fn setup_all(&self, logger: &Logger) -> usize { + let pools = self + .pools + .lock() + .unwrap() + .values() + .cloned() + .collect::>(); + + let res = self.setup(pools).await; + + match res { + Ok(count) => { + info!(logger, "Setup finished"; "shards" => count); + count + } + Err(e) => { + crit!(logger, "database setup failed"; "error" => format!("{e}")); + panic!("database setup failed: {}", e); + } + } + } + + /// A helper to call `setup` from a non-async context. Returns `true` if + /// the setup was actually run, i.e. if `pool` was available + fn setup_bg(self: Arc, pool: Arc) -> Result { + let migrated = graph::spawn_thread("database-setup", move || { + graph::block_on(self.setup(vec![pool.clone()])) + }) + .join() + // unwrap: propagate panics + .unwrap()?; + Ok(migrated == 1) + } + + /// Setup all pools by doing the following steps: + /// 1. Get the migration lock in the primary. This makes sure that only + /// one node runs migrations + /// 2. Remove the views in `sharded` as they might interfere with + /// running migrations + /// 3. In parallel, do the following in each pool: + /// 1. Configure fdw servers + /// 2. Run migrations in all pools in parallel + /// 4. In parallel, do the following in each pool: + /// 1. Create/update the mappings in `shard__subgraphs` and in + /// `primary_public` + /// 5. Create the views in `sharded` again + /// 6. Release the migration lock + /// + /// This method tolerates databases that are not available and will + /// simply ignore them. The returned count is the number of pools that + /// were successfully set up. + async fn setup(&self, pools: Vec>) -> Result { + type MigrationCounts = Vec<(Arc, MigrationCount)>; + + /// Filter out pools that are not available. We don't want to fail + /// because one of the pools is not available. We will just ignore + /// them and continue with the others. + fn filter_unavailable( + (pool, res): (Arc, Result), + ) -> Option> { + if let Err(StoreError::DatabaseUnavailable) = res { + error!( + pool.logger, + "migrations failed because database was unavailable" + ); + None + } else { + Some(res) + } + } + + /// Migrate all pools in parallel + async fn migrate( + pools: &[Arc], + servers: &[ForeignServer], + ) -> Result { + let futures = pools + .iter() + .map(|pool| { + pool.cheap_clone() + .migrate(servers) + .map(|res| (pool.cheap_clone(), res)) + }) + .collect::>(); + join_all(futures) + .await + .into_iter() + .filter_map(filter_unavailable) + .collect::, _>>() + } + + /// Propagate the schema changes to all other pools in parallel + async fn propagate( + this: &PoolCoordinator, + migrated: MigrationCounts, + ) -> Result { + let futures = migrated + .into_iter() + .map(|(pool, count)| async move { + let res = this.propagate(&pool, count); + (pool.cheap_clone(), res) + }) + .collect::>(); + join_all(futures) + .await + .into_iter() + .filter_map(filter_unavailable) + .collect::, _>>() + .map(|v| v.len()) + } + + let primary = self.primary()?; + + let mut pconn = primary.get().map_err(|_| StoreError::DatabaseUnavailable)?; + + // Everything here happens under the migration lock. Anything called + // from here should not try to get that lock, otherwise the process + // will deadlock + let res = with_migration_lock(&mut pconn, |_| async { + primary.drop_cross_shard_views()?; + + let migrated = migrate(&pools, self.servers.as_ref()).await?; + + let propagated = propagate(&self, migrated).await?; + + primary.create_cross_shard_views(&self.servers)?; + Ok(propagated) + }) + .await; + + res + } } From a63e607c844875b3d211b254ff5ed2dafd246df1 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 11:41:49 -0700 Subject: [PATCH 08/13] node, store: Give the PoolCoordinator a logger --- node/src/bin/manager.rs | 2 +- node/src/store_builder.rs | 2 +- store/postgres/src/connection_pool.rs | 9 +++++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 5142a2ab939..50ee9b61958 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -898,7 +898,7 @@ impl Context { fn primary_pool(self) -> ConnectionPool { let primary = self.config.primary_store(); - let coord = Arc::new(PoolCoordinator::new(Arc::new(vec![]))); + let coord = Arc::new(PoolCoordinator::new(&self.logger, Arc::new(vec![]))); let pool = StoreBuilder::main_pool( &self.logger, &self.node_id, diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index abaf59471fd..2d2e56dbc69 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -110,7 +110,7 @@ impl StoreBuilder { .collect::, _>>() .expect("connection url's contain enough detail"); let servers = Arc::new(servers); - let coord = Arc::new(PoolCoordinator::new(servers)); + let coord = Arc::new(PoolCoordinator::new(logger, servers)); let shards: Vec<_> = config .stores diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 171fb8dbbb6..f03bd75ecb0 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -1336,13 +1336,16 @@ fn migrate_schema(logger: &Logger, conn: &mut PgConnection) -> Result>>, servers: Arc>, } impl PoolCoordinator { - pub fn new(servers: Arc>) -> Self { + pub fn new(logger: &Logger, servers: Arc>) -> Self { + let logger = logger.new(o!("component" => "ConnectionPool", "component" => "Coordinator")); Self { + logger, pools: Mutex::new(HashMap::new()), servers, } @@ -1581,7 +1584,9 @@ impl PoolCoordinator { // Everything here happens under the migration lock. Anything called // from here should not try to get that lock, otherwise the process // will deadlock + debug!(self.logger, "Waiting for migration lock"); let res = with_migration_lock(&mut pconn, |_| async { + debug!(self.logger, "Migration lock acquired"); primary.drop_cross_shard_views()?; let migrated = migrate(&pools, self.servers.as_ref()).await?; @@ -1592,7 +1597,7 @@ impl PoolCoordinator { Ok(propagated) }) .await; - + debug!(self.logger, "Database setup finished"); res } } From 3282af28a63038bc41a53074b088ba6146c52108 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 12:35:14 -0700 Subject: [PATCH 09/13] store: Encapsulate mutable state tracking in PoolState Before, PoolState was just an enum and code all over the place dealt with its interior mutability. Now, we encapsulate that to simplify code using the PoolState --- store/postgres/src/connection_pool.rs | 173 ++++++++++++++++++-------- 1 file changed, 118 insertions(+), 55 deletions(-) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index f03bd75ecb0..70caf4c49fc 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -10,6 +10,7 @@ use diesel_migrations::{EmbeddedMigrations, HarnessWithOutput}; use graph::cheap_clone::CheapClone; use graph::components::store::QueryPermit; use graph::constraint_violation; +use graph::derive::CheapClone; use graph::futures03::future::join_all; use graph::futures03::FutureExt as _; use graph::prelude::tokio::time::Instant; @@ -312,7 +313,17 @@ impl ForeignServer { /// them on idle. This is much shorter than the default of 10 minutes. const FDW_IDLE_TIMEOUT: Duration = Duration::from_secs(60); -/// A pool goes through several states, and this enum tracks what state we +enum PoolStateInner { + /// A connection pool, and all the servers for which we need to + /// establish fdw mappings when we call `setup` on the pool + Created(Arc, Arc), + /// The pool has been successfully set up + Ready(Arc), + /// The pool has been disabled by setting its size to 0 + Disabled(String), +} + +/// A pool goes through several states, and this struct tracks what state we /// are in, together with the `state_tracker` field on `ConnectionPool`. /// When first created, the pool is in state `Created`; once we successfully /// called `setup` on it, it moves to state `Ready`. During use, we use the @@ -322,20 +333,96 @@ const FDW_IDLE_TIMEOUT: Duration = Duration::from_secs(60); /// database connection. That avoids overall undesirable states like buildup /// of queries; instead of queueing them until the database is available, /// they return almost immediately with an error -enum PoolState { - /// A connection pool, and all the servers for which we need to - /// establish fdw mappings when we call `setup` on the pool - Created(Arc, Arc), - /// The pool has been successfully set up - Ready(Arc), - /// The pool has been disabled by setting its size to 0 - Disabled, +#[derive(Clone, CheapClone)] +struct PoolState { + logger: Logger, + inner: Arc>, } +impl PoolState { + fn new(logger: Logger, inner: PoolStateInner, name: String) -> Self { + let pool_name = format!("pool-{}", name); + Self { + logger, + inner: Arc::new(TimedMutex::new(inner, pool_name)), + } + } + + fn disabled(logger: Logger, name: &str) -> Self { + Self::new( + logger, + PoolStateInner::Disabled(name.to_string()), + name.to_string(), + ) + } + + fn created(pool: Arc, coord: Arc) -> Self { + let logger = pool.logger.clone(); + let name = pool.shard.to_string(); + let inner = PoolStateInner::Created(pool, coord); + Self::new(logger, inner, name) + } + + fn ready(pool: Arc) -> Self { + let logger = pool.logger.clone(); + let name = pool.shard.to_string(); + let inner = PoolStateInner::Ready(pool); + Self::new(logger, inner, name) + } + + fn set_ready(&self) { + use PoolStateInner::*; + + let mut guard = self.inner.lock(&self.logger); + match &*guard { + Created(pool, _) => *guard = Ready(pool.clone()), + Ready(_) | Disabled(_) => { /* nothing to do */ } + } + } + + /// Get a connection pool that is ready, i.e., has been through setup + /// and running migrations + fn get_ready(&self) -> Result, StoreError> { + let mut guard = self.inner.lock(&self.logger); + + use PoolStateInner::*; + match &*guard { + Created(pool, coord) => { + let migrated = coord.cheap_clone().setup_bg(pool.cheap_clone())?; + + if migrated { + let pool2 = pool.cheap_clone(); + *guard = Ready(pool.cheap_clone()); + Ok(pool2) + } else { + Err(StoreError::DatabaseUnavailable) + } + } + Ready(pool) => Ok(pool.clone()), + Disabled(name) => Err(constraint_violation!( + "tried to access disabled database pool `{}`", + name + )), + } + } + + /// Get the inner pool, regardless of whether it has been set up or not. + /// Most uses should use `get_ready` instead + fn get_unready(&self) -> Result, StoreError> { + use PoolStateInner::*; + + match &*self.inner.lock(&self.logger) { + Created(pool, _) | Ready(pool) => Ok(pool.cheap_clone()), + Disabled(name) => Err(constraint_violation!( + "tried to access disabled database pool `{}`", + name + )), + } + } +} #[derive(Clone)] pub struct ConnectionPool { - inner: Arc>, - logger: Logger, + inner: PoolState, pub shard: Shard, state_tracker: PoolStateTracker, } @@ -428,9 +515,9 @@ impl ConnectionPool { let state_tracker = PoolStateTracker::new(); let shard = Shard::new(shard_name.to_string()).expect("shard_name is a valid name for a shard"); - let pool_state = { + let inner = { if pool_size == 0 { - PoolState::Disabled + PoolState::disabled(logger.cheap_clone(), shard_name) } else { let pool = PoolInner::create( shard.clone(), @@ -443,15 +530,14 @@ impl ConnectionPool { state_tracker.clone(), ); if pool_name.is_replica() { - PoolState::Ready(Arc::new(pool)) + PoolState::ready(Arc::new(pool)) } else { - PoolState::Created(Arc::new(pool), coord) + PoolState::created(Arc::new(pool), coord) } } }; ConnectionPool { - inner: Arc::new(TimedMutex::new(pool_state, format!("pool-{}", shard_name))), - logger: logger.clone(), + inner, shard, state_tracker, } @@ -460,11 +546,7 @@ impl ConnectionPool { /// This is only used for `graphman` to ensure it doesn't run migrations /// or other setup steps pub fn skip_setup(&self) { - let mut guard = self.inner.lock(&self.logger); - match &*guard { - PoolState::Created(pool, _) => *guard = PoolState::Ready(pool.clone()), - PoolState::Ready(_) | PoolState::Disabled => { /* nothing to do */ } - } + self.inner.set_ready(); } /// Return a pool that is ready, i.e., connected to the database. If the @@ -472,7 +554,6 @@ impl ConnectionPool { /// or the pool is marked as unavailable, return /// `StoreError::DatabaseUnavailable` fn get_ready(&self) -> Result, StoreError> { - let mut guard = self.inner.lock(&self.logger); if !self.state_tracker.is_available() { // We know that trying to use this pool is pointless since the // database is not available, and will only lead to other @@ -481,21 +562,12 @@ impl ConnectionPool { return Err(StoreError::DatabaseUnavailable); } - match &*guard { - PoolState::Created(pool, coord) => { - let migrated = coord.cheap_clone().setup_bg(pool.cheap_clone())?; - - if migrated { - let pool2 = pool.clone(); - *guard = PoolState::Ready(pool.clone()); - self.state_tracker.mark_available(); - Ok(pool2) - } else { - Err(StoreError::DatabaseUnavailable) - } + match self.inner.get_ready() { + Ok(pool) => { + self.state_tracker.mark_available(); + Ok(pool) } - PoolState::Ready(pool) => Ok(pool.clone()), - PoolState::Disabled => Err(StoreError::DatabaseDisabled), + Err(e) => Err(e), } } @@ -589,12 +661,7 @@ impl ConnectionPool { } pub(crate) async fn query_permit(&self) -> Result { - let pool = match &*self.inner.lock(&self.logger) { - PoolState::Created(pool, _) | PoolState::Ready(pool) => pool.clone(), - PoolState::Disabled => { - return Err(StoreError::DatabaseDisabled); - } - }; + let pool = self.inner.get_unready()?; let start = Instant::now(); let permit = pool.query_permit().await; Ok(QueryPermit { @@ -604,10 +671,9 @@ impl ConnectionPool { } pub(crate) fn wait_stats(&self) -> Result { - match &*self.inner.lock(&self.logger) { - PoolState::Created(pool, _) | PoolState::Ready(pool) => Ok(pool.wait_stats.clone()), - PoolState::Disabled => Err(StoreError::DatabaseDisabled), - } + self.inner + .get_unready() + .map(|pool| pool.wait_stats.cheap_clone()) } /// Mirror key tables from the primary into our own schema. We do this @@ -1381,14 +1447,11 @@ impl PoolCoordinator { // yet. We remember the `PoolInner` so that later, when we have to // call `remap()`, we do not have to take this lock as that will be // already held in `get_ready()` - match &*pool.inner.lock(logger) { - PoolState::Created(inner, _) | PoolState::Ready(inner) => { - self.pools - .lock() - .unwrap() - .insert(pool.shard.clone(), inner.clone()); - } - PoolState::Disabled => { /* nothing to do */ } + if let Some(inner) = pool.inner.get_unready().ok() { + self.pools + .lock() + .unwrap() + .insert(pool.shard.clone(), inner.clone()); } } pool From 70656a378172e8aca86a8fa1c803a8d0868d9c56 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 12:37:21 -0700 Subject: [PATCH 10/13] node, store: Rename 'PoolName' to 'PoolRole' --- node/src/store_builder.rs | 6 +++--- store/postgres/src/connection_pool.rs | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 2d2e56dbc69..27dc7d5d021 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -8,7 +8,7 @@ use graph::{ util::security::SafeDisplay, }; use graph_store_postgres::connection_pool::{ - ConnectionPool, ForeignServer, PoolCoordinator, PoolName, + ConnectionPool, ForeignServer, PoolCoordinator, PoolRole, }; use graph_store_postgres::{ BlockStore as DieselBlockStore, ChainHeadUpdateListener as PostgresChainHeadUpdateListener, @@ -224,7 +224,7 @@ impl StoreBuilder { coord.create_pool( &logger, name, - PoolName::Main, + PoolRole::Main, shard.connection.clone(), pool_size, Some(fdw_pool_size), @@ -264,7 +264,7 @@ impl StoreBuilder { coord.clone().create_pool( &logger, name, - PoolName::Replica(pool), + PoolRole::Replica(pool), replica.connection.clone(), pool_size, None, diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 70caf4c49fc..fd8c26204af 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -435,27 +435,27 @@ impl fmt::Debug for ConnectionPool { } } -/// The name of the pool, mostly for logging, and what purpose it serves. +/// The role of the pool, mostly for logging, and what purpose it serves. /// The main pool will always be called `main`, and can be used for reading /// and writing. Replica pools can only be used for reading, and don't /// require any setup (migrations etc.) -pub enum PoolName { +pub enum PoolRole { Main, Replica(String), } -impl PoolName { +impl PoolRole { fn as_str(&self) -> &str { match self { - PoolName::Main => "main", - PoolName::Replica(name) => name, + PoolRole::Main => "main", + PoolRole::Replica(name) => name, } } fn is_replica(&self) -> bool { match self { - PoolName::Main => false, - PoolName::Replica(_) => true, + PoolRole::Main => false, + PoolRole::Replica(_) => true, } } } @@ -504,7 +504,7 @@ impl PoolStateTracker { impl ConnectionPool { fn create( shard_name: &str, - pool_name: PoolName, + pool_name: PoolRole, postgres_url: String, pool_size: u32, fdw_pool_size: Option, @@ -1421,7 +1421,7 @@ impl PoolCoordinator { self: Arc, logger: &Logger, name: &str, - pool_name: PoolName, + pool_name: PoolRole, postgres_url: String, pool_size: u32, fdw_pool_size: Option, From 67108356b0614fd128a17c21f908524b82cca884 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 15:26:54 -0700 Subject: [PATCH 11/13] all: Filter out shards with pool size 0 Instead of dealing with disabled shards (shards that have a pool size of 0 configured), filter those shards out on startup and warn about them. The end effect is that for that configuration, users will get an error of 'unkown shard' rather than 'shard disabled'. Since configuring a shard to have no connections is kinda pathological, and leads to an error when it is used either way, the code simplification is worth the slightly less helpful error message. Removing the 'disabled' state from pools has ripple effects to quite a few other places, simplifying them a bit --- graph/src/components/store/traits.rs | 6 +- graph/src/data/query/trace.rs | 7 +- graphql/src/execution/resolver.rs | 2 +- graphql/src/introspection/resolver.rs | 2 +- graphql/src/runner.rs | 2 +- graphql/src/store/resolver.rs | 4 +- node/src/store_builder.rs | 20 ++++- server/index-node/src/resolver.rs | 4 +- store/postgres/src/block_store.rs | 6 +- store/postgres/src/connection_pool.rs | 79 +++++++------------ store/postgres/src/deployment_store.rs | 4 +- store/postgres/src/query_store.rs | 4 +- store/postgres/src/store.rs | 4 +- .../test-store/tests/graphql/introspection.rs | 6 +- 14 files changed, 67 insertions(+), 83 deletions(-) diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 27cb3768e2c..73cb22269fe 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -655,7 +655,7 @@ pub trait QueryStore: Send + Sync { block_hash: &BlockHash, ) -> Result, Option)>, StoreError>; - fn wait_stats(&self) -> Result; + fn wait_stats(&self) -> PoolWaitStats; /// Find the current state for the subgraph deployment `id` and /// return details about it needed for executing queries @@ -668,7 +668,7 @@ pub trait QueryStore: Send + Sync { fn network_name(&self) -> &str; /// A permit should be acquired before starting query execution. - async fn query_permit(&self) -> Result; + async fn query_permit(&self) -> QueryPermit; /// Report the name of the shard in which the subgraph is stored. This /// should only be used for reporting and monitoring @@ -683,7 +683,7 @@ pub trait QueryStore: Send + Sync { #[async_trait] pub trait StatusStore: Send + Sync + 'static { /// A permit should be acquired before starting query execution. - async fn query_permit(&self) -> Result; + async fn query_permit(&self) -> QueryPermit; fn status(&self, filter: status::Filter) -> Result, StoreError>; diff --git a/graph/src/data/query/trace.rs b/graph/src/data/query/trace.rs index cf2d153dca4..256c9cdeaf6 100644 --- a/graph/src/data/query/trace.rs +++ b/graph/src/data/query/trace.rs @@ -118,11 +118,8 @@ impl Trace { } } - pub fn query_done(&mut self, dur: Duration, permit: &Result) { - let permit_dur = match permit { - Ok(permit) => permit.wait, - Err(_) => Duration::from_millis(0), - }; + pub fn query_done(&mut self, dur: Duration, permit: &QueryPermit) { + let permit_dur = permit.wait; match self { Trace::None => { /* nothing to do */ } Trace::Root { .. } => { diff --git a/graphql/src/execution/resolver.rs b/graphql/src/execution/resolver.rs index ca59e401dfc..0074eb124d8 100644 --- a/graphql/src/execution/resolver.rs +++ b/graphql/src/execution/resolver.rs @@ -18,7 +18,7 @@ use super::Query; pub trait Resolver: Sized + Send + Sync + 'static { const CACHEABLE: bool; - async fn query_permit(&self) -> Result; + async fn query_permit(&self) -> QueryPermit; /// Prepare for executing a query by prefetching as much data as possible fn prefetch( diff --git a/graphql/src/introspection/resolver.rs b/graphql/src/introspection/resolver.rs index 0f67b717c5a..765b0399695 100644 --- a/graphql/src/introspection/resolver.rs +++ b/graphql/src/introspection/resolver.rs @@ -356,7 +356,7 @@ impl Resolver for IntrospectionResolver { // see `fn as_introspection_context`, so this value is irrelevant. const CACHEABLE: bool = false; - async fn query_permit(&self) -> Result { + async fn query_permit(&self) -> QueryPermit { unreachable!() } diff --git a/graphql/src/runner.rs b/graphql/src/runner.rs index 96f30e8bc9d..d2f0bc9c96c 100644 --- a/graphql/src/runner.rs +++ b/graphql/src/runner.rs @@ -143,7 +143,7 @@ where )?; self.load_manager .decide( - &store.wait_stats().map_err(QueryExecutionError::from)?, + &store.wait_stats(), store.shard(), store.deployment_id(), query.shape_hash, diff --git a/graphql/src/store/resolver.rs b/graphql/src/store/resolver.rs index 82c40420fa6..d7032740768 100644 --- a/graphql/src/store/resolver.rs +++ b/graphql/src/store/resolver.rs @@ -256,8 +256,8 @@ impl StoreResolver { impl Resolver for StoreResolver { const CACHEABLE: bool = true; - async fn query_permit(&self) -> Result { - self.store.query_permit().await.map_err(Into::into) + async fn query_permit(&self) -> QueryPermit { + self.store.query_permit().await } fn prefetch( diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 27dc7d5d021..5294179f8eb 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -2,6 +2,7 @@ use std::iter::FromIterator; use std::{collections::HashMap, sync::Arc}; use graph::prelude::{o, MetricsRegistry, NodeId}; +use graph::slog::warn; use graph::url::Url; use graph::{ prelude::{info, CheapClone, Logger}, @@ -115,8 +116,23 @@ impl StoreBuilder { let shards: Vec<_> = config .stores .iter() - .map(|(name, shard)| { + .filter_map(|(name, shard)| { let logger = logger.new(o!("shard" => name.to_string())); + let pool_size = shard.pool_size.size_for(node, name).unwrap_or_else(|_| { + panic!("cannot determine the pool size for store {}", name) + }); + if pool_size == 0 { + if name == PRIMARY_SHARD.as_str() { + panic!("pool size for primary shard must be greater than 0"); + } else { + warn!( + logger, + "pool size for shard {} is 0, ignoring this shard", name + ); + return None; + } + } + let conn_pool = Self::main_pool( &logger, node, @@ -137,7 +153,7 @@ impl StoreBuilder { let name = ShardName::new(name.to_string()).expect("shard names have been validated"); - (name, conn_pool, read_only_conn_pools, weights) + Some((name, conn_pool, read_only_conn_pools, weights)) }) .collect(); diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index a60e5d35fd9..7974afe41db 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -777,8 +777,8 @@ fn entity_changes_to_graphql(entity_changes: Vec) -> r::Value { impl Resolver for IndexNodeResolver { const CACHEABLE: bool = false; - async fn query_permit(&self) -> Result { - self.store.query_permit().await.map_err(Into::into) + async fn query_permit(&self) -> QueryPermit { + self.store.query_permit().await } fn prefetch( diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index 9af40b8d2a0..f69267fff17 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -319,11 +319,7 @@ impl BlockStore { } pub(crate) async fn query_permit_primary(&self) -> QueryPermit { - self.mirror - .primary() - .query_permit() - .await - .expect("the primary is never disabled") + self.mirror.primary().query_permit().await } pub fn allocate_chain( diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index fd8c26204af..c2f5bc95a9c 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -319,8 +319,6 @@ enum PoolStateInner { Created(Arc, Arc), /// The pool has been successfully set up Ready(Arc), - /// The pool has been disabled by setting its size to 0 - Disabled(String), } /// A pool goes through several states, and this struct tracks what state we @@ -348,14 +346,6 @@ impl PoolState { } } - fn disabled(logger: Logger, name: &str) -> Self { - Self::new( - logger, - PoolStateInner::Disabled(name.to_string()), - name.to_string(), - ) - } - fn created(pool: Arc, coord: Arc) -> Self { let logger = pool.logger.clone(); let name = pool.shard.to_string(); @@ -376,7 +366,7 @@ impl PoolState { let mut guard = self.inner.lock(&self.logger); match &*guard { Created(pool, _) => *guard = Ready(pool.clone()), - Ready(_) | Disabled(_) => { /* nothing to do */ } + Ready(_) => { /* nothing to do */ } } } @@ -399,24 +389,16 @@ impl PoolState { } } Ready(pool) => Ok(pool.clone()), - Disabled(name) => Err(constraint_violation!( - "tried to access disabled database pool `{}`", - name - )), } } /// Get the inner pool, regardless of whether it has been set up or not. /// Most uses should use `get_ready` instead - fn get_unready(&self) -> Result, StoreError> { + fn get_unready(&self) -> Arc { use PoolStateInner::*; match &*self.inner.lock(&self.logger) { - Created(pool, _) | Ready(pool) => Ok(pool.cheap_clone()), - Disabled(name) => Err(constraint_violation!( - "tried to access disabled database pool `{}`", - name - )), + Created(pool, _) | Ready(pool) => pool.cheap_clone(), } } } @@ -516,24 +498,20 @@ impl ConnectionPool { let shard = Shard::new(shard_name.to_string()).expect("shard_name is a valid name for a shard"); let inner = { - if pool_size == 0 { - PoolState::disabled(logger.cheap_clone(), shard_name) + let pool = PoolInner::create( + shard.clone(), + pool_name.as_str(), + postgres_url, + pool_size, + fdw_pool_size, + logger, + registry, + state_tracker.clone(), + ); + if pool_name.is_replica() { + PoolState::ready(Arc::new(pool)) } else { - let pool = PoolInner::create( - shard.clone(), - pool_name.as_str(), - postgres_url, - pool_size, - fdw_pool_size, - logger, - registry, - state_tracker.clone(), - ); - if pool_name.is_replica() { - PoolState::ready(Arc::new(pool)) - } else { - PoolState::created(Arc::new(pool), coord) - } + PoolState::created(Arc::new(pool), coord) } }; ConnectionPool { @@ -660,20 +638,18 @@ impl ConnectionPool { .ignore_timeout(|| inner.try_get_fdw(logger, timeout)) } - pub(crate) async fn query_permit(&self) -> Result { - let pool = self.inner.get_unready()?; + pub(crate) async fn query_permit(&self) -> QueryPermit { + let pool = self.inner.get_unready(); let start = Instant::now(); let permit = pool.query_permit().await; - Ok(QueryPermit { + QueryPermit { permit, wait: start.elapsed(), - }) + } } - pub(crate) fn wait_stats(&self) -> Result { - self.inner - .get_unready() - .map(|pool| pool.wait_stats.cheap_clone()) + pub(crate) fn wait_stats(&self) -> PoolWaitStats { + self.inner.get_unready().wait_stats.cheap_clone() } /// Mirror key tables from the primary into our own schema. We do this @@ -1447,12 +1423,11 @@ impl PoolCoordinator { // yet. We remember the `PoolInner` so that later, when we have to // call `remap()`, we do not have to take this lock as that will be // already held in `get_ready()` - if let Some(inner) = pool.inner.get_unready().ok() { - self.pools - .lock() - .unwrap() - .insert(pool.shard.clone(), inner.clone()); - } + let inner = pool.inner.get_unready(); + self.pools + .lock() + .unwrap() + .insert(pool.shard.clone(), inner.clone()); } pool } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 96dd5507f3e..91230d63b7b 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -415,7 +415,7 @@ impl DeploymentStore { Ok(conn) } - pub(crate) async fn query_permit(&self, replica: ReplicaId) -> Result { + pub(crate) async fn query_permit(&self, replica: ReplicaId) -> QueryPermit { let pool = match replica { ReplicaId::Main => &self.pool, ReplicaId::ReadOnly(idx) => &self.read_only_pools[idx], @@ -423,7 +423,7 @@ impl DeploymentStore { pool.query_permit().await } - pub(crate) fn wait_stats(&self, replica: ReplicaId) -> Result { + pub(crate) fn wait_stats(&self, replica: ReplicaId) -> PoolWaitStats { match replica { ReplicaId::Main => self.pool.wait_stats(), ReplicaId::ReadOnly(idx) => self.read_only_pools[idx].wait_stats(), diff --git a/store/postgres/src/query_store.rs b/store/postgres/src/query_store.rs index 8fc2da822e4..fe7d084030b 100644 --- a/store/postgres/src/query_store.rs +++ b/store/postgres/src/query_store.rs @@ -112,7 +112,7 @@ impl QueryStoreTrait for QueryStore { self.chain_store.block_numbers(block_hashes).await } - fn wait_stats(&self) -> Result { + fn wait_stats(&self) -> PoolWaitStats { self.store.wait_stats(self.replica_id) } @@ -137,7 +137,7 @@ impl QueryStoreTrait for QueryStore { &self.site.network } - async fn query_permit(&self) -> Result { + async fn query_permit(&self) -> QueryPermit { self.store.query_permit(self.replica_id).await } diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index 50a5e4b21e0..7eb428a5058 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -167,8 +167,8 @@ impl StatusStore for Store { .await } - async fn query_permit(&self) -> Result { + async fn query_permit(&self) -> QueryPermit { // Status queries go to the primary shard. - Ok(self.block_store.query_permit_primary().await) + self.block_store.query_permit_primary().await } } diff --git a/store/test-store/tests/graphql/introspection.rs b/store/test-store/tests/graphql/introspection.rs index 6139e673767..8bc76213e6b 100644 --- a/store/test-store/tests/graphql/introspection.rs +++ b/store/test-store/tests/graphql/introspection.rs @@ -53,15 +53,15 @@ impl Resolver for MockResolver { Ok(r::Value::Null) } - async fn query_permit(&self) -> Result { + async fn query_permit(&self) -> QueryPermit { let permit = Arc::new(tokio::sync::Semaphore::new(1)) .acquire_owned() .await .unwrap(); - Ok(QueryPermit { + QueryPermit { permit, wait: Duration::from_secs(0), - }) + } } } From eb6fae71fd9a05c38b779a97b0bb3de8f17edd7b Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 4 Apr 2025 16:15:33 -0700 Subject: [PATCH 12/13] store: Make sure we do not run setup twice for the same pool With the previous code, we would run setup initially when creating all pools, but they would not be marked as set up. On the first access to the pool we would try to run setup again, which is not needed. This change makes it so that we remember that we ran setup successfully when pools are created --- store/postgres/src/connection_pool.rs | 132 ++++++++++++++++---------- 1 file changed, 84 insertions(+), 48 deletions(-) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index c2f5bc95a9c..9a6afe9a37e 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -373,22 +373,27 @@ impl PoolState { /// Get a connection pool that is ready, i.e., has been through setup /// and running migrations fn get_ready(&self) -> Result, StoreError> { - let mut guard = self.inner.lock(&self.logger); + // We have to be careful here that we do not hold a lock when we + // call `setup_bg`, otherwise we will deadlock + let (pool, coord) = { + let guard = self.inner.lock(&self.logger); + + use PoolStateInner::*; + match &*guard { + Created(pool, coord) => (pool.cheap_clone(), coord.cheap_clone()), + Ready(pool) => return Ok(pool.clone()), + } + }; - use PoolStateInner::*; - match &*guard { - Created(pool, coord) => { - let migrated = coord.cheap_clone().setup_bg(pool.cheap_clone())?; + // self is `Created` and needs to have setup run + coord.setup_bg(self.cheap_clone())?; - if migrated { - let pool2 = pool.cheap_clone(); - *guard = Ready(pool.cheap_clone()); - Ok(pool2) - } else { - Err(StoreError::DatabaseUnavailable) - } - } - Ready(pool) => Ok(pool.clone()), + // We just tried to set up the pool; if it is still not set up and + // we didn't have an error, it means the database is not available + if self.needs_setup() { + return Err(StoreError::DatabaseUnavailable); + } else { + Ok(pool) } } @@ -401,6 +406,16 @@ impl PoolState { Created(pool, _) | Ready(pool) => pool.cheap_clone(), } } + + fn needs_setup(&self) -> bool { + let guard = self.inner.lock(&self.logger); + + use PoolStateInner::*; + match &*guard { + Created(_, _) => true, + Ready(_) => false, + } + } } #[derive(Clone)] pub struct ConnectionPool { @@ -1186,7 +1201,7 @@ impl PoolInner { async fn migrate( self: Arc, servers: &[ForeignServer], - ) -> Result<(Arc, MigrationCount), StoreError> { + ) -> Result { self.configure_fdw(servers)?; let mut conn = self.get()?; let (this, count) = conn.transaction(|conn| -> Result<_, StoreError> { @@ -1196,7 +1211,7 @@ impl PoolInner { this.locale_check(&this.logger, conn)?; - Ok((this, count)) + Ok(count) } /// If this is the primary shard, drop the namespace `CROSS_SHARD_NSP` @@ -1379,7 +1394,7 @@ fn migrate_schema(logger: &Logger, conn: &mut PgConnection) -> Result>>, + pools: Mutex>, servers: Arc>, } @@ -1419,16 +1434,12 @@ impl PoolCoordinator { // Ignore non-writable pools (replicas), there is no need (and no // way) to coordinate schema changes with them if is_writable { - // It is safe to take this lock here since nobody has seen the pool - // yet. We remember the `PoolInner` so that later, when we have to - // call `remap()`, we do not have to take this lock as that will be - // already held in `get_ready()` - let inner = pool.inner.get_unready(); self.pools .lock() .unwrap() - .insert(pool.shard.clone(), inner.clone()); + .insert(pool.shard.clone(), pool.inner.cheap_clone()); } + pool } @@ -1460,6 +1471,7 @@ impl PoolCoordinator { if count.had_migrations() { let server = self.server(&pool.shard)?; for pool in self.pools.lock().unwrap().values() { + let pool = pool.get_unready(); let remap_res = pool.remap(server); if let Err(e) = remap_res { error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string()); @@ -1470,8 +1482,15 @@ impl PoolCoordinator { Ok(()) } + /// Return a list of all pools, regardless of whether they are ready or + /// not. pub fn pools(&self) -> Vec> { - self.pools.lock().unwrap().values().cloned().collect() + self.pools + .lock() + .unwrap() + .values() + .map(|state| state.get_unready()) + .collect::>() } pub fn servers(&self) -> Arc> { @@ -1486,14 +1505,12 @@ impl PoolCoordinator { } fn primary(&self) -> Result, StoreError> { - self.pools - .lock() - .unwrap() - .get(&*PRIMARY_SHARD) - .cloned() - .ok_or_else(|| { - constraint_violation!("internal error: primary shard not found in pool coordinator") - }) + let map = self.pools.lock().unwrap(); + let pool_state = map.get(&*&PRIMARY_SHARD).ok_or_else(|| { + constraint_violation!("internal error: primary shard not found in pool coordinator") + })?; + + Ok(pool_state.get_unready()) } /// Setup all pools the coordinator knows about and return the number of @@ -1528,7 +1545,7 @@ impl PoolCoordinator { /// A helper to call `setup` from a non-async context. Returns `true` if /// the setup was actually run, i.e. if `pool` was available - fn setup_bg(self: Arc, pool: Arc) -> Result { + fn setup_bg(self: Arc, pool: PoolState) -> Result { let migrated = graph::spawn_thread("database-setup", move || { graph::block_on(self.setup(vec![pool.clone()])) }) @@ -1555,37 +1572,43 @@ impl PoolCoordinator { /// This method tolerates databases that are not available and will /// simply ignore them. The returned count is the number of pools that /// were successfully set up. - async fn setup(&self, pools: Vec>) -> Result { - type MigrationCounts = Vec<(Arc, MigrationCount)>; + /// + /// When this method returns, the entries from `states` that were + /// successfully set up will be marked as ready. The method returns the + /// number of pools that were set up + async fn setup(&self, states: Vec) -> Result { + type MigrationCounts = Vec<(PoolState, MigrationCount)>; /// Filter out pools that are not available. We don't want to fail /// because one of the pools is not available. We will just ignore /// them and continue with the others. fn filter_unavailable( - (pool, res): (Arc, Result), - ) -> Option> { + (state, res): (PoolState, Result), + ) -> Option> { if let Err(StoreError::DatabaseUnavailable) = res { error!( - pool.logger, + state.logger, "migrations failed because database was unavailable" ); None } else { - Some(res) + Some(res.map(|count| (state, count))) } } /// Migrate all pools in parallel async fn migrate( - pools: &[Arc], + pools: &[PoolState], servers: &[ForeignServer], ) -> Result { let futures = pools .iter() - .map(|pool| { - pool.cheap_clone() + .map(|state| { + state + .get_unready() + .cheap_clone() .migrate(servers) - .map(|res| (pool.cheap_clone(), res)) + .map(|res| (state.cheap_clone(), res)) }) .collect::>(); join_all(futures) @@ -1599,26 +1622,32 @@ impl PoolCoordinator { async fn propagate( this: &PoolCoordinator, migrated: MigrationCounts, - ) -> Result { + ) -> Result, StoreError> { let futures = migrated .into_iter() - .map(|(pool, count)| async move { + .map(|(state, count)| async move { + let pool = state.get_unready(); let res = this.propagate(&pool, count); - (pool.cheap_clone(), res) + (state.cheap_clone(), res) }) .collect::>(); join_all(futures) .await .into_iter() .filter_map(filter_unavailable) + .map(|res| res.map(|(state, ())| state)) .collect::, _>>() - .map(|v| v.len()) } let primary = self.primary()?; let mut pconn = primary.get().map_err(|_| StoreError::DatabaseUnavailable)?; + let pools: Vec<_> = states + .into_iter() + .filter(|pool| pool.needs_setup()) + .collect(); + // Everything here happens under the migration lock. Anything called // from here should not try to get that lock, otherwise the process // will deadlock @@ -1636,6 +1665,13 @@ impl PoolCoordinator { }) .await; debug!(self.logger, "Database setup finished"); - res + + // Mark all pool states that we set up completely as ready + res.map(|states| { + for state in &states { + state.set_ready(); + } + states.len() + }) } } From c23ee969d1d420d0cb331020a3ba335adce27799 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sun, 6 Apr 2025 11:14:45 -0700 Subject: [PATCH 13/13] store: Avoid running setup unnecessarily if several threads try to run it --- store/postgres/src/connection_pool.rs | 33 +++++++++++++++++++-------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 9a6afe9a37e..6ff46649494 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -1643,10 +1643,13 @@ impl PoolCoordinator { let mut pconn = primary.get().map_err(|_| StoreError::DatabaseUnavailable)?; - let pools: Vec<_> = states + let states: Vec<_> = states .into_iter() .filter(|pool| pool.needs_setup()) .collect(); + if states.is_empty() { + return Ok(0); + } // Everything here happens under the migration lock. Anything called // from here should not try to get that lock, otherwise the process @@ -1654,24 +1657,34 @@ impl PoolCoordinator { debug!(self.logger, "Waiting for migration lock"); let res = with_migration_lock(&mut pconn, |_| async { debug!(self.logger, "Migration lock acquired"); + + // While we were waiting for the migration lock, another thread + // might have already run this + let states: Vec<_> = states + .into_iter() + .filter(|pool| pool.needs_setup()) + .collect(); + if states.is_empty() { + debug!(self.logger, "No pools to set up"); + return Ok(0); + } + primary.drop_cross_shard_views()?; - let migrated = migrate(&pools, self.servers.as_ref()).await?; + let migrated = migrate(&states, self.servers.as_ref()).await?; let propagated = propagate(&self, migrated).await?; primary.create_cross_shard_views(&self.servers)?; - Ok(propagated) - }) - .await; - debug!(self.logger, "Database setup finished"); - // Mark all pool states that we set up completely as ready - res.map(|states| { - for state in &states { + for state in &propagated { state.set_ready(); } - states.len() + Ok(propagated.len()) }) + .await; + debug!(self.logger, "Database setup finished"); + + res } }