Skip to content

Commit fee7e19

Browse files
committed
moss: parallelize 'state verify'
Use rayon to iterate over entries in parallel and use an Arc<RwLock<>> to update the vec of issues atomically. Resolves #390.
1 parent cb1207e commit fee7e19

File tree

1 file changed

+70
-51
lines changed

1 file changed

+70
-51
lines changed

moss/src/client/verify.rs

Lines changed: 70 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22
//
33
// SPDX-License-Identifier: MPL-2.0
44

5+
use std::sync::atomic::{AtomicUsize, Ordering};
6+
use std::sync::{Arc, RwLock};
57
use std::{collections::BTreeSet, fmt, io, path::PathBuf};
68

79
use itertools::Itertools;
810

911
use fs_err as fs;
12+
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
1013
use stone::{payload::layout, write::digest};
1114
use tui::{
1215
dialoguer::{theme::ColorfulTheme, Confirm},
@@ -37,9 +40,6 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
3740
})
3841
.into_group_map();
3942

40-
let mut issues = vec![];
41-
let mut hasher = digest::Hasher::new();
42-
4343
let pb = ProgressBar::new(unique_assets.len() as u64)
4444
.with_message("Verifying")
4545
.with_style(
@@ -49,14 +49,18 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
4949
);
5050
pb.tick();
5151

52-
// For each asset, ensure it exists in the content store and isn't corrupt (hash is correct)
53-
for (hash, meta) in unique_assets
52+
let issues_arcrw = Arc::new(RwLock::new(Vec::new()));
53+
54+
let sorted_unique_assets = unique_assets
5455
.into_iter()
5556
.sorted_by_key(|(key, _)| format!("{key:0>32}"))
56-
{
57+
.collect::<Vec<_>>();
58+
59+
// For each asset, ensure it exists in the content store and isn't corrupt (hash is correct)
60+
sorted_unique_assets.par_iter().for_each(|(hash, meta)| {
5761
let display_hash = format!("{hash:0>32}");
5862

59-
let path = cache::asset_path(&client.installation, &hash);
63+
let path = cache::asset_path(&client.installation, hash);
6064

6165
let files = meta.iter().map(|(_, file)| file).cloned().collect::<BTreeSet<_>>();
6266

@@ -67,43 +71,55 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
6771
if verbose {
6872
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "×".yellow()));
6973
}
70-
issues.push(Issue::MissingAsset {
71-
hash: display_hash,
72-
files,
73-
packages: meta.into_iter().map(|(package, _)| package).collect(),
74-
});
75-
continue;
74+
{
75+
let mut lock = issues_arcrw.write().unwrap();
76+
lock.push(Issue::MissingAsset {
77+
hash: display_hash,
78+
files,
79+
packages: meta.iter().map(|(package, _)| package).collect(),
80+
});
81+
}
82+
return;
7683
}
7784

85+
let mut hasher = digest::Hasher::new();
7886
hasher.reset();
7987

8088
let mut digest_writer = digest::Writer::new(io::sink(), &mut hasher);
81-
let mut file = fs::File::open(&path)?;
89+
let res = fs::File::open(&path);
90+
91+
if res.is_err() {
92+
return;
93+
}
94+
95+
let mut file = res.unwrap();
8296

83-
// Copy bytes to null sink so we don't
84-
// explode memory
85-
io::copy(&mut file, &mut digest_writer)?;
97+
// Copy bytes to null sink so we don't explode memory
98+
io::copy(&mut file, &mut digest_writer).unwrap_or_default();
8699

87100
let verified_hash = format!("{:02x}", hasher.digest128());
88101

89-
if verified_hash != hash {
102+
if &verified_hash != hash {
90103
pb.inc(1);
91104
if verbose {
92105
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "×".yellow()));
93106
}
94-
issues.push(Issue::CorruptAsset {
95-
hash: display_hash,
96-
files,
97-
packages: meta.into_iter().map(|(package, _)| package).collect(),
98-
});
99-
continue;
107+
{
108+
let mut lock = issues_arcrw.write().unwrap();
109+
lock.push(Issue::CorruptAsset {
110+
hash: display_hash,
111+
files,
112+
packages: meta.iter().map(|(package, _)| package).collect(),
113+
});
114+
}
115+
return;
100116
}
101117

102118
pb.inc(1);
103119
if verbose {
104120
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "»".green()));
105121
}
106-
}
122+
});
107123

108124
// Get all states
109125
let states = client.state_db.all()?;
@@ -114,13 +130,11 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
114130
println!("Verifying states");
115131
});
116132

117-
// Check the VFS of each state exists properly on the FS
118-
for state in &states {
133+
states.par_iter().for_each(|state| {
119134
pb.set_message(format!("Verifying state #{}", state.id));
120-
121135
let is_active = client.installation.active_state == Some(state.id);
122136

123-
let vfs = client.vfs(state.selections.iter().map(|s| &s.package))?;
137+
let vfs = client.vfs(state.selections.iter().map(|s| &s.package)).unwrap();
124138

125139
let base = if is_active {
126140
client.installation.root.join("usr")
@@ -130,7 +144,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
130144

131145
let files = vfs.iter().collect::<Vec<_>>();
132146

133-
let mut num_issues = 0;
147+
let counter = Arc::new(AtomicUsize::new(0));
134148

135149
for file in files {
136150
let path = base.join(file.path().strip_prefix("/usr/").unwrap_or_default());
@@ -144,33 +158,38 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
144158
Ok(true) => {}
145159
Ok(false) if path.is_symlink() => {}
146160
_ => {
147-
num_issues += 1;
148-
issues.push(Issue::MissingVFSPath { path, state: state.id });
161+
counter.fetch_add(1, Ordering::Relaxed);
162+
{
163+
let mut lock = issues_arcrw.write().unwrap();
164+
lock.push(Issue::MissingVFSPath { path, state: state.id });
165+
}
149166
}
150167
}
151168
}
152169

153170
pb.inc(1);
154171
if verbose {
155-
let mark = if num_issues > 0 { "×".yellow() } else { "»".green() };
172+
let mark = if counter.load(Ordering::Relaxed) > 0 {
173+
"×".yellow()
174+
} else {
175+
"»".green()
176+
};
156177
pb.suspend(|| println!(" {mark} state #{}", state.id));
157178
}
158-
}
179+
});
159180

160181
pb.finish_and_clear();
161182

162-
if issues.is_empty() {
183+
let lock = issues_arcrw.write().unwrap();
184+
185+
if lock.is_empty() {
163186
println!("No issues found");
164187
return Ok(());
165188
}
166189

167-
println!(
168-
"Found {} issue{}",
169-
issues.len(),
170-
if issues.len() == 1 { "" } else { "s" }
171-
);
190+
println!("Found {} issue{}", lock.len(), if lock.len() == 1 { "" } else { "s" });
172191

173-
for issue in &issues {
192+
for issue in lock.iter() {
174193
println!(" {} {issue}", "×".yellow());
175194
}
176195

@@ -187,15 +206,15 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
187206
}
188207

189208
// Calculate and resolve the unique set of packages with asset issues
190-
let issue_packages = issues
209+
let issue_packages = lock
191210
.iter()
192211
.filter_map(Issue::packages)
193212
.flatten()
194213
.collect::<BTreeSet<_>>()
195214
.into_iter()
196215
.map(|id| {
197216
client.install_db.get(id).map(|meta| Package {
198-
id: id.clone(),
217+
id: id.to_owned().to_owned(),
199218
meta,
200219
flags: package::Flags::default(),
201220
})
@@ -205,7 +224,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
205224
// We had some corrupt or missing assets, let's resolve that!
206225
if !issue_packages.is_empty() {
207226
// Remove all corrupt assets
208-
for corrupt_hash in issues.iter().filter_map(Issue::corrupt_hash) {
227+
for corrupt_hash in lock.iter().filter_map(Issue::corrupt_hash) {
209228
let path = cache::asset_path(&client.installation, corrupt_hash);
210229
fs::remove_file(&path)?;
211230
}
@@ -227,7 +246,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
227246
.any(|s| issue_packages.iter().any(|p| p.id == s.package))
228247
.then_some(&state.id)
229248
})
230-
.chain(issues.iter().filter_map(Issue::state))
249+
.chain(lock.iter().filter_map(Issue::state))
231250
.collect::<BTreeSet<_>>();
232251

233252
println!("Reblitting affected states");
@@ -277,24 +296,24 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
277296
}
278297

279298
#[derive(Debug)]
280-
enum Issue {
299+
enum Issue<'a> {
281300
CorruptAsset {
282301
hash: String,
283302
files: BTreeSet<String>,
284-
packages: BTreeSet<package::Id>,
303+
packages: BTreeSet<&'a package::Id>,
285304
},
286305
MissingAsset {
287306
hash: String,
288307
files: BTreeSet<String>,
289-
packages: BTreeSet<package::Id>,
308+
packages: BTreeSet<&'a package::Id>,
290309
},
291310
MissingVFSPath {
292311
path: PathBuf,
293312
state: state::Id,
294313
},
295314
}
296315

297-
impl Issue {
316+
impl Issue<'_> {
298317
fn corrupt_hash(&self) -> Option<&str> {
299318
match self {
300319
Issue::CorruptAsset { hash, .. } => Some(hash),
@@ -303,7 +322,7 @@ impl Issue {
303322
}
304323
}
305324

306-
fn packages(&self) -> Option<&BTreeSet<package::Id>> {
325+
fn packages(&self) -> Option<&BTreeSet<&package::Id>> {
307326
match self {
308327
Issue::CorruptAsset { packages, .. } | Issue::MissingAsset { packages, .. } => Some(packages),
309328
Issue::MissingVFSPath { .. } => None,
@@ -318,7 +337,7 @@ impl Issue {
318337
}
319338
}
320339

321-
impl fmt::Display for Issue {
340+
impl fmt::Display for Issue<'_> {
322341
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323342
match self {
324343
Issue::CorruptAsset { hash, files, .. } => write!(f, "Corrupt asset {hash} - {files:?}"),

0 commit comments

Comments
 (0)