Skip to content

Commit

Permalink
new HW based xstats published via Opmon service
Browse files Browse the repository at this point in the history
  • Loading branch information
roland-sipos committed Sep 19, 2023
1 parent dbb1905 commit ad6987e
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 104 deletions.
9 changes: 9 additions & 0 deletions include/dpdklibs/XstatsHelper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ namespace dunedaq::dpdklibs {
m_allocated = true;
};

void reset_counters() {
if (m_allocated) {
rte_eth_xstats_reset(m_iface_id); //{
// TLOG() << "Cannot reset xstat values!";
//} else {
//}
}
}

void poll() {
if (m_allocated) {
if (m_len != rte_eth_xstats_get_by_id(m_iface_id, nullptr, m_xstats_values, m_len)) {
Expand Down
119 changes: 56 additions & 63 deletions plugins/NICReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,56 +259,57 @@ NICReceiver::do_start(const data_t&)
{
TLOG() << get_name() << ": Entering do_start() method";

m_stat_thread = std::thread([&]() {
uint64_t time_per_report = 10; // In seconds
std::atomic<bool> last_run_marker_value = m_run_marker.load();
std::atomic<bool> current_run_marker = false;

while (true) {

std::chrono::steady_clock::time_point loop_start_time = std::chrono::steady_clock::now();

current_run_marker = m_run_marker.load();
if (current_run_marker == false && last_run_marker_value == true) {
break;
} else {
last_run_marker_value = current_run_marker.load();
}

std::map<udp::StreamUID, udp::ReceiverStats> receiver_stats_across_ifaces;

for (auto& [iface_id, iface] : m_ifaces) {
auto receiver_stats_by_stream = iface->get_and_reset_stream_stats();

for (auto& [suid, stats] : receiver_stats_by_stream) {

// std::map::contains is available in C++20...
if (receiver_stats_across_ifaces.find(suid) == receiver_stats_across_ifaces.end()) {
receiver_stats_across_ifaces[suid];
}

receiver_stats_across_ifaces[suid].merge({ receiver_stats_by_stream[suid] });
}
}

opmonlib::InfoCollector ic;

for (auto& [suid, stats] : receiver_stats_across_ifaces) {

receiverinfo::Info derived_stats = DeriveFromReceiverStats(receiver_stats_across_ifaces[suid], time_per_report);
opmonlib::InfoCollector tmp_ic;
tmp_ic.add(derived_stats);
ic.add(udp::get_opmon_string(suid), tmp_ic);
}

{
std::lock_guard<std::mutex> l(m_ic_mutex);
m_ic = ic;
}

std::this_thread::sleep_until(loop_start_time + std::chrono::milliseconds(1000 * time_per_report));
}
});
// RS fixme: Removed for HW based XStats
// m_stat_thread = std::thread([&]() {
// uint64_t time_per_report = 10; // In seconds
// std::atomic<bool> last_run_marker_value = m_run_marker.load();
// std::atomic<bool> current_run_marker = false;
//
// while (true) {
//
// std::chrono::steady_clock::time_point loop_start_time = std::chrono::steady_clock::now();
//
// current_run_marker = m_run_marker.load();
// if (current_run_marker == false && last_run_marker_value == true) {
// break;
// } else {
// last_run_marker_value = current_run_marker.load();
// }
//
// std::map<udp::StreamUID, udp::ReceiverStats> receiver_stats_across_ifaces;
//
// for (auto& [iface_id, iface] : m_ifaces) {
// auto receiver_stats_by_stream = iface->get_and_reset_stream_stats();
//
// for (auto& [suid, stats] : receiver_stats_by_stream) {
//
// // std::map::contains is available in C++20...
// if (receiver_stats_across_ifaces.find(suid) == receiver_stats_across_ifaces.end()) {
// receiver_stats_across_ifaces[suid];
// }
//
// receiver_stats_across_ifaces[suid].merge({ receiver_stats_by_stream[suid] });
// }
// }
//
// opmonlib::InfoCollector ic;
//
// for (auto& [suid, stats] : receiver_stats_across_ifaces) {
//
// receiverinfo::Info derived_stats = DeriveFromReceiverStats(receiver_stats_across_ifaces[suid], time_per_report);
// opmonlib::InfoCollector tmp_ic;
// tmp_ic.add(derived_stats);
// ic.add(udp::get_opmon_string(suid), tmp_ic);
// }
//
// {
// std::lock_guard<std::mutex> l(m_ic_mutex);
// m_ic = ic;
// }
//
// std::this_thread::sleep_until(loop_start_time + std::chrono::milliseconds(1000 * time_per_report));
// }
// });

if (!m_run_marker.load()) {
set_running(true);
Expand Down Expand Up @@ -338,12 +339,14 @@ NICReceiver::do_stop(const data_t&)
TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
}

/*
if (m_stat_thread.joinable()) {
m_stat_thread.join();
TLOG() << "Stat collecting thread has been join()'d";
} else {
TLOG() << "Stats thread is not joinable!";
}
*/

return;
}
Expand All @@ -360,19 +363,9 @@ NICReceiver::do_scrap(const data_t&)
void
NICReceiver::get_info(opmonlib::InfoCollector& ci, int level)
{

{
std::lock_guard<std::mutex> l(m_ic_mutex);
ci = m_ic;
}

TLOG() << "opmonlib::InfoCollector object passed by reference to NICReceiver::get_info looks like the following:\n" << ci.get_collected_infos();

nicreaderinfo::Info nri;
nri.groups_sent = m_groups_sent.exchange(0);
nri.total_groups_sent = m_total_groups_sent.load();


for (auto& [iface_id, iface] : m_ifaces) {
iface->get_info(ci, level);
}
}

void
Expand Down
4 changes: 0 additions & 4 deletions plugins/NICReceiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ class NICReceiver : public dunedaq::appfwk::DAQModule
using source_to_sink_map_t = std::map<int, std::unique_ptr<SourceConcept>>;
source_to_sink_map_t m_sources;

// Opmon
std::atomic<int> m_total_groups_sent {0};
std::atomic<int> m_groups_sent {0};

std::unique_ptr<udp::PacketInfoAccumulator> m_accum_ptr;
bool m_per_stream_reports = true;

Expand Down
62 changes: 58 additions & 4 deletions schema/dpdklibs/nicreaderinfo.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,67 @@ local moo = import "moo.jsonnet";
local s = moo.oschema.schema("dunedaq.dpdklibs.nicreaderinfo");

local info = {
uint8 : s.number("uint8", "u8", doc="An unsigned of 8 bytes"),
double8 : s.number("double8", "f8", doc="A double of 8 bytes"),
float8 : s.number("float8", "f8", doc="A float of 8 bytes"),

info: s.record("Info", [
s.field("groups_sent", self.uint8, 0, doc="Number of groups of frames sent"),
s.field("total_groups_sent", self.uint8, 0, doc="Total number of groups of frames sent")
], doc="NIC Reader information")
s.field("groups_sent", self.double8, 0, doc="Number of groups of frames sent"),
s.field("total_groups_sent", self.double8, 0, doc="Total number of groups of frames sent"),
s.field("rx_good_packets", self.double8, 0, doc="Total number of"),
s.field("rx_good_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_missed_errors", self.double8, 0, doc="Total number of"),
s.field("rx_errors", self.double8, 0, doc="Total number of"),
s.field("rx_unicast_packets", self.double8, 0, doc="Total number of"),
s.field("rx_multicast_packets", self.double8, 0, doc="Total number of"),
s.field("rx_broadcast_packets", self.double8, 0, doc="Total number of"),
s.field("rx_dropped_packets", self.double8, 0, doc="Total number of"),
s.field("rx_unknown_protocol_packets", self.double8, 0, doc="Total number of"),
s.field("rx_crc_errors", self.double8, 0, doc="Total number of"),
s.field("rx_illegal_byte_errors", self.double8, 0, doc="Total number of"),
s.field("rx_error_bytes", self.double8, 0, doc="Total number of"),
s.field("mac_local_errors", self.double8, 0, doc="Total number of"),
s.field("mac_remote_errors", self.double8, 0, doc="Total number of"),
s.field("rx_len_errors", self.double8, 0, doc="Total number of"),
s.field("rx_xon_packets", self.double8, 0, doc="Total number of"),
s.field("rx_xoff_packets", self.double8, 0, doc="Total number of"),
s.field("rx_size_64_packets", self.double8, 0, doc="Total number of"),
s.field("rx_size_65_to_127_packets", self.double8, 0, doc="Total number of"),
s.field("rx_size_128_to_255_packets", self.double8, 0, doc="Total number of"),
s.field("rx_size_256_to_511_packets", self.double8, 0, doc="Total number of"),
s.field("rx_size_512_to_1023_packets", self.double8, 0, doc="Total number of"),
s.field("rx_size_1024_to_1522_packets", self.double8, 0, doc="Total number of"),
s.field("rx_size_1523_to_max_packets", self.double8, 0, doc="Total number of"),
s.field("rx_undersized_errors", self.double8, 0, doc="Total number of"),
s.field("rx_oversize_errors", self.double8, 0, doc="Total number of"),
s.field("rx_mac_short_pkt_dropped", self.double8, 0, doc="Total number of"),
s.field("rx_fragmented_errors", self.double8, 0, doc="Total number of"),
s.field("rx_jabber_errors", self.double8, 0, doc="Total number of"),
s.field("rx_q0_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q1_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q2_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q3_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q4_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q5_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q6_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q7_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q8_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q9_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q10_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q11_packets", self.double8, 0, doc="Total number of"),
s.field("rx_q0_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q1_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q2_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q3_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q4_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q5_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q6_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q7_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q8_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q9_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q10_bytes", self.double8, 0, doc="Total number of"),
s.field("rx_q11_bytes", self.double8, 0, doc="Total number of"),
], doc="NIC Reader information"),

};

moo.oschema.sort_select(info)
102 changes: 70 additions & 32 deletions src/IfaceWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ IfaceWrapper::setup_xstats()
{
// Stats setup
m_iface_xstats.setup(m_iface_id);
m_iface_xstats.reset_counters();
}

void
Expand Down Expand Up @@ -243,32 +244,38 @@ IfaceWrapper::start()
{
m_lcore_quit_signal.store(false);

m_stat_thread = std::thread([&]() {
TLOG() << "Launching stat thread of iface=" << m_iface_id;

while (m_run_marker.load()) {

for (auto& [qid, nframes] : m_num_frames_rxq) { // check for new frames
if (nframes.load() > 0) {
auto nbytes = m_num_bytes_rxq[qid].load();
TLOG_DEBUG(10) << "Received payloads on iface=" << m_iface_id
<< " of q[" << qid << "] is: " << nframes.load()
<< " Bytes: " << nbytes << " Rate: " << nbytes / 1e6 * 8 << " Mbps";
nframes.exchange(0);
m_num_bytes_rxq[qid].exchange(0);
}
}
for (auto& [strid, nframes] : m_num_unexid_frames) { // check for unexpected StreamID frames
if (nframes.load() > 0) {
TLOG_DEBUG(10) << "Unexpected StreamID frames on iface= " << m_iface_id
<< " with strid[" << strid << "]! Num: " << nframes.load();
nframes.exchange(0);
}
}

std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
// RS: Removed for HW based Xstats
// m_stat_thread = std::thread([&]() {
// TLOG() << "Launching stat thread of iface=" << m_iface_id;
// while (m_run_marker.load()) {
// for (auto& [qid, nframes] : m_num_frames_rxq) { // check for new frames
// if (nframes.load() > 0) {
// auto nbytes = m_num_bytes_rxq[qid].load();
// TLOG_DEBUG(10) << "Received payloads on iface=" << m_iface_id
// << " of q[" << qid << "] is: " << nframes.load()
// << " Bytes: " << nbytes << " Rate: " << nbytes / 1e6 * 8 << " Mbps";
// nframes.exchange(0);
// m_num_bytes_rxq[qid].exchange(0);
// }
// }
// for (auto& [strid, nframes] : m_num_unexid_frames) { // check for unexpected StreamID frames
// if (nframes.load() > 0) {
// TLOG_DEBUG(10) << "Unexpected StreamID frames on iface= " << m_iface_id
// << " with strid[" << strid << "]! Num: " << nframes.load();
// nframes.exchange(0);
// }
// }
// // Poll, accumulate, then reset xstats
// //m_iface_xstats.poll();
// //for (int i = 0; i < m_iface_xstats.m_len; ++i) {
// // m_xstats_counters[m_iface_xstats.m_xstats_names[i].name] += m_iface_xstats.m_xstats_values[i];
// //}
// //m_iface_xstats.reset_counters();
//
// std::this_thread::sleep_for(std::chrono::seconds(1));
// }
// });


TLOG() << "Launching GARP thread with garp_func...";
m_garp_thread = std::thread(&IfaceWrapper::garp_func, this);
Expand All @@ -286,12 +293,16 @@ IfaceWrapper::stop()
{
m_lcore_quit_signal.store(true);

if (m_stat_thread.joinable()) {
m_stat_thread.join();
} else {
TLOG() << "Stats thread is not joinable!";
}

// RS: Removed for HW based Xstats
// // Stop stats thread
// if (m_stat_thread.joinable()) {
// m_stat_thread.join();
// } else {
// TLOG() << "Stats thread is not joinable!";
// }
//

// Stop GARP sender thread
if (m_garp_thread.joinable()) {
m_garp_thread.join();
} else {
Expand All @@ -310,6 +321,33 @@ IfaceWrapper::scrap()
m_accum_ptr.reset(nullptr);
}

void
IfaceWrapper::get_info(opmonlib::InfoCollector& ci, int level)
{
// Empty stat JSON placeholder
nlohmann::json stat_json;

// Poll stats from HW
m_iface_xstats.poll();

// Build JSON from values
for (int i = 0; i < m_iface_xstats.m_len; ++i) {
stat_json[m_iface_xstats.m_xstats_names[i].name] = m_iface_xstats.m_xstats_values[i];
}

// Reset HW counters
m_iface_xstats.reset_counters();

// Convert JSON to NICReaderInfo struct
nicreaderinfo::Info nri;
nicreaderinfo::from_json(stat_json, nri);

// Push to InfoCollector
ci.add(nri);
TLOG_DEBUG(TLVL_WORK_STEPS) << "opmonlib::InfoCollector object passed by reference to IfaceWrapper::get_info"
<< " -> Result looks like the following:\n" << ci.get_collected_infos();
}

void
IfaceWrapper::garp_func()
{
Expand Down
Loading

0 comments on commit ad6987e

Please sign in to comment.