Skip to content

Commit

Permalink
impl community message attachments
Browse files Browse the repository at this point in the history
  • Loading branch information
ashneverdawn committed Dec 2, 2024
1 parent ae3158c commit be451de
Show file tree
Hide file tree
Showing 2 changed files with 361 additions and 21 deletions.
142 changes: 121 additions & 21 deletions extensions/warp-ipfs/src/store/message/community_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use warp::raygun::community::{
CommunityPermission, CommunityRole, RoleId,
};
use warp::raygun::{
AttachmentEventStream, ConversationImage, Location, MessageEvent, MessageOptions,
MessageReference, MessageStatus, Messages, MessagesType, PinState, RayGunEventKind,
ReactionState,
AttachmentEventStream, ConversationImage, Location, MessageEvent, MessageOptions, MessageReference, MessageStatus, MessageType, Messages, MessagesType, PinState, RayGunEventKind, ReactionState
};
use warp::{
crypto::{cipher::Cipher, generate},
Expand Down Expand Up @@ -68,6 +66,8 @@ use crate::{
},
};

use super::attachment::AttachmentStream;

type AttachmentOneshot = (MessageDocument, oneshot::Sender<Result<(), Error>>);

#[allow(dead_code)]
Expand Down Expand Up @@ -321,7 +321,7 @@ pub struct CommunityTask {
community_id: Uuid,
ipfs: Ipfs,
root: RootDocumentMap,
_file: FileStore,
file: FileStore,
identity: IdentityStore,
discovery: Discovery,
pending_key_exchange: IndexMap<DID, Vec<(Vec<u8>, bool)>>,
Expand All @@ -332,7 +332,7 @@ pub struct CommunityTask {
event_stream: SubscriptionStream,
request_stream: SubscriptionStream,

_attachment_tx: futures::channel::mpsc::Sender<AttachmentOneshot>,
attachment_tx: futures::channel::mpsc::Sender<AttachmentOneshot>,
attachment_rx: futures::channel::mpsc::Receiver<AttachmentOneshot>,
message_command: futures::channel::mpsc::Sender<MessageCommand>,
event_broadcast: tokio::sync::broadcast::Sender<MessageEventKind>,
Expand Down Expand Up @@ -403,7 +403,7 @@ impl CommunityTask {
community_id,
ipfs: ipfs.clone(),
root: root.clone(),
_file: file.clone(),
file: file.clone(),
identity: identity.clone(),
discovery: discovery.clone(),
pending_key_exchange: Default::default(),
Expand All @@ -414,7 +414,7 @@ impl CommunityTask {
request_stream,
event_stream,

_attachment_tx: atx,
attachment_tx: atx,
attachment_rx: arx,
event_broadcast: btx,
_event_subscription,
Expand Down Expand Up @@ -3304,29 +3304,129 @@ impl CommunityTask {
}
pub async fn attach_to_community_channel_message(
&mut self,
_channel_id: Uuid,
_message_id: Option<Uuid>,
_locations: Vec<Location>,
_message: Vec<String>,
channel_id: Uuid,
message_id: Option<Uuid>,
locations: Vec<Location>,
messages: Vec<String>,
) -> Result<(Uuid, AttachmentEventStream), Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::SendMessages,
channel_id,
) {
return Err(Error::Unauthorized);
}

let keystore = pubkey_or_keystore(&*self)?;

let stream = AttachmentStream::new(
&self.ipfs,
self.root.keypair(),
&self.identity.did_key(),
&self.file,
channel_id,
keystore,
self.attachment_tx.clone(),
)
.set_reply(message_id)
.set_locations(locations)?
.set_lines(messages)?;

let message_id = stream.message_id();

Ok((message_id, stream.boxed()))
}
pub async fn download_from_community_channel_message(
&self,
_channel_id: Uuid,
_message_id: Uuid,
_file: String,
_path: PathBuf,
channel_id: Uuid,
message_id: Uuid,
file: String,
path: PathBuf,
) -> Result<ConstellationProgressStream, Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::ViewChannel,
channel_id,
) {
return Err(Error::Unauthorized);
}

let channel = match self.document.channels.get(&channel_id.to_string()) {
Some(c) => c,
None => return Err(Error::CommunityChannelDoesntExist),
};

let members = self
.document
.participants()
.iter()
.filter_map(|did| did.to_peer_id().ok())
.collect::<Vec<_>>();

let message = channel
.get_message_document(&self.ipfs, message_id)
.await?;

if message.message_type != MessageType::Attachment {
return Err(Error::InvalidMessage);
}

let attachment = message
.attachments()
.iter()
.find(|attachment| attachment.name == file)
.ok_or(Error::FileNotFound)?;

let stream = attachment.download(&self.ipfs, path, &members, None);

Ok(stream)
}
pub async fn download_stream_from_community_channel_message(
&self,
_channel_id: Uuid,
_message_id: Uuid,
_file: String,
channel_id: Uuid,
message_id: Uuid,
file: String,
) -> Result<BoxStream<'static, Result<Bytes, std::io::Error>>, Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::ViewChannel,
channel_id,
) {
return Err(Error::Unauthorized);
}

let channel = match self.document.channels.get(&channel_id.to_string()) {
Some(c) => c,
None => return Err(Error::CommunityChannelDoesntExist),
};

let members = self
.document
.participants()
.iter()
.filter_map(|did| did.to_peer_id().ok())
.collect::<Vec<_>>();

let message = channel
.get_message_document(&self.ipfs, message_id)
.await?;

if message.message_type != MessageType::Attachment {
return Err(Error::InvalidMessage);
}

let attachment = message
.attachments()
.iter()
.find(|attachment| attachment.name == file)
.ok_or(Error::FileNotFound)?;

let stream = attachment.download_stream(&self.ipfs, &members, None);

Ok(stream)
}

async fn store_direct_for_attachment(&mut self, message: MessageDocument) -> Result<(), Error> {
Expand Down
Loading

0 comments on commit be451de

Please sign in to comment.