From c8feaea5b64e824bbe76e920f48a3ca39b72f9fc Mon Sep 17 00:00:00 2001 From: Christophe Bedard Date: Mon, 2 Dec 2024 16:29:50 -0800 Subject: [PATCH] Add PlayerClock::wakeup() to interrupt sleeping (#1869) * Add PlayerClock::wakeup() to interrupt sleeping Signed-off-by: Christophe Bedard * Use PlayerClock::wakeup() in new play_next() impl Signed-off-by: Christophe Bedard --------- Signed-off-by: Christophe Bedard --- .../rosbag2_cpp/clocks/player_clock.hpp | 13 ++++++++++ .../clocks/time_controller_clock.hpp | 6 +++++ .../clocks/time_controller_clock.cpp | 16 ++++++++++++ .../test_time_controller_clock.cpp | 26 +++++++++++++++++++ .../src/rosbag2_transport/player.cpp | 15 +++++------ 5 files changed, 67 insertions(+), 9 deletions(-) diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp index 1b8a7c2594..77f236ed16 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp @@ -77,6 +77,19 @@ class PlayerClock ROSBAG2_CPP_PUBLIC virtual bool sleep_until(rclcpp::Time time) = 0; + /** + * \return whether the clock is currently sleeping. + */ + ROSBAG2_CPP_PUBLIC + virtual bool is_sleeping() = 0; + + /** + * \brief Wake up the clock if it is sleeping. + * \note This will wake any waiting `sleep_until`. + */ + ROSBAG2_CPP_PUBLIC + virtual void wakeup() = 0; + /** * Change the rate of the flow of time for the clock. * \param rate new rate of clock playback diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp index e1fdadc96c..0c68b5f657 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp @@ -81,6 +81,12 @@ class TimeControllerClock : public PlayerClock ROSBAG2_CPP_PUBLIC bool sleep_until(rclcpp::Time until) override; + ROSBAG2_CPP_PUBLIC + bool is_sleeping() override; + + ROSBAG2_CPP_PUBLIC + void wakeup() override; + /** * Change the rate of the flow of time for the clock. * diff --git a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp index 24e4adae0a..a936acb2a5 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp @@ -123,6 +123,7 @@ class TimeControllerClockImpl std::condition_variable cv RCPPUTILS_TSA_GUARDED_BY(state_mutex); double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0; bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false; + bool is_sleeping RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false; TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex); }; @@ -165,12 +166,16 @@ bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until) { rcpputils::unique_lock lock(impl_->state_mutex); if (impl_->paused) { + impl_->is_sleeping = true; impl_->cv.wait_for(lock, impl_->sleep_time_while_paused); + impl_->is_sleeping = false; } else { const auto steady_until = impl_->ros_to_steady(until); // wait only if necessary for performance if (steady_until > impl_->now_fn()) { + impl_->is_sleeping = true; impl_->cv.wait_until(lock, steady_until); + impl_->is_sleeping = false; } } if (impl_->paused) { @@ -187,6 +192,17 @@ bool TimeControllerClock::sleep_until(rclcpp::Time until) return sleep_until(until.nanoseconds()); } +bool TimeControllerClock::is_sleeping() +{ + std::lock_guard lock(impl_->state_mutex); + return impl_->is_sleeping; +} + +void TimeControllerClock::wakeup() +{ + impl_->cv.notify_all(); +} + bool TimeControllerClock::set_rate(double rate) { if (rate <= 0) { diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp index 07c5e1ce51..34bb4aac9a 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp @@ -211,6 +211,32 @@ TEST_F(TimeControllerClockTest, interrupted_sleep_returns_false_immediately) EXPECT_FALSE(thread_sleep_result); } +TEST_F(TimeControllerClockTest, wakeup_sleep_returns_immediately) +{ + // Use a bigger value for sleep_time_while_paused + auto sleep_time_while_paused = std::chrono::seconds(30); + rosbag2_cpp::TimeControllerClock clock(ros_start_time, now_fn, sleep_time_while_paused); + clock.pause(); + EXPECT_FALSE(clock.is_sleeping()); + auto steady_start = std::chrono::steady_clock::now(); + std::atomic_bool thread_sleep_result{true}; + auto sleep_long_thread = std::thread( + [&clock, &thread_sleep_result]() { + bool sleep_result = clock.sleep_until(clock.now() + RCUTILS_S_TO_NS(10)); + thread_sleep_result.store(sleep_result); + }); + // Wait for the thread to be sleeping for the purposes of this test + while (!clock.is_sleeping()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + clock.wakeup(); + sleep_long_thread.join(); + EXPECT_FALSE(clock.is_sleeping()); + // Check that the clock slept less than the sleep_time_while_paused value + EXPECT_LT(std::chrono::steady_clock::now() - steady_start, sleep_time_while_paused); + EXPECT_FALSE(thread_sleep_result); +} + TEST_F(TimeControllerClockTest, resumes_at_correct_ros_time) { rosbag2_cpp::TimeControllerClock clock(ros_start_time, now_fn); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 2f3ba90dee..d3b322ee5f 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -641,8 +641,8 @@ void PlayerImpl::stop() } if (clock_->is_paused()) { - clock_->resume(); // Temporary resume clock to force wakeup in clock_->sleep_until(time) - clock_->pause(); // Return in pause mode to preserve original state of the player + // Wake up the clock in case it's in a sleep_until(time) call + clock_->wakeup(); } // Note: Don't clean up message queue here. It will be cleaned up automatically in // playback thread after finishing play_messages_from_queue(); @@ -739,10 +739,8 @@ bool PlayerImpl::play_next() // Wait for play next to be done, and then return the result std::unique_lock lk(finished_play_next_mutex_); - // Temporarily resume clock to force wakeup in clock_->sleep_until(time), - // then return in pause mode to preserve original state of the player - clock_->resume(); - clock_->pause(); + // Wake up the clock in case it's in a sleep_until(time) call + clock_->wakeup(); finished_play_next_ = false; finished_play_next_cv_.wait(lk, [this] {return finished_play_next_.load();}); play_next_ = false; @@ -1044,9 +1042,8 @@ void PlayerImpl::play_messages_from_queue() while (!stop_playback_ && is_paused() && !play_next_.load() && rclcpp::ok()) { clock_->sleep_until(clock_->now()); } - // If we ran out of messages and are not in pause state, it means we're done playing, - // unless play_next() is resuming and pausing the clock in order to wake us up - if (!is_paused() && !play_next_.load()) { + // If we ran out of messages and are not in pause state, it means we're done playing + if (!is_paused()) { break; }