From 6ff7c464e1950f80d3ad6038b98317f96d59fd66 Mon Sep 17 00:00:00 2001 From: Colin Marc Date: Wed, 26 Feb 2025 16:23:46 +0100 Subject: [PATCH 1/2] Add PulseAudio support This adds support for PulseAudio on hosts with a PA or PipeWire server (the latter via pipewire-pulse). Since the underlying client is async, some amount of bridging has to be done. --- Cargo.toml | 8 +- examples/beep.rs | 77 ++++--- examples/feedback.rs | 86 ++++---- src/error.rs | 3 + src/host/mod.rs | 10 + src/host/pulseaudio/mod.rs | 395 ++++++++++++++++++++++++++++++++++ src/host/pulseaudio/stream.rs | 216 +++++++++++++++++++ src/platform/mod.rs | 4 +- 8 files changed, 713 insertions(+), 86 deletions(-) create mode 100644 src/host/pulseaudio/mod.rs create mode 100644 src/host/pulseaudio/stream.rs diff --git a/Cargo.toml b/Cargo.toml index 2f80d858c..c1124f14c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,8 @@ edition = "2021" rust-version = "1.70" [features] -asio = [ - "asio-sys", - "num-traits", -] # Only available on Windows. See README for setup instructions. +asio = ["asio-sys", "num-traits"] # Only available on Windows. See README for setup instructions. +pulseaudio = ["dep:pulseaudio", "dep:futures"] # Only available on some Unix platforms. # Deprecated, the `oboe` backend has been removed oboe-shared-stdcxx = [] @@ -53,6 +51,8 @@ alsa = "0.10" libc = "0.2" audio_thread_priority = { version = "0.34.0", optional = true } jack = { version = "0.13.0", optional = true } +pulseaudio = { version = "0.3", optional = true } +futures = { version = "0.3", optional = true } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] mach2 = "0.5" # For access to mach_timebase type. diff --git a/examples/beep.rs b/examples/beep.rs index 2a6872320..2f81a6d09 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -1,7 +1,7 @@ use clap::Parser; use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, - FromSample, Sample, SizedSample, I24, + FromSample, HostUnavailable, Sample, SizedSample, I24, }; #[derive(Parser, Debug)] @@ -11,58 +11,57 @@ struct Opt { #[arg(short, long, default_value_t = String::from("default"))] device: String, - /// Use the JACK host - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" - ))] - #[arg(short, long)] - #[allow(dead_code)] + /// Use the JACK host. Requires `--features jack`. + #[arg(long, default_value_t = false)] jack: bool, + + /// Use the PulseAudio host. Requires `--features pulseaudio`. + #[arg(long, default_value_t = false)] + pulseaudio: bool, } fn main() -> anyhow::Result<()> { let opt = Opt::parse(); - // Conditionally compile with jack if the feature is specified. - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" + // Jack/PulseAudio support must be enabled at compile time, and is + // only available on some platforms. + #[allow(unused_mut, unused_assignments)] + let mut jack_host_id = Err(HostUnavailable); + #[allow(unused_mut, unused_assignments)] + let mut pulseaudio_host_id = Err(HostUnavailable); + + #[cfg(any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" ))] + { + #[cfg(feature = "jack")] + { + jack_host_id = Ok(cpal::HostId::Jack); + } + + #[cfg(feature = "pulseaudio")] + { + pulseaudio_host_id = Ok(cpal::HostId::PulseAudio); + } + } + // Manually check for flags. Can be passed through cargo with -- e.g. // cargo run --release --example beep --features jack -- --jack let host = if opt.jack { - cpal::host_from_id(cpal::available_hosts() - .into_iter() - .find(|id| *id == cpal::HostId::Jack) - .expect( - "make sure --features jack is specified. only works on OSes where jack is available", - )).expect("jack host unavailable") + jack_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features jack` is specified, and the platform is supported") + } else if opt.pulseaudio { + pulseaudio_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features pulseaudio` is specified, and the platform is supported") } else { cpal::default_host() }; - #[cfg(any( - not(any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - )), - not(feature = "jack") - ))] - let host = cpal::default_host(); - let device = if opt.device == "default" { host.default_output_device() } else { diff --git a/examples/feedback.rs b/examples/feedback.rs index 23b7386e9..5204cc180 100644 --- a/examples/feedback.rs +++ b/examples/feedback.rs @@ -7,7 +7,10 @@ //! precisely synchronised. use clap::Parser; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::{ + traits::{DeviceTrait, HostTrait, StreamTrait}, + HostUnavailable, +}; use ringbuf::{ traits::{Consumer, Producer, Split}, HeapRb, @@ -28,58 +31,57 @@ struct Opt { #[arg(short, long, value_name = "DELAY_MS", default_value_t = 150.0)] latency: f32, - /// Use the JACK host - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" - ))] - #[arg(short, long)] - #[allow(dead_code)] + /// Use the JACK host. Requires `--features jack`. + #[arg(long, default_value_t = false)] jack: bool, + + /// Use the PulseAudio host. Requires `--features pulseaudio`. + #[arg(long, default_value_t = false)] + pulseaudio: bool, } fn main() -> anyhow::Result<()> { let opt = Opt::parse(); - // Conditionally compile with jack if the feature is specified. - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" + // Jack/PulseAudio support must be enabled at compile time, and is + // only available on some platforms. + #[allow(unused_mut, unused_assignments)] + let mut jack_host_id = Err(HostUnavailable); + #[allow(unused_mut, unused_assignments)] + let mut pulseaudio_host_id = Err(HostUnavailable); + + #[cfg(any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" ))] + { + #[cfg(feature = "jack")] + { + jack_host_id = Ok(cpal::HostId::Jack); + } + + #[cfg(feature = "pulseaudio")] + { + pulseaudio_host_id = Ok(cpal::HostId::PulseAudio); + } + } + // Manually check for flags. Can be passed through cargo with -- e.g. // cargo run --release --example beep --features jack -- --jack let host = if opt.jack { - cpal::host_from_id(cpal::available_hosts() - .into_iter() - .find(|id| *id == cpal::HostId::Jack) - .expect( - "make sure --features jack is specified. only works on OSes where jack is available", - )).expect("jack host unavailable") + jack_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features jack` is specified, and the platform is supported") + } else if opt.pulseaudio { + pulseaudio_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features pulseaudio` is specified, and the platform is supported") } else { cpal::default_host() }; - #[cfg(any( - not(any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - )), - not(feature = "jack") - ))] - let host = cpal::default_host(); - // Find devices. let input_device = if opt.input_device == "default" { host.default_input_device() @@ -160,9 +162,9 @@ fn main() -> anyhow::Result<()> { input_stream.play()?; output_stream.play()?; - // Run for 3 seconds before closing. - println!("Playing for 3 seconds... "); - std::thread::sleep(std::time::Duration::from_secs(3)); + // Run for 10 seconds before closing. + println!("Playing for 10 seconds... "); + std::thread::sleep(std::time::Duration::from_secs(10)); drop(input_stream); drop(output_stream); println!("Done!"); diff --git a/src/error.rs b/src/error.rs index 2fed3622b..f5c6a06c1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -70,12 +70,15 @@ impl From for DevicesError { pub enum DeviceNameError { /// See the [`BackendSpecificError`] docs for more information about this error variant. BackendSpecific { err: BackendSpecificError }, + /// The name is not valid UTF-8. + InvalidUtf8, } impl Display for DeviceNameError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { Self::BackendSpecific { err } => err.fmt(f), + Self::InvalidUtf8 => write!(f, "The name is not valid UTF-8"), } } } diff --git a/src/host/mod.rs b/src/host/mod.rs index 0c61a5910..da3bd5d23 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -24,6 +24,16 @@ pub(crate) mod emscripten; ))] pub(crate) mod jack; pub(crate) mod null; +#[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pulseaudio" +))] +pub(crate) mod pulseaudio; #[cfg(windows)] pub(crate) mod wasapi; #[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))] diff --git a/src/host/pulseaudio/mod.rs b/src/host/pulseaudio/mod.rs new file mode 100644 index 000000000..396e8cac6 --- /dev/null +++ b/src/host/pulseaudio/mod.rs @@ -0,0 +1,395 @@ +extern crate pulseaudio; + +use futures::executor::block_on; +use pulseaudio::protocol; + +mod stream; +use core::str; + +pub use stream::Stream; + +use crate::{ + traits::{DeviceTrait, HostTrait}, + BackendSpecificError, BuildStreamError, Data, DefaultStreamConfigError, DeviceNameError, + DevicesError, HostUnavailable, InputCallbackInfo, OutputCallbackInfo, SampleFormat, SampleRate, + StreamConfig, StreamError, SupportedBufferSize, SupportedStreamConfig, + SupportedStreamConfigRange, SupportedStreamConfigsError, +}; + +const PULSE_FORMATS: &[SampleFormat] = &[ + SampleFormat::U8, + SampleFormat::I16, + SampleFormat::I24, + SampleFormat::I32, + SampleFormat::F32, +]; + +impl TryFrom for SampleFormat { + type Error = (); + + fn try_from(spec: protocol::SampleFormat) -> Result { + match spec { + protocol::SampleFormat::U8 => Ok(SampleFormat::U8), + protocol::SampleFormat::S16Le | protocol::SampleFormat::S16Be => Ok(SampleFormat::I16), + protocol::SampleFormat::S24Le | protocol::SampleFormat::S24Be => Ok(SampleFormat::I24), + protocol::SampleFormat::S32Le | protocol::SampleFormat::S32Be => Ok(SampleFormat::I32), + protocol::SampleFormat::Float32Le | protocol::SampleFormat::Float32Be => { + Ok(SampleFormat::F32) + } + _ => Err(()), + } + } +} + +impl TryFrom for protocol::SampleFormat { + type Error = (); + + fn try_from(format: SampleFormat) -> Result { + match (format, cfg!(target_endian = "little")) { + (SampleFormat::U8, _) => Ok(protocol::SampleFormat::U8), + (SampleFormat::I16, true) => Ok(protocol::SampleFormat::S16Le), + (SampleFormat::I16, false) => Ok(protocol::SampleFormat::S16Be), + (SampleFormat::I24, true) => Ok(protocol::SampleFormat::S24Le), + (SampleFormat::I24, false) => Ok(protocol::SampleFormat::S24Be), + (SampleFormat::I32, true) => Ok(protocol::SampleFormat::S32Le), + (SampleFormat::I32, false) => Ok(protocol::SampleFormat::S32Be), + (SampleFormat::F32, true) => Ok(protocol::SampleFormat::Float32Le), + (SampleFormat::F32, false) => Ok(protocol::SampleFormat::Float32Be), + _ => Err(()), + } + } +} + +impl From for BackendSpecificError { + fn from(err: pulseaudio::ClientError) -> Self { + BackendSpecificError { + description: err.to_string(), + } + } +} + +/// A Host for connecting to the popular PulseAudio and PipeWire (via +/// pipewire-pulse) audio servers on linux. +pub struct Host { + client: pulseaudio::Client, +} + +impl Host { + pub fn new() -> Result { + let client = + pulseaudio::Client::from_env(c"cpal-pulseaudio").map_err(|_| HostUnavailable)?; + + Ok(Self { client }) + } +} + +impl HostTrait for Host { + type Devices = std::vec::IntoIter; + type Device = Device; + + fn is_available() -> bool { + pulseaudio::socket_path_from_env().is_some() + } + + fn devices(&self) -> Result { + let sinks = block_on(self.client.list_sinks()).map_err(|_| BackendSpecificError { + description: "Failed to list sinks".to_owned(), + })?; + + let sources = block_on(self.client.list_sources()).map_err(|_| BackendSpecificError { + description: "Failed to list sources".to_owned(), + })?; + + Ok(sinks + .into_iter() + .map(|sink_info| Device::Sink { + client: self.client.clone(), + info: sink_info, + }) + .chain(sources.into_iter().map(|source_info| Device::Source { + client: self.client.clone(), + info: source_info, + })) + .collect::>() + .into_iter()) + } + + fn default_input_device(&self) -> Option { + let source_info = block_on( + self.client + .source_info_by_name(protocol::DEFAULT_SOURCE.to_owned()), + ) + .ok()?; + + Some(Device::Source { + client: self.client.clone(), + info: source_info, + }) + } + + fn default_output_device(&self) -> Option { + let sink_info = block_on( + self.client + .sink_info_by_name(protocol::DEFAULT_SINK.to_owned()), + ) + .ok()?; + + Some(Device::Sink { + client: self.client.clone(), + info: sink_info, + }) + } +} + +/// A PulseAudio sink or source. +#[derive(Debug, Clone)] +pub enum Device { + Sink { + client: pulseaudio::Client, + info: protocol::SinkInfo, + }, + Source { + client: pulseaudio::Client, + info: protocol::SourceInfo, + }, +} + +impl DeviceTrait for Device { + type SupportedInputConfigs = std::vec::IntoIter; + type SupportedOutputConfigs = std::vec::IntoIter; + type Stream = Stream; + + fn name(&self) -> Result { + let name = match self { + Device::Sink { info, .. } => &info.name, + Device::Source { info, .. } => &info.name, + }; + + match str::from_utf8(name.as_bytes()) { + Ok(name) => Ok(name.to_string()), + Err(_) => Err(DeviceNameError::InvalidUtf8), + } + } + + fn supported_input_configs( + &self, + ) -> Result { + let Device::Source { .. } = self else { + return Ok(vec![].into_iter()); + }; + + let mut ranges = vec![]; + for format in PULSE_FORMATS { + for channel_count in 1..protocol::sample_spec::MAX_CHANNELS { + ranges.push(SupportedStreamConfigRange { + channels: channel_count as _, + min_sample_rate: SampleRate(1), + max_sample_rate: SampleRate(protocol::sample_spec::MAX_RATE), + buffer_size: SupportedBufferSize::Range { + min: 0, + max: protocol::MAX_MEMBLOCKQ_LENGTH as _, + }, + sample_format: *format, + }) + } + } + + Ok(ranges.into_iter()) + } + + fn supported_output_configs( + &self, + ) -> Result { + let Device::Sink { .. } = self else { + return Ok(vec![].into_iter()); + }; + + let mut ranges = vec![]; + for format in PULSE_FORMATS { + for channel_count in 1..protocol::sample_spec::MAX_CHANNELS { + ranges.push(SupportedStreamConfigRange { + channels: channel_count as _, + min_sample_rate: SampleRate(1), + max_sample_rate: SampleRate(protocol::sample_spec::MAX_RATE), + buffer_size: SupportedBufferSize::Range { + min: 0, + max: protocol::MAX_MEMBLOCKQ_LENGTH as _, + }, + sample_format: *format, + }) + } + } + + Ok(ranges.into_iter()) + } + + fn default_input_config(&self) -> Result { + let Device::Source { info, .. } = self else { + return Err(DefaultStreamConfigError::StreamTypeNotSupported); + }; + + Ok(SupportedStreamConfig { + channels: info.channel_map.num_channels() as _, + sample_rate: SampleRate(info.sample_spec.sample_rate), + buffer_size: SupportedBufferSize::Range { + min: 0, + max: protocol::MAX_MEMBLOCKQ_LENGTH as _, + }, + sample_format: info + .sample_spec + .format + .try_into() + .unwrap_or(SampleFormat::F32), + }) + } + + fn default_output_config(&self) -> Result { + let Device::Sink { info, .. } = self else { + return Err(DefaultStreamConfigError::StreamTypeNotSupported); + }; + + Ok(SupportedStreamConfig { + channels: info.channel_map.num_channels() as _, + sample_rate: SampleRate(info.sample_spec.sample_rate), + buffer_size: SupportedBufferSize::Range { + min: 0, + max: protocol::MAX_MEMBLOCKQ_LENGTH as _, + }, + sample_format: info + .sample_spec + .format + .try_into() + .unwrap_or(SampleFormat::F32), + }) + } + + fn build_input_stream_raw( + &self, + config: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let Device::Source { client, info } = self else { + return Err(BuildStreamError::StreamConfigNotSupported); + }; + + let format: protocol::SampleFormat = sample_format + .try_into() + .map_err(|_| BuildStreamError::StreamConfigNotSupported)?; + + let sample_spec = make_sample_spec(config, format); + let channel_map = make_channel_map(config); + let buffer_attr = make_buffer_attr(config, format); + + let params = protocol::RecordStreamParams { + sample_spec, + channel_map, + source_index: Some(info.index), + buffer_attr, + flags: protocol::stream::StreamFlags { + // Start the stream suspended. + start_corked: true, + ..Default::default() + }, + ..Default::default() + }; + + stream::Stream::new_record( + client.clone(), + params, + sample_format, + data_callback, + error_callback, + ) + } + + fn build_output_stream_raw( + &self, + config: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let Device::Sink { client, info } = self else { + return Err(BuildStreamError::StreamConfigNotSupported); + }; + + let format: protocol::SampleFormat = sample_format + .try_into() + .map_err(|_| BuildStreamError::StreamConfigNotSupported)?; + + let sample_spec = make_sample_spec(config, format); + let channel_map = make_channel_map(config); + let buffer_attr = make_buffer_attr(config, format); + + let params = protocol::PlaybackStreamParams { + sink_index: Some(info.index), + sample_spec, + channel_map, + buffer_attr, + flags: protocol::stream::StreamFlags { + // Start the stream suspended. + start_corked: true, + ..Default::default() + }, + ..Default::default() + }; + + stream::Stream::new_playback( + client.clone(), + params, + sample_format, + data_callback, + error_callback, + ) + } +} + +fn make_sample_spec(config: &StreamConfig, format: protocol::SampleFormat) -> protocol::SampleSpec { + protocol::SampleSpec { + format, + sample_rate: config.sample_rate.0, + channels: config.channels as _, + } +} + +fn make_channel_map(config: &StreamConfig) -> protocol::ChannelMap { + if config.channels == 2 { + return protocol::ChannelMap::stereo(); + } + + let mut map = protocol::ChannelMap::empty(); + for _ in 0..config.channels { + map.push(protocol::ChannelPosition::Mono); + } + + map +} + +fn make_buffer_attr( + config: &StreamConfig, + format: protocol::SampleFormat, +) -> protocol::stream::BufferAttr { + match config.buffer_size { + crate::BufferSize::Default => Default::default(), + crate::BufferSize::Fixed(frame_count) => { + let len = frame_count * config.channels as u32 * format.bytes_per_sample() as u32; + protocol::stream::BufferAttr { + max_length: len, + target_length: len, + ..Default::default() + } + } + } +} diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs new file mode 100644 index 000000000..44898a5bf --- /dev/null +++ b/src/host/pulseaudio/stream.rs @@ -0,0 +1,216 @@ +use std::{ + sync::{ + atomic::{self, AtomicU64}, + Arc, + }, + time::{self, SystemTime}, +}; + +use futures::executor::block_on; +use pulseaudio::{protocol, AsPlaybackSource}; + +use crate::{ + traits::StreamTrait, BackendSpecificError, BuildStreamError, Data, InputCallbackInfo, + InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, PlayStreamError, SampleFormat, + StreamError, StreamInstant, +}; + +pub enum Stream { + Playback(pulseaudio::PlaybackStream), + Record(pulseaudio::RecordStream), +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), PlayStreamError> { + match self { + Stream::Playback(stream) => { + block_on(stream.uncork()).map_err(Into::::into)?; + } + Stream::Record(stream) => { + block_on(stream.uncork()).map_err(Into::::into)?; + block_on(stream.started()).map_err(Into::::into)?; + } + }; + + Ok(()) + } + + fn pause(&self) -> Result<(), crate::PauseStreamError> { + let res = match self { + Stream::Playback(stream) => block_on(stream.cork()), + Stream::Record(stream) => block_on(stream.cork()), + }; + + res.map_err(Into::::into)?; + Ok(()) + } +} + +impl Stream { + pub fn new_playback( + client: pulseaudio::Client, + params: protocol::PlaybackStreamParams, + sample_format: SampleFormat, + mut data_callback: D, + _error_callback: E, + ) -> Result + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let epoch = std::time::SystemTime::now(); + + let current_latency_micros = Arc::new(AtomicU64::new(0)); + let latency_clone = current_latency_micros.clone(); + let sample_spec = params.sample_spec.clone(); + + // Wrap the write callback to match the pulseaudio signature. + let callback = move |buf: &mut [u8]| { + let now = SystemTime::now().duration_since(epoch).unwrap_or_default(); + let latency = latency_clone.load(atomic::Ordering::Relaxed); + let playback_time = now + time::Duration::from_micros(latency as u64); + + let timestamp = OutputStreamTimestamp { + callback: StreamInstant { + secs: now.as_secs() as i64, + nanos: now.subsec_nanos(), + }, + playback: StreamInstant { + secs: playback_time.as_secs() as i64, + nanos: playback_time.subsec_nanos(), + }, + }; + + let bps = sample_spec.format.bytes_per_sample(); + let n_samples = buf.len() / bps; + let mut data = + unsafe { Data::from_parts(buf.as_mut_ptr().cast(), n_samples, sample_format) }; + + data_callback(&mut data, &OutputCallbackInfo { timestamp }); + + // We always consider the full buffer filled, because cpal's + // user-facing api doesn't allow for short writes. + // TODO: should we preemptively zero the output buffer before + // passing it to the user? + n_samples * bps + }; + + let stream = block_on(client.create_playback_stream(params, callback.as_playback_source())) + .map_err(Into::::into)?; + + // Spawn a thread to drive the stream future. + let stream_clone = stream.clone(); + let _worker_thread = std::thread::spawn(move || block_on(stream_clone.play_all())); + + // Spawn a thread to monitor the stream's latency in a loop. + let stream_clone = stream.clone(); + let latency_clone = current_latency_micros.clone(); + std::thread::spawn(move || loop { + let Ok(timing_info) = block_on(stream_clone.timing_info()) else { + break; + }; + + store_latency( + &latency_clone, + sample_spec, + timing_info.sink_usec, + timing_info.write_offset, + timing_info.read_offset, + ); + + std::thread::sleep(time::Duration::from_millis(100)); + }); + + Ok(Self::Playback(stream)) + } + + pub fn new_record( + client: pulseaudio::Client, + params: protocol::RecordStreamParams, + sample_format: SampleFormat, + mut data_callback: D, + _error_callback: E, + ) -> Result + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let epoch = std::time::SystemTime::now(); + + let current_latency_micros = Arc::new(AtomicU64::new(0)); + let latency_clone = current_latency_micros.clone(); + let sample_spec = params.sample_spec.clone(); + + let callback = move |buf: &[u8]| { + let now = SystemTime::now().duration_since(epoch).unwrap_or_default(); + let latency = latency_clone.load(atomic::Ordering::Relaxed); + let capture_time = now + .checked_sub(time::Duration::from_micros(latency as u64)) + .unwrap_or_default(); + + let timestamp = InputStreamTimestamp { + callback: StreamInstant { + secs: now.as_secs() as i64, + nanos: now.subsec_nanos(), + }, + capture: StreamInstant { + secs: capture_time.as_secs() as i64, + nanos: capture_time.subsec_nanos(), + }, + }; + + let bps = sample_spec.format.bytes_per_sample(); + let n_samples = buf.len() / bps; + let data = + unsafe { Data::from_parts(buf.as_ptr() as *mut _, n_samples, sample_format) }; + + data_callback(&data, &InputCallbackInfo { timestamp }); + }; + + let stream = block_on(client.create_record_stream(params, callback)) + .map_err(Into::::into)?; + + // Spawn a thread to monitor the stream's latency in a loop. + let stream_clone = stream.clone(); + let latency_clone = current_latency_micros.clone(); + std::thread::spawn(move || loop { + let Ok(timing_info) = block_on(stream_clone.timing_info()) else { + break; + }; + + store_latency( + &latency_clone, + sample_spec, + timing_info.sink_usec, + timing_info.write_offset, + timing_info.read_offset, + ); + + std::thread::sleep(time::Duration::from_millis(100)); + }); + + Ok(Self::Record(stream)) + } +} + +fn store_latency( + latency_micros: &AtomicU64, + sample_spec: protocol::SampleSpec, + device_latency_usec: u64, + write_offset: i64, + read_offset: i64, +) -> time::Duration { + let offset = (write_offset as u64) + .checked_sub(read_offset as u64) + .unwrap_or(0); + + let latency = time::Duration::from_micros(device_latency_usec) + + sample_spec.bytes_to_duration(offset as usize); + + latency_micros.store( + latency.as_micros().try_into().unwrap_or(u64::MAX), + atomic::Ordering::Relaxed, + ); + + latency +} diff --git a/src/platform/mod.rs b/src/platform/mod.rs index fd12eaaac..1c06f865c 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -593,7 +593,6 @@ macro_rules! impl_platform_host { }; } -// TODO: Add pulseaudio and jack here eventually. #[cfg(any( target_os = "linux", target_os = "dragonfly", @@ -604,8 +603,11 @@ mod platform_impl { pub use crate::host::alsa::Host as AlsaHost; #[cfg(feature = "jack")] pub use crate::host::jack::Host as JackHost; + #[cfg(feature = "pulseaudio")] + pub use crate::host::pulseaudio::Host as PulseAudioHost; impl_platform_host!( + #[cfg(feature = "pulseaudio")] PulseAudio => PulseAudioHost, #[cfg(feature = "jack")] Jack => JackHost, Alsa => AlsaHost, ); From 0297d4a530a558bf581b07d1d1ce4de32ed593a5 Mon Sep 17 00:00:00 2001 From: Colin Marc Date: Sat, 4 Oct 2025 15:16:44 +0200 Subject: [PATCH 2/2] fixup! Add PulseAudio support --- src/host/pulseaudio/stream.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs index 44898a5bf..750ee68b3 100644 --- a/src/host/pulseaudio/stream.rs +++ b/src/host/pulseaudio/stream.rs @@ -83,6 +83,12 @@ impl Stream { let bps = sample_spec.format.bytes_per_sample(); let n_samples = buf.len() / bps; + + // SAFETY: We verify that: + // - buf.as_ptr() points to valid memory for at least n_samples * bytes_per_sample + // - n_samples is calculated from buf.len() / bytes_per_sample, ensuring validity + // - The buffer remains valid for the duration of the callback + // - sample_format matches the actual data layout in the buffer let mut data = unsafe { Data::from_parts(buf.as_mut_ptr().cast(), n_samples, sample_format) }; @@ -98,11 +104,13 @@ impl Stream { let stream = block_on(client.create_playback_stream(params, callback.as_playback_source())) .map_err(Into::::into)?; - // Spawn a thread to drive the stream future. + // Spawn a thread to drive the stream future. It will exit automatically + // when the stream is stopped by the user. let stream_clone = stream.clone(); let _worker_thread = std::thread::spawn(move || block_on(stream_clone.play_all())); - // Spawn a thread to monitor the stream's latency in a loop. + // Spawn a thread to monitor the stream's latency in a loop. It will + // exit automatically when the stream ends. let stream_clone = stream.clone(); let latency_clone = current_latency_micros.clone(); std::thread::spawn(move || loop { @@ -161,6 +169,12 @@ impl Stream { let bps = sample_spec.format.bytes_per_sample(); let n_samples = buf.len() / bps; + + // SAFETY: We verify that: + // - buf.as_ptr() points to valid memory for at least n_samples * bytes_per_sample + // - n_samples is calculated from buf.len() / bytes_per_sample, ensuring validity + // - The buffer remains valid for the duration of the callback + // - sample_format matches the actual data layout in the buffer let data = unsafe { Data::from_parts(buf.as_ptr() as *mut _, n_samples, sample_format) }; @@ -170,7 +184,8 @@ impl Stream { let stream = block_on(client.create_record_stream(params, callback)) .map_err(Into::::into)?; - // Spawn a thread to monitor the stream's latency in a loop. + // Spawn a thread to monitor the stream's latency in a loop. It will + // exit automatically when the stream ends. let stream_clone = stream.clone(); let latency_clone = current_latency_micros.clone(); std::thread::spawn(move || loop {