Open
Labels
bug (Something isn't working)
Description
Bug summary
Original slack message: https://prefect-community.slack.com/archives/CL09KU1K7/p1738591882063619
Thanks to the latest release (3.1.15), I'm trying to use the new utility function run_flow_in_subprocess
to create and run subflows with multiprocessing. Here is a simplified example of what I'm trying to do:
import asyncio
import multiprocessing

from prefect import flow, task
from prefect.flow_engine import run_flow_in_subprocess


@task
async def long_running_task(sleep: int):
    await asyncio.sleep(sleep)


@flow
async def my_flow(items: list[int]):
    return await asyncio.gather(*[long_running_task(i) for i in items])


@flow
async def my_flow_distributed(items: list[int]):
    n_procs = multiprocessing.cpu_count()
    batch_size = len(items) // n_procs
    procs = []
    for i in range(0, len(items), batch_size):
        proc = run_flow_in_subprocess(flow=my_flow, parameters={"items": items[i : i + batch_size]})
        procs.append(proc)
    exit_codes = [p.join() for p in procs]
    if any(exit_codes):
        raise ValueError()
    return True


if __name__ == "__main__":
    items = list(range(10000))
    asyncio.run(my_flow_distributed(items))
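A side note on the reproduction itself, separate from the logging problem: assuming run_flow_in_subprocess returns a standard multiprocessing process object, join() returns None, so the any(exit_codes) check can never fire. The exit status is available on the exitcode attribute after joining. Also, len(items) // n_procs is 0 when there are fewer items than CPUs, which makes range() raise. The sketch below shows one possible way to handle both; split_into_batches and wait_for_exit_codes are hypothetical helper names, not part of Prefect.

```python
from math import ceil
from typing import Any, Sequence


def split_into_batches(items: Sequence[Any], n_batches: int) -> list[Sequence[Any]]:
    """Split items into at most n_batches contiguous slices (hypothetical helper)."""
    if n_batches < 1:
        raise ValueError("n_batches must be at least 1")
    if not items:
        return []
    # Ceiling division keeps the batch size >= 1 and caps the batch count at n_batches.
    batch_size = ceil(len(items) / n_batches)
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]


def wait_for_exit_codes(procs: Sequence[Any]) -> list[Any]:
    """Join every process, then read its exit code (hypothetical helper).

    Assumes each object exposes join() and exitcode, as multiprocessing.Process does.
    """
    for proc in procs:
        proc.join()  # join() itself returns None
    return [proc.exitcode for proc in procs]
```

In the flow above, the batches could then come from split_into_batches(items, n_procs), and the failure check would become any(wait_for_exit_codes(procs)). A non-zero exit code (including a negative one from a signal) is truthy, so the check would then raise on failure.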
The flow runs, but logs are missing in the parent flow:
- Logs not showing in the parent flow
- Tasks and logs missing in the sub flows
Locally, however, I can see the logs.
Version info
Version: 3.1.15
API version: 0.8.4
Python version: 3.12.3
Git commit: 3ac3d548
Built: Thu, Jan 30, 2025 11:31 AM
OS/Arch: linux/x86_64
Profile: production
Server type: server
Pydantic version: 2.10.6
Integrations:
  prefect-slack: 0.3.1
  prefect-dask: 0.3.2
  prefect-aws: 0.5.3
Additional context
No response