diff --git a/api.py b/api.py
index 800985714..05ff5c3e8 100644
--- a/api.py
+++ b/api.py
@@ -5,6 +5,7 @@
 import os
 import argparse
 import asyncio
+import time
 import json
 import signal
@@ -421,6 +422,26 @@ async def server_worker_start(
     with storage.open(get_global_log_path(), "a") as global_log:
         global_log.write(f"Model loaded successfully: {model_name}\n")
+
+    try:
+        job = job_get(job_id)
+        experiment_id = job.get("experiment_id")
+        await job_update_status(job_id=job_id, status="RUNNING", experiment_id=experiment_id)
+    except Exception:
+        # best effort only
+        pass
+    try:
+        from lab import Job  # noqa: used for manipulating job_data directly
+
+        j = Job.get(job_id)
+        jd = j.get_job_data() or {}
+        tail = jd.get("tail", [])
+        tail.append(f"{time.strftime('%Y-%m-%d %H:%M:%S')} | INFO | launcher | Process started, waiting for readiness")
+        jd["tail"] = tail
+        j.set_job_data(jd)
+    except Exception:
+        pass
+
     return {"status": "success", "job_id": job_id}
diff --git a/transformerlab/shared/shared.py b/transformerlab/shared/shared.py
index 8403ee6dd..37ccad0fd 100644
--- a/transformerlab/shared/shared.py
+++ b/transformerlab/shared/shared.py
@@ -271,8 +271,18 @@ async def async_run_python_daemon_and_update_status(
     with storage.open(pid_file, "w") as f:
         f.write(str(pid))
 
+    # Mark job as RUNNING immediately after process starts so frontend sees progress
+    try:
+        job = job_service.job_get(job_id)
+        experiment_id = job.get("experiment_id")
+        await job_update_status(job_id=job_id, status="RUNNING", experiment_id=experiment_id)
+    except Exception:
+        # best effort only
+        pass
+
     # keep a tail of recent lines so we can show them on failure:
-    recent_lines = deque(maxlen=10)
+    recent_lines = deque(maxlen=50)
+    last_update_time = 0.0
 
     line = await process.stdout.readline()
     error_msg = None
@@ -280,6 +290,20 @@
         decoded = line.decode()
         recent_lines.append(decoded.strip())
 
+        try:
+            now = time.time()
+            if now - last_update_time >= 1.0:
+                try:
+                    job = Job.get(job_id)
+                    job_data = job.get_job_data() or {}
+                    job_data["tail"] = list(recent_lines)
+                    job.set_job_data(job_data)
+                except Exception:
+                    # Best-effort only; don't fail the loop
+                    pass
+                last_update_time = now
+        except Exception:
+            pass
         # If we hit the begin_string then the daemon is started and we can return!
         if begin_string in decoded:
             if set_process_id_function is not None:
@@ -288,7 +312,8 @@
            print(f"Worker job {job_id} started successfully")
            job = job_service.job_get(job_id)
            experiment_id = job["experiment_id"]
-            await job_update_status(job_id=job_id, status="COMPLETE", experiment_id=experiment_id)
+
+            await job_update_status(job_id=job_id, status="RUNNING", experiment_id=experiment_id)
 
            # Schedule the read_process_output coroutine in the current event
            # so we can keep watching this process, but return back to the caller
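
Note: the tail-update block added to shared.py calls Job.get(job_id), but this diff adds no import of Job to shared.py (api.py imports it locally via "from lab import Job"). If shared.py does not already import Job, the resulting NameError is silently swallowed by the enclosing "except Exception: pass" and the tail never updates; a local import mirroring api.py would guard against that.

Below is a minimal standalone sketch of the pattern both hunks implement: keep a rolling tail of process output in a bounded deque and persist it to job_data at most once per second, so a polling frontend can show live progress while the job is RUNNING. The function name stream_tail_to_job and its plain-iterable "lines" argument are hypothetical, for illustration only; the Job accessors (Job.get, get_job_data, set_job_data) and the "from lab import Job" path are the ones used in the diff above.

    import time
    from collections import deque

    from lab import Job  # same import path api.py uses in this diff

    TAIL_LINES = 50        # matches the new deque(maxlen=50) above
    UPDATE_INTERVAL = 1.0  # persist at most once per second

    def stream_tail_to_job(job_id, lines):
        """Keep a rolling tail of output lines and persist it to job_data,
        throttled so we do not write on every single line."""
        recent_lines = deque(maxlen=TAIL_LINES)
        last_update_time = 0.0
        for line in lines:
            recent_lines.append(line.strip())
            now = time.time()
            if now - last_update_time >= UPDATE_INTERVAL:
                try:
                    job = Job.get(job_id)
                    job_data = job.get_job_data() or {}
                    job_data["tail"] = list(recent_lines)
                    job.set_job_data(job_data)
                except Exception:
                    pass  # best effort: telemetry must never kill the read loop
                last_update_time = now

A frontend polling the job can then render job_data["tail"] while the status is RUNNING, instead of seeing no output until the job completes or fails.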