diff --git a/CMakeLists.txt b/CMakeLists.txt index 2e5c301..22e83c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.12) -project(dpdklibs VERSION 1.2.0) +project(dpdklibs VERSION 1.2.3) find_package(daq-cmake REQUIRED) @@ -79,6 +79,7 @@ daq_add_application(dpdklibs_test_transmit_and_receive test_transmit_and_receive daq_add_application(dpdklibs_test_stats_reporting test_stats_reporting.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES}) daq_add_application(dpdklibs_test_dpdk_stats test_dpdk_stats.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES}) daq_add_application(dpdklibs_test_multi_process test_multi_proc.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES}) +#daq_add_application(dpdklibs_test_unique_caps test_unique_caps.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 fmt::fmt ${DPDK_LIBRARIES}) target_compile_options(dpdklibs PUBLIC ${DPDK_CFLAGS}) target_include_directories(dpdklibs PUBLIC ${DPDK_INCLUDE_DIRS}) diff --git a/include/dpdklibs/EALSetup.hpp b/include/dpdklibs/EALSetup.hpp index d5e58b8..ebd326c 100644 --- a/include/dpdklibs/EALSetup.hpp +++ b/include/dpdklibs/EALSetup.hpp @@ -36,7 +36,7 @@ int iface_promiscuous_mode(std::uint16_t iface, bool mode = false); int iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings, uint16_t rx_ring_size, uint16_t tx_ring_size, std::map>& mbuf_pool, - bool with_reset=false, bool with_mq_rss=false); + bool with_reset=false, bool with_mq_rss=false, bool check_link_status=false); std::unique_ptr get_mempool(const std::string& pool_name, int num_mbufs=NUM_MBUFS, int mbuf_cache_size=MBUF_CACHE_SIZE, diff --git a/plugins/NICReceiver.cpp b/plugins/NICReceiver.cpp index 836b867..7a3fa3a 100644 --- a/plugins/NICReceiver.cpp +++ b/plugins/NICReceiver.cpp @@ -178,17 +178,10 @@ NICReceiver::do_configure(const data_t& args) m_ifaces[iface_id]->setup_xstats(); } else { TLOG() << "No available interface with MAC=" << iface_mac_addr << " PCI=" << iface_pci_addr; - ers::fatal(dunedaq::readoutlibs::InitializationError( - ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!")); + throw dunedaq::readoutlibs::InitializationError(ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!"); } } - return; -} - -void -NICReceiver::do_start(const data_t&) -{ TLOG() << get_name() << ": Entering do_start() method"; if (!m_run_marker.load()) { set_running(true); @@ -199,11 +192,54 @@ NICReceiver::do_start(const data_t&) } else { TLOG_DEBUG(5) << "NICReader is already running!"; } + + return; +} + +void +NICReceiver::do_start(const data_t&) +{ + // TLOG() << get_name() << ": Entering do_start() method"; + // if (!m_run_marker.load()) { + // set_running(true); + // TLOG() << "Starting iface wrappers."; + // for (auto& [iface_id, iface] : m_ifaces) { + // iface->start(); + // } + // } else { + // TLOG_DEBUG(5) << "NICReader is already running!"; + // } + for (auto& [iface_id, iface] : m_ifaces) { + iface->enable_flow(); + } } void NICReceiver::do_stop(const data_t&) { + // TLOG() << get_name() << ": Entering do_stop() method"; + // if (m_run_marker.load()) { + // TLOG() << "Raising stop through variables!"; + // set_running(false); + // TLOG() << "Stopping iface wrappers."; + // for (auto& [iface_id, iface] : m_ifaces) { + // iface->stop(); + // } + // ealutils::wait_for_lcores(); + // TLOG() << "Stoppped DPDK lcore processors and internal threads..."; + // } else { + // TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!"; + // } + // return; + for (auto& [iface_id, iface] : m_ifaces) { + iface->disable_flow(); + } +} + +void +NICReceiver::do_scrap(const data_t&) +{ + TLOG() << get_name() << ": Entering do_stop() method"; if (m_run_marker.load()) { TLOG() << "Raising stop through variables!"; @@ -217,12 +253,7 @@ NICReceiver::do_stop(const data_t&) } else { TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!"; } - return; -} -void -NICReceiver::do_scrap(const data_t&) -{ TLOG() << get_name() << ": Entering do_scrap() method"; for (auto& [iface_id, iface] : m_ifaces) { iface->scrap(); diff --git a/plugins/NICReceiver.hpp b/plugins/NICReceiver.hpp index 0629368..9d41a15 100644 --- a/plugins/NICReceiver.hpp +++ b/plugins/NICReceiver.hpp @@ -100,10 +100,6 @@ class NICReceiver : public dunedaq::appfwk::DAQModule std::map> m_mbuf_pools; std::map m_bufs; - // Lcore processor - //template - // int rx_runner(void *arg __rte_unused); - // Interfaces (logical ID, MAC) -> IfaceWrapper std::map m_mac_to_id_map; std::map m_pci_to_id_map; @@ -120,6 +116,5 @@ class NICReceiver : public dunedaq::appfwk::DAQModule } // namespace dunedaq::dpdklibs -// #include "detail/NICReceiver.hxx" #endif // DPDKLIBS_PLUGINS_NICRECEIVER_HPP_ diff --git a/plugins/detail/NICReceiver.hxx b/plugins/detail/NICReceiver.hxx deleted file mode 100644 index 0c2a0ba..0000000 --- a/plugins/detail/NICReceiver.hxx +++ /dev/null @@ -1,96 +0,0 @@ - -namespace dunedaq { -namespace dpdklibs { - -int -NICReceiver::rx_runner(void* arg __rte_unused) -{ - bool once = true; // One shot action variable. - uint16_t iface = m_iface_id; - - const uint16_t lid = rte_lcore_id(); - auto queues = m_rx_core_map[lid]; - - if (rte_eth_dev_socket_id(iface) >= 0 && rte_eth_dev_socket_id(iface) != (int)rte_socket_id()) { - TLOG() << "WARNING, iface " << iface << " is on remote NUMA node to polling thread! " - << "Performance will not be optimal."; - } - - TLOG() << "LCore RX runner on CPU[" << lid << "]: Main loop starts for iface " << iface << " !"; - - // while(!m_dpdk_quit_signal) { - while (!ealutils::dpdk_quit_signal) { - for (auto q : queues) { - auto src_rx_q = q.first; - auto* q_bufs = m_bufs[src_rx_q]; - // Get burst from queue - const uint16_t nb_rx = rte_eth_rx_burst(iface, src_rx_q, q_bufs, m_burst_size); - if (nb_rx != 0) { - // Print first packet for FYI - if (once && q_bufs[0]->pkt_len > 7000) { - TLOG() << "lid = " << lid; - TLOG() << "src_rx_q = " << src_rx_q; - TLOG() << "nb_rx = " << nb_rx; - TLOG() << "bufs.dta_len = " << q_bufs[0]->data_len; - TLOG() << "bufs.pkt_len = " << q_bufs[0]->pkt_len; - rte_pktmbuf_dump(stdout, q_bufs[0], q_bufs[0]->pkt_len); - std::string udp_hdr_str = udp::get_udp_header_str(q_bufs[0]); - TLOG() << "UDP Header: " << udp_hdr_str; - once = false; - } - - // Iterate on burst packets - for (int i_b = 0; i_b < nb_rx; ++i_b) { - - if (q_bufs[i_b]->nb_segs > 1) { - TLOG() << "It appears a packet is spread across more than one receiving buffer;" - << " there's currently no logic in this program to handle this"; - } - - // Check packet type - auto pkt_type = q_bufs[i_b]->packet_type; - //// Handle non IPV4 packets - if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) { - TLOG() << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type; - if (pkt_type == RTE_PTYPE_L2_ETHER_ARP) { - TLOG() << "TODO: Handle ARP request!"; - } else if (pkt_type == RTE_PTYPE_L2_ETHER_LLDP) { - TLOG() << "TODO: Handle LLDP packet!"; - } else { - TLOG() << "Unidentified! Dumping..."; - rte_pktmbuf_dump(stdout, q_bufs[i_b], q_bufs[i_b]->pkt_len); - } - continue; - } - - // Check for UDP frames - // if (pkt_type == RTE_PTYPE_L4_UDP) { - // Check for JUMBO frames - bool dummy = false; - if (q_bufs[i_b]->pkt_len > 7000) { // do proper check on data length later - // Handle them. - std::size_t data_len = q_bufs[i_b]->data_len; - char* message = udp::get_udp_payload(q_bufs[i_b]); - handle_eth_payload(src_rx_q, message, data_len); - m_num_frames[src_rx_q]++; - m_num_bytes[src_rx_q] += data_len; - } - } - - // From John's example: more efficient bulk free - rte_pktmbuf_free_bulk(q_bufs, nb_rx); - // Clear message buffers - // for (int i=0; i < nb_rx; i++) { - // rte_pktmbuf_free(q_bufs[i]); - //} - - } // per burst - } // per Q - } // main loop - - TLOG() << "LCore RX runner on CPU[" << lid << "] returned."; - return 0; -} - -} // namespace dpdklibs -} // namespace dunedaq diff --git a/src/EALSetup.cpp b/src/EALSetup.cpp index 33c87b1..862ef83 100644 --- a/src/EALSetup.cpp +++ b/src/EALSetup.cpp @@ -95,7 +95,7 @@ int iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings, uint16_t rx_ring_size, uint16_t tx_ring_size, std::map>& mbuf_pool, - bool with_reset, bool with_mq_rss) + bool with_reset, bool with_mq_rss, bool check_link_status) { struct rte_eth_conf iface_conf = iface_conf_default; uint16_t nb_rxd = rx_ring_size; @@ -131,14 +131,6 @@ iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings, } } - if ((retval = rte_eth_link_get_nowait(iface, &link)) < 0) { - throw FailedToRetrieveLinkStatus(ERS_HERE, iface, retval); - } - - if (link.link_status == 0 ) { - throw LinkOffline(ERS_HERE, iface); - } - // Should we configure MQ RSS and offload? if (with_mq_rss) { iface_conf_rss_mode(iface_conf, true, true); // with_rss, with_offload @@ -198,10 +190,20 @@ iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings, } // Start the Ethernet interface. - retval = rte_eth_dev_start(iface); - if (retval < 0) - return retval; + if ((retval = rte_eth_dev_start(iface)) < 0) { + throw FailedToConfigureInterface(ERS_HERE, iface, "MAC address retrival", retval); + } + if ((retval = rte_eth_link_get(iface, &link)) != 0) { + throw FailedToRetrieveLinkStatus(ERS_HERE, iface, retval); + } + + TLOG() << "Link: speed=" << link.link_speed << " duplex=" << link.link_duplex << " autoneg=" << link.link_autoneg << " status=" << link.link_status; + + if ( check_link_status && link.link_status == 0 ) { + throw LinkOffline(ERS_HERE, iface); + } + // Display the interface MAC address. struct rte_ether_addr addr; if ((retval = rte_eth_macaddr_get(iface, &addr)) == 0) { diff --git a/src/IfaceWrapper.cpp b/src/IfaceWrapper.cpp index 53e6a8f..2a11cd0 100644 --- a/src/IfaceWrapper.cpp +++ b/src/IfaceWrapper.cpp @@ -93,8 +93,9 @@ IfaceWrapper::setup_interface() { TLOG() << "Initialize interface " << m_iface_id; bool with_reset = true, with_mq_mode = true; // go to config + bool check_link_status = false; - int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode); + int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode, check_link_status); if (retval != 0 ) { throw FailedToSetupInterface(ERS_HERE, m_iface_id, retval); } @@ -265,6 +266,7 @@ IfaceWrapper::start() } + m_lcore_enable_flow.store(false); m_lcore_quit_signal.store(false); TLOG() << "Launching GARP thread with garp_func..."; m_garp_thread = std::thread(&IfaceWrapper::garp_func, this); @@ -280,6 +282,7 @@ IfaceWrapper::start() void IfaceWrapper::stop() { + m_lcore_enable_flow.store(false); m_lcore_quit_signal.store(true); // Stop GARP sender thread if (m_garp_thread.joinable()) { diff --git a/src/IfaceWrapper.hpp b/src/IfaceWrapper.hpp index 3731512..b27ef91 100644 --- a/src/IfaceWrapper.hpp +++ b/src/IfaceWrapper.hpp @@ -57,9 +57,8 @@ class IfaceWrapper void setup_flow_steering(); void setup_xstats(); - // std::map get_and_reset_stream_stats() { - // return m_accum_ptr->get_and_reset_stream_stats(); - // } + void enable_flow() { m_lcore_enable_flow.store(true);} + void disable_flow() { m_lcore_enable_flow.store(false);} protected: iface_conf_t m_cfg; @@ -93,6 +92,8 @@ class IfaceWrapper // Lcore stop signal std::atomic m_lcore_quit_signal{ false }; + std::atomic m_lcore_enable_flow{ false }; + // Mbufs and pools std::map> m_mbuf_pools; std::map m_bufs; // by queue diff --git a/src/detail/IfaceWrapper.hxx b/src/detail/IfaceWrapper.hxx index 3aa3c38..25e527b 100644 --- a/src/detail/IfaceWrapper.hxx +++ b/src/detail/IfaceWrapper.hxx @@ -81,8 +81,11 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) { if (q_bufs[i_b]->pkt_len > 7000) { // RS FIXME: do proper check on data length later // Handle them! std::size_t data_len = q_bufs[i_b]->data_len; - char* message = udp::get_udp_payload(q_bufs[i_b]); - handle_eth_payload(src_rx_q, message, data_len); + + if ( m_lcore_enable_flow.load() ) { + char* message = udp::get_udp_payload(q_bufs[i_b]); + handle_eth_payload(src_rx_q, message, data_len); + } ++m_num_frames_rxq[src_rx_q]; m_num_bytes_rxq[src_rx_q] += data_len; }