From 42789e2373b8ab56830f07d8c2b96882a3ea03b6 Mon Sep 17 00:00:00 2001
From: Lucas Vieira
Date: Wed, 11 Sep 2024 13:30:58 -0300
Subject: [PATCH] fix: wait for whole dataframe before dumping bigquery to parquet

---
 apps/api/src/python/query/bigquery.ts | 132 ++++----------------------
 apps/api/src/python/query/index.ts    |   2 +-
 2 files changed, 22 insertions(+), 112 deletions(-)

diff --git a/apps/api/src/python/query/bigquery.ts b/apps/api/src/python/query/bigquery.ts
index d545b4b8..688ac185 100644
--- a/apps/api/src/python/query/bigquery.ts
+++ b/apps/api/src/python/query/bigquery.ts
@@ -90,113 +90,12 @@ def _briefer_make_bq_query():
             df[col] = pd.to_datetime(df[col], errors='coerce').dt.time
         elif bq_type == 'NUMERIC':
             df[col] = pd.to_numeric(df[col], errors='coerce')
-        elif bq_type == "REPEATED":
-            print(json.dumps({"type": "log", "message": f"Converting column {col} to list"}))
-            df[col] = df[col].apply(lambda x: json.loads(json.dumps(x if x is not None else [], default=str)))
-
-    # all types: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type
-    def object_encoding_from_schema(schema):
-        object_encoding = {}
-        for field in schema:
-            if field.mode == 'REPEATED':
-                object_encoding[field.name] = 'json'
-            elif field.field_type == 'STRING':
-                object_encoding[field.name] = 'utf8'
-            elif field.field_type == 'BYTES':
-                object_encoding[field.name] = 'bytes'
-            elif field.field_type == 'INTEGER':
-                object_encoding[field.name] = 'int'
-            elif field.field_type == 'INT64':
-                object_encoding[field.name] = 'int'
-            elif field.field_type == 'FLOAT':
-                object_encoding[field.name] = 'float'
-            elif field.field_type == 'FLOAT64':
-                object_encoding[field.name] = 'float'
-            elif field.field_type == 'NUMERIC':
-                object_encoding[field.name] = 'decimal'
-            elif field.field_type == 'BOOLEAN':
-                object_encoding[field.name] = 'bool'
-            elif field.field_type == 'BOOL':
-                object_encoding[field.name] = 'bool'
-            elif field.field_type == 'TIMESTAMP':
-                object_encoding[field.name] = 'int'
-            elif field.field_type == 'DATE':
-                object_encoding[field.name] = 'int'
-            elif field.field_type == 'TIME':
-                object_encoding[field.name] = 'int'
-            elif field.field_type == 'DATETIME':
-                object_encoding[field.name] = 'int'
-            elif field.field_type == 'GEOGRAPHY':
-                object_encoding[field.name] = 'json'
-            elif field.field_type == 'NUMERIC':
-                object_encoding[field.name] = 'decimal'
-            elif field.field_type == 'BIGNUMERIC':
-                object_encoding[field.name] = 'decimal'
-            elif field.field_type == 'JSON':
-                object_encoding[field.name] = 'json'
-            elif field.field_type == 'RECORD':
-                object_encoding[field.name] = 'json'
-            elif field.field_type == 'STRUCT':
-                object_encoding[field.name] = 'json'
-            elif field.field_type == 'RANGE':
-                object_encoding[field.name] = 'json'
-            else:
-                object_encoding[field.name] = 'json'
-        return object_encoding
 
     aborted = False
     dump_file_base = f'/home/jupyteruser/.briefer/query-${queryId}'
     parquet_file_path = f'{dump_file_base}.parquet.gzip'
     csv_file_path = f'{dump_file_base}.csv'
 
-    object_encoding = None
-    queue = queue.Queue()
-
-    def dump_worker():
-        dumped_count = 0
-        is_first = True
-        is_over = False
-
-        if os.path.exists(parquet_file_path):
-            os.remove(parquet_file_path)
-
-        if os.path.exists(csv_file_path):
-            os.remove(csv_file_path)
-
-        while not (is_over or aborted):
-            first_chunk = queue.get()
-            if first_chunk is None:
-                break
-            chunks = [first_chunk]
-            while queue.qsize() > 0:
-                chunk = queue.get()
-                if chunk is None:
-                    is_over = True
-                else:
-                    chunks.append(chunk)
-
-            chunk = pd.concat(chunks, ignore_index=True)
-            convert_columns(chunk, columns_by_type)
-
-            mode = 'a'
-            if is_first:
-                mode = 'w'
-            os.makedirs('/home/jupyteruser/.briefer', exist_ok=True)
-
-            try:
-                # write to parquet
-                print(json.dumps({"type": "log", "message": f"Dumping {len(chunk)} rows as parquet, queue size {queue.qsize()}"}))
-                chunk.to_parquet(parquet_file_path, compression='gzip', index=False, append=not is_first, engine="fastparquet", object_encoding=object_encoding)
-
-                # write to csv
-                print(json.dumps({"type": "log", "message": f"Dumping {len(chunk)} rows as csv, queue size {queue.qsize()}"}))
-                chunk.to_csv(csv_file_path, index=False, mode=mode, header=is_first)
-
-                is_first = False
-                dumped_count += len(chunk)
-                print(json.dumps({"type": "log", "message": f"Dumped {len(chunk)} rows, total {dumped_count} rows, queue size {queue.qsize()}"}))
-            except Exception as e:
-                print(json.dumps({"type": "log", "message": f"Error dumping chunk: {e}"}))
-                raise
 
     def rename_duplicates(df):
         """Renames duplicate columns in a DataFrame by appending a suffix."""
@@ -227,9 +126,6 @@ def _briefer_make_bq_query():
     print(json.dumps({"type": "log", "message": "Creating flag file"}))
     open(flag_file_path, "a").close()
 
-    dump_thread = threading.Thread(target=dump_worker)
-    dump_thread.start()
-
     try:
         print(json.dumps({"type": "log", "message": "Running query"}))
 
@@ -238,13 +134,10 @@ def _briefer_make_bq_query():
 
         print(json.dumps({"type": "log", "message": "Fetching resulting schema"}))
         schema = get_query_schema(${JSON.stringify(query)}, client)
-        object_encoding = object_encoding_from_schema(schema)
 
         columns_by_type = {}
        for field in schema:
-            if field.mode == 'REPEATED':
-                columns_by_type[field.name] = "REPEATED"
-            elif field.field_type in convertible_types:
+            if field.field_type in convertible_types:
                 columns_by_type[field.name] = field.field_type
 
         print(json.dumps({"type": "log", "message": f"rows count {query_result.total_rows}"}))
@@ -279,7 +172,6 @@ def _briefer_make_bq_query():
             chunk = rename_duplicates(chunk)
 
             rows_count += len(chunk)
-            queue.put(chunk)
             chunks.append(chunk)
 
             if len(initial_rows) < 250:
@@ -302,8 +194,6 @@ def _briefer_make_bq_query():
                 last_emitted_at = now
 
         print(json.dumps({"type": "log", "message": "Waiting for dump worker to finish"}))
-        queue.put(None)
-        dump_thread.join()
 
         if aborted or not os.path.exists(flag_file_path):
             print(json.dumps({"type": "log", "message": "Query aborted"}))
@@ -324,6 +214,26 @@ def _briefer_make_bq_query():
             "columns": columns,
             "count": rows_count
         }
+
+        df = pd.concat(chunks, ignore_index=True)
+        convert_columns(df, columns_by_type)
+
+        os.makedirs('/home/jupyteruser/.briefer', exist_ok=True)
+
+        try:
+            # write to parquet
+            print(json.dumps({"type": "log", "message": f"Dumping {len(df)} rows as parquet"}))
+            df.to_parquet(parquet_file_path, compression='gzip', index=False)
+
+            # write to csv
+            print(json.dumps({"type": "log", "message": f"Dumping {len(df)} rows as csv"}))
+            df.to_csv(csv_file_path, index=False, header=True)
+
+            print(json.dumps({"type": "log", "message": f"Dumped {len(df)} rows"}))
+        except Exception as e:
+            print(json.dumps({"type": "log", "message": f"Error dumping df: {e}"}))
+            raise
+
         print(json.dumps(result, default=str))
     except BadRequest as e:
         error = {
diff --git a/apps/api/src/python/query/index.ts b/apps/api/src/python/query/index.ts
index 3e345269..9c9db081 100644
--- a/apps/api/src/python/query/index.ts
+++ b/apps/api/src/python/query/index.ts
@@ -224,7 +224,7 @@ def _briefer_read_query():
     retries = 3
     while retries > 0:
         try:
-            return pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip", engine="fastparquet")
+            return pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
         except:
             retries -= 1
             if retries == 0: