2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "folio_data_import"
version = "0.2.8rc7"
version = "0.2.8rc8"
description = "A python module to interact with the data importing capabilities of the open-source FOLIO ILS"
authors = ["Brooks Travis <[email protected]>"]
license = "MIT"
99 changes: 64 additions & 35 deletions src/folio_data_import/MARCDataImport.py
@@ -3,6 +3,7 @@
import glob
import importlib
import io
import logging
import os
import sys
from typing import List
@@ -21,7 +22,6 @@
import tabulate
from humps import decamelize
from tqdm import tqdm
-from zmq import has


try:
@@ -37,6 +37,7 @@
RETRY_TIMEOUT_START = 1
RETRY_TIMEOUT_RETRY_FACTOR = 2

logger = logging.getLogger(__name__)
class MARCImportJob:
"""
Class to manage importing MARC data (Bib, Authority) into FOLIO using the Change Manager
@@ -78,9 +79,11 @@ def __init__(
marc_record_preprocessor=None,
consolidate=False,
no_progress=False,
let_summary_fail=False,
) -> None:
self.consolidate_files = consolidate
self.no_progress = no_progress
self.let_summary_fail = let_summary_fail
self.folio_client: folioclient.FolioClient = folio_client
self.import_files = marc_files
self.import_profile_name = import_profile_name
@@ -113,9 +116,9 @@ async def do_work(self) -> None:
"wb+",
) as failed_batches:
self.bad_records_file = bad_marc_file
print(f"Writing bad records to {self.bad_records_file.name}")
logger.info(f"Writing bad records to {self.bad_records_file.name}")
self.failed_batches_file = failed_batches
print(f"Writing failed batches to {self.failed_batches_file.name}")
logger.info(f"Writing failed batches to {self.failed_batches_file.name}")
self.http_client = http_client
if self.consolidate_files:
self.current_file = self.import_files
@@ -136,16 +139,16 @@ async def wrap_up(self) -> None:
Returns:
None
"""
-self.bad_records_file.seek(0)
-if not self.bad_records_file.read(1):
-os.remove(self.bad_records_file.name)
-print("No bad records found. Removing bad records file.")
-self.failed_batches_file.seek(0)
-if not self.failed_batches_file.read(1):
-os.remove(self.failed_batches_file.name)
-print("No failed batches. Removing failed batches file.")
-print("Import complete.")
-print(f"Total records imported: {self.total_records_sent}")
+with open(self.bad_records_file.name, "rb") as bad_records:
+if not bad_records.read(1):
+os.remove(bad_records.name)
+logger.info("No bad records found. Removing bad records file.")
+with open(self.failed_batches_file.name, "rb") as failed_batches:
+if not failed_batches.read(1):
+os.remove(failed_batches.name)
+logger.info("No failed batches. Removing failed batches file.")
+logger.info("Import complete.")
+logger.info(f"Total records imported: {self.total_records_sent}")

async def get_job_status(self) -> None:
"""
@@ -166,14 +169,17 @@ async def get_job_status(self) -> None:
"=PREPARING_FOR_PREVIEW&uiStatusAny=READY_FOR_PREVIEW&uiStatusAny=RUNNING&limit=50"
)
self.current_retry_timeout = None
-except (httpx.ConnectTimeout, httpx.ReadTimeout):
-sleep(.25)
-with httpx.Client(
-timeout=self.current_retry_timeout,
-verify=self.folio_client.ssl_verify
-) as temp_client:
-self.folio_client.httpx_client = temp_client
-return await self.get_job_status()
+except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.HTTPStatusError) as e:
+if not hasattr(e, "response") or e.response.status_code in [502, 504]:
+sleep(.25)
+with httpx.Client(
+timeout=self.current_retry_timeout,
+verify=self.folio_client.ssl_verify
+) as temp_client:
+self.folio_client.httpx_client = temp_client
+return await self.get_job_status()
+else:
+raise e
try:
status = [
job for job in job_status["jobExecutions"] if job["id"] == self.job_id
@@ -210,7 +216,7 @@ async def create_folio_import_job(self) -> None:
try:
create_job.raise_for_status()
except httpx.HTTPError as e:
-print(
+logger.error(
"Error creating job: "
+ str(e)
+ "\n"
@@ -257,7 +263,7 @@ async def set_job_profile(self) -> None:
try:
set_job_profile.raise_for_status()
except httpx.HTTPError as e:
-print(
+logger.error(
"Error creating job: "
+ str(e)
+ "\n"
@@ -313,7 +319,7 @@ async def process_record_batch(self, batch_payload) -> None:
self.record_batch = []
self.pbar_sent.update(len(batch_payload["initialRecords"]))
else:
print("Error posting batch: " + str(e))
logger.error("Error posting batch: " + str(e))
for record in self.record_batch:
self.failed_batches_file.write(record)
self.error_records += len(self.record_batch)
@@ -379,18 +385,18 @@ async def apply_marc_record_preprocessing(record: pymarc.Record, func_or_path) -
module = importlib.import_module(module_path)
func = getattr(module, func_name)
except (ImportError, AttributeError) as e:
print(f"Error importing preprocessing function {func_or_path}: {e}. Skipping preprocessing.")
logger.error(f"Error importing preprocessing function {func_or_path}: {e}. Skipping preprocessing.")
return record
elif callable(func_or_path):
func = func_or_path
else:
print(f"Invalid preprocessing function: {func_or_path}. Skipping preprocessing.")
logger.warning(f"Invalid preprocessing function: {func_or_path}. Skipping preprocessing.")
return record

try:
return func(record)
except Exception as e:
print(f"Error applying preprocessing function: {e}. Skipping preprocessing.")
logger.error(f"Error applying preprocessing function: {e}. Skipping preprocessing.")
return record

async def create_batch_payload(self, counter, total_records, is_last) -> dict:
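
A preprocessor supplied via `marc_record_preprocessor` (exposed as `args.preprocessor` in `main()`) can be either a callable or a dotted module path that the `importlib` lookup above resolves, presumably by splitting the path into module and function name. A minimal sketch of a conforming function; the name and the fields it strips are hypothetical:

import pymarc

def strip_local_fields(record: pymarc.Record) -> pymarc.Record:
    """Hypothetical preprocessor: drop locally defined 9XX fields before import."""
    for field in record.get_fields("910", "949"):
        record.remove_field(field)
    return record

Referenced on the command line as, say, `my_pkg.preprocessors.strip_local_fields`, any import or runtime failure falls back to the unmodified record, as the error handling above shows.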
@@ -479,17 +485,17 @@ async def import_marc_file(self) -> None:
columns = columns[:1] + [
" ".join(decamelize(x).split("_")[:-1]) for x in columns[1:]
]
-print(
+logger.info(
f"Results for {'file' if len(self.current_file) == 1 else 'files'}: "
f"{', '.join([os.path.basename(x.name) for x in self.current_file])}"
)
-print(
+logger.info("\n" +
tabulate.tabulate(
table_data, headers=columns, tablefmt="fancy_grid"
),
)
else:
print(f"No job summary available for job {self.job_id}.")
logger.error(f"No job summary available for job {self.job_id}.")
self.last_current = 0
self.finished = False

@@ -514,15 +520,15 @@ async def get_job_summary(self) -> dict:
)
self.current_retry_timeout = None
except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.HTTPStatusError) as e:
if not hasattr(e, "response") or e.response.status_code == 502:
if not hasattr(e, "response") or (e.response.status_code in [502, 504] and not self.let_summary_fail):
sleep(.25)
with httpx.Client(
timeout=self.current_retry_timeout,
verify=self.folio_client.ssl_verify
) as temp_client:
self.folio_client.httpx_client = temp_client
return await self.get_job_summary()  # retry the summary fetch itself
-elif hasattr(e, "response") and e.response.status_code == 504:
+elif hasattr(e, "response") and (e.response.status_code in [502, 504] and self.let_summary_fail):
job_summary = {}
else:
raise e
@@ -536,6 +542,23 @@ async def main() -> None:
This function parses command line arguments, initializes the FolioClient,
and runs the MARCImportJob.
"""

# Set up logging
file_handler = logging.FileHandler('folio_data_import_{}.log'.format(dt.now().strftime('%Y%m%d%H%M%S')))
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)

logging.basicConfig(
level=logging.INFO,
format='%(message)s',
handlers=[
logging.StreamHandler(sys.stderr),
file_handler
]
)
logging.getLogger("httpx").setLevel(logging.WARNING)

parser = argparse.ArgumentParser()
parser.add_argument("--gateway_url", type=str, help="The FOLIO API Gateway URL")
parser.add_argument("--tenant_id", type=str, help="The FOLIO tenant ID")
@@ -592,6 +615,11 @@ async def main() -> None:
action="store_true",
help="Disable progress bars (eg. for running in a CI environment)",
)
parser.add_argument(
"--let-summary-fail",
action="store_true",
help="Do not retry fetching the final job summary if it fails",
)
args = parser.parse_args()
if not args.password:
args.password = getpass("Enter FOLIO password: ")
@@ -611,10 +639,10 @@ async def main() -> None:
marc_files.sort()

if len(marc_files) == 0:
print(f"No files found matching {args.marc_file_path}. Exiting.")
logger.critical(f"No files found matching {args.marc_file_path}. Exiting.")
sys.exit(1)
else:
-print(marc_files)
+logger.info(marc_files)

if not args.import_profile_name:
import_profiles = folio_client.folio_get(
@@ -646,9 +674,10 @@ async def main() -> None:
marc_record_preprocessor=args.preprocessor,
consolidate=bool(args.consolidate),
no_progress=bool(args.no_progress),
let_summary_fail=bool(args.let_summary_fail),
).do_work()
except Exception as e:
print("Error importing files: " + str(e))
logger.error("Error importing files: " + str(e))
raise
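
Since `main()` simply forwards these flags into `MARCImportJob`, the new `let_summary_fail` option can also be exercised programmatically. A minimal sketch, not the module's documented API: the positional argument order is inferred from the attribute assignments in `__init__`, the `FolioClient` constructor follows its usual (gateway URL, tenant, username, password) pattern, and the credentials, path, and profile name are placeholders:

import asyncio
from pathlib import Path

import folioclient

from folio_data_import.MARCDataImport import MARCImportJob


async def run() -> None:
    client = folioclient.FolioClient(
        "https://folio.example.edu", "diku", "diku_admin", "s3cret"  # placeholders
    )
    await MARCImportJob(
        client,
        [Path("data/records.mrc")],  # items need a .name attribute, as used in import_marc_file
        "Default - Create instance and SRS MARC Bib",  # example job profile name
        no_progress=True,
        let_summary_fail=True,  # accept a missing summary instead of retrying 502/504s
    ).do_work()


asyncio.run(run())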


55 changes: 55 additions & 0 deletions src/folio_data_import/SetupLogging.py
@@ -0,0 +1,55 @@
import logging


class ExcludeLevelFilter(logging.Filter):
    def __init__(self, level):
        super().__init__()
        self.level = level

    def filter(self, record):
        return record.levelno != self.level


class LevelFilter(logging.Filter):
    def __init__(self, level):
        super().__init__()
        self.level = level

    def filter(self, record):
        return record.levelno == self.level


def setup_logging(logfile, errorfile):
    DATA_ISSUE_LVL_NUM = 26
    logging.addLevelName(DATA_ISSUE_LVL_NUM, "DATA_ISSUES")
    logger = logging.getLogger()
    logger.propagate = True
    logger.handlers = []
    formatter = logging.Formatter("%(asctime)s\t%(levelname)s\t%(message)s")
    logging.getLogger("httpx").setLevel(logging.WARNING)

    logger.setLevel(logging.INFO)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)
    stream_handler.addFilter(ExcludeLevelFilter(DATA_ISSUE_LVL_NUM))  # keep DATA_ISSUES off the console
    stream_handler.addFilter(ExcludeLevelFilter(logging.WARNING))  # exclude warnings (e.g. from pymarc)
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)

    file_handler = logging.FileHandler(filename=logfile, mode="w")
    file_handler.setFormatter(formatter)
    file_handler.addFilter(ExcludeLevelFilter(DATA_ISSUE_LVL_NUM))
    # file_handler.addFilter(LevelFilter(0, 20))
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)

    # Data issue file handler: only DATA_ISSUES records land here
    data_issue_file_handler = logging.FileHandler(filename=errorfile, mode="w")
    data_issue_file_handler.setFormatter(logging.Formatter("%(message)s"))
    data_issue_file_handler.addFilter(LevelFilter(DATA_ISSUE_LVL_NUM))
    data_issue_file_handler.setLevel(DATA_ISSUE_LVL_NUM)
    logger.addHandler(data_issue_file_handler)
    logging.info("Logging set up")