Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions csharp/src/Reader/DatabricksReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using AdbcDrivers.Databricks.Telemetry.TagDefinitions;
using Apache.Arrow;
using Apache.Arrow.Adbc;
using AdbcDrivers.HiveServer2.Hive2;
Expand All @@ -47,6 +49,20 @@ internal sealed class DatabricksReader : BaseDatabricksReader
// than the limit in the last batch but reports adjusted rowCount in metadata.
private long _currentBatchExpectedRows;

// Issue #485: Track total bytes consumed by this inline reader so
// DatabricksCompositeReader.Dispose can emit a result.bytes_downloaded tag
// with the same shape as CloudFetchReader (which sets the tag inside its
// own Dispose, while Activity.Current is the composite Dispose span).
//
// Byte-counting convention: we count the wire-side size of each
// TSparkArrowBatch.Batch — i.e. the bytes received over the Thrift
// connection, before any LZ4 decompression. This matches CloudFetch,
// which uses DownloadResult.Size (the downloaded chunk size, also
// pre-decompression). The tag name "downloaded" is reused for
// dashboard/query compatibility even though inline data is not a
// separate download.
private long _totalBytesConsumed;

public DatabricksReader(IHiveServer2Statement statement, Schema schema, IResponse response, TFetchResultsResp? initialResults, bool isLz4Compressed)
: base(statement, schema, response, isLz4Compressed) // IHiveServer2Statement implements IActivityTracer
{
Expand Down Expand Up @@ -189,6 +205,13 @@ private void ProcessFetchedBatches()
ReadOnlyMemory<byte> dataToUse = new ReadOnlyMemory<byte>(batch.Batch);
int originalSize = batch.Batch.Length;

// Issue #485: accumulate wire-side batch bytes for the
// result.bytes_downloaded tag emitted on Dispose. We use the
// pre-decompression size (originalSize) to match CloudFetch's
// convention of counting downloaded/received bytes rather than
// decompressed bytes.
_totalBytesConsumed += originalSize;

// If LZ4 compression is enabled, decompress the data
if (isLz4Compressed)
{
Expand Down Expand Up @@ -241,6 +264,14 @@ _ when ex.GetType().Name.Contains("LZ4") => $"Batch {this.index}: LZ4 decompress

protected override void Dispose(bool disposing)
{
    // Issue #485: stamp the ambient activity with result.bytes_downloaded,
    // the same tag CloudFetchReader emits, so the inline and CloudFetch
    // reader paths report byte consumption uniformly. The ambient activity
    // at this point is expected to be the DatabricksCompositeReader.Dispose
    // span (BaseDatabricksReader.Dispose does not start one of its own),
    // which is also where CloudFetchReader's tag ends up. When no activity
    // is ambient there is nothing to tag and the step is skipped.
    if (Activity.Current is Activity ambientActivity)
    {
        ambientActivity.SetTag(StatementExecutionEvent.ResultBytesDownloaded, _totalBytesConsumed);
    }

    base.Dispose(disposing);
}

Expand Down
115 changes: 114 additions & 1 deletion csharp/test/E2E/CloseOperationE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace AdbcDrivers.Databricks.Tests
public class CloseOperationE2ETest : TestBase<DatabricksTestConfiguration, DatabricksTestEnvironment>, IDisposable
{
private readonly List<(string ActivityName, string EventName)> _capturedEvents = new();
private readonly List<(string ActivityName, IEnumerable<KeyValuePair<string, object?>> Tags)> _capturedTags = new();
private readonly object _capturedEventsLock = new();
private readonly ActivityListener _activityListener;
private bool _disposed;
Expand All @@ -68,6 +69,9 @@ public CloseOperationE2ETest(ITestOutputHelper? outputHelper)
{
_capturedEvents.Add((activity.OperationName, evt.Name));
}
// Snapshot the tag set into a materialized list so we don't hold
// a reference to Activity's internal collection after it's recycled.
_capturedTags.Add((activity.OperationName, activity.TagObjects.ToList()));
}
}
};
Expand Down Expand Up @@ -130,7 +134,7 @@ public static IEnumerable<object[]> TestCases() =>
[MemberData(nameof(TestCases))]
public async Task DisposeEmitsCloseOperationEvent(string description, string query, bool useCloudFetch, bool enableDirectResults)
{
lock (_capturedEventsLock) { _capturedEvents.Clear(); }
lock (_capturedEventsLock) { _capturedEvents.Clear(); _capturedTags.Clear(); }

var parameters = new Dictionary<string, string>
{
Expand Down Expand Up @@ -187,5 +191,114 @@ public async Task DisposeEmitsCloseOperationEvent(string description, string que
connection.Dispose();
}
}

/// <summary>
/// Regression test for issue #485: the inline result path must emit the same
/// <c>result.bytes_downloaded</c> tag on <c>DatabricksCompositeReader.Dispose</c>
/// as the CloudFetch path, so debug tooling and dashboards filtering on this tag
/// see a uniform "bytes consumed by reader" signal regardless of which reader
/// was active.
///
/// Before the fix, only the CloudFetch path tagged the composite Dispose span
/// with <c>result.bytes_downloaded</c> (via <c>CloudFetchReader.Dispose</c>
/// running under the composite Dispose Activity). The inline
/// <c>DatabricksReader.Dispose</c> emitted no such tag.
///
/// The fix tracks bytes consumed in <see cref="DatabricksReader"/> as each
/// inline Arrow batch is processed, and emits the matching tag on Dispose so
/// the composite span carries it on both paths. The tag name is kept identical
/// to CloudFetch even though "downloaded" is technically a stretch for inline
/// data (which arrives via the Thrift connection rather than a separate
/// download), because dashboards and queries filtering on this tag should
/// work uniformly for both readers.
/// </summary>
/// <returns>A task that completes when the query has been read and the tags verified.</returns>
[SkippableFact]
public async Task Dispose_InlineReader_EmitsBytesDownloaded_Issue485()
{
    // Start from a clean slate so tags captured by earlier tests in this
    // class instance cannot satisfy the assertions below.
    lock (_capturedEventsLock) { _capturedEvents.Clear(); _capturedTags.Clear(); }

    var parameters = new Dictionary<string, string>
    {
        [DatabricksParameters.Protocol] = "thrift",
        // Force the inline result path (no CloudFetch). We deliberately keep
        // DirectResults disabled so the driver fetches results via the inline
        // FetchResults RPC, guaranteeing DatabricksReader processes at least
        // one TSparkArrowBatch and accumulates bytes.
        [DatabricksParameters.UseCloudFetch] = "false",
        [DatabricksParameters.EnableDirectResults] = "false",
    };

    // Small range query — produces a single inline batch via Thrift FetchResults.
    const string query = "SELECT * FROM range(1, 100)";

    var connection = NewConnection(TestConfiguration, parameters);
    try
    {
        long totalRows = 0;

        var statement = connection.CreateStatement();
        try
        {
            statement.SqlQuery = query;
            var result = await statement.ExecuteQueryAsync();

            using (var reader = result.Stream!)
            {
                RecordBatch? batch;
                while ((batch = await reader.ReadNextRecordBatchAsync()) != null)
                {
                    totalRows += batch.Length;
                }
            }
            // reader.Dispose() above triggers DatabricksCompositeReader.Dispose,
            // which is where the result.bytes_downloaded tag must now be present
            // for the inline path (issue #485).
        }
        finally
        {
            // Dispose the statement even when execution or reading throws, so a
            // failing run does not leak the server-side operation. On the success
            // path this still happens before the tag assertions, as before.
            statement.Dispose();
        }

        OutputHelper?.WriteLine($"Inline reader consumed {totalRows} rows; checking composite Dispose tags.");

        // Pull the tags emitted by DatabricksCompositeReader.Dispose.
        List<KeyValuePair<string, object?>> disposeTags;
        lock (_capturedEventsLock)
        {
            disposeTags = _capturedTags
                .Where(e => e.ActivityName == "DatabricksCompositeReader.Dispose")
                .SelectMany(e => e.Tags)
                .ToList();
        }

        OutputHelper?.WriteLine(
            "Composite Dispose tags: [" +
            string.Join(", ", disposeTags.Select(t => $"{t.Key}={t.Value}")) + "]");

        // Sanity: confirm we exercised the inline path. The CloudFetch parity
        // test already covers the CloudFetchReader case.
        var activeReaderTag = disposeTags
            .FirstOrDefault(t => t.Key == "reader.active_reader_type");
        Assert.Equal("DatabricksReader", activeReaderTag.Value as string);

        // The core assertion: the inline path must now emit the same byte counter
        // tag as CloudFetch, with a strictly positive value (we consumed at least
        // one non-empty inline batch). Materializing the matches lets us test for
        // presence without a nullable-struct cast or the null-forgiving operator.
        var bytesTags = disposeTags
            .Where(t => t.Key == "result.bytes_downloaded")
            .ToList();

        Assert.True(bytesTags.Count > 0,
            "Expected 'result.bytes_downloaded' tag on DatabricksCompositeReader.Dispose " +
            "for the inline result path (issue #485). Before the fix only CloudFetchReader " +
            "emits this tag, so dashboards filtering on byte-consumption see no signal for " +
            "inline-path disposes. Got tags: [" +
            string.Join(", ", disposeTags.Select(t => t.Key)) + "]");

        long bytesValue = Convert.ToInt64(bytesTags[0].Value);
        Assert.True(bytesValue > 0,
            $"Expected 'result.bytes_downloaded' > 0 on inline Dispose; got {bytesValue}. " +
            "The inline reader must accumulate per-batch byte sizes as batches are " +
            "consumed so this tag reflects the total bytes the reader processed.");
    }
    finally
    {
        connection.Dispose();
    }
}
}
}
Loading