From 9e57dbfb0ee204997fa280be1c59de7e9bde2741 Mon Sep 17 00:00:00 2001 From: 0x5b62656e5d Date: Sat, 11 Apr 2026 20:08:54 +0800 Subject: [PATCH 1/5] Add ability to delete all multipart uploads at once --- src/cli.rs | 17 +++++++++++++---- src/main.rs | 10 ++++++++-- src/multipart/delete.rs | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 39f1794..a39345b 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -87,11 +87,20 @@ pub enum MultpartCommands { #[arg(required = true)] bucket: String, - #[arg(required = true)] - key: String, + #[arg(short, long)] + all: bool, - #[arg(required = true)] - timestamp_id: String, + #[arg( + required_unless_present = "all", + conflicts_with = "all" + )] + key: Option, + + #[arg( + required_unless_present = "all", + conflicts_with = "all" + )] + timestamp_id: Option, }, List { #[arg(required = true)] diff --git a/src/main.rs b/src/main.rs index 304d99f..edf9a78 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use crate::{ files::{ delete::delete_file, download::download_file, list_files::list_files, upload::upload_file, }, - multipart::delete::delete_multipart_upload, + multipart::delete::{delete_all_multipart_uploads, delete_multipart_upload}, util::get_bucket_region, }; use anyhow::{Result, bail}; @@ -166,6 +166,7 @@ async fn main() -> Result<()> { Commands::Multipart { commands } => match commands { MultpartCommands::Delete { bucket, + all, key, timestamp_id, } => { @@ -175,7 +176,12 @@ async fn main() -> Result<()> { ) .await?; - delete_multipart_upload(&client, bucket, key, timestamp_id).await?; + if all { + delete_all_multipart_uploads(&client, bucket).await?; + } else { + delete_multipart_upload(&client, bucket, key.unwrap(), timestamp_id.unwrap()) + .await?; + } } MultpartCommands::List { bucket } => { let client: Client = build_client( diff --git a/src/multipart/delete.rs b/src/multipart/delete.rs index f386147..f273e2e 100644 --- a/src/multipart/delete.rs +++ b/src/multipart/delete.rs @@ -61,3 +61,36 @@ pub async fn delete_multipart_upload( Ok(()) } + +/// Deletes all incomplete multipart uploads from an S3 bucket +/// # Arguments +/// * `client` - A reference to the S3 client +/// * `bucket` - The name of the bucket +/// # Returns +/// * `Result<(), anyhow::Error>` - `Ok(())` if successful, error if the operation fails +pub async fn delete_all_multipart_uploads( + client: &Client, + bucket: String, +) -> Result<(), anyhow::Error> { + let res: ListMultipartUploadsOutput = client + .list_multipart_uploads() + .bucket(&bucket) + .send() + .await?; + + if res.uploads.is_none() { + return Ok(()); + } + + for upload in res.uploads.unwrap().iter() { + client + .abort_multipart_upload() + .bucket(bucket.clone()) + .key(upload.key().unwrap().to_string()) + .upload_id(upload.upload_id.clone().unwrap()) + .send() + .await?; + } + + Ok(()) +} From f0d6e5fdf72dfe2d6b105ff7ca1ce37f2d9fac51 Mon Sep 17 00:00:00 2001 From: 0x5b62656e5d Date: Sat, 11 Apr 2026 20:09:06 +0800 Subject: [PATCH 2/5] Add yes flag to file deletes --- src/cli.rs | 3 +++ src/main.rs | 22 +++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/cli.rs b/src/cli.rs index a39345b..5254cf3 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -55,6 +55,9 @@ pub enum FileCommands { #[arg(short, long)] force: bool, + + #[arg(short, long)] + yes: bool, }, Download { #[arg(required = true)] diff --git a/src/main.rs b/src/main.rs index edf9a78..9ab52cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -83,13 +83,33 @@ async fn main() -> Result<()> { println!("{}", list_files(&client, &bucket).await?); } - FileCommands::Delete { bucket, key, force } => { + FileCommands::Delete { + bucket, + key, + force, + yes, + } => { let client: Client = build_client( &config.default, get_bucket_region(&mut regions, bucket.clone(), &default_client).await?, ) .await?; + if yes { + delete_file( + &client, + bucket, + key.clone(), + !config.default.endpoint_url.contains("cloudflare"), + force, + ) + .await?; + + println!("Deleted {:?} successfully", key.clone()); + + return Ok(()); + } + match Confirm::new(&format!( "Are you sure you want to delete the file {:?} from bucket {:?}? (y/n)", key.clone(), From 14b236fe6f2772617b62f777e18b1fcf928c878b Mon Sep 17 00:00:00 2001 From: 0x5b62656e5d Date: Sat, 11 Apr 2026 23:05:46 +0800 Subject: [PATCH 3/5] Implement concurrent multipart uploads --- src/files/upload.rs | 452 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 372 insertions(+), 80 deletions(-) diff --git a/src/files/upload.rs b/src/files/upload.rs index 7f1a276..831aea5 100644 --- a/src/files/upload.rs +++ b/src/files/upload.rs @@ -1,27 +1,49 @@ use anyhow::bail; use aws_sdk_s3::{ Client, - operation::create_multipart_upload::CreateMultipartUploadOutput, + operation::{ + create_multipart_upload::CreateMultipartUploadOutput, upload_part::UploadPartOutput, + }, primitives::{ByteStream, Length}, types::{CompletedMultipartUpload, CompletedPart}, }; use std::{ fs, io::{Write, stdout}, - path::Path, + path::{Path, PathBuf}, sync::{ Arc, - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, }, time::Duration, }; -use tokio::time::interval; +use tokio::{ + sync::{Mutex, MutexGuard}, + task::JoinSet, + time::{Interval, interval}, +}; use tree_magic::from_u8; -const CHUNK_SIZE: u64 = 1024 * 1024 * 10; // 10 MB +const CHUNK_SIZE: u64 = 1024 * 1024 * 8; // 8 MB const MAX_CHUNKS: u64 = 10000; +const MAX_CONCURRENCY: usize = 32; const SPINNER_FRAMES: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; +struct SharedState { + uploaded_bytes_total: AtomicU64, + uploaded_bytes_window: AtomicU64, + active_uploads: AtomicUsize, + next_chunk_idx: AtomicU64, + target_concurrency: AtomicUsize, + stop_flag: AtomicBool, + uploaded_parts: Mutex>, + client: Client, + bucket: String, + key: String, + upload_id: String, + path: PathBuf, +} + /// Uploads a file to an S3 bucket at the specified key (path). /// # Arguments /// * `client` - A reference to the S3 client @@ -38,7 +60,7 @@ pub async fn upload_file( ) -> Result<(), anyhow::Error> { let path: &Path = Path::new(&file_path); - let file_size = tokio::fs::metadata(path) + let file_size: u64 = tokio::fs::metadata(path) .await .expect("Failed to get file metadata") .len(); @@ -62,6 +84,20 @@ pub async fn upload_file( return Ok(()); } + let mut chunk_count: u64 = (file_size / CHUNK_SIZE) + 1; + let mut prev_chunk_size: u64 = file_size % CHUNK_SIZE; + if prev_chunk_size == 0 { + prev_chunk_size = CHUNK_SIZE; + chunk_count -= 1; + } + + if chunk_count > MAX_CHUNKS { + bail!( + "File is too large to upload. Maximum number of chunks is {}", + MAX_CHUNKS + ); + } + let multipart_upload_res: CreateMultipartUploadOutput = client .create_multipart_upload() .bucket(&bucket) @@ -73,111 +109,367 @@ pub async fn upload_file( anyhow::anyhow!("Failed to initiate multipart upload: No upload ID returned") })?; - let mut chunk_count = (file_size / CHUNK_SIZE) + 1; - let mut size_of_last_chunk = file_size % CHUNK_SIZE; - if size_of_last_chunk == 0 { - size_of_last_chunk = CHUNK_SIZE; - chunk_count -= 1; + let state: Arc = Arc::new(SharedState { + uploaded_bytes_total: AtomicU64::new(0), + uploaded_bytes_window: AtomicU64::new(0), + active_uploads: AtomicUsize::new(0), + next_chunk_idx: AtomicU64::new(0), + target_concurrency: AtomicUsize::new(4), + stop_flag: AtomicBool::new(false), + uploaded_parts: Mutex::new(Vec::new()), + client: client.clone(), + bucket: bucket.to_string(), + key: key.to_string(), + upload_id: upload_id.to_string(), + path: path.to_path_buf(), + }); + + let file_size_spinner_task: u64 = file_size; + let state_spinner_task: Arc = Arc::clone(&state); + + let spinner_task: tokio::task::JoinHandle<()> = + spawn_progress_task(state_spinner_task, file_size_spinner_task); + + let controller_handle: tokio::task::JoinHandle<()> = + spawn_controller(state.clone(), MAX_CONCURRENCY, file_size); + + let scheduler_result: Result<(), anyhow::Error> = + run_scheduler(chunk_count, prev_chunk_size, state.clone()).await; + state.stop_flag.store(true, Ordering::Relaxed); + let _ = spinner_task.await; + let _ = controller_handle.await; + + println!(); + + if let Err(e) = scheduler_result { + let _ = client + .abort_multipart_upload() + .bucket(&bucket) + .key(&key) + .upload_id(upload_id) + .send() + .await; + + bail!("Upload failed: {}", e); } - if chunk_count > MAX_CHUNKS { + let mut parts: tokio::sync::MutexGuard<'_, Vec> = + state.uploaded_parts.lock().await; + parts.sort_by_key(|part| part.part_number); + + if parts.len() as u64 != chunk_count { + let _ = client + .abort_multipart_upload() + .bucket(&bucket) + .key(&key) + .upload_id(upload_id) + .send() + .await; + bail!( - "File is too large to upload. Maximum number of chunks is {}", - MAX_CHUNKS + "Upload failed: Expected {} parts but got {}", + chunk_count, + parts.len() ); } - let mut upload_parts: Vec = Vec::new(); + let completed_multipart_upload: CompletedMultipartUpload = CompletedMultipartUpload::builder() + .set_parts(Some(parts.clone())) + .build(); - let is_uploading = Arc::new(AtomicBool::new(true)); - let is_uploading_task = Arc::clone(&is_uploading); + let _ = client + .complete_multipart_upload() + .bucket(&bucket) + .key(&key) + .multipart_upload(completed_multipart_upload) + .upload_id(upload_id) + .send() + .await?; - let spinner_progress = Arc::new(AtomicU64::new(0)); - let spinner_progress_task = Arc::clone(&spinner_progress); + Ok(()) +} - let task = tokio::spawn(async move { - let mut ticker = interval(Duration::from_millis(100)); - let mut frame_idx: usize = 0; +fn spawn_controller( + state: Arc, + max_concurrency: usize, + file_size: u64, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut ticker: Interval = interval(Duration::from_secs(2)); + let mut smoothed_us_mb_s: f64 = 0.0; + let mut prev_diff: i8 = 1; + let mut baseline_throughput: Option = None; + let mut cooldown: usize = 0; + let mut current_target: usize = state.target_concurrency.load(Ordering::Relaxed); + let mut best_target: usize = current_target; + let mut best_throughput: f64 = 0.0; + let mut prev_target: usize = current_target; + let mut stable_windows: usize = 0; loop { + if state.stop_flag.load(Ordering::Relaxed) { + break; + } + ticker.tick().await; - let progress = spinner_progress_task.load(Ordering::Relaxed); - print!( - "\r{}% {}", - (progress) * 100 / chunk_count, - SPINNER_FRAMES[frame_idx] - ); - stdout().flush().unwrap(); + if cooldown > 0 { + cooldown -= 1; + continue; + } - frame_idx += 1; + if state.uploaded_bytes_total.load(Ordering::Relaxed) * 100 / file_size >= 80 { + current_target = best_target; + state + .target_concurrency + .store(current_target, Ordering::Relaxed); + continue; + } - if frame_idx >= SPINNER_FRAMES.len() { - frame_idx = 0; + let window_bytes: u64 = state.uploaded_bytes_window.swap(0, Ordering::Relaxed); + + let instant_us_mb_s: f64 = (window_bytes as f64) / (2.0 * 1024.0 * 1024.0); + + if instant_us_mb_s > 0.0 { + if smoothed_us_mb_s == 0.0 { + smoothed_us_mb_s = instant_us_mb_s; + } else { + smoothed_us_mb_s = (0.7 * smoothed_us_mb_s) + (0.3 * instant_us_mb_s); + } } - if chunk_count == progress || !is_uploading_task.load(Ordering::Relaxed) { - break; + if current_target == prev_target { + stable_windows += 1; + } else { + stable_windows = 0; + prev_target = current_target; + } + + if state.active_uploads.load(Ordering::Relaxed) >= current_target.saturating_sub(1) + && current_target == prev_target + && stable_windows >= 2 + && smoothed_us_mb_s > best_throughput * 1.03 + { + best_throughput = smoothed_us_mb_s; + best_target = current_target; } - } - }); - let upload_res: Result<(), anyhow::Error> = async { - for chunk_index in 0..chunk_count { - let this_chunk = if chunk_count - 1 == chunk_index { - size_of_last_chunk + if baseline_throughput.is_none() { + baseline_throughput = Some(smoothed_us_mb_s); + current_target = (current_target + 1).min(max_concurrency); + state + .target_concurrency + .store(current_target, Ordering::Relaxed); + cooldown = 1; + continue; + } + + let prev: f64 = baseline_throughput.unwrap(); + + let rel_diff: f64 = if prev > 0.0 { + (smoothed_us_mb_s - prev) / prev } else { - CHUNK_SIZE + 0.0 }; - let stream = ByteStream::read_from() - .path(path) - .offset(chunk_index * CHUNK_SIZE) - .length(Length::Exact(this_chunk)) - .build() - .await - .unwrap(); - - let part_number = (chunk_index as i32) + 1; - let upload_part_res = client - .upload_part() - .key(&key) - .bucket(&bucket) - .upload_id(upload_id) - .body(stream) - .part_number(part_number) - .send() + + if rel_diff >= 0.05 { + current_target = if prev_diff > 0 { + (current_target + 1).min(max_concurrency) + } else { + current_target.saturating_sub(1).max(2) + }; + + baseline_throughput = Some(smoothed_us_mb_s); + cooldown = 1; + } else if rel_diff <= -0.2 { + if smoothed_us_mb_s < best_throughput * 0.90 { + current_target = best_target; + baseline_throughput = Some(best_throughput); + cooldown = 1; + prev_diff = 1; // or keep previous direction, depending on how you want probing to resume + } else { + prev_diff = -prev_diff; + + current_target = if prev_diff > 0 { + (current_target + 1).min(max_concurrency) + } else { + current_target.saturating_sub(1).max(2) + }; + + baseline_throughput = Some(smoothed_us_mb_s); + cooldown = 1; + } + } else { + baseline_throughput = Some(smoothed_us_mb_s); + } + + state + .target_concurrency + .store(current_target, Ordering::Relaxed); + } + }) +} + +async fn run_scheduler( + chunk_count: u64, + prev_chunk_size: u64, + state: Arc, +) -> Result<(), anyhow::Error> { + let mut join_set: JoinSet> = JoinSet::new(); + let mut scheduler_error: Option = None; + let mut aborting: bool = false; + + loop { + while !aborting + && state.active_uploads.load(Ordering::Relaxed) + < state.target_concurrency.load(Ordering::Relaxed) + && state.next_chunk_idx.load(Ordering::Relaxed) < chunk_count + { + let chunk_idx: u64 = state.next_chunk_idx.fetch_add(1, Ordering::Relaxed); + let chunk_size: u64 = chunk_size_for(chunk_idx, chunk_count, prev_chunk_size); + + state.active_uploads.fetch_add(1, Ordering::Relaxed); + + let state_clone = Arc::clone(&state); + + join_set.spawn(async move { + let part_number = (chunk_idx as i32) + 1; + + let completed_part = upload_part( + state_clone, + part_number, + chunk_idx, + chunk_size, + ) .await?; - upload_parts.push( - CompletedPart::builder() - .e_tag(upload_part_res.e_tag.unwrap_or_default()) - .part_number(part_number) - .build(), - ); + Ok((completed_part, chunk_size)) + }); + } + + if state.next_chunk_idx.load(Ordering::Relaxed) >= chunk_count + && state.active_uploads.load(Ordering::Relaxed) == 0 + { + break; + } + + if let Some(result) = join_set.join_next().await { + state.active_uploads.fetch_sub(1, Ordering::Relaxed); - spinner_progress.store(chunk_index + 1, Ordering::Relaxed); + match result { + Ok(Ok((completed_part, bytes_uploaded))) => { + if !aborting { + { + let mut parts: MutexGuard<'_, Vec> = + state.uploaded_parts.lock().await; + parts.push(completed_part); + } + state + .uploaded_bytes_total + .fetch_add(bytes_uploaded, Ordering::Relaxed); + state + .uploaded_bytes_window + .fetch_add(bytes_uploaded, Ordering::Relaxed); + } + } + Ok(Err(e)) => { + if !aborting { + aborting = true; + scheduler_error = Some(anyhow::anyhow!("Upload part failed: {}", e)); + join_set.abort_all(); + } + } + Err(join_error) => { + if !aborting { + aborting = true; + scheduler_error = Some(anyhow::anyhow!("Task join error: {}", join_error)); + join_set.abort_all(); + } + } + } } - Ok(()) } - .await; - is_uploading.store(false, Ordering::Relaxed); - let _ = task.await; + state.active_uploads.store(0, Ordering::Relaxed); - upload_res?; + if let Some(error) = scheduler_error { + bail!("Upload failed: {}", error); + } - let completed_multipart_upload: CompletedMultipartUpload = CompletedMultipartUpload::builder() - .set_parts(Some(upload_parts)) - .build(); + Ok(()) +} - let _ = client - .complete_multipart_upload() - .bucket(&bucket) - .key(&key) - .multipart_upload(completed_multipart_upload) - .upload_id(upload_id) +async fn upload_part( + state: Arc, + part_number: i32, + chunk_idx: u64, + chunk_size: u64, +) -> Result { + let stream: ByteStream = ByteStream::read_from() + .path(state.path.clone()) + .offset(chunk_idx * CHUNK_SIZE) + .length(Length::Exact(chunk_size)) + .build() + .await?; + + let upload_part_res: UploadPartOutput = state.client + .upload_part() + .key(state.key.clone()) + .bucket(state.bucket.clone()) + .upload_id(state.upload_id.clone()) + .body(stream) + .part_number(part_number) .send() .await?; - Ok(()) + Ok(CompletedPart::builder() + .e_tag(upload_part_res.e_tag.unwrap_or_default()) + .part_number(part_number) + .build()) +} + +fn spawn_progress_task( + state_spinner_task: Arc, + file_size_spinner_task: u64, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut ticker: Interval = interval(Duration::from_millis(100)); + let mut frame_idx: usize = 0; + + loop { + ticker.tick().await; + + print!( + "\r{}% {}", + state_spinner_task + .uploaded_bytes_total + .load(Ordering::Relaxed) + * 100 + / file_size_spinner_task, + SPINNER_FRAMES[frame_idx], + ); + + if let Err(e) = stdout().flush() { + eprintln!("Failed to flush stdout: {}", e); + } + + frame_idx += 1; + + if frame_idx >= SPINNER_FRAMES.len() { + frame_idx = 0; + } + + if state_spinner_task.stop_flag.load(Ordering::Relaxed) { + break; + } + } + }) +} + +fn chunk_size_for(chunk_index: u64, chunk_count: u64, prev_chunk_size: u64) -> u64 { + if chunk_index == chunk_count - 1 { + prev_chunk_size + } else { + CHUNK_SIZE + } } From ea3fcc4500a581a8b0134c56aa6c8c8979f60d0a Mon Sep 17 00:00:00 2001 From: 0x5b62656e5d Date: Sat, 11 Apr 2026 23:22:08 +0800 Subject: [PATCH 4/5] Add verbosity flag in file upload command --- README.md | 8 +++-- src/cli.rs | 13 ++++---- src/files/upload.rs | 72 +++++++++++++++++++++++++++++++++------------ src/main.rs | 4 ++- 4 files changed, 67 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 5ac53de..f45bc1e 100644 --- a/README.md +++ b/README.md @@ -56,16 +56,20 @@ COMMANDS files list Lists all the files in a bucket - files delete [-f | --force] + files delete [-f | --force] [-y | --yes] Deletes a file in a bucket Apply the `-f` or `--force` flag to delete all versions of a file (Not supported for R2) + Apply the `-y` or `--yes` flag to bypass confirmation files download [-o ] Downloads a file from a bucket to a given location (optionally rename it) - files upload [-o ] + files upload [-o ] [-v | --verbose] Uploads a file into a bucket (optionally rename it) + Apply the `-v` or `--verbose` flag for verbose output multipart list Lists all multipart uploads in a bucket multipart delete Deletes a multipart upload in a bucket + multipart delete <-a | --all> + Deletes all multipart upload in a bucket ``` \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index 5254cf3..78efeb6 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -81,6 +81,9 @@ pub enum FileCommands { #[arg(short, long)] override_filename: Option, + + #[arg(short, long, default_value = "false")] + verbose: bool, }, } @@ -93,16 +96,10 @@ pub enum MultpartCommands { #[arg(short, long)] all: bool, - #[arg( - required_unless_present = "all", - conflicts_with = "all" - )] + #[arg(required_unless_present = "all", conflicts_with = "all")] key: Option, - #[arg( - required_unless_present = "all", - conflicts_with = "all" - )] + #[arg(required_unless_present = "all", conflicts_with = "all")] timestamp_id: Option, }, List { diff --git a/src/files/upload.rs b/src/files/upload.rs index 831aea5..b16996d 100644 --- a/src/files/upload.rs +++ b/src/files/upload.rs @@ -15,7 +15,7 @@ use std::{ Arc, atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, }, - time::Duration, + time::{Duration, Instant}, }; use tokio::{ sync::{Mutex, MutexGuard}, @@ -42,6 +42,7 @@ struct SharedState { key: String, upload_id: String, path: PathBuf, + upload_speed_kb: AtomicU64, } /// Uploads a file to an S3 bucket at the specified key (path). @@ -57,14 +58,20 @@ pub async fn upload_file( bucket: String, key: String, file_path: String, + verbose: bool, ) -> Result<(), anyhow::Error> { let path: &Path = Path::new(&file_path); + let start: Instant = Instant::now(); let file_size: u64 = tokio::fs::metadata(path) .await .expect("Failed to get file metadata") .len(); + if verbose { + println!("File size: {} bytes", file_size); + } + if file_size == 0 { bail!("Bad file size."); } @@ -81,6 +88,8 @@ pub async fn upload_file( .send() .await?; + println!("Upload completed in {} seconds", start.elapsed().as_secs()); + return Ok(()); } @@ -91,6 +100,10 @@ pub async fn upload_file( chunk_count -= 1; } + if verbose { + println!("Total chunks: {}", chunk_count); + } + if chunk_count > MAX_CHUNKS { bail!( "File is too large to upload. Maximum number of chunks is {}", @@ -122,13 +135,14 @@ pub async fn upload_file( key: key.to_string(), upload_id: upload_id.to_string(), path: path.to_path_buf(), + upload_speed_kb: AtomicU64::new(0), }); let file_size_spinner_task: u64 = file_size; let state_spinner_task: Arc = Arc::clone(&state); let spinner_task: tokio::task::JoinHandle<()> = - spawn_progress_task(state_spinner_task, file_size_spinner_task); + spawn_progress_task(state_spinner_task, file_size_spinner_task, verbose); let controller_handle: tokio::task::JoinHandle<()> = spawn_controller(state.clone(), MAX_CONCURRENCY, file_size); @@ -186,6 +200,8 @@ pub async fn upload_file( .send() .await?; + println!("Upload completed in {} seconds", start.elapsed().as_secs()); + Ok(()) } @@ -238,6 +254,10 @@ fn spawn_controller( } } + state + .upload_speed_kb + .store((smoothed_us_mb_s * 1024.0) as u64, Ordering::Relaxed); + if current_target == prev_target { stable_windows += 1; } else { @@ -335,13 +355,8 @@ async fn run_scheduler( join_set.spawn(async move { let part_number = (chunk_idx as i32) + 1; - let completed_part = upload_part( - state_clone, - part_number, - chunk_idx, - chunk_size, - ) - .await?; + let completed_part = + upload_part(state_clone, part_number, chunk_idx, chunk_size).await?; Ok((completed_part, chunk_size)) }); @@ -412,7 +427,8 @@ async fn upload_part( .build() .await?; - let upload_part_res: UploadPartOutput = state.client + let upload_part_res: UploadPartOutput = state + .client .upload_part() .key(state.key.clone()) .bucket(state.bucket.clone()) @@ -431,6 +447,7 @@ async fn upload_part( fn spawn_progress_task( state_spinner_task: Arc, file_size_spinner_task: u64, + verbose: bool, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut ticker: Interval = interval(Duration::from_millis(100)); @@ -439,15 +456,32 @@ fn spawn_progress_task( loop { ticker.tick().await; - print!( - "\r{}% {}", - state_spinner_task - .uploaded_bytes_total - .load(Ordering::Relaxed) - * 100 - / file_size_spinner_task, - SPINNER_FRAMES[frame_idx], - ); + if verbose { + print!( + "\r{}% {} - Concurrent uploads: {} - Target concurrency: {} - Upload speed: {:.2} MB/s", + state_spinner_task + .uploaded_bytes_total + .load(Ordering::Relaxed) + * 100 + / file_size_spinner_task, + SPINNER_FRAMES[frame_idx], + state_spinner_task.active_uploads.load(Ordering::Relaxed), + state_spinner_task + .target_concurrency + .load(Ordering::Relaxed), + state_spinner_task.upload_speed_kb.load(Ordering::Relaxed) as f64 / 1024.0 + ); + } else { + print!( + "\r{}% {}", + state_spinner_task + .uploaded_bytes_total + .load(Ordering::Relaxed) + * 100 + / file_size_spinner_task, + SPINNER_FRAMES[frame_idx], + ); + } if let Err(e) = stdout().flush() { eprintln!("Failed to flush stdout: {}", e); diff --git a/src/main.rs b/src/main.rs index 9ab52cd..f47a0cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -158,6 +158,7 @@ async fn main() -> Result<()> { bucket, location, override_filename, + verbose, } => { let client: Client = build_client( &config.default, @@ -166,13 +167,14 @@ async fn main() -> Result<()> { .await?; if let Some(filename) = override_filename { - upload_file(&client, bucket, filename, location.clone()).await?; + upload_file(&client, bucket, filename, location.clone(), verbose).await?; } else { upload_file( &client, bucket, location.clone().split('/').next_back().unwrap().to_string(), location.clone(), + verbose, ) .await?; } From da0e5b64cc1476f9767f48615d4b352142230170 Mon Sep 17 00:00:00 2001 From: 0x5b62656e5d Date: Sat, 11 Apr 2026 23:22:23 +0800 Subject: [PATCH 5/5] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 110ea0c..4e11fd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2018,7 +2018,7 @@ dependencies = [ [[package]] name = "pepper-s3-cli" -version = "1.4.0" +version = "1.5.0" dependencies = [ "anyhow", "aws-config", diff --git a/Cargo.toml b/Cargo.toml index 3abf5b3..4cc2b11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pepper-s3-cli" -version = "1.4.0" +version = "1.5.0" edition = "2024" description = "A simple CLI tool to manage S3 buckets and files from the terminal" license = "MIT"