From 3de8db1a3b4ba9ddee314937df860b42590b877d Mon Sep 17 00:00:00 2001 From: armyhaylenko Date: Mon, 20 Jan 2025 17:37:00 +0200 Subject: [PATCH] feat: use `PathBuf`s in backup-related functions --- docker-compose.yaml | 3 +- nft_ingester/src/bin/ingester/main.rs | 12 +++-- nft_ingester/src/bin/rocksdb_backup/main.rs | 6 +-- nft_ingester/src/config.rs | 15 ++++--- nft_ingester/src/rocks_db.rs | 13 +++--- rocks-db/src/backup_service.rs | 50 ++++++++++++--------- 6 files changed, 57 insertions(+), 42 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index a21eff8e..9402f86a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -172,8 +172,7 @@ services: - ${ROCKS_DB_PATH}:${ROCKS_DB_PATH_CONTAINER}:ro - ${ROCKS_BACKUP_DIR}:${ROCKS_BACKUP_DIR}:rw - ${ROCKS_BACKUP_ARCHIVES_DIR}:${ROCKS_BACKUP_ARCHIVES_DIR}:rw - - ${ROCKS_DB_SECONDARY_PATH}:${ROCKS_DB_SECONDARY_PATH_CONTAINER}:rw - - ./heaps:/usr/src/app/heaps:rw + - ${ROCKS_DB_SECONDARY_PATH}/backup:${ROCKS_DB_SECONDARY_PATH_CONTAINER}:rw build: context: . dockerfile: ingester.Dockerfile diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index edb8f20b..20bcf522 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -110,10 +110,14 @@ pub async fn main() -> Result<(), IngesterError> { &args .rocks_backup_url .expect("rocks_backup_url is required for the restore rocks db process"), - &args - .rocks_backup_archives_dir - .expect("rocks_backup_archives_dir is required for the restore rocks db process"), - &args.rocks_db_path_container, + &PathBuf::from_str( + &args.rocks_backup_archives_dir.expect( + "rocks_backup_archives_dir is required for the restore rocks db process", + ), + ) + .expect("invalid rocks backup archives dir"), + &PathBuf::from_str(&args.rocks_db_path_container) + .expect("invalid rocks backup archives dir"), ) .await?; } diff --git a/nft_ingester/src/bin/rocksdb_backup/main.rs b/nft_ingester/src/bin/rocksdb_backup/main.rs index 45d43f3b..15a6afe2 100644 --- a/nft_ingester/src/bin/rocksdb_backup/main.rs +++ b/nft_ingester/src/bin/rocksdb_backup/main.rs @@ -36,8 +36,8 @@ async fn main() -> Result<(), RocksDbBackupServiceError> { .unwrap(); debug!( - rocks_db_path_container = %args.rocks_db_path_container, - rocks_db_secondary_path = %args.rocks_db_secondary_path, + rocks_db_path_container = ?args.rocks_db_path_container, + rocks_db_secondary_path = ?args.rocks_db_secondary_path, "Opened RocksDb in secondary mode" ); @@ -53,7 +53,7 @@ async fn main() -> Result<(), RocksDbBackupServiceError> { }, )?; - mutexed_tasks.lock().await.spawn(async move { backup_service.perform_backup().await }); + backup_service.perform_backup().await?; Ok(()) } diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 86630dd6..56b93dfc 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -1,3 +1,7 @@ +// TODO: replace String paths with PathBuf + +use std::path::PathBuf; + use clap::{Parser, ValueEnum}; use figment::value::Dict; use serde::Deserialize; @@ -310,13 +314,13 @@ pub struct SynchronizerClapArgs { #[command(author, version, about, long_about = None)] pub struct RocksDbBackupServiceClapArgs { #[clap(long, env, default_value = "./my_rocksdb", help = "Rocks db path container")] - pub rocks_db_path_container: String, + pub rocks_db_path_container: PathBuf, #[clap(long, env, default_value = "./my_rocksdb_secondary", help = "Rocks db secondary path")] - pub rocks_db_secondary_path: String, + pub rocks_db_secondary_path: PathBuf, #[clap(long, env = "ROCKS_BACKUP_ARCHIVES_DIR", help = "Rocks backup archives dir")] - pub backup_archives_dir: String, + pub backup_archives_dir: PathBuf, #[clap(long, env = "ROCKS_BACKUP_DIR", help = "Rocks backup dir")] - pub backup_dir: String, + pub backup_dir: PathBuf, #[clap( long, env = "ROCKS_FLUSH_BEFORE_BACKUP", @@ -324,9 +328,6 @@ pub struct RocksDbBackupServiceClapArgs { )] pub flush_before_backup: bool, - #[clap(long, env, default_value = "/usr/src/app/heaps", help = "Heap path")] - pub heap_path: String, - #[clap(long, env, default_value = "info", help = "warn|info|debug")] pub log_level: String, } diff --git a/nft_ingester/src/rocks_db.rs b/nft_ingester/src/rocks_db.rs index ff0ca3d0..f3881ea2 100644 --- a/nft_ingester/src/rocks_db.rs +++ b/nft_ingester/src/rocks_db.rs @@ -1,5 +1,6 @@ use std::{ fs::{create_dir_all, remove_dir_all}, + path::PathBuf, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -48,20 +49,20 @@ pub async fn receive_last_saved_slot( pub async fn restore_rocksdb( rocks_backup_url: &str, - rocks_backup_archives_dir: &str, - rocks_db_path_container: &str, + rocks_backup_archives_dir: &PathBuf, + rocks_db_path_container: &PathBuf, ) -> Result<(), RocksDbBackupServiceError> { create_dir_all(rocks_backup_archives_dir)?; - let backup_path = format!("{}/{}", rocks_backup_archives_dir, INGESTER_BACKUP_NAME); + let backup_path = rocks_backup_archives_dir.join(INGESTER_BACKUP_NAME); backup_service::download_backup_archive(rocks_backup_url, &backup_path).await?; backup_service::unpack_backup_archive(&backup_path, rocks_backup_archives_dir)?; - let unpacked_archive = format!( - "{}/{}", - &rocks_backup_archives_dir, + let unpacked_archive = rocks_backup_archives_dir.join( backup_service::get_backup_dir_name(rocks_backup_url) + .parse::() + .expect("invalid backup dir name"), ); backup_service::restore_external_backup(&unpacked_archive, rocks_db_path_container)?; diff --git a/rocks-db/src/backup_service.rs b/rocks-db/src/backup_service.rs index 88d9e881..60d57dfa 100644 --- a/rocks-db/src/backup_service.rs +++ b/rocks-db/src/backup_service.rs @@ -2,7 +2,7 @@ use std::{ ffi::OsStr, fs::File, io::{BufReader, Write}, - path::Path, + path::{Path, PathBuf}, sync::Arc, }; @@ -13,7 +13,6 @@ use rocksdb::{ Env, DB, }; use serde::{Deserialize, Serialize}; -use tokio::task::JoinError; use tracing::{error, info}; use crate::errors::RocksDbBackupServiceError; @@ -27,8 +26,8 @@ const BASE_BACKUP_ID: u32 = 1; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct RocksDbBackupServiceConfig { - pub rocks_backup_dir: String, - pub rocks_backup_archives_dir: String, + pub rocks_backup_dir: PathBuf, + pub rocks_backup_archives_dir: PathBuf, pub rocks_flush_before_backup: bool, } @@ -61,7 +60,7 @@ impl RocksDbBackupService { self.verify_backup_single(backup_id) } - pub async fn perform_backup(&mut self) -> Result<(), JoinError> { + pub async fn perform_backup(&mut self) -> Result<(), RocksDbBackupServiceError> { let start_time = chrono::Utc::now(); let last_backup_id = match self.backup_engine.get_backup_info().last() { None => BASE_BACKUP_ID, @@ -80,22 +79,21 @@ impl RocksDbBackupService { .progress_chars("#>-"), ); - if let Err(err) = self.create_backup(last_backup_id) { + self.create_backup(last_backup_id).inspect_err(|err| { error!(error = %err, "create_backup: {:?}", err); - } - + })?; progress_bar.inc(1); - if let Err(err) = self.delete_old_backups() { + self.delete_old_backups().inspect_err(|err| { error!(error = %err, "delete_old_backups: {:?}", err); - } + })?; progress_bar.inc(1); - if let Err(err) = self.build_backup_archive(start_time.timestamp()) { + self.build_backup_archive(start_time.timestamp()).inspect_err(|err| { error!(error = %err, "build_backup_archive: {:?}", err); - } + })?; progress_bar.inc(1); - if let Err(err) = self.delete_old_archives() { + self.delete_old_archives().inspect_err(|err| { error!(error = %err, "delete_old_archives: {:?}", err); - } + })?; progress_bar.inc(1); progress_bar.finish_with_message("Backup completed!"); @@ -109,7 +107,10 @@ impl RocksDbBackupService { pub fn build_backup_archive(&self, backup_time: i64) -> Result<(), RocksDbBackupServiceError> { let file_path = format!( "{}/{}-{}{}", - self.backup_config.rocks_backup_archives_dir, + self.backup_config + .rocks_backup_archives_dir + .to_str() + .expect("Invalid backup archives dir path"), BACKUP_PREFIX, backup_time, BACKUP_POSTFIX @@ -120,7 +121,13 @@ impl RocksDbBackupService { let mut enc = lz4::EncoderBuilder::new().level(1).build(file)?; let mut tar = tar::Builder::new(&mut enc); - let backup_dir_name = get_backup_dir_name(self.backup_config.rocks_backup_dir.as_str()); + let backup_dir_name = get_backup_dir_name( + self.backup_config + .rocks_backup_dir + .as_path() + .to_str() + .expect("invalid rocks backup dir provided"), + ); tar.append_dir_all(backup_dir_name, self.backup_config.rocks_backup_dir.clone())?; tar.into_inner()?; enc.finish().1?; @@ -196,7 +203,7 @@ pub fn get_backup_dir_name(backup_path: &str) -> String { pub async fn download_backup_archive( url: &str, - backup_path: &str, + backup_path: &PathBuf, ) -> Result<(), RocksDbBackupServiceError> { let resp = reqwest::get(url).await?; if resp.status().is_success() { @@ -211,7 +218,10 @@ pub async fn download_backup_archive( Err(RocksDbBackupServiceError::ReqwestError(resp.status().to_string())) } -pub fn unpack_backup_archive(file_path: &str, dst: &str) -> Result<(), RocksDbBackupServiceError> { +pub fn unpack_backup_archive( + file_path: &PathBuf, + dst: &PathBuf, +) -> Result<(), RocksDbBackupServiceError> { let file = File::open(file_path)?; let decoder = lz4::Decoder::new(BufReader::new(file))?; let mut archive = tar::Archive::new(decoder); @@ -221,8 +231,8 @@ pub fn unpack_backup_archive(file_path: &str, dst: &str) -> Result<(), RocksDbBa } pub fn restore_external_backup( - backup_dir: &str, - new_db_dir: &str, + backup_dir: &PathBuf, + new_db_dir: &PathBuf, ) -> Result<(), RocksDbBackupServiceError> { let env = Env::new()?; let backup_options = BackupEngineOptions::new(backup_dir)?;