@@ -417,67 +417,14 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
417417 '''
418418 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
419419 '''
420- self ._check_running ()
421- self ._check_chunksize (chunksize )
422- self ._check_buffersize (buffersize )
423-
424- result = IMapIterator (self , buffersize )
425- if chunksize == 1 :
426- self ._taskqueue .put (
427- (
428- self ._guarded_task_generation (result ._job , func , iterable ,
429- result ._buffersize_sema ),
430- result ._set_length ,
431- )
432- )
433- return result
434- else :
435- task_batches = Pool ._get_tasks (func , iterable , chunksize )
436- self ._taskqueue .put (
437- (
438- self ._guarded_task_generation (
439- result ._job ,
440- mapstar ,
441- task_batches ,
442- result ._buffersize_sema ,
443- ),
444- result ._set_length ,
445- )
446- )
447- return (item for chunk in result for item in chunk )
420+ return self ._imap (IMapIterator , func , iterable , chunksize , buffersize )
448421
449422 def imap_unordered (self , func , iterable , chunksize = 1 , buffersize = None ):
450423 '''
451424 Like `imap()` method but ordering of results is arbitrary.
452425 '''
453- self ._check_running ()
454- self ._check_chunksize (chunksize )
455- self ._check_buffersize (buffersize )
456-
457- result = IMapUnorderedIterator (self , buffersize )
458- if chunksize == 1 :
459- self ._taskqueue .put (
460- (
461- self ._guarded_task_generation (result ._job , func , iterable ,
462- result ._buffersize_sema ),
463- result ._set_length ,
464- )
465- )
466- return result
467- else :
468- task_batches = Pool ._get_tasks (func , iterable , chunksize )
469- self ._taskqueue .put (
470- (
471- self ._guarded_task_generation (
472- result ._job ,
473- mapstar ,
474- task_batches ,
475- result ._buffersize_sema ,
476- ),
477- result ._set_length ,
478- )
479- )
480- return (item for chunk in result for item in chunk )
426+ return self ._imap (IMapUnorderedIterator , func , iterable , chunksize ,
427+ buffersize )
481428
482429 def apply_async (self , func , args = (), kwds = {}, callback = None ,
483430 error_callback = None ):
@@ -526,6 +473,34 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
526473 )
527474 return result
528475
476+ def _imap (self , iterator_cls , func , iterable , chunksize = 1 ,
477+ buffersize = None ):
478+ self ._check_running ()
479+ self ._check_chunksize (chunksize )
480+ self ._check_buffersize (buffersize )
481+
482+ result = iterator_cls (self , buffersize )
483+ if chunksize == 1 :
484+ self ._taskqueue .put (
485+ (
486+ self ._guarded_task_generation (result ._job , func , iterable ,
487+ result ._buffersize_sema ),
488+ result ._set_length ,
489+ )
490+ )
491+ return result
492+ else :
493+ task_batches = Pool ._get_tasks (func , iterable , chunksize )
494+ self ._taskqueue .put (
495+ (
496+ self ._guarded_task_generation (result ._job , mapstar ,
497+ task_batches ,
498+ result ._buffersize_sema ),
499+ result ._set_length ,
500+ )
501+ )
502+ return (item for chunk in result for item in chunk )
503+
529504 @staticmethod
530505 def _check_chunksize (chunksize ):
531506 if chunksize < 1 :
0 commit comments