11import asyncio
2+ import time
3+ from typing import Any , Callable , Dict , Optional
4+
25import parsl
36from parsl .app .app import python_app
4- from typing import Callable , Dict , Any
57from parsl .config import Config
8+
69from flowgentic .backend_engines .base import BaseEngine
710
811
912class ParslEngine (BaseEngine ):
10- def __init__ (self , config : Config = None ):
13+ def __init__ (
14+ self ,
15+ config : Config = None ,
16+ observer : Optional [Callable [[Dict [str , Any ]], None ]] = None ,
17+ ):
18+ super ().__init__ (observer = observer )
1119 parsl .load (config )
1220
13- self ._task_registry = {}
21+ self ._task_registry : Dict [ str , Any ] = {}
1422
1523 def _make_parsl_app (self , func : Callable ):
1624 if asyncio .iscoroutinefunction (func ):
@@ -21,15 +29,48 @@ def wrapper(*args, **kwargs):
2129 return python_app (wrapper )
2230 return python_app (func )
2331
24- async def execute_tool (self , func : Callable , * args , ** kwargs ) -> Dict [str , Any ]:
25- if func .__name__ not in self ._task_registry :
26- self ._task_registry [func .__name__ ] = self ._make_parsl_app (func )
32+ async def execute_tool (
33+ self ,
34+ func : Callable ,
35+ * args ,
36+ task_kwargs : Optional [Dict [str , Any ]] = None ,
37+ invocation_id : Optional [str ] = None ,
38+ ** kwargs ,
39+ ) -> Dict [str , Any ]:
40+ task_name = getattr (func , "__name__" , str (func ))
41+
42+ # Track whether the task app was already cached
43+ cache_hit = task_name in self ._task_registry
44+ if not cache_hit :
45+ self ._task_registry [task_name ] = self ._make_parsl_app (func )
2746
28- task_app = self ._task_registry [func .__name__ ]
47+ task_app = self ._task_registry [task_name ]
48+
49+ # Ts_resolve_end: Task descriptor resolved, about to enter Parsl
50+ self .emit (
51+ {
52+ "event" : "tool_resolve_end" ,
53+ "ts" : time .perf_counter (),
54+ "tool_name" : task_name ,
55+ "invocation_id" : invocation_id ,
56+ "cache_hit" : cache_hit ,
57+ }
58+ )
2959
3060 future = task_app (* args , ** kwargs )
61+ result = await asyncio .to_thread (future .result )
62+
63+ # Ts_collect_start: Result received from Parsl
64+ self .emit (
65+ {
66+ "event" : "tool_collect_start" ,
67+ "ts" : time .perf_counter (),
68+ "tool_name" : task_name ,
69+ "invocation_id" : invocation_id ,
70+ }
71+ )
3172
32- return await asyncio . to_thread ( future . result )
73+ return result
3374
3475 def wrap_node (self , node_func : Callable ):
3576 node_app = self ._make_parsl_app (node_func )
0 commit comments