1+
2+ from abc import ABC , abstractmethod
3+ from pathlib import Path
4+ from dataclasses import dataclass , field , fields
5+ from enum import Enum , unique
6+ from geos .trame .app .ui .simulationStatusView import SimulationStatus
7+ from typing import Callable , Optional
8+ import datetime
9+ from trame_server .core import Server
10+ from trame_server .state import State
11+
12+ #TODO move outside
13+ @dataclass (frozen = True )
14+ class SimulationConstant :
15+ SIMULATION_GEOS_PATH = "/some/path/"
16+ SIMULATION_MACHINE_NAME = "p4log01" # Only run on P4 machine
17+
18+
19+ @unique
20+ class SlurmJobStatus (Enum ):
21+ PENDING = "PD"
22+ RUNNING = "R"
23+ COMPLETING = "CG"
24+ COMPLETED = "CD"
25+ SUSPENDED = "S"
26+ UNKNOWN = "UNKNOWN"
27+
28+ @classmethod
29+ def from_string (cls , job_str ) -> "SlurmJobStatus" :
30+ try :
31+ return cls (job_str )
32+ except ValueError :
33+ return cls .UNKNOWN
34+
35+ # TODO: dataclass_json
36+ # @dataclass_json
37+ @dataclass
38+ class SimulationInformation :
39+ pass
40+
41+ def get_simulation_status (
42+ self ,
43+ get_running_user_jobs_f : Callable [[], list [tuple [str , SlurmJobStatus ]]],
44+ ) -> SimulationStatus :
45+ """
46+ Returns the simulation status given the current Jobs running for the current user.
47+ Only runs the callback if the timeseries file is not already present in the done directory.
48+ """
49+ if not self .geos_job_id :
50+ return SimulationStatus .NOT_RUN
51+
52+ done_sim_path = self .get_simulation_dir (SimulationStatus .DONE )
53+ if self .get_timeseries_path (done_sim_path ).exists ():
54+ return SimulationStatus .DONE
55+
56+ user_jobs = get_running_user_jobs_f ()
57+ if (self .geos_job_id , SlurmJobStatus .RUNNING ) in user_jobs :
58+ return SimulationStatus .RUNNING
59+
60+ if (self .geos_job_id , SlurmJobStatus .COMPLETING ) in user_jobs :
61+ return SimulationStatus .COMPLETING
62+
63+ if (self .copy_back_job_id , SlurmJobStatus .RUNNING ) in user_jobs :
64+ return SimulationStatus .COPY_BACK
65+
66+ if (self .copy_job_id , SlurmJobStatus .RUNNING ) in user_jobs :
67+ return SimulationStatus .SCHEDULED
68+
69+ return SimulationStatus .UNKNOWN
70+
71+ @dataclass
72+ class LauncherParams :
73+ simulation_files_path : Optional [str ] = None
74+ simulation_cmd_filename : Optional [str ] = None
75+ simulation_job_name : Optional [str ] = None
76+ simulation_nb_process : int = 1
77+
78+ @classmethod
79+ def from_server_state (cls , server_state : State ) -> "LauncherParams" :
80+ state = cls ()
81+ for f in fields (cls ):
82+ setattr (state , f .name , server_state [f .name ])
83+ return state
84+
85+ def is_complete (self ) -> bool :
86+ return None not in [getattr (self , f .name ) for f in fields (self )]
87+
88+ def assert_is_complete (self ) -> None :
89+ if not self .is_complete ():
90+ raise RuntimeError (f"Incomplete simulation launch parameters : { self } ." )
91+
92+
93+ def get_timestamp () -> str :
94+ return datetime .utcnow ().strftime ("%Y-%m-%d_%H-%M-%S.%f" )[:- 3 ]
95+
96+
97+ def get_simulation_output_file_name (timestamp : str , user_name : str = "user_name" ):
98+ return f"{ user_name } _{ timestamp } .json"
99+
100+
101+ def parse_launcher_output (output : str ) -> SimulationInformation :
102+ split_output = output .split ("\n " )
103+
104+ information = SimulationInformation ()
105+ information_dict = information .to_dict () # type: ignore
106+
107+ content_to_parse = [
108+ ("Working directory: " , "working_directory" ),
109+ ("1. copy job id: " , "copy_job_id" ),
110+ ("2. geos job id: " , "geos_job_id" ),
111+ ("3. copy back job id: " , "copy_back_job_id" ),
112+ ("Run directory: " , "run_directory" ),
113+ ]
114+
115+ for line in split_output :
116+ for info_tuple in content_to_parse :
117+ if info_tuple [0 ] in line :
118+ split_line = line .split (info_tuple [0 ])
119+ if len (split_line ) < 2 :
120+ continue
121+ information_dict [info_tuple [1 ]] = split_line [- 1 ]
122+
123+ information_dict ["timestamp" ] = get_timestamp ()
124+ return SimulationInformation .from_dict (information_dict ) # type: ignore
125+
126+
127+ # def write_simulation_information_to_repo(info: SimulationInformation, sim_info_path: Path) -> Optional[Path]:
128+ # return write_file(
129+ # sim_info_path.as_posix(),
130+ # get_simulation_output_file_name(info.timestamp, info.user_igg),
131+ # json.dumps(info.to_dict()), # type: ignore
132+ # )
133+
134+
135+ ##TODO yay slurm
136+ def get_launcher_command (launcher_params : LauncherParams ) -> str :
137+ launcher_cmd_args = (
138+ f"{ SimulationConstant .SIMULATION_GEOS_PATH } "
139+ f"--nprocs { launcher_params .simulation_nb_process } "
140+ f"--fname { launcher_params .simulation_cmd_filename } "
141+ f"--job_name { launcher_params .simulation_job_name } "
142+ )
143+
144+ # state.simulation_nb_process is supposed to be an integer, but the UI present a VTextField,
145+ # so if user changes it, then it can be defined as a str
146+ if int (launcher_params .simulation_nb_process ) > 1 :
147+ launcher_cmd_args += " --partition"
148+ return launcher_cmd_args
149+
150+
151+ # def get_simulation_screenshot_timestep(filename: str) -> int:
152+ # """
153+ # From a given file name returns the time step.
154+ # Filename is defined as: RenderView0_000000.png with 000000 the time step to parse and return
155+ # """
156+ # if not filename:
157+ # print("Simulation filename is not defined")
158+ # return -1
159+
160+ # pattern = re.compile(r"RenderView[0-9]_[0-9]{6}\.png", re.IGNORECASE)
161+ # if pattern.match(filename) is None:
162+ # print("Simulation filename does not match the pattern: RenderView0_000000.png")
163+ # return -1
164+
165+ # timestep = os.path.splitext(filename)[0].split("_")[-1]
166+ # return int(timestep) if timestep else -1
167+
168+
169+ # def get_most_recent_file_from_list(files_list: list[str]) -> Optional[str]:
170+ # if not files_list:
171+ # return None
172+ # return max(files_list, key=get_simulation_screenshot_timestep)
173+
174+
175+ # def get_most_recent_simulation_screenshot(folder_path: Path) -> Optional[str]:
176+ # return get_most_recent_file_from_list(os.listdir(folder_path)) if folder_path.exists() else None
177+
178+
179+ class ISimRunner (ABC ):
180+ """
181+ Abstract interface for sim runner.
182+ Provides methods to trigger simulation, get simulation output path and knowing if simulation is done or not.
183+ """
184+
185+ @abstractmethod
186+ def launch_simulation (self , launcher_params : LauncherParams ) -> tuple [Path , SimulationInformation ]:
187+ pass
188+
189+ @abstractmethod
190+ def get_user_igg (self ) -> str :
191+ pass
192+
193+ @abstractmethod
194+ def get_running_user_jobs (self ) -> list [tuple [str , SlurmJobStatus ]]:
195+ pass
196+
197+
198+ class SimRunner (ISimRunner ):
199+ """
200+ Runs sim on HPC
201+ """
202+ pass
0 commit comments