Skip to content

Commit 29b7f4a

Browse files
author
John Freeman
committed
Merge remote-tracking branch 'origin/prep-release/fddaq-v4.3.0' into production/v4
2 parents 898cc59 + 6a46333 commit 29b7f4a

File tree

9 files changed

+74
-134
lines changed

9 files changed

+74
-134
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
cmake_minimum_required(VERSION 3.12)
2-
project(dpdklibs VERSION 1.2.0)
2+
project(dpdklibs VERSION 1.2.3)
33

44
find_package(daq-cmake REQUIRED)
55

@@ -79,6 +79,7 @@ daq_add_application(dpdklibs_test_transmit_and_receive test_transmit_and_receive
7979
daq_add_application(dpdklibs_test_stats_reporting test_stats_reporting.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
8080
daq_add_application(dpdklibs_test_dpdk_stats test_dpdk_stats.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
8181
daq_add_application(dpdklibs_test_multi_process test_multi_proc.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
82+
#daq_add_application(dpdklibs_test_unique_caps test_unique_caps.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 fmt::fmt ${DPDK_LIBRARIES})
8283

8384
target_compile_options(dpdklibs PUBLIC ${DPDK_CFLAGS})
8485
target_include_directories(dpdklibs PUBLIC ${DPDK_INCLUDE_DIRS})

include/dpdklibs/EALSetup.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ int iface_promiscuous_mode(std::uint16_t iface, bool mode = false);
3636
int iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings,
3737
uint16_t rx_ring_size, uint16_t tx_ring_size,
3838
std::map<int, std::unique_ptr<rte_mempool>>& mbuf_pool,
39-
bool with_reset=false, bool with_mq_rss=false);
39+
bool with_reset=false, bool with_mq_rss=false, bool check_link_status=false);
4040

4141
std::unique_ptr<rte_mempool> get_mempool(const std::string& pool_name,
4242
int num_mbufs=NUM_MBUFS, int mbuf_cache_size=MBUF_CACHE_SIZE,

plugins/NICReceiver.cpp

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -178,17 +178,10 @@ NICReceiver::do_configure(const data_t& args)
178178
m_ifaces[iface_id]->setup_xstats();
179179
} else {
180180
TLOG() << "No available interface with MAC=" << iface_mac_addr << " PCI=" << iface_pci_addr;
181-
ers::fatal(dunedaq::readoutlibs::InitializationError(
182-
ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!"));
181+
throw dunedaq::readoutlibs::InitializationError(ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!");
183182
}
184183
}
185184

186-
return;
187-
}
188-
189-
void
190-
NICReceiver::do_start(const data_t&)
191-
{
192185
TLOG() << get_name() << ": Entering do_start() method";
193186
if (!m_run_marker.load()) {
194187
set_running(true);
@@ -199,11 +192,54 @@ NICReceiver::do_start(const data_t&)
199192
} else {
200193
TLOG_DEBUG(5) << "NICReader is already running!";
201194
}
195+
196+
return;
197+
}
198+
199+
void
200+
NICReceiver::do_start(const data_t&)
201+
{
202+
// TLOG() << get_name() << ": Entering do_start() method";
203+
// if (!m_run_marker.load()) {
204+
// set_running(true);
205+
// TLOG() << "Starting iface wrappers.";
206+
// for (auto& [iface_id, iface] : m_ifaces) {
207+
// iface->start();
208+
// }
209+
// } else {
210+
// TLOG_DEBUG(5) << "NICReader is already running!";
211+
// }
212+
for (auto& [iface_id, iface] : m_ifaces) {
213+
iface->enable_flow();
214+
}
202215
}
203216

204217
void
205218
NICReceiver::do_stop(const data_t&)
206219
{
220+
// TLOG() << get_name() << ": Entering do_stop() method";
221+
// if (m_run_marker.load()) {
222+
// TLOG() << "Raising stop through variables!";
223+
// set_running(false);
224+
// TLOG() << "Stopping iface wrappers.";
225+
// for (auto& [iface_id, iface] : m_ifaces) {
226+
// iface->stop();
227+
// }
228+
// ealutils::wait_for_lcores();
229+
// TLOG() << "Stoppped DPDK lcore processors and internal threads...";
230+
// } else {
231+
// TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
232+
// }
233+
// return;
234+
for (auto& [iface_id, iface] : m_ifaces) {
235+
iface->disable_flow();
236+
}
237+
}
238+
239+
void
240+
NICReceiver::do_scrap(const data_t&)
241+
{
242+
207243
TLOG() << get_name() << ": Entering do_stop() method";
208244
if (m_run_marker.load()) {
209245
TLOG() << "Raising stop through variables!";
@@ -217,12 +253,7 @@ NICReceiver::do_stop(const data_t&)
217253
} else {
218254
TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
219255
}
220-
return;
221-
}
222256

223-
void
224-
NICReceiver::do_scrap(const data_t&)
225-
{
226257
TLOG() << get_name() << ": Entering do_scrap() method";
227258
for (auto& [iface_id, iface] : m_ifaces) {
228259
iface->scrap();

plugins/NICReceiver.hpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,6 @@ class NICReceiver : public dunedaq::appfwk::DAQModule
100100
std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
101101
std::map<int, struct rte_mbuf **> m_bufs;
102102

103-
// Lcore processor
104-
//template<class T>
105-
// int rx_runner(void *arg __rte_unused);
106-
107103
// Interfaces (logical ID, MAC) -> IfaceWrapper
108104
std::map<std::string, uint16_t> m_mac_to_id_map;
109105
std::map<std::string, uint16_t> m_pci_to_id_map;
@@ -120,6 +116,5 @@ class NICReceiver : public dunedaq::appfwk::DAQModule
120116

121117
} // namespace dunedaq::dpdklibs
122118

123-
// #include "detail/NICReceiver.hxx"
124119

125120
#endif // DPDKLIBS_PLUGINS_NICRECEIVER_HPP_

plugins/detail/NICReceiver.hxx

Lines changed: 0 additions & 96 deletions
This file was deleted.

src/EALSetup.cpp

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ int
9595
iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings,
9696
uint16_t rx_ring_size, uint16_t tx_ring_size,
9797
std::map<int, std::unique_ptr<rte_mempool>>& mbuf_pool,
98-
bool with_reset, bool with_mq_rss)
98+
bool with_reset, bool with_mq_rss, bool check_link_status)
9999
{
100100
struct rte_eth_conf iface_conf = iface_conf_default;
101101
uint16_t nb_rxd = rx_ring_size;
@@ -131,14 +131,6 @@ iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings,
131131
}
132132
}
133133

134-
if ((retval = rte_eth_link_get_nowait(iface, &link)) < 0) {
135-
throw FailedToRetrieveLinkStatus(ERS_HERE, iface, retval);
136-
}
137-
138-
if (link.link_status == 0 ) {
139-
throw LinkOffline(ERS_HERE, iface);
140-
}
141-
142134
// Should we configure MQ RSS and offload?
143135
if (with_mq_rss) {
144136
iface_conf_rss_mode(iface_conf, true, true); // with_rss, with_offload
@@ -198,10 +190,20 @@ iface_init(uint16_t iface, uint16_t rx_rings, uint16_t tx_rings,
198190
}
199191

200192
// Start the Ethernet interface.
201-
retval = rte_eth_dev_start(iface);
202-
if (retval < 0)
203-
return retval;
193+
if ((retval = rte_eth_dev_start(iface)) < 0) {
194+
throw FailedToConfigureInterface(ERS_HERE, iface, "MAC address retrival", retval);
195+
}
204196

197+
if ((retval = rte_eth_link_get(iface, &link)) != 0) {
198+
throw FailedToRetrieveLinkStatus(ERS_HERE, iface, retval);
199+
}
200+
201+
TLOG() << "Link: speed=" << link.link_speed << " duplex=" << link.link_duplex << " autoneg=" << link.link_autoneg << " status=" << link.link_status;
202+
203+
if ( check_link_status && link.link_status == 0 ) {
204+
throw LinkOffline(ERS_HERE, iface);
205+
}
206+
205207
// Display the interface MAC address.
206208
struct rte_ether_addr addr;
207209
if ((retval = rte_eth_macaddr_get(iface, &addr)) == 0) {

src/IfaceWrapper.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,9 @@ IfaceWrapper::setup_interface()
9393
{
9494
TLOG() << "Initialize interface " << m_iface_id;
9595
bool with_reset = true, with_mq_mode = true; // go to config
96+
bool check_link_status = false;
9697

97-
int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode);
98+
int retval = ealutils::iface_init(m_iface_id, m_rx_qs.size(), m_tx_qs.size(), m_rx_ring_size, m_tx_ring_size, m_mbuf_pools, with_reset, with_mq_mode, check_link_status);
9899
if (retval != 0 ) {
99100
throw FailedToSetupInterface(ERS_HERE, m_iface_id, retval);
100101
}
@@ -265,6 +266,7 @@ IfaceWrapper::start()
265266
}
266267

267268

269+
m_lcore_enable_flow.store(false);
268270
m_lcore_quit_signal.store(false);
269271
TLOG() << "Launching GARP thread with garp_func...";
270272
m_garp_thread = std::thread(&IfaceWrapper::garp_func, this);
@@ -280,6 +282,7 @@ IfaceWrapper::start()
280282
void
281283
IfaceWrapper::stop()
282284
{
285+
m_lcore_enable_flow.store(false);
283286
m_lcore_quit_signal.store(true);
284287
// Stop GARP sender thread
285288
if (m_garp_thread.joinable()) {

src/IfaceWrapper.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,8 @@ class IfaceWrapper
5757
void setup_flow_steering();
5858
void setup_xstats();
5959

60-
// std::map<udp::StreamUID, udp::ReceiverStats> get_and_reset_stream_stats() {
61-
// return m_accum_ptr->get_and_reset_stream_stats();
62-
// }
60+
void enable_flow() { m_lcore_enable_flow.store(true);}
61+
void disable_flow() { m_lcore_enable_flow.store(false);}
6362

6463
protected:
6564
iface_conf_t m_cfg;
@@ -93,6 +92,8 @@ class IfaceWrapper
9392
// Lcore stop signal
9493
std::atomic<bool> m_lcore_quit_signal{ false };
9594

95+
std::atomic<bool> m_lcore_enable_flow{ false };
96+
9697
// Mbufs and pools
9798
std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
9899
std::map<int, struct rte_mbuf **> m_bufs; // by queue

src/detail/IfaceWrapper.hxx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,11 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) {
8181
if (q_bufs[i_b]->pkt_len > 7000) { // RS FIXME: do proper check on data length later
8282
// Handle them!
8383
std::size_t data_len = q_bufs[i_b]->data_len;
84-
char* message = udp::get_udp_payload(q_bufs[i_b]);
85-
handle_eth_payload(src_rx_q, message, data_len);
84+
85+
if ( m_lcore_enable_flow.load() ) {
86+
char* message = udp::get_udp_payload(q_bufs[i_b]);
87+
handle_eth_payload(src_rx_q, message, data_len);
88+
}
8689
++m_num_frames_rxq[src_rx_q];
8790
m_num_bytes_rxq[src_rx_q] += data_len;
8891
}

0 commit comments

Comments
 (0)