Skip to content

Commit

Permalink
Merge pull request #125 from DUNE-DAQ/roland-sipos/prodfwdport-option…
Browse files Browse the repository at this point in the history
…alcbs

Forward porting of callback readout interface
  • Loading branch information
roland-sipos authored Apr 8, 2024
2 parents 00eb7de + 1bd5713 commit efc551f
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 788 deletions.
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ daq_add_application(dpdklibs_test_frame_transmitter test_frame_transmitter.cxx T
daq_add_application(dpdklibs_test_frame_receiver test_frame_receiver.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES} opmonlib::opmonlib)
daq_add_application(dpdklibs_test_arp_response test_arp_response.cxx TEST LINK_LIBRARIES dpdklibs ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_transmit_and_receive test_transmit_and_receive.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_stats_reporting test_stats_reporting.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_dpdk_stats test_dpdk_stats.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_multi_process test_multi_proc.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
#daq_add_application(dpdklibs_test_unique_caps test_unique_caps.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 fmt::fmt ${DPDK_LIBRARIES})

target_compile_options(dpdklibs PUBLIC ${DPDK_CFLAGS})
target_include_directories(dpdklibs PUBLIC ${DPDK_INCLUDE_DIRS})
Expand Down
3 changes: 3 additions & 0 deletions include/dpdklibs/XstatsHelper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ namespace dunedaq::dpdklibs {
TLOG() << "Cannot get xstat values!";
//} else {
}

rte_eth_stats_get(m_iface_id, &m_eth_stats);
}
}

int m_iface_id;
bool m_allocated = false;
struct rte_eth_stats m_eth_stats;
struct rte_eth_xstat_name *m_xstats_names;
uint64_t *m_xstats_ids;
uint64_t *m_xstats_values;
Expand Down
109 changes: 0 additions & 109 deletions include/dpdklibs/udp/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#define DPDKLIBS_SRC_UTILS_HPP_

#include "IPV4UDPPacket.hpp"
#include "dpdklibs/receiverinfo/InfoStructs.hpp"

#include "detdataformats/DAQEthHeader.hpp"
#include "logging/Logging.hpp"
Expand Down Expand Up @@ -92,114 +91,6 @@ struct StreamUID
operator std::string() const { return fmt::format("({}, {}, {}, {})", det_id, crate_id, slot_id, stream_id); }
};

// Note that there's a difference between the string representation
// we'd want for a stream as an opmon label and the kind we'd want
// to print to screen. Thus this function...

std::string
get_opmon_string(const StreamUID& sid);

struct ReceiverStats
{

std::atomic<int64_t> total_packets = 0;
std::atomic<int64_t> min_packet_size = std::numeric_limits<int64_t>::max();
std::atomic<int64_t> max_packet_size = std::numeric_limits<int64_t>::min();
std::atomic<int64_t> max_timestamp_deviation = 0; // In absolute terms (positive *or* negative)
std::atomic<int64_t> max_seq_id_deviation = 0; // " " "

std::atomic<int64_t> packets_since_last_reset = 0;
std::atomic<int64_t> bytes_since_last_reset = 0;
std::atomic<int64_t> bad_timestamps_since_last_reset = 0;
std::atomic<int64_t> bad_sizes_since_last_reset = 0;
std::atomic<int64_t> bad_seq_ids_since_last_reset = 0;

ReceiverStats() = default;
ReceiverStats(const ReceiverStats& rhs);
ReceiverStats& operator=(const ReceiverStats& rhs);

// If you've only been collecting stats for one out of every N
// packets, you'd want to pass N to "scale" to correct for this

void scale(int64_t sf)
{
packets_since_last_reset = packets_since_last_reset.load() * sf;
bytes_since_last_reset = bytes_since_last_reset.load() * sf;
bad_timestamps_since_last_reset = bad_timestamps_since_last_reset.load() * sf;
bad_sizes_since_last_reset = bad_sizes_since_last_reset.load() * sf;
bad_seq_ids_since_last_reset = bad_seq_ids_since_last_reset.load() * sf;
}

void reset()
{
packets_since_last_reset = 0;
bytes_since_last_reset = 0;
bad_timestamps_since_last_reset = 0;
bad_sizes_since_last_reset = 0;
bad_seq_ids_since_last_reset = 0;
max_timestamp_deviation = 0;
max_seq_id_deviation = 0;
}

void merge(const std::vector<ReceiverStats>& stats_vector);

operator std::string() const;
};

// Derive quantities (e.g., bytes/s) from a ReceiverStats object and store it in a jsonnet-based struct
receiverinfo::Info
DeriveFromReceiverStats(const ReceiverStats& receiver_stats, double time_per_report);

// This class will, on a per-stream basis, fill in ReceiverStats
// instances given packets. In the constructor you can tell it
// whether you want it to pay attention to a packet's sequence ID
// and/or timestamp and/or size.

class PacketInfoAccumulator
{

public:
static constexpr int64_t s_ignorable_value = std::numeric_limits<int64_t>::max();
static constexpr int64_t s_max_seq_id = 4095;

PacketInfoAccumulator(int64_t expected_seq_id_step = s_ignorable_value, int64_t expected_timestamp_step = s_ignorable_value, int64_t expected_size = s_ignorable_value, int64_t process_nth_packet = 1);

void process_packet(const detdataformats::DAQEthHeader& daq_hdr, const int64_t data_len);
void reset();
void dump();

// get_and_reset_stream_stats() will take each per-stream statistic
// and reset the values used to calculate rates (packets per
// second, etc.)

std::map<StreamUID, ReceiverStats> get_and_reset_stream_stats();

// erase_stream_stats() is what you'd call between runs, so, e.g., your total packet count doesn't persist
void erase_stream_stats();

void set_expected_packet_size(int64_t packet_size) { m_expected_size = packet_size; }

PacketInfoAccumulator(const PacketInfoAccumulator&) = delete;
PacketInfoAccumulator& operator=(const PacketInfoAccumulator&) = delete;
PacketInfoAccumulator(PacketInfoAccumulator&&) = delete;
PacketInfoAccumulator& operator=(PacketInfoAccumulator&&) = delete;

~PacketInfoAccumulator() = default;

private:
std::map<StreamUID, ReceiverStats> m_stream_stats_atomic;

const int64_t m_expected_seq_id_step;

const int64_t m_expected_timestamp_step;
int64_t m_expected_size;
const int64_t m_process_nth_packet;

int64_t m_next_expected_seq_id[s_max_seq_id + 1];

std::map<StreamUID, int64_t> m_stream_last_timestamp;
std::map<StreamUID, int64_t> m_stream_last_seq_id;
};
} // namespace udp
} // namespace dpdklibs
} // namespace dunedaq
Expand Down
67 changes: 41 additions & 26 deletions plugins/NICReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,20 @@ NICReceiver::init(const std::shared_ptr<appfwk::ModuleConfiguration> mcfg )
ers::fatal(err);
throw err;
}

m_sources[queue->get_source_id()] = createSourceModel(queue->UID());

// Check for CB prefix indicating Callback use
const char delim = '_';
std::string target = queue->UID();
std::vector<std::string> words;
tokenize(target, delim, words);
int sourceid = -1;

bool callback_mode = false;
if (words.front() == "cb") {
callback_mode = true;
}

m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode);
//m_sources[queue->get_source_id()]->init();
}
}
Expand Down Expand Up @@ -195,23 +207,28 @@ NICReceiver::do_configure(const data_t& /*args*/)
ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!");
}
}

return;

}

void
NICReceiver::do_start(const data_t&)
{
TLOG() << get_name() << ": Entering do_start() method";
if (!m_run_marker.load()) {
set_running(true);
TLOG() << "Starting iface wrappers.";
for (auto& [iface_id, iface] : m_ifaces) {
iface->start();
}
} else {
TLOG_DEBUG(5) << "NICReader is already running!";
TLOG_DEBUG(5) << "iface wrappers are already running!";
}

return;

}

void
NICReceiver::do_start(const data_t&)
{

// Setup callbacks on all sourcemodels
for (auto& [sourceid, source] : m_sources) {
source->acquire_callback();
}

for (auto& [iface_id, iface] : m_ifaces) {
Expand All @@ -222,20 +239,6 @@ NICReceiver::do_start(const data_t&)
void
NICReceiver::do_stop(const data_t&)
{
// TLOG() << get_name() << ": Entering do_stop() method";
// if (m_run_marker.load()) {
// TLOG() << "Raising stop through variables!";
// set_running(false);
// TLOG() << "Stopping iface wrappers.";
// for (auto& [iface_id, iface] : m_ifaces) {
// iface->stop();
// }
// ealutils::wait_for_lcores();
// TLOG() << "Stoppped DPDK lcore processors and internal threads...";
// } else {
// TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
// }
// return;
for (auto& [iface_id, iface] : m_ifaces) {
iface->disable_flow();
}
Expand All @@ -245,7 +248,19 @@ NICReceiver::do_stop(const data_t&)
void
NICReceiver::do_scrap(const data_t&)
{

TLOG() << get_name() << ": Entering do_scrap() method";
if (m_run_marker.load()) {
TLOG() << "Raising stop through variables!";
set_running(false);
TLOG() << "Stopping iface wrappers.";
for (auto& [iface_id, iface] : m_ifaces) {
iface->stop();
}
ealutils::wait_for_lcores();
TLOG() << "Stoppped DPDK lcore processors and internal threads...";
} else {
TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
}
}

void
Expand Down
39 changes: 0 additions & 39 deletions schema/dpdklibs/confgen.jsonnet

This file was deleted.

Loading

0 comments on commit efc551f

Please sign in to comment.