Skip to content

Commit

Permalink
refactor: remove async sig from {FileDocument, MessageDocument}::new (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Dec 28, 2024
1 parent 20358ee commit 37bb6e3
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 64 deletions.
15 changes: 5 additions & 10 deletions extensions/warp-ipfs/src/store/conversation/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ impl Ord for MessageDocument {
}

impl MessageDocument {
pub async fn new(
ipfs: &Ipfs,
pub fn new(
keypair: &Keypair,
message: Message,
key: Either<&DID, &Keystore>,
Expand Down Expand Up @@ -122,14 +121,10 @@ impl MessageDocument {
});
}

let attachments = FuturesUnordered::from_iter(
attachments
.iter()
.map(|file| FileAttachmentDocument::new(ipfs, file).into_future()),
)
.filter_map(|result| async move { result.ok() })
.collect::<Vec<_>>()
.await;
let attachments = attachments
.iter()
.filter_map(|file| FileAttachmentDocument::new(file).ok())
.collect::<Vec<_>>();

if !lines.is_empty() {
let lines_value_length: usize = lines
Expand Down
4 changes: 2 additions & 2 deletions extensions/warp-ipfs/src/store/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,8 @@ pub struct FileAttachmentDocument {
}

impl FileAttachmentDocument {
pub async fn new(ipfs: &Ipfs, file: &File) -> Result<Self, Error> {
let file_document = FileDocument::new(ipfs, file).await?;
pub fn new(file: &File) -> Result<Self, Error> {
let file_document = FileDocument::new(file);
file_document.to_attachment()
}

Expand Down
41 changes: 9 additions & 32 deletions extensions/warp-ipfs/src/store/document/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,10 @@ impl DirectoryDocument {

document.items = Some(cid);

if let Some(cid) = root
document.thumbnail = root
.thumbnail_reference()
.and_then(|refs| refs.parse::<IpfsPath>().ok())
.and_then(|path| path.root().cid().copied())
{
document.thumbnail = ipfs
.repo()
.contains(&cid)
.await
.unwrap_or_default()
.then_some(cid)
}
.and_then(|path| path.root().cid().copied());

Ok(document)
}
Expand Down Expand Up @@ -141,7 +133,7 @@ impl ItemDocument {
pub async fn new(ipfs: &Ipfs, item: &Item) -> Result<ItemDocument, Error> {
let document = match item {
Item::File(file) => {
let document = FileDocument::new(ipfs, file).await?;
let document = FileDocument::new(file);
let cid = ipfs.put_dag(document).await?;
ItemDocument::File(cid)
}
Expand Down Expand Up @@ -198,7 +190,7 @@ pub struct FileDocument {
}

impl FileDocument {
pub async fn new(ipfs: &Ipfs, file: &File) -> Result<FileDocument, Error> {
pub fn new(file: &File) -> FileDocument {
let mut document = FileDocument {
name: file.name(),
size: file.size(),
Expand All @@ -212,33 +204,18 @@ impl FileDocument {
thumbnail: None,
};

if let Some(cid) = file
document.reference = file
.reference()
.and_then(|refs| refs.parse::<IpfsPath>().ok())
.and_then(|path| path.root().cid().copied())
{
document.reference = ipfs
.repo()
.contains(&cid)
.await
.unwrap_or_default()
.then(|| cid.to_string())
}
.map(|cid| cid.to_string());

if let Some(cid) = file
document.thumbnail = file
.thumbnail_reference()
.and_then(|refs| refs.parse::<IpfsPath>().ok())
.and_then(|path| path.root().cid().copied())
{
document.thumbnail = ipfs
.repo()
.contains(&cid)
.await
.unwrap_or_default()
.then_some(cid)
}
.and_then(|path| path.root().cid().copied());

Ok(document)
document
}

pub fn to_attachment(&self) -> Result<FileAttachmentDocument, Error> {
Expand Down
15 changes: 3 additions & 12 deletions extensions/warp-ipfs/src/store/message/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::stream::SelectAll;
use futures::{stream, FutureExt, SinkExt, Stream, StreamExt};
use rust_ipfs::{Ipfs, Keypair};
use rust_ipfs::Keypair;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -24,7 +24,6 @@ use warp::raygun::{AttachmentKind, Location, LocationKind, MessageType};
type AOneShot = (MessageDocument, oneshot::Sender<Result<(), Error>>);
type ProgressedStream = BoxStream<'static, (LocationKind, Progression, Option<File>)>;
pub struct AttachmentStream {
ipfs: Ipfs,
keypair: Keypair,
local_did: DID,
attachment_tx: futures::channel::mpsc::Sender<AOneShot>,
Expand Down Expand Up @@ -52,7 +51,6 @@ enum AttachmentState {

impl AttachmentStream {
pub fn new(
ipfs: &Ipfs,
keypair: &Keypair,
local_did: &DID,
file_store: &FileStore,
Expand All @@ -61,7 +59,6 @@ impl AttachmentStream {
attachment_tx: futures::channel::mpsc::Sender<AOneShot>,
) -> Self {
Self {
ipfs: ipfs.clone(),
keypair: keypair.clone(),
local_did: local_did.clone(),
file_store: file_store.clone(),
Expand Down Expand Up @@ -455,7 +452,6 @@ impl Stream for AttachmentStream {
let reply_id = this.reply_to;
let message_id = this.message_id;
let local_did = this.local_did.clone();
let ipfs = this.ipfs.clone();
let keystore = this.keystore.clone();
let keypair = this.keypair.clone();
let mut atx = this.attachment_tx.clone();
Expand All @@ -471,13 +467,8 @@ impl Stream for AttachmentStream {
}
message.set_replied(reply_id);

let message = MessageDocument::new(
&ipfs,
&keypair,
message,
keystore.as_ref(),
)
.await?;
let message =
MessageDocument::new(&keypair, message, keystore.as_ref())?;

let (tx, rx) = oneshot::channel();
_ = atx.send((message, tx)).await;
Expand Down
7 changes: 3 additions & 4 deletions extensions/warp-ipfs/src/store/message/community_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,7 @@ impl CommunityTask {
});
}

let document = FileDocument::new(&self.ipfs, &file).await?;
let document = FileDocument::new(&file);
let cid = document
.reference
.as_ref()
Expand Down Expand Up @@ -2844,7 +2844,7 @@ impl CommunityTask {
let message_id = message.id();
let keystore = pubkey_or_keystore(&*self)?;

let message = MessageDocument::new(&self.ipfs, keypair, message, keystore.as_ref()).await?;
let message = MessageDocument::new(keypair, message, keystore.as_ref())?;

let channel = match self.document.channels.get_mut(&channel_id.to_string()) {
Some(c) => c,
Expand Down Expand Up @@ -3072,7 +3072,7 @@ impl CommunityTask {

let keystore = pubkey_or_keystore(&*self)?;

let message = MessageDocument::new(&self.ipfs, keypair, message, keystore.as_ref()).await?;
let message = MessageDocument::new(keypair, message, keystore.as_ref())?;

let message_id = message.id;

Expand Down Expand Up @@ -3486,7 +3486,6 @@ impl CommunityTask {
let keystore = pubkey_or_keystore(&*self)?;

let stream = AttachmentStream::new(
&self.ipfs,
self.root.keypair(),
&self.identity.did_key(),
&self.file,
Expand Down
7 changes: 3 additions & 4 deletions extensions/warp-ipfs/src/store/message/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ impl ConversationTask {
let message_id = message.id();
let keystore = pubkey_or_keystore(&*self)?;

let message = MessageDocument::new(&self.ipfs, keypair, message, keystore.as_ref()).await?;
let message = MessageDocument::new(keypair, message, keystore.as_ref())?;

let _message_cid = self
.document
Expand Down Expand Up @@ -1525,7 +1525,7 @@ impl ConversationTask {

let keystore = pubkey_or_keystore(&*self)?;

let message = MessageDocument::new(&self.ipfs, keypair, message, keystore.as_ref()).await?;
let message = MessageDocument::new(keypair, message, keystore.as_ref())?;

let message_id = message.id;

Expand Down Expand Up @@ -2225,7 +2225,7 @@ impl ConversationTask {
});
}

let document = FileDocument::new(&self.ipfs, &file).await?;
let document = FileDocument::new(&file);
let cid = document
.reference
.as_ref()
Expand Down Expand Up @@ -2448,7 +2448,6 @@ impl ConversationTask {
let keystore = pubkey_or_keystore(&*self)?;

let stream = AttachmentStream::new(
&self.ipfs,
self.root.keypair(),
&self.identity.did_key(),
&self.file,
Expand Down

0 comments on commit 37bb6e3

Please sign in to comment.