Skip to content

Commit 2115c24

Browse files
committed
draft: impl lazy input consumption in mp.Pool.imap(_unordered)
1 parent 03017a8 commit 2115c24

File tree

1 file changed

+85
-24
lines changed

1 file changed

+85
-24
lines changed

Lib/multiprocessing/pool.py

Lines changed: 85 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -390,64 +390,108 @@ def _guarded_task_generation(self, result_job, func, iterable):
390390
i = -1
391391
for i, x in enumerate(iterable):
392392
yield (result_job, i, func, (x,), {})
393+
394+
except Exception as e:
395+
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
396+
397+
def _guarded_task_generation_lazy(self, result_job, func, iterable,
398+
lazy_task_gen_helper):
399+
'''Provides a generator of tasks for imap and imap_unordered with
400+
appropriate handling for iterables which throw exceptions during
401+
iteration.'''
402+
if not lazy_task_gen_helper.feature_enabled:
403+
yield from self._guarded_task_generation(result_job, func, iterable)
404+
return
405+
406+
try:
407+
i = -1
408+
enumerated_iter = iter(enumerate(iterable))
409+
thread = threading.current_thread()
410+
max_generated_tasks = self._processes + lazy_task_gen_helper.buffersize
411+
412+
while thread._state == RUN:
413+
with lazy_task_gen_helper.iterator_cond:
414+
if lazy_task_gen_helper.not_finished_tasks >= max_generated_tasks:
415+
continue # wait for some task to be (picked up and) finished
416+
417+
try:
418+
i, x = enumerated_iter.__next__()
419+
except StopIteration:
420+
break
421+
422+
yield (result_job, i, func, (x,), {})
423+
lazy_task_gen_helper.tasks_generated += 1
424+
393425
except Exception as e:
394426
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
395427

396-
def imap(self, func, iterable, chunksize=1):
428+
def imap(self, func, iterable, chunksize=1, buffersize=None):
397429
'''
398430
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
399431
'''
400432
self._check_running()
401433
if chunksize == 1:
402-
result = IMapIterator(self)
434+
result = IMapIterator(self, buffersize)
403435
self._taskqueue.put(
404436
(
405-
self._guarded_task_generation(result._job, func, iterable),
406-
result._set_length
407-
))
437+
self._guarded_task_generation_lazy(result._job,
438+
func,
439+
iterable,
440+
result._lazy_task_gen_helper),
441+
result._set_length,
442+
)
443+
)
408444
return result
409445
else:
410446
if chunksize < 1:
411447
raise ValueError(
412448
"Chunksize must be 1+, not {0:n}".format(
413449
chunksize))
414450
task_batches = Pool._get_tasks(func, iterable, chunksize)
415-
result = IMapIterator(self)
451+
result = IMapIterator(self, buffersize)
416452
self._taskqueue.put(
417453
(
418-
self._guarded_task_generation(result._job,
419-
mapstar,
420-
task_batches),
421-
result._set_length
422-
))
454+
self._guarded_task_generation_lazy(result._job,
455+
mapstar,
456+
task_batches,
457+
result._lazy_task_gen_helper),
458+
result._set_length,
459+
)
460+
)
423461
return (item for chunk in result for item in chunk)
424462

425-
def imap_unordered(self, func, iterable, chunksize=1):
463+
def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
426464
'''
427465
Like `imap()` method but ordering of results is arbitrary.
428466
'''
429467
self._check_running()
430468
if chunksize == 1:
431-
result = IMapUnorderedIterator(self)
469+
result = IMapUnorderedIterator(self, buffersize)
432470
self._taskqueue.put(
433471
(
434-
self._guarded_task_generation(result._job, func, iterable),
435-
result._set_length
436-
))
472+
self._guarded_task_generation_lazy(result._job,
473+
func,
474+
iterable,
475+
result._lazy_task_gen_helper),
476+
result._set_length,
477+
)
478+
)
437479
return result
438480
else:
439481
if chunksize < 1:
440482
raise ValueError(
441483
"Chunksize must be 1+, not {0!r}".format(chunksize))
442484
task_batches = Pool._get_tasks(func, iterable, chunksize)
443-
result = IMapUnorderedIterator(self)
485+
result = IMapUnorderedIterator(self, buffersize)
444486
self._taskqueue.put(
445487
(
446-
self._guarded_task_generation(result._job,
447-
mapstar,
448-
task_batches),
449-
result._set_length
450-
))
488+
self._guarded_task_generation_lazy(result._job,
489+
mapstar,
490+
task_batches,
491+
result._lazy_task_gen_helper),
492+
result._set_length,
493+
)
494+
)
451495
return (item for chunk in result for item in chunk)
452496

453497
def apply_async(self, func, args=(), kwds={}, callback=None,
@@ -835,8 +879,7 @@ def _set(self, i, success_result):
835879
#
836880

837881
class IMapIterator(object):
838-
839-
def __init__(self, pool):
882+
def __init__(self, pool, buffersize):
840883
self._pool = pool
841884
self._cond = threading.Condition(threading.Lock())
842885
self._job = next(job_counter)
@@ -846,6 +889,7 @@ def __init__(self, pool):
846889
self._length = None
847890
self._unsorted = {}
848891
self._cache[self._job] = self
892+
self._lazy_task_gen_helper = _LazyTaskGenHelper(buffersize, self._cond)
849893

850894
def __iter__(self):
851895
return self
@@ -866,6 +910,7 @@ def next(self, timeout=None):
866910
self._pool = None
867911
raise StopIteration from None
868912
raise TimeoutError from None
913+
self._lazy_task_gen_helper.tasks_finished += 1
869914

870915
success, value = item
871916
if success:
@@ -914,6 +959,22 @@ def _set(self, i, obj):
914959
del self._cache[self._job]
915960
self._pool = None
916961

962+
#
963+
# Class to store stats for lazy task generation and share them
964+
# between the main thread and `_guarded_task_generation()` thread.
965+
#
966+
class _LazyTaskGenHelper(object):
967+
def __init__(self, buffersize, iterator_cond):
968+
self.feature_enabled = buffersize is not None
969+
self.buffersize = buffersize
970+
self.tasks_generated = 0
971+
self.tasks_finished = 0
972+
self.iterator_cond = iterator_cond
973+
974+
@property
975+
def not_finished_tasks(self):
976+
return self.tasks_generated - self.tasks_finished
977+
917978
#
918979
#
919980
#

0 commit comments

Comments
 (0)