Skip to content

Commit 5800cba

Browse files
feat(downloads): remove the mpsc channel in favor of a FuturesUnordered
As installations are still done sequentially, we do not need to have a mpsc channel and can instead resort to a `FuturesUnordered`.
1 parent 9dc7029 commit 5800cba

File tree

1 file changed

+43
-71
lines changed

1 file changed

+43
-71
lines changed

src/dist/manifestation.rs

Lines changed: 43 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ mod tests;
77
use std::path::Path;
88

99
use anyhow::{Context, Error, Result, anyhow, bail};
10-
use futures_util::stream::StreamExt;
10+
use futures_util::stream::{FuturesUnordered, StreamExt};
1111
use std::sync::Arc;
12-
use tokio::sync::{Semaphore, mpsc};
12+
use tokio::sync::Semaphore;
1313
use tracing::{info, warn};
1414
use url::Url;
1515

@@ -228,110 +228,82 @@ impl Manifestation {
228228
// The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
229229
// This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
230230
let total_components = components.len();
231-
let (download_tx, mut download_rx) =
232-
mpsc::channel::<Result<(ComponentBinary, File)>>(total_components);
233231

234-
#[allow(clippy::too_many_arguments)]
235-
fn component_stream(
232+
fn create_download_futures(
236233
components: Vec<ComponentBinary>,
237234
semaphore: Arc<Semaphore>,
238-
download_tx: mpsc::Sender<Result<(ComponentBinary, File)>>,
239235
altered: bool,
240236
dist_server: &str,
241237
download_cfg: &DownloadCfg,
242238
max_retries: usize,
243239
new_manifest: &Manifest,
244-
) -> impl futures_util::Stream<Item = impl Future<Output = Result<String>>>
240+
) -> FuturesUnordered<impl Future<Output = Result<(ComponentBinary, File, String)>>>
245241
{
246-
tokio_stream::iter(components).map(move |bin| {
242+
let futures = FuturesUnordered::new();
243+
for bin in components {
247244
let sem = semaphore.clone();
248-
let download_tx = download_tx.clone();
249-
async move {
245+
let dist_server = dist_server.to_string();
246+
let download_cfg = download_cfg.clone();
247+
let new_manifest = new_manifest.clone();
248+
249+
let future = async move {
250250
let _permit = sem.acquire().await.unwrap();
251251
let url = if altered {
252252
utils::parse_url(
253-
&bin.binary.url.replace(DEFAULT_DIST_SERVER, dist_server),
253+
&bin.binary.url.replace(DEFAULT_DIST_SERVER, &dist_server),
254254
)?
255255
} else {
256256
utils::parse_url(&bin.binary.url)?
257257
};
258258

259259
let installer_file = bin
260-
.download(&url, download_cfg, max_retries, new_manifest)
260+
.download(&url, &download_cfg, max_retries, &new_manifest)
261261
.await?;
262262
let hash = bin.binary.hash.clone();
263-
let _ = download_tx.send(Ok((bin, installer_file))).await;
264-
Ok(hash)
265-
}
266-
})
267-
}
268-
269-
async fn download_handle(
270-
mut stream: impl futures_util::Stream<Item = Result<String>> + Unpin,
271-
download_tx: mpsc::Sender<Result<(ComponentBinary, File)>>,
272-
) -> Vec<String> {
273-
let mut hashes = Vec::new();
274-
while let Some(result) = stream.next().await {
275-
match result {
276-
Ok(hash) => {
277-
hashes.push(hash);
278-
}
279-
Err(e) => {
280-
let _ = download_tx.send(Err(e)).await;
281-
}
282-
}
263+
Ok((bin, installer_file, hash))
264+
};
265+
futures.push(future);
283266
}
284-
hashes
267+
futures
285268
}
286269

287270
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
288-
let component_stream = component_stream(
271+
let mut download_stream = create_download_futures(
289272
components,
290273
semaphore,
291-
download_tx.clone(),
292274
altered,
293275
tmp_cx.dist_server.as_str(),
294276
&download_cfg,
295277
max_retries,
296278
&new_manifest,
297279
);
298280

299-
let stream = component_stream.buffered(components_len);
300-
let download_handle = download_handle(stream, download_tx.clone());
301-
let install_handle = {
302-
let new_manifest = new_manifest.clone();
303-
let download_cfg = download_cfg.clone();
304-
async move {
305-
let mut current_tx = tx;
306-
let mut counter = 0;
307-
while counter < total_components
308-
&& let Some(message) = download_rx.recv().await
309-
{
310-
let (component_bin, installer_file) = message?;
311-
current_tx = tokio::task::spawn_blocking({
312-
let this = self.clone();
313-
let new_manifest = new_manifest.clone();
314-
let download_cfg = download_cfg.clone();
315-
move || {
316-
component_bin.install(
317-
installer_file,
318-
current_tx,
319-
&new_manifest,
320-
&this,
321-
&download_cfg,
322-
)
323-
}
324-
})
325-
.await??;
326-
counter += 1;
327-
}
328-
Ok::<_, Error>(current_tx)
281+
let mut counter = 0;
282+
while counter < total_components {
283+
if let Some(result) = download_stream.next().await {
284+
let (component_bin, installer_file, hash) = result?;
285+
things_downloaded.push(hash);
286+
287+
tx = tokio::task::spawn_blocking({
288+
let this = self.clone();
289+
let new_manifest = new_manifest.clone();
290+
let download_cfg = download_cfg.clone();
291+
move || {
292+
component_bin.install(
293+
installer_file,
294+
tx,
295+
&new_manifest,
296+
&this,
297+
&download_cfg,
298+
)
299+
}
300+
})
301+
.await??;
302+
counter += 1;
303+
} else {
304+
break;
329305
}
330-
};
331-
332-
let (download_results, install_result) = tokio::join!(download_handle, install_handle);
333-
things_downloaded = download_results;
334-
tx = install_result?;
306+
}
335307
}
336308

337309
// Install new distribution manifest

0 commit comments

Comments
 (0)