Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward porting of callback readout interface #125

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ daq_add_application(dpdklibs_test_frame_transmitter test_frame_transmitter.cxx T
daq_add_application(dpdklibs_test_frame_receiver test_frame_receiver.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES} opmonlib::opmonlib)
daq_add_application(dpdklibs_test_arp_response test_arp_response.cxx TEST LINK_LIBRARIES dpdklibs ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_transmit_and_receive test_transmit_and_receive.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_stats_reporting test_stats_reporting.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_dpdk_stats test_dpdk_stats.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
daq_add_application(dpdklibs_test_multi_process test_multi_proc.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 ${DPDK_LIBRARIES})
#daq_add_application(dpdklibs_test_unique_caps test_unique_caps.cxx TEST LINK_LIBRARIES dpdklibs CLI11::CLI11 fmt::fmt ${DPDK_LIBRARIES})

target_compile_options(dpdklibs PUBLIC ${DPDK_CFLAGS})
target_include_directories(dpdklibs PUBLIC ${DPDK_INCLUDE_DIRS})
Expand Down
3 changes: 3 additions & 0 deletions include/dpdklibs/XstatsHelper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ namespace dunedaq::dpdklibs {
TLOG() << "Cannot get xstat values!";
//} else {
}

rte_eth_stats_get(m_iface_id, &m_eth_stats);
}
}

int m_iface_id;
bool m_allocated = false;
struct rte_eth_stats m_eth_stats;
struct rte_eth_xstat_name *m_xstats_names;
uint64_t *m_xstats_ids;
uint64_t *m_xstats_values;
Expand Down
109 changes: 0 additions & 109 deletions include/dpdklibs/udp/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#define DPDKLIBS_SRC_UTILS_HPP_

#include "IPV4UDPPacket.hpp"
#include "dpdklibs/receiverinfo/InfoStructs.hpp"

#include "detdataformats/DAQEthHeader.hpp"
#include "logging/Logging.hpp"
Expand Down Expand Up @@ -92,114 +91,6 @@ struct StreamUID
operator std::string() const { return fmt::format("({}, {}, {}, {})", det_id, crate_id, slot_id, stream_id); }
};

// Note that there's a difference between the string representation
// we'd want for a stream as an opmon label and the kind we'd want
// to print to screen. Thus this function...

std::string
get_opmon_string(const StreamUID& sid);

struct ReceiverStats
{

std::atomic<int64_t> total_packets = 0;
std::atomic<int64_t> min_packet_size = std::numeric_limits<int64_t>::max();
std::atomic<int64_t> max_packet_size = std::numeric_limits<int64_t>::min();
std::atomic<int64_t> max_timestamp_deviation = 0; // In absolute terms (positive *or* negative)
std::atomic<int64_t> max_seq_id_deviation = 0; // " " "

std::atomic<int64_t> packets_since_last_reset = 0;
std::atomic<int64_t> bytes_since_last_reset = 0;
std::atomic<int64_t> bad_timestamps_since_last_reset = 0;
std::atomic<int64_t> bad_sizes_since_last_reset = 0;
std::atomic<int64_t> bad_seq_ids_since_last_reset = 0;

ReceiverStats() = default;
ReceiverStats(const ReceiverStats& rhs);
ReceiverStats& operator=(const ReceiverStats& rhs);

// If you've only been collecting stats for one out of every N
// packets, you'd want to pass N to "scale" to correct for this

void scale(int64_t sf)
{
packets_since_last_reset = packets_since_last_reset.load() * sf;
bytes_since_last_reset = bytes_since_last_reset.load() * sf;
bad_timestamps_since_last_reset = bad_timestamps_since_last_reset.load() * sf;
bad_sizes_since_last_reset = bad_sizes_since_last_reset.load() * sf;
bad_seq_ids_since_last_reset = bad_seq_ids_since_last_reset.load() * sf;
}

void reset()
{
packets_since_last_reset = 0;
bytes_since_last_reset = 0;
bad_timestamps_since_last_reset = 0;
bad_sizes_since_last_reset = 0;
bad_seq_ids_since_last_reset = 0;
max_timestamp_deviation = 0;
max_seq_id_deviation = 0;
}

void merge(const std::vector<ReceiverStats>& stats_vector);

operator std::string() const;
};

// Derive quantities (e.g., bytes/s) from a ReceiverStats object and store it in a jsonnet-based struct
receiverinfo::Info
DeriveFromReceiverStats(const ReceiverStats& receiver_stats, double time_per_report);

// This class will, on a per-stream basis, fill in ReceiverStats
// instances given packets. In the constructor you can tell it
// whether you want it to pay attention to a packet's sequence ID
// and/or timestamp and/or size.

class PacketInfoAccumulator
{

public:
static constexpr int64_t s_ignorable_value = std::numeric_limits<int64_t>::max();
static constexpr int64_t s_max_seq_id = 4095;

PacketInfoAccumulator(int64_t expected_seq_id_step = s_ignorable_value, int64_t expected_timestamp_step = s_ignorable_value, int64_t expected_size = s_ignorable_value, int64_t process_nth_packet = 1);

void process_packet(const detdataformats::DAQEthHeader& daq_hdr, const int64_t data_len);
void reset();
void dump();

// get_and_reset_stream_stats() will take each per-stream statistic
// and reset the values used to calculate rates (packets per
// second, etc.)

std::map<StreamUID, ReceiverStats> get_and_reset_stream_stats();

// erase_stream_stats() is what you'd call between runs, so, e.g., your total packet count doesn't persist
void erase_stream_stats();

void set_expected_packet_size(int64_t packet_size) { m_expected_size = packet_size; }

PacketInfoAccumulator(const PacketInfoAccumulator&) = delete;
PacketInfoAccumulator& operator=(const PacketInfoAccumulator&) = delete;
PacketInfoAccumulator(PacketInfoAccumulator&&) = delete;
PacketInfoAccumulator& operator=(PacketInfoAccumulator&&) = delete;

~PacketInfoAccumulator() = default;

private:
std::map<StreamUID, ReceiverStats> m_stream_stats_atomic;

const int64_t m_expected_seq_id_step;

const int64_t m_expected_timestamp_step;
int64_t m_expected_size;
const int64_t m_process_nth_packet;

int64_t m_next_expected_seq_id[s_max_seq_id + 1];

std::map<StreamUID, int64_t> m_stream_last_timestamp;
std::map<StreamUID, int64_t> m_stream_last_seq_id;
};
} // namespace udp
} // namespace dpdklibs
} // namespace dunedaq
Expand Down
67 changes: 41 additions & 26 deletions plugins/NICReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,20 @@ NICReceiver::init(const std::shared_ptr<appfwk::ModuleConfiguration> mcfg )
ers::fatal(err);
throw err;
}

m_sources[queue->get_source_id()] = createSourceModel(queue->UID());

// Check for CB prefix indicating Callback use
const char delim = '_';
std::string target = queue->UID();
std::vector<std::string> words;
tokenize(target, delim, words);
int sourceid = -1;

bool callback_mode = false;
if (words.front() == "cb") {
callback_mode = true;
}

m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode);
//m_sources[queue->get_source_id()]->init();
}
}
Expand Down Expand Up @@ -195,23 +207,28 @@ NICReceiver::do_configure(const data_t& /*args*/)
ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!");
}
}

return;

}

void
NICReceiver::do_start(const data_t&)
{
TLOG() << get_name() << ": Entering do_start() method";
if (!m_run_marker.load()) {
set_running(true);
TLOG() << "Starting iface wrappers.";
for (auto& [iface_id, iface] : m_ifaces) {
iface->start();
}
} else {
TLOG_DEBUG(5) << "NICReader is already running!";
TLOG_DEBUG(5) << "iface wrappers are already running!";
}

return;

}

void
NICReceiver::do_start(const data_t&)
{

// Setup callbacks on all sourcemodels
for (auto& [sourceid, source] : m_sources) {
source->acquire_callback();
}

for (auto& [iface_id, iface] : m_ifaces) {
Expand All @@ -222,20 +239,6 @@ NICReceiver::do_start(const data_t&)
void
NICReceiver::do_stop(const data_t&)
{
// TLOG() << get_name() << ": Entering do_stop() method";
// if (m_run_marker.load()) {
// TLOG() << "Raising stop through variables!";
// set_running(false);
// TLOG() << "Stopping iface wrappers.";
// for (auto& [iface_id, iface] : m_ifaces) {
// iface->stop();
// }
// ealutils::wait_for_lcores();
// TLOG() << "Stoppped DPDK lcore processors and internal threads...";
// } else {
// TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
// }
// return;
for (auto& [iface_id, iface] : m_ifaces) {
iface->disable_flow();
}
Expand All @@ -245,7 +248,19 @@ NICReceiver::do_stop(const data_t&)
void
NICReceiver::do_scrap(const data_t&)
{

TLOG() << get_name() << ": Entering do_scrap() method";
if (m_run_marker.load()) {
TLOG() << "Raising stop through variables!";
set_running(false);
TLOG() << "Stopping iface wrappers.";
for (auto& [iface_id, iface] : m_ifaces) {
iface->stop();
}
ealutils::wait_for_lcores();
TLOG() << "Stoppped DPDK lcore processors and internal threads...";
} else {
TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
}
}

void
Expand Down
39 changes: 0 additions & 39 deletions schema/dpdklibs/confgen.jsonnet

This file was deleted.

Loading
Loading