From 0a0a9d13b0f4864838224b8f7fb0f1f106d6e490 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Wed, 19 Apr 2023 20:49:57 -0700 Subject: [PATCH 1/5] Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder - Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 42 ++++++++++++++++++------ 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 1b51f30429..e5a4fc8f35 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -190,16 +190,23 @@ class Player class Recorder { private: - std::unique_ptr exec_; + static std::unique_ptr exec_; + std::shared_ptr recorder_; public: Recorder() { - rclcpp::init(0, nullptr); + auto init_options = rclcpp::InitOptions(); + init_options.shutdown_on_signal = false; + rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); exec_ = std::make_unique(); std::signal( SIGTERM, [](int /* signal */) { - rclcpp::shutdown(); + rosbag2_py::Recorder::cancel(); + }); + std::signal( + SIGINT, [](int /* signal */) { + rosbag2_py::Recorder::cancel(); }); } @@ -218,24 +225,39 @@ class Recorder } auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options); - auto recorder = std::make_shared( + recorder_ = std::make_shared( std::move(writer), storage_options, record_options, node_name); - recorder->record(); + recorder_->record(); - exec_->add_node(recorder); + exec_->add_node(recorder_); // Release the GIL for long-running record, so that calling Python code can use other threads { py::gil_scoped_release release; - exec_->spin(); + while (!exit_) { + exec_->spin_all(std::chrono::milliseconds(30)); + } + if (recorder_) { + recorder_->stop(); + // Need to spin once to be able to send notifications about bag split and close + exec_->spin_some(std::chrono::milliseconds(30)); + } } } - void cancel() + static void cancel() { - exec_->cancel(); + exit_ = true; + if (exec_) { + exec_->cancel(); + } } + + static std::atomic_bool exit_; }; +std::unique_ptr Recorder::exec_; +std::atomic_bool Recorder::exit_{false}; + // Return a RecordOptions struct with defaults set for rewriting bags. rosbag2_transport::RecordOptions bag_rewrite_default_record_options() { @@ -360,7 +382,7 @@ PYBIND11_MODULE(_transport, m) { .def( "record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"), py::arg("node_name") = "rosbag2_recorder") - .def("cancel", &rosbag2_py::Recorder::cancel) + .def_static("cancel", &rosbag2_py::Recorder::cancel) ; m.def( From a6adfb2edd5d8df011eb53ae45c0044728265b0c Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 3 Jun 2023 11:39:30 -0700 Subject: [PATCH 2/5] Use singleton for static executor in the rosbag2_py::Recorder Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 41 +++++++++++++++--------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index e5a4fc8f35..619759a9dd 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -190,7 +190,18 @@ class Player class Recorder { private: - static std::unique_ptr exec_; + using ExecutorT = rclcpp::executors::SingleThreadedExecutor; + static std::unique_ptr & get_executor_instance() + { + static std::unique_ptr executor( + new ExecutorT(), + [](ExecutorT * exec_ptr) { + delete exec_ptr; + rclcpp::shutdown(); + } + ); + return executor; + } std::shared_ptr recorder_; public: @@ -198,8 +209,9 @@ class Recorder { auto init_options = rclcpp::InitOptions(); init_options.shutdown_on_signal = false; - rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); - exec_ = std::make_unique(); + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); + } std::signal( SIGTERM, [](int /* signal */) { rosbag2_py::Recorder::cancel(); @@ -210,10 +222,7 @@ class Recorder }); } - virtual ~Recorder() - { - rclcpp::shutdown(); - } + virtual ~Recorder() = default; void record( const rosbag2_storage::StorageOptions & storage_options, @@ -229,33 +238,35 @@ class Recorder std::move(writer), storage_options, record_options, node_name); recorder_->record(); - exec_->add_node(recorder_); - // Release the GIL for long-running record, so that calling Python code can use other threads - { + get_executor_instance()->add_node(recorder_); + try { + // Release the GIL for long-running record, so that calling Python code can use other threads py::gil_scoped_release release; while (!exit_) { - exec_->spin_all(std::chrono::milliseconds(30)); + get_executor_instance()->spin_all(std::chrono::milliseconds(30)); } if (recorder_) { recorder_->stop(); // Need to spin once to be able to send notifications about bag split and close - exec_->spin_some(std::chrono::milliseconds(30)); + get_executor_instance()->spin_some(std::chrono::milliseconds(30)); } + } catch (...) { + get_executor_instance()->remove_node(recorder_); } + get_executor_instance()->remove_node(recorder_); } static void cancel() { exit_ = true; - if (exec_) { - exec_->cancel(); + if (get_executor_instance()) { + get_executor_instance()->cancel(); } } static std::atomic_bool exit_; }; -std::unique_ptr Recorder::exec_; std::atomic_bool Recorder::exit_{false}; // Return a RecordOptions struct with defaults set for rewriting bags. From 6fee40cbb60caaa9592204344bf70b9375fcd6d2 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Fri, 9 Jun 2023 10:08:47 -0700 Subject: [PATCH 3/5] Rollback to the non-static executor and don't call executor->cancel() - In case when signal will arrive we will trigger our internal exit_ variable and wait while current exec_->spin_all(10msec) will exit. Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 38 +++++++++--------------- 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 619759a9dd..dcdd2d5825 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -191,17 +191,7 @@ class Recorder { private: using ExecutorT = rclcpp::executors::SingleThreadedExecutor; - static std::unique_ptr & get_executor_instance() - { - static std::unique_ptr executor( - new ExecutorT(), - [](ExecutorT * exec_ptr) { - delete exec_ptr; - rclcpp::shutdown(); - } - ); - return executor; - } + std::unique_ptr exec_; std::shared_ptr recorder_; public: @@ -209,9 +199,9 @@ class Recorder { auto init_options = rclcpp::InitOptions(); init_options.shutdown_on_signal = false; - if (!rclcpp::ok()) { - rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); - } + rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); + + exec_ = std::make_unique(); std::signal( SIGTERM, [](int /* signal */) { rosbag2_py::Recorder::cancel(); @@ -222,7 +212,10 @@ class Recorder }); } - virtual ~Recorder() = default; + virtual ~Recorder() + { + rclcpp::shutdown(); + } void record( const rosbag2_storage::StorageOptions & storage_options, @@ -238,30 +231,27 @@ class Recorder std::move(writer), storage_options, record_options, node_name); recorder_->record(); - get_executor_instance()->add_node(recorder_); + exec_->add_node(recorder_); try { // Release the GIL for long-running record, so that calling Python code can use other threads py::gil_scoped_release release; while (!exit_) { - get_executor_instance()->spin_all(std::chrono::milliseconds(30)); + exec_->spin_all(std::chrono::milliseconds(10)); } if (recorder_) { recorder_->stop(); - // Need to spin once to be able to send notifications about bag split and close - get_executor_instance()->spin_some(std::chrono::milliseconds(30)); + // Need to spin node to be able to send notifications about bag split and close + exec_->spin_all(std::chrono::milliseconds(10)); } } catch (...) { - get_executor_instance()->remove_node(recorder_); + exec_->remove_node(recorder_); } - get_executor_instance()->remove_node(recorder_); + exec_->remove_node(recorder_); } static void cancel() { exit_ = true; - if (get_executor_instance()) { - get_executor_instance()->cancel(); - } } static std::atomic_bool exit_; From c0b75eb80e740e8370f5db884a016657c7de1615 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 10 Jun 2023 23:48:22 -0700 Subject: [PATCH 4/5] Spin recorder node in a separate thread for better handling exit - Run exec->spin() in a separate thread, because we need to call exec->cancel() after recorder->stop() to be able to send notifications about bag split and close. - Wait on conditional variable for exit_ flag Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 46 +++++++++++++----------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index dcdd2d5825..0237bf5cfa 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -189,11 +189,6 @@ class Player class Recorder { -private: - using ExecutorT = rclcpp::executors::SingleThreadedExecutor; - std::unique_ptr exec_; - std::shared_ptr recorder_; - public: Recorder() { @@ -201,7 +196,6 @@ class Recorder init_options.shutdown_on_signal = false; rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); - exec_ = std::make_unique(); std::signal( SIGTERM, [](int /* signal */) { rosbag2_py::Recorder::cancel(); @@ -222,42 +216,52 @@ class Recorder RecordOptions & record_options, std::string & node_name) { + exit_ = false; + auto exec = std::make_unique(); if (record_options.rmw_serialization_format.empty()) { record_options.rmw_serialization_format = std::string(rmw_get_serialization_format()); } auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options); - recorder_ = std::make_shared( + auto recorder = std::make_shared( std::move(writer), storage_options, record_options, node_name); - recorder_->record(); + recorder->record(); - exec_->add_node(recorder_); - try { + exec->add_node(recorder); + // Run exec->spin() in a separate thread, because we need to call exec->cancel() after + // recorder->stop() to be able to send notifications about bag split and close. + auto spin_thread = std::thread( + [&exec]() { + exec->spin(); + }); + { // Release the GIL for long-running record, so that calling Python code can use other threads py::gil_scoped_release release; - while (!exit_) { - exec_->spin_all(std::chrono::milliseconds(10)); - } - if (recorder_) { - recorder_->stop(); - // Need to spin node to be able to send notifications about bag split and close - exec_->spin_all(std::chrono::milliseconds(10)); - } - } catch (...) { - exec_->remove_node(recorder_); + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); + recorder->stop(); + } + exec->cancel(); + if (spin_thread.joinable()) { + spin_thread.join(); } - exec_->remove_node(recorder_); + exec->remove_node(recorder); } static void cancel() { exit_ = true; + wait_for_exit_cv_.notify_all(); } +protected: static std::atomic_bool exit_; + static std::condition_variable wait_for_exit_cv_; + std::mutex wait_for_exit_mutex_; }; std::atomic_bool Recorder::exit_{false}; +std::condition_variable Recorder::wait_for_exit_cv_{}; // Return a RecordOptions struct with defaults set for rewriting bags. rosbag2_transport::RecordOptions bag_rewrite_default_record_options() From 896d8b8ae63b2ad47aa4e69e3d04aa0a10fd9178 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Mon, 12 Jun 2023 15:15:01 -0700 Subject: [PATCH 5/5] Address race condition in rosbag2_py.test_record_cancel - Add `record_thread.join()` before trying to parse metadata. Signed-off-by: Michael Orlov --- rosbag2_py/test/test_transport.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index 345cb1323b..548708d409 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -80,6 +80,7 @@ def test_record_cancel(tmp_path, storage_id): metadata_io = rosbag2_py.MetadataIo() assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path), timeout=rclpy.duration.Duration(seconds=3)) + record_thread.join() metadata = metadata_io.read_metadata(bag_path) assert(len(metadata.relative_file_paths))