From e296b94cbcc80dbab007dfd15dee624024ea8b98 Mon Sep 17 00:00:00 2001 From: Lianet Sepulveda Torres Date: Thu, 1 May 2025 12:28:41 -0400 Subject: [PATCH] Updated the average size of retriever documents; Parallelize=True; solving HTMultipleConsumerServiceConcrete warning --- .../full_text_search_retriever_service.py | 11 ++++++----- document_retriever_service/retriever_arguments.py | 1 + .../run_retriever_service_by_file.py | 9 ++++++--- .../run_retriever_service_by_file_test.py | 3 ++- ht_queue_service/queue_connection.py | 5 ++--- ht_queue_service/queue_multiple_consumer_test.py | 9 +++++---- 6 files changed, 22 insertions(+), 16 deletions(-) diff --git a/document_retriever_service/full_text_search_retriever_service.py b/document_retriever_service/full_text_search_retriever_service.py index 9b97c30..557196f 100644 --- a/document_retriever_service/full_text_search_retriever_service.py +++ b/document_retriever_service/full_text_search_retriever_service.py @@ -35,7 +35,7 @@ MYSQL_COLUMN_UPDATE = 'retriever_status' SUCCESS_UPDATE_STATUS = f"UPDATE {PROCESSING_STATUS_TABLE_NAME} SET status = %s, {MYSQL_COLUMN_UPDATE} = %s, processed_at = %s WHERE ht_id = %s" FAILURE_UPDATE_STATUS = f"UPDATE {PROCESSING_STATUS_TABLE_NAME} SET status = %s, {MYSQL_COLUMN_UPDATE} = %s, processed_at = %s, error = %s WHERE ht_id = %s" -PARALLELIZE = False +PARALLELIZE = True SOLR_BATCH_SIZE = 200 # The chunk size is 200, because Solr will fail with the status code 414. The chunk size was determined # by testing the Solr query with different values (e.g., 100-500 and with 200 ht_ids it worked. 
@@ -232,18 +232,19 @@ def full_text_search_retriever_service(self, initial_documents, by_field: str = FullTextSearchRetrieverQueueService.publishing_documents(queue_producer, record_metadata_list, mysql_db) -def run_retriever_service(list_documents, by_field, document_retriever_service): +def run_retriever_service(list_documents, by_field, document_retriever_service, parallelize: bool = False): """ Run the retriever service :param list_documents: :param by_field: :param document_retriever_service: + :param parallelize: if True, the process will run in parallel """ total_documents = len(list_documents) - if PARALLELIZE: + if parallelize: n_cores = multiprocessing.cpu_count() @@ -307,7 +308,7 @@ def main(): # If the list of documents is provided, the process will run only for the documents in the list list_ids = RetrieverServicesUtils.extract_ids_from_documents(init_args_obj.list_documents, by_field) logger.info(f"Process=retrieving: Total of documents to process {len(list_ids)}") - run_retriever_service(list_ids, by_field, document_retriever_service) + run_retriever_service(list_ids, by_field, document_retriever_service, parallelize=PARALLELIZE) else: # If the table does not exist, stop the process @@ -330,7 +331,7 @@ def main(): logger.info(f"Process=retrieving: Total of documents to process {len(list_ids)}") - run_retriever_service(list_ids, by_field, document_retriever_service) + run_retriever_service(list_ids, by_field, document_retriever_service, parallelize=PARALLELIZE) # Checking the number of messages in the queue # Create a connection to the queue to produce messages diff --git a/document_retriever_service/retriever_arguments.py b/document_retriever_service/retriever_arguments.py index 0748488..0ce2b8d 100644 --- a/document_retriever_service/retriever_arguments.py +++ b/document_retriever_service/retriever_arguments.py @@ -61,6 +61,7 @@ def __init__(self, parser): self.solr_password=os.getenv("SOLR_PASSWORD") self.solr_retriever_query_params = { + 'q': 
'*:*', 'rows': SOLR_TOTAL_ROWS, 'wt': 'json' } diff --git a/document_retriever_service/run_retriever_service_by_file.py b/document_retriever_service/run_retriever_service_by_file.py index fb841be..ebce3ab 100644 --- a/document_retriever_service/run_retriever_service_by_file.py +++ b/document_retriever_service/run_retriever_service_by_file.py @@ -15,10 +15,11 @@ parent = os.path.dirname(current) sys.path.insert(0, parent) +PARALLELIZE = True def retrieve_documents_by_file(queue_name, queue_host, queue_user, queue_password, query_field, solr_host, solr_user, solr_password, solr_retriever_query_params, - input_documents_file, status_file): + input_documents_file, status_file, parallelize=PARALLELIZE): """ This method is used to retrieve the documents from the Catalog and generate the full text search entry. The list of documents to index is extracted from a file. @@ -33,6 +34,7 @@ def retrieve_documents_by_file(queue_name, queue_host, queue_user, queue_passwor :param solr_retriever_query_params: The query parameters to use in the Solr query :param input_documents_file: The file containing the list of documents to process :param status_file: The file to store the status of the documents + :param parallelize: If True, the processing will be done in parallel """ document_retriever_service = FullTextSearchRetrieverQueueService(queue_name, queue_host, queue_user, queue_password, @@ -58,9 +60,9 @@ def retrieve_documents_by_file(queue_name, queue_host, queue_user, queue_passwor logger.info(f"Total of documents to process {len(list_documents)}") - total_documents = len(list_documents) run_retriever_service(list_documents, query_field, document_retriever_service, + parallelize=parallelize ) logger.info(f"Total time to retrieve and generate documents {time.time() - start_time:.10f}") @@ -86,7 +88,8 @@ def main(): init_args_obj.solr_password, init_args_obj.solr_retriever_query_params, init_args_obj.input_documents_file, - status_file) + status_file, + parallelize=PARALLELIZE) if 
__name__ == "__main__": main() diff --git a/document_retriever_service/run_retriever_service_by_file_test.py b/document_retriever_service/run_retriever_service_by_file_test.py index faae4a4..d833eb8 100644 --- a/document_retriever_service/run_retriever_service_by_file_test.py +++ b/document_retriever_service/run_retriever_service_by_file_test.py @@ -54,7 +54,8 @@ def test_run_retriever_service_by_file(self, retriever_parameters, get_input_fil 'solr_password', get_retriever_service_solr_parameters, get_input_file, - get_status_file) + get_status_file, + parallelize=False) assert 9 == consumer_instance.conn.get_total_messages() diff --git a/ht_queue_service/queue_connection.py b/ht_queue_service/queue_connection.py index 0aa8cc6..aa326ef 100644 --- a/ht_queue_service/queue_connection.py +++ b/ht_queue_service/queue_connection.py @@ -3,15 +3,14 @@ # Calculate the maximum number of messages in the queue # The average size of a document_generator message is 1.8 MB -# The average size of a document_retriever message is 0.0132 MB +# The average size of a document_retriever message is 0.16 MB # The total disk space of the RabbitMQ server is 50 GB. # 1 GB = 1024 MB, so 50 GB = 50 * 1024 MB = 51,200 MB. 
# Let's calculate using 90% of the total disk space 51,200 MB * 0.90 = 46,080 MB - # The maximum number of document_generator messages in the queue is 46,080 MB / 1.8 MB = 25,600 messages -# The maximum number of document_retriever messages in the queue is 46,080 MB / 0.0132 MB = 3,487,878 messages +# The maximum number of document_retriever messages in the queue is 46,080 MB / 0.16 MB = 288,000 messages # To set the maximum number of messages in the retriever queue, I'll set it to 500,000 messages MAX_DOCUMENT_IN_QUEUE = 200000 # 200000 is the maximum number of messages in the retriever queue diff --git a/ht_queue_service/queue_multiple_consumer_test.py b/ht_queue_service/queue_multiple_consumer_test.py index 73acc1f..35459a3 100644 --- a/ht_queue_service/queue_multiple_consumer_test.py +++ b/ht_queue_service/queue_multiple_consumer_test.py @@ -7,7 +7,8 @@ logger = get_ht_logger(name=__name__) -class TestHTMultipleConsumerServiceConcrete(QueueMultipleConsumer): + +class HTMultipleConsumerServiceConcrete(QueueMultipleConsumer): def __init__(self, user: str, password: str, host: str, queue_name: str, requeue_message: bool = False, batch_size: int = 1, shutdown_on_empty_queue: bool = True): @@ -71,17 +72,17 @@ def list_messages(): @pytest.fixture def multiple_consumer_instance(): - return TestHTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq", + return HTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq", queue_name="test_producer_queue", requeue_message=False, batch_size=1) @pytest.fixture def multiple_consumer_instance_requeue_true_size_n(): - return TestHTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq", + return HTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq", queue_name="test_producer_queue", requeue_message=True, batch_size=10) @pytest.fixture def multiple_consumer_instance_requeue_false_size_n(): - return 
TestHTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq", + return HTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq", queue_name="test_producer_queue", requeue_message=False, batch_size=10) @pytest.fixture