Description
By default, dask workers only have two cores and four threads. Since quilt uses a ThreadPoolExecutor to download files, running the download function inside a single dask worker severely limits how quickly files can be downloaded.
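For contrast, here is a hypothetical sketch of the current bottleneck (the task name, indexing, and fetch call are illustrative, not the real code): one task performs every download, so quilt's thread pool is confined to whichever worker runs that task.

```python
import quilt3
from prefect import task


# Hypothetical illustration of the bottleneck: a single task performs every
# download, so quilt's internal ThreadPoolExecutor only ever runs on the one
# dask worker that picks up this task.
@task
def download_everything(save_dir, protein_list, n_fovs):
    package = quilt3.Package.browse("aics/pipeline_integrated_cell", "s3://allencell")
    for protein in protein_list:
        for i in range(n_fovs):
            entry = package[protein][i]  # same hand-wavy indexing as the sketch below
            entry.fetch(save_dir / str(i))  # every fetch funnels through one worker
```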
If we re-configure the combination of {loader}.get_data and the actual fetching so that {loader}.get_data returns a list of partial functions to call, and we then map out those partial functions, each dask worker can take a file to download instead of a single worker downloading everything.

In pseudo-code, with a lot of the metadata handling removed:
```python
# quilt.py
from functools import partial

import quilt3


def get_file_fetchers(save_dir, protein_list, n_fovs, overwrite):
    package = quilt3.Package.browse("aics/pipeline_integrated_cell", "s3://allencell")
    fetchers = []
    for protein in protein_list:
        for i in range(n_fovs):
            entry = package[protein][i]
            fetchers.append(
                # this is wrong but something like this
                partial(entry.fetch, save_path=save_dir / entry.name)
            )
    return fetchers
```
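Each fetcher is then a zero-argument callable that performs exactly one download, e.g. (hypothetical values):

```python
from pathlib import Path

import quilt

# Hypothetical usage: build the fetchers, then call one to download one file.
# Nothing is downloaded until a fetcher is actually called.
fetchers = quilt.get_file_fetchers(Path("/tmp/data"), ["LMNB1"], n_fovs=2, overwrite=True)
fetchers[0]()  # downloads a single file
```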
```python
from typing import Callable, List

from prefect import Flow, task

import labkey
import quilt


@task
def run_fetcher(fetcher):
    # Each mapped copy of this task can run on its own dask worker.
    return fetcher()


@task
def get_save_load_data_functions(
    save_dir=None,
    protein_list=None,
    n_fovs=100,
    overwrite=False,
    dataset="quilt",
) -> List[Callable]:
    if dataset == "quilt":
        return quilt.get_file_fetchers(save_dir, protein_list, n_fovs, overwrite)
    return labkey.get_file_fetchers(save_dir, protein_list, n_fovs, overwrite)


with Flow("download_data") as flow:
    data_fetchers = get_save_load_data_functions(**kwargs)
    save_paths = run_fetcher.map(data_fetchers)
```
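For completeness, a minimal sketch of running the flow on dask (assuming Prefect 1.x, where DaskExecutor fans mapped tasks out across workers; the worker counts are illustrative):

```python
from prefect.executors import DaskExecutor

# Illustrative sizing; DaskExecutor spins up a local dask cluster by default.
# With one fetcher per mapped task, downloads now spread across all workers.
executor = DaskExecutor(cluster_kwargs={"n_workers": 8, "threads_per_worker": 1})
state = flow.run(executor=executor)
```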