Skip to content

Commit

Permalink
chore: disable loading mailbox
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 10, 2024
1 parent 46a6b52 commit a3e0dff
Showing 1 changed file with 184 additions and 172 deletions.
356 changes: 184 additions & 172 deletions extensions/warp-ipfs/src/store/message/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ use futures::{StreamExt, TryFutureExt};
use futures_timer::Delay;
use indexmap::{IndexMap, IndexSet};
use ipld_core::cid::Cid;
use pollable_map::futures::FutureMap;
use rust_ipfs::p2p::MultiaddrExt;
use rust_ipfs::{libp2p::gossipsub::Message, Ipfs};
use rust_ipfs::{IpfsPath, PeerId, SubscriptionStream};
use serde::{Deserialize, Serialize};
use std::borrow::BorrowMut;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
Expand Down Expand Up @@ -50,7 +48,7 @@ use crate::store::event_subscription::EventSubscription;
use crate::store::message::attachment::AttachmentStream;
use crate::store::topics::PeerTopic;
use crate::store::{
ecdh_shared_key, protocols, verify_serde_sig, ConversationEvents, ConversationImageType,
ecdh_shared_key, verify_serde_sig, ConversationEvents, ConversationImageType,
MAX_CONVERSATION_BANNER_SIZE, MAX_CONVERSATION_ICON_SIZE,
};
use crate::utils::{ByteCollection, ExtensionType};
Expand All @@ -74,6 +72,7 @@ type AttachmentOneshot = (MessageDocument, oneshot::Sender<Result<(), Error>>);

use super::DownloadStream;

#[derive(Debug)]
#[allow(dead_code)]
pub enum ConversationTaskCommand {
SetDescription {
Expand Down Expand Up @@ -446,174 +445,187 @@ impl ConversationTask {
impl ConversationTask {
#[allow(dead_code)]
async fn load_from_mailbox(&mut self) -> Result<(), Error> {
let crate::config::Discovery::Shuttle { addresses } =
self.discovery.discovery_config().clone()
else {
return Ok(());
};

if addresses.is_empty() {
return Err(Error::Other);
}

let ipfs = self.ipfs.clone();
let addresses = addresses.clone();
let keypair = self.identity.root_document().keypair().clone();
let conversation_id = self.conversation_id;

let payload = PayloadBuilder::new(
self.identity.root_document().keypair(),
crate::shuttle::message::protocol::Request::FetchMailBox {
conversation_id: self.conversation_id,
},
)
.build()?;

let bytes = payload.to_bytes().expect("valid deserialization");

let mut mailbox = BTreeMap::new();
let mut providers = vec![];
for peer_id in addresses.iter().filter_map(|addr| addr.peer_id()) {
let response = match ipfs
.send_request(peer_id, (protocols::SHUTTLE_MESSAGE, bytes.clone()))
.await
.and_then(|response| {
PayloadMessage::<crate::shuttle::message::protocol::Response>::from_bytes(
&response,
)
.map_err(anyhow::Error::from)
}) {
Ok(response) => response,
Err(_e) => {
continue;
}
};

match response.message() {
crate::shuttle::message::protocol::Response::Mailbox {
conversation_id: retrieved_id,
content,
} => {
debug_assert_eq!(*retrieved_id, conversation_id);
providers.push(peer_id);
mailbox.extend(content.clone());
break;
}
crate::shuttle::message::protocol::Response::Error(e) => {
tracing::error!(error = %e, %peer_id, "error handling request");
}
_ => {
tracing::error!(%peer_id, "response from shuttle node was invalid");
continue;
}
}
}

let conversation_mailbox = mailbox
.into_iter()
.filter_map(|(id, cid)| {
let id = Uuid::from_str(&id).ok()?;
Some((id, cid))
})
.collect::<BTreeMap<Uuid, Cid>>();

let mut messages = FutureMap::new();
for (id, cid) in conversation_mailbox {
let ipfs = ipfs.clone();
let providers = providers.clone();
let keypair = keypair.clone();
let fut = async move {
ipfs.fetch(&cid).recursive().await?;
let message_document = ipfs
.get_dag(cid)
.providers(&providers)
.deserialized::<MessageDocument>()
.await?;

if !message_document.verify() {
return Err(Error::InvalidMessage);
}

let payload = PayloadBuilder::new(
&keypair,
crate::shuttle::message::protocol::Request::FetchMailBox { conversation_id },
)
.build()?;

let bytes = payload.to_bytes().expect("valid deserialization");
for peer_id in providers {
let _response = ipfs
.send_request(peer_id, (protocols::SHUTTLE_MESSAGE, bytes.clone()))
.await;
}

Ok(message_document)
};
messages.insert(id, Box::pin(fut));
}

let mut messages = messages
.filter_map(|(_, result)| async move { result.ok() })
.collect::<Vec<_>>()
.await;

messages.sort_by(|a, b| b.cmp(a));

for message in messages {
if !message.verify() {
continue;
}
let message_id = message.id;
match self
.document
.contains(&self.ipfs, message_id)
.await
.unwrap_or_default()
{
true => {
let current_message = self
.document
.get_message_document(&self.ipfs, message_id)
.await?;

self.document
.update_message_document(&self.ipfs, &message)
.await?;

let is_edited = matches!((message.modified, current_message.modified), (Some(modified), Some(current_modified)) if modified > current_modified )
| matches!(
(message.modified, current_message.modified),
(Some(_), None)
);

match is_edited {
true => {
let _ = self.event_broadcast.send(MessageEventKind::MessageEdited {
conversation_id,
message_id,
});
}
false => {
//TODO: Emit event showing message was updated in some way
}
}
}
false => {
self.document
.insert_message_document(&self.ipfs, &message)
.await?;

let _ = self
.event_broadcast
.send(MessageEventKind::MessageReceived {
conversation_id,
message_id,
});
}
}
}

self.set_document().await?;
// let crate::config::Discovery::Shuttle { addresses } =
// self.discovery.discovery_config().clone()
// else {
// return Ok(());
// };
//
// if addresses.is_empty() {
// return Err(Error::Other);
// }
//
// let ipfs = self.ipfs.clone();
// let addresses = addresses.clone();
// let keypair = self.identity.root_document().keypair().clone();
// let conversation_id = self.conversation_id;
//
// let payload = PayloadBuilder::new(
// self.identity.root_document().keypair(),
// crate::shuttle::message::protocol::Request::FetchMailBox {
// conversation_id: self.conversation_id,
// },
// )
// .build()?;
//
// let bytes = payload.to_bytes().expect("valid deserialization");
//
// let mut mailbox = BTreeMap::new();
// let mut providers = vec![];
//
// let peers = addresses
// .iter()
// .filter_map(|addr| addr.peer_id())
// .collect::<IndexSet<_>>();
//
// let response_st = ipfs
// .send_requests(peers.clone(), (protocols::SHUTTLE_MESSAGE, bytes))
// .await?;
//
// let response_st = response_st
// .map(|(peer_id, result)| {
// (
// peer_id,
// result.and_then(|bytes| {
// PayloadMessage::<crate::shuttle::message::protocol::Response>::from_bytes(
// &bytes,
// )
// .map_err(std::io::Error::other)
// }),
// )
// })
// .filter_map(|(peer_id, result)| match result {
// Ok(payload) => futures::future::ready(Some((peer_id, payload))),
// Err(e) => {
// tracing::error!(error = %e, %peer_id, "unable to decode payload");
// futures::future::ready(None)
// }
// })
// .filter_map(|(peer_id, payload)| async move {
// match payload.message() {
// crate::shuttle::message::protocol::Response::Mailbox {
// conversation_id: retrieved_id,
// content,
// } => {
// debug_assert_eq!(*retrieved_id, conversation_id);
// Some(content.clone())
// }
// crate::shuttle::message::protocol::Response::Error(e) => {
// tracing::error!(error = %e, %peer_id, "error handling request");
// None
// }
// _ => {
// tracing::error!(%peer_id, "response from shuttle node was invalid");
// None
// }
// }
// });
//
// let conversation_mailbox = mailbox
// .into_iter()
// .filter_map(|(id, cid)| {
// let id = Uuid::from_str(&id).ok()?;
// Some((id, cid))
// })
// .collect::<BTreeMap<Uuid, Cid>>();
//
// let mut messages = FutureMap::new();
// for (id, cid) in conversation_mailbox {
// let ipfs = ipfs.clone();
// let providers = providers.clone();
// let keypair = keypair.clone();
// let fut = async move {
// ipfs.fetch(&cid).recursive().await?;
// let message_document = ipfs
// .get_dag(cid)
// .providers(&providers)
// .deserialized::<MessageDocument>()
// .await?;
//
// if !message_document.verify() {
// return Err(Error::InvalidMessage);
// }
//
// let payload = PayloadBuilder::new(
// &keypair,
// crate::shuttle::message::protocol::Request::FetchMailBox { conversation_id },
// )
// .build()?;
//
// let bytes = payload.to_bytes().expect("valid deserialization");
// for peer_id in providers {
// let _response = ipfs
// .send_request(peer_id, (protocols::SHUTTLE_MESSAGE, bytes.clone()))
// .await;
// }
//
// Ok(message_document)
// };
// messages.insert(id, Box::pin(fut));
// }
//
// let mut messages = messages
// .filter_map(|(_, result)| async move { result.ok() })
// .collect::<Vec<_>>()
// .await;
//
// messages.sort_by(|a, b| b.cmp(a));
//
// for message in messages {
// if !message.verify() {
// continue;
// }
// let message_id = message.id;
// match self
// .document
// .contains(&self.ipfs, message_id)
// .await
// .unwrap_or_default()
// {
// true => {
// let current_message = self
// .document
// .get_message_document(&self.ipfs, message_id)
// .await?;
//
// self.document
// .update_message_document(&self.ipfs, &message)
// .await?;
//
// let is_edited = matches!((message.modified, current_message.modified), (Some(modified), Some(current_modified)) if modified > current_modified )
// | matches!(
// (message.modified, current_message.modified),
// (Some(_), None)
// );
//
// match is_edited {
// true => {
// let _ = self.event_broadcast.send(MessageEventKind::MessageEdited {
// conversation_id,
// message_id,
// });
// }
// false => {
// //TODO: Emit event showing message was updated in some way
// }
// }
// }
// false => {
// self.document
// .insert_message_document(&self.ipfs, &message)
// .await?;
//
// let _ = self
// .event_broadcast
// .send(MessageEventKind::MessageReceived {
// conversation_id,
// message_id,
// });
// }
// }
// }
//
// self.set_document().await?;

Ok(())
}
Expand Down

0 comments on commit a3e0dff

Please sign in to comment.