1414#
1515
1616import collections
17+ import functools
1718import itertools
1819import os
1920import queue
2526
2627# If threading is available then ThreadPool should be provided. Therefore
2728# we avoid top-level imports which are liable to fail on some systems.
28- from . import util
29- from . import get_context , TimeoutError
29+ from . import TimeoutError , get_context , util
3030from .connection import wait
3131
3232#
@@ -395,32 +395,20 @@ def _guarded_task_generation(self, result_job, func, iterable):
395395 yield (result_job , i + 1 , _helper_reraises_exception , (e ,), {})
396396
397397 def _guarded_task_generation_lazy (self , result_job , func , iterable ,
398- lazy_task_gen_helper ):
399- ''' Provides a generator of tasks for imap and imap_unordered with
398+ backpressure_sema ):
399+ """ Provides a generator of tasks for imap and imap_unordered with
400400 appropriate handling for iterables which throw exceptions during
401- iteration.'''
402- if not lazy_task_gen_helper .feature_enabled :
403- yield from self ._guarded_task_generation (result_job , func , iterable )
404- return
405-
401+ iteration."""
406402 try :
407403 i = - 1
408404 enumerated_iter = iter (enumerate (iterable ))
409- thread = threading .current_thread ()
410- max_generated_tasks = self ._processes + lazy_task_gen_helper .buffersize
411-
412- while thread ._state == RUN :
413- with lazy_task_gen_helper .iterator_cond :
414- if lazy_task_gen_helper .not_finished_tasks >= max_generated_tasks :
415- continue # wait for some task to be (picked up and) finished
416-
405+ while True :
406+ backpressure_sema .acquire ()
417407 try :
418- i , x = enumerated_iter . __next__ ( )
408+ i , x = next ( enumerated_iter )
419409 except StopIteration :
420410 break
421-
422411 yield (result_job , i , func , (x ,), {})
423- lazy_task_gen_helper .tasks_generated += 1
424412
425413 except Exception as e :
426414 yield (result_job , i + 1 , _helper_reraises_exception , (e ,), {})
@@ -430,31 +418,32 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
430418 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
431419 '''
432420 self ._check_running ()
421+ if chunksize < 1 :
422+ raise ValueError ("Chunksize must be 1+, not {0:n}" .format (chunksize ))
423+
424+ result = IMapIterator (self , buffersize )
425+
426+ if result ._backpressure_sema is None :
427+ task_generation = self ._guarded_task_generation
428+ else :
429+ task_generation = functools .partial (
430+ self ._guarded_task_generation_lazy ,
431+ backpressure_sema = result ._backpressure_sema ,
432+ )
433+
433434 if chunksize == 1 :
434- result = IMapIterator (self , buffersize )
435435 self ._taskqueue .put (
436436 (
437- self ._guarded_task_generation_lazy (result ._job ,
438- func ,
439- iterable ,
440- result ._lazy_task_gen_helper ),
437+ task_generation (result ._job , func , iterable ),
441438 result ._set_length ,
442439 )
443440 )
444441 return result
445442 else :
446- if chunksize < 1 :
447- raise ValueError (
448- "Chunksize must be 1+, not {0:n}" .format (
449- chunksize ))
450443 task_batches = Pool ._get_tasks (func , iterable , chunksize )
451- result = IMapIterator (self , buffersize )
452444 self ._taskqueue .put (
453445 (
454- self ._guarded_task_generation_lazy (result ._job ,
455- mapstar ,
456- task_batches ,
457- result ._lazy_task_gen_helper ),
446+ task_generation (result ._job , mapstar , task_batches ),
458447 result ._set_length ,
459448 )
460449 )
@@ -465,30 +454,34 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
465454 Like `imap()` method but ordering of results is arbitrary.
466455 '''
467456 self ._check_running ()
457+ if chunksize < 1 :
458+ raise ValueError (
459+ "Chunksize must be 1+, not {0!r}" .format (chunksize )
460+ )
461+
462+ result = IMapUnorderedIterator (self , buffersize )
463+
464+ if result ._backpressure_sema is None :
465+ task_generation = self ._guarded_task_generation
466+ else :
467+ task_generation = functools .partial (
468+ self ._guarded_task_generation_lazy ,
469+ backpressure_sema = result ._backpressure_sema ,
470+ )
471+
468472 if chunksize == 1 :
469- result = IMapUnorderedIterator (self , buffersize )
470473 self ._taskqueue .put (
471474 (
472- self ._guarded_task_generation_lazy (result ._job ,
473- func ,
474- iterable ,
475- result ._lazy_task_gen_helper ),
475+ task_generation (result ._job , func , iterable ),
476476 result ._set_length ,
477477 )
478478 )
479479 return result
480480 else :
481- if chunksize < 1 :
482- raise ValueError (
483- "Chunksize must be 1+, not {0!r}" .format (chunksize ))
484481 task_batches = Pool ._get_tasks (func , iterable , chunksize )
485- result = IMapUnorderedIterator (self , buffersize )
486482 self ._taskqueue .put (
487483 (
488- self ._guarded_task_generation_lazy (result ._job ,
489- mapstar ,
490- task_batches ,
491- result ._lazy_task_gen_helper ),
484+ task_generation (result ._job , mapstar , task_batches ),
492485 result ._set_length ,
493486 )
494487 )
@@ -889,7 +882,13 @@ def __init__(self, pool, buffersize):
889882 self ._length = None
890883 self ._unsorted = {}
891884 self ._cache [self ._job ] = self
892- self ._lazy_task_gen_helper = _LazyTaskGenHelper (buffersize , self ._cond )
885+
886+ if buffersize is None :
887+ self ._backpressure_sema = None
888+ else :
889+ self ._backpressure_sema = threading .Semaphore (
890+ value = self ._pool ._processes + buffersize
891+ )
893892
894893 def __iter__ (self ):
895894 return self
@@ -910,7 +909,9 @@ def next(self, timeout=None):
910909 self ._pool = None
911910 raise StopIteration from None
912911 raise TimeoutError from None
913- self ._lazy_task_gen_helper .tasks_finished += 1
912+
913+ if self ._backpressure_sema :
914+ self ._backpressure_sema .release ()
914915
915916 success , value = item
916917 if success :
@@ -959,22 +960,6 @@ def _set(self, i, obj):
959960 del self ._cache [self ._job ]
960961 self ._pool = None
961962
962- #
963- # Class to store stats for lazy task generation and share them
964- # between the main thread and `_guarded_task_generation()` thread.
965- #
966- class _LazyTaskGenHelper (object ):
967- def __init__ (self , buffersize , iterator_cond ):
968- self .feature_enabled = buffersize is not None
969- self .buffersize = buffersize
970- self .tasks_generated = 0
971- self .tasks_finished = 0
972- self .iterator_cond = iterator_cond
973-
974- @property
975- def not_finished_tasks (self ):
976- return self .tasks_generated - self .tasks_finished
977-
978963#
979964#
980965#
0 commit comments