Skip to content

Commit

Permalink
Add broadcast to the new asyncio implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
aaugustin committed Aug 17, 2024
1 parent 5eafbe4 commit c3b162d
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 80 deletions.
2 changes: 1 addition & 1 deletion docs/faq/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Record all connections in a global variable::
finally:
CONNECTIONS.remove(websocket)

Then, call :func:`~websockets.broadcast`::
Then, call :func:`~asyncio.connection.broadcast`::

import websockets

Expand Down
10 changes: 2 additions & 8 deletions docs/howto/upgrade.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ Missing features
If your application relies on one of them, you should stick to the original
implementation until the new implementation supports it in a future release.

Broadcast
.........

The new implementation doesn't support :doc:`broadcasting messages
<../topics/broadcast>` yet.

Keepalive
.........

Expand Down Expand Up @@ -178,8 +172,8 @@ Server APIs
| :class:`websockets.server.WebSocketServerProtocol` |br| | |
| ``websockets.legacy.server.WebSocketServerProtocol`` | |
+-------------------------------------------------------------------+-----------------------------------------------------+
| :func:`websockets.broadcast` |br| | *not available yet* |
| ``websockets.legacy.protocol.broadcast()`` | |
| ``websockets.broadcast`` |br| | :func:`websockets.asyncio.connection.broadcast` |
| :func:`websockets.legacy.protocol.broadcast()` | |
+-------------------------------------------------------------------+-----------------------------------------------------+
| ``websockets.BasicAuthWebSocketServerProtocol`` |br| | *not available yet* |
| :class:`websockets.auth.BasicAuthWebSocketServerProtocol` |br| | |
Expand Down
16 changes: 9 additions & 7 deletions docs/intro/tutorial2.rst
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ you're using this pattern:
...
Since this is a very common pattern in WebSocket servers, websockets provides
the :func:`broadcast` helper for this purpose:
the :func:`~legacy.protocol.broadcast` helper for this purpose:

.. code-block:: python
Expand All @@ -494,13 +494,14 @@ the :func:`broadcast` helper for this purpose:
...
Calling :func:`broadcast` once is more efficient than
Calling :func:`legacy.protocol.broadcast` once is more efficient than
calling :meth:`~legacy.protocol.WebSocketCommonProtocol.send` in a loop.

However, there's a subtle difference in behavior. Did you notice that there's
no ``await`` in the second version? Indeed, :func:`broadcast` is a function,
not a coroutine like :meth:`~legacy.protocol.WebSocketCommonProtocol.send`
or :meth:`~legacy.protocol.WebSocketCommonProtocol.recv`.
However, there's a subtle difference in behavior. Did you notice that there's no
``await`` in the second version? Indeed, :func:`legacy.protocol.broadcast` is a
function, not a coroutine like
:meth:`~legacy.protocol.WebSocketCommonProtocol.send` or
:meth:`~legacy.protocol.WebSocketCommonProtocol.recv`.

It's quite obvious why :meth:`~legacy.protocol.WebSocketCommonProtocol.recv`
is a coroutine. When you want to receive the next message, you have to wait
Expand All @@ -521,7 +522,8 @@ That said, when you're sending the same messages to many clients in a loop,
applying backpressure in this way can become counterproductive. When you're
broadcasting, you don't want to slow down everyone to the pace of the slowest
clients; you want to drop clients that cannot keep up with the data stream.
That's why :func:`broadcast` doesn't wait until write buffers drain.
That's why :func:`legacy.protocol.broadcast` doesn't wait until write buffers
drain.

For our Connect Four game, there's no difference in practice: the total amount
of data sent on a connection for a game of Connect Four is less than 64 KB,
Expand Down
4 changes: 2 additions & 2 deletions docs/project/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ Improvements

* Added platform-independent wheels.

* Improved error handling in :func:`~websockets.broadcast`.
* Improved error handling in :func:`~legacy.protocol.broadcast`.

* Set ``server_hostname`` automatically on TLS connections when providing a
``sock`` argument to :func:`~sync.client.connect`.
Expand Down Expand Up @@ -402,7 +402,7 @@ New features

* Added compatibility with Python 3.10.

* Added :func:`~websockets.broadcast` to send a message to many clients.
* Added :func:`~legacy.protocol.broadcast` to send a message to many clients.

* Added support for reconnecting automatically by using
:func:`~client.connect` as an asynchronous iterator.
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/asyncio/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,4 @@ websockets supports HTTP Basic Authentication according to
Broadcast
---------

.. autofunction:: websockets.broadcast
.. autofunction:: websockets.legacy.protocol.broadcast
5 changes: 5 additions & 0 deletions docs/reference/new-asyncio/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ Using a connection
.. autoattribute:: response

.. autoproperty:: subprotocol

Broadcast
---------

.. autofunction:: websockets.asyncio.connection.broadcast
69 changes: 36 additions & 33 deletions docs/topics/broadcast.rst
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
Broadcasting messages
=====================
Broadcasting
============

.. currentmodule:: websockets


.. admonition:: If you just want to send a message to all connected clients,
use :func:`broadcast`.
.. admonition:: If you want to send a message to all connected clients,
use :func:`~asyncio.connection.broadcast`.
:class: tip

If you want to learn about its design in depth, continue reading this
document.
If you want to learn about its design, continue reading this document.

For the legacy :mod:`asyncio` implementation, use
:func:`~legacy.protocol.broadcast`.

WebSocket servers often send the same message to all connected clients or to a
subset of clients for which the message is relevant.

Let's explore options for broadcasting a message, explain the design
of :func:`broadcast`, and discuss alternatives.
Let's explore options for broadcasting a message, explain the design of
:func:`~asyncio.connection.broadcast`, and discuss alternatives.

For each option, we'll provide a connection handler called ``handler()`` and a
function or coroutine called ``broadcast()`` that sends a message to all
Expand All @@ -24,7 +25,7 @@ connected clients.
Integrating them is left as an exercise for the reader. You could start with::

import asyncio
import websockets
from websockets.asyncio.server import serve

async def handler(websocket):
...
Expand All @@ -39,7 +40,7 @@ Integrating them is left as an exercise for the reader. You could start with::
await broadcast(message)

async def main():
async with websockets.serve(handler, "localhost", 8765):
async with serve(handler, "localhost", 8765):
await broadcast_messages() # runs forever

if __name__ == "__main__":
Expand Down Expand Up @@ -82,11 +83,13 @@ to::

Here's a coroutine that broadcasts a message to all clients::

from websockets import ConnectionClosed

async def broadcast(message):
for websocket in CLIENTS.copy():
try:
await websocket.send(message)
except websockets.ConnectionClosed:
except ConnectionClosed:
pass

There are two tricks in this version of ``broadcast()``.
Expand Down Expand Up @@ -117,11 +120,11 @@ which is usually outside of the control of the server.

If you know for sure that you will never write more than ``write_limit`` bytes
within ``ping_interval + ping_timeout``, then websockets will terminate slow
connections before the write buffer has time to fill up.
connections before the write buffer can fill up.

Don't set extreme ``write_limit``, ``ping_interval``, and ``ping_timeout``
values to ensure that this condition holds. Set reasonable values and use the
built-in :func:`broadcast` function instead.
Don't set extreme values of ``write_limit``, ``ping_interval``, or
``ping_timeout`` to ensure that this condition holds! Instead, set reasonable
values and use the built-in :func:`~asyncio.connection.broadcast` function.

The concurrent way
------------------
Expand All @@ -134,7 +137,7 @@ Let's modify ``broadcast()`` to send messages concurrently::
async def send(websocket, message):
try:
await websocket.send(message)
except websockets.ConnectionClosed:
except ConnectionClosed:
pass

def broadcast(message):
Expand Down Expand Up @@ -179,20 +182,20 @@ doesn't work well when broadcasting a message to thousands of clients.

When you're sending messages to a single client, you don't want to send them
faster than the network can transfer them and the client accept them. This is
why :meth:`~server.WebSocketServerProtocol.send` checks if the write buffer
is full and, if it is, waits until it drain, giving the network and the
client time to catch up. This provides backpressure.
why :meth:`~asyncio.server.ServerConnection.send` checks if the write buffer is
above the high-water mark and, if it is, waits until it drains, giving the
network and the client time to catch up. This provides backpressure.

Without backpressure, you could pile up data in the write buffer until the
server process runs out of memory and the operating system kills it.

The :meth:`~server.WebSocketServerProtocol.send` API is designed to enforce
The :meth:`~asyncio.server.ServerConnection.send` API is designed to enforce
backpressure by default. This helps users of websockets write robust programs
even if they never heard about backpressure.

For comparison, :class:`asyncio.StreamWriter` requires users to understand
backpressure and to await :meth:`~asyncio.StreamWriter.drain` explicitly
after each :meth:`~asyncio.StreamWriter.write`.
backpressure and to await :meth:`~asyncio.StreamWriter.drain` after each
:meth:`~asyncio.StreamWriter.write` — or at least sufficiently frequently.

When broadcasting messages, backpressure consists in slowing down all clients
in an attempt to let the slowest client catch up. With thousands of clients,
Expand All @@ -203,14 +206,14 @@ How do we avoid running out of memory when slow clients can't keep up with the
broadcast rate, then? The most straightforward option is to disconnect them.

If a client gets too far behind, eventually it reaches the limit defined by
``ping_timeout`` and websockets terminates the connection. You can read the
discussion of :doc:`keepalive and timeouts <./timeouts>` for details.
``ping_timeout`` and websockets terminates the connection. You can refer to
the discussion of :doc:`keepalive and timeouts <timeouts>` for details.

How :func:`broadcast` works
---------------------------
How :func:`~asyncio.connection.broadcast` works
-----------------------------------------------

The built-in :func:`broadcast` function is similar to the naive way. The main
difference is that it doesn't apply backpressure.
The built-in :func:`~asyncio.connection.broadcast` function is similar to the
naive way. The main difference is that it doesn't apply backpressure.

This provides the best performance by avoiding the overhead of scheduling and
running one task per client.
Expand Down Expand Up @@ -321,9 +324,9 @@ the asynchronous iterator returned by ``subscribe()``.
Performance considerations
--------------------------

The built-in :func:`broadcast` function sends all messages without yielding
control to the event loop. So does the naive way when the network and clients
are fast and reliable.
The built-in :func:`~asyncio.connection.broadcast` function sends all messages
without yielding control to the event loop. So does the naive way when the
network and clients are fast and reliable.

For each client, a WebSocket frame is prepared and sent to the network. This
is the minimum amount of work required to broadcast a message.
Expand All @@ -343,7 +346,7 @@ However, this isn't possible in general for two reasons:

All other patterns discussed above yield control to the event loop once per
client because messages are sent by different tasks. This makes them slower
than the built-in :func:`broadcast` function.
than the built-in :func:`~asyncio.connection.broadcast` function.

There is no major difference between the performance of per-client queues and
publish–subscribe.
2 changes: 1 addition & 1 deletion docs/topics/logging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Here's what websockets logs at each level.
``WARNING``
...........

* Failures in :func:`~websockets.broadcast`
* Failures in :func:`~asyncio.connection.broadcast`

``INFO``
........
Expand Down
6 changes: 4 additions & 2 deletions docs/topics/performance.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Performance
===========

.. currentmodule:: websockets

Here are tips to optimize performance.

uvloop
Expand All @@ -16,5 +18,5 @@ application.)
broadcast
---------

:func:`~websockets.broadcast` is the most efficient way to send a message to
many clients.
:func:`~asyncio.connection.broadcast` is the most efficient way to send a
message to many clients.
21 changes: 12 additions & 9 deletions experiments/broadcast/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import sys
import time

import websockets
from websockets import ConnectionClosed
from websockets.asyncio.server import serve
from websockets.asyncio.connection import broadcast


CLIENTS = set()
Expand All @@ -15,7 +17,7 @@
async def send(websocket, message):
try:
await websocket.send(message)
except websockets.ConnectionClosed:
except ConnectionClosed:
pass


Expand Down Expand Up @@ -43,9 +45,6 @@ async def subscribe(self):
__aiter__ = subscribe


PUBSUB = PubSub()


async def handler(websocket, method=None):
if method in ["default", "naive", "task", "wait"]:
CLIENTS.add(websocket)
Expand All @@ -63,14 +62,18 @@ async def handler(websocket, method=None):
CLIENTS.remove(queue)
relay_task.cancel()
elif method == "pubsub":
global PUBSUB
async for message in PUBSUB:
await websocket.send(message)
else:
raise NotImplementedError(f"unsupported method: {method}")


async def broadcast(method, size, delay):
async def broadcast_messages(method, size, delay):
"""Broadcast messages at regular intervals."""
if method == "pubsub":
global PUBSUB
PUBSUB = PubSub()
load_average = 0
time_average = 0
pc1, pt1 = time.perf_counter_ns(), time.process_time_ns()
Expand All @@ -90,7 +93,7 @@ async def broadcast(method, size, delay):
message = str(time.time_ns()).encode() + b" " + os.urandom(size - 20)

if method == "default":
websockets.broadcast(CLIENTS, message)
broadcast(CLIENTS, message)
elif method == "naive":
# Since the loop can yield control, make a copy of CLIENTS
# to avoid: RuntimeError: Set changed size during iteration
Expand Down Expand Up @@ -128,14 +131,14 @@ async def broadcast(method, size, delay):


async def main(method, size, delay):
async with websockets.serve(
async with serve(
functools.partial(handler, method=method),
"localhost",
8765,
compression=None,
ping_timeout=None,
):
await broadcast(method, size, delay)
await broadcast_messages(method, size, delay)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit c3b162d

Please sign in to comment.