2828from google .api_core .gapic_v1 import client_info as gapic_client_info
2929import google .auth
3030from google .cloud import bigquery
31+ from google .cloud import bigquery_storage_v1
3132from google .cloud .bigquery import schema as bq_schema
3233from google .cloud .bigquery_storage_v1 import types as bq_storage_types
33- from google .cloud .bigquery_storage_v1 .services .big_query_write .async_client import BigQueryWriteAsyncClient
3434from google .genai import types
3535import pyarrow as pa
3636
@@ -221,7 +221,7 @@ class BigQueryLoggerConfig:
221221 event_allowlist : Optional [List [str ]] = None
222222 event_denylist : Optional [List [str ]] = None
223223 # Custom formatter is discouraged now that we use JSON, but kept for compat
224- content_formatter : Optional [Callable [[Any ], str ]] = None
224+ content_formatter : Optional [Callable [[dict ], dict ]] = None
225225 shutdown_timeout : float = 5.0
226226 client_close_timeout : float = 2.0
227227 # Increased default limit to 50KB since we truncate per-field, not per-row
@@ -307,7 +307,11 @@ def __init__(
307307 )
308308 self ._config = config if config else BigQueryLoggerConfig ()
309309 self ._bq_client : bigquery .Client | None = None
310- self ._write_client : BigQueryWriteAsyncClient | None = None
310+ # Type alias update: Use the class from the top-level package import
311+ self ._write_client : (
312+ bigquery_storage_v1 .services .big_query_write .async_client .BigQueryWriteAsyncClient
313+ | None
314+ ) = None
311315 self ._init_lock : asyncio .Lock | None = None
312316 self ._arrow_schema : pa .Schema | None = None
313317 self ._background_tasks : set [asyncio .Task ] = set ()
@@ -407,7 +411,8 @@ def create_resources():
407411
408412 await asyncio .to_thread (create_resources )
409413
410- self ._write_client = BigQueryWriteAsyncClient (
414+ # Fix: Use the top-level package import to avoid "cli" substring in path
415+ self ._write_client = bigquery_storage_v1 .services .big_query_write .async_client .BigQueryWriteAsyncClient (
411416 credentials = creds ,
412417 client_info = client_info ,
413418 )
@@ -446,7 +451,11 @@ async def _perform_write(self, row: dict):
446451 ):
447452 if resp .error .code != 0 :
448453 msg = resp .error .message
449- if "schema mismatch" in msg .lower ():
454+ if (
455+ "schema mismatch" in msg .lower ()
456+ or "field" in msg .lower ()
457+ or "type" in msg .lower ()
458+ ):
450459 logging .error (
451460 "BQ Plugin: Schema Mismatch. You may need to delete the"
452461 " existing table if you migrated from STRING content to JSON"
@@ -459,9 +468,6 @@ async def _perform_write(self, row: dict):
459468 except RuntimeError as e :
460469 if "Event loop is closed" not in str (e ) and not self ._is_shutting_down :
461470 logging .error ("BQ Plugin: Runtime Error during write:" , exc_info = True )
462- except asyncio .CancelledError :
463- if not self ._is_shutting_down :
464- logging .warning ("BQ Plugin: Write task cancelled unexpectedly." )
465471 except Exception :
466472 logging .error ("BQ Plugin: Write Failed:" , exc_info = True )
467473
@@ -657,6 +663,7 @@ async def on_event_callback(
657663 "invocation_id" : invocation_context .invocation_id ,
658664 "user_id" : invocation_context .session .user_id ,
659665 "error_message" : event .error_message ,
666+ "timestamp" : datetime .fromtimestamp (event .timestamp , timezone .utc ),
660667 },
661668 content_payload = payload ,
662669 )
@@ -790,14 +797,14 @@ async def before_model_callback(
790797
791798 payload = {
792799 "model" : llm_request .model or "default" ,
793- "params" : params ,
800+ "params" : params if params else None ,
794801 "tools_available" : (
795802 list (llm_request .tools_dict .keys ())
796803 if llm_request .tools_dict
797- else []
804+ else None
798805 ),
799806 "system_instruction" : system_instr ,
800- "prompt" : prompt_history ,
807+ "prompt" : prompt_history if prompt_history else None ,
801808 }
802809
803810 await self ._log (
@@ -888,7 +895,6 @@ async def before_tool_callback(
888895 If individual string fields exceed `max_content_length`, they are truncated
889896 to preserve the valid JSON structure.
890897 """
891-
892898 payload = {
893899 "tool_name" : tool .name if tool .name else None ,
894900 "description" : tool .description if tool .description else None ,
@@ -923,7 +929,10 @@ async def after_tool_callback(
923929 If individual string fields exceed `max_content_length`, they are truncated
924930 to preserve the valid JSON structure.
925931 """
926- payload = {"tool_name" : tool .name if tool .name else None , "result" : result if result else None }
932+ payload = {
933+ "tool_name" : tool .name if tool .name else None ,
934+ "result" : result if result else None ,
935+ }
927936 await self ._log (
928937 {
929938 "event_type" : "TOOL_COMPLETED" ,
@@ -977,7 +986,10 @@ async def on_tool_error_callback(
977986 If individual string fields exceed `max_content_length`, they are truncated
978987 to preserve the valid JSON structure.
979988 """
980- payload = {"tool_name" : tool .name if tool .name else None , "arguments" : tool_args if tool_args else None }
989+ payload = {
990+ "tool_name" : tool .name if tool .name else None ,
991+ "arguments" : tool_args if tool_args else None ,
992+ }
981993 await self ._log (
982994 {
983995 "event_type" : "TOOL_ERROR" ,
0 commit comments