13
13
to modify the meaning of the API call itself.
14
14
"""
15
15
16
-
17
16
import collections
18
17
import concurrent .futures
19
18
import heapq
28
27
import traceback
29
28
import sys
30
29
import warnings
30
+ import weakref
31
31
32
32
from . import compat
33
33
from . import coroutines
@@ -242,6 +242,13 @@ def __init__(self):
242
242
self ._task_factory = None
243
243
self ._coroutine_wrapper_set = False
244
244
245
+ # A weak set of all asynchronous generators that are being iterated
246
+ # by the loop.
247
+ self ._asyncgens = weakref .WeakSet ()
248
+
249
+ # Set to True when `loop.shutdown_asyncgens` is called.
250
+ self ._asyncgens_shutdown_called = False
251
+
245
252
def __repr__ (self ):
246
253
return ('<%s running=%s closed=%s debug=%s>'
247
254
% (self .__class__ .__name__ , self .is_running (),
@@ -333,13 +340,56 @@ def _check_closed(self):
333
340
if self ._closed :
334
341
raise RuntimeError ('Event loop is closed' )
335
342
343
+ def _asyncgen_finalizer_hook (self , agen ):
344
+ self ._asyncgens .discard (agen )
345
+ if not self .is_closed ():
346
+ self .create_task (agen .aclose ())
347
+
348
+ def _asyncgen_firstiter_hook (self , agen ):
349
+ if self ._asyncgens_shutdown_called :
350
+ warnings .warn (
351
+ "asynchronous generator {!r} was scheduled after "
352
+ "loop.shutdown_asyncgens() call" .format (agen ),
353
+ ResourceWarning , source = self )
354
+
355
+ self ._asyncgens .add (agen )
356
+
357
+ @coroutine
358
+ def shutdown_asyncgens (self ):
359
+ """Shutdown all active asynchronous generators."""
360
+ self ._asyncgens_shutdown_called = True
361
+
362
+ if not len (self ._asyncgens ):
363
+ return
364
+
365
+ closing_agens = list (self ._asyncgens )
366
+ self ._asyncgens .clear ()
367
+
368
+ shutdown_coro = tasks .gather (
369
+ * [ag .aclose () for ag in closing_agens ],
370
+ return_exceptions = True ,
371
+ loop = self )
372
+
373
+ results = yield from shutdown_coro
374
+ for result , agen in zip (results , closing_agens ):
375
+ if isinstance (result , Exception ):
376
+ self .call_exception_handler ({
377
+ 'message' : 'an error occurred during closing of '
378
+ 'asynchronous generator {!r}' .format (agen ),
379
+ 'exception' : result ,
380
+ 'asyncgen' : agen
381
+ })
382
+
336
383
def run_forever (self ):
337
384
"""Run until stop() is called."""
338
385
self ._check_closed ()
339
386
if self .is_running ():
340
387
raise RuntimeError ('Event loop is running.' )
341
388
self ._set_coroutine_wrapper (self ._debug )
342
389
self ._thread_id = threading .get_ident ()
390
+ old_agen_hooks = sys .get_asyncgen_hooks ()
391
+ sys .set_asyncgen_hooks (firstiter = self ._asyncgen_firstiter_hook ,
392
+ finalizer = self ._asyncgen_finalizer_hook )
343
393
try :
344
394
while True :
345
395
self ._run_once ()
@@ -349,6 +399,7 @@ def run_forever(self):
349
399
self ._stopping = False
350
400
self ._thread_id = None
351
401
self ._set_coroutine_wrapper (False )
402
+ sys .set_asyncgen_hooks (* old_agen_hooks )
352
403
353
404
def run_until_complete (self , future ):
354
405
"""Run until the Future is done.
@@ -1179,7 +1230,9 @@ def call_exception_handler(self, context):
1179
1230
- 'handle' (optional): Handle instance;
1180
1231
- 'protocol' (optional): Protocol instance;
1181
1232
- 'transport' (optional): Transport instance;
1182
- - 'socket' (optional): Socket instance.
1233
+ - 'socket' (optional): Socket instance;
1234
+ - 'asyncgen' (optional): Asynchronous generator that caused
1235
+ the exception.
1183
1236
1184
1237
New keys maybe introduced in the future.
1185
1238
0 commit comments