Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug fixes and more #24

Merged
merged 17 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Variables:
currently: "$(MANAGER_KEY)"
- MANAGER_DATA host directory to mount into `/data` (shared with Manager)
currently: "$(MANAGER_DATA)"
- MANAGER_WORKFLOWS host directory to mount into `/workflows` (shared with Manager)
currently: "$(MANAGER_WORKFLOWS)"
- NETWORK Docker network to use (manage via "docker network")
currently: $(NETWORK)
- CONTROLLER_HOST network address for the Controller client
Expand All @@ -42,6 +44,7 @@ help: ; @eval "$$HELP"

MANAGER_KEY ?= $(firstword $(filter-out %.pub,$(wildcard $(HOME)/.ssh/id_*)))
MANAGER_DATA ?= $(CURDIR)
MANAGER_WORKFLOWS ?= $(CURDIR)
MONITOR_PORT_WEB ?= 5000
NETWORK ?= bridge
CONTROLLER_HOST ?= $(shell dig +short $$HOSTNAME)
Expand All @@ -54,6 +57,8 @@ run: $(DATA)
-p $(MONITOR_PORT_WEB):5000 \
-v ${MANAGER_KEY}:/id_rsa \
--mount type=bind,source=$(MANAGER_KEY),target=/id_rsa \
-v $(MANAGER_DATA):/data \
-v $(MANAGER_WORKFLOWS):/workflows \
-v shared:/run/lock/ocrd.jobs \
-e CONTROLLER=$(CONTROLLER_HOST):$(CONTROLLER_PORT_SSH) \
-e MONITOR_PORT_LOG=${MONITOR_PORT_LOG} \
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ In order to work properly, the following **environment variables** must be set:
| CONTROLLER_HOST | Hostname of the OCR-D Controller |
| CONTROLLER_PORT_SSH | Port on the OCR-D Controller host that allows a SSH connection |
| MANAGER_DATA | Path to the OCR-D workspaces on the host |
| MANAGER_WORKFLOWS | Path to the OCR-D workflows on the host |
| MANAGER_KEY | Path to a private key that can be used to authenticate with the OCR-D Controller |
| MONITOR_PORT_WEB | The port at which the OCR-D Monitor will be available on the host |
| MONITOR_PORT_LOG | The port at which the Dozzle logs will be available on the host |
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:

volumes:
- ${MANAGER_DATA}:/data
- ${MANAGER_WORKFLOWS}:/workflows
- ${MANAGER_KEY}:/id_rsa
- shared:/run/lock/ocrd.jobs

Expand Down
2 changes: 1 addition & 1 deletion init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export OCRD_LOGVIEW__PORT=$MONITOR_PORT_LOG
export OCRD_CONTROLLER__JOB_DIR=/run/lock/ocrd.jobs
export OCRD_CONTROLLER__HOST=$CONTROLLER_HOST
export OCRD_CONTROLLER__PORT=$CONTROLLER_PORT
export OCRD_CONTROLLER__USER=ocrd
export OCRD_CONTROLLER__USER=admin
export OCRD_CONTROLLER__KEYFILE=~/.ssh/id_rsa

cd /usr/local/ocrd-monitor
Expand Down
30 changes: 19 additions & 11 deletions ocrdmonitor/ocrdcontroller.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from __future__ import annotations

import sys
import logging
from pathlib import Path
from typing import Protocol

from ocrdmonitor.ocrdjob import OcrdJob
from ocrdmonitor.processstatus import ProcessStatus
from ocrdmonitor.processstatus import ProcessStatus, ProcessState

if sys.version_info >= (3, 10):
from typing import TypeGuard
Expand All @@ -14,39 +15,46 @@


class ProcessQuery(Protocol):
def __call__(self, process_group: int) -> list[ProcessStatus]:
def __call__(self, remotedir: str) -> list[ProcessStatus]:
...


class OcrdController:
def __init__(self, process_query: ProcessQuery, job_dir: Path) -> None:
self._process_query = process_query
self._job_dir = job_dir
logging.info(f"process_query: {process_query}")
logging.info(f"job_dir: {job_dir}")

def get_jobs(self) -> list[OcrdJob]:
def is_ocrd_job(j: OcrdJob | None) -> TypeGuard[OcrdJob]:
return j is not None

job_candidates = [
self._try_parse(job_file.read_text())
self._try_parse(job_file)
for job_file in self._job_dir.iterdir()
if job_file.is_file()
]

return list(filter(is_ocrd_job, job_candidates))

def _try_parse(self, job_str: str) -> OcrdJob | None:
def _try_parse(self, job_file: Path) -> OcrdJob | None:
job_str = job_file.read_text()
try:
return OcrdJob.from_str(job_str)
except (ValueError, KeyError):
except (ValueError, KeyError) as e:
logging.warning(f"found invalid job file: {job_file.name} - {e}")
return None

def status_for(self, ocrd_job: OcrdJob) -> ProcessStatus | None:
if ocrd_job.pid is None:
if ocrd_job.remotedir is None:
return None

process_statuses = self._process_query(ocrd_job.pid)
matching_statuses = (
status for status in process_statuses if status.pid == ocrd_job.pid
)
return next(matching_statuses, None)
process_statuses = self._process_query(ocrd_job.remotedir)

for status in process_statuses:
if status.state == ProcessState.RUNNING:
return status
if len(process_statuses) > 0:
return process_statuses[0]
return None
20 changes: 13 additions & 7 deletions ocrdmonitor/ocrdjob.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from __future__ import annotations

from datetime import datetime
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from typing import Any, NamedTuple, Type
from typing import Any, Callable, NamedTuple, Type

_KEYMAP: dict[str, tuple[Type[int] | Type[str] | Type[Path], str]] = {
_KEYMAP: dict[str, tuple[Type[int] | Type[str] | Type[Path] | Callable[[str], datetime], str]] = {
"PID": (int, "pid"),
"RETVAL": (int, "return_code"),
"PROCESS_ID": (int, "process_id"),
"TASK_ID": (int, "task_id"),
"TIME_CREATED": (datetime.fromisoformat, "time_created"),
"TIME_TERMINATED": (datetime.fromisoformat, "time_terminated"),
"PROCESS_ID": (str, "process_id"),
"TASK_ID": (str, "task_id"),
"PROCESS_DIR": (Path, "processdir"),
"WORKDIR": (Path, "workdir"),
"WORKFLOW": (Path, "workflow_file"),
Expand All @@ -18,7 +21,7 @@
}


def _into_dict(content: str) -> dict[str, int | str | Path]:
def _into_dict(content: str) -> dict[str, int | str | Path | datetime]:
result_dict = {}
lines = content.splitlines()
for line in lines:
Expand All @@ -35,8 +38,8 @@ def _into_dict(content: str) -> dict[str, int | str | Path]:


class KitodoProcessDetails(NamedTuple):
process_id: int
task_id: int
process_id: str
task_id: str
processdir: Path


Expand All @@ -59,6 +62,9 @@ class OcrdJob:
pid: int | None = None
return_code: int | None = None

time_created: datetime | None = None
time_terminated: datetime | None = None

@classmethod
def from_str(cls, content: str) -> "OcrdJob":
"""
Expand Down
21 changes: 11 additions & 10 deletions ocrdmonitor/processstatus.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from __future__ import annotations

import subprocess
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum

PS_CMD = "ps -g {} -o pid,state,%cpu,rss,cputime --no-headers"


class ProcessState(Enum):
    """Single-letter process state codes as printed in the ``state`` column of ``ps``."""

    # see ps(1)#PROCESS_STATE_CODES
    RUNNING = "R"  # running or runnable (on run queue)
    SLEEPING = "S"  # interruptible sleep (waiting for an event to complete)
    SLEEPIO = "D"  # uninterruptible sleep (usually I/O)
    STOPPED = "T"  # stopped by job control signal
    TRACING = "t"  # stopped by debugger during tracing
    ZOMBIE = "Z"  # defunct ("zombie") process, terminated but not reaped by its parent
    UNKNOWN = "?"  # NOTE(review): not a ps(1) state code; presumably a fallback for unrecognised states - confirm

Expand All @@ -27,6 +27,14 @@ class ProcessStatus:
memory: int
cpu_time: timedelta

@classmethod
def remotedir_to_pid_cmd(cls, remotedir: str) -> str:
    """Build the shell command that prints a job's PID file.

    The returned command reads ``/data/<remotedir>/ocrd.pid``. ``remotedir``
    is inserted verbatim; no shell quoting is applied here.
    """
    pid_file = f"/data/{remotedir}/ocrd.pid"
    return f"cat {pid_file}"

@classmethod
def session_pid_to_ps_cmd(cls, pid: str) -> str:
    """Build the ``ps`` command line for the given ID.

    The ID is passed to ``ps -s`` (selection by session ID, see ps(1)) and
    the output is restricted to the columns pid, state, %cpu, rss and
    cputime, without a header line. ``pid`` is inserted verbatim.
    """
    columns = "pid,state,%cpu,rss,cputime"
    return f"ps -s {pid} -o {columns} --no-headers"

@classmethod
def from_ps_output(cls, ps_output: str) -> list["ProcessStatus"]:
def is_error(lines: list[str]) -> bool:
Expand All @@ -49,13 +57,6 @@ def parse_line(line: str) -> "ProcessStatus":
return [parse_line(line) for line in lines]


def run(group: int) -> list[ProcessStatus]:
    """Run the ``PS_CMD`` template locally for ``group`` and parse its output.

    The command is executed through the shell; only its standard output is
    handed to ``ProcessStatus.from_ps_output``. The exit status is not checked.
    """
    completed = subprocess.run(
        PS_CMD.format(group),
        shell=True,
        capture_output=True,
        text=True,
    )
    return ProcessStatus.from_ps_output(completed.stdout)


def _cpu_time_to_seconds(cpu_time: str) -> int:
hours, minutes, seconds, *_ = cpu_time.split(":")
return int(hours) * 3600 + int(minutes) * 60 + int(seconds)
12 changes: 10 additions & 2 deletions ocrdmonitor/server/jobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Iterable

Expand Down Expand Up @@ -49,12 +50,19 @@ def jobs(request: Request) -> Response:
job_status = [controller.status_for(job) for job in running]
running_jobs = wrap_in_running_job_type(running, job_status)

now = datetime.now(timezone.utc)
return templates.TemplateResponse(
"jobs.html.j2",
{
"request": request,
"running_jobs": running_jobs,
"completed_jobs": completed,
"running_jobs": sorted(
running_jobs,
key=lambda x: x.ocrd_job.time_created or now,
),
"completed_jobs": sorted(
completed,
key=lambda x: x.time_terminated or now,
),
},
)

Expand Down
16 changes: 14 additions & 2 deletions ocrdmonitor/server/templates/jobs.html.j2
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@
{% endblock %}

{% block content %}
<script>
function killjob(pid) {
/* todo: log into the Manager and kill this process ... */
window.alert("Not implemented yet")
}
</script>
<h2 class="title">Active Jobs</h2>
<table id="running-jobs" class="table">
<thead>
<tr>
<th>TSTART</th>
<th>TASK ID</th>
<th>PROCESS ID</th>
<th>WORKFLOW</th>
Expand All @@ -21,19 +28,22 @@
<th>% CPU</th>
<th>MB RSS</th>
<th>DURATION</th>
<th>ACTION</th>
</tr>
</thead>
<tbody>
{% for job in running_jobs: %}
<tr>
<td>{{ job.ocrd_job.time_created }}</td>
<td>{{ job.ocrd_job.kitodo_details.task_id }}</td>
<td>{{ job.ocrd_job.kitodo_details.process_id }}</td>
<td><a href="#">{{ job.ocrd_job.workflow }}</a></td>
<td><a href="{{ url_for('workflows.detail', path=job.ocrd_job.workflow_file) }}">{{ job.ocrd_job.workflow }}</a></td>
<td>{{ job.process_status.pid }}</td>
<td>{{ job.process_status.state }}</td>
<td>{{ job.process_status.percent_cpu }}</td>
<td>{{ job.process_status.memory }}</td>
<td>{{ job.process_status.cpu_time }}</td>
<td><button onclick="killjob({{ job.process_status.pid }})">Kill!</button></td>
</tr>
{% endfor %}
</tbody>
Expand All @@ -42,6 +52,7 @@
<table id="completed-jobs" class="table">
<thead>
<tr>
<th>TSTOP</th>
<th>TASK ID</th>
<th>PROCESS ID</th>
<th>WORKFLOW</th>
Expand All @@ -53,9 +64,10 @@
<tbody>
{% for job in completed_jobs: %}
<tr>
<td>{{ job.time_terminated }}</td>
<td>{{ job.kitodo_details.task_id }}</td>
<td>{{ job.kitodo_details.process_id }}</td>
<td>{% if job.workflow is defined %}<a href="{{ url_for('workflows.detail', path=job.workflow_file) }}">{{ job.workflow }}</a>{% endif %}</td>
<td><a href="{{ url_for('workflows.detail', path=job.workflow_file) }}">{{ job.workflow }}</a></td>
<td>{{ job.return_code }} {% if job.return_code == 0 %}(SUCCESS){% else %}(FAILURE){% endif %}</td>
<td><a href="{{ url_for('workspaces.open', workspace=job.workdir)}}">{{ job.kitodo_details.processdir.name }}</a></td>
<td>
Expand Down
51 changes: 31 additions & 20 deletions ocrdmonitor/sshps.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

import logging
import subprocess
from pathlib import Path
from typing import Protocol

from ocrdmonitor.processstatus import PS_CMD, ProcessStatus
from ocrdmonitor.processstatus import ProcessStatus


class SSHConfig(Protocol):
Expand All @@ -13,33 +14,43 @@ class SSHConfig(Protocol):
user: str
keyfile: Path


_SSH = (
"ssh -o StrictHostKeyChecking=no -i '{keyfile}' -p {port} {user}@{host} '{ps_cmd}'"
)


def process_status(config: SSHConfig, process_group: int) -> list[ProcessStatus]:
ssh_cmd = _build_ssh_command(config, process_group)

def process_status(config: SSHConfig, remotedir: str) -> list[ProcessStatus]:
pid_cmd = ProcessStatus.remotedir_to_pid_cmd(remotedir)
pid_cmd = _build_ssh_command(config, pid_cmd)
result = subprocess.run(
ssh_cmd,
pid_cmd,
shell=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
capture_output=True,
encoding="utf-8",
)

if result.returncode > 0:
logging.error(
f"looking up PID of process for {remotedir} failed: {result.stderr}"
)
return []
pid = result.stdout.strip()
ps_cmd = ProcessStatus.session_pid_to_ps_cmd(pid)
ps_cmd = _build_ssh_command(config, ps_cmd)
result = subprocess.run(
ps_cmd,
shell=True,
text=True,
capture_output=True,
encoding="utf-8",
)
if result.returncode > 0:
logging.error(
f"checking status of process {pid} failed: {result.stderr}"
)
return []
return ProcessStatus.from_ps_output(result.stdout)


def _build_ssh_command(config: SSHConfig, process_group: int) -> str:
ps_cmd = PS_CMD.format(process_group or "")
return _SSH.format(
def _build_ssh_command(config: SSHConfig, cmd: str) -> str:
return "ssh -o StrictHostKeyChecking=no -i '{keyfile}' -p {port} {user}@{host} '{cmd}'".format(
port=config.port,
keyfile=config.keyfile,
user=config.user,
host=config.host,
ps_cmd=ps_cmd,
cmd=cmd,
)
Loading