From be451dedf690ee2f9455b7bf70c1ef6112e1ebca Mon Sep 17 00:00:00 2001 From: ashneverdawn <8341280+ashneverdawn@users.noreply.github.com> Date: Mon, 2 Dec 2024 14:05:27 -0400 Subject: [PATCH] impl community message attachments --- .../src/store/message/community_task.rs | 142 +++++++++-- extensions/warp-ipfs/tests/community.rs | 240 ++++++++++++++++++ 2 files changed, 361 insertions(+), 21 deletions(-) diff --git a/extensions/warp-ipfs/src/store/message/community_task.rs b/extensions/warp-ipfs/src/store/message/community_task.rs index cf13174ed..cb1919eb8 100644 --- a/extensions/warp-ipfs/src/store/message/community_task.rs +++ b/extensions/warp-ipfs/src/store/message/community_task.rs @@ -29,9 +29,7 @@ use warp::raygun::community::{ CommunityPermission, CommunityRole, RoleId, }; use warp::raygun::{ - AttachmentEventStream, ConversationImage, Location, MessageEvent, MessageOptions, - MessageReference, MessageStatus, Messages, MessagesType, PinState, RayGunEventKind, - ReactionState, + AttachmentEventStream, ConversationImage, Location, MessageEvent, MessageOptions, MessageReference, MessageStatus, MessageType, Messages, MessagesType, PinState, RayGunEventKind, ReactionState }; use warp::{ crypto::{cipher::Cipher, generate}, @@ -68,6 +66,8 @@ use crate::{ }, }; +use super::attachment::AttachmentStream; + type AttachmentOneshot = (MessageDocument, oneshot::Sender>); #[allow(dead_code)] @@ -321,7 +321,7 @@ pub struct CommunityTask { community_id: Uuid, ipfs: Ipfs, root: RootDocumentMap, - _file: FileStore, + file: FileStore, identity: IdentityStore, discovery: Discovery, pending_key_exchange: IndexMap, bool)>>, @@ -332,7 +332,7 @@ pub struct CommunityTask { event_stream: SubscriptionStream, request_stream: SubscriptionStream, - _attachment_tx: futures::channel::mpsc::Sender, + attachment_tx: futures::channel::mpsc::Sender, attachment_rx: futures::channel::mpsc::Receiver, message_command: futures::channel::mpsc::Sender, event_broadcast: tokio::sync::broadcast::Sender, @@ -403,7 +403,7 @@ impl CommunityTask { community_id, ipfs: ipfs.clone(), root: root.clone(), - _file: file.clone(), + file: file.clone(), identity: identity.clone(), discovery: discovery.clone(), pending_key_exchange: Default::default(), @@ -414,7 +414,7 @@ impl CommunityTask { request_stream, event_stream, - _attachment_tx: atx, + attachment_tx: atx, attachment_rx: arx, event_broadcast: btx, _event_subscription, @@ -3304,29 +3304,129 @@ impl CommunityTask { } pub async fn attach_to_community_channel_message( &mut self, - _channel_id: Uuid, - _message_id: Option, - _locations: Vec, - _message: Vec, + channel_id: Uuid, + message_id: Option, + locations: Vec, + messages: Vec, ) -> Result<(Uuid, AttachmentEventStream), Error> { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::SendMessages, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let keystore = pubkey_or_keystore(&*self)?; + + let stream = AttachmentStream::new( + &self.ipfs, + self.root.keypair(), + &self.identity.did_key(), + &self.file, + channel_id, + keystore, + self.attachment_tx.clone(), + ) + .set_reply(message_id) + .set_locations(locations)? + .set_lines(messages)?; + + let message_id = stream.message_id(); + + Ok((message_id, stream.boxed())) } pub async fn download_from_community_channel_message( &self, - _channel_id: Uuid, - _message_id: Uuid, - _file: String, - _path: PathBuf, + channel_id: Uuid, + message_id: Uuid, + file: String, + path: PathBuf, ) -> Result { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::ViewChannel, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let channel = match self.document.channels.get(&channel_id.to_string()) { + Some(c) => c, + None => return Err(Error::CommunityChannelDoesntExist), + }; + + let members = self + .document + .participants() + .iter() + .filter_map(|did| did.to_peer_id().ok()) + .collect::>(); + + let message = channel + .get_message_document(&self.ipfs, message_id) + .await?; + + if message.message_type != MessageType::Attachment { + return Err(Error::InvalidMessage); + } + + let attachment = message + .attachments() + .iter() + .find(|attachment| attachment.name == file) + .ok_or(Error::FileNotFound)?; + + let stream = attachment.download(&self.ipfs, path, &members, None); + + Ok(stream) } pub async fn download_stream_from_community_channel_message( &self, - _channel_id: Uuid, - _message_id: Uuid, - _file: String, + channel_id: Uuid, + message_id: Uuid, + file: String, ) -> Result>, Error> { - Err(Error::Unimplemented) + let own_did = &self.identity.did_key(); + if !self.document.has_channel_permission( + own_did, + &CommunityChannelPermission::ViewChannel, + channel_id, + ) { + return Err(Error::Unauthorized); + } + + let channel = match self.document.channels.get(&channel_id.to_string()) { + Some(c) => c, + None => return Err(Error::CommunityChannelDoesntExist), + }; + + let members = self + .document + .participants() + .iter() + .filter_map(|did| did.to_peer_id().ok()) + .collect::>(); + + let message = channel + .get_message_document(&self.ipfs, message_id) + .await?; + + if message.message_type != MessageType::Attachment { + return Err(Error::InvalidMessage); + } + + let attachment = message + .attachments() + .iter() + .find(|attachment| attachment.name == file) + .ok_or(Error::FileNotFound)?; + + let stream = attachment.download_stream(&self.ipfs, &members, None); + + Ok(stream) } async fn store_direct_for_attachment(&mut self, message: MessageDocument) -> Result<(), Error> { diff --git a/extensions/warp-ipfs/tests/community.rs b/extensions/warp-ipfs/tests/community.rs index cd6abd0af..3925b3534 100644 --- a/extensions/warp-ipfs/tests/community.rs +++ b/extensions/warp-ipfs/tests/community.rs @@ -4310,4 +4310,244 @@ mod test { .await?; Ok(()) } + #[async_test] + async fn unauthorized_attach_to_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::unauthorized_attach_to_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, did_a, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + assert!(false); + Ok(()) + } + #[async_test] + async fn authorized_attach_to_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::authorized_attach_to_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, did_a, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + assert!(false); + Ok(()) + } + #[async_test] + async fn unauthorized_download_from_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::unauthorized_download_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, did_a, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + assert!(false); + Ok(()) + } + #[async_test] + async fn authorized_download_from_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::authorized_download_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, did_a, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + assert!(false); + Ok(()) + } + #[async_test] + async fn unauthorized_download_stream_from_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::unauthorized_download_stream_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, did_a, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + assert!(false); + Ok(()) + } + #[async_test] + async fn authorized_download_stream_from_community_channel_message() -> anyhow::Result<()> { + let context = Some("test::authorized_download_stream_from_community_channel_message".into()); + let acc = (None, None, context); + let accounts = create_accounts(vec![acc.clone(), acc]).await?; + let (instance_a, did_a, _) = &mut accounts[0].clone(); + let (instance_b, did_b, _) = &mut accounts[1].clone(); + + let community = instance_a.create_community("Community0").await?; + let channel = instance_a + .create_community_channel(community.id(), "Channel0", CommunityChannelType::Standard) + .await?; + + let mut rg_stream_b = instance_b.raygun_subscribe().await?; + let invite = instance_a + .create_community_invite(community.id(), Some(did_b.clone()), None) + .await?; + assert_eq!( + next_event(&mut rg_stream_b, Duration::from_secs(60)).await?, + RayGunEventKind::CommunityInvited { + community_id: community.id(), + invite_id: invite.id() + } + ); + + let mut stream_a = instance_a.get_community_stream(community.id()).await?; + instance_b + .accept_community_invite(community.id(), invite.id()) + .await?; + assert_eq!( + next_event(&mut stream_a, Duration::from_secs(60)).await?, + MessageEventKind::AcceptedCommunityInvite { + community_id: community.id(), + invite_id: invite.id(), + user: did_b.clone() + } + ); + assert!(false); + Ok(()) + } }