Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import argparse
import asyncio
import time

import json
import signal
Expand Down Expand Up @@ -424,6 +425,26 @@ async def server_worker_start(

with storage.open(get_global_log_path(), "a") as global_log:
global_log.write(f"Model loaded successfully: {model_name}\n")

try:
job = job_get(job_id)
experiment_id = job.get("experiment_id")
await job_update_status(job_id=job_id, status="RUNNING", experiment_id=experiment_id)
except Exception:
# best effort only
pass
try:
from lab import Job # noqa: used for manipulating job_data directly

j = Job.get(job_id)
jd = j.get_job_data() or {}
tail = jd.get("tail", [])
tail.append(f"{time.strftime('%Y-%m-%d %H:%M:%S')} | INFO | launcher | Process started, waiting for readiness")
jd["tail"] = tail
j.set_job_data(jd)
except Exception:
pass

return {"status": "success", "job_id": job_id}


Expand Down
29 changes: 27 additions & 2 deletions api/transformerlab/shared/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,39 @@ async def async_run_python_daemon_and_update_status(
with storage.open(pid_file, "w") as f:
f.write(str(pid))

# Mark job as RUNNING immediately after process starts so frontend sees progress
try:
job = job_service.job_get(job_id)
experiment_id = job.get("experiment_id")
await job_update_status(job_id=job_id, status="RUNNING", experiment_id=experiment_id)
except Exception:
# best effort only
pass

# keep a tail of recent lines so we can show them on failure:
recent_lines = deque(maxlen=10)
recent_lines = deque(maxlen=50)
last_update_time = 0.0

line = await process.stdout.readline()
error_msg = None
while line:
decoded = line.decode()
recent_lines.append(decoded.strip())

try:
now = time.time()
if now - last_update_time >= 1.0:
try:
job = Job.get(job_id)
job_data = job.get_job_data() or {}
job_data["tail"] = list(recent_lines)
job.set_job_data(job_data)
except Exception:
# Best-effort only; don't fail the loop
pass
last_update_time = now
except Exception:
pass
# If we hit the begin_string then the daemon is started and we can return!
if begin_string in decoded:
if set_process_id_function is not None:
Expand All @@ -288,7 +312,8 @@ async def async_run_python_daemon_and_update_status(
print(f"Worker job {job_id} started successfully")
job = job_service.job_get(job_id)
experiment_id = job["experiment_id"]
await job_update_status(job_id=job_id, status="COMPLETE", experiment_id=experiment_id)

await job_update_status(job_id=job_id, status="RUNNING", experiment_id=experiment_id)

# Schedule the read_process_output coroutine in the current event
# so we can keep watching this process, but return back to the caller
Expand Down
Loading
Loading