Skip to content

Commit

Permalink
[app][feat] integrate ocr-pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
M3ssman committed Jun 12, 2024
1 parent 8a81013 commit a3ed84a
Show file tree
Hide file tree
Showing 20 changed files with 9,606 additions and 722 deletions.
2 changes: 1 addition & 1 deletion cli_dir_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
PROCESS: ODEMProcess = ODEMProcess.create(proc_type, None, req_dst_dir, EXECUTORS)

PROCESS.local_mode = True
PROCESS.cfg = CFG
PROCESS.odem_configuration = CFG
PROCESS.the_logger = LOGGER
local_images = PROCESS.get_local_image_paths(image_local_dir=ROOT_PATH)
PROCESS._statistics_ocr[STATS_KEY_N_PAGES] = len(local_images)
Expand Down
193 changes: 193 additions & 0 deletions cli_mets_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# -*- coding: utf-8 -*-
"""MAIN CLI plain OCR with optional export"""

import argparse
import os
import sys

from pathlib import Path

import digiflow as df

import lib.ocrd3_odem as o3o

from lib.resources_monitoring import ProcessResourceMonitor, ProcessResourceMonitorConfig

DEFAULT_EXECUTORS = 2


########
# MAIN #
########
if __name__ == "__main__":
    # Command line entry point: run the ODEM OCR workflow for a single
    # METS/MODS file that already exists on the local file system.
    PARSER = argparse.ArgumentParser(
        description="generate ocr-data for OAI-Record")
    PARSER.add_argument(
        "mets_file",
        help="path to digital object's METS/MODS file")
    PARSER.add_argument(
        "-c",
        "--config",
        required=False,
        default="resources/odem.ini",
        help="path to configuration file")
    # type=int lets argparse reject non-numeric values with a usage error
    # instead of failing later with a ValueError traceback
    PARSER.add_argument(
        "-e",
        "--executors",
        required=False,
        type=int,
        help="Number of OCR-D Executors in parallel mode")
    PARSER.add_argument(
        "-s",
        "--sequential-mode",
        required=False,
        default=False,
        action="store_true",
        help="Disable parallel mode, just run sequential")
    PARSER.add_argument(
        "-k",
        "--keep-resources",
        required=False,
        default=False,
        action='store_true',
        help="keep stored images after processing")
    PARSER.add_argument(
        "-l",
        "--lock-mode",
        required=False,
        default=False,
        action='store_true',
        help="lock each run to avoid parallel starts")
    ARGS = PARSER.parse_args()

    # check some pre-conditions
    # inspect configuration settings
    CONF_FILE = os.path.abspath(ARGS.config)
    if not os.path.exists(CONF_FILE):
        print(f"[ERROR] no config at '{CONF_FILE}'! Halt execution!")
        sys.exit(1)

    # pick common args
    SEQUENTIAL = ARGS.sequential_mode
    MUST_KEEP_RESOURCES = ARGS.keep_resources
    MUST_LOCK = ARGS.lock_mode
    EXECUTOR_ARGS = ARGS.executors

    CFG = o3o.get_configparser()
    configurations_read = CFG.read(CONF_FILE)
    if not configurations_read:
        print(f"unable to read config from '{CONF_FILE}'! exit!")
        sys.exit(1)

    CREATE_PDF: bool = CFG.getboolean('derivans', 'derivans_enabled', fallback=True)
    ENRICH_METS_FULLTEXT: bool = CFG.getboolean('export', 'enrich_mets_fulltext', fallback=True)

    # set work_dirs and logger
    LOCAL_DELETE_BEVOR_EXPORT = []
    if CFG.has_option('export', 'delete_before_export'):
        LOCAL_DELETE_BEVOR_EXPORT = CFG.getlist('export', 'delete_before_export')
    LOCAL_LOG_DIR = CFG.get('global', 'local_log_dir')
    if not os.path.exists(LOCAL_LOG_DIR) or not os.access(
            LOCAL_LOG_DIR, os.W_OK):
        raise RuntimeError(f"cant store log files at invalid {LOCAL_LOG_DIR}")
    LOG_FILE_NAME = None
    if CFG.has_option('global', 'logfile_name'):
        LOG_FILE_NAME = CFG.get('global', 'logfile_name')
    LOGGER = o3o.get_logger(LOCAL_LOG_DIR, LOG_FILE_NAME)

    mets_file: Path = Path(ARGS.mets_file).absolute()
    if not mets_file.is_file():
        print(f"unable to read file '{mets_file}'! exit!")
        sys.exit(1)
    LOGGER.info("use '%s'", mets_file)
    mets_file_dir = mets_file.parent

    # if valid n_executors via cli, use its value (overrides the config)
    if EXECUTOR_ARGS is not None and EXECUTOR_ARGS > 0:
        CFG.set('ocr', 'n_executors', str(EXECUTOR_ARGS))
    EXECUTORS = CFG.getint('ocr', 'n_executors', fallback=DEFAULT_EXECUTORS)
    if SEQUENTIAL:
        EXECUTORS = 1
    LOGGER.debug("local work_root: '%s', executors:%s, keep_res:%s, lock:%s",
                 mets_file_dir, EXECUTORS, MUST_KEEP_RESOURCES, MUST_LOCK)

    # Bind everything the exception handlers below read *before* entering
    # the try block, so a failure while creating the record or the process
    # is reported properly instead of raising a NameError in the handler.
    local_ident = mets_file.stem
    record = None
    odem_process = None

    try:
        proc_type: str = CFG.get('ocr', 'workflow_type', fallback=None)
        if proc_type is None:
            LOGGER.warning("no 'workflow_type' config option in section 'ocr' defined. defaults to 'OCRD_PAGE_PARALLEL'")
        record = df.OAIRecord(local_ident)
        odem_process: o3o.ODEMProcess = o3o.ODEMProcess(record, mets_file_dir)
        odem_process.the_logger = LOGGER
        odem_process.the_logger.info("[%s] odem from %s, %d executors", local_ident, mets_file, EXECUTORS)
        odem_process.odem_configuration = CFG
        process_resource_monitor: ProcessResourceMonitor = ProcessResourceMonitor(
            ProcessResourceMonitorConfig(
                enable_resource_monitoring=CFG.getboolean('resource-monitoring', 'enable', fallback=False),
                polling_interval=CFG.getfloat('resource-monitoring', 'polling_interval', fallback=1),
                path_disk_usage=CFG.get('resource-monitoring', 'path_disk_usage', fallback='/home/ocr'),
                factor_free_disk_space_needed=CFG.getfloat(
                    'resource-monitoring',
                    'factor_free_disk_space_needed',
                    fallback=3.0
                ),
                max_vmem_percentage=CFG.getfloat('resource-monitoring', 'max_vmem_percentage', fallback=None),
                max_vmem_bytes=CFG.getint('resource-monitoring', 'max_vmem_bytes', fallback=None),
            ),
            LOGGER.error,
            None,
            odem_process.process_identifier,
            record.identifier
        )
        process_resource_monitor.check_vmem()
        # NOTE(review): the disk-space monitored load step is disabled here,
        # presumably because the METS file is already local - confirm
        # process_resource_monitor.monit_disk_space(odem_process.load)
        odem_process.inspect_metadata()
        if CFG.getboolean('mets', 'prevalidate', fallback=True):
            odem_process.validate_metadata()
        odem_process.clear_existing_entries()
        odem_process.language_modelconfig()
        odem_process.set_local_images()

        # build the OCR pipeline for the configured workflow type and run it
        # through the pipeline runner under virtual memory monitoring
        odem_pipeline = o3o.ODEMOCRPipeline.create(proc_type, odem_process)
        odem_runner = o3o.ODEMPipelineRunner(local_ident, EXECUTORS, LOGGER, odem_pipeline)
        OUTCOMES = process_resource_monitor.monit_vmem(odem_runner.run)
        if OUTCOMES is None or len(OUTCOMES) == 0:
            raise o3o.ODEMException(f"process run error: {record.identifier}")

        odem_process.calculate_statistics_ocr(OUTCOMES)
        odem_process.the_logger.info("[%s] %s", local_ident, odem_process.statistics)
        odem_process.link_ocr()
        if CREATE_PDF:
            odem_process.create_pdf()
        odem_process.postprocess_ocr()
        if CREATE_PDF:
            odem_process.create_text_bundle_data()
        odem_process.postprocess_mets()
        if CFG.getboolean('mets', 'postvalidate', fallback=True):
            odem_process.validate_metadata()
        # export is optional: only done if an export directory is configured
        if odem_process.odem_configuration.has_option('export', 'local_export_dir'):
            odem_process.the_logger.info("[%s] start to export data",
                                         odem_process.process_identifier)
            if not MUST_KEEP_RESOURCES:
                odem_process.delete_before_export(LOCAL_DELETE_BEVOR_EXPORT)
            odem_process.export_data()
        _mode = 'sequential' if SEQUENTIAL else f'n_execs:{EXECUTORS}'
        odem_process.the_logger.info("[%s] duration: %s/%s (%s)", odem_process.process_identifier,
                                     odem_process.duration, _mode, odem_process.statistics)
        # finale
        LOGGER.info("[%s] odem done in '%s' (%d executors)",
                    odem_process.process_identifier, odem_process.duration, EXECUTORS)
    except o3o.ODEMNoTypeForOCRException as type_unknown:
        # we don't ocr this one
        # getattr falls back to the local identifier if the process object
        # was never created (odem_process is still None)
        LOGGER.warning("[%s] odem skips '%s'",
                       getattr(odem_process, 'process_identifier', local_ident),
                       type_unknown.args[0])
    except o3o.ODEMNoImagesForOCRException as not_ocrable:
        LOGGER.warning("[%s] odem no ocrables '%s'",
                       getattr(odem_process, 'process_identifier', local_ident),
                       not_ocrable.args)
    except o3o.ODEMException as _odem_exc:
        _err_args = {'ODEMException': _odem_exc.args[0]}
        LOGGER.error("[%s] odem fails with: '%s'",
                     getattr(odem_process, 'process_identifier', local_ident),
                     _err_args)
    except RuntimeError as exc:
        LOGGER.error("odem fails for '%s' after %s with: '%s'",
                     record if record is not None else local_ident,
                     getattr(odem_process, 'duration', 'n.a.'),
                     str(exc))
        sys.exit(1)
2 changes: 1 addition & 1 deletion cli_oai_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def oai_arg_parser(value):
local_ident,
CLIENT.host, EXECUTORS
)
PROCESS.cfg = CFG
PROCESS.odem_configuration = CFG

try:
if os.path.exists(req_dst_dir):
Expand Down
102 changes: 51 additions & 51 deletions cli_oai_local.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
# -*- coding: utf-8 -*-
"""MAIN CLI OAI LOCAL ODEM"""

import ast
import argparse
import os
import shutil
import sys
from ast import (
literal_eval,
)
from digiflow import (
OAIRecordHandler,
OAIRecord,
LocalStore
)

import digiflow as df

import lib.ocrd3_odem as o3o
Expand All @@ -39,10 +33,10 @@
def trnfrm(row):
oai_id = row[RECORD_IDENTIFIER]
try:
_info = literal_eval(row[RECORD_INFO])
_info = ast.literal_eval(row[RECORD_INFO])
except:
_info = row[RECORD_INFO]
_record = OAIRecord(oai_id,)
_record = df.OAIRecord(oai_id,)
_record.info = _info
return _record

Expand Down Expand Up @@ -143,7 +137,7 @@ def trnfrm(row):
DATA_FIELDS = CFG.getlist('global', 'data_fields')
LOGGER.info("data fields: '%s'", DATA_FIELDS)
LOGGER.info("use records from '%s'", OAI_RECORD_FILE)
handler = OAIRecordHandler(
handler = df.OAIRecordHandler(
OAI_RECORD_FILE, data_fields=DATA_FIELDS, transform_func=trnfrm)
record: df.OAIRecord = handler.next_record(state=MARK_OCR_OPEN)
if not record:
Expand All @@ -165,16 +159,15 @@ def wrap_save_record_state(status: str, urn, **kwargs):
proc_type: str = CFG.get('ocr', 'workflow_type', fallback=None)
if proc_type is None:
LOGGER.warning("no 'workflow_type' config option in section 'ocr' defined. defaults to 'OCRD_PAGE_PARALLEL'")
PROCESS: ODEMProcess = ODEMProcess.create(proc_type, record, req_dst_dir, EXECUTORS)

PROCESS.the_logger = LOGGER
PROCESS.the_logger.info("[%s] odem from %s, %d executors", local_ident, OAI_RECORD_FILE, EXECUTORS)
PROCESS.cfg = CFG
odem_process: ODEMProcess = ODEMProcess(record, req_dst_dir)
odem_process.the_logger = LOGGER
odem_process.the_logger.info("[%s] odem from %s, %d executors", local_ident, OAI_RECORD_FILE, EXECUTORS)
odem_process.odem_configuration = CFG
LOCAL_STORE_ROOT = CFG.get('global', 'local_store_root', fallback=None)
if LOCAL_STORE_ROOT is not None:
STORE_DIR = os.path.join(LOCAL_STORE_ROOT, local_ident)
STORE = LocalStore(STORE_DIR, req_dst_dir)
PROCESS.store = STORE
STORE = df.LocalStore(STORE_DIR, req_dst_dir)
odem_process.store = STORE
process_resource_monitor: ProcessResourceMonitor = ProcessResourceMonitor(
ProcessResourceMonitorConfig(
enable_resource_monitoring=CFG.getboolean('resource-monitoring', 'enable', fallback=False),
Expand All @@ -191,67 +184,74 @@ def wrap_save_record_state(status: str, urn, **kwargs):
LOGGER.error,
wrap_save_record_state,
None,
PROCESS.process_identifier,
odem_process.process_identifier,
record.identifier
)
process_resource_monitor.check_vmem()
process_resource_monitor.monit_disk_space(PROCESS.load)
PROCESS.inspect_metadata()
process_resource_monitor.monit_disk_space(odem_process.load)
odem_process.inspect_metadata()
if CFG.getboolean('mets','prevalidate', fallback=True):
PROCESS.validate_metadata()
PROCESS.clear_existing_entries()
PROCESS.language_modelconfig()
PROCESS.set_local_images()
OUTCOMES = process_resource_monitor.monit_vmem(PROCESS.run)
PROCESS.calculate_statistics_ocr(OUTCOMES)
PROCESS.the_logger.info("[%s] %s", local_ident, PROCESS.statistics)
PROCESS.link_ocr()
odem_process.validate_metadata()
odem_process.clear_existing_entries()
odem_process.language_modelconfig()
odem_process.set_local_images()

# NEW NEW NEW
odem_pipeline = o3o.ODEMOCRPipeline.create(proc_type, odem_process)
odem_runner = o3o.ODEMPipelineRunner(local_ident, EXECUTORS, LOGGER, odem_pipeline)
OUTCOMES = process_resource_monitor.monit_vmem(odem_runner.run)
if OUTCOMES is None or len(OUTCOMES) == 0:
raise ODEMException(f"process run error: {record.identifier}")

odem_process.calculate_statistics_ocr(OUTCOMES)
odem_process.the_logger.info("[%s] %s", local_ident, odem_process.statistics)
odem_process.link_ocr()
if CREATE_PDF:
PROCESS.create_pdf()
PROCESS.postprocess_ocr()
odem_process.create_pdf()
odem_process.postprocess_ocr()
if CREATE_PDF:
PROCESS.create_text_bundle_data()
PROCESS.postprocess_mets()
odem_process.create_text_bundle_data()
odem_process.postprocess_mets()
if CFG.getboolean('mets','postvalidate', fallback=True):
PROCESS.validate_metadata()
odem_process.validate_metadata()
if not MUST_KEEP_RESOURCES:
PROCESS.delete_before_export(LOCAL_DELETE_BEVOR_EXPORT)
PROCESS.export_data()
_kwargs = PROCESS.statistics
if PROCESS.record.info != 'n.a.':
odem_process.delete_before_export(LOCAL_DELETE_BEVOR_EXPORT)
odem_process.export_data()
_kwargs = odem_process.statistics
if odem_process.record.info != 'n.a.':
try:
if isinstance(PROCESS.record.info, str):
_info = dict(literal_eval(PROCESS.record.info))
PROCESS.record.info.update(_kwargs)
_info = f"{PROCESS.record.info}"
if isinstance(odem_process.record.info, str):
_info = dict(ast.literal_eval(odem_process.record.info))
odem_process.record.info.update(_kwargs)
_info = f"{odem_process.record.info}"
except:
PROCESS.the_logger.error("Can't parse '%s', store info literally",
PROCESS.record.info)
odem_process.the_logger.error("Can't parse '%s', store info literally",
odem_process.record.info)
_info = f"{_kwargs}"
else:
_info = f"{_kwargs}"
handler.save_record_state(record.identifier, MARK_OCR_DONE, INFO=_info)
_mode = 'sequential' if SEQUENTIAL else f'n_execs:{EXECUTORS}'
PROCESS.the_logger.info("[%s] duration: %s/%s (%s)", PROCESS.process_identifier,
PROCESS.duration, _mode, PROCESS.statistics)
odem_process.the_logger.info("[%s] duration: %s/%s (%s)", odem_process.process_identifier,
odem_process.duration, _mode, odem_process.statistics)
# finale
LOGGER.info("[%s] odem done in '%s' (%d executors)",
PROCESS.process_identifier, PROCESS.duration, EXECUTORS)
odem_process.process_identifier, odem_process.duration, EXECUTORS)
except o3o.ODEMNoTypeForOCRException as type_unknown:
# we don't ocr this one
LOGGER.warning("[%s] odem skips '%s'",
PROCESS.process_identifier, type_unknown.args[0])
odem_process.process_identifier, type_unknown.args[0])
handler.save_record_state(record.identifier, o3o.MARK_OCR_SKIP)
except o3o.ODEMNoImagesForOCRException as not_ocrable:
LOGGER.warning("[%s] odem no ocrables '%s'",
PROCESS.process_identifier, not_ocrable.args)
odem_process.process_identifier, not_ocrable.args)
handler.save_record_state(record.identifier, o3o.MARK_OCR_SKIP)
except ODEMException as _odem_exc:
_err_args = {'ODEMException': _odem_exc.args[0]}
LOGGER.error("[%s] odem fails with: '%s'", PROCESS.process_identifier, _err_args)
LOGGER.error("[%s] odem fails with: '%s'", odem_process.process_identifier, _err_args)
handler.save_record_state(record.identifier, MARK_OCR_FAIL, INFO=f'{_err_args}')
except RuntimeError as exc:
LOGGER.error("odem fails for '%s' after %s with: '%s'",
record, PROCESS.duration, str(exc))
record, odem_process.duration, str(exc))
handler.save_record_state(record.identifier, MARK_OCR_FAIL, INFO=f'{str(exc) : exc.args[0]}')
sys.exit(1)
2 changes: 2 additions & 0 deletions lib/ocrd3_odem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from .ocrd3_odem import (
OdemWorkflowProcessType,
ODEMProcess,
ODEMOCRPipeline,
ODEMPipelineRunner,
OCRDPageParallel,
ODEMTesseract,
)
Expand Down
Loading

0 comments on commit a3ed84a

Please sign in to comment.