Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug fixes and more #24

Merged
merged 17 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Variables:
currently: "$(MANAGER_KEY)"
- MANAGER_DATA host directory to mount into `/data` (shared with Manager)
currently: "$(MANAGER_DATA)"
- MANAGER_WORKFLOWS host directory to mount into `/workflows` (shared with Manager)
currently: "$(MANAGER_WORKFLOWS)"
- NETWORK Docker network to use (manage via "docker network")
currently: $(NETWORK)
- CONTROLLER_HOST network address for the Controller client
Expand All @@ -42,6 +44,7 @@ help: ; @eval "$$HELP"

MANAGER_KEY ?= $(firstword $(filter-out %.pub,$(wildcard $(HOME)/.ssh/id_*)))
MANAGER_DATA ?= $(CURDIR)
MANAGER_WORKFLOWS ?= $(CURDIR)
MONITOR_PORT_WEB ?= 5000
NETWORK ?= bridge
CONTROLLER_HOST ?= $(shell dig +short $$HOSTNAME)
Expand All @@ -54,6 +57,8 @@ run: $(DATA)
-p $(MONITOR_PORT_WEB):5000 \
-v ${MANAGER_KEY}:/id_rsa \
--mount type=bind,source=$(MANAGER_KEY),target=/id_rsa \
-v $(MANAGER_DATA):/data \
-v $(MANAGER_WORKFLOWS):/workflows \
-v shared:/run/lock/ocrd.jobs \
-e CONTROLLER=$(CONTROLLER_HOST):$(CONTROLLER_PORT_SSH) \
-e MONITOR_PORT_LOG=${MONITOR_PORT_LOG} \
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ In order to work properly, the following **environment variables** must be set:
| CONTROLLER_HOST | Hostname of the OCR-D Controller |
| CONTROLLER_PORT_SSH | Port on the OCR-D Controller host that allows a SSH connection |
| MANAGER_DATA | Path to the OCR-D workspaces on the host |
| MANAGER_WORKFLOWS | Path to the OCR-D workflows on the host |
| MANAGER_KEY | Path to a private key that can be used to authenticate with the OCR-D Controller |
| MONITOR_PORT_WEB | The port at which the OCR-D Monitor will be available on the host |
| MONITOR_PORT_LOG | The port at which the Dozzle logs will be available on the host |
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:

volumes:
- ${MANAGER_DATA}:/data
- ${MANAGER_WORKFLOWS}:/workflows
- ${MANAGER_KEY}:/id_rsa
- shared:/run/lock/ocrd.jobs

Expand Down
28 changes: 18 additions & 10 deletions ocrdmonitor/ocrdcontroller.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from __future__ import annotations

import sys
import logging
from pathlib import Path
from typing import Protocol

from ocrdmonitor.ocrdjob import OcrdJob
from ocrdmonitor.processstatus import ProcessStatus
from ocrdmonitor.processstatus import ProcessStatus, ProcessState

if sys.version_info >= (3, 10):
from typing import TypeGuard
Expand All @@ -14,39 +15,46 @@


class ProcessQuery(Protocol):
def __call__(self, process_group: int) -> list[ProcessStatus]:
def __call__(self, remotedir: str) -> list[ProcessStatus]:
...


class OcrdController:
def __init__(self, process_query: ProcessQuery, job_dir: Path) -> None:
self._process_query = process_query
self._job_dir = job_dir
logging.info(f"process_query: {process_query}")
logging.info(f"job_dir: {job_dir}")

def get_jobs(self) -> list[OcrdJob]:
def is_ocrd_job(j: OcrdJob | None) -> TypeGuard[OcrdJob]:
return j is not None

job_candidates = [
self._try_parse(job_file.read_text())
self._try_parse(job_file)
for job_file in self._job_dir.iterdir()
if job_file.is_file()
]

return list(filter(is_ocrd_job, job_candidates))

def _try_parse(self, job_str: str) -> OcrdJob | None:
def _try_parse(self, job_file: Path) -> OcrdJob | None:
job_str = job_file.read_text()
try:
return OcrdJob.from_str(job_str)
except (ValueError, KeyError):
except (ValueError, KeyError) as e:
logging.warning(f"found invalid job file: {job_file.name} - {e}")
return None

def status_for(self, ocrd_job: OcrdJob) -> ProcessStatus | None:
if ocrd_job.pid is None:
return None

process_statuses = self._process_query(ocrd_job.pid)
matching_statuses = (
status for status in process_statuses if status.pid == ocrd_job.pid
)
return next(matching_statuses, None)
process_statuses = self._process_query(ocrd_job.remotedir)

for status in process_statuses:
if status.state == ProcessState.RUNNING:
return status
if len(process_statuses) > 0:
return process_statuses[0]
return None
14 changes: 10 additions & 4 deletions ocrdmonitor/ocrdjob.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import datetime
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
Expand All @@ -8,8 +9,10 @@
_KEYMAP: dict[str, tuple[Type[int] | Type[str] | Type[Path], str]] = {
"PID": (int, "pid"),
"RETVAL": (int, "return_code"),
"PROCESS_ID": (int, "process_id"),
"TASK_ID": (int, "task_id"),
"TIME_CREATED": (datetime.fromisoformat, "time_created"),
"TIME_TERMINATED": (datetime.fromisoformat, "time_terminated"),
"PROCESS_ID": (str, "process_id"),
"TASK_ID": (str, "task_id"),
"PROCESS_DIR": (Path, "processdir"),
"WORKDIR": (Path, "workdir"),
"WORKFLOW": (Path, "workflow_file"),
Expand All @@ -35,8 +38,8 @@ def _into_dict(content: str) -> dict[str, int | str | Path]:


class KitodoProcessDetails(NamedTuple):
process_id: int
task_id: int
process_id: str
task_id: str
processdir: Path


Expand All @@ -59,6 +62,9 @@ class OcrdJob:
pid: int | None = None
return_code: int | None = None

time_created: datetime | None = None
time_terminated: datetime | None = None

@classmethod
def from_str(cls, content: str) -> "OcrdJob":
"""
Expand Down
15 changes: 10 additions & 5 deletions ocrdmonitor/processstatus.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from __future__ import annotations

import logging
import subprocess
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum

PS_CMD = "ps -g {} -o pid,state,%cpu,rss,cputime --no-headers"
PS_CMD = "ps -s `cat /data/{}/ocrd.pid` -o pid,state,%cpu,rss,cputime --no-headers"


class ProcessState(Enum):
# see ps(1)#PROCESS_STATE_CODES
RUNNING = "R"
SLEEPING = "S"
SLEEPIO = "D"
STOPPED = "T"
TRACING = "t"
ZOMBIE = "Z"
UNKNOWN = "?"

Expand Down Expand Up @@ -48,11 +52,12 @@ def parse_line(line: str) -> "ProcessStatus":

return [parse_line(line) for line in lines]


def run(group: int) -> list[ProcessStatus]:
cmd = PS_CMD.format(group)
# only used in tests!
def run(remotedir: str) -> list[ProcessStatus]:
cmd = PS_CMD.format(remotedir)
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

if result.returncode > 0:
logging.error(f"checking status of process for {remotedir} failed: {result.stdout} {result.stderr}")
return ProcessStatus.from_ps_output(result.stdout)


Expand Down
6 changes: 4 additions & 2 deletions ocrdmonitor/server/jobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Iterable

Expand Down Expand Up @@ -49,12 +50,13 @@ def jobs(request: Request) -> Response:
job_status = [controller.status_for(job) for job in running]
running_jobs = wrap_in_running_job_type(running, job_status)

now = datetime.now(timezone.utc)
return templates.TemplateResponse(
"jobs.html.j2",
{
"request": request,
"running_jobs": running_jobs,
"completed_jobs": completed,
"running_jobs": sorted(running_jobs, key=lambda x: x.ocrd_job.time_created or now),
"completed_jobs": sorted(completed, key=lambda x: x.time_terminated or now),
},
)

Expand Down
12 changes: 12 additions & 0 deletions ocrdmonitor/server/templates/jobs.html.j2
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@
{% endblock %}

{% block content %}
<script>
function killjob(pid) {
/* todo: log into the Manager and kill this process ... */
window.alert("Not implemented yet")
}
</script>
<h2 class="title">Active Jobs</h2>
<table id="running-jobs" class="table">
<thead>
<tr>
<th>TSTART</th>
<th>TASK ID</th>
<th>PROCESS ID</th>
<th>WORKFLOW</th>
Expand All @@ -21,11 +28,13 @@
<th>% CPU</th>
<th>MB RSS</th>
<th>DURATION</th>
<th>ACTION</th>
</tr>
</thead>
<tbody>
{% for job in running_jobs: %}
<tr>
<td>{{ job.ocrd_job.time_created }}</td>
<td>{{ job.ocrd_job.kitodo_details.task_id }}</td>
<td>{{ job.ocrd_job.kitodo_details.process_id }}</td>
<td><a href="#">{{ job.ocrd_job.workflow }}</a></td>
Expand All @@ -34,6 +43,7 @@
<td>{{ job.process_status.percent_cpu }}</td>
<td>{{ job.process_status.memory }}</td>
<td>{{ job.process_status.cpu_time }}</td>
<td><button onclick="killjob({{ job.process_status.pid }})">Kill!</button></td>
</tr>
{% endfor %}
</tbody>
Expand All @@ -42,6 +52,7 @@
<table id="completed-jobs" class="table">
<thead>
<tr>
<th>TSTOP</th>
<th>TASK ID</th>
<th>PROCESS ID</th>
<th>WORKFLOW</th>
Expand All @@ -53,6 +64,7 @@
<tbody>
{% for job in completed_jobs: %}
<tr>
<td>{{ job.time_terminated }}</td>
<td>{{ job.kitodo_details.task_id }}</td>
<td>{{ job.kitodo_details.process_id }}</td>
<td>{% if job.workflow is defined %}<a href="{{ url_for('workflows.detail', path=job.workflow_file) }}">{{ job.workflow }}</a>{% endif %}</td>
Expand Down
17 changes: 9 additions & 8 deletions ocrdmonitor/sshps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
import subprocess
from pathlib import Path
from typing import Protocol
Expand All @@ -19,23 +20,23 @@ class SSHConfig(Protocol):
)


def process_status(config: SSHConfig, process_group: int) -> list[ProcessStatus]:
ssh_cmd = _build_ssh_command(config, process_group)
def process_status(config: SSHConfig, remotedir: str) -> list[ProcessStatus]:
ssh_cmd = _build_ssh_command(config, remotedir)

result = subprocess.run(
ssh_cmd,
shell=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
capture_output=True,
encoding="utf-8",
)

if result.returncode > 0:
logging.error(f"checking status of process for {remotedir} failed: {result.stderr}")
return ProcessStatus.from_ps_output(result.stdout)


def _build_ssh_command(config: SSHConfig, process_group: int) -> str:
ps_cmd = PS_CMD.format(process_group or "")
def _build_ssh_command(config: SSHConfig, remotedir: str) -> str:
ps_cmd = PS_CMD.format(remotedir)
return _SSH.format(
port=config.port,
keyfile=config.keyfile,
Expand Down