From 774e8f968bcf8a590cd3297e76b4261b60591c1d Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 28 May 2026 00:18:22 -0700 Subject: [PATCH 1/2] test(csharp): assert inline Dispose emits result.bytes_downloaded (failing for #485) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds Dispose_InlineReader_EmitsBytesDownloaded_Issue485 to CloseOperationE2ETest. The test forces the Thrift inline result path (UseCloudFetch=false, EnableDirectResults=false), reads a small range() result through DatabricksReader, disposes the reader, and asserts that the DatabricksCompositeReader.Dispose span carries a result.bytes_downloaded tag with a strictly positive value — matching the tag CloudFetchReader already emits on the same span. This commit is RED: against the unpatched driver the test fails with "Got tags: [reader.active_reader_type]", proving the inline path is missing the byte counter (issue #485). Refs #485 Co-authored-by: Isaac --- csharp/test/E2E/CloseOperationE2ETest.cs | 115 ++++++++++++++++++++++- 1 file changed, 114 insertions(+), 1 deletion(-) diff --git a/csharp/test/E2E/CloseOperationE2ETest.cs b/csharp/test/E2E/CloseOperationE2ETest.cs index 03d2cf91a..8affa1b35 100644 --- a/csharp/test/E2E/CloseOperationE2ETest.cs +++ b/csharp/test/E2E/CloseOperationE2ETest.cs @@ -47,6 +47,7 @@ namespace AdbcDrivers.Databricks.Tests public class CloseOperationE2ETest : TestBase, IDisposable { private readonly List<(string ActivityName, string EventName)> _capturedEvents = new(); + private readonly List<(string ActivityName, IEnumerable> Tags)> _capturedTags = new(); private readonly object _capturedEventsLock = new(); private readonly ActivityListener _activityListener; private bool _disposed; @@ -68,6 +69,9 @@ public CloseOperationE2ETest(ITestOutputHelper? outputHelper) { _capturedEvents.Add((activity.OperationName, evt.Name)); } + // Snapshot the tag set into a materialized list so we don't hold + // a reference to Activity's internal collection after it's recycled. + _capturedTags.Add((activity.OperationName, activity.TagObjects.ToList())); } } }; @@ -130,7 +134,7 @@ public static IEnumerable TestCases() => [MemberData(nameof(TestCases))] public async Task DisposeEmitsCloseOperationEvent(string description, string query, bool useCloudFetch, bool enableDirectResults) { - lock (_capturedEventsLock) { _capturedEvents.Clear(); } + lock (_capturedEventsLock) { _capturedEvents.Clear(); _capturedTags.Clear(); } var parameters = new Dictionary { @@ -187,5 +191,114 @@ public async Task DisposeEmitsCloseOperationEvent(string description, string que connection.Dispose(); } } + + /// + /// Regression test for issue #485: the inline result path must emit the same + /// result.bytes_downloaded tag on DatabricksCompositeReader.Dispose + /// as the CloudFetch path, so debug tooling and dashboards filtering on this tag + /// see a uniform "bytes consumed by reader" signal regardless of which reader + /// was active. + /// + /// Before the fix, only the CloudFetch path tagged the composite Dispose span + /// with result.bytes_downloaded (via CloudFetchReader.Dispose + /// running under the composite Dispose Activity). The inline + /// DatabricksReader.Dispose emitted no such tag. + /// + /// The fix tracks bytes consumed in as each + /// inline Arrow batch is processed, and emits the matching tag on Dispose so + /// the composite span carries it on both paths. The tag name is kept identical + /// to CloudFetch even though "downloaded" is technically a stretch for inline + /// data (which arrives via the Thrift connection rather than a separate + /// download), because dashboards and queries filtering on this tag should + /// work uniformly for both readers. + /// + [SkippableFact] + public async Task Dispose_InlineReader_EmitsBytesDownloaded_Issue485() + { + lock (_capturedEventsLock) { _capturedEvents.Clear(); _capturedTags.Clear(); } + + var parameters = new Dictionary + { + [DatabricksParameters.Protocol] = "thrift", + // Force the inline result path (no CloudFetch). We deliberately keep + // DirectResults disabled so the driver fetches results via the inline + // FetchResults RPC, guaranteeing DatabricksReader processes at least + // one TSparkArrowBatch and accumulates bytes. + [DatabricksParameters.UseCloudFetch] = "false", + [DatabricksParameters.EnableDirectResults] = "false", + }; + + // Small range query — produces a single inline batch via Thrift FetchResults. + const string query = "SELECT * FROM range(1, 100)"; + + var connection = NewConnection(TestConfiguration, parameters); + try + { + var statement = connection.CreateStatement(); + statement.SqlQuery = query; + var result = await statement.ExecuteQueryAsync(); + + long totalRows = 0; + using (var reader = result.Stream!) + { + RecordBatch? batch; + while ((batch = await reader.ReadNextRecordBatchAsync()) != null) + { + totalRows += batch.Length; + } + } + // reader.Dispose() above triggers DatabricksCompositeReader.Dispose, + // which is where the result.bytes_downloaded tag must now be present + // for the inline path (issue #485). + statement.Dispose(); + + OutputHelper?.WriteLine($"Inline reader consumed {totalRows} rows; checking composite Dispose tags."); + + // Pull the tags emitted by DatabricksCompositeReader.Dispose. + List> disposeTags; + lock (_capturedEventsLock) + { + disposeTags = _capturedTags + .Where(e => e.ActivityName == "DatabricksCompositeReader.Dispose") + .SelectMany(e => e.Tags) + .ToList(); + } + + OutputHelper?.WriteLine( + "Composite Dispose tags: [" + + string.Join(", ", disposeTags.Select(t => $"{t.Key}={t.Value}")) + "]"); + + // Sanity: confirm we exercised the inline path. CloudFetch parity test + // already covers the CloudFetchReader case. + var activeReaderTag = disposeTags + .FirstOrDefault(t => t.Key == "reader.active_reader_type"); + Assert.Equal("DatabricksReader", activeReaderTag.Value as string); + + // The core assertion: the inline path must now emit the same byte counter + // tag as CloudFetch, with a strictly positive value (we consumed at least + // one non-empty inline batch). + var bytesTag = disposeTags + .Where(t => t.Key == "result.bytes_downloaded") + .Cast?>() + .FirstOrDefault(); + + Assert.True(bytesTag != null, + "Expected 'result.bytes_downloaded' tag on DatabricksCompositeReader.Dispose " + + "for the inline result path (issue #485). Before the fix only CloudFetchReader " + + "emits this tag, so dashboards filtering on byte-consumption see no signal for " + + "inline-path disposes. Got tags: [" + + string.Join(", ", disposeTags.Select(t => t.Key)) + "]"); + + long bytesValue = Convert.ToInt64(bytesTag!.Value.Value); + Assert.True(bytesValue > 0, + $"Expected 'result.bytes_downloaded' > 0 on inline Dispose; got {bytesValue}. " + + "The inline reader must accumulate per-batch byte sizes as batches are " + + "consumed so this tag reflects the total bytes the reader processed."); + } + finally + { + connection.Dispose(); + } + } } } From 3d47509aee83b18862e39656ef1cfe180a6d5a28 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 28 May 2026 00:19:40 -0700 Subject: [PATCH 2/2] fix(csharp): emit result.bytes_downloaded on inline Dispose for parity with CloudFetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DatabricksCompositeReader.Dispose previously emitted result.bytes_downloaded only on the CloudFetch path. CloudFetchReader.Dispose calls Activity.Current?.SetTag(...) and Activity.Current at that point is the composite Dispose span (BaseDatabricksReader.Dispose opens no activity of its own), which is why the tag lands on the composite span. The inline DatabricksReader path emitted no such tag, leaving the composite Dispose span asymmetric: CloudFetch carried both reader.active_reader_type and result.bytes_downloaded, while inline carried only the reader-type tag. This patch: - Adds a _totalBytesConsumed counter to DatabricksReader, incremented in ProcessFetchedBatches with the wire-side TSparkArrowBatch.Batch.Length (i.e. pre-LZ4-decompression bytes received over Thrift). This matches CloudFetchReader's convention of counting downloaded chunk sizes (DownloadResult.Size, set after the chunk is downloaded but before decompression). - Emits the same Activity.Current?.SetTag(ResultBytesDownloaded, ...) call in DatabricksReader.Dispose so the inline path lands on the composite Dispose span identically to CloudFetch. The tag name "result.bytes_downloaded" is kept identical even though "downloaded" is technically a stretch for inline data (which arrives via the Thrift connection rather than a separate cloud download), because dashboards and queries filtering on this tag should work uniformly for both reader paths — that uniformity is the higher priority and is why issue #485 was filed. Closes #485 Co-authored-by: Isaac --- csharp/src/Reader/DatabricksReader.cs | 31 +++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/csharp/src/Reader/DatabricksReader.cs b/csharp/src/Reader/DatabricksReader.cs index 5d5a1c32c..1d4df9a89 100644 --- a/csharp/src/Reader/DatabricksReader.cs +++ b/csharp/src/Reader/DatabricksReader.cs @@ -23,8 +23,10 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using AdbcDrivers.Databricks.Telemetry.TagDefinitions; using Apache.Arrow; using Apache.Arrow.Adbc; using AdbcDrivers.HiveServer2.Hive2; @@ -47,6 +49,20 @@ internal sealed class DatabricksReader : BaseDatabricksReader // than the limit in the last batch but reports adjusted rowCount in metadata. private long _currentBatchExpectedRows; + // Issue #485: Track total bytes consumed by this inline reader so + // DatabricksCompositeReader.Dispose can emit a result.bytes_downloaded tag + // with the same shape as CloudFetchReader (which sets the tag inside its + // own Dispose, while Activity.Current is the composite Dispose span). + // + // Byte-counting convention: we count the wire-side size of each + // TSparkArrowBatch.Batch — i.e. the bytes received over the Thrift + // connection, before any LZ4 decompression. This matches CloudFetch, + // which uses DownloadResult.Size (the downloaded chunk size, also + // pre-decompression). The tag name "downloaded" is reused for + // dashboard/query compatibility even though inline data is not a + // separate download. + private long _totalBytesConsumed; + public DatabricksReader(IHiveServer2Statement statement, Schema schema, IResponse response, TFetchResultsResp? initialResults, bool isLz4Compressed) : base(statement, schema, response, isLz4Compressed) // IHiveServer2Statement implements IActivityTracer { @@ -189,6 +205,13 @@ private void ProcessFetchedBatches() ReadOnlyMemory dataToUse = new ReadOnlyMemory(batch.Batch); int originalSize = batch.Batch.Length; + // Issue #485: accumulate wire-side batch bytes for the + // result.bytes_downloaded tag emitted on Dispose. We use the + // pre-decompression size (originalSize) to match CloudFetch's + // convention of counting downloaded/received bytes rather than + // decompressed bytes. + _totalBytesConsumed += originalSize; + // If LZ4 compression is enabled, decompress the data if (isLz4Compressed) { @@ -241,6 +264,14 @@ _ when ex.GetType().Name.Contains("LZ4") => $"Batch {this.index}: LZ4 decompress protected override void Dispose(bool disposing) { + // Issue #485: emit the same result.bytes_downloaded tag CloudFetchReader + // emits, so DatabricksCompositeReader.Dispose carries a uniform + // byte-consumption signal on both reader paths. Activity.Current here + // is the composite Dispose span (BaseDatabricksReader.Dispose does not + // open its own activity), which is exactly where CloudFetchReader's + // tag lands today. + Activity.Current?.SetTag(StatementExecutionEvent.ResultBytesDownloaded, _totalBytesConsumed); + base.Dispose(disposing); }