Skip to content

Commit

Permalink
Merge pull request #121 from DUNE-DAQ/roland-sipos/data-move-cb-registry
Browse files Browse the repository at this point in the history
Roland sipos/data move cb registry
  • Loading branch information
roland-sipos authored Mar 28, 2024
2 parents 29b7f4a + 6cefade commit 3b28c55
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 19 deletions.
15 changes: 13 additions & 2 deletions plugins/NICReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ NICReceiver::init(const data_t& args)
ers::fatal(dunedaq::readoutlibs::InitializationError(
ERS_HERE, "Output link ID could not be parsed on queue instance name! "));
}
TLOG() << "Creating source for target queue: " << target << " DLH number: " << sourceid;
m_sources[sourceid] = createSourceModel(qi.uid);
bool callback_mode = false;
if (words.front() == "cb") {
callback_mode = true;
}
TLOG() << "Creating source for target queue: " << target << " DLH number: " << sourceid << " CallbacksMode?=" << callback_mode;
m_sources[sourceid] = createSourceModel(qi.uid, callback_mode);
if (m_sources[sourceid] == nullptr) {
ers::fatal(dunedaq::readoutlibs::InitializationError(
ERS_HERE, "CreateSource failed to provide an appropriate model for queue!"));
Expand Down Expand Up @@ -209,6 +213,13 @@ NICReceiver::do_start(const data_t&)
// } else {
// TLOG_DEBUG(5) << "NICReader is already running!";
// }
//

// Setup callbacks on all sourcemodels
for (auto& [sourceid, source] : m_sources) {
source->acquire_callback();
}

for (auto& [iface_id, iface] : m_ifaces) {
iface->enable_flow();
}
Expand Down
9 changes: 6 additions & 3 deletions src/CreateSource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::TDEFrameTypeAdapter, "TDEFram
namespace dpdklibs {

std::unique_ptr<SourceConcept>
createSourceModel(const std::string& conn_uid)
createSourceModel(const std::string& conn_uid, bool callback_mode)
{
auto datatypes = dunedaq::iomanager::IOManager::get()->get_datatypes(conn_uid);
if (datatypes.size() != 1) {
Expand All @@ -41,8 +41,11 @@ createSourceModel(const std::string& conn_uid)
// Create Model
auto source_model = std::make_unique<SourceModel<fdreadoutlibs::types::DUNEWIBEthTypeAdapter>>();

// For callback acquisition later (lazy)
source_model->set_sink_name(conn_uid);

// Setup sink (acquire pointer from QueueRegistry)
source_model->set_sink(conn_uid);
source_model->set_sink(conn_uid, callback_mode);

// Get parser and sink
//auto& parser = source_model->get_parser();
Expand All @@ -62,7 +65,7 @@ createSourceModel(const std::string& conn_uid)
} else if (raw_dt.find("TDEFrame") != std::string::npos) {
// WIB2 specific char arrays
auto source_model = std::make_unique<SourceModel<fdreadoutlibs::types::TDEFrameTypeAdapter>>();
source_model->set_sink(conn_uid);
source_model->set_sink(conn_uid, callback_mode);
//auto& parser = source_model->get_parser();
//parser.process_chunk_func = parsers::fixsizedChunkInto<fdreadoutlibs::types::DUNEWIBSuperChunkTypeAdapter>(sink);
return source_model;
Expand Down
9 changes: 8 additions & 1 deletion src/SourceConcept.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class SourceConcept
SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable

virtual void init(const nlohmann::json& args) = 0;
virtual void set_sink(const std::string& sink_name) = 0;
virtual void set_sink(const std::string& sink_name, bool callback_mode) = 0;
virtual void acquire_callback() = 0;
virtual void conf(const nlohmann::json& args) = 0;
virtual void start(const nlohmann::json& args) = 0;
virtual void stop(const nlohmann::json& args) = 0;
Expand All @@ -57,6 +58,11 @@ class SourceConcept

//DefaultParserImpl& get_parser() { return std::ref(m_parser_impl); }

void set_sink_name(const std::string& sink_name)
{
m_sink_name = sink_name;
}

void set_ids(int card, int pid, int sip, int lid)
{
m_card_id = card;
Expand Down Expand Up @@ -92,6 +98,7 @@ class SourceConcept
std::string m_source_str;
std::string m_opmon_str;
std::string m_source_tid;
std::string m_sink_name;
std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;

private:
Expand Down
57 changes: 47 additions & 10 deletions src/SourceModel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "logging/Logging.hpp"
// #include "readoutlibs/utils/ReusableThread.hpp"

#include "readoutlibs/DataMoveCallbackRegistry.hpp"

// #include <folly/ProducerConsumerQueue.h>
// #include <nlohmann/json.hpp>

Expand Down Expand Up @@ -50,13 +52,34 @@ class SourceModel : public SourceConcept
{}
~SourceModel() {}

void set_sink(const std::string& sink_name) override
void set_sink(const std::string& sink_name, bool callback_mode) override
{
m_callback_mode = callback_mode;
if (callback_mode) {
TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
} else {
if (m_sink_is_set) {
TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
} else {
m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
m_sink_is_set = true;
}
}
}

void acquire_callback() override
{
if (m_sink_is_set) {
TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
if (m_callback_mode) {
if (m_callback_is_acquired) {
TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
} else {
// Getting DataMoveCBRegistry
auto dmcbr = readoutlibs::DataMoveCallbackRegistry::get();
m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
m_callback_is_acquired = true;
}
} else {
m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
m_sink_is_set = true;
TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
}
}

Expand Down Expand Up @@ -114,12 +137,18 @@ class SourceModel : public SourceConcept
if (push_out) {

TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) {
//if(m_dropped_packets == 0 || m_dropped_packets%10000) {
// TLOG() << "Dropped data " << m_dropped_packets;
//}
++m_dropped_packets;

if (m_callback_mode) {
(*m_sink_callback)(std::move(target_payload));
} else {
if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) {
//if(m_dropped_packets == 0 || m_dropped_packets%10000) {
// TLOG() << "Dropped data " << m_dropped_packets;
//}
++m_dropped_packets;
}
}

} else {
TargetPayloadType target_payload;
uint32_t bytes_copied = 0;
Expand All @@ -141,11 +170,19 @@ class SourceModel : public SourceConcept
std::atomic<bool> m_run_marker;
bool m_configured{ false };

std::string m_sink_id;
bool m_callback_mode;

// Sink
bool m_sink_is_set{ false };
std::shared_ptr<sink_t> m_sink_queue;
//std::shared_ptr<err_sink_t> m_error_sink_queue;

// Callback
bool m_callback_is_acquired{ false };
using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
sink_cb_t m_sink_callback;

std::atomic<uint64_t> m_dropped_packets{0};

};
Expand Down
12 changes: 9 additions & 3 deletions src/detail/IfaceWrapper.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) {
const uint16_t nb_rx = nb_rx_map[src_rx_q];

// We got packets from burst on this queue
if (nb_rx != 0) {
if (nb_rx != 0) [[likely]] {

m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());
// -------
// Iterate on burst packets
for (int i_b=0; i_b<nb_rx; ++i_b) {


// RS FIXME: removed for performance improvement hope
/*
// Check if packet is segmented. Implement support for it if needed.
if (q_bufs[i_b]->nb_segs > 1) {
//TLOG_DEBUG(10) << "It appears a packet is spread across more than one receiving buffer;"
Expand All @@ -61,6 +64,7 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) {
// Check packet type, ommit/drop unexpected ones.
auto pkt_type = q_bufs[i_b]->packet_type;
//// Handle non IPV4 packets
if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
//TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
Expand All @@ -74,15 +78,17 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) {
}
continue;
}
*/
// RS FIXME

// Check for UDP frames
//if (pkt_type == RTE_PTYPE_L4_UDP) { // RS FIXME: doesn't work. Why? What is the PKT_TYPE in our ETH frames?
// Check for JUMBO frames
if (q_bufs[i_b]->pkt_len > 7000) { // RS FIXME: do proper check on data length later
if (q_bufs[i_b]->pkt_len > 7000) [[likely]] { // RS FIXME: do proper check on data length later
// Handle them!
std::size_t data_len = q_bufs[i_b]->data_len;

if ( m_lcore_enable_flow.load() ) {
if ( m_lcore_enable_flow.load() ) [[likely]] {
char* message = udp::get_udp_payload(q_bufs[i_b]);
handle_eth_payload(src_rx_q, message, data_len);
}
Expand Down

0 comments on commit 3b28c55

Please sign in to comment.