Skip to content

Commit ea7ab01

Browse files
fix(installation): extract handle creation to named functions
1 parent 1830dcb commit ea7ab01

File tree

1 file changed

+110
-79
lines changed

1 file changed

+110
-79
lines changed

src/dist/manifestation.rs

Lines changed: 110 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -232,89 +232,32 @@ impl Manifestation {
232232
// The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
233233
// This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
234234
let total_components = components.len();
235-
let (download_tx, mut download_rx) =
235+
let (download_tx, download_rx) =
236236
mpsc::channel::<Result<(ComponentBinary, File)>>(total_components);
237237

238238
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
239-
let component_stream = tokio_stream::iter(components.into_iter()).map({
240-
let download_tx = download_tx.clone();
241-
{
242-
let new_manifest = Arc::clone(&new_manifest);
243-
let download_cfg = download_cfg.clone();
244-
move |bin| {
245-
let sem = semaphore.clone();
246-
let download_tx = download_tx.clone();
247-
let tmp_cx = Arc::clone(&tmp_cx);
248-
let new_manifest = Arc::clone(&new_manifest);
249-
let download_cfg = download_cfg.clone();
250-
async move {
251-
let _permit = sem.acquire().await.unwrap();
252-
let url = if altered {
253-
utils::parse_url(
254-
&bin.binary
255-
.url
256-
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
257-
)?
258-
} else {
259-
utils::parse_url(&bin.binary.url)?
260-
};
261-
262-
let installer_file = bin
263-
.download(&url, &download_cfg, max_retries, &new_manifest)
264-
.await?;
265-
let hash = bin.binary.hash.clone();
266-
let _ = download_tx.send(Ok((bin, installer_file))).await;
267-
Ok(hash)
268-
}
269-
}
270-
}
271-
});
272239

273-
let mut stream = component_stream.buffered(components_len);
274-
let download_handle = async move {
275-
let mut hashes = Vec::new();
276-
while let Some(result) = stream.next().await {
277-
match result {
278-
Ok(hash) => {
279-
hashes.push(hash);
280-
}
281-
Err(e) => {
282-
let _ = download_tx.send(Err(e)).await;
283-
}
284-
}
285-
}
286-
hashes
287-
};
288-
let install_handle = {
289-
let new_manifest = Arc::clone(&new_manifest);
290-
let download_cfg = Arc::clone(&download_cfg);
291-
async move {
292-
let mut current_tx = tx;
293-
let mut counter = 0;
294-
while counter < total_components
295-
&& let Some(message) = download_rx.recv().await
296-
{
297-
let (component_bin, installer_file) = message?;
298-
current_tx = tokio::task::spawn_blocking({
299-
let this = Arc::clone(&self);
300-
let new_manifest = Arc::clone(&new_manifest);
301-
let download_cfg = Arc::clone(&download_cfg);
302-
move || {
303-
this.install_component(
304-
component_bin,
305-
installer_file,
306-
download_cfg,
307-
new_manifest,
308-
current_tx,
309-
)
310-
}
311-
})
312-
.await??;
313-
counter += 1;
314-
}
315-
Ok::<_, Error>(current_tx)
316-
}
317-
};
240+
let component_stream = Self::component_stream(
241+
components,
242+
download_tx.clone(),
243+
semaphore,
244+
Arc::clone(&new_manifest),
245+
Arc::clone(&download_cfg),
246+
tmp_cx.dist_server.as_str(),
247+
max_retries,
248+
altered,
249+
);
250+
251+
let stream = component_stream.buffered(components_len);
252+
let download_handle = Self::download_handle(stream, download_tx);
253+
let install_handle = Self::install_handle(
254+
Arc::clone(&self),
255+
download_rx,
256+
total_components,
257+
Arc::clone(&new_manifest),
258+
Arc::clone(&download_cfg),
259+
tx,
260+
);
318261

319262
let (download_results, install_result) = tokio::join!(download_handle, install_handle);
320263
things_downloaded = download_results;
@@ -568,6 +511,94 @@ impl Manifestation {
568511
component_bin.status.installed();
569512
tx
570513
}
514+
515+
#[allow(clippy::too_many_arguments)]
516+
fn component_stream(
517+
components: Vec<ComponentBinary>,
518+
download_tx: mpsc::Sender<Result<(ComponentBinary, File)>>,
519+
semaphore: Arc<Semaphore>,
520+
new_manifest: Arc<Manifest>,
521+
download_cfg: Arc<DownloadCfg>,
522+
dist_server: &str,
523+
max_retries: usize,
524+
altered: bool,
525+
) -> impl futures_util::Stream<Item = impl Future<Output = Result<String>>> {
526+
tokio_stream::iter(components).map({
527+
move |bin| {
528+
let sem = semaphore.clone();
529+
let download_tx = download_tx.clone();
530+
let new_manifest = Arc::clone(&new_manifest);
531+
let download_cfg = download_cfg.clone();
532+
async move {
533+
let _permit = sem.acquire().await.unwrap();
534+
let url = if altered {
535+
utils::parse_url(&bin.binary.url.replace(DEFAULT_DIST_SERVER, dist_server))?
536+
} else {
537+
utils::parse_url(&bin.binary.url)?
538+
};
539+
540+
let installer_file = bin
541+
.download(&url, &download_cfg, max_retries, &new_manifest)
542+
.await?;
543+
let hash = bin.binary.hash.clone();
544+
let _ = download_tx.send(Ok((bin, installer_file))).await;
545+
Ok(hash)
546+
}
547+
}
548+
})
549+
}
550+
551+
async fn download_handle(
552+
mut stream: impl futures_util::Stream<Item = Result<String>> + Unpin,
553+
download_tx: mpsc::Sender<Result<(ComponentBinary, File)>>,
554+
) -> Vec<String> {
555+
let mut hashes = Vec::new();
556+
while let Some(result) = stream.next().await {
557+
match result {
558+
Ok(hash) => {
559+
hashes.push(hash);
560+
}
561+
Err(e) => {
562+
let _ = download_tx.send(Err(e)).await;
563+
}
564+
}
565+
}
566+
hashes
567+
}
568+
569+
async fn install_handle(
570+
self: Arc<Manifestation>,
571+
mut download_rx: mpsc::Receiver<Result<(ComponentBinary, File)>>,
572+
total_components: usize,
573+
new_manifest: Arc<Manifest>,
574+
download_cfg: Arc<DownloadCfg>,
575+
tx: Transaction,
576+
) -> Result<Transaction> {
577+
let mut current_tx = tx;
578+
let mut counter = 0;
579+
while counter < total_components
580+
&& let Some(message) = download_rx.recv().await
581+
{
582+
let (component_bin, installer_file) = message?;
583+
current_tx = tokio::task::spawn_blocking({
584+
let this = Arc::clone(&self);
585+
let new_manifest = Arc::clone(&new_manifest);
586+
let download_cfg = Arc::clone(&download_cfg);
587+
move || {
588+
this.install_component(
589+
component_bin,
590+
installer_file,
591+
download_cfg,
592+
new_manifest,
593+
current_tx,
594+
)
595+
}
596+
})
597+
.await??;
598+
counter += 1;
599+
}
600+
Ok(current_tx)
601+
}
571602
}
572603

573604
#[derive(Debug)]

0 commit comments

Comments
 (0)