Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Roland sipos/data move cb registry #121

Merged
merged 6 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions plugins/NICReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ NICReceiver::init(const data_t& args)
ers::fatal(dunedaq::readoutlibs::InitializationError(
ERS_HERE, "Output link ID could not be parsed on queue instance name! "));
}
TLOG() << "Creating source for target queue: " << target << " DLH number: " << sourceid;
m_sources[sourceid] = createSourceModel(qi.uid);
bool callback_mode = false;
if (words.front() == "cb") {
callback_mode = true;
}
TLOG() << "Creating source for target queue: " << target << " DLH number: " << sourceid << " CallbacksMode?=" << callback_mode;
m_sources[sourceid] = createSourceModel(qi.uid, callback_mode);
if (m_sources[sourceid] == nullptr) {
ers::fatal(dunedaq::readoutlibs::InitializationError(
ERS_HERE, "CreateSource failed to provide an appropriate model for queue!"));
Expand Down Expand Up @@ -209,6 +213,13 @@ NICReceiver::do_start(const data_t&)
// } else {
// TLOG_DEBUG(5) << "NICReader is already running!";
// }
//

// Setup callbacks on all sourcemodels
for (auto& [sourceid, source] : m_sources) {
source->acquire_callback();
}

for (auto& [iface_id, iface] : m_ifaces) {
iface->enable_flow();
}
Expand Down
9 changes: 6 additions & 3 deletions src/CreateSource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::TDEFrameTypeAdapter, "TDEFram
namespace dpdklibs {

std::unique_ptr<SourceConcept>
createSourceModel(const std::string& conn_uid)
createSourceModel(const std::string& conn_uid, bool callback_mode)
{
auto datatypes = dunedaq::iomanager::IOManager::get()->get_datatypes(conn_uid);
if (datatypes.size() != 1) {
Expand All @@ -41,8 +41,11 @@ createSourceModel(const std::string& conn_uid)
// Create Model
auto source_model = std::make_unique<SourceModel<fdreadoutlibs::types::DUNEWIBEthTypeAdapter>>();

// For callback acquisition later (lazy)
source_model->set_sink_name(conn_uid);

// Setup sink (acquire pointer from QueueRegistry)
source_model->set_sink(conn_uid);
source_model->set_sink(conn_uid, callback_mode);

// Get parser and sink
//auto& parser = source_model->get_parser();
Expand All @@ -62,7 +65,7 @@ createSourceModel(const std::string& conn_uid)
} else if (raw_dt.find("TDEFrame") != std::string::npos) {
// WIB2 specific char arrays
auto source_model = std::make_unique<SourceModel<fdreadoutlibs::types::TDEFrameTypeAdapter>>();
source_model->set_sink(conn_uid);
source_model->set_sink(conn_uid, callback_mode);
//auto& parser = source_model->get_parser();
//parser.process_chunk_func = parsers::fixsizedChunkInto<fdreadoutlibs::types::DUNEWIBSuperChunkTypeAdapter>(sink);
return source_model;
Expand Down
9 changes: 8 additions & 1 deletion src/SourceConcept.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class SourceConcept
SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable

virtual void init(const nlohmann::json& args) = 0;
virtual void set_sink(const std::string& sink_name) = 0;
virtual void set_sink(const std::string& sink_name, bool callback_mode) = 0;
virtual void acquire_callback() = 0;
virtual void conf(const nlohmann::json& args) = 0;
virtual void start(const nlohmann::json& args) = 0;
virtual void stop(const nlohmann::json& args) = 0;
Expand All @@ -57,6 +58,11 @@ class SourceConcept

//DefaultParserImpl& get_parser() { return std::ref(m_parser_impl); }

void set_sink_name(const std::string& sink_name)
{
m_sink_name = sink_name;
}

void set_ids(int card, int pid, int sip, int lid)
{
m_card_id = card;
Expand Down Expand Up @@ -92,6 +98,7 @@ class SourceConcept
std::string m_source_str;
std::string m_opmon_str;
std::string m_source_tid;
std::string m_sink_name;
std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;

private:
Expand Down
57 changes: 47 additions & 10 deletions src/SourceModel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "logging/Logging.hpp"
// #include "readoutlibs/utils/ReusableThread.hpp"

#include "readoutlibs/DataMoveCallbackRegistry.hpp"

// #include <folly/ProducerConsumerQueue.h>
// #include <nlohmann/json.hpp>

Expand Down Expand Up @@ -50,13 +52,34 @@ class SourceModel : public SourceConcept
{}
~SourceModel() {}

void set_sink(const std::string& sink_name) override
void set_sink(const std::string& sink_name, bool callback_mode) override
{
m_callback_mode = callback_mode;
if (callback_mode) {
TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
} else {
if (m_sink_is_set) {
TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
} else {
m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
m_sink_is_set = true;
}
}
}

void acquire_callback() override
{
if (m_sink_is_set) {
TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
if (m_callback_mode) {
if (m_callback_is_acquired) {
TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
} else {
// Getting DataMoveCBRegistry
auto dmcbr = readoutlibs::DataMoveCallbackRegistry::get();
m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
m_callback_is_acquired = true;
}
} else {
m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
m_sink_is_set = true;
TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
}
}

Expand Down Expand Up @@ -114,12 +137,18 @@ class SourceModel : public SourceConcept
if (push_out) {

TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) {
//if(m_dropped_packets == 0 || m_dropped_packets%10000) {
// TLOG() << "Dropped data " << m_dropped_packets;
//}
++m_dropped_packets;

if (m_callback_mode) {
(*m_sink_callback)(std::move(target_payload));
} else {
if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) {
//if(m_dropped_packets == 0 || m_dropped_packets%10000) {
// TLOG() << "Dropped data " << m_dropped_packets;
//}
++m_dropped_packets;
}
}

} else {
TargetPayloadType target_payload;
uint32_t bytes_copied = 0;
Expand All @@ -141,11 +170,19 @@ class SourceModel : public SourceConcept
std::atomic<bool> m_run_marker;
bool m_configured{ false };

std::string m_sink_id;
bool m_callback_mode;

// Sink
bool m_sink_is_set{ false };
std::shared_ptr<sink_t> m_sink_queue;
//std::shared_ptr<err_sink_t> m_error_sink_queue;

// Callback
bool m_callback_is_acquired{ false };
using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
sink_cb_t m_sink_callback;

std::atomic<uint64_t> m_dropped_packets{0};

};
Expand Down
12 changes: 9 additions & 3 deletions src/detail/IfaceWrapper.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) {
const uint16_t nb_rx = nb_rx_map[src_rx_q];

// We got packets from burst on this queue
if (nb_rx != 0) {
if (nb_rx != 0) [[likely]] {

m_max_burst_size[src_rx_q] = std::max(nb_rx, m_max_burst_size[src_rx_q].load());
// -------
// Iterate on burst packets
for (int i_b=0; i_b<nb_rx; ++i_b) {


// RS FIXME: removed for performance improvement hope
/*
// Check if packet is segmented. Implement support for it if needed.
if (q_bufs[i_b]->nb_segs > 1) {
//TLOG_DEBUG(10) << "It appears a packet is spread across more than one receiving buffer;"
Expand All @@ -61,6 +64,7 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) {

// Check packet type, ommit/drop unexpected ones.
auto pkt_type = q_bufs[i_b]->packet_type;

//// Handle non IPV4 packets
if (not RTE_ETH_IS_IPV4_HDR(pkt_type)) {
//TLOG_DEBUG(10) << "Non-Ethernet packet type: " << (unsigned)pkt_type << " original: " << pkt_type;
Expand All @@ -74,15 +78,17 @@ IfaceWrapper::rx_runner(void *arg __rte_unused) {
}
continue;
}
*/
// RS FIXME

// Check for UDP frames
//if (pkt_type == RTE_PTYPE_L4_UDP) { // RS FIXME: doesn't work. Why? What is the PKT_TYPE in our ETH frames?
// Check for JUMBO frames
if (q_bufs[i_b]->pkt_len > 7000) { // RS FIXME: do proper check on data length later
if (q_bufs[i_b]->pkt_len > 7000) [[likely]] { // RS FIXME: do proper check on data length later
// Handle them!
std::size_t data_len = q_bufs[i_b]->data_len;

if ( m_lcore_enable_flow.load() ) {
if ( m_lcore_enable_flow.load() ) [[likely]] {
char* message = udp::get_udp_payload(q_bufs[i_b]);
handle_eth_payload(src_rx_q, message, data_len);
}
Expand Down