Skip to content

Commit

Permalink
read a complete frame every time when use rtspovertcp
Browse files Browse the repository at this point in the history
  • Loading branch information
bailb committed May 1, 2024
1 parent 7df491b commit 9ed5970
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
65 changes: 64 additions & 1 deletion library/bytesio/src/bytesio.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::net::SocketAddr;
use std::time::Duration;

use std::time::Duration;
use async_trait::async_trait;
use bytes::BufMut;
use bytes::Bytes;
Expand All @@ -24,6 +24,11 @@ pub trait TNetIO: Send + Sync {
async fn write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>;
async fn read(&mut self) -> Result<BytesMut, BytesIOError>;
async fn read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>;
async fn read_min_bytes_with_timeout(
&mut self,
duration: Duration,
size: usize
) -> Result<BytesMut, BytesIOError>;
fn get_net_type(&self) -> NetType;
}

Expand Down Expand Up @@ -86,6 +91,20 @@ impl TNetIO for UdpIO {
}
}

// As udp is based on packet, we can't read part of them, so the size parameter is not effective
async fn read_min_bytes_with_timeout(
&mut self,
duration: Duration,
_size: usize
) -> Result<BytesMut, BytesIOError> {
match tokio::time::timeout(duration, self.read()).await {
Ok(data) => data,
Err(err) => Err(BytesIOError {
value: BytesIOErrorValue::TimeoutError(err),
}),
}
}

async fn read(&mut self) -> Result<BytesMut, BytesIOError> {
let mut buf = vec![0; 4096];
let len = self.socket.recv(&mut buf).await?;
Expand Down Expand Up @@ -131,6 +150,50 @@ impl TNetIO for TcpIO {
}
}

async fn read_min_bytes_with_timeout(
&mut self,
duration: Duration,
size: usize
) -> Result<BytesMut, BytesIOError> {
let mut result: BytesMut = BytesMut::new();
let start_time = tokio::time::Instant::now();
loop {
let current_time = tokio::time::Instant::now();
let remaining_duration = match duration.checked_sub(current_time - start_time) {
Some(remaining) => remaining,
None => Duration::from_secs(0),
};
let message_result = tokio::time::timeout(remaining_duration, self.stream.next()).await;
match message_result {
Ok(message) => match message {
Some(data) => match data {
Ok(bytes) => {
result.extend_from_slice(&bytes);
if result.len() >= size {
return Ok(result);
}
}
Err(err) => {
return Err(BytesIOError {
value: BytesIOErrorValue::IOError(err),
})
}
},
None => {
return Err(BytesIOError {
value: BytesIOErrorValue::NoneReturn,
})
}
},
Err(err) => {
return Err(BytesIOError {
value: BytesIOErrorValue::TimeoutError(err),
})
}
}
}
}

async fn read(&mut self) -> Result<BytesMut, BytesIOError> {
let message = self.stream.next().await;

Expand Down
8 changes: 7 additions & 1 deletion protocol/rtsp/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use define::rtsp_method_name;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

use commonlib::auth::Auth;
Expand Down Expand Up @@ -150,7 +151,12 @@ impl RtspServerSession {
match data {
Some(a) => {
if self.reader.len() < a.length as usize {
let data = self.io.lock().await.read().await?;
let data = self
.io
.lock()
.await
.read_min_bytes_with_timeout(Duration::from_millis(1000), a.length.into())
.await?;
self.reader.extend_from_slice(&data[..]);
}
self.on_rtp_over_rtsp_message(a.channel_identifier, a.length as usize)
Expand Down

0 comments on commit 9ed5970

Please sign in to comment.