Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 137 additions & 105 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,5 @@ members = [
]
resolver = "2"

[workspace.dependencies]
tokio-test = { version = "0.4.4" }

[profile.dev]
opt-level = 0
4 changes: 2 additions & 2 deletions examples/examples/broadcast/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async fn main() -> Result<()> {
let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;

// Read RTP packets being sent to webrtc-rs
while let Ok((rtp, _)) = track.read_rtp().await {
while let Ok(rtp) = track.read_rtp().await {
if let Err(err) = local_track.write_rtp(&rtp).await {
if Error::ErrClosedPipe != err {
print!("output track write_rtp got error: {err} and break");
Expand Down Expand Up @@ -260,7 +260,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/insertable-streams/insertable-streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down
4 changes: 2 additions & 2 deletions examples/examples/play-from-disk-h264/play-from-disk-h264.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down Expand Up @@ -239,7 +239,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down
6 changes: 3 additions & 3 deletions examples/examples/play-from-disk-hevc/play-from-disk-hevc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ async fn main() -> Result<()> {
}
m = track.read_rtp() => {
println!("rtp readed");
if let Ok((p, _)) = m {
if let Ok(p) = m {
let data = pck.depacketize(&p.payload).unwrap();
match pck.payload() {
H265Payload::H265PACIPacket(p) => {
Expand Down Expand Up @@ -398,7 +398,7 @@ async fn offer_worker(
.await?;
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = video_rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = video_rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});
let notify1 = notify_connect.clone();
Expand Down Expand Up @@ -471,7 +471,7 @@ async fn offer_worker(
.await?;
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = audio_rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = audio_rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});
let notify1 = notify_connect.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async fn add_video(
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down
4 changes: 2 additions & 2 deletions examples/examples/play-from-disk-vpx/play-from-disk-vpx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down Expand Up @@ -245,7 +245,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down
4 changes: 2 additions & 2 deletions examples/examples/reflect/reflect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async fn main() -> Result<()> {
let m = s.to_owned();
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
println!("{m} rtp_sender.read loop exit");
Result::<()>::Ok(())
});
Expand Down Expand Up @@ -246,7 +246,7 @@ async fn main() -> Result<()> {
track.codec().capability.mime_type
);
// Read RTP packets being sent to webrtc-rs
while let Ok((rtp, _)) = track.read_rtp().await {
while let Ok(rtp) = track.read_rtp().await {
if let Err(err) = output_track2.write_rtp(&rtp).await {
println!("output track write_rtp got error: {err}");
break;
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/rtp-forwarder/rtp-forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async fn main() -> Result<()> {

tokio::spawn(async move {
let mut b = vec![0u8; 1500];
while let Ok((mut rtp_packet, _)) = track.read(&mut b).await {
while let Ok(mut rtp_packet) = track.read(&mut b).await {
// Update the PayloadType
rtp_packet.header.payload_type = c.payload_type;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/rtp-to-webrtc/rtp-to-webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/save-to-disk-h264/save-to-disk-h264.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn save_to_disk(
loop {
tokio::select! {
result = track.read_rtp() => {
if let Ok((rtp_packet, _)) = result {
if let Ok(rtp_packet) = result {
let mut w = writer.lock().await;
w.write_rtp(&rtp_packet)?;
}else{
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/save-to-disk-vpx/save-to-disk-vpx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn save_to_disk(
loop {
tokio::select! {
result = track.read_rtp() => {
if let Ok((rtp_packet, _)) = result {
if let Ok(rtp_packet) = result {
let mut w = writer.lock().await;
w.write_rtp(&rtp_packet)?;
}else{
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/simulcast/simulcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down Expand Up @@ -193,7 +193,7 @@ async fn main() -> Result<()> {
tokio::spawn(async move {
// Read RTP packets being sent to webrtc-rs
println!("enter track loop {}", track.rid());
while let Ok((rtp, _)) = track.read_rtp().await {
while let Ok(rtp) = track.read_rtp().await {
if let Err(err) = output_track.write_rtp(&rtp).await {
if Error::ErrClosedPipe != err {
println!("output track write_rtp got error: {err} and break");
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/swap-tracks/swap-tracks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn main() -> Result<()> {
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
while let Ok(_) = rtp_sender.read(&mut rtcp_buf).await {}
Result::<()>::Ok(())
});

Expand Down Expand Up @@ -159,7 +159,7 @@ async fn main() -> Result<()> {

let mut last_timestamp = 0;
let mut is_curr_track = false;
while let Ok((mut rtp, _)) = track.read_rtp().await {
while let Ok(mut rtp) = track.read_rtp().await {
// Change the timestamp to only be the delta
let old_timestamp = rtp.header.timestamp;
if last_timestamp == 0 {
Expand Down
4 changes: 3 additions & 1 deletion ice/src/candidate/candidate_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ impl Candidate for CandidateBase {
{
let mut closed_ch = self.closed_ch.lock().await;
if closed_ch.is_none() {
return Err(Error::ErrClosed);
// Если кандидат уже был ранее закрыт, не возвращать ошибку, а просто вернуть успех
return Ok(());
// return Err(Error::ErrClosed);
}
closed_ch.take();
}
Expand Down
63 changes: 16 additions & 47 deletions interceptor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,11 @@ pub trait Interceptor {
#[async_trait]
pub trait RTPWriter {
/// write a rtp packet
async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize>;
async fn write(&self, pkt: &rtp::packet::Packet) -> Result<usize>;
}

pub type RTPWriterBoxFn = Box<
dyn (Fn(
&rtp::packet::Packet,
&Attributes,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + Sync>>)
dyn (Fn(&rtp::packet::Packet) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + Sync>>)
+ Send
+ Sync,
>;
Expand All @@ -95,28 +92,20 @@ pub struct RTPWriterFn(pub RTPWriterBoxFn);
#[async_trait]
impl RTPWriter for RTPWriterFn {
/// write a rtp packet
async fn write(&self, pkt: &rtp::packet::Packet, attributes: &Attributes) -> Result<usize> {
self.0(pkt, attributes).await
async fn write(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
self.0(pkt).await
}
}

/// RTPReader is used by Interceptor.bind_remote_stream.
#[async_trait]
pub trait RTPReader {
/// read a rtp packet
async fn read(
&self,
buf: &mut [u8],
attributes: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)>;
async fn read(&self, buf: &mut [u8]) -> Result<rtp::packet::Packet>;
}

pub type RTPReaderBoxFn = Box<
dyn (Fn(
&mut [u8],
&Attributes,
)
-> Pin<Box<dyn Future<Output = Result<(rtp::packet::Packet, Attributes)>> + Send + Sync>>)
dyn (Fn(&mut [u8]) -> Pin<Box<dyn Future<Output = Result<rtp::packet::Packet>> + Send + Sync>>)
+ Send
+ Sync,
>;
Expand All @@ -125,30 +114,21 @@ pub struct RTPReaderFn(pub RTPReaderBoxFn);
#[async_trait]
impl RTPReader for RTPReaderFn {
/// read a rtp packet
async fn read(
&self,
buf: &mut [u8],
attributes: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)> {
self.0(buf, attributes).await
async fn read(&self, buf: &mut [u8]) -> Result<rtp::packet::Packet> {
self.0(buf).await
}
}

/// RTCPWriter is used by Interceptor.bind_rtcpwriter.
#[async_trait]
pub trait RTCPWriter {
/// write a batch of rtcp packets
async fn write(
&self,
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
attributes: &Attributes,
) -> Result<usize>;
async fn write(&self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>]) -> Result<usize>;
}

pub type RTCPWriterBoxFn = Box<
dyn (Fn(
&[Box<dyn rtcp::packet::Packet + Send + Sync>],
&Attributes,
) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + Sync>>)
+ Send
+ Sync,
Expand All @@ -159,12 +139,8 @@ pub struct RTCPWriterFn(pub RTCPWriterBoxFn);
#[async_trait]
impl RTCPWriter for RTCPWriterFn {
/// write a batch of rtcp packets
async fn write(
&self,
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
attributes: &Attributes,
) -> Result<usize> {
self.0(pkts, attributes).await
async fn write(&self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>]) -> Result<usize> {
self.0(pkts).await
}
}

Expand All @@ -175,22 +151,16 @@ pub trait RTCPReader {
async fn read(
&self,
buf: &mut [u8],
attributes: &Attributes,
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>;
) -> Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>;
}

pub type RTCPReaderBoxFn = Box<
dyn (Fn(
&mut [u8],
&Attributes,
) -> Pin<
Box<
dyn Future<
Output = Result<(
Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>,
Attributes,
)>,
> + Send
dyn Future<Output = Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>>
+ Send
+ Sync,
>,
>) + Send
Expand All @@ -205,9 +175,8 @@ impl RTCPReader for RTCPReaderFn {
async fn read(
&self,
buf: &mut [u8],
attributes: &Attributes,
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
self.0(buf, attributes).await
) -> Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
self.0(buf).await
}
}

Expand Down
Loading