Skip to content

Commit 3f17316

Browse files
authored
Disconnect dispatcher at cleanup (#146)
fixes #145
2 parents 71dea72 + 31f6935 commit 3f17316

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@
1515
## Bug Fixes
1616

1717
* Fixes reconnecting after connection loss for streams
18+
* Fixed an issue in the `Dispatcher` class where the client connection was not properly disconnected during cleanup, potentially causing unclosed socket errors.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,13 @@ def __init__(
220220
)
221221
self._actor_dispatchers: dict[str, ActorDispatcher] = {}
222222
self._empty_event = Event()
223-
self._empty_event.set()
223+
self._disconnecting_future: asyncio.Future[None] | None = None
224224

225225
@override
226226
def start(self) -> None:
227227
"""Start the local dispatch service."""
228228
self._bg_service.start()
229+
self._empty_event.set()
229230

230231
@property
231232
@override
@@ -235,19 +236,23 @@ def is_running(self) -> bool:
235236

236237
@override
237238
async def wait(self) -> None:
238-
"""Wait until all actor dispatches are stopped."""
239-
await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
239+
"""Wait until all actor dispatches are stopped and client is disconnected."""
240+
if self._disconnecting_future is not None:
241+
await self._disconnecting_future
240242

243+
await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
241244
self._actor_dispatchers.clear()
242245

243-
@override
244246
def cancel(self, msg: str | None = None) -> None:
245-
"""Stop the local dispatch service."""
247+
"""Stop the local dispatch service and initiate client disconnection."""
246248
self._bg_service.cancel(msg)
247249

248250
for instance in self._actor_dispatchers.values():
249251
instance.cancel()
250252

253+
# Initiate client disconnection asynchronously
254+
self._disconnecting_future = asyncio.ensure_future(self._client.disconnect())
255+
251256
async def wait_for_initialization(self) -> None:
252257
"""Wait until the background service is initialized."""
253258
await self._bg_service.wait_for_initialization()
@@ -358,6 +363,7 @@ async def __aenter__(self) -> Self:
358363
This background service.
359364
"""
360365
await super().__aenter__()
366+
await self._client.__aenter__()
361367
await self.wait_for_initialization()
362368
return self
363369

0 commit comments

Comments
 (0)