diff --git a/CMakeLists.txt b/CMakeLists.txt index 1b2bede..5b5c76a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,6 +24,8 @@ find_package(fmt REQUIRED) daq_codegen( nicsender.jsonnet nicreader.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2 ) +daq_protobuf_codegen( opmon/*.proto ) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2") ############################################################################## diff --git a/plugins/DPDKReaderModule.cpp b/plugins/DPDKReaderModule.cpp index faea5a9..8f39586 100644 --- a/plugins/DPDKReaderModule.cpp +++ b/plugins/DPDKReaderModule.cpp @@ -42,6 +42,7 @@ #include #include + /** * @brief Name used by TRACE TLOG calls from this source file */ @@ -93,9 +94,9 @@ DPDKReaderModule::init(const std::shared_ptr mcfg ) auto mdal = mcfg->module(get_name()); m_cfg = mcfg; if (mdal->get_outputs().empty()) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "No outputs defined for NIC reader in configuration."); - ers::fatal(err); - throw err; + auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "No outputs defined for NIC reader in configuration."); + ers::fatal(err); + throw err; } for (auto con : mdal->get_outputs()) { @@ -118,7 +119,8 @@ DPDKReaderModule::init(const std::shared_ptr mcfg ) callback_mode = true; } - m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode); + auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode); + register_node( queue->UID(), ptr ); //m_sources[queue->get_source_id()]->init(); } } @@ -235,11 +237,12 @@ DPDKReaderModule::do_configure(const data_t& /*args*/) } uint iface_id = m_mac_to_id_map[net_device->get_mac_address()]; - m_ifaces[iface_id] = std::make_unique(iface_id, dpdk_receiver, nw_senders, m_sources, m_run_marker); - m_ifaces[iface_id]->allocate_mbufs(); - m_ifaces[iface_id]->setup_interface(); - m_ifaces[iface_id]->setup_flow_steering(); - m_ifaces[iface_id]->setup_xstats(); + auto ptr = m_ifaces[iface_id] = std::make_shared(iface_id, dpdk_receiver, nw_senders, m_sources, m_run_marker); + register_node( fmt::format("interface-{}", iface_id), ptr); + ptr->allocate_mbufs(); + ptr->setup_interface(); + ptr->setup_flow_steering(); + ptr->setup_xstats(); } @@ -298,13 +301,6 @@ DPDKReaderModule::do_scrap(const data_t&) } } -// void -// DPDKReaderModule::get_info(opmonlib::InfoCollector& ci, int level) -// { -// for (auto& [iface_id, iface] : m_ifaces) { -// iface->get_info(ci, level); -// } -// } void DPDKReaderModule::set_running(bool should_run) diff --git a/plugins/DPDKReaderModule.hpp b/plugins/DPDKReaderModule.hpp index 0cfe56c..7928c7e 100644 --- a/plugins/DPDKReaderModule.hpp +++ b/plugins/DPDKReaderModule.hpp @@ -51,6 +51,8 @@ class DPDKReaderModule : public dunedaq::appfwk::DAQModule void init(const std::shared_ptr mfcg) override; + + private: // Types //using module_conf_t = dunedaq::dpdklibs::nicreader::Conf; @@ -61,9 +63,6 @@ class DPDKReaderModule : public dunedaq::appfwk::DAQModule void do_stop(const data_t&); void do_scrap(const data_t&); - #warning MISSING OPMON - //void get_info(opmonlib::InfoCollector& ci, int level); - // Internals std::shared_ptr m_cfg; @@ -74,12 +73,18 @@ class DPDKReaderModule : public dunedaq::appfwk::DAQModule // Interfaces (logical ID, MAC) -> IfaceWrapper std::map m_mac_to_id_map; std::map m_pci_to_id_map; - std::map> m_ifaces; + std::map> m_ifaces; // Sinks (SourceConcepts) - using sid_to_source_map_t = std::map>; + using sid_to_source_map_t = std::map>; sid_to_source_map_t m_sources; + // Comment of Monitoring + // Both SourceConcepts and IfaceWrappers are Monitorable Objecets + // Both quantities are available for the ReaderModule and both are registered. + // There is no loop because the Sources passed to the Wrappers are not registered in the wrapper + + }; } // namespace dunedaq::dpdklibs diff --git a/schema/dpdklibs/nicreaderinfo.jsonnet b/schema/dpdklibs/nicreaderinfo.jsonnet deleted file mode 100644 index 7acdcc8..0000000 --- a/schema/dpdklibs/nicreaderinfo.jsonnet +++ /dev/null @@ -1,98 +0,0 @@ -// This is the application info schema used by the card reader module. -// It describes the information object structure passed by the application -// for operational monitoring - -local moo = import "moo.jsonnet"; -local s = moo.oschema.schema("dunedaq.dpdklibs.nicreaderinfo"); - -local info = { - // uint8 : s.number("uint8", "f8", doc="A double of 8 bytes"), - uint8 : s.number("uint8", "u8", doc="An unsigned of 8 bytes"), - uint4 : s.number("uint4", "u4", doc="An unsigned of 8 bytes"), - // TODO: rename to - // info: s.record("XStats", [ - ethinfo: s.record("EthStats", [ - s.field("ipackets", self.uint8, 0, doc="Received packets"), - s.field("opackets", self.uint8, 0, doc="Transmitted packets"), - s.field("ibytes", self.uint8, 0, doc="Received bytes"), - s.field("obytes", self.uint8, 0, doc="Transmitted bytes"), - s.field("imissed", self.uint8, 0, doc="Missed packets"), - s.field("ierrors", self.uint8, 0, doc="Receiver errors"), - s.field("oerrors", self.uint8, 0, doc="Output Errors"), - s.field("rx_nombuf", self.uint8, 0, doc="Number of Rx mbuf allocation failures"), - - ], doc="NIC Stats information"), - - info: s.record("EthXStats", [ - - s.field("groups_sent", self.uint8, 0, doc="Number of groups of frames sent"), - s.field("total_groups_sent", self.uint8, 0, doc="Total groups of frames sent"), - s.field("rx_good_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_good_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_missed_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_unicast_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_multicast_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_broadcast_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_dropped_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_unknown_protocol_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_crc_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_illegal_byte_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_error_bytes", self.uint8, 0, doc="New since last poll"), - s.field("mac_local_errors", self.uint8, 0, doc="New since last poll"), - s.field("mac_remote_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_len_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_xon_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_xoff_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_64_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_65_to_127_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_128_to_255_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_256_to_511_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_512_to_1023_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_1024_to_1522_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_size_1523_to_max_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_undersized_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_oversize_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_mac_short_pkt_dropped", self.uint8, 0, doc="New since last poll"), - s.field("rx_fragmented_errors", self.uint8, 0, doc="New since last poll"), - s.field("rx_jabber_errors", self.uint8, 0, doc="New since last poll"), - s.field("x", self.uint8, 0, doc="New since last poll"), - s.field("rx_q1_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q2_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q3_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q4_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q5_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q6_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q7_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q8_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q9_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q10_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q11_packets", self.uint8, 0, doc="New since last poll"), - s.field("rx_q0_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q1_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q2_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q3_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q4_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q5_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q6_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q7_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q8_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q9_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q10_bytes", self.uint8, 0, doc="New since last poll"), - s.field("rx_q11_bytes", self.uint8, 0, doc="New since last poll"), - ], doc="NIC Extended Stats information"), - -queue: s.record("QueueStats", [ - s.field("packets_received", self.uint8, 0, doc="Packets received"), - s.field("bytes_received", self.uint8, 0, doc="Bytes received"), - s.field("full_rx_burst", self.uint8, 0, doc="Bytes received"), - s.field("max_burst_size", self.uint4, 0, doc="Bytes received"), -], doc="Queue Statistics"), - - source: s.record("SourceStats",[ - s.field("dropped_frames", self.uint4, 0, doc="Dropped frames"), - ], doc="Source Statistics"), - -}; - -moo.oschema.sort_select(info) diff --git a/schema/dpdklibs/opmon/IfaceWrapper.proto b/schema/dpdklibs/opmon/IfaceWrapper.proto new file mode 100644 index 0000000..6260aee --- /dev/null +++ b/schema/dpdklibs/opmon/IfaceWrapper.proto @@ -0,0 +1,98 @@ +syntax = "proto3"; + +package dunedaq.dpdklibs.opmon; + +message EthStats { + + uint64 ipackets = 1; // Received packets + uint64 opackets = 2; // Transmitted packets + uint64 ibytes = 10; // Received bytes + uint64 obytes = 11; // Transmitted bytes + uint64 imissed = 20; // Missed packets + uint64 ierrors = 21; // Received errors + uint64 oerrors = 22; // Output Errors + uint64 rx_nombuf = 30; // Number of Rx mbuf allocation failures +} + + +message QueueInfo { + + uint64 packets_received = 1; + uint64 bytes_received = 2; + uint64 full_rx_burst = 3; + uint32 max_burst_size = 4; + +} + +message QueueEthXStats { + + uint64 packets = 1; + uint64 bytes = 2; + uint64 errors = 3; + +} + + +message EthXStatsInfo { + + uint64 groups_sent = 1; + uint64 total_groups_sent = 2; + uint64 rx_good_packets = 3; + uint64 rx_good_bytes = 4; + uint64 rx_dropped_packets = 5; + +uint64 rx_unicast_packets = 11; + uint64 rx_multicast_packets = 12; + uint64 rx_broadcast_packets = 13; + uint64 rx_unknown_protocol_packets = 14; + uint64 rx_xon_packets = 15; + uint64 rx_xoff_packets = 16; + + uint64 rx_size_64_packets = 49; + uint64 rx_size_65_to_127_packets = 50; + uint64 rx_size_128_to_255_packets = 51; + uint64 rx_size_256_to_511_packets = 52; + uint64 rx_size_512_to_1023_packets = 53; + uint64 rx_size_1024_to_1522_packets = 54; + uint64 rx_size_1523_to_max_packets = 55; + uint64 rx_mac_short_pkt_dropped = 60; + + uint64 tx_good_packets = 100; + uint64 tx_good_bytes = 101; + uint64 tx_unicast_packets = 102; + uint64 tx_multicast_packets = 103; + uint64 tx_broadcast_packets = 104; + uint64 tx_dropped_packets = 105; + uint64 tx_link_down_dropped = 106; + uint64 tx_xon_packets = 107; + uint64 tx_xoff_packets = 108; + + uint64 tx_size_64_packets = 149; + uint64 tx_size_65_to_127_packets = 150; + uint64 tx_size_128_to_255_packets = 151; + uint64 tx_size_256_to_511_packets = 152; + uint64 tx_size_512_to_1023_packets = 153; + uint64 tx_size_1024_to_1522_packets = 154; + uint64 tx_size_1523_to_max_packets = 155; + +} + +message EthXStatsErrors { + + uint64 rx_missed_errors = 1; + uint64 rx_errors = 2; + uint64 rx_error_bytes = 3; + uint64 rx_mbuf_allocation_errors = 4; + uint64 rx_crc_errors = 5; + uint64 rx_illegal_byte_errors = 6; + uint64 rx_jabber_errors = 7; + uint64 mac_local_errors = 20; + uint64 mac_remote_errors = 21; + uint64 rx_len_errors = 22; + uint64 rx_undersized_errors = 23; + uint64 rx_oversize_errors = 24; + uint64 rx_fragmented_errors = 25; + + uint64 tx_errors = 100; + +} \ No newline at end of file diff --git a/schema/dpdklibs/opmon/SourceModel.proto b/schema/dpdklibs/opmon/SourceModel.proto new file mode 100644 index 0000000..cd72989 --- /dev/null +++ b/schema/dpdklibs/opmon/SourceModel.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package dunedaq.dpdklibs.opmon; + +message SourceInfo { + + uint32 dropped_frames = 1; + +} \ No newline at end of file diff --git a/schema/dpdklibs/receiverinfo.jsonnet b/schema/dpdklibs/receiverinfo.jsonnet deleted file mode 100644 index 904fcde..0000000 --- a/schema/dpdklibs/receiverinfo.jsonnet +++ /dev/null @@ -1,32 +0,0 @@ -// This describes the variables we want reported to opmon, etc., which -// give an overview of the rate and quality of packets being received -// using the tools in this package - -local moo = import "moo.jsonnet"; -local s = moo.oschema.schema("dunedaq.dpdklibs.receiverinfo"); - -local info = { - int4 : s.number( "int4", "i4", doc="A signed integer of 4 bytes"), - uint4 : s.number( "uint4", "u4", doc="An unsigned integer of 4 bytes"), - int8 : s.number( "int8", "i8", doc="A signed integer of 8 bytes"), - uint8 : s.number( "uint8", "u8", doc="An unsigned integer of 8 bytes"), - float4 : s.number( "float4", "f4", doc="A float of 4 bytes"), - double8 : s.number( "double8", "f8", doc="A double of 8 bytes"), - boolean: s.boolean( "Boolean", doc="A boolean"), - string: s.string( "String", doc="A string"), - - info: s.record("Info", [ - s.field("total_packets", self.int8, -999, doc="Total number of packets received"), - s.field("packets_per_second", self.double8, -999, doc="Packets/s"), - s.field("bytes_per_second", self.double8, -999, doc="Bytes/s"), - s.field("bad_ts_packets_per_second", self.double8, -999, doc="Packets/s with an unexpected timestamp"), - s.field("max_bad_ts_deviation", self.int8, -999, doc="Largest difference found between expected and actual packet timestamp"), - s.field("bad_seq_id_packets_per_second", self.double8, -999, doc="Packets/s with an unexpected sequence ID"), - s.field("max_bad_seq_id_deviation", self.int8, -999, doc="Largest difference found between expected and actual packet sequence ID"), - s.field("bad_size_packets_per_second", self.double8, -999, doc="Packets/s with an unexpected payload size"), - s.field("max_packet_size", self.int8, -999, doc="Maximum bytes for a packet"), - s.field("min_packet_size", self.int8, -999, doc="Minimum bytes for a packet"), - ], doc="Packet receiver info") -}; - -moo.oschema.sort_select(info) diff --git a/src/CreateSource.hpp b/src/CreateSource.hpp index c0b02b4..c3ab732 100644 --- a/src/CreateSource.hpp +++ b/src/CreateSource.hpp @@ -25,7 +25,7 @@ DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::TDEFrameTypeAdapter, "TDEFram namespace dpdklibs { -std::unique_ptr +std::shared_ptr createSourceModel(const std::string& conn_uid, bool callback_mode) { auto datatypes = dunedaq::iomanager::IOManager::get()->get_datatypes(conn_uid); @@ -39,7 +39,7 @@ createSourceModel(const std::string& conn_uid, bool callback_mode) if (raw_dt.find("WIBEthFrame") != std::string::npos) { // Create Model - auto source_model = std::make_unique>(); + auto source_model = std::make_shared>(); // For callback acquisition later (lazy) source_model->set_sink_name(conn_uid); @@ -64,7 +64,7 @@ createSourceModel(const std::string& conn_uid, bool callback_mode) } else if (raw_dt.find("TDEFrame") != std::string::npos) { // WIB2 specific char arrays - auto source_model = std::make_unique>(); + auto source_model = std::make_shared>(); source_model->set_sink_name(conn_uid); source_model->set_sink(conn_uid, callback_mode); //auto& parser = source_model->get_parser(); diff --git a/src/IfaceWrapper.cpp b/src/IfaceWrapper.cpp index d421410..fc9dd76 100644 --- a/src/IfaceWrapper.cpp +++ b/src/IfaceWrapper.cpp @@ -8,6 +8,8 @@ #include "logging/Logging.hpp" #include "datahandlinglibs/DataHandlingIssues.hpp" +#include "opmonlib/Utils.hpp" + #include "dpdklibs/Issues.hpp" #include "dpdklibs/nicreader/Structs.hpp" @@ -32,9 +34,12 @@ // #include "appmodel/NICStatsConf.hpp" // #include "appmodel/EthStreamParameters.hpp" +#include "dpdklibs/opmon/IfaceWrapper.pb.h" + #include #include #include +#include /** * @brief TRACE debug levels used in this source file @@ -299,57 +304,80 @@ IfaceWrapper::scrap() //----------------------------------------------------------------------------- -// void -// IfaceWrapper::get_info(opmonlib::InfoCollector& ci, int level) -// { - -// nicreaderinfo::EthStats s; -// s.ipackets = m_iface_xstats.m_eth_stats.ipackets; -// s.opackets = m_iface_xstats.m_eth_stats.opackets; -// s.ibytes = m_iface_xstats.m_eth_stats.ibytes; -// s.obytes = m_iface_xstats.m_eth_stats.obytes; -// s.imissed = m_iface_xstats.m_eth_stats.imissed; -// s.ierrors = m_iface_xstats.m_eth_stats.ierrors; -// s.oerrors = m_iface_xstats.m_eth_stats.oerrors; -// s.rx_nombuf = m_iface_xstats.m_eth_stats.rx_nombuf; -// ci.add(s); - -// // Empty stat JSON placeholder -// nlohmann::json stat_json; - -// // Poll stats from HW -// m_iface_xstats.poll(); - -// // Build JSON from values -// for (int i = 0; i < m_iface_xstats.m_len; ++i) { -// stat_json[m_iface_xstats.m_xstats_names[i].name] = m_iface_xstats.m_xstats_values[i]; -// } - -// // Reset HW counters -// m_iface_xstats.reset_counters(); - -// // Convert JSON to NICReaderInfo struct -// nicreaderinfo::EthXStats xs; -// nicreaderinfo::from_json(stat_json, xs); - -// // Push to InfoCollector -// ci.add(xs); -// TLOG_DEBUG(TLVL_WORK_STEPS) << "opmonlib::InfoCollector object passed by reference to IfaceWrapper::get_info" -// << " -> Result looks like the following:\n" << ci.get_collected_infos(); - -// for( const auto& [src_rx_q,_] : m_num_frames_rxq) { -// nicreaderinfo::QueueStats qs; -// qs.packets_received = m_num_frames_rxq[src_rx_q].load(); -// qs.bytes_received = m_num_bytes_rxq[src_rx_q].load(); -// qs.full_rx_burst = m_num_full_bursts[src_rx_q].load(); -// qs.max_burst_size = m_max_burst_size[src_rx_q].exchange(0); - -// opmonlib::InfoCollector queue_ci; -// queue_ci.add(qs); - -// ci.add(fmt::format("queue_{}", src_rx_q), queue_ci); -// } -// } +void +IfaceWrapper::generate_opmon_data() { + + opmon::EthStats s; + s.set_ipackets( m_iface_xstats.m_eth_stats.ipackets ); + s.set_opackets( m_iface_xstats.m_eth_stats.opackets ); + s.set_ibytes( m_iface_xstats.m_eth_stats.ibytes ); + s.set_obytes( m_iface_xstats.m_eth_stats.obytes ); + s.set_imissed( m_iface_xstats.m_eth_stats.imissed ); + s.set_ierrors( m_iface_xstats.m_eth_stats.ierrors ); + s.set_oerrors( m_iface_xstats.m_eth_stats.oerrors ); + s.set_rx_nombuf( m_iface_xstats.m_eth_stats.rx_nombuf ); + publish( std::move(s) ); + + // Poll stats from HW + m_iface_xstats.poll(); + + // loop over all the xstats information + opmon::EthXStatsInfo xinfos; + opmon::EthXStatsErrors xerrs; + std::map xq; + + for (int i = 0; i < m_iface_xstats.m_len; ++i) { + + std::string name(m_iface_xstats.m_xstats_names[i].name); + + // first we select the info from the queue + static std::regex queue_regex(R"((rx|tx)_q(\d+)_([^_]+))"); + std::smatch match; + + if ( std::regex_match(name, match, queue_regex) ) { + auto queue_name = match[1].str() + '-' + match[2].str(); + auto & entry = xq[queue_name]; + try { + opmonlib::set_value( entry, match[3], m_iface_xstats.m_xstats_values[i] ); + } catch ( const ers::Issue & e ) { + ers::warning( MetricPublishFailed( ERS_HERE, name, e) ); + } + continue; + } + + google::protobuf::Message * metric_p = nullptr; + static std::regex err_regex(R"(.+error.*)"); + if ( std::regex_match( name, err_regex ) ) metric_p = & xerrs; + else metric_p = & xinfos; + + try { + opmonlib::set_value(*metric_p, name, m_iface_xstats.m_xstats_values[i]); + } catch ( const ers::Issue & e ) { + ers::warning( MetricPublishFailed( ERS_HERE, name, e) ); + } + + } // loop over xstats + + // Reset HW counters + m_iface_xstats.reset_counters(); + + // finally we publish the information + publish( std::move(xinfos) ); + publish( std::move(xerrs) ); + for ( auto [id, stat] : xq ) { + publish( std::move(stat), {{"queue", id}} ); + } + + for( const auto& [src_rx_q,_] : m_num_frames_rxq) { + opmon::QueueInfo i; + i.set_packets_received( m_num_frames_rxq[src_rx_q].load() ); + i.set_bytes_received( m_num_bytes_rxq[src_rx_q].load() ); + i.set_full_rx_burst( m_num_full_bursts[src_rx_q].load() ); + i.set_max_burst_size( m_max_burst_size[src_rx_q].exchange(0) ); + + publish( std::move(i), {{"queue", std::to_string(src_rx_q)}} ); + } +} //----------------------------------------------------------------------------- void diff --git a/src/IfaceWrapper.hpp b/src/IfaceWrapper.hpp index 6a5b624..c620bb5 100644 --- a/src/IfaceWrapper.hpp +++ b/src/IfaceWrapper.hpp @@ -27,6 +27,8 @@ #include +#include + #include #include #include @@ -36,14 +38,23 @@ #include namespace dunedaq { + + ERS_DECLARE_ISSUE( dpdklibs, + MetricPublishFailed, + "Field " << field << " was not reported", + ((std::string)field) + ) + namespace dpdklibs { -class IfaceWrapper + class IfaceWrapper : public opmonlib::MonitorableObject { public: - using sid_to_source_map_t = std::map>; + using sid_to_source_map_t = std::map>; - IfaceWrapper(uint iface_id, const appmodel::DPDKReceiver* receiver, const std::vector& senders, sid_to_source_map_t& sources, std::atomic& run_marker); + IfaceWrapper(uint iface_id, const appmodel::DPDKReceiver* receiver, + const std::vector& senders, + sid_to_source_map_t& sources, std::atomic& run_marker); ~IfaceWrapper(); IfaceWrapper(const IfaceWrapper&) = delete; ///< IfaceWrapper is not copy-constructible @@ -55,8 +66,7 @@ class IfaceWrapper void start(); void stop(); - #warning MISSING OPMON - // void get_info(opmonlib::InfoCollector& ci, int level); + void generate_opmon_data() override; void allocate_mbufs(); void setup_interface(); diff --git a/src/SourceConcept.hpp b/src/SourceConcept.hpp index 469c2d5..78fa850 100644 --- a/src/SourceConcept.hpp +++ b/src/SourceConcept.hpp @@ -13,6 +13,7 @@ //#include "DefaultParserImpl.hpp" +#include "opmonlib/MonitorableObject.hpp" #include "appfwk/DAQModule.hpp" //#include "packetformat/detail/block_parser.hpp" #include @@ -22,38 +23,37 @@ #include namespace dunedaq { -namespace dpdklibs { - -class SourceConcept -{ -public: - SourceConcept() {} - virtual ~SourceConcept() {} - - SourceConcept(const SourceConcept&) = delete; ///< SourceConcept is not copy-constructible - SourceConcept& operator=(const SourceConcept&) = delete; ///< SourceConcept is not copy-assginable - SourceConcept(SourceConcept&&) = delete; ///< SourceConcept is not move-constructible - SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable - -// virtual void init(const nlohmann::json& args) = 0; - virtual void set_sink(const std::string& sink_name, bool callback_mode) = 0; - virtual void acquire_callback() = 0; -// virtual void conf(const nlohmann::json& args) = 0; -// virtual void start(const nlohmann::json& args) = 0; -// virtual void stop(const nlohmann::json& args) = 0; - // virtual void get_info(opmonlib::InfoCollector& ci, int level) = 0; - - virtual bool handle_payload(char* message, std::size_t size) = 0; - - void set_sink_name(const std::string& sink_name) - { - m_sink_name = sink_name; - } - - std::string m_sink_name; -}; - -} // namespace dpdklibs + namespace dpdklibs { + + class SourceConcept : public opmonlib::MonitorableObject + { + public: + SourceConcept() {} + virtual ~SourceConcept() {} + + SourceConcept(const SourceConcept&) = delete; ///< SourceConcept is not copy-constructible + SourceConcept& operator=(const SourceConcept&) = delete; ///< SourceConcept is not copy-assginable + SourceConcept(SourceConcept&&) = delete; ///< SourceConcept is not move-constructible + SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable + + // virtual void init(const nlohmann::json& args) = 0; + virtual void set_sink(const std::string& sink_name, bool callback_mode) = 0; + virtual void acquire_callback() = 0; + // virtual void conf(const nlohmann::json& args) = 0; + // virtual void start(const nlohmann::json& args) = 0; + // virtual void stop(const nlohmann::json& args) = 0; + + virtual bool handle_payload(char* message, std::size_t size) = 0; + + void set_sink_name(const std::string& sink_name) + { + m_sink_name = sink_name; + } + + std::string m_sink_name; + }; + + } // namespace dpdklibs } // namespace dunedaq #endif // DPDKLIBS_SRC_SOURCECONCEPT_HPP_ diff --git a/src/SourceModel.hpp b/src/SourceModel.hpp index 12adb17..d25283d 100644 --- a/src/SourceModel.hpp +++ b/src/SourceModel.hpp @@ -15,6 +15,8 @@ #include "iomanager/Sender.hpp" #include "logging/Logging.hpp" +#include "dpdklibs/opmon/SourceModel.pb.h" + // #include "datahandlinglibs/utils/ReusableThread.hpp" #include "datahandlinglibs/DataMoveCallbackRegistry.hpp" @@ -106,12 +108,13 @@ class SourceModel : public SourceConcept return true; } - #warning MISSING OPMON - // void get_info(opmonlib::InfoCollector& ci, int /*level*/) { - // nicreaderinfo::SourceStats ss; - // ss.dropped_frames = m_dropped_packets.load(); - // ci.add(ss); - // } + void generate_opmon_data() override { + + opmon::SourceInfo info; + info.set_dropped_frames( m_dropped_packets.load() ); + + publish( std::move(info) ); + } private: // Sink internals