diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index ae5d7e5368e..138e12ac6bb 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -106,6 +106,15 @@ impl From for OwnedFd { } } +#[cfg(all(test, unix))] +impl From for MockFile { + #[inline] + fn from(file: OwnedFd) -> MockFile { + use std::os::fd::IntoRawFd; + unsafe { MockFile::from_raw_fd(IntoRawFd::into_raw_fd(file)) } + } +} + tokio_thread_local! { static QUEUE: RefCell>> = RefCell::new(VecDeque::new()) } diff --git a/tokio/src/fs/open_options.rs b/tokio/src/fs/open_options.rs index 27e98b5b234..99073a9d185 100644 --- a/tokio/src/fs/open_options.rs +++ b/tokio/src/fs/open_options.rs @@ -518,6 +518,10 @@ impl OpenOptions { /// [`Other`]: std::io::ErrorKind::Other /// [`PermissionDenied`]: std::io::ErrorKind::PermissionDenied pub async fn open(&self, path: impl AsRef) -> io::Result { + self.open_inner(path.as_ref()).await + } + + async fn open_inner(&self, path: &Path) -> io::Result { match &self.inner { Kind::Std(opts) => Self::std_open(opts, path).await, #[cfg(all( @@ -535,7 +539,7 @@ impl OpenOptions { .check_and_init(io_uring::opcode::OpenAt::CODE) .await? { - Op::open(path.as_ref(), opts)?.await + Op::open(path, opts)?.await } else { let opts = opts.clone().into(); Self::std_open(&opts, path).await @@ -544,12 +548,11 @@ impl OpenOptions { } } - async fn std_open(opts: &StdOpenOptions, path: impl AsRef) -> io::Result { - let path = path.as_ref().to_owned(); + async fn std_open(opts: &StdOpenOptions, path: &Path) -> io::Result { + let path = path.to_owned(); let opts = opts.clone(); - let std = asyncify(move || opts.open(path)).await?; - Ok(File::from_std(std)) + Ok(asyncify(move || opts.open(path)).await?.into()) } #[cfg(windows)] diff --git a/tokio/src/fs/read.rs b/tokio/src/fs/read.rs index aabc994e95f..f4de3913144 100644 --- a/tokio/src/fs/read.rs +++ b/tokio/src/fs/read.rs @@ -54,14 +54,23 @@ use std::{io, path::Path}; /// } /// ``` pub async fn read(path: impl AsRef) -> io::Result> { - let path = path.as_ref().to_owned(); + let path = path.as_ref(); #[cfg(all( tokio_unstable, feature = "io-uring", feature = "rt", feature = "fs", - target_os = "linux" + // libc::statx is only supported on these platforms + // FIXME: Add musl target env when our minimum supported + // rust version is 1.93. To clarify, statx support is + // introduced to musl in 1.25 as mentioned officially here: + // https://musl.libc.org/releases.html. + // However, rustup target_env building for *-linux-musl + // uses 1.25 musl on all *-linux-musl platforms starting + // in 1.93 stable rust version. + // https://blog.rust-lang.org/2025/12/05/Updating-musl-1.2.5/ + any(target_env = "gnu", target_os = "android") ))] { use crate::fs::read_uring; @@ -72,9 +81,14 @@ pub async fn read(path: impl AsRef) -> io::Result> { .check_and_init(io_uring::opcode::Read::CODE) .await? { - return read_uring(&path).await; + return read_uring(path).await; } } + read_spawn_blocking(path).await +} + +async fn read_spawn_blocking(path: &Path) -> io::Result> { + let path = path.to_owned(); asyncify(move || std::fs::read(path)).await } diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index 67d709a2ce3..ef5b2980979 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -19,9 +19,26 @@ const MAX_READ_SIZE: usize = 64 * 1024 * 1024; pub(crate) async fn read_uring(path: &Path) -> io::Result> { let file = OpenOptions::new().read(true).open(path).await?; - // TODO: use io uring in the future to obtain metadata + #[cfg(not(any(target_env = "gnu", target_os = "android")))] let size_hint: Option = file.metadata().await.map(|m| m.len() as usize).ok(); + #[cfg( + // libc::statx is only supported on these platforms + // FIXME: Add musl target env when our minimum supported + // rust version is 1.93. To clarify, statx support is + // introduced to musl in 1.25 as mentioned officially here: + // https://musl.libc.org/releases.html. + // However, rustup target_env building for *-linux-musl + // uses 1.25 musl on all *-linux-musl platforms starting + // in 1.93 stable rust version. + // https://blog.rust-lang.org/2025/12/05/Updating-musl-1.2.5/ + any(target_env = "gnu", target_os = "android") + )] + let size_hint = Op::file_metadata(&file)? + .await + .map(|m| m.len() as usize) + .ok(); + let fd: OwnedFd = file .try_into_std() .expect("unexpected in-flight operation detected") diff --git a/tokio/src/fs/try_exists.rs b/tokio/src/fs/try_exists.rs index 2e8de04e0c5..01c439356fa 100644 --- a/tokio/src/fs/try_exists.rs +++ b/tokio/src/fs/try_exists.rs @@ -23,6 +23,67 @@ use std::path::Path; /// # } /// ``` pub async fn try_exists(path: impl AsRef) -> io::Result { - let path = path.as_ref().to_owned(); + let path = path.as_ref(); + + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + // libc::statx is only supported on these platforms + // FIXME: Add musl target env when our minimum supported + // rust version is 1.93. To clarify, statx support is + // introduced to musl in 1.25 as mentioned officially here: + // https://musl.libc.org/releases.html. + // However, rustup target_env building for *-linux-musl + // uses 1.25 musl on all *-linux-musl platforms starting + // in 1.93 stable rust version. + // https://blog.rust-lang.org/2025/12/05/Updating-musl-1.2.5/ + any(target_env = "gnu", target_os = "android") + ))] + { + let handle = crate::runtime::Handle::current(); + let driver_handle = handle.inner.driver().io(); + if driver_handle + .check_and_init(io_uring::opcode::Statx::CODE) + .await? + { + return try_exists_uring(path).await; + } + } + + try_exists_spawn_blocking(path).await +} + +cfg_io_uring! { + #[inline] + #[cfg( + // libc::statx is only supported on these platforms + // FIXME: Add musl target env when our minimum supported + // rust version is 1.93. To clarify, statx support is + // introduced to musl in 1.25 as mentioned officially here: + // https://musl.libc.org/releases.html. + // However, rustup target_env building for *-linux-musl + // uses 1.25 musl on all *-linux-musl platforms starting + // in 1.93 stable rust version. + // https://blog.rust-lang.org/2025/12/05/Updating-musl-1.2.5/ + any(target_env = "gnu", target_os = "android") + )] + async fn try_exists_uring(path: &Path) -> io::Result { + use crate::runtime::driver::op::Op; + + match Op::metadata(path)?.await { + Ok(_) => Ok(true), + Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(false), + Err(error) => Err(error), + } + } +} + +async fn try_exists_spawn_blocking(path: &Path) -> io::Result { + let path = path.to_owned(); + // FIXME: When MSRV is 1.81, change this to + // std::fs::exists() to be consistent with + // all other tokio::fs operations asyncify(move || path.try_exists()).await } diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs index facad596f63..c398f3d88e5 100644 --- a/tokio/src/io/uring/mod.rs +++ b/tokio/src/io/uring/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod open; pub(crate) mod read; +pub(crate) mod statx; pub(crate) mod utils; pub(crate) mod write; diff --git a/tokio/src/io/uring/statx.rs b/tokio/src/io/uring/statx.rs new file mode 100644 index 00000000000..06a3e7e7bb8 --- /dev/null +++ b/tokio/src/io/uring/statx.rs @@ -0,0 +1,161 @@ +#![cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + // libc::statx is only supported on these platforms + // FIXME: Add musl target env when our minimum supported + // rust version is 1.93. To clarify, statx support is + // introduced to musl in 1.25 as mentioned officially here: + // https://musl.libc.org/releases.html. + // However, rustup target_env building for *-linux-musl + // uses 1.25 musl on all *-linux-musl platforms starting + // in 1.93 stable rust version. + // https://blog.rust-lang.org/2025/12/05/Updating-musl-1.2.5/ + any(target_env = "gnu", target_os = "android") +))] + +use crate::fs::File; +use crate::io::uring::utils::cstr; +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use io_uring::{opcode, types}; +use libc::statx; +use std::ffi::{CStr, CString}; +use std::fmt::{Debug, Formatter}; +use std::io; +use std::mem::MaybeUninit; +use std::os::fd::AsRawFd; +use std::path::Path; + +pub(crate) struct Metadata(statx); + +impl Metadata { + /// Returns the size of the file, in bytes, this metadata is for. + pub(crate) fn len(&self) -> u64 { + self.0.stx_size + } +} + +impl Debug for Metadata { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut debug = f.debug_struct("Metadata"); + debug.field("len", &self.len()); + debug.finish_non_exhaustive() + } +} + +#[derive(Debug)] +pub(crate) struct Statx { + /// This field will be read by the kernel during the operation, so we + /// need to ensure it is valid for the entire duration of the operation. + _path: CString, + buffer: Box>, +} + +impl Completable for Statx { + type Output = io::Result; + + fn complete(self, cqe: CqeResult) -> Self::Output { + // SAFETY: On success, we always receive 0, which should guarantee + // that the information about a file is stored inside the + // statx buffer. On failure, we'll receive an Error value. + // Refer to man page description and return value: + // https://man7.org/linux/man-pages/man2/statx.2.html + cqe.result + .map(|_| Metadata(unsafe { *self.buffer.as_ptr() })) + } + + fn complete_with_error(self, error: io::Error) -> Self::Output { + Err(error) + } +} + +impl Cancellable for Statx { + fn cancel(self) -> CancelData { + CancelData::Statx(self) + } +} + +impl Op { + /// Submit a request to retrieve a file's status. + #[inline] + fn statx(path: &Path, flags: i32) -> io::Result> { + let path = cstr(path)?; + let mut buffer = Box::new(MaybeUninit::::uninit()); + + let statx_op = opcode::Statx::new( + types::Fd(libc::AT_FDCWD), + path.as_ptr(), + buffer.as_mut_ptr().cast(), + ) + .flags(flags) + .mask(libc::STATX_BASIC_STATS | libc::STATX_BTIME) + .build(); + + // SAFETY: Parameters are valid for the entire duration of the operation + Ok(unsafe { + Op::new( + statx_op, + Statx { + _path: path, + buffer, + }, + ) + }) + } + + /// Retrieves the metadata information of the given path, following symlinks + /// if the path provided points to a symlink location. + #[inline] + pub(crate) fn metadata(path: &Path) -> io::Result> { + Op::statx(path, libc::AT_STATX_SYNC_AS_STAT) + } + + /// Retrieves the metadata information of the given file + pub(crate) fn file_metadata(file: &File) -> io::Result> { + let mut buffer = Box::new(MaybeUninit::::uninit()); + let empty_path: &'static CStr = c""; + + // io-uring was introduced in linux 5.1 + // pass in an empty path instead of null to target the file descriptor + // status as specified by man: + // https://man7.org/linux/man-pages/man2/statx.2.html + let statx_op = opcode::Statx::new( + types::Fd(file.as_raw_fd()), + // it should be fine to pass in `empty_path` whose lifetime + // does not exceed the `file_metadata()` function as a ptr here + // because we want to stat the dirfd not this pathname + empty_path.as_ptr(), + buffer.as_mut_ptr().cast(), + ) + .flags(libc::AT_STATX_SYNC_AS_STAT | libc::AT_EMPTY_PATH) + .mask(libc::STATX_BASIC_STATS | libc::STATX_BTIME) + .build(); + + // SAFETY: Parameters are valid for the entire duration of the operation + Ok(unsafe { + Op::new( + statx_op, + Statx { + _path: empty_path.into(), + buffer, + }, + ) + }) + } + + // TODO: Once `Metadata::from_statx` is stabilized, we can use use this function + // to enable io-uring support on `tokio::fs::symlink_metadata`. + // See this PR for more detail: https://github.com/tokio-rs/tokio/pull/8080 + // See `Metadata::from_statx` tracking issue to see progress: + // https://github.com/rust-lang/rust/issues/156268 + /// Retrieves the metadata information of the given path without following symlinks. + #[inline] + #[allow(dead_code)] + pub(crate) fn symlink_metadata(path: &Path) -> io::Result> { + Op::statx( + path, + libc::AT_STATX_SYNC_AS_STAT | libc::AT_SYMLINK_NOFOLLOW, + ) + } +} diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index f5fe4c37bbd..16b48aee833 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -6,6 +6,19 @@ use crate::io::uring::write::Write; use crate::runtime::Handle; +#[cfg( + // libc::statx is only supported on these platforms + // FIXME: Add musl target env when our minimum supported + // rust version is 1.93. To clarify, statx support is + // introduced to musl in 1.25 as mentioned officially here: + // https://musl.libc.org/releases.html. + // However, rustup target_env building for *-linux-musl + // uses 1.25 musl on all *-linux-musl platforms starting + // in 1.93 stable rust version. + // https://blog.rust-lang.org/2025/12/05/Updating-musl-1.2.5/ + any(target_env = "gnu", target_os = "android") +)] +use crate::io::uring::statx::Statx; use io_uring::cqueue; use io_uring::squeue::Entry; use std::future::Future; @@ -24,6 +37,19 @@ pub(crate) enum CancelData { Write(Write), ReadVec(Read, OwnedFd>), ReadBuf(Read), + #[cfg( + // libc::statx is only supported on these platforms + // FIXME: Add musl target env when our minimum supported + // rust version is 1.93. To clarify, statx support is + // introduced to musl in 1.25 as mentioned officially here: + // https://musl.libc.org/releases.html. + // However, rustup target_env building for *-linux-musl + // uses 1.25 musl on all *-linux-musl platforms starting + // in 1.93 stable rust version. + // https://blog.rust-lang.org/2025/12/05/Updating-musl-1.2.5/ + any(target_env = "gnu", target_os = "android") + )] + Statx(Statx), } #[derive(Debug)] diff --git a/tokio/tests/fs_uring_statx.rs b/tokio/tests/fs_uring_statx.rs new file mode 100644 index 00000000000..3ddc5866e85 --- /dev/null +++ b/tokio/tests/fs_uring_statx.rs @@ -0,0 +1,303 @@ +//! Uring file operations tests. + +#![cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] + +use futures::future::Future; +use futures::future::FutureExt; +use libc::PATH_MAX; +use std::future::poll_fn; +use std::io::Write; +use std::os::unix::fs::PermissionsExt; +use std::path::PathBuf; +use std::sync::mpsc; +use std::task::Poll; +use std::time::Duration; +use tempfile::{tempdir, NamedTempFile}; +use tokio::fs::{create_dir, metadata, set_permissions, symlink, try_exists, write}; +use tokio::runtime::{Builder, Runtime}; +use tokio_test::assert_pending; +use tokio_util::task::TaskTracker; + +use crate::support::io_uring::io_uring_supported; + +mod support { + pub(crate) mod io_uring; +} + +fn multi_rt(n: usize) -> Box Runtime> { + Box::new(move || { + Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build() + .unwrap() + }) +} + +fn current_rt() -> Box Runtime> { + Box::new(|| Builder::new_current_thread().enable_all().build().unwrap()) +} + +fn rt_combinations() -> Vec Runtime>> { + vec![ + current_rt(), + multi_rt(1), + multi_rt(2), + multi_rt(8), + multi_rt(64), + multi_rt(256), + ] +} + +#[test] +fn shutdown_runtime_while_performing_io_uring_ops() { + if !io_uring_supported() { + return; + } + + fn run(rt: Runtime) { + let (done_tx, done_rx) = mpsc::channel(); + let (_tmp, path) = create_tmp_files(1); + // keep 100 permits + const N: i32 = 100; + rt.spawn(async move { + let path = path[0].clone(); + + // spawning a bunch of uring operations. + let mut futs = vec![]; + + // spawning a bunch of uring operations. + for _ in 0..N { + let path = path.clone(); + let mut fut = Box::pin(try_exists(path)); + + poll_fn(|cx| { + assert_pending!(fut.as_mut().poll(cx)); + Poll::<()>::Pending + }) + .await; + + futs.push(fut); + } + + tokio::task::yield_now().await; + }); + + std::thread::spawn(move || { + rt.shutdown_timeout(Duration::from_millis(300)); + done_tx.send(()).unwrap(); + }); + + done_rx.recv().unwrap(); + } + + for rt in rt_combinations() { + run(rt()); + } +} + +#[test] +fn stat_many_files() { + fn run(rt: Runtime) { + const NUM_FILES: usize = 512; + + let (_tmp_files, paths): (Vec, Vec) = create_tmp_files(NUM_FILES); + + rt.block_on(async move { + let tracker = TaskTracker::new(); + + for i in 0..10_000 { + let path = paths.get(i % NUM_FILES).unwrap().clone(); + tracker.spawn(async move { + let exists = try_exists(path).await.unwrap(); + assert!(exists); + }); + } + tracker.close(); + tracker.wait().await; + }); + } + + for rt in rt_combinations() { + run(rt()); + } +} + +#[tokio::test] +async fn stat_small_large_files() { + let (_tmp, path) = create_large_temp_file(); + + let exists = try_exists(path).await.unwrap(); + assert!(exists); + + let (_tmp, path) = create_small_temp_file(); + + let exists = try_exists(path).await.unwrap(); + assert!(exists); +} + +#[tokio::test] +async fn stat_nonexistent_file() { + let path = tempdir().unwrap().path().join("nonexistent_path"); + let exists = try_exists(path).await.unwrap(); + assert!(!exists); +} + +// Error is not produced on Linux 4.19 (Linux 7.1 it works) +#[tokio::test] +#[cfg_attr(miri, ignore)] // No `chmod` in miri. +#[cfg(unix)] +async fn stat_permission_denied() { + if !io_uring_supported() { + return; + } + + let dir = tempdir().unwrap(); + let permission_denied_directory_path = dir.path().join("baz"); + create_dir(&permission_denied_directory_path).await.unwrap(); + let permission_denied_file_path = permission_denied_directory_path.join("baz.txt"); + write(&permission_denied_file_path, b"Hello File!") + .await + .unwrap(); + let mut perms = metadata(&permission_denied_directory_path) + .await + .unwrap() + .permissions(); + + perms.set_mode(0o244); + set_permissions(&permission_denied_directory_path, perms) + .await + .unwrap(); + let permission_denied_result = try_exists(permission_denied_file_path).await; + assert_eq!( + permission_denied_result + .err() + .unwrap() + .raw_os_error() + .unwrap(), + libc::EACCES + ); +} + +#[tokio::test] +#[cfg(unix)] +async fn stat_filesystem_loop() { + let dir = tempdir().unwrap(); + let first_symlink = dir.path().join("bar"); + let second_symlink = dir.path().join("foo"); + symlink(&first_symlink, &second_symlink).await.unwrap(); + symlink(&second_symlink, &first_symlink).await.unwrap(); + + // Both symlinks loop on each other, so stating either one should produce + // a file system loop error. This produces a `std::io::ErrorKind::FilesystemLoop` + // error, but that error is gated behind io_error_more feature, and we can't be + // sure if that name will ever be changed, so preferred using libc::ELOOP instead + let filesystem_loop_result = try_exists(first_symlink).await; + assert_eq!( + filesystem_loop_result + .err() + .unwrap() + .raw_os_error() + .unwrap(), + libc::ELOOP + ); + let filesystem_loop_result = try_exists(second_symlink).await; + assert_eq!( + filesystem_loop_result + .err() + .unwrap() + .raw_os_error() + .unwrap(), + libc::ELOOP + ); +} + +#[tokio::test] +async fn stat_path_name_too_long() { + let dir = tempdir().unwrap(); + // if we stat a file whose name is above PATH_MAX (Linux is 4096 bytes, Windows 260 chars or 32767 chars if extended + // path is permitted), we should receive an std::io::ErrorKind::InvalidFilename error + let long_nonexistent_path = dir.path().join(vec!["a"; (PATH_MAX + 1) as usize].join("")); + let name_too_long_result = try_exists(long_nonexistent_path).await; + assert_eq!( + name_too_long_result.err().unwrap().kind(), + std::io::ErrorKind::InvalidFilename + ); +} + +#[tokio::test] +async fn cancel_op_future() { + if !io_uring_supported() { + return; + } + + let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); + let path = path[0].clone(); + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let handle = tokio::spawn(async move { + poll_fn(|cx| { + let fut = try_exists(path.clone()); + + // the first poll should return Pending. + assert_pending!(Box::pin(fut).poll_unpin(cx)); + tx.send(true).unwrap(); + + Poll::<()>::Pending + }) + .await; + }); + + // Wait for the first poll + let val = rx.recv().await; + assert!(val.unwrap()); + + handle.abort(); + + let res = handle.await.unwrap_err(); + assert!(res.is_cancelled()); +} + +fn create_tmp_files(num_files: usize) -> (Vec, Vec) { + let mut files = Vec::with_capacity(num_files); + for _ in 0..num_files { + let mut tmp = NamedTempFile::new().unwrap(); + let buf = vec![20; 1023]; + tmp.write_all(&buf).unwrap(); + let path = tmp.path().to_path_buf(); + files.push((tmp, path)); + } + + files.into_iter().unzip() +} + +fn create_large_temp_file() -> (NamedTempFile, PathBuf) { + let mut tmp = NamedTempFile::new().unwrap(); + let buf = create_buf(5000); + + tmp.write_all(&buf).unwrap(); + let path = tmp.path().to_path_buf(); + + (tmp, path) +} + +fn create_small_temp_file() -> (NamedTempFile, PathBuf) { + let mut tmp = NamedTempFile::new().unwrap(); + let buf = create_buf(20); + + tmp.write_all(&buf).unwrap(); + let path = tmp.path().to_path_buf(); + + (tmp, path) +} + +fn create_buf(length: usize) -> Vec { + (0..length).map(|i| i as u8).collect() +} diff --git a/tokio/tests/fs_uring_statx_fd_leak_test.rs b/tokio/tests/fs_uring_statx_fd_leak_test.rs new file mode 100644 index 00000000000..c9d1efad1a1 --- /dev/null +++ b/tokio/tests/fs_uring_statx_fd_leak_test.rs @@ -0,0 +1,161 @@ +#![cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] + +mod support { + pub(crate) mod io_uring; +} + +use std::fmt::Debug; +use std::fs; +use std::future::Future; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use tempfile::NamedTempFile; +use tokio::fs::read; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio::time::timeout; +use tokio_test::assert_pending; + +use crate::support::io_uring::io_uring_supported; + +/// Count currently-open fds in this process. +fn fd_count() -> usize { + fs::read_dir("/proc/self/fd").unwrap().count() +} + +/// First poll: +/// - polls the inner `tokio::fs::read()` future once, +/// - expects `Pending` so we know we took the io_uring path, +/// - registers the task waker with Tokio's uring machinery. +/// +/// Second poll: +/// - happens after the kernel completes the open and Tokio stores the CQE as +/// `Lifecycle::Completed(cqe)` and wakes the task, +/// - polls inner future again to execute `Op::file_metadata`/Statx operation +/// +/// Third poll: +/// - happens after the kernel completes the statx and Tokio stores the CQE as +/// `Lifecycle::Completed(cqe)` and wakes the task, +/// - stays pending forever so the task can be aborted. +/// +/// Aborting the task here drops the inner `Op::file_metadata()` future while Tokio +/// still has a completed CQE sitting in the slab. +struct PollOpenOnceThenNeverRepoll { + inner: Pin>, + poll_senders: Vec>, + poll_pending_counts: usize, +} + +impl Future for PollOpenOnceThenNeverRepoll +where + F::Output: Debug, +{ + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_count = self.poll_pending_counts; + let sender_count = self.poll_senders.len(); + if poll_count < sender_count { + // The first two poll should return `Poll::Pending`` because it verifies + // that io_uring is enabled and then executes the open operation in + // `tokio::fs::read` + if poll_count != sender_count - 1 { + assert_pending!(self.inner.as_mut().poll(cx)); + } + self.poll_pending_counts += 1; + self.poll_senders[poll_count].send(()).unwrap(); + } + + Poll::Pending + } +} + +async fn completed_then_dropped_before_repoll(path: PathBuf) { + // `tokio::fs::read` has an Open operation occurring (via OpenOptions) before doing a + // Statx operation (via `Op::file_metadata`), we must poll and complete the Open operation + // first to then be able to poll the Statx operation. + const POLL_PENDING_COUNT: usize = 3; + let mut poll_senders: Vec> = Vec::new(); + let mut poll_receivers = Vec::new(); + + for _ in 0..POLL_PENDING_COUNT { + let (sender, receiver) = unbounded_channel(); + poll_senders.push(sender); + poll_receivers.push(receiver); + } + + let handle = tokio::spawn(async move { + let fut = read(&path); + PollOpenOnceThenNeverRepoll { + inner: Box::pin(fut), + poll_senders, + poll_pending_counts: 0, + } + .await + }); + + for (i, poll_receiver) in poll_receivers + .iter_mut() + .enumerate() + .take(POLL_PENDING_COUNT) + { + // Wait until the inner open has been polled once and registered with io_uring. + if i == 0 { + poll_receiver.recv().await.unwrap(); + } else { + // Wait until Tokio wakes the task because the open/statx completed. At this point + // the CQE should already be stored as `Lifecycle::Completed(cqe)`. + let _ = timeout(Duration::from_secs(2), poll_receiver.recv()).await; + } + } + + // Abort now, before the inner statx future gets re-polled and consumes the CQE. + handle.abort(); + let err = handle.await.unwrap_err(); + assert!(err.is_cancelled(), "task was not cancelled as expected"); +} + +#[test] +fn uring_completed_then_dropped() { + if !io_uring_supported() { + return; + } + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + let before = fd_count(); + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + + for _ in 0..128 { + completed_then_dropped_before_repoll(path.clone()).await; + } + + // Give completions a moment to settle before counting fds. + tokio::time::sleep(Duration::from_millis(250)).await; + + let after = fd_count(); + let leaked = after.saturating_sub(before); + + // Since we are opening 128 files, we expect that the related fds + // related to this operation will be closed. Since some other fds + // can be opened in the meantime, we expect this number to be higher + // than the counter before opening the files. This number could be + // lower, but to avoid test flakiness we check that this is at most + // half the number of the file we opened to check if there's a leak. + assert!(leaked <= 64); + }); +}