Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,38 @@ let result = fastn_p2p_client::call(
).await?;
```

### 4. Server Applications
### 4. Server Applications (Revolutionary!)

```rust
// Add to Cargo.toml: fastn-p2p = "0.1"
use fastn_p2p;

// Server uses full fastn-p2p crate with daemon utilities
let private_key = load_identity_key("alice").await?;
fastn_p2p::listen(private_key)
.handle_requests("Echo", echo_handler)
.await?;
// Modern multi-identity server with complete lifecycle
#[fastn_p2p::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
fastn_p2p::serve_all()
.protocol("mail.fastn.com", |p| p
.on_global_load(setup_shared_database)
.on_create(create_workspace)
.on_activate(start_mail_services)
.handle_requests("inbox.get-mails", get_mails_handler)
.handle_requests("send.send-mail", send_mail_handler)
.handle_requests("settings.add-forwarding", forwarding_handler)
.handle_streams("sync.imap-sync", imap_sync_handler)
.on_deactivate(stop_mail_services)
.on_delete(cleanup_workspace)
.on_global_unload(cleanup_shared_database)
)
.serve()
.await
}

async fn echo_handler(req: EchoRequest) -> Result<EchoResponse, EchoError> {
Ok(EchoResponse { echoed: format!("Echo: {}", req.message) })
fn create_workspace(ctx: BindingContext) -> impl Future<Output = Result<(), Error>> {
async move {
// Create mail storage dirs in ctx.protocol_dir for ctx.identity
println!("Creating workspace for {} ({})", ctx.identity.id52(), ctx.bind_alias);
Ok(())
}
}
```

Expand Down
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ fastn-p2p-client = { path = "../fastn-p2p-client" }
fastn-p2p = { path = "../fastn-p2p" } # For server-side APIs
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
clap.workspace = true
thiserror.workspace = true
reqwest.workspace = true
Expand Down
52 changes: 46 additions & 6 deletions examples/src/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.on_create(echo_create_handler)
.on_activate(echo_activate_handler)
.on_check(echo_check_handler)
.handle_requests("basic-echo", fastn_p2p::echo_request_handler)
.handle_requests("basic-echo", echo_request_handler)
.on_reload(echo_reload_handler)
.on_deactivate(echo_deactivate_handler)
)
Expand All @@ -53,7 +53,7 @@ use std::pin::Pin;
use std::future::Future;

fn echo_create_handler(
ctx: fastn_p2p::server::BindingContext,
ctx: fastn_p2p::BindingContext,
) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>> {
Box::pin(async move {
println!("πŸ”§ Echo create: {} {} ({})", ctx.identity.id52(), ctx.bind_alias, ctx.protocol_dir.display());
Expand All @@ -63,7 +63,7 @@ fn echo_create_handler(
}

fn echo_activate_handler(
ctx: fastn_p2p::server::BindingContext,
ctx: fastn_p2p::BindingContext,
) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>> {
Box::pin(async move {
println!("πŸš€ Echo activate: {} {} ({})", ctx.identity.id52(), ctx.bind_alias, ctx.protocol_dir.display());
Expand All @@ -73,7 +73,7 @@ fn echo_activate_handler(
}

fn echo_check_handler(
ctx: fastn_p2p::server::BindingContext,
ctx: fastn_p2p::BindingContext,
) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>> {
Box::pin(async move {
println!("πŸ” Echo check: {} {} ({})", ctx.identity.id52(), ctx.bind_alias, ctx.protocol_dir.display());
Expand All @@ -83,7 +83,7 @@ fn echo_check_handler(
}

fn echo_reload_handler(
ctx: fastn_p2p::server::BindingContext,
ctx: fastn_p2p::BindingContext,
) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>> {
Box::pin(async move {
println!("πŸ”„ Echo reload: {} {} ({})", ctx.identity.id52(), ctx.bind_alias, ctx.protocol_dir.display());
Expand All @@ -93,11 +93,51 @@ fn echo_reload_handler(
}

fn echo_deactivate_handler(
ctx: fastn_p2p::server::BindingContext,
ctx: fastn_p2p::BindingContext,
) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>> {
Box::pin(async move {
println!("πŸ›‘ Echo deactivate: {} {} ({})", ctx.identity.id52(), ctx.bind_alias, ctx.protocol_dir.display());
// TODO: Stop accepting requests, but preserve data
Ok(())
})
}

// Typed Echo request handler for this application
fn echo_request_handler(
ctx: fastn_p2p::RequestContext,
request: serde_json::Value,
) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>> + Send>> {
Box::pin(async move {
println!("πŸ’¬ Echo handler called:");
println!(" Identity: {}", ctx.identity.id52());
println!(" Bind alias: {}", ctx.bind_alias);
println!(" Command: {}", ctx.command);
println!(" Protocol dir: {}", ctx.protocol_dir.display());
println!(" Args: {:?}", ctx.args);

// Parse typed request
let echo_request: EchoRequest = serde_json::from_value(request)
.map_err(|e| format!("Invalid EchoRequest: {}", e))?;

if echo_request.message.is_empty() {
return Err("Message cannot be empty".into());
}

println!(" Message: '{}'", echo_request.message);

// Create typed response with args support
let args_info = if ctx.args.is_empty() {
String::new()
} else {
format!(" [args: {}]", ctx.args.join(", "))
};

let echo_response = EchoResponse {
echoed: format!("Echo from {} ({}): {}{}", ctx.identity.id52(), ctx.command, echo_request.message, args_info)
};

let response = serde_json::to_value(echo_response)?;
println!("πŸ“€ Echo response: {}", response);
Ok(response)
})
}
94 changes: 63 additions & 31 deletions fastn-net/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,36 +83,19 @@
//! based on performance requirements.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub enum Protocol {
/// client can send this message to check if the connection is open / healthy.
/// Active built-in protocols
Ping,
/// client may not be using NTP, or may only have p2p access and no other internet access, in
/// which case it can ask for the time from the peers and try to create a consensus.
WhatTimeIsIt,
/// client wants to make an HTTP request to a device whose ID is specified. note that the exact
/// ip:port is not known to peers, they only the "device id" for the service. server will figure
/// out the ip:port from the device id.
Http,
HttpProxy,
/// if the client wants their traffic to route via this server, they can send this. for this to
/// work, the person owning the device must have created a SOCKS5 device, and allowed this peer
/// to access it.
Socks5,
Tcp,
// TODO: RTP/"RTCP" for audio video streaming

// Fastn-specific protocols for entity communication
/// Messages from Device to Account (sync requests, status reports, etc.)
DeviceToAccount,
/// Messages between Accounts (email, file sharing, automerge sync, etc.)
AccountToAccount,
/// Messages from Account to Device (commands, config updates, notifications, etc.)
AccountToDevice,
/// Control messages for Rig management (bring online/offline, set current, etc.)
RigControl,

/// Generic protocol for user-defined types
/// This allows users to define their own protocol types while maintaining
/// compatibility with the existing fastn-net infrastructure.

/// Revolutionary per-application protocols for serve_all() architecture
/// Protocol names like "mail.fastn.com", "echo.fastn.com", etc.
Application(String),

/// Legacy generic protocol (still used by existing fastn-p2p code)
Generic(serde_json::Value),
}

Expand All @@ -125,10 +108,7 @@ impl std::fmt::Display for Protocol {
Protocol::HttpProxy => write!(f, "HttpProxy"),
Protocol::Socks5 => write!(f, "Socks5"),
Protocol::Tcp => write!(f, "Tcp"),
Protocol::DeviceToAccount => write!(f, "DeviceToAccount"),
Protocol::AccountToAccount => write!(f, "AccountToAccount"),
Protocol::AccountToDevice => write!(f, "AccountToDevice"),
Protocol::RigControl => write!(f, "RigControl"),
Protocol::Application(name) => write!(f, "{}", name),
Protocol::Generic(value) => write!(f, "Generic({value})"),
}
}
Expand Down Expand Up @@ -165,21 +145,73 @@ mod tests {
/// See module documentation for detailed rationale.
pub const APNS_IDENTITY: &[u8] = b"/fastn/entity/0.1";

/// Protocol header with optional metadata.
/// Protocol header for both built-in and revolutionary serve_all() protocols.
///
/// Sent at the beginning of each bidirectional stream to identify
/// the protocol and provide any protocol-specific metadata.
#[derive(Debug)]
/// the protocol and provide routing information when needed.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ProtocolHeader {
/// Protocol identifier
pub protocol: Protocol,

/// Command within the protocol (for Application protocols only)
pub command: Option<String>,

/// Protocol binding alias (for Application protocols only)
pub bind_alias: Option<String>,

/// CLI arguments support (issue #13: stdargs)
pub args: Vec<String>,

/// Legacy compatibility for extra protocol data
pub extra: Option<String>,
}

impl From<Protocol> for ProtocolHeader {
fn from(protocol: Protocol) -> Self {
Self {
protocol,
command: None, // Built-in protocols don't need commands
bind_alias: None, // Built-in protocols don't need bind aliases
args: Vec::new(),
extra: None,
}
}
}

impl ProtocolHeader {
/// Create protocol header for serve_all() Application protocols
pub fn for_application(
protocol_name: String,
command: String,
bind_alias: String,
args: Vec<String>,
) -> Self {
Self {
protocol: Protocol::Application(protocol_name),
command: Some(command),
bind_alias: Some(bind_alias),
args,
extra: None,
}
}

/// Get routing information for serve_all() (returns None for built-in protocols)
pub fn serve_all_routing(&self) -> Option<(&str, &str, &str, &[String])> {
match &self.protocol {
Protocol::Application(protocol_name) => {
if let (Some(command), Some(bind_alias)) = (&self.command, &self.bind_alias) {
Some((protocol_name, command, bind_alias, &self.args))
} else {
None
}
}
_ => None, // Built-in protocols don't have serve_all routing
}
}

/// Check if this is a built-in protocol
pub fn is_builtin(&self) -> bool {
!matches!(self.protocol, Protocol::Application(_))
}
}
10 changes: 10 additions & 0 deletions fastn-p2p-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ pub enum DaemonRequest<T> {
bind_alias: String,
request: T,
},
#[serde(rename = "call_with_args")]
CallWithArgs {
from_identity: String,
to_peer: fastn_id52::PublicKey,
protocol: String,
command: String,
bind_alias: String,
args: Vec<String>,
request: T,
},
#[serde(rename = "stream")]
Stream {
from_identity: String,
Expand Down
86 changes: 86 additions & 0 deletions fastn-p2p/src/cli/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,92 @@ pub async fn call(
Ok(())
}

/// Make a request/response call with args support (issue #13)
pub async fn call_with_args(
fastn_home: PathBuf,
peer_id52: String,
protocol: String,
command: String,
bind_alias: String,
extra_args: Vec<String>,
as_identity: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
// Check if daemon is running
let socket_path = fastn_home.join("control.sock");
if !socket_path.exists() {
return Err(format!("Daemon not running. Socket not found: {}. Start with: request_response run", socket_path.display()).into());
}

// Determine identity to send from
let from_identity = match as_identity {
Some(identity) => identity,
None => {
// TODO: Auto-detect identity if only one configured
"alice".to_string() // Hardcoded for testing
}
};

// Parse peer ID to PublicKey for type safety
let to_peer: fastn_id52::PublicKey = peer_id52.parse()
.map_err(|e| format!("Invalid peer ID '{}': {}", peer_id52, e))?;

// Read JSON request from stdin
let mut stdin_input = String::new();
io::stdin().read_to_string(&mut stdin_input)?;
let stdin_input = stdin_input.trim();

if stdin_input.is_empty() {
return Err("No JSON input provided on stdin".into());
}

// Parse JSON to validate it's valid
let request_json: serde_json::Value = serde_json::from_str(stdin_input)?;

// Enhanced DaemonRequest with command, bind_alias, and args
let daemon_request = fastn_p2p_client::DaemonRequest::CallWithArgs {
from_identity,
to_peer,
protocol,
command,
bind_alias,
args: extra_args,
request: request_json,
};

println!("πŸ“€ Sending enhanced P2P request with args support");

// Connect to daemon control socket directly
use tokio::net::UnixStream;
use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};

let mut stream = UnixStream::connect(&socket_path).await
.map_err(|e| format!("Failed to connect to daemon: {}", e))?;

// Send request to daemon
let request_data = serde_json::to_string(&daemon_request)?;
stream.write_all(request_data.as_bytes()).await?;
stream.write_all(b"\n").await?;

println!("πŸ“‘ Enhanced request sent to daemon, reading response...");

// Read response from daemon
let (reader, _writer) = stream.into_split();
let mut buf_reader = BufReader::new(reader);
let mut response_line = String::new();

match buf_reader.read_line(&mut response_line).await {
Ok(0) => return Err("Daemon closed connection without response".into()),
Ok(_) => {
let response: serde_json::Value = serde_json::from_str(response_line.trim())?;
println!("πŸ“₯ Response from daemon:");
println!("{}", serde_json::to_string_pretty(&response)?);
}
Err(e) => return Err(format!("Failed to read daemon response: {}", e).into()),
}

Ok(())
}

/// Open a bidirectional stream to a peer via the daemon
pub async fn stream(
_fastn_home: PathBuf,
Expand Down
Loading