Skip to content

Commit f8b9b28

Browse files
rtt_roscomm: shortcut ROS spinner callback queue
The patch introduces a new "queue" without a queue to bypass the global CallbackQueue of ROS for the subscriptions of Orocos messages. Instead, the network thread will immediately execute the callback, and will pass it to the global CallbackQueue only in case that it fails to execute with TryAgain status. Helper functions are provided to generate a subscriber with a custom implementation of CallbackQueueInterface. This method is chosen for the subscribers of the message transporter.
1 parent c65cf80 commit f8b9b28

File tree

4 files changed

+309
-4
lines changed

4 files changed

+309
-4
lines changed

rtt_roscomm/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ orocos_library(rtt_rostopic
1717
src/rtt_rostopic.cpp)
1818
target_link_libraries(rtt_rostopic ${catkin_LIBRARIES})
1919

20+
orocos_library(passthrough_callback_queue
21+
src/passthrough_callback_queue.cpp)
22+
target_link_libraries(passthrough_callback_queue ${catkin_LIBRARIES})
23+
2024
orocos_service(rtt_rostopic_service
2125
src/rtt_rostopic_service.cpp
2226
src/rtt_rostopic_ros_publish_activity.cpp)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
2+
#ifndef RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_
3+
#define RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_
4+
5+
#include <ros/ros.h>
6+
#include <ros/callback_queue_interface.h>
7+
#include <ros/subscription_queue.h>
8+
#include <ros/callback_queue.h>
9+
10+
namespace rtt_roscomm {
11+
class PassthroughCallbackQueue: public ros::CallbackQueueInterface
12+
{
13+
public:
14+
PassthroughCallbackQueue();
15+
16+
/**
17+
* Implementation of ros::CallbackQueueInterface::addCallback()
18+
*
19+
* @param callback aaa
20+
* @param owner_id aaa
21+
*
22+
* @return void
23+
*/
24+
virtual void addCallback(
25+
const ros::CallbackInterfacePtr &callback,
26+
uint64_t owner_id=0);
27+
28+
/**
29+
* Implementation of ros::CallbackQueueInterface::removeByID()
30+
*
31+
* @param owner_id aaa
32+
*
33+
* @return void
34+
*/
35+
virtual void removeByID(uint64_t owner_id);
36+
37+
private:
38+
39+
/**
40+
* Port where to write the message
41+
*/
42+
// base::PortInterface* output_port_ptr_;
43+
44+
}; // class PassthroughCallbackQueue
45+
46+
/**
47+
* Helpers to generate subscribers with a custom ros::CallbackQueueInterface
48+
*/
49+
50+
// template<class M, class T>
51+
// ros::Subscriber subscribe(const std::string& topic, uint32_t queue_size,
52+
// void(T::*fp)(const boost::shared_ptr<M const>&) const, T* obj,
53+
// ros::CallbackQueueInterface* cq, ros::NodeHandle& nh,
54+
// const TransportHints& transport_hints = ros::TransportHints())
55+
// {
56+
// ros::SubscribeOptions ops;
57+
// ops.template init<M>(topic, queue_size, boost::bind(fp, obj, _1));
58+
// ops.transport_hints = transport_hints;
59+
// ops.callback_queue = cq;
60+
// return nh.subscribe(ops);
61+
// }
62+
63+
template<class M, class T>
64+
ros::Subscriber subscribe(
65+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
66+
const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj,
67+
const ros::TransportHints& transport_hints = ros::TransportHints())
68+
{
69+
ros::SubscribeOptions ops;
70+
ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj, _1));
71+
ops.transport_hints = transport_hints;
72+
ops.callback_queue = cq;
73+
return nh.subscribe(ops);
74+
}
75+
76+
/// and the const version
77+
template<class M, class T>
78+
ros::Subscriber subscribe(
79+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
80+
const std::string& topic, uint32_t queue_size, void(T::*fp)(M) const, T* obj,
81+
const ros::TransportHints& transport_hints = ros::TransportHints())
82+
{
83+
ros::SubscribeOptions ops;
84+
ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj, _1));
85+
ops.transport_hints = transport_hints;
86+
ops.callback_queue = cq;
87+
return nh.subscribe(ops);
88+
}
89+
template<class M, class T>
90+
ros::Subscriber subscribe(
91+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
92+
const std::string& topic, uint32_t queue_size,
93+
void(T::*fp)(const boost::shared_ptr<M const>&), T* obj,
94+
const ros::TransportHints& transport_hints = ros::TransportHints())
95+
{
96+
ros::SubscribeOptions ops;
97+
ops.template init<M>(topic, queue_size, boost::bind(fp, obj, _1));
98+
ops.transport_hints = transport_hints;
99+
ops.callback_queue = cq;
100+
return nh.subscribe(ops);
101+
}
102+
template<class M, class T>
103+
ros::Subscriber subscribe(
104+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
105+
const std::string& topic, uint32_t queue_size,
106+
void(T::*fp)(const boost::shared_ptr<M const>&) const, T* obj,
107+
const ros::TransportHints& transport_hints = ros::TransportHints())
108+
{
109+
ros::SubscribeOptions ops;
110+
ops.template init<M>(topic, queue_size, boost::bind(fp, obj, _1));
111+
ops.transport_hints = transport_hints;
112+
ops.callback_queue = cq;
113+
return nh.subscribe(ops);
114+
}
115+
template<class M, class T>
116+
ros::Subscriber subscribe(
117+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
118+
const std::string& topic, uint32_t queue_size, void(T::*fp)(M),
119+
const boost::shared_ptr<T>& obj,
120+
const ros::TransportHints& transport_hints = ros::TransportHints())
121+
{
122+
ros::SubscribeOptions ops;
123+
ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj.get(), _1));
124+
ops.tracked_object = obj;
125+
ops.transport_hints = transport_hints;
126+
ops.callback_queue = cq;
127+
return nh.subscribe(ops);
128+
}
129+
130+
template<class M, class T>
131+
ros::Subscriber subscribe(
132+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
133+
const std::string& topic, uint32_t queue_size, void(T::*fp)(M) const,
134+
const boost::shared_ptr<T>& obj,
135+
const ros::TransportHints& transport_hints = ros::TransportHints())
136+
{
137+
ros::SubscribeOptions ops;
138+
ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj.get(), _1));
139+
ops.tracked_object = obj;
140+
ops.transport_hints = transport_hints;
141+
ops.callback_queue = cq;
142+
return nh.subscribe(ops);
143+
}
144+
template<class M, class T>
145+
ros::Subscriber subscribe(
146+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
147+
const std::string& topic, uint32_t queue_size,
148+
void(T::*fp)(const boost::shared_ptr<M const>&),
149+
const boost::shared_ptr<T>& obj,
150+
const ros::TransportHints& transport_hints = ros::TransportHints())
151+
{
152+
ros::SubscribeOptions ops;
153+
ops.template init<M>(topic, queue_size, boost::bind(fp, obj.get(), _1));
154+
ops.tracked_object = obj;
155+
ops.transport_hints = transport_hints;
156+
ops.callback_queue = cq;
157+
return nh.subscribe(ops);
158+
}
159+
template<class M, class T>
160+
ros::Subscriber subscribe(
161+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
162+
const std::string& topic, uint32_t queue_size,
163+
void(T::*fp)(const boost::shared_ptr<M const>&) const,
164+
const boost::shared_ptr<T>& obj,
165+
const ros::TransportHints& transport_hints = ros::TransportHints())
166+
{
167+
ros::SubscribeOptions ops;
168+
ops.template init<M>(topic, queue_size, boost::bind(fp, obj.get(), _1));
169+
ops.tracked_object = obj;
170+
ops.transport_hints = transport_hints;
171+
ops.callback_queue = cq;
172+
return nh.subscribe(ops);
173+
}
174+
template<class M>
175+
ros::Subscriber subscribe(
176+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
177+
const std::string& topic, uint32_t queue_size, void(*fp)(M),
178+
const ros::TransportHints& transport_hints = ros::TransportHints())
179+
{
180+
ros::SubscribeOptions ops;
181+
ops.template initByFullCallbackType<M>(topic, queue_size, fp);
182+
ops.transport_hints = transport_hints;
183+
ops.callback_queue = cq;
184+
return nh.subscribe(ops);
185+
}
186+
template<class M>
187+
ros::Subscriber subscribe(
188+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
189+
const std::string& topic, uint32_t queue_size,
190+
void(*fp)(const boost::shared_ptr<M const>&),
191+
const ros::TransportHints& transport_hints = ros::TransportHints())
192+
{
193+
ros::SubscribeOptions ops;
194+
ops.template init<M>(topic, queue_size, fp);
195+
ops.transport_hints = transport_hints;
196+
ops.callback_queue = cq;
197+
return nh.subscribe(ops);
198+
}
199+
template<class M>
200+
ros::Subscriber subscribe(
201+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
202+
const std::string& topic, uint32_t queue_size,
203+
const boost::function<void (const boost::shared_ptr<M const>&)>& callback,
204+
const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr(),
205+
const ros::TransportHints& transport_hints = ros::TransportHints())
206+
{
207+
ros::SubscribeOptions ops;
208+
ops.template init<M>(topic, queue_size, callback);
209+
ops.tracked_object = tracked_object;
210+
ops.transport_hints = transport_hints;
211+
ops.callback_queue = cq;
212+
return nh.subscribe(ops);
213+
}
214+
template<class M, class C>
215+
ros::Subscriber subscribe(
216+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
217+
const std::string& topic, uint32_t queue_size,
218+
const boost::function<void (C)>& callback,
219+
const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr(),
220+
const ros::TransportHints& transport_hints = ros::TransportHints())
221+
{
222+
ros::SubscribeOptions ops;
223+
ops.template initByFullCallbackType<C>(topic, queue_size, callback);
224+
ops.tracked_object = tracked_object;
225+
ops.transport_hints = transport_hints;
226+
ops.callback_queue = cq;
227+
return nh.subscribe(ops);
228+
}
229+
230+
} // namespace rtt_roscomm
231+
232+
#endif // RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_

rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include <rtt/internal/ConnFactory.hpp>
5757
#include <ros/ros.h>
5858

59+
#include <rtt_roscomm/passthrough_callback_queue.hpp>
5960
#include <rtt_roscomm/rtt_rostopic_ros_publish_activity.hpp>
6061

6162
#ifndef RTT_VERSION_GTE
@@ -241,6 +242,7 @@ namespace rtt_roscomm {
241242
ros::NodeHandle ros_node;
242243
ros::NodeHandle ros_node_private;
243244
ros::Subscriber ros_sub;
245+
PassthroughCallbackQueue prassthrough_callback_queue;
244246

245247
public:
246248
/**
@@ -264,9 +266,9 @@ namespace rtt_roscomm {
264266
RTT::log(RTT::Debug)<<"Creating ROS subscriber for port "<<port->getName()<<" on topic "<<policy.name_id<<RTT::endlog();
265267
}
266268
if(topicname.length() > 1 && topicname.at(0) == '~') {
267-
ros_sub = ros_node_private.subscribe(policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1
269+
ros_sub = subscribe(ros_node_private, &prassthrough_callback_queue, policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1
268270
} else {
269-
ros_sub = ros_node.subscribe(policy.name_id, policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1
271+
ros_sub = subscribe(ros_node, &prassthrough_callback_queue, policy.name_id, policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1
270272
}
271273
}
272274

@@ -350,6 +352,6 @@ namespace rtt_roscomm {
350352

351353
return channel;
352354
}
353-
};
354-
}
355+
}; // class RosMsgTransporter
356+
} // namespace rtt_roscomm
355357
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
2+
#include "rtt_roscomm/passthrough_callback_queue.hpp"
3+
4+
namespace rtt_roscomm {
5+
6+
PassthroughCallbackQueue::PassthroughCallbackQueue()
7+
{}
8+
9+
void PassthroughCallbackQueue::addCallback(
10+
const ros::CallbackInterfacePtr &callback,
11+
uint64_t owner_id)
12+
{
13+
14+
// Explanation about Callbacks and Spinning:
15+
// http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning
16+
// Check specially the section 4. Advanced: Using Different Callback Queues
17+
// Our use-case is not in 4.1. Shortcutting the callback queue by not having
18+
// a queue.
19+
20+
// Reference 1:
21+
// https://github.com/ros/ros_comm/blob/ae2501fec33de9a19abe87199bbf60171c05dd34/clients/roscpp/include/ros/node_handle.h#L402
22+
// Function Subscriber::subscribe() creates a SubscribeOptions ops and calls subscribe(ops)
23+
// need to replace that one with my own options to use the ops.callback_queue different from 0 (0 is default callback_queue, the ROS one)
24+
// I need to do that when I call subscribe() and use directly my ops instead of calling this referenced subscribe()
25+
26+
// Reference 2:
27+
// https://github.com/ros/ros_comm/blob/ae2501fec33de9a19abe87199bbf60171c05dd34/clients/roscpp/src/libros/callback_queue.cpp#L346
28+
// Function CallbackQueue::callOneCB() executes a callback and then it removes it
29+
// The callback inherits from CallbackInterface and it contains a cb->call() method that is called before being removed
30+
// We don't need all this complexity, the idea is that we call the callback immediately to do the work
31+
// It also deals with TryAgain result, which would be the case if in a multi-threaded situation the callback is being taken
32+
// by another thread
33+
34+
// Reference 3:
35+
// https://github.com/ros/ros_comm/blob/ae2501fec33de9a19abe87199bbf60171c05dd34/clients/roscpp/src/libros/subscription.cpp#L657
36+
// Function: Subscription::handleMessage() is the function that calls the addCallback
37+
// What it pushes to addCallback is another queue for the subscription.
38+
// This queue would have the deserializers. Need to see what happens with the callback queue that is sent
39+
// as the callback
40+
41+
// CallbackInterfacePtr from it we need to extract the data
42+
//
43+
// We receive as callback a ros::SubscriptionQueue that is a std::queue with:
44+
// (info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full)
45+
// (const SubscriptionCallbackHelperPtr &), (const MessageDeserializerPtr &), (bool), (const VoidConstWPtr &), (bool), (ros::Time), (bool *)
46+
//
47+
// The deserializer has the message: deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
48+
//
49+
50+
// Reference A:
51+
// https://github.com/ros/ros_comm/blob/121bac963e90b9a0bb33cc368ef4373a2f65ddff/clients/roscpp/src/libros/subscription_queue.cpp#L102
52+
// Function SubscriptionQueue::call()
53+
//
54+
55+
// Call CallbackInterface::CallResult SubscriptionQueue::call()
56+
if (ros::CallbackInterface::TryAgain == callback->call()) {
57+
ros::getGlobalCallbackQueue()->addCallback(callback, owner_id);
58+
}
59+
60+
}
61+
62+
void PassthroughCallbackQueue::removeByID(uint64_t owner_id)
63+
{
64+
}
65+
66+
} // namespace rtt_roscomm
67+

0 commit comments

Comments
 (0)