Skip to content

Commit 7379274

Browse files
MichaelOrlovmorlov-apexai
authored andcommitted
Temporary removes the on_new_[message]event on disable_callbacks()
Note: While we are temporary removes the on new message callback and all on new event callbacks from the underlying rmw layer to prevent them from being called. We can't guarantee that the currently executing on_new_[message]event callbacks are finished before we exit from the disable_callbacks(). Signed-off-by: Michael Orlov <[email protected]>
1 parent 19cab23 commit 7379274

File tree

9 files changed

+138
-16
lines changed

9 files changed

+138
-16
lines changed

rclcpp/include/rclcpp/event_handler.hpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,15 +324,40 @@ class EventHandler : public EventHandlerBase
324324
}
325325

326326
/// Disable the event callback from being called when execute(..) invoked
327+
/**
328+
* This will also temporarily remove the on_new_event_callback from the underlying rmw layer,
329+
* so that it is not called from the middleware while disabled.
330+
*/
327331
void disable() override
328332
{
333+
{
334+
// Temporary remove the on_new_event_callback_ to prevent it from being called
335+
std::lock_guard<std::recursive_mutex> on_new_event_lock(on_new_event_callback_mutex_);
336+
if (on_new_event_callback_) {
337+
set_on_new_event_callback(nullptr, nullptr);
338+
}
339+
}
329340
std::lock_guard<std::mutex> event_callback_lock(event_callback_mutex_);
330341
disabled_.store(true);
331342
}
332343

333344
/// Enable the event callback to be called when execute(..) invoked
345+
/**
346+
* This will also set back the on_new_event_callback to the underlying rmw layer, if it was
347+
* previously removed with disable().
348+
*/
334349
void enable() override
335350
{
351+
{
352+
// Set callback again if it was previously removed in disable()
353+
std::lock_guard<std::recursive_mutex> on_new_event_lock(on_new_event_callback_mutex_);
354+
if (on_new_event_callback_) {
355+
set_on_new_event_callback(
356+
rclcpp::detail::cpp_callback_trampoline<
357+
decltype(on_new_event_callback_), const void *, size_t>,
358+
static_cast<const void *>(&on_new_event_callback_));
359+
}
360+
}
336361
std::lock_guard<std::mutex> event_callback_lock(event_callback_mutex_);
337362
disabled_.store(false);
338363
}

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,26 @@ class SubscriptionIntraProcess
154154
execute_impl<SubscribedType>(data);
155155
}
156156

157+
/// Disable callbacks from being called
158+
/**
159+
* This method will block, until any subscription's callbacks currently being executed are
160+
* finished.
161+
* This method is thread safe, and provides a safe way to atomically disable the callbacks.
162+
*/
157163
void disable_callbacks() override
158164
{
165+
SubscriptionIntraProcessBase::disable_callbacks();
159166
any_callback_.disable();
160167
}
161168

169+
/// Enable the callbacks to be called
170+
/**
171+
* This method is thread safe, and provides a safe way to atomically enable the callbacks
172+
* in a multithreaded environment.
173+
*/
162174
void enable_callbacks() override
163175
{
176+
SubscriptionIntraProcessBase::enable_callbacks();
164177
any_callback_.enable();
165178
}
166179

rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,20 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
8888
execute(const std::shared_ptr<void> & data) override = 0;
8989

9090
/// Disable callbacks from being called
91+
/**
92+
* This function temporary disable on_new_message_callback to prevent it from being called.
93+
*/
9194
RCLCPP_PUBLIC
9295
virtual
93-
void disable_callbacks() = 0;
96+
void disable_callbacks();
9497

9598
/// Enable the callbacks to be called
99+
/**
100+
* This function enable the on_new_message_callback if it was previously set.
101+
*/
96102
RCLCPP_PUBLIC
97103
virtual
98-
void enable_callbacks() = 0;
104+
void enable_callbacks();
99105

100106
virtual
101107
bool
@@ -168,7 +174,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
168174
}
169175
};
170176

171-
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
177+
std::lock_guard<std::recursive_mutex> lock(on_new_message_callback_mutex_);
172178
on_new_message_callback_ = new_callback;
173179

174180
if (unread_count_ > 0) {
@@ -186,7 +192,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
186192
void
187193
clear_on_ready_callback() override
188194
{
189-
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
195+
std::lock_guard<std::recursive_mutex> lock(on_new_message_callback_mutex_);
190196
on_new_message_callback_ = nullptr;
191197
}
192198

@@ -198,8 +204,9 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
198204
}
199205

200206
protected:
201-
std::recursive_mutex callback_mutex_;
207+
std::recursive_mutex on_new_message_callback_mutex_;
202208
std::function<void(size_t)> on_new_message_callback_ {nullptr};
209+
bool on_new_message_callback_disabled_{false};
203210
size_t unread_count_{0};
204211
rclcpp::GuardCondition gc_;
205212

@@ -209,11 +216,13 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
209216
void
210217
invoke_on_new_message()
211218
{
212-
std::lock_guard<std::recursive_mutex> lock(this->callback_mutex_);
213-
if (this->on_new_message_callback_) {
214-
this->on_new_message_callback_(1);
215-
} else {
216-
this->unread_count_++;
219+
std::lock_guard<std::recursive_mutex> lock(this->on_new_message_callback_mutex_);
220+
if (!on_new_message_callback_disabled_) {
221+
if (this->on_new_message_callback_) {
222+
this->on_new_message_callback_(1);
223+
} else {
224+
this->unread_count_++;
225+
}
217226
}
218227
}
219228

rclcpp/include/rclcpp/generic_subscription.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,23 @@ class GenericSubscription : public rclcpp::SubscriptionBase
111111
RCLCPP_PUBLIC
112112
std::shared_ptr<rclcpp::SerializedMessage> create_serialized_message() override;
113113

114+
/// Disable callbacks from being called
115+
/**
116+
* This method will block, until any subscription's callbacks provided during construction
117+
* currently being executed are finished.
118+
* \note This method also temporary removes the on new message callback and all
119+
* on new event callbacks from the rmw layer to prevent them from being called. However, this
120+
* method will not block and wait until the currently executing on_new_[message]event callbacks
121+
* are finished.
122+
*/
114123
RCLCPP_PUBLIC
115124
void disable_callbacks() override;
116125

126+
/// Enable the callbacks to be called
127+
/**
128+
* This method is thread safe, and provides a safe way to atomically enable the callbacks
129+
* in a multithreaded environment.
130+
*/
117131
RCLCPP_PUBLIC
118132
void enable_callbacks() override;
119133

rclcpp/include/rclcpp/subscription.hpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,10 +279,19 @@ class Subscription : public SubscriptionBase
279279
return message_memory_strategy_->borrow_serialized_message();
280280
}
281281

282-
282+
/// Disable callbacks from being called
283+
/**
284+
* This method will block, until any subscription's callbacks provided during construction
285+
* currently being executed are finished.
286+
* \note This method also temporary removes the on new message callback and all
287+
* on new event callbacks from the rmw layer to prevent them from being called. However, this
288+
* method will not block and wait until the currently executing on_new_[message]event callbacks
289+
* are finished.
290+
*/
283291
void
284292
disable_callbacks() override
285293
{
294+
SubscriptionBase::disable_callbacks();
286295
any_callback_.disable();
287296
if (subscription_intra_process_) {
288297
subscription_intra_process_->disable_callbacks();
@@ -294,9 +303,15 @@ class Subscription : public SubscriptionBase
294303
}
295304
}
296305

306+
/// Enable the callbacks to be called
307+
/**
308+
* This method is thread safe, and provides a safe way to atomically enable the callbacks
309+
* in a multithreaded environment.
310+
*/
297311
void
298312
enable_callbacks() override
299313
{
314+
SubscriptionBase::enable_callbacks();
300315
any_callback_.enable();
301316
if (subscription_intra_process_) {
302317
subscription_intra_process_->enable_callbacks();

rclcpp/include/rclcpp/subscription_base.hpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,21 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
213213
create_serialized_message() = 0;
214214

215215
/// Disable callbacks from being called
216+
/**
217+
* This function temporary removes the on_new_message_callback to prevent it from being called.
218+
*/
216219
RCLCPP_PUBLIC
217220
virtual
218-
void disable_callbacks() = 0;
221+
void disable_callbacks();
219222

220223
/// Enable the callbacks to be called
224+
/**
225+
* This function sets back the on_new_message_callback if it was previously removed in
226+
* disable_callbacks().
227+
*/
221228
RCLCPP_PUBLIC
222229
virtual
223-
void enable_callbacks() = 0;
230+
void enable_callbacks();
224231

225232
/// Check if we need to handle the message, and execute the callback if we do.
226233
/**
@@ -393,7 +400,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
393400
}
394401
};
395402

396-
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
403+
std::lock_guard<std::recursive_mutex> lock(on_new_message_callback_mutex_);
397404

398405
// Set it temporarily to the new callback, while we replace the old one.
399406
// This two-step setting, prevents a gap where the old std::function has
@@ -416,7 +423,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
416423
void
417424
clear_on_new_message_callback()
418425
{
419-
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
426+
std::lock_guard<std::recursive_mutex> lock(on_new_message_callback_mutex_);
420427

421428
if (on_new_message_callback_) {
422429
set_on_new_message_callback(nullptr, nullptr);
@@ -656,7 +663,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
656663

657664
std::shared_ptr<rcl_node_t> node_handle_;
658665

659-
std::recursive_mutex callback_mutex_;
666+
std::recursive_mutex on_new_message_callback_mutex_;
660667
// It is important to declare on_new_message_callback_ before
661668
// subscription_handle_, so on destruction the subscription is
662669
// destroyed first. Otherwise, the rmw subscription callback

rclcpp/src/rclcpp/generic_subscription.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ GenericSubscription::create_serialized_message()
4040
void
4141
GenericSubscription::disable_callbacks()
4242
{
43+
SubscriptionBase::disable_callbacks();
4344
any_callback_.disable();
4445
for (const auto & [_, event_ptr] : event_handlers_) {
4546
if (event_ptr) {
@@ -51,6 +52,7 @@ GenericSubscription::disable_callbacks()
5152
void
5253
GenericSubscription::enable_callbacks()
5354
{
55+
SubscriptionBase::enable_callbacks();
5456
any_callback_.enable();
5557
for (const auto & [_, event_ptr] : event_handlers_) {
5658
if (event_ptr) {

rclcpp/src/rclcpp/subscription_base.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,3 +577,26 @@ SubscriptionBase::take_dynamic_message(
577577
throw std::runtime_error("Unimplemented");
578578
return false;
579579
}
580+
581+
void
582+
SubscriptionBase::disable_callbacks()
583+
{
584+
// Temporary remove the on_new_message_callback_ to prevent it from being called
585+
std::lock_guard<std::recursive_mutex> lock(on_new_message_callback_mutex_);
586+
if (on_new_message_callback_) {
587+
set_on_new_message_callback(nullptr, nullptr);
588+
}
589+
}
590+
591+
void
592+
SubscriptionBase::enable_callbacks()
593+
{
594+
// Set callback again if it was previously removed in disable_callbacks()
595+
std::lock_guard<std::recursive_mutex> lock(on_new_message_callback_mutex_);
596+
if (on_new_message_callback_) {
597+
set_on_new_message_callback(
598+
rclcpp::detail::cpp_callback_trampoline<
599+
decltype(on_new_message_callback_), const void *, size_t>,
600+
static_cast<const void *>(&on_new_message_callback_));
601+
}
602+
}

rclcpp/src/rclcpp/subscription_intra_process_base.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,17 @@ SubscriptionIntraProcessBase::is_durability_transient_local() const
4040
{
4141
return qos_profile_.durability() == rclcpp::DurabilityPolicy::TransientLocal;
4242
}
43+
44+
void
45+
SubscriptionIntraProcessBase::disable_callbacks()
46+
{
47+
std::lock_guard<std::recursive_mutex> lock(on_new_message_callback_mutex_);
48+
on_new_message_callback_disabled_ = true;
49+
}
50+
51+
void
52+
SubscriptionIntraProcessBase::enable_callbacks()
53+
{
54+
std::lock_guard<std::recursive_mutex> lock(on_new_message_callback_mutex_);
55+
on_new_message_callback_disabled_ = false;
56+
}

0 commit comments

Comments
 (0)