diff --git a/csharp/doc/sprint-plan-PECO-3022-sea-telemetry-2026-05-14.md b/csharp/doc/sprint-plan-PECO-3022-sea-telemetry-2026-05-14.md index 99bde1d56..23b38b982 100644 --- a/csharp/doc/sprint-plan-PECO-3022-sea-telemetry-2026-05-14.md +++ b/csharp/doc/sprint-plan-PECO-3022-sea-telemetry-2026-05-14.md @@ -66,9 +66,8 @@ Create the interface and three implementations per design §5.1 and §12. **New files:** - `csharp/src/Telemetry/IStatementOperationObserver.cs` -- `csharp/src/Telemetry/TelemetryObserver.cs` (uses `Safe(Action)` helper pattern from design §12) +- `csharp/src/Telemetry/TelemetryObserver.cs` (try/catch scoped to `OnFinalized`'s proto build + enqueue; lifecycle methods are plain field writes that cannot throw) - `csharp/src/Telemetry/NullObserver.cs` (singleton) -- `csharp/src/Telemetry/SafeObserver.cs` (decorator) **Acceptance criteria:** - Interface contract documented: methods MUST NOT throw, thread-safe, `OnFinalized` is terminal and idempotent. @@ -82,8 +81,6 @@ Create the interface and three implementations per design §5.1 and §12. - `TelemetryObserver_OnError_RecordsErrorAndFinalizes` - `TelemetryObserver_AllMethods_NeverThrow_WhenContextCorrupted` - `TelemetryObserver_OnChunksDownloaded_MergesIntoChunkDetails` - - `SafeObserver_PropagatesNormalCallsToInner` - - `SafeObserver_SwallowsExceptionsFromInner_LogsAtTrace` **Risks:** Low. New code, no existing callers yet. diff --git a/csharp/src/Telemetry/IStatementOperationObserver.cs b/csharp/src/Telemetry/IStatementOperationObserver.cs new file mode 100644 index 000000000..5c51914a5 --- /dev/null +++ b/csharp/src/Telemetry/IStatementOperationObserver.cs @@ -0,0 +1,114 @@ +/* +* Copyright (c) 2025 ADBC Drivers Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using AdbcDrivers.Databricks.Reader.CloudFetch; +using ExecutionResultFormat = AdbcDrivers.Databricks.Telemetry.Proto.ExecutionResult.Types.Format; +using OperationType = AdbcDrivers.Databricks.Telemetry.Proto.Operation.Types.Type; +using StatementType = AdbcDrivers.Databricks.Telemetry.Proto.Statement.Types.Type; + +namespace AdbcDrivers.Databricks.Telemetry +{ + /// + /// Observer of a single statement's operational lifecycle. Sits between the statement + /// classes (Thrift and SEA) and the telemetry implementation so the statement code + /// does not depend on telemetry types directly. + /// + /// + /// + /// Fail-open contract: Implementations MUST NOT throw from any method on + /// this interface. All exceptions raised inside an implementation must be swallowed + /// internally — callsites in statement code intentionally contain no try/catch around + /// observer calls. Telemetry must never affect the caller's control flow. + /// + /// + /// Thread-safety: Methods on this interface may be invoked from any thread. + /// Implementations MUST be thread-safe. In practice the calls happen from the + /// statement's executing task and the reader's disposal thread, which may differ. + /// + /// + /// Terminal call: is the terminal call. After it has + /// been invoked once, the observer's record is considered complete and any further + /// calls — including additional calls — MUST be no-ops + /// (idempotent). This protects against the common case where both an error path + /// and a dispose path attempt to finalize the same statement. + /// + /// + /// Non-telemetry uses: The interface is shaped around the statement's lifecycle + /// rather than the telemetry data model so future observers (tracing, audit) can be + /// added without changing statement code. + /// + /// + internal interface IStatementOperationObserver + { + /// + /// Called once just before the statement is submitted to the server. + /// + /// The statement type (QUERY, UPDATE, METADATA, ...). + /// The operation type (EXECUTE_STATEMENT, EXECUTE_STATEMENT_ASYNC, ...). + /// Whether results are expected to be compressed (LZ4). + void OnExecuteStarted(StatementType stmtType, OperationType opType, bool isCompressed); + + /// + /// Called once after the server has accepted the statement and a statement id is known. + /// + /// The server-assigned statement id. + /// The result format inferred for the execution (INLINE_ARROW, EXTERNAL_LINKS, ...). + void OnExecuteSucceeded(string statementId, ExecutionResultFormat resultFormat); + + /// + /// Called once after the polling loop reaches a terminal state, with the accumulated + /// poll count and total elapsed poll latency. + /// + /// Total number of status-poll calls issued. + /// Sum of wall-clock time spent in poll calls, in milliseconds. + void OnPollCompleted(int count, long latencyMs); + + /// + /// Called when the first batch of results is available to the reader. + /// Implementations should treat repeated calls as a no-op (only the first wins). + /// + /// Elapsed time from execute start to first batch ready, in milliseconds. + void OnFirstBatchReady(long latencyMs); + + /// + /// Called when the reader has been fully consumed (or disposed). + /// + /// Elapsed time from execute start to results fully consumed, in milliseconds. + void OnConsumed(long latencyMs); + + /// + /// Called once when chunk metrics for the CloudFetch download are available. + /// + /// Aggregated CloudFetch chunk metrics. Implementations must + /// tolerate empty / default metrics if the gap-fix plumbing has not landed yet. + void OnChunksDownloaded(ChunkMetrics metrics); + + /// + /// Called once if the statement execution fails. Implementations should record the + /// error and continue to accept further calls; an explicit + /// call is still required to terminate the observer. + /// + /// The exception that occurred. + void OnError(Exception ex); + + /// + /// Terminal call. Implementations build and dispatch any pending record and mark + /// the observer as finalized. Must be idempotent: repeated calls are no-ops. + /// + void OnFinalized(); + } +} diff --git a/csharp/src/Telemetry/NullObserver.cs b/csharp/src/Telemetry/NullObserver.cs new file mode 100644 index 000000000..7a0c458d8 --- /dev/null +++ b/csharp/src/Telemetry/NullObserver.cs @@ -0,0 +1,93 @@ +/* +* Copyright (c) 2025 ADBC Drivers Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using AdbcDrivers.Databricks.Reader.CloudFetch; +using ExecutionResultFormat = AdbcDrivers.Databricks.Telemetry.Proto.ExecutionResult.Types.Format; +using OperationType = AdbcDrivers.Databricks.Telemetry.Proto.Operation.Types.Type; +using StatementType = AdbcDrivers.Databricks.Telemetry.Proto.Statement.Types.Type; + +namespace AdbcDrivers.Databricks.Telemetry +{ + /// + /// Singleton no-op implementation of . + /// Used as the default observer so callsites in statement classes never need to + /// null-check before calling observer methods. + /// + /// + /// All methods are intentionally empty. They satisfy the fail-open, thread-safe, + /// and idempotent contract trivially. + /// + internal sealed class NullObserver : IStatementOperationObserver + { + /// + /// The singleton instance. Use this directly rather than constructing new instances. + /// + public static readonly NullObserver Instance = new NullObserver(); + + private NullObserver() + { + } + + /// + public void OnExecuteStarted(StatementType stmtType, OperationType opType, bool isCompressed) + { + // No-op. + } + + /// + public void OnExecuteSucceeded(string statementId, ExecutionResultFormat resultFormat) + { + // No-op. + } + + /// + public void OnPollCompleted(int count, long latencyMs) + { + // No-op. + } + + /// + public void OnFirstBatchReady(long latencyMs) + { + // No-op. + } + + /// + public void OnConsumed(long latencyMs) + { + // No-op. + } + + /// + public void OnChunksDownloaded(ChunkMetrics metrics) + { + // No-op. + } + + /// + public void OnError(Exception ex) + { + // No-op. + } + + /// + public void OnFinalized() + { + // No-op. Idempotent by construction. + } + } +} diff --git a/csharp/src/Telemetry/TelemetryObserver.cs b/csharp/src/Telemetry/TelemetryObserver.cs new file mode 100644 index 000000000..befe0d49d --- /dev/null +++ b/csharp/src/Telemetry/TelemetryObserver.cs @@ -0,0 +1,253 @@ +/* +* Copyright (c) 2025 ADBC Drivers Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Diagnostics; +using System.Threading; +using AdbcDrivers.Databricks.Reader.CloudFetch; +using AdbcDrivers.Databricks.Telemetry.Models; +using ExecutionResultFormat = AdbcDrivers.Databricks.Telemetry.Proto.ExecutionResult.Types.Format; +using OperationType = AdbcDrivers.Databricks.Telemetry.Proto.Operation.Types.Type; +using StatementType = AdbcDrivers.Databricks.Telemetry.Proto.Statement.Types.Type; + +namespace AdbcDrivers.Databricks.Telemetry +{ + /// + /// Default implementation that translates + /// observer method calls into mutations on a + /// and, on , builds a + /// and enqueues it on the session's telemetry client for background export. + /// + /// + /// + /// Fail-open: The lifecycle methods are plain field assignments and a couple of + /// guarded helpers; they cannot realistically throw. Only does + /// risky work (proto build + telemetry-client enqueue), and the try/catch is scoped to + /// exactly that block. Exceptions are surfaced as an OpenTelemetry activity event on the + /// ambient , not propagated to the caller. + /// + /// + /// Thread-safe: The scalar writes into the per-statement context are inherently + /// safe for the lifecycle calls (each is called at most a small number of times from the + /// statement's execution path or the reader's disposal thread). The terminal + /// uses + /// on an _emitted flag to guarantee exactly-once enqueue even if it is invoked + /// concurrently from multiple threads (e.g. error path + dispose path). + /// + /// + /// Post-finalize: Every non-terminal lifecycle method short-circuits on + /// , so any stray call after is a true + /// no-op per the interface contract. + /// + /// + /// Non-blocking: only enqueues the log into the + /// telemetry client's internal queue. The actual HTTP export runs on the client's + /// background flush timer and never blocks the calling thread. + /// + /// + internal sealed class TelemetryObserver : IStatementOperationObserver + { + // 0 = not yet emitted, 1 = emitted. Mutated via Interlocked.CompareExchange so the + // terminal OnFinalized() is exactly-once even under concurrent invocation. + private int _emitted; + + private readonly TelemetrySessionContext _session; + private readonly StatementTelemetryContext _ctx; + + /// + /// Initializes a new observer that records into a freshly created + /// seeded from . + /// + /// Per-connection telemetry session context. Required. + public TelemetryObserver(TelemetrySessionContext session) + : this(session, new StatementTelemetryContext(session ?? throw new ArgumentNullException(nameof(session)))) + { + } + + // Test seam: allows unit tests to inject a pre-populated context. Internal so + // it does not leak from the assembly. + internal TelemetryObserver(TelemetrySessionContext session, StatementTelemetryContext context) + { + _session = session ?? throw new ArgumentNullException(nameof(session)); + _ctx = context ?? throw new ArgumentNullException(nameof(context)); + } + + /// + /// Internal accessor for the underlying context, exposed for unit tests that need + /// to assert per-field state without building a full telemetry log. + /// + internal StatementTelemetryContext Context => _ctx; + + /// + /// Internal accessor for the idempotency flag, exposed for unit tests that need to + /// confirm exactly-once semantics without reaching into the enqueue path. + /// + internal bool HasEmitted => Volatile.Read(ref _emitted) != 0; + + /// + public void OnExecuteStarted(StatementType stmtType, OperationType opType, bool isCompressed) + { + if (IsFinalized()) return; + _ctx.StatementType = stmtType; + _ctx.OperationType = opType; + _ctx.IsCompressed = isCompressed; + } + + /// + public void OnExecuteSucceeded(string statementId, ExecutionResultFormat resultFormat) + { + if (IsFinalized()) return; + _ctx.StatementId = statementId; + _ctx.ResultFormat = resultFormat; + _ctx.RecordExecuteComplete(); + } + + /// + public void OnPollCompleted(int count, long latencyMs) + { + if (IsFinalized()) return; + _ctx.PollCount = count; + _ctx.PollLatencyMs = latencyMs; + } + + /// + public void OnFirstBatchReady(long latencyMs) + { + if (IsFinalized()) return; + // Only the first call wins: the underlying setter is null-guarded so + // repeated calls (e.g. inline reader emits one, cloud-fetch reader emits + // another) do not overwrite the earliest observed latency. + if (_ctx.FirstBatchReadyMs == null) + { + _ctx.FirstBatchReadyMs = latencyMs; + } + } + + /// + public void OnConsumed(long latencyMs) + { + if (IsFinalized()) return; + _ctx.ResultsConsumedMs = latencyMs; + } + + /// + public void OnChunksDownloaded(ChunkMetrics metrics) + { + if (IsFinalized() || metrics == null) return; + // Tolerate an empty ChunkMetrics — the gap-fix plumbing may not be landed + // yet, and the proto fields are nullable on the wire. + _ctx.SetChunkDetails( + metrics.TotalChunksPresent, + metrics.TotalChunksIterated, + metrics.InitialChunkLatencyMs, + metrics.SlowestChunkLatencyMs, + metrics.SumChunksDownloadTimeMs); + } + + /// + public void OnError(Exception ex) + { + if (IsFinalized() || ex == null) return; + _ctx.HasError = true; + _ctx.ErrorName = ex.GetType().Name; + // .Message is captured in-memory only. The proto's DriverErrorInfo.error_message + // field is pending LPP review (see ConnectionTelemetry.EmitOperationTelemetry), + // so BuildTelemetryLog does NOT serialize ErrorMessage into the wire payload + // today. SafeMessage neutralizes the rare case where ex.Message itself throws. + _ctx.ErrorMessage = SafeMessage(ex); + } + + /// + public void OnFinalized() + { + // Idempotency gate: only the first caller proceeds. + if (Interlocked.CompareExchange(ref _emitted, 1, 0) != 0) + { + return; + } + + ITelemetryClient? client = _session.TelemetryClient; + if (client == null) + { + // Telemetry is disabled at the session level. The idempotency flag is + // already set so a subsequent OnFinalized() remains a no-op. + return; + } + + // The only genuinely risky work in this class: proto construction and + // enqueue into the telemetry client. Anything else above this point is + // plain field writes that cannot throw. The catch is scoped tightly so the + // suppression only ever covers the real risk surface. + try + { + Proto.OssSqlDriverTelemetryLog log = _ctx.BuildTelemetryLog(); + TelemetryFrontendLog frontendLog = new TelemetryFrontendLog + { + WorkspaceId = _ctx.WorkspaceId, + FrontendLogEventId = Guid.NewGuid().ToString(), + Context = new FrontendLogContext + { + TimestampMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }, + Entry = new FrontendLogEntry + { + SqlDriverLog = log, + }, + }; + + // Enqueue is non-blocking; the client buffers events and flushes on a + // background timer. No HTTP I/O happens on the calling thread. + client.Enqueue(frontendLog); + } + catch (Exception ex) + { + try + { + Activity.Current?.AddEvent(new ActivityEvent("telemetry.observer.suppressed", + tags: new ActivityTagsCollection + { + { "error.type", ex.GetType().Name }, + { "error.message", SafeMessage(ex) }, + { "observer.suppressed.source", "TelemetryObserver" }, + })); + } + catch + { + // Recording the suppression must not itself throw. Intentionally empty. + } + } + } + + // After OnFinalized has fired, all other lifecycle methods are no-ops per the + // interface contract. Centralized here so each method body stays terse. + private bool IsFinalized() => Volatile.Read(ref _emitted) != 0; + + // Some exceptions throw from their .Message property (rare but observed in the + // wild for user-defined types). Wrap it so OnError can never become a source of + // observer failure. + private static string? SafeMessage(Exception ex) + { + try + { + return ex.Message; + } + catch + { + return null; + } + } + } +} diff --git a/csharp/test/Unit/Telemetry/NullObserverTests.cs b/csharp/test/Unit/Telemetry/NullObserverTests.cs new file mode 100644 index 000000000..f9efda7c1 --- /dev/null +++ b/csharp/test/Unit/Telemetry/NullObserverTests.cs @@ -0,0 +1,75 @@ +/* +* Copyright (c) 2025 ADBC Drivers Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using AdbcDrivers.Databricks.Reader.CloudFetch; +using AdbcDrivers.Databricks.Telemetry; +using ExecutionResultFormat = AdbcDrivers.Databricks.Telemetry.Proto.ExecutionResult.Types.Format; +using OperationType = AdbcDrivers.Databricks.Telemetry.Proto.Operation.Types.Type; +using StatementType = AdbcDrivers.Databricks.Telemetry.Proto.Statement.Types.Type; +using Xunit; + +namespace AdbcDrivers.Databricks.Tests.Unit.Telemetry +{ + /// + /// Tests for — verifies it satisfies the + /// fail-open / no-op / singleton contract. + /// + public class NullObserverTests + { + [Fact] + public void NullObserver_AllMethods_AreNoOps() + { + // Arrange + IStatementOperationObserver observer = NullObserver.Instance; + + // Act + Assert: every method must complete without throwing and without + // observable side effects. We exercise the full surface twice to also + // confirm idempotency of OnFinalized. + for (int i = 0; i < 2; i++) + { + observer.OnExecuteStarted(StatementType.Query, OperationType.ExecuteStatement, isCompressed: true); + observer.OnExecuteSucceeded("stmt-id-123", ExecutionResultFormat.InlineArrow); + observer.OnPollCompleted(count: 3, latencyMs: 42); + observer.OnFirstBatchReady(latencyMs: 100); + observer.OnConsumed(latencyMs: 200); + observer.OnChunksDownloaded(new ChunkMetrics()); + observer.OnError(new InvalidOperationException("boom")); + observer.OnFinalized(); + } + + // No state to inspect — passing the calls is the assertion. + } + + [Fact] + public void NullObserver_IsSingleton() + { + // Arrange + Act + NullObserver first = NullObserver.Instance; + NullObserver second = NullObserver.Instance; + + // Assert: same reference, and the only way to obtain an instance. + Assert.NotNull(first); + Assert.Same(first, second); + + // There must be no public constructor: callers should be forced to + // use the singleton accessor. + System.Reflection.ConstructorInfo[] publicCtors = typeof(NullObserver) + .GetConstructors(System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance); + Assert.Empty(publicCtors); + } + } +} diff --git a/csharp/test/Unit/Telemetry/TelemetryObserverTests.cs b/csharp/test/Unit/Telemetry/TelemetryObserverTests.cs new file mode 100644 index 000000000..039ca04af --- /dev/null +++ b/csharp/test/Unit/Telemetry/TelemetryObserverTests.cs @@ -0,0 +1,414 @@ +/* +* Copyright (c) 2025 ADBC Drivers Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using AdbcDrivers.Databricks.Reader.CloudFetch; +using AdbcDrivers.Databricks.Telemetry; +using AdbcDrivers.Databricks.Telemetry.Models; +using AdbcDrivers.Databricks.Telemetry.Proto; +using ExecutionResultFormat = AdbcDrivers.Databricks.Telemetry.Proto.ExecutionResult.Types.Format; +using OperationType = AdbcDrivers.Databricks.Telemetry.Proto.Operation.Types.Type; +using StatementType = AdbcDrivers.Databricks.Telemetry.Proto.Statement.Types.Type; +using Xunit; + +namespace AdbcDrivers.Databricks.Tests.Unit.Telemetry +{ + /// + /// Unit tests for verifying: + /// + /// Observer method calls propagate into the underlying . + /// OnFinalized enqueues exactly one . + /// The terminal call is idempotent under both serial and concurrent invocation. + /// All methods satisfy the fail-open contract even when the telemetry client throws. + /// + /// + public class TelemetryObserverTests + { + // ── Test doubles ───────────────────────────────────────────────────────────── + + /// + /// Records every enqueued log so tests can inspect the exact proto fields the + /// observer attempted to emit, and counts enqueues to assert exactly-once semantics. + /// + private sealed class CapturingTelemetryClient : ITelemetryClient + { + public List Logs { get; } = new List(); + + public int EnqueueCallCount; + + public void Enqueue(TelemetryFrontendLog log) + { + Interlocked.Increment(ref EnqueueCallCount); + lock (Logs) + { + Logs.Add(log); + } + } + + public Task FlushAsync(CancellationToken ct = default) => Task.CompletedTask; + + public Task CloseAsync() => Task.CompletedTask; + + public ValueTask DisposeAsync() => default; + } + + /// + /// Simulates a corrupted / faulty telemetry client whose Enqueue path raises an + /// exception. Used to assert the observer's fail-open guarantee. + /// + private sealed class ThrowingTelemetryClient : ITelemetryClient + { + public int EnqueueCallCount; + + public void Enqueue(TelemetryFrontendLog log) + { + Interlocked.Increment(ref EnqueueCallCount); + throw new InvalidOperationException("simulated telemetry client failure"); + } + + public Task FlushAsync(CancellationToken ct = default) => Task.CompletedTask; + + public Task CloseAsync() => Task.CompletedTask; + + public ValueTask DisposeAsync() => default; + } + + // ── Fixtures ───────────────────────────────────────────────────────────────── + + private static TelemetrySessionContext CreateSession(ITelemetryClient client) + { + return new TelemetrySessionContext + { + SessionId = "session-abc", + WorkspaceId = 4242L, + TelemetryClient = client, + AuthType = "pat", + SystemConfiguration = new DriverSystemConfiguration { DriverVersion = "1.0.0" }, + DriverConnectionParams = new DriverConnectionParameters { HttpPath = "/sql/1.0/wh/x" }, + }; + } + + private static (TelemetryObserver observer, CapturingTelemetryClient client) CreateObserver() + { + CapturingTelemetryClient client = new CapturingTelemetryClient(); + TelemetrySessionContext session = CreateSession(client); + TelemetryObserver observer = new TelemetryObserver(session); + return (observer, client); + } + + // ── Required tests (per task description) ──────────────────────────────────── + + [Fact] + public void TelemetryObserver_OnExecuteStarted_PopulatesContext() + { + // Arrange + (TelemetryObserver observer, _) = CreateObserver(); + + // Act + observer.OnExecuteStarted(StatementType.Query, OperationType.ExecuteStatement, isCompressed: true); + + // Assert: scalar fields land on the underlying context. + Assert.Equal(StatementType.Query, observer.Context.StatementType); + Assert.Equal(OperationType.ExecuteStatement, observer.Context.OperationType); + Assert.True(observer.Context.IsCompressed); + } + + [Fact] + public void TelemetryObserver_OnExecuteSucceeded_RecordsStatementId() + { + // Arrange + (TelemetryObserver observer, _) = CreateObserver(); + + // Act + observer.OnExecuteSucceeded("stmt-id-42", ExecutionResultFormat.ExternalLinks); + + // Assert + Assert.Equal("stmt-id-42", observer.Context.StatementId); + Assert.Equal(ExecutionResultFormat.ExternalLinks, observer.Context.ResultFormat); + } + + [Fact] + public void TelemetryObserver_OnFinalized_EnqueuesExactlyOnce() + { + // Arrange + (TelemetryObserver observer, CapturingTelemetryClient client) = CreateObserver(); + observer.OnExecuteStarted(StatementType.Query, OperationType.ExecuteStatement, isCompressed: false); + observer.OnExecuteSucceeded("stmt-1", ExecutionResultFormat.InlineArrow); + + // Act + observer.OnFinalized(); + + // Assert + Assert.Equal(1, client.EnqueueCallCount); + Assert.Single(client.Logs); + + // The emitted log must reflect the recorded context. + OssSqlDriverTelemetryLog log = client.Logs[0].Entry!.SqlDriverLog!; + Assert.Equal("session-abc", log.SessionId); + Assert.Equal("stmt-1", log.SqlStatementId); + Assert.Equal(StatementType.Query, log.SqlOperation.StatementType); + Assert.Equal(OperationType.ExecuteStatement, log.SqlOperation.OperationDetail.OperationType); + Assert.Null(log.ErrorInfo); + + // Workspace id and frontend envelope must be populated. + Assert.Equal(4242L, client.Logs[0].WorkspaceId); + Assert.False(string.IsNullOrEmpty(client.Logs[0].FrontendLogEventId)); + Assert.NotNull(client.Logs[0].Context); + Assert.True(client.Logs[0].Context!.TimestampMillis > 0); + } + + [Fact] + public void TelemetryObserver_OnFinalized_CalledTwice_EnqueuesOnce() + { + // Arrange + (TelemetryObserver observer, CapturingTelemetryClient client) = CreateObserver(); + + // Act: invoke OnFinalized twice in serial (mirrors error + dispose paths). + observer.OnFinalized(); + observer.OnFinalized(); + + // Assert: exactly one enqueue. + Assert.Equal(1, client.EnqueueCallCount); + Assert.True(observer.HasEmitted); + } + + [Fact] + public async Task TelemetryObserver_OnFinalized_ConcurrentCalls_EnqueueOnce() + { + // Arrange + (TelemetryObserver observer, CapturingTelemetryClient client) = CreateObserver(); + + // Act: race many threads to OnFinalized; only one must win. + const int parallelism = 32; + using ManualResetEventSlim start = new ManualResetEventSlim(); + Task[] tasks = new Task[parallelism]; + for (int i = 0; i < parallelism; i++) + { + tasks[i] = Task.Run(() => + { + start.Wait(); + observer.OnFinalized(); + }); + } + start.Set(); + await Task.WhenAll(tasks); + + // Assert + Assert.Equal(1, client.EnqueueCallCount); + } + + [Fact] + public void TelemetryObserver_OnError_RecordsErrorAndFinalizes() + { + // Arrange + (TelemetryObserver observer, CapturingTelemetryClient client) = CreateObserver(); + observer.OnExecuteStarted(StatementType.Query, OperationType.ExecuteStatement, isCompressed: false); + InvalidOperationException error = new InvalidOperationException("simulated query failure"); + + // Act + observer.OnError(error); + observer.OnFinalized(); + observer.OnFinalized(); // idempotent: should not double-emit even with error path + + // Assert: exactly one log, error_info populated. + Assert.Equal(1, client.EnqueueCallCount); + OssSqlDriverTelemetryLog log = client.Logs[0].Entry!.SqlDriverLog!; + Assert.NotNull(log.ErrorInfo); + Assert.Equal("InvalidOperationException", log.ErrorInfo.ErrorName); + Assert.True(observer.Context.HasError); + Assert.Equal("simulated query failure", observer.Context.ErrorMessage); + } + + [Fact] + public void TelemetryObserver_AllMethods_NeverThrow_WhenContextCorrupted() + { + // Arrange: build an observer whose telemetry client throws on every Enqueue + // (this simulates a corrupted downstream dependency). The observer must + // continue to absorb all calls without re-throwing. + ThrowingTelemetryClient throwing = new ThrowingTelemetryClient(); + TelemetrySessionContext session = CreateSession(throwing); + TelemetryObserver observer = new TelemetryObserver(session); + + // Act + Assert: exercise the entire surface, including pathological inputs + // (null statementId, null exception, null ChunkMetrics). + Exception? captured = Record.Exception(() => + { + observer.OnExecuteStarted(StatementType.Query, OperationType.ExecuteStatement, isCompressed: true); + observer.OnExecuteSucceeded(null!, ExecutionResultFormat.Unspecified); + observer.OnPollCompleted(count: 0, latencyMs: 0); + observer.OnFirstBatchReady(latencyMs: -1); + observer.OnConsumed(latencyMs: -1); + observer.OnChunksDownloaded(null!); + observer.OnError(null!); + observer.OnError(new InvalidOperationException("boom")); + observer.OnFinalized(); + observer.OnFinalized(); + }); + + Assert.Null(captured); + + // The throwing client must have been invoked exactly once (idempotent finalize) + // and the observer must have swallowed its exception. + Assert.Equal(1, throwing.EnqueueCallCount); + Assert.True(observer.HasEmitted); + } + + [Fact] + public void TelemetryObserver_OnChunksDownloaded_MergesIntoChunkDetails() + { + // Arrange + (TelemetryObserver observer, CapturingTelemetryClient client) = CreateObserver(); + ChunkMetrics metrics = new ChunkMetrics + { + TotalChunksPresent = 12, + TotalChunksIterated = 11, + InitialChunkLatencyMs = 75, + SlowestChunkLatencyMs = 220, + SumChunksDownloadTimeMs = 1430, + }; + + // Act + observer.OnExecuteStarted(StatementType.Query, OperationType.ExecuteStatement, isCompressed: false); + observer.OnChunksDownloaded(metrics); + observer.OnFinalized(); + + // Assert: context absorbed the metrics. + Assert.Equal(12, observer.Context.TotalChunksPresent); + Assert.Equal(11, observer.Context.TotalChunksIterated); + Assert.Equal(75, observer.Context.InitialChunkLatencyMs); + Assert.Equal(220, observer.Context.SlowestChunkLatencyMs); + Assert.Equal(1430, observer.Context.SumChunksDownloadTimeMs); + + // The chunk_details proto block on the emitted log mirrors the input. + ChunkDetails details = client.Logs[0].Entry!.SqlDriverLog!.SqlOperation.ChunkDetails; + Assert.NotNull(details); + Assert.Equal(12, details.TotalChunksPresent); + Assert.Equal(11, details.TotalChunksIterated); + Assert.Equal(75, details.InitialChunkLatencyMillis); + Assert.Equal(220, details.SlowestChunkLatencyMillis); + Assert.Equal(1430, details.SumChunksDownloadTimeMillis); + } + + // ── Additional coverage ────────────────────────────────────────────────────── + + [Fact] + public void TelemetryObserver_Constructor_RejectsNullSession() + { + Assert.Throws(() => new TelemetryObserver(null!)); + } + + [Fact] + public void TelemetryObserver_OnFinalized_WithNullTelemetryClient_IsNoOp() + { + // Arrange: session has no telemetry client (covers the disabled case). + TelemetrySessionContext session = new TelemetrySessionContext + { + SessionId = "s1", + WorkspaceId = 1L, + TelemetryClient = null, + }; + TelemetryObserver observer = new TelemetryObserver(session); + + // Act + Assert: must not throw, must mark itself emitted so a later call is a no-op. + observer.OnFinalized(); + observer.OnFinalized(); + Assert.True(observer.HasEmitted); + } + + [Fact] + public void TelemetryObserver_OnFirstBatchReady_OnlyFirstCallWins() + { + // Arrange + (TelemetryObserver observer, _) = CreateObserver(); + + // Act + observer.OnFirstBatchReady(latencyMs: 50); + observer.OnFirstBatchReady(latencyMs: 999); + + // Assert: subsequent calls do not overwrite the earliest observed latency. + Assert.Equal(50, observer.Context.FirstBatchReadyMs); + } + + [Fact] + public void TelemetryObserver_OnFinalized_ThenLifecycleCalls_AreNoOps() + { + // Arrange: drive the observer to a finalized state with a known context snapshot. + (TelemetryObserver observer, CapturingTelemetryClient client) = CreateObserver(); + observer.OnExecuteStarted(StatementType.Query, OperationType.ExecuteStatement, isCompressed: true); + observer.OnExecuteSucceeded("stmt-pre-final", ExecutionResultFormat.InlineArrow); + observer.OnFinalized(); + + Assert.Equal(1, client.EnqueueCallCount); + Assert.True(observer.HasEmitted); + + // Capture the post-finalize context fingerprint so we can prove subsequent + // lifecycle calls do not mutate state. + string? statementIdAfterFinal = observer.Context.StatementId; + StatementType stmtTypeAfterFinal = observer.Context.StatementType; + ExecutionResultFormat resultFormatAfterFinal = observer.Context.ResultFormat; + bool isCompressedAfterFinal = observer.Context.IsCompressed; + int? pollCountAfterFinal = observer.Context.PollCount; + long? consumedAfterFinal = observer.Context.ResultsConsumedMs; + bool hasErrorAfterFinal = observer.Context.HasError; + + // Act: call every non-terminal lifecycle method with different values that + // would visibly mutate the context if the gate were missing. + observer.OnExecuteStarted(StatementType.Update, OperationType.ExecuteStatementAsync, isCompressed: false); + observer.OnExecuteSucceeded("stmt-after-final-must-not-stick", ExecutionResultFormat.ExternalLinks); + observer.OnPollCompleted(count: 99, latencyMs: 9999); + observer.OnFirstBatchReady(latencyMs: 7777); + observer.OnConsumed(latencyMs: 8888); + observer.OnChunksDownloaded(new ChunkMetrics { TotalChunksPresent = 42, TotalChunksIterated = 42 }); + observer.OnError(new InvalidOperationException("post-finalize error")); + + // Second OnFinalized must also be a no-op. + observer.OnFinalized(); + + // Assert: no second enqueue, no observable mutation. + Assert.Equal(1, client.EnqueueCallCount); + Assert.Equal(statementIdAfterFinal, observer.Context.StatementId); + Assert.Equal(stmtTypeAfterFinal, observer.Context.StatementType); + Assert.Equal(resultFormatAfterFinal, observer.Context.ResultFormat); + Assert.Equal(isCompressedAfterFinal, observer.Context.IsCompressed); + Assert.Equal(pollCountAfterFinal, observer.Context.PollCount); + Assert.Equal(consumedAfterFinal, observer.Context.ResultsConsumedMs); + Assert.Equal(hasErrorAfterFinal, observer.Context.HasError); + Assert.Null(observer.Context.TotalChunksPresent); + } + + [Fact] + public void TelemetryObserver_OnPollCompleted_StoresCountAndLatency() + { + // Arrange + (TelemetryObserver observer, CapturingTelemetryClient client) = CreateObserver(); + + // Act + observer.OnExecuteStarted(StatementType.Query, OperationType.ExecuteStatementAsync, isCompressed: false); + observer.OnPollCompleted(count: 5, latencyMs: 250); + observer.OnFinalized(); + + // Assert + Assert.Equal(5, observer.Context.PollCount); + Assert.Equal(250, observer.Context.PollLatencyMs); + OperationDetail detail = client.Logs[0].Entry!.SqlDriverLog!.SqlOperation.OperationDetail; + Assert.Equal(5, detail.NOperationStatusCalls); + Assert.Equal(250, detail.OperationStatusLatencyMillis); + } + } +} diff --git a/docs/designs/PECO-3022-sea-telemetry-integration-design.md b/docs/designs/PECO-3022-sea-telemetry-integration-design.md index 2aad2b5cd..e73f4c488 100644 --- a/docs/designs/PECO-3022-sea-telemetry-integration-design.md +++ b/docs/designs/PECO-3022-sea-telemetry-integration-design.md @@ -106,10 +106,6 @@ classDiagram <> } - class SafeObserver { - -inner: IStatementOperationObserver - } - class StatementExecutionConnection { -_telemetry: IConnectionTelemetry +CreateStatement() AdbcStatement @@ -134,8 +130,6 @@ classDiagram IConnectionTelemetry <|.. ConnectionTelemetry IStatementOperationObserver <|.. TelemetryObserver IStatementOperationObserver <|.. NullObserver - IStatementOperationObserver <|.. SafeObserver - SafeObserver --> IStatementOperationObserver : wraps StatementExecutionConnection --> IConnectionTelemetry DatabricksConnection --> IConnectionTelemetry StatementExecutionStatement --> IStatementOperationObserver @@ -160,7 +154,7 @@ These components already exist for the Thrift path and are protocol-agnostic. Th - **`IStatementOperationObserver`** — small interface, ~8 methods, fail-open contract. - **`TelemetryObserver`** — default implementation that translates observer calls into `StatementTelemetryContext` mutations and enqueues a `OssSqlDriverTelemetryLog` on finalize. - **`NullObserver`** — singleton no-op, used as the default field value so callsites never need null checks. -- **`SafeObserver`** (optional) — decorator that swallows any exception thrown by an inner observer. Belt-and-suspenders. +_(A `SafeObserver` decorator was prototyped during implementation and removed after review: the only genuinely risky work in `TelemetryObserver` is the `OnFinalized` proto-build + enqueue path, which is now wrapped with a scoped `try/catch` inside that method. Lifecycle methods are plain field writes that cannot throw, so a per-method decorator was redundant.)_ ### 4.4 Modified components @@ -428,16 +422,12 @@ public void OnExecuteStarted(Statement.Types.Type stmt, Operation.Types.Type op, }); public void OnPollCompleted(int count, long latencyMs, ExecutionResult.Types.Format resultFormat) => - Safe(() => { - _ctx.PollCount = count; - _ctx.PollLatencyMs = latencyMs; - _ctx.ResultFormat = resultFormat; - }); + _ctx.PollCount = count; + _ctx.PollLatencyMs = latencyMs; + _ctx.ResultFormat = resultFormat; ``` -This concentrates the try/catch in exactly one place per observer impl. The tiny per-call lambda allocation is acceptable — these methods are called O(1) times per statement. - -The optional `SafeObserver` decorator is available for future third-party observer implementations that may not honor the contract; it wraps any inner observer with a defensive try/catch per method. +The lifecycle methods are plain field writes that cannot throw, so they need no defensive wrapping. Only `OnFinalized`'s proto-build + telemetry-client enqueue does risky work, and the `try/catch` is scoped tightly to exactly that block. Exceptions are surfaced as a `telemetry.observer.suppressed` activity event on the ambient `Activity`, never propagated to the caller. ### Circuit breaker reuse @@ -501,8 +491,6 @@ No new configuration parameters. All existing knobs apply unchanged: **`IStatementOperationObserver` implementations:** - `NullObserver_AllMethods_AreNoOps` - `NullObserver_IsSingleton` -- `SafeObserver_PropagatesNormalCallsToInner` -- `SafeObserver_SwallowsExceptionsFromInner_LogsAtTrace` - `TelemetryObserver_OnExecuteStarted_PopulatesContext` - `TelemetryObserver_OnStatementSubmitted_RecordsStatementId` - `TelemetryObserver_OnPollCompleted_RecordsCountLatencyAndResultFormat` @@ -559,7 +547,7 @@ No new configuration parameters. All existing knobs apply unchanged: ### PR sequencing 1. Refactor `ConnectionTelemetry.Create` signature (string sessionId, `DriverMode mode` param). Thrift call site updates only. Verifies no behavioral change. -2. Introduce `IStatementOperationObserver`, `TelemetryObserver`, `NullObserver`, `SafeObserver`. No callers yet. +2. Introduce `IStatementOperationObserver`, `TelemetryObserver`, `NullObserver`. No callers yet. 3. Refactor `DatabricksStatement` to use `_observer` field instead of private hooks. Mechanical; behavior unchanged. 4. Wire telemetry into `StatementExecutionConnection` (`_telemetry`, `OpenAsync`, `Dispose`). 5. Wire telemetry into `StatementExecutionStatement` (`_observer` field + hookpoint calls).