Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions document_retriever_service/full_text_search_retriever_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
MYSQL_COLUMN_UPDATE = 'retriever_status'
SUCCESS_UPDATE_STATUS = f"UPDATE {PROCESSING_STATUS_TABLE_NAME} SET status = %s, {MYSQL_COLUMN_UPDATE} = %s, processed_at = %s WHERE ht_id = %s"
FAILURE_UPDATE_STATUS = f"UPDATE {PROCESSING_STATUS_TABLE_NAME} SET status = %s, {MYSQL_COLUMN_UPDATE} = %s, processed_at = %s, error = %s WHERE ht_id = %s"
PARALLELIZE = False
PARALLELIZE = True
SOLR_BATCH_SIZE = 200 # The chunk size is 200, because Solr will fail with the status code 414. The chunk size was determined
# by testing the Solr query with different values (e.g., 100-500), and with 200 ht_ids it worked.

Expand Down Expand Up @@ -232,18 +232,19 @@ def full_text_search_retriever_service(self, initial_documents, by_field: str =
FullTextSearchRetrieverQueueService.publishing_documents(queue_producer, record_metadata_list, mysql_db)


def run_retriever_service(list_documents, by_field, document_retriever_service):
def run_retriever_service(list_documents, by_field, document_retriever_service, parallelize: bool = False):
"""
Run the retriever service

:param list_documents:
:param by_field:
:param document_retriever_service:
:param parallelize: if True, the process will run in parallel
"""

total_documents = len(list_documents)

if PARALLELIZE:
if parallelize:

n_cores = multiprocessing.cpu_count()

Expand Down Expand Up @@ -307,7 +308,7 @@ def main():
# If the list of documents is provided, the process will run only for the documents in the list
list_ids = RetrieverServicesUtils.extract_ids_from_documents(init_args_obj.list_documents, by_field)
logger.info(f"Process=retrieving: Total of documents to process {len(list_ids)}")
run_retriever_service(list_ids, by_field, document_retriever_service)
run_retriever_service(list_ids, by_field, document_retriever_service, parallelize=PARALLELIZE)
else:

# If the table does not exist, stop the process
Expand All @@ -330,7 +331,7 @@ def main():

logger.info(f"Process=retrieving: Total of documents to process {len(list_ids)}")

run_retriever_service(list_ids, by_field, document_retriever_service)
run_retriever_service(list_ids, by_field, document_retriever_service, parallelize=PARALLELIZE)

# Checking the number of messages in the queue
# Create a connection to the queue to produce messages
Expand Down
1 change: 1 addition & 0 deletions document_retriever_service/retriever_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(self, parser):
self.solr_password=os.getenv("SOLR_PASSWORD")

self.solr_retriever_query_params = {
'q': '*:*',
'rows': SOLR_TOTAL_ROWS,
'wt': 'json'
}
Expand Down
9 changes: 6 additions & 3 deletions document_retriever_service/run_retriever_service_by_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
parent = os.path.dirname(current)
sys.path.insert(0, parent)

PARALLELIZE = True

def retrieve_documents_by_file(queue_name, queue_host, queue_user, queue_password,
query_field, solr_host, solr_user, solr_password, solr_retriever_query_params,
input_documents_file, status_file):
input_documents_file, status_file, parallelize=PARALLELIZE):
""" This method is used to retrieve the documents from the Catalog and generate the full text search entry.
The list of documents to index is extracted from a file.

Expand All @@ -33,6 +34,7 @@ def retrieve_documents_by_file(queue_name, queue_host, queue_user, queue_passwor
:param solr_retriever_query_params: The query parameters to use in the Solr query
:param input_documents_file: The file containing the list of documents to process
:param status_file: The file to store the status of the documents
:param parallelize: If True, the processing will be done in parallel
"""

document_retriever_service = FullTextSearchRetrieverQueueService(queue_name, queue_host, queue_user, queue_password,
Expand All @@ -58,9 +60,9 @@ def retrieve_documents_by_file(queue_name, queue_host, queue_user, queue_passwor

logger.info(f"Total of documents to process {len(list_documents)}")

total_documents = len(list_documents)
run_retriever_service(list_documents, query_field,
document_retriever_service,
parallelize=parallelize
)

logger.info(f"Total time to retrieve and generate documents {time.time() - start_time:.10f}")
Expand All @@ -86,7 +88,8 @@ def main():
init_args_obj.solr_password,
init_args_obj.solr_retriever_query_params,
init_args_obj.input_documents_file,
status_file)
status_file,
parallelize=PARALLELIZE)

if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def test_run_retriever_service_by_file(self, retriever_parameters, get_input_fil
'solr_password',
get_retriever_service_solr_parameters,
get_input_file,
get_status_file)
get_status_file,
parallelize=False)

assert 9 == consumer_instance.conn.get_total_messages()

Expand Down
5 changes: 2 additions & 3 deletions ht_queue_service/queue_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
# Calculate the maximum number of messages in the queue

# The average size of a document_generator message is 1.8 MB
# The average size of a document_retriever message is 0.0132 MB
# The average size of a document_retriever message is 0.16 MB

# The total disk space of the RabbitMQ server is 50 GB.
# 1 GB = 1024 MB, so 50 GB = 50 * 1024 MB = 51,200 MB.

# Let's calculate using 90% of the total disk space 51,200 MB * 0.90 = 46,080 MB

# The maximum number of document_generator messages in the queue is 46,080 MB / 1.8 MB = 25,600 messages
# The maximum number of document_retriever messages in the queue is 46,080 MB / 0.0132 MB = 3,487,878 messages
# The maximum number of document_retriever messages in the queue is 46,080 MB / 0.16 MB = 288,000 messages

# To set the maximum number of messages in the retriever queue, I'll set it to 200,000 messages
MAX_DOCUMENT_IN_QUEUE = 200000 # 200000 is the maximum number of messages in the retriever queue
Expand Down
9 changes: 5 additions & 4 deletions ht_queue_service/queue_multiple_consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

logger = get_ht_logger(name=__name__)

class TestHTMultipleConsumerServiceConcrete(QueueMultipleConsumer):

class HTMultipleConsumerServiceConcrete(QueueMultipleConsumer):

def __init__(self, user: str, password: str, host: str, queue_name: str, requeue_message: bool = False,
batch_size: int = 1, shutdown_on_empty_queue: bool = True):
Expand Down Expand Up @@ -71,17 +72,17 @@ def list_messages():

@pytest.fixture
def multiple_consumer_instance():
return TestHTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq",
return HTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq",
queue_name="test_producer_queue", requeue_message=False, batch_size=1)

@pytest.fixture
def multiple_consumer_instance_requeue_true_size_n():
return TestHTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq",
return HTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq",
queue_name="test_producer_queue", requeue_message=True, batch_size=10)

@pytest.fixture
def multiple_consumer_instance_requeue_false_size_n():
return TestHTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq",
return HTMultipleConsumerServiceConcrete(user="guest", password="guest", host="rabbitmq",
queue_name="test_producer_queue", requeue_message=False, batch_size=10)

@pytest.fixture
Expand Down