From dfd8c2d6a62146f16b190002867c0c6ec4d32216 Mon Sep 17 00:00:00 2001 From: Jakub Kopys Date: Tue, 14 Oct 2025 13:22:24 +0200 Subject: [PATCH 1/2] fix: respect timeout even when token is provided --- Source/MQTTnet/MqttClient.cs | 81 ++++++++++++++---------------------- 1 file changed, 31 insertions(+), 50 deletions(-) diff --git a/Source/MQTTnet/MqttClient.cs b/Source/MQTTnet/MqttClient.cs index 8f278b857..cb9898a68 100644 --- a/Source/MQTTnet/MqttClient.cs +++ b/Source/MQTTnet/MqttClient.cs @@ -126,17 +126,12 @@ public async Task ConnectAsync(MqttClientOptions option _unexpectedDisconnectPacket = null; - if (cancellationToken.CanBeCanceled) - { - connectResult = await ConnectInternal(adapter, cancellationToken).ConfigureAwait(false); - } - else - { - // Fall back to the general timeout specified in the options if the user passed - // CancellationToken.None or similar. - using var timeout = new CancellationTokenSource(Options.Timeout); - connectResult = await ConnectInternal(adapter, timeout.Token).ConfigureAwait(false); - } + using var timeoutCts = cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) + : new CancellationTokenSource(); + + timeoutCts.CancelAfter(Options.Timeout); + connectResult = await ConnectInternal(adapter, timeoutCts.Token).ConfigureAwait(false); if (connectResult.ResultCode != MqttClientConnectResultCode.Success) { @@ -211,15 +206,12 @@ public async Task DisconnectAsync(MqttClientDisconnectOptions options, Cancellat // must be thrown to let the caller know that the disconnect was not clean. var disconnectPacket = MqttDisconnectPacketFactory.Create(options); - if (cancellationToken.CanBeCanceled) - { - await Send(disconnectPacket, cancellationToken).ConfigureAwait(false); - } - else - { - using var timeout = new CancellationTokenSource(Options.Timeout); - await Send(disconnectPacket, timeout.Token).ConfigureAwait(false); - } + using var timeoutCts = cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) + : new CancellationTokenSource(); + + timeoutCts.CancelAfter(Options.Timeout); + await Send(disconnectPacket, timeoutCts.Token).ConfigureAwait(false); } finally { @@ -234,15 +226,12 @@ public async Task PingAsync(CancellationToken cancellationToken = default) ThrowIfDisposed(); ThrowIfNotConnected(); - if (cancellationToken.CanBeCanceled) - { - await Request(MqttPingReqPacket.Instance, cancellationToken).ConfigureAwait(false); - } - else - { - using var timeout = new CancellationTokenSource(Options.Timeout); - await Request(MqttPingReqPacket.Instance, timeout.Token).ConfigureAwait(false); - } + using var timeoutCts = cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) + : new CancellationTokenSource(); + + timeoutCts.CancelAfter(Options.Timeout); + await Request(MqttPingReqPacket.Instance, timeoutCts.Token).ConfigureAwait(false); } public Task PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken = default) @@ -322,16 +311,12 @@ public async Task SubscribeAsync(MqttClientSubscribeO var subscribePacket = MqttSubscribePacketFactory.Create(options); subscribePacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); - MqttSubAckPacket subAckPacket; - if (cancellationToken.CanBeCanceled) - { - subAckPacket = await Request(subscribePacket, cancellationToken).ConfigureAwait(false); - } - else - { - using var timeout = new CancellationTokenSource(Options.Timeout); - subAckPacket = await Request(subscribePacket, timeout.Token).ConfigureAwait(false); - } + using var timeoutCts = cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) + : new CancellationTokenSource(); + + timeoutCts.CancelAfter(Options.Timeout); + var subAckPacket = await Request(subscribePacket, timeoutCts.Token).ConfigureAwait(false); return MqttClientSubscribeResultFactory.Create(subscribePacket, subAckPacket); } @@ -356,16 +341,12 @@ public async Task UnsubscribeAsync(MqttClientUnsubs var unsubscribePacket = MqttUnsubscribePacketFactory.Create(options); unsubscribePacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); - MqttUnsubAckPacket unsubAckPacket; - if (cancellationToken.CanBeCanceled) - { - unsubAckPacket = await Request(unsubscribePacket, cancellationToken).ConfigureAwait(false); - } - else - { - using var timeout = new CancellationTokenSource(Options.Timeout); - unsubAckPacket = await Request(unsubscribePacket, timeout.Token).ConfigureAwait(false); - } + using var timeoutCts = cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) + : new CancellationTokenSource(); + + timeoutCts.CancelAfter(Options.Timeout); + var unsubAckPacket = await Request(unsubscribePacket, timeoutCts.Token).ConfigureAwait(false); return MqttClientUnsubscribeResultFactory.Create(unsubscribePacket, unsubAckPacket); } @@ -1069,4 +1050,4 @@ async Task TrySendKeepAliveMessages(CancellationToken cancellationToken) _logger.Verbose("Stopped sending keep alive packets"); } } -} \ No newline at end of file +} From 3350753808a344209e267cec0caa9ebd6758d6a5 Mon Sep 17 00:00:00 2001 From: Jakub Kopys Date: Tue, 14 Oct 2025 13:25:12 +0200 Subject: [PATCH 2/2] cleanup: Extract repeated code to a method --- Source/MQTTnet/MqttClient.cs | 40 ++++++++++++++---------------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/Source/MQTTnet/MqttClient.cs b/Source/MQTTnet/MqttClient.cs index cb9898a68..805fb3a41 100644 --- a/Source/MQTTnet/MqttClient.cs +++ b/Source/MQTTnet/MqttClient.cs @@ -126,11 +126,7 @@ public async Task ConnectAsync(MqttClientOptions option _unexpectedDisconnectPacket = null; - using var timeoutCts = cancellationToken.CanBeCanceled - ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) - : new CancellationTokenSource(); - - timeoutCts.CancelAfter(Options.Timeout); + using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken); connectResult = await ConnectInternal(adapter, timeoutCts.Token).ConfigureAwait(false); if (connectResult.ResultCode != MqttClientConnectResultCode.Success) @@ -206,11 +202,7 @@ public async Task DisconnectAsync(MqttClientDisconnectOptions options, Cancellat // must be thrown to let the caller know that the disconnect was not clean. var disconnectPacket = MqttDisconnectPacketFactory.Create(options); - using var timeoutCts = cancellationToken.CanBeCanceled - ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) - : new CancellationTokenSource(); - - timeoutCts.CancelAfter(Options.Timeout); + using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken); await Send(disconnectPacket, timeoutCts.Token).ConfigureAwait(false); } finally @@ -226,11 +218,7 @@ public async Task PingAsync(CancellationToken cancellationToken = default) ThrowIfDisposed(); ThrowIfNotConnected(); - using var timeoutCts = cancellationToken.CanBeCanceled - ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) - : new CancellationTokenSource(); - - timeoutCts.CancelAfter(Options.Timeout); + using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken); await Request(MqttPingReqPacket.Instance, timeoutCts.Token).ConfigureAwait(false); } @@ -311,11 +299,7 @@ public async Task SubscribeAsync(MqttClientSubscribeO var subscribePacket = MqttSubscribePacketFactory.Create(options); subscribePacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); - using var timeoutCts = cancellationToken.CanBeCanceled - ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) - : new CancellationTokenSource(); - - timeoutCts.CancelAfter(Options.Timeout); + using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken); var subAckPacket = await Request(subscribePacket, timeoutCts.Token).ConfigureAwait(false); return MqttClientSubscribeResultFactory.Create(subscribePacket, subAckPacket); @@ -341,11 +325,7 @@ public async Task UnsubscribeAsync(MqttClientUnsubs var unsubscribePacket = MqttUnsubscribePacketFactory.Create(options); unsubscribePacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); - using var timeoutCts = cancellationToken.CanBeCanceled - ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) - : new CancellationTokenSource(); - - timeoutCts.CancelAfter(Options.Timeout); + using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken); var unsubAckPacket = await Request(unsubscribePacket, timeoutCts.Token).ConfigureAwait(false); return MqttClientUnsubscribeResultFactory.Create(unsubscribePacket, unsubAckPacket); @@ -472,6 +452,16 @@ void Cleanup() } } + CancellationTokenSource CreateTimeoutCancellationTokenSource(CancellationToken cancellationToken) + { + var timeoutCts = cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) + : new CancellationTokenSource(); + + timeoutCts.CancelAfter(Options.Timeout); + return timeoutCts; + } + MqttClientConnectionStatus CompareExchangeConnectionStatus(MqttClientConnectionStatus value, MqttClientConnectionStatus comparand) { return (MqttClientConnectionStatus)Interlocked.CompareExchange(ref _connectionStatus, (int)value, (int)comparand);