Skip to content

Commit

Permalink
chore: use an optional stream
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Jan 6, 2025
1 parent 72e6f83 commit 6d4a929
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ futures-timeout = "0.1.0"
async-trait = { version = "0.1" }
async-stream = "0.3"
async-broadcast = "0.5"
pollable-map = "0.1.0-alpha.1"
pollable-map = "0.1.0"
tokio = { version = "1", features = [
"macros",
"fs",
Expand Down
14 changes: 6 additions & 8 deletions extensions/warp-ipfs/src/store/message/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use bytes::Bytes;
use either::Either;
use futures::channel::oneshot;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryFutureExt};
use futures::{StreamExt, TryFutureExt};
use futures_timer::Delay;
use indexmap::{IndexMap, IndexSet};
use ipld_core::cid::Cid;
use pollable_map::stream::optional::OptionalStream;
use rust_ipfs::{libp2p::gossipsub::Message, Ipfs};
use rust_ipfs::{IpfsPath, PeerId, SubscriptionStream};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -248,7 +249,7 @@ pub struct ConversationTask {
event_broadcast: tokio::sync::broadcast::Sender<MessageEventKind>,
event_subscription: EventSubscription<RayGunEventKind>,

command_rx: BoxStream<'static, ConversationTaskCommand>,
command_rx: OptionalStream<futures::channel::mpsc::Receiver<ConversationTaskCommand>>,

//TODO: replace queue
queue: HashMap<DID, Vec<QueueItem>>,
Expand Down Expand Up @@ -329,7 +330,7 @@ impl ConversationTask {
attachment_rx: arx,
event_broadcast: btx,
event_subscription,
command_rx: futures::stream::empty().boxed(),
command_rx: Default::default(),
queue: Default::default(),
terminate: ConversationTermination::default(),
};
Expand Down Expand Up @@ -385,11 +386,8 @@ impl ConversationTask {
Ok(task)
}

pub fn set_receiver(
&mut self,
st: impl Stream<Item = ConversationTaskCommand> + 'static + Send,
) {
self.command_rx = Box::pin(st);
pub fn set_receiver(&mut self, st: futures::channel::mpsc::Receiver<ConversationTaskCommand>) {
self.command_rx.replace(st);
}
}

Expand Down

0 comments on commit 6d4a929

Please sign in to comment.