@@ -125,8 +125,7 @@ def mock_bq_client():
125125def mock_write_client ():
126126 # Updated patch path to match the new import structure in src
127127 with mock .patch (
128- "google.cloud.bigquery_storage_v1.services.big_query_write.async_client.BigQueryWriteAsyncClient" ,
129- autospec = True ,
128+ "google.cloud.bigquery_storage_v1.BigQueryWriteAsyncClient" , autospec = True
130129 ) as mock_cls :
131130 mock_client = mock_cls .return_value
132131 mock_client .transport = mock .AsyncMock ()
@@ -298,13 +297,7 @@ async def test_event_allowlist(
298297 mock_write_client .append_rows .assert_called_once ()
299298 mock_write_client .append_rows .reset_mock ()
300299
301- # Re-init plugin logic since close() shuts it down, but for this test we want to test denial
302- # However, close() cleans up clients. We should probably create a new plugin or just check that the task was not created.
303- # But on_user_message_callback will try to log.
304- # To keep it simple, let's just use a fresh plugin for the second part or assume close() resets state enough to re-run _ensure_init if needed,
305- # but _ensure_init is called inside _perform_write.
306- # Actually, close() sets _is_shutting_down to True, so further logs are ignored.
307- # So we need a new plugin instance or reset _is_shutting_down.
300+ # Re-init plugin logic since close() shuts it down
308301 plugin ._is_shutting_down = False
309302
310303 user_message = types .Content (parts = [types .Part (text = "What is up?" )])
@@ -399,6 +392,44 @@ def mutate_payload(data):
399392 assert content ["model" ] == "GEMINI-PRO"
400393 assert content ["prompt" ][0 ]["role" ] == "user"
401394
395+ @pytest .mark .asyncio
396+ async def test_content_formatter_error_fallback (
397+ self ,
398+ mock_write_client ,
399+ invocation_context ,
400+ mock_auth_default ,
401+ mock_bq_client ,
402+ mock_to_arrow_schema ,
403+ dummy_arrow_schema ,
404+ mock_asyncio_to_thread ,
405+ ):
406+ """Tests that if content_formatter fails, the original payload is used."""
407+
408+ def error_formatter (data ):
409+ raise ValueError ("Formatter failed" )
410+
411+ config = BigQueryLoggerConfig (content_formatter = error_formatter )
412+ plugin = bigquery_agent_analytics_plugin .BigQueryAgentAnalyticsPlugin (
413+ PROJECT_ID , DATASET_ID , TABLE_ID , config
414+ )
415+ await plugin ._ensure_init ()
416+ mock_write_client .append_rows .reset_mock ()
417+
418+ user_message = types .Content (parts = [types .Part (text = "Original message" )])
419+
420+ # This triggers the log. Internal logic catches exception and proceeds.
421+ await plugin .on_user_message_callback (
422+ invocation_context = invocation_context , user_message = user_message
423+ )
424+ await plugin .close ()
425+
426+ mock_write_client .append_rows .assert_called_once ()
427+ log_entry = _get_captured_event_dict (mock_write_client , dummy_arrow_schema )
428+
429+ # Verify that despite the error, we still got the original data
430+ content = json .loads (log_entry ["content" ])
431+ assert content ["text" ] == "Original message"
432+
402433 @pytest .mark .asyncio
403434 async def test_max_content_length_smart_truncation (
404435 self ,
@@ -725,7 +756,9 @@ async def test_before_tool_callback_logs_correctly(
725756 type(mock_tool ).name = mock .PropertyMock (return_value = "MyTool" )
726757 type(mock_tool ).description = mock .PropertyMock (return_value = "Description" )
727758 await bq_plugin_inst .before_tool_callback (
728- tool = mock_tool , tool_args = {"param" : "value" }, tool_context = tool_context
759+ tool = mock_tool ,
760+ tool_args = {"param" : "value" },
761+ tool_context = tool_context ,
729762 )
730763 await bq_plugin_inst .close ()
731764 log_entry = _get_captured_event_dict (mock_write_client , dummy_arrow_schema )
0 commit comments