Skip to content

Commit

Permalink
Merge pull request #92 from DUNE-DAQ/roland-sipos/xstats
Browse files Browse the repository at this point in the history
Roland sipos/xstats
  • Loading branch information
roland-sipos authored Sep 21, 2023
2 parents aaa6e65 + 9fee101 commit e6ac26c
Show file tree
Hide file tree
Showing 8 changed files with 421 additions and 120 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ daq_add_application(dpdklibs_test_frame_transmitter test_frame_transmitter.cxx T
daq_add_application(dpdklibs_test_arp_response test_arp_response.cxx TEST LINK_LIBRARIES dpdklibs ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_transmit_and_receive test_transmit_and_receive.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_stats_reporting test_stats_reporting.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_dpdk_stats test_dpdk_stats.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})

target_compile_options(dpdklibs PUBLIC ${DPDK_CFLAGS})
target_include_directories(dpdklibs PUBLIC ${DPDK_INCLUDE_DIRS})
Expand Down
94 changes: 94 additions & 0 deletions include/dpdklibs/XstatsHelper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#ifndef DPDKLIBS_INCLUDE_DPDKLIBS_XSTATSHELPER_HPP_
#define DPDKLIBS_INCLUDE_DPDKLIBS_XSTATSHELPER_HPP_

#include <rte_ethdev.h>

namespace dunedaq::dpdklibs {

struct IfaceXstats {
IfaceXstats(){}
~IfaceXstats()
{
if (m_xstats_values != nullptr) {
free(m_xstats_values);
}
if (m_xstats_ids != nullptr) {
free(m_xstats_ids);
}
if (m_xstats_names != nullptr) {
free(m_xstats_names);
}
}

void setup(int iface) {
m_iface_id = iface;
rte_eth_stats_reset(m_iface_id);
rte_eth_xstats_reset(m_iface_id);

// Get number of stats
m_len = rte_eth_xstats_get_names_by_id(m_iface_id, NULL, NULL, 0);
if (m_len < 0) {
printf("Cannot get xstats count\n");
}

// Get names of HW registered stat fields
m_xstats_names = (rte_eth_xstat_name*)(malloc(sizeof(struct rte_eth_xstat_name) * m_len));
if (m_xstats_names == nullptr) {
printf("Cannot allocate memory for xstat names\n");
}

// Retrieve xstats names, passing NULL for IDs to return all statistics
if (m_len != rte_eth_xstats_get_names(m_iface_id, m_xstats_names, m_len)) {
printf("Cannot get xstat names\n");
}

// Allocate value fields
m_xstats_values = (uint64_t*)(malloc(sizeof(m_xstats_values) * m_len));
if (m_xstats_values == nullptr) {
printf("Cannot allocate memory for xstats\n");
}

// Getting xstats values (this is that we call in a loop/get_info
if (m_len != rte_eth_xstats_get_by_id(m_iface_id, nullptr, m_xstats_values, m_len)) {
printf("Cannot get xstat values\n");
}

// Print all xstats names and values to be amazed (WOW!)
TLOG() << "Registered HW based metrics: ";
for (int i = 0; i < m_len; i++) {
TLOG() << " XName: " << m_xstats_names[i].name;
}

m_allocated = true;
};

void reset_counters() {
if (m_allocated) {
rte_eth_xstats_reset(m_iface_id); //{
// TLOG() << "Cannot reset xstat values!";
//} else {
//}
}
}

void poll() {
if (m_allocated) {
if (m_len != rte_eth_xstats_get_by_id(m_iface_id, nullptr, m_xstats_values, m_len)) {
TLOG() << "Cannot get xstat values!";
//} else {
}
}
}

int m_iface_id;
bool m_allocated = false;
struct rte_eth_xstat_name *m_xstats_names;
uint64_t *m_xstats_ids;
uint64_t *m_xstats_values;
int m_len;

};

}

#endif // DPDKLIBS_INCLUDE_DPDKLIBS_XSTATSHELPER_HPP_
77 changes: 4 additions & 73 deletions plugins/NICReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ NICReceiver::do_configure(const data_t& args)
m_ifaces[iface_id]->allocate_mbufs();
m_ifaces[iface_id]->setup_interface();
m_ifaces[iface_id]->setup_flow_steering();
m_ifaces[iface_id]->setup_xstats();
} else {
TLOG() << "No available interface with MAC=" << iface_mac_addr;
ers::fatal(dunedaq::readoutlibs::InitializationError(
Expand Down Expand Up @@ -257,58 +258,6 @@ void
NICReceiver::do_start(const data_t&)
{
TLOG() << get_name() << ": Entering do_start() method";

m_stat_thread = std::thread([&]() {
uint64_t time_per_report = 10; // In seconds
std::atomic<bool> last_run_marker_value = m_run_marker.load();
std::atomic<bool> current_run_marker = false;

while (true) {

std::chrono::steady_clock::time_point loop_start_time = std::chrono::steady_clock::now();

current_run_marker = m_run_marker.load();
if (current_run_marker == false && last_run_marker_value == true) {
break;
} else {
last_run_marker_value = current_run_marker.load();
}

std::map<udp::StreamUID, udp::ReceiverStats> receiver_stats_across_ifaces;

for (auto& [iface_id, iface] : m_ifaces) {
auto receiver_stats_by_stream = iface->get_and_reset_stream_stats();

for (auto& [suid, stats] : receiver_stats_by_stream) {

// std::map::contains is available in C++20...
if (receiver_stats_across_ifaces.find(suid) == receiver_stats_across_ifaces.end()) {
receiver_stats_across_ifaces[suid];
}

receiver_stats_across_ifaces[suid].merge({ receiver_stats_by_stream[suid] });
}
}

opmonlib::InfoCollector ic;

for (auto& [suid, stats] : receiver_stats_across_ifaces) {

receiverinfo::Info derived_stats = DeriveFromReceiverStats(receiver_stats_across_ifaces[suid], time_per_report);
opmonlib::InfoCollector tmp_ic;
tmp_ic.add(derived_stats);
ic.add(udp::get_opmon_string(suid), tmp_ic);
}

{
std::lock_guard<std::mutex> l(m_ic_mutex);
m_ic = ic;
}

std::this_thread::sleep_until(loop_start_time + std::chrono::milliseconds(1000 * time_per_report));
}
});

if (!m_run_marker.load()) {
set_running(true);
TLOG() << "Starting iface wrappers.";
Expand Down Expand Up @@ -336,14 +285,6 @@ NICReceiver::do_stop(const data_t&)
} else {
TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
}

if (m_stat_thread.joinable()) {
m_stat_thread.join();
TLOG() << "Stat collecting thread has been join()'d";
} else {
TLOG() << "Stats thread is not joinable!";
}

return;
}

Expand All @@ -359,19 +300,9 @@ NICReceiver::do_scrap(const data_t&)
void
NICReceiver::get_info(opmonlib::InfoCollector& ci, int level)
{

{
std::lock_guard<std::mutex> l(m_ic_mutex);
ci = m_ic;
}

TLOG() << "opmonlib::InfoCollector object passed by reference to NICReceiver::get_info looks like the following:\n" << ci.get_collected_infos();

nicreaderinfo::Info nri;
nri.groups_sent = m_groups_sent.exchange(0);
nri.total_groups_sent = m_total_groups_sent.load();


for (auto& [iface_id, iface] : m_ifaces) {
iface->get_info(ci, level);
}
}

void
Expand Down
4 changes: 0 additions & 4 deletions plugins/NICReceiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ class NICReceiver : public dunedaq::appfwk::DAQModule
using source_to_sink_map_t = std::map<int, std::unique_ptr<SourceConcept>>;
source_to_sink_map_t m_sources;

// Opmon
std::atomic<int> m_total_groups_sent {0};
std::atomic<int> m_groups_sent {0};

std::unique_ptr<udp::PacketInfoAccumulator> m_accum_ptr;
bool m_per_stream_reports = true;

Expand Down
62 changes: 58 additions & 4 deletions schema/dpdklibs/nicreaderinfo.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,67 @@ local moo = import "moo.jsonnet";
local s = moo.oschema.schema("dunedaq.dpdklibs.nicreaderinfo");

local info = {
uint8 : s.number("uint8", "u8", doc="An unsigned of 8 bytes"),
double8 : s.number("double8", "f8", doc="A double of 8 bytes"),
float8 : s.number("float8", "f8", doc="A float of 8 bytes"),

info: s.record("Info", [
s.field("groups_sent", self.uint8, 0, doc="Number of groups of frames sent"),
s.field("total_groups_sent", self.uint8, 0, doc="Total number of groups of frames sent")
], doc="NIC Reader information")
s.field("groups_sent", self.double8, 0, doc="Number of groups of frames sent"),
s.field("total_groups_sent", self.double8, 0, doc="Total groups of frames sent"),
s.field("rx_good_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_good_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_missed_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_unicast_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_multicast_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_broadcast_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_dropped_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_unknown_protocol_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_crc_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_illegal_byte_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_error_bytes", self.double8, 0, doc="New since last poll"),
s.field("mac_local_errors", self.double8, 0, doc="New since last poll"),
s.field("mac_remote_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_len_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_xon_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_xoff_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_size_64_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_size_65_to_127_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_size_128_to_255_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_size_256_to_511_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_size_512_to_1023_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_size_1024_to_1522_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_size_1523_to_max_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_undersized_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_oversize_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_mac_short_pkt_dropped", self.double8, 0, doc="New since last poll"),
s.field("rx_fragmented_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_jabber_errors", self.double8, 0, doc="New since last poll"),
s.field("rx_q0_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q1_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q2_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q3_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q4_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q5_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q6_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q7_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q8_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q9_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q10_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q11_packets", self.double8, 0, doc="New since last poll"),
s.field("rx_q0_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q1_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q2_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q3_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q4_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q5_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q6_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q7_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q8_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q9_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q10_bytes", self.double8, 0, doc="New since last poll"),
s.field("rx_q11_bytes", self.double8, 0, doc="New since last poll"),
], doc="NIC Reader information"),

};

moo.oschema.sort_select(info)
Loading

0 comments on commit e6ac26c

Please sign in to comment.