diff --git a/Cargo.toml b/Cargo.toml index 7dcd40edb..18e37ec3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ either = "1" void = "1" #ipfs dependency -rust-ipfs = "0.7.1" +rust-ipfs = "0.8.0" # Blink related crates diff --git a/README.md b/README.md index bd36e8830..e7e215b6f 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,11 @@ Interface Driven Distributed Data Service ### Overview -Warp can run as a single binary, providing an interface into the core technologies that run Satellite. This allows us to avoid rewriting the same tech over and over when developing for different platforms. Warp will work on most phones, tablets, computers, and even some consoles. +Warp can run as a single binary, providing an interface into the core technologies that run Satellite. This allows us to avoid rewriting the same tech over and over when developing for different platforms. Warp will work on most phones, tablets, computers, and consoles. -It provides abstractions to many different modules which are required to run Satellite. These modules include Messaging, Caching, File Sharing & Storage, RTC connections, and more. Because we focus on building these modules as interfaces first, then allow implementation layers to be built on top of these, it allows us to change the core technologies easily with no extra development required on the "front-end" stacks. This means we can jump from multiple blockchains or even some other type of decentralized solution without affecting the front-end application. +It provides abstractions to many different modules which are required to run Satellite. These modules include Messaging, Caching, File Sharing & Storage, RTC connections, and more. Because we focus on building these modules as interfaces first and then allow implementation layers to be built on top of these, we can easily change the core technologies with no extra development required on the "front-end" stacks. This means we can jump from multiple blockchains or decentralized solutions without affecting the front-end application. -Additionally, libraries to interface with Warp (will) exist in JavaScript (TypeScript), Java, Python, and more. So you can easily develop your own platforms and integrations on top of the Satellite tech stack. Lastly, a REST API service can be enabled for Warp, however, it should never be exposed outside of localhost. +Additionally, libraries to interface with Warp (will) exist in JavaScript (TypeScript), Java, Python, and more. So you can quickly develop your platforms and integrations on top of the Satellite tech stack. Lastly, a REST API service can be enabled for Warp. However, it should never be exposed outside of localhost. ### Build Requirement diff --git a/extensions/warp-blink-wrtc/src/host_media/mp4_logger/mod.rs b/extensions/warp-blink-wrtc/src/host_media/mp4_logger/mod.rs index 428ef02f2..9e83fb56b 100644 --- a/extensions/warp-blink-wrtc/src/host_media/mp4_logger/mod.rs +++ b/extensions/warp-blink-wrtc/src/host_media/mp4_logger/mod.rs @@ -480,7 +480,7 @@ fn write_mp4_header( // there is no redundant coding in this sample // padding: 0 // sample_is_non_sync_sample ... set this to 1? - // sample_degredation_priority + // sample_degradation_priority flags: 0, //(2 << 26) | (2 << 24) | (2 << 22) | (2 << 20), track_id: *track_id, // stsd entry 1 is for Opus diff --git a/extensions/warp-ipfs/Cargo.toml b/extensions/warp-ipfs/Cargo.toml index 3b066bcf5..9d3a51b17 100644 --- a/extensions/warp-ipfs/Cargo.toml +++ b/extensions/warp-ipfs/Cargo.toml @@ -42,17 +42,15 @@ bytes.workspace = true shuttle = { path = "../../tools/shuttle" } [dev-dependencies] -fdlimit = "0.2" -bs58 = "0.4" -rustyline-async = "0.3" -comfy-table = "6.1" +fdlimit = "0.3" +rustyline-async = "0.4" +comfy-table = "7.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-appender = "0.2" clap = { version = "4.0", features = ["derive"] } -dirs = "4.0" -rpassword = "7.2" +rpassword = "7.3" [features] diff --git a/extensions/warp-ipfs/examples/identity-interface.rs b/extensions/warp-ipfs/examples/identity-interface.rs index 1fdb0ea69..d9ac3a470 100644 --- a/extensions/warp-ipfs/examples/identity-interface.rs +++ b/extensions/warp-ipfs/examples/identity-interface.rs @@ -2,7 +2,7 @@ use clap::Parser; use comfy_table::Table; use futures::prelude::*; use rust_ipfs::Multiaddr; -use rustyline_async::{Readline, ReadlineError}; +use rustyline_async::Readline; use std::io::Write; use std::path::PathBuf; use std::str::FromStr; @@ -151,9 +151,7 @@ async fn account( #[tokio::main] async fn main() -> anyhow::Result<()> { let opt = Opt::parse(); - if fdlimit::raise_fd_limit().is_none() { - //raising fd limit - } + _ = fdlimit::raise_fd_limit().is_ok(); let file_appender = match &opt.path { Some(path) => tracing_appender::rolling::hourly(path, "warp_mp_identity_interface.log"), @@ -361,7 +359,7 @@ async fn main() -> anyhow::Result<()> { } } line = rl.readline().fuse() => match line { - Ok(line) => { + Ok(rustyline_async::ReadlineEvent::Line(line)) => { rl.add_history_entry(line.clone()); let mut cmd_line = line.trim().split(' '); match cmd_line.next() { @@ -792,8 +790,7 @@ async fn main() -> anyhow::Result<()> { _ => continue } }, - Err(ReadlineError::Interrupted) => break, - Err(ReadlineError::Eof) => break, + Ok(rustyline_async::ReadlineEvent::Eof) | Ok(rustyline_async::ReadlineEvent::Interrupted) => break, Err(e) => { writeln!(stdout, "Error: {e}")?; } diff --git a/extensions/warp-ipfs/examples/ipfs-persistent.rs b/extensions/warp-ipfs/examples/ipfs-persistent.rs index 2f13a8c50..b164f171d 100644 --- a/extensions/warp-ipfs/examples/ipfs-persistent.rs +++ b/extensions/warp-ipfs/examples/ipfs-persistent.rs @@ -86,9 +86,7 @@ async fn setup_persistent>( #[tokio::main] async fn main() -> anyhow::Result<()> { let opt = Opt::parse(); - if fdlimit::raise_fd_limit().is_none() { - //raising fd limit - } + _ = fdlimit::raise_fd_limit().is_ok(); let (_, mut filesystem) = setup_persistent(None, opt.path.clone(), &opt).await?; diff --git a/extensions/warp-ipfs/examples/messenger.rs b/extensions/warp-ipfs/examples/messenger.rs index 81ecfb775..abf26d2cd 100644 --- a/extensions/warp-ipfs/examples/messenger.rs +++ b/extensions/warp-ipfs/examples/messenger.rs @@ -2,7 +2,7 @@ use clap::Parser; use comfy_table::Table; use futures::prelude::*; use rust_ipfs::Multiaddr; -use rustyline_async::{Readline, ReadlineError, SharedWriter}; +use rustyline_async::{Readline, SharedWriter}; use std::collections::HashMap; use std::io::Write; use std::path::{Path, PathBuf}; @@ -147,9 +147,7 @@ async fn setup>( #[tokio::main] async fn main() -> anyhow::Result<()> { let opt = Opt::parse(); - if fdlimit::raise_fd_limit().is_none() { - //raising fd limit - } + _ = fdlimit::raise_fd_limit().is_ok(); let mut _log_guard = None; @@ -305,7 +303,8 @@ async fn main() -> anyhow::Result<()> { } } line = rl.readline().fuse() => match line { - Ok(line) => { + Ok(rustyline_async::ReadlineEvent::Line(line)) => { + rl.add_history_entry(line.clone()); let mut cmd_line = line.trim().split(' '); match cmd_line.next() { Some("/create") => { @@ -1093,8 +1092,7 @@ async fn main() -> anyhow::Result<()> { } } }, - Err(ReadlineError::Interrupted) => break, - Err(ReadlineError::Eof) => break, + Ok(rustyline_async::ReadlineEvent::Eof) | Ok(rustyline_async::ReadlineEvent::Interrupted) => break, Err(e) => { writeln!(stdout, "Error: {e}")?; } diff --git a/extensions/warp-ipfs/src/config.rs b/extensions/warp-ipfs/src/config.rs index 428868d3e..99305c2fd 100644 --- a/extensions/warp-ipfs/src/config.rs +++ b/extensions/warp-ipfs/src/config.rs @@ -113,101 +113,37 @@ impl Default for RelayClient { relay_address: vec![ //NYC-1 "/ip4/146.190.184.59/tcp/4001/p2p/12D3KooWCHWLQXTR2N6ukWM99pZYc4TM82VS7eVaDE4Ryk8ked8h".parse().unwrap(), + "/ip4/146.190.184.59/udp/4001/quic-v1/p2p/12D3KooWCHWLQXTR2N6ukWM99pZYc4TM82VS7eVaDE4Ryk8ked8h".parse().unwrap(), //SF-1 + "/ip4/64.225.88.100/udp/4001/quic-v1/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), "/ip4/64.225.88.100/tcp/4001/p2p/12D3KooWMfyuTCbehQYy68zPH6vpGUwg8raKbrS7pd3qZrG7bFuB".parse().unwrap(), //NYC-1-EXP + "/ip4/24.199.86.91/tcp/46315/p2p/12D3KooWQcyxuNXxpiM7xyoXRZC7Vhfbh2yCtRg272CerbpFkhE6".parse().unwrap(), "/ip4/24.199.86.91/tcp/46315/p2p/12D3KooWQcyxuNXxpiM7xyoXRZC7Vhfbh2yCtRg272CerbpFkhE6".parse().unwrap() ] } } } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Swarm { - /// Concurrent dial factor - pub dial_factor: u8, - pub notify_buffer_size: usize, - pub connection_buffer_size: usize, - pub limit: Option, -} - -impl Default for Swarm { - fn default() -> Self { - Self { - dial_factor: 8, //Same dial factor as default for libp2p - notify_buffer_size: 32, - connection_buffer_size: 1024, - limit: None, - } - } -} - -#[derive(Default, Debug, Clone, Serialize, Deserialize)] -pub struct ConnectionLimit { - pub max_pending_incoming: Option, - pub max_pending_outgoing: Option, - pub max_established_incoming: Option, - pub max_established_outgoing: Option, - pub max_established: Option, - pub max_established_per_peer: Option, -} - -impl ConnectionLimit { - pub fn testing() -> Self { - Self { - max_pending_incoming: Some(10), - max_pending_outgoing: Some(10), - max_established_incoming: Some(32), - max_established_outgoing: Some(32), - max_established: None, - max_established_per_peer: None, - } - } - - pub fn minimal() -> Self { - Self { - max_pending_incoming: Some(128), - max_pending_outgoing: Some(128), - max_established_incoming: Some(128), - max_established_outgoing: Some(128), - max_established: None, - max_established_per_peer: None, - } - } - - pub fn recommended() -> Self { - Self { - max_pending_incoming: Some(512), - max_pending_outgoing: Some(512), - max_established_incoming: Some(512), - max_established_outgoing: Some(512), - max_established: None, - max_established_per_peer: None, - } - } - - pub fn maximum() -> Self { - Self { - max_pending_incoming: Some(512), - max_pending_outgoing: Some(512), - max_established_incoming: Some(1024), - max_established_outgoing: Some(1024), - max_established: None, - max_established_per_peer: None, - } - } +// #[derive(Debug, Clone, Serialize, Deserialize)] +// pub struct Swarm { +// /// Concurrent dial factor +// pub dial_factor: u8, +// pub notify_buffer_size: usize, +// pub connection_buffer_size: usize, +// pub limit: Option, +// } - pub fn unrestricted() -> Self { - Self { - max_pending_incoming: None, - max_pending_outgoing: None, - max_established_incoming: None, - max_established_outgoing: None, - max_established: None, - max_established_per_peer: None, - } - } -} +// impl Default for Swarm { +// fn default() -> Self { +// Self { +// dial_factor: 8, //Same dial factor as default for libp2p +// notify_buffer_size: 32, +// connection_buffer_size: 1024, +// limit: None, +// } +// } +// } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Pubsub { @@ -227,13 +163,13 @@ pub struct IpfsSetting { pub mdns: Mdns, pub relay_client: RelayClient, pub pubsub: Pubsub, - pub swarm: Swarm, pub bootstrap: bool, pub portmapping: bool, pub agent_version: Option, /// Used for testing with a memory transport pub memory_transport: bool, pub dht_client: bool, + pub disable_quic: bool, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] @@ -363,7 +299,7 @@ impl Default for Config { path: None, network: Network::Ipfs, bootstrap: Bootstrap::Ipfs, - listen_on: ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"] + listen_on: ["/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic-v1"] .iter() .filter_map(|s| Multiaddr::from_str(s).ok()) .collect::>(), @@ -375,7 +311,7 @@ impl Default for Config { store_setting: Default::default(), enable_relay: false, save_phrase: false, - max_storage_size: Some(1024 * 1024 * 1024), + max_storage_size: Some(10 * 1024 * 1024 * 1024), max_file_size: Some(50 * 1024 * 1024), thumbnail_size: (128, 128), chunking: None, diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index 0546940c5..ace272c5f 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -15,8 +15,8 @@ use ipfs::libp2p::core::transport::{Boxed, MemoryTransport, OrTransport}; use ipfs::libp2p::core::upgrade::Version; use ipfs::libp2p::Transport; use ipfs::p2p::{ - ConnectionLimits, IdentifyConfiguration, KadConfig, KadInserts, MultiaddrExt, PubsubConfig, - TransportConfig, UpdateMode, + IdentifyConfiguration, KadConfig, KadInserts, MultiaddrExt, PubsubConfig, TransportConfig, + UpdateMode, }; use rust_ipfs as ipfs; @@ -229,41 +229,6 @@ impl WarpIpfs { warn!("Bootstrap list is empty. Will not be able to perform a bootstrap for DHT"); } - let swarm_config = config.ipfs_setting.swarm.clone(); - - let mut swarm_configuration = ipfs::p2p::SwarmConfig { - dial_concurrency_factor: swarm_config - .dial_factor - .try_into() - .unwrap_or_else(|_| 8.try_into().expect("8 > 0")), - notify_handler_buffer_size: swarm_config - .notify_buffer_size - .try_into() - .unwrap_or_else(|_| 32.try_into().expect("32 > 0")), - connection_event_buffer_size: if swarm_config.connection_buffer_size > 0 { - swarm_config.connection_buffer_size - } else { - 32 - }, - connection: ConnectionLimits::default(), - ..Default::default() - }; - - if let Some(limit) = swarm_config.limit { - swarm_configuration.connection = ConnectionLimits::default() - .with_max_pending_outgoing(limit.max_pending_outgoing) - .with_max_pending_incoming(limit.max_pending_incoming) - .with_max_established_incoming(limit.max_established_incoming) - .with_max_established_outgoing(limit.max_established_outgoing) - .with_max_established(limit.max_established) - .with_max_established_per_peer(limit.max_established_per_peer); - - info!( - "Connection configuration: {:?}", - swarm_configuration.connection - ); - } - let (pb_tx, pb_rx) = channel(50); let (id_sh_tx, id_sh_rx) = futures::channel::mpsc::channel(1); @@ -285,16 +250,6 @@ impl WarpIpfs { idconfig })) .with_bitswap(None) - .with_kademlia( - Some(either::Either::Left(KadConfig { - query_timeout: std::time::Duration::from_secs(60), - publication_interval: Some(Duration::from_secs(30 * 60)), - provider_record_ttl: Some(Duration::from_secs(60 * 60)), - insert_method: KadInserts::Manual, - ..Default::default() - })), - Default::default(), - ) .with_ping(None) .with_pubsub(Some(PubsubConfig { max_transmit_size: config.ipfs_setting.pubsub.max_transmit_size, @@ -306,13 +261,10 @@ impl WarpIpfs { .set_keypair(keypair) .with_rendezvous_client() .set_transport_configuration(TransportConfig { - yamux_receive_window_size: 256 * 1024, - yamux_max_buffer_size: 1024 * 1024, yamux_update_mode: UpdateMode::Read, + enable_quic: !config.ipfs_setting.disable_quic, ..Default::default() - }) - .listen_as_external_addr() - .set_swarm_configuration(swarm_configuration); + }); if let Some(path) = self.config.path.as_ref() { info!("Instance will be persistent"); @@ -325,36 +277,51 @@ impl WarpIpfs { uninitialized = uninitialized.set_path(path); } - if config.ipfs_setting.bootstrap { - for addr in config.bootstrap.address() { - uninitialized = uninitialized.add_bootstrap(addr); + if config.store_setting.discovery != config::Discovery::None { + uninitialized = uninitialized.with_kademlia( + Some(either::Either::Left(KadConfig { + query_timeout: std::time::Duration::from_secs(60), + publication_interval: Some(Duration::from_secs(30 * 60)), + provider_record_ttl: Some(Duration::from_secs(60 * 60)), + insert_method: KadInserts::Manual, + ..Default::default() + })), + Default::default(), + ); + + if config.ipfs_setting.bootstrap { + for addr in config.bootstrap.address() { + uninitialized = uninitialized.add_bootstrap(addr); + } } } if config.ipfs_setting.memory_transport { - uninitialized = uninitialized.with_custom_transport(Box::new( - |keypair, relay| -> std::io::Result> { - let noise_config = rust_ipfs::libp2p::noise::Config::new(keypair) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - - let transport = match relay { - Some(relay) => OrTransport::new(relay, MemoryTransport::default()) - .upgrade(Version::V1) - .authenticate(noise_config) - .multiplex(rust_ipfs::libp2p::yamux::Config::default()) - .timeout(Duration::from_secs(20)) - .boxed(), - None => MemoryTransport::default() - .upgrade(Version::V1) - .authenticate(noise_config) - .multiplex(rust_ipfs::libp2p::yamux::Config::default()) - .timeout(Duration::from_secs(20)) - .boxed(), - }; - - Ok(transport) - }, - )); + uninitialized = uninitialized + .with_custom_transport(Box::new( + |keypair, relay| -> std::io::Result> { + let noise_config = rust_ipfs::libp2p::noise::Config::new(keypair) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + + let transport = match relay { + Some(relay) => OrTransport::new(relay, MemoryTransport::default()) + .upgrade(Version::V1) + .authenticate(noise_config) + .multiplex(rust_ipfs::libp2p::yamux::Config::default()) + .timeout(Duration::from_secs(20)) + .boxed(), + None => MemoryTransport::default() + .upgrade(Version::V1) + .authenticate(noise_config) + .multiplex(rust_ipfs::libp2p::yamux::Config::default()) + .timeout(Duration::from_secs(20)) + .boxed(), + }; + + Ok(transport) + }, + )) + .listen_as_external_addr(); } if config.ipfs_setting.portmapping { @@ -429,15 +396,33 @@ impl WarpIpfs { break; } - if config.ipfs_setting.dht_client { + if config.ipfs_setting.dht_client + && config.store_setting.discovery != config::Discovery::None + { ipfs.dht_mode(DhtMode::Client).await?; } - if config.ipfs_setting.bootstrap && !empty_bootstrap { - //TODO: determine if bootstrap should run in intervals - if let Err(e) = ipfs.bootstrap().await { - error!("Error bootstrapping: {e}"); - } + if config.store_setting.discovery != config::Discovery::None + && config.ipfs_setting.bootstrap + && !empty_bootstrap + { + tokio::spawn({ + let ipfs = ipfs.clone(); + async move { + loop { + match ipfs.bootstrap().await { + Ok(task) => { + if let Err(e) = task.await { + error!("Failed to bootstrap: {e}"); + } + } + Err(e) => error!("Failed to bootstrap: {e}"), + }; + + tokio::time::sleep(Duration::from_secs(60 * 5)).await; + } + } + }); } let relays = ipfs diff --git a/tools/audio-codec-repl/src/encode.rs b/tools/audio-codec-repl/src/encode.rs index 421369c96..f8cc21854 100644 --- a/tools/audio-codec-repl/src/encode.rs +++ b/tools/audio-codec-repl/src/encode.rs @@ -137,7 +137,7 @@ pub fn f32_mp4( // there is no redundant coding in this sample // padding: 0 // sample_is_non_sync_sample ... set this to 1? - // sample_degredation_priority + // sample_degradation_priority flags: 0, //(2 << 26) | (2 << 24) | (2 << 22) | (2 << 20), track_id: 1, // stsd entry 1 is for Opus diff --git a/warp/src/constellation/mod.rs b/warp/src/constellation/mod.rs index f79560522..354f29f6f 100644 --- a/warp/src/constellation/mod.rs +++ b/warp/src/constellation/mod.rs @@ -12,6 +12,7 @@ use chrono::{DateTime, Utc}; use directory::Directory; use dyn_clone::DynClone; +use futures::Stream; use futures::stream::BoxStream; use item::Item; @@ -40,16 +41,13 @@ pub enum ConstellationEventKind { pub struct ConstellationEventStream(pub BoxStream<'static, ConstellationEventKind>); -impl core::ops::Deref for ConstellationEventStream { - type Target = BoxStream<'static, ConstellationEventKind>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl core::ops::DerefMut for ConstellationEventStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 +impl Stream for ConstellationEventStream { + type Item = ConstellationEventKind; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.as_mut().poll_next(cx) } } @@ -86,16 +84,13 @@ pub enum Progression { pub struct ConstellationProgressStream(pub BoxStream<'static, Progression>); -impl core::ops::Deref for ConstellationProgressStream { - type Target = BoxStream<'static, Progression>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl core::ops::DerefMut for ConstellationProgressStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 +impl Stream for ConstellationProgressStream { + type Item = Progression; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.as_mut().poll_next(cx) } } diff --git a/warp/src/multipass/mod.rs b/warp/src/multipass/mod.rs index de430ac31..827fd79ab 100644 --- a/warp/src/multipass/mod.rs +++ b/warp/src/multipass/mod.rs @@ -6,6 +6,7 @@ use std::path::PathBuf; use dyn_clone::DynClone; use futures::stream::BoxStream; +use futures::Stream; use serde::{Deserialize, Serialize}; use warp_derive::FFIFree; @@ -66,16 +67,13 @@ pub enum IdentityImportOption<'a> { #[derive(FFIFree)] pub struct MultiPassEventStream(pub BoxStream<'static, MultiPassEventKind>); -impl core::ops::Deref for MultiPassEventStream { - type Target = BoxStream<'static, MultiPassEventKind>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl core::ops::DerefMut for MultiPassEventStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 +impl Stream for MultiPassEventStream { + type Item = MultiPassEventKind; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.as_mut().poll_next(cx) } } diff --git a/warp/src/raygun/mod.rs b/warp/src/raygun/mod.rs index 2937b0cf2..8fa7dcb5c 100644 --- a/warp/src/raygun/mod.rs +++ b/warp/src/raygun/mod.rs @@ -9,6 +9,7 @@ use crate::{Extension, SingleHandle}; use derive_more::Display; use dyn_clone::DynClone; use futures::stream::BoxStream; +use futures::Stream; use warp_derive::FFIFree; use chrono::{DateTime, Utc}; @@ -128,32 +129,26 @@ pub enum AttachmentKind { #[derive(FFIFree)] pub struct AttachmentEventStream(pub BoxStream<'static, AttachmentKind>); -impl core::ops::Deref for AttachmentEventStream { - type Target = BoxStream<'static, AttachmentKind>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl core::ops::DerefMut for AttachmentEventStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 +impl Stream for AttachmentEventStream { + type Item = AttachmentKind; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.as_mut().poll_next(cx) } } #[derive(FFIFree)] pub struct MessageEventStream(pub BoxStream<'static, MessageEventKind>); -impl core::ops::Deref for MessageEventStream { - type Target = BoxStream<'static, MessageEventKind>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl core::ops::DerefMut for MessageEventStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 +impl Stream for MessageEventStream { + type Item = MessageEventKind; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.as_mut().poll_next(cx) } } @@ -166,16 +161,13 @@ impl Debug for MessageStream { } } -impl core::ops::Deref for MessageStream { - type Target = BoxStream<'static, Message>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl core::ops::DerefMut for MessageStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 +impl Stream for MessageStream { + type Item = Message; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.as_mut().poll_next(cx) } }