feat: make the backup service a one-time job
armyhaylenko committed Jan 20, 2025
1 parent a7a47d1 commit 23a0aa9
Showing 5 changed files with 47 additions and 110 deletions.
2 changes: 0 additions & 2 deletions .env.example_new
@@ -17,7 +17,6 @@ ROCKS_ARCHIVES_DIR="path/to/rocks/backup/archives"
ROCKS_BACKUP_ARCHIVES_DIR="path/to/rocks/backup/archives"
ROCKS_MIGRATION_STORAGE_PATH=/path/to/migration_storage
ROCKS_FLUSH_BEFORE_BACKUP=true
-ROCKS_SYNC_INTERVAL_SECONDS=3600


#Backfiller
@@ -29,7 +28,6 @@ API_METRICS_PORT=8985
INGESTER_METRICS_PORT=9091
MIGRATOR_METRICS_PORT=5091
SYNCHRONIZER_METRICS_PORT=6091
-ROCKS_DB_BACKUP_SERVICE_METRICS_PORT=10091

# API server port (if API is enabled)
INGESTER_SERVER_PORT=9092
4 changes: 1 addition & 3 deletions docker-compose.yaml
@@ -164,16 +164,14 @@ services:

rocksdb-backup:
container_name: rocksdb-backup
-    restart: always
-    entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./rocksdb_backup; else exec ./profiling_rocksdb_backup; fi"
+    entrypoint: ./rocksdb_backup
env_file:
- .env
network_mode: host
volumes:
- ${ROCKS_DB_PATH}:${ROCKS_DB_PATH_CONTAINER}:ro
- ${ROCKS_BACKUP_DIR}:${ROCKS_BACKUP_DIR}:rw
- ${ROCKS_BACKUP_ARCHIVES_DIR}:${ROCKS_BACKUP_ARCHIVES_DIR}:rw
-      - ${PROFILING_FILE_PATH}:${PROFILING_FILE_PATH_CONTAINER}:rw
- ${ROCKS_DB_SECONDARY_PATH}:${ROCKS_DB_SECONDARY_PATH_CONTAINER}:rw
- ./heaps:/usr/src/app/heaps:rw
build:
43 changes: 4 additions & 39 deletions nft_ingester/src/bin/rocksdb_backup/main.rs
@@ -1,42 +1,25 @@
use std::sync::Arc;

use clap::Parser;
-use metrics_utils::{utils::start_metrics, RocksDbMetricsConfig};
-use nft_ingester::{
-    config::{init_logger, RocksDbBackupServiceClapArgs},
-    init::graceful_stop,
-};
+use metrics_utils::RocksDbMetricsConfig;
+use nft_ingester::config::{init_logger, RocksDbBackupServiceClapArgs};
use prometheus_client::registry::Registry;
use rocks_db::{
backup_service::{RocksDbBackupService, RocksDbBackupServiceConfig},
errors::RocksDbBackupServiceError,
migrator::MigrationState,
Storage,
};
-use tokio::{
-    sync::{broadcast, Mutex},
-    task::JoinSet,
-};
-use tokio_util::sync::CancellationToken;
+use tokio::{sync::Mutex, task::JoinSet};
use tracing::{debug, info};

-#[cfg(feature = "profiling")]
-#[global_allocator]
-static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
-
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), RocksDbBackupServiceError> {
let args = RocksDbBackupServiceClapArgs::parse();
init_logger(&args.log_level);

info!("Starting RocksDb backup service...");

-    let guard = if args.is_run_profiling {
-        Some(pprof::ProfilerGuardBuilder::default().frequency(100).build().unwrap())
-    } else {
-        None
-    };
-
let mut registry = Registry::default();
let metrics = Arc::new(RocksDbMetricsConfig::new());
metrics.register(&mut registry);
@@ -62,8 +45,6 @@ async fn main() -> Result<(), RocksDbBackupServiceError> {
);

let rocks_storage = Arc::new(storage);
-    let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
-    let shutdown_token = CancellationToken::new();

info!("Starting store DB backup...");
let mut backup_service = RocksDbBackupService::new(
@@ -72,26 +53,10 @@ async fn main() -> Result<(), RocksDbBackupServiceError> {
rocks_backup_dir: args.backup_dir,
rocks_backup_archives_dir: args.backup_archives_dir,
rocks_flush_before_backup: args.flush_before_backup,
-            rocks_interval_in_seconds: args.interval_in_seconds,
},
)?;
-    let cloned_rx = shutdown_rx.resubscribe();
-    mutexed_tasks
-        .lock()
-        .await
-        .spawn(async move { backup_service.perform_backup(metrics.clone(), cloned_rx).await });

-    start_metrics(registry, args.metrics_port).await;
-    // --stop
-    graceful_stop(
-        mutexed_tasks.clone(),
-        shutdown_tx,
-        Some(shutdown_token),
-        guard,
-        args.profiling_file_path_container,
-        &args.heap_path,
-    )
-    .await;
+    mutexed_tasks.lock().await.spawn(async move { backup_service.perform_backup().await });

Ok(())
}
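With the interval loop, shutdown channel, and profiling plumbing gone, main.rs now spawns a single perform_backup task on the shared JoinSet and returns. The self-contained sketch below isolates that pattern; run_once stands in for backup_service.perform_backup(), and the drain loop at the end is an assumption about how the JoinSet gets awaited, since that part of the binary is not visible in this diff.

use std::sync::Arc;

use tokio::{sync::Mutex, task::JoinSet};

async fn run_once() {
    // Stand-in for backup_service.perform_backup().
    println!("one-time backup finished");
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // Shared, lockable task set, mirroring the mutexed_tasks handle in main.rs.
    let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new()));

    // Exactly one task is spawned: no interval loop, no shutdown channel.
    mutexed_tasks.lock().await.spawn(async move { run_once().await });

    // Drain the set so the process exits only after that one task completes.
    while let Some(joined) = mutexed_tasks.lock().await.join_next().await {
        if let Err(err) = joined {
            eprintln!("backup task failed: {err}");
        }
    }
}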
17 changes: 0 additions & 17 deletions nft_ingester/src/config.rs
@@ -323,27 +323,10 @@ pub struct RocksDbBackupServiceClapArgs {
help = "Whether to flush RocksDb before backup"
)]
pub flush_before_backup: bool,
-    #[clap(long, env = "ROCKS_INTERVAL_IN_SECONDS", help = "Backup interval (seconds)")]
-    pub interval_in_seconds: i64,

-    #[clap(
-        long("run_profiling"),
-        env = "IS_RUN_PROFILING",
-        default_value_t = false,
-        help = "Start profiling (default: false)"
-    )]
-    pub is_run_profiling: bool,
-    #[clap(long, env, default_value = "/usr/src/app/heaps", help = "Heap path")]
-    pub heap_path: String,

-    #[clap(
-        long,
-        env = "ROCKS_DB_BACKUP_SERVICE_METRICS_PORT",
-        help = "Metrics port. Start HTTP server to report metrics if port exist."
-    )]
-    pub metrics_port: Option<u16>,
pub profiling_file_path_container: Option<String>,

#[clap(long, env, default_value = "info", help = "warn|info|debug")]
pub log_level: String,
}
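After these removals, the flags the backup binary still reads are the backup directory, the archives directory, the flush toggle, and the log level. A minimal clap sketch with the same shape is below; the field names mirror RocksDbBackupServiceClapArgs, but the env-variable bindings on the two directory fields are assumptions, since their attributes sit in the collapsed part of the diff (clap's env support also requires the crate's env feature).

use clap::Parser;

#[derive(Parser, Debug)]
struct BackupArgs {
    // Assumed env names; the real attributes are not shown in this hunk.
    #[clap(long, env = "ROCKS_BACKUP_DIR", help = "Backup directory")]
    backup_dir: String,
    #[clap(long, env = "ROCKS_BACKUP_ARCHIVES_DIR", help = "Backup archives directory")]
    backup_archives_dir: String,
    #[clap(long, env = "ROCKS_FLUSH_BEFORE_BACKUP", help = "Whether to flush RocksDb before backup")]
    flush_before_backup: bool,
    #[clap(long, env, default_value = "info", help = "warn|info|debug")]
    log_level: String,
}

fn main() {
    let args = BackupArgs::parse();
    println!("{args:?}");
}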
91 changes: 42 additions & 49 deletions rocks-db/src/backup_service.rs
@@ -4,17 +4,16 @@ use std::{
io::{BufReader, Write},
path::Path,
sync::Arc,
-    time::Duration,
};

use futures_util::StreamExt;
-use metrics_utils::RocksDbMetricsConfig;
+use indicatif::{ProgressBar, ProgressStyle};
use rocksdb::{
backup::{BackupEngine, BackupEngineOptions, RestoreOptions},
Env, DB,
};
use serde::{Deserialize, Serialize};
-use tokio::{sync::broadcast::Receiver, task::JoinError};
+use tokio::task::JoinError;
use tracing::{error, info};

use crate::errors::RocksDbBackupServiceError;
@@ -24,13 +23,13 @@ const BACKUP_POSTFIX: &str = ".tar.lz4";
const ROCKS_NUM_BACKUPS_TO_KEEP: usize = 1;
const NUMBER_ARCHIVES_TO_STORE: usize = 2;
const DEFAULT_BACKUP_DIR_NAME: &str = "_rocksdb_backup";
+const BASE_BACKUP_ID: u32 = 1;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RocksDbBackupServiceConfig {
pub rocks_backup_dir: String,
pub rocks_backup_archives_dir: String,
pub rocks_flush_before_backup: bool,
-    pub rocks_interval_in_seconds: i64,
}

pub struct RocksDbBackupService {
@@ -62,52 +61,47 @@ impl RocksDbBackupService {
self.verify_backup_single(backup_id)
}

-    pub async fn perform_backup(
-        &mut self,
-        metrics: Arc<RocksDbMetricsConfig>,
-        mut rx: Receiver<()>,
-    ) -> Result<(), JoinError> {
-        let mut last_backup_id = 1;
-        while rx.is_empty() {
-            let start_time = chrono::Utc::now();
-            last_backup_id = match self.backup_engine.get_backup_info().last() {
-                None => last_backup_id,
-                Some(backup_info) => {
-                    if (backup_info.timestamp + self.backup_config.rocks_interval_in_seconds)
-                        >= start_time.timestamp()
-                    {
-                        continue;
-                    }
-                    backup_info.backup_id + 1
-                },
-            };
-
-            if let Err(err) = self.create_backup(last_backup_id) {
-                error!("create_backup: {}", err);
-            }
-            if let Err(err) = self.delete_old_backups() {
-                error!("delete_old_backups: {}", err);
-            }
-            if let Err(err) = self.build_backup_archive(start_time.timestamp()) {
-                error!("build_backup_archive: {}", err);
-            }
-            if let Err(err) = self.delete_old_archives() {
-                error!("delete_old_archives: {}", err);
-            }
-
-            let duration = chrono::Utc::now().signed_duration_since(start_time);
-            metrics.set_rocksdb_backup_latency(duration.num_milliseconds() as f64);
+    pub async fn perform_backup(&mut self) -> Result<(), JoinError> {
+        let start_time = chrono::Utc::now();
+        let last_backup_id = match self.backup_engine.get_backup_info().last() {
+            None => BASE_BACKUP_ID,
+            Some(backup_info) => backup_info.backup_id + 1,
+        };
+
+        let progress_bar = Arc::new(ProgressBar::new(4)); // four steps:
+        // create backup, delete the old one, build archive, delete old archives
+        progress_bar.set_style(
+            ProgressStyle::default_bar()
+                .template(
+                    "[{bar:40.cyan/blue}] {percent}% \
+                     ({pos}/{len}) {msg}",
+                )
+                .expect("Failed to set progress bar style")
+                .progress_chars("#>-"),
+        );

-            info!("perform_backup {}", duration.num_seconds());
+        if let Err(err) = self.create_backup(last_backup_id) {
+            error!(error = %err, "create_backup: {:?}", err);
+        }

-            tokio::select! {
-                _ = tokio::time::sleep(Duration::from_secs(self.backup_config.rocks_interval_in_seconds as u64)) => {},
-                _ = rx.recv() => {
-                    info!("Received stop signal, stopping performing backup");
-                    break;
-                }
-            };
+        progress_bar.inc(1);
+        if let Err(err) = self.delete_old_backups() {
+            error!(error = %err, "delete_old_backups: {:?}", err);
+        }
+        progress_bar.inc(1);
+        if let Err(err) = self.build_backup_archive(start_time.timestamp()) {
+            error!(error = %err, "build_backup_archive: {:?}", err);
+        }
+        progress_bar.inc(1);
+        if let Err(err) = self.delete_old_archives() {
+            error!(error = %err, "delete_old_archives: {:?}", err);
+        }
+        progress_bar.inc(1);
+        progress_bar.finish_with_message("Backup completed!");

+        let duration = chrono::Utc::now().signed_duration_since(start_time);

+        info!(duration = %duration.num_milliseconds(), "Performed backup in {}ms", duration.num_milliseconds());

Ok(())
}
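The new body reports progress to the operator through indicatif instead of recording the backup-latency metric: a four-step bar, one inc(1) per stage. A self-contained sketch of that pattern, assuming indicatif 0.17 or later, where template() is fallible (which is why the code above calls .expect):

use indicatif::{ProgressBar, ProgressStyle};

fn main() {
    // Four steps: create backup, delete old backups, build archive, delete old archives.
    let pb = ProgressBar::new(4);
    pb.set_style(
        ProgressStyle::default_bar()
            .template("[{bar:40.cyan/blue}] {percent}% ({pos}/{len}) {msg}")
            .expect("Failed to set progress bar style")
            .progress_chars("#>-"),
    );

    for step in ["create backup", "delete old backups", "build archive", "delete old archives"] {
        pb.set_message(step);
        // The real service does the corresponding work here.
        pb.inc(1);
    }
    pb.finish_with_message("Backup completed!");
}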
@@ -129,8 +123,7 @@ impl RocksDbBackupService {
let backup_dir_name = get_backup_dir_name(self.backup_config.rocks_backup_dir.as_str());
tar.append_dir_all(backup_dir_name, self.backup_config.rocks_backup_dir.clone())?;
tar.into_inner()?;
-        let (_output, result) = enc.finish();
-        result?;
+        enc.finish().1?;

Ok(())
}
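The final hunk is a small cleanup of the archive step: the compressor's finish() hands back the inner writer together with the compression result, and the new one-liner keeps only the result. A minimal sketch of the same idiom, assuming the lz4 crate is what sits behind enc (the .tar.lz4 postfix and the tuple-returning finish() both point that way):

use std::{fs::File, io::Write};

use lz4::EncoderBuilder;

fn compress_to_lz4(path: &str, payload: &[u8]) -> std::io::Result<()> {
    let file = File::create(path)?;
    let mut enc = EncoderBuilder::new().build(file)?;
    enc.write_all(payload)?;
    // finish() returns (inner_writer, io::Result<()>); .1? drops the writer and
    // propagates only the error, the same shorthand used in build_backup_archive.
    enc.finish().1?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    compress_to_lz4("example.lz4", b"hello backup")
}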
