From 9adf96f6c82c2b2a6ac9db87b4cb4127427b4e5b Mon Sep 17 00:00:00 2001 From: drew-wallace Date: Fri, 28 Aug 2020 14:17:46 -0400 Subject: [PATCH] Fix hanging close after Alpaca WebSocketException, add closing_timeout --- alpaca_trade_api/polygon/streamconn.py | 2 +- alpaca_trade_api/stream2.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/alpaca_trade_api/polygon/streamconn.py b/alpaca_trade_api/polygon/streamconn.py index 0068ae6f..1120fca8 100644 --- a/alpaca_trade_api/polygon/streamconn.py +++ b/alpaca_trade_api/polygon/streamconn.py @@ -33,7 +33,7 @@ async def connect(self): await self._dispatch({'ev': 'status', 'status': 'connecting', 'message': 'Connecting to Polygon'}) - self._ws = await websockets.connect(self._endpoint) + self._ws = await websockets.connect(self._endpoint, close_timeout=1) self._stream = self._recv() msg = await self._next() diff --git a/alpaca_trade_api/stream2.py b/alpaca_trade_api/stream2.py index e97619aa..7d56b195 100644 --- a/alpaca_trade_api/stream2.py +++ b/alpaca_trade_api/stream2.py @@ -30,7 +30,7 @@ def __init__(self, key_id: str, secret_key: str, base_url: URL): self._consume_task = None async def _connect(self): - ws = await websockets.connect(self._endpoint) + ws = await websockets.connect(self._endpoint, close_timeout=1) await ws.send(json.dumps({ 'action': 'authenticate', 'data': { @@ -80,7 +80,7 @@ async def _consume_msg(self): await self._dispatch(stream, msg) except websockets.WebSocketException as wse: logging.warn(wse) - await self.close() + self._ws = None asyncio.ensure_future(self._ensure_ws()) async def _ensure_ws(self): @@ -91,7 +91,7 @@ async def _ensure_ws(self): try: await self._connect() if self._streams: - await self.subscribe(self._streams) + await self.subscribe(list(self._streams)) break except websockets.WebSocketException as wse: logging.warn(wse)