diff --git a/csharp/src/StatementExecution/StatementExecutionStatement.cs b/csharp/src/StatementExecution/StatementExecutionStatement.cs index 55784d93..6aaf6f2c 100644 --- a/csharp/src/StatementExecution/StatementExecutionStatement.cs +++ b/csharp/src/StatementExecution/StatementExecutionStatement.cs @@ -760,29 +760,37 @@ public override void Dispose() { if (_currentStatementId != null) { - try + // Start a dedicated span instead of annotating Activity.Current: Dispose is + // typically called outside any ambient activity (e.g. connection pooling), + // so Activity.Current is usually null here and the event would be dropped. + // Mirrors DatabricksCompositeReader.Dispose on the Thrift path, which owns + // its own span so the close is always traced regardless of caller context. + this.TraceActivity(activity => { - // Close statement synchronously during dispose - Activity.Current?.AddEvent(new ActivityEvent("statement.dispose", - tags: new ActivityTagsCollection - { - { "statement_id", _currentStatementId } - })); - _client.CloseStatementAsync(_currentStatementId, CancellationToken.None).GetAwaiter().GetResult(); - } - catch (Exception ex) - { - // Best effort - ignore errors during dispose - Activity.Current?.AddEvent(new ActivityEvent("statement.dispose.error", - tags: new ActivityTagsCollection - { - { "error", ex.Message } - })); - } - finally - { - _currentStatementId = null; - } + try + { + // Close statement synchronously during dispose + activity?.AddEvent(new ActivityEvent("statement.dispose", + tags: new ActivityTagsCollection + { + { "statement_id", _currentStatementId } + })); + _client.CloseStatementAsync(_currentStatementId, CancellationToken.None).GetAwaiter().GetResult(); + } + catch (Exception ex) + { + // Best effort - ignore errors during dispose + activity?.AddEvent(new ActivityEvent("statement.dispose.error", + tags: new ActivityTagsCollection + { + { "error", ex.Message } + })); + } + finally + { + _currentStatementId = null; + } + }, activityName: nameof(StatementExecutionStatement) + "." + nameof(Dispose)); } } diff --git a/csharp/test/E2E/CloseOperationE2ETest.cs b/csharp/test/E2E/CloseOperationE2ETest.cs index 2696aa54..ed8b1c22 100644 --- a/csharp/test/E2E/CloseOperationE2ETest.cs +++ b/csharp/test/E2E/CloseOperationE2ETest.cs @@ -114,16 +114,17 @@ public static IEnumerable TestCases() => ]; /// - /// Validates that DatabricksCompositeReader.Dispose emits the composite_reader.close_operation - /// trace event for all result delivery modes when the reader is disposed without closing - /// the underlying connection (simulating connection pooling). + /// Validates that disposing a statement releases the server-side operation across all + /// result delivery modes when the connection is not closed (simulating connection pooling). + /// The protocol is inherited from config / the CI matrix, and the assertion branches: /// - /// The composite_reader.close_operation event is only present in the code path introduced - /// by the fix. Without the fix: - /// - Inline (DirectResults or not): event missing because _activeReader != null caused - /// the old code to delegate to _activeReader.Dispose() instead. - /// - CloudFetch: same delegation, but CloseOperation is never sent at all, orphaning - /// the server operation for ~1 hour. + /// - Thrift: DatabricksCompositeReader.Dispose must emit composite_reader.close_operation. + /// Without the fix this event is missing — Inline (DirectResults or not) because the old + /// code delegated to _activeReader.Dispose(); CloudFetch because CloseOperation was never + /// sent at all, orphaning the server operation for ~1 hour. + /// - REST/SEA: there is no composite reader; StatementExecutionStatement.Dispose must emit + /// statement.dispose, which brackets the CloseStatementAsync (HTTP DELETE) that releases + /// the server statement. /// /// For Thrift wire-level assertions, see proxy-based tests in databricks-driver-test: /// CLOUDFETCH-013 through CLOUDFETCH-016. @@ -134,9 +135,14 @@ public async Task DisposeEmitsCloseOperationEvent(string description, string que { lock (_capturedEventsLock) { _capturedEvents.Clear(); } + // Inherit the protocol from config / CI matrix rather than hardcoding it. + // The two protocols release the server-side operation through different + // code paths, so the assertion below branches on the effective protocol. + string protocol = string.IsNullOrEmpty(TestConfiguration.Protocol) + ? "thrift" : TestConfiguration.Protocol.ToLowerInvariant(); + var parameters = new Dictionary { - [DatabricksParameters.Protocol] = "thrift", [DatabricksParameters.UseCloudFetch] = useCloudFetch.ToString(), [DatabricksParameters.EnableDirectResults] = enableDirectResults.ToString(), }; @@ -165,29 +171,54 @@ public async Task DisposeEmitsCloseOperationEvent(string description, string que OutputHelper?.WriteLine($"[{description}] Read {totalRows} rows, reader disposed."); - // Collect the events emitted by DatabricksCompositeReader.Dispose. - List disposeEvents; - lock (_capturedEventsLock) + if (protocol == "rest") { - disposeEvents = _capturedEvents - .Where(e => e.ActivityName == "DatabricksCompositeReader.Dispose") - .Select(e => e.EventName) - .ToList(); + // SEA path: there is no composite reader. The server statement is + // released by StatementExecutionStatement.Dispose via CloseStatementAsync + // (HTTP DELETE), traced as the statement.dispose event under that span. + List seaDisposeEvents; + lock (_capturedEventsLock) + { + seaDisposeEvents = _capturedEvents + .Where(e => e.ActivityName == "StatementExecutionStatement.Dispose") + .Select(e => e.EventName) + .ToList(); + } + + OutputHelper?.WriteLine($"[{description}] SEA dispose events: [{string.Join(", ", seaDisposeEvents)}]"); + + Assert.True(seaDisposeEvents.Contains("statement.dispose"), + $"[{description}] statement.dispose event not found in " + + $"StatementExecutionStatement.Dispose. The SEA path must release the " + + $"server statement via CloseStatementAsync (HTTP DELETE) on dispose. " + + $"Got events: [" + string.Join(", ", seaDisposeEvents) + "]"); } + else + { + // Thrift path: the operation is closed by DatabricksCompositeReader.Dispose. + List disposeEvents; + lock (_capturedEventsLock) + { + disposeEvents = _capturedEvents + .Where(e => e.ActivityName == "DatabricksCompositeReader.Dispose") + .Select(e => e.EventName) + .ToList(); + } - OutputHelper?.WriteLine($"[{description}] Dispose events: [{string.Join(", ", disposeEvents)}]"); + OutputHelper?.WriteLine($"[{description}] Dispose events: [{string.Join(", ", disposeEvents)}]"); - // The composite_reader.close_operation.started event is only present after - // the original CloseOperation fix (it was renamed from the bare - // "composite_reader.close_operation" by issue #489 to mark when the RPC - // begins). Without that fix, the CloudFetch path silently skips - // CloseOperation entirely and neither event is emitted. - Assert.True(disposeEvents.Contains("composite_reader.close_operation.started"), - $"[{description}] composite_reader.close_operation.started event not found in " + - $"DatabricksCompositeReader.Dispose. Without the close-operation fix, server " + - $"operations are orphaned until SQL Gateway closes them with " + - $"CommandInactivityTimeout (~1 hour). Got events: [" + - string.Join(", ", disposeEvents) + "]"); + // The composite_reader.close_operation.started event is only present after + // the original CloseOperation fix (it was renamed from the bare + // "composite_reader.close_operation" by issue #489 to mark when the RPC + // begins). Without that fix, the CloudFetch path silently skips + // CloseOperation entirely and neither event is emitted. + Assert.True(disposeEvents.Contains("composite_reader.close_operation.started"), + $"[{description}] composite_reader.close_operation.started event not found in " + + $"DatabricksCompositeReader.Dispose. Without the close-operation fix, server " + + $"operations are orphaned until SQL Gateway closes them with " + + $"CommandInactivityTimeout (~1 hour). Got events: [" + + string.Join(", ", disposeEvents) + "]"); + } } finally { @@ -221,6 +252,10 @@ public async Task DisposeCloseOperation_EmitsStartedAndCompletedWithDistinctTime var parameters = new Dictionary { + // Thrift-pinned by design: this test measures the latency of the Thrift + // TCloseOperationReq RPC via the started/completed event pair, which only + // exists on the composite-reader (Thrift) path. The REST/SEA close is a + // single HTTP DELETE with no equivalent latency-bracketing pair. [DatabricksParameters.Protocol] = "thrift", [DatabricksParameters.UseCloudFetch] = "false", [DatabricksParameters.EnableDirectResults] = "false", diff --git a/csharp/test/E2E/CloudFetchE2ETest.cs b/csharp/test/E2E/CloudFetchE2ETest.cs index 44a3c82e..603aa973 100644 --- a/csharp/test/E2E/CloudFetchE2ETest.cs +++ b/csharp/test/E2E/CloudFetchE2ETest.cs @@ -37,7 +37,8 @@ namespace AdbcDrivers.Databricks.Tests { /// /// End-to-end tests for the CloudFetch feature in the Databricks ADBC driver. - /// Tests both Thrift and Statement Execution REST API protocols. + /// Runs under the protocol selected by the connection config / CI matrix + /// (the suite is exercised against both Thrift and REST as separate jobs). /// public class CloudFetchE2ETest : TestBase, IDisposable { @@ -111,48 +112,51 @@ protected override void Dispose(bool disposing) } /// - /// Test cases for CloudFetch with protocol dimension. - /// Format: (query, expected row count, use cloud fetch, enable direct results, protocol) + /// Test cases for CloudFetch. + /// Format: (query, expected row count, use cloud fetch, enable direct results) + /// + /// The protocol is NOT a test-case dimension: each case runs under the protocol + /// selected by the connection config / CI matrix (separate rest and thrift jobs), + /// so it isn't duplicated or hardcoded here. /// public static IEnumerable TestCases() { - string[] protocols = { "thrift", "rest" }; - string zeroQuery = "SELECT * FROM range(1000) LIMIT 0"; string smallQuery = "SELECT * FROM range(1000)"; string largeQuery = "SELECT * FROM main.tpcds_sf100_delta.store_sales LIMIT 1000000"; - foreach (var protocol in protocols) - { - // LIMIT 0 test cases - edge case for empty result set (PECO-2524) - yield return new object[] { zeroQuery, 0, true, true, protocol }; - yield return new object[] { zeroQuery, 0, false, true, protocol }; - - // Small query test cases - yield return new object[] { smallQuery, 1000, true, true, protocol }; - yield return new object[] { smallQuery, 1000, false, true, protocol }; - yield return new object[] { smallQuery, 1000, true, false, protocol }; - yield return new object[] { smallQuery, 1000, false, false, protocol }; - - // Large query test cases - yield return new object[] { largeQuery, 1000000, true, true, protocol }; - yield return new object[] { largeQuery, 1000000, false, true, protocol }; - yield return new object[] { largeQuery, 1000000, true, false, protocol }; - yield return new object[] { largeQuery, 1000000, false, false, protocol }; - } + // LIMIT 0 test cases - edge case for empty result set (PECO-2524) + yield return new object[] { zeroQuery, 0, true, true }; + yield return new object[] { zeroQuery, 0, false, true }; + + // Small query test cases + yield return new object[] { smallQuery, 1000, true, true }; + yield return new object[] { smallQuery, 1000, false, true }; + yield return new object[] { smallQuery, 1000, true, false }; + yield return new object[] { smallQuery, 1000, false, false }; + + // Large query test cases + yield return new object[] { largeQuery, 1000000, true, true }; + yield return new object[] { largeQuery, 1000000, false, true }; + yield return new object[] { largeQuery, 1000000, true, false }; + yield return new object[] { largeQuery, 1000000, false, false }; } /// /// Integration test for running queries against a real Databricks cluster with different CloudFetch settings. - /// Tests both Thrift and Statement Execution REST API protocols. + /// Runs under the protocol selected by the connection config / CI matrix (rest or thrift). /// [Theory] [MemberData(nameof(TestCases))] - public async Task TestCloudFetch(string query, int rowCount, bool useCloudFetch, bool enableDirectResults, string protocol) + public async Task TestCloudFetch(string query, int rowCount, bool useCloudFetch, bool enableDirectResults) { + // Effective protocol comes from config (mirrors DatabricksDatabase's default), + // not from the test — so the rest-only setup below tracks the configured protocol. + string protocol = string.IsNullOrEmpty(TestConfiguration.Protocol) + ? "thrift" : TestConfiguration.Protocol.ToLowerInvariant(); + var parameters = new Dictionary { - [DatabricksParameters.Protocol] = protocol, [DatabricksParameters.UseCloudFetch] = useCloudFetch.ToString(), [DatabricksParameters.EnableDirectResults] = enableDirectResults.ToString(), [DatabricksParameters.CanDecompressLz4] = "true",