Skip to content

Commit

Permalink
Merge pull request #24 from slub/fix-jobinfo
Browse files Browse the repository at this point in the history
bug fixes and more
  • Loading branch information
SvenMarcus authored Apr 14, 2023
2 parents 8c33191 + 2342a45 commit 2fe3b2d
Show file tree
Hide file tree
Showing 17 changed files with 214 additions and 127 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Variables:
currently: "$(MANAGER_KEY)"
- MANAGER_DATA host directory to mount into `/data` (shared with Manager)
currently: "$(MANAGER_DATA)"
- MANAGER_WORKFLOWS host directory to mount into `/workflows` (shared with Manager)
currently: "$(MANAGER_WORKFLOWS)"
- NETWORK Docker network to use (manage via "docker network")
currently: $(NETWORK)
- CONTROLLER_HOST network address for the Controller client
Expand All @@ -42,6 +44,7 @@ help: ; @eval "$$HELP"

MANAGER_KEY ?= $(firstword $(filter-out %.pub,$(wildcard $(HOME)/.ssh/id_*)))
MANAGER_DATA ?= $(CURDIR)
MANAGER_WORKFLOWS ?= $(CURDIR)
MONITOR_PORT_WEB ?= 5000
NETWORK ?= bridge
CONTROLLER_HOST ?= $(shell dig +short $$HOSTNAME)
Expand All @@ -54,6 +57,8 @@ run: $(DATA)
-p $(MONITOR_PORT_WEB):5000 \
-v ${MANAGER_KEY}:/id_rsa \
--mount type=bind,source=$(MANAGER_KEY),target=/id_rsa \
-v $(MANAGER_DATA):/data \
-v $(MANAGER_WORKFLOWS):/workflows \
-v shared:/run/lock/ocrd.jobs \
-e CONTROLLER=$(CONTROLLER_HOST):$(CONTROLLER_PORT_SSH) \
-e MONITOR_PORT_LOG=${MONITOR_PORT_LOG} \
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ In order to work properly, the following **environment variables** must be set:
| CONTROLLER_HOST | Hostname of the OCR-D Controller |
| CONTROLLER_PORT_SSH | Port on the OCR-D Controller host that allows an SSH connection |
| MANAGER_DATA | Path to the OCR-D workspaces on the host |
| MANAGER_WORKFLOWS | Path to the OCR-D workflows on the host |
| MANAGER_KEY | Path to a private key that can be used to authenticate with the OCR-D Controller |
| MONITOR_PORT_WEB | The port at which the OCR-D Monitor will be available on the host |
| MONITOR_PORT_LOG | The port at which the Dozzle logs will be available on the host |
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:

volumes:
- ${MANAGER_DATA}:/data
- ${MANAGER_WORKFLOWS}:/workflows
- ${MANAGER_KEY}:/id_rsa
- shared:/run/lock/ocrd.jobs

Expand Down
2 changes: 1 addition & 1 deletion init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export OCRD_LOGVIEW__PORT=$MONITOR_PORT_LOG
export OCRD_CONTROLLER__JOB_DIR=/run/lock/ocrd.jobs
export OCRD_CONTROLLER__HOST=$CONTROLLER_HOST
export OCRD_CONTROLLER__PORT=$CONTROLLER_PORT
export OCRD_CONTROLLER__USER=ocrd
export OCRD_CONTROLLER__USER=admin
export OCRD_CONTROLLER__KEYFILE=~/.ssh/id_rsa

cd /usr/local/ocrd-monitor
Expand Down
45 changes: 30 additions & 15 deletions ocrdmonitor/ocrdcontroller.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,67 @@
from __future__ import annotations

import sys
import logging
from pathlib import Path
from typing import Protocol
from ocrdmonitor import sshremote

from ocrdmonitor.ocrdjob import OcrdJob
from ocrdmonitor.processstatus import ProcessStatus
from ocrdmonitor.processstatus import ProcessStatus, ProcessState

if sys.version_info >= (3, 10):
from typing import TypeGuard
else:
from typing_extensions import TypeGuard


class ProcessQuery(Protocol):
def __call__(self, process_group: int) -> list[ProcessStatus]:
class RemoteServer(Protocol):
async def read_file(self, path: str) -> str:
...

async def process_status(self, process_group: int) -> list[ProcessStatus]:
...


class OcrdController:
def __init__(self, process_query: ProcessQuery, job_dir: Path) -> None:
self._process_query = process_query
def __init__(self, remote: RemoteServer, job_dir: Path) -> None:
self._remote = remote
self._job_dir = job_dir
logging.info(f"process_query: {remote}")
logging.info(f"job_dir: {job_dir}")

def get_jobs(self) -> list[OcrdJob]:
def is_ocrd_job(j: OcrdJob | None) -> TypeGuard[OcrdJob]:
return j is not None

job_candidates = [
self._try_parse(job_file.read_text())
self._try_parse(job_file)
for job_file in self._job_dir.iterdir()
if job_file.is_file()
]

return list(filter(is_ocrd_job, job_candidates))

def _try_parse(self, job_str: str) -> OcrdJob | None:
def _try_parse(self, job_file: Path) -> OcrdJob | None:
job_str = job_file.read_text()
try:
return OcrdJob.from_str(job_str)
except (ValueError, KeyError):
except (ValueError, KeyError) as e:
logging.warning(f"found invalid job file: {job_file.name} - {e}")
return None

def status_for(self, ocrd_job: OcrdJob) -> ProcessStatus | None:
if ocrd_job.pid is None:
async def status_for(self, ocrd_job: OcrdJob) -> ProcessStatus | None:
if ocrd_job.remotedir is None:
return None

process_statuses = self._process_query(ocrd_job.pid)
matching_statuses = (
status for status in process_statuses if status.pid == ocrd_job.pid
)
return next(matching_statuses, None)
pid = await self._remote.read_file(f"/data/{ocrd_job.remotedir}/ocrd.pid")
process_statuses = await self._remote.process_status(int(pid))

for status in process_statuses:
if status.state == ProcessState.RUNNING:
return status

if process_statuses:
return process_statuses[0]

return None
20 changes: 13 additions & 7 deletions ocrdmonitor/ocrdjob.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from __future__ import annotations

from datetime import datetime
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from typing import Any, NamedTuple, Type
from typing import Any, Callable, NamedTuple, Type

_KEYMAP: dict[str, tuple[Type[int] | Type[str] | Type[Path], str]] = {
_KEYMAP: dict[str, tuple[Type[int] | Type[str] | Type[Path] | Callable[[str], datetime], str]] = {
"PID": (int, "pid"),
"RETVAL": (int, "return_code"),
"PROCESS_ID": (int, "process_id"),
"TASK_ID": (int, "task_id"),
"TIME_CREATED": (datetime.fromisoformat, "time_created"),
"TIME_TERMINATED": (datetime.fromisoformat, "time_terminated"),
"PROCESS_ID": (str, "process_id"),
"TASK_ID": (str, "task_id"),
"PROCESS_DIR": (Path, "processdir"),
"WORKDIR": (Path, "workdir"),
"WORKFLOW": (Path, "workflow_file"),
Expand All @@ -18,7 +21,7 @@
}


def _into_dict(content: str) -> dict[str, int | str | Path]:
def _into_dict(content: str) -> dict[str, int | str | Path | datetime]:
result_dict = {}
lines = content.splitlines()
for line in lines:
Expand All @@ -35,8 +38,8 @@ def _into_dict(content: str) -> dict[str, int | str | Path]:


class KitodoProcessDetails(NamedTuple):
process_id: int
task_id: int
process_id: str
task_id: str
processdir: Path


Expand All @@ -59,6 +62,9 @@ class OcrdJob:
pid: int | None = None
return_code: int | None = None

time_created: datetime | None = None
time_terminated: datetime | None = None

@classmethod
def from_str(cls, content: str) -> "OcrdJob":
"""
Expand Down
19 changes: 8 additions & 11 deletions ocrdmonitor/processstatus.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from __future__ import annotations

import subprocess
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum

PS_CMD = "ps -g {} -o pid,state,%cpu,rss,cputime --no-headers"


class ProcessState(Enum):
# see ps(1)#PROCESS_STATE_CODES
RUNNING = "R"
SLEEPING = "S"
SLEEPIO = "D"
STOPPED = "T"
TRACING = "t"
ZOMBIE = "Z"
UNKNOWN = "?"

Expand All @@ -28,7 +28,11 @@ class ProcessStatus:
cpu_time: timedelta

@classmethod
def from_ps_output(cls, ps_output: str) -> list["ProcessStatus"]:
def shell_command(cls, pid: int) -> str:
return f"ps -s {pid} -o pid,state,%cpu,rss,cputime --no-headers"

@classmethod
def from_shell_output(cls, ps_output: str) -> list["ProcessStatus"]:
def is_error(lines: list[str]) -> bool:
return lines[0].startswith("error:")

Expand All @@ -49,13 +53,6 @@ def parse_line(line: str) -> "ProcessStatus":
return [parse_line(line) for line in lines]


def run(group: int) -> list[ProcessStatus]:
cmd = PS_CMD.format(group)
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

return ProcessStatus.from_ps_output(result.stdout)


def _cpu_time_to_seconds(cpu_time: str) -> int:
hours, minutes, seconds, *_ = cpu_time.split(":")
return int(hours) * 3600 + int(minutes) * 60 + int(seconds)
2 changes: 1 addition & 1 deletion ocrdmonitor/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def swallow_exceptions(request: Request, err: Exception) -> Response:
create_jobs(
templates,
OcrdController(
settings.ocrd_controller.process_query(),
settings.ocrd_controller.controller_remote(),
settings.ocrd_controller.job_dir,
),
)
Expand Down
16 changes: 12 additions & 4 deletions ocrdmonitor/server/jobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Iterable

Expand Down Expand Up @@ -42,19 +43,26 @@ def create_jobs(templates: Jinja2Templates, controller: OcrdController) -> APIRo
router = APIRouter(prefix="/jobs")

@router.get("/", name="jobs")
def jobs(request: Request) -> Response:
async def jobs(request: Request) -> Response:
jobs = controller.get_jobs()
running, completed = split_into_running_and_completed(jobs)

job_status = [controller.status_for(job) for job in running]
job_status = [await controller.status_for(job) for job in running]
running_jobs = wrap_in_running_job_type(running, job_status)

now = datetime.now(timezone.utc)
return templates.TemplateResponse(
"jobs.html.j2",
{
"request": request,
"running_jobs": running_jobs,
"completed_jobs": completed,
"running_jobs": sorted(
running_jobs,
key=lambda x: x.ocrd_job.time_created or now,
),
"completed_jobs": sorted(
completed,
key=lambda x: x.time_terminated or now,
),
},
)

Expand Down
10 changes: 5 additions & 5 deletions ocrdmonitor/server/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import atexit
from functools import partial
from pathlib import Path
from typing import Literal

Expand All @@ -13,8 +12,9 @@
OcrdBrowserFactory,
SubProcessOcrdBrowserFactory,
)
from ocrdmonitor.ocrdcontroller import ProcessQuery
from ocrdmonitor.sshps import process_status

from ocrdmonitor.ocrdcontroller import RemoteServer
from ocrdmonitor.sshremote import SSHRemote


class OcrdControllerSettings(BaseModel):
Expand All @@ -24,8 +24,8 @@ class OcrdControllerSettings(BaseModel):
port: int = 22
keyfile: Path = Path.home() / ".ssh" / "id_rsa"

def process_query(self) -> ProcessQuery:
return partial(process_status, self)
def controller_remote(self) -> RemoteServer:
return SSHRemote(self)


class OcrdLogViewSettings(BaseModel):
Expand Down
16 changes: 14 additions & 2 deletions ocrdmonitor/server/templates/jobs.html.j2
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@
{% endblock %}

{% block content %}
<script>
function killjob(pid) {
/* todo: log into the Manager and kill this process ... */
window.alert("Not implemented yet")
}
</script>
<h2 class="title">Active Jobs</h2>
<table id="running-jobs" class="table">
<thead>
<tr>
<th>TSTART</th>
<th>TASK ID</th>
<th>PROCESS ID</th>
<th>WORKFLOW</th>
Expand All @@ -21,19 +28,22 @@
<th>% CPU</th>
<th>MB RSS</th>
<th>DURATION</th>
<th>ACTION</th>
</tr>
</thead>
<tbody>
{% for job in running_jobs: %}
<tr>
<td>{{ job.ocrd_job.time_created }}</td>
<td>{{ job.ocrd_job.kitodo_details.task_id }}</td>
<td>{{ job.ocrd_job.kitodo_details.process_id }}</td>
<td><a href="#">{{ job.ocrd_job.workflow }}</a></td>
<td><a href="{{ url_for('workflows.detail', path=job.ocrd_job.workflow_file) }}">{{ job.ocrd_job.workflow }}</a></td>
<td>{{ job.process_status.pid }}</td>
<td>{{ job.process_status.state }}</td>
<td>{{ job.process_status.percent_cpu }}</td>
<td>{{ job.process_status.memory }}</td>
<td>{{ job.process_status.cpu_time }}</td>
<td><button onclick="killjob({{ job.process_status.pid }})">Kill!</button></td>
</tr>
{% endfor %}
</tbody>
Expand All @@ -42,6 +52,7 @@
<table id="completed-jobs" class="table">
<thead>
<tr>
<th>TSTOP</th>
<th>TASK ID</th>
<th>PROCESS ID</th>
<th>WORKFLOW</th>
Expand All @@ -53,9 +64,10 @@
<tbody>
{% for job in completed_jobs: %}
<tr>
<td>{{ job.time_terminated }}</td>
<td>{{ job.kitodo_details.task_id }}</td>
<td>{{ job.kitodo_details.process_id }}</td>
<td>{% if job.workflow is defined %}<a href="{{ url_for('workflows.detail', path=job.workflow_file) }}">{{ job.workflow }}</a>{% endif %}</td>
<td><a href="{{ url_for('workflows.detail', path=job.workflow_file) }}">{{ job.workflow }}</a></td>
<td>{{ job.return_code }} {% if job.return_code == 0 %}(SUCCESS){% else %}(FAILURE){% endif %}</td>
<td><a href="{{ url_for('workspaces.open', workspace=job.workdir)}}">{{ job.kitodo_details.processdir.name }}</a></td>
<td>
Expand Down
Loading

0 comments on commit 2fe3b2d

Please sign in to comment.