Skip to content

Commit

Permalink
Improve async Django support and improve docs
Browse files Browse the repository at this point in the history
  • Loading branch information
bigfootjon committed Jul 10, 2024
1 parent aa91c28 commit 412210c
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 7 deletions.
3 changes: 2 additions & 1 deletion channels/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from asgiref.sync import async_to_sync

from . import DEFAULT_CHANNEL_LAYER
from .db import database_sync_to_async
from .db import aclose_old_connections, database_sync_to_async
from .exceptions import StopConsumer
from .layers import get_channel_layer
from .utils import await_many_dispatch
Expand Down Expand Up @@ -70,6 +70,7 @@ async def dispatch(self, message):
"""
handler = getattr(self, get_handler_name(message), None)
if handler:
await aclose_old_connections()
await handler(message)
else:
raise ValueError("No handler for message type %s" % message["type"])
Expand Down
6 changes: 5 additions & 1 deletion channels/db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from asgiref.sync import SyncToAsync
from asgiref.sync import SyncToAsync, sync_to_async
from django.db import close_old_connections


Expand All @@ -17,3 +17,7 @@ def thread_handler(self, loop, *args, **kwargs):

# The class is TitleCased, but we want to encourage use as a callable/decorator
database_sync_to_async = DatabaseSyncToAsync


async def aclose_old_connections():
return await sync_to_async(close_old_connections)()
2 changes: 2 additions & 0 deletions channels/generic/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from channels.consumer import AsyncConsumer

from ..db import aclose_old_connections
from ..exceptions import StopConsumer


Expand Down Expand Up @@ -88,4 +89,5 @@ async def http_disconnect(self, message):
Let the user do their cleanup and close the consumer.
"""
await self.disconnect()
await aclose_old_connections()
raise StopConsumer()
2 changes: 2 additions & 0 deletions channels/generic/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from asgiref.sync import async_to_sync

from ..consumer import AsyncConsumer, SyncConsumer
from ..db import aclose_old_connections
from ..exceptions import (
AcceptConnection,
DenyConnection,
Expand Down Expand Up @@ -247,6 +248,7 @@ async def websocket_disconnect(self, message):
"BACKEND is unconfigured or doesn't support groups"
)
await self.disconnect(message["code"])
await aclose_old_connections()
raise StopConsumer()

async def disconnect(self, code):
Expand Down
3 changes: 2 additions & 1 deletion docs/topics/consumers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ callable into an asynchronous coroutine.

If you want to call the Django ORM from an ``AsyncConsumer`` (or any other
asynchronous code), you should use the ``database_sync_to_async`` adapter
instead. See :doc:`/topics/databases` for more.
or use the async versions of the methods (prefixed with ``a``, like ``aget``).
See :doc:`/topics/databases` for more.


Closing Consumers
Expand Down
23 changes: 22 additions & 1 deletion docs/topics/databases.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ code is already run in a synchronous mode and Channels will do the cleanup
for you as part of the ``SyncConsumer`` code.

If you are writing asynchronous code, however, you will need to call
database methods in a safe, synchronous context, using ``database_sync_to_async``.
database methods in a safe, synchronous context, using ``database_sync_to_async``
or by using the asynchronous methods prefixed with ``a`` like ``Model.objects.aget()``.


Database Connections
Expand All @@ -26,6 +27,11 @@ Python 3.7 and below, and `min(32, os.cpu_count() + 4)` for Python 3.8+.

To avoid having too many threads idling in connections, you can instead rewrite your code to use async consumers and only dip into threads when you need to use Django's ORM (using ``database_sync_to_async``).

When using async consumers Channels will automatically call Django's ``close_old_connections`` method when a new connection is started, when a connection is closed, and whenever anything is received from the client.
This mirrors Django's logic for closing old connections at the start and end of a request, to the extent possible. Connections are *not* automatically closed when sending data from a consumer since Channels has no way
to determine if this is a one-off send (and connections could be closed) or a series of sends (in which closing connections would kill performance). Instead, if you have a long-lived async consumer you should
periodically call ``aclose_old_connections`` (see below).


database_sync_to_async
----------------------
Expand Down Expand Up @@ -58,3 +64,18 @@ You can also use it as a decorator:
@database_sync_to_async
def get_name(self):
return User.objects.all()[0].name
aclose_old_connections
----------------------

``django.db.aclose_old_connections`` is an async wrapper around Django's
``close_old_connections``. When using a long-lived ``AsyncConsumer`` that
calls the Django ORM it is important to call this function periodically.

Preferrably, this function should be called before making the first query
in a while. For example, it should be called if the Consumer is woken up
by a channels layer event and needs to make a few ORM queries to determine
what to send to the client. This function should be called *before* making
those queries. Calling this function more than necessary is not necessarily
a bad thing, but it does require a context switch to synchronous code and
so incurs a small penalty.
10 changes: 7 additions & 3 deletions docs/tutorial/part_3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@ asynchronous consumers can provide a higher level of performance since they
don't need to create additional threads when handling requests.

``ChatConsumer`` only uses async-native libraries (Channels and the channel layer)
and in particular it does not access synchronous Django models. Therefore it can
and in particular it does not access synchronous code. Therefore it can
be rewritten to be asynchronous without complications.

.. note::
Even if ``ChatConsumer`` *did* access Django models or other synchronous code it
Even if ``ChatConsumer`` *did* access Django models or synchronous code it
would still be possible to rewrite it as asynchronous. Utilities like
:ref:`asgiref.sync.sync_to_async <sync_to_async>` and
:doc:`channels.db.database_sync_to_async </topics/databases>` can be
used to call synchronous code from an asynchronous consumer. The performance
gains however would be less than if it only used async-native libraries.
gains however would be less than if it only used async-native libraries. Django
models include methods prefixed with ``a`` that can be used safely from async
contexts, provided that
:doc:`channels.db.aclose_old_connections </topics/databases>` is called
occasionally.

Let's rewrite ``ChatConsumer`` to be asynchronous.
Put the following code in ``chat/consumers.py``:
Expand Down
1 change: 1 addition & 0 deletions tests/security/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from channels.testing import WebsocketCommunicator


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_origin_validator():
"""
Expand Down
4 changes: 4 additions & 0 deletions tests/test_generic_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from channels.testing import HttpCommunicator


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_http_consumer():
"""
Expand Down Expand Up @@ -38,6 +39,7 @@ async def handle(self, body):
assert response["headers"] == [(b"Content-Type", b"application/json")]


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_error():
class TestConsumer(AsyncHttpConsumer):
Expand All @@ -51,6 +53,7 @@ async def handle(self, body):
assert str(excinfo.value) == "Error correctly raised"


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_per_scope_consumers():
"""
Expand Down Expand Up @@ -87,6 +90,7 @@ async def handle(self, body):
assert response["body"] != second_response["body"]


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_http_consumer_future():
"""
Expand Down
7 changes: 7 additions & 0 deletions tests/test_generic_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def receive(self, text_data=None, bytes_data=None):
assert channel_layer.groups == {}


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_websocket_consumer():
"""
Expand Down Expand Up @@ -195,6 +196,7 @@ async def disconnect(self, code):
assert "disconnected" in results


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_websocket_consumer_subprotocol():
"""
Expand All @@ -217,6 +219,7 @@ async def connect(self):
assert subprotocol == "subprotocol2"


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_websocket_consumer_groups():
"""
Expand Down Expand Up @@ -253,6 +256,7 @@ async def receive(self, text_data=None, bytes_data=None):
assert channel_layer.groups == {}


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_websocket_consumer_specific_channel_layer():
"""
Expand Down Expand Up @@ -323,6 +327,7 @@ def receive_json(self, data=None):
await communicator.wait()


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_json_websocket_consumer():
"""
Expand Down Expand Up @@ -355,6 +360,7 @@ async def receive_json(self, data=None):
await communicator.wait()


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_block_underscored_type_function_call():
"""
Expand Down Expand Up @@ -390,6 +396,7 @@ async def _my_private_handler(self, _):
await communicator.receive_from()


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_block_leading_dot_type_function_call():
"""
Expand Down
1 change: 1 addition & 0 deletions tests/test_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async def http_request(self, event):
await self.send({"type": "http.response.body", "body": b"test response"})


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_http_communicator():
"""
Expand Down

0 comments on commit 412210c

Please sign in to comment.