@@ -7,9 +7,7 @@ mod tests;
77use std:: path:: Path ;
88
99use anyhow:: { Context , Result , anyhow, bail} ;
10- use futures_util:: stream:: StreamExt ;
11- use std:: sync:: Arc ;
12- use tokio:: sync:: Semaphore ;
10+ use futures_util:: stream:: { FuturesUnordered , StreamExt } ;
1311use tracing:: { info, warn} ;
1412
1513use crate :: dist:: component:: {
@@ -153,8 +151,6 @@ impl Manifestation {
153151 }
154152
155153 // Download component packages and validate hashes
156- let mut things_to_install = Vec :: new ( ) ;
157- let mut things_downloaded = Vec :: new ( ) ;
158154 let components = update
159155 . components_urls_and_hashes ( new_manifest)
160156 . map ( |res| {
@@ -166,7 +162,6 @@ impl Manifestation {
166162 } )
167163 . collect :: < Result < Vec < _ > > > ( ) ?;
168164
169- let components_len = components. len ( ) ;
170165 const DEFAULT_CONCURRENT_DOWNLOADS : usize = 2 ;
171166 let concurrent_downloads = download_cfg
172167 . process
@@ -181,29 +176,6 @@ impl Manifestation {
181176 . and_then ( |s| s. parse ( ) . ok ( ) )
182177 . unwrap_or ( DEFAULT_MAX_RETRIES ) ;
183178
184- info ! ( "downloading component(s)" ) ;
185- let semaphore = Arc :: new ( Semaphore :: new ( concurrent_downloads) ) ;
186- let component_stream = tokio_stream:: iter ( components. into_iter ( ) ) . map ( |bin| {
187- let sem = semaphore. clone ( ) ;
188- async move {
189- let _permit = sem. acquire ( ) . await . unwrap ( ) ;
190- bin. download ( download_cfg, max_retries, new_manifest)
191- . await
192- . map ( |downloaded| ( bin, downloaded) )
193- }
194- } ) ;
195- if components_len > 0 {
196- let results = component_stream
197- . buffered ( components_len)
198- . collect :: < Vec < _ > > ( )
199- . await ;
200- for result in results {
201- let ( bin, downloaded_file) = result?;
202- things_downloaded. push ( bin. binary . hash . clone ( ) ) ;
203- things_to_install. push ( ( bin, downloaded_file) ) ;
204- }
205- }
206-
207179 // Begin transaction
208180 let mut tx = Transaction :: new ( prefix. clone ( ) , tmp_cx, download_cfg. process ) ;
209181
@@ -242,15 +214,40 @@ impl Manifestation {
242214 tx = self . uninstall_component ( component, new_manifest, tx, download_cfg. process ) ?;
243215 }
244216
245- // Install components
246- for ( component_bin, installer_file) in things_to_install {
247- tx = component_bin. install ( installer_file, tx, new_manifest, self , download_cfg) ?;
217+ let mut downloads = FuturesUnordered :: new ( ) ;
218+ let mut component_iter = components. iter ( ) ;
219+ let mut cleanup_downloads = vec ! [ ] ;
220+ loop {
221+ while downloads. len ( ) < concurrent_downloads {
222+ if let Some ( bin) = component_iter. next ( ) {
223+ downloads. push ( async move {
224+ bin. download ( download_cfg, max_retries, new_manifest)
225+ . await
226+ . map ( |downloaded| ( bin, downloaded) )
227+ } ) ;
228+ }
229+ }
230+
231+ if downloads. is_empty ( ) {
232+ break ;
233+ }
234+
235+ let ( bin, downloaded) = match downloads. next ( ) . await {
236+ Some ( Ok ( ( bin, downloaded) ) ) => ( bin, downloaded) ,
237+ Some ( Err ( e) ) => return Err ( e) ,
238+ None => continue ,
239+ } ;
240+
241+ cleanup_downloads. push ( & bin. binary . hash ) ;
242+ tx = bin. install ( downloaded, tx, new_manifest, self , download_cfg) ?;
248243 }
249244
250245 // Install new distribution manifest
251246 let new_manifest_str = new_manifest. clone ( ) . stringify ( ) ?;
252247 tx. modify_file ( rel_installed_manifest_path) ?;
253248 utils:: write_file ( "manifest" , & installed_manifest_path, & new_manifest_str) ?;
249+ download_cfg. clean ( & cleanup_downloads) ?;
250+ drop ( downloads) ;
254251
255252 // Write configuration.
256253 //
@@ -271,8 +268,6 @@ impl Manifestation {
271268 // End transaction
272269 tx. commit ( ) ;
273270
274- download_cfg. clean ( & things_downloaded) ?;
275-
276271 Ok ( UpdateStatus :: Changed )
277272 }
278273
0 commit comments