Skip to content

Commit aa96e2d

Browse files
authored
fix: pending requests not forwarded on worker termination (#316)
When a new config is received that is supposed to terminate some workers, we blindly close the connections between these workers and their receivers, which prevents a worker that is about to be terminated from forwarding its remaining requests to its receiver. To fix this, the code was updated so that only the senders of the workers to be terminated close their connections, while the receivers of those workers close the connection when a broken world exception is raised, meaning that the worker has terminated and all requests were forwarded. Also refactored the code so that the cleanup is done in the router only.
1 parent 2d3426c commit aa96e2d

File tree

2 files changed

+10
-15
lines changed

2 files changed

+10
-15
lines changed

infscale/execution/pipeline.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,6 @@ async def _configure_control_channel(self, world_info: WorldInfo) -> None:
134134

135135
await world_info.channel.wait_readiness()
136136

137-
def _reset_multiworld(self, world_info: WorldInfo) -> None:
138-
self.world_manager.remove_world(world_info.multiworld_name)
139-
logger.info(f"remove world {world_info.multiworld_name} from multiworld")
140-
141-
def _reset_control_channel(self, world_info: WorldInfo) -> None:
142-
world_info.channel.cleanup()
143-
logger.info(f"remove world {world_info.name} from control channel")
144-
145137
async def _cleanup_recovered_worlds(self) -> None:
146138
"""Clean up world infos for recovered worlds."""
147139
world_infos = self.config_manager.get_world_infos()
@@ -165,8 +157,6 @@ async def _cleanup_recovered_worlds(self) -> None:
165157
wi = world_infos.get(world_info.name, None)
166158

167159
await self.router.cleanup_world(wi)
168-
self._reset_control_channel(wi)
169-
self._reset_multiworld(wi)
170160

171161
self.config_manager.remove_world_info(wi.name)
172162

@@ -217,10 +207,11 @@ async def _configure(self) -> None:
217207
# handle unnecessary world
218208
# remove is executed in the reverse order of add
219209
for world_info in worlds_to_remove:
220-
# 1. remove unnecessary world from control channel
221-
self._reset_control_channel(world_info)
222-
# 2. remove unnecessary world from multiworld
223-
self._reset_multiworld(world_info)
210+
# cleanup of control channel and multiworld was moved into router
211+
# since we need to do async world cleanup based on certain scenarios
212+
# sender can do the cleanup when new config is processed to stop
213+
# sending requests to failed / removed worker
214+
# receiver needs to keep waiting for requests until an exception is raised
224215

225216
self.config_manager.remove_world_info(world_info.name)
226217

infscale/execution/router.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ async def configure(
153153
if world_info.me == 0:
154154
continue
155155

156+
# do cleanup for send task and world
156157
await self.cleanup_world(world_info)
157158

158159
async def _recv(self, world_info: WorldInfo, cancellable: asyncio.Event) -> None:
@@ -201,7 +202,7 @@ async def wait_on_term_ready(self) -> None:
201202
async def cleanup_world(
202203
self, world_info: WorldInfo, cancel_task: bool = True
203204
) -> None:
204-
"""Cleanup state info on world."""
205+
"""Cleanup state info, channel and multiworld for world."""
205206
# reset tx q related to a given world info
206207
self._cleanup_tx_q(world_info)
207208

@@ -217,6 +218,9 @@ async def cleanup_world(
217218
except Exception as e:
218219
logger.warning(f"failed to cancel task for world {name}: {e}")
219220

221+
world_info.channel.cleanup()
222+
self.world_manager.remove_world(world_info.multiworld_name)
223+
220224
def _find_tx_q(self, world_info: WorldInfo) -> asyncio.Queue:
221225
for _, v in self.__tx_qs.items():
222226
for wi, q in v:

0 commit comments

Comments
 (0)