From bdd8196660e157e6db15f72e43e7ceb0d311ba78 Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Fri, 22 Sep 2023 20:53:23 +0300 Subject: [PATCH] mavlink_ulog: parallel mavlink logging --- src/modules/mavlink/mavlink_ulog.cpp | 85 +++++++++++++++++++--------- src/modules/mavlink/mavlink_ulog.h | 4 +- 2 files changed, 62 insertions(+), 27 deletions(-) diff --git a/src/modules/mavlink/mavlink_ulog.cpp b/src/modules/mavlink/mavlink_ulog.cpp index 64b9ec981d20..bc7f05d33205 100644 --- a/src/modules/mavlink/mavlink_ulog.cpp +++ b/src/modules/mavlink/mavlink_ulog.cpp @@ -60,6 +60,12 @@ MavlinkULog::MavlinkULog(int datarate, float max_rate_factor, uint8_t target_sys } + PX4_INFO("[Mavlink_ulog] Parallel logging supported"); + + while (_ulog_stream_acked_sub.update()) { + + } + _waiting_for_initial_ack = true; _last_sent_time = hrt_absolute_time(); //(ab)use this timestamp during initialization _next_rate_check = _last_sent_time + _rate_calculation_delta_t; @@ -84,7 +90,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) static_assert(sizeof(ulog_stream_s::data) == MAVLINK_MSG_LOGGING_DATA_FIELD_DATA_LEN, "Invalid uorb ulog_stream.data length"); static_assert(sizeof(ulog_stream_s::data) == MAVLINK_MSG_LOGGING_DATA_ACKED_FIELD_DATA_LEN, - "Invalid uorb ulog_stream.data length"); + "Invalid uorb ulog_stream_acked.data length"); if (_waiting_for_initial_ack) { if (hrt_elapsed_time(&_last_sent_time) > 3e5) { @@ -96,8 +102,10 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) } // check if we're waiting for an ACK + bool check_for_updates = true; + if (_last_sent_time) { - bool check_for_updates = false; + check_for_updates = false; if (_ack_received) { _last_sent_time = 0; @@ -113,7 +121,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) PX4_DEBUG("re-sending ulog mavlink message (try=%i)", _sent_tries); _last_sent_time = hrt_absolute_time(); - const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); + const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); mavlink_logging_data_acked_t msg; msg.sequence = ulog_data.msg_sequence; @@ -126,25 +134,58 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) } } } - - if (!check_for_updates) { - return 0; - } } + while ((_current_num_msgs < _max_num_messages) && (_ulog_stream_sub.updated() || _ulog_stream_acked_sub.updated())) { + if (_ulog_stream_sub.updated()) { + const unsigned last_generation = _ulog_stream_sub.get_last_generation(); + _ulog_stream_sub.update(); + + if (_ulog_stream_sub.get_last_generation() != last_generation + 1) { + perf_count(_msg_missed_ulog_stream_perf); + } + + const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); + + if (ulog_data.timestamp > 0) { + if (ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) { + _sent_tries = 1; + _last_sent_time = hrt_absolute_time(); + lock(); + _wait_for_ack_sequence = ulog_data.msg_sequence; + _ack_received = false; + unlock(); + + mavlink_logging_data_acked_t msg; + msg.sequence = ulog_data.msg_sequence; + msg.length = ulog_data.length; + msg.first_message_offset = ulog_data.first_message_offset; + msg.target_system = _target_system; + msg.target_component = _target_component; + memcpy(msg.data, ulog_data.data, sizeof(msg.data)); + mavlink_msg_logging_data_acked_send_struct(channel, &msg); - while ((_current_num_msgs < _max_num_messages) && _ulog_stream_sub.updated()) { - const unsigned last_generation = _ulog_stream_sub.get_last_generation(); - _ulog_stream_sub.update(); + } else { + mavlink_logging_data_t msg; + msg.sequence = ulog_data.msg_sequence; + msg.length = ulog_data.length; + msg.first_message_offset = ulog_data.first_message_offset; + msg.target_system = _target_system; + msg.target_component = _target_component; + memcpy(msg.data, ulog_data.data, sizeof(msg.data)); + mavlink_msg_logging_data_send_struct(channel, &msg); + } + } - if (_ulog_stream_sub.get_last_generation() != last_generation + 1) { - perf_count(_msg_missed_ulog_stream_perf); + ++_current_num_msgs; } - const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); + if (check_for_updates && _ulog_stream_acked_sub.updated()) { + _ulog_stream_acked_sub.update(); + + const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); - if (ulog_data.timestamp > 0) { - if (ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) { + if (ulog_data.timestamp > 0) { _sent_tries = 1; _last_sent_time = hrt_absolute_time(); lock(); @@ -161,19 +202,10 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) memcpy(msg.data, ulog_data.data, sizeof(msg.data)); mavlink_msg_logging_data_acked_send_struct(channel, &msg); - } else { - mavlink_logging_data_t msg; - msg.sequence = ulog_data.msg_sequence; - msg.length = ulog_data.length; - msg.first_message_offset = ulog_data.first_message_offset; - msg.target_system = _target_system; - msg.target_component = _target_component; - memcpy(msg.data, ulog_data.data, sizeof(msg.data)); - mavlink_msg_logging_data_send_struct(channel, &msg); } - } - ++_current_num_msgs; + ++_current_num_msgs; + } } //need to update the rate? @@ -247,6 +279,7 @@ void MavlinkULog::handle_ack(mavlink_logging_ack_t ack) lock(); if (_instance) { // make sure stop() was not called right before + //PX4_INFO("<< %u (wait: %u)", ack.sequence, _wait_for_ack_sequence); if (_wait_for_ack_sequence == ack.sequence) { _ack_received = true; publish_ack(ack.sequence); diff --git a/src/modules/mavlink/mavlink_ulog.h b/src/modules/mavlink/mavlink_ulog.h index 7c96553c6d49..9e534075920a 100644 --- a/src/modules/mavlink/mavlink_ulog.h +++ b/src/modules/mavlink/mavlink_ulog.h @@ -125,7 +125,9 @@ class MavlinkULog static constexpr hrt_abstime _rate_calculation_delta_t = 100_ms; ///< rate update interval uORB::SubscriptionData _ulog_stream_sub{ORB_ID(ulog_stream)}; - uORB::Publication _ulog_stream_ack_pub{ORB_ID(ulog_stream_ack)}; + uORB::SubscriptionData _ulog_stream_acked_sub {ORB_ID(ulog_stream_acked)}; + + uORB::Publication _ulog_stream_ack_pub {ORB_ID(ulog_stream_ack)}; uint16_t _wait_for_ack_sequence; uint8_t _sent_tries = 0; volatile bool _ack_received = false; ///< set to true if a matching ack received