
Commit

Merge pull request #11 from briefercloud/make-sure-object-parquet-types-are-correct

fix: Pass object_encoding from schema when dumping bigquery to parquet
vieiralucas authored Sep 11, 2024
2 parents 234c2b5 + 3173628 commit 2cd17d7
Showing 1 changed file with 55 additions and 3 deletions.
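
For context on the fix: when pandas writes a DataFrame with the fastparquet engine, object-dtype columns get their Parquet type from fastparquet's value-based inference unless an explicit object_encoding is supplied, so a chunk whose object columns start with NULLs or mixed values can end up typed incorrectly. The diff below builds that encoding map from the BigQuery schema and forwards it to to_parquet. A minimal standalone sketch of the mechanism, assuming only that pandas and fastparquet are installed; the column names and encodings here are made up for illustration and are not taken from the repository:

    import pandas as pd

    # Hypothetical per-column encodings, in the spirit of the
    # object_encoding_from_schema helper added by this commit.
    object_encoding = {"name": "utf8", "payload": "json"}

    df = pd.DataFrame({
        "name": ["alice", None],          # object dtype because of the None
        "payload": [{"k": 1}, {"k": 2}],  # dicts stay object dtype
    })

    # pandas forwards extra keyword arguments to the engine, so object_encoding
    # reaches fastparquet and overrides its value-sampling inference for these
    # object columns.
    df.to_parquet("out.parquet.gzip", compression="gzip", index=False,
                  engine="fastparquet", object_encoding=object_encoding)
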
58 changes: 55 additions & 3 deletions apps/api/src/python/query/bigquery.ts
@@ -94,11 +94,62 @@ def _briefer_make_bq_query():
print(json.dumps({"type": "log", "message": f"Converting column {col} to list"}))
df[col] = df[col].apply(lambda x: json.loads(json.dumps(x if x is not None else [], default=str)))
# all types: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type
def object_encoding_from_schema(schema):
    object_encoding = {}
    for field in schema:
        if field.mode == 'REPEATED':
            object_encoding[field.name] = 'json'
        elif field.field_type == 'STRING':
            object_encoding[field.name] = 'utf8'
        elif field.field_type == 'BYTES':
            object_encoding[field.name] = 'bytes'
        elif field.field_type == 'INTEGER':
            object_encoding[field.name] = 'int'
        elif field.field_type == 'INT64':
            object_encoding[field.name] = 'int'
        elif field.field_type == 'FLOAT':
            object_encoding[field.name] = 'float'
        elif field.field_type == 'FLOAT64':
            object_encoding[field.name] = 'float'
        elif field.field_type == 'NUMERIC':
            object_encoding[field.name] = 'decimal'
        elif field.field_type == 'BOOLEAN':
            object_encoding[field.name] = 'bool'
        elif field.field_type == 'BOOL':
            object_encoding[field.name] = 'bool'
        elif field.field_type == 'TIMESTAMP':
            object_encoding[field.name] = 'int'
        elif field.field_type == 'DATE':
            object_encoding[field.name] = 'int'
        elif field.field_type == 'TIME':
            object_encoding[field.name] = 'int'
        elif field.field_type == 'DATETIME':
            object_encoding[field.name] = 'int'
        elif field.field_type == 'GEOGRAPHY':
            object_encoding[field.name] = 'json'
        elif field.field_type == 'NUMERIC':
            object_encoding[field.name] = 'decimal'
        elif field.field_type == 'BIGNUMERIC':
            object_encoding[field.name] = 'decimal'
        elif field.field_type == 'JSON':
            object_encoding[field.name] = 'json'
        elif field.field_type == 'RECORD':
            object_encoding[field.name] = 'json'
        elif field.field_type == 'STRUCT':
            object_encoding[field.name] = 'json'
        elif field.field_type == 'RANGE':
            object_encoding[field.name] = 'json'
        else:
            object_encoding[field.name] = 'json'
    return object_encoding
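# Illustration only (not part of the committed diff): for a hypothetical schema with a
# STRING field "name", an INTEGER field "age" and a REPEATED RECORD field "tags",
# this helper would return {'name': 'utf8', 'age': 'int', 'tags': 'json'}.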
aborted = False
dump_file_base = f'/home/jupyteruser/.briefer/query-${queryId}'
parquet_file_path = f'{dump_file_base}.parquet.gzip'
csv_file_path = f'{dump_file_base}.csv'
object_encoding = None
queue = queue.Queue()
def dump_worker():
    dumped_count = 0
@@ -128,13 +179,13 @@ def _briefer_make_bq_query():
mode = 'a'
if is_first:
    mode = 'w'
    os.makedirs('/home/jupyteruser/.briefer', exist_ok=True)
try:
    # write to parquet
    print(json.dumps({"type": "log", "message": f"Dumping {len(chunk)} rows as parquet, queue size {queue.qsize()}"}))
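    # The object_encoding kwarg (added in this commit) maps each column name to a
    # fastparquet encoding derived from the BigQuery schema ('utf8', 'json', 'int', ...),
    # so object-dtype columns keep the schema-implied type instead of relying on
    # fastparquet's value-based inference.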
    chunk.to_parquet(parquet_file_path, compression='gzip', index=False, append=not is_first, engine="fastparquet")
    chunk.to_parquet(parquet_file_path, compression='gzip', index=False, append=not is_first, engine="fastparquet", object_encoding=object_encoding)
    # write to csv
    print(json.dumps({"type": "log", "message": f"Dumping {len(chunk)} rows as csv, queue size {queue.qsize()}"}))
@@ -187,6 +238,7 @@ def _briefer_make_bq_query():
print(json.dumps({"type": "log", "message": "Fetching resulting schema"}))
schema = get_query_schema(${JSON.stringify(query)}, client)
object_encoding = object_encoding_from_schema(schema)
columns_by_type = {}
for field in schema:
