diff --git a/Cargo.lock b/Cargo.lock index a9cfd0f..fb4f239 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1704,7 +1704,7 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "retina" -version = "0.4.6" +version = "0.4.7" dependencies = [ "base64", "bitstream-io", diff --git a/src/codec/h264.rs b/src/codec/h264.rs index 5a980b1..07aacc0 100644 --- a/src/codec/h264.rs +++ b/src/codec/h264.rs @@ -44,6 +44,9 @@ pub(crate) struct Depacketizer { /// In state `PreMark`, an entry for each NAL. /// Kept around (empty) in other states to re-use the backing allocation. nals: Vec, + + /// Set once a FU-A start fragment is found to contain a 4-byte Annex B start code. + has_annex_b_stream: bool, } #[derive(Debug)] @@ -123,6 +126,7 @@ impl Depacketizer { pending: None, pieces: Vec::new(), nals: Vec::new(), + has_annex_b_stream: false, parameters, }) } @@ -323,18 +327,23 @@ impl Depacketizer { match (start, access_unit.in_fu_a) { (true, true) => return Err("FU-A with start bit while frag in progress".into()), (true, false) => { - self.add_piece(data)?; - self.nals.push(Nal { - hdr: nal_header, - next_piece_idx: u32::MAX, // should be overwritten later. - len: 1 + u32_len, - }); + if self.is_annex_b_stream(data.clone()) { + self.has_annex_b_stream = true; + self.read_annex_b_stream(nal_header, &mut data)?; + } else { + self.add_piece(data)?; + self.nals.push(Nal { + hdr: nal_header, + next_piece_idx: u32::MAX, // should be overwritten later. 
+ len: 1 + u32_len, + }); + } access_unit.in_fu_a = true; } (false, true) => { let pieces = self.add_piece(data)?; let nal = self.nals.last_mut().expect("nals non-empty while in fu-a"); - if u8::from(nal_header) != u8::from(nal.hdr) { + if !self.has_annex_b_stream && u8::from(nal_header) != u8::from(nal.hdr) { return Err(format!( "FU-A has inconsistent NAL type: {:?} then {:?}", nal.hdr, nal_header, @@ -394,6 +403,97 @@ impl Depacketizer { u32::try_from(self.pieces.len()).map_err(|_| "more than u32::MAX pieces!".to_string()) } + /// Returns whether a 4-byte Annex B start code (`00 00 00 01`) occurs anywhere in `piece`. + fn is_annex_b_stream(&mut self, piece: Bytes) -> bool { + // TODO: should we check for 3 byte start code too? + let _start_code_3_byte = [0x00, 0x00, 0x01]; + + let start_code_4_byte = [0x00, 0x00, 0x00, 0x01]; + let start_code_4_byte_idx = piece + .windows(start_code_4_byte[..].len()) + .position(|window| window == &start_code_4_byte[..]); + start_code_4_byte_idx.is_some() + } + + /// Parses an Annex B stream, adding each NALU in it to `Depacketizer`; NOTE(review): `expect`/`[0]` below panic on malformed input. + fn read_annex_b_stream( + &mut self, + nal_header: NalHeader, + piece: &mut Bytes, + ) -> Result<(), String> { + const START_CODE_4_BYTE: [u8; 4] = [0x00, 0x00, 0x00, 0x01]; + + assert!(self.has_annex_b_stream); + + // TODO: check for a start code split between two fragmented packets. + + // Find start codes in payload. 
+ let start_codes: Vec<_> = piece + .windows(START_CODE_4_BYTE[..].len()) + .enumerate() + .filter_map(|(i, window)| { + if window == START_CODE_4_BYTE { + Some(i) + } else { + None + } + }) + .collect(); + + let mut start_idx = 0; + for (idx, start_code_idx) in start_codes.into_iter().enumerate() { + let start_code_end_idx = start_code_idx + 3; + let mut nal_piece = piece.slice(start_idx..start_code_idx); + start_idx = start_code_end_idx + 1; + + if idx == 0 { + if self.pieces.is_empty() { + let pieces = self.add_piece(nal_piece.clone())?; + // use nal_header from argument + self.nals.push(Nal { + hdr: nal_header, + next_piece_idx: pieces, + len: 1 + u32::try_from(nal_piece.clone().len()) + .expect("NALU payload must be < u16::MAX"), + }) + } else { + // TODO: we received a start code in a fragmented FU-A packet. + // The bytes before it are currently dropped; they should update + // the last NALU instead of creating a new one. + } + } else { + let nal_header_byte = nal_piece[0]; + nal_piece.advance(1); + + let pieces = self.add_piece(nal_piece.clone())?; + let nal_header = NalHeader::new(nal_header_byte).expect("NalHeader is valid"); + self.nals.push(Nal { + hdr: nal_header, + next_piece_idx: pieces, + len: 1 + u32::try_from(nal_piece.clone().len()) + .expect("NALU payload must be < u16::MAX"), + }) + } + } + + // Handle payload after the last found start code. + if start_idx < piece.len() { + let mut nal_piece = piece.slice(start_idx..); + let nal_header_byte = nal_piece[0]; + let nal_header = NalHeader::new(nal_header_byte).expect("NalHeader is valid"); + nal_piece.advance(1); + self.add_piece(nal_piece.clone())?; + self.nals.push(Nal { + hdr: nal_header, + next_piece_idx: u32::MAX, + len: 1 + u32::try_from(nal_piece.clone().len()) + .expect("NALU payload must be < u16::MAX"), + }) + } + + Ok(()) + } + /// Logs information about each access unit. 
/// Currently, "bad" access units (violating certain specification rules) /// are logged at debug priority, and others are logged at trace priority. @@ -1002,6 +1102,7 @@ mod tests { use crate::testutil::init_logging; use crate::{codec::CodecItem, rtp::ReceivedPacketBuilder}; + use h264_reader::nal::UnitType; /* * This test requires @@ -1527,4 +1628,161 @@ mod tests { assert!(frame.has_new_parameters); assert!(d.parameters().is_some()); } + + // FU-A packet containing Annex B stream (https://github.com/scottlamb/retina/issues/68) + #[test] + fn parse_annex_b_stream_in_fu_a() { + init_logging(); + let mut d = super::Depacketizer::new(90_000, Some("profile-level-id=TQAf;packetization-mode=1;sprop-parameter-sets=J00AH+dAKALdgKUFBQXwAAADABAAAAMCiwEAAtxoAAIlUX//AoA=,KO48gA==")).unwrap(); + let timestamp = crate::Timestamp { + timestamp: 0, + clock_rate: NonZeroU32::new(90_000).unwrap(), + start: 0, + }; + d.push( + ReceivedPacketBuilder { + // FU-A start fragment which includes Annex B stream of 3 NALs + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: false, + payload_type: 0, + } + .build( + *b"\ + \x3c\x87\ + \x4d\x00\x1f\xe7\x40\x28\x02\xdd\x80\xa5\x05\x05\x05\xf0\x00\x00\x03\x00\x10\x00\x00\x03\x02\x8b\x01\x00\x02\xdc\x68\x00\x02\x25\x51\x7f\xff\x02\x80\ + \x00\x00\x00\x01\ + \x28\ + \xee\x3c\x80\ + \x00\x00\x00\x01\ + \x25\ + idr-slice, " + ) + .unwrap(), + ) + .unwrap(); + assert!(d.pull().is_none()); + + // should've parsed Annex B stream from first FU-A frag into 3 NALs (SPS, PPS & IDR slice) + let number_of_nals_in_first_frag = 3; + assert!(d.nals.len() == number_of_nals_in_first_frag); + assert!(d.pieces.len() == number_of_nals_in_first_frag); + assert!(d.nals[0].hdr.nal_unit_type() == UnitType::SeqParameterSet); + assert!(d.nals[1].hdr.nal_unit_type() == UnitType::PicParameterSet); + assert!(d.nals[2].hdr.nal_unit_type() == UnitType::SliceLayerWithoutPartitioningIdr); + + d.push( 
ReceivedPacketBuilder { + // FU-A packet, middle. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 1, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x3c\x07idr-slice continued, ") + .unwrap(), + ) + .unwrap(); + assert!(d.pull().is_none()); + + // For an Annex B stream in FU-A, make sure we append subsequent frags to the last NAL + // instead of creating a new one from the frag header: that header matches the first + // NAL of the previous frag, whereas the data continues the last NAL of the Annex B stream. + + // This test also checks that retina doesn't reject a frag whose NAL header differs from the last + // parsed NAL when the FU-A contains an Annex B stream. + + // no new nals are to be created + assert!(d.nals.len() == number_of_nals_in_first_frag); + // data from frag will get appended + assert!(d.pieces.len() == number_of_nals_in_first_frag + 1); + + d.push( + ReceivedPacketBuilder { + // FU-A packet, end. 
+ ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 2, + loss: 0, + mark: true, + payload_type: 0, + } + .build(*b"\x3c\x47idr-slice end") + .unwrap(), + ) + .unwrap(); + + let frame = match d.pull() { + Some(CodecItem::VideoFrame(frame)) => frame, + _ => panic!(), + }; + assert_eq!( + frame.data(), + b"\ + \x00\x00\x00\x26\ + \x27\ + \x4d\x00\x1f\xe7\x40\x28\x02\xdd\x80\xa5\x05\x05\x05\xf0\x00\x00\x03\x00\x10\x00\x00\x03\x02\x8b\x01\x00\x02\xdc\x68\x00\x02\x25\x51\x7f\xff\x02\x80\ + \x00\x00\x00\ + \x04\x28\ + \xee\x3c\x80\ + \x00\x00\x00\ + \x2e\x25\ + idr-slice, idr-slice continued, idr-slice end" + ); + } + + #[test] + fn exit_on_inconsistent_headers_between_fu_a() { + init_logging(); + let mut d = super::Depacketizer::new(90_000, Some("profile-level-id=TQAf;packetization-mode=1;sprop-parameter-sets=J00AH+dAKALdgKUFBQXwAAADABAAAAMCiwEAAtxoAAIlUX//AoA=,KO48gA==")).unwrap(); + let timestamp = crate::Timestamp { + timestamp: 0, + clock_rate: NonZeroU32::new(90_000).unwrap(), + start: 0, + }; + d.push( + ReceivedPacketBuilder { + // FU-A start fragment + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 0, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x3c\x81start of non-idr") + .unwrap(), + ) + .unwrap(); + assert!(d.pull().is_none()); + + let push_result = d.push( + ReceivedPacketBuilder { + // FU-A packet, middle. + ctx: crate::PacketContext::dummy(), + stream_id: 0, + timestamp, + ssrc: 0, + sequence_number: 1, + loss: 0, + mark: false, + payload_type: 0, + } + .build(*b"\x3c\x07a wild sps appeared") + .unwrap(), + ); + assert!(push_result.is_err()); + } }