11import asyncio
22import math
3- import multiprocessing
4- import multiprocessing .queues
53import time
64from collections .abc import AsyncGenerator , Iterable , Iterator
75from concurrent .futures import ProcessPoolExecutor
6+ from multiprocessing import Manager
7+ from queue import Empty as QueueEmpty
8+ from queue import Queue
9+ from threading import Event
810from typing import (
911 Any ,
1012 Generic ,
1517from loguru import logger
1618
1719from guidellm .config import settings
20+ from guidellm .request .session import RequestSession
1821from guidellm .scheduler .result import (
22+ MPQueues ,
1923 SchedulerRequestResult ,
2024 SchedulerResult ,
2125 SchedulerRunInfo ,
26+ WorkerProcessRequestTime ,
27+ WorkerProcessResult ,
2228)
2329from guidellm .scheduler .strategy import SchedulingStrategy
24- from guidellm .scheduler .types import RequestT , ResponseT
30+ from guidellm .scheduler .types import (
31+ RequestT ,
32+ ResponseT ,
33+ )
2534from guidellm .scheduler .worker import (
2635 RequestsWorker ,
27- WorkerProcessRequest ,
28- WorkerProcessResult ,
2936)
3037
3138__all__ = ["Scheduler" ]
@@ -114,13 +121,13 @@ async def run(
114121 raise ValueError (f"Invalid max_duration: { max_duration } " )
115122
116123 with (
117- multiprocessing . Manager () as manager ,
124+ Manager () as manager ,
118125 ProcessPoolExecutor (
119126 max_workers = scheduling_strategy .processes_limit
120127 ) as executor ,
121128 ):
122129 requests_iter : Optional [Iterator [Any ]] = None
123- futures , requests_queue , responses_queue = await self ._start_processes (
130+ futures , queues , stop_event = await self ._start_processes (
124131 manager , executor , scheduling_strategy
125132 )
126133 run_info , requests_iter , times_iter = self ._run_setup (
@@ -149,13 +156,14 @@ async def run(
149156 requests_iter = self ._add_requests (
150157 requests_iter ,
151158 times_iter ,
152- requests_queue ,
159+ queues .requests ,
160+ queues .times ,
153161 run_info ,
154162 )
155163 await asyncio .sleep (0 ) # enable requests to start
156164
157165 iter_result = self ._check_result_ready (
158- responses_queue ,
166+ queues . responses ,
159167 run_info ,
160168 )
161169 if iter_result is not None :
@@ -171,7 +179,7 @@ async def run(
171179 run_info = run_info ,
172180 )
173181
174- await self ._stop_processes (futures , requests_queue )
182+ await self ._stop_processes (futures , stop_event )
175183
176184 async def _start_processes (
177185 self ,
@@ -180,14 +188,18 @@ async def _start_processes(
180188 scheduling_strategy : SchedulingStrategy ,
181189 ) -> tuple [
182190 list [asyncio .Future ],
183- multiprocessing . Queue ,
184- multiprocessing . Queue ,
191+ MPQueues [ RequestT , ResponseT ] ,
192+ Event ,
185193 ]:
186194 await self .worker .prepare_multiprocessing ()
187- requests_queue = manager .Queue (
188- maxsize = scheduling_strategy .queued_requests_limit
195+ queues : MPQueues [RequestT , ResponseT ] = MPQueues (
196+ requests = manager .Queue (
197+ maxsize = scheduling_strategy .processing_requests_limit
198+ ),
199+ times = manager .Queue (maxsize = scheduling_strategy .processing_requests_limit ),
200+ responses = manager .Queue (),
189201 )
190- responses_queue = manager .Queue ()
202+ stop_event = manager .Event ()
191203
192204 num_processes = min (
193205 scheduling_strategy .processes_limit ,
@@ -212,36 +224,21 @@ async def _start_processes(
212224 futures = []
213225 loop = asyncio .get_event_loop ()
214226 for id_ , requests_limit in zip (process_ids , process_requests_limits ):
215- if scheduling_strategy .processing_mode == "sync" :
216- futures .append (
217- loop .run_in_executor (
218- executor ,
219- self .worker .process_loop_synchronous ,
220- requests_queue ,
221- responses_queue ,
222- id_ ,
223- )
224- )
225- elif scheduling_strategy .processing_mode == "async" :
226- futures .append (
227- loop .run_in_executor (
228- executor ,
229- self .worker .process_loop_asynchronous ,
230- requests_queue ,
231- responses_queue ,
232- requests_limit ,
233- id_ ,
234- )
235- )
236- else :
237- raise ValueError (
238- f"Invalid processing mode: { scheduling_strategy .processing_mode } "
239- f"for strategy: { scheduling_strategy } "
227+ futures .append (
228+ loop .run_in_executor (
229+ executor ,
230+ self .worker .process_loop_asynchronous ,
231+ queues ,
232+ stop_event ,
233+ False , # TODO: Make configurable
234+ requests_limit ,
235+ id_ ,
240236 )
237+ )
241238
242239 await asyncio .sleep (0.1 ) # give time for processes to start
243240
244- return futures , requests_queue , responses_queue
241+ return futures , queues , stop_event
245242
246243 def _run_setup (
247244 self ,
@@ -284,7 +281,8 @@ def _add_requests(
284281 self ,
285282 requests_iter : Optional [Iterator [Any ]],
286283 times_iter : Iterator [float ],
287- requests_queue : multiprocessing .Queue ,
284+ requests_queue : Queue [RequestSession [RequestT , ResponseT ]],
285+ times_queue : Queue [WorkerProcessRequestTime ],
288286 run_info : SchedulerRunInfo ,
289287 ) -> Optional [Iterator [Any ]]:
290288 if requests_iter is not None :
@@ -298,23 +296,24 @@ def _add_requests(
298296 if run_info .created_requests >= run_info .end_number :
299297 raise StopIteration
300298
301- if (
302- request_time := next (times_iter )
303- ) >= run_info .end_time or time .time () >= run_info .end_time :
304- raise StopIteration
305-
306- request = next (requests_iter )
307- work_req : WorkerProcessRequest [RequestT ] = WorkerProcessRequest (
308- request = request ,
309- start_time = request_time ,
310- timeout_time = run_info .end_time ,
311- queued_time = time .time (),
312- )
313- requests_queue .put (work_req )
314-
315- run_info .created_requests += 1
316- run_info .queued_requests += 1
317- added_count += 1
299+ session = next (requests_iter )
300+ requests_queue .put (session )
301+ for _ in range (len (session )):
302+ if (
303+ request_time := next (times_iter )
304+ ) >= run_info .end_time or time .time () >= run_info .end_time :
305+ raise StopIteration
306+
307+ work_req = WorkerProcessRequestTime (
308+ start_time = request_time ,
309+ timeout_time = run_info .end_time ,
310+ queued_time = time .time (),
311+ )
312+ times_queue .put (work_req )
313+
314+ run_info .created_requests += 1
315+ run_info .queued_requests += 1
316+ added_count += 1
318317 except StopIteration :
319318 # we've reached the limit number, limit time, or exhausted the requests
320319 # set to None to stop adding more and tell the loop no more requests
@@ -324,14 +323,14 @@ def _add_requests(
324323
325324 def _check_result_ready (
326325 self ,
327- responses_queue : multiprocessing . Queue ,
326+ responses_queue : Queue [ WorkerProcessResult [ RequestT , ResponseT ]] ,
328327 run_info : SchedulerRunInfo ,
329328 ) -> Optional [SchedulerRequestResult [RequestT , ResponseT ]]:
330329 try :
331330 process_response : WorkerProcessResult [RequestT , ResponseT ] = (
332331 responses_queue .get_nowait ()
333332 )
334- except multiprocessing . queues . Empty : # type: ignore[attr-defined]
333+ except QueueEmpty :
335334 return None
336335
337336 if process_response .type_ == "request_scheduled" :
@@ -374,9 +373,9 @@ def _check_result_ready(
374373 async def _stop_processes (
375374 self ,
376375 futures : list [asyncio .Future ],
377- requests_queue : multiprocessing . Queue ,
376+ stop_event : Event ,
378377 ):
379- for _ in futures :
380- requests_queue . put ( None )
378+ # stop all processes
379+ stop_event . set ( )
381380
382381 await asyncio .gather (* futures )
0 commit comments