Skip to content

Commit

Permalink
feat: compress backups with gzip
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Oct 25, 2024
1 parent fc2b111 commit 189a093
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ anyhow = { workspace = true }
async-broadcast = "0.7.1"
async-channel = { workspace = true }
async-imap = { version = "0.10.2", default-features = false, features = ["runtime-tokio", "compress"] }
async-compression = { version = "0.4.15", default-features = false, features = ["tokio", "gzip"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.17", default-features = false, features = ["deflate", "tokio-fs"] }
Expand Down
2 changes: 1 addition & 1 deletion deltachat-rpc-client/tests/test_securejoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_qr_securejoin(acfactory, protect, tmp_path):
# Setup second device for Alice
# to test observing securejoin protocol.
alice.export_backup(tmp_path)
files = list(tmp_path.glob("*.tar"))
files = list(tmp_path.glob("*.tar.gz"))
alice2 = acfactory.get_unconfigured_account()
alice2.import_backup(files[0])

Expand Down
4 changes: 2 additions & 2 deletions deltachat-rpc-client/tests/test_something.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ def test_import_export_backup(acfactory, tmp_path) -> None:
alice = acfactory.new_configured_account()
alice.export_backup(tmp_path)

files = list(tmp_path.glob("*.tar"))
files = list(tmp_path.glob("*.tar.gz"))
alice2 = acfactory.get_unconfigured_account()
alice2.import_backup(files[0])

Expand Down Expand Up @@ -630,7 +630,7 @@ def test_markseen_contact_request(acfactory, tmp_path):

# Bob sets up a second device.
bob.export_backup(tmp_path)
files = list(tmp_path.glob("*.tar"))
files = list(tmp_path.glob("*.tar.gz"))
bob2 = acfactory.get_unconfigured_account()
bob2.import_backup(files[0])
bob2.start_io()
Expand Down
72 changes: 43 additions & 29 deletions src/imex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_lite::FutureExt;
use pin_project::pin_project;

use tokio::fs::{self, File};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_tar::Archive;

use crate::blob::BlobDirContents;
Expand Down Expand Up @@ -123,7 +123,7 @@ pub async fn has_backup(_context: &Context, dir_name: &Path) -> Result<String> {
let name = dirent.file_name();
let name: String = name.to_string_lossy().into();
if name.starts_with("delta-chat")
&& name.ends_with(".tar")
&& (name.ends_with(".tar") || name.ends_with(".tar.gz"))
&& (newest_backup_name.is_empty() || name > newest_backup_name)
{
// We just use string comparison to determine which backup is newer.
Expand Down Expand Up @@ -269,30 +269,24 @@ async fn import_backup(
context.get_dbfile().display()
);

import_backup_stream(context, backup_file, file_size, passphrase).await?;
let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
if backup_to_import.extension() == Some(OsStr::new("gz")) {
let backup_file = tokio::io::BufReader::new(backup_file);
let backup_file = async_compression::tokio::bufread::GzipDecoder::new(backup_file);
import_backup_stream(context, backup_file, passphrase).await?;
} else {
import_backup_stream(context, backup_file, passphrase).await?;
}
Ok(())
}

/// Imports backup by reading a tar file from a stream.
///
/// `file_size` is used to calculate the progress
/// and emit progress events.
/// Ideally it is the sum of the entry
/// sizes without the header overhead,
/// but can be estimated as tar file size
/// in which case the progress is underestimated
/// and may not reach 99.9% by the end of import.
/// Underestimating is better than
/// overestimating because the progress
/// jumps to 100% instead of getting stuck at 99.9%
/// for some time.
pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
context: &Context,
backup_file: R,
file_size: u64,
passphrase: String,
) -> Result<()> {
import_backup_stream_inner(context, backup_file, file_size, passphrase)
import_backup_stream_inner(context, backup_file, passphrase)
.await
.0
}
Expand All @@ -319,6 +313,19 @@ struct ProgressReader<R> {
}

impl<R> ProgressReader<R> {
/// Creates a new `ProgressReader`.
///
/// `file_size` is used to calculate the progress
/// and emit progress events.
/// Ideally it is the sum of the entry
/// sizes without the header overhead,
/// but it can be estimated as the tar file size,
/// in which case the progress is underestimated
/// and may not reach 99.9% by the end of import.
/// Underestimating is better than
/// overestimating because the progress
/// jumps to 100% instead of getting stuck at 99.9%
/// for some time.
fn new(r: R, context: Context, file_size: u64) -> Self {
Self {
inner: r,
Expand Down Expand Up @@ -358,10 +365,8 @@ where
async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>(
context: &Context,
backup_file: R,
file_size: u64,
passphrase: String,
) -> (Result<()>,) {
let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
let mut archive = Archive::new(backup_file);

let mut entries = match archive.entries() {
Expand Down Expand Up @@ -461,10 +466,10 @@ fn get_next_backup_path(
tempdbfile.push(format!("{stem}-{i:02}-{addr}.db"));

let mut tempfile = folder.clone();
tempfile.push(format!("{stem}-{i:02}-{addr}.tar.part"));
tempfile.push(format!("{stem}-{i:02}-{addr}.tar.gz.part"));

let mut destfile = folder.clone();
destfile.push(format!("{stem}-{i:02}-{addr}.tar"));
destfile.push(format!("{stem}-{i:02}-{addr}.tar.gz"));

if !tempdbfile.exists() && !tempfile.exists() && !destfile.exists() {
return Ok((tempdbfile, tempfile, destfile));
Expand Down Expand Up @@ -504,9 +509,13 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
file_size += blob.to_abs_path().metadata()?.len()
}

export_backup_stream(context, &temp_db_path, blobdir, file, file_size)
.await
.context("Exporting backup to file failed")?;
let gzip_encoder = async_compression::tokio::write::GzipEncoder::new(file);
let mut gzip_encoder =
export_backup_stream(context, &temp_db_path, blobdir, gzip_encoder, file_size)
.await
.context("Exporting backup to file failed")?;
gzip_encoder.shutdown().await?;

fs::rename(temp_path, &dest_path).await?;
context.emit_event(EventType::ImexFileWritten(dest_path));
Ok(())
Expand Down Expand Up @@ -543,6 +552,10 @@ impl<W> ProgressWriter<W> {
context,
}
}

/// Consumes the `ProgressWriter` and returns the wrapped writer.
fn into_inner(self) -> W {
self.inner
}
}

impl<W> AsyncWrite for ProgressWriter<W>
Expand Down Expand Up @@ -590,12 +603,12 @@ pub(crate) async fn export_backup_stream<'a, W>(
blobdir: BlobDirContents<'a>,
writer: W,
file_size: u64,
) -> Result<()>
) -> Result<W>
where
W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static,
{
let writer = ProgressWriter::new(writer, context.clone(), file_size);
let mut builder = tokio_tar::Builder::new(writer);
let progress_writer = ProgressWriter::new(writer, context.clone(), file_size);
let mut builder = tokio_tar::Builder::new(progress_writer);

builder
.append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME)
Expand All @@ -607,8 +620,9 @@ where
builder.append_file(path_in_archive, &mut file).await?;
}

builder.finish().await?;
Ok(())
// Convert tar builder back into the underlying stream.
let progress_writer = builder.into_inner().await?;
Ok(progress_writer.into_inner())
}

/// Imports secret key from a file.
Expand Down
14 changes: 9 additions & 5 deletions src/imex/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ use futures_lite::FutureExt;
use iroh_net::relay::RelayMode;
use iroh_net::Endpoint;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::chat::add_device_msg;
use crate::context::Context;
use crate::imex::BlobDirContents;
use crate::imex::{BlobDirContents, ProgressReader};
use crate::message::{Message, Viewtype};
use crate::qr::Qr;
use crate::stock_str::backup_transfer_msg_body;
Expand Down Expand Up @@ -190,9 +191,11 @@ impl BackupProvider {

send_stream.write_all(&file_size.to_be_bytes()).await?;

export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
.await
.context("Failed to write backup into QUIC stream")?;
let mut send_stream =
export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
.await
.context("Failed to write backup into QUIC stream")?;
send_stream.shutdown().await?;
info!(context, "Finished writing backup into QUIC stream.");
let mut buf = [0u8; 1];
info!(context, "Waiting for acknowledgment.");
Expand Down Expand Up @@ -310,7 +313,8 @@ pub async fn get_backup2(
let mut file_size_buf = [0u8; 8];
recv_stream.read_exact(&mut file_size_buf).await?;
let file_size = u64::from_be_bytes(file_size_buf);
import_backup_stream(context, recv_stream, file_size, passphrase)
let recv_stream = ProgressReader::new(recv_stream, context.clone(), file_size);
import_backup_stream(context, recv_stream, passphrase)
.await
.context("Failed to import backup from QUIC stream")?;
info!(context, "Finished importing backup from the stream.");
Expand Down

0 comments on commit 189a093

Please sign in to comment.