Add dynamodb retry config for throttling and other errors. Add exponential backoff and jitter for unprocessed keys. Fix edge case where we successfully process keys on our last attempt but still fail #1023
Changes from all commits
e382156
3e74d75
f88d7f9
6902ab0
b4e423d
b0e890b
de2e2f9
be7e063
6720e0f
ff216f7
e0f2cce
@@ -20,6 +20,8 @@ | |
from typing import TypeVar | ||
|
||
import boto3 # type: ignore | ||
import botocore # type: ignore | ||
from botocore.config import Config # type: ignore | ||
|
||
import tron.prom_metrics as prom_metrics | ||
from tron.core.job import Job | ||
|
@@ -35,16 +37,34 @@ | |
# to contain other attributes like object name and number of partitions. | ||
OBJECT_SIZE = 200_000 # TODO: TRON-2240 - consider swapping back to 400_000 now that we've removed pickles | ||
MAX_SAVE_QUEUE = 500 | ||
MAX_ATTEMPTS = 10 | ||
# This is distinct from the number of retries in the retry_config as this is used for handling unprocessed | ||
# keys outside the bounds of something like retrying on a ThrottlingException. We need this limit to avoid | ||
# infinite loops in the case where a key is truly unprocessable. We allow for more retries than it should | ||
# ever take to avoid failing restores due to transient issues. | ||
MAX_UNPROCESSED_KEYS_RETRIES = 30 | ||
MAX_TRANSACT_WRITE_ITEMS = 100 | ||
log = logging.getLogger(__name__) | ||
T = TypeVar("T") | ||
|
||
|
||
class DynamoDBStateStore: | ||
def __init__(self, name, dynamodb_region, stopping=False) -> None: | ||
self.dynamodb = boto3.resource("dynamodb", region_name=dynamodb_region) | ||
self.client = boto3.client("dynamodb", region_name=dynamodb_region) | ||
# Standard mode includes an exponential backoff by a base factor of 2 for a | ||
# maximum backoff time of 20 seconds (min(b*r^i, MAX_BACKOFF) where b is a | ||
# random number between 0 and 1 and r is the base factor of 2). This might | ||
# look like: | ||
# | ||
# seconds_to_sleep = min(1 × 2^1, 20) = min(2, 20) = 2 seconds | ||
# | ||
# By our 5th retry (2^5 is 32) we will be sleeping *up to* 20 seconds, depending | ||
# on the random jitter. | ||
# | ||
# It handles transient errors like RequestTimeout and ConnectionError, as well | ||
# as Service-side errors like Throttling, SlowDown, and LimitExceeded. | ||
retry_config = Config(retries={"max_attempts": 5, "mode": "standard"}) | ||
|
||
self.dynamodb = boto3.resource("dynamodb", region_name=dynamodb_region, config=retry_config) | ||
self.client = boto3.client("dynamodb", region_name=dynamodb_region, config=retry_config) | ||
self.name = name | ||
self.dynamodb_region = dynamodb_region | ||
self.table = self.dynamodb.Table(name) | ||
|
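The constructor comment above quotes botocore's standard-mode backoff formula; the standalone sketch below (not part of the PR, names invented) simply mirrors that math to show the kind of delays the client would pick on successive retries:

    import random

    MAX_BACKOFF_SECONDS = 20  # standard-mode cap quoted in the comment above

    def simulated_standard_mode_delay(attempt: int) -> float:
        # min(b * r**attempt, MAX_BACKOFF) with b a random float in [0, 1) and r = 2
        b = random.random()
        return min(b * (2 ** attempt), MAX_BACKOFF_SECONDS)

    for attempt in range(1, 6):
        print(f"retry {attempt}: up to {min(2 ** attempt, MAX_BACKOFF_SECONDS)}s, sampled {simulated_standard_mode_delay(attempt):.2f}s")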
@@ -63,11 +83,11 @@ def build_key(self, type, iden) -> str: | |
|
||
def restore(self, keys, read_json: bool = False) -> dict: | ||
""" | ||
Fetch all under the same parition key(s). | ||
Fetch all under the same partition key(s). | ||
ret: <dict of key to states> | ||
""" | ||
# format of the keys always passed here is | ||
# job_state job_name --> high level info about the job: enabled, run_nums | ||
# job_state job_name --> high level info about the job: enabled, run_nums | ||
# job_run_state job_run_name --> high level info about the job run | ||
first_items = self._get_first_partitions(keys) | ||
remaining_items = self._get_remaining_partitions(first_items, read_json) | ||
|
@@ -83,12 +103,22 @@ def chunk_keys(self, keys: Sequence[T]) -> List[Sequence[T]]: | |
cand_keys_chunks.append(keys[i : min(len(keys), i + 100)]) | ||
return cand_keys_chunks | ||
|
||
def _calculate_backoff_delay(self, attempt: int) -> int: | ||
Review comment: this technically doesn't need to be in the class since we're not accessing anything in it (i.e., we never use `self`); a module-level sketch follows the function below.
||
# Clamp attempt to 1 to avoid negative or zero exponent | ||
safe_attempt = max(attempt, 1) | ||
base_delay_seconds = 1 | ||
max_delay_seconds = 10 | ||
delay: int = min(base_delay_seconds * (2 ** (safe_attempt - 1)), max_delay_seconds) | ||
return delay | ||
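As the review comment above points out, nothing here touches `self`, so the helper could live at module level (or become a `@staticmethod`). A purely illustrative sketch of that variant:

    def calculate_backoff_delay(attempt: int, base_delay_seconds: int = 1, max_delay_seconds: int = 10) -> int:
        # Clamp attempt to 1 to avoid a zero or negative exponent.
        safe_attempt = max(attempt, 1)
        return min(base_delay_seconds * (2 ** (safe_attempt - 1)), max_delay_seconds)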
|
||
def _get_items(self, table_keys: list) -> object: | ||
items = [] | ||
# let's avoid potentially mutating our input :) | ||
cand_keys_list = copy.copy(table_keys) | ||
attempts_to_retrieve_keys = 0 | ||
while len(cand_keys_list) != 0: | ||
attempts = 0 | ||
|
||
# TODO: TRON-2363 - We should refactor this to not consume attempts when we are still making progress | ||
while len(cand_keys_list) != 0 and attempts < MAX_UNPROCESSED_KEYS_RETRIES: | ||
Review comment: just posting this here for future us: we'll probably want to refactor this at some point to not consume an attempt if we're making progress (i.e., we got at least one key back) and we're simply seeing DynamoDB send us partial responses (unless we want to take a hard line on our data sizes such that we can always get a full chunk back at any time), and only consume an attempt if we're doing an error-caused retry. A sketch of that idea follows below.
Reply: Ah shit, meant to ticket that and link it in a TODO. Thanks for calling that out.
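A rough sketch of the progress-aware accounting described in the comment above (illustrative only; `fetch_batch` and `calculate_backoff_delay` are hypothetical callables, and this is not what the PR implements):

    import time

    MAX_UNPROCESSED_KEYS_RETRIES = 30

    def fetch_with_progress_aware_retries(cand_keys_list, fetch_batch, calculate_backoff_delay):
        # fetch_batch(keys) would issue one BatchGetItem round and return whatever
        # keys DynamoDB left unprocessed.
        attempts = 0
        while cand_keys_list and attempts < MAX_UNPROCESSED_KEYS_RETRIES:
            remaining = fetch_batch(cand_keys_list)
            if len(remaining) < len(cand_keys_list):
                # Partial progress: keep going without spending an attempt.
                cand_keys_list = remaining
                continue
            # No progress this round: spend an attempt and back off.
            attempts += 1
            time.sleep(calculate_backoff_delay(attempts))
            cand_keys_list = remaining
        return cand_keys_list  # anything still here is treated as unprocessable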
||
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: | ||
responses = [ | ||
executor.submit( | ||
|
@@ -106,20 +136,35 @@ def _get_items(self, table_keys: list) -> object: | |
cand_keys_list = [] | ||
for resp in concurrent.futures.as_completed(responses): | ||
try: | ||
items.extend(resp.result()["Responses"][self.name]) | ||
# add any potential unprocessed keys to the thread pool | ||
if resp.result()["UnprocessedKeys"].get(self.name) and attempts_to_retrieve_keys < MAX_ATTEMPTS: | ||
cand_keys_list.extend(resp.result()["UnprocessedKeys"][self.name]["Keys"]) | ||
elif attempts_to_retrieve_keys >= MAX_ATTEMPTS: | ||
failed_keys = resp.result()["UnprocessedKeys"][self.name]["Keys"] | ||
error = Exception( | ||
f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{failed_keys}\n from dynamodb\n{resp.result()}" | ||
) | ||
raise error | ||
except Exception as e: | ||
result = resp.result() | ||
Review comment: I wonder if we should also print the response when we get into the exception block, to have an idea of why we got unprocessed keys and why we exceeded the attempts. So maybe we add it here.
Reply: I was hesitant to dump the response because it can get pretty large. After a lot of reading, I've landed on logging ResponseMetadata on ClientError. This should capture what we care about. See https://fluffy.yelpcorp.com/i/qWG1tRPrFt40M6pPr3lLkXnCSbJJBFhd.html
||
items.extend(result.get("Responses", {}).get(self.name, [])) | ||
|
||
# If DynamoDB returns unprocessed keys, we need to collect them and retry | ||
unprocessed_keys = result.get("UnprocessedKeys", {}).get(self.name, {}).get("Keys", []) | ||
if unprocessed_keys: | ||
cand_keys_list.extend(unprocessed_keys) | ||
except botocore.exceptions.ClientError as e: | ||
log.exception(f"ClientError during batch_get_item: {e.response}") | ||
raise | ||
except Exception: | ||
log.exception("Encountered issues retrieving data from DynamoDB") | ||
raise e | ||
attempts_to_retrieve_keys += 1 | ||
raise | ||
if cand_keys_list: | ||
# We use _calculate_backoff_delay to get a delay that increases exponentially | ||
# with each retry. These retry attempts are distinct from the boto3 retry_config | ||
# and are used specifically to handle unprocessed keys. | ||
attempts += 1 | ||
delay = self._calculate_backoff_delay(attempts) | ||
Review comment: fwiw, I think it's probably fine to rely on the built-in backoff from boto; there shouldn't be anything else touching these dynamo tables other than tron, so we don't really need any jitter to avoid a thundering herd scenario :)
Reply: There are two levels of backoff, basically. There is the built-in retry config that catches something like throttling, and then there is our own backoff based on UnprocessedKeys. This seems to be the suggested approach based on the warning in https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_get_item.html. It's not 100% clear to me that the retry config will handle UnprocessedKeys.
Reply: (sorry, missed this reply!) (not a blocker tho)
||
log.warning( | ||
f"Attempt {attempts}/{MAX_UNPROCESSED_KEYS_RETRIES} - " | ||
f"Retrying {len(cand_keys_list)} unprocessed keys after {delay}s delay." | ||
) | ||
time.sleep(delay) | ||
Review comment: What to do about this lil guy?
Reply: !8ball we should use a restore thread
Reply: yea, we should probably try to figure out a non-blocking way to do this or have this run in a separate thread; if we get to the worst case of 5 attempts and this is running on the reactor thread, we'll essentially block all of tron from doing anything for 20s.
Reply: although, actually, this is probably fine since we do all sorts of blocking stuff in restore and aren't expecting tron to be usable or do anything until we've restored everything... so maybe this is fine?
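If the blocking sleep ever did need to come off the reactor thread, one possible shape (only a sketch of the idea discussed above, not something this PR does; `store` and `keys` are placeholders) would be to run the restore on a worker thread via Twisted:

    from twisted.internet import threads

    def restore_off_reactor(store, keys):
        # Run the blocking restore (including any time.sleep backoff) on a worker
        # thread; the returned Deferred fires with the restored state dict.
        return threads.deferToThread(store.restore, keys)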
||
if cand_keys_list: | ||
msg = f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{cand_keys_list}\n from dynamodb after {MAX_UNPROCESSED_KEYS_RETRIES} retries." | ||
log.error(msg) | ||
|
||
raise KeyError(msg) | ||
return items | ||
|
||
def _get_first_partitions(self, keys: list): | ||
|
@@ -291,12 +336,17 @@ def _save_loop(self): | |
def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: | ||
""" | ||
Partition the item and write up to MAX_TRANSACT_WRITE_ITEMS | ||
partitions atomically. Retry up to 3 times on failure. | ||
partitions atomically using TransactWriteItems. | ||
|
||
The function examines the size of pickled_val and json_val, | ||
splitting them into multiple segments based on OBJECT_SIZE, | ||
storing each segment under the same partition key. | ||
|
||
Examine the size of `pickled_val` and `json_val`, and | ||
splice them into different parts based on `OBJECT_SIZE` | ||
with different sort keys, and save them under the same | ||
partition key built. | ||
It relies on the boto3/botocore retry_config to handle | ||
certain errors (e.g. throttling). If an error is not | ||
addressed by boto3's internal logic, the transaction fails | ||
and raises an exception. It is the caller's responsibility | ||
to implement further retries. | ||
""" | ||
start = time.time() | ||
|
||
|
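As a back-of-the-envelope illustration of the partitioning the docstring describes (not the PR's actual helper; ceil division over OBJECT_SIZE is assumed):

    import math

    OBJECT_SIZE = 200_000  # max bytes stored per partition segment (from this module)

    def estimated_num_partitions(serialized: bytes) -> int:
        # e.g. a 450 KB blob -> ceil(450_000 / 200_000) = 3 segments, all saved
        # under the same partition key with different sort keys.
        return max(1, math.ceil(len(serialized) / OBJECT_SIZE))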
@@ -337,25 +387,21 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: | |
"N": str(num_json_val_partitions), | ||
} | ||
|
||
count = 0 | ||
items.append(item) | ||
|
||
while len(items) == MAX_TRANSACT_WRITE_ITEMS or index == max_partitions - 1: | ||
# We want to write the items when we've either reached the max number of items | ||
# for a transaction, or when we're done processing all partitions | ||
if len(items) == MAX_TRANSACT_WRITE_ITEMS or index == max_partitions - 1: | ||
try: | ||
self.client.transact_write_items(TransactItems=items) | ||
items = [] | ||
break # exit the while loop on successful writing | ||
except Exception as e: | ||
count += 1 | ||
if count > 3: | ||
timer( | ||
name="tron.dynamodb.setitem", | ||
delta=time.time() - start, | ||
) | ||
log.error(f"Failed to save partition for key: {key}, error: {repr(e)}") | ||
raise e | ||
else: | ||
log.warning(f"Got error while saving {key}, trying again: {repr(e)}") | ||
except Exception: | ||
timer( | ||
name="tron.dynamodb.setitem", | ||
delta=time.time() - start, | ||
) | ||
log.exception(f"Failed to save partition for key: {key}") | ||
raise | ||
timer( | ||
name="tron.dynamodb.setitem", | ||
delta=time.time() - start, | ||
|
Review comment: we could probably remove this assert if we raised a less generic exception and relied on pytest.raises(SomeMoreTargetedException) proving that the right exception was raised.
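A minimal sketch of what that test could look like, assuming a hypothetical DynamoDBRestoreError is raised in place of the generic exception, and a hypothetical fixture stubs batch_get_item to always report the same UnprocessedKeys:

    import pytest

    def test_restore_raises_after_unprocessed_key_retries_exhausted(store_with_sticky_unprocessed_keys):
        # Fixture name is invented: it would exhaust MAX_UNPROCESSED_KEYS_RETRIES
        # by never returning the requested keys.
        with pytest.raises(DynamoDBRestoreError):  # hypothetical targeted exception
            store_with_sticky_unprocessed_keys.restore(["job_state some_job"])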