Skip to content

Commit

Permalink
Merge branch 'main' into ep/root-image
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Feb 11, 2025
2 parents 7c29351 + df4d0d9 commit f7f676f
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 75 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions crates/test/tests/xdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn simple_forwarding() {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: 0,
frame_count: 1,
tx_metadata: false,
..Default::default()
}
.build()
.unwrap(),
Expand Down Expand Up @@ -162,7 +162,7 @@ async fn changes_ip_version() {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: 20,
frame_count: 1,
tx_metadata: false,
..Default::default()
}
.build()
.unwrap(),
Expand Down Expand Up @@ -246,7 +246,7 @@ async fn packet_manipulation() {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: 20,
frame_count: 1,
tx_metadata: false,
..Default::default()
}
.build()
.unwrap(),
Expand Down Expand Up @@ -525,7 +525,7 @@ async fn multiple_servers() {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: 20,
frame_count: 20,
tx_metadata: false,
..Default::default()
}
.build()
.unwrap(),
Expand Down Expand Up @@ -605,7 +605,7 @@ async fn many_sessions() {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: 0,
frame_count: 1,
tx_metadata: false,
..Default::default()
}
.build()
.unwrap(),
Expand Down Expand Up @@ -735,7 +735,7 @@ async fn frees_dropped_packets() {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: 0,
frame_count: 1,
tx_metadata: false,
..Default::default()
}
.build()
.unwrap(),
Expand Down Expand Up @@ -829,7 +829,7 @@ async fn qcmp() {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: 0,
frame_count: 1,
tx_metadata: false,
..Default::default()
}
.build()
.unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion crates/xdp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ aya-log = "0.2.1"
libc.workspace = true
thiserror.workspace = true
tracing.workspace = true
xdp = "0.1.0"
xdp = "0.2.0"
#xdp = { git = "https://github.com/Jake-Shadle/xdp", branch = "main" }

[lints]
Expand Down
56 changes: 3 additions & 53 deletions crates/xdp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ pub struct XdpWorker {
/// and send packets are stored
pub umem: xdp::Umem,
/// The ring used to indicate to the kernel we wish to receive packets
pub fill: xdp::FillRing,
pub fill: xdp::WakableFillRing,
/// The ring the kernel pushes received packets to
pub rx: xdp::RxRing,
/// The ring we push packets we wish to send
pub tx: xdp::TxRing,
pub tx: xdp::WakableTxRing,
/// The ring the kernel pushes packets that have finished sending
pub completion: xdp::CompletionRing,
}
Expand Down Expand Up @@ -160,7 +160,7 @@ impl EbpfProgram {
for i in 0..device_caps.queue_count {
let umem = xdp::Umem::map(umem_cfg)?;
let mut sb = xdp::socket::XdpSocketBuilder::new()?;
let (rings, mut bind_flags) = sb.build_rings(&umem, ring_cfg)?;
let (rings, mut bind_flags) = sb.build_wakable_rings(&umem, ring_cfg)?;

if device_caps.zero_copy.is_available() {
bind_flags.force_zerocopy();
Expand Down Expand Up @@ -220,53 +220,3 @@ impl EbpfProgram {
program.detach(link_id)
}
}

/// Gets the information for the default NIC
pub fn get_default_nic() -> std::io::Result<Option<NicIndex>> {
let table = std::fs::read_to_string("/proc/net/route")?;

// In most cases there will probably only be one NIC that talks to
// the rest of the network, but just in case, fail if there is
// more than one, so the user is forced to specify. We _could_ go
// further and use netlink to get the route for a global IP eg. 8.8.8.8,
// but the rtnetlink crate is...pretty bad to work with, maybe neli?
// (though want to get rid of that as well)
let mut def_iface = None;

// skip column headers
for line in table.lines().skip(1) {
let mut iter = line.split(char::is_whitespace).filter_map(|s| {
let s = s.trim();
(!s.is_empty()).then_some(s)
});

let Some(name) = iter.next() else {
continue;
};
let Some(flags) = iter.nth(2).and_then(|f| u16::from_str_radix(f, 16).ok()) else {
continue;
};

if flags & (libc::RTF_UP | libc::RTF_GATEWAY) != libc::RTF_UP | libc::RTF_GATEWAY {
continue;
}

let Some(iface) = NicIndex::lookup_by_name(name)? else {
continue;
};

if let Some(def) = def_iface {
// A NIC can have multiple routes, so don't error when it comes up again
if def != iface {
return Err(std::io::Error::new(
std::io::ErrorKind::Unsupported,
format!("unable to determine default interface, found {def:?} and {iface:?}"),
));
}
}

def_iface = Some(iface);
}

Ok(def_iface)
}
54 changes: 42 additions & 12 deletions src/net/xdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,27 @@ pub enum XdpSpawnError {
/// work in the `xdp` crate
pub fn setup_xdp_io(config: XdpConfig<'_>) -> Result<XdpWorkers, XdpSetupError> {
let nic_index = match config.nic {
NicConfig::Default => quilkin_xdp::get_default_nic()
.map_err(NicUnavailable::Query)?
.ok_or(NicUnavailable::NoAvailableDefault)?,
NicConfig::Name(name) => xdp::nic::NicIndex::lookup_by_name(name)
.map_err(NicUnavailable::Query)?
.ok_or_else(|| NicUnavailable::UnknownName(name.to_owned()))?,
NicConfig::Default => {
let mut chosen = None;

for iface in xdp::nic::InterfaceIter::new().map_err(NicUnavailable::Query)? {
if let Some(chosen) = chosen {
if iface != chosen {
return Err(NicUnavailable::NoAvailableDefault.into());
}
} else {
chosen = Some(iface);
}
}

chosen.ok_or(NicUnavailable::NoAvailableDefault)?
}
NicConfig::Name(name) => {
let cname = std::ffi::CString::new(name).unwrap();
xdp::nic::NicIndex::lookup_by_name(&cname)
.map_err(NicUnavailable::Query)?
.ok_or_else(|| NicUnavailable::UnknownName(name.to_owned()))?
}
NicConfig::Index(index) => xdp::nic::NicIndex::new(index),
};

Expand Down Expand Up @@ -233,6 +248,8 @@ pub fn setup_xdp_io(config: XdpConfig<'_>) -> Result<XdpWorkers, XdpSetupError>
// that doesn't change during the course of operation, but for now just
// do it at runtime
tx_metadata: device_caps.tx_metadata.checksum(),
#[cfg(debug_assertions)]
software_checksum: false,
}
.build()?;

Expand Down Expand Up @@ -315,7 +332,12 @@ pub fn spawn(workers: XdpWorkers, config: Arc<crate::Config>) -> Result<XdpLoop,
.spawn(move || {
// Enqueue buffers to the fill ring to ensure that we don't miss any packets
// SAFETY: we keep the umem alive for as long as the socket is alive
unsafe { worker.fill.enqueue(&mut worker.umem, BATCH_SIZE * 2) };
unsafe {
if let Err(error) = worker.fill.enqueue(&mut worker.umem, BATCH_SIZE * 2, true)
{
tracing::error!(%error, "failed to kick fill ring during initial spinup");
}
};

io_loop(
worker,
Expand Down Expand Up @@ -407,24 +429,32 @@ fn io_loop(
// the owner of the actual memory map
unsafe {
while !shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// Wait for packets to be received/sent, note that
// Wait for packets to be received, note that
// [poll](https://www.man7.org/linux/man-pages/man2/poll.2.html) also acts
// as a [cancellation point](https://www.man7.org/linux/man-pages/man7/pthreads.7.html),
// so shutdown will cause the thread to exit here
let Ok(true) = socket.poll(POLL_TIMEOUT) else {
let Ok(true) = socket.poll_read(POLL_TIMEOUT) else {
continue;
};

let recvd = rx.recv(&umem, &mut rx_slab);

// Ensure the fill ring doesn't get starved, which could drop packets
fill.enqueue(&mut umem, BATCH_SIZE * 2 - recvd);
if let Err(error) = fill.enqueue(&mut umem, BATCH_SIZE * 2 - recvd, true) {
tracing::error!(%error, "RX kick failed");
}

// Process each of the packets that we received, potentially queuing
// packets to be sent
process::process_packets(&mut rx_slab, &mut umem, &mut tx_slab, &mut state);

let enqueued_sends = tx.send(&mut tx_slab);
let before = tx_slab.len();
let enqueued_sends = match tx.send(&mut tx_slab, true) {
Ok(es) => es,
Err(error) => {
tracing::error!(%error, "TX kick failed");
before - tx_slab.len()
}
};

// Return frames that have completed sending
pending_sends -= completion.dequeue(&mut umem, pending_sends);
Expand Down

0 comments on commit f7f676f

Please sign in to comment.