Skip to content

Commit

Permalink
feat: open community invites
Browse files Browse the repository at this point in the history
  • Loading branch information
ashneverdawn committed Dec 17, 2024
1 parent fdb96da commit 4604b86
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 599 deletions.
8 changes: 2 additions & 6 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1785,13 +1785,9 @@ impl RayGunCommunity for WarpIpfs {
.get_community_invite(community_id, invite_id)
.await
}
async fn accept_community_invite(
&mut self,
community_id: Uuid,
invite_id: Uuid,
) -> Result<(), Error> {
async fn request_join_community(&mut self, community_id: Uuid) -> Result<(), Error> {
self.messaging_store()?
.accept_community_invite(community_id, invite_id)
.request_join_community(community_id)
.await
}
async fn edit_community_invite(
Expand Down
13 changes: 7 additions & 6 deletions extensions/warp-ipfs/src/store/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl CommunityDocument {
self.id.exchange_topic(did)
}

pub fn join_topic(&self) -> String {
self.id.join_topic()
}

pub fn sign(&mut self, keypair: &Keypair) -> Result<(), Error> {
let construct = warp::crypto::hash::sha256_iter(
[
Expand Down Expand Up @@ -275,12 +279,9 @@ impl From<CommunityDocument> for Community {
}
impl CommunityDocument {
pub fn participants(&self) -> IndexSet<DID> {
self.invites
.iter()
.filter_map(|(_, invite)| invite.target_user.clone())
.chain(self.members.clone())
.chain(std::iter::once(self.owner.clone()))
.collect::<IndexSet<_>>()
let mut participants = self.members.clone();
participants.insert(self.owner.clone());
participants
}
pub fn has_valid_invite(&self, user: &DID) -> bool {
for (_, invite) in &self.invites {
Expand Down
120 changes: 76 additions & 44 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use rust_ipfs::{Ipfs, PeerId};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use super::topics::ConversationTopic;
use super::{document::root::RootDocumentMap, ds_key::DataStoreKey, PeerIdExt};
use crate::store::CommunityJoinEvents;
use crate::store::{
conversation::ConversationDocument,
discovery::Discovery,
Expand Down Expand Up @@ -1064,26 +1066,30 @@ impl MessageStore {
.await;
rx.await.map_err(anyhow::Error::from)?
}
pub async fn accept_community_invite(
&mut self,
community_id: Uuid,
invite_id: Uuid,
) -> Result<(), Error> {
pub async fn request_join_community(&mut self, community_id: Uuid) -> Result<(), Error> {
let inner = &*self.inner.read().await;
let community_meta = inner
.community_task
.get(&community_id)
.ok_or(Error::InvalidCommunity)?;
let (tx, rx) = oneshot::channel();
let _ = community_meta
.command_tx
.clone()
.send(CommunityTaskCommand::AcceptCommunityInvite {
invite_id,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
let own_did = &inner.identity.did_key();

let keypair = inner.root.keypair();

let event = CommunityJoinEvents::Join {
community_id,
user: own_did.clone(),
};
let bytes = serde_json::to_vec(&event)?;
let payload = PayloadBuilder::new(keypair, bytes)
.from_ipfs(&inner.ipfs)
.await?;
let bytes = payload.to_bytes()?;

if let Err(e) = inner
.ipfs
.pubsub_publish(community_id.join_topic(), bytes)
.await
{
tracing::error!(id=%community_id, "Unable to send event: {e}");
}
Ok(())
}
pub async fn edit_community_invite(
&mut self,
Expand Down Expand Up @@ -3031,8 +3037,7 @@ impl ConversationInner {
pub async fn get_community(&mut self, community_id: Uuid) -> Result<Community, Error> {
let doc = self.get_community_document(community_id).await?;
let own_did = &self.identity.did_key();
if own_did != &doc.owner && !doc.has_valid_invite(own_did) && !doc.members.contains(own_did)
{
if !doc.participants().contains(own_did) {
return Err(Error::Unauthorized);
}
Ok(doc.into())
Expand Down Expand Up @@ -3211,39 +3216,66 @@ async fn process_conversation(
ConversationEvents::NewCommunityInvite {
community_id,
invite,
community_document,
} => {
let did = this.identity.did_key();

if this.contains_community(community_id).await {
return Err(anyhow::anyhow!("Already apart of {community_id}").into());
this.event
.emit(RayGunEventKind::CommunityInvited {
community_id,
invite_id: invite.id,
})
.await;
}
ConversationEvents::JoinCommunity {
community_id,
community_document: result,
} => match result {
None => {
this.event
.emit(RayGunEventKind::CommunityJoinRejected { community_id })
.await;
return Ok(());
}
Some(community_document) => {
let did = this.identity.did_key();

if this.contains_community(community_id).await {
return Ok(());
}

let recipients = community_document.participants().clone();
let recipients = community_document.participants().clone();

for recipient in &recipients {
if !this.discovery.contains(recipient).await {
let _ = this.discovery.insert(recipient).await;
for recipient in &recipients {
if !this.discovery.contains(recipient).await {
let _ = this.discovery.insert(recipient).await;
}
}
}

this.set_community_document(community_document).await?;
this.set_community_document(community_document).await?;

this.create_community_task(community_id).await?;
this.create_community_task(community_id).await?;

for recipient in recipients.iter().filter(|d| did.ne(d)) {
if let Err(e) = this.request_community_key(community_id, recipient).await {
tracing::warn!(%community_id, error = %e, %recipient, "Failed to send exchange request");
for recipient in recipients.iter().filter(|d| did.ne(d)) {
if let Err(e) = this.request_community_key(community_id, recipient).await {
tracing::warn!(%community_id, error = %e, %recipient, "Failed to send exchange request");
}
}
}

this.event
.emit(RayGunEventKind::CommunityInvited {
community_id,
invite_id: invite.id,
})
.await;
}
let community_meta = this
.community_task
.get(&community_id)
.ok_or(Error::InvalidCommunity)?;
let (tx, rx) = oneshot::channel();
let _ = community_meta
.command_tx
.clone()
.send(CommunityTaskCommand::SendJoinedCommunityEvent { response: tx })
.await;
let _ = rx.await.map_err(anyhow::Error::from)?;

this.event
.emit(RayGunEventKind::CommunityJoined { community_id })
.await;
}
},
ConversationEvents::DeleteCommunity { community_id } => {
tracing::trace!("Delete community event received for {community_id}");
if !this.contains_community(community_id).await {
Expand Down
Loading

0 comments on commit 4604b86

Please sign in to comment.