1616
1717from __future__ import annotations
1818
19+ import asyncio
1920import logging
2021import os
2122import queue
2930
3031from pymongo import _csot , common , helpers_shared , periodic_executor
3132from pymongo .asynchronous .client_session import _ServerSession , _ServerSessionPool
32- from pymongo .asynchronous .monitor import SrvMonitor
33+ from pymongo .asynchronous .monitor import MonitorBase , SrvMonitor
3334from pymongo .asynchronous .pool import Pool
3435from pymongo .asynchronous .server import Server
3536from pymongo .errors import (
@@ -207,6 +208,9 @@ async def target() -> bool:
207208 if self ._settings .fqdn is not None and not self ._settings .load_balanced :
208209 self ._srv_monitor = SrvMonitor (self , self ._settings )
209210
211+ # Stores all monitor tasks that need to be joined on close or server selection
212+ self ._monitor_tasks : list [MonitorBase ] = []
213+
210214 async def open (self ) -> None :
211215 """Start monitoring, or restart after a fork.
212216
@@ -241,6 +245,8 @@ async def open(self) -> None:
241245 # Close servers and clear the pools.
242246 for server in self ._servers .values ():
243247 await server .close ()
248+ if not _IS_SYNC :
249+ self ._monitor_tasks .append (server ._monitor )
244250 # Reset the session pool to avoid duplicate sessions in
245251 # the child process.
246252 self ._session_pool .reset ()
@@ -283,6 +289,10 @@ async def select_servers(
283289 else :
284290 server_timeout = server_selection_timeout
285291
292+ # Cleanup any completed monitor tasks safely
293+ if not _IS_SYNC and self ._monitor_tasks :
294+ await self .cleanup_monitors ()
295+
286296 async with self ._lock :
287297 server_descriptions = await self ._select_servers_loop (
288298 selector , server_timeout , operation , operation_id , address
@@ -520,6 +530,8 @@ async def _process_change(
520530 and self ._description .topology_type not in SRV_POLLING_TOPOLOGIES
521531 ):
522532 await self ._srv_monitor .close ()
533+ if not _IS_SYNC :
534+ self ._monitor_tasks .append (self ._srv_monitor )
523535
524536 # Clear the pool from a failed heartbeat.
525537 if reset_pool :
@@ -695,6 +707,8 @@ async def close(self) -> None:
695707 old_td = self ._description
696708 for server in self ._servers .values ():
697709 await server .close ()
710+ if not _IS_SYNC :
711+ self ._monitor_tasks .append (server ._monitor )
698712
699713 # Mark all servers Unknown.
700714 self ._description = self ._description .reset ()
@@ -705,6 +719,8 @@ async def close(self) -> None:
705719 # Stop SRV polling thread.
706720 if self ._srv_monitor :
707721 await self ._srv_monitor .close ()
722+ if not _IS_SYNC :
723+ self ._monitor_tasks .append (self ._srv_monitor )
708724
709725 self ._opened = False
710726 self ._closed = True
@@ -944,6 +960,8 @@ async def _update_servers(self) -> None:
944960 for address , server in list (self ._servers .items ()):
945961 if not self ._description .has_server (address ):
946962 await server .close ()
963+ if not _IS_SYNC :
964+ self ._monitor_tasks .append (server ._monitor )
947965 self ._servers .pop (address )
948966
949967 def _create_pool_for_server (self , address : _Address ) -> Pool :
@@ -1031,6 +1049,15 @@ def _error_message(self, selector: Callable[[Selection], Selection]) -> str:
10311049 else :
10321050 return "," .join (str (server .error ) for server in servers if server .error )
10331051
1052+ async def cleanup_monitors (self ) -> None :
1053+ tasks = []
1054+ try :
1055+ while self ._monitor_tasks :
1056+ tasks .append (self ._monitor_tasks .pop ())
1057+ except IndexError :
1058+ pass
1059+ await asyncio .gather (* [t .join () for t in tasks ], return_exceptions = True ) # type: ignore[func-returns-value]
1060+
10341061 def __repr__ (self ) -> str :
10351062 msg = ""
10361063 if not self ._opened :
0 commit comments