From 8cfe9f36b4135e378146ae8cc69ff9d3e7ffe886 Mon Sep 17 00:00:00 2001
From: Brian Mesick <112640379+bmtcril@users.noreply.github.com>
Date: Fri, 4 Aug 2023 09:25:58 -0400
Subject: [PATCH] fix: Retry getting bulk tracking logs (#330)

* fix: Retry getting bulk tracking logs

* chore: Bump version to 5.5.5
---
 event_routing_backends/__init__.py            |  2 +-
 .../tests/test_transform_tracking_logs.py     | 28 ++++++++++++
 .../commands/transform_tracking_logs.py       | 43 ++++++++++++++++---
 event_routing_backends/settings/common.py     |  2 +
 4 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/event_routing_backends/__init__.py b/event_routing_backends/__init__.py
index bfdc83a1..79fb3904 100644
--- a/event_routing_backends/__init__.py
+++ b/event_routing_backends/__init__.py
@@ -2,4 +2,4 @@
 Various backends for receiving edX LMS events..
 """
 
-__version__ = '5.5.4'
+__version__ = '5.5.5'
diff --git a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
index 10c8791b..11bc6c89 100644
--- a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
+++ b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
@@ -12,6 +12,7 @@
 import event_routing_backends.management.commands.transform_tracking_logs as transform_tracking_logs
 from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender
 from event_routing_backends.management.commands.transform_tracking_logs import (
+    _get_chunks,
     get_dest_config_from_options,
     get_libcloud_drivers,
     get_source_config_from_options,
@@ -394,3 +395,30 @@ def test_get_dest_config_lrs():
     assert config is None
     assert container is None
     assert prefix is None
+
+
+def test_get_chunks():
+    """
+    Tests the retry functionality of the get_chunks function.
+    """
+    fake_source = MagicMock()
+    fake_source.download_object_range_as_stream.return_value = "abc"
+
+    # Check that we got the expected return value
+    assert _get_chunks(fake_source, "", 0, 1) == "abc"
+    # Check that we broke out of the retry loop as expected
+    assert fake_source.download_object_range_as_stream.call_count == 1
+
+    fake_source_err = MagicMock()
+    fake_source_err.download_object_range_as_stream.side_effect = Exception("boom")
+
+    # Speed up our test, don't wait for the sleep
+    with patch("event_routing_backends.management.commands.transform_tracking_logs.sleep"):
+        with pytest.raises(Exception) as e:
+            _get_chunks(fake_source_err, "", 0, 1)
+
+    # Make sure we're getting the error we expect
+    assert "boom" in str(e)
+
+    # Make sure we got the correct number of retries
+    assert fake_source_err.download_object_range_as_stream.call_count == 3
diff --git a/event_routing_backends/management/commands/transform_tracking_logs.py b/event_routing_backends/management/commands/transform_tracking_logs.py
index c35a0849..28e08a89 100644
--- a/event_routing_backends/management/commands/transform_tracking_logs.py
+++ b/event_routing_backends/management/commands/transform_tracking_logs.py
@@ -5,7 +5,9 @@
 import os
 from io import BytesIO
 from textwrap import dedent
+from time import sleep
 
+from django.conf import settings
 from django.core.management.base import BaseCommand
 from libcloud.storage.providers import get_driver
 from libcloud.storage.types import Provider
@@ -16,6 +18,40 @@
 CHUNK_SIZE = 1024 * 1024 * 2
 
 
+def _get_chunks(source, file, start_byte, end_byte):
+    """
+    Fetch a chunk from the upstream source, retry 3 times if necessary.
+
+    Often an upstream provider like S3 will fail occasionally on big jobs. This
+    tries to handle any of those cases gracefully.
+    """
+    chunks = None
+    num_retries = getattr(settings, 'EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES', 3)
+    retry_countdown = getattr(settings, 'EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN', 1)
+
+    # Skipping coverage here because it wants to test a branch that will never
+    # be hit (for -> return)
+    for try_number in range(1, num_retries+1):  # pragma: no cover
+        try:
+            chunks = source.download_object_range_as_stream(
+                file,
+                start_bytes=start_byte,
+                end_bytes=end_byte
+            )
+            break
+        # Catching all exceptions here because there's no telling what all
+        # the possible errors from different libcloud providers are.
+        except Exception as e:  # pylint: disable=broad-exception-caught
+            print(e)
+            if try_number == num_retries:
+                print(f"Try {try_number}: Error occurred downloading, giving up.")
+                raise
+            print(f"Try {try_number}: Error occurred downloading source file chunk. Trying again in 1 second.")
+            sleep(retry_countdown)
+
+    return chunks
+
+
 def transform_tracking_logs(
     source,
     source_container,
@@ -45,11 +81,8 @@
             if end_byte > file.size:
                 end_byte = file.size
 
-            chunks = source.download_object_range_as_stream(
-                file,
-                start_bytes=last_successful_byte,
-                end_bytes=end_byte
-            )
+            chunks = _get_chunks(source, file, last_successful_byte, end_byte)
+
             for chunk in chunks:
                 chunk = chunk.decode('utf-8')
 
diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py
index 9b15c498..9b72d149 100644
--- a/event_routing_backends/settings/common.py
+++ b/event_routing_backends/settings/common.py
@@ -13,6 +13,8 @@ def plugin_settings(settings):
     settings.XAPI_EVENT_LOGGING_ENABLED = True
     settings.EVENT_ROUTING_BACKEND_MAX_RETRIES = 3
     settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30
+    settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES = 3
+    settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN = 1
 
     # .. setting_name: XAPI_AGENT_IFI_TYPE
     # .. setting_default: 'external_id'