fix handling values out of bounds in pandas
lucasfcosta committed Nov 30, 2024
1 parent 542724b commit fe56890
Showing 2 changed files with 28 additions and 9 deletions.
30 changes: 23 additions & 7 deletions apps/api/src/python/query/bigquery.ts
@@ -36,6 +36,7 @@ export async function makeBigQueryQuery(

const code = `
def _briefer_make_bq_query():
from datetime import datetime
from google.cloud import bigquery
from google.cloud import bigquery_storage
from google.oauth2 import service_account
@@ -48,6 +49,7 @@ def _briefer_make_bq_query():
from concurrent.futures import ThreadPoolExecutor
import threading
import queue
import pytz
def get_columns(df):
columns = []
@@ -86,8 +88,19 @@ def _briefer_make_bq_query():
if bq_type in ['DATE', 'DATETIME']:
df[col] = pd.to_datetime(df[col], errors='coerce')
elif bq_type == 'TIMESTAMP':
# Convert to timezone-aware datetime, preserving the original time zone
df[col] = pd.to_datetime(df[col], errors='coerce', utc=True).dt.tz_convert(None)
# Define the fallback and maximum possible dates.
# Timestamps that are too small or too large for pandas would otherwise be coerced to NaT.
fallback_date = datetime(1, 1, 1, 0, 0, 0, 0)
max_date = datetime(9999, 12, 31, 23, 59, 59, 999999)
# Make sure the timestamp is in UTC, then drop the timezone info
df[col] = df[col].apply(lambda x: x.astimezone(pytz.utc).replace(tzinfo=None))
# Replace large values with the maximum possible date
df[col] = df[col].apply(lambda x: max_date if x > max_date else x)
# Replace small values with the fallback date
df[col] = df[col].apply(lambda x: fallback_date if x < fallback_date else x)
elif bq_type == 'TIME':
df[col] = pd.to_datetime(df[col], errors='coerce').dt.time
elif bq_type == 'NUMERIC':
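
For illustration only (this is not part of the commit): a minimal sketch of the clamping idea used in the new TIMESTAMP branch above. The series contents and column handling are invented; the point is that keeping the values as plain Python datetime objects and clamping them avoids the NaT coercion that pd.to_datetime(..., errors='coerce') applies to out-of-range timestamps.

```python
from datetime import datetime

import pandas as pd
import pytz

fallback_date = datetime(1, 1, 1, 0, 0, 0, 0)
max_date = datetime(9999, 12, 31, 23, 59, 59, 999999)

# dtype=object keeps the values as plain datetimes, like the Arrow object path does.
ts = pd.Series(
    [datetime(2024, 11, 30, 12, 0, tzinfo=pytz.utc),
     datetime(9999, 12, 31, 23, 0, tzinfo=pytz.utc)],   # far outside pandas' ns range
    dtype=object,
)

# Normalize to UTC, drop the tz info, then clamp into [fallback_date, max_date].
ts = ts.apply(lambda x: x.astimezone(pytz.utc).replace(tzinfo=None))
ts = ts.apply(lambda x: max_date if x > max_date else x)
ts = ts.apply(lambda x: fallback_date if x < fallback_date else x)

# pd.to_datetime(ts, errors='coerce') would turn the year-9999 value into NaT
# (or raise, depending on the pandas version); the clamped objects survive intact.
print(ts.tolist())
```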
@@ -158,23 +171,26 @@ def _briefer_make_bq_query():
return df
bq_storage_client = bigquery_storage.BigQueryReadClient(credentials=credentials)
df_iter = query_result.to_dataframe_iterable(bqstorage_client=bq_storage_client)
arrow_iter = query_result.to_arrow_iterable(bqstorage_client=bq_storage_client)
df = pd.DataFrame()
initial_rows = []
columns = None
last_emitted_at = 0
chunks = []
rows_count = 0
for chunk in df_iter:
for chunk in arrow_iter:
if not os.path.exists(flag_file_path):
print(json.dumps({"type": "log", "message": "Query aborted"}))
aborted = True
break
chunk = rename_duplicates(chunk)
rows_count += len(chunk)
chunks.append(chunk)
# Make sure that to_pandas does not crash for out-of-bounds dates
chunk_df = chunk.to_pandas(timestamp_as_object=True)
chunk_df = rename_duplicates(chunk_df)
rows_count += len(chunk_df)
chunks.append(chunk_df)
if len(initial_rows) < 250:
df = pd.concat(chunks, ignore_index=True)
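
For context (not committed code): the switch from to_dataframe_iterable to to_arrow_iterable matters because Arrow's to_pandas accepts timestamp_as_object=True, which keeps out-of-range timestamps as plain datetime objects instead of forcing them into nanosecond datetime64. A minimal sketch with an invented table:

```python
from datetime import datetime

import pyarrow as pa

table = pa.table({
    "created_at": pa.array(
        [datetime(2024, 11, 30, 12, 0), datetime(9999, 12, 31, 23, 59, 59)],
        type=pa.timestamp("us"),
    )
})

# Mirrors the streaming loop: convert each batch without nanosecond coercion.
# Without the flag, the conversion targets datetime64[ns] and, on older
# pandas/pyarrow versions, raises an out-of-bounds error for the year-9999 value.
for batch in table.to_batches():
    chunk_df = batch.to_pandas(timestamp_as_object=True)
    print(chunk_df["created_at"].map(type).unique())  # datetime.datetime objects
```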
7 changes: 5 additions & 2 deletions apps/api/src/python/query/index.ts
@@ -272,11 +272,13 @@ export async function makeQuery(

const code = `
def _briefer_read_query():
import pyarrow.parquet as pq
import pandas as pd
retries = 3
while retries > 0:
try:
return pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
table = pq.read_table("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
return table.to_pandas(timestamp_as_object=True)
except:
retries -= 1
if retries == 0:
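
A small sketch of the same pattern for the cached parquet read (the path below is a placeholder, not the real ~/.briefer cache file); the readDataframePage hunk further down applies the identical swap.

```python
from datetime import datetime

import pyarrow as pa
import pyarrow.parquet as pq

path = "/tmp/example-query.parquet.gzip"  # placeholder path for the sketch

# Write a tiny parquet file containing an out-of-range timestamp.
pq.write_table(
    pa.table({"ts": pa.array([datetime(9999, 1, 1)], type=pa.timestamp("us"))}),
    path,
    compression="gzip",
)

# Read it back the way the commit does: via pyarrow, converting timestamps to
# Python objects so the nanosecond range limit never applies.
df = pq.read_table(path).to_pandas(timestamp_as_object=True)
print(type(df["ts"].iloc[0]))  # <class 'datetime.datetime'>

# pd.read_parquet(path) goes through the default datetime64 conversion and,
# on older pandas/pyarrow versions, can fail for this value.
```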
@@ -347,8 +349,9 @@ export async function readDataframePage(
if not ("${dataframeName}" in globals()):
import pandas as pd
import pyarrow.parquet as pq
try:
${dataframeName} = pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
${dataframeName} = pq.read_table("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip").to_pandas(timestamp_as_object=True)
except:
print(json.dumps({"type": "not-found"}))
