-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Write JSON state #997
Write JSON state #997
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ty for adding all these types as well!
tron/actioncommand.py
Outdated
@staticmethod | ||
def to_json(state_data: dict) -> str: | ||
return json.dumps( | ||
{ | ||
"status_path": state_data["status_path"], | ||
"exec_path": state_data["exec_path"], | ||
} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should these to_json()
functions be normal on the methods so that it's easier to add additional data to be serialized in the future? as-is, we'd need to track down any calls of to_json()
for any modified classes and ensure that the state_data dict that we build for those calls has any new fields
e.g.,
@staticmethod | |
def to_json(state_data: dict) -> str: | |
return json.dumps( | |
{ | |
"status_path": state_data["status_path"], | |
"exec_path": state_data["exec_path"], | |
} | |
) | |
def to_json(self) -> str: | |
return json.dumps( | |
{ | |
"status_path": self.status_path, | |
"exec_path": self.exec_path, | |
} | |
) |
i'm also debating whether or not we should have this return a dict and we call json.dumps() before saving, but that's probably not too big a change if we wanna do that later - and the current approach means that if something cannot be serialized to json, we'll get a better traceback :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, i see - a lot of these have state_data()
properties! i think that that makes this less pressing since the calls will (i assume) look like `SomeClass.to_json(some_object.state_data)
but it might still be nicer to switch since for things that do implement state_data() this would look like
@staticmethod | |
def to_json(state_data: dict) -> str: | |
return json.dumps( | |
{ | |
"status_path": state_data["status_path"], | |
"exec_path": state_data["exec_path"], | |
} | |
) | |
def to_json(self) -> str: | |
state_data = self.state_data | |
return json.dumps( | |
{ | |
"status_path": state_data["status_path"], | |
"exec_path": state_data["exec_path"], | |
} | |
) |
and for classes that don't have a state_data
property we'd be in the scenario described above and would benefit from this no longer being a staticmethod :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(another benefit would also be avoiding any potential circular imports in the future since we won't need to import classes just to call to_json() :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, i see - when this gets called in tron/serialize/runstate/dynamodb_state_store.py we just have a key and a dict ;_;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, maybe this is better for a post-pickle-deletion refactor where we update the simplified code to pass around the actual objects rather than state_data dicts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol, yeah basically my train of thought while writing all this. Your last comment is where I landed as well. It would have made this work a lot easier if we just had the objects. Ultimately it shouldn't be too terrible to refactor. Added a TODO/ticket.
def get_type_from_key(self, key: str) -> str: | ||
return key.split()[0] | ||
|
||
def _serialize_item(self, key: str, state: Dict[str, Any]) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this only accepts two values for key
, i think we can write key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE]
rather than key: str
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did have that, but was getting Variable not allowed in type expression
. I guess in this case I should have done Literal[job_state", "job_run_state"]
or maybe just pretended it's an enum and added # type: ignore
?
At any rate, I added the ignore, but let me know if you have a preference.
tron/utils/crontab.py
Outdated
return sorted_groups # type: ignore | ||
return sorted_groups # type: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we add comments for these ignores?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added types. This file was one of those dominoes that came up when adding types elsewhere. I tried to use my whole brain to do this one because it's kind of a spooky change, but I think I got it. Infrastage testing looks good, but it's not exhaustive
"json_val": { | ||
"S": json_val[index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(json_val))] | ||
}, | ||
"num_json_val_partitions": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i always thought that this num_partitions stuff was something that dynamo required 🤣 - i guess this is just for our usage (to know how many partitions we're using per-item?)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, exactly. We have key+index for our compound key, but we use num_partitions in a funny multi-step restore thing where we get all the first partitions, then for every one of those we try _get_remaining_partitions (which annoyingly will need to handle JSON having more partitions).
I debated at least adding a check before this to see if num_partitions was greater than 1, but just never got around to it. This might be worth a refactor after we've depickled things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally lgtm - we can hopefully refactor things a bit to make this even nicer after all the pickle logic is gone :p
i think my only remaining question is do we need to handle json failures in any specific way
(and i'll leave it up to you whether or not you'd like to merge the crontab changes)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we talked about this a bit in our 1:1, but i think it's fine to type: ignore things a bit and deal with fixing any type issues here for a later PR if we're not feeling particularly confident in these changes :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the sake of posterity, I kept this, but tried to maintain the logic as much as possible and added a few tests for good measure.
tron/actioncommand.py
Outdated
@@ -195,6 +202,15 @@ def __eq__(self, other): | |||
def __ne__(self, other): | |||
return not self == other | |||
|
|||
@staticmethod | |||
def to_json(state_data: dict) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from our 1:1: should we (or do we need to) handle failures in to_json? e.g., non-serializable data, missing data in the state_data dict leading to a KeyError?
if we want the json writing to be optional until we're reading from the json fields, we probably also want to make sure that json serialization failures don't result in losing pickle'd writes :)
6054272
to
2c41653
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i have one last question/thought around the save queue handling, but otherwise i think this is good to start testing in our non-prod envs
we should probably setup some log-based alerting (or maybe use the prometheus integration?) for whenever json writing errors happen since they're definitely things we'll want to handle ASAP as part of the pickle-removal process
tron/actioncommand.py
Outdated
except KeyError as e: | ||
log.error(f"Missing key in state_data: {e}") | ||
return None | ||
except Exception as e: | ||
log.error(f"Error serializing SubprocessActionRunnerFactory to JSON: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doing something like the below suggestion here and in the other to_json()
except KeyError as e: | |
log.error(f"Missing key in state_data: {e}") | |
return None | |
except Exception as e: | |
log.error(f"Error serializing SubprocessActionRunnerFactory to JSON: {e}") | |
except KeyError: | |
log.exception(f"Missing key in state_data:") | |
return None | |
except Exception: | |
log.exception(f"Error serializing SubprocessActionRunnerFactory to JSON:") |
might be nice since it'll include the full traceback
log.error(error) | ||
# Add items back to the queue if we failed to save |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thoughts on adding back to the save queue with just the pickled data? if there's any issues writing the json, a second json-less attempt might still succeed right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a solid idea. I don't think we see tron_dynamodb_save_failure
very often so it's a reasonable fallback. Would be nice to catch any partitioning issues.
When you say prom integration are you thinking some counter like json_serialization_errors.inc() on failed serialization, then alerting on that? |
@KaspariK yup - exactly! |
…d to_json methods for classes in DynamoDB restore flow. Write an additional attribute to DynamoDB to capture non-pickled state_data.
…odb_state_store to something a little more explanatory now that we have 2 versions
…saction limit, and change setitem signature for take value tuple
…ODO and ticket for regex issues
…rom pickle write so that we maintain writing pickles if JSON fails
… the pickle back into the queue
… what to do in dynamodb_state_store
2ce62ad
to
b0186d1
Compare
What
We store job and job run state as pickles. We would like to not do that. This is part of the path to not doing that by writing state data as JSON alongside our current pickles. Restoring from JSON will follow in another PR.
Why
In DAR-2328, the removal of the mesos-related code led to resetting of tron jobs state resulting in job runs starting from "0". The pickles weren't being unpickled correctly as this relies on classes that were deleted in the mesos code.