Skip to content

fix: reconnect when send message fails #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -17,18 +17,15 @@ python-dateutil = "^2.8.1"
typing-extensions = "^4.12.2"
aiohttp = "^3.11.14"

[tool.poetry.dev-dependencies]
[tool.poetry.group.dev.dependencies]
Copy link
Preview

Copilot AI Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new dev dependency group only includes a subset of the dependencies previously defined. Please verify if the removal of dependencies like pytest, python-dotenv, pytest-asyncio, and coveralls is intentional.

Copilot is powered by AI, so mistakes are possible. Review output carefully before use.

pytest = "^8.3.5"
pytest-cov = "^5.0.0"
python-dotenv = "^1.1.0"
pytest-asyncio = "^0.26.0"
coveralls = "^3.0.0"

[tool.poetry.group.dev.dependencies]
black = ">=23.11,<26.0"
isort = "^6.0.1"
pre-commit = "^4.2.0"
pytest-cov = "^5.0.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
69 changes: 37 additions & 32 deletions realtime/_async/client.py
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ def __init__(
self.access_token = token
self.send_buffer: List[Callable] = []
self.hb_interval = hb_interval
self.ws_connection: Optional[ClientProtocol] = None
self._ws_connection: Optional[ClientProtocol] = None
self.ref = 0
self.auto_reconnect = auto_reconnect
self.channels: Dict[str, AsyncRealtimeChannel] = {}
@@ -86,39 +86,31 @@ def __init__(

@property
def is_connected(self) -> bool:
return self.ws_connection is not None
return self._ws_connection is not None

async def _listen(self) -> None:
"""
An infinite loop that keeps listening.
:return: None
"""

if not self.ws_connection:
if not self._ws_connection:
raise Exception("WebSocket connection not established")

try:
async for msg in self.ws_connection:
async for msg in self._ws_connection:
logger.info(f"receive: {msg}")

msg = Message(**json.loads(msg))
channel = self.channels.get(msg.topic)

if channel:
channel._trigger(msg.event, msg.payload, msg.ref)
except websockets.exceptions.ConnectionClosedError as e:
logger.error(
f"WebSocket connection closed with code: {e.code}, reason: {e.reason}"
)
if self.auto_reconnect:
logger.info("Initiating auto-reconnect sequence...")

await self._reconnect()
else:
logger.error("Auto-reconnect disabled, terminating connection")
except Exception as e:
await self._on_connect_error(e)

async def _reconnect(self) -> None:
self.ws_connection = None
self._ws_connection = None
await self.connect()

if self.is_connected:
@@ -156,7 +148,7 @@ async def connect(self) -> None:
while retries < self.max_retries:
try:
ws = await connect(self.url)
self.ws_connection = ws
self._ws_connection = ws
logger.info("WebSocket connection established successfully")
return await self._on_connect()
except Exception as e:
@@ -197,6 +189,20 @@ async def _on_connect(self) -> None:
self._heartbeat_task = asyncio.create_task(self._heartbeat())
await self._flush_send_buffer()

async def _on_connect_error(self, e: Exception) -> None:
if isinstance(e, websockets.exceptions.ConnectionClosedError):
logger.error(
f"WebSocket connection closed with code: {e.code}, reason: {e.reason}"
)

if self.auto_reconnect:
logger.info("Initiating auto-reconnect sequence...")
await self._reconnect()
else:
logger.error("Auto-reconnect disabled, terminating connection")
else:
logger.error(f"Error on connect: {e}")

async def _flush_send_buffer(self):
if self.is_connected and len(self.send_buffer) > 0:
for callback in self.send_buffer:
@@ -214,10 +220,10 @@ async def close(self) -> None:
NotConnectedError: If the connection is not established when this method is called.
"""

if self.ws_connection:
await self.ws_connection.close()
if self._ws_connection:
await self._ws_connection.close()

self.ws_connection = None
self._ws_connection = None

if self._listen_task:
self._listen_task.cancel()
@@ -228,7 +234,7 @@ async def close(self) -> None:
self._heartbeat_task = None

async def _heartbeat(self) -> None:
if not self.ws_connection:
if not self._ws_connection:
raise Exception("WebSocket connection not established")

while self.is_connected:
@@ -242,17 +248,8 @@ async def _heartbeat(self) -> None:
await self.send(data)
await asyncio.sleep(max(self.hb_interval, 15))

except websockets.exceptions.ConnectionClosed as e:
logger.error(
f"Connection closed during heartbeat. Code: {e.code}, reason: {e.reason}"
)

if self.auto_reconnect:
logger.info("Heartbeat failed - initiating reconnection sequence")
await self._reconnect()
else:
logger.error("Heartbeat failed - auto-reconnect disabled")
break
except Exception as e:
await self._on_connect_error(e)

def channel(
self, topic: str, params: Optional[RealtimeChannelOptions] = None
@@ -373,7 +370,15 @@ async def send(self, message: Dict[str, Any]) -> None:
logger.info(f"send: {message}")

async def send_message():
await self.ws_connection.send(message)
if not self._ws_connection:
raise Exception(
"WebSocket connection not established, a connection is expected to be established before sending a message"
)

try:
await self._ws_connection.send(message)
Copy link
Preview

Copilot AI Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the send_message function, errors during message sending are caught and handled with _on_connect_error, but the caller is not informed of the failure. Consider propagating the exception after handling so that the failure is not silently suppressed.

Copilot is powered by AI, so mistakes are possible. Review output carefully before use.

except Exception as e:
await self._on_connect_error(e)

if self.is_connected:
await send_message()
46 changes: 44 additions & 2 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
@@ -259,12 +259,12 @@ async def test_multiple_connect_attempts(socket: AsyncRealtimeClient):
# First connection should succeed
await socket.connect()
assert socket.is_connected
initial_ws = socket.ws_connection
initial_ws = socket._ws_connection

# Second connection attempt should be a no-op since we're already connected
await socket.connect()
assert socket.is_connected
assert socket.ws_connection == initial_ws # Should be the same connection object
assert socket._ws_connection == initial_ws # Should be the same connection object

await socket.close()
assert not socket.is_connected
@@ -311,3 +311,45 @@ async def test_multiple_connect_attempts(socket: AsyncRealtimeClient):
assert socket.is_connected

await socket.close()


@pytest.mark.asyncio
async def test_send_message_reconnection(socket: AsyncRealtimeClient):
# First establish a connection
await socket.connect()
assert socket.is_connected

# Create a channel and subscribe to it
channel = socket.channel("test-channel")
subscribe_event = asyncio.Event()
await channel.subscribe(
lambda state, _: (
subscribe_event.set()
if state == RealtimeSubscribeStates.SUBSCRIBED
else None
)
)
await asyncio.wait_for(subscribe_event.wait(), 5)

# Simulate a connection failure by closing the WebSocket
if socket._ws_connection:
await socket._ws_connection.close()

# Try to send a message - this should trigger reconnection
message = {
"topic": "test-channel",
"event": "test-event",
"payload": {"test": "data"},
}
await socket.send(message)

# Wait for reconnection to complete
await asyncio.sleep(1) # Give some time for reconnection

# Verify we're connected again
assert socket.is_connected

# Try sending another message to verify the connection is working
await socket.send(message)

await socket.close()