3232"""
3333
3434import asyncio
35- from concurrent .futures import ThreadPoolExecutor
3635from copy import deepcopy
3736from pathlib import Path
3837import time
3938from typing import TYPE_CHECKING , Dict , Optional , Set , cast
4039
40+ import zmq
41+
4142from cylc .flow .exceptions import WorkflowStopped
4243from cylc .flow .id import Tokens
4344from cylc .flow .network .server import PB_METHOD_MAP
@@ -105,7 +106,6 @@ def __init__(self, workflows_mgr, log, max_threads=10):
105106 self .w_subs : Dict [str , WorkflowSubscriber ] = {}
106107 self .topics = {ALL_DELTAS .encode ('utf-8' ), b'shutdown' }
107108 self .loop = None
108- self .executor = ThreadPoolExecutor (max_threads )
109109 self .delta_queues = {}
110110
111111 @log_call
@@ -163,10 +163,7 @@ async def connect_workflow(self, w_id, contact_data):
163163
164164 self .delta_queues [w_id ] = {}
165165
166- # Might be options other than threads to achieve
167- # non-blocking subscriptions, but this works.
168- self .executor .submit (
169- self ._start_subscription ,
166+ self ._start_subscription (
170167 w_id ,
171168 contact_data ['name' ],
172169 contact_data [CFF .HOST ],
@@ -256,11 +253,27 @@ def _start_subscription(self, w_id, reg, host, port):
256253 context = self .workflows_mgr .context ,
257254 topics = self .topics
258255 )
259- self .w_subs [w_id ].loop .run_until_complete (
260- self .w_subs [w_id ].subscribe (
261- process_delta_msg ,
262- func = self ._update_workflow_data ,
263- w_id = w_id ))
256+
257+ async def process_incoming_deltas (self ):
258+ w_empty = set ()
259+ while set (self .w_subs .keys ()).difference (w_empty ):
260+ for w_id , sub in self .w_subs .items ():
261+ if w_id in w_empty :
262+ continue
263+ try :
264+ [topic , delta ] = await sub .socket .recv_multipart (
265+ flags = zmq .NOBLOCK
266+ )
267+ except zmq .ZMQError :
268+ w_empty .add (w_id )
269+ continue
270+
271+ process_delta_msg (
272+ topic ,
273+ delta ,
274+ func = self ._update_workflow_data ,
275+ w_id = w_id
276+ )
264277
265278 def _update_workflow_data (self , topic , delta , w_id ):
266279 """Manage and apply incoming data-store deltas.
@@ -346,16 +359,15 @@ def _reconcile_update(self, topic, delta, w_id):
346359 self .log .debug (
347360 f'Out of sync with { topic } of { w_id } ... Reconciling.' )
348361 try :
349- # use threadsafe as client socket is in main loop thread.
350- future = asyncio .run_coroutine_threadsafe (
362+ task = asyncio .create_task (
351363 workflow_request (
352364 self .workflows_mgr .workflows [w_id ]['req_client' ],
353365 'pb_data_elements' ,
354366 args = {'element_type' : topic }
355367 ),
356368 self .loop
357369 )
358- new_delta_msg = future .result (self .RECONCILE_TIMEOUT )
370+ new_delta_msg = task .result (self .RECONCILE_TIMEOUT )
359371 new_delta = DELTAS_MAP [topic ]()
360372 new_delta .ParseFromString (new_delta_msg )
361373 self ._clear_data_field (w_id , topic )
@@ -366,7 +378,7 @@ def _reconcile_update(self, topic, delta, w_id):
366378 f'The reconcile update coroutine { w_id } { topic } '
367379 f'took too long, cancelling the subscription/sync.'
368380 )
369- future .cancel ()
381+ task .cancel ()
370382 except Exception as exc :
371383 self .log .exception (exc )
372384
0 commit comments