diff --git a/README.md b/README.md
index 50d5a0d..bdcb842 100644
--- a/README.md
+++ b/README.md
@@ -171,10 +171,14 @@ ht_indexer/
 │   └── monitoring_arguments.py
 ├── document_retriever_service/
 │   ├── __init__.py
-│   ├── catalog_metadata.py
 │   ├── full_text_search_retriever_service.py
+│   ├── full_text_search_retriever_service_test.py
+│   ├── ht_status_retriever_service.py
+│   ├── retriever_arguments.py
+│   ├── retriever_services_utils.py
+│   ├── retriever_services_utils_test.py
+│   ├── run_retriever_service_by_file_test.py
 │   └── run_retriever_service_by_file.py
-|   └──test_document_retriever_service.py
 ├── ht_utils/
 │   ├── __init__.py
 │   └── sample_data/
@@ -237,6 +241,30 @@ item of a Catalog record will be processed.
 
 The Solr query will look like this: `id:100673101` or `ht_id:umn.31951d01828300z`
 
+The Solr terms query parser is used to look up the documents in the Catalog index.
+The terms query parser in Solr is a highly efficient way to search for multiple exact values
+in a specific field — great for querying by id or any other exact-match field, especially when you're dealing with
+large lists.
+
+The Solr query looks like this:
+
+```
+{
+  'q': '*:*',
+  'fq': '{!terms f=ht_id}id1,id2,id3,id4',
+  'rows': 200,
+  'wt': 'json'
+}
+```
+
+In the query, `*:*` is used to match everything (in the future, we could implement the use case of adding a specific
+query string). The list of ids is used as a filter via `fq` to limit the documents by ht_id. The query avoids
+scoring computation, which improves performance.
+
+The parameter `rows` is used to retrieve documents in batches of 200 each time.
+As we have a long list of ht_ids/ids, we recommend splitting the list of documents into chunks
+and creating a query batch to avoid the Solr URI-too-long error (status code 414).
+A chunk size of 200 was determined by testing the Solr query with different values (100-500);
+with 200 ht_ids the query worked reliably.
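+
+The snippet below sketches how the chunking and the terms query fit together, using the helpers added in
+this PR (`split_into_batches` in `ht_utils/ht_utils.py` and `make_solr_term_query` in `ht_utils/query_maker.py`);
+the ids shown are illustrative:
+
+```
+from ht_utils.ht_utils import split_into_batches
+from ht_utils.query_maker import make_solr_term_query
+
+ids = ["umn.31951d01828300z", "mdp.39015078560292"]  # illustrative list of ht_ids
+for chunk in split_into_batches(ids, 200):
+    solr_params = {
+        "q": "*:*",
+        "fq": make_solr_term_query(chunk, by_field="item"),
+        "rows": 200,
+        "wt": "json",
+    }
+    # solr_params is then sent to the Solr /query endpoint
+```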
+
 ### Use case 1: Generating & indexing one or N items retrieved from Catalog in Full-text search index:
 
 The application receives a list of ids and a parameter that indicates if all the items in a record are processed
@@ -269,6 +297,7 @@ be one of the following: pending, processing, failed, completed.
     generator_status=completed, indexer_status=failed.
 ```
+
 CREATE TABLE IF NOT EXISTS fulltext_item_processing_status (
     ht_id VARCHAR(255) UNIQUE NOT NULL,
     record_id VARCHAR(255) NOT NULL,
@@ -281,6 +310,7 @@ be one of the following: pending, processing, failed, completed.
     updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
     processed_at TIMESTAMP NULL DEFAULT NULL
 );
+
 ```
 
 ## Usage
@@ -299,20 +329,29 @@ be one of the following: pending, processing, failed, completed.
 
 * **Run retriever service**
 
 ```
-docker compose exec document_retriever python document_retriever_service/full_text_search_retriever_service.py --list_documents chi.096189208,iau.31858049957305,hvd.32044106262314,chi.096415811,hvd.32044020307005,hvd.32044092647320,iau.31858042938971 --query_field item
+
+docker compose exec document_retriever python document_retriever_service/full_text_search_retriever_service.py \
+--list_documents \
+chi.096189208,iau.31858049957305,hvd.32044106262314,chi.096415811,hvd.32044020307005,hvd.32044092647320,iau.31858042938971 \
+--query_field item
+
 ```
 
 * **Run retriever service by file**
 
 ```
-docker compose exec document_retriever python document_retriever_service/run_retriever_service_by_file.py
+
+docker compose exec document_retriever python document_retriever_service/run_retriever_service_by_file.py --query_field item --input_document_file document_retriever_service/list_htids_indexer_test.txt
+
 ```
 
 * **Generator service**
 
 ```
+
 docker compose up document_generator -d
+
 ```
 
 This container will automatically start the script `python document_generator/document_generator_service.py` that will
@@ -321,7 +360,9 @@ be retrieving the documents from the retriever_queue and will be published a new
 
 * **Indexer service**
 
 ```
+
 docker compose up document_indexer -d
+
 ```
 
 This container will automatically start the script `python document_indexer_service/document_indexer_service.py` that
@@ -333,7 +374,12 @@ index.
 
 * **Retriever service receives a list of document ids**
 
 ```
-docker compose exec document_retriever python document_retriever_service/full_text_search_retriever_service.py --list_documents chi.096189208,iau.31858049957305,hvd.32044106262314,chi.096415811,hvd.32044020307005,hvd.32044092647320,iau.31858042938971 --query_field item
+
+docker compose exec document_retriever python document_retriever_service/full_text_search_retriever_service.py \
+--list_documents \
+chi.096189208,iau.31858049957305,hvd.32044106262314,chi.096415811,hvd.32044020307005,hvd.32044092647320,iau.31858042938971 \
+--query_field item
+
 ```
 
 * **Generator service running locally**
diff --git a/conftest.py b/conftest.py
index 0bb6f86..093e8db 100644
--- a/conftest.py
+++ b/conftest.py
@@ -3,12 +3,15 @@ import os
 
 from catalog_metadata.catalog_metadata import CatalogRecordMetadata, CatalogItemMetadata
-from document_retriever_service.catalog_retriever_service import CatalogRetrieverService
 from ht_queue_service.queue_consumer import QueueConsumer
 from ht_queue_service.queue_producer import QueueProducer
+from ht_utils.ht_utils import get_solr_url
 
 current = os.path.dirname(__file__)
 
+@pytest.fixture
+def get_retriever_service_solr_parameters():
+    return {'q': '*:*', 'rows': 10, 'wt': 'json'}
 
 # Fixtures to retrieve the catalog record
 # Retrieve JSON file to create a dictionary with a catalog record
@@ -31,16 +34,9 @@ def get_catalog_record_metadata(get_record_data):
 def get_item_metadata(get_record_data: dict, get_catalog_record_metadata: CatalogRecordMetadata):
     return CatalogItemMetadata("mdp.39015078560292", get_catalog_record_metadata)
 
-
-# CatalogRetrieverService object to retrieve the catalog record
-@pytest.fixture
-def get_catalog_retriever_service(solr_api_url):
-    return CatalogRetrieverService(solr_api_url)
-
-
 @pytest.fixture
-def solr_api_url():
-    return "http://solr-sdr-catalog:9033/solr/catalog/"
+def solr_catalog_url():
+    return get_solr_url().strip('/')
 
 
 # Fixtures to instantiate the queue consumer and producer
diff --git a/docker-compose.yml b/docker-compose.yml
index 04aed23..e08df07
100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -200,6 +200,8 @@ services: - .:/app - ../tmp:/tmp command: [ "python", "-m", "pytest" ] + environment: + SOLR_URL: http://solr-sdr-catalog:9033/solr/catalog/ depends_on: solr-sdr-catalog: condition: service_healthy diff --git a/document_retriever_service/catalog_retrieve_service_test.py b/document_retriever_service/catalog_retrieve_service_test.py deleted file mode 100644 index 65903f6..0000000 --- a/document_retriever_service/catalog_retrieve_service_test.py +++ /dev/null @@ -1,90 +0,0 @@ -import pytest - -from document_retriever_service.catalog_retriever_service import (CatalogRetrieverService, - create_catalog_object_by_item_id) - - -@pytest.fixture -def get_catalog_retriever_service_solr_fake_solr_url(): - return CatalogRetrieverService("http://solr-sdr-catalog:9033/solr/catalogFake/") - - -class TestCatalogRetrieverService: - - def test_create_catalog_object_by_item_id(self, get_catalog_record_metadata, - get_record_data): - """Test if the method returns only the metadata of the input item""" - results = [] - # Create the list - list_documents = ["mdp.39015078560292"] - create_catalog_object_by_item_id(list_documents, get_record_data, get_catalog_record_metadata, results) - - assert len(results) == 1 - assert results[0].ht_id == "mdp.39015078560292" - assert results[0].metadata.get("vol_id") == "mdp.39015078560292" - - def test_retrieve_documents_by_item(self, get_catalog_retriever_service): - """Use case: Receive a list of items (ht_id) to index and retrieve the metadata from Catalog - We want to index only the item that appear in the list and not all the items of each record. - """ - list_documents = ['nyp.33433082002258', 'nyp.33433082046495', 'nyp.33433082046503', 'nyp.33433082046529', - 'nyp.33433082046537', 'nyp.33433082046545', 'nyp.33433082067798', 'nyp.33433082067806', - 'nyp.33433082067822'] - by_field = 'item' - - results = get_catalog_retriever_service.retrieve_documents(list_documents, 0, 10, by_field) - assert len(results) == 9 - - def test_retrieve_documents_by_record(self, get_catalog_retriever_service): - """Use case: Receive one record to process all their items""" - - list_documents = ["008394936"] - by_field = 'record' - - results = get_catalog_retriever_service.retrieve_documents(list_documents, 0, 10, by_field) - assert len(results) == 4 - - def test_retrieve_documents_by_item_only_one(self, get_catalog_retriever_service): - """Use case: Retrieve only the metadata of the item given by parameter""" - list_documents = ['nyp.33433082002258'] - by_field = 'item' - - results = get_catalog_retriever_service.retrieve_documents(list_documents, 0, 10, by_field) - assert len(results) == 1 - - def test_retrieve_documents_by_record_list_records(self, get_catalog_retriever_service): - """Use case: Receive one record to process all their items""" - - list_documents = ["008394936", "100393743"] - by_field = 'record' - - results = get_catalog_retriever_service.retrieve_documents(list_documents, 0, 10, by_field) - assert len(results) == 5 - - def test_retrieve_documents_empty_result(self, get_catalog_retriever_service): - """Use case: Check of the results list is empty because the input item is not in Solr""" - list_documents = ["this_id_does_not_exist_in_solr"] - by_field = 'item' - - results = get_catalog_retriever_service.retrieve_documents(list_documents, 0, 10, by_field) - assert len(results) == 0 - - def test_count_documents_zero_documents(self, get_catalog_retriever_service): - """Test if the count_documents 
method returns 0 when the documents in the list are not in solr""" - list_documents = ["this_id_does_not_exist_in_solr"] - by_field = 'item' - - total_documents = get_catalog_retriever_service.count_documents(list_documents, 0, 10, by_field) - assert total_documents == 0 - - def test_count_documents_solr_is_not_working(self, get_catalog_retriever_service_solr_fake_solr_url): - """Use case: Count the number of documents in Catalog""" - list_documents = ['nyp.33433082002258'] - by_field = 'item' - - with pytest.raises(Exception): - total_documents = get_catalog_retriever_service_solr_fake_solr_url.count_documents(list_documents, - 0, - 10, - by_field) - assert total_documents is None diff --git a/document_retriever_service/catalog_retriever_service.py b/document_retriever_service/catalog_retriever_service.py deleted file mode 100644 index da02051..0000000 --- a/document_retriever_service/catalog_retriever_service.py +++ /dev/null @@ -1,204 +0,0 @@ -import argparse -import json -import time - -from catalog_metadata.catalog_metadata import CatalogRecordMetadata, CatalogItemMetadata -from document_retriever_service.retriever_arguments import SOLR_TOTAL_ROWS, SOLR_ROW_START - -from ht_indexer_api.ht_indexer_api import HTSolrAPI -from ht_utils.ht_logger import get_ht_logger -from ht_utils.query_maker import make_query - -logger = get_ht_logger(name=__name__) - - -def create_catalog_object_by_record_id(record: dict, catalog_record_metadata: CatalogRecordMetadata, - results: list) -> list[CatalogItemMetadata]: - """Receive a record and return a list of item, and their metadata - :param record: dict with catalog record (retrieve from Solr) - :param catalog_record_metadata: CatalogRecordMetadata object - :param results: list of CatalogItemMetadata object - """ - - for item_id in record.get('ht_id'): - results.append(CatalogRetrieverService.get_catalog_object(item_id, catalog_record_metadata)) - - return results - - -def create_catalog_object_by_item_id(list_documents: list, record: dict, - catalog_record_metadata: CatalogRecordMetadata, results: list) \ - -> list[CatalogItemMetadata]: - """Receive a list of documents and a catalog record; - Search for the item (ht_id) in the list and then; - Create the CatalogMetadata object for each document in the list - :param list_documents: list of ht_id - :param record: dict with catalog record (retrieve from Solr) - :param catalog_record_metadata: CatalogRecordMetadata object - :param results: list of CatalogItemMetadata object - """ - - for item_id in record.get('ht_id'): - if item_id in list_documents: - results.append(CatalogRetrieverService.get_catalog_object(item_id, catalog_record_metadata)) - return results - - -class CatalogRetrieverService: - """ - This class is used to retrieve the documents from the Catalog index - It uses the HTSolrAPI to retrieve the documents from the Catalog index - It accepts queries considering the field 'item' or 'record'. 
The default field is 'item' - item is used to retrieve at ht_id level - record is used to retrieve at id level, that means all the ht_id from a record - """ - - def __init__(self, catalog_api=None): - - self.catalog_api = HTSolrAPI(catalog_api) - - @staticmethod - def get_catalog_object(item_id: str, - record_metadata: CatalogRecordMetadata) -> CatalogItemMetadata: - - catalog_item_metadata = CatalogItemMetadata(item_id, record_metadata) - return catalog_item_metadata - - def count_documents(self, list_documents, start, rows, by_field: str = 'item'): - - """ - This method counts the number of documents to process having the list of documents - :param list_documents: list of ids to process - :param start: start index - :param rows: number of rows to retrieve - :param by_field: field to search by (item=ht_id or record=id) - """ - # Build the query to retrieve the total of documents to process - query = make_query(list_documents, by_field) - - try: - response = self.catalog_api.get_documents(query=query, response_format="json", start=start, rows=rows) - output = response.json() - - total_records = output.get("response").get("numFound") - logger.info(f"Process=retrieving: Total of records {total_records} to process.") - - return total_records if total_records else 0 - except Exception as e: - logger.error(f"Error in getting documents from Solr {e}") - raise e - - def retrieve_documents_from_solr(self, list_documents: list, start: int, rows: int, by_field: str = 'item'): - - """Function to retrieve documents from Solr - :param list_documents: list of documents to retrieve from Solr index - :param start: start index - :param rows: number of rows to retrieve - :param by_field: field to search by (item or record) - """ - - # try ... except block to catch any exception raised by the Solr connection - try: - response = self.catalog_api.get_documents( - query=make_query(list_documents, by_field), start=start, rows=rows - ) - - output = json.loads(response.content.decode("utf-8")) - - except Exception as e: - logger.error(f"Error in getting documents from Solr {e}") - raise e - - return output - - def retrieve_documents(self, list_documents: list[str], start: int, rows: int, by_field: str = 'item') \ - -> list[CatalogItemMetadata]: - - """ - This method is used to retrieve the documents from the Catalog. For each document is generated a CatalogMedata - object. The output of this method is a list of CatalogMetadata objects - :param list_documents: list of documents to retrieve - :param start: start index - :param rows: number of rows to retrieve - :param by_field: use this field to create the Solr query. 
item=ht_id and record=id - Create the query, if only one item, so the query will be ht_id: item_id - if more than one item, the query will be ht_id: (item_id1 OR item_id2 OR item_id3) - """ - - count_records = 0 - - results = [] - - # Query Solr to retrieve the documents - output = self.retrieve_documents_from_solr(list_documents, start, rows, by_field) - - # If no documents are found, output.get("response").get("docs") is an empty list - logger.info(f" {output.get('response').get('numFound')} documents found in Solr to process") - for record in output.get("response").get("docs"): - count_records = count_records + 1 - - catalog_record_metadata = CatalogRecordMetadata(record) - start_time = time.time() - if by_field == 'item': - # Validate query field = ht_id, list_documents could contain 1 or more items, but they probably are from - # different records - # Process a specific item of a record - results = create_catalog_object_by_item_id(list_documents, record, catalog_record_metadata, results) - # This is the most efficient way to retrieve the items from Catalog - else: - # Process all the items of a record - results = create_catalog_object_by_record_id(record, catalog_record_metadata, results) - - logger.info(f"Time to retrieve document metadata {time.time() - start_time}") - logger.info(f"Batch documents {count_records}") - start += rows - logger.info(f"Result length {len(results)}") - return results - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "--solr_url", - help="Solr server URL", - required=True, - default="http://localhost:8983/solr/core-x/", - ) - - parser.add_argument("--query", help="Query used to retrieve documents", default='*:*' - ) - - parser.add_argument("--output_file", - help="Path of the file to load the list of ht_id.", - required=False, - default="../items_list.txt" - ) - - # Use case: Given a query, generate a list of ht_id from Catalog index - args = parser.parse_args() - - solr_api_catalog = HTSolrAPI(url=args.solr_url) - - catalog_retrieval_service = CatalogRetrieverService(solr_api_catalog) - - count = 0 - # TODO Parallelize the process of retrieving documents from solr Catalog - file_object = open(args.output_file, "w+") - - start = SOLR_ROW_START - rows = SOLR_TOTAL_ROWS - - # TODO Implement the use case that retrieve documents accept a query instead of a list of documents - list_documents = [] - for result in catalog_retrieval_service.retrieve_documents(list_documents, start, rows, by_field='record'): - item_id = result.ht_id - logger.info(f"Item id: {item_id}") - file_object.write(f"{item_id}\n") - count = count + 1 - - file_object.close() - logger.info(count) - - -if __name__ == "__main__": - main() diff --git a/document_retriever_service/full_text_search_retriever_service.py b/document_retriever_service/full_text_search_retriever_service.py index f894e76..9b97c30 100644 --- a/document_retriever_service/full_text_search_retriever_service.py +++ b/document_retriever_service/full_text_search_retriever_service.py @@ -1,13 +1,19 @@ import argparse +import copy import inspect +import json import os import sys import time import multiprocessing +import requests + +from ht_indexer_api.ht_indexer_api import HTSolrAPI + import ht_utils.ht_utils -from catalog_metadata.catalog_metadata import CatalogItemMetadata -from document_retriever_service.catalog_retriever_service import CatalogRetrieverService +from catalog_metadata.catalog_metadata import CatalogItemMetadata, CatalogRecordMetadata +from 
document_retriever_service.retriever_services_utils import RetrieverServicesUtils
 from document_retriever_service.retriever_arguments import RetrieverServiceArguments
 from ht_indexer_monitoring.ht_indexer_tracktable import HT_INDEXER_TRACKTABLE, PROCESSING_STATUS_TABLE_NAME
 from ht_queue_service.queue_connection import MAX_DOCUMENT_IN_QUEUE
@@ -15,6 +21,7 @@ from ht_utils.ht_logger import get_ht_logger
 from ht_queue_service.queue_producer import QueueProducer
 from ht_utils.ht_mysql import get_mysql_conn
+from ht_utils.query_maker import make_solr_term_query
 
 logger = get_ht_logger(name=__name__)
 
@@ -25,18 +32,12 @@ PROCESSES = multiprocessing.cpu_count() - 1
 WAITING_TIME_QUEUE_PRODUCER = 180  # Wait 3 minutes to send documents in the queue
 WAITING_TIME_MYSQL = 60  # Wait 1 minute to query MySQL checking if there are documents to process (retriever_status = pending)
-
-
-def publish_document(queue_producer: QueueProducer, content: dict = None):
-    """
-    Publish the document in a queue
-    :param queue_producer: QueueProducer object
-    :param content: dict with the content of the message
-    """
-    message = content
-    logger.info(f"Sending message with id {content.get('ht_id')} to queue {queue_producer.queue_name}")
-    queue_producer.publish_messages(message)
-
+MYSQL_COLUMN_UPDATE = 'retriever_status'
+SUCCESS_UPDATE_STATUS = f"UPDATE {PROCESSING_STATUS_TABLE_NAME} SET status = %s, {MYSQL_COLUMN_UPDATE} = %s, processed_at = %s WHERE ht_id = %s"
+FAILURE_UPDATE_STATUS = f"UPDATE {PROCESSING_STATUS_TABLE_NAME} SET status = %s, {MYSQL_COLUMN_UPDATE} = %s, processed_at = %s, error = %s WHERE ht_id = %s"
+PARALLELIZE = False
+SOLR_BATCH_SIZE = 200  # The chunk size is 200 because longer URIs make Solr fail with status code 414. The chunk size was
+# determined by testing the Solr query with different values (100-500); with 200 ht_ids it worked reliably.
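+
+# A minimal usage sketch (kept as a comment, illustrative only): SOLR_BATCH_SIZE feeds the batching loop
+# in full_text_search_retriever_service() below, roughly:
+#
+#   for chunk in ht_utils.ht_utils.split_into_batches(list_ids, SOLR_BATCH_SIZE):
+#       solr_query = make_solr_term_query(chunk, by_field='item')
+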
class FullTextSearchRetrieverQueueService:
     """
@@ -49,17 +50,25 @@ class FullTextSearchRetrieverQueueService:
     All the entries are published in a queue
     """
 
-    def __init__(self,
-                 solr_api_url,
-                 queue_name: str = 'retriever_queue', queue_host: str = None,
+    def __init__(self, queue_name: str = 'retriever_queue', queue_host: str = None,
                  queue_user: str = None,
-                 queue_password: str = None):
+                 queue_password: str = None,
+                 solr_host: str = None,
+                 solr_user: str = None,
+                 solr_password: str = None,
+                 solr_retriever_query_params: dict = None
+                 ):
+
-        self.solr_api_url = solr_api_url
         self.queue_name = queue_name
         self.queue_host = queue_host
         self.queue_user = queue_user
         self.queue_password = queue_password
+        self.solr_host = solr_host
+        self.solr_user = solr_user
+        self.solr_password = solr_password
+        self.solr_retriever_query_params = solr_retriever_query_params
+
 
     def get_queue_producer(self) -> QueueProducer | None:
@@ -87,33 +96,11 @@ def generate_metadata(record: CatalogItemMetadata) -> tuple[dict, str]:
 
         return item_metadata, item_id
 
-    @staticmethod
-    def count_documents(catalog_retriever, initial_documents, start, rows, by_field: str = 'item'):
-
-        """Count the number of documents retrieved from the Catalog
-        param catalog_retriever: CatalogRetrieverService object
-        param initial_documents: list of documents to process
-        param start: start index
-        param rows: number of rows to retrieve
-        param by_field: field to search by (item=ht_id or record=id)
-        """
-
-        try:
-            total_documents = catalog_retriever.count_documents(initial_documents, start, rows, by_field)
-        except Exception as e:
-            error_info = ht_utils.ht_utils.get_general_error_message("FullTextSearchRetrieverQueueService",
-                                                                     e)
-            logger.error(f"Error in getting documents from Solr {error_info}")
-            exit(1)
-        return total_documents
-
     @staticmethod
     def publishing_documents(queue_producer, result, mysql_db):
 
         processed_items = []
         failed_items = []
-        processed_update_query = f"UPDATE {PROCESSING_STATUS_TABLE_NAME} SET status = %s, retriever_status = %s, processed_at = %s WHERE ht_id = %s"
-        failed_update_query = f"UPDATE {PROCESSING_STATUS_TABLE_NAME} SET status = %s, retriever_status = %s, processed_at = %s, error = %s WHERE ht_id = %s"
 
         for record in result:
@@ -122,7 +109,7 @@ def publishing_documents(queue_producer, result, mysql_db):
 
             # Try to publish the document in the queue, if an error occurs, log the error and continue to the next
             try:
-                publish_document(queue_producer, item_metadata)
+                RetrieverServicesUtils.publish_document(queue_producer, item_metadata)
 
                 processed_items.append(('processing', 'completed', ht_utils.ht_utils.get_current_time(), item_id))
             except Exception as e:
@@ -138,71 +125,130 @@ def publishing_documents(queue_producer, result, mysql_db):
 
         # Update the status of the items in MySQL table
         if len(failed_items)>0:
-            mysql_db.update_status(failed_update_query, failed_items)
+            mysql_db.update_status(FAILURE_UPDATE_STATUS, failed_items)
 
         if len(processed_items)>0:
             logger.info(f"Total of processed documents: {len(processed_items)}")
-            mysql_db.update_status(processed_update_query, processed_items)
+            mysql_db.update_status(SUCCESS_UPDATE_STATUS, processed_items)
+
+    def retrieve_documents_from_solr(self, solr_query: str, solr_retriever) -> requests.Response:
+
+        """Function to retrieve documents from Solr
+        :param solr_query: Solr filter query (e.g., a terms query) used to select the documents
+        :param solr_retriever: HTSolrAPI object
+        :return: response from Solr
+        """
+
+        chunk_solr_params = copy.deepcopy(self.solr_retriever_query_params)
+
+        chunk_solr_params['fq'] = solr_query
+
+        response
= solr_retriever.send_solr_request(
+            solr_host=f"{self.solr_host}/query",
+            solr_params=chunk_solr_params
+        )
+        if response.status_code != 200:
+            logger.error(f"Error {response.status_code} in query: {solr_query}")
+            raise requests.exceptions.RequestException(f"Error {response.status_code} in query: {solr_query}")
+        return response
+
+    @staticmethod
+    def generate_chunk_metadata(chunk: list, solr_output: dict, by_field: str = 'item') -> list[
+        CatalogItemMetadata] | None:
+        """Generate the metadata for the documents
+
+        :param chunk: list of documents to process
+        :param solr_output: response from Solr
+        :param by_field: field to search by (item=ht_id or record=id)
+        :return: list of metadata for the documents
+        """
+
+        record_metadata_list = []
+        for record in solr_output.get("response").get("docs"):
+
+            # Create the object to create items and metadata.
+            catalog_record_metadata = CatalogRecordMetadata(record)
+
+            # If something goes wrong while creating the metadata for a record, log the error and continue
+            # with the next record
+            try:
+                if by_field == 'item':
+                    # Validate query field = ht_id, list_documents could contain 1 or more items, but they probably are from
+                    # different records
+                    # Process a specific item of a record
+                    results = RetrieverServicesUtils.create_catalog_object_by_item_id(chunk, record, catalog_record_metadata)
+                    # This is the most efficient way to retrieve the items from Catalog
+                else:
+                    # Process all the items of a record
+                    results = RetrieverServicesUtils.create_catalog_object_by_record_id(record, catalog_record_metadata)
+
+                record_metadata_list.extend(results)
+            except Exception as e:
+                error_info = ht_utils.ht_utils.get_general_error_message("FullTextSearchRetrieverQueueService",
+                                                                         e)
+                logger.error(f"Error in getting documents from Solr {error_info}")
+        return record_metadata_list
 
-    def full_text_search_retriever_service(self, initial_documents, start, rows, by_field: str = 'item'):
+    def full_text_search_retriever_service(self, initial_documents, by_field: str = 'item'):
 
         """
         This method is used to retrieve the documents from the Catalog and generate the full text search entry
         If the Solr is not available, an error will be raised, and the process will be stopped
+
+        We run Solr queries in batches.
+        Each batch retrieves 200 documents, because Solr will fail with status code 414
+        if the URI is too long.
         """
 
-        # Create a connection to Solr Api
-        catalog_retriever = CatalogRetrieverService(self.solr_api_url)
+
         # Create a connection to the MySQL database
         mysql_db = get_mysql_conn(pool_size=1)
 
         # Create a connection to the queue to produce messages
         queue_producer = self.get_queue_producer()
 
-        total_documents = FullTextSearchRetrieverQueueService.count_documents(catalog_retriever, initial_documents,
-                                                                              start, rows, by_field)
-        count_records = 0
-        while count_records < total_documents:
+        solr_retriever = HTSolrAPI(self.solr_host, self.solr_user, self.solr_password)
 
-            chunk = initial_documents[count_records:count_records + rows]
+        # As we have a long list of ht_ids/ids, the recommendation is to split the list of documents into chunks
+        # and create a query batch to avoid the Solr URI-too-long error (status code 414).
+        # A chunk size of 200 was determined by testing the Solr query with different values (100-500);
+        # with 200 ht_ids it worked reliably.
-            # If there is something with Solr retrieving a chunk of documents will try to retrieve the next chunk
-            try:
-                result = catalog_retriever.retrieve_documents(chunk, start, rows, by_field=by_field)
-            except Exception as e:
-                error_info = ht_utils.ht_utils.get_general_error_message("FullTextSearchRetrieverQueueService",
-                                                                         e)
-                logger.error(f"Error in getting documents from Solr {error_info}")
-                continue
+
+        # Create chunks of documents to process according to the Solr query batch size
+        for chunk in ht_utils.ht_utils.split_into_batches(initial_documents, SOLR_BATCH_SIZE):
+
+            # Build the terms query to retrieve the chunk of documents
+            query = make_solr_term_query(chunk, by_field)
+
+            # Retrieve the documents from Solr
+            response = self.retrieve_documents_from_solr(query, solr_retriever)
+            output = json.loads(response.content.decode("utf-8"))
+
+            # Generate the metadata for the documents
+            start_time = time.time()
+            record_metadata_list = FullTextSearchRetrieverQueueService.generate_chunk_metadata(chunk, output, by_field)
+
+            logger.info(f"Metadata generator: Total items = {len(record_metadata_list)}.")
+            logger.info(f"Metadata generator: Total time = {time.time() - start_time}")
 
             # Publish the documents in the queue
-            FullTextSearchRetrieverQueueService.publishing_documents(queue_producer, result, mysql_db)
+            FullTextSearchRetrieverQueueService.publishing_documents(queue_producer, record_metadata_list, mysql_db)
 
-            count_records += len(result)
-            logger.info(f"Total of processed items {count_records}")
 
-def run_retriever_service(parallelize, num_threads, total_documents, list_documents, by_field, document_indexer_service,
-                          start, rows):
+def run_retriever_service(list_documents, by_field, document_retriever_service):
     """
     Run the retriever service
-    :param parallelize:
-    :param num_threads:
-    :param total_documents:
     :param list_documents:
     :param by_field:
-    :param document_indexer_service:
-    :param start:
-    :param rows:
+    :param document_retriever_service: FullTextSearchRetrieverQueueService object
     """
 
-    if parallelize:
+    total_documents = len(list_documents)
 
-        if num_threads:
-            n_cores = num_threads
-        else:
-            n_cores = multiprocessing.cpu_count()
+    if PARALLELIZE:
+
+        n_cores = multiprocessing.cpu_count()
 
         # The number of MySQL connections is equal to batch_size
-        if total_documents:
+        if total_documents > 0:
             batch_size = round(total_documents / n_cores + 0.5)
         else:
             logger.info("Nothing to process")
@@ -210,9 +256,9 @@ def run_retriever_service(parallelize, num_threads, total_documents, list_docume
 
         start_time = time.time()
 
-        processes = [multiprocessing.Process(target=document_indexer_service.full_text_search_retriever_service,
+        processes = [multiprocessing.Process(target=document_retriever_service.full_text_search_retriever_service,
                                              args=(list_documents[i:i + batch_size],
-                                                   start, rows, by_field))
+                                                   by_field))
                      for i in range(0, total_documents, batch_size)]
 
         for process in processes:
@@ -224,14 +270,11 @@ def run_retriever_service(parallelize, num_threads, total_documents, list_docume
 
         logger.info(f"Process=retrieving: Total time to retrieve a batch documents {time.time() - start_time:.10f}")
 
     else:
-        document_indexer_service.full_text_search_retriever_service(
+        document_retriever_service.full_text_search_retriever_service(
             list_documents,
-            start,
-            rows,
             by_field
         )
 
-
 def main():
 
     parser = argparse.ArgumentParser()
@@ -243,28 +286,36 @@ def main():
         sys.exit(1)
 
     document_retriever_service = FullTextSearchRetrieverQueueService(
-        init_args_obj.solr_api_url, init_args_obj.queue_name, init_args_obj.queue_host, init_args_obj.queue_user,
-
init_args_obj.queue_password) + init_args_obj.queue_password, + init_args_obj.solr_host, + init_args_obj.solr_user, + init_args_obj.solr_password, + init_args_obj.solr_retriever_query_params + ) # by_field is use to define the type of query to retrieve the documents (by item or by record). - # by_field = item => MySQL query will return the ht_id of the items - # by_field = record => MySQL query will return the id of the records - # To retrieve documents from Catalog the field is also used to define the type of query + # From MySQL table we will always return the ht_id and record_id + # To retrieve documents from Catalog the field is used to define the type of query by_field = init_args_obj.query_field - list_documents = init_args_obj.list_documents - if len(list_documents) == 0: + if len(init_args_obj.list_documents) > 0: + + # If the list of documents is provided, the process will run only for the documents in the list + list_ids = RetrieverServicesUtils.extract_ids_from_documents(init_args_obj.list_documents, by_field) + logger.info(f"Process=retrieving: Total of documents to process {len(list_ids)}") + run_retriever_service(list_ids, by_field, document_retriever_service) + else: # If the table does not exist, stop the process if not init_args_obj.db_conn.table_exists(PROCESSING_STATUS_TABLE_NAME): logger.error(f"{PROCESSING_STATUS_TABLE_NAME} does not exist") init_args_obj.db_conn.create_table(HT_INDEXER_TRACKTABLE) - # The process will run every 5 minutes to check if there are documents to process + # The process will run every 5 minutes to check if there are documents to process (retriever_status = pending) while True: total_time_waiting = 0 if total_time_waiting > 0: @@ -275,23 +326,11 @@ def main(): time.sleep(WAITING_TIME_MYSQL) continue else: - if by_field == 'record': - list_documents = [record['record_id'] for record in list_documents] - - if by_field == 'item': - list_documents = [record['ht_id'] for record in list_documents] - - logger.info(f"Process=retrieving: Total of documents to process {len(list_documents)}") - parallelize = True - - # TODO: Define the number of threads to use - nthreads = None - - total_documents = len(list_documents) - run_retriever_service(parallelize, nthreads, total_documents, list_documents, by_field, document_retriever_service, - init_args_obj.start, init_args_obj.rows) + list_ids = RetrieverServicesUtils.extract_ids_from_documents(list_documents, by_field) + logger.info(f"Process=retrieving: Total of documents to process {len(list_ids)}") + run_retriever_service(list_ids, by_field, document_retriever_service) # Checking the number of messages in the queue # Create a connection to the queue to produce messages diff --git a/document_retriever_service/full_text_search_retriever_service_test.py b/document_retriever_service/full_text_search_retriever_service_test.py index b15027e..831d58e 100644 --- a/document_retriever_service/full_text_search_retriever_service_test.py +++ b/document_retriever_service/full_text_search_retriever_service_test.py @@ -1,32 +1,54 @@ import pytest +import json from document_retriever_service.full_text_search_retriever_service import FullTextSearchRetrieverQueueService +from ht_indexer_api.ht_indexer_api import HTSolrAPI +from ht_utils.query_maker import make_solr_term_query @pytest.fixture -def get_document_retriever_service(solr_api_url): - return FullTextSearchRetrieverQueueService(solr_api_url, +def get_catalog_retriever_service_solr_fake_solr_url(): + return HTSolrAPI("http://solr-sdr-catalog:9033/solr/catalogFake/", 
'solr_user', 'solr_password')
+
+@pytest.fixture
+def get_solr_request(solr_catalog_url):
+    return HTSolrAPI(solr_catalog_url, 'solr_user', 'solr_password')
+
+@pytest.fixture
+def get_document_retriever_service(solr_catalog_url, get_retriever_service_solr_parameters):
+    return FullTextSearchRetrieverQueueService(
                                                "test_producer_queue",
                                                "rabbitmq",
                                                "guest",
-                                               "guest"
+                                               "guest",
+                                               solr_catalog_url,
+                                               'solr_user',
+                                               'solr_password',
+                                               get_retriever_service_solr_parameters
                                                )
 
 
 class TestFullTextRetrieverService:
 
-    def test_full_text_service_count_documents(self, get_catalog_retriever_service):
+    def test_full_text_retriever_service_query(self):
+        """Use case: Check if the Solr query is created correctly"""
         list_documents = ['nyp.33433082002258', 'not_exist_document']
+        query = make_solr_term_query(list_documents, by_field="item")
+        assert query == """{!terms f=ht_id}nyp.33433082002258,not_exist_document"""
 
-        count_docs = FullTextSearchRetrieverQueueService.count_documents(get_catalog_retriever_service,
-                                                                         list_documents,
-                                                                         0,
-                                                                         1,
-                                                                         "item")
+    def test_full_text_service_retrieve_documents_from_solr(self, get_document_retriever_service, get_solr_request):
+        """Use case: Check if the documents are retrieved from Solr"""
+        list_documents = ['nyp.33433082002258', 'not_exist_document']
 
-        assert 1 == count_docs
+        query = make_solr_term_query(list_documents, by_field="item")
+
+        response = get_document_retriever_service.retrieve_documents_from_solr(query, get_solr_request)
+
+        output = json.loads(response.content.decode("utf-8"))
+        assert output.get("response").get("numFound") == 1
 
     def test_generate_metadata(self, get_item_metadata):
+        """Use case: Generate the metadata of the item to be indexed"""
         list_documents = ['mdp.39015078560292']
 
         metadata, item_id = FullTextSearchRetrieverQueueService.generate_metadata(get_item_metadata)
@@ -39,11 +61,10 @@ def test_generate_metadata(self, get_item_metadata):
                                                 "queue_name": "test_producer_queue",
                                                 "requeue_message": False,
                                                 "query_field": "item",
-                                                "start": 0,
-                                                "rows": 100,
                                                 "batch_size": 1}])
     def test_full_text_search_retriever_service(self, retriever_parameters, get_document_retriever_service,
                                                 consumer_instance):
+        """Use case: Check if the message is sent to the queue"""
         # Clean up the queue
         consumer_instance.conn.ht_channel.queue_purge(consumer_instance.queue_name)
 
@@ -51,8 +72,6 @@ def test_full_text_search_retriever_service(self, retriever_parameters, get_docu
 
         get_document_retriever_service.full_text_search_retriever_service(
             list_documents,
-            retriever_parameters["start"],
-            retriever_parameters["rows"],
             retriever_parameters["query_field"]
         )
 
@@ -60,3 +79,107 @@ def test_full_text_search_retriever_service(self, retriever_parameters, get_docu
 
         # Clean up the queue
         consumer_instance.conn.ht_channel.queue_purge(consumer_instance.queue_name)
+
+    def test_retrieve_documents_by_item(self, get_document_retriever_service, get_solr_request):
+        """Use case: Receive a list of items (ht_id) to index and retrieve the metadata from Catalog
+        We want to index only the items that appear in the list and not all the items of each record.
+        """
+        list_documents = ['nyp.33433082002258', 'nyp.33433082046495', 'nyp.33433082046503', 'nyp.33433082046529',
+                          'nyp.33433082046537', 'nyp.33433082046545', 'nyp.33433082067798', 'nyp.33433082067806',
+                          'nyp.33433082067822']
+        by_field = 'item'
+
+        # Build the terms query to retrieve the documents
+        query = make_solr_term_query(list_documents, by_field)
+
+        # Retrieve the documents from Solr
+        response = get_document_retriever_service.retrieve_documents_from_solr(query, get_solr_request)
+        output = json.loads(response.content.decode("utf-8"))
+
+        # Generate the metadata for the documents
+        record_metadata_list = FullTextSearchRetrieverQueueService.generate_chunk_metadata(list_documents, output, by_field)
+
+        assert len(record_metadata_list) == 9
+
+    def test_retrieve_documents_by_record(self, get_document_retriever_service, get_solr_request):
+        """Use case: Receive one record to process all their items"""
+
+        list_documents = ["008394936"]
+        by_field = 'record'
+
+        # Build the terms query to retrieve the documents
+        query = make_solr_term_query(list_documents, by_field)
+
+        # Retrieve the documents from Solr
+        response = get_document_retriever_service.retrieve_documents_from_solr(query, get_solr_request)
+        output = json.loads(response.content.decode("utf-8"))
+
+        # Generate the metadata for the documents
+        record_metadata_list = FullTextSearchRetrieverQueueService.generate_chunk_metadata(list_documents, output, by_field)
+
+        assert len(record_metadata_list) == 4
+
+    def test_retrieve_documents_by_item_only_one(self, get_document_retriever_service, get_solr_request):
+        """Use case: Retrieve only the metadata of the item given by parameter"""
+        list_documents = ['nyp.33433082002258']
+        by_field = 'item'
+
+        # Build the terms query to retrieve the documents
+        query = make_solr_term_query(list_documents, by_field)
+
+        # Retrieve the documents from Solr
+        response = get_document_retriever_service.retrieve_documents_from_solr(query, get_solr_request)
+        output = json.loads(response.content.decode("utf-8"))
+
+        # Generate the metadata for the documents
+        record_metadata_list = FullTextSearchRetrieverQueueService.generate_chunk_metadata(list_documents, output, by_field)
+
+        assert len(record_metadata_list) == 1
+
+    def test_retrieve_documents_by_record_list_records(self, get_document_retriever_service, get_solr_request):
+        """Use case: Receive a list of records to process all their items"""
+
+        list_documents = ["008394936", "100393743"]
+        by_field = 'record'
+
+        # Build the terms query to retrieve the documents
+        query = make_solr_term_query(list_documents, by_field)
+
+        # Retrieve the documents from Solr
+        response = get_document_retriever_service.retrieve_documents_from_solr(query, get_solr_request)
+        output = json.loads(response.content.decode("utf-8"))
+
+        # Generate the metadata for the documents
+        record_metadata_list = FullTextSearchRetrieverQueueService.generate_chunk_metadata(list_documents, output, by_field)
+
+        assert len(record_metadata_list) == 5
+
+    def test_retrieve_documents_empty_result(self, get_document_retriever_service, get_solr_request):
+        """Use case: Check if the results list is empty because the input item is not in Solr"""
+        list_documents
= ["this_id_does_not_exist_in_solr"] + + query = make_solr_term_query(list_documents, by_field="item") + + response = get_document_retriever_service.retrieve_documents_from_solr(query, get_solr_request) + + output = json.loads(response.content.decode("utf-8")) + + #results = get_catalog_retriever_service.retrieve_documents(list_documents, 0, 10, by_field) + assert output.get("response").get("numFound") == 0 + + + def test_solr_is_not_working(self, get_catalog_retriever_service_solr_fake_solr_url): + """Use case: Count the number of documents in Catalog""" + + list_documents = ['nyp.33433082002258'] + + query = make_solr_term_query(list_documents, by_field="item") + + with pytest.raises(Exception): + response = FullTextSearchRetrieverQueueService.retrieve_documents_from_solr(query, + get_catalog_retriever_service_solr_fake_solr_url) + assert response is None + diff --git a/document_retriever_service/retriever_arguments.py b/document_retriever_service/retriever_arguments.py index 6545338..0ce2b8d 100644 --- a/document_retriever_service/retriever_arguments.py +++ b/document_retriever_service/retriever_arguments.py @@ -5,10 +5,11 @@ import ht_utils.ht_utils from ht_utils.ht_logger import get_ht_logger from ht_utils.ht_mysql import get_mysql_conn -from ht_utils.ht_utils import get_solr_url +from ht_utils.ht_utils import get_solr_url, comma_separated_list from ht_indexer_monitoring.ht_indexer_tracktable import PROCESSING_STATUS_TABLE_NAME + logger = get_ht_logger(name=__name__) current = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) @@ -20,9 +21,6 @@ TOTAL_MYSQL_ROWS = 24000 -def comma_separated_list(arg): - return arg.split(",") - class RetrieverServiceArguments: def __init__(self, parser): parser.add_argument("--list_documents", help="List of items to process", @@ -56,12 +54,17 @@ def __init__(self, parser): # Retriever 24k items from the database self.retriever_query = f"SELECT ht_id, record_id FROM {PROCESSING_STATUS_TABLE_NAME} WHERE retriever_status = 'pending' LIMIT {TOTAL_MYSQL_ROWS}" + # TODO Remove the line below once SolrExporter been updated self.solr_url = f"{solr_url}/query" - # Each Solr query will retrieve maximum 1000 documents - self.start = SOLR_ROW_START - self.rows = SOLR_TOTAL_ROWS + self.solr_host = get_solr_url().strip('/') + self.solr_user=os.getenv("SOLR_USER") + self.solr_password=os.getenv("SOLR_PASSWORD") - self.solr_api_url = get_solr_url() + self.solr_retriever_query_params = { + 'q': '*:*', + 'rows': SOLR_TOTAL_ROWS, + 'wt': 'json' + } class RetrieverServiceByFileArguments(RetrieverServiceArguments): diff --git a/document_retriever_service/retriever_services_utils.py b/document_retriever_service/retriever_services_utils.py new file mode 100644 index 0000000..dd23ff9 --- /dev/null +++ b/document_retriever_service/retriever_services_utils.py @@ -0,0 +1,75 @@ +from catalog_metadata.catalog_metadata import CatalogItemMetadata, CatalogRecordMetadata +from ht_queue_service.queue_producer import QueueProducer +from ht_utils.ht_logger import get_ht_logger +logger = get_ht_logger(name=__name__) +class RetrieverServicesUtils: + """ + A utility class for the Document Retriever Service. 
+    """
+
+    @staticmethod
+    def publish_document(queue_producer: QueueProducer, content: dict = None):
+        """
+        Publish the document in a queue
+        :param queue_producer: QueueProducer object
+        :param content: dict with the content of the message
+        """
+        message = content
+        logger.info(f"Sending message with id {content.get('ht_id')} to queue {queue_producer.queue_name}")
+        queue_producer.publish_messages(message)
+
+    @staticmethod
+    def get_catalog_object(item_id: str,
+                           record_metadata: CatalogRecordMetadata) -> CatalogItemMetadata:
+
+        catalog_item_metadata = CatalogItemMetadata(item_id, record_metadata)
+        return catalog_item_metadata
+
+    @staticmethod
+    def extract_ids_from_documents(list_documents, by_field):
+        """
+        Prepare the list of ids to be processed
+        :param list_documents: list of documents to process
+        :param by_field: field to search by (item=ht_id or record=id)
+        :return: list of ids to be processed
+        """
+
+        if by_field == 'record':
+            list_documents = [record['record_id'] for record in list_documents]
+
+        if by_field == 'item':
+            list_documents = [record['ht_id'] for record in list_documents]
+
+        return list_documents
+
+    @staticmethod
+    def create_catalog_object_by_record_id(record: dict, catalog_record_metadata: CatalogRecordMetadata) -> list[
+        CatalogItemMetadata]:
+        """Receive a record and return a list of items and their metadata
+        :param record: dict with catalog record (retrieved from Solr)
+        :param catalog_record_metadata: CatalogRecordMetadata object
+        """
+
+        results = []
+        for item_id in record.get('ht_id'):
+            results.append(RetrieverServicesUtils.get_catalog_object(item_id, catalog_record_metadata))
+
+        return results
+
+    @staticmethod
+    def create_catalog_object_by_item_id(list_documents: list, record: dict,
+                                         catalog_record_metadata: CatalogRecordMetadata) \
+            -> list[CatalogItemMetadata]:
+        """Receive a list of documents and a catalog record;
+        search for each item (ht_id) of the record in the list and
+        create the CatalogItemMetadata object for each document found in the list
+        :param list_documents: list of ht_id
+        :param record: dict with catalog record (retrieved from Solr)
+        :param catalog_record_metadata: CatalogRecordMetadata object
+        """
+        results = []
+        for item_id in record.get('ht_id'):
+            if item_id in list_documents:
+                results.append(RetrieverServicesUtils.get_catalog_object(item_id, catalog_record_metadata))
+        return results
+
diff --git a/document_retriever_service/retriever_services_utils_test.py b/document_retriever_service/retriever_services_utils_test.py
new file mode 100644
index 0000000..d0f2dc6
--- /dev/null
+++ b/document_retriever_service/retriever_services_utils_test.py
@@ -0,0 +1,16 @@
+from document_retriever_service.retriever_services_utils import RetrieverServicesUtils
+
+
+class TestRetrieverServicesUtils:
+
+    def test_create_catalog_object_by_item_id(self, get_catalog_record_metadata,
+                                              get_record_data):
+        """Test if the method returns only the metadata of the input item"""
+        # Create the list of documents to process
+        list_documents = ["mdp.39015078560292"]
+        results = RetrieverServicesUtils.create_catalog_object_by_item_id(list_documents, get_record_data,
+                                                                          get_catalog_record_metadata)
+
+        assert len(results) == 1
+        assert results[0].ht_id == "mdp.39015078560292"
+        assert results[0].metadata.get("vol_id") == "mdp.39015078560292"
diff --git a/document_retriever_service/run_retriever_service_by_file.py b/document_retriever_service/run_retriever_service_by_file.py
index b97857d..fb841be 100644
--- a/document_retriever_service/run_retriever_service_by_file.py
+++
b/document_retriever_service/run_retriever_service_by_file.py @@ -16,31 +16,27 @@ sys.path.insert(0, parent) -def retrieve_documents_by_file(solr_api_url, queue_name, queue_host, queue_user, queue_password, - input_documents_file, query_field, start, rows, status_file, parallelize, nthreads): +def retrieve_documents_by_file(queue_name, queue_host, queue_user, queue_password, + query_field, solr_host, solr_user, solr_password, solr_retriever_query_params, + input_documents_file, status_file): """ This method is used to retrieve the documents from the Catalog and generate the full text search entry. The list of documents to index is extracted from a file. - :param solr_api_url: Solr API URL - :param queue_name: Queue name - :param queue_host: Queue host - :param queue_user: Queue user - :param queue_password: Queue password - :param input_documents_file: File with the list of documents to process - :param query_field: Query field - :param start: Start Solr query - :param rows: rows - :param status_file: Status file - :param parallelize: Boolean parameter to parallelize the process - :param nthreads: Number of threads + :param queue_name: The name of the queue to use + :param queue_host: The host of the queue + :param queue_user: The user of the queue + :param queue_password: The password of the queue + :param query_field: The field to use in the query + :param solr_host: The host of the Solr server + :param solr_user: The user of the Solr server + :param solr_password: The password of the Solr server + :param solr_retriever_query_params: The query parameters to use in the Solr query + :param input_documents_file: The file containing the list of documents to process + :param status_file: The file to store the status of the documents """ - document_indexer_service = FullTextSearchRetrieverQueueService( - solr_api_url, - queue_name, - queue_host, - queue_user, - queue_password) + document_retriever_service = FullTextSearchRetrieverQueueService(queue_name, queue_host, queue_user, queue_password, + solr_host, solr_user, solr_password, solr_retriever_query_params) if os.path.isfile(input_documents_file): with open(input_documents_file) as f: @@ -63,10 +59,9 @@ def retrieve_documents_by_file(solr_api_url, queue_name, queue_host, queue_user, logger.info(f"Total of documents to process {len(list_documents)}") total_documents = len(list_documents) - run_retriever_service(parallelize, nthreads, total_documents, list_documents, query_field, - document_indexer_service, - start, - rows) + run_retriever_service(list_documents, query_field, + document_retriever_service, + ) logger.info(f"Total time to retrieve and generate documents {time.time() - start_time:.10f}") @@ -83,15 +78,15 @@ def main(): # TODO: Review the logic of the status file status_file = os.path.join(current, "document_retriever_status.txt") - parallelize = True - nthreads = None - - retrieve_documents_by_file(init_args_obj.solr_api_url, init_args_obj.queue_name, init_args_obj.queue_host, + retrieve_documents_by_file(init_args_obj.queue_name, init_args_obj.queue_host, init_args_obj.queue_user, init_args_obj.queue_password, - init_args_obj.input_documents_file, init_args_obj.query_field, - init_args_obj.start, init_args_obj.rows, - status_file, parallelize, nthreads) - + init_args_obj.query_field, + init_args_obj.solr_host, + init_args_obj.solr_user, + init_args_obj.solr_password, + init_args_obj.solr_retriever_query_params, + init_args_obj.input_documents_file, + status_file) if __name__ == "__main__": main() diff --git 
a/document_retriever_service/run_retriever_service_by_file_test.py b/document_retriever_service/run_retriever_service_by_file_test.py index 284e46a..faae4a4 100644 --- a/document_retriever_service/run_retriever_service_by_file_test.py +++ b/document_retriever_service/run_retriever_service_by_file_test.py @@ -1,6 +1,7 @@ import pytest import os +from conftest import solr_catalog_url from document_retriever_service.run_retriever_service_by_file import retrieve_documents_by_file from document_retriever_service.ht_status_retriever_service import get_non_processed_ids import tempfile @@ -32,34 +33,28 @@ def test_get_non_processed_ids(self, get_input_file, get_status_file): assert len(ids2process) == 12 assert len(processed_ids) == 0 - @pytest.mark.parametrize("retriever_parameters", [{"solr_api": "http://solr-sdr-catalog:9033/solr/catalog/", - "user": "guest", "password": "guest", "host": "rabbitmq", + @pytest.mark.parametrize("retriever_parameters", [{"user": "guest", "password": "guest", "host": "rabbitmq", "queue_name": "test_producer_queue", "requeue_message": False, "query_field": "item", - "start": 0, - "rows": 100, "batch_size": 1}]) def test_run_retriever_service_by_file(self, retriever_parameters, get_input_file, get_status_file, - consumer_instance): - parallelize = False - nthreads = None + consumer_instance, solr_catalog_url, get_retriever_service_solr_parameters): # Clean up the queue consumer_instance.conn.ht_channel.queue_purge(consumer_instance.queue_name) - retrieve_documents_by_file(retriever_parameters["solr_api"], - retriever_parameters["queue_name"], + retrieve_documents_by_file(retriever_parameters["queue_name"], retriever_parameters["host"], retriever_parameters["user"], retriever_parameters["password"], - get_input_file, retriever_parameters["query_field"], - retriever_parameters["start"], - retriever_parameters["rows"], - get_status_file, - parallelize, - nthreads) + solr_catalog_url, + 'solr_user', + 'solr_password', + get_retriever_service_solr_parameters, + get_input_file, + get_status_file) assert 9 == consumer_instance.conn.get_total_messages() diff --git a/ht_indexer_api/ht_indexer_api.py b/ht_indexer_api/ht_indexer_api.py index 3b203c9..a629f80 100644 --- a/ht_indexer_api/ht_indexer_api.py +++ b/ht_indexer_api/ht_indexer_api.py @@ -69,41 +69,20 @@ def index_documents_by_file(self, path: Path, list_documents: list = None, solr_ return response - def get_documents( - self, - query: str = None, - response_format: Text = "json", - start: int = 0, - rows: int = 100, - ): - """ Get documents from Solr server - If solr server is running, it will return a response object - otherwise, it will raise an exception - Any exception will be raised to the caller + def send_solr_request(self, solr_host: str, solr_params: dict): """ - solr_headers = {"Content-type": "application/json"} - - if response_format == "xml": - solr_headers = {"Content-type": "application/xml"} - if response_format == "html": - solr_headers = {"Content-type": "application/html"} - - data_query = {"q": "*:*"} - if query: - data_query["q"] = query - else: - data_query = {"q": "*:*"} - - data_query.update({"start": start, "rows": rows}) - + Send a request to Solr and return the response. + """ + # try ... 
except block to catch any exception raised by the Solr connection try: response = requests.post( - f"{self.url}query", - params=data_query, - headers=solr_headers, + f"{solr_host}", + params=solr_params, + auth=self.auth, + headers={"Content-type": "application/json"} ) response.raise_for_status() return response except requests.exceptions.RequestException as e: - logger.info(f"Error {e} in query: {query}") - raise e + logger.info(f"Error {e} in query: {solr_params}") + raise e \ No newline at end of file diff --git a/ht_indexer_api/ht_indexer_api_test.py b/ht_indexer_api/ht_indexer_api_test.py index dacdbe2..e8868d7 100644 --- a/ht_indexer_api/ht_indexer_api_test.py +++ b/ht_indexer_api/ht_indexer_api_test.py @@ -54,43 +54,3 @@ def test_index_document_add(self, mock_index_documents, get_solr_api): # Assert assert response.status_code == 200 - - @patch('ht_indexer_api.ht_indexer_api.HTSolrAPI.get_documents') - def test_query_by_id(self, mock_get_documents, get_solr_api): - mock_response = MagicMock() - mock_response.status_code = 200 - mock_response.headers = {"Content-Type": "text/plain;charset=utf-8"} - - mock_get_documents.return_value = mock_response - - """ - :param get_solrAPI: - :return: - """ - query = "oclc:23549320" - response = get_solr_api.get_documents(query=query, response_format="json") - - assert response.status_code == 200 - assert ( - response.headers["Content-Type"] == "text/plain;charset=utf-8" - ) - - @patch('ht_indexer_api.ht_indexer_api.HTSolrAPI.index_documents_by_file') - def test_index_document_delete(self, mock_index_documents, get_solr_api): - mock_response = MagicMock() - mock_response.status_code = 200 - mock_index_documents.return_value = mock_response - - document_path = Path( - f"{os.path.dirname(__file__)}/data/delete" - ) # "data/delete" - list_documents = ["39015078560292-1-1-flat.solr_delete.xml"] - - response = get_solr_api.index_documents_by_file(document_path, list_documents=list_documents, solr_url_json="update/") - assert response.status_code == 200 - - def test_get_documents_failed(self, get_fake_solr_api): - query = "*:*" - - with pytest.raises(Exception, match=""): - response = get_fake_solr_api.get_documents(query, response_format="json") diff --git a/ht_indexer_monitoring/ht_indexer_tracktable.py b/ht_indexer_monitoring/ht_indexer_tracktable.py index e49a873..61616dc 100644 --- a/ht_indexer_monitoring/ht_indexer_tracktable.py +++ b/ht_indexer_monitoring/ht_indexer_tracktable.py @@ -22,6 +22,7 @@ # MySQL table to track the status of the indexer PROCESSING_STATUS_TABLE_NAME = "fulltext_item_processing_status" +MYSQL_INSERT_BATCH_SIZE = 500 HT_INDEXER_TRACKTABLE = f""" CREATE TABLE IF NOT EXISTS {PROCESSING_STATUS_TABLE_NAME} ( @@ -85,7 +86,7 @@ def get_catalog_data(self, query: str, status=record['status'])) # Insert in MySQL a batch size of 500 records - if len(data) >= 500: + if len(data) >= MYSQL_INSERT_BATCH_SIZE: yield data data = [] if len(data) > 0: diff --git a/ht_utils/ht_utils.py b/ht_utils/ht_utils.py index 4f02a13..cb9a631 100644 --- a/ht_utils/ht_utils.py +++ b/ht_utils/ht_utils.py @@ -59,3 +59,11 @@ def get_error_message_by_document(service_name: str, e: Exception, doc: dict) -> 'ht_id': doc.get('ht_id') if doc.get('ht_id') else doc.get('id'), 'timestamp': get_current_time() } + +def split_into_batches(documents, batch_size): + """Split the list of documents into batches of given size.""" + for i in range(0, len(documents), batch_size): + yield documents[i:i + batch_size] + +def comma_separated_list(arg): + return arg.split(",") \ 
No newline at end of file
diff --git a/ht_utils/query_maker.py b/ht_utils/query_maker.py
index 8390258..b632c3a 100644
--- a/ht_utils/query_maker.py
+++ b/ht_utils/query_maker.py
@@ -31,3 +31,30 @@ def make_query(list_documents: list[str], by_field: str = 'item') -> str:
         values = '"'.join(("", values, ""))
         query = f"{query_field}:({values})"
     return query
+
+def make_solr_term_query(list_documents: list[str], by_field: str = 'item') -> str:
+    """
+    Receives a list of ht_id or id values and returns a query to retrieve the documents from the Catalog
+    Parameters
+    ----------
+    list_documents: list[str]
+        List of ht_id or id values
+    by_field: str
+        Field to be used in the query. If item, the query will be {!terms f=ht_id}id1,id2,id3
+        If record, the query will be {!terms f=id}id1,id2,id3
+    Returns
+    -------
+    str
+        Query to retrieve the documents from the Catalog
+    """
+
+    # Use the terms query parser for faster lookup of large sets of IDs, e.g., in document_retriever_service
+    # The terms query parser in Solr is a highly efficient way to search for multiple exact values
+    # in a specific field — great for querying by id or any other exact-match field,
+    # especially when you're dealing with large lists.
+    if by_field == 'record':
+        query = '{!terms f=id}' + ','.join(list_documents)
+    else:
+        query = '{!terms f=ht_id}' + ','.join(list_documents)
+    return query
\ No newline at end of file
diff --git a/main.py b/main.py
index cd36136..c98c9b6 100755
--- a/main.py
+++ b/main.py
@@ -53,11 +53,6 @@ def solr_indexing(path, list_documents: list = None):
         response = solr_api.index_documents_by_file(path, list_documents=list_documents)
         return {"status": response.status_code, "description": response.headers}
 
-    @app.post("/solrQuery/")
-    def solr_query_id(query):
-        response = solr_api.get_documents(query, response_format="json")
-        return {"status": response.status_code, "description": response.headers}
-
     uvicorn.run(app, host=args.host, port=int(args.port))