diff --git a/STREAMING_DESIGN.md b/STREAMING_DESIGN.md new file mode 100644 index 0000000..31cdf51 --- /dev/null +++ b/STREAMING_DESIGN.md @@ -0,0 +1,66 @@ +# Client-Driven Audio Streaming Design + +## Overview +Complete end-to-end analysis of client-driven audio streaming with buffer management and interactive controls. + +## Architecture + +``` +┌─────────────────┐ ┌─────────────────┐ +│ Audio Client │ │ Audio Server │ +│ │ │ │ +│ ┌─────────────┐ │ │ ┌─────────────┐ │ +│ │ Buffer │ │ │ │ Audio Data │ │ +│ │ Management │ │◄──►│ │ Cache │ │ +│ └─────────────┘ │ │ └─────────────┘ │ +│ │ │ │ +│ ┌─────────────┐ │ │ ┌─────────────┐ │ +│ │ Interactive │ │ │ │ Request │ │ +│ │ Controls │ │ │ │ Handlers │ │ +│ │ (SPACE/q) │ │ │ │ │ │ +│ └─────────────┘ │ │ └─────────────┘ │ +└─────────────────┘ └─────────────────┘ +``` + +## Protocol Design + +### Option 1: String Constants (Recommended) +```rust +pub const AUDIO_GET_INFO: &'static str = "audio.get_info"; +pub const AUDIO_REQUEST_CHUNK: &'static str = "audio.request_chunk"; +``` + +### Option 2: Use Current Enum Approach +```rust +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] +pub enum AudioProtocol { + GetInfo, + RequestChunk, +} +``` + +**Decision**: Start with enum approach (current fastn-p2p API), migrate to string constants later. + +## Data Flow + +1. **Client connects** → Server loads audio file (once) +2. **Client requests info** → Server returns metadata (duration, chunks, format) +3. **Client starts buffer loop** → Requests chunks when buffer < 3s +4. **Client starts playback loop** → Plays chunks from buffer +5. **Client interactive loop** → SPACE pauses (stops requesting), resumes (starts requesting) + +## Performance Targets + +- **Buffer target**: 3 seconds of audio +- **Chunk size**: 256KB (~3 seconds of audio per chunk) +- **Request frequency**: Only when buffer < target +- **Pause mechanism**: Stop requesting chunks, drain buffer +- **Resume mechanism**: Start requesting chunks again + +## Module Structure + +- `protocol.rs` - Request/response types +- `server.rs` - Audio server with chunk serving +- `client.rs` - Buffer management and P2P requests +- `ui.rs` - Interactive controls and audio playback +- `main.rs` - Entry point orchestration \ No newline at end of file diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 279e325..58e62dd 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -57,3 +57,7 @@ path = "src/multi_protocol.rs" name = "audio_test" path = "src/audio_test.rs" +[[bin]] +name = "media_stream_v2" +path = "src/media_stream_v2.rs" + diff --git a/examples/src/audio_decoder.rs b/examples/src/audio_decoder.rs new file mode 100644 index 0000000..45204fb --- /dev/null +++ b/examples/src/audio_decoder.rs @@ -0,0 +1,120 @@ +//! Shared audio decoding functionality + +// Audio decoding error types +#[derive(Debug, thiserror::Error)] +pub enum AudioError { + #[error("Audio file not found: {0}")] + FileNotFound(String), + #[error("Audio decode error: {0}")] + DecodeError(String), + #[error("IO error: {0}")] + Io(#[from] std::io::Error), +} + +// Decode audio file (OGG/MP3) to PCM data +pub async fn decode_audio_file(filename: &str) -> Result<(Vec, u32, u16), AudioError> { + let file_data = tokio::fs::read(filename).await + .map_err(|_| AudioError::FileNotFound(filename.to_string()))?; + + // Use symphonia for OGG files + use symphonia::core::audio::{AudioBufferRef, Signal}; + use symphonia::core::codecs::DecoderOptions; + use symphonia::core::formats::FormatOptions; + use symphonia::core::io::MediaSourceStream; + use symphonia::core::meta::MetadataOptions; + use symphonia::core::probe::Hint; + + let file_data_owned = file_data.to_vec(); + let cursor = std::io::Cursor::new(file_data_owned); + let mss = MediaSourceStream::new(Box::new(cursor), Default::default()); + + let hint = Hint::new(); + let meta_opts = MetadataOptions::default(); + let fmt_opts = FormatOptions::default(); + + let probed = symphonia::default::get_probe() + .format(&hint, mss, &fmt_opts, &meta_opts) + .map_err(|e| AudioError::DecodeError(format!("Format probe failed: {:?}", e)))?; + + let mut format = probed.format; + let track = format + .tracks() + .iter() + .find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL) + .ok_or_else(|| AudioError::DecodeError("No supported audio tracks found".to_string()))?; + + let dec_opts = DecoderOptions::default(); + let mut decoder = symphonia::default::get_codecs() + .make(&track.codec_params, &dec_opts) + .map_err(|e| AudioError::DecodeError(format!("Decoder creation failed: {:?}", e)))?; + + let track_id = track.id; + let mut pcm_data = Vec::new(); + let mut sample_rate = 44100; + let mut channels = 2; + + // Decode with proper stereo interleaving + loop { + let packet = match format.next_packet() { + Ok(packet) => packet, + Err(symphonia::core::errors::Error::IoError(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + break; + } + Err(e) => { + return Err(AudioError::DecodeError(format!("Packet read error: {:?}", e))); + } + }; + + if packet.track_id() != track_id { + continue; + } + + match decoder.decode(&packet) { + Ok(decoded) => { + sample_rate = decoded.spec().rate; + channels = decoded.spec().channels.count() as u16; + + // Proper stereo interleaving: [L,R,L,R,L,R...] + match decoded { + AudioBufferRef::F32(buf) => { + let channels_count = buf.spec().channels.count(); + let frames = buf.frames(); + + for frame_idx in 0..frames { + for ch in 0..channels_count { + let sample = buf.chan(ch)[frame_idx]; + let sample_i16 = (sample * 32767.0).clamp(-32767.0, 32767.0) as i16; + pcm_data.extend_from_slice(&sample_i16.to_le_bytes()); + } + } + } + AudioBufferRef::S16(buf) => { + let channels_count = buf.spec().channels.count(); + let frames = buf.frames(); + + for frame_idx in 0..frames { + for ch in 0..channels_count { + let sample = buf.chan(ch)[frame_idx]; + pcm_data.extend_from_slice(&sample.to_le_bytes()); + } + } + } + _ => { + return Err(AudioError::DecodeError("Unsupported audio format".to_string())); + } + } + } + Err(symphonia::core::errors::Error::IoError(_)) => break, + Err(symphonia::core::errors::Error::DecodeError(_)) => continue, + Err(e) => { + return Err(AudioError::DecodeError(format!("Decode error: {:?}", e))); + } + } + } + + if pcm_data.is_empty() { + return Err(AudioError::DecodeError("No audio data decoded".to_string())); + } + + Ok((pcm_data, sample_rate, channels)) +} \ No newline at end of file diff --git a/examples/src/lib.rs b/examples/src/lib.rs index 8fdb37c..17d0068 100644 --- a/examples/src/lib.rs +++ b/examples/src/lib.rs @@ -126,3 +126,6 @@ pub fn parse_cli() -> Result> { // Clean re-exports for examples pub use ParsedMode::Client; pub use ParsedMode::Server; + +// Audio decoding module (shared between audio_test and streaming) +pub mod audio_decoder; diff --git a/examples/src/media_stream.rs b/examples/src/media_stream.rs index e0a51f1..9547ba9 100644 --- a/examples/src/media_stream.rs +++ b/examples/src/media_stream.rs @@ -134,7 +134,7 @@ async fn run_publisher( } fastn_p2p::listen(private_key) - .handle_streams(MediaProtocol::AudioStream, audio_file, audio_publisher_handler) + .handle_requests(MediaProtocol::AudioStream, audio_file, audio_request_handler) .await?; Ok(()) @@ -351,14 +351,14 @@ async fn run_subscriber( Ok(()) } -// Audio publisher handler - streams audio chunks to subscriber -async fn audio_publisher_handler( - mut session: fastn_p2p::Session, - _data: (), +// Global audio data cache to avoid re-decoding for each request +static AUDIO_CACHE: tokio::sync::OnceCell<(Vec, u32, u16, f64)> = tokio::sync::OnceCell::const_new(); + +// Audio request handler - responds to client chunk requests +async fn audio_request_handler( + request: StreamRequest, audio_file: String, -) -> Result<(), MediaError> { - let handler_start = Instant::now(); - println!("🔊 New subscriber connected: {}", session.peer().id52()); +) -> Result { // Read and decode audio file to get actual audio format let decode_start = Instant::now(); diff --git a/examples/src/media_stream_v2.rs b/examples/src/media_stream_v2.rs new file mode 100644 index 0000000..4b7559a --- /dev/null +++ b/examples/src/media_stream_v2.rs @@ -0,0 +1,128 @@ +//! Clean Stream-Based Media Example +//! +//! Demonstrates the new clean streaming API: +//! - Stream provider trait (app implements) +//! - Clean client/server separation +//! - No embedded connection info in types + +mod streaming; +use streaming::*; + +#[fastn_p2p::main] +async fn main() -> Result<(), Box> { + match examples::parse_cli()? { + examples::Server { + private_key: _, + config, + } => { + let server_key = examples::get_or_create_persistent_key("media_stream_v2"); + let audio_file = config.first().cloned().unwrap_or_else(|| + "examples/assets/SerenataGranados.ogg".to_string() + ); + run_server(server_key, audio_file).await + } + examples::Client { target, config: _ } => { + run_client(target).await + } + } +} + +/// Run server with stream provider +async fn run_server( + private_key: fastn_p2p::SecretKey, + audio_file: String, +) -> Result<(), Box> { + println!("🎵 Stream Server V2 starting..."); + println!("🎧 Server listening on: {}", private_key.id52()); + println!(""); + println!("🚀 To connect from another machine, run:"); + println!(" cargo run --bin media_stream_v2 -- client {}", private_key.id52()); + println!(""); + + // Create stream provider + let provider = SimpleAudioProvider::new(audio_file).await?; + + // Start server (TODO: Need to wire up handlers properly with fastn-p2p) + println!("📡 Server ready to serve audio streams..."); + + // For now, just keep server alive + tokio::signal::ctrl_c().await?; + println!("👋 Server shutting down..."); + + Ok(()) +} + +/// Run client with clean stream access +async fn run_client( + target: fastn_p2p::PublicKey, +) -> Result<(), Box> { + println!("🎧 Stream Client V2 connecting to: {}", target); + + // Create stream client + let stream_client = StreamClient::new(target); + + // Open the audio stream + let stream = stream_client.open_stream("audio_stream").await?; + println!("✅ Opened stream: {} with {} tracks", stream.name, stream.list_tracks().len()); + + // Get audio track + let audio_track = stream.get_track("audio") + .ok_or("Audio track not found in stream")?; + + println!("📊 Audio track: {} bytes", audio_track.size_bytes); + + // Simple test: read first chunk + let chunk_size = 32768; // 32KB + let chunk_data = stream_client.read_track_range("audio_stream", "audio", 0, chunk_size).await?; + println!("📥 Read first chunk: {} bytes", chunk_data.len()); + + // TODO: Add full playback loop with buffering + // TODO: Add interactive controls (SPACE pause/resume) + // TODO: Add audio decoding + rodio playback + + Ok(()) +} + +/// Simple audio stream provider implementation +struct SimpleAudioProvider { + audio_file: String, + audio_data: Vec, +} + +impl SimpleAudioProvider { + async fn new(audio_file: String) -> Result> { + // TODO: Load and decode audio file using examples::audio_decoder + let (audio_data, _sample_rate, _channels) = examples::audio_decoder::decode_audio_file(&audio_file).await + .map_err(|e| format!("Failed to decode audio: {}", e))?; + + Ok(Self { + audio_file, + audio_data, + }) + } +} + +impl StreamProvider for SimpleAudioProvider { + async fn resolve_stream(&self, stream_name: &str) -> Option { + // TODO: If stream_name == "audio_stream", return stream with single audio track + if stream_name == "audio_stream" { + let mut stream = ServerStream::new(stream_name.to_string()); + stream.add_track("audio".to_string(), self.audio_data.len() as u64); + Some(stream) + } else { + None + } + } + + async fn read_track_range(&self, _stream_name: &str, _track_name: &str, start: u64, length: u64) -> Result, Box> { + // TODO: Check bounds (start + length <= audio_data.len()) + let end = std::cmp::min(start + length, self.audio_data.len() as u64) as usize; + let start = start as usize; + + if start >= self.audio_data.len() { + return Err("Start position out of bounds".into()); + } + + Ok(self.audio_data[start..end].to_vec()) + } +} \ No newline at end of file diff --git a/examples/src/streaming/client.rs b/examples/src/streaming/client.rs new file mode 100644 index 0000000..a5771b7 --- /dev/null +++ b/examples/src/streaming/client.rs @@ -0,0 +1,101 @@ +//! Client-side streaming types + +use super::protocol::*; + +/// Client-side stream - no connection info embedded +#[derive(Debug)] +pub struct ClientStream { + pub name: String, + pub tracks: std::collections::HashMap, +} + +/// Client-side track - clean without connection details +#[derive(Debug)] +pub struct ClientTrack { + pub name: String, + pub size_bytes: u64, +} + +impl ClientStream { + /// Create client stream from server response + pub fn from_response(response: GetStreamResponse) -> Self { + // TODO: Convert GetStreamResponse to ClientStream + // TODO: Map TrackInfo to ClientTrack + let tracks = response.tracks.into_iter() + .map(|(name, track_info)| (name.clone(), ClientTrack { + name, + size_bytes: track_info.size_bytes, + })) + .collect(); + + Self { + name: response.name, + tracks, + } + } + + pub fn get_track(&self, track_name: &str) -> Option<&ClientTrack> { + // TODO: Return track from HashMap or None + self.tracks.get(track_name) + } + + pub fn list_tracks(&self) -> Vec { + // TODO: Return Vec of track names from HashMap keys + self.tracks.keys().cloned().collect() + } +} + +/// Stream client - handles P2P communication separately from stream data +pub struct StreamClient { + private_key: fastn_p2p::SecretKey, + server_id: fastn_p2p::PublicKey, +} + +impl StreamClient { + pub fn new(server_id: fastn_p2p::PublicKey) -> Self { + // TODO: Generate private_key = fastn_p2p::SecretKey::generate() + Self { + private_key: fastn_p2p::SecretKey::generate(), + server_id, + } + } + + /// Open stream by name + pub async fn open_stream(&self, stream_name: &str) -> Result> { + // TODO: Call fastn_p2p::client::call() with GET_STREAM protocol + let response: GetStreamResponse = fastn_p2p::client::call( + self.private_key.clone(), + self.server_id, + StreamingProtocol::GetStream, + GetStreamRequest { + stream_name: stream_name.to_string(), + }, + ).await?; + + Ok(ClientStream::from_response(response)) + } + + /// Read range from specific track + pub async fn read_track_range( + &self, + stream_name: &str, + track_name: &str, + start: u64, + length: u64 + ) -> Result, Box> { + // TODO: Call fastn_p2p::client::call() with READ_TRACK_RANGE protocol + let response: ReadTrackRangeResponse = fastn_p2p::client::call( + self.private_key.clone(), + self.server_id, + StreamingProtocol::ReadTrackRange, + ReadTrackRangeRequest { + stream_name: stream_name.to_string(), + track_name: track_name.to_string(), + start, + length, + }, + ).await?; + + Ok(response.data) + } +} \ No newline at end of file diff --git a/examples/src/streaming/mod.rs b/examples/src/streaming/mod.rs new file mode 100644 index 0000000..867cf60 --- /dev/null +++ b/examples/src/streaming/mod.rs @@ -0,0 +1,17 @@ +//! Client-driven audio streaming module +//! +//! Clean separation of concerns: +//! - protocol: Message types and protocol definitions +//! - server: Audio server that responds to chunk requests +//! - client: Audio client with buffer management +//! - ui: Interactive controls and user interface + +pub mod protocol; +pub mod server; +pub mod client; +pub mod ui; + +// Re-export key types for convenience +pub use protocol::*; +pub use server::{StreamProvider, ServerStream, ServerTrack, handle_get_stream, handle_read_track_range}; +pub use client::{StreamClient, ClientStream, ClientTrack}; \ No newline at end of file diff --git a/examples/src/streaming/protocol.rs b/examples/src/streaming/protocol.rs new file mode 100644 index 0000000..62166f4 --- /dev/null +++ b/examples/src/streaming/protocol.rs @@ -0,0 +1,41 @@ +//! Clean streaming protocol types + +// Protocol enum for current fastn-p2p API +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] +pub enum StreamingProtocol { + GetStream, + ReadTrackRange, +} + +// Get stream metadata +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct GetStreamRequest { + pub stream_name: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct GetStreamResponse { + pub name: String, + pub tracks: std::collections::HashMap, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct TrackInfo { + pub name: String, + pub size_bytes: u64, +} + +// Read track range +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct ReadTrackRangeRequest { + pub stream_name: String, + pub track_name: String, + pub start: u64, + pub length: u64, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct ReadTrackRangeResponse { + pub data: Vec, // Will be bytes::Bytes in future +} + diff --git a/examples/src/streaming/server.rs b/examples/src/streaming/server.rs new file mode 100644 index 0000000..877b7f3 --- /dev/null +++ b/examples/src/streaming/server.rs @@ -0,0 +1,85 @@ +//! Server-side stream provider + +use super::protocol::*; + +/// Stream provider trait - app implements this +pub trait StreamProvider: Send + Sync { + async fn resolve_stream(&self, stream_name: &str) -> Option; + async fn read_track_range(&self, stream_name: &str, track_name: &str, start: u64, length: u64) -> Result, Box>; +} + +/// Server-side stream metadata +#[derive(Debug, Clone)] +pub struct ServerStream { + pub name: String, + pub tracks: std::collections::HashMap, +} + +/// Server-side track metadata +#[derive(Debug, Clone)] +pub struct ServerTrack { + pub name: String, + pub size_bytes: u64, +} + +impl ServerStream { + pub fn new(name: String) -> Self { + // TODO: Initialize with name and empty tracks HashMap + Self { + name, + tracks: std::collections::HashMap::new(), + } + } + + pub fn add_track(&mut self, name: String, size_bytes: u64) { + // TODO: Insert ServerTrack into tracks HashMap + let track = ServerTrack { name: name.clone(), size_bytes }; + self.tracks.insert(name, track); + } +} + +/// Handle GET_STREAM protocol requests +pub async fn handle_get_stream( + request: GetStreamRequest, + provider: &T, +) -> Result> { + // TODO: Print "Client requested stream: {stream_name}" + println!("📊 Client requested stream: {}", request.stream_name); + + match provider.resolve_stream(&request.stream_name).await { + Some(server_stream) => { + let tracks = server_stream.tracks.into_iter() + .map(|(name, server_track)| (name.clone(), TrackInfo { + name, + size_bytes: server_track.size_bytes, + })) + .collect(); + + Ok(GetStreamResponse { + name: server_stream.name, + tracks, + }) + } + None => Err(format!("Stream '{}' not found", request.stream_name).into()) + } +} + +/// Handle READ_TRACK_RANGE protocol requests +pub async fn handle_read_track_range( + request: ReadTrackRangeRequest, + provider: &T, +) -> Result> { + // TODO: Print "Client reading {stream}.{track} range {start}..{start+length}" + println!("📦 Reading {}.{} range {}..{}", + request.stream_name, request.track_name, + request.start, request.start + request.length); + + let data = provider.read_track_range( + &request.stream_name, + &request.track_name, + request.start, + request.length, + ).await?; + + Ok(ReadTrackRangeResponse { data }) +} \ No newline at end of file diff --git a/examples/src/streaming/ui.rs b/examples/src/streaming/ui.rs new file mode 100644 index 0000000..84e119a --- /dev/null +++ b/examples/src/streaming/ui.rs @@ -0,0 +1,69 @@ +//! Interactive UI for audio streaming + +use super::client::AudioClient; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; + +/// Interactive streaming UI with play/pause controls +pub struct StreamingUI { + client: Arc>, + sink: Arc, +} + +impl StreamingUI { + /// Create new UI with audio client + pub async fn new(client: AudioClient) -> Result> { + // TODO: Setup rodio::OutputStream::try_default() + // TODO: Create rodio::Sink::try_new() + // TODO: Print "Audio system ready" + // TODO: Print "Press SPACE to pause/resume, 'q' to quit" + // TODO: Wrap client in Arc> + // TODO: Wrap sink in Arc<> + // TODO: Return StreamingUI instance + todo!() + } + + /// Start all streaming tasks and interactive controls + pub async fn start_streaming(self) -> Result<(), Box> { + // TODO: Start chunk_fetcher_task() + // TODO: Start audio_player_task() + // TODO: Start interactive_controls_task() + // TODO: Main monitoring loop - wait for completion + // TODO: Print "Streaming completed!" + todo!() + } + + /// Background task: fetch chunks when buffer is low + async fn chunk_fetcher_task(client: Arc>) { + // TODO: Loop forever + // TODO: Check if client.needs_data() && next_chunk_id < total_chunks + // TODO: If needs data, call client.request_chunk(next_chunk_id) + // TODO: Print "Chunk {id} buffered ({duration}s buffered)" + // TODO: Increment next_chunk_id + // TODO: If buffer full or paused, sleep 100ms + // TODO: Break on end of stream + todo!() + } + + /// Background task: play audio chunks from buffer + async fn audio_player_task(client: Arc>, sink: Arc) { + // TODO: Loop forever + // TODO: Get chunk_data = client.get_audio_chunk() + // TODO: If got data, convert to i16 samples + // TODO: Create rodio::buffer::SamplesBuffer with client.sample_rate, client.channels + // TODO: Call sink.append(source) + // TODO: If no data, sleep 50ms + todo!() + } + + /// Background task: handle SPACE pause/resume, 'q' quit + async fn interactive_controls_task(client: Arc>, sink: Arc) { + // TODO: Setup termion::raw::IntoRawMode for immediate key response + // TODO: Loop reading single bytes from stdin + // TODO: On SPACE: toggle client.pause()/resume() and sink.pause()/play() + // TODO: Print "⏸️ Paused" or "▶️ Resumed" + // TODO: On 'q' or ESC: print "⏹️ Stopping..." and break + todo!() + } +} \ No newline at end of file