diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3fb207b..372de9c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -76,6 +76,7 @@ daq_add_application(dpdklibs_test_frame_transmitter test_frame_transmitter.cxx T
 daq_add_application(dpdklibs_test_arp_response test_arp_response.cxx TEST LINK_LIBRARIES dpdklibs ${DPDK_LIBRARIES})
 daq_add_application(dpdklibs_test_transmit_and_receive test_transmit_and_receive.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
 daq_add_application(dpdklibs_test_stats_reporting test_stats_reporting.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
+daq_add_application(dpdklibs_test_dpdk_stats test_dpdk_stats.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
 
 target_compile_options(dpdklibs PUBLIC ${DPDK_CFLAGS})
 target_include_directories(dpdklibs PUBLIC ${DPDK_INCLUDE_DIRS})
diff --git a/include/dpdklibs/XstatsHelper.hpp b/include/dpdklibs/XstatsHelper.hpp
new file mode 100644
index 0000000..f1d5d90
--- /dev/null
+++ b/include/dpdklibs/XstatsHelper.hpp
@@ -0,0 +1,105 @@
+#ifndef DPDKLIBS_INCLUDE_DPDKLIBS_XSTATSHELPER_HPP_
+#define DPDKLIBS_INCLUDE_DPDKLIBS_XSTATSHELPER_HPP_
+
+// NOTE(review): the original include target was lost in transit; rte_ethdev.h
+// provides the rte_eth_* API used below and logging/Logging.hpp provides TLOG.
+#include <rte_ethdev.h>
+#include "logging/Logging.hpp"
+
+#include <cstdlib>
+
+namespace dunedaq::dpdklibs {
+
+  // Owns the name/value buffers needed to read the DPDK extended statistics
+  // (xstats) of a single ethernet interface. Call setup() once, then
+  // poll()/reset_counters() from the stats-reporting path.
+  struct IfaceXstats {
+    IfaceXstats() {}
+    ~IfaceXstats()
+    {
+      // free(nullptr) is a no-op, so no null checks are required.
+      free(m_xstats_values);
+      free(m_xstats_ids);
+      free(m_xstats_names);
+    }
+
+    // Non-copyable: the destructor frees the malloc'd arrays, so a copy
+    // would lead to a double free.
+    IfaceXstats(const IfaceXstats&) = delete;
+    IfaceXstats& operator=(const IfaceXstats&) = delete;
+
+    // Query the number of xstat fields of the interface, allocate the
+    // name/value arrays and cache the field names. On any failure the
+    // function returns early and m_allocated stays false, which makes
+    // poll()/reset_counters() no-ops.
+    void setup(int iface) {
+      m_iface_id = iface;
+      rte_eth_stats_reset(m_iface_id);
+      rte_eth_xstats_reset(m_iface_id);
+
+      // Get number of stats (names=nullptr, size=0, ids=nullptr queries the count)
+      m_len = rte_eth_xstats_get_names_by_id(m_iface_id, nullptr, 0, nullptr);
+      if (m_len < 0) {
+        printf("Cannot get xstats count\n");
+        return;
+      }
+
+      // Get names of HW registered stat fields
+      m_xstats_names = (rte_eth_xstat_name*)(malloc(sizeof(struct rte_eth_xstat_name) * m_len));
+      if (m_xstats_names == nullptr) {
+        printf("Cannot allocate memory for xstat names\n");
+        return;
+      }
+
+      // Retrieve xstats names, passing nullptr for IDs to return all statistics
+      if (m_len != rte_eth_xstats_get_names(m_iface_id, m_xstats_names, m_len)) {
+        printf("Cannot get xstat names\n");
+        return;
+      }
+
+      // Allocate value fields (sizeof(uint64_t), not sizeof of the pointer)
+      m_xstats_values = (uint64_t*)(malloc(sizeof(uint64_t) * m_len));
+      if (m_xstats_values == nullptr) {
+        printf("Cannot allocate memory for xstats\n");
+        return;
+      }
+
+      // Getting xstats values (this is what we call in a loop from get_info)
+      if (m_len != rte_eth_xstats_get_by_id(m_iface_id, nullptr, m_xstats_values, m_len)) {
+        printf("Cannot get xstat values\n");
+      }
+
+      // Print all xstats names to be amazed (WOW!)
+      TLOG() << "Registered HW based metrics: ";
+      for (int i = 0; i < m_len; i++) {
+        TLOG() << "  XName: " << m_xstats_names[i].name;
+      }
+
+      m_allocated = true;
+    }
+
+    // Zero the HW counters; only meaningful after a successful setup().
+    void reset_counters() {
+      if (m_allocated) {
+        rte_eth_xstats_reset(m_iface_id);
+      }
+    }
+
+    // Refresh m_xstats_values from the HW counters.
+    void poll() {
+      if (m_allocated) {
+        if (m_len != rte_eth_xstats_get_by_id(m_iface_id, nullptr, m_xstats_values, m_len)) {
+          TLOG() << "Cannot get xstat values!";
+        }
+      }
+    }
+
+    // Members are initialized in-class so the destructor is safe even when
+    // setup() was never called (they were previously left uninitialized,
+    // which made the destructor free garbage pointers).
+    int m_iface_id = -1;
+    bool m_allocated = false;
+    struct rte_eth_xstat_name* m_xstats_names = nullptr;
+    uint64_t* m_xstats_ids = nullptr;
+    uint64_t* m_xstats_values = nullptr;
+    int m_len = 0;
+  };
+
+} // namespace dunedaq::dpdklibs
+
+#endif // DPDKLIBS_INCLUDE_DPDKLIBS_XSTATSHELPER_HPP_
diff --git a/plugins/NICReceiver.cpp b/plugins/NICReceiver.cpp
index 30fc6b1..bb42a6d 100644
--- a/plugins/NICReceiver.cpp
+++ b/plugins/NICReceiver.cpp
@@ -144,6 +144,7 @@ NICReceiver::do_configure(const data_t& args)
     m_ifaces[iface_id]->allocate_mbufs();
     m_ifaces[iface_id]->setup_interface();
     m_ifaces[iface_id]->setup_flow_steering();
+    m_ifaces[iface_id]->setup_xstats();
   } else {
     TLOG() << "No available interface with MAC=" << iface_mac_addr;
     ers::fatal(dunedaq::readoutlibs::InitializationError(
@@ -257,58 +258,6 @@ void
 NICReceiver::do_start(const data_t&)
 {
   TLOG() << get_name() << ": Entering do_start() method";
- - m_stat_thread = std::thread([&]() { - uint64_t time_per_report = 10; // In seconds - std::atomic last_run_marker_value = m_run_marker.load(); - std::atomic current_run_marker = false; - - while (true) { - - std::chrono::steady_clock::time_point loop_start_time = std::chrono::steady_clock::now(); - - current_run_marker = m_run_marker.load(); - if (current_run_marker == false && last_run_marker_value == true) { - break; - } else { - last_run_marker_value = current_run_marker.load(); - } - - std::map receiver_stats_across_ifaces; - - for (auto& [iface_id, iface] : m_ifaces) { - auto receiver_stats_by_stream = iface->get_and_reset_stream_stats(); - - for (auto& [suid, stats] : receiver_stats_by_stream) { - - // std::map::contains is available in C++20... - if (receiver_stats_across_ifaces.find(suid) == receiver_stats_across_ifaces.end()) { - receiver_stats_across_ifaces[suid]; - } - - receiver_stats_across_ifaces[suid].merge({ receiver_stats_by_stream[suid] }); - } - } - - opmonlib::InfoCollector ic; - - for (auto& [suid, stats] : receiver_stats_across_ifaces) { - - receiverinfo::Info derived_stats = DeriveFromReceiverStats(receiver_stats_across_ifaces[suid], time_per_report); - opmonlib::InfoCollector tmp_ic; - tmp_ic.add(derived_stats); - ic.add(udp::get_opmon_string(suid), tmp_ic); - } - - { - std::lock_guard l(m_ic_mutex); - m_ic = ic; - } - - std::this_thread::sleep_until(loop_start_time + std::chrono::milliseconds(1000 * time_per_report)); - } - }); - if (!m_run_marker.load()) { set_running(true); TLOG() << "Starting iface wrappers."; @@ -336,14 +285,6 @@ NICReceiver::do_stop(const data_t&) } else { TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!"; } - - if (m_stat_thread.joinable()) { - m_stat_thread.join(); - TLOG() << "Stat collecting thread has been join()'d"; - } else { - TLOG() << "Stats thread is not joinable!"; - } - return; } @@ -359,19 +300,9 @@ NICReceiver::do_scrap(const data_t&) void NICReceiver::get_info(opmonlib::InfoCollector& ci, 
int level) { - - { - std::lock_guard l(m_ic_mutex); - ci = m_ic; - } - - TLOG() << "opmonlib::InfoCollector object passed by reference to NICReceiver::get_info looks like the following:\n" << ci.get_collected_infos(); - - nicreaderinfo::Info nri; - nri.groups_sent = m_groups_sent.exchange(0); - nri.total_groups_sent = m_total_groups_sent.load(); - - + for (auto& [iface_id, iface] : m_ifaces) { + iface->get_info(ci, level); + } } void diff --git a/plugins/NICReceiver.hpp b/plugins/NICReceiver.hpp index 5eff633..4e20bd5 100644 --- a/plugins/NICReceiver.hpp +++ b/plugins/NICReceiver.hpp @@ -112,10 +112,6 @@ class NICReceiver : public dunedaq::appfwk::DAQModule using source_to_sink_map_t = std::map>; source_to_sink_map_t m_sources; - // Opmon - std::atomic m_total_groups_sent {0}; - std::atomic m_groups_sent {0}; - std::unique_ptr m_accum_ptr; bool m_per_stream_reports = true; diff --git a/schema/dpdklibs/nicreaderinfo.jsonnet b/schema/dpdklibs/nicreaderinfo.jsonnet index a2f21ef..c37341b 100644 --- a/schema/dpdklibs/nicreaderinfo.jsonnet +++ b/schema/dpdklibs/nicreaderinfo.jsonnet @@ -6,13 +6,67 @@ local moo = import "moo.jsonnet"; local s = moo.oschema.schema("dunedaq.dpdklibs.nicreaderinfo"); local info = { - uint8 : s.number("uint8", "u8", doc="An unsigned of 8 bytes"), + double8 : s.number("double8", "f8", doc="A double of 8 bytes"), float8 : s.number("float8", "f8", doc="A float of 8 bytes"), info: s.record("Info", [ - s.field("groups_sent", self.uint8, 0, doc="Number of groups of frames sent"), - s.field("total_groups_sent", self.uint8, 0, doc="Total number of groups of frames sent") - ], doc="NIC Reader information") + s.field("groups_sent", self.double8, 0, doc="Number of groups of frames sent"), + s.field("total_groups_sent", self.double8, 0, doc="Total groups of frames sent"), + s.field("rx_good_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_good_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_missed_errors", 
self.double8, 0, doc="New since last poll"), + s.field("rx_errors", self.double8, 0, doc="New since last poll"), + s.field("rx_unicast_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_multicast_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_broadcast_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_dropped_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_unknown_protocol_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_crc_errors", self.double8, 0, doc="New since last poll"), + s.field("rx_illegal_byte_errors", self.double8, 0, doc="New since last poll"), + s.field("rx_error_bytes", self.double8, 0, doc="New since last poll"), + s.field("mac_local_errors", self.double8, 0, doc="New since last poll"), + s.field("mac_remote_errors", self.double8, 0, doc="New since last poll"), + s.field("rx_len_errors", self.double8, 0, doc="New since last poll"), + s.field("rx_xon_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_xoff_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_size_64_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_size_65_to_127_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_size_128_to_255_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_size_256_to_511_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_size_512_to_1023_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_size_1024_to_1522_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_size_1523_to_max_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_undersized_errors", self.double8, 0, doc="New since last poll"), + s.field("rx_oversize_errors", self.double8, 0, doc="New since last poll"), + s.field("rx_mac_short_pkt_dropped", self.double8, 0, doc="New since last poll"), + s.field("rx_fragmented_errors", self.double8, 0, doc="New 
since last poll"), + s.field("rx_jabber_errors", self.double8, 0, doc="New since last poll"), + s.field("rx_q0_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q1_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q2_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q3_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q4_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q5_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q6_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q7_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q8_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q9_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q10_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q11_packets", self.double8, 0, doc="New since last poll"), + s.field("rx_q0_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q1_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q2_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q3_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q4_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q5_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q6_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q7_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q8_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q9_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q10_bytes", self.double8, 0, doc="New since last poll"), + s.field("rx_q11_bytes", self.double8, 0, doc="New since last poll"), + ], doc="NIC Reader information"), + }; moo.oschema.sort_select(info) diff --git a/src/IfaceWrapper.cpp b/src/IfaceWrapper.cpp index 6682e30..dcbbe97 100644 --- a/src/IfaceWrapper.cpp +++ 
b/src/IfaceWrapper.cpp @@ -144,6 +144,14 @@ IfaceWrapper::setup_flow_steering() // //} } +void +IfaceWrapper::setup_xstats() +{ + // Stats setup + m_iface_xstats.setup(m_iface_id); + m_iface_xstats.reset_counters(); +} + void IfaceWrapper::conf(const iface_conf_t& args) { @@ -235,34 +243,6 @@ void IfaceWrapper::start() { m_lcore_quit_signal.store(false); - - m_stat_thread = std::thread([&]() { - TLOG() << "Launching stat thread of iface=" << m_iface_id; - - while (m_run_marker.load()) { - - for (auto& [qid, nframes] : m_num_frames_rxq) { // check for new frames - if (nframes.load() > 0) { - auto nbytes = m_num_bytes_rxq[qid].load(); - TLOG_DEBUG(10) << "Received payloads on iface=" << m_iface_id - << " of q[" << qid << "] is: " << nframes.load() - << " Bytes: " << nbytes << " Rate: " << nbytes / 1e6 * 8 << " Mbps"; - nframes.exchange(0); - m_num_bytes_rxq[qid].exchange(0); - } - } - for (auto& [strid, nframes] : m_num_unexid_frames) { // check for unexpected StreamID frames - if (nframes.load() > 0) { - TLOG_DEBUG(10) << "Unexpected StreamID frames on iface= " << m_iface_id - << " with strid[" << strid << "]! 
Num: " << nframes.load(); - nframes.exchange(0); - } - } - - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - }); - TLOG() << "Launching GARP thread with garp_func..."; m_garp_thread = std::thread(&IfaceWrapper::garp_func, this); @@ -271,26 +251,18 @@ IfaceWrapper::start() int ret = rte_eal_remote_launch((int (*)(void*))(&IfaceWrapper::rx_runner), this, lcoreid); TLOG() << " -> LCore[" << lcoreid << "] launched with return code=" << ret; } - } void IfaceWrapper::stop() { m_lcore_quit_signal.store(true); - - if (m_stat_thread.joinable()) { - m_stat_thread.join(); - } else { - TLOG() << "Stats thread is not joinable!"; - } - + // Stop GARP sender thread if (m_garp_thread.joinable()) { m_garp_thread.join(); } else { TLOG() << "GARP thrad is not joinable!"; } - m_accum_ptr->erase_stream_stats(); } @@ -299,10 +271,36 @@ IfaceWrapper::scrap() { struct rte_flow_error error; rte_flow_flush(m_iface_id, &error); - m_accum_ptr.reset(nullptr); } +void +IfaceWrapper::get_info(opmonlib::InfoCollector& ci, int level) +{ + // Empty stat JSON placeholder + nlohmann::json stat_json; + + // Poll stats from HW + m_iface_xstats.poll(); + + // Build JSON from values + for (int i = 0; i < m_iface_xstats.m_len; ++i) { + stat_json[m_iface_xstats.m_xstats_names[i].name] = m_iface_xstats.m_xstats_values[i]; + } + + // Reset HW counters + m_iface_xstats.reset_counters(); + + // Convert JSON to NICReaderInfo struct + nicreaderinfo::Info nri; + nicreaderinfo::from_json(stat_json, nri); + + // Push to InfoCollector + ci.add(nri); + TLOG_DEBUG(TLVL_WORK_STEPS) << "opmonlib::InfoCollector object passed by reference to IfaceWrapper::get_info" + << " -> Result looks like the following:\n" << ci.get_collected_infos(); +} + void IfaceWrapper::garp_func() { diff --git a/src/IfaceWrapper.hpp b/src/IfaceWrapper.hpp index f6ac7a4..8c246fa 100644 --- a/src/IfaceWrapper.hpp +++ b/src/IfaceWrapper.hpp @@ -10,12 +10,14 @@ #define DPDKLIBS_SRC_IFACEWRAPPER_HPP_ #include 
"dpdklibs/nicreader/Structs.hpp" +#include "dpdklibs/nicreaderinfo/InfoNljs.hpp" #include "dpdklibs/EALSetup.hpp" #include "dpdklibs/udp/Utils.hpp" #include "dpdklibs/udp/PacketCtor.hpp" #include "dpdklibs/arp/ARP.hpp" #include "dpdklibs/ipv4_addr.hpp" +#include "dpdklibs/XstatsHelper.hpp" #include "SourceConcept.hpp" #include @@ -48,10 +50,12 @@ class IfaceWrapper void start(); void stop(); void scrap(); + void get_info(opmonlib::InfoCollector& ci, int level); void allocate_mbufs(); void setup_interface(); void setup_flow_steering(); + void setup_xstats(); std::map get_and_reset_stream_stats() { return m_accum_ptr->get_and_reset_stream_stats(); @@ -97,7 +101,10 @@ class IfaceWrapper std::map> m_num_frames_rxq; std::map> m_num_bytes_rxq; std::map> m_num_unexid_frames; - std::thread m_stat_thread; + //std::thread m_stat_thread; + + // DPDK HW stats + dpdklibs::IfaceXstats m_iface_xstats; // Source to sink map std::map> m_stream_to_source_id; diff --git a/test/apps/test_dpdk_stats.cxx b/test/apps/test_dpdk_stats.cxx new file mode 100644 index 0000000..db86888 --- /dev/null +++ b/test/apps/test_dpdk_stats.cxx @@ -0,0 +1,220 @@ +/* Application will run until quit or killed. 
*/ + +#include "dpdklibs/EALSetup.hpp" +#include "logging/Logging.hpp" +#include "dpdklibs/udp/PacketCtor.hpp" +#include "dpdklibs/udp/Utils.hpp" +#include "dpdklibs/arp/ARP.hpp" +#include "dpdklibs/ipv4_addr.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace dunedaq; +using namespace dpdklibs; +using namespace udp; + +namespace { + constexpr int burst_size = 256; + + std::atomic num_packets = 0; + std::atomic num_bytes = 0; + std::atomic num_errors = 0; + std::atomic num_missed = 0; + +} // namespace "" + +static const struct rte_eth_conf port_conf_default = { + .rxmode = { + .mtu = 9000, + .offloads = (DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM), + } +}; + +static int +lcore_main(struct rte_mempool *mbuf_pool) +{ + uint16_t iface = 0; + TLOG() << "Launch lcore for interface: " << iface; + + struct rte_mbuf **tx_bufs = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size); + rte_pktmbuf_alloc_bulk(mbuf_pool, tx_bufs, burst_size); + + // Reset internal ETH DEV stat counters. + rte_eth_stats_reset(iface); + rte_eth_xstats_reset(iface); + +//////////// RS FIXME -> Copy pasta DPDK Docs, of course the docs are super misleading.... 
+ struct rte_eth_xstat_name *xstats_names; + uint64_t *xstats_ids; + uint64_t *values; + int len, i; + + // Get number of stats + len = rte_eth_xstats_get_names_by_id(iface, NULL, NULL, 0); + if (len < 0) { + printf("Cannot get xstats count\n"); + } + + // Get names of HW registered stat fields + xstats_names = (rte_eth_xstat_name*)(malloc(sizeof(struct rte_eth_xstat_name) * len)); + if (xstats_names == NULL) { + printf("Cannot allocate memory for xstat names\n"); + } + + // Retrieve xstats names, passing NULL for IDs to return all statistics + if (len != rte_eth_xstats_get_names(iface, xstats_names, len)) { + printf("Cannot get xstat names\n"); + } + + // Allocate value fields + values = (uint64_t*)(malloc(sizeof(values) * len)); + if (values == NULL) { + printf("Cannot allocate memory for xstats\n"); + } + + // Getting xstats values (this is that we call in a loop/get_info + if (len != rte_eth_xstats_get_by_id(iface, NULL, values, len)) { + printf("Cannot get xstat values\n"); + } + + // Print all xstats names and values to be amazed (WOW!) + for (i = 0; i < len; i++) { + TLOG() << "Name: " << xstats_names[i].name << " value: " << values[i]; + } + +/////////////// RS FIXME: Stats thread spawn. Passed with the scope of attrocities above... + auto stats = std::thread([&]() { + +///////////// RS FIXME: Simple PMD based stats monitoring is also possible + struct rte_eth_stats iface_stats; + while (true) { + // RS: poll out dev stats. (SIMPLE MODE) + rte_eth_stats_get(iface, &iface_stats); + num_packets = (uint64_t)iface_stats.ipackets; + num_bytes = (uint64_t)iface_stats.ibytes; + num_missed = (uint64_t)iface_stats.imissed; + num_errors = (uint64_t)iface_stats.ierrors; + TLOG() << " Total packets: " << num_packets + << " Total bytes: " << num_bytes + << " Total missed: " << num_missed + << " Total errors: " << num_errors; + // Queue based counters doesn't seem to work neither here neither in module... 
:(((((( + for( unsigned long i = 0; i < RTE_ETHDEV_QUEUE_STAT_CNTRS; i++ ){ + TLOG() << "HW iface queue[" << i << "] received: " << (uint64_t)iface_stats.q_ipackets[i]; + } + +////////////// RS FIXME: HW counter based stats monitoring. Fields initialized just before thread spawn. + if (len != rte_eth_xstats_get_by_id(iface, NULL, values, len)) { + TLOG() << "Cannot get xstat values!"; + } else { + for (i = 0; i < len; i++) { + TLOG() << "Name: " << xstats_names[i].name << " value: " << values[i]; + } + } + int reset_res = rte_eth_xstats_reset(iface); + TLOG() << "Reset notification result: " << reset_res; +////////////// RS FIXME: HW counter based loop ends. + + std::this_thread::sleep_for(std::chrono::seconds(1)); // If we sample for anything other than 1s, the rate calculation will need to change + } + }); + + struct rte_mbuf **bufs = (rte_mbuf**) malloc(sizeof(struct rte_mbuf*) * burst_size); + rte_pktmbuf_alloc_bulk(mbuf_pool, bufs, burst_size); + + + + bool once = true; // one shot variable + while (true) { + const uint16_t nb_rx = rte_eth_rx_burst(iface, 0, bufs, burst_size); + if (nb_rx != 0) { + num_packets += nb_rx; + // Iterate on burst packets + for (int i_b=0; i_bpkt_len; + + // Check for segmentation + if (bufs[i_b]->nb_segs > 1) { + TLOG() << "It appears a packet is spread across more than one receiving buffer;" + << " there's currently no logic in this program to handle this"; + } + + // Check packet type + auto pkt_type = bufs[i_b]->packet_type; + //// Handle non IPV4 packets + if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) { + TLOG() << "Non-Ethernet packet type: " << (unsigned)pkt_type; + if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) { + TLOG() << "TODO: Handle ARP request!"; + rte_pktmbuf_dump(stdout, bufs[i_b], bufs[i_b]->pkt_len); + //arp::pktgen_process_arp(bufs[i_b], 0, ip_addr_bin); + } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) { + TLOG() << "TODO: Handle LLDP packet!"; + rte_pktmbuf_dump(stdout, bufs[i_b], bufs[i_b]->pkt_len); + } else { + TLOG() << 
"Unidentified! Dumping..."; + rte_pktmbuf_dump(stdout, bufs[i_b], bufs[i_b]->pkt_len); + } + continue; + } + } + rte_pktmbuf_free_bulk(bufs, nb_rx); + } + } // main loop + + + return 0; +} + +int +main(int argc, char* argv[]) +{ + int ret = rte_eal_init(argc, argv); + if (ret < 0) { + rte_exit(EXIT_FAILURE, "ERROR: EAL initialization failed.\n"); + } + + // Iface ID and its queue numbers + int iface_id = 0; + const uint16_t rx_qs = 5; + const uint16_t tx_qs = 1; + const uint16_t rx_ring_size = 1024; + const uint16_t tx_ring_size = 1024; + // Get pool + std::map> mbuf_pools; + TLOG() << "Allocating pool"; + for (unsigned p_i = 0; p_i