From f369a725010eae156f24b81644c58f3f357ad1d5 Mon Sep 17 00:00:00 2001 From: link2xt Date: Thu, 24 Oct 2024 22:28:14 +0000 Subject: [PATCH] feat: compress backups with gzip --- Cargo.lock | 1 + Cargo.toml | 1 + src/imex.rs | 72 ++++++++++++++++++++++++++------------------ src/imex/transfer.rs | 14 ++++++--- 4 files changed, 54 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14069e547d..941568a40f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1298,6 +1298,7 @@ dependencies = [ "anyhow", "async-broadcast", "async-channel 2.3.1", + "async-compression", "async-imap", "async-native-tls", "async-smtp", diff --git a/Cargo.toml b/Cargo.toml index d9142aa5b0..26affa3d7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ anyhow = { workspace = true } async-broadcast = "0.7.1" async-channel = { workspace = true } async-imap = { version = "0.10.2", default-features = false, features = ["runtime-tokio", "compress"] } +async-compression = { version = "0.4.15", default-features = false, features = ["tokio", "gzip"] } async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] } async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] } async_zip = { version = "0.0.17", default-features = false, features = ["deflate", "tokio-fs"] } diff --git a/src/imex.rs b/src/imex.rs index f99115cb6f..5409f619fc 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -11,7 +11,7 @@ use futures_lite::FutureExt; use pin_project::pin_project; use tokio::fs::{self, File}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio_tar::Archive; use crate::blob::BlobDirContents; @@ -123,7 +123,7 @@ pub async fn has_backup(_context: &Context, dir_name: &Path) -> Result { let name = dirent.file_name(); let name: String = name.to_string_lossy().into(); if name.starts_with("delta-chat") - && name.ends_with(".tar") + && (name.ends_with(".tar") || name.ends_with(".tar.gz")) && (newest_backup_name.is_empty() || name > newest_backup_name) { // We just use string comparison to determine which backup is newer. @@ -269,30 +269,24 @@ async fn import_backup( context.get_dbfile().display() ); - import_backup_stream(context, backup_file, file_size, passphrase).await?; + let backup_file = ProgressReader::new(backup_file, context.clone(), file_size); + if backup_to_import.extension() == Some(OsStr::new("gz")) { + let backup_file = tokio::io::BufReader::new(backup_file); + let backup_file = async_compression::tokio::bufread::GzipDecoder::new(backup_file); + import_backup_stream(context, backup_file, passphrase).await?; + } else { + import_backup_stream(context, backup_file, passphrase).await?; + } Ok(()) } /// Imports backup by reading a tar file from a stream. -/// -/// `file_size` is used to calculate the progress -/// and emit progress events. -/// Ideally it is the sum of the entry -/// sizes without the header overhead, -/// but can be estimated as tar file size -/// in which case the progress is underestimated -/// and may not reach 99.9% by the end of import. -/// Underestimating is better than -/// overestimating because the progress -/// jumps to 100% instead of getting stuck at 99.9% -/// for some time. pub(crate) async fn import_backup_stream( context: &Context, backup_file: R, - file_size: u64, passphrase: String, ) -> Result<()> { - import_backup_stream_inner(context, backup_file, file_size, passphrase) + import_backup_stream_inner(context, backup_file, passphrase) .await .0 } @@ -319,6 +313,19 @@ struct ProgressReader { } impl ProgressReader { + /// Creates a new `ProgressReader`. + /// + /// `file_size` is used to calculate the progress + /// and emit progress events. + /// Ideally it is the sum of the entry + /// sizes without the header overhead, + /// but can be estimated as tar file size + /// in which case the progress is underestimated + /// and may not reach 99.9% by the end of import. + /// Underestimating is better than + /// overestimating because the progress + /// jumps to 100% instead of getting stuck at 99.9% + /// for some time. fn new(r: R, context: Context, file_size: u64) -> Self { Self { inner: r, @@ -358,10 +365,8 @@ where async fn import_backup_stream_inner( context: &Context, backup_file: R, - file_size: u64, passphrase: String, ) -> (Result<()>,) { - let backup_file = ProgressReader::new(backup_file, context.clone(), file_size); let mut archive = Archive::new(backup_file); let mut entries = match archive.entries() { @@ -461,10 +466,10 @@ fn get_next_backup_path( tempdbfile.push(format!("{stem}-{i:02}-{addr}.db")); let mut tempfile = folder.clone(); - tempfile.push(format!("{stem}-{i:02}-{addr}.tar.part")); + tempfile.push(format!("{stem}-{i:02}-{addr}.tar.gz.part")); let mut destfile = folder.clone(); - destfile.push(format!("{stem}-{i:02}-{addr}.tar")); + destfile.push(format!("{stem}-{i:02}-{addr}.tar.gz")); if !tempdbfile.exists() && !tempfile.exists() && !destfile.exists() { return Ok((tempdbfile, tempfile, destfile)); @@ -504,9 +509,13 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res file_size += blob.to_abs_path().metadata()?.len() } - export_backup_stream(context, &temp_db_path, blobdir, file, file_size) - .await - .context("Exporting backup to file failed")?; + let gzip_encoder = async_compression::tokio::write::GzipEncoder::new(file); + let mut gzip_encoder = + export_backup_stream(context, &temp_db_path, blobdir, gzip_encoder, file_size) + .await + .context("Exporting backup to file failed")?; + gzip_encoder.shutdown().await?; + fs::rename(temp_path, &dest_path).await?; context.emit_event(EventType::ImexFileWritten(dest_path)); Ok(()) @@ -543,6 +552,10 @@ impl ProgressWriter { context, } } + + fn into_inner(self) -> W { + self.inner + } } impl AsyncWrite for ProgressWriter @@ -590,12 +603,12 @@ pub(crate) async fn export_backup_stream<'a, W>( blobdir: BlobDirContents<'a>, writer: W, file_size: u64, -) -> Result<()> +) -> Result where W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static, { - let writer = ProgressWriter::new(writer, context.clone(), file_size); - let mut builder = tokio_tar::Builder::new(writer); + let progress_writer = ProgressWriter::new(writer, context.clone(), file_size); + let mut builder = tokio_tar::Builder::new(progress_writer); builder .append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME) @@ -607,8 +620,9 @@ where builder.append_file(path_in_archive, &mut file).await?; } - builder.finish().await?; - Ok(()) + // Convert tar builder back into the underlying stream. + let progress_writer = builder.into_inner().await?; + Ok(progress_writer.into_inner()) } /// Imports secret key from a file. diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index d280dd8f88..e5a002aeac 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -36,12 +36,13 @@ use futures_lite::FutureExt; use iroh_net::relay::RelayMode; use iroh_net::Endpoint; use tokio::fs; +use tokio::io::AsyncWriteExt; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::chat::add_device_msg; use crate::context::Context; -use crate::imex::BlobDirContents; +use crate::imex::{BlobDirContents, ProgressReader}; use crate::message::{Message, Viewtype}; use crate::qr::Qr; use crate::stock_str::backup_transfer_msg_body; @@ -190,9 +191,11 @@ impl BackupProvider { send_stream.write_all(&file_size.to_be_bytes()).await?; - export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size) - .await - .context("Failed to write backup into QUIC stream")?; + let mut send_stream = + export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size) + .await + .context("Failed to write backup into QUIC stream")?; + send_stream.shutdown().await?; info!(context, "Finished writing backup into QUIC stream."); let mut buf = [0u8; 1]; info!(context, "Waiting for acknowledgment."); @@ -310,7 +313,8 @@ pub async fn get_backup2( let mut file_size_buf = [0u8; 8]; recv_stream.read_exact(&mut file_size_buf).await?; let file_size = u64::from_be_bytes(file_size_buf); - import_backup_stream(context, recv_stream, file_size, passphrase) + let recv_stream = ProgressReader::new(recv_stream, context.clone(), file_size); + import_backup_stream(context, recv_stream, passphrase) .await .context("Failed to import backup from QUIC stream")?; info!(context, "Finished importing backup from the stream.");