diff --git a/CMakeLists.txt b/CMakeLists.txt index e81f035..1758ab0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,10 +76,8 @@ daq_add_application(dpdklibs_test_frame_transmitter test_frame_transmitter.cxx T daq_add_application(dpdklibs_test_frame_receiver test_frame_receiver.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES} opmonlib::opmonlib) daq_add_application(dpdklibs_test_arp_response test_arp_response.cxx TEST LINK_LIBRARIES dpdklibs ${DPDK_LIBRARIES}) daq_add_application(dpdklibs_test_transmit_and_receive test_transmit_and_receive.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES}) -daq_add_application(dpdklibs_test_stats_reporting test_stats_reporting.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES}) daq_add_application(dpdklibs_test_dpdk_stats test_dpdk_stats.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES}) daq_add_application(dpdklibs_test_multi_process test_multi_proc.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES}) -#daq_add_application(dpdklibs_test_unique_caps test_unique_caps.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 fmt::fmt ${DPDK_LIBRARIES}) target_compile_options(dpdklibs PUBLIC ${DPDK_CFLAGS}) target_include_directories(dpdklibs PUBLIC ${DPDK_INCLUDE_DIRS}) diff --git a/include/dpdklibs/XstatsHelper.hpp b/include/dpdklibs/XstatsHelper.hpp index f1d5d90..f25badf 100644 --- a/include/dpdklibs/XstatsHelper.hpp +++ b/include/dpdklibs/XstatsHelper.hpp @@ -77,11 +77,14 @@ namespace dunedaq::dpdklibs { TLOG() << "Cannot get xstat values!"; //} else { } + + rte_eth_stats_get(m_iface_id, &m_eth_stats); } } int m_iface_id; bool m_allocated = false; + struct rte_eth_stats m_eth_stats; struct rte_eth_xstat_name *m_xstats_names; uint64_t *m_xstats_ids; uint64_t *m_xstats_values; diff --git a/include/dpdklibs/udp/Utils.hpp b/include/dpdklibs/udp/Utils.hpp index 08105c6..3b0b37d 100644 --- a/include/dpdklibs/udp/Utils.hpp +++ b/include/dpdklibs/udp/Utils.hpp @@ -9,7 +9,6 @@ #define DPDKLIBS_SRC_UTILS_HPP_ #include "IPV4UDPPacket.hpp" -#include "dpdklibs/receiverinfo/InfoStructs.hpp" #include "detdataformats/DAQEthHeader.hpp" #include "logging/Logging.hpp" @@ -92,114 +91,6 @@ struct StreamUID operator std::string() const { return fmt::format("({}, {}, {}, {})", det_id, crate_id, slot_id, stream_id); } }; -// Note that there's a difference between the string representation -// we'd want for a stream as an opmon label and the kind we'd want -// to print to screen. Thus this function... - -std::string -get_opmon_string(const StreamUID& sid); - -struct ReceiverStats -{ - - std::atomic total_packets = 0; - std::atomic min_packet_size = std::numeric_limits::max(); - std::atomic max_packet_size = std::numeric_limits::min(); - std::atomic max_timestamp_deviation = 0; // In absolute terms (positive *or* negative) - std::atomic max_seq_id_deviation = 0; // " " " - - std::atomic packets_since_last_reset = 0; - std::atomic bytes_since_last_reset = 0; - std::atomic bad_timestamps_since_last_reset = 0; - std::atomic bad_sizes_since_last_reset = 0; - std::atomic bad_seq_ids_since_last_reset = 0; - - ReceiverStats() = default; - ReceiverStats(const ReceiverStats& rhs); - ReceiverStats& operator=(const ReceiverStats& rhs); - - // If you've only been collecting stats for one out of every N - // packets, you'd want to pass N to "scale" to correct for this - - void scale(int64_t sf) - { - packets_since_last_reset = packets_since_last_reset.load() * sf; - bytes_since_last_reset = bytes_since_last_reset.load() * sf; - bad_timestamps_since_last_reset = bad_timestamps_since_last_reset.load() * sf; - bad_sizes_since_last_reset = bad_sizes_since_last_reset.load() * sf; - bad_seq_ids_since_last_reset = bad_seq_ids_since_last_reset.load() * sf; - } - - void reset() - { - packets_since_last_reset = 0; - bytes_since_last_reset = 0; - bad_timestamps_since_last_reset = 0; - bad_sizes_since_last_reset = 0; - bad_seq_ids_since_last_reset = 0; - max_timestamp_deviation = 0; - max_seq_id_deviation = 0; - } - - void merge(const std::vector& stats_vector); - - operator std::string() const; -}; - -// Derive quantities (e.g., bytes/s) from a ReceiverStats object and store it in a jsonnet-based struct -receiverinfo::Info -DeriveFromReceiverStats(const ReceiverStats& receiver_stats, double time_per_report); - -// This class will, on a per-stream basis, fill in ReceiverStats -// instances given packets. In the constructor you can tell it -// whether you want it to pay attention to a packet's sequence ID -// and/or timestamp and/or size. - -class PacketInfoAccumulator -{ - -public: - static constexpr int64_t s_ignorable_value = std::numeric_limits::max(); - static constexpr int64_t s_max_seq_id = 4095; - - PacketInfoAccumulator(int64_t expected_seq_id_step = s_ignorable_value, int64_t expected_timestamp_step = s_ignorable_value, int64_t expected_size = s_ignorable_value, int64_t process_nth_packet = 1); - - void process_packet(const detdataformats::DAQEthHeader& daq_hdr, const int64_t data_len); - void reset(); - void dump(); - - // get_and_reset_stream_stats() will take each per-stream statistic - // and reset the values used to calculate rates (packets per - // second, etc.) - - std::map get_and_reset_stream_stats(); - - // erase_stream_stats() is what you'd call between runs, so, e.g., your total packet count doesn't persist - void erase_stream_stats(); - - void set_expected_packet_size(int64_t packet_size) { m_expected_size = packet_size; } - - PacketInfoAccumulator(const PacketInfoAccumulator&) = delete; - PacketInfoAccumulator& operator=(const PacketInfoAccumulator&) = delete; - PacketInfoAccumulator(PacketInfoAccumulator&&) = delete; - PacketInfoAccumulator& operator=(PacketInfoAccumulator&&) = delete; - - ~PacketInfoAccumulator() = default; - -private: - std::map m_stream_stats_atomic; - - const int64_t m_expected_seq_id_step; - - const int64_t m_expected_timestamp_step; - int64_t m_expected_size; - const int64_t m_process_nth_packet; - - int64_t m_next_expected_seq_id[s_max_seq_id + 1]; - - std::map m_stream_last_timestamp; - std::map m_stream_last_seq_id; -}; } // namespace udp } // namespace dpdklibs } // namespace dunedaq diff --git a/plugins/NICReceiver.cpp b/plugins/NICReceiver.cpp index 1b53b6d..c41f252 100644 --- a/plugins/NICReceiver.cpp +++ b/plugins/NICReceiver.cpp @@ -108,8 +108,20 @@ NICReceiver::init(const std::shared_ptr mcfg ) ers::fatal(err); throw err; } - - m_sources[queue->get_source_id()] = createSourceModel(queue->UID()); + + // Check for CB prefix indicating Callback use + const char delim = '_'; + std::string target = queue->UID(); + std::vector words; + tokenize(target, delim, words); + int sourceid = -1; + + bool callback_mode = false; + if (words.front() == "cb") { + callback_mode = true; + } + + m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode); //m_sources[queue->get_source_id()]->init(); } } @@ -195,15 +207,7 @@ NICReceiver::do_configure(const data_t& /*args*/) ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!"); } } - - return; -} - -void -NICReceiver::do_start(const data_t&) -{ - TLOG() << get_name() << ": Entering do_start() method"; if (!m_run_marker.load()) { set_running(true); TLOG() << "Starting iface wrappers."; @@ -211,7 +215,20 @@ NICReceiver::do_start(const data_t&) iface->start(); } } else { - TLOG_DEBUG(5) << "NICReader is already running!"; + TLOG_DEBUG(5) << "iface wrappers are already running!"; + } + + return; + +} + +void +NICReceiver::do_start(const data_t&) +{ + + // Setup callbacks on all sourcemodels + for (auto& [sourceid, source] : m_sources) { + source->acquire_callback(); } for (auto& [iface_id, iface] : m_ifaces) { @@ -222,20 +239,6 @@ NICReceiver::do_start(const data_t&) void NICReceiver::do_stop(const data_t&) { - // TLOG() << get_name() << ": Entering do_stop() method"; - // if (m_run_marker.load()) { - // TLOG() << "Raising stop through variables!"; - // set_running(false); - // TLOG() << "Stopping iface wrappers."; - // for (auto& [iface_id, iface] : m_ifaces) { - // iface->stop(); - // } - // ealutils::wait_for_lcores(); - // TLOG() << "Stoppped DPDK lcore processors and internal threads..."; - // } else { - // TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!"; - // } - // return; for (auto& [iface_id, iface] : m_ifaces) { iface->disable_flow(); } @@ -245,7 +248,19 @@ NICReceiver::do_stop(const data_t&) void NICReceiver::do_scrap(const data_t&) { - + TLOG() << get_name() << ": Entering do_scrap() method"; + if (m_run_marker.load()) { + TLOG() << "Raising stop through variables!"; + set_running(false); + TLOG() << "Stopping iface wrappers."; + for (auto& [iface_id, iface] : m_ifaces) { + iface->stop(); + } + ealutils::wait_for_lcores(); + TLOG() << "Stoppped DPDK lcore processors and internal threads..."; + } else { + TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!"; + } } void diff --git a/schema/dpdklibs/confgen.jsonnet b/schema/dpdklibs/confgen.jsonnet deleted file mode 100644 index 1d4fefc..0000000 --- a/schema/dpdklibs/confgen.jsonnet +++ /dev/null @@ -1,39 +0,0 @@ - -// This is the configuration schema for dpdklibs - -local moo = import "moo.jsonnet"; -local sdc = import "daqconf/confgen.jsonnet"; -local daqconf = moo.oschema.hier(sdc).dunedaq.daqconf.confgen; - -local ns = "dunedaq.dpdklibs.confgen"; -local s = moo.oschema.schema(ns); - -// A temporary schema construction context. -local cs = { - number: s.number ("number", "i8", doc="a number"), - string : s.string("String", doc="A string field"), - flag: s.boolean( "Flag", doc="Parameter that can be used to enable or disable functionality"), - host: s.string( "Host", moo.re.dnshost, doc="A hostname"), - - dpdklibs : s.record("dpdklibs", [ - s.field('only_sender', self.flag, default=false, doc='Enable only the sender'), - s.field('only_reader', self.flag, default=false, doc='Enable only the reader'), - s.field('host_sender', self.host, default='np04-srv-021', doc='Host to run the sender on'), - s.field('host_reader', self.host, default='np04-srv-022', doc='Host to run the reader on'), - s.field('sender_rate', self.number, default=1, doc='Rate with which the sender sends packets'), - s.field('sender_cores', self.number, default=1, doc='How many cores to use for sending'), - s.field('sender_boards', self.number, default=1, doc='How many AMC boards to send from'), - s.field('sender_burst_size', self.number, default=1, doc='Burst size used for sending packets'), - s.field('sender_time_tick_difference', self.number, default=1000, doc='How many ticks between timestamps'), - s.field('opmon_impl', self.string, default='', doc='How many ticks between timestamps'), - s.field('ers_impl', self.string, default='', doc='How many ticks between timestamps'), - ]), - - dpdklibs_gen: s.record("dpdklibs_gen", [ - s.field('boot', daqconf.boot,default=daqconf.boot, doc='Boot parameters'), - s.field('dpdklibs', self.dpdklibs, default=self.dpdklibs, doc='dpdklibs parameters') - ]), -}; - -// Output a topologically sorted array. -sdc + moo.oschema.sort_select(cs, ns) \ No newline at end of file diff --git a/schema/dpdklibs/nicreaderinfo.jsonnet b/schema/dpdklibs/nicreaderinfo.jsonnet index 288930b..7acdcc8 100644 --- a/schema/dpdklibs/nicreaderinfo.jsonnet +++ b/schema/dpdklibs/nicreaderinfo.jsonnet @@ -9,65 +9,78 @@ local info = { // uint8 : s.number("uint8", "f8", doc="A double of 8 bytes"), uint8 : s.number("uint8", "u8", doc="An unsigned of 8 bytes"), uint4 : s.number("uint4", "u4", doc="An unsigned of 8 bytes"), -// TODO: rename to -// info: s.record("XStats", [ -info: s.record("Info", [ - s.field("groups_sent", self.uint8, 0, doc="Number of groups of frames sent"), - s.field("total_groups_sent", self.uint8, 0, doc="Total groups of frames sent"), - s.field("rx_good_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_good_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_missed_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_unicast_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_multicast_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_broadcast_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_dropped_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_unknown_protocol_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_crc_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_illegal_byte_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_error_bytes", self.uint8, 0, doc="New since last poll"), - s.field("mac_local_errors", self.uint8, 0, doc="New since last poll"), - s.field("mac_remote_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_len_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_xon_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_xoff_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_64_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_65_to_127_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_128_to_255_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_256_to_511_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_512_to_1023_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_1024_to_1522_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_1523_to_max_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_undersized_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_oversize_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_mac_short_pkt_dropped", self.uint8, 0, doc="New since last poll"), - s.field("rx_fragmented_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_jabber_errors", self.uint8, 0, doc="New since last poll"), - s.field("x", self.uint8, 0, doc="New since last poll"), - s.field("rx_q1_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q2_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q3_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q4_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q5_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q6_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q7_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q8_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q9_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q10_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q11_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q0_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q1_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q2_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q3_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q4_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q5_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q6_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q7_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q8_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q9_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q10_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q11_bytes", self.uint8, 0, doc="New since last poll"), - ], doc="NIC Extended Stats information"), + // TODO: rename to + // info: s.record("XStats", [ + ethinfo: s.record("EthStats", [ + s.field("ipackets", self.uint8, 0, doc="Received packets"), + s.field("opackets", self.uint8, 0, doc="Transmitted packets"), + s.field("ibytes", self.uint8, 0, doc="Received bytes"), + s.field("obytes", self.uint8, 0, doc="Transmitted bytes"), + s.field("imissed", self.uint8, 0, doc="Missed packets"), + s.field("ierrors", self.uint8, 0, doc="Receiver errors"), + s.field("oerrors", self.uint8, 0, doc="Output Errors"), + s.field("rx_nombuf", self.uint8, 0, doc="Number of Rx mbuf allocation failures"), + + ], doc="NIC Stats information"), + + info: s.record("EthXStats", [ + + s.field("groups_sent", self.uint8, 0, doc="Number of groups of frames sent"), + s.field("total_groups_sent", self.uint8, 0, doc="Total groups of frames sent"), + s.field("rx_good_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_good_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_missed_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_unicast_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_multicast_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_broadcast_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_dropped_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_unknown_protocol_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_crc_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_illegal_byte_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_error_bytes", self.uint8, 0, doc="New since last poll"), + s.field("mac_local_errors", self.uint8, 0, doc="New since last poll"), + s.field("mac_remote_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_len_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_xon_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_xoff_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_size_64_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_size_65_to_127_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_size_128_to_255_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_size_256_to_511_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_size_512_to_1023_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_size_1024_to_1522_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_size_1523_to_max_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_undersized_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_oversize_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_mac_short_pkt_dropped", self.uint8, 0, doc="New since last poll"), + s.field("rx_fragmented_errors", self.uint8, 0, doc="New since last poll"), + s.field("rx_jabber_errors", self.uint8, 0, doc="New since last poll"), + s.field("x", self.uint8, 0, doc="New since last poll"), + s.field("rx_q1_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q2_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q3_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q4_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q5_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q6_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q7_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q8_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q9_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q10_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q11_packets", self.uint8, 0, doc="New since last poll"), + s.field("rx_q0_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q1_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q2_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q3_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q4_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q5_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q6_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q7_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q8_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q9_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q10_bytes", self.uint8, 0, doc="New since last poll"), + s.field("rx_q11_bytes", self.uint8, 0, doc="New since last poll"), + ], doc="NIC Extended Stats information"), queue: s.record("QueueStats", [ s.field("packets_received", self.uint8, 0, doc="Packets received"), @@ -76,9 +89,9 @@ queue: s.record("QueueStats", [ s.field("max_burst_size", self.uint4, 0, doc="Bytes received"), ], doc="Queue Statistics"), -source: s.record("SourceStats",[ - s.field("dropped_frames", self.uint4, 0, doc="Dropped frames"), -], doc="Source Statistics"), + source: s.record("SourceStats",[ + s.field("dropped_frames", self.uint4, 0, doc="Dropped frames"), + ], doc="Source Statistics"), }; diff --git a/src/CreateSource.hpp b/src/CreateSource.hpp index f02909c..26b0d86 100644 --- a/src/CreateSource.hpp +++ b/src/CreateSource.hpp @@ -26,7 +26,7 @@ DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::TDEFrameTypeAdapter, "TDEFram namespace dpdklibs { std::unique_ptr -createSourceModel(const std::string& conn_uid) +createSourceModel(const std::string& conn_uid, bool callback_mode) { auto datatypes = dunedaq::iomanager::IOManager::get()->get_datatypes(conn_uid); if (datatypes.size() != 1) { @@ -41,8 +41,11 @@ createSourceModel(const std::string& conn_uid) // Create Model auto source_model = std::make_unique>(); + // For callback acquisition later (lazy) + source_model->set_sink_name(conn_uid); + // Setup sink (acquire pointer from QueueRegistry) - source_model->set_sink(conn_uid); + source_model->set_sink(conn_uid, callback_mode); // Get parser and sink //auto& parser = source_model->get_parser(); @@ -62,7 +65,8 @@ createSourceModel(const std::string& conn_uid) } else if (raw_dt.find("TDEFrame") != std::string::npos) { // WIB2 specific char arrays auto source_model = std::make_unique>(); - source_model->set_sink(conn_uid); + source_model->set_sink_name(conn_uid); + source_model->set_sink(conn_uid, callback_mode); //auto& parser = source_model->get_parser(); //parser.process_chunk_func = parsers::fixsizedChunkInto(sink); return source_model; diff --git a/src/IfaceWrapper.cpp b/src/IfaceWrapper.cpp index 6b25978..59fbe6e 100644 --- a/src/IfaceWrapper.cpp +++ b/src/IfaceWrapper.cpp @@ -124,14 +124,6 @@ IfaceWrapper::IfaceWrapper(const appdal::NICInterface *iface_cfg, source_to_sink TLOG() << "Append TX_Q=0 for ARP responses."; m_tx_qs.insert(0); - auto sr = iface_cfg->get_configuration()->get_stats_conf(); - /* - m_accum_ptr.reset( new udp::PacketInfoAccumulator(sr->get_expected_seq_id_step() > 0 ? sr->get_expected_seq_id_step() : udp::PacketInfoAccumulator::s_ignorable_value, - sr->get_expected_timestamp_step() > 0 ? sr->get_expected_timestamp_step() : udp::PacketInfoAccumulator::s_ignorable_value, - sr->get_expected_packet_size() > 0 ? sr->get_expected_packet_size() : udp::PacketInfoAccumulator::s_ignorable_value, - sr->get_analyze_nth_packet())); - - */ } IfaceWrapper::~IfaceWrapper() @@ -275,6 +267,18 @@ IfaceWrapper::scrap() void IfaceWrapper::get_info(opmonlib::InfoCollector& ci, int level) { + + nicreaderinfo::EthStats s; + s.ipackets = m_iface_xstats.m_eth_stats.ipackets; + s.opackets = m_iface_xstats.m_eth_stats.opackets; + s.ibytes = m_iface_xstats.m_eth_stats.ibytes; + s.obytes = m_iface_xstats.m_eth_stats.obytes; + s.imissed = m_iface_xstats.m_eth_stats.imissed; + s.ierrors = m_iface_xstats.m_eth_stats.ierrors; + s.oerrors = m_iface_xstats.m_eth_stats.oerrors; + s.rx_nombuf = m_iface_xstats.m_eth_stats.rx_nombuf; + ci.add(s); + // Empty stat JSON placeholder nlohmann::json stat_json; @@ -290,11 +294,11 @@ IfaceWrapper::get_info(opmonlib::InfoCollector& ci, int level) m_iface_xstats.reset_counters(); // Convert JSON to NICReaderInfo struct - nicreaderinfo::Info nri; - nicreaderinfo::from_json(stat_json, nri); + nicreaderinfo::EthXStats xs; + nicreaderinfo::from_json(stat_json, xs); // Push to InfoCollector - ci.add(nri); + ci.add(xs); TLOG_DEBUG(TLVL_WORK_STEPS) << "opmonlib::InfoCollector object passed by reference to IfaceWrapper::get_info" << " -> Result looks like the following:\n" << ci.get_collected_infos(); @@ -310,13 +314,6 @@ IfaceWrapper::get_info(opmonlib::InfoCollector& ci, int level) ci.add(fmt::format("queue_{}", src_rx_q), queue_ci); } - /* FIXME: to be reactivated - for( const auto& [src_id, src_obj] : m_sources) { - opmonlib::InfoCollector src_ci; - src_obj->get_info(src_ci, level); - ci.add(fmt::format("src_{}", src_id), src_ci); - } - */ } void diff --git a/src/IfaceWrapper.hpp b/src/IfaceWrapper.hpp index d87c104..0286d03 100644 --- a/src/IfaceWrapper.hpp +++ b/src/IfaceWrapper.hpp @@ -129,8 +129,6 @@ class IfaceWrapper void garp_func(); std::atomic m_garps_sent{0}; - // std::unique_ptr m_accum_ptr; - // Lcore processor //template int rx_runner(void *arg __rte_unused); diff --git a/src/SourceConcept.hpp b/src/SourceConcept.hpp index e40cd26..469c2d5 100644 --- a/src/SourceConcept.hpp +++ b/src/SourceConcept.hpp @@ -36,7 +36,8 @@ class SourceConcept SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable // virtual void init(const nlohmann::json& args) = 0; - virtual void set_sink(const std::string& sink_name) = 0; + virtual void set_sink(const std::string& sink_name, bool callback_mode) = 0; + virtual void acquire_callback() = 0; // virtual void conf(const nlohmann::json& args) = 0; // virtual void start(const nlohmann::json& args) = 0; // virtual void stop(const nlohmann::json& args) = 0; @@ -44,6 +45,12 @@ class SourceConcept virtual bool handle_payload(char* message, std::size_t size) = 0; + void set_sink_name(const std::string& sink_name) + { + m_sink_name = sink_name; + } + + std::string m_sink_name; }; } // namespace dpdklibs diff --git a/src/SourceModel.hpp b/src/SourceModel.hpp index 81644b4..304acf9 100644 --- a/src/SourceModel.hpp +++ b/src/SourceModel.hpp @@ -14,7 +14,9 @@ #include "iomanager/IOManager.hpp" #include "iomanager/Sender.hpp" #include "logging/Logging.hpp" + // #include "readoutlibs/utils/ReusableThread.hpp" +#include "readoutlibs/DataMoveCallbackRegistry.hpp" // #include // #include @@ -46,13 +48,34 @@ class SourceModel : public SourceConcept {} ~SourceModel() {} - void set_sink(const std::string& sink_name) override + void set_sink(const std::string& sink_name, bool callback_mode) override + { + m_callback_mode = callback_mode; + if (callback_mode) { + TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!"; + } else { + if (m_sink_is_set) { + TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!"; + } else { + m_sink_queue = get_iom_sender(sink_name); + m_sink_is_set = true; + } + } + } + + void acquire_callback() override { - if (m_sink_is_set) { - TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!"; + if (m_callback_mode) { + if (m_callback_is_acquired) { + TLOG_DEBUG(5) << "SourceModel callback is already acquired!"; + } else { + // Getting DataMoveCBRegistry + auto dmcbr = readoutlibs::DataMoveCallbackRegistry::get(); + m_sink_callback = dmcbr->get_callback(inherited::m_sink_name); + m_callback_is_acquired = true; + } } else { - m_sink_queue = get_iom_sender(sink_name); - m_sink_is_set = true; + TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!"; } } @@ -64,19 +87,24 @@ class SourceModel : public SourceConcept if (push_out) { TargetPayloadType& target_payload = *reinterpret_cast(message); - if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) { - //if(m_dropped_packets == 0 || m_dropped_packets%10000) { - // TLOG() << "Dropped data " << m_dropped_packets; - //} - ++m_dropped_packets; + + if (m_callback_mode) { + (*m_sink_callback)(std::move(target_payload)); + } else { + if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) { + //if(m_dropped_packets == 0 || m_dropped_packets%10000) { + // TLOG() << "Dropped data " << m_dropped_packets; + //} + ++m_dropped_packets; + } } + } else { TargetPayloadType target_payload; uint32_t bytes_copied = 0; readoutlibs::buffer_copy(message, size, static_cast(&target_payload), bytes_copied, sizeof(target_payload)); } - return true; } @@ -87,11 +115,17 @@ class SourceModel : public SourceConcept } private: - - // Sink + // Sink internals + std::string m_sink_id; bool m_sink_is_set{ false }; std::shared_ptr m_sink_queue; + // Callback internals + bool m_callback_mode; + bool m_callback_is_acquired{ false }; + using sink_cb_t = std::shared_ptr>; + sink_cb_t m_sink_callback; + std::atomic m_dropped_packets{0}; }; diff --git a/src/detail/IfaceWrapper.hxx b/src/detail/IfaceWrapper.hxx index 25e527b..854bb02 100644 --- a/src/detail/IfaceWrapper.hxx +++ b/src/detail/IfaceWrapper.hxx @@ -46,13 +46,15 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) { const uint16_t nb_rx = nb_rx_map[src_rx_q]; // We got packets from burst on this queue - if (nb_rx != 0) { + if (nb_rx != 0) [[likely]] { m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load()); // ------- // Iterate on burst packets for (int i_b=0; i_bnb_segs > 1) { //TLOG_DEBUG(10) << "It appears a packet is spread across more than one receiving buffer;" @@ -74,15 +76,16 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) { } continue; } +*/ // Check for UDP frames //if (pkt_type == RTE_PTYPE_L4_UDP) { // RS FIXME: doesn't work. Why? What is the PKT_TYPE in our ETH frames? // Check for JUMBO frames - if (q_bufs[i_b]->pkt_len > 7000) { // RS FIXME: do proper check on data length later + if (q_bufs[i_b]->pkt_len > 7000) [[likely]] { // RS FIXME: do proper check on data length later // Handle them! std::size_t data_len = q_bufs[i_b]->data_len; - if ( m_lcore_enable_flow.load() ) { + if ( m_lcore_enable_flow.load() ) [[likely]] { char* message = udp::get_udp_payload(q_bufs[i_b]); handle_eth_payload(src_rx_q, message, data_len); } diff --git a/src/udp/Utils.cpp b/src/udp/Utils.cpp index fa7fbd1..2c49eaa 100644 --- a/src/udp/Utils.cpp +++ b/src/udp/Utils.cpp @@ -273,246 +273,6 @@ get_rte_mbuf_str(const rte_mbuf* mbuf) noexcept return ss.str(); } -PacketInfoAccumulator::PacketInfoAccumulator(int64_t expected_seq_id_step, int64_t expected_timestamp_step, int64_t expected_size, int64_t process_nth_packet) - : m_expected_seq_id_step(expected_seq_id_step) - , m_expected_timestamp_step(expected_timestamp_step) - , m_expected_size(expected_size) - , m_process_nth_packet(process_nth_packet) -{ - // To be clear, the reason you'd set "expected_seq_id_step" to - // anything other than 1 or PacketInfoAccumulator::s_ignorable_value - // is to test that the code correctly handles unexpected sequence - // IDs - - if (expected_seq_id_step != PacketInfoAccumulator::s_ignorable_value) { - - for (int i = 0; i <= s_max_seq_id; ++i) { - m_next_expected_seq_id[i] = i + expected_seq_id_step; - if (m_next_expected_seq_id[i] > s_max_seq_id) { - m_next_expected_seq_id[i] -= (s_max_seq_id + 1); - } - } - } -} - -void -PacketInfoAccumulator::process_packet(const detdataformats::DAQEthHeader& daq_hdr, const int64_t data_len) -{ - - StreamUID unique_str_id(daq_hdr); - bool first_packet_in_stream = false; - - // n.b. C++ knows to add the unique_str_id as a key and a default-constructed ReceiverStats as a value if it's not already in the map - ReceiverStats& receiver_stats{ m_stream_stats_atomic[unique_str_id] }; - - if (receiver_stats.total_packets % m_process_nth_packet != 0) { - receiver_stats.total_packets++; - m_stream_last_seq_id[unique_str_id] = static_cast(daq_hdr.seq_id); - m_stream_last_timestamp[unique_str_id] = static_cast(daq_hdr.timestamp); - return; - } - - if (receiver_stats.total_packets == 0) { - - first_packet_in_stream = true; - m_stream_last_seq_id[unique_str_id] = static_cast(daq_hdr.seq_id); - m_stream_last_timestamp[unique_str_id] = static_cast(daq_hdr.timestamp); - - // TLOG() << "Found first packet in " << static_cast(unique_str_id); - } - - receiver_stats.total_packets++; - receiver_stats.packets_since_last_reset++; - - receiver_stats.bytes_since_last_reset += data_len; - - if (data_len > receiver_stats.max_packet_size) { - receiver_stats.max_packet_size = data_len; - } - - if (data_len < receiver_stats.min_packet_size) { - receiver_stats.min_packet_size = data_len; - } - - if (m_expected_size != s_ignorable_value && data_len != m_expected_size) { - receiver_stats.bad_sizes_since_last_reset++; - } - - if (m_expected_seq_id_step != s_ignorable_value && !first_packet_in_stream) { - - // seq_id is represented in an unsigned 64-bit int in - // DAQEthHeader, so if we don't convert it to a *signed* int - // before doing arithmetic, bad things will happen. - - auto seq_id = static_cast(daq_hdr.seq_id); - auto& last_seq_id = m_stream_last_seq_id[unique_str_id]; - - if (seq_id != m_next_expected_seq_id[last_seq_id]) { - receiver_stats.bad_seq_ids_since_last_reset++; - - int64_t seq_id_delta = seq_id - m_next_expected_seq_id[last_seq_id]; - - if (seq_id_delta < 0) { // e.g., we expected seq ID 4095 but got 0 instead - seq_id_delta += PacketInfoAccumulator::s_max_seq_id + 1; - } - - if (seq_id_delta > receiver_stats.max_seq_id_deviation.load()) { - receiver_stats.max_seq_id_deviation = seq_id_delta; - } - } - - last_seq_id = daq_hdr.seq_id; - } - - if (m_expected_timestamp_step != s_ignorable_value && !first_packet_in_stream) { - - auto timestamp = static_cast(daq_hdr.timestamp); - auto& last_timestamp = m_stream_last_timestamp[unique_str_id]; - - if (timestamp != last_timestamp + m_expected_timestamp_step) { - - int64_t timestamp_delta = daq_hdr.timestamp - (last_timestamp + m_expected_timestamp_step); - receiver_stats.bad_timestamps_since_last_reset++; - - if (timestamp_delta > receiver_stats.max_timestamp_deviation.load()) { - receiver_stats.max_timestamp_deviation = timestamp_delta; - } - } - - last_timestamp = timestamp; - } -} - -// dump() is more a function to test the development of -// PacketInfoAccumulator itself than a function users of -// PacketInfoAccumulator would call - -void -PacketInfoAccumulator::dump() -{ - - for (auto& stream_stat : m_stream_stats_atomic) { - std::stringstream info; - - auto& streamid = stream_stat.first; - auto& stats = stream_stat.second; - - TLOG() << static_cast(streamid) + "\n" + static_cast(stats); - } -} - -void -PacketInfoAccumulator::erase_stream_stats() -{ - - m_stream_stats_atomic.clear(); - m_stream_last_seq_id.clear(); -} - -std::map -PacketInfoAccumulator::get_and_reset_stream_stats() -{ - - auto snapshot_before_reset = m_stream_stats_atomic; - - for (auto& stream_stat : m_stream_stats_atomic) { - stream_stat.second.reset(); - } - - if (m_process_nth_packet != 1) { - for (auto& stream_stat : snapshot_before_reset) { - stream_stat.second.scale(m_process_nth_packet); - } - } - - return snapshot_before_reset; -} - -ReceiverStats::ReceiverStats(const ReceiverStats& rhs) - : total_packets(rhs.total_packets.load()) - , min_packet_size(rhs.min_packet_size.load()) - , max_packet_size(rhs.max_packet_size.load()) - , max_timestamp_deviation(rhs.max_timestamp_deviation.load()) - , max_seq_id_deviation(rhs.max_seq_id_deviation.load()) - , packets_since_last_reset(rhs.packets_since_last_reset.load()) - , bytes_since_last_reset(rhs.bytes_since_last_reset.load()) - , bad_timestamps_since_last_reset(rhs.bad_timestamps_since_last_reset.load()) - , bad_sizes_since_last_reset(rhs.bad_sizes_since_last_reset.load()) - , bad_seq_ids_since_last_reset(rhs.bad_seq_ids_since_last_reset.load()) -{ -} - -ReceiverStats& -ReceiverStats::operator=(const ReceiverStats& rhs) -{ - - total_packets = rhs.total_packets.load(); - min_packet_size = rhs.min_packet_size.load(); - max_packet_size = rhs.max_packet_size.load(); - max_timestamp_deviation = rhs.max_timestamp_deviation.load(); - max_seq_id_deviation = rhs.max_seq_id_deviation.load(); - packets_since_last_reset = rhs.packets_since_last_reset.load(); - bytes_since_last_reset = rhs.bytes_since_last_reset.load(); - bad_timestamps_since_last_reset = rhs.bad_timestamps_since_last_reset.load(); - bad_sizes_since_last_reset = rhs.bad_sizes_since_last_reset.load(); - bad_seq_ids_since_last_reset = rhs.bad_seq_ids_since_last_reset.load(); - - return *this; -} - -ReceiverStats::operator std::string() const -{ - - std::stringstream reportstr; - - reportstr << "total_packets == " << total_packets << "\n" - << "min_packet_size == " << min_packet_size << "\n" - << "max_packet_size == " << max_packet_size << "\n" - << "max_timestamp_deviation == " << max_timestamp_deviation << "\n" - << "max_seq_id_deviation == " << max_seq_id_deviation << "\n" - << "packets_since_last_reset == " << packets_since_last_reset << "\n" - << "bytes_since_last_reset == " << bytes_since_last_reset << "\n" - << "bad_timestamps_since_last_reset == " << bad_timestamps_since_last_reset << "\n" - << "bad_sizes_since_last_reset == " << bad_sizes_since_last_reset << "\n" - << "bad_seq_ids_since_last_reset == " << bad_seq_ids_since_last_reset << "\n"; - - return reportstr.str(); -} - -void -ReceiverStats::merge(const std::vector& stats_vector) -{ - - for (auto& stats : stats_vector) { - total_packets += stats.total_packets.load(); - min_packet_size = min_packet_size.load() < stats.min_packet_size.load() ? min_packet_size.load() : stats.min_packet_size.load(); - max_packet_size = max_packet_size.load() > stats.max_packet_size.load() ? max_packet_size.load() : stats.max_packet_size.load(); - max_seq_id_deviation = max_seq_id_deviation.load() > stats.max_seq_id_deviation.load() ? max_seq_id_deviation.load() : stats.max_seq_id_deviation.load(); - packets_since_last_reset += stats.packets_since_last_reset.load(); - bytes_since_last_reset += stats.bytes_since_last_reset.load(); - bad_sizes_since_last_reset += stats.bad_sizes_since_last_reset.load(); - bad_seq_ids_since_last_reset += stats.bad_seq_ids_since_last_reset.load(); - } -} - -receiverinfo::Info -DeriveFromReceiverStats(const ReceiverStats& receiver_stats, double time_per_report) -{ - - receiverinfo::Info derived_stats; - - derived_stats.total_packets = receiver_stats.total_packets.load(); - derived_stats.packets_per_second = receiver_stats.packets_since_last_reset.load() / time_per_report; - derived_stats.bytes_per_second = receiver_stats.bytes_since_last_reset.load() / time_per_report; - derived_stats.bad_seq_id_packets_per_second = receiver_stats.bad_seq_ids_since_last_reset.load() / time_per_report; - derived_stats.max_bad_seq_id_deviation = receiver_stats.max_seq_id_deviation.load(); - derived_stats.bad_size_packets_per_second = receiver_stats.bad_sizes_since_last_reset.load() / time_per_report; - derived_stats.max_packet_size = receiver_stats.max_packet_size.load(); - derived_stats.min_packet_size = receiver_stats.min_packet_size.load(); - - return derived_stats; -} - std::string get_opmon_string(const StreamUID& sid) { diff --git a/test/apps/test_stats_reporting.cxx b/test/apps/test_stats_reporting.cxx deleted file mode 100644 index 2b4f6ad..0000000 --- a/test/apps/test_stats_reporting.cxx +++ /dev/null @@ -1,147 +0,0 @@ -/** - * @file test_stats_reporting.cxx Basic checking that stats reporting works as expected - * - * This is part of the DUNE DAQ Application Framework, copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ - -#include "dpdklibs/EALSetup.hpp" -#include "dpdklibs/udp/PacketCtor.hpp" -#include "dpdklibs/udp/Utils.hpp" - -#include "detdataformats/DAQEthHeader.hpp" -#include "logging/Logging.hpp" - -#include "rte_cycles.h" -#include "rte_dev.h" -#include "rte_eal.h" -#include "rte_ethdev.h" -#include "rte_lcore.h" -#include "rte_mbuf.h" - -#include "CLI/App.hpp" -#include "CLI/Config.hpp" -#include "CLI/Formatter.hpp" - -#include -#include -#include - - -namespace { - - // Only 8 and above works - constexpr int burst_size = 256; - - constexpr int buffer_size = 9800; // Same number as in EALSetup.hpp - - std::string dst_mac_addr = ""; - int payload_bytes = 0; // payload past the DAQEthHeader - -} - -using namespace dunedaq; -using namespace dpdklibs; -using namespace udp; - -int main(int argc, char* argv[]) { - - CLI::App app{"test stats reporting"}; - - const std::string default_mac_address = "6c:fe:54:47:98:20"; - - app.add_option("--dst-mac", dst_mac_addr, "Destination MAC address (default " + default_mac_address + ")"); - - if (dst_mac_addr == "") { - dst_mac_addr = default_mac_address; - } - - CLI11_PARSE(app, argc, argv); - - argc = 1; // Set to 1 so rte_eal_init ignores all the CLI-parsed arguments - - int retval = rte_eal_init(argc, argv); - if (retval < 0) { - rte_exit(EXIT_FAILURE, "ERROR: EAL initialization failed, info: %s\n", strerror(abs(retval))); - } - - // Check that there is an even number of ports to send/receive on - auto nb_ports = rte_eth_dev_count_avail(); - TLOG() << "There are " << nb_ports << " ethernet ports available out of a total of " << rte_eth_dev_count_total(); - if (nb_ports == 0) { - rte_exit(EXIT_FAILURE, "ERROR: 0 ethernet ports are available. This can be caused either by someone else currently\nusing dpdk-based code or the necessary drivers not being bound to the NICs\n(see https://github.com/DUNE-DAQ/dpdklibs#readme for more)\n"); - } - - const uint16_t n_tx_qs = 1; - const uint16_t n_rx_qs = 0; - - struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create((std::string("MBUF_POOL")).c_str(), NUM_MBUFS * rte_eth_dev_count_avail(), - MBUF_CACHE_SIZE, 0, buffer_size, rte_socket_id()); - - - if (mbuf_pool == NULL) { - rte_exit(EXIT_FAILURE, "ERROR: call to rte_pktmbuf_pool_create failed, info: %s\n", rte_strerror(rte_errno)); - } - - struct rte_mbuf** bufs = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size); - if (bufs == NULL) { - TLOG(TLVL_ERROR) << "Failure trying to acquire memory for buffers; exiting..."; - std::exit(1); - } - - retval = rte_pktmbuf_alloc_bulk(mbuf_pool, bufs, burst_size); - - if (retval != 0) { - rte_exit(EXIT_FAILURE, "ERROR: call to rte_pktmbuf_alloc_bulk failed, info: %s\n", strerror(abs(retval))); - } - - int iface = 0; - construct_packets_for_burst(iface, dst_mac_addr, payload_bytes, burst_size, bufs); - - rte_mbuf_sanity_check(bufs[0], 1); - - char* udp_payload = udp::get_udp_payload(bufs[0]); - auto daq_hdr = reinterpret_cast(udp_payload); - - PacketInfoAccumulator processor; - processor.process_packet(*daq_hdr, bufs[0]->data_len); - processor.process_packet(*daq_hdr, bufs[0]->data_len); - - TLOG() << "Dump after processing two identical packets while ignoring size, timestamp and sequence ID: "; - processor.dump(); - - // Expected step in sequence ID and timestamp is zero, expected size - // is bufs[0]->data_len - - PacketInfoAccumulator processor2(0, bufs[0]->data_len); - TLOG() << "Dump after processing two identical packets while expecting no change in the sequence ID and timestamp, and setting a (correct) expectation about the packet sizes: "; - processor2.process_packet(*daq_hdr, bufs[0]->data_len); - processor2.process_packet(*daq_hdr, bufs[0]->data_len); - processor2.dump(); - - // Expected step in sequence ID and timestamp is 1, expected size in - // 999 bytes. All three expectations will fail with our identical, - // constructed packets. - - TLOG() << "Dump after processing two identical packets while incorrectly expecting the sequence ID and timestamp to increment by 1, and incorrectly expecting the packet size to be 999 bytes: "; - - PacketInfoAccumulator processor3(1, 999); - processor3.process_packet(*daq_hdr, bufs[0]->data_len); - processor3.process_packet(*daq_hdr, bufs[0]->data_len); - processor3.dump(); - - // Make the constructed packet appear as if it's from a different stream - - daq_hdr->stream_id++ ; - - processor3.process_packet(*daq_hdr, bufs[0]->data_len); - processor3.process_packet(*daq_hdr, bufs[0]->data_len); - - TLOG() << "Dump after adding two more packets, where I've changed the stream. Should get reports for two different streams now: "; - processor3.dump(); - - rte_eal_cleanup(); - - return 0; -} diff --git a/unittest/Utils_test.cxx b/unittest/Utils_test.cxx index f95192f..ce3d442 100644 --- a/unittest/Utils_test.cxx +++ b/unittest/Utils_test.cxx @@ -114,128 +114,5 @@ BOOST_AUTO_TEST_CASE(GetEthernetPackets) BOOST_CHECK_NO_THROW(udp::get_ethernet_packets(buffervec)); // Just a quick check that we got things to return to normal } - - -BOOST_AUTO_TEST_CASE(TestReceiverStats) -{ - udp::ReceiverStats stats; - constexpr auto biggest_int = std::numeric_limits::max(); - - stats.total_packets = biggest_int; - stats.min_packet_size = biggest_int; - stats.max_packet_size = biggest_int; - stats.max_seq_id_deviation = biggest_int; - - stats.packets_since_last_reset = biggest_int; - stats.bytes_since_last_reset = biggest_int; - stats.bad_sizes_since_last_reset = biggest_int; - stats.bad_seq_ids_since_last_reset = biggest_int; - - receiverinfo::Info derived = DeriveFromReceiverStats(stats, 2.0); // Pretend we're sampling every two seconds - - BOOST_REQUIRE_EQUAL(derived.total_packets, stats.total_packets); - BOOST_REQUIRE_EQUAL(derived.min_packet_size, stats.min_packet_size); - BOOST_REQUIRE_EQUAL(derived.max_packet_size, stats.max_packet_size); - BOOST_REQUIRE_EQUAL(derived.max_bad_seq_id_deviation, stats.max_seq_id_deviation); - BOOST_REQUIRE_EQUAL(derived.max_packet_size, stats.max_packet_size); - BOOST_REQUIRE_EQUAL(derived.min_packet_size, stats.min_packet_size); - - BOOST_REQUIRE_EQUAL(derived.packets_per_second * 2.0, stats.packets_since_last_reset); - BOOST_REQUIRE_EQUAL(derived.bytes_per_second * 2.0, stats.bytes_since_last_reset); - BOOST_REQUIRE_EQUAL(derived.bad_seq_id_packets_per_second * 2.0, stats.bad_seq_ids_since_last_reset); - BOOST_REQUIRE_EQUAL(derived.bad_size_packets_per_second * 2.0, stats.bad_sizes_since_last_reset); - - stats.reset(); - derived = DeriveFromReceiverStats(stats, 2.0); - - BOOST_REQUIRE_EQUAL(derived.packets_per_second, 0); - BOOST_REQUIRE_EQUAL(derived.bytes_per_second, 0); - BOOST_REQUIRE_EQUAL(derived.bad_ts_packets_per_second, 0); - BOOST_REQUIRE_EQUAL(derived.bad_seq_id_packets_per_second, 0); - BOOST_REQUIRE_EQUAL(derived.bad_size_packets_per_second, 0); - - udp::ReceiverStats stats1; - udp::ReceiverStats stats2; - - stats1.total_packets = 1; - stats2.total_packets = 2; - - stats1.min_packet_size = 100; - stats2.min_packet_size = 200; - - stats1.max_packet_size = 300; - stats2.max_packet_size = 400; - - stats1.max_seq_id_deviation = 17; - stats2.max_seq_id_deviation = 0; - - stats1.packets_since_last_reset = 0; - stats2.packets_since_last_reset = 314; - - stats1.bytes_since_last_reset = 12; - stats2.bytes_since_last_reset = 13; - - - stats1.bad_sizes_since_last_reset = 32; - stats2.bad_sizes_since_last_reset = 33; - - stats1.bad_seq_ids_since_last_reset = 52; - stats2.bad_seq_ids_since_last_reset = 53; - - std::vector stats_vec; - stats_vec.emplace_back( stats1 ); - stats_vec.emplace_back( stats2 ); - - udp::ReceiverStats result; - result.merge(stats_vec); - - BOOST_REQUIRE_EQUAL(result.total_packets, 3); - BOOST_REQUIRE_EQUAL(result.min_packet_size, 100); - BOOST_REQUIRE_EQUAL(result.max_packet_size, 400); - BOOST_REQUIRE_EQUAL(result.max_seq_id_deviation, 17); - BOOST_REQUIRE_EQUAL(result.packets_since_last_reset, 314); - BOOST_REQUIRE_EQUAL(result.bytes_since_last_reset, 25); - BOOST_REQUIRE_EQUAL(result.bad_sizes_since_last_reset, 65); - BOOST_REQUIRE_EQUAL(result.bad_seq_ids_since_last_reset, 105); - - std::map am; - std::map am2; - // std::unordered_map uam; - - udp::StreamUID mystream; - mystream.det_id = 3; - mystream.crate_id = 1; - mystream.slot_id = 2; - mystream.stream_id = 64; - - am[mystream]; // = udp::ReceiverStats(); - am2[mystream]; // = udp::ReceiverStats(); - // uam[mystream] = udp::ReceiverStats(); - - am[mystream].total_packets = 49; - am2[mystream].total_packets = 149; - // uam[mystream].total_packets = 149; - - BOOST_REQUIRE_EQUAL(am[mystream].total_packets + 100, am2[mystream].total_packets); - BOOST_REQUIRE_EQUAL(am[mystream].total_packets + 100 + am2[mystream].total_packets, 298); - - - am[mystream].packets_since_last_reset = 10; - am[mystream].reset(); - BOOST_REQUIRE_EQUAL(am[mystream].packets_since_last_reset, 0); - - std::map am3 = am; - - udp::ReceiverStats default_constructed; - BOOST_REQUIRE_EQUAL(default_constructed.max_packet_size, std::numeric_limits::min()); - - default_constructed.max_packet_size = std::numeric_limits::max() ; - - udp::ReceiverStats assigned = default_constructed; - BOOST_REQUIRE_EQUAL(assigned.max_packet_size, std::numeric_limits::max()); - -} - - BOOST_AUTO_TEST_SUITE_END()