Skip to content

Commit

Permalink
Merge pull request #118 from DUNE-DAQ/feature/OKS
Browse files Browse the repository at this point in the history
Feature/oks
  • Loading branch information
jcfreeman2 authored Mar 1, 2024
2 parents cd47f17 + 8916834 commit 272c23f
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 412 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

.vscode/settings.json
.DS_Store
224 changes: 96 additions & 128 deletions plugins/NICReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,19 @@
* Licensing/copyright details are in the COPYING file that you should have
* received with this code.
*/
#include "dpdklibs/nicreader/Nljs.hpp"
//#include "dpdklibs/nicreader/Nljs.hpp"

#include "appfwk/ConfigurationManager.hpp"
#include "appfwk/ModuleConfiguration.hpp"

#include "appdal/DataReader.hpp"
#include "coredal/ReadoutInterface.hpp"
#include "appdal/NICReceiverConf.hpp"
#include "appdal/NICInterface.hpp"
#include "appdal/NICInterfaceConfiguration.hpp"
#include "appdal/NICStatsConf.hpp"
#include "appdal/EthStreamParameters.hpp"
#include "coredal/QueueWithId.hpp"

#include "logging/Logging.hpp"

Expand Down Expand Up @@ -53,10 +65,7 @@ namespace dpdklibs {

NICReceiver::NICReceiver(const std::string& name)
: DAQModule(name),
m_run_marker{ false },
m_routing_policy{ "incremental" },
m_prev_sink(0),
m_next_sink(0)
m_run_marker{ false }
{
register_command("conf", &NICReceiver::do_configure);
register_command("start", &NICReceiver::do_start);
Expand All @@ -82,73 +91,72 @@ tokenize(std::string const& str, const char delim, std::vector<std::string>& out
}

void
NICReceiver::init(const data_t& args)
NICReceiver::init(const std::shared_ptr<appfwk::ModuleConfiguration> mcfg )
{
auto ini = args.get<appfwk::app::ModInit>();
for (const auto& qi : ini.conn_refs) {
if (qi.uid == "errored_chunks_q") {
continue;
} else {
TLOG_DEBUG(TLVL_WORK_STEPS) << ": NICCardReader output queue is " << qi.uid;
const char delim = '_';
std::string target = qi.uid;
std::vector<std::string> words;
tokenize(target, delim, words);
int sourceid = -1;
try {
sourceid = std::stoi(words.back());
} catch (const std::exception& ex) {
ers::fatal(dunedaq::readoutlibs::InitializationError(
ERS_HERE, "Output link ID could not be parsed on queue instance name! "));
}
TLOG() << "Creating source for target queue: " << target << " DLH number: " << sourceid;
m_sources[sourceid] = createSourceModel(qi.uid);
if (m_sources[sourceid] == nullptr) {
ers::fatal(dunedaq::readoutlibs::InitializationError(
ERS_HERE, "CreateSource failed to provide an appropriate model for queue!"));
}
m_sources[sourceid]->init(args);
}
auto mdal = mcfg->module<appdal::DataReader>(get_name());
m_cfg = mcfg;
if (mdal->get_outputs().empty()) {
auto err = dunedaq::readoutlibs::InitializationError(ERS_HERE, "No outputs defined for NIC reader in configuration.");
ers::fatal(err);
throw err;
}

for (auto con : mdal->get_outputs()) {
auto queue = con->cast<coredal::QueueWithId>();
if(queue == nullptr) {
auto err = dunedaq::readoutlibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId.");
ers::fatal(err);
throw err;
}

m_sources[queue->get_source_id()] = createSourceModel(queue->UID());
//m_sources[queue->get_source_id()]->init();
}
}

void
NICReceiver::do_configure(const data_t& args)
NICReceiver::do_configure(const data_t& /*args*/)
{
TLOG() << get_name() << ": Entering do_conf() method";
m_cfg = args.get<module_conf_t>();
auto ifaces_cfg = m_cfg.ifaces;

//auto session = appfwk::ModuleManager::get()->session();
auto mdal = m_cfg->module<appdal::DataReader>(get_name());
auto module_conf = mdal->get_configuration()->cast<appdal::NICReceiverConf>();
auto res_set = mdal->get_interfaces();
// EAL setup
TLOG() << "Setting up EAL with params from config.";
std::vector<std::string> eal_args;

eal_args.push_back("eal_cmdline");

// Enforce the process type to primary
eal_args.push_back("--proc-type=primary");

std::vector<std::string> eal_params ;
eal_params.push_back("eal_cmdline");
eal_params.push_back("--proc-type=primary");
// Construct the pcie devices allowed mask
for (const auto& iface_cfg : ifaces_cfg) {
eal_args.push_back("-a");
eal_args.push_back(iface_cfg.pci_addr);
std::string first_pcie_addr;
bool is_first_pcie_addr = true;
for (auto res : res_set) {
auto interface = res->cast<appdal::NICInterface>();
if (interface == nullptr) {
dunedaq::readoutlibs::GenericConfigurationError err(
ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!");
ers::fatal(err);
throw err;
}
if (interface->disabled(*(m_cfg->configuration_manager()->session()))) {
continue;
}
if (is_first_pcie_addr) {
first_pcie_addr = interface->get_rx_pcie_addr();
is_first_pcie_addr = false;
}
eal_params.push_back("-a");
eal_params.push_back(interface->get_rx_pcie_addr());
}

// Use the first pcie device id as file prefix
// FIXME: Review this strategy - should work in most of cases, but it could be
// confusing in configs with multiple interfaces
eal_args.push_back(fmt::format("--file-prefix={}", ifaces_cfg.front().pci_addr));
eal_params.push_back(fmt::format("--file-prefix={}", first_pcie_addr));

// Append the remaining dpdk parameters
std::istringstream iss(m_cfg.eal_arg_list);
std::string arg_from_str;
while (iss >> arg_from_str) {
if (!arg_from_str.empty()) {
eal_args.push_back(arg_from_str);
}
}
eal_params.push_back(module_conf->get_eal_args());

ealutils::init_eal(eal_args);
ealutils::init_eal(eal_params);

// Get available Interfaces from EAL
auto available_ifaces = ifaceutils::get_num_available_ifaces();
Expand All @@ -161,27 +169,39 @@ NICReceiver::do_configure(const data_t& args)
TLOG() << "Available iface with MAC=" << mac_addr_str << " PCIe=" << pci_addr_str << " logical ID=" << ifc_id;
}

// Configure expected (and available!) interfaces
for (const auto& iface_cfg : ifaces_cfg) {
auto iface_mac_addr = iface_cfg.mac_addr;
auto iface_pci_addr = iface_cfg.pci_addr;
if ((m_mac_to_id_map.count(iface_mac_addr) != 0) &&
(m_pci_to_id_map.count(iface_pci_addr) != 0)) {
auto iface_id = m_mac_to_id_map[iface_mac_addr];
TLOG() << "Configuring expected interface with MAC=" << iface_mac_addr
<< " PCIe=" << iface_pci_addr << " Logical ID=" << iface_id;
m_ifaces[iface_id] = std::make_unique<IfaceWrapper>(iface_id, m_sources, m_run_marker);
m_ifaces[iface_id]->conf(iface_cfg);
m_ifaces[iface_id]->allocate_mbufs();
m_ifaces[iface_id]->setup_interface();
m_ifaces[iface_id]->setup_flow_steering();
m_ifaces[iface_id]->setup_xstats();
} else {
TLOG() << "No available interface with MAC=" << iface_mac_addr << " PCI=" << iface_pci_addr;
throw dunedaq::readoutlibs::InitializationError(ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!");
}
for (auto res : res_set) {
auto interface = res->cast<appdal::NICInterface>();
if (interface == nullptr) {
dunedaq::readoutlibs::GenericConfigurationError err(
ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!");
ers::fatal(err);
throw err;
}
if (interface->disabled(*(m_cfg->configuration_manager()->session()))) {
continue;
}

if ((m_mac_to_id_map.count(interface->get_rx_mac()) != 0) && (m_pci_to_id_map.count(interface->get_rx_pcie_addr()) != 0)) {
m_mac_to_id_map[interface->get_rx_mac()] = interface->get_rx_iface();
m_ifaces[interface->get_rx_iface()] = std::make_unique<IfaceWrapper>(interface, m_sources, m_run_marker);
//m_ifaces[interface->get_rx_iface()] = conf(iface_cfg);
m_ifaces[interface->get_rx_iface()]->allocate_mbufs();;
m_ifaces[interface->get_rx_iface()]->setup_flow_steering();
m_ifaces[interface->get_rx_iface()]->setup_xstats();
} else {
TLOG() << "No available interface with MAC=" << interface->get_rx_mac();
ers::fatal(dunedaq::readoutlibs::InitializationError(
ERS_HERE, "NICReceiver configuration failed due expected but unavailable interface!"));
}
}

return;

}

void
NICReceiver::do_start(const data_t&)
{
TLOG() << get_name() << ": Entering do_start() method";
if (!m_run_marker.load()) {
set_running(true);
Expand All @@ -193,22 +213,6 @@ NICReceiver::do_configure(const data_t& args)
TLOG_DEBUG(5) << "NICReader is already running!";
}

return;
}

void
NICReceiver::do_start(const data_t&)
{
// TLOG() << get_name() << ": Entering do_start() method";
// if (!m_run_marker.load()) {
// set_running(true);
// TLOG() << "Starting iface wrappers.";
// for (auto& [iface_id, iface] : m_ifaces) {
// iface->start();
// }
// } else {
// TLOG_DEBUG(5) << "NICReader is already running!";
// }
for (auto& [iface_id, iface] : m_ifaces) {
iface->enable_flow();
}
Expand Down Expand Up @@ -236,28 +240,11 @@ NICReceiver::do_stop(const data_t&)
}
}


void
NICReceiver::do_scrap(const data_t&)
{

TLOG() << get_name() << ": Entering do_stop() method";
if (m_run_marker.load()) {
TLOG() << "Raising stop through variables!";
set_running(false);
TLOG() << "Stopping iface wrappers.";
for (auto& [iface_id, iface] : m_ifaces) {
iface->stop();
}
ealutils::wait_for_lcores();
TLOG() << "Stoppped DPDK lcore processors and internal threads...";
} else {
TLOG_DEBUG(5) << "DPDK lcore processor is already stopped!";
}

TLOG() << get_name() << ": Entering do_scrap() method";
for (auto& [iface_id, iface] : m_ifaces) {
iface->scrap();
}

}

void
Expand All @@ -268,25 +255,6 @@ NICReceiver::get_info(opmonlib::InfoCollector& ci, int level)
}
}

void
NICReceiver::handle_eth_payload(int src_rx_q, char* payload, std::size_t size) {
// Get DAQ Header and its StreamID
auto* daq_header = reinterpret_cast<dunedaq::detdataformats::DAQEthHeader*>(payload);
//auto strid = (unsigned)daq_header->stream_id;
auto strid = (unsigned)daq_header->stream_id+(daq_header->slot_id<<8)+(daq_header->crate_id<<(8+4))+(daq_header->det_id<<(8+4+10));
if (m_sources.count(strid) != 0) {
auto ret = m_sources[strid]->handle_payload(payload, size);
} else {
// Really bad -> unexpeced StreamID in UDP Payload.
// This check is needed in order to avoid dynamically add thousands
// of Sources on the fly, in case the data corruption is extremely severe.
if (m_num_unexid_frames.count(strid) == 0) {
m_num_unexid_frames[strid] = 0;
}
m_num_unexid_frames[strid]++;
}
}

void
NICReceiver::set_running(bool should_run)
{
Expand Down
49 changes: 8 additions & 41 deletions plugins/NICReceiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,22 @@

#include "dpdklibs/udp/Utils.hpp"

#include "appfwk/app/Nljs.hpp"
//#include "appfwk/app/Nljs.hpp"
#include "appfwk/cmd/Nljs.hpp"
#include "appfwk/cmd/Structs.hpp"

#include "appfwk/DAQModule.hpp"

#include "iomanager/IOManager.hpp"
#include "iomanager/Sender.hpp"

#include "readoutlibs/utils/ReusableThread.hpp"

#include "dpdklibs/nicreader/Structs.hpp"
#include "dpdklibs/nicreaderinfo/InfoNljs.hpp"
//#include "dpdklibs/nicreader/Structs.hpp"
//#include "dpdklibs/nicreaderinfo/InfoNljs.hpp"
#include "dpdklibs/EALSetup.hpp"
#include "IfaceWrapper.hpp"

#include <folly/ProducerConsumerQueue.h>

#include <future>
#include <map>
#include <memory>
Expand All @@ -51,11 +50,11 @@ class NICReceiver : public dunedaq::appfwk::DAQModule
NICReceiver(NICReceiver&&) = delete; ///< NICReceiver is not move-constructible
NICReceiver& operator=(NICReceiver&&) = delete; ///< NICReceiver is not move-assignable

void init(const data_t& args) override;
void init(const std::shared_ptr<appfwk::ModuleConfiguration> mfcg) override;

private:
// Types
using module_conf_t = dunedaq::dpdklibs::nicreader::Conf;
//using module_conf_t = dunedaq::dpdklibs::nicreader::Conf;

// Commands
void do_configure(const data_t&);
Expand All @@ -65,41 +64,12 @@ class NICReceiver : public dunedaq::appfwk::DAQModule
void get_info(opmonlib::InfoCollector& ci, int level);

// Internals
std::shared_ptr<appfwk::ModuleConfiguration> m_cfg;

int m_running = 0;
std::atomic<bool> m_run_marker;
void set_running(bool /*should_run*/);

// Configuration
module_conf_t m_cfg;
std::string m_dest_ip;
int m_num_ip_sources;
int m_num_rx_cores;
std::set<int> m_rx_qs;
std::map<int, std::map<int, std::string>> m_rx_core_map;

// Routing policy
std::string m_routing_policy;
int m_prev_sink;
int m_next_sink;

// What to do with every payload
void handle_eth_payload(int src_rx_q, char* payload, std::size_t size);

// Stats
int m_burst_number = 0;
int m_sum = 0;
std::map<int, std::atomic<std::size_t>> m_num_frames;
std::map<int, std::atomic<std::size_t>> m_num_bytes;
std::map<int, std::atomic<std::size_t>> m_num_unexid_frames;
std::thread m_stat_thread;

// DPDK
unsigned m_num_ifaces;
uint16_t m_iface_id;
const int m_burst_size = 256;
std::map<int, std::unique_ptr<rte_mempool>> m_mbuf_pools;
std::map<int, struct rte_mbuf **> m_bufs;

// Interfaces (logical ID, MAC) -> IfaceWrapper
std::map<std::string, uint16_t> m_mac_to_id_map;
std::map<std::string, uint16_t> m_pci_to_id_map;
Expand All @@ -109,9 +79,6 @@ class NICReceiver : public dunedaq::appfwk::DAQModule
using source_to_sink_map_t = std::map<int, std::unique_ptr<SourceConcept>>;
source_to_sink_map_t m_sources;


opmonlib::InfoCollector m_ic;
std::mutex m_ic_mutex;
};

} // namespace dunedaq::dpdklibs
Expand Down
Loading

0 comments on commit 272c23f

Please sign in to comment.