diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index 547a3c2437..c67c63232e 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -201,12 +201,17 @@ class Clock ros_time_is_active(); /** + * Deprecated. This API is broken, as there is no way to get a deep + * copy of a clock. Therefore one can experience spurious wakeups triggered + * by some other instance of a clock. + * * Cancels an ongoing or future sleep operation of one thread. * * This function can be used by one thread, to wakeup another thread that is * blocked using any of the sleep_ or wait_ methods of this class. */ RCLCPP_PUBLIC + [[deprecated("Use ClockConditionalVariable")]] void cancel_sleep_or_wait(); @@ -267,6 +272,117 @@ class Clock std::shared_ptr impl_; }; +/** + * A synchronization primitive, equal to std::conditional_variable, + * that works with the rclcpp::Clock. + * + * For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable. + * + * Note, this class does not handle shutdowns, if you want to + * haven them handles as well, use ClockConditionalVariable. + */ +class ClockWaiter +{ +private: + class ClockWaiterImpl; + std::unique_ptr impl_; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(ClockWaiter) + + RCLCPP_PUBLIC + explicit ClockWaiter(const rclcpp::Clock::SharedPtr & clock); + + RCLCPP_PUBLIC + ~ClockWaiter(); + + /** + * Calling this function will block the current thread, until abs_time is reached, + * or pred returns true. + * @param lock A locked lock. The lock must be locked at call time, or this method will throw. + * The lock will be atomically released and this thread will blocked. + * @param abs_time The time until which this thread shall be blocked. + * @param pred may be called in cased of spurious wakeups, but must be called every time + * notify_one() was called. During the call to pred, the given lock will be locked. + * This method will return, if pred returns true. + */ + RCLCPP_PUBLIC + bool + wait_until( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred); + + /** + * Notify the blocked thread, that it should reevaluate the wakeup condition. + * The given pred function in wait_until will be reevaluated and wait_until + * will return if it evaluates to true. + */ + RCLCPP_PUBLIC + void + notify_one(); +}; + + +/** + * A synchronization primitive, similar to std::conditional_variable, + * that works with the rclcpp::Clock. + * + * For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable. + * + * This primitive will wake up if the context was shut down. + */ +class ClockConditionalVariable +{ + class Impl; + std::unique_ptr impl_; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(ClockConditionalVariable) + + RCLCPP_PUBLIC + ClockConditionalVariable( + const rclcpp::Clock::SharedPtr & clock, + rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context()); + RCLCPP_PUBLIC + ~ClockConditionalVariable(); + + /** + * Calling this function will block the current thread, until abs_time is reached, + * or pred returns true. + * @param lock A locked lock. The lock must be locked at call time, or this method will throw. + * The lock will be atomically released and this thread will blocked. + * The given lock must be created using the mutex returned my mutex(). + * @param abs_time The time until which this thread shall be blocked. + * @param pred may be called in cased of spurious wakeups, but must be called every time + * notify_one() was called. During the call to pred, the given lock will be locked. + * This method will return, if pred returns true. + * + * @return true if until was reached. + */ + RCLCPP_PUBLIC + bool + wait_until( + std::unique_lock & lock, rclcpp::Time until, + const std::function & pred); + + /** + * Notify the blocked thread, that is should reevaluate the wakeup condition. + * E.g. the given pred function in wait_until shall be reevaluated. + */ + RCLCPP_PUBLIC + void + notify_one(); + + /** + * Returns the internal mutex. In order to be race free with the context shutdown, + * this mutex must be used for the wait_until call. + */ + RCLCPP_PUBLIC + std::mutex & + mutex(); +}; + + } // namespace rclcpp #endif // RCLCPP__CLOCK_HPP_ diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index 5c13f19d13..f9b9c59d7a 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -367,4 +367,245 @@ Clock::create_jump_callback( // *INDENT-ON* } +class ClockWaiter::ClockWaiterImpl +{ +private: + std::condition_variable cv_; + + rclcpp::Clock::SharedPtr clock_; + bool time_source_changed_ = false; + std::function post_time_jump_callback; + + bool + wait_until_system_time( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) + { + auto system_time = std::chrono::system_clock::time_point( + // Cast because system clock resolution is too big for nanoseconds on some systems + std::chrono::duration_cast( + std::chrono::nanoseconds(abs_time.nanoseconds()))); + + return cv_.wait_until(lock, system_time, pred); + } + + bool + wait_until_steady_time( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) + { + // Synchronize because RCL steady clock epoch might differ from chrono::steady_clock epoch + const rclcpp::Time rcl_entry = clock_->now(); + const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now(); + const rclcpp::Duration delta_t = abs_time - rcl_entry; + const std::chrono::steady_clock::time_point chrono_until = + chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds()); + + return cv_.wait_until(lock, chrono_until, pred); + } + + + bool + wait_until_ros_time( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) + { + // Install jump handler for any amount of time change, for two purposes: + // - if ROS time is active, check if time reached on each new clock sample + // - Trigger via on_clock_change to detect if time source changes, to invalidate sleep + rcl_jump_threshold_t threshold; + threshold.on_clock_change = true; + // 0 is disable, so -1 and 1 are smallest possible time changes + threshold.min_backward.nanoseconds = -1; + threshold.min_forward.nanoseconds = 1; + + time_source_changed_ = false; + + post_time_jump_callback = [this, &lock] (const rcl_time_jump_t & jump) + { + if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) { + std::lock_guard lk(*lock.mutex()); + time_source_changed_ = true; + } + cv_.notify_one(); + }; + + // Note this is a trade-off. Adding the callback for every call + // is expensive for high frequency calls. For low frequency waits + // its more overhead to have the callback being called all the time. + // As we expect the use case to be low frequency calls to wait_until + // with relative big pauses between the calls, we install it on demand. + auto clock_handler = clock_->create_jump_callback( + nullptr, + post_time_jump_callback, + threshold); + + if (!clock_->ros_time_is_active()) { + auto system_time = std::chrono::system_clock::time_point( + // Cast because system clock resolution is too big for nanoseconds on some systems + std::chrono::duration_cast( + std::chrono::nanoseconds(abs_time.nanoseconds()))); + + return cv_.wait_until(lock, system_time, [this, &pred] () { + return time_source_changed_ || pred(); + }); + } + + // RCL_ROS_TIME with ros_time_is_active. + // Just wait without "until" because installed + // jump callbacks wake the cv on every new sample. + cv_.wait(lock, [this, &pred, &abs_time] () { + return clock_->now() >= abs_time || time_source_changed_ || pred(); + }); + + return clock_->now() < abs_time; + } + +public: + explicit ClockWaiterImpl(const rclcpp::Clock::SharedPtr & clock) + :clock_(clock) + { + } + + bool + wait_until( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) + { + switch(clock_->get_clock_type()) { + case RCL_CLOCK_UNINITIALIZED: + throw std::runtime_error("Error, wait on uninitialized clock called"); + case RCL_ROS_TIME: + return wait_until_ros_time(lock, abs_time, pred); + break; + case RCL_STEADY_TIME: + return wait_until_steady_time(lock, abs_time, pred); + break; + case RCL_SYSTEM_TIME: + return wait_until_system_time(lock, abs_time, pred); + break; + } + + return false; + } + + void + notify_one() + { + cv_.notify_one(); + } +}; + +ClockWaiter::ClockWaiter(const rclcpp::Clock::SharedPtr & clock) +:impl_(std::make_unique(clock)) +{ +} + +ClockWaiter::~ClockWaiter() = default; + +bool +ClockWaiter::wait_until( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) +{ + return impl_->wait_until(lock, abs_time, pred); +} + +void +ClockWaiter::notify_one() +{ + impl_->notify_one(); +} + +class ClockConditionalVariable::Impl +{ + std::mutex pred_mutex_; + bool shutdown_ = false; + rclcpp::Context::SharedPtr context_; + rclcpp::OnShutdownCallbackHandle shutdown_cb_handle_; + ClockWaiter::UniquePtr clock_; + +public: + Impl(const rclcpp::Clock::SharedPtr & clock, rclcpp::Context::SharedPtr context) + :context_(context), + clock_(std::make_unique(clock)) + { + if (!context_ || !context_->is_valid()) { + throw std::runtime_error("context cannot be slept with because it's invalid"); + } + // Wake this thread if the context is shutdown + shutdown_cb_handle_ = context_->add_on_shutdown_callback( + [this]() { + { + std::unique_lock lock(pred_mutex_); + shutdown_ = true; + } + clock_->notify_one(); + }); + } + + ~Impl() + { + context_->remove_on_shutdown_callback(shutdown_cb_handle_); + } + + bool + wait_until( + std::unique_lock & lock, rclcpp::Time until, + const std::function & pred) + { + if(lock.mutex() != &pred_mutex_) { + throw std::runtime_error( + "ClockConditionalVariable::wait_until: Internal error, given lock does not use" + " mutex returned by this->mutex()"); + } + + clock_->wait_until(lock, until, [this, &pred] () -> bool { + return shutdown_ || pred(); + }); + return true; + } + + void + notify_one() + { + clock_->notify_one(); + } + + std::mutex & + mutex() + { + return pred_mutex_; + } +}; + +ClockConditionalVariable::ClockConditionalVariable( + const rclcpp::Clock::SharedPtr & clock, + rclcpp::Context::SharedPtr context) +:impl_(std::make_unique(clock, context)) +{ +} + +ClockConditionalVariable::~ClockConditionalVariable() = default; + +void +ClockConditionalVariable::notify_one() +{ + impl_->notify_one(); +} + +bool +ClockConditionalVariable::wait_until( + std::unique_lock & lock, rclcpp::Time until, + const std::function & pred) +{ + return impl_->wait_until(lock, until, pred); +} + +std::mutex & +ClockConditionalVariable::mutex() +{ + return impl_->mutex(); +} + } // namespace rclcpp diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 35fa8fdd32..85e8b6c5e6 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -62,6 +62,11 @@ ament_add_test_label(test_clock mimick) if(TARGET test_clock) target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) endif() +ament_add_gtest(test_clock_conditional test_clock_conditional.cpp) +ament_add_test_label(test_clock_conditional mimick) +if(TARGET test_clock_conditional) + target_link_libraries(test_clock_conditional ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) +endif() ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp) if(TARGET test_copy_all_parameter_values) target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME}) diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp index 2ddd775707..2d1742d0c6 100644 --- a/rclcpp/test/rclcpp/test_clock.cpp +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -15,6 +15,7 @@ #include #include +#include #include "rcl/error_handling.h" #include "rcl/time.h" @@ -41,9 +42,10 @@ class TestClockWakeup : public ::testing::TestWithParam clock->sleep_until(clock->now() + std::chrono::seconds(3)); thread_finished = true; }); - + RCPPUTILS_DEPRECATION_WARNING_OFF_START // notify the clock, that the sleep shall be interrupted clock->cancel_sleep_or_wait(); + RCPPUTILS_DEPRECATION_WARNING_OFF_STOP auto start_time = std::chrono::steady_clock::now(); auto cur_time = start_time; @@ -72,8 +74,10 @@ class TestClockWakeup : public ::testing::TestWithParam // make sure the thread is already sleeping before we send the cancel std::this_thread::sleep_for(std::chrono::milliseconds(100)); + RCPPUTILS_DEPRECATION_WARNING_OFF_START // notify the clock, that the sleep shall be interrupted clock->cancel_sleep_or_wait(); + RCPPUTILS_DEPRECATION_WARNING_OFF_STOP auto start_time = std::chrono::steady_clock::now(); auto cur_time = start_time; @@ -159,8 +163,10 @@ TEST_F(TestClockWakeup, no_wakeup_on_sim_time) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); EXPECT_FALSE(thread_finished); + RCPPUTILS_DEPRECATION_WARNING_OFF_START // notify the clock, that the sleep shall be interrupted clock->cancel_sleep_or_wait(); + RCPPUTILS_DEPRECATION_WARNING_OFF_STOP auto start_time = std::chrono::steady_clock::now(); auto cur_time = start_time; diff --git a/rclcpp/test/rclcpp/test_clock_conditional.cpp b/rclcpp/test/rclcpp/test_clock_conditional.cpp new file mode 100644 index 0000000000..e924e212d7 --- /dev/null +++ b/rclcpp/test/rclcpp/test_clock_conditional.cpp @@ -0,0 +1,246 @@ +// Copyright 2025 Cellumation GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rcl/error_handling.h" +#include "rcl/time.h" +#include "rclcpp/clock.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/time_source.hpp" + +#include "../utils/rclcpp_gtest_macros.hpp" + +using namespace std::chrono_literals; + +class TestClockWakeup : public ::testing::TestWithParam +{ +public: + void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + rclcpp::ClockConditionalVariable cond(clock); + + bool stopSleeping = false; + + std::thread wait_thread = std::thread( + [&cond, &clock, &stopSleeping, &thread_finished]() + { + // make sure the thread starts sleeping late + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::unique_lock lk(cond.mutex()); + cond.wait_until(lk, clock->now() + std::chrono::seconds(3), + [&stopSleeping] () {return stopSleeping;}); + thread_finished = true; + }); + + { + std::lock_guard lk(cond.mutex()); + // stop sleeping after next notification + stopSleeping = true; + } + // notify the conditional, to recheck it pred + cond.notify_one(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + + void test_wakeup_after_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + rclcpp::ClockConditionalVariable cond(clock); + + bool stopSleeping = false; + + std::thread wait_thread = std::thread( + [&cond, &clock, &stopSleeping, &thread_finished]() + { + std::unique_lock lk(cond.mutex()); + cond.wait_until(lk, clock->now() + std::chrono::seconds(3), + [&stopSleeping] () {return stopSleeping;}); + thread_finished = true; + }); + + // make sure the thread is already sleeping before we send the cancel + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + { + std::lock_guard lk(cond.mutex()); + // stop sleeping after next notification + stopSleeping = true; + } + // notify the conditional, to recheck it pred + cond.notify_one(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("my_node"); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; +}; + +INSTANTIATE_TEST_SUITE_P( + ClockConditionalVariable, + TestClockWakeup, + ::testing::Values( + RCL_SYSTEM_TIME, RCL_ROS_TIME, RCL_STEADY_TIME +)); + +TEST_P(TestClockWakeup, wakeup_sleep) { + auto clock = std::make_shared(GetParam()); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_ros_time_active) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, no_wakeup_on_sim_time) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + EXPECT_FALSE(clock->ros_time_is_active()); + + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + EXPECT_TRUE(clock->ros_time_is_active()); + + std::atomic_bool thread_finished = false; + rclcpp::ClockConditionalVariable cond(clock); + + bool stopSleeping = false; + + std::thread wait_thread = std::thread( + [&cond, &clock, &stopSleeping, &thread_finished]() + { + std::unique_lock lk(cond.mutex()); + // only sleep for an short period + cond.wait_until(lk, clock->now() + std::chrono::milliseconds(10), + [&stopSleeping] () {return stopSleeping;}); + thread_finished = true; + }); + + // make sure, that the sim time clock does not wakeup, as no clock is provided + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + EXPECT_FALSE(thread_finished); + + { + std::lock_guard lk(cond.mutex()); + // stop sleeping after next notification + stopSleeping = true; + } + // notify the conditional, to recheck it pred + cond.notify_one(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +} + +TEST_F(TestClockWakeup, wakeup_on_ros_shutdown) { + auto clock = std::make_shared(RCL_ROS_TIME); + + std::atomic_bool thread_finished = false; + rclcpp::ClockConditionalVariable cond(clock); + + bool stopSleeping = false; + + std::thread wait_thread = std::thread( + [&cond, &clock, &stopSleeping, &thread_finished]() + { + std::unique_lock lk(cond.mutex()); + // only sleep for an short period + cond.wait_until(lk, clock->now() + std::chrono::seconds(10), + [&stopSleeping] () {return stopSleeping;}); + thread_finished = true; + }); + + // wait a bit to be sure the thread is sleeping + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + EXPECT_FALSE(thread_finished); + + rclcpp::shutdown(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + EXPECT_TRUE(thread_finished); + + wait_thread.join(); + + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +}