1
1
2
2
/* Application will run until quit or killed. */
3
- # include < fmt/core.h >
3
+
4
4
#include < inttypes.h>
5
5
#include < rte_cycles.h>
6
6
#include < rte_eal.h>
21
21
#include " CLI/Formatter.hpp"
22
22
23
23
#include " detdataformats/DAQEthHeader.hpp"
24
+ #include " dpdklibs/EALSetup.hpp"
24
25
#include " dpdklibs/udp/PacketCtor.hpp"
25
26
#include " dpdklibs/udp/Utils.hpp"
26
27
#include " logging/Logging.hpp"
27
- #include " dpdklibs/EALSetup.hpp"
28
+
29
+ #include " dpdklibs/udp/PacketCtor.hpp"
30
+ #include " dpdklibs/udp/Utils.hpp"
31
+ #include " dpdklibs/arp/ARP.hpp"
32
+ #include " dpdklibs/ipv4_addr.hpp"
33
+
34
+ #include < fmt/core.h>
35
+ #include < fmt/ranges.h>
36
+
37
+ #include < regex>
28
38
29
39
#define RX_RING_SIZE 1024
30
40
#define TX_RING_SIZE 1024
@@ -117,8 +127,10 @@ namespace {
117
127
std::atomic<uint64_t > max_payload_size = 0 ;
118
128
std::atomic<uint64_t > udp_pkt_counter = 0 ;
119
129
120
- std::ofstream datafile;
121
- const std::string output_data_filename = " dpdklibs_test_frame_receiver.dat" ;
130
+ std::atomic<int64_t > garps_sent = 0 ;
131
+
132
+ // std::ofstream datafile;
133
+ // const std::string output_data_filename = "dpdklibs_test_frame_receiver.dat";
122
134
123
135
124
136
} // namespace
@@ -190,7 +202,7 @@ static inline int check_packet_size(struct rte_mbuf* mbuf, StreamUID unique_str_
190
202
return 0 ;
191
203
}
192
204
193
- static int lcore_main (struct rte_mempool * mbuf_pool, uint16_t iface, uint64_t time_per_report){
205
+ static int lcore_main (struct rte_mempool * mbuf_pool, uint16_t iface, uint64_t time_per_report, const std::vector<std::string>& garp_ip_addr_strs ){
194
206
/*
195
207
* Check that the iface is on the same NUMA node as the polling thread
196
208
* for best performance.
@@ -260,34 +272,69 @@ static int lcore_main(struct rte_mempool* mbuf_pool, uint16_t iface, uint64_t ti
260
272
}
261
273
});
262
274
263
- struct rte_mbuf **bufs = (rte_mbuf**) malloc (sizeof (struct rte_mbuf *) * burst_size);
264
- rte_pktmbuf_alloc_bulk (mbuf_pool, bufs, burst_size);
265
-
266
- datafile.open (output_data_filename, std::ios::out | std::ios::binary);
267
- if ( (datafile.rdstate () & std::ofstream::failbit ) != 0 ) {
268
- fmt::print (" WARNING: Unable to open output file \" {}\"\n " , output_data_filename);
275
+ //
276
+ // Prepare and start GARP thread
277
+ //
278
+ std::vector<rte_be32_t > ip_addr_bin_vector;
279
+ for ( const auto & ip_addr_str : garp_ip_addr_strs ) {
280
+ TLOG () << " IP address for ARP responses: " << ip_addr_str;
281
+ IpAddr ip_addr (ip_addr_str);
282
+ rte_be32_t ip_addr_bin = ip_address_dotdecimal_to_binary (
283
+ ip_addr.addr_bytes [3 ],
284
+ ip_addr.addr_bytes [2 ],
285
+ ip_addr.addr_bytes [1 ],
286
+ ip_addr.addr_bytes [0 ]
287
+ );
288
+ ip_addr_bin_vector.push_back (ip_addr_bin);
269
289
}
270
290
291
+ struct rte_mbuf **tx_bufs = (rte_mbuf**) malloc (sizeof (struct rte_mbuf *) * burst_size);
292
+ rte_pktmbuf_alloc_bulk (mbuf_pool, tx_bufs, burst_size);
293
+
294
+ auto garp = std::thread ([&]() {
295
+ while (true ) {
296
+ // TLOG() << "Packets/s: " << num_packets << " Bytes/s: " << num_bytes << " Total packets: " << total_packets << " Failed packets: " << failed_packets;
297
+ // num_packets.exchange(0);
298
+ // num_bytes.exchange(0);
299
+
300
+ for (const auto & ip_addr_bin : ip_addr_bin_vector ) {
301
+ arp::pktgen_send_garp (tx_bufs[0 ], iface, ip_addr_bin);
302
+ ++garps_sent;
303
+ }
304
+
305
+ std::this_thread::sleep_for (std::chrono::seconds (1 )); // If we sample for anything other than 1s, the rate calculation will need to change
306
+ }
307
+ });
308
+
309
+
310
+ struct rte_mbuf **rx_bufs = (rte_mbuf**) malloc (sizeof (struct rte_mbuf *) * burst_size);
311
+ rte_pktmbuf_alloc_bulk (mbuf_pool, rx_bufs, burst_size);
312
+
313
+ // datafile.open(output_data_filename, std::ios::out | std::ios::binary);
314
+ // if ( (datafile.rdstate() & std::ofstream::failbit ) != 0 ) {
315
+ // fmt::print("WARNING: Unable to open output file \"{}\"\n", output_data_filename);
316
+ // }
317
+
271
318
while (true ) {
272
319
/* Get burst of RX packets, from first iface of pair. */
273
- const uint16_t nb_rx = rte_eth_rx_burst (iface, 0 , bufs , burst_size);
320
+ const uint16_t nb_rx = rte_eth_rx_burst (iface, 0 , rx_bufs , burst_size);
274
321
275
322
num_packets += nb_rx;
276
323
total_packets += nb_rx;
277
324
278
325
for (int i_b = 0 ; i_b < nb_rx; ++i_b) {
279
- num_bytes += bufs [i_b]->pkt_len ;
326
+ num_bytes += rx_bufs [i_b]->pkt_len ;
280
327
281
328
282
329
bool dump_packet = false ;
283
- if (not RTE_ETH_IS_IPV4_HDR (bufs [i_b]->packet_type )) {
330
+ if (not RTE_ETH_IS_IPV4_HDR (rx_bufs [i_b]->packet_type )) {
284
331
non_ipv4_packets++;
285
332
dump_packet = true ;
286
333
continue ;
287
334
}
288
335
++udp_pkt_counter;
289
336
290
- char * udp_payload = udp::get_udp_payload (bufs [i_b]);
337
+ char * udp_payload = udp::get_udp_payload (rx_bufs [i_b]);
291
338
const detdataformats::DAQEthHeader* daq_hdr = reinterpret_cast <const detdataformats::DAQEthHeader*>(udp_payload);
292
339
293
340
// uint64_t unique_str_id = (daq_hdr->det_id<<22) + (daq_hdr->crate_id<<12) + (daq_hdr->slot_id<<8) + daq_hdr->stream_id;
@@ -300,12 +347,12 @@ static int lcore_main(struct rte_mempool* mbuf_pool, uint16_t iface, uint64_t ti
300
347
stream_stats[unique_str_id];
301
348
stream_stats[unique_str_id].prev_seq_id = daq_hdr->seq_id - 1 ;
302
349
}
303
- stream_stats[unique_str_id].num_bytes += bufs [i_b]->pkt_len ;
350
+ stream_stats[unique_str_id].num_bytes += rx_bufs [i_b]->pkt_len ;
304
351
305
352
if (check_against_previous_stream (daq_hdr, 2048 ) != 0 ){
306
353
dump_packet = true ;
307
354
}
308
- if (check_packet_size (bufs [i_b], unique_str_id) != 0 ){
355
+ if (check_packet_size (rx_bufs [i_b], unique_str_id) != 0 ){
309
356
dump_packet = true ;
310
357
}
311
358
@@ -316,11 +363,11 @@ static int lcore_main(struct rte_mempool* mbuf_pool, uint16_t iface, uint64_t ti
316
363
if (dump_packet && dumped_packet_count < max_packets_to_dump) {
317
364
dumped_packet_count++;
318
365
319
- rte_pktmbuf_dump (stdout, bufs [i_b], bufs [i_b]->pkt_len );
366
+ rte_pktmbuf_dump (stdout, rx_bufs [i_b], rx_bufs [i_b]->pkt_len );
320
367
}
321
368
}
322
369
323
- rte_pktmbuf_free_bulk (bufs , nb_rx);
370
+ rte_pktmbuf_free_bulk (rx_bufs , nb_rx);
324
371
}
325
372
326
373
return 0 ;
@@ -329,39 +376,66 @@ static int lcore_main(struct rte_mempool* mbuf_pool, uint16_t iface, uint64_t ti
329
376
// Define the function to be called when ctrl-c (SIGINT) is sent to process
330
377
void signal_callback_handler (int signum){
331
378
fmt::print (" Caught signal {}\n " , signum);
332
- if (datafile.is_open ()) {
333
- datafile.close ();
334
- }
335
- // Terminate program
336
379
std::exit (signum);
337
380
}
338
381
339
382
int main (int argc, char ** argv){
340
383
uint64_t time_per_report = 1 ;
341
384
uint16_t iface = 0 ;
385
+ std::vector<std::string> garp_ip_addresses;
386
+ std::vector<std::string> pcie_addresses;
342
387
343
388
CLI::App app{" test frame receiver" };
344
- app.add_option (" -s" , expected_packet_size, " Expected frame size" );
345
- app.add_option (" -i" , iface, " Interface to init" );
346
- app.add_option (" -t" , time_per_report, " Time Per Report" );
389
+ app.add_option (" -g,--garp-ip-address" , garp_ip_addresses, " IP Addresses" );
390
+ app.add_option (" -m,--pcie-mask" , pcie_addresses, " PCIE Addresses device mask" );
391
+ app.add_option (" -s,--exp-frame-size" , expected_packet_size, " Expected frame size" );
392
+ app.add_option (" -i,--iface" , iface, " Interface to init" );
393
+ app.add_option (" -t,--report-interval-time" , time_per_report, " Time Per Report" );
347
394
app.add_flag (" --check-time" , check_timestamp, " Report back differences in timestamp" );
348
- app.add_flag (" -p" , per_stream_reports, " Detailed per stream reports" );
395
+ app.add_flag (" -p,--per-stream-reports" , per_stream_reports, " Detailed per stream reports" );
396
+
349
397
CLI11_PARSE (app, argc, argv);
350
398
399
+ // Validate arguments
400
+ fmt::print (" ips : {}\n " , fmt::join (garp_ip_addresses," | " ));
401
+ fmt::print (" pcies : {}\n " , fmt::join (pcie_addresses," | " ));
402
+
403
+ std::regex re_ipv4 (" [0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}" );
404
+ std::regex re_pcie (" ^0{0,4}:[a-fA-F0-9]{2}:[a-fA-F0-9]{2}.[0-9]$" );
405
+
406
+ fmt::print (" IP addresses\n " );
407
+ bool all_ip_ok = true ;
408
+ for ( const auto & ip: garp_ip_addresses) {
409
+ bool ip_ok = std::regex_match (ip, re_ipv4);
410
+ fmt::print (" - {} {}\n " , ip, ip_ok);
411
+ all_ip_ok &= ip_ok;
412
+ }
413
+
414
+ fmt::print (" PCIE addresses\n " );
415
+ bool all_pcie_ok = true ;
416
+ for ( const auto & pcie: pcie_addresses) {
417
+ bool pcie_ok = std::regex_match (pcie, re_pcie);
418
+ fmt::print (" - {} {}\n " , pcie, pcie_ok);
419
+ all_pcie_ok &= pcie_ok;
420
+ }
421
+
422
+ if (!all_ip_ok or !all_pcie_ok) {
423
+ return -1 ;
424
+ }
425
+
426
+
351
427
// define function to be called when ctrl+c is called.
352
428
std::signal (SIGINT, signal_callback_handler);
353
429
354
430
std::vector<std::string> eal_args;
355
- eal_args.push_back (" dpdklibds_test_frame_receiver" );
356
-
431
+ eal_args.push_back (" dpdklibds_test_garp" );
432
+ for ( const auto & pcie: pcie_addresses) {
433
+ eal_args.push_back (" -a" );
434
+ eal_args.push_back (pcie);
435
+ }
357
436
358
- // initialise eal with constructed argc and argv
359
- std::vector<char *> vec_argv = construct_argv (eal_args);
360
- char ** constructed_argv = vec_argv.data ();
361
- int constructed_argc = eal_args.size ();
362
437
363
- int ret = rte_eal_init (constructed_argc, constructed_argv);
364
- if (ret < 0 ) {rte_exit (EXIT_FAILURE, " Error with EAL initialization\n " );}
438
+ dunedaq::dpdklibs::ealutils::init_eal (eal_args);
365
439
366
440
auto n_ifaces = rte_eth_dev_count_avail ();
367
441
fmt::print (" # of available ifaces: {}\n " , n_ifaces);
@@ -374,9 +448,10 @@ int main(int argc, char** argv){
374
448
// Allocate pools and mbufs per queue
375
449
std::map<int , std::unique_ptr<rte_mempool>> mbuf_pools;
376
450
std::map<int , struct rte_mbuf **> bufs;
377
- uint16_t n_rx_qs = 1 ;
378
- const uint16_t rx_ring_size = 1024 ;
379
- const uint16_t tx_ring_size = 1024 ;
451
+ const uint16_t n_rx_qs = 1 ;
452
+ const uint16_t n_tx_qs = 1 ;
453
+ const uint16_t rx_ring_size = 2048 ;
454
+ const uint16_t tx_ring_size = 2048 ;
380
455
381
456
std::cout << " Allocating pools and mbufs.\n " ;
382
457
for (size_t i=0 ; i<n_rx_qs; ++i) {
@@ -390,9 +465,9 @@ int main(int argc, char** argv){
390
465
391
466
// Setting up only one iface
392
467
fmt::print (" Initialize only iface {}!\n " , iface);
393
- ealutils::iface_init (iface, n_rx_qs, 0 , rx_ring_size, tx_ring_size, mbuf_pools); // just init iface, no TX queues
468
+ ealutils::iface_init (iface, n_rx_qs, n_tx_qs , rx_ring_size, tx_ring_size, mbuf_pools); // just init iface, no TX queues
394
469
395
- lcore_main (mbuf_pools[0 ].get (), iface, time_per_report);
470
+ lcore_main (mbuf_pools[0 ].get (), iface, time_per_report, garp_ip_addresses );
396
471
397
472
rte_eal_cleanup ();
398
473
0 commit comments