Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 21 additions & 50 deletions Source/MQTTnet/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,8 @@ public async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions option

_unexpectedDisconnectPacket = null;

if (cancellationToken.CanBeCanceled)
{
connectResult = await ConnectInternal(adapter, cancellationToken).ConfigureAwait(false);
}
else
{
// Fall back to the general timeout specified in the options if the user passed
// CancellationToken.None or similar.
using var timeout = new CancellationTokenSource(Options.Timeout);
connectResult = await ConnectInternal(adapter, timeout.Token).ConfigureAwait(false);
}
using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken);
connectResult = await ConnectInternal(adapter, timeoutCts.Token).ConfigureAwait(false);

if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
Expand Down Expand Up @@ -211,15 +202,8 @@ public async Task DisconnectAsync(MqttClientDisconnectOptions options, Cancellat
// must be thrown to let the caller know that the disconnect was not clean.
var disconnectPacket = MqttDisconnectPacketFactory.Create(options);

if (cancellationToken.CanBeCanceled)
{
await Send(disconnectPacket, cancellationToken).ConfigureAwait(false);
}
else
{
using var timeout = new CancellationTokenSource(Options.Timeout);
await Send(disconnectPacket, timeout.Token).ConfigureAwait(false);
}
using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken);
await Send(disconnectPacket, timeoutCts.Token).ConfigureAwait(false);
}
finally
{
Expand All @@ -234,15 +218,8 @@ public async Task PingAsync(CancellationToken cancellationToken = default)
ThrowIfDisposed();
ThrowIfNotConnected();

if (cancellationToken.CanBeCanceled)
{
await Request<MqttPingRespPacket>(MqttPingReqPacket.Instance, cancellationToken).ConfigureAwait(false);
}
else
{
using var timeout = new CancellationTokenSource(Options.Timeout);
await Request<MqttPingRespPacket>(MqttPingReqPacket.Instance, timeout.Token).ConfigureAwait(false);
}
using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken);
await Request<MqttPingRespPacket>(MqttPingReqPacket.Instance, timeoutCts.Token).ConfigureAwait(false);
}

public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -322,16 +299,8 @@ public async Task<MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeO
var subscribePacket = MqttSubscribePacketFactory.Create(options);
subscribePacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();

MqttSubAckPacket subAckPacket;
if (cancellationToken.CanBeCanceled)
{
subAckPacket = await Request<MqttSubAckPacket>(subscribePacket, cancellationToken).ConfigureAwait(false);
}
else
{
using var timeout = new CancellationTokenSource(Options.Timeout);
subAckPacket = await Request<MqttSubAckPacket>(subscribePacket, timeout.Token).ConfigureAwait(false);
}
using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken);
var subAckPacket = await Request<MqttSubAckPacket>(subscribePacket, timeoutCts.Token).ConfigureAwait(false);

return MqttClientSubscribeResultFactory.Create(subscribePacket, subAckPacket);
}
Expand All @@ -356,16 +325,8 @@ public async Task<MqttClientUnsubscribeResult> UnsubscribeAsync(MqttClientUnsubs
var unsubscribePacket = MqttUnsubscribePacketFactory.Create(options);
unsubscribePacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();

MqttUnsubAckPacket unsubAckPacket;
if (cancellationToken.CanBeCanceled)
{
unsubAckPacket = await Request<MqttUnsubAckPacket>(unsubscribePacket, cancellationToken).ConfigureAwait(false);
}
else
{
using var timeout = new CancellationTokenSource(Options.Timeout);
unsubAckPacket = await Request<MqttUnsubAckPacket>(unsubscribePacket, timeout.Token).ConfigureAwait(false);
}
using var timeoutCts = CreateTimeoutCancellationTokenSource(cancellationToken);
var unsubAckPacket = await Request<MqttUnsubAckPacket>(unsubscribePacket, timeoutCts.Token).ConfigureAwait(false);

return MqttClientUnsubscribeResultFactory.Create(unsubscribePacket, unsubAckPacket);
}
Expand Down Expand Up @@ -491,6 +452,16 @@ void Cleanup()
}
}

CancellationTokenSource CreateTimeoutCancellationTokenSource(CancellationToken cancellationToken)
{
var timeoutCts = cancellationToken.CanBeCanceled
? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)
: new CancellationTokenSource();

timeoutCts.CancelAfter(Options.Timeout);
return timeoutCts;
}

MqttClientConnectionStatus CompareExchangeConnectionStatus(MqttClientConnectionStatus value, MqttClientConnectionStatus comparand)
{
return (MqttClientConnectionStatus)Interlocked.CompareExchange(ref _connectionStatus, (int)value, (int)comparand);
Expand Down Expand Up @@ -1069,4 +1040,4 @@ async Task TrySendKeepAliveMessages(CancellationToken cancellationToken)
_logger.Verbose("Stopped sending keep alive packets");
}
}
}
}