Skip to content

Commit 56b490c

Browse files
authored
fix(rtsp): Fix issue preventing client destructor from completing (#463)
* fix(rtsp): Fix issue preventing client destructor from completing * ensure all rtsp sockets that are used for receive have proper receive timeouts. update logging for expected error conditions to be info instead * make timeouts configurable * add missing slash
1 parent 5f05cf7 commit 56b490c

File tree

8 files changed

+64
-17
lines changed

8 files changed

+64
-17
lines changed

components/rtsp/include/rtsp_client.hpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ class RtspClient : public BaseComponent {
9898
/// \note Starts the RTP and RTCP threads.
9999
/// Sends the SETUP request to the RTSP server and parses the response.
100100
/// \note The default ports are 5000 and 5001 for RTP and RTCP respectively.
101+
/// \note The default receive timeout is 5 seconds.
101102
/// \param ec The error code to set if an error occurs
102103
void setup(std::error_code &ec);
103104

@@ -106,8 +107,10 @@ class RtspClient : public BaseComponent {
106107
/// \note Starts the RTP and RTCP threads.
107108
/// \param rtp_port The RTP client port
108109
/// \param rtcp_port The RTCP client port
110+
/// \param receive_timeout The timeout for receiving RTP and RTCP packets
109111
/// \param ec The error code to set if an error occurs
110-
void setup(size_t rtp_port, size_t rtcp_port, std::error_code &ec);
112+
void setup(size_t rtp_port, size_t rtcp_port, const std::chrono::duration<float> &receive_timeout,
113+
std::error_code &ec);
111114

112115
/// Play the RTSP stream
113116
/// Sends the PLAY request to the RTSP server and parses the response.
@@ -139,14 +142,18 @@ class RtspClient : public BaseComponent {
139142
/// Initialize the RTP socket
140143
/// \note Starts the RTP socket task.
141144
/// \param rtp_port The RTP client port
145+
/// \param receive_timeout The timeout for receiving RTP packets
142146
/// \param ec The error code to set if an error occurs
143-
void init_rtp(size_t rtp_port, std::error_code &ec);
147+
void init_rtp(size_t rtp_port, const std::chrono::duration<float> &receive_timeout,
148+
std::error_code &ec);
144149

145150
/// Initialize the RTCP socket
146151
/// \note Starts the RTCP socket task.
147152
/// \param rtcp_port The RTCP client port
153+
/// \param receive_timeout The timeout for receiving RTCP packets
148154
/// \param ec The error code to set if an error occurs
149-
void init_rtcp(size_t rtcp_port, std::error_code &ec);
155+
void init_rtcp(size_t rtcp_port, const std::chrono::duration<float> &receive_timeout,
156+
std::error_code &ec);
150157

151158
/// Handle an RTP packet
152159
/// \note Parses the RTP packet and appends it to the current JPEG frame.

components/rtsp/include/rtsp_server.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ class RtspServer : public BaseComponent {
6666

6767
/// @brief Start the RTSP server
6868
/// Starts the accept task, session task, and binds the RTSP socket
69+
/// @param accept_timeout The timeout for accepting new connections
6970
/// @return True if the server was started successfully, false otherwise
70-
bool start();
71+
bool start(const std::chrono::duration<float> &accept_timeout = std::chrono::seconds(5));
7172

7273
/// @brief Stop the FTP server
7374
/// Stops the accept task, session task, and closes the RTSP socket

components/rtsp/include/rtsp_session.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class RtspSession : public BaseComponent {
3030
struct Config {
3131
std::string server_address; ///< The address of the server
3232
std::string rtsp_path; ///< The RTSP path of the session
33+
std::chrono::duration<float> receive_timeout =
34+
std::chrono::seconds(5); ///< The timeout for receiving data. Should be > 0.
3335
espp::Logger::Verbosity log_level =
3436
espp::Logger::Verbosity::WARN; ///< The log level of the session
3537
};

components/rtsp/src/rtsp_client.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,15 @@ void RtspClient::describe(std::error_code &ec) {
150150

151151
void RtspClient::setup(std::error_code &ec) {
152152
// default to rtp and rtcp client ports 5000 and 5001
153-
setup(5000, 50001, ec);
153+
using namespace std::chrono_literals;
154+
static constexpr size_t rtp_port = 5000;
155+
static constexpr size_t rtcp_port = 5001;
156+
static constexpr auto receive_timeout = 5s;
157+
setup(rtp_port, rtcp_port, receive_timeout, ec);
154158
}
155159

156-
void RtspClient::setup(size_t rtp_port, size_t rtcp_port, std::error_code &ec) {
160+
void RtspClient::setup(size_t rtp_port, size_t rtcp_port,
161+
const std::chrono::duration<float> &receive_timeout, std::error_code &ec) {
157162
// exit early if the error code is set
158163
if (ec) {
159164
return;
@@ -169,8 +174,8 @@ void RtspClient::setup(size_t rtp_port, size_t rtcp_port, std::error_code &ec) {
169174
return;
170175
}
171176

172-
init_rtp(rtp_port, ec);
173-
init_rtcp(rtcp_port, ec);
177+
init_rtp(rtp_port, receive_timeout, ec);
178+
init_rtcp(rtcp_port, receive_timeout, ec);
174179
}
175180

176181
void RtspClient::play(std::error_code &ec) {
@@ -238,12 +243,14 @@ bool RtspClient::parse_response(const std::string &response_data, std::error_cod
238243
return true;
239244
}
240245

241-
void RtspClient::init_rtp(size_t rtp_port, std::error_code &ec) {
246+
void RtspClient::init_rtp(size_t rtp_port, const std::chrono::duration<float> &receive_timeout,
247+
std::error_code &ec) {
242248
// exit early if the error code is set
243249
if (ec) {
244250
return;
245251
}
246252
logger_.debug("Starting rtp socket");
253+
rtp_socket_.set_receive_timeout(receive_timeout);
247254
auto rtp_task_config = espp::Task::BaseConfig{
248255
.name = "Rtp",
249256
.stack_size_bytes = 16 * 1024,
@@ -261,12 +268,14 @@ void RtspClient::init_rtp(size_t rtp_port, std::error_code &ec) {
261268
}
262269
}
263270

264-
void RtspClient::init_rtcp(size_t rtcp_port, std::error_code &ec) {
271+
void RtspClient::init_rtcp(size_t rtcp_port, const std::chrono::duration<float> &receive_timeout,
272+
std::error_code &ec) {
265273
// exit early if the error code is set
266274
if (ec) {
267275
return;
268276
}
269277
logger_.debug("Starting rtcp socket");
278+
rtcp_socket_.set_receive_timeout(receive_timeout);
270279
auto rtcp_task_config = espp::Task::BaseConfig{
271280
.name = "Rtcp",
272281
.stack_size_bytes = 6 * 1024,

components/rtsp/src/rtsp_server.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,18 @@ void RtspServer::set_session_log_level(Logger::Verbosity log_level) {
2626
session_log_level_ = log_level;
2727
}
2828

29-
bool RtspServer::start() {
29+
bool RtspServer::start(const std::chrono::duration<float> &accept_timeout) {
3030
if (accept_task_ && accept_task_->is_started()) {
3131
logger_.error("Server is already running");
3232
return false;
3333
}
3434

3535
logger_.info("Starting RTSP server on port {}", port_);
3636

37+
// ensure the receive timeout is set so that the accept will not block
38+
// indefinitely and the accept task can be stopped.
39+
rtsp_socket_.set_receive_timeout(accept_timeout);
40+
3741
if (!rtsp_socket_.bind(port_)) {
3842
logger_.error("Failed to bind to port {}", port_);
3943
return false;
@@ -161,7 +165,13 @@ bool RtspServer::accept_task_function(std::mutex &m, std::condition_variable &cv
161165
// accept a new connection
162166
auto control_socket = rtsp_socket_.accept();
163167
if (!control_socket) {
164-
logger_.error("Failed to accept new connection");
168+
logger_.info("Failed to accept new connection");
169+
// if we were notified, then we should stop the task
170+
if (task_notified) {
171+
task_notified = false;
172+
return true;
173+
}
174+
// do not stop the task, just try to accept another connection
165175
return false;
166176
}
167177

@@ -194,7 +204,7 @@ bool RtspServer::accept_task_function(std::mutex &m, std::condition_variable &cv
194204
session_task_->start();
195205
}
196206
// we do not want to stop the task
197-
return false;
207+
return task_notified;
198208
}
199209

200210
bool RtspServer::session_task_function(std::mutex &m, std::condition_variable &cv,
@@ -203,8 +213,11 @@ bool RtspServer::session_task_function(std::mutex &m, std::condition_variable &c
203213
{
204214
using namespace std::chrono_literals;
205215
std::unique_lock<std::mutex> lk(m);
206-
cv.wait_for(lk, 10ms, [&task_notified] { return task_notified; });
216+
auto stop_requested = cv.wait_for(lk, 10ms, [&task_notified] { return task_notified; });
207217
task_notified = false;
218+
if (stop_requested) {
219+
return true;
220+
}
208221
}
209222

210223
// when this function returns, the vector of pointers will go out of scope

components/rtsp/src/rtsp_session.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ RtspSession::RtspSession(std::unique_ptr<TcpSocket> control_socket, const Config
1313
, client_address_(control_socket_->get_remote_info().address) {
1414
// set the logger tag to include the session id
1515
logger_.set_tag("RtspSession " + std::to_string(session_id_));
16+
// ensure there is a timeout on the control socket receive
17+
control_socket_->set_receive_timeout(config.receive_timeout);
1618
// start the session task to handle RTSP commands
1719
using namespace std::placeholders;
1820
control_task_ = std::make_unique<Task>(Task::Config{
@@ -296,6 +298,16 @@ bool RtspSession::control_task_fn(std::mutex &m, std::condition_variable &cv, bo
296298
if (!handle_rtsp_request(request)) {
297299
logger_.warn("Failed to handle RTSP request");
298300
}
301+
} else {
302+
// if the receive failed, then let's wait a little / check the task_notified
303+
// flag to know if we should stop or not.
304+
using namespace std::chrono_literals;
305+
std::unique_lock<std::mutex> lk(m);
306+
auto stop_requested = cv.wait_for(lk, 1ms, [&task_notified] { return task_notified; });
307+
task_notified = false;
308+
if (stop_requested) {
309+
return true;
310+
}
299311
}
300312
// the receive handles most of the blocking, so we don't need to sleep
301313
// here, just return false to keep the task running

components/socket/src/tcp_socket.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ std::unique_ptr<TcpSocket> TcpSocket::accept() {
185185
// accept connection
186186
auto accepted_socket = ::accept(socket_, (struct sockaddr *)sender_address, &socklen);
187187
if (accepted_socket < 0) {
188-
logger_.error("Could not accept connection: {}", error_string());
188+
logger_.info("Could not accept connection: {}", error_string());
189189
return nullptr;
190190
}
191191
connected_client_info.update();

components/socket/src/udp_socket.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ bool UdpSocket::receive(size_t max_num_bytes, std::vector<uint8_t> &data,
9090
(struct sockaddr *)remote_address, &socklen);
9191
// if we didn't receive anything return false and don't do anything else
9292
if (num_bytes_received < 0) {
93-
logger_.error("Receive failed: {}", error_string());
93+
logger_.info("Receive failed: {}", error_string());
9494
return false;
9595
}
9696
// we received data, so call the callback function if one was provided.
@@ -157,8 +157,11 @@ bool UdpSocket::server_task_function(size_t buffer_size, std::mutex &m, std::con
157157
// if we failed to receive, then likely we should delay a little bit
158158
using namespace std::chrono_literals;
159159
std::unique_lock<std::mutex> lk(m);
160-
cv.wait_for(lk, 1ms, [&task_notified] { return task_notified; });
160+
auto stop_requested = cv.wait_for(lk, 1ms, [&task_notified] { return task_notified; });
161161
task_notified = false;
162+
if (stop_requested) {
163+
return true;
164+
}
162165
return false;
163166
}
164167
if (!server_receive_callback_) {

0 commit comments

Comments
 (0)