Skip to content

Commit

Permalink
Add options to conditionally stop the recording
Browse files Browse the repository at this point in the history
- publish event when recording stops

Signed-off-by: Nicola Loi <[email protected]>
  • Loading branch information
nicolaloi committed Jan 13, 2025
1 parent ffd8c7d commit 36978ea
Show file tree
Hide file tree
Showing 19 changed files with 511 additions and 14 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ Messages written to the bag will use the latest received value of `/clock` for t
Note: Until the first `/clock` message is received, the recorder will not write any messages.
Before that message is received, the time is 0, which leads to a significant time jump once simulation time begins, making the bag essentially unplayable if messages are written first with time 0 and then time N from `/clock`.

#### Recording with termination conditions

rosbag2 can be configured to stop recording after a certain size, duration or number of messages is reached (considering **all** the bag files of the current recording). By default the recording will continue until an external signal is stopping it, but this can be changed using the CLI options.

_Stopping by size_: `ros2 bag record -a --max-recording-size 100000` will stop the recording after it becomes greater than 100 kilobytes. This option defaults to `0`, which means the recording will not stop based on size.

_Stopping by duration_: `ros2 bag record -a --max-recording-duration 9000` will stop the recording after `9000` seconds. This option defaults to `0`, which means the recording will not stop based on duration.

_Stopping by number of messages_: `ros2 bag record -a --max-recording-messages 1000` will stop the recording after `1000` messages have been recorded. This option defaults to `0`, which means the recording will not stop based on number of messages.

If multiple stopping conditions are enabled, the recording will stop at whichever threshold is reached first.

#### Splitting files during recording

rosbag2 offers the capability to split bag files when they reach a maximum size or after a specified duration. By default rosbag2 will record all data into a single bag file, but this can be changed using the CLI options.
Expand Down Expand Up @@ -253,6 +265,9 @@ output_bags:
storage_id: "" # will use the default storage plugin, if unspecified
max_bagfile_size: 0
max_bagfile_duration: 0
max_recording_size: 0
max_recording_duration: 0
max_recording_messages: 0
storage_preset_profile: ""
storage_config_uri: ""
all_topics: false
Expand Down
17 changes: 16 additions & 1 deletion ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@ def add_recorder_arguments(parser: ArgumentParser) -> None:
'Default: %(default)d, recording written in single bagfile and splitting '
'is disabled. If both splitting by size and duration are enabled, '
'the bag will split at whichever threshold is reached first.')
parser.add_argument(
'--max-recording-size', type=int, default=0,
help='Maximum size in bytes before the recording will be stopped. '
'Default: %(default)d, recording will not stop based on size.')
parser.add_argument(
'--max-recording-duration', type=int, default=0,
help='Maximum duration in seconds before the recording will be stopped. '
'Default: %(default)d, recording will not stop based on duration.')
parser.add_argument(
'--max-recording-messages', type=int, default=0,
help='Maximum number of messages before the recording will be stopped. '
'Default: %(default)d, recording will not stop based on number of messages.')
parser.add_argument(
'--max-cache-size', type=int, default=100 * 1024 * 1024,
help='Maximum size (in bytes) of messages to hold in each buffer of cache. '
Expand Down Expand Up @@ -325,7 +337,10 @@ def main(self, *, args): # noqa: D102
storage_preset_profile=args.storage_preset_profile,
storage_config_uri=storage_config_file,
snapshot_mode=args.snapshot_mode,
custom_data=custom_data
custom_data=custom_data,
max_recording_size=args.max_recording_size,
max_recording_duration=args.max_recording_duration,
max_recording_messages=args.max_recording_messages,
)
record_options = RecordOptions()
record_options.all_topics = args.all_topics or args.all
Expand Down
16 changes: 16 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ enum class BagEvent
WRITE_SPLIT,
/// Reading of the input bag file has gone over a split, opening the next file.
READ_SPLIT,
/// The recording has reached a predetermined stopping condition.
STOP_RECORDING
};

/**
Expand All @@ -51,7 +53,19 @@ struct BagSplitInfo
std::string opened_file;
};

/**
* \brief The information structure passed to callbacks for the STOP_RECORDING event.
*/
struct StopRecordingInfo
{
/// The base folder where the recording was taking place.
std::string base_folder;
/// The reason for stopping the recording.
std::string reason;
};

using BagSplitCallbackType = std::function<void (BagSplitInfo &)>;
using BagStopRecordingCallbackType = std::function<void (StopRecordingInfo &)>;

/**
* \brief Use this structure to register callbacks with Writers.
Expand All @@ -60,6 +74,8 @@ struct WriterEventCallbacks
{
/// The callback to call for the WRITE_SPLIT event.
BagSplitCallbackType write_split_callback;
/// The callback to call for the STOP_RECORDING event.
BagStopRecordingCallbackType stop_recording_callback;
};

/**
Expand Down
32 changes: 32 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/stop_recording_options.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2025 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_
#define ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_

#include <cstdint>

namespace rosbag2_cpp
{

struct StopRecordingOptions
{
uint64_t max_duration;
uint64_t max_size;
uint64_t max_messages;
};

} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__STOP_RECORDING_OPTIONS_HPP_
1 change: 1 addition & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/stop_recording_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"

Expand Down
23 changes: 23 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
void execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file);

void signal_stop_recording(const std::string & out_stop_reason);

void execute_signal_stop_recording_callbacks(const std::string & out_stop_reason);

void switch_to_next_storage();

std::string format_storage_uri(
Expand All @@ -183,6 +187,16 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time) const;

// Checks if the recording needs to be stopped due to predefined conditions
// before writing the current message.
bool should_stop_recording_pre_write(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time,
std::string & stop_reason) const;

// Checks if the recording needs to be stopped due to predefined conditions
// after writing the current message.
bool should_stop_recording_post_write(std::string & stop_reason) const;

// Checks if the message to be written is within accepted time range
bool message_within_accepted_time_range(
const rcutils_time_point_value_t current_time) const;
Expand All @@ -200,6 +214,15 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
get_writeable_message(
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message);

// Used to track the approximate size of the previously splitted bags.
uint64_t previous_bags_size_ {0};

// Used to track the number of messages in the previously splitted bags.
uint64_t previous_bags_num_messages_ {0};

// Used to track if the stop recording signal has been already sent.
bool signaled_stop_recording_ {false};

private:
/// Helper method to write messages while also updating tracked metadata.
void write_messages(
Expand Down
87 changes: 87 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ void SequentialWriter::switch_to_next_storage()
message_cache_->log_dropped();
}

previous_bags_size_ += storage_->get_bagfile_size();
previous_bags_num_messages_ += metadata_.files.back().message_count;

storage_->update_metadata(metadata_);
storage_options_.uri = format_storage_uri(
base_folder_,
Expand Down Expand Up @@ -385,8 +388,30 @@ void SequentialWriter::split_bagfile()
(void)split_bagfile_local();
}

void SequentialWriter::signal_stop_recording(const std::string & reason)
{
if (signaled_stop_recording_) {
return;
}
execute_signal_stop_recording_callbacks(reason);
signaled_stop_recording_ = true;
}

void SequentialWriter::execute_signal_stop_recording_callbacks(
const std::string & reason)
{
auto info = std::make_shared<bag_events::StopRecordingInfo>();
info->base_folder = base_folder_;
info->reason = reason;
callback_manager_.execute_callbacks(bag_events::BagEvent::STOP_RECORDING, info);
}

void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
if (signaled_stop_recording_) {
return;
}

if (!is_open_) {
throw std::runtime_error("Bag is not open. Call open() before writing.");
}
Expand Down Expand Up @@ -415,6 +440,12 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
is_first_message_ = false;
}

std::string out_stop_reason;
if (should_stop_recording_pre_write(message_timestamp, out_stop_reason)) {
signal_stop_recording(out_stop_reason);
return;
}

if (!storage_options_.snapshot_mode && should_split_bagfile(message_timestamp)) {
split_bagfile();
metadata_.files.back().starting_time = message_timestamp;
Expand Down Expand Up @@ -442,6 +473,11 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
// Otherwise, use cache buffer
message_cache_->push(converted_msg);
}

if (should_stop_recording_post_write(out_stop_reason)) {
signal_stop_recording(out_stop_reason);
return;
}
}

bool SequentialWriter::take_snapshot()
Expand Down Expand Up @@ -508,6 +544,52 @@ bool SequentialWriter::message_within_accepted_time_range(
return true;
}

bool SequentialWriter::should_stop_recording_pre_write(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time,
std::string & out_stop_reason) const
{
// Stopping by time
if (storage_options_.max_recording_duration !=
rosbag2_storage::storage_interfaces::MAX_RECORDING_DURATION_NO_STOP)
{
auto max_duration_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::seconds(storage_options_.max_recording_duration));
if ((current_time - metadata_.starting_time) > max_duration_ns) {
out_stop_reason = "max duration";
return true;
}
}

return false;
}

bool SequentialWriter::should_stop_recording_post_write(std::string & out_stop_reason) const
{
// Stopping by size
if (storage_options_.max_recording_size !=
rosbag2_storage::storage_interfaces::MAX_RECORDING_SIZE_NO_STOP)
{
if (previous_bags_size_ + storage_->get_bagfile_size() >= storage_options_.max_recording_size) {
out_stop_reason = "max size";
return true;
}
}

// Stopping by number of messages
if (storage_options_.max_recording_messages !=
rosbag2_storage::storage_interfaces::MAX_RECORDING_MESSAGES_NO_STOP)
{
if (previous_bags_num_messages_ + metadata_.files.back().message_count >=
storage_options_.max_recording_messages)
{
out_stop_reason = "max messages";
return true;
}
}

return false;
}

void SequentialWriter::finalize_metadata()
{
metadata_.bag_size = 0;
Expand Down Expand Up @@ -563,6 +645,11 @@ void SequentialWriter::add_event_callbacks(const bag_events::WriterEventCallback
callbacks.write_split_callback,
bag_events::BagEvent::WRITE_SPLIT);
}
if (callbacks.stop_recording_callback) {
callback_manager_.add_event_callback(
callbacks.stop_recording_callback,
bag_events::BagEvent::STOP_RECORDING);
}
}

} // namespace writers
Expand Down
1 change: 1 addition & 0 deletions rosbag2_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
"msg/ReadSplitEvent.msg"
"msg/StopRecordingEvent.msg"
"msg/WriteSplitEvent.msg"
"srv/Burst.srv"
"srv/GetRate.srv"
Expand Down
8 changes: 8 additions & 0 deletions rosbag2_interfaces/msg/StopRecordingEvent.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# The full path of the base folder where the recording was taking place.
string base_folder

# The reason for stopping the recording.
string reason

# The fully qualified node name of the event sender
string node_name
5 changes: 4 additions & 1 deletion rosbag2_py/rosbag2_py/_storage.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ class StorageOptions:
max_bagfile_duration: int
max_bagfile_size: int
max_cache_size: int
max_recording_duration: int
max_recording_messages: int
max_recording_size: int
snapshot_mode: bool
start_time_ns: int
storage_config_uri: str
storage_id: str
storage_preset_profile: str
uri: str
def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ...
def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ..., max_recording_size: int = ..., max_recording_duration: int = ..., max_recording_messages: int = ...) -> None: ...

class TopicInformation:
message_count: int
Expand Down
18 changes: 15 additions & 3 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ PYBIND11_MODULE(_storage, m) {
.def(
pybind11::init<
std::string, std::string, uint64_t, uint64_t, uint64_t, std::string, std::string, bool,
int64_t, int64_t, KEY_VALUE_MAP>(),
int64_t, int64_t, KEY_VALUE_MAP, uint64_t, uint64_t, uint64_t>(),
pybind11::arg("uri"),
pybind11::arg("storage_id") = "",
pybind11::arg("max_bagfile_size") = 0,
Expand All @@ -94,7 +94,10 @@ PYBIND11_MODULE(_storage, m) {
pybind11::arg("snapshot_mode") = false,
pybind11::arg("start_time_ns") = -1,
pybind11::arg("end_time_ns") = -1,
pybind11::arg("custom_data") = KEY_VALUE_MAP{})
pybind11::arg("custom_data") = KEY_VALUE_MAP{},
pybind11::arg("max_recording_size") = 0,
pybind11::arg("max_recording_duration") = 0,
pybind11::arg("max_recording_messages") = 0)
.def_readwrite("uri", &rosbag2_storage::StorageOptions::uri)
.def_readwrite("storage_id", &rosbag2_storage::StorageOptions::storage_id)
.def_readwrite(
Expand Down Expand Up @@ -123,7 +126,16 @@ PYBIND11_MODULE(_storage, m) {
&rosbag2_storage::StorageOptions::end_time_ns)
.def_readwrite(
"custom_data",
&rosbag2_storage::StorageOptions::custom_data);
&rosbag2_storage::StorageOptions::custom_data)
.def_readwrite(
"max_recording_size",
&rosbag2_storage::StorageOptions::max_recording_size)
.def_readwrite(
"max_recording_duration",
&rosbag2_storage::StorageOptions::max_recording_duration)
.def_readwrite(
"max_recording_messages",
&rosbag2_storage::StorageOptions::max_recording_messages);

pybind11::class_<rosbag2_storage::StorageFilter>(m, "StorageFilter")
.def(
Expand Down
Loading

0 comments on commit 36978ea

Please sign in to comment.