Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions admin/backend/tasks/callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os
import shutil


def new_site_failure_callback(meta: dict):
"""Cleanup site from the directory to ensure reader picksup correct sites"""
site_name = meta["command_argv"][4]
site_path = os.path.join(meta["bench_root"], "sites", site_name)
shutil.rmtree(site_path, ignore_errors=True)
16 changes: 15 additions & 1 deletion admin/backend/tasks/manager/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import json
import os
import pickle
import secrets
import signal
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import TypedDict

from bench_cli.exceptions import TaskNotFoundError, TaskNotRunningError

Expand All @@ -33,11 +35,16 @@
}


class TaskCallbacks(TypedDict):
on_success: callable | None
on_failure: callable | None


class TaskRunner:
def __init__(self, bench_root: Path) -> None:
self._bench_root = bench_root

def run(self, command: str, args: dict) -> str:
def run(self, command: str, args: dict, callbacks: TaskCallbacks | None = None) -> str:
command_argv = self._build_argv(command, args)
task_id = self._generate_task_id()
task_dir = self._task_dir(task_id)
Expand All @@ -51,10 +58,17 @@ def run(self, command: str, args: dict) -> str:
"started_at": datetime.now(timezone.utc).isoformat(),
"finished_at": None,
"exit_code": None,
"bench_root": str(self._bench_root),
}
(task_dir / "meta.json").write_text(json.dumps(meta, indent=2))
(task_dir / "status").write_text("running")

if callbacks:
if on_success := callbacks.get("on_success"):
(task_dir / "on_success.bin").write_bytes(pickle.dumps(on_success))
if on_failure := callbacks.get("on_failure"):
(task_dir / "on_failure.bin").write_bytes(pickle.dumps(on_failure))

process = subprocess.Popen(
[sys.executable, "-m", "admin.backend.tasks.manager.wrapper", str(task_dir)],
start_new_session=True,
Expand Down
26 changes: 25 additions & 1 deletion admin/backend/tasks/manager/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,35 @@

This module uses only the standard library — no cli imports.
"""

import json
import pickle
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path


def callback_handler(callback_bin_path: Path, output_log: Path, meta: dict) -> None:
callback = pickle.loads(callback_bin_path.read_bytes())
callback_bin_path.unlink()
with open(output_log, "a") as log_file:
try:
callback(meta)
log_file.write("\nCallback successfully triggered")
except Exception as error:
log_file.write(f"\nCallback failed: {error!s}\n")


def main() -> None:
task_dir = Path(sys.argv[1])
meta = json.loads((task_dir / "meta.json").read_text())
on_success_bin = task_dir / "on_success.bin"
on_failure_bin = task_dir / "on_failure.bin"

bench_root = task_dir.parent.parent
# frappe's bench CLI (env/bin/bench) loads apps.txt from the current
# directory using sites_path=".", so cwd must be the sites/ subdirectory.
bench_root = Path(meta["bench_root"])
sites_dir = bench_root / "sites"
cwd = str(sites_dir) if sites_dir.is_dir() else str(bench_root)

Expand All @@ -30,6 +45,15 @@ def main() -> None:
stderr=subprocess.STDOUT,
)

if result.returncode == 0 and on_success_bin.exists():
callback_handler(on_success_bin, task_dir / "output.log", meta=meta)
elif result.returncode != 0 and on_failure_bin.exists():
callback_handler(on_failure_bin, task_dir / "output.log", meta=meta)

for leftover in (on_success_bin, on_failure_bin):
if leftover.exists():
leftover.unlink()

meta["finished_at"] = datetime.now(timezone.utc).isoformat()
meta["exit_code"] = result.returncode
(task_dir / "meta.json").write_text(json.dumps(meta, indent=2))
Expand Down
11 changes: 9 additions & 2 deletions admin/backend/views/sites.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

from flask import Blueprint, current_app, jsonify, request

from admin.backend.tasks.callbacks import new_site_failure_callback
from admin.backend.tasks.manager.task_runner import TaskRunner

from ..readers.app_reader import AppReader
from ..readers.site_reader import SiteReader
from admin.backend.tasks.manager.task_runner import TaskRunner

sites_bp = Blueprint("sites", __name__)

Expand Down Expand Up @@ -66,7 +68,11 @@ def create():
return jsonify({"ok": False, "error": f"Site '{name}' already exists."})

try:
task_id = TaskRunner(bench_root).run("new-site", {"name": name, "admin_password": admin_password})
task_id = TaskRunner(bench_root).run(
"new-site",
{"name": name, "admin_password": admin_password},
callbacks={"on_failure": new_site_failure_callback},
)
except Exception as e:
return jsonify({"ok": False, "error": f"Could not start new-site: {e}"})

Expand Down Expand Up @@ -176,6 +182,7 @@ def login_to_site(name: str):

import http.client
import urllib.parse

from bench_cli.config.bench_config import BenchConfig

try:
Expand Down
2 changes: 1 addition & 1 deletion bench_cli/managers/supervisor_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ def _render_program(self, pd: ProcessDefinition, safe_name: str) -> str:

def _prod_process_definitions(self) -> list[ProcessDefinition]:
"""Process definitions for production (no dev processes)."""
from bench_cli.managers.process_manager import ProcessDefinition
defs = [
self._web_definition(),
self._socketio_definition(),
self._admin_definition(),
*self._worker_definitions("default", self.bench.config.workers.default_count),
*self._worker_definitions("short", self.bench.config.workers.short_count),
*self._worker_definitions("long", self.bench.config.workers.long_count),
*[pd for entry in self.bench.config.workers.custom for pd in self._worker_definitions(entry.queue, entry.count)],
]
if self.bench.config.redis.is_single_instance:
defs.append(self._redis_definition("redis", "redis.conf"))
Expand Down
Loading