From ea19bec46f65db217305f5ec51bacae0acc3991a Mon Sep 17 00:00:00 2001 From: Mathieu Dupuy Date: Thu, 4 Sep 2025 09:36:37 +0200 Subject: [PATCH 1/8] migrate ChannelsLiveServerTestCase background implementation to threads (fixes #2181) It was previously using processes, but it was causing errors when tests were ran with '--parallel', as it was a daemon process and daemon processes can't be spawned when already in a subprocesses. --- channels/testing/live.py | 183 +++++++++++++++++++++++++++++++-------- 1 file changed, 146 insertions(+), 37 deletions(-) diff --git a/channels/testing/live.py b/channels/testing/live.py index aa1a7880..3e3e3165 100644 --- a/channels/testing/live.py +++ b/channels/testing/live.py @@ -1,12 +1,12 @@ +import threading from functools import partial -from daphne.testing import DaphneProcess from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler -from django.core.exceptions import ImproperlyConfigured from django.db import connections from django.db.backends.base.creation import TEST_DATABASE_PREFIX from django.test.testcases import TransactionTestCase from django.test.utils import modify_settings +from django.utils.functional import classproperty from channels.routing import get_default_application @@ -28,65 +28,174 @@ def set_database_connection(): settings.DATABASES["default"]["NAME"] = test_db_name +class ChannelsLiveServerThread(threading.Thread): + """Thread for running a live ASGI server while the tests are running.""" + + def __init__( + self, host, get_application, connections_override=None, port=0, setup=None + ): + self.host = host + self.port = port + self.get_application = get_application + self.connections_override = connections_override + self.setup = setup + self.is_ready = threading.Event() + self.error = None + super().__init__() + + def run(self): + """ + Set up the live server and databases, and then loop over handling + ASGI requests. + """ + if self.connections_override: + # Override this thread's database connections with the ones + # provided by the main thread. + for alias, conn in self.connections_override.items(): + connections[alias] = conn + + try: + # Reinstall the reactor for this thread (same as DaphneProcess) + from daphne.testing import _reinstall_reactor + + _reinstall_reactor() + + from daphne.endpoints import build_endpoint_description_strings + from daphne.server import Server + + # Get the application + application = self.get_application() + + # Create the server + endpoints = build_endpoint_description_strings( + host=self.host, port=self.port + ) + self.server = Server( + application=application, + endpoints=endpoints, + signal_handlers=False, + ready_callable=self._set_ready, + verbosity=0, + ) + + # Run setup if provided + if self.setup is not None: + self.setup() + + # Start the server + self.server.run() + except Exception as e: + self.error = e + self.is_ready.set() + finally: + connections.close_all() + + def _set_ready(self): + """Called by Daphne when the server is ready.""" + if self.server.listening_addresses: + self.port = self.server.listening_addresses[0][1] + self.is_ready.set() + + def terminate(self): + if hasattr(self, "server"): + # Stop the ASGI server + from twisted.internet import reactor + + if reactor.running: + reactor.callFromThread(reactor.stop) + self.join(timeout=5) + + class ChannelsLiveServerTestCase(TransactionTestCase): """ - Does basically the same as TransactionTestCase but also launches a - live Daphne server in a separate process, so - that the tests may use another test framework, such as Selenium, - instead of the built-in dummy client. + Do basically the same as TransactionTestCase but also launch a live ASGI + server in a separate thread so that the tests may use another testing + framework, such as Selenium for example, instead of the built-in dummy + client. + It inherits from TransactionTestCase instead of TestCase because the + threads don't share the same transactions (unless if using in-memory + sqlite) and each thread needs to commit all their transactions so that the + other thread can see the changes. """ host = "localhost" - ProtocolServerProcess = DaphneProcess - static_wrapper = ASGIStaticFilesHandler + port = 0 + server_thread_class = ChannelsLiveServerThread + static_handler = ASGIStaticFilesHandler serve_static = True - @property - def live_server_url(self): - return "http://%s:%s" % (self.host, self._port) + @classproperty + def live_server_url(cls): + return "http://%s:%s" % (cls.host, cls.server_thread.port) - @property - def live_server_ws_url(self): - return "ws://%s:%s" % (self.host, self._port) + @classproperty + def live_server_ws_url(cls): + return "ws://%s:%s" % (cls.host, cls.server_thread.port) + + @classproperty + def allowed_host(cls): + return cls.host @classmethod - def setUpClass(cls): - for connection in connections.all(): - if cls._is_in_memory_db(connection): - raise ImproperlyConfigured( - "ChannelLiveServerTestCase can not be used with in memory databases" - ) + def _make_connections_override(cls): + connections_override = {} + for conn in connections.all(): + # If using in-memory sqlite databases, pass the connections to + # the server thread. + if conn.vendor == "sqlite" and conn.is_in_memory_db(): + connections_override[conn.alias] = conn + return connections_override + @classmethod + def setUpClass(cls): super().setUpClass() - - cls._live_server_modified_settings = modify_settings( - ALLOWED_HOSTS={"append": cls.host} + cls.enterClassContext( + modify_settings(ALLOWED_HOSTS={"append": cls.allowed_host}) ) - cls._live_server_modified_settings.enable() + cls._start_server_thread() + + @classmethod + def _start_server_thread(cls): + connections_override = cls._make_connections_override() + for conn in connections_override.values(): + # Explicitly enable thread-shareability for this connection. + conn.inc_thread_sharing() + + cls.server_thread = cls._create_server_thread(connections_override) + cls.server_thread.daemon = True + cls.server_thread.start() + cls.addClassCleanup(cls._terminate_thread) + + # Wait for the live server to be ready + cls.server_thread.is_ready.wait() + if cls.server_thread.error: + raise cls.server_thread.error + @classmethod + def _create_server_thread(cls, connections_override): get_application = partial( make_application, - static_wrapper=cls.static_wrapper if cls.serve_static else None, + static_wrapper=cls.static_handler if cls.serve_static else None, ) - cls._server_process = cls.ProtocolServerProcess( + return cls.server_thread_class( cls.host, get_application, + connections_override=connections_override, + port=cls.port, setup=set_database_connection, ) - cls._server_process.start() - while True: - if not cls._server_process.ready.wait(timeout=1): - if cls._server_process.is_alive(): - continue - raise RuntimeError("Server stopped") from None - break - cls._port = cls._server_process.port.value + + @classmethod + def _terminate_thread(cls): + # Terminate the live server's thread. + cls.server_thread.terminate() + # Restore shared connections' non-shareability. + for conn in cls.server_thread.connections_override.values(): + conn.dec_thread_sharing() @classmethod def tearDownClass(cls): - cls._server_process.terminate() - cls._server_process.join() - cls._live_server_modified_settings.disable() + # The cleanup is now handled by addClassCleanup in _start_server_thread super().tearDownClass() @classmethod From f3067b827db0b48db8573ddb360152ffdcb796b1 Mon Sep 17 00:00:00 2001 From: Mathieu Dupuy Date: Thu, 4 Sep 2025 11:44:43 +0200 Subject: [PATCH 2/8] unitest `enterClassContext` compatibility shim for python<3.11/django<5.1 --- channels/testing/live.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/channels/testing/live.py b/channels/testing/live.py index 3e3e3165..f343af5b 100644 --- a/channels/testing/live.py +++ b/channels/testing/live.py @@ -7,9 +7,27 @@ from django.test.testcases import TransactionTestCase from django.test.utils import modify_settings from django.utils.functional import classproperty +from django.utils.version import PY311 from channels.routing import get_default_application +if not PY311: + # Backport of unittest.case._enter_context() from Python 3.11. + def _enter_context(cm, addcleanup): + # Look up the special methods on the type to match the with statement. + cls = type(cm) + try: + enter = cls.__enter__ + exit = cls.__exit__ + except AttributeError: + raise TypeError( + f"'{cls.__module__}.{cls.__qualname__}' object does not support the " + f"context manager protocol" + ) from None + result = enter(cm) + addcleanup(exit, cm, None, None, None) + return result + def make_application(*, static_wrapper): # Module-level function for pickle-ability @@ -124,6 +142,12 @@ class ChannelsLiveServerTestCase(TransactionTestCase): static_handler = ASGIStaticFilesHandler serve_static = True + if not PY311: + # Backport of unittest.TestCase.enterClassContext() from Python 3.11. + @classmethod + def enterClassContext(cls, cm): + return _enter_context(cm, cls.addClassCleanup) + @classproperty def live_server_url(cls): return "http://%s:%s" % (cls.host, cls.server_thread.port) @@ -193,11 +217,6 @@ def _terminate_thread(cls): for conn in cls.server_thread.connections_override.values(): conn.dec_thread_sharing() - @classmethod - def tearDownClass(cls): - # The cleanup is now handled by addClassCleanup in _start_server_thread - super().tearDownClass() - @classmethod def _is_in_memory_db(cls, connection): """ From a51d321961e5d2d518f3a384961f497f098554ab Mon Sep 17 00:00:00 2001 From: Mathieu Dupuy Date: Sat, 6 Sep 2025 00:45:02 +0200 Subject: [PATCH 3/8] temp workpoint --- channels/testing/live.py | 30 +++++++++------------ tests/sample_project/tests/test_selenium.py | 1 - 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/channels/testing/live.py b/channels/testing/live.py index f343af5b..6fd318a0 100644 --- a/channels/testing/live.py +++ b/channels/testing/live.py @@ -1,5 +1,4 @@ import threading -from functools import partial from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler from django.db import connections @@ -11,6 +10,7 @@ from channels.routing import get_default_application + if not PY311: # Backport of unittest.case._enter_context() from Python 3.11. def _enter_context(cm, addcleanup): @@ -49,14 +49,15 @@ def set_database_connection(): class ChannelsLiveServerThread(threading.Thread): """Thread for running a live ASGI server while the tests are running.""" - def __init__( - self, host, get_application, connections_override=None, port=0, setup=None - ): + # We don't use a traditional server_class like Django since we use Daphne, + # but we keep this for API consistency + server_class = None + + def __init__(self, host, static_handler, connections_override=None, port=0): self.host = host self.port = port - self.get_application = get_application + self.static_handler = static_handler self.connections_override = connections_override - self.setup = setup self.is_ready = threading.Event() self.error = None super().__init__() @@ -81,8 +82,8 @@ def run(self): from daphne.endpoints import build_endpoint_description_strings from daphne.server import Server - # Get the application - application = self.get_application() + # Create the application (similar to Django's pattern) + application = make_application(static_wrapper=self.static_handler) # Create the server endpoints = build_endpoint_description_strings( @@ -96,9 +97,8 @@ def run(self): verbosity=0, ) - # Run setup if provided - if self.setup is not None: - self.setup() + # Run database setup + set_database_connection() # Start the server self.server.run() @@ -140,7 +140,6 @@ class ChannelsLiveServerTestCase(TransactionTestCase): port = 0 server_thread_class = ChannelsLiveServerThread static_handler = ASGIStaticFilesHandler - serve_static = True if not PY311: # Backport of unittest.TestCase.enterClassContext() from Python 3.11. @@ -197,16 +196,11 @@ def _start_server_thread(cls): @classmethod def _create_server_thread(cls, connections_override): - get_application = partial( - make_application, - static_wrapper=cls.static_handler if cls.serve_static else None, - ) return cls.server_thread_class( cls.host, - get_application, + cls.static_handler, connections_override=connections_override, port=cls.port, - setup=set_database_connection, ) @classmethod diff --git a/tests/sample_project/tests/test_selenium.py b/tests/sample_project/tests/test_selenium.py index 0f875a1e..16840e5e 100644 --- a/tests/sample_project/tests/test_selenium.py +++ b/tests/sample_project/tests/test_selenium.py @@ -8,7 +8,6 @@ class TestSampleApp(SeleniumMixin, ChannelsLiveServerTestCase): - serve_static = True def setUp(self): super().setUp() From 3a64d9dfe34c337696d8e3b18d3212f891bed7b7 Mon Sep 17 00:00:00 2001 From: Mathieu Dupuy Date: Sat, 6 Sep 2025 00:59:02 +0200 Subject: [PATCH 4/8] make ChannelsLiveServerThread closer to LiveServerThread --- channels/testing/live.py | 50 +++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/channels/testing/live.py b/channels/testing/live.py index 6fd318a0..c87fef4f 100644 --- a/channels/testing/live.py +++ b/channels/testing/live.py @@ -9,6 +9,7 @@ from django.utils.version import PY311 from channels.routing import get_default_application +from daphne.testing import _reinstall_reactor if not PY311: @@ -49,17 +50,15 @@ def set_database_connection(): class ChannelsLiveServerThread(threading.Thread): """Thread for running a live ASGI server while the tests are running.""" - # We don't use a traditional server_class like Django since we use Daphne, - # but we keep this for API consistency server_class = None def __init__(self, host, static_handler, connections_override=None, port=0): self.host = host self.port = port - self.static_handler = static_handler - self.connections_override = connections_override self.is_ready = threading.Event() self.error = None + self.static_handler = static_handler + self.connections_override = connections_override super().__init__() def run(self): @@ -72,29 +71,18 @@ def run(self): # provided by the main thread. for alias, conn in self.connections_override.items(): connections[alias] = conn - try: # Reinstall the reactor for this thread (same as DaphneProcess) - from daphne.testing import _reinstall_reactor _reinstall_reactor() - from daphne.endpoints import build_endpoint_description_strings - from daphne.server import Server - # Create the application (similar to Django's pattern) application = make_application(static_wrapper=self.static_handler) - - # Create the server - endpoints = build_endpoint_description_strings( - host=self.host, port=self.port - ) - self.server = Server( + + # Create the server using _create_server method + self.server = self._create_server( application=application, - endpoints=endpoints, - signal_handlers=False, - ready_callable=self._set_ready, - verbosity=0, + connections_override=self.connections_override, ) # Run database setup @@ -108,11 +96,19 @@ def run(self): finally: connections.close_all() - def _set_ready(self): - """Called by Daphne when the server is ready.""" - if self.server.listening_addresses: - self.port = self.server.listening_addresses[0][1] - self.is_ready.set() + def _create_server(self, application, connections_override=None): + """Create and configure the Daphne server.""" + from daphne.endpoints import build_endpoint_description_strings + from daphne.server import Server + + endpoints = build_endpoint_description_strings(host=self.host, port=self.port) + return Server( + application=application, + endpoints=endpoints, + signal_handlers=False, + ready_callable=self._set_ready, + verbosity=0, + ) def terminate(self): if hasattr(self, "server"): @@ -124,6 +120,12 @@ def terminate(self): self.join(timeout=5) + def _set_ready(self): + if self.server.listening_addresses: + self.port = self.server.listening_addresses[0][1] + self.is_ready.set() + + class ChannelsLiveServerTestCase(TransactionTestCase): """ Do basically the same as TransactionTestCase but also launch a live ASGI From c40d9070da222e065373d7c2f4c5a56c5c6f23f1 Mon Sep 17 00:00:00 2001 From: Mathieu Dupuy Date: Sat, 6 Sep 2025 01:11:30 +0200 Subject: [PATCH 5/8] inline make_application + self.httpd --- channels/testing/live.py | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/channels/testing/live.py b/channels/testing/live.py index c87fef4f..bf5931f3 100644 --- a/channels/testing/live.py +++ b/channels/testing/live.py @@ -8,9 +8,13 @@ from django.utils.functional import classproperty from django.utils.version import PY311 + + from channels.routing import get_default_application from daphne.testing import _reinstall_reactor - +from daphne.endpoints import build_endpoint_description_strings +from daphne.server import Server + if not PY311: # Backport of unittest.case._enter_context() from Python 3.11. @@ -30,12 +34,6 @@ def _enter_context(cm, addcleanup): return result -def make_application(*, static_wrapper): - # Module-level function for pickle-ability - application = get_default_application() - if static_wrapper is not None: - application = static_wrapper(application) - return application def set_database_connection(): @@ -77,10 +75,12 @@ def run(self): _reinstall_reactor() # Create the application (similar to Django's pattern) - application = make_application(static_wrapper=self.static_handler) + application = get_default_application() + if self.static_handler is not None: + application = self.static_handler(application) # Create the server using _create_server method - self.server = self._create_server( + self.httpd = self._create_server( application=application, connections_override=self.connections_override, ) @@ -89,7 +89,7 @@ def run(self): set_database_connection() # Start the server - self.server.run() + self.httpd.run() except Exception as e: self.error = e self.is_ready.set() @@ -97,10 +97,7 @@ def run(self): connections.close_all() def _create_server(self, application, connections_override=None): - """Create and configure the Daphne server.""" - from daphne.endpoints import build_endpoint_description_strings - from daphne.server import Server - + endpoints = build_endpoint_description_strings(host=self.host, port=self.port) return Server( application=application, @@ -111,7 +108,7 @@ def _create_server(self, application, connections_override=None): ) def terminate(self): - if hasattr(self, "server"): + if hasattr(self, "httpd"): # Stop the ASGI server from twisted.internet import reactor @@ -121,8 +118,8 @@ def terminate(self): def _set_ready(self): - if self.server.listening_addresses: - self.port = self.server.listening_addresses[0][1] + if self.httpd.listening_addresses: + self.port = self.httpd.listening_addresses[0][1] self.is_ready.set() From a4e9ed6febb5f98baa9876bdf2887b506e789790 Mon Sep 17 00:00:00 2001 From: Mathieu Dupuy Date: Fri, 24 Oct 2025 23:25:09 +0200 Subject: [PATCH 6/8] align closer with django's LiveServerTestCase * Add server_class attribute * Move application creation logic into _create_server * Remove unnecessary static_handler None check * Add connections_override parameter to _create_server * remove useless print --- channels/testing/live.py | 39 ++++++++++----------------------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/channels/testing/live.py b/channels/testing/live.py index bf5931f3..8b0ec229 100644 --- a/channels/testing/live.py +++ b/channels/testing/live.py @@ -48,7 +48,7 @@ def set_database_connection(): class ChannelsLiveServerThread(threading.Thread): """Thread for running a live ASGI server while the tests are running.""" - server_class = None + server_class = Server def __init__(self, host, static_handler, connections_override=None, port=0): self.host = host @@ -71,24 +71,19 @@ def run(self): connections[alias] = conn try: # Reinstall the reactor for this thread (same as DaphneProcess) - _reinstall_reactor() - # Create the application (similar to Django's pattern) - application = get_default_application() - if self.static_handler is not None: - application = self.static_handler(application) - - # Create the server using _create_server method self.httpd = self._create_server( - application=application, connections_override=self.connections_override, ) + # If binding to port zero, assign the port allocated by the OS. + if self.port == 0: + self.port = self.httpd.listening_addresses[0][1] # Run database setup set_database_connection() - # Start the server + self.is_ready.set() self.httpd.run() except Exception as e: self.error = e @@ -96,14 +91,14 @@ def run(self): finally: connections.close_all() - def _create_server(self, application, connections_override=None): - + def _create_server(self, connections_override=None): endpoints = build_endpoint_description_strings(host=self.host, port=self.port) - return Server( + # Create the handler for serving static files + application = self.static_handler(get_default_application()) + return self.server_class( application=application, endpoints=endpoints, signal_handlers=False, - ready_callable=self._set_ready, verbosity=0, ) @@ -114,13 +109,7 @@ def terminate(self): if reactor.running: reactor.callFromThread(reactor.stop) - self.join(timeout=5) - - - def _set_ready(self): - if self.httpd.listening_addresses: - self.port = self.httpd.listening_addresses[0][1] - self.is_ready.set() + self.join() class ChannelsLiveServerTestCase(TransactionTestCase): @@ -209,11 +198,3 @@ def _terminate_thread(cls): # Restore shared connections' non-shareability. for conn in cls.server_thread.connections_override.values(): conn.dec_thread_sharing() - - @classmethod - def _is_in_memory_db(cls, connection): - """ - Check if DatabaseWrapper holds in memory database. - """ - if connection.vendor == "sqlite": - return connection.is_in_memory_db() From eb292bad06c4fbbb5a7a4d2cfe716f9c683be414 Mon Sep 17 00:00:00 2001 From: Mathieu Dupuy Date: Sat, 25 Oct 2025 15:56:05 +0200 Subject: [PATCH 7/8] restore '_server_is_ready' and join() timeout --- channels/testing/live.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/channels/testing/live.py b/channels/testing/live.py index 8b0ec229..7f896a1b 100644 --- a/channels/testing/live.py +++ b/channels/testing/live.py @@ -76,14 +76,11 @@ def run(self): self.httpd = self._create_server( connections_override=self.connections_override, ) - # If binding to port zero, assign the port allocated by the OS. - if self.port == 0: - self.port = self.httpd.listening_addresses[0][1] # Run database setup set_database_connection() - self.is_ready.set() + # The server will call ready_callable when ready self.httpd.run() except Exception as e: self.error = e @@ -99,9 +96,17 @@ def _create_server(self, connections_override=None): application=application, endpoints=endpoints, signal_handlers=False, + ready_callable=self._server_is_ready, verbosity=0, ) + def _server_is_ready(self): + """Called by Daphne when the server is ready and listening.""" + # If binding to port zero, assign the port allocated by the OS. + if self.port == 0: + self.port = self.httpd.listening_addresses[0][1] + self.is_ready.set() + def terminate(self): if hasattr(self, "httpd"): # Stop the ASGI server @@ -109,7 +114,7 @@ def terminate(self): if reactor.running: reactor.callFromThread(reactor.stop) - self.join() + self.join(timeout=5) class ChannelsLiveServerTestCase(TransactionTestCase): From 5bdb0fe1a81ee17ea55a74fbdec65684bc1c72cd Mon Sep 17 00:00:00 2001 From: Mathieu Dupuy Date: Sat, 25 Oct 2025 16:37:54 +0200 Subject: [PATCH 8/8] tox -e qa: flake8 flix + isort --- channels/testing/live.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/channels/testing/live.py b/channels/testing/live.py index 7f896a1b..eb57e44e 100644 --- a/channels/testing/live.py +++ b/channels/testing/live.py @@ -1,5 +1,8 @@ import threading +from daphne.endpoints import build_endpoint_description_strings +from daphne.server import Server +from daphne.testing import _reinstall_reactor from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler from django.db import connections from django.db.backends.base.creation import TEST_DATABASE_PREFIX @@ -8,13 +11,7 @@ from django.utils.functional import classproperty from django.utils.version import PY311 - - from channels.routing import get_default_application -from daphne.testing import _reinstall_reactor -from daphne.endpoints import build_endpoint_description_strings -from daphne.server import Server - if not PY311: # Backport of unittest.case._enter_context() from Python 3.11. @@ -34,8 +31,6 @@ def _enter_context(cm, addcleanup): return result - - def set_database_connection(): from django.conf import settings