Skip to content

Commit

Permalink
Added automatic generation of rte core masks for eal_init
Browse files Browse the repository at this point in the history
  • Loading branch information
alessandrothea committed Jun 17, 2024
1 parent bfdca93 commit 3d0c769
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 9 deletions.
14 changes: 14 additions & 0 deletions plugins/DPDKReaderModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "appmodel/DataReceiverModule.hpp"
#include "appmodel/DPDKReaderConf.hpp"
#include "appmodel/DPDKPortConfiguration.hpp"
#include "confmodel/ProcessingResource.hpp"
#include "confmodel/NetworkDevice.hpp"
#include "confmodel/QueueWithSourceId.hpp"

Expand Down Expand Up @@ -141,6 +143,11 @@ DPDKReaderModule::do_configure(const data_t& /*args*/)
// Construct the pcie devices allowed mask
std::string first_pcie_addr;
bool is_first_pcie_addr = true;
std::vector<uint16_t> rte_cores;

// FIXME: Hardcoding core 0 for GARP. Replace with better param.
rte_cores.push_back(0);

std::vector<const confmodel::DetectorToDaqConnection*> d2d_conns;
for (auto res : res_set) {
auto connection = res->cast<confmodel::DetectorToDaqConnection>();
Expand Down Expand Up @@ -172,8 +179,15 @@ DPDKReaderModule::do_configure(const data_t& /*args*/)
}
eal_params.push_back("-a");
eal_params.push_back(net_device->get_pcie_addr());

for ( const auto* proc_res : receiver->get_configuration()->get_used_lcores() ) {
rte_cores.insert(rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
}
}

eal_params.push_back("-l");
eal_params.push_back(fmt::format("{}", fmt::join(rte_cores,",")));


// Use the first pcie device id as file prefix
// FIXME: Review this strategy - should work in most of cases, but it could be
Expand Down
21 changes: 18 additions & 3 deletions scripts/udp_receiver.py → scripts/dpdklibs_udp_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
N_STREAM = 128
FRAME_SIZE = 7200
FRAME_TS_GAP = 2048
FRAME_TS_GAP = 2000

def print_header(wib_frame,prefix="\t"):
header = wib_frame.get_daqheader()
Expand All @@ -35,20 +36,34 @@ def dump_data(data):
@click.command()
@click.option('-d', '--dump', is_flag=True, default=False)
@click.option('-c', '--count', type=int, default=None)
def main(dump, count):
@click.option('-p', '--port', type=int, default=None)
@click.option('-g', '--gap', type=int, default=None)
@click.option('-f', '--frame-type', type=click.Choice(['wib', 'tde']), default='wib')
def main(dump, count, port, gap, frame_type):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)

s.bind(('0.0.0.0', 0x4444))
s.bind(('0.0.0.0', port))

prev_stream = {}
i=0
dtstart = time.time()
dtlast = dtstart
sampling = 100000
match frame_type:
case 'wib':
frame_class = fddetdataformats.WIBEthFrame
port = port if not port is None else 0x4444
gap = gap if not gap is None else 2048
case 'tde':
frame_class = fddetdataformats.TDEEthFrame
port = port if not port is None else 54323
gap = gap if not gap is None else 2000

print('Starting receiver')
while (count==None or i<count):
# while i<10:
data, address = s.recvfrom(20000)
wf = fddetdataformats.WIBEthFrame(data)
wf = frame_class(data)
header = wf.get_daqheader()


Expand Down
File renamed without changes.
9 changes: 4 additions & 5 deletions src/IfaceWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,8 @@ IfaceWrapper::IfaceWrapper(


// Here is my list of cores
std::vector<uint16_t> rte_cores;
for( auto proc_res : iface_cfg->get_used_lcores()) {
rte_cores.insert(rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
for( const auto* proc_res : iface_cfg->get_used_lcores()) {
m_rte_cores.insert(m_rte_cores.end(), proc_res->get_cpu_cores().begin(), proc_res->get_cpu_cores().end());
}

// iterate through active streams
Expand Down Expand Up @@ -125,11 +124,11 @@ IfaceWrapper::IfaceWrapper(
m_num_frames_rxq[rx_q] = { 0 };
m_num_bytes_rxq[rx_q] = { 0 };

m_rx_core_map[rte_cores[core_idx]][rx_q] = tx_ip;
m_rx_core_map[m_rte_cores[core_idx]][rx_q] = tx_ip;
m_stream_id_to_source_id[rx_q] = strm_src;

++rx_q;
if ( ++core_idx == rte_cores.size()) {
if ( ++core_idx == m_rte_cores.size()) {
core_idx = 0;
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/IfaceWrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ class IfaceWrapper
void setup_interface();
void setup_flow_steering();
void setup_xstats();

void enable_flow() { m_lcore_enable_flow.store(true);}
void disable_flow() { m_lcore_enable_flow.store(false);}

const std::vector<uint16_t>& get_rte_cores() const { return m_rte_cores; }

protected:
//iface_conf_t m_cfg;
int m_iface_id;
Expand All @@ -93,6 +95,7 @@ class IfaceWrapper
std::set<std::string> m_ips;
std::set<int> m_rx_qs;
std::set<int> m_tx_qs;
std::vector<uint16_t> m_rte_cores;

// CPU core ID -> [queue -> ip]
std::map<int, std::map<int, std::string>> m_rx_core_map;
Expand Down

0 comments on commit 3d0c769

Please sign in to comment.