Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/engineio/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ def create_event(self):

async def _reset(self):
super()._reset()
while True: # pragma: no cover
try:
self.queue.get_nowait()
self.queue.task_done()
except self.queue_empty:
break
if not self.external_http: # pragma: no cover
if self.http and not self.http.closed:
await self.http.close()
Expand Down
9 changes: 9 additions & 0 deletions src/engineio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ def create_event(self, *args, **kwargs):
"""Create an event object."""
return threading.Event(*args, **kwargs)

def _reset(self):
super()._reset()
while True: # pragma: no cover
try:
self.queue.get_nowait()
self.queue.task_done()
except self.queue_empty:
break

def _connect_polling(self, url, headers, engineio_path):
"""Establish a long-polling connection to the Engine.IO server."""
if requests is None: # pragma: no cover
Expand Down
112 changes: 47 additions & 65 deletions tests/async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@


class TestAsyncClient:
def mock_queue(self, client):
client.queue = mock.MagicMock()
client.queue_empty = RuntimeError
client.queue.get_nowait.side_effect = client.queue_empty
client.queue.get = mock.AsyncMock()
client.queue.put = mock.AsyncMock()
client.queue.join = mock.AsyncMock()

async def test_is_asyncio_based(self):
c = async_client.AsyncClient()
assert c.is_asyncio_based()
Expand Down Expand Up @@ -143,9 +151,7 @@ async def test_disconnect_polling(self):
base_client.connected_clients.append(c)
c.state = 'connected'
c.current_transport = 'polling'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
c.queue.join = mock.AsyncMock()
self.mock_queue(c)
c.read_loop_task = mock.AsyncMock()()
c.ws = mock.MagicMock()
c.ws.close = mock.AsyncMock()
Expand All @@ -162,9 +168,7 @@ async def test_disconnect_websocket(self):
base_client.connected_clients.append(c)
c.state = 'connected'
c.current_transport = 'websocket'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
c.queue.join = mock.AsyncMock()
self.mock_queue(c)
c.read_loop_task = mock.AsyncMock()()
c.ws = mock.MagicMock()
c.ws.close = mock.AsyncMock()
Expand All @@ -181,9 +185,7 @@ async def test_disconnect_polling_abort(self):
base_client.connected_clients.append(c)
c.state = 'connected'
c.current_transport = 'polling'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
c.queue.join = mock.AsyncMock()
self.mock_queue(c)
c.read_loop_task = mock.AsyncMock()()
c.ws = mock.MagicMock()
c.ws.close = mock.AsyncMock()
Expand All @@ -197,9 +199,7 @@ async def test_disconnect_websocket_abort(self):
base_client.connected_clients.append(c)
c.state = 'connected'
c.current_transport = 'websocket'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
c.queue.join = mock.AsyncMock()
self.mock_queue(c)
c.read_loop_task = mock.AsyncMock()()
c.ws = mock.MagicMock()
c.ws.close = mock.AsyncMock()
Expand Down Expand Up @@ -1014,8 +1014,7 @@ async def test_read_loop_polling_no_response(self, _time):
c.ping_timeout = 5
c.state = 'connected'
c.base_url = 'http://foo'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
self.mock_queue(c)
c._send_request = mock.AsyncMock(return_value=None)
c._trigger_event = mock.AsyncMock()
c.write_loop_task = mock.AsyncMock()()
Expand All @@ -1036,8 +1035,7 @@ async def test_read_loop_polling_bad_status(self, _time):
c.ping_timeout = 5
c.state = 'connected'
c.base_url = 'http://foo'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
self.mock_queue(c)
c._send_request = mock.AsyncMock()
c._send_request.return_value.status = 400
c.write_loop_task = mock.AsyncMock()()
Expand All @@ -1055,8 +1053,7 @@ async def test_read_loop_polling_bad_packet(self, _time):
c.ping_timeout = 60
c.state = 'connected'
c.base_url = 'http://foo'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
self.mock_queue(c)
c._send_request = mock.AsyncMock()
c._send_request.return_value.status = 200
c._send_request.return_value.read = mock.AsyncMock(return_value=b'foo')
Expand All @@ -1074,8 +1071,7 @@ async def test_read_loop_polling(self):
c.ping_timeout = 5
c.state = 'connected'
c.base_url = 'http://foo'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
self.mock_queue(c)
c._send_request = mock.AsyncMock()
c._send_request.side_effect = [
mock.MagicMock(
Expand Down Expand Up @@ -1114,8 +1110,7 @@ async def test_read_loop_websocket_timeout(self):
c.ping_timeout = 2
c.base_url = 'ws://foo'
c.state = 'connected'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
self.mock_queue(c)
c.ws = mock.MagicMock()
c.ws.receive = mock.AsyncMock(side_effect=asyncio.TimeoutError())
c.write_loop_task = mock.AsyncMock()()
Expand All @@ -1129,8 +1124,7 @@ async def test_read_loop_websocket_no_response(self):
c.ping_timeout = 2
c.base_url = 'ws://foo'
c.state = 'connected'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
self.mock_queue(c)
c.ws = mock.MagicMock()
c.ws.receive = mock.AsyncMock(
side_effect=aiohttp.client_exceptions.ServerDisconnectedError()
Expand All @@ -1146,8 +1140,7 @@ async def test_read_loop_websocket_unexpected_error(self):
c.ping_timeout = 2
c.base_url = 'ws://foo'
c.state = 'connected'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
self.mock_queue(c)
c.ws = mock.MagicMock()
c.ws.receive = mock.AsyncMock(side_effect=ValueError)
c.write_loop_task = mock.AsyncMock()()
Expand All @@ -1161,8 +1154,7 @@ async def test_read_loop_websocket(self):
c.ping_timeout = 2
c.base_url = 'ws://foo'
c.state = 'connected'
c.queue = mock.MagicMock()
c.queue.put = mock.AsyncMock()
self.mock_queue(c)
c.ws = mock.MagicMock()
c.ws.receive = mock.AsyncMock(
side_effect=[
Expand All @@ -1188,7 +1180,7 @@ async def test_write_loop_no_packets(self):
c.state = 'connected'
c.ping_interval = 1
c.ping_timeout = 2
c.queue = mock.MagicMock()
self.mock_queue(c)
c.queue.get = mock.AsyncMock(return_value=None)
await c._write_loop()
c.queue.task_done.assert_called_once_with()
Expand All @@ -1199,9 +1191,8 @@ async def test_write_loop_empty_queue(self):
c.state = 'connected'
c.ping_interval = 1
c.ping_timeout = 2
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
c.queue.get = mock.AsyncMock(side_effect=RuntimeError)
self.mock_queue(c)
c.queue.get = mock.AsyncMock(side_effect=c.queue_empty)
await c._write_loop()
c.queue.get.assert_awaited_once_with()

Expand All @@ -1212,15 +1203,14 @@ async def test_write_loop_polling_one_packet(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'polling'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
RuntimeError,
c.queue_empty,
]
)
c.queue.get_nowait = mock.MagicMock(side_effect=RuntimeError)
c.queue.get_nowait = mock.MagicMock(side_effect=c.queue_empty)
c._send_request = mock.AsyncMock()
c._send_request.return_value.status = 200
await c._write_loop()
Expand All @@ -1243,19 +1233,18 @@ async def test_write_loop_polling_three_packets(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'polling'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
RuntimeError,
c.queue_empty,
]
)
c.queue.get_nowait = mock.MagicMock(
side_effect=[
packet.Packet(packet.PING),
packet.Packet(packet.NOOP),
RuntimeError,
c.queue_empty,
]
)
c._send_request = mock.AsyncMock()
Expand Down Expand Up @@ -1284,12 +1273,11 @@ async def test_write_loop_polling_two_packets_done(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'polling'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
RuntimeError,
c.queue_empty,
]
)
c.queue.get_nowait = mock.MagicMock(
Expand Down Expand Up @@ -1321,12 +1309,11 @@ async def test_write_loop_polling_bad_connection(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'polling'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[packet.Packet(packet.MESSAGE, {'foo': 'bar'})]
)
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
c._send_request = mock.AsyncMock(return_value=None)
await c._write_loop()
assert c.queue.task_done.call_count == 1
Expand All @@ -1349,12 +1336,11 @@ async def test_write_loop_polling_bad_status(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'polling'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[packet.Packet(packet.MESSAGE, {'foo': 'bar'})]
)
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
c._send_request = mock.AsyncMock()
c._send_request.return_value.status = 500
await c._write_loop()
Expand All @@ -1378,15 +1364,14 @@ async def test_write_loop_websocket_one_packet(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'websocket'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
RuntimeError,
c.queue_empty,
]
)
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
c.ws = mock.MagicMock()
c.ws.send_str = mock.AsyncMock()
await c._write_loop()
Expand All @@ -1400,19 +1385,18 @@ async def test_write_loop_websocket_three_packets(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'websocket'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
RuntimeError,
c.queue_empty,
]
)
c.queue.get_nowait = mock.MagicMock(
side_effect=[
packet.Packet(packet.PING),
packet.Packet(packet.NOOP),
RuntimeError,
c.queue_empty,
]
)
c.ws = mock.MagicMock()
Expand All @@ -1430,12 +1414,11 @@ async def test_write_loop_websocket_one_packet_binary(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'websocket'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[packet.Packet(packet.MESSAGE, b'foo'), RuntimeError]
side_effect=[packet.Packet(packet.MESSAGE, b'foo'), c.queue_empty]
)
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
c.ws = mock.MagicMock()
c.ws.send_bytes = mock.AsyncMock()
await c._write_loop()
Expand All @@ -1449,15 +1432,14 @@ async def test_write_loop_websocket_bad_connection(self):
c.ping_interval = 1
c.ping_timeout = 2
c.current_transport = 'websocket'
c.queue = mock.MagicMock()
c.queue_empty = RuntimeError
self.mock_queue(c)
c.queue.get = mock.AsyncMock(
side_effect=[
packet.Packet(packet.MESSAGE, {'foo': 'bar'}),
RuntimeError,
c.queue_empty,
]
)
c.queue.get_nowait = mock.MagicMock(side_effect=[RuntimeError])
c.queue.get_nowait = mock.MagicMock(side_effect=[c.queue_empty])
c.ws = mock.MagicMock()
c.ws.send_str = mock.AsyncMock(
side_effect=aiohttp.client_exceptions.ServerDisconnectedError()
Expand Down
Loading
Loading