diff --git a/crates/file_storage_cleanup/Cargo.toml b/crates/file_storage_cleanup/Cargo.toml
new file mode 100644
index 000000000..850539b50
--- /dev/null
+++ b/crates/file_storage_cleanup/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "file_storage_cleanup"
+version = "0.1.0"
+edition = "2021"
+license = "LicenseRef-FSL-1.1-Apache-2.0"
+
+[[bin]]
+name = "file-storage-cleanup"
+path = "src/main.rs"
+
+[dependencies]
+anyhow = { workspace = true }
+application = { path = "../application" }
+async-trait = { workspace = true }
+aws_s3 = { path = "../aws_s3" }
+aws_utils = { path = "../aws_utils" }
+clap = { workspace = true }
+clusters = { path = "../clusters" }
+cmd_util = { path = "../cmd_util" }
+common = { path = "../common" }
+database = { path = "../database" }
+db_connection = { path = "../db_connection" }
+events = { path = "../events" }
+futures = { workspace = true }
+model = { path = "../model" }
+runtime = { path = "../runtime" }
+search = { path = "../search" }
+storage = { path = "../storage" }
+tracing = { workspace = true }
+walkdir = { workspace = true }
diff --git a/crates/file_storage_cleanup/src/main.rs b/crates/file_storage_cleanup/src/main.rs
new file mode 100644
index 000000000..5ebb226cb
--- /dev/null
+++ b/crates/file_storage_cleanup/src/main.rs
@@ -0,0 +1,183 @@
+use std::{
+    collections::HashSet,
+    io::{self, Write},
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use anyhow::{Context, Result};
+use clap::Parser;
+use cmd_util::env::config_tool;
+use common::{
+    identity::Identity,
+    runtime::Runtime,
+    types::ObjectKey,
+};
+use database::Database;
+use db_connection::connect_persistence;
+use events::usage::NoOpUsageEventLogger;
+use futures::{StreamExt, TryStreamExt};
+use model::{
+    database_globals::{types::StorageType, DatabaseGlobalsModel},
+    file_storage::{self, FILE_STORAGE_TABLE},
+    initialize_application_system_tables,
+    IndexModel,
+};
+use runtime::prod::ProdRuntime;
+use search::{searcher::InProcessSearcher, Searcher};
+use storage::{create_storage, Storage, StorageUseCase};
+use walkdir::WalkDir;
+
+/// CLI options for the cleanup tool.
+///
+/// NOTE(review): `local_storage` and `s3_storage` are accepted for CLI
+/// compatibility but are not read anywhere below — the storage actually
+/// scanned comes from the `storage_type` recorded in the database globals.
+#[derive(Parser, Debug)]
+struct CleanupConfig {
+    /// Database driver to connect with (defaults to local SQLite).
+    #[clap(long, value_enum, default_value_t = clusters::DbDriverTag::Sqlite)]
+    db: clusters::DbDriverTag,
+    /// Connection spec for the chosen driver.
+    #[clap(long, default_value = "convex_local_backend.sqlite3")]
+    db_spec: String,
+    /// Kept for interface compatibility; currently unused (see struct docs).
+    #[clap(long, default_value = "convex_local_storage")]
+    local_storage: String,
+    /// Kept for interface compatibility; currently unused (see struct docs).
+    #[clap(long)]
+    s3_storage: bool,
+    /// Allow non-SSL database connections.
+    #[clap(long)]
+    do_not_require_ssl: bool,
+    /// Actually delete; without this flag the tool only reports what it found.
+    #[clap(long)]
+    confirm: bool,
+}
+
+fn main() -> Result<()> {
+    let _guard = config_tool();
+    let config = CleanupConfig::parse();
+    let tokio = ProdRuntime::init_tokio()?;
+    let runtime = ProdRuntime::new(&tokio);
+    // Clone before `block_on`: `runtime.block_on(.., run(config, runtime))`
+    // would move `runtime` into the future while it is still borrowed as the
+    // method receiver (E0505).
+    let driver = runtime.clone();
+    driver.block_on("file_storage_cleanup", run(config, runtime))
+}
+
+/// Finds objects present in file storage but not referenced by any
+/// `_file_storage` document, and — with `--confirm` plus an interactive
+/// "yes" — permanently deletes them.
+async fn run(config: CleanupConfig, runtime: ProdRuntime) -> Result<()> {
+    let persistence = connect_persistence(
+        config.db,
+        &config.db_spec,
+        !config.do_not_require_ssl,
+        false,
+        "file-storage-cleanup",
+        runtime.clone(),
+        common::shutdown::ShutdownSignal::panic(),
+    )
+    .await?;
+    let searcher: Arc<dyn Searcher> = Arc::new(InProcessSearcher::new(runtime.clone()).await?);
+    let database = Database::load(
+        persistence.clone(),
+        runtime.clone(),
+        searcher,
+        common::shutdown::ShutdownSignal::panic(),
+        model::virtual_system_mapping().clone(),
+        Arc::new(NoOpUsageEventLogger),
+    )
+    .await?;
+    initialize_application_system_tables(&database).await?;
+
+    // Which storage backend this instance writes files to, as recorded at
+    // backend startup.
+    let storage_type = {
+        let mut tx = database.begin_system().await?;
+        let globals = DatabaseGlobalsModel::new(&mut tx).database_globals().await?;
+        globals
+            .value
+            .storage_type
+            .context("storage_type not set")?
+    };
+    let files_storage =
+        create_storage(runtime.clone(), &storage_type, StorageUseCase::Files).await?;
+
+    // List storage BEFORE taking the database snapshot. With the opposite
+    // order, a file uploaded after the snapshot but before the listing shows
+    // up as "stored but not active" and would be wrongly deleted. Even with
+    // this ordering the tool assumes no concurrent uploads — run it against a
+    // quiesced backend.
+    let stored_keys = match &storage_type {
+        StorageType::Local { dir } => {
+            let base = PathBuf::from(dir).join(StorageUseCase::Files.to_string());
+            list_local_objects(&base)?
+        },
+        StorageType::S3 { s3_prefix } => {
+            let bucket = aws_s3::storage::s3_bucket_name(&StorageUseCase::Files)?;
+            list_s3_objects(s3_prefix, bucket).await?
+        },
+    };
+    let active_keys = list_active_keys(&database).await?;
+
+    // Anything on disk/S3 that no document references is garbage.
+    let to_delete: Vec<ObjectKey> = stored_keys.difference(&active_keys).cloned().collect();
+
+    println!("Found {} unreferenced files", to_delete.len());
+    if to_delete.is_empty() {
+        return Ok(());
+    }
+    if !config.confirm {
+        println!("Run again with --confirm to delete these files.");
+        return Ok(());
+    }
+    // Destructive path: require an explicit interactive acknowledgement on
+    // top of --confirm.
+    println!(
+        "This will permanently delete {} files. Type 'yes' to proceed:",
+        to_delete.len()
+    );
+    io::stdout().flush()?;
+    let mut input = String::new();
+    io::stdin().read_line(&mut input)?;
+    if input.trim() != "yes" {
+        println!("Aborted");
+        return Ok(());
+    }
+    for key in &to_delete {
+        files_storage.delete_object(key).await?;
+    }
+    println!("Deleted {} files", to_delete.len());
+    Ok(())
+}
+
+/// Collects every storage key referenced by a `_file_storage` document across
+/// all active tablets for that table, read at one repeatable snapshot.
+async fn list_active_keys(db: &Database) -> Result<HashSet<ObjectKey>> {
+    let (tablet_ids, by_id_indexes, snapshot_ts) = {
+        let mut tx = db.begin(Identity::system()).await?;
+        let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
+        let table_mapping = tx.table_mapping();
+        // A table name can map to multiple tablets; only scan active ones.
+        let tablet_ids: HashSet<_> = table_mapping
+            .iter()
+            .filter(|(tablet_id, _, _, table_name)| {
+                **table_name == *FILE_STORAGE_TABLE && table_mapping.is_active(*tablet_id)
+            })
+            .map(|(tablet_id, ..)| *tablet_id)
+            .collect();
+        let snapshot_ts = tx.begin_timestamp();
+        (tablet_ids, by_id_indexes, snapshot_ts)
+    };
+    let mut keys = HashSet::new();
+    for tablet_id in tablet_ids {
+        let table_iterator = db.table_iterator(snapshot_ts, 100);
+        // Use the by_id map fetched in the transaction above; the original
+        // fetched it and then ignored it in favor of `db.index_registry()`.
+        let by_id = *by_id_indexes
+            .get(&tablet_id)
+            .context("by_id index missing for file storage tablet")?;
+        let mut stream =
+            Box::pin(table_iterator.stream_documents_in_table(tablet_id, by_id, None));
+        while let Some(doc) = stream.try_next().await? {
+            let entry: file_storage::FileStorageEntry = doc.value.parse()?;
+            keys.insert(entry.storage_key);
+        }
+    }
+    Ok(keys)
+}
+
+/// Walks `base` and returns the storage key of every regular file under it.
+/// Paths are normalized to forward slashes and a trailing `.blob` extension
+/// is stripped so keys compare equal to what the database records.
+fn list_local_objects(base: &Path) -> Result<HashSet<ObjectKey>> {
+    let mut keys = HashSet::new();
+    // A brand-new instance may not have created the files directory yet;
+    // that is an empty store, not an error.
+    if !base.exists() {
+        return Ok(keys);
+    }
+    for entry in WalkDir::new(base) {
+        // Propagate walk errors: silently skipping entries (the original
+        // `filter_map(Result::ok)`) would under-report stored files.
+        let entry = entry?;
+        if !entry.file_type().is_file() {
+            continue;
+        }
+        let rel = entry.path().strip_prefix(base)?;
+        let mut key = rel.to_string_lossy().replace('\\', "/");
+        if let Some(stripped) = key.strip_suffix(".blob") {
+            key = stripped.to_string();
+        }
+        keys.insert(key.try_into()?);
+    }
+    Ok(keys)
+}
+
+/// Lists every object key in `bucket` under `prefix`, with the prefix removed
+/// so keys are comparable to database storage keys.
+async fn list_s3_objects(prefix: &str, bucket: String) -> Result<HashSet<ObjectKey>> {
+    let client = aws_utils::s3::S3Client::new(true).await?;
+    let mut stream = client.list_all_s3_files_from_bucket(bucket, Some(prefix.to_string()));
+    let mut keys = HashSet::new();
+    while let Some(obj) = stream.try_next().await? {
+        if let Some(key) = obj.key() {
+            // Objects outside the expected prefix are not ours; skip them
+            // rather than deleting anything we cannot attribute.
+            if let Some(stripped) = key.strip_prefix(prefix) {
+                keys.insert(stripped.to_string().try_into()?);
+            }
+        }
+    }
+    Ok(keys)
+}