diff --git a/crates/rbuilder-utils/src/clickhouse/backup/mod.rs b/crates/rbuilder-utils/src/clickhouse/backup/mod.rs index 67d800bd3..6b872522f 100644 --- a/crates/rbuilder-utils/src/clickhouse/backup/mod.rs +++ b/crates/rbuilder-utils/src/clickhouse/backup/mod.rs @@ -33,8 +33,6 @@ use crate::{ tasks::TaskExecutor, }; -const TARGET: &str = "clickhouse_with_backup::backup"; - /// A type alias for disk backup keys. type DiskBackupKey = u128; /// A type alias for disk backup tables. @@ -402,14 +400,20 @@ impl DiskBackup { async fn flush_routine(mut self) { loop { self.config.flush_interval.tick().await; - let start = Instant::now(); - match self.flush().await { - Ok(_) => { - tracing::debug!(target: TARGET, elapsed = ?start.elapsed(), "flushed backup write buffer to disk"); - } - Err(e) => { - tracing::error!(target: TARGET, ?e, "failed to flush backup write buffer to disk"); - } + self.flush_routine_inner().await; + } + } + + /// The inner flush routine, instrumented for tracing. + #[tracing::instrument(name = "disk_backup_flush_routine", skip(self))] + async fn flush_routine_inner(&mut self) { + let start = Instant::now(); + match self.flush().await { + Ok(_) => { + tracing::debug!(elapsed = ?start.elapsed(), "flushed backup write buffer to disk"); + } + Err(e) => { + tracing::error!(?e, "failed to flush backup write buffer to disk"); } } } @@ -551,9 +555,10 @@ impl Backup { } /// Backs up a failed commit, first trying to write to disk, then to memory. + #[tracing::instrument(name = "indexer_backup", skip_all, fields(order = T::ORDER))] fn backup(&mut self, failed_commit: FailedCommit) { let quantities = failed_commit.quantities; - tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, "backing up failed commit"); + tracing::debug!(bytes = ?quantities.bytes, rows = ?quantities.rows, "backing up failed commit"); #[cfg(any(test, feature = "test-utils"))] if self.use_only_memory_backup { @@ -568,23 +573,26 @@ impl Backup { let start = Instant::now(); match self.disk_backup.save(&failed_commit) { Ok(stats) => { - tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "saved failed commit to disk"); + tracing::debug!(total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "saved failed commit to disk"); MetricsType::set_disk_backup_size(stats.size_bytes, stats.total_batches, T::ORDER); return; } Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit, trying in-memory"); + tracing::error!(?e, "failed to write commit, trying in-memory"); MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); } }; let stats = self.memory_backup.save(failed_commit); MetricsType::set_memory_backup_size(stats.size_bytes, stats.total_batches, T::ORDER); - tracing::debug!(target: TARGET, order = T::ORDER, bytes = ?quantities.bytes, rows = ?quantities.rows, ?stats, "saved failed commit in-memory"); + tracing::debug!(bytes = ?quantities.bytes, rows = ?quantities.rows, ?stats, "saved failed commit in-memory"); if let Some((stats, oldest_quantities)) = self.memory_backup.drop_excess() { - tracing::warn!(target: TARGET, order = T::ORDER, ?stats, "failed commits exceeded max memory backup size, dropping oldest"); + tracing::warn!( + ?stats, + "failed commits exceeded max memory backup size, dropping oldest" + ); MetricsType::process_backup_data_lost_quantities(&oldest_quantities); // Clear the cached last commit if it was from memory and we just dropped it. self.last_cached = self @@ -595,14 +603,15 @@ impl Backup { } /// Retrieves the oldest failed commit, first trying from memory, then from disk. + #[tracing::instrument(name = "indexer_backup_retrieve", skip_all, fields(order = T::ORDER))] fn retrieve_oldest(&mut self) -> Option> { if let Some(cached) = self.last_cached.take() { - tracing::debug!(target: TARGET, order = T::ORDER, rows = cached.commit.rows.len(), "retrieved last cached failed commit"); + tracing::debug!(rows = cached.commit.rows.len(), "last cached commit"); return Some(cached); } if let Some(commit) = self.memory_backup.retrieve_oldest() { - tracing::debug!(target: TARGET, order = T::ORDER, rows = commit.rows.len(), "retrieved oldest failed commit from memory"); + tracing::debug!(rows = commit.rows.len(), "oldest commit from memory"); return Some(RetrievedFailedCommit { source: BackupSource::Memory, commit, @@ -610,17 +619,16 @@ impl Backup { } match self.disk_backup.retrieve_oldest() { - Ok(maybe_commit) => { - maybe_commit.inspect(|data| { - tracing::debug!(target: TARGET, order = T::ORDER, rows = data.stats.total_batches, "retrieved oldest failed commit from disk"); + Ok(maybe_commit) => maybe_commit + .inspect(|data| { + tracing::debug!(rows = data.stats.total_batches, "oldest commit from disk"); }) .map(|data| RetrievedFailedCommit { source: BackupSource::Disk(data.key), commit: data.value, - }) - } + }), Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to retrieve oldest failed commit from disk"); + tracing::error!(?e, "failed to retrieve oldest commit from disk"); MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); None } @@ -628,13 +636,14 @@ impl Backup { } /// Populates the inserter with the rows from the given failed commit. + #[tracing::instrument(name = "indexer_backup_populate_inserter", skip_all, fields(order = T::ORDER))] async fn populate_inserter(&mut self, commit: &FailedCommit) { for row in &commit.rows { let value_ref = T::to_row_ref(row); if let Err(e) = self.inserter.write(value_ref).await { MetricsType::increment_write_failures(e.to_string()); - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter"); + tracing::error!(?e, "failed to write to inserter"); continue; } } @@ -646,7 +655,7 @@ impl Backup { let start = Instant::now(); match self.disk_backup.delete(key) { Ok(stats) => { - tracing::debug!(target: TARGET, order = T::ORDER, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk"); + tracing::debug!(total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk"); MetricsType::set_disk_backup_size( stats.size_bytes, stats.total_batches, @@ -654,10 +663,29 @@ impl Backup { ); } Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to purge failed commit from disk"); + tracing::error!(?e, "failed to purge failed commit from disk"); } } - tracing::debug!(target: TARGET, order = T::ORDER, "purged committed failed commit from disk"); + tracing::debug!("purged committed failed commit from disk"); + } + } + + #[tracing::instrument(name = "indexer_backup_force_commit", skip_all, fields(order = T::ORDER))] + async fn force_commit(&mut self, oldest: RetrievedFailedCommit) { + let start = Instant::now(); + match self.inserter.force_commit().await { + Ok(quantities) => { + tracing::info!(?quantities, "successfully backed up"); + MetricsType::process_backup_data_quantities(&quantities.into()); + MetricsType::record_batch_commit_time(start.elapsed()); + self.interval.reset(); + self.purge_commit(&oldest).await; + } + Err(e) => { + tracing::error!(?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup"); + MetricsType::increment_commit_failures(e.to_string()); + self.last_cached = Some(oldest); + } } } @@ -669,7 +697,7 @@ impl Backup { tokio::select! { maybe_failed_commit = self.rx.recv() => { let Some(failed_commit) = maybe_failed_commit else { - tracing::error!(target: TARGET, order = T::ORDER, "backup channel closed"); + tracing::warn!(order = T::ORDER, "backup channel closed"); break; }; @@ -683,23 +711,7 @@ impl Backup { }; self.populate_inserter(&oldest.commit).await; - - let start = Instant::now(); - match self.inserter.force_commit().await { - Ok(quantities) => { - tracing::info!(target: TARGET, order = T::ORDER, ?quantities, "successfully backed up"); - MetricsType::process_backup_data_quantities(&quantities.into()); - MetricsType::record_batch_commit_time(start.elapsed()); - self.interval.reset(); - self.purge_commit(&oldest).await; - } - Err(e) => { - tracing::error!(target: TARGET, order = T::ORDER, ?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup"); - MetricsType::increment_commit_failures(e.to_string()); - self.last_cached = Some(oldest); - continue; - } - } + self.force_commit(oldest).await; } } } @@ -707,39 +719,40 @@ impl Backup { /// To call on shutdown, tries make a last-resort attempt to post back to Clickhouse all /// in-memory data. + #[tracing::instrument(name = "indexer_backup_end", skip(self), fields(order = T::ORDER))] pub async fn end(mut self) { for failed_commit in self.memory_backup.failed_commits.drain(..) { for row in &failed_commit.rows { let value_ref = T::to_row_ref(row); if let Err(e) = self.inserter.write(value_ref).await { - tracing::error!( target: TARGET, order = T::ORDER, ?e, "failed to write to backup inserter during shutdown"); + tracing::error!(?e, "failed to write to inserter"); MetricsType::increment_write_failures(e.to_string()); continue; } } if let Err(e) = self.inserter.force_commit().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit backup to CH during shutdown, trying disk"); + tracing::error!(?e, "failed to force commit, trying disk"); MetricsType::increment_commit_failures(e.to_string()); } if let Err(e) = self.disk_backup.save(&failed_commit) { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to write commit to disk backup during shutdown"); + tracing::error!(?e, "failed to write commit to disk"); MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); } } if let Err(e) = self.disk_backup.flush().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to flush disk backup during shutdown"); + tracing::error!(?e, "failed to flush disk"); MetricsType::increment_backup_disk_errors(T::ORDER, e.as_ref()); } else { - tracing::info!(target: TARGET, order = T::ORDER, "flushed disk backup during shutdown"); + tracing::info!("flushed disk"); } if let Err(e) = self.inserter.end().await { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to end backup inserter during shutdown"); + tracing::error!(?e, "failed to end inserter"); } else { - tracing::info!(target: TARGET, order = T::ORDER, "successfully ended backup inserter during shutdown"); + tracing::info!("ended backup inserter during shutdown"); } } } diff --git a/crates/rbuilder-utils/src/clickhouse/indexer.rs b/crates/rbuilder-utils/src/clickhouse/indexer.rs index bfc732ed8..06ccd3458 100644 --- a/crates/rbuilder-utils/src/clickhouse/indexer.rs +++ b/crates/rbuilder-utils/src/clickhouse/indexer.rs @@ -5,13 +5,11 @@ use std::{ time::{Duration, Instant}, }; -/// The tracing target for this indexer crate. @PendingDX REMOVE -const TARGET: &str = "indexer"; - use clickhouse::{ error::Result as ClickhouseResult, inserter::Inserter, Client as ClickhouseClient, Row, }; use tokio::sync::mpsc; +use tracing::Instrument; use crate::{ clickhouse::{ @@ -91,12 +89,11 @@ impl ClickhouseInserter ClickhouseInserter { if quantities == Quantities::ZERO.into() { - tracing::trace!(target: TARGET, order = T::ORDER, "committed to inserter"); + tracing::trace!("committed batch to inserter"); } else { - tracing::debug!(target: TARGET, order = T::ORDER, ?quantities, "inserted batch to clickhouse"); + tracing::debug!(?quantities, "committed batch to server"); MetricsType::process_quantities(&quantities.into()); MetricsType::record_batch_commit_time(start.elapsed()); // Clear the backup rows. @@ -125,13 +122,13 @@ impl ClickhouseInserter { MetricsType::increment_commit_failures(e.to_string()); - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to commit bundle to clickhouse"); + tracing::error!(?e, "failed to commit bundle"); let rows = std::mem::take(&mut self.rows_backup); let failed_commit = FailedCommit::new(rows, pending); if let Err(e) = self.backup_tx.try_send(failed_commit) { - tracing::error!(target: TARGET, order = T::ORDER, ?e, "failed to send rows backup"); + tracing::error!(?e, "failed to send rows backup"); } } } @@ -194,16 +191,26 @@ impl InserterRunner ClickhouseResult {