Skip to content

Commit

Permalink
Merge pull request #364 from metaplex-foundation/feat/MTG-1044-backup…
Browse files Browse the repository at this point in the history
…-service

MTG-1044: add a separate secondary rocks backup service
  • Loading branch information
armyhaylenko authored Jan 23, 2025
2 parents daac082 + 221938b commit a09844d
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 134 deletions.
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ target
db-data
node-modules
ledger
.anchor
tmp
.anchor
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ start-synchronizer:
start-api:
@docker compose -f docker-compose.yaml up -d das-api

# Start the standalone RocksDB backup service container (defined in docker-compose.yaml)
# in detached mode.
start-rocksdb-backup:
	@docker compose -f docker-compose.yaml up -d rocksdb-backup

stop-api:
@docker stop --time 20 das-api

Expand Down
19 changes: 19 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,22 @@ services:
logging:
options:
max-size: "100m"

  # Standalone service running the rocksdb_backup binary (built by
  # ingester.Dockerfile) to back up the RocksDB instance.
  rocksdb-backup:
    container_name: rocksdb-backup
    entrypoint: ./rocksdb_backup
    env_file:
      - .env
    network_mode: host
    volumes:
      # Primary DB mounted read-only; the service reads it via a secondary handle.
      - ${ROCKS_DB_PATH}:${ROCKS_DB_PATH_CONTAINER}:ro
      - ${ROCKS_BACKUP_DIR}:${ROCKS_BACKUP_DIR}:rw
      - ${ROCKS_BACKUP_ARCHIVES_DIR}:${ROCKS_BACKUP_ARCHIVES_DIR}:rw
      # NOTE(review): host path appends /backup but the container path does not —
      # confirm this asymmetry is intentional.
      - ${ROCKS_DB_SECONDARY_PATH}/backup:${ROCKS_DB_SECONDARY_PATH_CONTAINER}:rw
    build:
      context: .
      dockerfile: ingester.Dockerfile
    # Allow up to 2 minutes for an in-flight backup to finish on shutdown.
    stop_grace_period: 2m
    logging:
      options:
        max-size: "100m"
3 changes: 2 additions & 1 deletion ingester.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ RUN cargo chef cook --release --recipe-path recipe.json
# Building the services
FROM cacher AS builder
COPY . .
RUN cargo build --release --bin ingester --bin api --bin backfill --bin synchronizer --bin slot_persister
RUN cargo build --release --bin ingester --bin api --bin backfill --bin synchronizer --bin slot_persister --bin rocksdb_backup

# Building the profiling feature services
FROM cacher AS builder-with-profiling
Expand All @@ -58,6 +58,7 @@ COPY --from=builder /rust/target/release/backfill ${APP}/backfill
COPY --from=builder /rust/target/release/api ${APP}/api
COPY --from=builder /rust/target/release/synchronizer ${APP}/synchronizer
COPY --from=builder /rust/target/release/slot_persister ${APP}/slot_persister
COPY --from=builder /rust/target/release/rocksdb_backup ${APP}/rocksdb_backup
COPY --from=builder-with-profiling /rust/target/release/ingester ${APP}/profiling_ingester
COPY --from=builder-with-profiling /rust/target/release/backfill ${APP}/profiling_backfill
COPY --from=builder-with-profiling /rust/target/release/api ${APP}/profiling_api
Expand Down
25 changes: 8 additions & 17 deletions nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@ pub async fn main() -> Result<(), IngesterError> {
&args
.rocks_backup_url
.expect("rocks_backup_url is required for the restore rocks db process"),
&args
.rocks_backup_archives_dir
.expect("rocks_backup_archives_dir is required for the restore rocks db process"),
&args.rocks_db_path_container,
&PathBuf::from_str(
&args.rocks_backup_archives_dir.expect(
"rocks_backup_archives_dir is required for the restore rocks db process",
),
)
.expect("invalid rocks backup archives dir"),
&PathBuf::from_str(&args.rocks_db_path_container)
.expect("invalid rocks backup archives dir"),
)
.await?;
}
Expand Down Expand Up @@ -625,17 +629,4 @@ pub async fn main() -> Result<(), IngesterError> {
.await;

Ok(())

// todo: remove backup service from here and move it to a separate process with a secondary db - verify it's possible first!
// start backup service
// if config.store_db_backups() {
// info!("Start store DB backup...");
// let backup_service = BackupService::new(primary_rocks_storage.db.clone(), &backup_service::load_config()?)?;
// let cloned_metrics = metrics_state.ingester_metrics.clone();
// let cloned_rx = shutdown_rx.resubscribe();
// mutexed_tasks
// .lock()
// .await
// .spawn(perform_backup(backup_service, cloned_rx, cloned_metrics));
// }
}
59 changes: 59 additions & 0 deletions nft_ingester/src/bin/rocksdb_backup/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::sync::Arc;

use clap::Parser;
use nft_ingester::config::{init_logger, RocksDbBackupServiceClapArgs};
use prometheus_client::registry::Registry;
use rocks_db::{
backup_service::{RocksDbBackupService, RocksDbBackupServiceConfig},
errors::RocksDbBackupServiceError,
migrator::MigrationState,
Storage,
};
use tokio::{sync::Mutex, task::JoinSet};
use tracing::{debug, info};

/// Entry point of the standalone RocksDB backup service.
///
/// Opens the ingester's RocksDB in secondary (read-only follower) mode,
/// performs a single backup pass via [`RocksDbBackupService`], and exits.
///
/// # Errors
/// Returns [`RocksDbBackupServiceError`] if constructing the backup service
/// or performing the backup fails.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), RocksDbBackupServiceError> {
    let args = RocksDbBackupServiceClapArgs::parse();
    init_logger(&args.log_level);

    info!("Starting RocksDb backup service...");

    // RED (request/error/duration) metrics for storage operations.
    // NOTE(review): the registry is populated but never exposed in this binary —
    // confirm whether a metrics endpoint is intended here.
    let mut registry = Registry::default();
    let red_metrics = Arc::new(metrics_utils::red::RequestErrorDurationMetrics::new());
    red_metrics.register(&mut registry);

    let mutexed_tasks = Arc::new(Mutex::new(JoinSet::new()));

    // Open RocksDb in secondary mode so this process can read a consistent view
    // without contending with the primary writer.
    // Using `expect` (not bare `unwrap`) so a startup failure states the invariant
    // that was violated instead of an anonymous panic.
    let storage = Storage::open_secondary(
        &args.rocks_db_path_container,
        &args.rocks_db_secondary_path,
        mutexed_tasks.clone(),
        red_metrics.clone(),
        MigrationState::Last,
    )
    .expect("failed to open RocksDb in secondary mode");

    debug!(
        rocks_db_path_container = ?args.rocks_db_path_container,
        rocks_db_secondary_path = ?args.rocks_db_secondary_path,
        "Opened RocksDb in secondary mode"
    );

    let rocks_storage = Arc::new(storage);

    info!("Starting store DB backup...");
    // Build the backup service from CLI/env configuration and run one backup pass;
    // the process exits once it completes.
    let mut backup_service = RocksDbBackupService::new(
        rocks_storage.db.clone(),
        &RocksDbBackupServiceConfig {
            rocks_backup_dir: args.backup_dir,
            rocks_backup_archives_dir: args.backup_archives_dir,
            rocks_flush_before_backup: args.flush_before_backup,
        },
    )?;

    backup_service.perform_backup().await?;

    Ok(())
}
25 changes: 25 additions & 0 deletions nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::path::PathBuf;

use clap::{ArgAction, Parser, ValueEnum};
// TODO: replace String paths with PathBuf
use figment::value::Dict;
use serde::Deserialize;
use solana_sdk::commitment_config::CommitmentLevel;
Expand Down Expand Up @@ -320,6 +323,28 @@ pub struct SynchronizerClapArgs {
pub log_level: String,
}

// Command-line/environment configuration for the standalone RocksDB backup
// service binary. Plain `//` comments are used here (not `///`) so clap's
// derive does not pick them up as CLI help text; every field carries an
// explicit `help` attribute instead.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct RocksDbBackupServiceClapArgs {
    // Path of the primary RocksDB instance as seen inside the container
    // (env var derived by clap from the field name: ROCKS_DB_PATH_CONTAINER).
    #[clap(long, env, default_value = "./my_rocksdb", help = "Rocks db path container")]
    pub rocks_db_path_container: PathBuf,
    // Path where the secondary (read-only) RocksDB instance keeps its state
    // (env var derived: ROCKS_DB_SECONDARY_PATH).
    #[clap(long, env, default_value = "./my_rocksdb_secondary", help = "Rocks db secondary path")]
    pub rocks_db_secondary_path: PathBuf,
    // Directory where packed backup archives are written. Required (no default).
    #[clap(long, env = "ROCKS_BACKUP_ARCHIVES_DIR", help = "Rocks backup archives dir")]
    pub backup_archives_dir: PathBuf,
    // Working directory for the raw (unarchived) RocksDB backup. Required (no default).
    #[clap(long, env = "ROCKS_BACKUP_DIR", help = "Rocks backup dir")]
    pub backup_dir: PathBuf,
    // When set, flush RocksDb to disk before taking the backup.
    #[clap(
        long,
        env = "ROCKS_FLUSH_BEFORE_BACKUP",
        help = "Whether to flush RocksDb before backup"
    )]
    pub flush_before_backup: bool,

    // Logging verbosity; one of warn|info|debug.
    #[clap(long, env, default_value = "info", help = "warn|info|debug")]
    pub log_level: String,
}

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct MigratorClapArgs {
Expand Down
6 changes: 3 additions & 3 deletions nft_ingester/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use interface::error::UsecaseError;
use plerkle_messenger::MessengerError;
use plerkle_serialization::error::PlerkleSerializationError;
use postgre_client::error::IndexDbError;
use rocks_db::errors::{BackupServiceError, StorageError};
use rocks_db::errors::{RocksDbBackupServiceError, StorageError};
use solana_sdk::{pubkey::ParsePubkeyError, signature::ParseSignatureError};
use solana_transaction_status::EncodeError;
use thiserror::Error;
Expand Down Expand Up @@ -212,8 +212,8 @@ impl From<sqlx::Error> for IngesterError {
}
}

impl From<BackupServiceError> for IngesterError {
fn from(value: BackupServiceError) -> Self {
impl From<RocksDbBackupServiceError> for IngesterError {
fn from(value: RocksDbBackupServiceError) -> Self {
IngesterError::BackupError(value.to_string())
}
}
Expand Down
28 changes: 9 additions & 19 deletions nft_ingester/src/rocks_db.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::{
fs::{create_dir_all, remove_dir_all},
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use metrics_utils::IngesterMetricsConfig;
use rocks_db::{
backup_service, backup_service::BackupService, errors::BackupServiceError,
storage_traits::AssetSlotStorage, Storage,
backup_service, errors::RocksDbBackupServiceError, storage_traits::AssetSlotStorage, Storage,
};
use tokio::{
sync::broadcast::{Receiver, Sender},
Expand All @@ -21,15 +20,6 @@ use tracing::{error, info};

use crate::config::INGESTER_BACKUP_NAME;

pub async fn perform_backup(
mut backup_service: BackupService,
cloned_rx: Receiver<()>,
cloned_metrics: Arc<IngesterMetricsConfig>,
) -> Result<(), JoinError> {
backup_service.perform_backup(cloned_metrics, cloned_rx).await;
Ok(())
}

pub async fn receive_last_saved_slot(
cloned_rx: Receiver<()>,
cloned_tx: Sender<()>,
Expand Down Expand Up @@ -59,20 +49,20 @@ pub async fn receive_last_saved_slot(

pub async fn restore_rocksdb(
rocks_backup_url: &str,
rocks_backup_archives_dir: &str,
rocks_db_path_container: &str,
) -> Result<(), BackupServiceError> {
rocks_backup_archives_dir: &PathBuf,
rocks_db_path_container: &PathBuf,
) -> Result<(), RocksDbBackupServiceError> {
create_dir_all(rocks_backup_archives_dir)?;

let backup_path = format!("{}/{}", rocks_backup_archives_dir, INGESTER_BACKUP_NAME);
let backup_path = rocks_backup_archives_dir.join(INGESTER_BACKUP_NAME);

backup_service::download_backup_archive(rocks_backup_url, &backup_path).await?;
backup_service::unpack_backup_archive(&backup_path, rocks_backup_archives_dir)?;

let unpacked_archive = format!(
"{}/{}",
&rocks_backup_archives_dir,
let unpacked_archive = rocks_backup_archives_dir.join(
backup_service::get_backup_dir_name(rocks_backup_url)
.parse::<PathBuf>()
.expect("invalid backup dir name"),
);

backup_service::restore_external_backup(&unpacked_archive, rocks_db_path_container)?;
Expand Down
Loading

0 comments on commit a09844d

Please sign in to comment.