Skip to content

Commit

Permalink
feat: use PathBufs in backup-related functions
Browse files Browse the repository at this point in the history
  • Loading branch information
armyhaylenko committed Jan 23, 2025
1 parent 4d01d0c commit 221938b
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 42 deletions.
3 changes: 1 addition & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ services:
- ${ROCKS_DB_PATH}:${ROCKS_DB_PATH_CONTAINER}:ro
- ${ROCKS_BACKUP_DIR}:${ROCKS_BACKUP_DIR}:rw
- ${ROCKS_BACKUP_ARCHIVES_DIR}:${ROCKS_BACKUP_ARCHIVES_DIR}:rw
- ${ROCKS_DB_SECONDARY_PATH}:${ROCKS_DB_SECONDARY_PATH_CONTAINER}:rw
- ./heaps:/usr/src/app/heaps:rw
- ${ROCKS_DB_SECONDARY_PATH}/backup:${ROCKS_DB_SECONDARY_PATH_CONTAINER}:rw
build:
context: .
dockerfile: ingester.Dockerfile
Expand Down
12 changes: 8 additions & 4 deletions nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@ pub async fn main() -> Result<(), IngesterError> {
&args
.rocks_backup_url
.expect("rocks_backup_url is required for the restore rocks db process"),
&args
.rocks_backup_archives_dir
.expect("rocks_backup_archives_dir is required for the restore rocks db process"),
&args.rocks_db_path_container,
&PathBuf::from_str(
&args.rocks_backup_archives_dir.expect(
"rocks_backup_archives_dir is required for the restore rocks db process",
),
)
.expect("invalid rocks backup archives dir"),
&PathBuf::from_str(&args.rocks_db_path_container)
.expect("invalid rocks backup archives dir"),
)
.await?;
}
Expand Down
6 changes: 3 additions & 3 deletions nft_ingester/src/bin/rocksdb_backup/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ async fn main() -> Result<(), RocksDbBackupServiceError> {
.unwrap();

debug!(
rocks_db_path_container = %args.rocks_db_path_container,
rocks_db_secondary_path = %args.rocks_db_secondary_path,
rocks_db_path_container = ?args.rocks_db_path_container,
rocks_db_secondary_path = ?args.rocks_db_secondary_path,
"Opened RocksDb in secondary mode"
);

Expand All @@ -53,7 +53,7 @@ async fn main() -> Result<(), RocksDbBackupServiceError> {
},
)?;

mutexed_tasks.lock().await.spawn(async move { backup_service.perform_backup().await });
backup_service.perform_backup().await?;

Ok(())
}
14 changes: 7 additions & 7 deletions nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::path::PathBuf;

use clap::{ArgAction, Parser, ValueEnum};
// TODO: replace String paths with PathBuf
use figment::value::Dict;
use serde::Deserialize;
use solana_sdk::commitment_config::CommitmentLevel;
Expand Down Expand Up @@ -324,23 +327,20 @@ pub struct SynchronizerClapArgs {
#[command(author, version, about, long_about = None)]
pub struct RocksDbBackupServiceClapArgs {
#[clap(long, env, default_value = "./my_rocksdb", help = "Rocks db path container")]
pub rocks_db_path_container: String,
pub rocks_db_path_container: PathBuf,
#[clap(long, env, default_value = "./my_rocksdb_secondary", help = "Rocks db secondary path")]
pub rocks_db_secondary_path: String,
pub rocks_db_secondary_path: PathBuf,
#[clap(long, env = "ROCKS_BACKUP_ARCHIVES_DIR", help = "Rocks backup archives dir")]
pub backup_archives_dir: String,
pub backup_archives_dir: PathBuf,
#[clap(long, env = "ROCKS_BACKUP_DIR", help = "Rocks backup dir")]
pub backup_dir: String,
pub backup_dir: PathBuf,
#[clap(
long,
env = "ROCKS_FLUSH_BEFORE_BACKUP",
help = "Whether to flush RocksDb before backup"
)]
pub flush_before_backup: bool,

#[clap(long, env, default_value = "/usr/src/app/heaps", help = "Heap path")]
pub heap_path: String,

#[clap(long, env, default_value = "info", help = "warn|info|debug")]
pub log_level: String,
}
Expand Down
13 changes: 7 additions & 6 deletions nft_ingester/src/rocks_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
fs::{create_dir_all, remove_dir_all},
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand Down Expand Up @@ -48,20 +49,20 @@ pub async fn receive_last_saved_slot(

pub async fn restore_rocksdb(
rocks_backup_url: &str,
rocks_backup_archives_dir: &str,
rocks_db_path_container: &str,
rocks_backup_archives_dir: &PathBuf,
rocks_db_path_container: &PathBuf,
) -> Result<(), RocksDbBackupServiceError> {
create_dir_all(rocks_backup_archives_dir)?;

let backup_path = format!("{}/{}", rocks_backup_archives_dir, INGESTER_BACKUP_NAME);
let backup_path = rocks_backup_archives_dir.join(INGESTER_BACKUP_NAME);

backup_service::download_backup_archive(rocks_backup_url, &backup_path).await?;
backup_service::unpack_backup_archive(&backup_path, rocks_backup_archives_dir)?;

let unpacked_archive = format!(
"{}/{}",
&rocks_backup_archives_dir,
let unpacked_archive = rocks_backup_archives_dir.join(
backup_service::get_backup_dir_name(rocks_backup_url)
.parse::<PathBuf>()
.expect("invalid backup dir name"),
);

backup_service::restore_external_backup(&unpacked_archive, rocks_db_path_container)?;
Expand Down
50 changes: 30 additions & 20 deletions rocks-db/src/backup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
ffi::OsStr,
fs::File,
io::{BufReader, Write},
path::Path,
path::{Path, PathBuf},
sync::Arc,
};

Expand All @@ -13,7 +13,6 @@ use rocksdb::{
Env, DB,
};
use serde::{Deserialize, Serialize};
use tokio::task::JoinError;
use tracing::{error, info};

use crate::errors::RocksDbBackupServiceError;
Expand All @@ -27,8 +26,8 @@ const BASE_BACKUP_ID: u32 = 1;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RocksDbBackupServiceConfig {
pub rocks_backup_dir: String,
pub rocks_backup_archives_dir: String,
pub rocks_backup_dir: PathBuf,
pub rocks_backup_archives_dir: PathBuf,
pub rocks_flush_before_backup: bool,
}

Expand Down Expand Up @@ -61,7 +60,7 @@ impl RocksDbBackupService {
self.verify_backup_single(backup_id)
}

pub async fn perform_backup(&mut self) -> Result<(), JoinError> {
pub async fn perform_backup(&mut self) -> Result<(), RocksDbBackupServiceError> {
let start_time = chrono::Utc::now();
let last_backup_id = match self.backup_engine.get_backup_info().last() {
None => BASE_BACKUP_ID,
Expand All @@ -80,22 +79,21 @@ impl RocksDbBackupService {
.progress_chars("#>-"),
);

if let Err(err) = self.create_backup(last_backup_id) {
self.create_backup(last_backup_id).inspect_err(|err| {
error!(error = %err, "create_backup: {:?}", err);
}

})?;
progress_bar.inc(1);
if let Err(err) = self.delete_old_backups() {
self.delete_old_backups().inspect_err(|err| {
error!(error = %err, "delete_old_backups: {:?}", err);
}
})?;
progress_bar.inc(1);
if let Err(err) = self.build_backup_archive(start_time.timestamp()) {
self.build_backup_archive(start_time.timestamp()).inspect_err(|err| {
error!(error = %err, "build_backup_archive: {:?}", err);
}
})?;
progress_bar.inc(1);
if let Err(err) = self.delete_old_archives() {
self.delete_old_archives().inspect_err(|err| {
error!(error = %err, "delete_old_archives: {:?}", err);
}
})?;
progress_bar.inc(1);
progress_bar.finish_with_message("Backup completed!");

Expand All @@ -109,7 +107,10 @@ impl RocksDbBackupService {
pub fn build_backup_archive(&self, backup_time: i64) -> Result<(), RocksDbBackupServiceError> {
let file_path = format!(
"{}/{}-{}{}",
self.backup_config.rocks_backup_archives_dir,
self.backup_config
.rocks_backup_archives_dir
.to_str()
.expect("Invalid backup archives dir path"),
BACKUP_PREFIX,
backup_time,
BACKUP_POSTFIX
Expand All @@ -120,7 +121,13 @@ impl RocksDbBackupService {
let mut enc = lz4::EncoderBuilder::new().level(1).build(file)?;
let mut tar = tar::Builder::new(&mut enc);

let backup_dir_name = get_backup_dir_name(self.backup_config.rocks_backup_dir.as_str());
let backup_dir_name = get_backup_dir_name(
self.backup_config
.rocks_backup_dir
.as_path()
.to_str()
.expect("invalid rocks backup dir provided"),
);
tar.append_dir_all(backup_dir_name, self.backup_config.rocks_backup_dir.clone())?;
tar.into_inner()?;
enc.finish().1?;
Expand Down Expand Up @@ -196,7 +203,7 @@ pub fn get_backup_dir_name(backup_path: &str) -> String {

pub async fn download_backup_archive(
url: &str,
backup_path: &str,
backup_path: &PathBuf,
) -> Result<(), RocksDbBackupServiceError> {
let resp = reqwest::get(url).await?;
if resp.status().is_success() {
Expand All @@ -211,7 +218,10 @@ pub async fn download_backup_archive(
Err(RocksDbBackupServiceError::ReqwestError(resp.status().to_string()))
}

pub fn unpack_backup_archive(file_path: &str, dst: &str) -> Result<(), RocksDbBackupServiceError> {
pub fn unpack_backup_archive(
file_path: &PathBuf,
dst: &PathBuf,
) -> Result<(), RocksDbBackupServiceError> {
let file = File::open(file_path)?;
let decoder = lz4::Decoder::new(BufReader::new(file))?;
let mut archive = tar::Archive::new(decoder);
Expand All @@ -221,8 +231,8 @@ pub fn unpack_backup_archive(file_path: &str, dst: &str) -> Result<(), RocksDbBa
}

pub fn restore_external_backup(
backup_dir: &str,
new_db_dir: &str,
backup_dir: &PathBuf,
new_db_dir: &PathBuf,
) -> Result<(), RocksDbBackupServiceError> {
let env = Env::new()?;
let backup_options = BackupEngineOptions::new(backup_dir)?;
Expand Down

0 comments on commit 221938b

Please sign in to comment.