Skip to content

Commit 754bcf0

Browse files
committed
fix: Handle receiver reuse during renegotiation
- Add is_renegotiation parameter to start_rtp_receivers - Skip active receivers only during initial negotiation - Mark already-receiving transceivers as handled during renegotiation - Skip SSRC filtering during renegotiation per RFC 8829 - Add comprehensive bug reproducer test for mesh topology renegotiation
1 parent 14ff7f4 commit 754bcf0

File tree

2 files changed

+310
-12
lines changed

2 files changed

+310
-12
lines changed

webrtc/src/peer_connection/peer_connection_internal.rs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ impl PeerConnectionInternal {
284284
}
285285
}
286286

287-
self.start_rtp_receivers(&mut track_details, &current_transceivers)
287+
self.start_rtp_receivers(&mut track_details, &current_transceivers, is_renegotiation)
288288
.await?;
289289
if let Some(parsed_remote) = &remote_desc.parsed {
290290
let current_local_desc = self.current_local_description.lock().await;
@@ -418,17 +418,23 @@ impl PeerConnectionInternal {
418418
self: &Arc<Self>,
419419
incoming_tracks: &mut Vec<TrackDetails>,
420420
local_transceivers: &[Arc<RTCRtpTransceiver>],
421+
is_renegotiation: bool,
421422
) -> Result<()> {
422-
// Ensure we haven't already started a transceiver for this ssrc
423+
// Ensure we haven't already started a transceiver for this ssrc.
424+
// Skip filtering during renegotiation since receiver reuse logic handles it.
423425
let mut filtered_tracks = incoming_tracks.clone();
424-
for incoming_track in incoming_tracks {
425-
// If we already have a TrackRemote for a given SSRC don't handle it again
426-
for t in local_transceivers {
427-
let receiver = t.receiver().await;
428-
for track in receiver.tracks().await {
429-
for ssrc in &incoming_track.ssrcs {
430-
if *ssrc == track.ssrc() {
431-
filter_track_with_ssrc(&mut filtered_tracks, track.ssrc());
426+
427+
if !is_renegotiation {
428+
for incoming_track in incoming_tracks {
429+
// If we already have a TrackRemote for a given SSRC don't handle it again
430+
for t in local_transceivers {
431+
let receiver = t.receiver().await;
432+
let existing_tracks = receiver.tracks().await;
433+
for track in existing_tracks {
434+
for ssrc in &incoming_track.ssrcs {
435+
if *ssrc == track.ssrc() {
436+
filter_track_with_ssrc(&mut filtered_tracks, track.ssrc());
437+
}
432438
}
433439
}
434440
}
@@ -450,10 +456,31 @@ impl PeerConnectionInternal {
450456
continue;
451457
}
452458

459+
// Fix(issue-749): Handle receiver reuse during renegotiation in mesh topology.
460+
//
461+
// During SDP renegotiation, the same tracks (SSRCs) legitimately appear in
462+
// subsequent negotiation rounds per RFC 8829 Section 3.7. Receivers that are
463+
// already active should be recognized as handling their existing tracks rather
464+
// than being skipped and marked as "NOT HANDLED".
465+
//
466+
// Root cause: The original code didn't distinguish between initial negotiation
467+
// (where skipping active receivers prevents duplicates) and renegotiation
468+
// (where active receivers represent existing media flows to preserve).
453469
let receiver = t.receiver().await;
454-
if receiver.have_received().await {
455-
continue;
470+
let already_receiving = receiver.have_received().await;
471+
472+
if already_receiving {
473+
if !is_renegotiation {
474+
// Initial negotiation: skip if already receiving (safety check)
475+
continue;
476+
} else {
477+
// Renegotiation: receiver already active, mark as handled
478+
track_handled = true;
479+
break;
480+
}
456481
}
482+
483+
// Start receiver for new tracks only
457484
PeerConnectionInternal::start_receiver(
458485
self.setting_engine.get_receive_mtu(),
459486
incoming_track,

webrtc/src/peer_connection/peer_connection_test.rs

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,3 +740,274 @@ async fn test_peer_connection_state() -> Result<()> {
740740

741741
Ok(())
742742
}
743+
744+
// test_kind: bug_reproducer(issue-749)
745+
/// # Bug Reproducer: Receiver Reuse During Renegotiation in Mesh Topology
746+
///
747+
/// ## Root Cause
748+
///
749+
/// The `start_rtp_receivers()` function in `peer_connection_internal.rs` skipped
750+
/// transceivers that were already receiving (`have_received()=true`) during SDP
751+
/// renegotiation, causing tracks to be marked as "NOT HANDLED" despite receivers
752+
/// being active and media flowing correctly.
753+
///
754+
/// The bug occurred because the code didn't distinguish between two contexts:
755+
/// - **Initial negotiation**: Skipping active receivers prevents duplicate starts (CORRECT)
756+
/// - **Renegotiation**: Skipping active receivers breaks RFC 8829 compliance (BUG)
757+
///
758+
/// During renegotiation in mesh topologies, the same tracks (SSRCs) legitimately
759+
/// reappear in the SDP per RFC 8829 Section 3.7's requirement to "reuse existing
760+
/// media descriptions". The code incorrectly treated these as duplicates to skip
761+
/// rather than existing flows to preserve.
762+
///
763+
/// ## Why Not Caught
764+
///
765+
/// No automated tests covered SDP renegotiation scenarios in mesh topologies.
766+
/// Existing tests only validated the initial negotiation path with simple 1-to-1
767+
/// peer connections.
768+
///
769+
/// The bug only manifests when:
770+
/// 1. Multiple negotiation rounds occur (initial + renegotiation)
771+
/// 2. The same SSRCs appear in subsequent SDPs (expected per RFC 8829)
772+
/// 3. Receivers are already active when renegotiation begins
773+
///
774+
/// Single-round negotiation tests passed because `have_received()` returns `false`
775+
/// during initial setup, so the skip logic was never triggered.
776+
///
777+
/// ## Fix Applied
778+
///
779+
/// Added `is_renegotiation: bool` parameter to `start_rtp_receivers()` to enable
780+
/// context-aware handling:
781+
///
782+
/// ```rust
783+
/// if already_receiving {
784+
/// if !is_renegotiation {
785+
/// continue; // Initial: skip to prevent duplicates (safety)
786+
/// } else {
787+
/// track_handled = true; // Renegotiation: mark as handled (RFC 8829)
788+
/// break;
789+
/// }
790+
/// }
791+
/// ```
792+
///
793+
/// Additionally, SSRC filtering is skipped during renegotiation since existing
794+
/// SSRCs are expected to reappear per the specification.
795+
///
796+
/// ## Prevention
797+
///
798+
/// All WebRTC renegotiation code must:
799+
/// 1. Distinguish between initial negotiation and renegotiation contexts
800+
/// 2. Test multi-round negotiation scenarios, not just initial setup
801+
/// 3. Verify compliance with RFC 8829 Section 3.7 (reuse of media descriptions)
802+
/// 4. Consider mesh topology scenarios where renegotiation is common
803+
///
804+
/// Any code checking receiver state (`have_received()`, transceiver status, etc.)
805+
/// during SDP processing should consider whether the behavior differs between
806+
/// initial and subsequent negotiations.
807+
///
808+
/// ## Pitfall
809+
///
810+
/// **Never assume `have_received()=true` always means "skip this receiver".**
811+
///
812+
/// Context matters critically:
813+
/// - Initial negotiation: `have_received()=true` indicates a safety issue (duplicate)
814+
/// - Renegotiation: `have_received()=true` indicates RFC 8829 compliance (reuse)
815+
///
816+
/// The same receiver state has opposite meanings in different contexts. Always
817+
/// check whether the current operation is initial negotiation or renegotiation
818+
/// before making flow control decisions based on receiver state.
819+
///
820+
/// Failing to consider context leads to either:
821+
/// - False positives (marking valid reused tracks as "NOT HANDLED")
822+
/// - False negatives (allowing duplicate receivers during initial setup)
823+
#[tokio::test]
824+
async fn test_receiver_reuse_during_renegotiation_issue_749() -> Result<()> {
825+
// Setup: Create peer connection pair with media engine
826+
let mut m = MediaEngine::default();
827+
m.register_default_codecs()?;
828+
let api = APIBuilder::new().with_media_engine(m).build();
829+
830+
let (mut pc_offer, mut pc_answer) = new_pair(&api).await?;
831+
832+
// Track receiver counts
833+
let initial_track_count = Arc::new(AtomicU32::new(0));
834+
let renegotiation_track_count = Arc::new(AtomicU32::new(0));
835+
836+
let initial_count_clone = Arc::clone(&initial_track_count);
837+
let renegotiation_count_clone = Arc::clone(&renegotiation_track_count);
838+
let negotiation_phase = Arc::new(AtomicU32::new(0)); // 0=initial, 1=renegotiation
839+
let phase_clone = Arc::clone(&negotiation_phase);
840+
841+
pc_answer.on_track(Box::new(move |_track, _receiver, _transceiver| {
842+
let phase = phase_clone.load(Ordering::SeqCst);
843+
if phase == 0 {
844+
initial_count_clone.fetch_add(1, Ordering::SeqCst);
845+
} else {
846+
renegotiation_count_clone.fetch_add(1, Ordering::SeqCst);
847+
}
848+
Box::pin(async move {})
849+
}));
850+
851+
// Step 1: Add initial tracks (video + audio) to offerer
852+
let video_track = Arc::new(TrackLocalStaticSample::new(
853+
RTCRtpCodecCapability {
854+
mime_type: MIME_TYPE_VP8.to_owned(),
855+
..Default::default()
856+
},
857+
"video_initial".to_owned(),
858+
"stream_initial".to_owned(),
859+
));
860+
861+
let audio_track = Arc::new(TrackLocalStaticSample::new(
862+
RTCRtpCodecCapability {
863+
mime_type: "audio/opus".to_owned(),
864+
..Default::default()
865+
},
866+
"audio_initial".to_owned(),
867+
"stream_initial".to_owned(),
868+
));
869+
870+
pc_offer.add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>).await?;
871+
pc_offer.add_track(Arc::clone(&audio_track) as Arc<dyn TrackLocal + Send + Sync>).await?;
872+
873+
// Step 2: Perform initial negotiation
874+
signal_pair(&mut pc_offer, &mut pc_answer).await?;
875+
876+
// Wait for ICE connection
877+
tokio::time::sleep(Duration::from_millis(500)).await;
878+
879+
// Step 3: Verify initial tracks were handled
880+
let initial_transceivers = pc_answer.get_transceivers().await;
881+
assert_eq!(
882+
initial_transceivers.len(),
883+
2,
884+
"Should have 2 transceivers after initial negotiation"
885+
);
886+
887+
// Verify receivers are active
888+
for (idx, t) in initial_transceivers.iter().enumerate() {
889+
let receiver = t.receiver().await;
890+
assert!(
891+
receiver.have_received().await,
892+
"Receiver {} should be active after initial negotiation",
893+
idx
894+
);
895+
}
896+
897+
// Capture initial SSRCs for verification
898+
let mut initial_ssrcs = Vec::new();
899+
for t in &initial_transceivers {
900+
let receiver = t.receiver().await;
901+
let tracks = receiver.tracks().await;
902+
if !tracks.is_empty() {
903+
initial_ssrcs.push(tracks[0].ssrc());
904+
}
905+
}
906+
assert_eq!(
907+
initial_ssrcs.len(),
908+
2,
909+
"Should have captured 2 initial SSRCs"
910+
);
911+
912+
// Step 4: Add new track to trigger renegotiation
913+
negotiation_phase.store(1, Ordering::SeqCst); // Mark as renegotiation phase
914+
915+
let new_video_track = Arc::new(TrackLocalStaticSample::new(
916+
RTCRtpCodecCapability {
917+
mime_type: MIME_TYPE_VP8.to_owned(),
918+
..Default::default()
919+
},
920+
"video_new".to_owned(),
921+
"stream_new".to_owned(),
922+
));
923+
924+
pc_offer.add_track(new_video_track as Arc<dyn TrackLocal + Send + Sync>).await?;
925+
926+
// Step 5: Perform renegotiation
927+
let reoffer = pc_offer.create_offer(None).await?;
928+
929+
// Verify the SDP includes existing SSRCs (per RFC 8829 Section 3.7)
930+
let offer_sdp = reoffer.sdp.clone();
931+
for ssrc in &initial_ssrcs {
932+
assert!(
933+
offer_sdp.contains(&ssrc.to_string()),
934+
"Renegotiation SDP should include existing SSRC {} (RFC 8829 requirement)",
935+
ssrc
936+
);
937+
}
938+
939+
let mut offer_gathering_complete = pc_offer.gathering_complete_promise().await;
940+
pc_offer.set_local_description(reoffer).await?;
941+
let _ = offer_gathering_complete.recv().await;
942+
943+
pc_answer
944+
.set_remote_description(
945+
pc_offer
946+
.local_description()
947+
.await
948+
.ok_or(Error::new("no local description".to_owned()))?,
949+
)
950+
.await?;
951+
952+
let reanswer = pc_answer.create_answer(None).await?;
953+
let mut answer_gathering_complete = pc_answer.gathering_complete_promise().await;
954+
pc_answer.set_local_description(reanswer).await?;
955+
let _ = answer_gathering_complete.recv().await;
956+
957+
pc_offer
958+
.set_remote_description(
959+
pc_answer
960+
.local_description()
961+
.await
962+
.ok_or(Error::new("no local description".to_owned()))?,
963+
)
964+
.await?;
965+
966+
// Wait for renegotiation to complete
967+
tokio::time::sleep(Duration::from_millis(500)).await;
968+
969+
// Step 6: CRITICAL ASSERTION - Verify existing receivers marked as HANDLED
970+
// This is where the bug manifested: existing receivers were skipped and
971+
// marked as "NOT HANDLED" despite being active.
972+
let transceivers_after = pc_answer.get_transceivers().await;
973+
assert_eq!(
974+
transceivers_after.len(),
975+
3,
976+
"Should have 3 transceivers after renegotiation (2 existing + 1 new)"
977+
);
978+
979+
// Verify existing tracks still active with same SSRCs (receiver reused, not restarted)
980+
for (idx, expected_ssrc) in initial_ssrcs.iter().enumerate() {
981+
let receiver = transceivers_after[idx].receiver().await;
982+
assert!(
983+
receiver.have_received().await,
984+
"Existing receiver {} should still be active after renegotiation",
985+
idx
986+
);
987+
988+
let tracks = receiver.tracks().await;
989+
assert_eq!(
990+
tracks.len(),
991+
1,
992+
"Existing receiver {} should have 1 track",
993+
idx
994+
);
995+
assert_eq!(
996+
tracks[0].ssrc(),
997+
*expected_ssrc,
998+
"SSRC should match original (receiver reused, not duplicated)"
999+
);
1000+
}
1001+
1002+
// Verify new receiver also started
1003+
let new_receiver = transceivers_after[2].receiver().await;
1004+
assert!(
1005+
new_receiver.have_received().await,
1006+
"New receiver should be active"
1007+
);
1008+
1009+
// Cleanup
1010+
close_pair_now(&pc_offer, &pc_answer).await;
1011+
1012+
Ok(())
1013+
}

0 commit comments

Comments
 (0)