From 64f3cf9955feca5d9e8c3eb93ca5e790ec320f3d Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Fri, 26 Sep 2025 19:28:02 +0100 Subject: [PATCH 1/2] Reset client queue upon disconnection (Fixes #414) --- src/engineio/async_client.py | 6 ++++++ src/engineio/client.py | 9 +++++++++ tests/common/test_client.py | 23 +++++++++++++++++++++++ 3 files changed, 38 insertions(+) diff --git a/src/engineio/async_client.py b/src/engineio/async_client.py index 3c0acea..43f3a56 100644 --- a/src/engineio/async_client.py +++ b/src/engineio/async_client.py @@ -214,6 +214,12 @@ def create_event(self): async def _reset(self): super()._reset() + while True: # pragma: no cover + try: + self.queue.get_nowait() + self.queue.task_done() + except self.queue_empty: + break if not self.external_http: # pragma: no cover if self.http and not self.http.closed: await self.http.close() diff --git a/src/engineio/client.py b/src/engineio/client.py index 86c06ef..c04e080 100644 --- a/src/engineio/client.py +++ b/src/engineio/client.py @@ -173,6 +173,15 @@ def create_event(self, *args, **kwargs): """Create an event object.""" return threading.Event(*args, **kwargs) + def _reset(self): + super()._reset() + while True: # pragma: no cover + try: + self.queue.get_nowait() + self.queue.task_done() + except self.queue_empty: + break + def _connect_polling(self, url, headers, engineio_path): """Establish a long-polling connection to the Engine.IO server.""" if requests is None: # pragma: no cover diff --git a/tests/common/test_client.py b/tests/common/test_client.py index b3322a8..45d0faf 100644 --- a/tests/common/test_client.py +++ b/tests/common/test_client.py @@ -210,6 +210,7 @@ def test_disconnect_polling(self): c.state = 'connected' c.current_transport = 'polling' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.read_loop_task = mock.MagicMock() c.ws = mock.MagicMock() c._trigger_event = mock.MagicMock() @@ -226,6 +227,7 @@ def test_disconnect_websocket(self): c.state = 'connected' c.current_transport = 'websocket' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.read_loop_task = mock.MagicMock() c.ws = mock.MagicMock() c._trigger_event = mock.MagicMock() @@ -242,6 +244,7 @@ def test_disconnect_polling_abort(self): c.state = 'connected' c.current_transport = 'polling' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.read_loop_task = mock.MagicMock() c.ws = mock.MagicMock() c.disconnect(abort=True) @@ -256,6 +259,7 @@ def test_disconnect_websocket_abort(self): c.state = 'connected' c.current_transport = 'websocket' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.read_loop_task = mock.MagicMock() c.ws = mock.MagicMock() c.disconnect(abort=True) @@ -1333,6 +1337,7 @@ def test_read_loop_polling_no_response(self, _time): c.state = 'connected' c.base_url = 'http://foo' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c._send_request = mock.MagicMock(return_value=None) c._trigger_event = mock.MagicMock() c.write_loop_task = mock.MagicMock() @@ -1354,6 +1359,7 @@ def test_read_loop_polling_bad_status(self, _time): c.state = 'connected' c.base_url = 'http://foo' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c._send_request = mock.MagicMock() c._send_request.return_value.status_code = 400 c.write_loop_task = mock.MagicMock() @@ -1373,6 +1379,7 @@ def test_read_loop_polling_bad_packet(self, _time): c.state = 'connected' c.base_url = 'http://foo' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c._send_request = mock.MagicMock() c._send_request.return_value.status_code = 200 c._send_request.return_value.content = b'foo' @@ -1392,6 +1399,7 @@ def test_read_loop_polling(self): c.state = 'connected' c.base_url = 'http://foo' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c._send_request = mock.MagicMock() c._send_request.side_effect = [ mock.MagicMock( @@ -1426,6 +1434,7 @@ def test_read_loop_websocket_timeout(self): c = client.Client() c.state = 'connected' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.ws = mock.MagicMock() c.ws.recv.side_effect = websocket.WebSocketTimeoutException c.write_loop_task = mock.MagicMock() @@ -1438,6 +1447,7 @@ def test_read_loop_websocket_no_response(self): c = client.Client() c.state = 'connected' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.ws = mock.MagicMock() c.ws.recv.side_effect = websocket.WebSocketConnectionClosedException c.write_loop_task = mock.MagicMock() @@ -1450,6 +1460,7 @@ def test_read_loop_websocket_unexpected_error(self): c = client.Client() c.state = 'connected' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.ws = mock.MagicMock() c.ws.recv.side_effect = ValueError c.write_loop_task = mock.MagicMock() @@ -1464,6 +1475,7 @@ def test_read_loop_websocket(self): c.ping_timeout = 2 c.state = 'connected' c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.ws = mock.MagicMock() c.ws.recv.side_effect = [ packet.Packet(packet.PING).encode(), @@ -1489,6 +1501,7 @@ def test_write_loop_no_packets(self): c.ping_interval = 1 c.ping_timeout = 2 c.queue = mock.MagicMock() + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.return_value = None c._write_loop() c.queue.task_done.assert_called_once_with() @@ -1501,6 +1514,7 @@ def test_write_loop_empty_queue(self): c.ping_timeout = 2 c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = RuntimeError c._write_loop() c.queue.get.assert_called_once_with(timeout=7) @@ -1514,6 +1528,7 @@ def test_write_loop_polling_one_packet(self): c.current_transport = 'polling' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), RuntimeError, @@ -1543,6 +1558,7 @@ def test_write_loop_polling_three_packets(self): c.current_transport = 'polling' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), packet.Packet(packet.PING), @@ -1578,6 +1594,7 @@ def test_write_loop_polling_two_packets_done(self): c.current_transport = 'polling' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), packet.Packet(packet.PING), @@ -1612,6 +1629,7 @@ def test_write_loop_polling_bad_connection(self): c.current_transport = 'polling' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), RuntimeError, @@ -1641,6 +1659,7 @@ def test_write_loop_polling_bad_status(self): c.current_transport = 'polling' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), RuntimeError, @@ -1670,6 +1689,7 @@ def test_write_loop_websocket_one_packet(self): c.current_transport = 'websocket' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), RuntimeError, @@ -1690,6 +1710,7 @@ def test_write_loop_websocket_three_packets(self): c.current_transport = 'websocket' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), packet.Packet(packet.PING), @@ -1714,6 +1735,7 @@ def test_write_loop_websocket_one_packet_binary(self): c.current_transport = 'websocket' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, b'foo'), RuntimeError, @@ -1734,6 +1756,7 @@ def test_write_loop_websocket_bad_connection(self): c.current_transport = 'websocket' c.queue = mock.MagicMock() c.queue_empty = RuntimeError + c.queue.get_nowait.side_effect = c.queue_empty c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), RuntimeError, From c5fc18eb92e03650e202e1eb0a5af9c52f98a4df Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sat, 27 Sep 2025 20:43:13 +0100 Subject: [PATCH 2/2] fix queue mocks in client unit tests --- tests/async/test_client.py | 112 +++++++++++++++------------------- tests/common/test_client.py | 116 ++++++++++++++---------------------- 2 files changed, 91 insertions(+), 137 deletions(-) diff --git a/tests/async/test_client.py b/tests/async/test_client.py index bd3acf4..59563a1 100644 --- a/tests/async/test_client.py +++ b/tests/async/test_client.py @@ -16,6 +16,14 @@ class TestAsyncClient: + def mock_queue(self, client): + client.queue = mock.MagicMock() + client.queue_empty = RuntimeError + client.queue.get_nowait.side_effect = client.queue_empty + client.queue.get = mock.AsyncMock() + client.queue.put = mock.AsyncMock() + client.queue.join = mock.AsyncMock() + async def test_is_asyncio_based(self): c = async_client.AsyncClient() assert c.is_asyncio_based() @@ -143,9 +151,7 @@ async def test_disconnect_polling(self): base_client.connected_clients.append(c) c.state = 'connected' c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() - c.queue.join = mock.AsyncMock() + self.mock_queue(c) c.read_loop_task = mock.AsyncMock()() c.ws = mock.MagicMock() c.ws.close = mock.AsyncMock() @@ -162,9 +168,7 @@ async def test_disconnect_websocket(self): base_client.connected_clients.append(c) c.state = 'connected' c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() - c.queue.join = mock.AsyncMock() + self.mock_queue(c) c.read_loop_task = mock.AsyncMock()() c.ws = mock.MagicMock() c.ws.close = mock.AsyncMock() @@ -181,9 +185,7 @@ async def test_disconnect_polling_abort(self): base_client.connected_clients.append(c) c.state = 'connected' c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() - c.queue.join = mock.AsyncMock() + self.mock_queue(c) c.read_loop_task = mock.AsyncMock()() c.ws = mock.MagicMock() c.ws.close = mock.AsyncMock() @@ -197,9 +199,7 @@ async def test_disconnect_websocket_abort(self): base_client.connected_clients.append(c) c.state = 'connected' c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() - c.queue.join = mock.AsyncMock() + self.mock_queue(c) c.read_loop_task = mock.AsyncMock()() c.ws = mock.MagicMock() c.ws.close = mock.AsyncMock() @@ -1014,8 +1014,7 @@ async def test_read_loop_polling_no_response(self, _time): c.ping_timeout = 5 c.state = 'connected' c.base_url = 'http://foo' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() + self.mock_queue(c) c._send_request = mock.AsyncMock(return_value=None) c._trigger_event = mock.AsyncMock() c.write_loop_task = mock.AsyncMock()() @@ -1036,8 +1035,7 @@ async def test_read_loop_polling_bad_status(self, _time): c.ping_timeout = 5 c.state = 'connected' c.base_url = 'http://foo' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() + self.mock_queue(c) c._send_request = mock.AsyncMock() c._send_request.return_value.status = 400 c.write_loop_task = mock.AsyncMock()() @@ -1055,8 +1053,7 @@ async def test_read_loop_polling_bad_packet(self, _time): c.ping_timeout = 60 c.state = 'connected' c.base_url = 'http://foo' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() + self.mock_queue(c) c._send_request = mock.AsyncMock() c._send_request.return_value.status = 200 c._send_request.return_value.read = mock.AsyncMock(return_value=b'foo') @@ -1074,8 +1071,7 @@ async def test_read_loop_polling(self): c.ping_timeout = 5 c.state = 'connected' c.base_url = 'http://foo' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() + self.mock_queue(c) c._send_request = mock.AsyncMock() c._send_request.side_effect = [ mock.MagicMock( @@ -1114,8 +1110,7 @@ async def test_read_loop_websocket_timeout(self): c.ping_timeout = 2 c.base_url = 'ws://foo' c.state = 'connected' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() + self.mock_queue(c) c.ws = mock.MagicMock() c.ws.receive = mock.AsyncMock(side_effect=asyncio.TimeoutError()) c.write_loop_task = mock.AsyncMock()() @@ -1129,8 +1124,7 @@ async def test_read_loop_websocket_no_response(self): c.ping_timeout = 2 c.base_url = 'ws://foo' c.state = 'connected' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() + self.mock_queue(c) c.ws = mock.MagicMock() c.ws.receive = mock.AsyncMock( side_effect=aiohttp.client_exceptions.ServerDisconnectedError() @@ -1146,8 +1140,7 @@ async def test_read_loop_websocket_unexpected_error(self): c.ping_timeout = 2 c.base_url = 'ws://foo' c.state = 'connected' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() + self.mock_queue(c) c.ws = mock.MagicMock() c.ws.receive = mock.AsyncMock(side_effect=ValueError) c.write_loop_task = mock.AsyncMock()() @@ -1161,8 +1154,7 @@ async def test_read_loop_websocket(self): c.ping_timeout = 2 c.base_url = 'ws://foo' c.state = 'connected' - c.queue = mock.MagicMock() - c.queue.put = mock.AsyncMock() + self.mock_queue(c) c.ws = mock.MagicMock() c.ws.receive = mock.AsyncMock( side_effect=[ @@ -1188,7 +1180,7 @@ async def test_write_loop_no_packets(self): c.state = 'connected' c.ping_interval = 1 c.ping_timeout = 2 - c.queue = mock.MagicMock() + self.mock_queue(c) c.queue.get = mock.AsyncMock(return_value=None) await c._write_loop() c.queue.task_done.assert_called_once_with() @@ -1199,9 +1191,8 @@ async def test_write_loop_empty_queue(self): c.state = 'connected' c.ping_interval = 1 c.ping_timeout = 2 - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get = mock.AsyncMock(side_effect=RuntimeError) + self.mock_queue(c) + c.queue.get = mock.AsyncMock(side_effect=c.queue_empty) await c._write_loop() c.queue.get.assert_awaited_once_with() @@ -1212,15 +1203,14 @@ async def test_write_loop_polling_one_packet(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( side_effect=[ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, + c.queue_empty, ] ) - c.queue.get_nowait = mock.MagicMock(side_effect=RuntimeError) + c.queue.get_nowait = mock.MagicMock(side_effect=c.queue_empty) c._send_request = mock.AsyncMock() c._send_request.return_value.status = 200 await c._write_loop() @@ -1243,19 +1233,18 @@ async def test_write_loop_polling_three_packets(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( side_effect=[ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, + c.queue_empty, ] ) c.queue.get_nowait = mock.MagicMock( side_effect=[ packet.Packet(packet.PING), packet.Packet(packet.NOOP), - RuntimeError, + c.queue_empty, ] ) c._send_request = mock.AsyncMock() @@ -1284,12 +1273,11 @@ async def test_write_loop_polling_two_packets_done(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( side_effect=[ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, + c.queue_empty, ] ) c.queue.get_nowait = mock.MagicMock( @@ -1321,12 +1309,11 @@ async def test_write_loop_polling_bad_connection(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( side_effect=[packet.Packet(packet.MESSAGE, {'foo': 'bar'})] ) - c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError]) + c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty]) c._send_request = mock.AsyncMock(return_value=None) await c._write_loop() assert c.queue.task_done.call_count == 1 @@ -1349,12 +1336,11 @@ async def test_write_loop_polling_bad_status(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( side_effect=[packet.Packet(packet.MESSAGE, {'foo': 'bar'})] ) - c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError]) + c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty]) c._send_request = mock.AsyncMock() c._send_request.return_value.status = 500 await c._write_loop() @@ -1378,15 +1364,14 @@ async def test_write_loop_websocket_one_packet(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( side_effect=[ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, + c.queue_empty, ] ) - c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError]) + c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty]) c.ws = mock.MagicMock() c.ws.send_str = mock.AsyncMock() await c._write_loop() @@ -1400,19 +1385,18 @@ async def test_write_loop_websocket_three_packets(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( side_effect=[ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, + c.queue_empty, ] ) c.queue.get_nowait = mock.MagicMock( side_effect=[ packet.Packet(packet.PING), packet.Packet(packet.NOOP), - RuntimeError, + c.queue_empty, ] ) c.ws = mock.MagicMock() @@ -1430,12 +1414,11 @@ async def test_write_loop_websocket_one_packet_binary(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( - side_effect=[packet.Packet(packet.MESSAGE, b'foo'), RuntimeError] + side_effect=[packet.Packet(packet.MESSAGE, b'foo'), c.queue_empty] ) - c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError]) + c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty]) c.ws = mock.MagicMock() c.ws.send_bytes = mock.AsyncMock() await c._write_loop() @@ -1449,15 +1432,14 @@ async def test_write_loop_websocket_bad_connection(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError + self.mock_queue(c) c.queue.get = mock.AsyncMock( side_effect=[ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, + c.queue_empty, ] ) - c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError]) + c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty]) c.ws = mock.MagicMock() c.ws.send_str = mock.AsyncMock( side_effect=aiohttp.client_exceptions.ServerDisconnectedError() diff --git a/tests/common/test_client.py b/tests/common/test_client.py index 45d0faf..a21a699 100644 --- a/tests/common/test_client.py +++ b/tests/common/test_client.py @@ -15,6 +15,11 @@ class TestClient: + def mock_queue(self, client): + client.queue = mock.MagicMock() + client.queue_empty = RuntimeError + client.queue.get_nowait.side_effect = client.queue_empty + def test_is_asyncio_based(self): c = client.Client() assert not c.is_asyncio_based() @@ -209,8 +214,7 @@ def test_disconnect_polling(self): base_client.connected_clients.append(c) c.state = 'connected' c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.read_loop_task = mock.MagicMock() c.ws = mock.MagicMock() c._trigger_event = mock.MagicMock() @@ -226,8 +230,7 @@ def test_disconnect_websocket(self): base_client.connected_clients.append(c) c.state = 'connected' c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.read_loop_task = mock.MagicMock() c.ws = mock.MagicMock() c._trigger_event = mock.MagicMock() @@ -243,8 +246,7 @@ def test_disconnect_polling_abort(self): base_client.connected_clients.append(c) c.state = 'connected' c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.read_loop_task = mock.MagicMock() c.ws = mock.MagicMock() c.disconnect(abort=True) @@ -258,8 +260,7 @@ def test_disconnect_websocket_abort(self): base_client.connected_clients.append(c) c.state = 'connected' c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.read_loop_task = mock.MagicMock() c.ws = mock.MagicMock() c.disconnect(abort=True) @@ -1336,8 +1337,7 @@ def test_read_loop_polling_no_response(self, _time): c.ping_timeout = 5 c.state = 'connected' c.base_url = 'http://foo' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c._send_request = mock.MagicMock(return_value=None) c._trigger_event = mock.MagicMock() c.write_loop_task = mock.MagicMock() @@ -1358,8 +1358,7 @@ def test_read_loop_polling_bad_status(self, _time): c.ping_timeout = 5 c.state = 'connected' c.base_url = 'http://foo' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c._send_request = mock.MagicMock() c._send_request.return_value.status_code = 400 c.write_loop_task = mock.MagicMock() @@ -1378,8 +1377,7 @@ def test_read_loop_polling_bad_packet(self, _time): c.ping_timeout = 60 c.state = 'connected' c.base_url = 'http://foo' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c._send_request = mock.MagicMock() c._send_request.return_value.status_code = 200 c._send_request.return_value.content = b'foo' @@ -1398,8 +1396,7 @@ def test_read_loop_polling(self): c.ping_timeout = 5 c.state = 'connected' c.base_url = 'http://foo' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c._send_request = mock.MagicMock() c._send_request.side_effect = [ mock.MagicMock( @@ -1433,8 +1430,7 @@ def test_read_loop_websocket_disconnected(self): def test_read_loop_websocket_timeout(self): c = client.Client() c.state = 'connected' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.ws = mock.MagicMock() c.ws.recv.side_effect = websocket.WebSocketTimeoutException c.write_loop_task = mock.MagicMock() @@ -1446,8 +1442,7 @@ def test_read_loop_websocket_timeout(self): def test_read_loop_websocket_no_response(self): c = client.Client() c.state = 'connected' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.ws = mock.MagicMock() c.ws.recv.side_effect = websocket.WebSocketConnectionClosedException c.write_loop_task = mock.MagicMock() @@ -1459,8 +1454,7 @@ def test_read_loop_websocket_no_response(self): def test_read_loop_websocket_unexpected_error(self): c = client.Client() c.state = 'connected' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.ws = mock.MagicMock() c.ws.recv.side_effect = ValueError c.write_loop_task = mock.MagicMock() @@ -1474,8 +1468,7 @@ def test_read_loop_websocket(self): c.ping_interval = 1 c.ping_timeout = 2 c.state = 'connected' - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.ws = mock.MagicMock() c.ws.recv.side_effect = [ packet.Packet(packet.PING).encode(), @@ -1500,8 +1493,7 @@ def test_write_loop_no_packets(self): c.state = 'connected' c.ping_interval = 1 c.ping_timeout = 2 - c.queue = mock.MagicMock() - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.return_value = None c._write_loop() c.queue.task_done.assert_called_once_with() @@ -1512,10 +1504,8 @@ def test_write_loop_empty_queue(self): c.state = 'connected' c.ping_interval = 1 c.ping_timeout = 2 - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty - c.queue.get.side_effect = RuntimeError + self.mock_queue(c) + c.queue.get.side_effect = c.queue_empty c._write_loop() c.queue.get.assert_called_once_with(timeout=7) @@ -1526,13 +1516,11 @@ def test_write_loop_polling_one_packet(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, - RuntimeError, + c.queue_empty, + c.queue_empty, ] c._send_request = mock.MagicMock() c._send_request.return_value.status_code = 200 @@ -1556,15 +1544,13 @@ def test_write_loop_polling_three_packets(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), packet.Packet(packet.PING), packet.Packet(packet.NOOP), - RuntimeError, - RuntimeError, + c.queue_empty, + c.queue_empty, ] c._send_request = mock.MagicMock() c._send_request.return_value.status_code = 200 @@ -1592,14 +1578,12 @@ def test_write_loop_polling_two_packets_done(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), packet.Packet(packet.PING), None, - RuntimeError, + c.queue_empty, ] c._send_request = mock.MagicMock() c._send_request.return_value.status_code = 200 @@ -1627,12 +1611,10 @@ def test_write_loop_polling_bad_connection(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, + c.queue_empty, ] c._send_request = mock.MagicMock() c._send_request.return_value = None @@ -1657,12 +1639,10 @@ def test_write_loop_polling_bad_status(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'polling' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, + c.queue_empty, ] c._send_request = mock.MagicMock() c._send_request.return_value.status_code = 500 @@ -1687,13 +1667,11 @@ def test_write_loop_websocket_one_packet(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, - RuntimeError, + c.queue_empty, + c.queue_empty, ] c.ws = mock.MagicMock() c._write_loop() @@ -1708,15 +1686,13 @@ def test_write_loop_websocket_three_packets(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), packet.Packet(packet.PING), packet.Packet(packet.NOOP), - RuntimeError, - RuntimeError, + c.queue_empty, + c.queue_empty, ] c.ws = mock.MagicMock() c._write_loop() @@ -1733,13 +1709,11 @@ def test_write_loop_websocket_one_packet_binary(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, b'foo'), - RuntimeError, - RuntimeError, + c.queue_empty, + c.queue_empty, ] c.ws = mock.MagicMock() c._write_loop() @@ -1754,13 +1728,11 @@ def test_write_loop_websocket_bad_connection(self): c.ping_interval = 1 c.ping_timeout = 2 c.current_transport = 'websocket' - c.queue = mock.MagicMock() - c.queue_empty = RuntimeError - c.queue.get_nowait.side_effect = c.queue_empty + self.mock_queue(c) c.queue.get.side_effect = [ packet.Packet(packet.MESSAGE, {'foo': 'bar'}), - RuntimeError, - RuntimeError, + c.queue_empty, + c.queue_empty, ] c.ws = mock.MagicMock() c.ws.send.side_effect = websocket.WebSocketConnectionClosedException