[app][fix] digiflow 4 compliance
M3ssman committed Jun 18, 2024
1 parent 291988d commit 480cda1
Showing 16 changed files with 185 additions and 208 deletions.
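Taken together, the diffs below swap digiflow 3's OAI-prefixed API for the digiflow 4 layout: record handling moves into the digiflow.record module, and the ODEM process attributes lose their the_/odem_ prefixes. A minimal before/after sketch of the rename pattern, built only from names that appear in the diffs (the process variable stands in for an ODEMProcessImpl instance):

    # digiflow 3 style (removed in this commit)
    from digiflow import OAIRecord, OAIRecordHandler
    record = OAIRecord(oai_id)
    process.the_logger = LOGGER
    process.odem_configuration = CFG

    # digiflow 4 style (introduced in this commit)
    import digiflow.record as df_r
    record = df_r.Record(oai_id)
    process.logger = LOGGER
    process.configuration = CFG
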
8 changes: 4 additions & 4 deletions cli_dir_local.py
@@ -10,7 +10,7 @@

import ocrd_utils

import lib.odem as odem
from lib import odem


########
@@ -97,8 +97,8 @@
LOGGER.warning("no 'workflow_type' config option in section ocr defined. defaults to 'OCRD_PAGE_PARALLEL'")
PROCESS: odem.ODEMProcessImpl = odem.ODEMProcessImpl.create(proc_type, None, req_dst_dir, EXECUTORS)
PROCESS.local_mode = True
PROCESS.odem_configuration = CFG
PROCESS.the_logger = LOGGER
PROCESS.configuration = CFG
PROCESS.logger = LOGGER
local_images = PROCESS.get_local_image_paths(image_local_dir=ROOT_PATH)
PROCESS.process_statistics[odem.STATS_KEY_N_PAGES] = len(local_images)
PROCESS.process_statistics[odem.STATS_KEY_N_OCRABLE] = 0
@@ -111,7 +111,7 @@
PROCESS.ocr_candidates = list(zip(PROCESS.ocr_candidates,
[pathlib.Path(i).stem for i in PROCESS.ocr_candidates]))
PROCESS.run()
PROCESS.the_logger.info("[%s] duration: %s (%s)", req_idn,
PROCESS.logger.info("[%s] duration: %s (%s)", req_idn,
PROCESS.statistics['timedelta'], PROCESS.statistics)
except Exception as exc:
LOGGER.error("odem fails for '%s' after %s with: '%s'",
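Condensed, the local-directory flow that cli_dir_local.py implements now reads roughly as follows; this is a sketch assuming CFG, LOGGER, proc_type, req_dst_dir, ROOT_PATH, and EXECUTORS are prepared earlier in the script as shown above:

    import pathlib

    PROCESS = odem.ODEMProcessImpl.create(proc_type, None, req_dst_dir, EXECUTORS)
    PROCESS.local_mode = True    # read images from a plain directory tree
    PROCESS.configuration = CFG  # digiflow 4 name; was odem_configuration
    PROCESS.logger = LOGGER      # digiflow 4 name; was the_logger
    local_images = PROCESS.get_local_image_paths(image_local_dir=ROOT_PATH)
    PROCESS.process_statistics[odem.STATS_KEY_N_PAGES] = len(local_images)
    # pair each OCR candidate path with its file stem before running
    PROCESS.ocr_candidates = list(zip(PROCESS.ocr_candidates,
                                      [pathlib.Path(i).stem for i in PROCESS.ocr_candidates]))
    PROCESS.run()
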
16 changes: 8 additions & 8 deletions cli_mets_local.py
@@ -21,7 +21,7 @@
########
if __name__ == "__main__":
PARSER = argparse.ArgumentParser(
description="generate ocr-data for OAI-Record")
description="generate ocr-data for Record")
PARSER.add_argument(
"mets_file",
help="path to digital object's METS/MODS file")
@@ -117,9 +117,9 @@
LOGGER.warning("no 'workflow_type' config option in section ocr defined. defaults to 'OCRD_PAGE_PARALLEL'")
record = df.OAIRecord(local_ident)
odem_process: odem.ODEMProcessImpl = odem.ODEMProcessImpl(record, mets_file_dir)
odem_process.the_logger = LOGGER
odem_process.the_logger.info("[%s] odem from %s, %d executors", local_ident, mets_file, EXECUTORS)
odem_process.odem_configuration = CFG
odem_process.logger = LOGGER
odem_process.logger.info("[%s] odem from %s, %d executors", local_ident, mets_file, EXECUTORS)
odem_process.configuration = CFG
process_resource_monitor: odem_rm.ProcessResourceMonitor = odem_rm.ProcessResourceMonitor(
odem_rm.from_configuration(CFG),
LOGGER.error,
@@ -144,7 +144,7 @@
raise odem.ODEMException(f"OCR Process Runner error for {record.identifier}")
odem_process.calculate_statistics_ocr(ocr_results)
odem_process.process_statistics[odem.STATS_KEY_N_EXECS] = EXECUTORS
odem_process.the_logger.info("[%s] %s", local_ident, odem_process.statistics)
odem_process.logger.info("[%s] %s", local_ident, odem_process.statistics)
odem_process.link_ocr_files()
odem_process.postprocess_ocr()
wf_enrich_ocr = CFG.getboolean(odem.CFG_SEC_METS, odem.CFG_SEC_METS_OPT_ENRICH, fallback=True)
@@ -157,14 +157,14 @@
odem_process.postprocess_mets()
if CFG.getboolean('mets', 'postvalidate', fallback=True):
odem_process.validate_metadata()
if odem_process.odem_configuration.has_option('export', 'local_export_dir'):
odem_process.the_logger.info("[%s] start to export data",
if odem_process.configuration.has_option('export', 'local_export_dir'):
odem_process.logger.info("[%s] start to export data",
odem_process.process_identifier)
if not MUST_KEEP_RESOURCES and len(DELETE_BEVOR_EXPORT) > 0:
odem_process.delete_before_export(DELETE_BEVOR_EXPORT)
odem_process.export_data()
_mode = 'sequential' if SEQUENTIAL else f'n_execs:{EXECUTORS}'
odem_process.the_logger.info("[%s] duration: %s/%s (%s)", odem_process.process_identifier,
odem_process.logger.info("[%s] duration: %s/%s (%s)", odem_process.process_identifier,
odem_process.statistics['timedelta'], _mode, odem_process.statistics)
LOGGER.info("[%s] odem done in '%s' (%d executors)",
odem_process.process_identifier, odem_process.statistics['timedelta'], EXECUTORS)
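After the OCR run, cli_mets_local.py walks a fixed post-processing chain and only exports when the configuration asks for it. A condensed happy-path sketch, with the resource monitor and error handling left out (ocr_results is the value produced by the monitored OCR run):

    odem_process.calculate_statistics_ocr(ocr_results)
    odem_process.process_statistics[odem.STATS_KEY_N_EXECS] = EXECUTORS
    odem_process.link_ocr_files()
    odem_process.postprocess_ocr()
    wf_enrich_ocr = CFG.getboolean(odem.CFG_SEC_METS, odem.CFG_SEC_METS_OPT_ENRICH,
                                   fallback=True)
    # ... enrichment step gated by wf_enrich_ocr (that hunk is truncated above) ...
    odem_process.postprocess_mets()
    if CFG.getboolean('mets', 'postvalidate', fallback=True):
        odem_process.validate_metadata()
    if odem_process.configuration.has_option('export', 'local_export_dir'):
        odem_process.export_data()
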
21 changes: 10 additions & 11 deletions cli_oai_local.py → cli_record_local.py
@@ -7,6 +7,7 @@
import shutil
import sys

import digiflow as df
import digiflow.record as df_r

import lib.odem as odem
Expand All @@ -30,7 +31,7 @@
DEFAULT_EXECUTORS = 2


def trnfrm(row):
def _trnfrm(row):
oai_id = row[RECORD_IDENTIFIER]
try:
_info = ast.literal_eval(row[RECORD_INFO])
@@ -135,17 +136,15 @@ def trnfrm(row):
LOGGER.info("data fields: '%s'", DATA_FIELDS)
LOGGER.info("use records from '%s'", OAI_RECORD_FILE)
handler = df_r.RecordHandler(
OAI_RECORD_FILE, data_fields=DATA_FIELDS, transform_func=trnfrm)
OAI_RECORD_FILE, data_fields=DATA_FIELDS, transform_func=_trnfrm)
record: df_r.Record = handler.next_record(state=MARK_OCR_OPEN)
if not record:
LOGGER.info("no open records in '%s', work done", OAI_RECORD_FILE)
sys.exit(1)


def wrap_save_record_state(status: str, urn, **kwargs):
handler.save_record_state(urn, status, **kwargs)


try:
handler.save_record_state(record.identifier, MARK_OCR_BUSY)
local_ident = record.local_identifier
@@ -157,13 +156,13 @@ def wrap_save_record_state(status: str, urn, **kwargs):
if proc_type is None:
LOGGER.warning("no 'workflow_type' config option in section ocr defined. defaults to 'OCRD_PAGE_PARALLEL'")
odem_process: ODEMProcessImpl = ODEMProcessImpl(record, req_dst_dir)
odem_process.the_logger = LOGGER
odem_process.the_logger.info("[%s] odem from %s, %d executors", local_ident, OAI_RECORD_FILE, EXECUTORS)
odem_process.odem_configuration = CFG
odem_process.logger = LOGGER
odem_process.logger.info("[%s] odem from %s, %d executors", local_ident, OAI_RECORD_FILE, EXECUTORS)
odem_process.configuration = CFG
LOCAL_STORE_ROOT = CFG.get('global', 'local_store_root', fallback=None)
if LOCAL_STORE_ROOT is not None:
STORE_DIR = os.path.join(LOCAL_STORE_ROOT, local_ident)
STORE = df_r.LocalStore(STORE_DIR, req_dst_dir)
STORE = df.LocalStore(STORE_DIR, req_dst_dir)
odem_process.store = STORE
process_resource_monitor: odem_rm.ProcessResourceMonitor = odem_rm.ProcessResourceMonitor(
odem_rm.from_configuration(CFG),
@@ -190,7 +189,7 @@ def wrap_save_record_state(status: str, urn, **kwargs):
raise ODEMException(f"process run error: {record.identifier}")
odem_process.calculate_statistics_ocr(ocr_results)
odem_process.process_statistics[odem.STATS_KEY_N_EXECS] = EXECUTORS
odem_process.the_logger.info("[%s] %s", local_ident, odem_process.statistics)
odem_process.logger.info("[%s] %s", local_ident, odem_process.statistics)
# odem_process.link_ocr_files()
# odem_process.postprocess_ocr()
wf_enrich_ocr = CFG.getboolean(odem.CFG_SEC_METS, odem.CFG_SEC_METS_OPT_ENRICH, fallback=True)
@@ -214,14 +213,14 @@ def wrap_save_record_state(status: str, urn, **kwargs):
odem_process.record.info.update(_kwargs)
_info = f"{odem_process.record.info}"
except:
odem_process.the_logger.warning("Can't parse '%s', store info literally",
odem_process.logger.warning("Can't parse '%s', store info literally",
odem_process.record.info)
_info = f"{_kwargs}"
else:
_info = f"{_kwargs}"
handler.save_record_state(record.identifier, MARK_OCR_DONE, INFO=_info)
_mode = 'sequential' if SEQUENTIAL else f'n_execs:{EXECUTORS}'
odem_process.the_logger.info("[%s] duration: %s/%s (%s)", odem_process.process_identifier,
odem_process.logger.info("[%s] duration: %s/%s (%s)", odem_process.process_identifier,
odem_process.statistics['timedelta'], _mode, odem_process.statistics)
# finale
LOGGER.info("[%s] odem done in '%s' (%d executors)",
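cli_record_local.py claims its work through digiflow 4's record handler, which treats the record list as a small state machine. A sketch of the claim/mark cycle using only calls visible in this diff (the failure branch and its state constant are not shown in the hunks above and are omitted):

    import sys

    import digiflow.record as df_r

    handler = df_r.RecordHandler(OAI_RECORD_FILE,
                                 data_fields=DATA_FIELDS,
                                 transform_func=_trnfrm)
    record = handler.next_record(state=MARK_OCR_OPEN)  # next record still marked open
    if not record:
        sys.exit(1)                                    # list exhausted, work done
    handler.save_record_state(record.identifier, MARK_OCR_BUSY)
    # ... run the ODEM process for record.local_identifier ...
    handler.save_record_state(record.identifier, MARK_OCR_DONE, INFO=_info)
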
113 changes: 47 additions & 66 deletions cli_oai_server.py → cli_record_server.py
@@ -5,83 +5,64 @@
"""

import configparser
import contextlib
import functools
import http.server
import json
import logging.config
import os
import sys
import time
from contextlib import (
contextmanager
)
from functools import (
partial
)
from http.server import (
SimpleHTTPRequestHandler,
HTTPServer,
)
from pathlib import (
Path
)
from threading import (
Thread
)
import threading

from digiflow import (
OAIRecord,
OAIRecordHandler,
)
from pathlib import Path

import digiflow.record as df_r

import lib.odem as odem
from lib import odem

from lib.odem.odem_commons import (
MARK_OCR_BUSY,
RECORD_IDENTIFIER,
RECORD_INFO,
RECORD_SPEC,
RECORD_RELEASED,
RECORD_STATE,
to_dict,
)

_PROJECT_ROOT = Path(__file__).resolve().parent
ODEM_LOG_CONFIG_FILE = _PROJECT_ROOT / 'resources' / 'odem_logging.ini'
LOCKFILE = 'SERVER_RUNNING'
PROJECT_ROOT = Path(__file__).resolve().parent
ODEM_LOG_CONFIG_FILE = PROJECT_ROOT / 'resources' / 'odem_logging.ini'
ODEM_LOG_NAME = 'odem'
NEXT_COMMAND = 'next'
UPDATE_COMMAND = 'update'
MIME_TXT = 'text/plain'
MIME_HTML = 'text/html'
STATETIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
# MARK_DATA_EXHAUSTED_PREFIX = 'no open records'
# MARK_DATA_EXHAUSTED = MARK_DATA_EXHAUSTED_PREFIX + ' in {}, please inspect resource'
_MIME_TXT = 'text/plain'
LOGGER = None


def to_full_record(row):
"""Serialize CSV row into OAIRecord
with all attributes being evaluated"""
# def _to_full_record(row):
# """Serialize CSV row into OAIRecord
# with all attributes being evaluated"""

oai_id = row[RECORD_IDENTIFIER]
record = OAIRecord(oai_id)
# legacy field for backward compatibility
if RECORD_SPEC in row:
record.set = row[RECORD_SPEC]
# legacy field for backward compatibility
if RECORD_RELEASED in row:
record.date_stamp = row[RECORD_RELEASED]
record.info = row[RECORD_INFO]
return record
# oai_id = row[RECORD_IDENTIFIER]
# record = df_r.Record(oai_id)
# # legacy field for backward compatibility
# if RECORD_SPEC in row:
# record.set = row[RECORD_SPEC]
# # legacy field for backward compatibility
# if RECORD_RELEASED in row:
# record.date_stamp = row[RECORD_RELEASED]
# record.info = row[RECORD_INFO]
# return record


class OAIService(SimpleHTTPRequestHandler):
class RecordRequestHandler(http.server.SimpleHTTPRequestHandler):
"""Http Request handler for POST and GET requests"""

def __init__(self, data_path, *args, **kwargs):
def __init__(self, data_path:Path, *args, **kwargs):
"""Overrides __init__ from regular SimpleHTTPRequestHandler
data_path: folder where we expect oai record files"""
data_path: folder where we expect record files"""

self.record_list_directory = data_path
self.record_list_directory: Path = data_path
super().__init__(*args, **kwargs)

def do_GET(self):
Expand All @@ -102,7 +83,7 @@ def do_GET(self):
state, data = self.get_next_record(file_name, client_name)
LOGGER.debug("deliver next record: '%s'", data)
if isinstance(data, str):
self._set_headers(state, MIME_TXT)
self._set_headers(state, _MIME_TXT)
self.wfile.write(data.encode('utf-8'))
else:
self._set_headers(state)
@@ -118,7 +99,7 @@ def do_POST(self):
except ValueError:
self.wfile.write(
b'please provide record file name and command '
b' e.g.: /oai-records-vd18/next')
b' e.g.: /records-vd18/next')
LOGGER.error('request next record with umbiguous data')
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
@@ -130,13 +111,13 @@ def do_POST(self):
if ident:
state, data = self.update_record(file_name, data_dict)
if isinstance(data, str):
self._set_headers(state, MIME_TXT)
self._set_headers(state, _MIME_TXT)
self.wfile.write(data.encode('utf-8'))
else:
self._set_headers(state)
self.wfile.write(json.dumps(data, default=to_dict).encode('utf-8'))
else:
self._set_headers(400, MIME_TXT)
self._set_headers(400, _MIME_TXT)
self.wfile.write(f"no entry for {ident} in {file_name}!".encode('utf-8'))

def _set_headers(self, state=200, mime_type='application/json') -> None:
@@ -168,8 +149,8 @@ def get_next_record(self, file_name, client_name) -> tuple:
LOGGER.warning("no '%s' found in '%s'", data_file_path, self.record_list_directory)
return (404, f"no file '{file_name}' in {self.record_list_directory}")

handler = OAIRecordHandler(data_file_path, transform_func=to_full_record)
next_record: OAIRecord = handler.next_record()
handler = df_r.RecordHandler(data_file_path, transform_func=df_r.row_to_record)
next_record: df_r.Record = handler.next_record()
# if no record in resource available, alert no resource after all, too
if not next_record:
_msg = f'{odem.MARK_DATA_EXHAUSTED.format(data_file_path)}'
@@ -193,9 +174,9 @@ def update_record(self, data_file, data) -> tuple:
LOGGER.error('do_POST: %s not found', data_file_path)
return (404, f"data file not found: {data_file_path}")
try:
handler = OAIRecordHandler(data_file_path)
handler = df_r.RecordHandler(data_file_path)
_ident = data[RECORD_IDENTIFIER]
_record: OAIRecord = handler.get(_ident)
_record: df_r.Record = handler.get(_ident)
_prev_info = _record.info
_info = f'{data[RECORD_INFO]}'
if _prev_info != 'n.a.':
@@ -211,17 +192,17 @@
return (500, _msg)


class OAIRequestHandler():
class RecordServer():
"""helper class to start server"""

@contextmanager
@contextlib.contextmanager
def http_server(self, host: str, port: int, directory: str):
"""init server"""
server = HTTPServer(
server = http.server.HTTPServer(
(host, port),
partial(OAIService, directory)
functools.partial(RecordRequestHandler, directory)
)
server_thread = Thread(target=server.serve_forever, name="http_server")
server_thread = threading.Thread(target=server.serve_forever, name="http_server")
server_thread.start()

try:
@@ -243,8 +224,8 @@ def serve(self, host, port, pth):

LOGGER.info("server starts listen at: %s:%s", host, port)
LOGGER.info("serve record files from: %s", pth)
LOGGER.info("call for next record with: %s:%s/<oai-record-file>/next", host, port)
LOGGER.info("post a record update with: %s:%s/<oai-record-file>/update", host, port)
LOGGER.info("call for next record with: %s:%s/<record-file>/next", host, port)
LOGGER.info("post a record update with: %s:%s/<record-file>/update", host, port)
with self.http_server(host, port, pth):
with open(LOCKFILE, 'w', encoding='UTF-8') as lock:
lock.write(f'delete me to stop server @{host}:{port}')
@@ -294,14 +275,14 @@ def serve(self, host, port, pth):
LOGGER.info("logging initialized - store log entry in %s", _logfile_name)

# evaluate configured server data
_port = SCRIPT_CONFIGURATION.getint('oai-server', 'oai_server_port')
_host = SCRIPT_CONFIGURATION.get('oai-server', 'oai_server_url')
_oai_res_dir = SCRIPT_CONFIGURATION.get('oai-server', 'oai_server_resource_dir')
_port = SCRIPT_CONFIGURATION.getint('record-server', 'record_server_port')
_host = SCRIPT_CONFIGURATION.get('record-server', 'record_server_url')
_oai_res_dir = SCRIPT_CONFIGURATION.get('record-server', 'record_server_resource_dir')

# foster the record dir path, propably shortened
if '~' in _oai_res_dir:
_oai_res_dir = Path(_oai_res_dir).expanduser()
_oai_res_dir = Path(_oai_res_dir).absolute().resolve()

# forward to request handler
OAIRequestHandler().serve(_host, _port, _oai_res_dir)
RecordServer().serve(_host, _port, _oai_res_dir)
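
The reworked server keeps the two-endpoint protocol its log lines describe: GET <record-file>/next hands out the next open record, POST <record-file>/update writes a state change back. A minimal standard-library client sketch; the host, port, record-file name, and exact payload shape are assumptions, not taken from this diff:

    import json
    import urllib.request

    BASE = "http://localhost:8080/records-vd18"  # assumed host, port, and record file

    # claim the next open record
    with urllib.request.urlopen(f"{BASE}/next") as resp:
        record = json.loads(resp.read())

    # ... run OCR, then post the updated record back;
    # do_POST looks the record up by its identifier field
    req = urllib.request.Request(f"{BASE}/update",
                                 data=json.dumps(record).encode("utf-8"),
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        print(resp.status, resp.read().decode("utf-8"))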