Skip to content
Merged
39 changes: 39 additions & 0 deletions Lib/test/test_asyncio/test_free_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,45 @@ async def main():
loop.set_task_factory(self.factory)
r.run(main())

def test_all_tasks_from_other_thread_includes_eager_tasks(self):
# gh-152020: all_tasks() called from another thread used to drop
# eager-started tasks on free-threaded builds.
loop = asyncio.new_event_loop()

async def wait_forever():
await asyncio.Event().wait()

def eager_factory(loop, coro, **kwargs):
return self.factory(loop, coro, eager_start=True, **kwargs)

async def setup():
loop.set_task_factory(eager_factory)
eager = loop.create_task(wait_forever(), name="EAGER")
loop.set_task_factory(None)
normal = loop.create_task(wait_forever(), name="NORMAL")
return eager, normal

async def teardown():
tasks = [t for t in asyncio.all_tasks()
if t is not asyncio.current_task()]
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)

thread = threading.Thread(target=loop.run_forever)
thread.start()
try:
held = asyncio.run_coroutine_threadsafe(setup(), loop).result()
names = {t.get_name() for t in asyncio.all_tasks(loop)}
self.assertIn("NORMAL", names)
self.assertIn("EAGER", names)
del held
finally:
asyncio.run_coroutine_threadsafe(teardown(), loop).result()
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()


class TestPyFreeThreading(TestFreeThreading, TestCase):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
On the free-threaded build, :func:`asyncio.all_tasks` no longer loses
eager-started tasks when called from a thread other than the one running the
event loop.
10 changes: 5 additions & 5 deletions Modules/_asynciomodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -2366,6 +2366,11 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
return -1;
}
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
#ifdef Py_GIL_DISABLED
// This is required so that _Py_TryIncref(self)
// works correctly in non-owning threads.
_PyObject_SetMaybeWeakref((PyObject *)self);
#endif
if (eager_start) {
PyObject *res = PyObject_CallMethodNoArgs(loop, &_Py_ID(is_running));
if (res == NULL) {
Expand All @@ -2384,11 +2389,6 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
if (task_call_step_soon(state, self, NULL)) {
return -1;
}
#ifdef Py_GIL_DISABLED
// This is required so that _Py_TryIncref(self)
// works correctly in non-owning threads.
_PyObject_SetMaybeWeakref((PyObject *)self);
#endif
register_task(ts, self);
return 0;
}
Expand Down
Loading