async --> sync --> async: why is tokio freezing up? #6007
It's because all IO and timers depend on the runtime's IO driver. Since the IO driver is operated on Tokio's threads, it is one of the things you are blocking when you block the thread. You can try using |
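To make that explanation concrete, here is a toy model using only the standard library. It is not Tokio's real scheduler or driver: `Timer`, `Job`, `spawn_worker`, and `sleep_fired` are hypothetical names invented for this sketch, and the single "worker" thread is assumed to be the only thing that fires timers. It shows that a job which blocks the worker while waiting for a timer never sees that timer fire, while the same wait done on a separate thread completes.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread;
use std::time::{Duration, Instant};

// Toy stand-ins, NOT Tokio types: a timer registration and a unit of work.
struct Timer {
    deadline: Instant,
    done: Sender<()>,
}
type Job = Box<dyn FnOnce(&Sender<Timer>) + Send>;

/// One thread that both runs jobs and fires timers (the "driver" role).
fn spawn_worker() -> Sender<Job> {
    let (job_tx, job_rx) = mpsc::channel::<Job>();
    thread::spawn(move || {
        let (timer_tx, timer_rx) = mpsc::channel::<Timer>();
        let mut pending: Vec<Timer> = Vec::new();
        loop {
            // Run at most one job; if the job blocks, nothing below runs.
            match job_rx.recv_timeout(Duration::from_millis(5)) {
                Ok(job) => job(&timer_tx),
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break,
            }
            // "Drive" the timers: collect new registrations, fire expired ones.
            pending.extend(timer_rx.try_iter());
            let now = Instant::now();
            pending.retain(|t| {
                if t.deadline <= now {
                    let _ = t.done.send(()); // the waiter may have given up
                    false
                } else {
                    true
                }
            });
        }
    });
    job_tx
}

/// Registers a 20 ms timer, then blocks the calling thread for up to
/// `patience` waiting for it. Returns whether the timer fired in time.
fn sleep_fired(timers: &Sender<Timer>, patience: Duration) -> bool {
    let (done, fired) = mpsc::channel();
    let deadline = Instant::now() + Duration::from_millis(20);
    if timers.send(Timer { deadline, done }).is_err() {
        return false;
    }
    fired.recv_timeout(patience).is_ok()
}

/// The job waits for the timer on the worker itself, so the worker cannot
/// get back to its timer-driving step while the job is waiting.
fn blocked_on_worker() -> bool {
    let worker = spawn_worker();
    let (res_tx, res_rx) = mpsc::channel();
    let job: Job = Box::new(move |timers: &Sender<Timer>| {
        let _ = res_tx.send(sleep_fired(timers, Duration::from_millis(300)));
    });
    worker.send(job).unwrap();
    res_rx.recv().unwrap()
}

/// The job hands the wait to another thread and returns immediately, so the
/// worker keeps driving timers.
fn waited_on_other_thread() -> bool {
    let worker = spawn_worker();
    let (res_tx, res_rx) = mpsc::channel();
    let job: Job = Box::new(move |timers: &Sender<Timer>| {
        let timers = timers.clone();
        thread::spawn(move || {
            let _ = res_tx.send(sleep_fired(&timers, Duration::from_millis(300)));
        });
    });
    worker.send(job).unwrap();
    res_rx.recv().unwrap()
}

fn main() {
    println!("timer fired while worker was blocked: {}", blocked_on_worker());
    println!("timer fired with the wait off the worker: {}", waited_on_other_thread());
}
```

Real Tokio has several worker threads and a more involved hand-off of the driver between them, so treat this only as an illustration of the dependency described above, not as a description of the runtime's internals.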
@Darksonn thanks for that suggestion, that's what we eventually decided to go with, a la:

fn execute_future<Fut>(fut: Fut) -> Fut::Output
where
    Fut: std::future::IntoFuture + Send,
    Fut::Output: Send,
{
    use std::thread;
    use tokio::runtime::{Builder, Handle, Runtime, RuntimeFlavor::MultiThread};
    use tokio::task;

    fn new_runtime() -> Runtime {
        Builder::new_current_thread().enable_all().build().unwrap()
    }

    match Handle::try_current() {
        Ok(handle) => {
            if handle.runtime_flavor() == MultiThread {
                // block_in_place tells the runtime this worker is about to block.
                // It panics on a current-thread runtime, hence the flavor check.
                task::block_in_place(move || handle.block_on(fut.into_future()))
            } else {
                // Run the future on a fresh runtime in a separate scoped thread.
                thread::scope(|s| {
                    s.spawn(move || new_runtime().block_on(fut.into_future()))
                        .join()
                        .unwrap()
                })
            }
        }
        // No runtime context on this thread: block on a new runtime directly.
        Err(_) => new_runtime().block_on(fut.into_future()),
    }
}

Regarding the original code, I'm still curious if there's some operating principle of Tokio that I'm missing or misunderstanding that explains the observed behavior. At a high level your explanation makes sense: Tokio IO/timers depend on the IO driver, the IO driver is operated on Tokio's threads, which we block, ergo the whole program locks up. However, some of the details still aren't quite clear to me.

To narrow it down, when my main looks like this, the program fails.

// Imports assumed by this snippet; the TokioTcpStream alias is a guess.
use std::net::ToSocketAddrs;
use tokio::net::TcpStream as TokioTcpStream;
use tokio::sync::oneshot;

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let (tx, rx) = oneshot::channel();
    rt.spawn(async {
        let server_socket_addr = "google.com:80".to_socket_addrs().unwrap().next().unwrap();
        TokioTcpStream::connect(&server_socket_addr).await.unwrap();
        run_execute_future();
        tx.send(()).unwrap();
    });
    rx.blocking_recv().unwrap();
}

however when I change

// execute_future is exactly what the library uses to call the asynchronous function within run_execute_future.
fn execute_future<Fut>(fut: Fut) -> Fut::Output
where
    Fut: std::future::IntoFuture + Send,
    Fut::Output: Send,
{
    use std::thread;
    use tokio::runtime::{Builder, Handle};

    match Handle::try_current() {
        Ok(handle) => {
            // Tokio runtime already exists, cannot block again on the same thread.
            // Spawn a new thread to run the blocking code.
            thread::scope(|s| {
                s.spawn(move || handle.block_on(fut.into_future()))
                    .join()
                    .unwrap()
            })
        }
        Err(err) => {
            if err.is_missing_context() {
                // No existing tokio runtime context, block on a new one.
                let rt = Builder::new_current_thread().enable_all().build().unwrap();
                return rt.block_on(fut.into_future());
            }
            // ThreadLocalDestroyed error should never happen.
            panic!(
                "Unexpected error when trying to get current runtime: {}",
                err
            );
        }
    }
}

Ergo I suppose I can infer that Tokio's I/O driver runs on "worker" threads, hence when I block on the "worker" thread spawned by |
I have a non-standard use case where I've managed to get tokio to break, however I've been unable to understand why. At a high level I'm writing an asynchronous program, which calls into a mostly-synchronous library, but that mostly-synchronous library needs to use async for a small subsection.
Below is a full toy example that illustrates what I'm running into:
When the program is run as given, it hangs indefinitely, and from some print debugging I can see that it hangs on the first call to await (in sleep_for_a_bit in the example). I can fix the program (get it to print "slept for a bit") by changing rt.spawn to rt.block_on, by changing execute_future (omitted, because it's not important to my question), or by commenting out the line TokioTcpStream::connect(&server_socket_addr).await.unwrap();

This behavior baffles me. I understand that by blocking in execute_future via the call to thread::scope I'm doing something inadvisable (blocking an async program), but I don't understand why it's breaking the program entirely. Shouldn't I still be able to await a future? And why does block_on fix that?

Even more baffling is that commenting out TokioTcpStream::connect(&server_socket_addr).await.unwrap(); fixes the program. What does a random TCP connection have to do with tokio's sleep function? (It's not just sleep; apparently any await call will do the trick.)

It seems as if there must be something deep within tokio's internals that I'm bumping into, and I'd like to understand more if anybody has the knowledge to help.