Skip to content

Commit d57b055

Browse files
authored
Replace Delay with Connection Event Signal (#272)
1 parent bddd827 commit d57b055

File tree

4 files changed

+28
-3
lines changed

4 files changed

+28
-3
lines changed

Source/HiveMQtt/Client/Connection/ConnectionManager.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
namespace HiveMQtt.Client.Connection;
1717

1818
using System.Diagnostics;
19+
using System.Threading;
20+
using System.Threading.Tasks;
1921
using HiveMQtt.Client.Internal;
2022
using HiveMQtt.Client.Transport;
2123
using HiveMQtt.MQTT5;
@@ -68,6 +70,9 @@ public partial class ConnectionManager : IDisposable
6870
// This is used to know if and when we need to send a MQTT PingReq
6971
private readonly Stopwatch lastCommunicationTimer = new();
7072

73+
// Event-like signal to indicate the connection reached Connected state
74+
private TaskCompletionSource<bool> connectedSignal = new(TaskCreationOptions.RunContinuationsAsynchronously);
75+
7176
/// <summary>
7277
/// Initializes a new instance of the <see cref="ConnectionManager"/> class.
7378
/// </summary>
@@ -79,6 +84,7 @@ public ConnectionManager(HiveMQClient client)
7984
this.IPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(this.Client.Options.ClientReceiveMaximum);
8085
this.OPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(65535);
8186
this.State = ConnectState.Disconnected;
87+
this.ResetConnectedSignal();
8288

8389
// Connect the appropriate transport
8490
if (this.Client.Options.Host.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) ||
@@ -100,6 +106,18 @@ public ConnectionManager(HiveMQClient client)
100106
Logger.Trace(" -(RPH)- == ReceivedPacketsHandler");
101107
}
102108

109+
internal Task WaitUntilConnectedAsync(CancellationToken cancellationToken) => this.connectedSignal.Task.WaitAsync(cancellationToken);
110+
111+
internal void SignalConnected()
112+
{
113+
if (!this.connectedSignal.Task.IsCompleted)
114+
{
115+
this.connectedSignal.TrySetResult(true);
116+
}
117+
}
118+
119+
internal void ResetConnectedSignal() => this.connectedSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
120+
103121
internal async Task<bool> ConnectAsync()
104122
{
105123
// Connect the appropriate transport

Source/HiveMQtt/Client/Connection/ConnectionManagerHandlers.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ internal void HandleIncomingConnAckPacket(ConnAckPacket connAckPacket)
4242
}
4343

4444
this.ConnectionProperties = connAckPacket.Properties;
45+
4546
this.Client.OnConnAckReceivedEventLauncher(connAckPacket);
4647
}
4748

@@ -390,6 +391,9 @@ internal async Task<bool> HandleDisconnectionAsync(bool clean = true)
390391
// Cancel all background tasks and close the socket
391392
this.State = ConnectState.Disconnected;
392393

394+
// Reset the connection-ready signal for the next connect cycle
395+
this.ResetConnectedSignal();
396+
393397
// Cancel all background tasks
394398
await this.CancelBackgroundTasksAsync().ConfigureAwait(false);
395399

Source/HiveMQtt/Client/Connection/ConnectionManagerTasks.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,11 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) =
119119
{
120120
try
121121
{
122-
while (this.State != ConnectState.Connected)
122+
// Await connection readiness without polling to avoid arbitrary delay
123+
if (this.State != ConnectState.Connected)
123124
{
124125
Logger.Trace($"{this.Client.Options.ClientId}-(PW)- Not connected. Waiting for connect...");
125-
await Task.Delay(500).ConfigureAwait(false);
126-
continue;
126+
await this.WaitUntilConnectedAsync(cancellationToken).ConfigureAwait(false);
127127
}
128128

129129
var writeSuccess = true;

Source/HiveMQtt/Client/HiveMQClient.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ public async Task<ConnectResult> ConnectAsync(ConnectOptions? connectOptions = n
152152
if (connAck.ReasonCode == ConnAckReasonCode.Success)
153153
{
154154
this.Connection.State = ConnectState.Connected;
155+
156+
// Ensure connection-ready signal is set for any writers awaiting readiness
157+
this.Connection.SignalConnected();
155158
}
156159
else
157160
{

0 commit comments

Comments
 (0)