diff --git a/channels/consumer.py b/channels/consumer.py index 85543d6cd..fc065432b 100644 --- a/channels/consumer.py +++ b/channels/consumer.py @@ -3,7 +3,7 @@ from asgiref.sync import async_to_sync from . import DEFAULT_CHANNEL_LAYER -from .db import database_sync_to_async +from .db import aclose_old_connections, database_sync_to_async from .exceptions import StopConsumer from .layers import get_channel_layer from .utils import await_many_dispatch @@ -70,6 +70,7 @@ async def dispatch(self, message): """ handler = getattr(self, get_handler_name(message), None) if handler: + await aclose_old_connections() await handler(message) else: raise ValueError("No handler for message type %s" % message["type"]) diff --git a/channels/db.py b/channels/db.py index 0650e7a83..2961b5cdb 100644 --- a/channels/db.py +++ b/channels/db.py @@ -1,4 +1,4 @@ -from asgiref.sync import SyncToAsync +from asgiref.sync import SyncToAsync, sync_to_async from django.db import close_old_connections @@ -17,3 +17,7 @@ def thread_handler(self, loop, *args, **kwargs): # The class is TitleCased, but we want to encourage use as a callable/decorator database_sync_to_async = DatabaseSyncToAsync + + +async def aclose_old_connections(): + return await sync_to_async(close_old_connections)() diff --git a/channels/generic/http.py b/channels/generic/http.py index 909e85704..0d043cc3a 100644 --- a/channels/generic/http.py +++ b/channels/generic/http.py @@ -1,5 +1,6 @@ from channels.consumer import AsyncConsumer +from ..db import aclose_old_connections from ..exceptions import StopConsumer @@ -88,4 +89,5 @@ async def http_disconnect(self, message): Let the user do their cleanup and close the consumer. """ await self.disconnect() + await aclose_old_connections() raise StopConsumer() diff --git a/channels/generic/websocket.py b/channels/generic/websocket.py index 9ce2657b3..6d41c8ee2 100644 --- a/channels/generic/websocket.py +++ b/channels/generic/websocket.py @@ -3,6 +3,7 @@ from asgiref.sync import async_to_sync from ..consumer import AsyncConsumer, SyncConsumer +from ..db import aclose_old_connections from ..exceptions import ( AcceptConnection, DenyConnection, @@ -247,6 +248,7 @@ async def websocket_disconnect(self, message): "BACKEND is unconfigured or doesn't support groups" ) await self.disconnect(message["code"]) + await aclose_old_connections() raise StopConsumer() async def disconnect(self, code): diff --git a/docs/topics/consumers.rst b/docs/topics/consumers.rst index 692491476..294a9aeda 100644 --- a/docs/topics/consumers.rst +++ b/docs/topics/consumers.rst @@ -112,7 +112,8 @@ callable into an asynchronous coroutine. If you want to call the Django ORM from an ``AsyncConsumer`` (or any other asynchronous code), you should use the ``database_sync_to_async`` adapter - instead. See :doc:`/topics/databases` for more. + or use the async versions of the methods (prefixed with ``a``, like ``aget``). + See :doc:`/topics/databases` for more. Closing Consumers diff --git a/docs/topics/databases.rst b/docs/topics/databases.rst index 5d06bebee..e0d2c4afc 100644 --- a/docs/topics/databases.rst +++ b/docs/topics/databases.rst @@ -11,7 +11,8 @@ code is already run in a synchronous mode and Channels will do the cleanup for you as part of the ``SyncConsumer`` code. If you are writing asynchronous code, however, you will need to call -database methods in a safe, synchronous context, using ``database_sync_to_async``. +database methods in a safe, synchronous context, using ``database_sync_to_async`` +or by using the asynchronous methods prefixed with ``a`` like ``Model.objects.aget()``. Database Connections @@ -26,6 +27,11 @@ Python 3.7 and below, and `min(32, os.cpu_count() + 4)` for Python 3.8+. To avoid having too many threads idling in connections, you can instead rewrite your code to use async consumers and only dip into threads when you need to use Django's ORM (using ``database_sync_to_async``). +When using async consumers Channels will automatically call Django's ``close_old_connections`` method when a new connection is started, when a connection is closed, and whenever anything is received from the client. +This mirrors Django's logic for closing old connections at the start and end of a request, to the extent possible. Connections are *not* automatically closed when sending data from a consumer since Channels has no way +to determine if this is a one-off send (and connections could be closed) or a series of sends (in which closing connections would kill performance). Instead, if you have a long-lived async consumer you should +periodically call ``aclose_old_connections`` (see below). + database_sync_to_async ---------------------- @@ -58,3 +64,18 @@ You can also use it as a decorator: @database_sync_to_async def get_name(self): return User.objects.all()[0].name + +aclose_old_connections +---------------------- + +``django.db.aclose_old_connections`` is an async wrapper around Django's +``close_old_connections``. When using a long-lived ``AsyncConsumer`` that +calls the Django ORM it is important to call this function periodically. + +Preferrably, this function should be called before making the first query +in a while. For example, it should be called if the Consumer is woken up +by a channels layer event and needs to make a few ORM queries to determine +what to send to the client. This function should be called *before* making +those queries. Calling this function more than necessary is not necessarily +a bad thing, but it does require a context switch to synchronous code and +so incurs a small penalty. \ No newline at end of file diff --git a/docs/tutorial/part_3.rst b/docs/tutorial/part_3.rst index 99182362f..dbda84740 100644 --- a/docs/tutorial/part_3.rst +++ b/docs/tutorial/part_3.rst @@ -15,16 +15,20 @@ asynchronous consumers can provide a higher level of performance since they don't need to create additional threads when handling requests. ``ChatConsumer`` only uses async-native libraries (Channels and the channel layer) -and in particular it does not access synchronous Django models. Therefore it can +and in particular it does not access synchronous code. Therefore it can be rewritten to be asynchronous without complications. .. note:: - Even if ``ChatConsumer`` *did* access Django models or other synchronous code it + Even if ``ChatConsumer`` *did* access Django models or synchronous code it would still be possible to rewrite it as asynchronous. Utilities like :ref:`asgiref.sync.sync_to_async ` and :doc:`channels.db.database_sync_to_async ` can be used to call synchronous code from an asynchronous consumer. The performance - gains however would be less than if it only used async-native libraries. + gains however would be less than if it only used async-native libraries. Django + models include methods prefixed with ``a`` that can be used safely from async + contexts, provided that + :doc:`channels.db.aclose_old_connections ` is called + occasionally. Let's rewrite ``ChatConsumer`` to be asynchronous. Put the following code in ``chat/consumers.py``: diff --git a/tests/security/test_websocket.py b/tests/security/test_websocket.py index 52f9e21b4..1444ea829 100644 --- a/tests/security/test_websocket.py +++ b/tests/security/test_websocket.py @@ -5,6 +5,7 @@ from channels.testing import WebsocketCommunicator +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_origin_validator(): """ diff --git a/tests/test_generic_http.py b/tests/test_generic_http.py index bfb889c0e..0b6d0ecb5 100644 --- a/tests/test_generic_http.py +++ b/tests/test_generic_http.py @@ -8,6 +8,7 @@ from channels.testing import HttpCommunicator +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_async_http_consumer(): """ @@ -38,6 +39,7 @@ async def handle(self, body): assert response["headers"] == [(b"Content-Type", b"application/json")] +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_error(): class TestConsumer(AsyncHttpConsumer): @@ -51,6 +53,7 @@ async def handle(self, body): assert str(excinfo.value) == "Error correctly raised" +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_per_scope_consumers(): """ @@ -87,6 +90,7 @@ async def handle(self, body): assert response["body"] != second_response["body"] +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_async_http_consumer_future(): """ diff --git a/tests/test_generic_websocket.py b/tests/test_generic_websocket.py index 73cdb4862..c553eb843 100644 --- a/tests/test_generic_websocket.py +++ b/tests/test_generic_websocket.py @@ -154,6 +154,7 @@ def receive(self, text_data=None, bytes_data=None): assert channel_layer.groups == {} +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_async_websocket_consumer(): """ @@ -195,6 +196,7 @@ async def disconnect(self, code): assert "disconnected" in results +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_async_websocket_consumer_subprotocol(): """ @@ -217,6 +219,7 @@ async def connect(self): assert subprotocol == "subprotocol2" +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_async_websocket_consumer_groups(): """ @@ -253,6 +256,7 @@ async def receive(self, text_data=None, bytes_data=None): assert channel_layer.groups == {} +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_async_websocket_consumer_specific_channel_layer(): """ @@ -323,6 +327,7 @@ def receive_json(self, data=None): await communicator.wait() +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_async_json_websocket_consumer(): """ @@ -355,6 +360,7 @@ async def receive_json(self, data=None): await communicator.wait() +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_block_underscored_type_function_call(): """ @@ -390,6 +396,7 @@ async def _my_private_handler(self, _): await communicator.receive_from() +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_block_leading_dot_type_function_call(): """ diff --git a/tests/test_testing.py b/tests/test_testing.py index 6beae7d5b..fbfbf436f 100644 --- a/tests/test_testing.py +++ b/tests/test_testing.py @@ -23,6 +23,7 @@ async def http_request(self, event): await self.send({"type": "http.response.body", "body": b"test response"}) +@pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_http_communicator(): """