Run AsyncConsumer tasks concurrently #1933
Open
+35
−3
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
For #1924.
Exploratory PR which uses
asyncio.create_task
for dispatching tasks in a consumer, andadd_done_callback
to raise exceptions, rather thanawait
for each task one by one.A couple of notes:
The existing behaviour is that if one of the
dispatch
tasks raises an uncaught exception, that exception will be propagated to the parent task running the consumer. As far as I can see, this kills the consumer and means that no more tasks will be processed by that consumer. When usingcreate_task
andadd_done_callback
as in this PR the exception will still be raised by the dispatch task and appear in the console but it will not interrupt the parent task which will continue processing new requests.In order for the existing tests to pass, I added some extra code to keep the existing behaviour which is why the code is a bit messier. But I am wondering which behaviour is more desirable? It seems at the moment that if a user sends invalid data that causes an unhandled exception in one of the dispatch tasks, the entire consumer will stop working. But there may be other reasons why the existing behaviour is desirable (it does force the programmer to handle any possible exceptions caused by user input which is a good thing).
I did try using the Python 3.11
TaskGroup
backport for previous versions which actually implements the existing behaviour for point 1) and also would make the code cleaner, but discovered it's not exactly plug and play in previous Python versions, so not really an option. I could maybe implement a similar but much more simpler TaskGroup which make the code a bit cleaner, if you think it's needed.One backward compatibility consideration as brought up in the issue is that tasks will be finished out of order. Personally I think this is fine, as this is a feature of Asyncronous programming which async protocols and implementations should be able to handle. What I didn't know when I created the issue is that the SyncConsumer inherits from the AsyncConsumer so I think a change in this behaviour would also impact the SyncConsumer. Still, I think it shouldn't be an issue as network protocols should be able to deal with this. That said, I am mostly used to working with Channels from a
runworker
perspective, so someone with more experience with implementing http protocols may have a better idea if this is an issue from a backward compatibility POV. One option would be add a class boolean attribute which controls either old or new behaviour. Overall I think the new behaviour is worth it to allow the consumer to run in a truly asyncronous way which is faster.Everything is ok with the existing tests. Do you want me to add profiling tests to measure performance improvement?