Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-tools/omdb/src/bin/omdb/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl DbUrlOptions {

let db_config = db::Config { url: db_url.clone() };
let pool =
Arc::new(db::Pool::new_single_host(&log.clone(), &db_config));
Arc::new(db::PoolBuilder::new_single_host(&log, db_config).build());

// Being a dev tool, we want to try this operation even if the schema
// doesn't match what we expect. So we use `DataStore::new_unchecked()`
Expand Down
2 changes: 1 addition & 1 deletion dev-tools/reconfigurator-exec-unsafe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl ReconfiguratorExec {
internal_dns_resolver::QorbResolver::new(vec![self.dns_server]);

info!(&log, "setting up database pool");
let pool = Arc::new(db::Pool::new(&log, &qorb_resolver));
let pool = Arc::new(db::PoolBuilder::new(&log, &qorb_resolver).build());
let datastore = Arc::new(
DataStore::new_failfast(&log, pool)
.await
Expand Down
6 changes: 4 additions & 2 deletions live-tests/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ async fn create_datastore(
.context("failed to parse constructed postgres URL")?;

let db_config = nexus_db_queries::db::Config { url };
let pool =
Arc::new(nexus_db_queries::db::Pool::new_single_host(log, &db_config));
let pool = Arc::new(
nexus_db_queries::db::PoolBuilder::new_single_host(&log, db_config)
.build(),
);
DataStore::new_failfast(log, pool)
.await
.context("creating DataStore")
Expand Down
12 changes: 11 additions & 1 deletion nexus-config/src/nexus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ pub struct MgdConfig {
struct UnvalidatedTunables {
max_vpc_ipv4_subnet_prefix: u8,
load_timeout: Option<std::time::Duration>,
collect_backtraces: Option<bool>,
}

/// Configuration for HTTP clients to external services.
Expand Down Expand Up @@ -301,6 +302,12 @@ pub struct Tunables {
///
/// If "None", nexus loops forever during initialization.
pub load_timeout: Option<std::time::Duration>,

/// Should backtraces be collected for database connections?
///
/// If "None", uses the default configured by the `PoolBuilder`
/// in nexus_db_queries.
pub collect_backtraces: Option<bool>,
}

// Convert from the unvalidated tunables, verifying each parameter as needed.
Expand All @@ -312,6 +319,7 @@ impl TryFrom<UnvalidatedTunables> for Tunables {
Ok(Tunables {
max_vpc_ipv4_subnet_prefix: unvalidated.max_vpc_ipv4_subnet_prefix,
load_timeout: unvalidated.load_timeout,
collect_backtraces: unvalidated.collect_backtraces,
})
}
}
Expand Down Expand Up @@ -363,6 +371,7 @@ impl Default for Tunables {
Tunables {
max_vpc_ipv4_subnet_prefix: MAX_VPC_IPV4_SUBNET_PREFIX,
load_timeout: None,
collect_backtraces: None,
}
}
}
Expand Down Expand Up @@ -1201,7 +1210,8 @@ mod test {
schema: None,
tunables: Tunables {
max_vpc_ipv4_subnet_prefix: 27,
load_timeout: None
load_timeout: None,
collect_backtraces: None,
},
dendrite: HashMap::from([(
SwitchLocation::Switch0,
Expand Down
1 change: 1 addition & 0 deletions nexus/db-queries/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub use config::Config;
pub use datastore::DataStore;
pub use on_conflict_ext::IncompleteOnConflictExt;
pub use pool::Pool;
pub use pool::PoolBuilder;
pub use saga_types::SecId;
pub use sec_store::CockroachDbSecStore;

Expand Down
144 changes: 69 additions & 75 deletions nexus/db-queries/src/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use tokio::sync::watch;
/// Wrapper around a database connection pool.
///
/// Expected to be used as the primary interface to the database.
///
/// Can be constructed with a [`PoolBuilder`].
pub struct Pool {
// IDs are assigned to each connection, acting as keys within the Quiesce
// state. These are used to track the set of in-use connections and
Expand All @@ -37,6 +39,7 @@ pub struct Pool {
log: Logger,
terminated: std::sync::atomic::AtomicBool,
quiesce: watch::Sender<Quiesce>,
collect_backtraces: bool,
}

// Provides an alternative to the DNS resolver for cases where we want to
Expand All @@ -46,7 +49,7 @@ struct SingleHostResolver {
}

impl SingleHostResolver {
fn new(config: &DbConfig) -> Self {
fn new(config: DbConfig) -> Self {
let backends = Arc::new(BTreeMap::from([(
backend::Name::new("singleton"),
backend::Backend { address: config.url.address() },
Expand All @@ -63,7 +66,7 @@ impl Resolver for SingleHostResolver {
}

fn make_single_host_resolver(
config: &DbConfig,
config: DbConfig,
) -> qorb::resolver::BoxedResolver {
Box::new(SingleHostResolver::new(config))
}
Expand All @@ -85,81 +88,60 @@ fn make_postgres_connector(
))
}

impl Pool {
/// Creates a new qorb-backed connection pool to the database.
///
/// Creating this pool does not necessarily wait for connections to become
/// available, as backends may shift over time.
pub fn new(log: &Logger, resolver: &QorbResolver) -> Self {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we already have three different constructors for a Pool...

  • I didn't want to add an argument to all of them
  • I didn't want to add a fourth constructor

So I switched to a builder pattern instead.

let resolver = resolver.for_service(ServiceName::Cockroach);
let connector = make_postgres_connector(log);
let policy = Policy::default();
let inner = match qorb::pool::Pool::new(
"crdb".to_string(),
resolver,
connector,
policy,
) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
}
};
Self::new_common(inner, log.clone())
enum ConnectWith<'a> {
Resolver(&'a QorbResolver),
SingleHost(Box<DbConfig>),
}

/// Utility for building [`Pool`]s.
pub struct PoolBuilder<'a> {
log: &'a Logger,
connect_with: ConnectWith<'a>,
policy: Option<Policy>,
collect_backtraces: Option<bool>,
}

impl<'a> PoolBuilder<'a> {
/// Uses a resolver to connect to the database.
pub fn new(log: &'a Logger, resolver: &'a QorbResolver) -> Self {
let connect_with = ConnectWith::Resolver(resolver);
Self { log, connect_with, policy: None, collect_backtraces: None }
}

/// Creates a new qorb-backed connection pool to a single instance of the
/// database.
///
/// This is intended for tests that want to skip DNS resolution, relying
/// on a single instance of the database.
///
/// In production, [Self::new] should be preferred.
pub fn new_single_host(log: &Logger, db_config: &DbConfig) -> Self {
let resolver = make_single_host_resolver(db_config);
let connector = make_postgres_connector(log);
let policy = Policy::default();
let inner = match qorb::pool::Pool::new(
"crdb-single-host".to_string(),
resolver,
connector,
policy,
) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
/// Connects to a single hard-coded database node.
pub fn new_single_host(log: &'a Logger, config: DbConfig) -> Self {
let connect_with = ConnectWith::SingleHost(Box::new(config));
Self { log, connect_with, policy: None, collect_backtraces: None }
}

pub fn policy(mut self, policy: Policy) -> Self {
self.policy = Some(policy);
self
}

pub fn collect_backtraces(mut self, collect_backtraces: bool) -> Self {
self.collect_backtraces = Some(collect_backtraces);
self
}

pub fn build(self) -> Pool {
let Self { log, connect_with, policy, collect_backtraces } = self;

let (resolver, name) = match connect_with {
ConnectWith::Resolver(resolver) => {
(resolver.for_service(ServiceName::Cockroach), "crdb")
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
ConnectWith::SingleHost(config) => {
(make_single_host_resolver(*config), "crdb-single-host")
}
};
Self::new_common(inner, log.clone())
}

/// Creates a new qorb-backed connection pool which returns an error
/// if claims are not available within one millisecond.
///
/// This is intended for test-only usage, in particular for tests where
/// claim requests should rapidly return errors when a backend has been
/// intentionally disabled.
#[cfg(any(test, feature = "testing"))]
pub fn new_single_host_failfast(
log: &Logger,
db_config: &DbConfig,
) -> Self {
let resolver = make_single_host_resolver(db_config);
let connector = make_postgres_connector(log);
let policy = Policy {
claim_timeout: tokio::time::Duration::from_millis(1),
..Default::default()
};

let policy = policy.unwrap_or_else(|| Policy::default());
let collect_backtraces = collect_backtraces.unwrap_or(true);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we match the current behavior for backtrace collection by default: unless requested otherwise, collect_backtraces = true.


let inner = match qorb::pool::Pool::new(
"crdb-single-host-failfast".to_string(),
name.to_string(),
resolver,
connector,
policy,
Expand All @@ -173,12 +155,15 @@ impl Pool {
err.into_inner()
}
};
Self::new_common(inner, log.clone())
Pool::new(inner, log.clone(), collect_backtraces)
}
}

fn new_common(
impl Pool {
fn new(
inner: qorb::pool::Pool<AsyncConnection>,
log: Logger,
collect_backtraces: bool,
) -> Self {
let (quiesce, _) = watch::channel(Quiesce {
new_claims_allowed: ClaimsAllowed::Allowed,
Expand All @@ -190,14 +175,19 @@ impl Pool {
log,
terminated: std::sync::atomic::AtomicBool::new(false),
quiesce,
collect_backtraces,
}
}

/// Returns a connection from the pool
pub async fn claim(&self) -> Result<DataStoreConnection, Error> {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let held_since = Utc::now();
let debug = Backtrace::force_capture().to_string();
let debug = if self.collect_backtraces {
Backtrace::force_capture().to_string()
} else {
"<backtraces disabled>".to_string()
};
let allowed = self.quiesce.send_if_modified(|q| {
if let ClaimsAllowed::Disallowed = q.new_claims_allowed {
false
Expand Down Expand Up @@ -366,7 +356,9 @@ mod test {
let mut db = crdb::test_setup_database(log).await;
let cfg = crate::db::Config { url: db.pg_config().clone() };
{
let pool = Pool::new_single_host(&log, &cfg);
let pool = PoolBuilder::new_single_host(&log, cfg)
.collect_backtraces(false)
.build();
pool.terminate().await;
}
db.cleanup().await.unwrap();
Expand All @@ -383,7 +375,9 @@ mod test {
let mut db = crdb::test_setup_database(log).await;
let cfg = crate::db::Config { url: db.pg_config().clone() };
{
let pool = Pool::new_single_host(&log, &cfg);
let pool = PoolBuilder::new_single_host(&log, cfg)
.collect_backtraces(false)
.build();
drop(pool);
}
db.cleanup().await.unwrap();
Expand Down
13 changes: 11 additions & 2 deletions nexus/db-queries/src/db/pub_test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ enum Interface {

fn new_pool(log: &Logger, db: &CockroachInstance) -> Arc<db::Pool> {
let cfg = db::Config { url: db.pg_config().clone() };
Arc::new(db::Pool::new_single_host(log, &cfg))

Arc::new(
db::PoolBuilder::new_single_host(&log, cfg)
.collect_backtraces(false)
.build(),
)
}

struct TestDatabaseBuilder {
Expand Down Expand Up @@ -322,7 +327,11 @@ async fn datastore_test(
use crate::authn;

let cfg = db::Config { url: db.pg_config().clone() };
let pool = Arc::new(db::Pool::new_single_host(&log, &cfg));
let pool = Arc::new(
db::PoolBuilder::new_single_host(&log, cfg)
.collect_backtraces(false)
.build(),
);
let datastore = Arc::new(
DataStore::new(&log, pool, None, IdentityCheckPolicy::DontCare)
.await
Expand Down
3 changes: 2 additions & 1 deletion nexus/src/bin/schema-updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ async fn main_impl() -> anyhow::Result<()> {
let all_versions = AllSchemaVersions::load(&schema_config.schema_dir)?;

let crdb_cfg = db::Config { url: args.url };
let pool = Arc::new(db::Pool::new_single_host(&log, &crdb_cfg));
let pool =
Arc::new(db::PoolBuilder::new_single_host(&log, crdb_cfg).build());

// We use the unchecked constructor of the datastore because we
// don't want to block on someone else applying an upgrade.
Expand Down
Loading
Loading