1+ // Copyright (c) KeelMatrix
2+
13#nullable enable
24using System . Data ;
35using System . Data . Common ;
68
79namespace KeelMatrix . QueryWatch . Ado {
810 /// <summary>
9- /// Delegating <see cref="System.Data.Common. DbCommand"/> that measures execution and records into a session.
11+ /// Delegating <see cref="DbCommand"/> that measures execution and records into a session.
1012 /// </summary>
1113 public sealed class QueryWatchCommand : DbCommand {
1214 private readonly DbCommand _inner ;
1315 private readonly QueryWatchSession _session ;
1416
1517 // Keep track of the wrapper connection (so getters surface the wrapper instance).
1618 private QueryWatchConnection ? _wrapperConnection ;
19+ // Keep track of a wrapper transaction so we can track readers for draining before Commit().
20+ private QueryWatchTransaction ? _wrapperTransaction ;
1721
1822 /// <summary>
1923 /// Initializes a new wrapper over an inner <see cref="DbCommand"/>.
@@ -28,6 +32,14 @@ public QueryWatchCommand(DbCommand inner, QueryWatchSession session, DbConnectio
2832 _wrapperConnection = wrapperConnection as QueryWatchConnection ;
2933 }
3034
35+ private sealed class CompositeDisposer : IDisposable {
36+ private readonly CancellationTokenSource _cts ;
37+ private readonly IDisposable _reg ;
38+ public CompositeDisposer ( CancellationTokenSource cts , IDisposable reg ) { _cts = cts ; _reg = reg ; }
39+ public bool IsCancelled => _cts . IsCancellationRequested ;
40+ public void Dispose ( ) { _reg . Dispose ( ) ; _cts . Dispose ( ) ; }
41+ }
42+
3143 #region Property delegation
3244
3345 /// <inheritdoc />
@@ -69,13 +81,20 @@ protected override DbConnection? DbConnection {
6981
7082 /// <inheritdoc />
7183 protected override DbTransaction ? DbTransaction {
72- get => _inner . Transaction ;
84+ get {
85+ // Prefer the wrapper if we have it and it still matches the inner transaction.
86+ if ( _wrapperTransaction is not null && ReferenceEquals ( _inner . Transaction , _wrapperTransaction . Inner ) )
87+ return _wrapperTransaction ;
88+ return _inner . Transaction ;
89+ }
7390 set {
7491 if ( value is QueryWatchTransaction wrapped ) {
7592 _inner . Transaction = wrapped . Inner ;
93+ _wrapperTransaction = wrapped ;
7694 }
7795 else {
7896 _inner . Transaction = value ;
97+ _wrapperTransaction = null ;
7998 }
8099 }
81100 }
@@ -133,54 +152,135 @@ private void RecordFailure(TimeSpan elapsed, Exception ex) {
133152 _session . Record ( text , elapsed , meta ) ;
134153 }
135154
155+ private static Exception NormalizeCancellation ( Exception ex , CancellationToken token ) {
156+ if ( ! token . IsCancellationRequested ) return ex ;
157+
158+ // SQL Server
159+ var full = ex . GetType ( ) ? . FullName ?? string . Empty ;
160+ if ( full . IndexOf ( "SqlClient.SqlException" , StringComparison . OrdinalIgnoreCase ) >= 0 )
161+ return new OperationCanceledException ( "Command was cancelled." , ex , token ) ;
162+
163+ // Npgsql: PostgresException with SqlState 57014 (query_canceled)
164+ var typeName = ex . GetType ( ) . FullName ?? "" ;
165+ if ( ( typeName ?? string . Empty ) . IndexOf ( "Npgsql.PostgresException" , StringComparison . OrdinalIgnoreCase ) >= 0 ) {
166+ try {
167+ var sqlStateProp = ex . GetType ( ) . GetProperty ( "SqlState" ) ;
168+ var sqlState = sqlStateProp ? . GetValue ( ex ) as string ;
169+ if ( string . Equals ( sqlState , "57014" , StringComparison . Ordinal ) )
170+ return new OperationCanceledException ( "Command was cancelled." , ex , token ) ;
171+ }
172+ catch { /* ignore reflection issues */ }
173+ }
174+
175+ // MySQL: MySqlConnector.MySqlException often thrown, or TaskCanceled/Timeout
176+ if ( ( typeName ?? string . Empty ) . IndexOf ( "MySqlConnector.MySqlException" , StringComparison . OrdinalIgnoreCase ) >= 0 ||
177+ ex is TaskCanceledException || ex is TimeoutException )
178+ return new OperationCanceledException ( "Command was cancelled." , ex , token ) ;
179+
180+ // Fallback: if token says cancelled, prefer OCE so callers can handle uniformly.
181+ return new OperationCanceledException ( "Command was cancelled." , ex , token ) ;
182+ }
183+
184+ private static bool IsMySql ( DbCommand cmd )
185+ => cmd . GetType ( ) . Namespace ? . StartsWith ( "MySqlConnector" , StringComparison . Ordinal ) == true ;
186+
187+ /// <summary>Registers a callback that calls provider Cancel() when token is cancelled.</summary>
188+ private static CancellationTokenRegistration RegisterCancelOnToken ( DbCommand cmd , CancellationToken token ) {
189+ if ( ! token . CanBeCanceled )
190+ return default ;
191+ try {
192+ return token . Register ( static state => {
193+ try { ( ( DbCommand ) state ! ) . Cancel ( ) ; } catch { /* best-effort */ }
194+ } , cmd ) ;
195+ }
196+ catch { return default ; }
197+ }
198+
199+ /// <summary>For providers that may not enforce CommandTimeout deterministically (MySQL),
200+ /// schedule a Cancel() at timeout expiry (seconds). No effect for zero/infinite timeout.</summary>
201+ private static CompositeDisposer ? BeginTimeoutCancelIfNeeded ( DbCommand cmd ) {
202+ try {
203+ if ( ! IsMySql ( cmd ) )
204+ return null ; // scope to MySQL only
205+ int seconds = cmd . CommandTimeout ;
206+ if ( seconds <= 0 )
207+ return null ; // 0 == infinite
208+ var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( seconds ) ) ;
209+ var reg = cts . Token . Register ( static state => {
210+ try { ( ( DbCommand ) state ! ) . Cancel ( ) ; } catch { /* best-effort */ }
211+ } , cmd ) ;
212+ return new CompositeDisposer ( cts , reg ) ;
213+ }
214+ catch { return null ; }
215+ }
216+
136217 #endregion
137218
138219 #region Execute overrides (sync)
139220
140221 /// <inheritdoc />
141222 public override int ExecuteNonQuery ( ) {
142223 var sw = Stopwatch . StartNew ( ) ;
224+ IDisposable ? tmo = BeginTimeoutCancelIfNeeded ( _inner ) ;
143225 try {
144- return _inner . ExecuteNonQuery ( ) ;
226+ var result = _inner . ExecuteNonQuery ( ) ;
227+ if ( tmo is CompositeDisposer s && s . IsCancelled )
228+ throw new TimeoutException ( "CommandTimeout elapsed and the command was cancelled." ) ;
229+ return result ;
145230 }
146231 catch ( Exception ex ) {
147232 sw . Stop ( ) ;
148233 RecordFailure ( sw . Elapsed , ex ) ;
149234 throw ;
150235 }
151236 finally {
237+ tmo ? . Dispose ( ) ;
152238 if ( sw . IsRunning ) { sw . Stop ( ) ; RecordSuccess ( sw . Elapsed ) ; }
153239 }
154240 }
155241
156242 /// <inheritdoc />
157243 public override object ? ExecuteScalar ( ) {
158244 var sw = Stopwatch . StartNew ( ) ;
245+ IDisposable ? tmo = BeginTimeoutCancelIfNeeded ( _inner ) ;
159246 try {
160- return _inner . ExecuteScalar ( ) ;
247+ var result = _inner . ExecuteScalar ( ) ;
248+ if ( tmo is CompositeDisposer s && s . IsCancelled )
249+ throw new TimeoutException ( "CommandTimeout elapsed and the command was cancelled." ) ;
250+ return result ;
161251 }
162252 catch ( Exception ex ) {
163253 sw . Stop ( ) ;
164254 RecordFailure ( sw . Elapsed , ex ) ;
165255 throw ;
166256 }
167257 finally {
258+ tmo ? . Dispose ( ) ;
168259 if ( sw . IsRunning ) { sw . Stop ( ) ; RecordSuccess ( sw . Elapsed ) ; }
169260 }
170261 }
171262
172263 /// <inheritdoc />
173264 protected override DbDataReader ExecuteDbDataReader ( CommandBehavior behavior ) {
174265 var sw = Stopwatch . StartNew ( ) ;
266+ IDisposable ? tmo = BeginTimeoutCancelIfNeeded ( _inner ) ;
175267 try {
176- return _inner . ExecuteReader ( behavior ) ;
268+ var r = _inner . ExecuteReader ( behavior ) ;
269+ if ( tmo is CompositeDisposer s && s . IsCancelled ) {
270+ r . Dispose ( ) ;
271+ throw new TimeoutException ( "CommandTimeout elapsed and the command was cancelled." ) ;
272+ }
273+ var tx = _wrapperTransaction ;
274+ tx ? . TrackReader ( r ) ;
275+ return new QueryWatchDataReader ( r , tx ) ;
177276 }
178277 catch ( Exception ex ) {
179278 sw . Stop ( ) ;
180279 RecordFailure ( sw . Elapsed , ex ) ;
181280 throw ;
182281 }
183282 finally {
283+ tmo ? . Dispose ( ) ;
184284 if ( sw . IsRunning ) { sw . Stop ( ) ; RecordSuccess ( sw . Elapsed ) ; }
185285 }
186286 }
@@ -190,68 +290,102 @@ protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) {
190290 #region Execute overrides (async)
191291
192292 /// <inheritdoc />
193- public override System . Threading . Tasks . Task < int > ExecuteNonQueryAsync ( System . Threading . CancellationToken cancellationToken ) {
293+ public override Task < int > ExecuteNonQueryAsync ( CancellationToken cancellationToken ) {
194294 return ExecuteNonQueryAsyncCore ( cancellationToken ) ;
195295 }
196296
197- private async System . Threading . Tasks . Task < int > ExecuteNonQueryAsyncCore ( System . Threading . CancellationToken token ) {
297+ private async Task < int > ExecuteNonQueryAsyncCore ( CancellationToken token ) {
198298 var sw = Stopwatch . StartNew ( ) ;
299+ CancellationTokenRegistration reg = default ;
199300 try {
200- return await _inner . ExecuteNonQueryAsync ( token ) . ConfigureAwait ( false ) ;
301+ reg = RegisterCancelOnToken ( _inner , token ) ;
302+ var result = await _inner . ExecuteNonQueryAsync ( token ) . ConfigureAwait ( false ) ;
303+ if ( IsMySql ( _inner ) && token . IsCancellationRequested )
304+ throw new OperationCanceledException ( "Command was cancelled." , token ) ;
305+ return result ;
201306 }
202307 catch ( Exception ex ) {
203308 sw . Stop ( ) ;
204309 RecordFailure ( sw . Elapsed , ex ) ;
205- throw ;
310+ throw NormalizeCancellation ( ex , token ) ;
206311 }
207312 finally {
313+ #if NET8_0_OR_GREATER
314+ await reg . DisposeAsync ( ) ;
315+ #else
316+ reg . Dispose ( ) ;
317+ #endif
208318 if ( sw . IsRunning ) { sw . Stop ( ) ; RecordSuccess ( sw . Elapsed ) ; }
209319 }
210320 }
211321
212322 /// <inheritdoc />
213- public override System . Threading . Tasks . Task < object ? > ExecuteScalarAsync ( System . Threading . CancellationToken cancellationToken ) {
323+ public override Task < object ? > ExecuteScalarAsync ( CancellationToken cancellationToken ) {
214324 return ExecuteScalarAsyncCore ( cancellationToken ) ;
215325 }
216326
217- private async System . Threading . Tasks . Task < object ? > ExecuteScalarAsyncCore ( System . Threading . CancellationToken token ) {
327+ private async Task < object ? > ExecuteScalarAsyncCore ( CancellationToken token ) {
218328 var sw = Stopwatch . StartNew ( ) ;
329+ CancellationTokenRegistration reg = default ;
219330 try {
220- return await _inner . ExecuteScalarAsync ( token ) . ConfigureAwait ( false ) ;
331+ reg = RegisterCancelOnToken ( _inner , token ) ;
332+ var obj = await _inner . ExecuteScalarAsync ( token ) . ConfigureAwait ( false ) ;
333+ if ( IsMySql ( _inner ) && token . IsCancellationRequested )
334+ throw new OperationCanceledException ( "Command was cancelled." , token ) ;
335+ return obj ;
221336 }
222337 catch ( Exception ex ) {
223338 sw . Stop ( ) ;
224339 RecordFailure ( sw . Elapsed , ex ) ;
225- throw ;
340+ throw NormalizeCancellation ( ex , token ) ;
226341 }
227342 finally {
343+ #if NET8_0_OR_GREATER
344+ await reg . DisposeAsync ( ) ;
345+ #else
346+ reg . Dispose ( ) ;
347+ #endif
228348 if ( sw . IsRunning ) { sw . Stop ( ) ; RecordSuccess ( sw . Elapsed ) ; }
229349 }
230350 }
231351
232352 /// <inheritdoc />
233- protected override System . Threading . Tasks . Task < DbDataReader > ExecuteDbDataReaderAsync ( CommandBehavior behavior , System . Threading . CancellationToken cancellationToken ) {
234- return ExecuteDbDataReaderAsyncCore ( behavior , cancellationToken ) ;
235- }
236-
237- private async System . Threading . Tasks . Task < DbDataReader > ExecuteDbDataReaderAsyncCore ( CommandBehavior behavior , System . Threading . CancellationToken token ) {
353+ protected override async Task < DbDataReader > ExecuteDbDataReaderAsync ( CommandBehavior behavior , CancellationToken cancellationToken ) {
238354 var sw = Stopwatch . StartNew ( ) ;
355+ CancellationTokenRegistration reg = default ;
239356 try {
240- return await _inner . ExecuteReaderAsync ( behavior , token ) . ConfigureAwait ( false ) ;
357+ reg = RegisterCancelOnToken ( _inner , cancellationToken ) ;
358+ var r = await _inner . ExecuteReaderAsync ( behavior , cancellationToken ) . ConfigureAwait ( false ) ;
359+ if ( IsMySql ( _inner ) && cancellationToken . IsCancellationRequested ) {
360+ #if NET8_0_OR_GREATER
361+ await r . DisposeAsync ( ) ;
362+ #else
363+ r . Dispose ( ) ;
364+ #endif
365+ throw new OperationCanceledException ( "Command was cancelled." , cancellationToken ) ;
366+ }
367+ var tx = _wrapperTransaction ;
368+ tx ? . TrackReader ( r ) ;
369+ return new QueryWatchDataReader ( r , tx ) ;
241370 }
242371 catch ( Exception ex ) {
243372 sw . Stop ( ) ;
244373 RecordFailure ( sw . Elapsed , ex ) ;
245- throw ;
374+ throw NormalizeCancellation ( ex , cancellationToken ) ;
246375 }
247376 finally {
377+ #if NET8_0_OR_GREATER
378+ await reg . DisposeAsync ( ) ;
379+ #else
380+ reg . Dispose ( ) ;
381+ #endif
248382 if ( sw . IsRunning ) { sw . Stop ( ) ; RecordSuccess ( sw . Elapsed ) ; }
249383 }
250384 }
251385
252386 #endregion
253387
254- #region Boilerplate
388+ #region Misc
255389
256390 /// <inheritdoc />
257391 public override void Cancel ( ) => _inner . Cancel ( ) ;
@@ -262,12 +396,6 @@ private async System.Threading.Tasks.Task<DbDataReader> ExecuteDbDataReaderAsync
262396 /// <inheritdoc />
263397 protected override DbParameter CreateDbParameter ( ) => _inner . CreateParameter ( ) ;
264398
265- /// <inheritdoc />
266- protected override void Dispose ( bool disposing ) {
267- if ( disposing ) _inner . Dispose ( ) ;
268- base . Dispose ( disposing ) ;
269- }
270-
271399 #endregion
272400 }
273401}
0 commit comments