fix: pass object_encoding from schema when dumping bigquery to parquet
vieiralucas committed Sep 11, 2024
1 parent 234c2b5 commit 2e7d424
Showing 1 changed file with 40 additions and 3 deletions.
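Context for the change, plus a minimal sketch (not part of the commit) of the fastparquet parameter involved: when pandas writes object-dtype columns with engine="fastparquet", extra keyword arguments such as object_encoding are forwarded to fastparquet.write, which otherwise infers each column's encoding from sample values. Deriving the mapping from the BigQuery result schema makes the encoding explicit instead of guessed. The column names and data below are illustrative only.

    import pandas as pd

    # Illustrative DataFrame: object-dtype columns whose encodings fastparquet
    # would otherwise infer from values (a None-heavy column is the risky case).
    df = pd.DataFrame({
        "title": pd.Series(["a", None], dtype="object"),
        "payload": pd.Series([{"k": 1}, None], dtype="object"),
    })

    # object_encoding pins the encoding per column; the keyword is passed
    # through by pandas to fastparquet.write.
    df.to_parquet(
        "example.parquet.gzip",
        engine="fastparquet",
        compression="gzip",
        index=False,
        object_encoding={"title": "utf8", "payload": "json"},
    )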
apps/api/src/python/query/bigquery.ts (40 additions, 3 deletions)
@@ -94,11 +94,47 @@ def _briefer_make_bq_query():
             print(json.dumps({"type": "log", "message": f"Converting column {col} to list"}))
             df[col] = df[col].apply(lambda x: json.loads(json.dumps(x if x is not None else [], default=str)))
 
+    def object_encoding_from_schema(schema):
+        object_encoding = {}
+        for field in schema:
+            if field.field_type == 'STRING':
+                object_encoding[field.name] = 'utf8'
+            elif field.field_type == 'BYTES':
+                object_encoding[field.name] = 'bytes'
+            elif field.field_type == 'INTEGER':
+                object_encoding[field.name] = 'int'
+            elif field.field_type == 'INT64':
+                object_encoding[field.name] = 'int'
+            elif field.field_type == 'FLOAT64':
+                object_encoding[field.name] = 'float'
+            elif field.field_type == 'NUMERIC':
+                object_encoding[field.name] = 'decimal'
+            elif field.field_type == 'BOOL':
+                object_encoding[field.name] = 'bool'
+            elif field.field_type == 'TIMESTAMP':
+                object_encoding[field.name] = 'datetime'
+            elif field.field_type == 'DATE':
+                object_encoding[field.name] = 'date'
+            elif field.field_type == 'TIME':
+                object_encoding[field.name] = 'time'
+            elif field.field_type == 'DATETIME':
+                object_encoding[field.name] = 'datetime'
+            elif field.field_type == 'RECORD':
+                object_encoding[field.name] = 'json'
+            elif field.field_type == 'ARRAY':
+                object_encoding[field.name] = 'json'
+            elif field.field_type == 'STRUCT':
+                object_encoding[field.name] = 'json'
+            else:
+                object_encoding[field.name] = 'json'
+        return object_encoding
+
     aborted = False
     dump_file_base = f'/home/jupyteruser/.briefer/query-${queryId}'
     parquet_file_path = f'{dump_file_base}.parquet.gzip'
     csv_file_path = f'{dump_file_base}.csv'
+    object_encoding = None
     queue = queue.Queue()
 
     def dump_worker():
         dumped_count = 0
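The added function is a straight type-to-encoding dispatch. The same table can be written as a dict lookup; this is a sketch of an equivalent form, not the committed code, and it assumes schema is an iterable of BigQuery SchemaField-like objects exposing name and field_type.

    # Compact equivalent of the committed elif chain (sketch, not the commit):
    # composite and unrecognized BigQuery types all fall back to 'json'.
    _ENCODING_BY_FIELD_TYPE = {
        'STRING': 'utf8',
        'BYTES': 'bytes',
        'INTEGER': 'int',
        'INT64': 'int',
        'FLOAT64': 'float',
        'NUMERIC': 'decimal',
        'BOOL': 'bool',
        'TIMESTAMP': 'datetime',
        'DATE': 'date',
        'TIME': 'time',
        'DATETIME': 'datetime',
    }

    def object_encoding_from_schema(schema):
        return {
            field.name: _ENCODING_BY_FIELD_TYPE.get(field.field_type, 'json')
            for field in schema
        }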
@@ -128,13 +164,13 @@ def _briefer_make_bq_query():
             mode = 'a'
             if is_first:
-                mode = 'w'
-                os.makedirs('/home/jupyteruser/.briefer', exist_ok=True)
+                mode = 'w'
+                os.makedirs('/home/jupyteruser/.briefer', exist_ok=True)
 
             try:
                 # write to parquet
                 print(json.dumps({"type": "log", "message": f"Dumping {len(chunk)} rows as parquet, queue size {queue.qsize()}"}))
-                chunk.to_parquet(parquet_file_path, compression='gzip', index=False, append=not is_first, engine="fastparquet")
+                chunk.to_parquet(parquet_file_path, compression='gzip', index=False, append=not is_first, engine="fastparquet", object_encoding=object_encoding)
 
                 # write to csv
                 print(json.dumps({"type": "log", "message": f"Dumping {len(chunk)} rows as csv, queue size {queue.qsize()}"}))
@@ -187,6 +223,7 @@ def _briefer_make_bq_query():
print(json.dumps({"type": "log", "message": "Fetching resulting schema"}))
schema = get_query_schema(${JSON.stringify(query)}, client)
object_encoding = object_encoding_from_schema(schema)
columns_by_type = {}
for field in schema:
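A hypothetical end-to-end illustration of the new wiring: schema fields in, encoding map out. Field here is a stand-in for google.cloud.bigquery.SchemaField, which exposes the same name and field_type attributes the committed function reads.

    from collections import namedtuple

    # Stand-in for google.cloud.bigquery.SchemaField (name/field_type only).
    Field = namedtuple("Field", ["name", "field_type"])

    schema = [
        Field("title", "STRING"),
        Field("count", "INTEGER"),
        Field("meta", "RECORD"),
    ]
    object_encoding = object_encoding_from_schema(schema)
    assert object_encoding == {"title": "utf8", "count": "int", "meta": "json"}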