Skip to content

Commit

Permalink
Handle bad JSONs (#190)
Browse files Browse the repository at this point in the history
* Handle bad JSONs

* Do it poperly this time

* Correct function name
  • Loading branch information
tunetheweb authored Jun 27, 2023
1 parent e4e8ff6 commit 6fa0bc8
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
2 changes: 1 addition & 1 deletion modules/combined_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def create_pipeline(argv=None):
if combined_options.pipeline_type in ["combined", "non-summary"]:
(
files
| "MapJSON" >> beam.MapTuple(non_summary_pipeline.from_json)
| "MapJSON" >> beam.FlatMapTuple(non_summary_pipeline.from_json)
| "AddDateAndClient" >> beam.Map(non_summary_pipeline.add_date_and_client)
| "WriteNonSummaryTables"
>> non_summary_pipeline.WriteNonSummaryToBigQuery(
Expand Down
4 changes: 2 additions & 2 deletions modules/import_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def from_json(file_name, string):
try:
return file_name, json.loads(string)
except json.JSONDecodeError as err:
logging.error('Unable to parse JSON object "%s...": %s', string[:50], err)
logging.error('Unable to parse file %s into JSON object "%s...": %s' % (file_name, string[:50], err))
return None, None


Expand Down Expand Up @@ -511,7 +511,7 @@ def create_pipeline(argv=None):
hars = (
pipeline
| ReadHarFiles(**vars(known_args))
| "MapJSON" >> beam.MapTuple(from_json)
| "MapJSON" >> beam.FlatMapTuple(from_json)
)

_ = (
Expand Down
32 changes: 18 additions & 14 deletions modules/non_summary_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,27 +435,31 @@ def from_json(file_name, element):
"""Returns an object from the JSON representation."""

try:
return file_name, json.loads(element)
return [(file_name, json.loads(element))]
except Exception as e:
logging.error('Unable to parse JSON object "%s...": %s' % (element[:50], e))
return None
logging.error('Unable to parse file %s into JSON object "%s...": %s' % (file_name, element[:50], e))
return [(None)]


def add_date_and_client(element):
"""Adds `date` and `client` attributes to facilitate BigQuery table routing"""

file_name, har = element
date, client = utils.date_and_client_from_file_name(file_name)
page = har.get("log").get("pages")[0]
metadata = page.get("_metadata", {})
har.update(
{
"date": "{:%Y_%m_%d}".format(date),
"client": metadata.get("layout", client).lower(),
}
)
try:
file_name, har = element
date, client = utils.date_and_client_from_file_name(file_name)
page = har.get("log").get("pages")[0]
metadata = page.get("_metadata", {})
har.update(
{
"date": "{:%Y_%m_%d}".format(date),
"client": metadata.get("layout", client).lower(),
}
)

return har
return har
except Exception as e:
logging.error('Unable to add date and client "%s...": %s' % (element[:50], e))
return None


class WriteNonSummaryToBigQuery(beam.PTransform):
Expand Down

0 comments on commit 6fa0bc8

Please sign in to comment.