Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prep release/fddaq v4.3.0 #116

Merged
merged 14 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.12)
project(dpdklibs VERSION 1.2.0)
project(dpdklibs VERSION 1.2.3)

find_package(daq-cmake REQUIRED)

Expand Down Expand Up @@ -79,6 +79,7 @@ daq_add_application(dpdklibs_test_transmit_and_receive test_transmit_and_receive
daq_add_application(dpdklibs_test_stats_reporting test_stats_reporting.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_dpdk_stats test_dpdk_stats.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_multi_process test_multi_proc.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
#daq_add_application(dpdklibs_test_unique_caps test_unique_caps.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 fmt::fmt ${DPDK_LIBRARIES})

target_compile_options(dpdklibs PUBLIC ${DPDK_CFLAGS})
target_include_directories(dpdklibs PUBLIC ${DPDK_INCLUDE_DIRS})
Expand Down
2 changes: 1 addition & 1 deletion include/dpdklibs/EALSetup.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ int iface_promiscuous_mode(std::uint16_t iface, bool mode = false);
int iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings,
uint16_t rx_ring_size, uint16_t tx_ring_size,
std::map<int, std::unique_ptr<rte_mempool>>& mbuf_pool,
bool with_reset=false, bool with_mq_rss=false);
bool with_reset=false, bool with_mq_rss=false, bool check_link_status=false);

std::unique_ptr<rte_mempool> get_mempool(const std::string& pool_name,
int num_mbufs=NUM_MBUFS, int mbuf_cache_size=MBUF_CACHE_SIZE,
Expand Down
57 changes: 44 additions & 13 deletions plugins/NICReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,10 @@ NICReceiver::do_configure(const data_t& args)
m_ifaces[iface_id]->setup_xstats();
} else {
TLOG() << "No available interface with MAC=" << iface_mac_addr << " PCI=" << iface_pci_addr;
ers::fatal(dunedaq::readoutlibs::InitializationError(
ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!"));
throw dunedaq::readoutlibs::InitializationError(ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!");
}
}

return;
}

void
NICReceiver::do_start(const data_t&)
{
TLOG() << get_name() << ": Entering do_start() method";
if (!m_run_marker.load()) {
set_running(true);
Expand All @@ -199,11 +192,54 @@ NICReceiver::do_start(const data_t&)
} else {
TLOG_DEBUG(5) << "NICReader is already running!";
}

return;
}

void
NICReceiver::do_start(const data_t&)
{
// TLOG() << get_name() << ": Entering do_start() method";
// if (!m_run_marker.load()) {
// set_running(true);
// TLOG() << "Starting iface wrappers.";
// for (auto& [iface_id, iface] : m_ifaces) {
// iface->start();
// }
// } else {
// TLOG_DEBUG(5) << "NICReader is already running!";
// }
for (auto& [iface_id, iface] : m_ifaces) {
iface->enable_flow();
}
}

void
NICReceiver::do_stop(const data_t&)
{
// TLOG() << get_name() << ": Entering do_stop() method";
// if (m_run_marker.load()) {
// TLOG() << "Raising stop through variables!";
// set_running(false);
// TLOG() << "Stopping iface wrappers.";
// for (auto& [iface_id, iface] : m_ifaces) {
// iface->stop();
// }
// ealutils::wait_for_lcores();
// TLOG() << "Stoppped DPDK lcore processors and internal threads...";
// } else {
// TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
// }
// return;
for (auto& [iface_id, iface] : m_ifaces) {
iface->disable_flow();
}
}

void
NICReceiver::do_scrap(const data_t&)
{

TLOG() << get_name() << ": Entering do_stop() method";
if (m_run_marker.load()) {
TLOG() << "Raising stop through variables!";
Expand All @@ -217,12 +253,7 @@ NICReceiver::do_stop(const data_t&)
} else {
TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
}
return;
}

void
NICReceiver::do_scrap(const data_t&)
{
TLOG() << get_name() << ": Entering do_scrap() method";
for (auto& [iface_id, iface] : m_ifaces) {
iface->scrap();
Expand Down
5 changes: 0 additions & 5 deletions plugins/NICReceiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ class NICReceiver : public dunedaq::appfwk::DAQModule
std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
std::map<int, struct rte_mbuf **> m_bufs;

// Lcore processor
//template<class T>
// int rx_runner(void *arg __rte_unused);

// Interfaces (logical ID, MAC) -> IfaceWrapper
std::map<std::string, uint16_t> m_mac_to_id_map;
std::map<std::string, uint16_t> m_pci_to_id_map;
Expand All @@ -120,6 +116,5 @@ class NICReceiver : public dunedaq::appfwk::DAQModule

} // namespace dunedaq::dpdklibs

// #include "detail/NICReceiver.hxx"

#endif // DPDKLIBS_PLUGINS_NICRECEIVER_HPP_
96 changes: 0 additions & 96 deletions plugins/detail/NICReceiver.hxx

This file was deleted.

26 changes: 14 additions & 12 deletions src/EALSetup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ int
iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings,
uint16_t rx_ring_size, uint16_t tx_ring_size,
std::map<int, std::unique_ptr<rte_mempool>>& mbuf_pool,
bool with_reset, bool with_mq_rss)
bool with_reset, bool with_mq_rss, bool check_link_status)
{
struct rte_eth_conf iface_conf = iface_conf_default;
uint16_t nb_rxd = rx_ring_size;
Expand Down Expand Up @@ -131,14 +131,6 @@ iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings,
}
}

if ((retval = rte_eth_link_get_nowait(iface, &link)) < 0) {
throw FailedToRetrieveLinkStatus(ERS_HERE, iface, retval);
}

if (link.link_status == 0 ) {
throw LinkOffline(ERS_HERE, iface);
}

// Should we configure MQ RSS and offload?
if (with_mq_rss) {
iface_conf_rss_mode(iface_conf, true, true); // with_rss, with_offload
Expand Down Expand Up @@ -198,10 +190,20 @@ iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings,
}

// Start the Ethernet interface.
retval = rte_eth_dev_start(iface);
if (retval < 0)
return retval;
if ((retval = rte_eth_dev_start(iface)) < 0) {
throw FailedToConfigureInterface(ERS_HERE, iface, "MAC address retrival", retval);
}

if ((retval = rte_eth_link_get(iface, &link)) != 0) {
throw FailedToRetrieveLinkStatus(ERS_HERE, iface, retval);
}

TLOG() << "Link: speed=" << link.link_speed << " duplex=" << link.link_duplex << " autoneg=" << link.link_autoneg << " status=" << link.link_status;

if ( check_link_status && link.link_status == 0 ) {
throw LinkOffline(ERS_HERE, iface);
}

// Display the interface MAC address.
struct rte_ether_addr addr;
if ((retval = rte_eth_macaddr_get(iface, &addr)) == 0) {
Expand Down
5 changes: 4 additions & 1 deletion src/IfaceWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ IfaceWrapper::setup_interface()
{
TLOG() << "Initialize interface " << m_iface_id;
bool with_reset = true, with_mq_mode = true; // go to config
bool check_link_status = false;

int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode);
int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode, check_link_status);
if (retval != 0 ) {
throw FailedToSetupInterface(ERS_HERE, m_iface_id, retval);
}
Expand Down Expand Up @@ -265,6 +266,7 @@ IfaceWrapper::start()
}


m_lcore_enable_flow.store(false);
m_lcore_quit_signal.store(false);
TLOG() << "Launching GARP thread with garp_func...";
m_garp_thread = std::thread(&IfaceWrapper::garp_func, this);
Expand All @@ -280,6 +282,7 @@ IfaceWrapper::start()
void
IfaceWrapper::stop()
{
m_lcore_enable_flow.store(false);
m_lcore_quit_signal.store(true);
// Stop GARP sender thread
if (m_garp_thread.joinable()) {
Expand Down
7 changes: 4 additions & 3 deletions src/IfaceWrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ class IfaceWrapper
void setup_flow_steering();
void setup_xstats();

// std::map<udp::StreamUID, udp::ReceiverStats> get_and_reset_stream_stats() {
// return m_accum_ptr->get_and_reset_stream_stats();
// }
void enable_flow() { m_lcore_enable_flow.store(true);}
void disable_flow() { m_lcore_enable_flow.store(false);}

protected:
iface_conf_t m_cfg;
Expand Down Expand Up @@ -93,6 +92,8 @@ class IfaceWrapper
// Lcore stop signal
std::atomic<bool> m_lcore_quit_signal{ false };

std::atomic<bool> m_lcore_enable_flow{ false };

// Mbufs and pools
std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
std::map<int, struct rte_mbuf **> m_bufs; // by queue
Expand Down
7 changes: 5 additions & 2 deletions src/detail/IfaceWrapper.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) {
if (q_bufs[i_b]->pkt_len > 7000) { // RS FIXME: do proper check on data length later
// Handle them!
std::size_t data_len = q_bufs[i_b]->data_len;
char* message = udp::get_udp_payload(q_bufs[i_b]);
handle_eth_payload(src_rx_q, message, data_len);

if ( m_lcore_enable_flow.load() ) {
char* message = udp::get_udp_payload(q_bufs[i_b]);
handle_eth_payload(src_rx_q, message, data_len);
}
++m_num_frames_rxq[src_rx_q];
m_num_bytes_rxq[src_rx_q] += data_len;
}
Expand Down
Loading