fix: yield when the next file is ready to open to prevent CPU starvation #14028

Draft
wants to merge 1 commit into
base: main
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -478,7 +478,12 @@ impl<F: FileOpener> FileStream<F> {
          reader,
      )),
      partition_values,
- }
+ };
+ // Return control to the runtime when we're ready to open the next file
+ // to prevent uncancellable queries in scenarios with many large files.
+ // This functions similarly to a `tokio::task::yield_now()`.
+ cx.waker().wake_by_ref();
+ return Poll::Pending;
Comment on lines +485 to +486
Contributor


I think this may not be correct. The wake_by_ref should be called AFTER returning Pending. I think you may be lucky that this works, but it's undefined behavior. If I understand the yield_now impl. correctly, it tries to delay the "wake" call.

Contributor Author


I thought it was odd too (I had seen this pattern elsewhere) but looking at the implementation of yield_now, when you poll it does a context::defer and then returns Pending (source). context::defer will directly call wake_by_ref if called from outside of the runtime (source).

I could be misunderstanding what "outside the runtime" means in this case/how that will actually interact with the yield.

Contributor


For the multi-thread RT (which is the only thing we really care about), I think it's NOT calling wake_by_ref instantly:

  1. yield_now: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/task/yield_now.rs#L57
  2. context::defer: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/context.rs#L166-L177
  3. with_scheduler: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/context.rs#L183-L195
  4. (via some indirection) Context::defer: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/scheduler/mod.rs#L287-L289
  5. (via some indirection) Context::defer (different Context this time): https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/scheduler/multi_thread/worker.rs#L766-L768
  6. Defer::defer: https://github.com/tokio-rs/tokio/blob/bd3e8577377a2b684b50fc0cb50d98f03ad09703/tokio/src/runtime/scheduler/defer.rs#L15-L26
  7. The deferred threads are woken in two places (1, 2), of which the 2nd one seems relevant. The actual waking will happen when the worker is (I think) unparked after a "park timeout". This is definitely long after Pending is returned, because the thread that is returning Pending is the very same worker, i.e. it cannot possibly be parked at this point in time.
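
The difference between waking immediately and deferring the wake, as traced in the steps above, can be sketched with a toy model. This is a std-only illustration, not tokio's code: `DeferQueue`, `CountingWaker`, and `deferred_wake_counts` are hypothetical names, and the real scheduler does considerably more.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waker that only counts how often it is woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy stand-in for a per-worker list of deferred wakers (an assumption
/// made for illustration, not tokio's actual type).
struct DeferQueue {
    deferred: RefCell<Vec<Waker>>,
}

impl DeferQueue {
    fn new() -> Self {
        DeferQueue { deferred: RefCell::new(Vec::new()) }
    }

    /// Remember the waker instead of waking it right away.
    fn defer(&self, waker: &Waker) {
        self.deferred.borrow_mut().push(waker.clone());
    }

    /// Wake everything that was deferred, e.g. once the worker gets
    /// around to it after the task has already returned `Pending`.
    fn wake_all(&self) {
        let wakers: Vec<Waker> = self.deferred.borrow_mut().drain(..).collect();
        for waker in wakers {
            waker.wake();
        }
    }
}

/// Returns the wake count observed (before, after) the queue is flushed.
fn deferred_wake_counts() -> (usize, usize) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let queue = DeferQueue::new();

    queue.defer(&waker);
    let before = counter.wakes.load(Ordering::SeqCst);
    queue.wake_all();
    let after = counter.wakes.load(Ordering::SeqCst);
    (before, after)
}
```

In this model the task is not marked as woken at the moment `defer` is called; the wake only lands when the queue is flushed, which is the behavior the list above attributes to the multi-thread runtime.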

Contributor

@alamb Jan 7, 2025


I wonder if it would be possible to call yield_now directly rather than inserting waker manipulation directly into the file opener.

Like could we do something like

let future = tokio::task::yield_now()

And return future.poll_next() 🤔

Contributor


> I think this may not be correct. The wake_by_ref should be called AFTER returning Pending. I think you may be lucky that this works, but it's undefined behavior. If I understand the yield_now impl. correctly, it tries to delay the "wake" call.

I was thinking similarly, but those have changed my mind:
https://github.com/rust-lang/rust/blob/1f81f906893d166d05fb4839f169983f2b564cc7/library/core/src/task/wake.rs#L423-L426

and this example:
https://github.com/rust-lang/rust/blob/1f81f906893d166d05fb4839f169983f2b564cc7/library/core/src/task/wake.rs#L703-L704

I believe the usage is not a problem, but yielding once we are ready to open the next file does not seem like the correct solution to me. Does this also start to degrade the throughput for small files? Do we have any example of this in the codebase (returning pending not because of an IO)?
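
A minimal std-only sketch of the pattern being debated: a future that requests its own wake-up while `poll` is still running and then returns `Poll::Pending`. `YieldOnce`, `CountingWaker`, and `poll_twice` are hypothetical names for illustration and are not part of DataFusion or tokio. The hand-rolled driver only shows that the wake is recorded before `Pending` is returned and that the next poll completes; it says nothing about how a real runtime schedules the re-poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that only counts how often it is woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical future that yields exactly once by waking itself
/// before returning `Poll::Pending`, like the lines under review.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // The wake is requested while `poll` is still running.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Polls `YieldOnce` twice by hand and reports
/// (wakes seen after the first poll, first poll pending?, second poll ready?).
fn poll_twice() -> (usize, bool, bool) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(YieldOnce { yielded: false });
    let first = fut.as_mut().poll(&mut cx);
    let wakes = counter.wakes.load(Ordering::SeqCst);
    let second = fut.as_mut().poll(&mut cx);
    (wakes, first.is_pending(), second.is_ready())
}
```

The throughput question raised above is separate from correctness: every yield of this kind costs one extra poll per file, which this sketch makes visible but does not measure.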

Contributor


> > I think this may not be correct. The wake_by_ref should be called AFTER returning Pending. I think you may be lucky that this works, but it's undefined behavior. If I understand the yield_now impl. correctly, it tries to delay the "wake" call.
>
> I was thinking similarly, but those have changed my mind: rust-lang/rust@1f81f90/library/core/src/task/wake.rs#L423-L426
>
> and this example: rust-lang/rust@1f81f90/library/core/src/task/wake.rs#L703-L704

good point, thanks

> Do we have any example of this in the codebase (returning pending not because of an IO)?

We do, see #5299.

Contributor Author


If it's clearer, we can remove these lines and change the lines above them to:

self.state = FileStreamState::Open {
  future: Box::pin(async {
    yield_now().await;
    reader
  }),
  partition_values,
};

This works because this future gets polled in the next iteration of the loop when we transition to the Open state.
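
To illustrate why wrapping the ready reader in a yielding future has the same effect, here is a std-only sketch of a two-state poll loop. It is a simplified model and not the `FileStream` code: `State`, `open_after_yield`, `poll_state`, and `polls_until_scan` are hypothetical, a `String` stands in for the reader, and `YieldOnce` stands in for `tokio::task::yield_now()`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the driver below simply re-polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// std-only stand-in for `tokio::task::yield_now()`: pending once, then ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

type OpenFuture = Pin<Box<dyn Future<Output = String> + Send>>;

/// Simplified two-state machine; a `String` stands in for the reader.
enum State {
    Open { future: OpenFuture },
    Scan { reader: String },
}

/// Wrap an already-available reader so that opening it yields once first.
fn open_after_yield(reader: String) -> State {
    State::Open {
        future: Box::pin(async move {
            YieldOnce(false).await;
            reader
        }),
    }
}

/// Drive the state machine until it reaches `Scan` or has to wait.
fn poll_state(state: &mut State, cx: &mut Context<'_>) -> Poll<()> {
    loop {
        let reader = match &mut *state {
            State::Open { future } => match future.as_mut().poll(cx) {
                Poll::Ready(reader) => reader,
                // The yield inside the future surfaces here.
                Poll::Pending => return Poll::Pending,
            },
            State::Scan { .. } => return Poll::Ready(()),
        };
        *state = State::Scan { reader };
    }
}

/// Counts how many polls are needed to reach `Scan` and returns the reader.
fn polls_until_scan(reader: &str) -> (usize, String) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut state = open_after_yield(reader.to_string());

    let mut polls = 1;
    while poll_state(&mut state, &mut cx).is_pending() {
        polls += 1;
    }
    match state {
        State::Scan { reader } => (polls, reader),
        State::Open { .. } => unreachable!("loop exits only in Scan"),
    }
}
```

In this model the first poll returns `Pending` from inside the `Open` future and the second poll reaches `Scan`, so the yield happens without any explicit waker calls in the state machine itself.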

}
}
}