Skip to content

Commit 81259c4

Browse files
Totktonadaylobankov
authored andcommitted
dispatcher: lift pipe buffer size restriction
A task queue dispatcher puts all the tasks to the task queue at startup. Then workers are started and are taking the tasks from it. If there are many tasks in a task group (which roughly corresponds to a test suite), we can reach the pipe buffer size on putting into the queue, because `multiprocessing.SimpleQueue` uses a pipe under the hood. The solution is to use `multiprocessing.Queue`, which has an intermediate buffer before the underlying pipe and writes to the pipe in a background thread, without blocking a thread that calls `<queue>.put()`. The `Queue` API is a superset of the `SimpleQueue` API, so we can just replace the implementation. Let's also use `Queue` for the worker's output queue to be on the safe side and for consistency. Fixes #287
1 parent 1037299 commit 81259c4

File tree

1 file changed

+11
-18
lines changed

1 file changed

+11
-18
lines changed

dispatcher.py

+11-18
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,23 @@
88

99
import multiprocessing
1010

11-
# SimpleQueue is available from multiprocessing.queues on
12-
# all Python versions known at the moment of writting the code
13-
# (up to 3.9).
14-
#
15-
# It was additionally exposed directly from the multiprocessing
16-
# module since Python 3.3 ([1]).
11+
# Queue is available from multiprocessing.queues on all Python
12+
# versions known at the moment of writting the code (up to 3.12).
1713
#
1814
# However the mandatory argument 'ctx'
1915
# (see multiprocessing.get_context()) was added to the constructor
20-
# of SimpleQueue from multiprocessing.queues since Python 3.4
21-
# ([2]).
16+
# of Queue from multiprocessing.queues since Python 3.4 ([1]).
2217
#
23-
# So we should import SimpleQueue from multiprocessing on
24-
# Python 3.3+ (and must to do so on Python 3.4+) to uniformly
25-
# instantiate it (without constructor arguments).
18+
# So we should import Queue from multiprocessing on Python 3.4+
19+
# to uniformly instantiate it (without constructor arguments).
2620
#
27-
# [1]: https://bugs.python.org/issue11836
28-
# [2]: https://bugs.python.org/issue18999
21+
# [1]: https://bugs.python.org/issue18999
2922
try:
30-
# Python 3.3+
31-
from multiprocessing import SimpleQueue
23+
# Python 3.4+
24+
from multiprocessing import Queue
3225
except ImportError:
3326
# Python 2
34-
from multiprocessing.queues import SimpleQueue
27+
from multiprocessing.queues import Queue
3528

3629
from lib import Options
3730
from lib.sampler import sampler
@@ -363,8 +356,8 @@ def __init__(self, key, task_group, randomize):
363356
random.shuffle(self.task_ids)
364357
else:
365358
self.randomize = False
366-
self.result_queue = SimpleQueue()
367-
self.task_queue = SimpleQueue()
359+
self.result_queue = Queue()
360+
self.task_queue = Queue()
368361

369362
# Don't expose queues file descriptors over Popen to, say, tarantool
370363
# running tests.

0 commit comments

Comments
 (0)