diff --git a/foambench_main.py b/foambench_main.py index 1aee818..8646d2e 100644 --- a/foambench_main.py +++ b/foambench_main.py @@ -35,6 +35,18 @@ def parse_args(): default=None, help="Path to custom mesh file (e.g., .msh, .stl, .obj). If not provided, no custom mesh will be used." ) + parser.add_argument( + '--dataset_log_path', + type=str, + default="", + help="Path to per-case dataset.jsonl for fine-tuning data extraction." + ) + parser.add_argument( + '--case_id', + type=str, + default="", + help="Case identifier, e.g. 'Basic/Cavity/1' or 'Advanced/Cavity_LES'." + ) return parser.parse_args() def run_command(command_str): @@ -76,6 +88,10 @@ def main(): main_cmd = f"python src/main.py --prompt_path='{args.prompt_path}' --output_dir='{args.output}'" if args.custom_mesh_path: main_cmd += f" --custom_mesh_path='{args.custom_mesh_path}'" + if args.dataset_log_path: + main_cmd += f" --dataset_log_path='{args.dataset_log_path}'" + if args.case_id: + main_cmd += f" --case_id='{args.case_id}'" print(f"Main workflow command: {main_cmd}") diff --git a/src/config.py b/src/config.py index 651697b..463f1fe 100644 --- a/src/config.py +++ b/src/config.py @@ -23,6 +23,8 @@ class Config: # If set, InputWriter will check // first. # When present, it will copy into the current case_dir and skip LLM generation. 
reuse_generated_dir: str = "" + dataset_log_path: str = "" + case_id: str = "" # LLM backend: # - "openai": OpenAI Platform usage-based (API key) # - "openai-codex": ChatGPT/Codex subscription sign-in (Codex auth cache) diff --git a/src/main.py b/src/main.py index f44496c..db214a3 100644 --- a/src/main.py +++ b/src/main.py @@ -12,13 +12,15 @@ from nodes.input_writer_node import input_writer_node from nodes.local_runner_node import local_runner_node from nodes.reviewer_node import reviewer_node +from nodes.restore_best_node import restore_best_node from nodes.visualization_node import visualization_node from nodes.hpc_runner_node import hpc_runner_node from router_func import ( route_after_planner, route_after_input_writer, route_after_runner, - route_after_reviewer + route_after_reviewer, + route_after_restore_best, ) from logger import close_logging import json @@ -36,8 +38,9 @@ def create_foam_agent_graph() -> StateGraph: workflow.add_node("local_runner", local_runner_node) workflow.add_node("hpc_runner", hpc_runner_node) workflow.add_node("reviewer", reviewer_node) + workflow.add_node("restore_best", restore_best_node) workflow.add_node("visualization", visualization_node) - + # Add edges workflow.add_edge(START, "planner") workflow.add_conditional_edges("planner", route_after_planner) @@ -46,6 +49,7 @@ def create_foam_agent_graph() -> StateGraph: workflow.add_conditional_edges("hpc_runner", route_after_runner) workflow.add_conditional_edges("local_runner", route_after_runner) workflow.add_conditional_edges("reviewer", route_after_reviewer) + workflow.add_conditional_edges("restore_best", route_after_restore_best) workflow.add_edge("visualization", END) return workflow @@ -92,7 +96,9 @@ def initialize_state(user_requirement: str, config: Config, custom_mesh_path: Op job_id=None, cluster_info=None, slurm_script_path=None, - termination_reason=None + termination_reason=None, + best_case_snapshot_dir=None, + best_progress_score=None, ) if custom_mesh_path: 
print(f"{custom_mesh_path}") @@ -164,7 +170,19 @@ def main(user_requirement: str, config: Config, custom_mesh_path: Optional[str] "If a file exists at //, Foam-Agent will copy it into the current output and skip generation for that file." ), ) - + parser.add_argument( + "--dataset_log_path", + type=str, + default="", + help="Path to per-case dataset.jsonl for fine-tuning data extraction.", + ) + parser.add_argument( + "--case_id", + type=str, + default="", + help="Case identifier, e.g. 'Basic/Cavity/1' or 'Advanced/Cavity_LES'.", + ) + args = parser.parse_args() print(f"args: {args}") @@ -178,7 +196,16 @@ def main(user_requirement: str, config: Config, custom_mesh_path: Optional[str] if args.reuse_generated_dir: config.reuse_generated_dir = args.reuse_generated_dir - + if args.dataset_log_path: + config.dataset_log_path = args.dataset_log_path + if args.case_id: + config.case_id = args.case_id + + # Sync global LLM service with CLI-provided dataset_log_path/case_id + from services import global_llm_service + global_llm_service.dataset_log_path = config.dataset_log_path + global_llm_service.case_id = config.case_id + with open(args.prompt_path, 'r') as f: user_requirement = f.read() diff --git a/src/nodes/input_writer_node.py b/src/nodes/input_writer_node.py index 76f9237..6fd3aa3 100644 --- a/src/nodes/input_writer_node.py +++ b/src/nodes/input_writer_node.py @@ -6,19 +6,8 @@ from typing import List from pydantic import BaseModel, Field -# System prompts for different modes -INITIAL_WRITE_SYSTEM_PROMPT = ( - "You are an expert in OpenFOAM simulation and numerical modeling." - f"Your task is to generate a complete and functional file named: {{file_name}} within the {{folder_name}} directory. " - "Ensure all required values are present and match with the files content already generated." 
- "Before finalizing the output, ensure:\n" - "- All necessary fields exist (e.g., if `nu` is defined in `constant/transportProperties`, it must be used correctly in `0/U`).\n" - "- Cross-check field names between different files to avoid mismatches.\n" - "- Ensure units and dimensions are correct** for all physical variables.\n" - f"- Ensure case solver settings are consistent with the user's requirements. Available solvers are: {{case_solver}}.\n" - "Provide only the code—no explanations, comments, or additional text." -) - + + def parse_allrun(text: str) -> str: match = re.search(r'```(.*?)```', text, re.DOTALL) @@ -56,17 +45,17 @@ def _rewrite_mode(state): print("No review analysis available for rewrite mode.") print("") return state - out = rewrite_files( + + return rewrite_files( case_dir=state["case_dir"], error_logs=state.get("error_logs", []), review_analysis=state.get("review_analysis", ""), - rewrite_plan=state.get("rewrite_plan"), + rewrite_plan=None, user_requirement=state.get("user_requirement", ""), foamfiles=state.get("foamfiles"), dir_structure=state.get("dir_structure", {}), + loop_count=state.get("loop_count", 0), ) - print("") - return out def _initial_write_mode(state): """ @@ -92,6 +81,9 @@ def _initial_write_mode(state): # Build Allrun via service mesh_type = state.get("mesh_type") mesh_commands = state.get("mesh_commands") or [] + advice = state.get("similar_case_advice") + advice_d = advice.model_dump() if hasattr(advice, "model_dump") else (advice if isinstance(advice, dict) else {}) + pre_solver_steps = advice_d.get("pre_solver_steps") if advice_d else None allrun_out = build_allrun( case_dir=state["case_dir"], database_path=config.database_path, @@ -101,6 +93,7 @@ def _initial_write_mode(state): allrun_reference=state["allrun_reference"], mesh_type=mesh_type, mesh_commands=mesh_commands, + pre_solver_steps=pre_solver_steps, ) print("") diff --git a/src/nodes/planner_node.py b/src/nodes/planner_node.py index e147f41..f63749f 100644 --- 
a/src/nodes/planner_node.py +++ b/src/nodes/planner_node.py @@ -57,26 +57,42 @@ def planner_node(state): subtasks = plan_data["subtasks"] similar_case_advice = plan_data.get("similar_case_advice") - # Handle case directory creation/cleanup + print(f"Parsed case name: {case_name}") + print(f"Parsed case domain: {case_domain}") + print(f"Parsed case category: {case_category}") + print(f"Parsed case solver: {case_solver}") + print(f"Created case directory: {case_dir}") + print(f"Retrieved similar case structure: {dir_structure_reference}") + print(f"Generated {len(subtasks)} subtasks.") + if similar_case_advice: + print(f"Similar case advice: {similar_case_advice}") + + # Handle case directory creation/cleanup. + # Preserve dataset_log_path across rmtree when it lives inside case_dir, + # so plan-phase LLM records (already written this run) aren't destroyed. + dataset_log_path = getattr(config, "dataset_log_path", "") or "" + preserved_log_bytes = None + if dataset_log_path and os.path.exists(case_dir) and os.path.exists(dataset_log_path): + try: + rel = os.path.relpath(os.path.abspath(dataset_log_path), os.path.abspath(case_dir)) + if not rel.startswith("..") and not os.path.isabs(rel): + with open(dataset_log_path, "rb") as _f: + preserved_log_bytes = _f.read() + except Exception as _e: + print(f"Warning: failed to snapshot dataset_log_path before rmtree: {_e}") if os.path.exists(case_dir): print(f"Warning: Case directory {case_dir} already exists. 
Overwriting.") shutil.rmtree(case_dir) os.makedirs(case_dir) + if preserved_log_bytes is not None: + os.makedirs(os.path.dirname(dataset_log_path), exist_ok=True) + with open(dataset_log_path, "wb") as _f: + _f.write(preserved_log_bytes) + # Initialize logging now that case_dir exists setup_logging(case_dir) - print("") - print(f"{case_name}") - print(f"{case_domain}") - print(f"{case_category}") - print(f"{case_solver}") - print(f"{case_dir}") - print(f"{dir_structure_reference}") - print(f"{len(subtasks)} subtasks generated.") - if similar_case_advice: - print(f"{similar_case_advice}") - # Save reference file save_file(case_path_reference, f"{faiss_detailed}\n\n\n{allrun_reference}") diff --git a/src/nodes/restore_best_node.py b/src/nodes/restore_best_node.py new file mode 100644 index 0000000..da1751d --- /dev/null +++ b/src/nodes/restore_best_node.py @@ -0,0 +1,19 @@ +import os +import shutil + + +def restore_best_node(state): + """Restore the best-snapshot case directory when the review loop exits at max_loop.""" + snap = state.get("best_case_snapshot_dir") + if snap and os.path.exists(snap): + case_dir = state["case_dir"] + if os.path.exists(case_dir): + shutil.rmtree(case_dir) + shutil.copytree(snap, case_dir) + print( + f"Restored best snapshot from {snap} " + f"(score={state.get('best_progress_score')})" + ) + else: + print("No best snapshot to restore.") + return {} diff --git a/src/nodes/reviewer_node.py b/src/nodes/reviewer_node.py index 1cd626f..0c49689 100644 --- a/src/nodes/reviewer_node.py +++ b/src/nodes/reviewer_node.py @@ -1,51 +1,76 @@ # reviewer_node.py -from pydantic import BaseModel, Field -from typing import List -from services.review import review_error_logs, generate_rewrite_plan +import os +import glob +import re +import shutil +from services.review import review_error_logs from logger import log_review +def _compute_progress_score(case_dir: str, error_logs: list) -> float: + """Highest timestep reached across log.* files; fall back to 
-len(error_logs).""" + best_t = None + for log_path in glob.glob(os.path.join(case_dir, "log.*")): + try: + with open(log_path, errors="ignore") as f: + for line in f: + m = re.match(r"^Time = ([\d.eE+\-]+)", line) + if m: + t = float(m.group(1)) + if best_t is None or t > best_t: + best_t = t + except Exception: + pass + return float(best_t) if best_t is not None else -len(error_logs) + + def reviewer_node(state): - """ - Reviewer node: Reviews the error logs and provides analysis and suggestions - for fixing the errors. This node only focuses on analysis, not file modification. - """ + """Reviewer node: single-call review (FA 1.1.0 style) + best-state snapshot.""" print("") if len(state["error_logs"]) == 0: print("No error to review.") print("") return state - # Log error logs to review.log log_review(str(state["error_logs"]), "error_logs") - # Stateless review via service + case_dir = state["case_dir"] + error_logs = state.get("error_logs", []) + loop_count = state.get("loop_count", 0) history_text = state.get("history_text") or [] + + # Best-state snapshot before this loop's rewrite can regress it + snapshot_updates = {} + score = _compute_progress_score(case_dir, error_logs) + best_score = state.get("best_progress_score") + if best_score is None: + best_score = float("-inf") + if score > best_score: + snap = case_dir.rstrip("/") + "_best" + if os.path.exists(snap): + shutil.rmtree(snap) + shutil.copytree(case_dir, snap) + print(f"progress={score:.4g} > {best_score:.4g}, saved to {snap}") + snapshot_updates = {"best_case_snapshot_dir": snap, "best_progress_score": score} + review_content, updated_history = review_error_logs( - tutorial_reference=state.get('tutorial_reference', ''), - foamfiles=state.get('foamfiles'), - error_logs=state.get('error_logs'), - user_requirement=state.get('user_requirement', ''), - similar_case_advice=state.get('similar_case_advice'), + tutorial_reference=state.get("tutorial_reference", ""), + foamfiles=state.get("foamfiles"), + 
error_logs=error_logs, + user_requirement=state.get("user_requirement", ""), + similar_case_advice=state.get("similar_case_advice"), history_text=history_text, + loop_count=loop_count, ) log_review(review_content, "review_analysis") - rewrite_plan = generate_rewrite_plan( - foamfiles=state.get('foamfiles'), - error_logs=state.get('error_logs', []), - review_analysis=review_content, - user_requirement=state.get('user_requirement', ''), - ) - log_review(str(rewrite_plan), "rewrite_plan") - print("") return { + **snapshot_updates, "history_text": updated_history, "review_analysis": review_content, - "rewrite_plan": rewrite_plan, - "loop_count": state.get("loop_count", 0) + 1, + "loop_count": loop_count + 1, "input_writer_mode": "rewrite", } diff --git a/src/router_func.py b/src/router_func.py index 05174f5..d661e5b 100644 --- a/src/router_func.py +++ b/src/router_func.py @@ -1,3 +1,4 @@ +import os from typing import TypedDict, List, Optional from config import Config from utils import LLMService, GraphState @@ -25,7 +26,7 @@ def llm_requires_custom_mesh(state: GraphState) -> int: "If the user explicitly mentions or implies they want to use a custom mesh file, return 'custom_mesh'. " "If they want to use standard OpenFOAM mesh generation (blockMesh, snappyHexMesh with STL, etc.), return 'standard_mesh'. " "Look for keywords like gmsh and determine if they want to create mesh using gmsh. If they want to create mesh using gmsh, return 'gmsh_mesh'. " - "Be conservative - if unsure, assume 'standard_mesh' unless clearly specified otherwise." + "Be conservative - if unsure, assume 'standard_mesh' unless clearly specified otherwise. " "Only return 'custom_mesh' or 'standard_mesh' or 'gmsh_mesh'. Don't return anything else." ) @@ -155,13 +156,17 @@ def route_after_reviewer(state: GraphState): loop_count = state.get("loop_count", 0) max_loop = state["config"].max_loop if loop_count >= max_loop: - print(f"Maximum loop count ({max_loop}) reached. 
Ending workflow.") + print(f"Maximum loop count ({max_loop}) reached.") state["termination_reason"] = "max_review_loop_reached" - requires_visualization = state.get("requires_visualization") - if requires_visualization is None: - requires_visualization = llm_requires_visualization(state) - state["requires_visualization"] = requires_visualization - return "visualization" if requires_visualization else END + return "restore_best" print(f"Loop {loop_count}: Continuing to fix errors.") return "input_writer" + + +def route_after_restore_best(state: GraphState): + requires_visualization = state.get("requires_visualization") + if requires_visualization is None: + requires_visualization = llm_requires_visualization(state) + state["requires_visualization"] = requires_visualization + return "visualization" if requires_visualization else END diff --git a/src/services/input_writer.py b/src/services/input_writer.py index dfa2780..15ed5e8 100644 --- a/src/services/input_writer.py +++ b/src/services/input_writer.py @@ -103,14 +103,14 @@ def _report_progress(current: int, total: int, message: str) -> None: # System prompt for file generation INITIAL_WRITE_SYSTEM_PROMPT = ( - "You are an expert in OpenFOAM simulation and numerical modeling." - f"Your task is to generate a complete and functional file named: {{file_name}} within the {{folder_name}} directory. " - "Ensure all required values are present and match with the files content already generated." + "You are an expert in OpenFOAM simulation and numerical modeling. " + "Your task is to generate a complete and functional file named: {file_name} within the {folder_name} directory. " + "Ensure all required values are present and match with the files content already generated. 
" "Before finalizing the output, ensure:\n" "- All necessary fields exist (e.g., if `nu` is defined in `constant/transportProperties`, it must be used correctly in `0/U`).\n" "- Cross-check field names between different files to avoid mismatches.\n" - "- Ensure units and dimensions are correct** for all physical variables.\n" - f"- Ensure case solver settings are consistent with the user's requirements. Available solvers are: {{case_solver}}.\n" + "- Ensure units and dimensions are correct for all physical variables.\n" + "- Ensure case solver settings are consistent with the user's requirements. Available solvers are: {case_solver}.\n" "Provide only the code—no explanations, comments, or additional text." ) @@ -121,30 +121,21 @@ def _build_prompts(file_name: str, folder_name: str, written_files_ctx: List[Foa case_solver=case_solver, ) - advice_text = "" - if isinstance(similar_case_advice, dict): - advice_text = ( - f"Similar case match level: {similar_case_advice.get('match_level')}\n" - f"Use scope: {similar_case_advice.get('use_scope')}\n" - f"Advice: {similar_case_advice.get('advice')}\n" - ) - elif similar_case_advice: - advice_text = str(similar_case_advice) - similar_ref_block = ( - f"Refer to the following similar case file content if helpful:\n{tutorial_reference}\n" + f"Refer to the following similar case file content to ensure the generated file aligns with the user requirement:\n" + f"{tutorial_reference}\n" + "Similar case reference is always correct. If you find the user requirement is very consistent with the similar case reference, " + "you should use the similar case reference as the template to generate the file. " + "Just modify the necessary parts to make the file complete and functional.\n" if tutorial_reference else "No suitable similar case was found for this domain.\n" ) code_user_prompt = ( f"User requirement: {user_requirement}\n" f"{similar_ref_block}" - f"{advice_text}" - "If the similar case is a weak match, do not copy it blindly. 
Use it only where it is consistent with the user requirement. " - "Just modify the necessary parts to make the file complete and functional." - "Please ensure that the generated file is complete, functional, and logically sound." - "Additionally, apply your domain expertise to verify that all numerical values are consistent with the user's requirements, maintaining accuracy and coherence." - "When generating controlDict, do not include anything to preform post processing. Just include the necessary settings to run the simulation." + "Please ensure that the generated file is complete, functional, and logically sound. " + "Additionally, apply your domain expertise to verify that all numerical values are consistent with the user's requirements, maintaining accuracy and coherence. " + "When generating controlDict, do not include anything to perform post processing. Just include the necessary settings to run the simulation." ) if generation_mode == "sequential_dependency" and written_files_ctx: @@ -177,14 +168,16 @@ def _generate_one(subtask: Dict[str, str], written_files_ctx: List[FoamfilePydan code_user_prompt, code_system_prompt = _build_prompts(file_name, folder_name, written_files_ctx) + log_ctx = {"step": "initial_write", "substep": "generate_file", + "file_target": f"{folder_name}/{file_name}"} if generation_mode == "parallel_no_context": # Avoid shared global LLM instance in parallel mode. 
from utils import LLMService from config import Config llm = LLMService(Config()) - generation_response = llm.invoke(code_user_prompt, code_system_prompt) + generation_response = llm.invoke(code_user_prompt, code_system_prompt, log_context=log_ctx) else: - generation_response = global_llm_service.invoke(code_user_prompt, code_system_prompt) + generation_response = global_llm_service.invoke(code_user_prompt, code_system_prompt, log_context=log_ctx) code_context = parse_context(generation_response) save_file(file_path, code_context) @@ -263,6 +256,7 @@ def build_allrun( mesh_type: str, mesh_commands: List[str], user_requirement: str = "", + pre_solver_steps: Optional[List[str]] = None, progress_callback: Optional[Callable[[int, int, str], None]] = None, progress_offset: int = 0, total_steps: int = 0, @@ -351,13 +345,15 @@ class CommandsPydantic(BaseModel): f"Case directory structure: {dir_structure}\n" f"User case information: {case_info}\n" f"Reference Allrun scripts from similar cases: {allrun_reference}\n" - "Generate only the required OpenFOAM command list—no extra text." + + (f"Required pre-solver steps (must be included in this order before the solver): {pre_solver_steps}\n" if pre_solver_steps else "") + + "Generate only the required OpenFOAM command list—no extra text." ) if mesh_type == "custom_mesh": command_user_prompt += f"{mesh_commands_info}\n" - command_response = global_llm_service.invoke(command_user_prompt, command_system_prompt, pydantic_obj=CommandsPydantic) + command_response = global_llm_service.invoke(command_user_prompt, command_system_prompt, pydantic_obj=CommandsPydantic, + log_context={"step": "initial_write", "substep": "allrun_commands"}) if progress_callback: try: @@ -380,12 +376,12 @@ class CommandsPydantic(BaseModel): # Allrun generation system prompt allrun_system_prompt = ( - "You are an expert in OpenFOAM. Generate an Allrun script based on the provided details." + "You are an expert in OpenFOAM. 
Generate an Allrun script based on the provided details. " f"Available commands with descriptions: {commands_help}\n\n" f"Reference Allrun scripts from similar cases: {allrun_reference}\n\n" "If custom mesh commands are provided, make sure to include them in the appropriate order in the Allrun script. " - "CRITICAL: Do not include any post processing commands in the Allrun script." - "CRITICAL: Do not include any commands to convert mesh to foam format like gmshToFoam or others." + "CRITICAL: Do not include any post processing commands in the Allrun script. " + "CRITICAL: Do not include any commands to convert mesh to foam format like gmshToFoam or others. " ) if mesh_type == "custom_mesh": @@ -395,14 +391,14 @@ class CommandsPydantic(BaseModel): allrun_user_prompt = ( f"User requirement: {user_requirement}\n" f"Case directory structure: {dir_structure}\n" - f"User case infomation: {case_info}\n" + f"User case information: {case_info}\n" f"{mesh_commands_info}\n" "All run scripts for these similar cases are for reference only and may not be correct, as you might be a different case solver or have a different directory structure. " "You need to rely on your OpenFOAM and physics knowledge to discern this, and pay more attention to user requirements, " - "as your ultimate goal is to fulfill the user's requirements and generate an allrun script that meets those requirements." - "CRITICAL: Do not include any post processing commands in the Allrun script." - "CRITICAL: Do not include any commands to convert mesh to foam format like gmshToFoam or others." - "CRITICAL: Do not include any commands that run gmsh to create the mesh." + "as your ultimate goal is to fulfill the user's requirements and generate an allrun script that meets those requirements. " + "CRITICAL: Do not include any post processing commands in the Allrun script. " + "CRITICAL: Do not include any commands to convert mesh to foam format like gmshToFoam or others. 
" + "CRITICAL: Do not include any commands that run gmsh to create the mesh. " "Generate the Allrun script strictly based on the above information. Do not include explanations, comments, or additional text. Put the code in ``` tags." ) @@ -410,7 +406,8 @@ class CommandsPydantic(BaseModel): allrun_user_prompt += "CRITICAL: Do not include any other mesh commands other than the custom mesh commands.\n" allrun_user_prompt += "CRITICAL: Do not include any gmshToFoam commands in the Allrun script." - allrun_response = global_llm_service.invoke(allrun_user_prompt, allrun_system_prompt) + allrun_response = global_llm_service.invoke(allrun_user_prompt, allrun_system_prompt, + log_context={"step": "initial_write", "substep": "allrun_script"}) if progress_callback: try: @@ -421,7 +418,7 @@ class CommandsPydantic(BaseModel): allrun_script = parse_allrun(allrun_response) allrun_file_path = os.path.join(case_dir, "Allrun") save_file(allrun_file_path, allrun_script) - + return {"allrun_path": allrun_file_path, "allrun_script": allrun_script, "commands": command_response.commands} @@ -430,10 +427,11 @@ def rewrite_files( case_dir: str, error_logs: List[str], review_analysis: str, - rewrite_plan: Optional[Dict[str, Any]], user_requirement: str, + rewrite_plan: Optional[Dict[str, Any]] = None, # unused; kept for backwards-compat foamfiles: Optional[Any] = None, - dir_structure: Optional[Dict[str, List[str]]] = None + dir_structure: Optional[Dict[str, List[str]]] = None, + loop_count: int = 0 ) -> Dict[str, Any]: """ Rewrite OpenFOAM files based on error analysis and reviewer suggestions. @@ -499,33 +497,26 @@ def rewrite_files( rewrite_system_prompt = ( "You are an expert in OpenFOAM simulation and numerical modeling. " - "Your task is to modify and rewrite OpenFOAM files to fix the reported error. " + "Your task is to modify and rewrite the necessary OpenFOAM files to fix the reported error. 
" "Please do not propose solutions that require modifying any parameters declared in the user requirement, try other approaches instead. " - "You will receive a rewrite_plan. Follow it strictly: only modify files listed in rewrite_plan.target_files and apply only the requested changes. " - "Do not modify files outside the plan. " - "Return the complete, corrected file contents in JSON format: " + "The user will provide the error content, error command, reviewer's suggestions, and all relevant foam files. " + "Only return files that require rewriting, modification, or addition; do not include files that remain unchanged. " + "Return the complete, corrected file contents in the following JSON format: " "list of foamfile: [{file_name: 'file_name', folder_name: 'folder_name', content: 'content'}]. " - "Ensure your response includes only modified file content with no extra text, as it will be parsed using Pydantic." + "Ensure your response includes only the modified file content with no extra text, as it will be parsed using Pydantic." ) rewrite_user_prompt = ( f"{str(foamfiles)}\n" f"{error_logs}\n" - f"{review_analysis}\n" - f"{rewrite_plan}\n\n" + f"{review_analysis}\n\n" f"{user_requirement}\n\n" - "Please update OpenFOAM files according to rewrite_plan only. " - "Only include files from rewrite_plan.target_files in your output." + "Please update the relevant OpenFOAM files to resolve the reported errors, ensuring that all modifications strictly adhere to the specified formats. Ensure all modifications adhere to user requirement." 
) - response = global_llm_service.invoke(rewrite_user_prompt, rewrite_system_prompt, pydantic_obj=FoamPydantic) - - allowed_files = set() - if rewrite_plan and isinstance(rewrite_plan, dict): - for item in rewrite_plan.get("target_files", []): - file_path = item.get("file") if isinstance(item, dict) else None - if file_path: - allowed_files.add(file_path.strip().lstrip("./")) + response = global_llm_service.invoke(rewrite_user_prompt, rewrite_system_prompt, pydantic_obj=FoamPydantic, + log_context={"step": "rewrite", "substep": "rewrite_files", + "loop_iteration": loop_count}) # Prepare updated structures updated_dir = dict(dir_structure) if dir_structure else {} @@ -534,11 +525,6 @@ def rewrite_files( foamfiles_list = list(foamfiles.list_foamfile) for foamfile in response.list_foamfile: - rel_path = os.path.join(foamfile.folder_name, foamfile.file_name).replace('\\', '/').lstrip('./') - if allowed_files and rel_path not in allowed_files: - print(f"Warning: Skipping unplanned rewrite file: {rel_path}") - continue - file_path = os.path.join(case_dir, foamfile.folder_name, foamfile.file_name) os.makedirs(os.path.dirname(file_path), exist_ok=True) save_file(file_path, foamfile.content) diff --git a/src/services/plan.py b/src/services/plan.py index a1376f4..4bb2ce9 100644 --- a/src/services/plan.py +++ b/src/services/plan.py @@ -62,14 +62,15 @@ def parse_requirement_to_case_info(user_requirement: str, case_stats: Dict[str, >>> print(f"Case: {result['case_name']}, Solver: {result['case_solver']}") """ parse_system_prompt = ( - "Please transform the following user requirement into a standard case description using a structured format." - "The key elements should include case name, case domain, case category, and case solver." + "Please transform the following user requirement into a standard case description using a structured format. " + "The key elements should include case name, case domain, case category, and case solver. 
" f"Note: case domain must be one of {case_stats.get('case_domain', [])}." f"Note: case category must be one of {case_stats.get('case_category', [])}." f"Note: case solver must be one of {case_stats.get('case_solver', [])}." ) parse_user_prompt = f"User requirement: {user_requirement}." - res = global_llm_service.invoke(parse_user_prompt, parse_system_prompt, pydantic_obj=CaseSummaryModel) + res = global_llm_service.invoke(parse_user_prompt, parse_system_prompt, pydantic_obj=CaseSummaryModel, + log_context={"step": "plan", "substep": "parse_case_info"}) return { "case_name": res.case_name.replace(" ", "_"), "case_domain": res.case_domain, @@ -123,10 +124,17 @@ def resolve_case_dir( return os.path.join(base_dir, case_name) +class PerFileAdvice(BaseModel): + folder_name: str = Field(description="folder containing the file, e.g. system") + file_name: str = Field(description="file name, e.g. fvSolution") + classification: str = Field(description="one of: reuse, modify, present_but_unused, write_fresh") + delta: Optional[str] = Field(default=None, description="for 'modify' only: specific changes to apply") + + class SimilarCaseAdviceModel(BaseModel): - match_level: str = Field(description="high/medium/low/none") - use_scope: str = Field(description="short guidance about which files can use the reference") - advice: str = Field(description="one-sentence advice to include in prompts") + pre_solver_steps: List[str] = Field(description="commands to run before the solver, e.g. blockMesh, potentialFoam") + solver: str = Field(description="OpenFOAM solver, e.g. 
simpleFoam") + per_file: List[PerFileAdvice] = Field(description="per-file classification for each subtask") def _log_top3(label: str, items: List[Dict[str, Any]]) -> None: @@ -140,12 +148,14 @@ def _log_top3(label: str, items: List[Dict[str, Any]]) -> None: def _rerank_candidates( candidates: List[Dict[str, Any]], case_solver: str, + case_domain: str, ) -> List[Dict[str, Any]]: def key(item: Dict[str, Any]) -> tuple: solver_match = 1 if item.get("case_solver") == case_solver else 0 + domain_match = 1 if item.get("case_domain") == case_domain else 0 score = item.get("score") score_val = 0.0 if score is None else float(score) - return (-solver_match, score_val) + return (-domain_match, -solver_match, score_val) return sorted(candidates, key=key) @@ -155,32 +165,42 @@ def _build_advice( case_info: str, selected: Optional[Dict[str, Any]], candidates: List[Dict[str, Any]], + faiss_detailed: str, + subtasks: List[Dict[str, str]], ) -> SimilarCaseAdviceModel: - cand_lines = [ - f"- {c.get('case_name')} | {c.get('case_domain')} | {c.get('case_category')} | {c.get('case_solver')} | score={c.get('score')}" - for c in candidates[:5] - ] - cand_block = "\n".join(cand_lines) if cand_lines else "(none)" - - selected_line = ( - f"{selected.get('case_name')} | {selected.get('case_domain')} | {selected.get('case_category')} | {selected.get('case_solver')} | score={selected.get('score')}" - if selected else "(none)" - ) + if not faiss_detailed or not subtasks: + return SimilarCaseAdviceModel( + pre_solver_steps=[], + solver="", + per_file=[ + PerFileAdvice(folder_name=s["folder_name"], file_name=s["file_name"], classification="write_fresh") + for s in subtasks + ], + ) + + subtask_lines = "\n".join(f" {s['folder_name']}/{s['file_name']}" for s in subtasks) sys_prompt = ( - "You are a CFD expert. Based on the user requirement and the retrieved similar cases, " - "produce a concise usage guidance. 
If no suitable case is available, set match_level to 'none' " - "and advise not to rely on similar case templates." + "You are a CFD expert. Analyze the reference tutorial and provide per-file guidance for generating a new OpenFOAM simulation. " + "For each file in the subtask list, examine the tutorial content and classify how it should be used:\n" + " reuse: tutorial file is a close match — use it directly with only user-requirement-driven changes\n" + " modify: tutorial file has the right structure but needs targeted changes — describe those changes in delta\n" + " present_but_unused: tutorial has this file but it does not apply to the user's case — generate from scratch\n" + " write_fresh: tutorial has no useful version of this file — generate entirely from scratch\n" + "Also identify pre_solver_steps (e.g. blockMesh, potentialFoam) and solver from the tutorial Allrun and user requirement. " + "Output strict JSON only." ) user_prompt = ( f"User requirement:\n{user_requirement}\n\n" f"Case info:\n{case_info}\n\n" - f"Selected similar case:\n{selected_line}\n\n" - f"Top candidates:\n{cand_block}\n\n" - "Return JSON with keys: match_level (high/medium/low/none), use_scope, advice." + f"Reference tutorial:\n{faiss_detailed}\n\n" + f"Files to generate:\n{subtask_lines}\n\n" + "Return JSON with keys: pre_solver_steps (list of strings), solver (string), " + "per_file (list of {folder_name, file_name, classification, delta})." 
) - return global_llm_service.invoke(user_prompt, sys_prompt, pydantic_obj=SimilarCaseAdviceModel) + return global_llm_service.invoke(user_prompt, sys_prompt, pydantic_obj=SimilarCaseAdviceModel, + log_context={"step": "plan", "substep": "build_advice"}) def retrieve_references(case_name: str, @@ -188,37 +208,49 @@ def retrieve_references(case_name: str, case_domain: str, case_category: str, searchdocs: int = 2, - user_requirement: str = "") -> Tuple[str, str, str, str, SimilarCaseAdviceModel]: + user_requirement: str = "") -> Tuple[str, str, str, str, Optional[Dict[str, Any]], List[Dict[str, Any]]]: # Build case_info case_info = f"case name: {case_name}\ncase domain: {case_domain}\ncase category: {case_category}\ncase solver: {case_solver}" print("Retrieval query:\n" + case_info) - recall_k = max(10, int(searchdocs)) + recall_k = int(searchdocs) faiss_structure_all = retrieve_faiss("openfoam_tutorials_structure", case_info, topk=recall_k) print(f"Retrieved {len(faiss_structure_all)} candidates from FAISS.") - # Hard constraint: domain must match - domain_matched = [c for c in faiss_structure_all if c.get("case_domain") == case_domain] - _log_top3("Domain-matched structure candidates", domain_matched) - - if not domain_matched: - print(f"No suitable similar case found under domain={case_domain}.") - advice = _build_advice(user_requirement, case_info, None, faiss_structure_all) - return "", "", "", "", advice - - # Rerank by solver match, then semantic score - ranked = _rerank_candidates(domain_matched, case_solver) + # Stash top-3 tutorial-structure similarity scores + case names on the + # LLM service so every downstream invoke() logs them into the JSONL. 
+ try: + top3 = faiss_structure_all[:3] + global_llm_service.retrieval_top3_scores = [float(c.get("score")) for c in top3] + global_llm_service.retrieval_top3_cases = [str(c.get("case_name", "")) for c in top3] + except Exception as _e: + print(f"Warning: failed to stash top-3 retrieval metadata: {_e}") + + if not faiss_structure_all: + print("No similar cases returned by FAISS retrieval.") + return "", "", "", "", None, [] + + ranked = faiss_structure_all + _log_top3("Structure candidates (FAISS order)", ranked) selected = ranked[0] - # Use details from the same candidate (no re-query on structure text) - faiss_detailed = selected.get("full_content", "") + # Restore FA 1.1.0's two-step retrieval (structure -> details). The + # structure entry's full_content is used as the query text for the + # details lookup so both queries anchor on the same case. The + # details-index full_content contains the block with + # every file's body inside , which is what the writer + # prompt's is meant to carry. 
+ faiss_structure = selected.get("full_content", "") + faiss_structure = re.sub(r"\n{3}", "\n", faiss_structure) + faiss_detailed = retrieve_faiss( + "openfoam_tutorials_details", faiss_structure, topk=searchdocs + )[0]["full_content"] faiss_detailed = re.sub(r"\n{3}", "\n", faiss_detailed) m = re.search(r"<directory_structure>(.*?)</directory_structure>", faiss_detailed, re.DOTALL) if not m: print("Warning: No directory_structure found in selected similar case details.") - advice = _build_advice(user_requirement, case_info, selected, ranked) - return "", "", "", "", advice + return "", "", "", "", selected, ranked dir_structure = m.group(1).strip() dir_counts = parse_directory_structure(dir_structure) dir_counts_str = ',\n'.join([f"There are {count} files in Directory: {directory}" for directory, count in dir_counts.items()]) @@ -230,8 +262,7 @@ def retrieve_references(case_name: str, for idx, item in enumerate(faiss_allrun): allrun_reference += f"<similar_case_{idx + 1}>{item['full_content']}</similar_case_{idx + 1}>\n\n\n" - advice = _build_advice(user_requirement, case_info, selected, ranked) - return faiss_detailed, dir_structure, dir_counts_str, allrun_reference, advice + return faiss_detailed, dir_structure, dir_counts_str, allrun_reference, selected, ranked
" "Make sure you generate all the necessary files for the user's requirements." ) decompose_user_prompt = ( f"User Requirement: {user_requirement}\n\n" f"Reference Directory Structure (similar case): {dir_structure}\n\n{dir_counts_str}\n\n" - "Make sure you generate all the necessary files for the user's requirements." - "Do not include any gmsh files like .geo etc. in the subtasks." - "Only include blockMesh or snappyHexMesh if the user hasnt requested for gmsh mesh or user isnt using an external uploaded custom mesh" + "Make sure you generate all the necessary files for the user's requirements. " + "Do not include any gmsh files like .geo etc. in the subtasks. " + "Only include blockMesh or snappyHexMesh if the user hasn't requested for gmsh mesh or user isn't using an external uploaded custom mesh. " "Please generate the output as structured JSON." ) - res = global_llm_service.invoke(decompose_user_prompt, decompose_system_prompt, pydantic_obj=OpenFOAMPlanModel) + res = global_llm_service.invoke(decompose_user_prompt, decompose_system_prompt, pydantic_obj=OpenFOAMPlanModel, + log_context={"step": "plan", "substep": "decompose_subtasks"}) return [{"file_name": s.file_name, "folder_name": s.folder_name} for s in res.subtasks] @@ -309,7 +341,7 @@ def generate_simulation_plan( ) # Step 3: Retrieve references - faiss_detailed, dir_structure, dir_counts_str, allrun_reference, advice = retrieve_references( + faiss_detailed, dir_structure, dir_counts_str, allrun_reference, selected, ranked = retrieve_references( case_name=case_name, case_solver=case_solver, case_domain=case_domain, @@ -317,9 +349,13 @@ def generate_simulation_plan( searchdocs=searchdocs, user_requirement=user_requirement, ) - + # Step 4: Decompose to subtasks subtasks = decompose_to_subtasks(user_requirement, dir_structure, dir_counts_str) + + # Step 5: Build per-file advice (needs both tutorial bodies and the actual subtask list) + case_info_str = f"case name: {case_name}\ncase domain: 
{case_domain}\ncase category: {case_category}\ncase solver: {case_solver}" + advice = _build_advice(user_requirement, case_info_str, selected, ranked, faiss_detailed, subtasks) if len(subtasks) == 0: raise ValueError("Failed to generate subtasks.") diff --git a/src/services/review.py b/src/services/review.py index d09fa62..bd75615 100644 --- a/src/services/review.py +++ b/src/services/review.py @@ -1,25 +1,15 @@ from typing import List, Optional, Tuple, Any -from pydantic import BaseModel, Field from . import global_llm_service -class PlannedFileChange(BaseModel): - file: str = Field(description="Relative file path, e.g. system/fvSchemes or 0/U") - changes: str = Field(description="Semicolon-separated concrete changes for this file") - - -class RewritePlan(BaseModel): - target_files: List[PlannedFileChange] = Field(description="Files to modify and required changes") - - REVIEWER_SYSTEM_PROMPT = ( "You are an expert in OpenFOAM simulation and numerical modeling. " "Your task is to review the provided error logs and diagnose the underlying issues. " - "You will be provided with a similar case reference, which is a list of similar cases that are ordered by similarity. You can use this reference to help you understand the user requirement and the error." + "You will be provided with a similar case reference, which is a list of similar cases that are ordered by similarity. You can use this reference to help you understand the user requirement and the error. " "When an error indicates that a specific keyword is undefined (for example, 'div(phi,(p|rho)) is undefined'), your response must propose a solution that simply defines that exact keyword as shown in the error log. " "Do not reinterpret or modify the keyword (e.g., do not treat '|' as 'or'); instead, assume it is meant to be taken literally. " "Propose ideas on how to resolve the errors, but do not modify any files directly. 
" - "Please do not propose solutions that require modifying any parameters declared in the user requirement, try other approaches instead. Do not ask the user any questions." + "Please do not propose solutions that require modifying any parameters declared in the user requirement, try other approaches instead. Do not ask the user any questions. " "The user will supply all relevant foam files along with the error logs, and within the logs, you will find both the error content and the corresponding error command indicated by the log file name." ) @@ -31,29 +21,21 @@ def review_error_logs( user_requirement: str, similar_case_advice: Optional[Any] = None, history_text: Optional[List[str]] = None, + loop_count: int = 0, ) -> Tuple[str, List[str]]: """Stateless reviewer: returns (review_analysis, updated_history).""" advice_text = "" - if isinstance(similar_case_advice, dict): - advice_text = ( - f"\n" - f"match_level: {similar_case_advice.get('match_level')}\n" - f"use_scope: {similar_case_advice.get('use_scope')}\n" - f"advice: {similar_case_advice.get('advice')}\n" - f"\n" - ) - elif similar_case_advice: - advice_text = f"{similar_case_advice}\n" + updated_history = list(history_text) if history_text else [] - if history_text: + if updated_history: reviewer_user_prompt = ( f"{tutorial_reference}\n" f"{advice_text}" f"{str(foamfiles)}\n" f"{error_logs}\n" - f"\n{chr(10).join(history_text)}\n\n\n" + f"\n{chr(10).join(updated_history)}\n\n\n" f"{user_requirement}\n\n" - f"I have modified the files according to your previous suggestions. If the error persists, please provide further guidance. Make sure your suggestions adhere to user requirements and do not contradict it. Also, please consider the previous attempts and try a different approach." + f"I have modified the files according to your previous suggestions. If the error persists, please provide further guidance. Make sure your suggestions adhere to user requirements and do not contradict them. 
Also, please consider the previous attempts and try a different approach." ) else: reviewer_user_prompt = ( @@ -62,15 +44,17 @@ def review_error_logs( f"{str(foamfiles)}\n" f"{error_logs}\n" f"{user_requirement}\n" - "Please review the error logs and provide guidance on how to resolve the reported errors. Make sure your suggestions adhere to user requirements and do not contradict it." + "Please review the error logs and provide guidance on how to resolve the reported errors. Make sure your suggestions adhere to user requirements and do not contradict them." ) - review_response = global_llm_service.invoke(reviewer_user_prompt, REVIEWER_SYSTEM_PROMPT) + review_response = global_llm_service.invoke(reviewer_user_prompt, REVIEWER_SYSTEM_PROMPT, + log_context={"step": "review_analysis", "substep": "error_analysis", + "loop_iteration": loop_count}) review_content = review_response - updated_history = list(history_text) if history_text else [] + attempt_num = sum(1 for item in updated_history if item.startswith("\n", + f"\n", f"\n{error_logs}\n", f"\n{review_content}\n", f"\n", @@ -78,39 +62,3 @@ def review_error_logs( updated_history.extend(current_attempt) return review_content, updated_history - -def generate_rewrite_plan( - foamfiles: Any, - error_logs: List[str], - review_analysis: str, - user_requirement: str, -) -> dict: - """Generate a minimal, explicit rewrite plan for downstream rewrite step.""" - planner_system_prompt = ( - "You are an OpenFOAM debugging planner. " - "Given current foam files, error logs and reviewer analysis, create a minimal rewrite plan. " - "Output MUST be strict JSON only, with this exact schema: " - "{\"target_files\": [{\"file\": \"relative/path\", \"changes\": \"change1; change2\"}]}. " - "Rules: " - "1) Do not use markdown, backticks, or comments. " - "2) Use double quotes for all strings. " - "3) In changes, use short plain text actions separated by semicolons. 
" - "4) Do not include parentheses, backticks, or quote characters inside changes text. " - "5) Do not include run steps; only file edits." - ) - - planner_user_prompt = ( - f"{str(foamfiles)}\n" - f"{error_logs}\n" - f"{review_analysis}\n" - f"{user_requirement}\n" - "Return strict JSON now with key target_files only." - ) - - response = global_llm_service.invoke( - planner_user_prompt, - planner_system_prompt, - pydantic_obj=RewritePlan, - ) - return response.model_dump() - diff --git a/src/services/run_hpc.py b/src/services/run_hpc.py index 0210531..eca3634 100644 --- a/src/services/run_hpc.py +++ b/src/services/run_hpc.py @@ -22,16 +22,15 @@ def create_slurm_script(case_dir: str, cluster_info: dict) -> str: system_prompt = ( "You are an expert in HPC cluster job submission and SLURM scripting. " "Create a complete SLURM script for running OpenFOAM simulations. " - "The script should include:" - "1. Proper SLURM directives (#SBATCH) based on the cluster information provided" - "2. Do not load openfoam" - "3. Load libaraies for openfoam for run in parallel" - "4. Directory navigation and execution of the Allrun script" - "5. Error handling and status reporting" - "6. Any cluster-specific optimizations or requirements" - "7. Use your understanding of the documentation of the cluster and figure out the syntax of their jobscript." - "" - "Return ONLY the complete SLURM script content. Do not include any explanations or markdown formatting." + "The script should include: " + "1. Proper SLURM directives (#SBATCH) based on the cluster information provided. " + "2. Do not load openfoam. " + "3. Load libraries for openfoam for run in parallel. " + "4. Directory navigation and execution of the Allrun script. " + "5. Error handling and status reporting. " + "6. Any cluster-specific optimizations or requirements. " + "7. Use your understanding of the documentation of the cluster and figure out the syntax of their jobscript. 
" + "Return ONLY the complete SLURM script content. Do not include any explanations or markdown formatting. " "Make sure the script is executable and follows best practices for the specified cluster." ) @@ -86,18 +85,17 @@ def create_slurm_script_with_error_context(case_dir: str, cluster_info: dict, er system_prompt = ( "You are an expert in HPC cluster job submission and SLURM scripting. " "Create a complete SLURM script for running OpenFOAM simulations. " - "The script should include:" - "1. Proper SLURM directives (#SBATCH) based on the cluster information provided" - "2. Do not load OpenFOAM" - "3. Load libaraies for openfoam for run in parallel" - "4. Directory navigation and execution of the Allrun script" - "5. Error handling and status reporting" - "6. Any cluster-specific optimizations or requirements" - "7. Use your understanding of the documentation of the cluster and figure out the syntax of their jobscript." - "" + "The script should include: " + "1. Proper SLURM directives (#SBATCH) based on the cluster information provided. " + "2. Do not load OpenFOAM. " + "3. Load libraries for openfoam for run in parallel. " + "4. Directory navigation and execution of the Allrun script. " + "5. Error handling and status reporting. " + "6. Any cluster-specific optimizations or requirements. " + "7. Use your understanding of the documentation of the cluster and figure out the syntax of their jobscript. " "If a previous script and error message are provided, analyze the error and the script " - "to identify what went wrong and fix it. Common issues to consider:" - "- Invalid account numbers or partitions" + "to identify what went wrong and fix it. Common issues to consider: " + "- Invalid account numbers or partitions. 
" "- Insufficient resources (memory, time, nodes)" "- Missing modules or environment variables" "- Incorrect file paths or permissions" diff --git a/src/utils.py b/src/utils.py index e5d64ec..7f55369 100644 --- a/src/utils.py +++ b/src/utils.py @@ -416,6 +416,7 @@ def _load_codex_oauth(self) -> tuple[str, Optional[str]]: 1) $CODEX_HOME/auth.json (Codex CLI file-based cache) 2) ~/.codex/auth.json (Codex CLI default) 3) ~/.clawdbot/agents/main/agent/auth-profiles.json (Clawdbot OpenAI-Codex OAuth cache) + 4) ~/.openclaw/agents/main/agent/auth-profiles.json (OpenClaw OpenAI-Codex OAuth cache) Note: These files contain access/refresh tokens. Treat them like passwords. """ @@ -437,6 +438,16 @@ def _load_codex_oauth(self) -> tuple[str, Optional[str]]: / "auth-profiles.json" ) + # OpenClaw stores the OpenAI-Codex OAuth profile here. + candidates.append( + Path.home() + / ".openclaw" + / "agents" + / "main" + / "agent" + / "auth-profiles.json" + ) + for p in candidates: if not p.exists(): continue @@ -462,7 +473,9 @@ def __init__(self, config: object): self.temperature = getattr(config, "temperature", 0) self.model_provider = getattr(config, "model_provider", "openai") self._config = config - + self.dataset_log_path = getattr(config, "dataset_log_path", "") or "" + self.case_id = getattr(config, "case_id", "") or "" + # Initialize statistics self.total_calls = 0 self.total_prompt_tokens = 0 @@ -596,11 +609,35 @@ def _handle_throttling_retry(self, error: Exception, retry_count: int, max_retri return retry_count - def invoke(self, - user_prompt: str, - system_prompt: Optional[str] = None, + def _log_dataset_record(self, log_context: dict, system_prompt: str, + user_prompt: str, response: str, + prompt_tokens: int, completion_tokens: int) -> None: + import json as _json + from datetime import datetime, timezone + record = { + "case_id": self.case_id, + "step": log_context.get("step", ""), + "substep": log_context.get("substep", ""), + "loop_iteration": 
log_context.get("loop_iteration", 0), + "file_target": log_context.get("file_target", ""), + "system_prompt": system_prompt or "", + "user_prompt": user_prompt, + "response": response, + "model": self.model_version, + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + os.makedirs(os.path.dirname(self.dataset_log_path), exist_ok=True) + with open(self.dataset_log_path, "a") as f: + f.write(_json.dumps(record, ensure_ascii=False) + "\n") + + def invoke(self, + user_prompt: str, + system_prompt: Optional[str] = None, pydantic_obj: Optional[Type[BaseModel]] = None, - max_retries: int = 10) -> Any: + max_retries: int = 10, + log_context: Optional[dict] = None) -> Any: """ Invoke the LLM with the given prompts and return the response. @@ -651,7 +688,13 @@ def invoke(self, self.total_prompt_tokens += prompt_tokens self.total_completion_tokens += completion_tokens self.total_tokens += total_tokens - + + if log_context and self.dataset_log_path: + self._log_dataset_record( + log_context, system_prompt, user_prompt, + response_content, prompt_tokens, completion_tokens, + ) + return response except Exception as e: @@ -756,6 +799,9 @@ class GraphState(TypedDict): cluster_info: Optional[dict] slurm_script_path: Optional[str] termination_reason: Optional[str] + # Best-state snapshot for restore-on-max-loop + best_case_snapshot_dir: Optional[str] + best_progress_score: Optional[float] def tokenize(text: str) -> str: # Replace underscores with spaces