diff --git a/.github/workflows/builds.yml b/.github/workflows/builds.yml index 99acf50c4..919877efe 100644 --- a/.github/workflows/builds.yml +++ b/.github/workflows/builds.yml @@ -88,6 +88,8 @@ jobs: - name: Build (Android) if: ${{ contains(matrix.target, 'android') }} run: | + sudo apt install -y gcc-multilib + ln -sf $ANDROID_NDK_ROOT/toolchains/llvm/prebuilt/linux-x86_64/lib/${{ matrix.ndk_arch }}/{libunwind.so,libc++abi.a} $ANDROID_NDK_ROOT/toolchains/llvm/prebuilt/linux-x86_64/lib/ cargo install cargo-ndk cargo ndk --target ${{ matrix.target }} build --release -p livekit --workspace -vv diff --git a/Cargo.lock b/Cargo.lock index 36b112358..625bff3cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2980,16 +2980,15 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", "tokio", - "tracing", ] [[package]] diff --git a/livekit-ffi/generate_proto_win.bat b/livekit-ffi/generate_proto_win.bat index 303cf3056..2132641cd 100644 --- a/livekit-ffi/generate_proto_win.bat +++ b/livekit-ffi/generate_proto_win.bat @@ -30,4 +30,5 @@ protoc.exe ^ %PROTOCOL%/audio_frame.proto ^ %PROTOCOL%/e2ee.proto ^ %PROTOCOL%/stats.proto ^ - %PROTOCOL%/rpc.proto + %PROTOCOL%/rpc.proto ^ + %PROTOCOL%/data_stream.proto diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 7e58123d4..c993ecadf 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -148,7 +148,9 @@ message FfiRequest { TextStreamWriterWriteRequest text_stream_write = 65; TextStreamWriterCloseRequest text_stream_close = 66; - // NEXT_ID: 67 + PublishMetricsRequest publish_metrics = 67; + + // NEXT_ID: 68 } } @@ -243,7 +245,9 @@ message FfiResponse { TextStreamWriterWriteResponse text_stream_write = 64; TextStreamWriterCloseResponse text_stream_close = 65; - // NEXT_ID: 66 + PublishMetricsResponse publish_metrics = 66; + + // NEXT_ID: 67 } } @@ -301,6 +305,17 @@ message FfiEvent { } } +message PublishMetricsRequest { + required uint64 local_participant_handle = 1; + required uint64 data_ptr = 2; + required uint64 data_len = 3; + optional uint64 async_id = 4; +} + +message PublishMetricsResponse { + required uint64 async_id = 1; +} + // Stop all rooms synchronously (Do we need async here?). // e.g: This is used for the Unity Editor after each assemblies reload. // TODO(theomonnom): Implement a debug mode where we can find all leaked handles? diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index be0e5c1bc..a832f7168 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -4804,7 +4804,7 @@ pub struct RpcMethodInvocationEvent { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiRequest { - #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 48, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66")] + #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 48, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiRequest`. @@ -4952,13 +4952,15 @@ pub mod ffi_request { TextStreamWrite(super::TextStreamWriterWriteRequest), #[prost(message, tag="66")] TextStreamClose(super::TextStreamWriterCloseRequest), + #[prost(message, tag="67")] + PublishMetrics(super::PublishMetricsRequest), } } /// This is the output of livekit_ffi_request function. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiResponse { - #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 47, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65")] + #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 47, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiResponse`. @@ -5104,6 +5106,8 @@ pub mod ffi_response { TextStreamWrite(super::TextStreamWriterWriteResponse), #[prost(message, tag="65")] TextStreamClose(super::TextStreamWriterCloseResponse), + #[prost(message, tag="66")] + PublishMetrics(super::PublishMetricsResponse), } } /// To minimize complexity, participant events are not included in the protocol. @@ -5202,6 +5206,24 @@ pub mod ffi_event { SendText(super::StreamSendTextCallback), } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishMetricsRequest { + #[prost(uint64, required, tag="1")] + pub local_participant_handle: u64, + #[prost(uint64, required, tag="2")] + pub data_ptr: u64, + #[prost(uint64, required, tag="3")] + pub data_len: u64, + #[prost(uint64, optional, tag="4")] + pub async_id: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishMetricsResponse { + #[prost(uint64, required, tag="1")] + pub async_id: u64, +} /// Stop all rooms synchronously (Do we need async here?). /// e.g: This is used for the Unity Editor after each assemblies reload. /// TODO(theomonnom): Implement a debug mode where we can find all leaked handles? diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index c45dba855..863ee647e 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -1161,6 +1161,16 @@ fn on_text_stream_close( writer.close(server, request) } +fn on_publish_metrics( + server: &'static FfiServer, + publish: proto::PublishMetricsRequest, +) -> FfiResult { + let ffi_participant = + server.retrieve_handle::(publish.local_participant_handle)?; + + ffi_participant.room.publish_metrics(server, publish) +} + #[allow(clippy::field_reassign_with_default)] // Avoid uggly format pub fn handle_request( server: &'static FfiServer, @@ -1414,6 +1424,12 @@ pub fn handle_request( on_set_track_subscription_permissions(server, request)?, ) } + proto::ffi_request::Message::PublishMetrics(publish_metrics) => { + proto::ffi_response::Message::PublishMetrics(on_publish_metrics( + server, + publish_metrics, + )?) + } }); Ok(res) diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 7a2437280..a4d73ec9f 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -839,6 +839,35 @@ impl RoomInner { pub fn url(&self) -> String { self.url.clone() } + + pub fn publish_metrics( + self: &Arc, + server: &'static FfiServer, + publish: proto::PublishMetricsRequest, + ) -> FfiResult { + let async_id = publish.async_id.unwrap_or_else(|| server.next_id()); + + let data = unsafe { + slice::from_raw_parts(publish.data_ptr as *const u8, publish.data_len as usize) + } + .to_vec(); + + let self_clone = self.clone(); + let handle = server.async_runtime.spawn(async move { + if let Err(err) = self_clone.data_tx.send(FfiDataPacket { + payload: DataPacket { + payload: data.to_vec(), // Avoid copy? + ..Default::default() + }, + async_id, + }) { + log::error!("Failed to publish metrics: {:?}", err); + } + }); + server.watch_panic(handle); + + Ok(proto::PublishMetricsResponse { async_id }) + } } // Task used to publish data without blocking the client thread diff --git a/livekit/src/room/data_stream/outgoing.rs b/livekit/src/room/data_stream/outgoing.rs index 7984f4e32..180d53caf 100644 --- a/livekit/src/room/data_stream/outgoing.rs +++ b/livekit/src/room/data_stream/outgoing.rs @@ -74,7 +74,7 @@ impl<'a> StreamWriter<'a> for ByteStreamWriter { &self.info } - async fn write(&self, bytes: &[u8]) -> StreamResult<()> { + async fn write(&self, bytes: &'a [u8]) -> StreamResult<()> { let mut stream = self.stream.lock().await; for chunk in bytes.chunks(CHUNK_SIZE) { stream.write_chunk(chunk).await?; @@ -116,7 +116,7 @@ impl<'a> StreamWriter<'a> for TextStreamWriter { &self.info } - async fn write(&self, text: &str) -> StreamResult<()> { + async fn write(&self, text: &'a str) -> StreamResult<()> { let mut stream = self.stream.lock().await; for chunk in text.as_bytes().utf8_aware_chunks(CHUNK_SIZE) { stream.write_chunk(chunk).await?; diff --git a/livekit/src/room/metrics/mod.rs b/livekit/src/room/metrics/mod.rs new file mode 100644 index 000000000..12286a0c1 --- /dev/null +++ b/livekit/src/room/metrics/mod.rs @@ -0,0 +1,386 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::time::Duration; + +use livekit_protocol::{self as proto, data_packet}; +use proto::{DataPacket, MetricLabel, MetricSample, MetricsBatch, TimeSeriesMetric}; +use tokio::sync::broadcast; + +use crate::prelude::LocalTrackPublication; +use crate::room::{Room, RtcEngine}; +use libwebrtc::stats::{InboundRtpStats, MediaSourceStats, OutboundRtpStats, RtcStats}; + +use super::id::{ParticipantIdentity, TrackSid}; +use super::DataPacketKind; + +type GenericMetricLabel = (MetricLabel, T); + +const QUALITY_LIMITATIONS: [GenericMetricLabel<&str>; 3] = [ + (MetricLabel::ClientVideoPublisherQualityLimitationDurationCpu, "cpu"), + (MetricLabel::ClientVideoPublisherQualityLimitationDurationBandwidth, "bandwidth"), + (MetricLabel::ClientVideoPublisherQualityLimitationDurationOther, "other"), +]; + +pub struct RtcMetricsManager {} + +impl RtcMetricsManager { + pub fn new() -> Self { + Self {} + } + + fn micro_to_milli(micros: i64) -> i64 { + micros / 1000 + } + + fn get_or_create_index(&self, strings: &mut Vec, string: String) -> u32 { + let position: Option = strings.iter().position(|s| s == &string); + + match position { + Some(index) => return (index as u32) + (MetricLabel::PredefinedMaxValue as u32), + None => { + strings.push(string); + return ((strings.len() - 1) as u32) + (MetricLabel::PredefinedMaxValue as u32); + } + } + } + + fn create_metric_sample(&self, timestamp_ms: i64, value: f32) -> MetricSample { + MetricSample { timestamp_ms, value, ..Default::default() } + } + + fn create_time_series( + &self, + label: MetricLabel, + strings: &mut Vec, + samples: Vec, + identity: Option<&ParticipantIdentity>, + track_sid: Option<&str>, + rid: Option<&str>, + ) -> TimeSeriesMetric { + let mut time_series = + TimeSeriesMetric { label: label as u32, samples, ..Default::default() }; + + if let Some(id) = identity { + time_series.participant_identity = self.get_or_create_index(strings, id.to_string()); + } + + if let Some(sid) = track_sid { + time_series.track_sid = self.get_or_create_index(strings, sid.to_string()); + } + + if let Some(r) = rid { + time_series.rid = self.get_or_create_index(strings, r.to_string()); + } + + time_series + } + + fn create_time_series_for_metric( + &self, + strings: &mut Vec, + label: MetricLabel, + stat_value: f32, + stat_timestamp_us: i64, + track_sid: String, + rid: Option<&str>, + identity: Option<&ParticipantIdentity>, + ) -> TimeSeriesMetric { + let timestamp_ms = Self::micro_to_milli(stat_timestamp_us); + let sample = self.create_metric_sample(timestamp_ms, stat_value); + + self.create_time_series(label, strings, vec![sample], identity, Some(&track_sid), rid) + } + + pub(crate) async fn collect_metrics( + &self, + room: &Room, + rtc_engine: &RtcEngine, + mut close_rx: broadcast::Receiver<()>, + ) { + loop { + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(1)) => {}, + _ = close_rx.recv() => { + // Room is closing, exit the collection loop + return; + } + } + + match room.get_stats().await { + Ok(stats) => { + let publisher_stats = stats.publisher_stats; + let subscriber_stats = stats.subscriber_stats; + + tokio::join!( + self.collect_publisher_metrics(room, rtc_engine, &publisher_stats), + self.collect_subscriber_metrics(room, rtc_engine, &subscriber_stats) + ); + } + Err(e) => { + log::error!("Failed to retrieve stats: {:?}", e); + } + } + } + } + + fn find_publisher_stats( + &self, + room: &Room, + strings: &mut Vec, + stats: &[RtcStats], + participant_identity: Option, + ) -> Vec { + let mut media_sources: Vec = Vec::new(); + let mut video_tracks: HashMap = HashMap::new(); + + let track_publication = room.local_participant().track_publications(); + + for stat in stats { + if let RtcStats::MediaSource(media_source) = stat { + // why is flattening the source not working? + if media_source.source.kind == "video" { + media_sources.push(media_source.clone()) + } + } + } + + // TODO: would like to do this in a single pass, but sources are not available at the time + for stat in stats { + if let RtcStats::OutboundRtp(outbound_rtp) = stat { + if outbound_rtp.stream.kind == "video" { + if let Some(track_sid) = self.get_published_track_sid( + &media_sources, + &outbound_rtp, + &track_publication, + ) { + video_tracks.insert(track_sid, outbound_rtp.clone()); + } + } + } + } + + let mut metrics: Vec = Vec::new(); + for (track_sid, outbound_rtp) in video_tracks { + let durations = &outbound_rtp.outbound.quality_limitation_durations; + let rid = outbound_rtp.outbound.rid.clone(); + + for &(quality_label, key) in &QUALITY_LIMITATIONS { + if let Some(&duration) = durations.get(key) { + let sample = self.create_metric_sample( + Self::micro_to_milli(outbound_rtp.rtc.timestamp), + duration as f32, // u64 to f32 conversion + ); + + metrics.push(self.create_time_series( + quality_label, + strings, + vec![sample], + participant_identity.as_ref(), + Some(&track_sid), + Some(&rid), + )); + } + } + } + + metrics + } + + fn get_published_track_sid( + &self, + media_source_stats: &[MediaSourceStats], + track_stats: &OutboundRtpStats, + track_publication: &HashMap, + ) -> Option { + let track_identifier = media_source_stats + .iter() + .find(|m| m.rtc.id == track_stats.outbound.media_source_id)? + .source + .track_identifier + .clone(); + + track_publication + .iter() + .find(|(_, track)| track.track().unwrap().rtc_track().id() == track_identifier) + .map(|(sid, _)| sid.to_string()) + } + + async fn collect_publisher_metrics( + &self, + room: &Room, + rtc_engine: &RtcEngine, + stats: &Vec, + ) { + let mut strings = Vec::new(); + let publisher_ts_metrics = self.find_publisher_stats( + room, + &mut strings, + stats, + room.local_participant().identity().into(), + ); + + self.send_metrics(rtc_engine, strings, publisher_ts_metrics).await; + } + + fn find_subscriber_stats( + &self, + strings: &mut Vec, + stats: &[RtcStats], + participant_identity: Option, + ) -> Vec { + let mut video_tracks: Vec = Vec::new(); + let mut audio_tracks: Vec = Vec::new(); + + for stat in stats { + if let RtcStats::InboundRtp(inbound_rtp) = stat { + match inbound_rtp.stream.kind.as_str() { + "audio" => audio_tracks.push(inbound_rtp.clone()), + "video" => video_tracks.push(inbound_rtp.clone()), + _ => log::warn!("Unknown stream kind: {}", inbound_rtp.stream.kind), + } + } + } + + let mut metrics: Vec = Vec::new(); + + for track in video_tracks { + let timestamp_us = track.rtc.timestamp; + let sid = track.inbound.track_identifier; + let rid = None; // SFU only sends a single layer downstream, so no rid involved + + // Forced f32 conversions here, might lose some precision here or even overflow... + let metrics_to_create: [GenericMetricLabel; 6] = [ + (MetricLabel::ClientVideoSubscriberFreezeCount, track.inbound.freeze_count as f32), + ( + MetricLabel::ClientVideoSubscriberTotalFreezeDuration, + track.inbound.total_freeze_duration as f32, + ), + (MetricLabel::ClientVideoSubscriberPauseCount, track.inbound.pause_count as f32), + ( + MetricLabel::ClientVideoSubscriberTotalPausesDuration, + track.inbound.total_pause_duration as f32, + ), + ( + MetricLabel::ClientSubscriberJitterBufferDelay, + track.inbound.jitter_buffer_delay as f32, + ), + ( + MetricLabel::ClientSubscriberJitterBufferEmittedCount, + track.inbound.jitter_buffer_emitted_count as f32, + ), + ]; + + for (label, value) in metrics_to_create { + metrics.push(self.create_time_series_for_metric( + strings, + label, + value, + timestamp_us, + sid.clone(), + rid, + participant_identity.as_ref(), + )); + } + } + + for track in audio_tracks { + let timestamp_us = track.rtc.timestamp; + let sid = track.inbound.track_identifier; + let rid = None; // Audio tracks don't need rid + + let metrics_to_create: [GenericMetricLabel; 5] = [ + ( + MetricLabel::ClientAudioSubscriberConcealedSamples, + track.inbound.concealed_samples as f32, + ), + ( + MetricLabel::ClientAudioSubscriberConcealmentEvents, + track.inbound.concealment_events as f32, + ), + ( + MetricLabel::ClientAudioSubscriberSilentConcealedSamples, + track.inbound.silent_concealed_samples as f32, + ), + ( + MetricLabel::ClientSubscriberJitterBufferDelay, + track.inbound.jitter_buffer_delay as f32, + ), + ( + MetricLabel::ClientSubscriberJitterBufferEmittedCount, + track.inbound.jitter_buffer_emitted_count as f32, + ), + ]; + + for (label, value) in metrics_to_create { + metrics.push(self.create_time_series_for_metric( + strings, + label, + value, + timestamp_us, + sid.clone(), + rid, + participant_identity.as_ref(), + )); + } + } + + metrics + } + + async fn collect_subscriber_metrics( + &self, + room: &Room, + rtc_engine: &RtcEngine, + stats: &Vec, + ) { + let mut strings: Vec = Vec::new(); + let subscriber_ts_metrics = self.find_subscriber_stats( + &mut strings, + stats, + room.local_participant().identity().into(), + ); + + self.send_metrics(rtc_engine, strings, subscriber_ts_metrics).await; + } + + pub async fn send_metrics( + &self, + rtc_engine: &RtcEngine, + strings: Vec, + metrics: Vec, + ) { + if metrics.len() > 0 { + let timestamp = metrics + .first() + .and_then(|metric| metric.samples.first()) + .map(|sample| sample.timestamp_ms) + .unwrap(); + let data_packet = DataPacket { + value: Some(data_packet::Value::Metrics(MetricsBatch { + str_data: strings, + time_series: metrics, + timestamp_ms: timestamp, + ..Default::default() + })), + ..Default::default() + }; + + if let Err(e) = rtc_engine.publish_data(data_packet, DataPacketKind::Reliable).await { + log::warn!("Failed to publish metrics: {:?}", e); + }; + } + } +} diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 06c6350f8..78f5d74f9 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -26,6 +26,7 @@ use livekit_api::signal_client::{SignalOptions, SignalSdkOptions}; use livekit_protocol as proto; use livekit_protocol::observer::Dispatcher; use livekit_runtime::JoinHandle; +use metrics::RtcMetricsManager; use parking_lot::RwLock; pub use proto::DisconnectReason; use proto::{promise::Promise, SignalTarget}; @@ -56,6 +57,7 @@ use crate::{ pub mod data_stream; pub mod e2ee; pub mod id; +mod metrics; pub mod options; pub mod participant; pub mod publication; @@ -345,6 +347,7 @@ pub struct RoomOptions { pub join_retries: u32, pub sdk_options: RoomSdkOptions, pub preregistration: Option, + pub enable_metrics: bool, } #[derive(Debug, Clone)] @@ -372,10 +375,12 @@ impl Default for RoomOptions { join_retries: 3, sdk_options: RoomSdkOptions::default(), preregistration: None, + enable_metrics: true, } } } +#[derive(Clone)] pub struct Room { inner: Arc, } @@ -429,6 +434,7 @@ struct Handle { incoming_stream_handle: JoinHandle<()>, outgoing_stream_handle: JoinHandle<()>, close_tx: broadcast::Sender<()>, + metrics_handle: Option>, } impl Debug for RoomSession { @@ -636,22 +642,44 @@ impl Room { let (close_tx, close_rx) = broadcast::channel(1); + let close_rx_room = close_rx; + let close_rx_incoming = close_rx_room.resubscribe(); + let close_rx_outgoing = close_rx_room.resubscribe(); + let close_rx_metrics = close_rx_room.resubscribe(); + let incoming_stream_handle = livekit_runtime::spawn(incoming_data_stream_task( open_rx, dispatcher.clone(), - close_rx.resubscribe(), + close_rx_incoming, )); let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task( packet_rx, identity, rtc_engine.clone(), - close_rx.resubscribe(), + close_rx_outgoing, )); - let room_handle = livekit_runtime::spawn(inner.clone().room_task(engine_events, close_rx)); + let room_handle = + livekit_runtime::spawn(inner.clone().room_task(engine_events, close_rx_room)); + + let metrics_handle = if options.enable_metrics { + let room_clone = Self { inner: inner.clone() }; + + Some(livekit_runtime::spawn(async move { + let metrics_manager = RtcMetricsManager::new(); + metrics_manager.collect_metrics(&room_clone, &rtc_engine, close_rx_metrics).await; + })) + } else { + None + }; - let handle = - Handle { room_handle, incoming_stream_handle, outgoing_stream_handle, close_tx }; + let handle = Handle { + room_handle, + incoming_stream_handle, + outgoing_stream_handle, + metrics_handle, + close_tx, + }; inner.handle.lock().await.replace(handle); Ok((Self { inner }, events)) @@ -848,6 +876,9 @@ impl RoomSession { let _ = handle.incoming_stream_handle.await; let _ = handle.outgoing_stream_handle.await; let _ = handle.room_handle.await; + if let Some(metrics_handle) = handle.metrics_handle { + let _ = metrics_handle.await; + } self.dispatcher.clear(); Ok(()) diff --git a/webrtc-sys/build.rs b/webrtc-sys/build.rs index 9b32db1d7..b8557b73f 100644 --- a/webrtc-sys/build.rs +++ b/webrtc-sys/build.rs @@ -164,6 +164,7 @@ fn main() { .flag("-std=c++20"); } "ios" => { + println!("cargo:rustc-link-lib=framework=Foundation"); println!("cargo:rustc-link-lib=framework=CoreFoundation"); println!("cargo:rustc-link-lib=framework=AVFoundation"); println!("cargo:rustc-link-lib=framework=CoreAudio");