Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions src/api/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct Downloader {
#[rpc_requests(message = SwarmMsg, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
enum SwarmProtocol {
#[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
#[rpc(tx = mpsc::Sender<DownloadProgressItem>)]
Download(DownloadRequest),
}

Expand All @@ -46,7 +46,7 @@ struct DownloaderActor {
}

#[derive(Debug, Serialize, Deserialize)]
pub enum DownloadProgessItem {
pub enum DownloadProgressItem {
#[serde(skip)]
Error(anyhow::Error),
TryProvider {
Expand Down Expand Up @@ -98,15 +98,15 @@ impl DownloaderActor {
async fn handle_download(store: Store, pool: ConnectionPool, msg: DownloadMsg) {
let DownloadMsg { inner, mut tx, .. } = msg;
if let Err(cause) = handle_download_impl(store, pool, inner, &mut tx).await {
tx.send(DownloadProgessItem::Error(cause)).await.ok();
tx.send(DownloadProgressItem::Error(cause)).await.ok();
}
}

async fn handle_download_impl(
store: Store,
pool: ConnectionPool,
request: DownloadRequest,
tx: &mut mpsc::Sender<DownloadProgessItem>,
tx: &mut mpsc::Sender<DownloadProgressItem>,
) -> anyhow::Result<()> {
match request.strategy {
SplitStrategy::Split => handle_download_split_impl(store, pool, request, tx).await?,
Expand All @@ -127,7 +127,7 @@ async fn handle_download_split_impl(
store: Store,
pool: ConnectionPool,
request: DownloadRequest,
tx: &mut mpsc::Sender<DownloadProgessItem>,
tx: &mut mpsc::Sender<DownloadProgressItem>,
) -> anyhow::Result<()> {
let providers = request.providers;
let requests = split_request(&request.request, &providers, &pool, &store, Drain).await?;
Expand All @@ -140,7 +140,7 @@ async fn handle_download_split_impl(
let progress_tx = progress_tx.clone();
async move {
let hash = request.hash;
let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgessItem)>(16);
let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgressItem)>(16);
progress_tx.send(rx).await.ok();
let sink = TokioMpscSenderSink(tx).with_map(move |x| (id, x));
let res = execute_get(&pool, Arc::new(request), &providers, &store, sink).await;
Expand All @@ -154,12 +154,12 @@ async fn handle_download_split_impl(
into_stream(progress_rx)
.flat_map(into_stream)
.map(move |(id, item)| match item {
DownloadProgessItem::Progress(offset) => {
DownloadProgressItem::Progress(offset) => {
total += offset;
if let Some(prev) = offsets.insert(id, offset) {
total -= prev;
}
DownloadProgessItem::Progress(total)
DownloadProgressItem::Progress(total)
}
x => x,
})
Expand All @@ -174,7 +174,7 @@ async fn handle_download_split_impl(
Some((_hash, Ok(()))) => {
}
Some((_hash, Err(_e))) => {
tx.send(DownloadProgessItem::DownloadError).await?;
tx.send(DownloadProgressItem::DownloadError).await?;
}
None => break,
}
Expand Down Expand Up @@ -298,19 +298,19 @@ impl<'de> Deserialize<'de> for DownloadRequest {
pub type DownloadOptions = DownloadRequest;

pub struct DownloadProgress {
fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>,
fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgressItem>>>,
}

impl DownloadProgress {
fn new(fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>) -> Self {
fn new(fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgressItem>>>) -> Self {
Self { fut }
}

pub async fn stream(self) -> irpc::Result<impl Stream<Item = DownloadProgessItem> + Unpin> {
pub async fn stream(self) -> irpc::Result<impl Stream<Item = DownloadProgressItem> + Unpin> {
let rx = self.fut.await?;
Ok(Box::pin(rx.into_stream().map(|item| match item {
Ok(item) => item,
Err(e) => DownloadProgessItem::Error(e.into()),
Err(e) => DownloadProgressItem::Error(e.into()),
})))
}

Expand All @@ -320,8 +320,8 @@ impl DownloadProgress {
tokio::pin!(stream);
while let Some(item) = stream.next().await {
match item? {
DownloadProgessItem::Error(e) => Err(e)?,
DownloadProgessItem::DownloadError => anyhow::bail!("Download error"),
DownloadProgressItem::Error(e) => Err(e)?,
DownloadProgressItem::DownloadError => anyhow::bail!("Download error"),
_ => {}
}
}
Expand Down Expand Up @@ -372,7 +372,7 @@ async fn split_request<'a>(
providers: &Arc<dyn ContentDiscovery>,
pool: &ConnectionPool,
store: &Store,
progress: impl Sink<DownloadProgessItem, Error = irpc::channel::SendError>,
progress: impl Sink<DownloadProgressItem, Error = irpc::channel::SendError>,
) -> anyhow::Result<Box<dyn Iterator<Item = GetRequest> + Send + 'a>> {
Ok(match request {
FiniteRequest::Get(req) => {
Expand Down Expand Up @@ -428,13 +428,13 @@ async fn execute_get(
request: Arc<GetRequest>,
providers: &Arc<dyn ContentDiscovery>,
store: &Store,
mut progress: impl Sink<DownloadProgessItem, Error = irpc::channel::SendError>,
mut progress: impl Sink<DownloadProgressItem, Error = irpc::channel::SendError>,
) -> anyhow::Result<()> {
let remote = store.remote();
let mut providers = providers.find_providers(request.content());
while let Some(provider) = providers.next().await {
progress
.send(DownloadProgessItem::TryProvider {
.send(DownloadProgressItem::TryProvider {
id: provider,
request: request.clone(),
})
Expand All @@ -447,7 +447,7 @@ async fn execute_get(
let local_bytes = local.local_bytes();
let Ok(conn) = conn.await else {
progress
.send(DownloadProgessItem::ProviderFailed {
.send(DownloadProgressItem::ProviderFailed {
id: provider,
request: request.clone(),
})
Expand All @@ -458,21 +458,21 @@ async fn execute_get(
.execute_get_sink(
&conn,
local.missing(),
(&mut progress).with_map(move |x| DownloadProgessItem::Progress(x + local_bytes)),
(&mut progress).with_map(move |x| DownloadProgressItem::Progress(x + local_bytes)),
)
.await
{
Ok(_stats) => {
progress
.send(DownloadProgessItem::PartComplete {
.send(DownloadProgressItem::PartComplete {
request: request.clone(),
})
.await?;
return Ok(());
}
Err(_cause) => {
progress
.send(DownloadProgessItem::ProviderFailed {
.send(DownloadProgressItem::ProviderFailed {
id: provider,
request: request.clone(),
})
Expand Down
Loading