-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(tarball): use rwlock instead of dashmap to avoid potential deadlock #200
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,9 +6,10 @@ use command_extra::CommandExtra; | |
use pacquet_testing_utils::{ | ||
bin::{AddMockedRegistry, CommandTempCwd}, | ||
fs::{get_all_files, get_all_folders, is_symlink_or_junction}, | ||
misc::panic_after, | ||
}; | ||
use pipe_trait::Pipe; | ||
use std::fs; | ||
use std::{fs, thread, time::Duration}; | ||
|
||
#[test] | ||
fn should_install_dependencies() { | ||
|
@@ -131,3 +132,32 @@ fn should_install_index_files() { | |
|
||
drop((root, mock_instance)); // cleanup | ||
} | ||
|
||
#[test] | ||
fn should_install_duplicated_dependencies() { | ||
let CommandTempCwd { pacquet, root, workspace, npmrc_info, .. } = | ||
CommandTempCwd::init().add_mocked_registry(); | ||
let AddMockedRegistry { mock_instance, .. } = npmrc_info; | ||
|
||
eprintln!("Creating package.json..."); | ||
let manifest_path = workspace.join("package.json"); | ||
let package_json_content = serde_json::json!({ | ||
"dependencies": { | ||
"express": "4.18.2", | ||
}, | ||
"devDependencies": { | ||
"express": "4.18.2", | ||
}, | ||
}); | ||
fs::write(manifest_path, package_json_content.to_string()).expect("write to package.json"); | ||
|
||
panic_after(Duration::from_secs(30), move || { | ||
thread::sleep(Duration::from_millis(200)); | ||
eprintln!("Executing command..."); | ||
pacquet.with_arg("install").assert().success(); | ||
eprintln!("Make sure the package is installed"); | ||
assert!(is_symlink_or_junction(&workspace.join("node_modules/express")).unwrap()); | ||
assert!(workspace.join("node_modules/.pnpm/[email protected]").exists()); | ||
}); | ||
drop((root, mock_instance)); // cleanup | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,6 @@ use std::{ | |
}; | ||
|
||
use base64::{engine::general_purpose::STANDARD as BASE64_STD, Engine}; | ||
use dashmap::DashMap; | ||
use derive_more::{Display, Error, From}; | ||
use miette::Diagnostic; | ||
use pacquet_fs::file_mode; | ||
|
@@ -88,7 +87,7 @@ pub enum CacheValue { | |
/// Internal in-memory cache of tarballs. | ||
/// | ||
/// The key of this hashmap is the url of each tarball. | ||
pub type MemCache = DashMap<String, Arc<RwLock<CacheValue>>>; | ||
pub type MemCache = RwLock<HashMap<String, Arc<RwLock<CacheValue>>>>; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like it is perfect for our scene. I just tried I've also tried integrated-benchmark on the branch
It seems that integrated-benchmark on the branch Overall, thanks for your suggestion, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Frozen lockfile doesn't use memory cache so it's irrelevant (It invokes
The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for this package.json:
Run
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to use this package.json to do a benchmark, but there will be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Try a smaller one, for example: {
"dependencies": {
"@babel/core": "^7.23.2",
"@tsfun/all": "0.0.37",
"@types/node": "16.18.39",
"cross-env": "7.0.3",
"glob-parent": "6.0.2",
"husky": "8.0.3",
"jest": "29.7.0",
"json5": "2.2.3",
"jsonwebtoken": "9.0.2",
"keyv": "4.5.3",
"react": "18.2.0",
"rimraf": "3.0.2",
"semver": "7.5.4",
"shx": "0.3.4",
"ts-jest": "29.1.1",
"ts-node": "10.9.1",
"typescript": "5.2.2",
"verdaccio": "5.27.0",
"yaml": "2.3.4"
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately, this will get an error: Maybe we should test the benchmark with a bigger repository after this is resolved. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you try combine your branches with #210 into 2 new branches and see if the "too many open files" error still exist? If the error disappear, can you try benchmark them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried, but I still can't get the results of integrated-benchmark, it seems to be because of a circular dependency in alotta-files? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zkochan It seems pacquet can't yet handle circular dependencies (without lockfile). Can you tell us which is the circular dependencies to remove? Also, @await-ovo, another PR was just merged, you should pull from upstream or rebase. |
||
|
||
#[instrument(skip(gz_data), fields(gz_data_len = gz_data.len()))] | ||
fn decompress_gzip(gz_data: &[u8], unpacked_size: Option<usize>) -> Result<Vec<u8>, TarballError> { | ||
|
@@ -130,9 +129,9 @@ impl<'a> DownloadTarballToStore<'a> { | |
|
||
// QUESTION: I see no copying from existing store_dir, is there such mechanism? | ||
// TODO: If it's not implemented yet, implement it | ||
|
||
if let Some(cache_lock) = mem_cache.get(package_url) { | ||
let notify = match &*cache_lock.write().await { | ||
let mem_cache_reader = mem_cache.read().await; | ||
if let Some(cache_lock) = mem_cache_reader.get(package_url) { | ||
let notify = match &*cache_lock.read().await { | ||
CacheValue::Available(cas_paths) => { | ||
return Ok(Arc::clone(cas_paths)); | ||
} | ||
|
@@ -146,13 +145,19 @@ impl<'a> DownloadTarballToStore<'a> { | |
} | ||
unreachable!("Failed to get or compute tarball data for {package_url:?}"); | ||
} else { | ||
drop(mem_cache_reader); | ||
let notify = Arc::new(Notify::new()); | ||
let cache_lock = notify | ||
.pipe_ref(Arc::clone) | ||
.pipe(CacheValue::InProgress) | ||
.pipe(RwLock::new) | ||
.pipe(Arc::new); | ||
if mem_cache.insert(package_url.to_string(), Arc::clone(&cache_lock)).is_some() { | ||
if mem_cache | ||
.write() | ||
.await | ||
.insert(package_url.to_string(), Arc::clone(&cache_lock)) | ||
.is_some() | ||
{ | ||
tracing::warn!(target: "pacquet::download", ?package_url, "Race condition detected when writing to cache"); | ||
} | ||
let cas_paths = self.run_without_mem_cache().await?.pipe(Arc::new); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
pub mod bin; | ||
pub mod fs; | ||
pub mod misc; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
use std::{sync::mpsc, thread, time::Duration}; | ||
|
||
pub fn panic_after<T, F>(d: Duration, f: F) -> T | ||
where | ||
T: Send + 'static, | ||
F: FnOnce() -> T, | ||
F: Send + 'static, | ||
{ | ||
let (done_tx, done_rx) = mpsc::channel(); | ||
let handle = thread::spawn(move || { | ||
let val = f(); | ||
done_tx.send(()).expect("Unable to send completion signal"); | ||
val | ||
}); | ||
|
||
match done_rx.recv_timeout(d) { | ||
Ok(_) => handle.join().expect("test panicked"), | ||
Err(_) => panic!("test timeout"), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before rebase the branch main, i.e. without mock-registry, the test passed, but now it doesn't seem to be able to install
express
, so it will take a bit more time to see what's going on.