Fix handling of values out of bounds in pandas #272

Closed · wants to merge 2 commits
25 changes: 18 additions & 7 deletions apps/api/src/python/query/bigquery.ts
@@ -36,6 +36,7 @@ export async function makeBigQueryQuery(

   const code = `
 def _briefer_make_bq_query():
+    from datetime import datetime
     from google.cloud import bigquery
     from google.cloud import bigquery_storage
     from google.oauth2 import service_account
@@ -48,6 +49,7 @@ def _briefer_make_bq_query():
     from concurrent.futures import ThreadPoolExecutor
     import threading
     import queue
+    import pytz

     def get_columns(df):
         columns = []
@@ -86,8 +88,14 @@ def _briefer_make_bq_query():
         if bq_type in ['DATE', 'DATETIME']:
             df[col] = pd.to_datetime(df[col], errors='coerce')
         elif bq_type == 'TIMESTAMP':
-            # Convert to timezone-aware datetime, preserving the original time zone
-            df[col] = pd.to_datetime(df[col], errors='coerce', utc=True).dt.tz_convert(None)
+            # handle out-of-bounds dates by clamping to Python's datetime range
+            fallback_date = datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=pytz.utc)
+            max_date = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc)
+            df[col] = df[col].apply(lambda x: max_date if not pd.isnull(x) and x > max_date else x)
+            df[col] = df[col].apply(lambda x: fallback_date if not pd.isnull(x) and x < fallback_date else x)
+
+            # normalize to UTC while preserving timezone info
+            df[col] = df[col].apply(lambda x: x.astimezone(pytz.utc) if not pd.isnull(x) and x.tzinfo else x)
         elif bq_type == 'TIME':
             df[col] = pd.to_datetime(df[col], errors='coerce').dt.time
         elif bq_type == 'NUMERIC':
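
Note: the clamping above exists because pandas' datetime64[ns] dtype only covers 1677-09-21 through 2262-04-11, while a BigQuery TIMESTAMP spans 0001-01-01 through 9999-12-31, so the old `pd.to_datetime(..., errors='coerce')` path silently turned out-of-range values into NaT. A minimal standalone sketch of the failure mode and the clamping idea (not part of the diff; the helper name is illustrative):

```python
import pandas as pd
import pytz
from datetime import datetime

# errors='coerce' silently drops a value that datetime64[ns] cannot hold:
ts = datetime(9999, 12, 31, tzinfo=pytz.utc)
print(pd.to_datetime([ts], errors='coerce', utc=True))  # DatetimeIndex([NaT], ...)

# Clamping object-dtype datetimes to Python's datetime range keeps a
# sentinel value instead; assumes tz-aware inputs, as BigQuery TIMESTAMPs are.
fallback_date = datetime(1, 1, 1, tzinfo=pytz.utc)
max_date = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc)

def clamp(x):
    return x if pd.isnull(x) else min(max(x, fallback_date), max_date)
```
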
@@ -158,23 +166,26 @@ def _briefer_make_bq_query():
         return df

     bq_storage_client = bigquery_storage.BigQueryReadClient(credentials=credentials)
-    df_iter = query_result.to_dataframe_iterable(bqstorage_client=bq_storage_client)
+    arrow_iter = query_result.to_arrow_iterable(bqstorage_client=bq_storage_client)
     df = pd.DataFrame()

     initial_rows = []
     columns = None
     last_emitted_at = 0
     chunks = []
     rows_count = 0
-    for chunk in df_iter:
+    for chunk in arrow_iter:
         if not os.path.exists(flag_file_path):
             print(json.dumps({"type": "log", "message": "Query aborted"}))
             aborted = True
             break

-        chunk = rename_duplicates(chunk)
-        rows_count += len(chunk)
-        chunks.append(chunk)
+        # make sure that to_pandas does not crash for dates out of bounds
+        chunk_df = chunk.to_pandas(timestamp_as_object=True)
+
+        chunk_df = rename_duplicates(chunk_df)
+        rows_count += len(chunk_df)
+        chunks.append(chunk_df)

         if len(initial_rows) < 250:
             df = pd.concat(chunks, ignore_index=True)
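
Note: the switch from `to_dataframe_iterable` to `to_arrow_iterable` is what makes the clamping above reachable at all; the dataframe iterable converts to pandas dtypes inside the client library, where an out-of-range TIMESTAMP can fail before any of this code runs. Converting each Arrow batch with `timestamp_as_object=True` keeps TIMESTAMP columns as plain Python datetimes. A condensed sketch of the pattern (variable names assumed):

```python
import pandas as pd

def iter_chunk_frames(query_result, bq_storage_client):
    # to_arrow_iterable yields pyarrow.RecordBatch objects;
    # timestamp_as_object=True returns datetime objects instead of
    # datetime64[ns], so out-of-range values survive the conversion.
    for batch in query_result.to_arrow_iterable(bqstorage_client=bq_storage_client):
        yield batch.to_pandas(timestamp_as_object=True)

# usage: df = pd.concat(list(iter_chunk_frames(result, storage_client)), ignore_index=True)
```
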
7 changes: 5 additions & 2 deletions apps/api/src/python/query/index.ts
@@ -272,11 +272,13 @@ export async function makeQuery(

   const code = `
 def _briefer_read_query():
+    import pyarrow.parquet as pq
     import pandas as pd
     retries = 3
     while retries > 0:
         try:
-            return pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+            table = pq.read_table("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+            return table.to_pandas(timestamp_as_object=True)
         except:
             retries -= 1
             if retries == 0:
@@ -347,8 +349,9 @@ export async function readDataframePage(

 if not ("${dataframeName}" in globals()):
     import pandas as pd
+    import pyarrow.parquet as pq
     try:
-        ${dataframeName} = pd.read_parquet("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip")
+        ${dataframeName} = pq.read_table("/home/jupyteruser/.briefer/query-${queryId}.parquet.gzip").to_pandas(timestamp_as_object=True)
     except:
         print(json.dumps({"type": "not-found"}))
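
Note: same idea on the read side. `pd.read_parquet` goes through pyarrow's default `to_pandas`, which can raise `ArrowInvalid` when a stored timestamp does not fit datetime64[ns]; reading the table explicitly and converting with `timestamp_as_object=True` sidesteps that. A standalone sketch (the path is illustrative; the real code interpolates the query id):

```python
import pyarrow.parquet as pq

# Hypothetical file; TIMESTAMP columns come back as datetime objects.
table = pq.read_table("/tmp/query-example.parquet.gzip")
df = table.to_pandas(timestamp_as_object=True)
```
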