diff --git a/README.md b/README.md index 16499038f6..992cff3a12 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,18 @@ Messages written to the bag will use the latest received value of `/clock` for t Note: Until the first `/clock` message is received, the recorder will not write any messages. Before that message is received, the time is 0, which leads to a significant time jump once simulation time begins, making the bag essentially unplayable if messages are written first with time 0 and then time N from `/clock`. +#### Recording with termination conditions + +rosbag2 can be configured to stop recording after a certain size, duration or number of messages is reached (considering **all** the bag files of the current recording). By default the recording will continue until an external signal is stopping it, but this can be changed using the CLI options. + +_Stopping by size_: `ros2 bag record -a --max-recording-size 100000` will stop the recording after it becomes greater than 100 kilobytes. This option defaults to `0`, which means the recording will not stop based on size. + +_Stopping by duration_: `ros2 bag record -a --max-recording-duration 9000` will stop the recording after `9000` seconds. This option defaults to `0`, which means the recording will not stop based on duration. + +_Stopping by number of messages_: `ros2 bag record -a --max-recording-messages 1000` will stop the recording after `1000` messages have been recorded. This option defaults to `0`, which means the recording will not stop based on number of messages. + +If multiple stopping conditions are enabled, the recording will stop at whichever threshold is reached first. + #### Splitting files during recording rosbag2 offers the capability to split bag files when they reach a maximum size or after a specified duration. By default rosbag2 will record all data into a single bag file, but this can be changed using the CLI options. @@ -253,6 +265,9 @@ output_bags: storage_id: "" # will use the default storage plugin, if unspecified max_bagfile_size: 0 max_bagfile_duration: 0 + max_recording_size: 0 + max_recording_duration: 0 + max_recording_messages: 0 storage_preset_profile: "" storage_config_uri: "" all_topics: false diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index dadb6807ba..9ed48b594e 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -145,6 +145,18 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: 'Default: %(default)d, recording written in single bagfile and splitting ' 'is disabled. If both splitting by size and duration are enabled, ' 'the bag will split at whichever threshold is reached first.') + parser.add_argument( + '--max-recording-size', type=int, default=0, + help='Maximum size in bytes before the recording will be stopped. ' + 'Default: %(default)d, recording will not stop based on size.') + parser.add_argument( + '--max-recording-duration', type=int, default=0, + help='Maximum duration in seconds before the recording will be stopped. ' + 'Default: %(default)d, recording will not stop based on duration.') + parser.add_argument( + '--max-recording-messages', type=int, default=0, + help='Maximum number of messages before the recording will be stopped. ' + 'Default: %(default)d, recording will not stop based on number of messages.') parser.add_argument( '--max-cache-size', type=int, default=100 * 1024 * 1024, help='Maximum size (in bytes) of messages to hold in each buffer of cache. ' @@ -325,7 +337,10 @@ def main(self, *, args): # noqa: D102 storage_preset_profile=args.storage_preset_profile, storage_config_uri=storage_config_file, snapshot_mode=args.snapshot_mode, - custom_data=custom_data + custom_data=custom_data, + max_recording_size=args.max_recording_size, + max_recording_duration=args.max_recording_duration, + max_recording_messages=args.max_recording_messages, ) record_options = RecordOptions() record_options.all_topics = args.all_topics or args.all diff --git a/rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp b/rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp index 8e0ac48861..24ed7b2c24 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp @@ -38,6 +38,8 @@ enum class BagEvent WRITE_SPLIT, /// Reading of the input bag file has gone over a split, opening the next file. READ_SPLIT, + /// The recording has reached a predetermined stopping condition. + STOP_RECORDING }; /** @@ -51,7 +53,19 @@ struct BagSplitInfo std::string opened_file; }; +/** + * \brief The information structure passed to callbacks for the STOP_RECORDING event. + */ +struct StopRecordingInfo +{ + /// The base folder where the recording was taking place. + std::string base_folder; + /// The reason for stopping the recording. + std::string reason; +}; + using BagSplitCallbackType = std::function; +using BagStopRecordingCallbackType = std::function; /** * \brief Use this structure to register callbacks with Writers. @@ -60,6 +74,8 @@ struct WriterEventCallbacks { /// The callback to call for the WRITE_SPLIT event. BagSplitCallbackType write_split_callback; + /// The callback to call for the STOP_RECORDING event. + BagStopRecordingCallbackType stop_recording_callback; }; /** diff --git a/rosbag2_cpp/include/rosbag2_cpp/stop_recording_options.hpp b/rosbag2_cpp/include/rosbag2_cpp/stop_recording_options.hpp new file mode 100644 index 0000000000..84e8c77d05 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/stop_recording_options.hpp @@ -0,0 +1,32 @@ +// Copyright 2025 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_ +#define ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_ + +#include + +namespace rosbag2_cpp +{ + +struct StopRecordingOptions +{ + uint64_t max_duration; + uint64_t max_size; + uint64_t max_messages; +}; + +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index c2597f9b03..7244d83b6a 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -27,6 +27,7 @@ #include "rosbag2_cpp/bag_events.hpp" #include "rosbag2_cpp/converter_options.hpp" +#include "rosbag2_cpp/stop_recording_options.hpp" #include "rosbag2_cpp/visibility_control.hpp" #include "rosbag2_cpp/writers/sequential_writer.hpp" diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index fd9aae481d..260f53afe7 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -158,6 +158,10 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter void execute_bag_split_callbacks( const std::string & closed_file, const std::string & opened_file); + void signal_stop_recording(const std::string & out_stop_reason); + + void execute_signal_stop_recording_callbacks(const std::string & out_stop_reason); + void switch_to_next_storage(); std::string format_storage_uri( @@ -183,6 +187,16 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter bool should_split_bagfile( const std::chrono::time_point & current_time) const; + // Checks if the recording needs to be stopped due to predefined conditions + // before writing the current message. + bool should_stop_recording_pre_write( + const std::chrono::time_point & current_time, + std::string & stop_reason) const; + + // Checks if the recording needs to be stopped due to predefined conditions + // after writing the current message. + bool should_stop_recording_post_write(std::string & stop_reason) const; + // Checks if the message to be written is within accepted time range bool message_within_accepted_time_range( const rcutils_time_point_value_t current_time) const; @@ -200,6 +214,15 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter get_writeable_message( std::shared_ptr message); + // Used to track the approximate size of the previously splitted bags. + uint64_t previous_bags_size_ {0}; + + // Used to track the number of messages in the previously splitted bags. + uint64_t previous_bags_num_messages_ {0}; + + // Used to track if the stop recording signal has been already sent. + bool signaled_stop_recording_ {false}; + private: /// Helper method to write messages while also updating tracked metadata. void write_messages( diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index ea6d2fa579..700c4422e5 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -327,6 +327,9 @@ void SequentialWriter::switch_to_next_storage() message_cache_->log_dropped(); } + previous_bags_size_ += storage_->get_bagfile_size(); + previous_bags_num_messages_ += metadata_.files.back().message_count; + storage_->update_metadata(metadata_); storage_options_.uri = format_storage_uri( base_folder_, @@ -385,8 +388,30 @@ void SequentialWriter::split_bagfile() (void)split_bagfile_local(); } +void SequentialWriter::signal_stop_recording(const std::string & reason) +{ + if (signaled_stop_recording_) { + return; + } + execute_signal_stop_recording_callbacks(reason); + signaled_stop_recording_ = true; +} + +void SequentialWriter::execute_signal_stop_recording_callbacks( + const std::string & reason) +{ + auto info = std::make_shared(); + info->base_folder = base_folder_; + info->reason = reason; + callback_manager_.execute_callbacks(bag_events::BagEvent::STOP_RECORDING, info); +} + void SequentialWriter::write(std::shared_ptr message) { + if (signaled_stop_recording_) { + return; + } + if (!is_open_) { throw std::runtime_error("Bag is not open. Call open() before writing."); } @@ -415,6 +440,12 @@ void SequentialWriter::write(std::shared_ptrpush(converted_msg); } + + if (should_stop_recording_post_write(out_stop_reason)) { + signal_stop_recording(out_stop_reason); + return; + } } bool SequentialWriter::take_snapshot() @@ -508,6 +544,52 @@ bool SequentialWriter::message_within_accepted_time_range( return true; } +bool SequentialWriter::should_stop_recording_pre_write( + const std::chrono::time_point & current_time, + std::string & out_stop_reason) const +{ + // Stopping by time + if (storage_options_.max_recording_duration != + rosbag2_storage::storage_interfaces::MAX_RECORDING_DURATION_NO_STOP) + { + auto max_duration_ns = std::chrono::duration_cast( + std::chrono::seconds(storage_options_.max_recording_duration)); + if ((current_time - metadata_.starting_time) > max_duration_ns) { + out_stop_reason = "max duration"; + return true; + } + } + + return false; +} + +bool SequentialWriter::should_stop_recording_post_write(std::string & out_stop_reason) const +{ + // Stopping by size + if (storage_options_.max_recording_size != + rosbag2_storage::storage_interfaces::MAX_RECORDING_SIZE_NO_STOP) + { + if (previous_bags_size_ + storage_->get_bagfile_size() >= storage_options_.max_recording_size) { + out_stop_reason = "max size"; + return true; + } + } + + // Stopping by number of messages + if (storage_options_.max_recording_messages != + rosbag2_storage::storage_interfaces::MAX_RECORDING_MESSAGES_NO_STOP) + { + if (previous_bags_num_messages_ + metadata_.files.back().message_count >= + storage_options_.max_recording_messages) + { + out_stop_reason = "max messages"; + return true; + } + } + + return false; +} + void SequentialWriter::finalize_metadata() { metadata_.bag_size = 0; @@ -563,6 +645,11 @@ void SequentialWriter::add_event_callbacks(const bag_events::WriterEventCallback callbacks.write_split_callback, bag_events::BagEvent::WRITE_SPLIT); } + if (callbacks.stop_recording_callback) { + callback_manager_.add_event_callback( + callbacks.stop_recording_callback, + bag_events::BagEvent::STOP_RECORDING); + } } } // namespace writers diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index 6999fdc962..8dc15cd943 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -15,6 +15,7 @@ find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} "msg/ReadSplitEvent.msg" + "msg/StopRecordingEvent.msg" "msg/WriteSplitEvent.msg" "srv/Burst.srv" "srv/GetRate.srv" diff --git a/rosbag2_interfaces/msg/StopRecordingEvent.msg b/rosbag2_interfaces/msg/StopRecordingEvent.msg new file mode 100644 index 0000000000..fd10ce023a --- /dev/null +++ b/rosbag2_interfaces/msg/StopRecordingEvent.msg @@ -0,0 +1,8 @@ +# The full path of the base folder where the recording was taking place. +string base_folder + +# The reason for stopping the recording. +string reason + +# The fully qualified node name of the event sender +string node_name diff --git a/rosbag2_py/rosbag2_py/_storage.pyi b/rosbag2_py/rosbag2_py/_storage.pyi index 2520c4e070..1a213898d2 100644 --- a/rosbag2_py/rosbag2_py/_storage.pyi +++ b/rosbag2_py/rosbag2_py/_storage.pyi @@ -101,13 +101,16 @@ class StorageOptions: max_bagfile_duration: int max_bagfile_size: int max_cache_size: int + max_recording_duration: int + max_recording_messages: int + max_recording_size: int snapshot_mode: bool start_time_ns: int storage_config_uri: str storage_id: str storage_preset_profile: str uri: str - def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... + def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ..., max_recording_size: int = ..., max_recording_duration: int = ..., max_recording_messages: int = ...) -> None: ... class TopicInformation: message_count: int diff --git a/rosbag2_py/src/rosbag2_py/_storage.cpp b/rosbag2_py/src/rosbag2_py/_storage.cpp index e12898053d..64d709744c 100644 --- a/rosbag2_py/src/rosbag2_py/_storage.cpp +++ b/rosbag2_py/src/rosbag2_py/_storage.cpp @@ -83,7 +83,7 @@ PYBIND11_MODULE(_storage, m) { .def( pybind11::init< std::string, std::string, uint64_t, uint64_t, uint64_t, std::string, std::string, bool, - int64_t, int64_t, KEY_VALUE_MAP>(), + int64_t, int64_t, KEY_VALUE_MAP, uint64_t, uint64_t, uint64_t>(), pybind11::arg("uri"), pybind11::arg("storage_id") = "", pybind11::arg("max_bagfile_size") = 0, @@ -94,7 +94,10 @@ PYBIND11_MODULE(_storage, m) { pybind11::arg("snapshot_mode") = false, pybind11::arg("start_time_ns") = -1, pybind11::arg("end_time_ns") = -1, - pybind11::arg("custom_data") = KEY_VALUE_MAP{}) + pybind11::arg("custom_data") = KEY_VALUE_MAP{}, + pybind11::arg("max_recording_size") = 0, + pybind11::arg("max_recording_duration") = 0, + pybind11::arg("max_recording_messages") = 0) .def_readwrite("uri", &rosbag2_storage::StorageOptions::uri) .def_readwrite("storage_id", &rosbag2_storage::StorageOptions::storage_id) .def_readwrite( @@ -123,7 +126,16 @@ PYBIND11_MODULE(_storage, m) { &rosbag2_storage::StorageOptions::end_time_ns) .def_readwrite( "custom_data", - &rosbag2_storage::StorageOptions::custom_data); + &rosbag2_storage::StorageOptions::custom_data) + .def_readwrite( + "max_recording_size", + &rosbag2_storage::StorageOptions::max_recording_size) + .def_readwrite( + "max_recording_duration", + &rosbag2_storage::StorageOptions::max_recording_duration) + .def_readwrite( + "max_recording_messages", + &rosbag2_storage::StorageOptions::max_recording_messages); pybind11::class_(m, "StorageFilter") .def( diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index efb7fc536d..a204c966bb 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -383,14 +383,25 @@ class Recorder [&exec]() { exec->spin(); }); + + auto wait_for_exit_thread = std::thread( + [&]() { + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); + recorder->stop(); + }); { // Release the GIL for long-running record, so that calling Python code // can use other threads py::gil_scoped_release release; - std::unique_lock lock(wait_for_exit_mutex_); - wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); - recorder->stop(); + recorder->wait_for_recording_to_finish(); } + + rosbag2_py::Recorder::cancel(); // Need to trigger exit from wait_for_exit_thread + if (wait_for_exit_thread.joinable()) { + wait_for_exit_thread.join(); + } + exec->cancel(); if (spin_thread.joinable()) { spin_thread.join(); diff --git a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_io_interface.hpp b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_io_interface.hpp index cb93cc1d4d..4cef7b2405 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_io_interface.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_io_interface.hpp @@ -40,6 +40,18 @@ ROSBAG2_STORAGE_PUBLIC extern const uint64_t MAX_BAGFILE_SIZE_NO_SPLIT; // use 0 as the default maximum bagfile duration value. ROSBAG2_STORAGE_PUBLIC extern const uint64_t MAX_BAGFILE_DURATION_NO_SPLIT; +// When recording stopping feature is not enabled or applicable, +// use 0 as the default maximum recording size value. +ROSBAG2_STORAGE_PUBLIC extern const uint64_t MAX_RECORDING_SIZE_NO_STOP; + +// When recording stopping feature is not enabled or applicable, +// use 0 as the default maximum recording duration value. +ROSBAG2_STORAGE_PUBLIC extern const uint64_t MAX_RECORDING_DURATION_NO_STOP; + +// When recording stopping feature is not enabled or applicable, +// use 0 as the default maximum recording number of messages value. +ROSBAG2_STORAGE_PUBLIC extern const uint64_t MAX_RECORDING_MESSAGES_NO_STOP; + class ROSBAG2_STORAGE_PUBLIC BaseIOInterface { public: diff --git a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp index 8d6d188459..cd4f30ee3e 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp @@ -62,6 +62,18 @@ struct StorageOptions // Stores the custom data std::unordered_map custom_data{}; + + // The maximum size the recording can be, in bytes, before the recording is stopped. + // A value of 0 indicates that the recording will not be stopped based on size. + uint64_t max_recording_size = 0; + + // The maximum duration the recording can be, in seconds, before the recording is stopped. + // A value of 0 indicates that the recording will not be stopped based on duration. + uint64_t max_recording_duration = 0; + + // The maximum number of messages before the recording is stopped. + // A value of 0 indicates that the recording will not be stopped based on number of messages. + uint64_t max_recording_messages = 0; }; } // namespace rosbag2_storage diff --git a/rosbag2_storage/src/rosbag2_storage/base_io_interface.cpp b/rosbag2_storage/src/rosbag2_storage/base_io_interface.cpp index 3936d2e636..821376e13a 100644 --- a/rosbag2_storage/src/rosbag2_storage/base_io_interface.cpp +++ b/rosbag2_storage/src/rosbag2_storage/base_io_interface.cpp @@ -20,5 +20,8 @@ namespace storage_interfaces { const uint64_t MAX_BAGFILE_SIZE_NO_SPLIT = 0; const uint64_t MAX_BAGFILE_DURATION_NO_SPLIT = 0; +const uint64_t MAX_RECORDING_SIZE_NO_STOP = 0; +const uint64_t MAX_RECORDING_DURATION_NO_STOP = 0; +const uint64_t MAX_RECORDING_MESSAGES_NO_STOP = 0; } -} +} // namespace rosbag2_storage diff --git a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp index 717fcb9082..ae707a1a7d 100644 --- a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp +++ b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp @@ -28,6 +28,10 @@ Node convert::encode( node["storage_id"] = storage_options.storage_id; node["max_bagfile_size"] = storage_options.max_bagfile_size; node["max_bagfile_duration"] = storage_options.max_bagfile_duration; + node["max_recording_size"] = storage_options.max_recording_size; + node["max_recording_duration"] = storage_options.max_recording_duration; + node["max_recording_messages"] = + storage_options.max_recording_messages; node["max_cache_size"] = storage_options.max_cache_size; node["storage_preset_profile"] = storage_options.storage_preset_profile; node["storage_config_uri"] = storage_options.storage_config_uri; @@ -45,6 +49,12 @@ bool convert::decode( optional_assign(node, "storage_id", storage_options.storage_id); optional_assign(node, "max_bagfile_size", storage_options.max_bagfile_size); optional_assign(node, "max_bagfile_duration", storage_options.max_bagfile_duration); + optional_assign(node, "max_recording_size", storage_options.max_recording_size); + optional_assign( + node, "max_recording_duration", storage_options.max_recording_duration); + optional_assign( + node, "max_recording_messages", + storage_options.max_recording_messages); optional_assign(node, "max_cache_size", storage_options.max_cache_size); optional_assign( node, "storage_preset_profile", storage_options.storage_preset_profile); diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp index 3126e41884..0d38b4d236 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp @@ -520,6 +520,151 @@ TEST_P(RecordFixture, record_end_to_end_test_with_zstd_file_compression_compress } } +TEST_P(RecordFixture, record_end_to_end_with_size_stop_recording_splits_bagfile) { + constexpr const char topic_name[] = "/test_topic"; + constexpr const int recording_size = 5 * 1024 * 1024; // 5MB. + constexpr const int bagfile_split_size = 2 * 1024 * 1024; // 2MB. + constexpr const int expected_splits = 3; + constexpr const char message_str[] = "Test"; + constexpr const int message_size = 1024 * 1024 / 2; // 0.5MB + // string message from test_msgs + const auto message = create_string_message(message_str, message_size); + // Use double the messages to continue to publish beyond the max recording size + constexpr const int message_count = 2 * recording_size / message_size; + + rosbag2_test_common::PublicationManager pub_manager; + pub_manager.setup_publisher(topic_name, message, message_count); + + std::stringstream command; + command << get_base_record_command() << + " --max-recording-size " << recording_size << + " --max-bag-size " << bagfile_split_size << + " --topics " << topic_name; + auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) << + "Expected find rosbag subscription"; + + pub_manager.run_publishers(); + + wait_for_storage_file(); + + stop_execution(process_handle); + cleanup_process_handle.cancel(); + + finalize_metadata_kludge(expected_splits); + wait_for_metadata(); + rosbag2_storage::MetadataIo metadata_io; + const auto metadata = metadata_io.read_metadata(root_bag_path_.generic_string()); + const auto actual_splits = static_cast(metadata.files.size()); + + EXPECT_EQ(actual_splits, expected_splits); + + uintmax_t actual_recording_size = 0; + for (int i = 0; i < actual_splits; ++i) { + const auto bagfile_path = root_bag_path_ / fs::path{metadata.files[i].path}; + ASSERT_TRUE(fs::exists(bagfile_path)) << + "Expected bag file: \"" << bagfile_path.generic_string() << "\" to exist."; + + const auto actual_split_size = static_cast(fs::file_size(bagfile_path)); + actual_recording_size += actual_split_size; + } + + // Actual size is guaranteed to be >= recording_size + EXPECT_LT(recording_size, actual_recording_size); +} + +TEST_P(RecordFixture, record_end_to_end_with_duration_stop_recording) { + constexpr const char topic_name[] = "/test_topic"; + constexpr const int recording_duration = 3; // 3 seconds + constexpr const char message_str[] = "Test"; + constexpr const int message_size = 1024 * 1024 / 4; // 0.25MB + constexpr const int message_time = 250; // 250ms + // string message from test_msgs + const auto message = create_string_message(message_str, message_size); + // Use double the messages to continue to publish beyond the max recording duration + constexpr const int message_count = 2 * recording_duration / message_time; + + rosbag2_test_common::PublicationManager pub_manager; + pub_manager.setup_publisher(topic_name, message, message_count); + + std::stringstream command; + command << get_base_record_command() << + " --max-recording-duration " << recording_duration << + " --topics " << topic_name; + auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) << + "Expected find rosbag subscription"; + + wait_for_storage_file(); + + pub_manager.run_publishers(); + + stop_execution(process_handle); + cleanup_process_handle.cancel(); + + finalize_metadata_kludge(); + wait_for_metadata(); + rosbag2_storage::MetadataIo metadata_io; + const auto metadata = metadata_io.read_metadata(root_bag_path_.generic_string()); + auto actual_recording_duration = std::chrono::duration_cast( + metadata.duration).count(); + + // Actual duration is guaranteed to be < recording_duration + EXPECT_GT(recording_duration, actual_recording_duration); +} + +TEST_P(RecordFixture, record_end_to_end_with_messages_stop_recording) { + constexpr const char topic_name[] = "/test_topic"; + constexpr const int recording_messages = 10; + constexpr const char message_str[] = "Test"; + constexpr const int message_size = 1024 * 1024 / 2; // 0.5MB + // string message from test_msgs + const auto message = create_string_message(message_str, message_size); + // Use double the messages to continue to publish beyond the max recording messages + constexpr const int message_count = 2 * recording_messages; + + rosbag2_test_common::PublicationManager pub_manager; + pub_manager.setup_publisher(topic_name, message, message_count); + + std::stringstream command; + command << get_base_record_command() << + " --max-recording-messages " << recording_messages << + " --topics " << topic_name; + auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) << + "Expected find rosbag subscription"; + + pub_manager.run_publishers(); + + wait_for_storage_file(); + + stop_execution(process_handle); + cleanup_process_handle.cancel(); + + finalize_metadata_kludge(); + wait_for_metadata(); + rosbag2_storage::MetadataIo metadata_io; + const auto metadata = metadata_io.read_metadata(root_bag_path_.generic_string()); + uintmax_t actual_recording_messages = metadata.message_count; + + EXPECT_EQ(recording_messages, actual_recording_messages); +} + TEST_P(RecordFixture, record_fails_gracefully_if_bag_already_exists) { auto bag_path = _SRC_RESOURCES_DIR_PATH; // variable defined in CMakeLists.txt diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index 6755fd5ff6..ecc8c32973 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -36,6 +36,7 @@ #include "rosbag2_interfaces/srv/snapshot.hpp" #include "rosbag2_interfaces/srv/split_bagfile.hpp" +#include "rosbag2_interfaces/msg/stop_recording_event.hpp" #include "rosbag2_interfaces/msg/write_split_event.hpp" #include "rosbag2_storage/topic_metadata.hpp" @@ -154,6 +155,14 @@ class Recorder : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC bool is_paused(); + /// \brief Waits on the condition variable until the record thread finishes. + /// @param timeout Maximum time in the fraction of seconds to wait for recorder to finish. + /// If timeout is negative, the wait_for_recording_to_finish will be a blocking call. + /// @return true if recording finished during timeout, otherwise false. + ROSBAG2_TRANSPORT_PUBLIC + bool wait_for_recording_to_finish( + std::chrono::duration timeout = std::chrono::seconds(-1)); + inline constexpr static const auto kPauseResumeToggleKey = KeyboardHandler::KeyCode::SPACE; protected: diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 62a69732b5..ce282f3d34 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -29,6 +29,8 @@ #include "rclcpp/logging.hpp" #include "rclcpp/clock.hpp" +#include "rcpputils/unique_lock.hpp" + #include "rmw/types.h" #include "rosbag2_cpp/bag_events.hpp" @@ -88,6 +90,9 @@ class RecorderImpl std::unordered_map get_requested_or_available_topics(); + bool wait_for_recording_to_finish( + std::chrono::duration timeout = std::chrono::seconds(-1)); + /// Public members for access by wrapper std::unordered_set topics_warned_about_incompatibility_; std::shared_ptr writer_; @@ -128,6 +133,7 @@ class RecorderImpl void event_publisher_thread_main(); bool event_publisher_thread_should_wake(); + bool event_publisher_thread_should_exit(); rclcpp::Node * node; std::unique_ptr topic_filter_; @@ -145,19 +151,26 @@ class RecorderImpl std::mutex discovery_mutex_; std::atomic stop_discovery_ = false; std::atomic_uchar paused_ = 0; - std::atomic in_recording_ = false; + std::atomic in_recording_ RCPPUTILS_TSA_GUARDED_BY(start_stop_transition_mutex_) = false; std::shared_ptr keyboard_handler_; KeyboardHandler::callback_handle_t toggle_paused_key_callback_handle_ = KeyboardHandler::invalid_handle; // Variables for event publishing rclcpp::Publisher::SharedPtr split_event_pub_; + rclcpp::Publisher::SharedPtr + stop_recording_event_pub_; std::atomic event_publisher_thread_should_exit_ = false; std::atomic write_split_has_occurred_ = false; rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_; std::mutex event_publisher_thread_mutex_; std::condition_variable event_publisher_thread_wake_cv_; std::thread event_publisher_thread_; + std::mutex signal_stop_recording_mutex_; + std::atomic signal_stop_recording_ = false; + std::atomic stop_recording_has_occurred_ = false; + std::condition_variable stop_recording_cv_; + rosbag2_cpp::bag_events::StopRecordingInfo stop_recording_info_; }; RecorderImpl::RecorderImpl( @@ -229,7 +242,7 @@ RecorderImpl::~RecorderImpl() void RecorderImpl::stop() { - std::lock_guard state_lock(start_stop_transition_mutex_); + rcpputils::unique_lock state_lock(start_stop_transition_mutex_); if (!in_recording_) { RCLCPP_DEBUG(node->get_logger(), "Recording has already been stopped or not running."); return; @@ -242,6 +255,7 @@ void RecorderImpl::stop() { std::lock_guard lock(event_publisher_thread_mutex_); + stop_recording_has_occurred_ = true; event_publisher_thread_should_exit_ = true; } event_publisher_thread_wake_cv_.notify_all(); @@ -250,11 +264,12 @@ void RecorderImpl::stop() } in_recording_ = false; RCLCPP_INFO(node->get_logger(), "Recording stopped"); + stop_recording_cv_.notify_all(); } void RecorderImpl::record() { - std::lock_guard state_lock(start_stop_transition_mutex_); + rcpputils::unique_lock state_lock(start_stop_transition_mutex_); if (in_recording_.exchange(true)) { RCLCPP_WARN_STREAM( node->get_logger(), @@ -327,6 +342,9 @@ void RecorderImpl::record() split_event_pub_ = node->create_publisher("events/write_split", 1); + stop_recording_event_pub_ = + node->create_publisher("events/stop_recording", 1); + // Start the thread that will publish events { std::lock_guard lock(event_publisher_thread_mutex_); @@ -334,6 +352,7 @@ void RecorderImpl::record() event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this); } + stop_recording_info_ = {storage_options_.uri, ""}; rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; callbacks.write_split_callback = [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { @@ -344,6 +363,20 @@ void RecorderImpl::record() } event_publisher_thread_wake_cv_.notify_all(); }; + callbacks.stop_recording_callback = + [this](rosbag2_cpp::bag_events::StopRecordingInfo & info) { + // Don't notify the event publisher thread here, since it will + // be notified by the RecorderImpl::stop() function. + { + std::lock_guard lock(event_publisher_thread_mutex_); + stop_recording_info_ = info; + } + { + std::lock_guard lock(signal_stop_recording_mutex_); + signal_stop_recording_ = true; + } + stop_recording_cv_.notify_all(); + }; writer_->add_event_callbacks(callbacks); serialization_format_ = record_options_.rmw_serialization_format; @@ -366,7 +399,7 @@ void RecorderImpl::record() void RecorderImpl::event_publisher_thread_main() { RCLCPP_INFO(node->get_logger(), "Event publisher thread: Starting"); - while (!event_publisher_thread_should_exit_.load()) { + while (!event_publisher_thread_should_exit()) { std::unique_lock lock(event_publisher_thread_mutex_); event_publisher_thread_wake_cv_.wait( lock, @@ -391,13 +424,41 @@ void RecorderImpl::event_publisher_thread_main() "Failed to publish message on '/events/write_split' topic."); } } + + if (stop_recording_has_occurred_) { + stop_recording_has_occurred_ = false; + + auto message = rosbag2_interfaces::msg::StopRecordingEvent(); + message.base_folder = stop_recording_info_.base_folder; + message.reason = stop_recording_info_.reason; + message.node_name = node->get_fully_qualified_name(); + try { + stop_recording_event_pub_->publish(message); + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "Failed to publish message on '/events/stop_recording' topic. \nError: " << e.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + node->get_logger(), + "Failed to publish message on '/events/stop_recording' topic."); + } + } } RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting"); } bool RecorderImpl::event_publisher_thread_should_wake() { - return write_split_has_occurred_ || event_publisher_thread_should_exit_; + return write_split_has_occurred_ || stop_recording_has_occurred_ || + event_publisher_thread_should_exit_; +} + +bool RecorderImpl::event_publisher_thread_should_exit() +{ + std::lock_guard lock(event_publisher_thread_mutex_); + return !write_split_has_occurred_ && !stop_recording_has_occurred_ && + event_publisher_thread_should_exit_; } const rosbag2_cpp::Writer & RecorderImpl::get_writer_handle() @@ -517,6 +578,22 @@ RecorderImpl::get_requested_or_available_topics() return topic_filter_->filter_topics(all_topics_and_types); } +bool RecorderImpl::wait_for_recording_to_finish(std::chrono::duration timeout) +{ + rcpputils::unique_lock state_lock(start_stop_transition_mutex_); + if (timeout.count() < 0) { + stop_recording_cv_.wait(state_lock, [this] { + return !in_recording_.load() || signal_stop_recording_.load(); + } + ); + return true; + } else { + return stop_recording_cv_.wait_for( + state_lock, + timeout, [this] {return !in_recording_.load() || signal_stop_recording_.load();}); + } +} + std::unordered_map RecorderImpl::get_missing_topics(const std::unordered_map & all_topics) { @@ -853,6 +930,11 @@ Recorder::is_paused() return pimpl_->is_paused(); } +bool Recorder::wait_for_recording_to_finish(std::chrono::duration timeout) +{ + return pimpl_->wait_for_recording_to_finish(timeout); +} + std::unordered_map Recorder::get_requested_or_available_topics() {