Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ClockWaiter and ClockConditionalVariable #2691

Merged
merged 2 commits into from
Mar 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
@@ -201,12 +201,17 @@ class Clock
ros_time_is_active();

/**
* Deprecated. This API is broken, as there is no way to get a deep
* copy of a clock. Therefore one can experience spurious wakeups triggered
* by some other instance of a clock.
*
* Cancels an ongoing or future sleep operation of one thread.
*
* This function can be used by one thread, to wakeup another thread that is
* blocked using any of the sleep_ or wait_ methods of this class.
*/
RCLCPP_PUBLIC
[[deprecated("Use ClockConditionalVariable")]]
void
cancel_sleep_or_wait();

@@ -267,6 +272,117 @@ class Clock
std::shared_ptr<Impl> impl_;
};

/**
* A synchronization primitive, equal to std::conditional_variable,
* that works with the rclcpp::Clock.
*
* For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable.
*
* Note, this class does not handle shutdowns, if you want to
* haven them handles as well, use ClockConditionalVariable.
*/
class ClockWaiter
{
private:
class ClockWaiterImpl;
std::unique_ptr<ClockWaiterImpl> impl_;

public:
RCLCPP_SMART_PTR_DEFINITIONS(ClockWaiter)

RCLCPP_PUBLIC
explicit ClockWaiter(const rclcpp::Clock::SharedPtr & clock);

RCLCPP_PUBLIC
~ClockWaiter();

/**
* Calling this function will block the current thread, until abs_time is reached,
* or pred returns true.
* @param lock A locked lock. The lock must be locked at call time, or this method will throw.
* The lock will be atomically released and this thread will blocked.
* @param abs_time The time until which this thread shall be blocked.
* @param pred may be called in cased of spurious wakeups, but must be called every time
* notify_one() was called. During the call to pred, the given lock will be locked.
* This method will return, if pred returns true.
*/
RCLCPP_PUBLIC
bool
wait_until(
std::unique_lock<std::mutex> & lock,
const rclcpp::Time & abs_time, const std::function<bool ()> & pred);

/**
* Notify the blocked thread, that it should reevaluate the wakeup condition.
* The given pred function in wait_until will be reevaluated and wait_until
* will return if it evaluates to true.
*/
RCLCPP_PUBLIC
void
notify_one();
};


/**
* A synchronization primitive, similar to std::conditional_variable,
* that works with the rclcpp::Clock.
*
* For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable.
*
* This primitive will wake up if the context was shut down.
*/
class ClockConditionalVariable
{
class Impl;
std::unique_ptr<Impl> impl_;

public:
RCLCPP_SMART_PTR_DEFINITIONS(ClockConditionalVariable)

RCLCPP_PUBLIC
ClockConditionalVariable(
const rclcpp::Clock::SharedPtr & clock,
rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context());
RCLCPP_PUBLIC
~ClockConditionalVariable();

/**
* Calling this function will block the current thread, until abs_time is reached,
* or pred returns true.
* @param lock A locked lock. The lock must be locked at call time, or this method will throw.
* The lock will be atomically released and this thread will blocked.
* The given lock must be created using the mutex returned my mutex().
* @param abs_time The time until which this thread shall be blocked.
* @param pred may be called in cased of spurious wakeups, but must be called every time
* notify_one() was called. During the call to pred, the given lock will be locked.
* This method will return, if pred returns true.
*
* @return true if until was reached.
*/
RCLCPP_PUBLIC
bool
wait_until(
std::unique_lock<std::mutex> & lock, rclcpp::Time until,
const std::function<bool ()> & pred);

/**
* Notify the blocked thread, that is should reevaluate the wakeup condition.
* E.g. the given pred function in wait_until shall be reevaluated.
*/
RCLCPP_PUBLIC
void
notify_one();

/**
* Returns the internal mutex. In order to be race free with the context shutdown,
* this mutex must be used for the wait_until call.
*/
RCLCPP_PUBLIC
std::mutex &
mutex();
};


} // namespace rclcpp

#endif // RCLCPP__CLOCK_HPP_
241 changes: 241 additions & 0 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
@@ -367,4 +367,245 @@ Clock::create_jump_callback(
// *INDENT-ON*
}

class ClockWaiter::ClockWaiterImpl
{
private:
std::condition_variable cv_;

rclcpp::Clock::SharedPtr clock_;
bool time_source_changed_ = false;
std::function<void(const rcl_time_jump_t &)> post_time_jump_callback;

bool
wait_until_system_time(
std::unique_lock<std::mutex> & lock,
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
{
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(abs_time.nanoseconds())));

return cv_.wait_until(lock, system_time, pred);
}

bool
wait_until_steady_time(
std::unique_lock<std::mutex> & lock,
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
{
// Synchronize because RCL steady clock epoch might differ from chrono::steady_clock epoch
const rclcpp::Time rcl_entry = clock_->now();
const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now();
const rclcpp::Duration delta_t = abs_time - rcl_entry;
const std::chrono::steady_clock::time_point chrono_until =
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());

return cv_.wait_until(lock, chrono_until, pred);
}


bool
wait_until_ros_time(
std::unique_lock<std::mutex> & lock,
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
{
// Install jump handler for any amount of time change, for two purposes:
// - if ROS time is active, check if time reached on each new clock sample
// - Trigger via on_clock_change to detect if time source changes, to invalidate sleep
rcl_jump_threshold_t threshold;
threshold.on_clock_change = true;
// 0 is disable, so -1 and 1 are smallest possible time changes
threshold.min_backward.nanoseconds = -1;
threshold.min_forward.nanoseconds = 1;

time_source_changed_ = false;

post_time_jump_callback = [this, &lock] (const rcl_time_jump_t & jump)
{
if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) {
std::lock_guard<std::mutex> lk(*lock.mutex());
time_source_changed_ = true;
}
cv_.notify_one();
};

// Note this is a trade-off. Adding the callback for every call
// is expensive for high frequency calls. For low frequency waits
// its more overhead to have the callback being called all the time.
// As we expect the use case to be low frequency calls to wait_until
// with relative big pauses between the calls, we install it on demand.
auto clock_handler = clock_->create_jump_callback(
nullptr,
post_time_jump_callback,
threshold);

if (!clock_->ros_time_is_active()) {
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(abs_time.nanoseconds())));

return cv_.wait_until(lock, system_time, [this, &pred] () {
return time_source_changed_ || pred();
});
}

// RCL_ROS_TIME with ros_time_is_active.
// Just wait without "until" because installed
// jump callbacks wake the cv on every new sample.
cv_.wait(lock, [this, &pred, &abs_time] () {
return clock_->now() >= abs_time || time_source_changed_ || pred();
});

return clock_->now() < abs_time;
}

public:
explicit ClockWaiterImpl(const rclcpp::Clock::SharedPtr & clock)
:clock_(clock)
{
}

bool
wait_until(
std::unique_lock<std::mutex> & lock,
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
{
switch(clock_->get_clock_type()) {
case RCL_CLOCK_UNINITIALIZED:
throw std::runtime_error("Error, wait on uninitialized clock called");
case RCL_ROS_TIME:
return wait_until_ros_time(lock, abs_time, pred);
break;
case RCL_STEADY_TIME:
return wait_until_steady_time(lock, abs_time, pred);
break;
case RCL_SYSTEM_TIME:
return wait_until_system_time(lock, abs_time, pred);
break;
}

return false;
}

void
notify_one()
{
cv_.notify_one();
}
};

ClockWaiter::ClockWaiter(const rclcpp::Clock::SharedPtr & clock)
:impl_(std::make_unique<ClockWaiterImpl>(clock))
{
}

ClockWaiter::~ClockWaiter() = default;

bool
ClockWaiter::wait_until(
std::unique_lock<std::mutex> & lock,
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
{
return impl_->wait_until(lock, abs_time, pred);
}

void
ClockWaiter::notify_one()
{
impl_->notify_one();
}

class ClockConditionalVariable::Impl
{
std::mutex pred_mutex_;
bool shutdown_ = false;
rclcpp::Context::SharedPtr context_;
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle_;
ClockWaiter::UniquePtr clock_;

public:
Impl(const rclcpp::Clock::SharedPtr & clock, rclcpp::Context::SharedPtr context)
:context_(context),
clock_(std::make_unique<ClockWaiter>(clock))
{
if (!context_ || !context_->is_valid()) {
throw std::runtime_error("context cannot be slept with because it's invalid");
}
// Wake this thread if the context is shutdown
shutdown_cb_handle_ = context_->add_on_shutdown_callback(
[this]() {
{
std::unique_lock lock(pred_mutex_);
shutdown_ = true;
}
clock_->notify_one();
});
}

~Impl()
{
context_->remove_on_shutdown_callback(shutdown_cb_handle_);
}

bool
wait_until(
std::unique_lock<std::mutex> & lock, rclcpp::Time until,
const std::function<bool ()> & pred)
{
if(lock.mutex() != &pred_mutex_) {
throw std::runtime_error(
"ClockConditionalVariable::wait_until: Internal error, given lock does not use"
" mutex returned by this->mutex()");
}

clock_->wait_until(lock, until, [this, &pred] () -> bool {
return shutdown_ || pred();
});
return true;
}

void
notify_one()
{
clock_->notify_one();
}

std::mutex &
mutex()
{
return pred_mutex_;
}
};

ClockConditionalVariable::ClockConditionalVariable(
const rclcpp::Clock::SharedPtr & clock,
rclcpp::Context::SharedPtr context)
:impl_(std::make_unique<Impl>(clock, context))
{
}

ClockConditionalVariable::~ClockConditionalVariable() = default;

void
ClockConditionalVariable::notify_one()
{
impl_->notify_one();
}

bool
ClockConditionalVariable::wait_until(
std::unique_lock<std::mutex> & lock, rclcpp::Time until,
const std::function<bool ()> & pred)
{
return impl_->wait_until(lock, until, pred);
}

std::mutex &
ClockConditionalVariable::mutex()
{
return impl_->mutex();
}

} // namespace rclcpp
5 changes: 5 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -62,6 +62,11 @@ ament_add_test_label(test_clock mimick)
if(TARGET test_clock)
target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS})
endif()
ament_add_gtest(test_clock_conditional test_clock_conditional.cpp)
ament_add_test_label(test_clock_conditional mimick)
if(TARGET test_clock_conditional)
target_link_libraries(test_clock_conditional ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS})
endif()
ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp)
if(TARGET test_copy_all_parameter_values)
target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME})
8 changes: 7 additions & 1 deletion rclcpp/test/rclcpp/test_clock.cpp
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
#include <gtest/gtest.h>

#include <chrono>
#include <rcpputils/compile_warnings.hpp>

#include "rcl/error_handling.h"
#include "rcl/time.h"
@@ -41,9 +42,10 @@ class TestClockWakeup : public ::testing::TestWithParam<rcl_clock_type_e>
clock->sleep_until(clock->now() + std::chrono::seconds(3));
thread_finished = true;
});

RCPPUTILS_DEPRECATION_WARNING_OFF_START
// notify the clock, that the sleep shall be interrupted
clock->cancel_sleep_or_wait();
RCPPUTILS_DEPRECATION_WARNING_OFF_STOP

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
@@ -72,8 +74,10 @@ class TestClockWakeup : public ::testing::TestWithParam<rcl_clock_type_e>
// make sure the thread is already sleeping before we send the cancel
std::this_thread::sleep_for(std::chrono::milliseconds(100));

RCPPUTILS_DEPRECATION_WARNING_OFF_START
// notify the clock, that the sleep shall be interrupted
clock->cancel_sleep_or_wait();
RCPPUTILS_DEPRECATION_WARNING_OFF_STOP

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
@@ -159,8 +163,10 @@ TEST_F(TestClockWakeup, no_wakeup_on_sim_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_FALSE(thread_finished);

RCPPUTILS_DEPRECATION_WARNING_OFF_START
// notify the clock, that the sleep shall be interrupted
clock->cancel_sleep_or_wait();
RCPPUTILS_DEPRECATION_WARNING_OFF_STOP

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
246 changes: 246 additions & 0 deletions rclcpp/test/rclcpp/test_clock_conditional.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Copyright 2025 Cellumation GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include <chrono>

#include "rcl/error_handling.h"
#include "rcl/time.h"
#include "rclcpp/clock.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/time_source.hpp"

#include "../utils/rclcpp_gtest_macros.hpp"

using namespace std::chrono_literals;

class TestClockWakeup : public ::testing::TestWithParam<rcl_clock_type_e>
{
public:
void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock)
{
std::atomic_bool thread_finished = false;
rclcpp::ClockConditionalVariable cond(clock);

bool stopSleeping = false;

std::thread wait_thread = std::thread(
[&cond, &clock, &stopSleeping, &thread_finished]()
{
// make sure the thread starts sleeping late
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock lk(cond.mutex());
cond.wait_until(lk, clock->now() + std::chrono::seconds(3),
[&stopSleeping] () {return stopSleeping;});
thread_finished = true;
});

{
std::lock_guard lk(cond.mutex());
// stop sleeping after next notification
stopSleeping = true;
}
// notify the conditional, to recheck it pred
cond.notify_one();

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
cur_time = std::chrono::steady_clock::now();
}

wait_thread.join();

EXPECT_TRUE(thread_finished);
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}

void test_wakeup_after_sleep(const rclcpp::Clock::SharedPtr & clock)
{
std::atomic_bool thread_finished = false;
rclcpp::ClockConditionalVariable cond(clock);

bool stopSleeping = false;

std::thread wait_thread = std::thread(
[&cond, &clock, &stopSleeping, &thread_finished]()
{
std::unique_lock lk(cond.mutex());
cond.wait_until(lk, clock->now() + std::chrono::seconds(3),
[&stopSleeping] () {return stopSleeping;});
thread_finished = true;
});

// make sure the thread is already sleeping before we send the cancel
std::this_thread::sleep_for(std::chrono::milliseconds(100));

{
std::lock_guard lk(cond.mutex());
// stop sleeping after next notification
stopSleeping = true;
}
// notify the conditional, to recheck it pred
cond.notify_one();

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
cur_time = std::chrono::steady_clock::now();
}

wait_thread.join();

EXPECT_TRUE(thread_finished);
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}

protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}

static void TearDownTestCase()
{
rclcpp::shutdown();
}

void SetUp()
{
node = std::make_shared<rclcpp::Node>("my_node");
}

void TearDown()
{
node.reset();
}

rclcpp::Node::SharedPtr node;
};

INSTANTIATE_TEST_SUITE_P(
ClockConditionalVariable,
TestClockWakeup,
::testing::Values(
RCL_SYSTEM_TIME, RCL_ROS_TIME, RCL_STEADY_TIME
));

TEST_P(TestClockWakeup, wakeup_sleep) {
auto clock = std::make_shared<rclcpp::Clock>(GetParam());
test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}

TEST_F(TestClockWakeup, wakeup_sleep_ros_time_active) {
node->set_parameter({"use_sim_time", true});
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
rclcpp::TimeSource time_source(node);
time_source.attachClock(clock);

EXPECT_TRUE(clock->ros_time_is_active());

test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}

TEST_F(TestClockWakeup, no_wakeup_on_sim_time) {
node->set_parameter({"use_sim_time", true});
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
EXPECT_FALSE(clock->ros_time_is_active());

rclcpp::TimeSource time_source(node);
time_source.attachClock(clock);
EXPECT_TRUE(clock->ros_time_is_active());

std::atomic_bool thread_finished = false;
rclcpp::ClockConditionalVariable cond(clock);

bool stopSleeping = false;

std::thread wait_thread = std::thread(
[&cond, &clock, &stopSleeping, &thread_finished]()
{
std::unique_lock lk(cond.mutex());
// only sleep for an short period
cond.wait_until(lk, clock->now() + std::chrono::milliseconds(10),
[&stopSleeping] () {return stopSleeping;});
thread_finished = true;
});

// make sure, that the sim time clock does not wakeup, as no clock is provided
std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_FALSE(thread_finished);

{
std::lock_guard lk(cond.mutex());
// stop sleeping after next notification
stopSleeping = true;
}
// notify the conditional, to recheck it pred
cond.notify_one();

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
cur_time = std::chrono::steady_clock::now();
}

wait_thread.join();

EXPECT_TRUE(thread_finished);
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}

TEST_F(TestClockWakeup, wakeup_on_ros_shutdown) {
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);

std::atomic_bool thread_finished = false;
rclcpp::ClockConditionalVariable cond(clock);

bool stopSleeping = false;

std::thread wait_thread = std::thread(
[&cond, &clock, &stopSleeping, &thread_finished]()
{
std::unique_lock lk(cond.mutex());
// only sleep for an short period
cond.wait_until(lk, clock->now() + std::chrono::seconds(10),
[&stopSleeping] () {return stopSleeping;});
thread_finished = true;
});

// wait a bit to be sure the thread is sleeping
std::this_thread::sleep_for(std::chrono::milliseconds(500));

EXPECT_FALSE(thread_finished);

rclcpp::shutdown();

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
cur_time = std::chrono::steady_clock::now();
}

EXPECT_TRUE(thread_finished);

wait_thread.join();

EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}