import threading
import time
import types
+import weakref

FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -569,6 +570,15 @@ def set_exception(self, exception):
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

+    def __init__(self, max_workers=None):
+        """Initializes a new Executor instance.
+
+        Args:
+            max_workers: The maximum number of workers that can be used to
+                execute the given calls.
+        """
+        self._max_workers = max_workers
+
    def submit(self, fn, /, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.
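Note (not part of the patch): the new base-class __init__ exists so that map() further down can size its prefetch window from self._max_workers. Below is a minimal sketch of how a concrete executor would hand its worker count up to the patched base class; SerialExecutor is a hypothetical class invented for illustration and assumes this patch is applied.

from concurrent.futures import Executor, Future

class SerialExecutor(Executor):
    """Hypothetical executor, shown only to illustrate the new __init__."""

    def __init__(self):
        # Hand the worker count to the patched base class so that
        # map(..., prefetch=N) can compute self._max_workers + prefetch.
        super().__init__(max_workers=1)

    def submit(self, fn, /, *args, **kwargs):
        # Run the call synchronously and wrap its outcome in a Future.
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as exc:
            future.set_exception(exc)
        return future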
@@ -580,7 +590,7 @@ def submit(self, fn, /, *args, **kwargs):
        """
        raise NotImplementedError()

-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
@@ -592,6 +602,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.
+            prefetch: The number of chunks to queue beyond the number of
+                workers on the executor. If None, all chunks are queued.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
@@ -604,25 +616,44 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()
+        if prefetch is not None and prefetch < 0:
+            raise ValueError("prefetch count may not be negative")

-        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+        all_args = zip(*iterables)
+        if prefetch is None:
+            fs = collections.deque(self.submit(fn, *args) for args in all_args)
+        else:
+            fs = collections.deque()
+            for idx, args in enumerate(all_args):
+                if idx >= self._max_workers + prefetch:
+                    break
+                fs.append(self.submit(fn, *args))

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
-        def result_iterator():
+        def result_iterator(all_args, executor_ref):
            try:
-                # reverse to keep finishing order
-                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
+                        yield _result_or_cancel(fs.popleft())
                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
+                        yield _result_or_cancel(
+                            fs.popleft(), end_time - time.monotonic()
+                        )
+
+                    # Submit the next task if any and if the executor exists
+                    if executor_ref():
+                        try:
+                            args = next(all_args)
+                        except StopIteration:
+                            pass
+                        else:
+                            fs.append(executor_ref().submit(fn, *args))
            finally:
                for future in fs:
                    future.cancel()
-        return result_iterator()
+        return result_iterator(all_args, weakref.ref(self))

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Clean-up the resources associated with the Executor.