Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 30 additions & 22 deletions csharp/src/StatementExecution/StatementExecutionStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -760,29 +760,37 @@ public override void Dispose()
{
if (_currentStatementId != null)
{
try
// Start a dedicated span instead of annotating Activity.Current: Dispose is
// typically called outside any ambient activity (e.g. connection pooling),
// so Activity.Current is usually null here and the event would be dropped.
// Mirrors DatabricksCompositeReader.Dispose on the Thrift path, which owns
// its own span so the close is always traced regardless of caller context.
this.TraceActivity(activity =>
{
// Close statement synchronously during dispose
Activity.Current?.AddEvent(new ActivityEvent("statement.dispose",
tags: new ActivityTagsCollection
{
{ "statement_id", _currentStatementId }
}));
_client.CloseStatementAsync(_currentStatementId, CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
// Best effort - ignore errors during dispose
Activity.Current?.AddEvent(new ActivityEvent("statement.dispose.error",
tags: new ActivityTagsCollection
{
{ "error", ex.Message }
}));
}
finally
{
_currentStatementId = null;
}
try
{
// Close statement synchronously during dispose
activity?.AddEvent(new ActivityEvent("statement.dispose",
tags: new ActivityTagsCollection
{
{ "statement_id", _currentStatementId }
}));
_client.CloseStatementAsync(_currentStatementId, CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
// Best effort - ignore errors during dispose
activity?.AddEvent(new ActivityEvent("statement.dispose.error",
tags: new ActivityTagsCollection
{
{ "error", ex.Message }
}));
}
finally
{
_currentStatementId = null;
}
}, activityName: nameof(StatementExecutionStatement) + "." + nameof(Dispose));
}
}

Expand Down
93 changes: 64 additions & 29 deletions csharp/test/E2E/CloseOperationE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,17 @@ public static IEnumerable<object[]> TestCases() =>
];

/// <summary>
/// Validates that DatabricksCompositeReader.Dispose emits the composite_reader.close_operation
/// trace event for all result delivery modes when the reader is disposed without closing
/// the underlying connection (simulating connection pooling).
/// Validates that disposing a statement releases the server-side operation across all
/// result delivery modes when the connection is not closed (simulating connection pooling).
/// The protocol is inherited from config / the CI matrix, and the assertion branches:
///
/// The composite_reader.close_operation event is only present in the code path introduced
/// by the fix. Without the fix:
/// - Inline (DirectResults or not): event missing because _activeReader != null caused
/// the old code to delegate to _activeReader.Dispose() instead.
/// - CloudFetch: same delegation, but CloseOperation is never sent at all, orphaning
/// the server operation for ~1 hour.
/// - Thrift: DatabricksCompositeReader.Dispose must emit composite_reader.close_operation.
/// Without the fix this event is missing — Inline (DirectResults or not) because the old
/// code delegated to _activeReader.Dispose(); CloudFetch because CloseOperation was never
/// sent at all, orphaning the server operation for ~1 hour.
/// - REST/SEA: there is no composite reader; StatementExecutionStatement.Dispose must emit
/// statement.dispose, which brackets the CloseStatementAsync (HTTP DELETE) that releases
/// the server statement.
///
/// For Thrift wire-level assertions, see proxy-based tests in databricks-driver-test:
/// CLOUDFETCH-013 through CLOUDFETCH-016.
Expand All @@ -134,9 +135,14 @@ public async Task DisposeEmitsCloseOperationEvent(string description, string que
{
lock (_capturedEventsLock) { _capturedEvents.Clear(); }

// Inherit the protocol from config / CI matrix rather than hardcoding it.
// The two protocols release the server-side operation through different
// code paths, so the assertion below branches on the effective protocol.
string protocol = string.IsNullOrEmpty(TestConfiguration.Protocol)
? "thrift" : TestConfiguration.Protocol.ToLowerInvariant();

var parameters = new Dictionary<string, string>
{
[DatabricksParameters.Protocol] = "thrift",
[DatabricksParameters.UseCloudFetch] = useCloudFetch.ToString(),
[DatabricksParameters.EnableDirectResults] = enableDirectResults.ToString(),
};
Expand Down Expand Up @@ -165,29 +171,54 @@ public async Task DisposeEmitsCloseOperationEvent(string description, string que

OutputHelper?.WriteLine($"[{description}] Read {totalRows} rows, reader disposed.");

// Collect the events emitted by DatabricksCompositeReader.Dispose.
List<string> disposeEvents;
lock (_capturedEventsLock)
if (protocol == "rest")
{
disposeEvents = _capturedEvents
.Where(e => e.ActivityName == "DatabricksCompositeReader.Dispose")
.Select(e => e.EventName)
.ToList();
// SEA path: there is no composite reader. The server statement is
// released by StatementExecutionStatement.Dispose via CloseStatementAsync
// (HTTP DELETE), traced as the statement.dispose event under that span.
List<string> seaDisposeEvents;
lock (_capturedEventsLock)
{
seaDisposeEvents = _capturedEvents
.Where(e => e.ActivityName == "StatementExecutionStatement.Dispose")
.Select(e => e.EventName)
.ToList();
}

OutputHelper?.WriteLine($"[{description}] SEA dispose events: [{string.Join(", ", seaDisposeEvents)}]");

Assert.True(seaDisposeEvents.Contains("statement.dispose"),
$"[{description}] statement.dispose event not found in " +
$"StatementExecutionStatement.Dispose. The SEA path must release the " +
$"server statement via CloseStatementAsync (HTTP DELETE) on dispose. " +
$"Got events: [" + string.Join(", ", seaDisposeEvents) + "]");
}
else
{
// Thrift path: the operation is closed by DatabricksCompositeReader.Dispose.
List<string> disposeEvents;
lock (_capturedEventsLock)
{
disposeEvents = _capturedEvents
.Where(e => e.ActivityName == "DatabricksCompositeReader.Dispose")
.Select(e => e.EventName)
.ToList();
}

OutputHelper?.WriteLine($"[{description}] Dispose events: [{string.Join(", ", disposeEvents)}]");
OutputHelper?.WriteLine($"[{description}] Dispose events: [{string.Join(", ", disposeEvents)}]");

// The composite_reader.close_operation.started event is only present after
// the original CloseOperation fix (it was renamed from the bare
// "composite_reader.close_operation" by issue #489 to mark when the RPC
// begins). Without that fix, the CloudFetch path silently skips
// CloseOperation entirely and neither event is emitted.
Assert.True(disposeEvents.Contains("composite_reader.close_operation.started"),
$"[{description}] composite_reader.close_operation.started event not found in " +
$"DatabricksCompositeReader.Dispose. Without the close-operation fix, server " +
$"operations are orphaned until SQL Gateway closes them with " +
$"CommandInactivityTimeout (~1 hour). Got events: [" +
string.Join(", ", disposeEvents) + "]");
// The composite_reader.close_operation.started event is only present after
// the original CloseOperation fix (it was renamed from the bare
// "composite_reader.close_operation" by issue #489 to mark when the RPC
// begins). Without that fix, the CloudFetch path silently skips
// CloseOperation entirely and neither event is emitted.
Assert.True(disposeEvents.Contains("composite_reader.close_operation.started"),
$"[{description}] composite_reader.close_operation.started event not found in " +
$"DatabricksCompositeReader.Dispose. Without the close-operation fix, server " +
$"operations are orphaned until SQL Gateway closes them with " +
$"CommandInactivityTimeout (~1 hour). Got events: [" +
string.Join(", ", disposeEvents) + "]");
}
}
finally
{
Expand Down Expand Up @@ -221,6 +252,10 @@ public async Task DisposeCloseOperation_EmitsStartedAndCompletedWithDistinctTime

var parameters = new Dictionary<string, string>
{
// Thrift-pinned by design: this test measures the latency of the Thrift
// TCloseOperationReq RPC via the started/completed event pair, which only
// exists on the composite-reader (Thrift) path. The REST/SEA close is a
// single HTTP DELETE with no equivalent latency-bracketing pair.
[DatabricksParameters.Protocol] = "thrift",
[DatabricksParameters.UseCloudFetch] = "false",
[DatabricksParameters.EnableDirectResults] = "false",
Expand Down
56 changes: 30 additions & 26 deletions csharp/test/E2E/CloudFetchE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ namespace AdbcDrivers.Databricks.Tests
{
/// <summary>
/// End-to-end tests for the CloudFetch feature in the Databricks ADBC driver.
/// Tests both Thrift and Statement Execution REST API protocols.
/// Runs under the protocol selected by the connection config / CI matrix
/// (the suite is exercised against both Thrift and REST as separate jobs).
/// </summary>
public class CloudFetchE2ETest : TestBase<DatabricksTestConfiguration, DatabricksTestEnvironment>, IDisposable
{
Expand Down Expand Up @@ -111,48 +112,51 @@ protected override void Dispose(bool disposing)
}

/// <summary>
/// Test cases for CloudFetch with protocol dimension.
/// Format: (query, expected row count, use cloud fetch, enable direct results, protocol)
/// Test cases for CloudFetch.
/// Format: (query, expected row count, use cloud fetch, enable direct results)
///
/// The protocol is NOT a test-case dimension: each case runs under the protocol
/// selected by the connection config / CI matrix (separate rest and thrift jobs),
/// so it isn't duplicated or hardcoded here.
/// </summary>
public static IEnumerable<object[]> TestCases()
{
    // Each row is (query, expected row count, use cloud fetch, enable direct results).
    // Every row has exactly four elements. The protocol is deliberately not part of
    // the row, so all rows share a single arity and can bind to one theory signature;
    // mixing five-element and four-element rows would make the theory unbindable.
    const string zeroQuery = "SELECT * FROM range(1000) LIMIT 0";
    const string smallQuery = "SELECT * FROM range(1000)";
    const string largeQuery = "SELECT * FROM main.tpcds_sf100_delta.store_sales LIMIT 1000000";

    // LIMIT 0 test cases - edge case for empty result set (PECO-2524)
    yield return new object[] { zeroQuery, 0, true, true };
    yield return new object[] { zeroQuery, 0, false, true };

    // Small query test cases
    yield return new object[] { smallQuery, 1000, true, true };
    yield return new object[] { smallQuery, 1000, false, true };
    yield return new object[] { smallQuery, 1000, true, false };
    yield return new object[] { smallQuery, 1000, false, false };

    // Large query test cases
    yield return new object[] { largeQuery, 1000000, true, true };
    yield return new object[] { largeQuery, 1000000, false, true };
    yield return new object[] { largeQuery, 1000000, true, false };
    yield return new object[] { largeQuery, 1000000, false, false };
}

/// <summary>
/// Integration test for running queries against a real Databricks cluster with different CloudFetch settings.
/// Tests both Thrift and Statement Execution REST API protocols.
/// Runs under the protocol selected by the connection config / CI matrix (rest or thrift).
/// </summary>
[Theory]
[MemberData(nameof(TestCases))]
public async Task TestCloudFetch(string query, int rowCount, bool useCloudFetch, bool enableDirectResults, string protocol)
public async Task TestCloudFetch(string query, int rowCount, bool useCloudFetch, bool enableDirectResults)
{
// Effective protocol comes from config (mirrors DatabricksDatabase's default),
// not from the test — so the rest-only setup below tracks the configured protocol.
string protocol = string.IsNullOrEmpty(TestConfiguration.Protocol)
? "thrift" : TestConfiguration.Protocol.ToLowerInvariant();

var parameters = new Dictionary<string, string>
{
[DatabricksParameters.Protocol] = protocol,
[DatabricksParameters.UseCloudFetch] = useCloudFetch.ToString(),
[DatabricksParameters.EnableDirectResults] = enableDirectResults.ToString(),
[DatabricksParameters.CanDecompressLz4] = "true",
Expand Down
Loading