diff --git a/pyproject.toml b/pyproject.toml
index e9d8d16..bbe80ed 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "folio_data_import"
-version = "0.2.8rc7"
+version = "0.2.8rc8"
 description = "A python module to interact with the data importing capabilities of the open-source FOLIO ILS"
 authors = ["Brooks Travis "]
 license = "MIT"
diff --git a/src/folio_data_import/MARCDataImport.py b/src/folio_data_import/MARCDataImport.py
index 7fa560e..d0bd817 100644
--- a/src/folio_data_import/MARCDataImport.py
+++ b/src/folio_data_import/MARCDataImport.py
@@ -3,6 +3,7 @@
 import glob
 import importlib
 import io
+import logging
 import os
 import sys
 from typing import List
@@ -21,7 +22,6 @@
 import tabulate
 from humps import decamelize
 from tqdm import tqdm
-from zmq import has


 try:
@@ -37,6 +37,7 @@
 RETRY_TIMEOUT_START = 1
 RETRY_TIMEOUT_RETRY_FACTOR = 2

+logger = logging.getLogger(__name__)
 class MARCImportJob:
     """
     Class to manage importing MARC data (Bib, Authority) into FOLIO using the Change Manager
@@ -78,9 +79,11 @@ def __init__(
         marc_record_preprocessor=None,
         consolidate=False,
         no_progress=False,
+        let_summary_fail=False,
     ) -> None:
         self.consolidate_files = consolidate
         self.no_progress = no_progress
+        self.let_summary_fail = let_summary_fail
         self.folio_client: folioclient.FolioClient = folio_client
         self.import_files = marc_files
         self.import_profile_name = import_profile_name
@@ -113,9 +116,9 @@ async def do_work(self) -> None:
             "wb+",
         ) as failed_batches:
             self.bad_records_file = bad_marc_file
-            print(f"Writing bad records to {self.bad_records_file.name}")
+            logger.info(f"Writing bad records to {self.bad_records_file.name}")
             self.failed_batches_file = failed_batches
-            print(f"Writing failed batches to {self.failed_batches_file.name}")
+            logger.info(f"Writing failed batches to {self.failed_batches_file.name}")
             self.http_client = http_client
             if self.consolidate_files:
                 self.current_file = self.import_files
@@ -136,16 +139,16 @@ async def wrap_up(self) -> None:
         Returns:
             None
         """
-        self.bad_records_file.seek(0)
-        if not self.bad_records_file.read(1):
-            os.remove(self.bad_records_file.name)
-            print("No bad records found. Removing bad records file.")
-        self.failed_batches_file.seek(0)
-        if not self.failed_batches_file.read(1):
-            os.remove(self.failed_batches_file.name)
-            print("No failed batches. Removing failed batches file.")
-        print("Import complete.")
-        print(f"Total records imported: {self.total_records_sent}")
+        with open(self.bad_records_file.name, "rb") as bad_records:
+            if not bad_records.read(1):
+                os.remove(bad_records.name)
+                logger.info("No bad records found. Removing bad records file.")
+        with open(self.failed_batches_file.name, "rb") as failed_batches:
+            if not failed_batches.read(1):
+                os.remove(failed_batches.name)
+                logger.info("No failed batches. Removing failed batches file.")
+        logger.info("Import complete.")
+        logger.info(f"Total records imported: {self.total_records_sent}")

     async def get_job_status(self) -> None:
         """
@@ -166,14 +169,17 @@ async def get_job_status(self) -> None:
                 "=PREPARING_FOR_PREVIEW&uiStatusAny=READY_FOR_PREVIEW&uiStatusAny=RUNNING&limit=50"
             )
             self.current_retry_timeout = None
-        except (httpx.ConnectTimeout, httpx.ReadTimeout):
-            sleep(.25)
-            with httpx.Client(
-                timeout=self.current_retry_timeout,
-                verify=self.folio_client.ssl_verify
-            ) as temp_client:
-                self.folio_client.httpx_client = temp_client
-                return await self.get_job_status()
+        except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.HTTPStatusError) as e:
+            if not hasattr(e, "response") or e.response.status_code in [502, 504]:
+                sleep(.25)
+                with httpx.Client(
+                    timeout=self.current_retry_timeout,
+                    verify=self.folio_client.ssl_verify
+                ) as temp_client:
+                    self.folio_client.httpx_client = temp_client
+                    return await self.get_job_status()
+            else:
+                raise e
         try:
             status = [
                 job for job in job_status["jobExecutions"] if job["id"] == self.job_id
@@ -210,7 +216,7 @@ async def create_folio_import_job(self) -> None:
         try:
             create_job.raise_for_status()
         except httpx.HTTPError as e:
-            print(
+            logger.error(
                 "Error creating job: "
                 + str(e)
                 + "\n"
@@ -257,7 +263,7 @@ async def set_job_profile(self) -> None:
         try:
             set_job_profile.raise_for_status()
         except httpx.HTTPError as e:
-            print(
+            logger.error(
                 "Error creating job: "
                 + str(e)
                 + "\n"
@@ -313,7 +319,7 @@ async def process_record_batch(self, batch_payload) -> None:
             self.record_batch = []
             self.pbar_sent.update(len(batch_payload["initialRecords"]))
         else:
-            print("Error posting batch: " + str(e))
+            logger.error("Error posting batch: " + str(e))
             for record in self.record_batch:
                 self.failed_batches_file.write(record)
             self.error_records += len(self.record_batch)
@@ -379,18 +385,18 @@ async def apply_marc_record_preprocessing(record: pymarc.Record, func_or_path) -
                 module = importlib.import_module(module_path)
                 func = getattr(module, func_name)
             except (ImportError, AttributeError) as e:
-                print(f"Error importing preprocessing function {func_or_path}: {e}. Skipping preprocessing.")
+                logger.error(f"Error importing preprocessing function {func_or_path}: {e}. Skipping preprocessing.")
                 return record
         elif callable(func_or_path):
             func = func_or_path
         else:
-            print(f"Invalid preprocessing function: {func_or_path}. Skipping preprocessing.")
+            logger.warning(f"Invalid preprocessing function: {func_or_path}. Skipping preprocessing.")
             return record
         try:
             return func(record)
         except Exception as e:
-            print(f"Error applying preprocessing function: {e}. Skipping preprocessing.")
+            logger.error(f"Error applying preprocessing function: {e}. Skipping preprocessing.")
Skipping preprocessing.") return record async def create_batch_payload(self, counter, total_records, is_last) -> dict: @@ -479,17 +485,17 @@ async def import_marc_file(self) -> None: columns = columns[:1] + [ " ".join(decamelize(x).split("_")[:-1]) for x in columns[1:] ] - print( + logger.info( f"Results for {'file' if len(self.current_file) == 1 else 'files'}: " f"{', '.join([os.path.basename(x.name) for x in self.current_file])}" ) - print( + logger.info("\n" + tabulate.tabulate( table_data, headers=columns, tablefmt="fancy_grid" ), ) else: - print(f"No job summary available for job {self.job_id}.") + logger.error(f"No job summary available for job {self.job_id}.") self.last_current = 0 self.finished = False @@ -514,7 +520,7 @@ async def get_job_summary(self) -> dict: ) self.current_retry_timeout = None except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.HTTPStatusError) as e: - if not hasattr(e, "response") or e.response.status_code == 502: + if not hasattr(e, "response") or (e.response.status_code in [502, 504] and not self.let_summary_fail): sleep(.25) with httpx.Client( timeout=self.current_retry_timeout, @@ -522,7 +528,7 @@ async def get_job_summary(self) -> dict: ) as temp_client: self.folio_client.httpx_client = temp_client return await self.get_job_status() - elif hasattr(e, "response") and e.response.status_code == 504: + elif hasattr(e, "response") and (e.response.status_code in [502, 504] and self.let_summary_fail): job_summary = {} else: raise e @@ -536,6 +542,23 @@ async def main() -> None: This function parses command line arguments, initializes the FolioClient, and runs the MARCImportJob. """ + + # Set up logging + file_handler = logging.FileHandler('folio_data_import_{}.log'.format(dt.now().strftime('%Y%m%d%H%M%S'))) + file_handler.setLevel(logging.INFO) + file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + file_handler.setFormatter(file_formatter) + + logging.basicConfig( + level=logging.INFO, + format='%(message)s', + handlers=[ + logging.StreamHandler(sys.stderr), + file_handler + ] + ) + logging.getLogger("httpx").setLevel(logging.WARNING) + parser = argparse.ArgumentParser() parser.add_argument("--gateway_url", type=str, help="The FOLIO API Gateway URL") parser.add_argument("--tenant_id", type=str, help="The FOLIO tenant ID") @@ -592,6 +615,11 @@ async def main() -> None: action="store_true", help="Disable progress bars (eg. for running in a CI environment)", ) + parser.add_argument( + "--let-summary-fail", + action="store_true", + help="Do not retry fetching the final job summary if it fails", + ) args = parser.parse_args() if not args.password: args.password = getpass("Enter FOLIO password: ") @@ -611,10 +639,10 @@ async def main() -> None: marc_files.sort() if len(marc_files) == 0: - print(f"No files found matching {args.marc_file_path}. Exiting.") + logger.critical(f"No files found matching {args.marc_file_path}. 
Exiting.") sys.exit(1) else: - print(marc_files) + logger.info(marc_files) if not args.import_profile_name: import_profiles = folio_client.folio_get( @@ -646,9 +674,10 @@ async def main() -> None: marc_record_preprocessor=args.preprocessor, consolidate=bool(args.consolidate), no_progress=bool(args.no_progress), + let_summary_fail=bool(args.let_summary_fail), ).do_work() except Exception as e: - print("Error importing files: " + str(e)) + logger.error("Error importing files: " + str(e)) raise diff --git a/src/folio_data_import/SetupLogging.py b/src/folio_data_import/SetupLogging.py new file mode 100644 index 0000000..2961113 --- /dev/null +++ b/src/folio_data_import/SetupLogging.py @@ -0,0 +1,55 @@ +import logging + +class ExcludeLevelFilter(logging.Filter): + def __init__(self, level): + super().__init__() + self.level = level + + def filter(self, record): + return record.levelno != self.level + +class LevelFilter(logging.Filter): + def __init__(self, level): + super().__init__() + self.level = level + + def filter(self, record): + return record.levelno == self.level + +def setup_logging(logfile, errorfile): + DATA_ISSUE_LVL_NUM = 26 + logging.addLevelName(DATA_ISSUE_LVL_NUM, "DATA_ISSUES") + logger = logging.getLogger() + logger.propogate = True + logger.handlers = [] + formatter = logging.Formatter( + "%(asctime)s\t%(levelname)s\t%(message)s" + ) + logging.getLogger("httpx").setLevel(logging.WARNING) + stream_handler = logging.StreamHandler() + stream_handler.addFilter(ExcludeLevelFilter(26)) + logger.setLevel(logging.INFO) + stream_handler.setLevel(logging.INFO) + stream_handler.addFilter(ExcludeLevelFilter(30)) # Exclude warnings from pymarc + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) + + file_handler = logging.FileHandler( + filename=logfile, mode="w" + ) + file_handler.setFormatter(formatter) + file_handler.addFilter(ExcludeLevelFilter(26)) + # file_handler.addFilter(LevelFilter(0, 20)) + file_handler.setLevel(logging.INFO) + + logger.addHandler(file_handler) + + # Data issue file formatter + data_issue_file_handler = logging.FileHandler( + filename=errorfile, mode="w" + ) + data_issue_file_handler.setFormatter("%(message)s") + data_issue_file_handler.addFilter(LevelFilter(26)) + data_issue_file_handler.setLevel(26) + logger.addHandler(data_issue_file_handler) + logging.info("Logging set up") diff --git a/src/folio_data_import/UserImport.py b/src/folio_data_import/UserImport.py index a64bdf2..6958ddd 100644 --- a/src/folio_data_import/UserImport.py +++ b/src/folio_data_import/UserImport.py @@ -5,15 +5,15 @@ import json import os import time +import logging import uuid from datetime import datetime as dt from pathlib import Path from typing import Tuple +from folio_data_import.SetupLogging import setup_logging -import aiofiles import folioclient import httpx -from aiofiles.threadpool.text import AsyncTextIOWrapper try: utc = datetime.UTC @@ -44,8 +44,6 @@ def __init__( library_name: str, batch_size: int, limit_simultaneous_requests: asyncio.Semaphore, - logfile: AsyncTextIOWrapper, - errorfile: AsyncTextIOWrapper, http_client: httpx.AsyncClient, user_file_path: Path = None, user_match_key: str = "externalSystemId", @@ -69,8 +67,6 @@ def __init__( self.service_point_map: dict = self.build_ref_data_id_map( self.folio_client, "/service-points", "servicepoints", "code" ) - self.logfile: AsyncTextIOWrapper = logfile - self.errorfile: AsyncTextIOWrapper = errorfile self.http_client: httpx.AsyncClient = http_client self.only_update_present_fields: bool = 
         self.default_preferred_contact_type: str = default_preferred_contact_type
@@ -237,11 +233,7 @@ async def map_address_types(self, user_obj, line_number) -> None:
                     mapped_addresses.append(address)
                 except KeyError:
                     if address["addressTypeId"] not in self.address_type_map.values():
-                        print(
-                            f"Row {line_number}: Address type {address['addressTypeId']} not found"
-                            f", removing address"
-                        )
-                        await self.logfile.write(
+                        logging.info(
                             f"Row {line_number}: Address type {address['addressTypeId']} not found"
                             f", removing address\n"
                         )
@@ -272,11 +264,7 @@ async def map_patron_groups(self, user_obj, line_number) -> None:
             user_obj["patronGroup"] = self.patron_group_map[user_obj["patronGroup"]]
         except KeyError:
             if user_obj["patronGroup"] not in self.patron_group_map.values():
-                print(
-                    f"Row {line_number}: Patron group {user_obj['patronGroup']} not found, "
-                    f"removing patron group"
-                )
-                await self.logfile.write(
+                logging.info(
                     f"Row {line_number}: Patron group {user_obj['patronGroup']} not found in, "
                     f"removing patron group\n"
                 )
@@ -307,11 +295,7 @@ async def map_departments(self, user_obj, line_number) -> None:
                 else:
                     mapped_departments.append(self.department_map[department])
             except KeyError:
-                print(
-                    f'Row {line_number}: Department "{department}" not found, '  # noqa: B907
-                    f"excluding department from user"
-                )
-                await self.logfile.write(
+                logging.info(
                     f'Row {line_number}: Department "{department}" not found, '  # noqa: B907
                     f"excluding department from user\n"
                 )
@@ -417,11 +401,7 @@ async def set_preferred_contact_type(self, user_obj, existing_user) -> None:
             else:
                 existing_user["personal"]["preferredContactTypeId"] = current_pref_contact if current_pref_contact in PREFERRED_CONTACT_TYPES_MAP else self.default_preferred_contact_type
         else:
-            print(
-                f"Preferred contact type not provided or is not a valid option: {PREFERRED_CONTACT_TYPES_MAP}\n"
-                f"Setting preferred contact type to {self.default_preferred_contact_type} or using existing value"
-            )
-            await self.logfile.write(
+            logging.info(
                 f"Preferred contact type not provided or is not a valid option: {PREFERRED_CONTACT_TYPES_MAP}\n"
                 f"Setting preferred contact type to {self.default_preferred_contact_type} or using existing value\n"
             )
@@ -453,16 +433,12 @@ async def create_or_update_user(self, user_obj, existing_user, protected_fields,
                 self.logs["updated"] += 1
                 return existing_user
             except Exception as ee:
-                print(
-                    f"Row {line_number}: User update failed: "
-                    f"{str(getattr(getattr(ee, 'response', str(ee)), 'text', str(ee)))}"
-                )
-                await self.logfile.write(
+                logging.info(
                     f"Row {line_number}: User update failed: "
                     f"{str(getattr(getattr(ee, 'response', str(ee)), 'text', str(ee)))}\n"
                 )
-                await self.errorfile.write(
-                    json.dumps(existing_user, ensure_ascii=False) + "\n"
+                logging.error(
+                    json.dumps(existing_user, ensure_ascii=False)
                 )
                 self.logs["failed"] += 1
                 return {}
@@ -471,16 +447,12 @@ async def create_or_update_user(self, user_obj, existing_user, protected_fields,
                 new_user = await self.create_new_user(user_obj)
                 return new_user
             except Exception as ee:
-                print(
-                    f"Row {line_number}: User creation failed: "
-                    f"{str(getattr(getattr(ee, 'response', str(ee)), 'text', str(ee)))}"
-                )
-                await self.logfile.write(
+                logging.info(
                     f"Row {line_number}: User creation failed: "
                     f"{str(getattr(getattr(ee, 'response', str(ee)), 'text', str(ee)))}\n"
                 )
-                await self.errorfile.write(
-                    json.dumps(user_obj, ensure_ascii=False) + "\n"
+                logging.error(
+                    json.dumps(user_obj, ensure_ascii=False)
                 )
                 self.logs["failed"] += 1
                 return {}
@@ -566,10 +538,10 @@ async def create_or_update_rp(self, rp_obj, existing_rp, new_user_obj):
            None
         """
         if existing_rp:
-            # print(existing_rp)
+            #
             await self.update_existing_rp(rp_obj, existing_rp)
         else:
-            # print(new_user_obj)
+            #
             await self.create_new_rp(new_user_obj)

     async def create_new_rp(self, new_user_obj):
@@ -587,7 +559,7 @@
         """
         rp_obj = {"holdShelf": True, "delivery": False}
         rp_obj["userId"] = new_user_obj["id"]
-        # print(rp_obj)
+        #
         response = await self.http_client.post(
             self.folio_client.okapi_url
             + "/request-preference-storage/request-preference",
@@ -611,7 +583,7 @@ async def update_existing_rp(self, rp_obj, existing_rp) -> None:
            None
         """
         existing_rp.update(rp_obj)
-        # print(existing_rp)
+        #
         response = await self.http_client.put(
             self.folio_client.okapi_url
             + f"/request-preference-storage/request-preference/{existing_rp['id']}",
@@ -678,11 +650,7 @@ async def process_line(
                     rp_obj, existing_rp, new_user_obj
                 )
             else:
-                print(
-                    f"Row {line_number}: Creating default request preference object"
-                    f" for {new_user_obj['id']}"
-                )
-                await self.logfile.write(
+                logging.info(
                     f"Row {line_number}: Creating default request preference object"
                     f" for {new_user_obj['id']}\n"
                 )
@@ -693,8 +661,8 @@ async def process_line(
                 f"{new_user_obj['id']}: "
                 f"{str(getattr(getattr(ee, 'response', ee), 'text', str(ee)))}"
             )
-            print(rp_error_message)
-            await self.logfile.write(rp_error_message + "\n")
+
+            logging.info(rp_error_message)
         if not existing_pu:
             try:
                 await self.create_perms_user(new_user_obj)
@@ -704,8 +672,8 @@ async def process_line(
                 f"{new_user_obj['id']}: "
                 f"{str(getattr(getattr(ee, 'response', str(ee)), 'text', str(ee)))}"
             )
-            print(pu_error_message)
-            await self.logfile.write(pu_error_message + "\n")
+
+            logging.info(pu_error_message)
         await self.handle_service_points_user(spu_obj, existing_spu, new_user_obj)

     async def map_service_points(self, spu_obj, existing_user):
@@ -731,7 +699,7 @@ async def map_service_points(self, spu_obj, existing_user):
                 else:
                     mapped_service_points.append(self.service_point_map[sp])
             except KeyError:
-                print(
+                logging.error(
                     f'Service point "{sp}" not found, excluding service point from user: '
                     f'{self.service_point_map}'
                 )
@@ -748,14 +716,14 @@ async def map_service_points(self, spu_obj, existing_user):
             else:
                 mapped_sp_id = self.service_point_map[sp_code]
                 if mapped_sp_id not in spu_obj.get('servicePointsIds', []):
-                    print(
+                    logging.error(
                         f'Default service point "{sp_code}" not found in assigned service points, '
                         'excluding default service point from user'
                     )
                 else:
                     spu_obj['defaultServicePointId'] = mapped_sp_id
         except KeyError:
-            print(
+            logging.error(
                 f'Default service point "{sp_code}" not found, excluding default service '
                 f'point from user: {existing_user["id"]}'
             )
@@ -853,13 +821,12 @@ async def process_file(self, openfile) -> None:
                     duration = time.time() - start
                     async with self.lock:
                         message = (
-                            f"{dt.now().isoformat(sep=' ', timespec='milliseconds')}: "
                             f"Batch of {self.batch_size} users processed in {duration:.2f} "
                             f"seconds. - Users created: {self.logs['created']} - Users updated: "
                             f"{self.logs['updated']} - Users failed: {self.logs['failed']}"
                         )
-                        print(message)
-                        await self.logfile.write(message + "\n")
+
+                        logging.info(message)
                     tasks = []
         if tasks:
             start = time.time()
@@ -867,13 +834,12 @@ async def process_file(self, openfile) -> None:
             duration = time.time() - start
             async with self.lock:
                 message = (
-                    f"{dt.now().isoformat(sep=' ', timespec='milliseconds')}: "
                     f"Batch of {len(tasks)} users processed in {duration:.2f} seconds. - "
                     f"Users created: {self.logs['created']} - Users updated: "
                     f"{self.logs['updated']} - Users failed: {self.logs['failed']}"
                 )
-                print(message)
-                await self.logfile.write(message + "\n")
+
+                logging.info(message)


 async def main() -> None:
@@ -954,6 +920,7 @@ async def main() -> None:
         default="002",
     )
     args = parser.parse_args()
+
     report_file_base_path = args.report_file_base_path
     library_name = args.library_name
@@ -977,29 +944,20 @@ async def main() -> None:
         folio_client.okapi_headers["x-okapi-tenant"] = args.member_tenant_id

     user_file_path = Path(args.user_file_path)
-    report_file_base_path = Path(args.report_file_base_path)
-    log_file_path = (
-        report_file_base_path
-        / f"log_user_import_{dt.now(utc).strftime('%Y%m%d_%H%M%S')}.log"
+    log_file_path = os.path.join(
+        report_file_base_path, f"log_user_import_{dt.now(utc).strftime('%Y%m%d_%H%M%S')}.log"
     )
-    error_file_path = (
-        report_file_base_path
-        / f"failed_user_import_{dt.now(utc).strftime('%Y%m%d_%H%M%S')}.txt"
+    error_file_path = os.path.join(
+        report_file_base_path, f"failed_user_import_{dt.now(utc).strftime('%Y%m%d_%H%M%S')}.txt"
     )
-    async with aiofiles.open(
-        log_file_path,
-        "w",
-    ) as logfile, aiofiles.open(
-        error_file_path, "w"
-    ) as errorfile, httpx.AsyncClient(timeout=None) as http_client:
+    setup_logging(log_file_path, error_file_path)
+    async with httpx.AsyncClient(timeout=None) as http_client:
         try:
             importer = UserImporter(
                 folio_client,
                 library_name,
                 batch_size,
                 limit_async_requests,
-                logfile,
-                errorfile,
                 http_client,
                 user_file_path,
                 args.user_match_key,
@@ -1008,8 +966,8 @@ async def main() -> None:
             )
             await importer.do_import()
         except Exception as ee:
-            print(f"An unknown error occurred: {ee}")
-            await logfile.write(f"An error occurred {ee}\n")
+
+            logging.error(f"An error occurred: {ee}")
             raise ee