fix: wait for whole dataframe before dumping bigquery to parquet
vieiralucas committed Sep 11, 2024
1 parent 756d03d commit 42789e2
Showing 2 changed files with 22 additions and 112 deletions.
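
In essence, the streaming path — pushing each chunk onto a queue and appending it to the parquet file from a background thread via fastparquet — is replaced by collecting every chunk, concatenating them, and writing the files once the whole dataframe is available. A minimal sketch of the new flow (simplified, hypothetical names; not the exact Briefer code):

```python
# Sketch only: illustrates the single-pass dump this commit switches to.
import pandas as pd

def dump_whole_dataframe(chunks, parquet_path, csv_path):
    # Wait for every chunk before writing anything to disk.
    df = pd.concat(chunks, ignore_index=True)
    # One write with pandas' default parquet engine; fastparquet's
    # append mode and per-column object_encoding are no longer needed.
    df.to_parquet(parquet_path, compression='gzip', index=False)
    df.to_csv(csv_path, index=False, header=True)
    return len(df)
```

The trade-off visible in the diff is that the full result now sits in memory before the dump, in exchange for a single, consistent write of both the parquet and CSV files.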
132 changes: 21 additions & 111 deletions apps/api/src/python/query/bigquery.ts
@@ -90,113 +90,12 @@ def _briefer_make_bq_query():
df[col] = pd.to_datetime(df[col], errors='coerce').dt.time
elif bq_type == 'NUMERIC':
df[col] = pd.to_numeric(df[col], errors='coerce')
elif bq_type == "REPEATED":
print(json.dumps({"type": "log", "message": f"Converting column {col} to list"}))
df[col] = df[col].apply(lambda x: json.loads(json.dumps(x if x is not None else [], default=str)))
# all types: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type
def object_encoding_from_schema(schema):
object_encoding = {}
for field in schema:
if field.mode == 'REPEATED':
object_encoding[field.name] = 'json'
elif field.field_type == 'STRING':
object_encoding[field.name] = 'utf8'
elif field.field_type == 'BYTES':
object_encoding[field.name] = 'bytes'
elif field.field_type == 'INTEGER':
object_encoding[field.name] = 'int'
elif field.field_type == 'INT64':
object_encoding[field.name] = 'int'
elif field.field_type == 'FLOAT':
object_encoding[field.name] = 'float'
elif field.field_type == 'FLOAT64':
object_encoding[field.name] = 'float'
elif field.field_type == 'NUMERIC':
object_encoding[field.name] = 'decimal'
elif field.field_type == 'BOOLEAN':
object_encoding[field.name] = 'bool'
elif field.field_type == 'BOOL':
object_encoding[field.name] = 'bool'
elif field.field_type == 'TIMESTAMP':
object_encoding[field.name] = 'int'
elif field.field_type == 'DATE':
object_encoding[field.name] = 'int'
elif field.field_type == 'TIME':
object_encoding[field.name] = 'int'
elif field.field_type == 'DATETIME':
object_encoding[field.name] = 'int'
elif field.field_type == 'GEOGRAPHY':
object_encoding[field.name] = 'json'
elif field.field_type == 'NUMERIC':
object_encoding[field.name] = 'decimal'
elif field.field_type == 'BIGNUMERIC':
object_encoding[field.name] = 'decimal'
elif field.field_type == 'JSON':
object_encoding[field.name] = 'json'
elif field.field_type == 'RECORD':
object_encoding[field.name] = 'json'
elif field.field_type == 'STRUCT':
object_encoding[field.name] = 'json'
elif field.field_type == 'RANGE':
object_encoding[field.name] = 'json'
else:
object_encoding[field.name] = 'json'
return object_encoding
aborted = False
dump_file_base = f'/home/jupyteruser/.briefer/query-${queryId}'
parquet_file_path = f'{dump_file_base}.parquet.gzip'
csv_file_path = f'{dump_file_base}.csv'
object_encoding = None
queue = queue.Queue()
def dump_worker():
dumped_count = 0
is_first = True
is_over = False
if os.path.exists(parquet_file_path):
os.remove(parquet_file_path)
if os.path.exists(csv_file_path):
os.remove(csv_file_path)
while not (is_over or aborted):
first_chunk = queue.get()
if first_chunk is None:
break
chunks = [first_chunk]
while queue.qsize() > 0:
chunk = queue.get()
if chunk is None:
is_over = True
else:
chunks.append(chunk)
chunk = pd.concat(chunks, ignore_index=True)
convert_columns(chunk, columns_by_type)
mode = 'a'
if is_first:
mode = 'w'
os.makedirs('/home/jupyteruser/.briefer', exist_ok=True)
try:
# write to parquet
print(json.dumps({"type": "log", "message": f"Dumping {len(chunk)} rows as parquet, queue size {queue.qsize()}"}))
chunk.to_parquet(parquet_file_path, compression='gzip', index=False, append=not is_first, engine="fastparquet", object_encoding=object_encoding)
# write to csv
print(json.dumps({"type": "log", "message": f"Dumping {len(chunk)} rows as csv, queue size {queue.qsize()}"}))
chunk.to_csv(csv_file_path, index=False, mode=mode, header=is_first)
is_first = False
dumped_count += len(chunk)
print(json.dumps({"type": "log", "message": f"Dumped {len(chunk)} rows, total {dumped_count} rows, queue size {queue.qsize()}"}))
except Exception as e:
print(json.dumps({"type": "log", "message": f"Error dumping chunk: {e}"}))
raise
def rename_duplicates(df):
"""Renames duplicate columns in a DataFrame by appending a suffix."""
@@ -227,9 +126,6 @@ def _briefer_make_bq_query():
print(json.dumps({"type": "log", "message": "Creating flag file"}))
open(flag_file_path, "a").close()
dump_thread = threading.Thread(target=dump_worker)
dump_thread.start()
try:
print(json.dumps({"type": "log", "message": "Running query"}))
@@ -238,13 +134,10 @@ def _briefer_make_bq_query():
print(json.dumps({"type": "log", "message": "Fetching resulting schema"}))
schema = get_query_schema(${JSON.stringify(query)}, client)
object_encoding = object_encoding_from_schema(schema)
columns_by_type = {}
for field in schema:
if field.mode == 'REPEATED':
columns_by_type[field.name] = "REPEATED"
elif field.field_type in convertible_types:
if field.field_type in convertible_types:
columns_by_type[field.name] = field.field_type
print(json.dumps({"type": "log", "message": f"rows count {query_result.total_rows}"}))
@@ -279,7 +172,6 @@ def _briefer_make_bq_query():
chunk = rename_duplicates(chunk)
rows_count += len(chunk)
queue.put(chunk)
chunks.append(chunk)
if len(initial_rows) < 250:
@@ -302,8 +194,6 @@ def _briefer_make_bq_query():
last_emitted_at = now
print(json.dumps({"type": "log", "message": "Waiting for dump worker to finish"}))
queue.put(None)
dump_thread.join()
if aborted or not os.path.exists(flag_file_path):
print(json.dumps({"type": "log", "message": "Query aborted"}))
@@ -324,6 +214,26 @@ def _briefer_make_bq_query():
"columns": columns,
"count": rows_count
}
df = pd.concat(chunks, ignore_index=True)
convert_columns(df, columns_by_type)
os.makedirs('/home/jupyteruser/.briefer', exist_ok=True)
try:
# write to parquet
print(json.dumps({"type": "log", "message": f"Dumping {len(df)} rows as parquet"}))
df.to_parquet(parquet_file_path, compression='gzip', index=False)
# write to csv
print(json.dumps({"type": "log", "message": f"Dumping {len(df)} rows as csv"}))
df.to_csv(csv_file_path, index=False, header=True)
print(json.dumps({"type": "log", "message": f"Dumped {len(df)} rows"}))
except Exception as e:
print(json.dumps({"type": "log", "message": f"Error dumping df: {e}"}))
raise
print(json.dumps(result, default=str))
except BadRequest as e:
error = {
2 changes: 1 addition & 1 deletion apps/api/src/python/query/index.ts
@@ -224,7 +224,7 @@ def _briefer_read_query():
retries = 3
while retries > 0:
try:
return pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip", engine="fastparquet")
return pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
except:
retries -= 1
if retries == 0:
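
On the read side, the only change is dropping the explicit `engine="fastparquet"` pin, since the file is now produced in a single pass that pandas' default parquet engine can read. A self-contained sketch of the retry wrapper around the new call, assuming the tail of the loop (truncated in this view) simply decrements and re-raises as the visible lines suggest:

```python
import pandas as pd

def read_query_result(path: str, retries: int = 3) -> pd.DataFrame:
    # Retry a few times in case the dump has not finished yet.
    while retries > 0:
        try:
            # No engine pin: pandas' default parquet engine reads the file.
            return pd.read_parquet(path)
        except Exception:
            retries -= 1
            if retries == 0:
                raise
```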
