Skip to content

Commit

Permalink
fix(tarball): use rwlock instead of dashmap to avoid potential deadlock
Browse files Browse the repository at this point in the history
chore: fmt
  • Loading branch information
await-ovo committed Nov 17, 2023
1 parent 078359e commit ac2c01c
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 23 deletions.
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ async-recursion = { version = "1.0.5" }
clap = { version = "4", features = ["derive", "string"] }
command-extra = { version = "1.0.0" }
base64 = { version = "0.21.5" }
dashmap = { version = "5.5.3" }
derive_more = { version = "1.0.0-beta.3", features = ["full"] }
dunce = { version = "1.0.4" }
home = { version = "0.5.5" }
Expand Down
3 changes: 2 additions & 1 deletion crates/cli/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use pacquet_package_manifest::{PackageManifest, PackageManifestError};
use pacquet_tarball::MemCache;
use pipe_trait::Pipe;
use reqwest::Client;
use std::collections::HashMap;
use std::path::PathBuf;

/// Application state when running `pacquet run` or `pacquet install`.
Expand Down Expand Up @@ -44,7 +45,7 @@ impl State {
lockfile: call_load_lockfile(config.lockfile, Lockfile::load_from_current_dir)
.map_err(InitStateError::LoadLockfile)?,
http_client: Client::new(),
tarball_mem_cache: MemCache::new(),
tarball_mem_cache: MemCache::new(HashMap::new()),
})
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/tarball/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pacquet-fs = { workspace = true }
pacquet-store-dir = { workspace = true }

base64 = { workspace = true }
dashmap = { workspace = true }
derive_more = { workspace = true }
miette = { workspace = true }
pipe-trait = { workspace = true }
Expand Down
18 changes: 12 additions & 6 deletions crates/tarball/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::{
collections::HashMap,
ffi::OsString,
io::{Cursor, Read},
mem::drop,
path::PathBuf,
sync::Arc,
time::UNIX_EPOCH,
};

use base64::{engine::general_purpose::STANDARD as BASE64_STD, Engine};
use dashmap::DashMap;
use derive_more::{Display, Error, From};
use miette::Diagnostic;
use pacquet_fs::file_mode;
Expand Down Expand Up @@ -88,7 +88,7 @@ pub enum CacheValue {
/// Internal in-memory cache of tarballs.
///
/// The key of this hashmap is the url of each tarball.
pub type MemCache = DashMap<String, Arc<RwLock<CacheValue>>>;
pub type MemCache = RwLock<HashMap<String, Arc<RwLock<CacheValue>>>>;

#[instrument(skip(gz_data), fields(gz_data_len = gz_data.len()))]
fn decompress_gzip(gz_data: &[u8], unpacked_size: Option<usize>) -> Result<Vec<u8>, TarballError> {
Expand Down Expand Up @@ -130,9 +130,9 @@ impl<'a> DownloadTarballToStore<'a> {

// QUESTION: I see no copying from existing store_dir, is there such mechanism?
// TODO: If it's not implemented yet, implement it

if let Some(cache_lock) = mem_cache.get(package_url) {
let notify = match &*cache_lock.write().await {
let mem_cache_reader = mem_cache.read().await;
if let Some(cache_lock) = mem_cache_reader.get(package_url) {
let notify = match &*cache_lock.read().await {
CacheValue::Available(cas_paths) => {
return Ok(Arc::clone(cas_paths));
}
Expand All @@ -146,13 +146,19 @@ impl<'a> DownloadTarballToStore<'a> {
}
unreachable!("Failed to get or compute tarball data for {package_url:?}");
} else {
drop(mem_cache_reader);
let notify = Arc::new(Notify::new());
let cache_lock = notify
.pipe_ref(Arc::clone)
.pipe(CacheValue::InProgress)
.pipe(RwLock::new)
.pipe(Arc::new);
if mem_cache.insert(package_url.to_string(), Arc::clone(&cache_lock)).is_some() {
if mem_cache
.write()
.await
.insert(package_url.to_string(), Arc::clone(&cache_lock))
.is_some()
{
tracing::warn!(target: "pacquet::download", ?package_url, "Race condition detected when writing to cache");
}
let cas_paths = self.run_without_mem_cache().await?.pipe(Arc::new);
Expand Down

0 comments on commit ac2c01c

Please sign in to comment.