Skip to content
Open
66 changes: 66 additions & 0 deletions STREAMING_DESIGN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Client-Driven Audio Streaming Design

## Overview
Complete end-to-end analysis of client-driven audio streaming with buffer management and interactive controls.

## Architecture

```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Audio Client β”‚ β”‚ Audio Server β”‚
β”‚ β”‚ β”‚ β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ Buffer β”‚ β”‚ β”‚ β”‚ Audio Data β”‚ β”‚
β”‚ β”‚ Management β”‚ │◄──►│ β”‚ Cache β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚ β”‚ β”‚ β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ Interactive β”‚ β”‚ β”‚ β”‚ Request β”‚ β”‚
β”‚ β”‚ Controls β”‚ β”‚ β”‚ β”‚ Handlers β”‚ β”‚
β”‚ β”‚ (SPACE/q) β”‚ β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```

## Protocol Design

### Option 1: String Constants (Recommended)
```rust
pub const AUDIO_GET_INFO: &'static str = "audio.get_info";
pub const AUDIO_REQUEST_CHUNK: &'static str = "audio.request_chunk";
```

### Option 2: Use Current Enum Approach
```rust
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
pub enum AudioProtocol {
GetInfo,
RequestChunk,
}
```

**Decision**: Start with enum approach (current fastn-p2p API), migrate to string constants later.

## Data Flow

1. **Client connects** β†’ Server loads audio file (once)
2. **Client requests info** β†’ Server returns metadata (duration, chunks, format)
3. **Client starts buffer loop** β†’ Requests chunks when buffer < 3s
4. **Client starts playback loop** β†’ Plays chunks from buffer
5. **Client interactive loop** β†’ SPACE pauses (stops requesting), resumes (starts requesting)

## Performance Targets

- **Buffer target**: 3 seconds of audio
- **Chunk size**: 256KB (~3 seconds of audio per chunk)
- **Request frequency**: Only when buffer < target
- **Pause mechanism**: Stop requesting chunks, drain buffer
- **Resume mechanism**: Start requesting chunks again

## Module Structure

- `protocol.rs` - Request/response types
- `server.rs` - Audio server with chunk serving
- `client.rs` - Buffer management and P2P requests
- `ui.rs` - Interactive controls and audio playback
- `main.rs` - Entry point orchestration
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ path = "src/multi_protocol.rs"
name = "audio_test"
path = "src/audio_test.rs"

[[bin]]
name = "media_stream_v2"
path = "src/media_stream_v2.rs"

120 changes: 120 additions & 0 deletions examples/src/audio_decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
//! Shared audio decoding functionality

// Audio decoding error types
#[derive(Debug, thiserror::Error)]
pub enum AudioError {
#[error("Audio file not found: {0}")]
FileNotFound(String),
#[error("Audio decode error: {0}")]
DecodeError(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}

// Decode audio file (OGG/MP3) to PCM data
pub async fn decode_audio_file(filename: &str) -> Result<(Vec<u8>, u32, u16), AudioError> {
let file_data = tokio::fs::read(filename).await
.map_err(|_| AudioError::FileNotFound(filename.to_string()))?;

// Use symphonia for OGG files
use symphonia::core::audio::{AudioBufferRef, Signal};
use symphonia::core::codecs::DecoderOptions;
use symphonia::core::formats::FormatOptions;
use symphonia::core::io::MediaSourceStream;
use symphonia::core::meta::MetadataOptions;
use symphonia::core::probe::Hint;

let file_data_owned = file_data.to_vec();
let cursor = std::io::Cursor::new(file_data_owned);
let mss = MediaSourceStream::new(Box::new(cursor), Default::default());

let hint = Hint::new();
let meta_opts = MetadataOptions::default();
let fmt_opts = FormatOptions::default();

let probed = symphonia::default::get_probe()
.format(&hint, mss, &fmt_opts, &meta_opts)
.map_err(|e| AudioError::DecodeError(format!("Format probe failed: {:?}", e)))?;

let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL)
.ok_or_else(|| AudioError::DecodeError("No supported audio tracks found".to_string()))?;

let dec_opts = DecoderOptions::default();
let mut decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &dec_opts)
.map_err(|e| AudioError::DecodeError(format!("Decoder creation failed: {:?}", e)))?;

let track_id = track.id;
let mut pcm_data = Vec::new();
let mut sample_rate = 44100;
let mut channels = 2;

// Decode with proper stereo interleaving
loop {
let packet = match format.next_packet() {
Ok(packet) => packet,
Err(symphonia::core::errors::Error::IoError(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
break;
}
Err(e) => {
return Err(AudioError::DecodeError(format!("Packet read error: {:?}", e)));
}
};

if packet.track_id() != track_id {
continue;
}

match decoder.decode(&packet) {
Ok(decoded) => {
sample_rate = decoded.spec().rate;
channels = decoded.spec().channels.count() as u16;

// Proper stereo interleaving: [L,R,L,R,L,R...]
match decoded {
AudioBufferRef::F32(buf) => {
let channels_count = buf.spec().channels.count();
let frames = buf.frames();

for frame_idx in 0..frames {
for ch in 0..channels_count {
let sample = buf.chan(ch)[frame_idx];
let sample_i16 = (sample * 32767.0).clamp(-32767.0, 32767.0) as i16;
pcm_data.extend_from_slice(&sample_i16.to_le_bytes());
}
}
}
AudioBufferRef::S16(buf) => {
let channels_count = buf.spec().channels.count();
let frames = buf.frames();

for frame_idx in 0..frames {
for ch in 0..channels_count {
let sample = buf.chan(ch)[frame_idx];
pcm_data.extend_from_slice(&sample.to_le_bytes());
}
}
}
_ => {
return Err(AudioError::DecodeError("Unsupported audio format".to_string()));
}
}
}
Err(symphonia::core::errors::Error::IoError(_)) => break,
Err(symphonia::core::errors::Error::DecodeError(_)) => continue,
Err(e) => {
return Err(AudioError::DecodeError(format!("Decode error: {:?}", e)));
}
}
}

if pcm_data.is_empty() {
return Err(AudioError::DecodeError("No audio data decoded".to_string()));
}

Ok((pcm_data, sample_rate, channels))
}
3 changes: 3 additions & 0 deletions examples/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,6 @@ pub fn parse_cli() -> Result<ParsedMode, Box<dyn std::error::Error>> {
// Clean re-exports for examples
pub use ParsedMode::Client;
pub use ParsedMode::Server;

// Audio decoding module (shared between audio_test and streaming)
pub mod audio_decoder;
16 changes: 8 additions & 8 deletions examples/src/media_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn run_publisher(
}

fastn_p2p::listen(private_key)
.handle_streams(MediaProtocol::AudioStream, audio_file, audio_publisher_handler)
.handle_requests(MediaProtocol::AudioStream, audio_file, audio_request_handler)
.await?;

Ok(())
Expand Down Expand Up @@ -351,14 +351,14 @@ async fn run_subscriber(
Ok(())
}

// Audio publisher handler - streams audio chunks to subscriber
async fn audio_publisher_handler(
mut session: fastn_p2p::Session<MediaProtocol>,
_data: (),
// Global audio data cache to avoid re-decoding for each request
static AUDIO_CACHE: tokio::sync::OnceCell<(Vec<u8>, u32, u16, f64)> = tokio::sync::OnceCell::const_new();

// Audio request handler - responds to client chunk requests
async fn audio_request_handler(
request: StreamRequest,
audio_file: String,
) -> Result<(), MediaError> {
let handler_start = Instant::now();
println!("πŸ”Š New subscriber connected: {}", session.peer().id52());
) -> Result<StreamResponse, MediaError> {

// Read and decode audio file to get actual audio format
let decode_start = Instant::now();
Expand Down
128 changes: 128 additions & 0 deletions examples/src/media_stream_v2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//! Clean Stream-Based Media Example
//!
//! Demonstrates the new clean streaming API:
//! - Stream provider trait (app implements)
//! - Clean client/server separation
//! - No embedded connection info in types

mod streaming;
use streaming::*;

#[fastn_p2p::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
match examples::parse_cli()? {
examples::Server {
private_key: _,
config,
} => {
let server_key = examples::get_or_create_persistent_key("media_stream_v2");
let audio_file = config.first().cloned().unwrap_or_else(||
"examples/assets/SerenataGranados.ogg".to_string()
);
run_server(server_key, audio_file).await
}
examples::Client { target, config: _ } => {
run_client(target).await
}
}
}

/// Run server with stream provider
async fn run_server(
private_key: fastn_p2p::SecretKey,
audio_file: String,
) -> Result<(), Box<dyn std::error::Error>> {
println!("🎡 Stream Server V2 starting...");
println!("🎧 Server listening on: {}", private_key.id52());
println!("");
println!("πŸš€ To connect from another machine, run:");
println!(" cargo run --bin media_stream_v2 -- client {}", private_key.id52());
println!("");

// Create stream provider
let provider = SimpleAudioProvider::new(audio_file).await?;

// Start server (TODO: Need to wire up handlers properly with fastn-p2p)
println!("πŸ“‘ Server ready to serve audio streams...");

// For now, just keep server alive
tokio::signal::ctrl_c().await?;
println!("πŸ‘‹ Server shutting down...");

Ok(())
}

/// Run client with clean stream access
async fn run_client(
target: fastn_p2p::PublicKey,
) -> Result<(), Box<dyn std::error::Error>> {
println!("🎧 Stream Client V2 connecting to: {}", target);

// Create stream client
let stream_client = StreamClient::new(target);

// Open the audio stream
let stream = stream_client.open_stream("audio_stream").await?;
println!("βœ… Opened stream: {} with {} tracks", stream.name, stream.list_tracks().len());

// Get audio track
let audio_track = stream.get_track("audio")
.ok_or("Audio track not found in stream")?;

println!("πŸ“Š Audio track: {} bytes", audio_track.size_bytes);

// Simple test: read first chunk
let chunk_size = 32768; // 32KB
let chunk_data = stream_client.read_track_range("audio_stream", "audio", 0, chunk_size).await?;
println!("πŸ“₯ Read first chunk: {} bytes", chunk_data.len());

// TODO: Add full playback loop with buffering
// TODO: Add interactive controls (SPACE pause/resume)
// TODO: Add audio decoding + rodio playback

Ok(())
}

/// Simple audio stream provider implementation
struct SimpleAudioProvider {
audio_file: String,
audio_data: Vec<u8>,
}

impl SimpleAudioProvider {
async fn new(audio_file: String) -> Result<Self, Box<dyn std::error::Error>> {
// TODO: Load and decode audio file using examples::audio_decoder
let (audio_data, _sample_rate, _channels) = examples::audio_decoder::decode_audio_file(&audio_file).await
.map_err(|e| format!("Failed to decode audio: {}", e))?;

Ok(Self {
audio_file,
audio_data,
})
}
}

impl StreamProvider for SimpleAudioProvider {
async fn resolve_stream(&self, stream_name: &str) -> Option<ServerStream> {
// TODO: If stream_name == "audio_stream", return stream with single audio track
if stream_name == "audio_stream" {
let mut stream = ServerStream::new(stream_name.to_string());
stream.add_track("audio".to_string(), self.audio_data.len() as u64);
Some(stream)
} else {
None
}
}

async fn read_track_range(&self, _stream_name: &str, _track_name: &str, start: u64, length: u64) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
// TODO: Check bounds (start + length <= audio_data.len())
let end = std::cmp::min(start + length, self.audio_data.len() as u64) as usize;
let start = start as usize;

if start >= self.audio_data.len() {
return Err("Start position out of bounds".into());
}

Ok(self.audio_data[start..end].to_vec())
}
}
Loading