Skip to content

Commit 6796ae8

Browse files
authored
Better packet ID management with O(1) allocation and reuse (#264)
1 parent 60ad39d commit 6796ae8

File tree

4 files changed

+149
-30
lines changed

4 files changed

+149
-30
lines changed

Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,5 +169,29 @@ await this.client.PublishAsync(
169169
}
170170
}
171171

172+
[Benchmark(Description = "Publish 1k QoS 1 messages")]
173+
public async Task Publish1kQoS1MessageAsync()
174+
{
175+
for (var i = 0; i < 1000; i++)
176+
{
177+
await this.client.PublishAsync(
178+
"benchmarks/PublishQoS1Messages",
179+
this.smallPayload,
180+
QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false);
181+
}
182+
}
183+
184+
[Benchmark(Description = "Publish 1k QoS 2 messages")]
185+
public async Task Publish1kQoS2MessageAsync()
186+
{
187+
for (var i = 0; i < 1000; i++)
188+
{
189+
await this.client.PublishAsync(
190+
"benchmarks/PublishQoS2Messages",
191+
this.smallPayload,
192+
QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);
193+
}
194+
}
195+
172196
public void Dispose() => GC.SuppressFinalize(this);
173197
}

Benchmarks/ClientBenchmarkApp/README.md

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,67 @@ The following results are from benchmarks run on my local MBP against a HiveMQ v
1818
1 ms : 1,000 Microseconds
1919
```
2020

21+
## October 28, 2025 v0.31.0
22+
23+
### Performance Improvements from v0.30.0
24+
25+
Significant performance improvements across all benchmarks, with the most notable gains in:
26+
27+
- **1k QoS 1 messages**: ~43% faster (287,802.82μs → 163,134.95μs)
28+
- **1k QoS 2 messages**: ~39% faster (533,574.86μs → 325,461.87μs)
29+
30+
The most dramatic improvements were seen in bulk message publishing scenarios, particularly for QoS 1 and QoS 2 messages, indicating better handling of acknowledgment flows and reduced overhead per message in batch operations.
31+
32+
| Method | Mean | Error | StdDev | Median |
33+
|------------------------------------------------- |--------------:|-------------:|--------------:|---------------:|
34+
| 'Publish a QoS 0 message' | 24.20 us | 70.88 us | 208.99 us | 2.646 us |
35+
| 'Publish a QoS 1 message' | 6,094.69 us | 16,414.36 us | 48,398.12 us | 1,026.708 us |
36+
| 'Publish a QoS 2 message' | 8,083.58 us | 17,437.59 us | 51,415.13 us | 1,353.751 us |
37+
| 'Publish 100 256b length payload QoS 0 messages' | 151.09 us | 116.79 us | 344.37 us | 107.188 us |
38+
| 'Publish 100 256b length payload QoS 1 messages' | 26,178.26 us | 18,001.87 us | 53,078.92 us | 20,828.730 us |
39+
| 'Publish 100 256b length payload QoS 2 messages' | 43,196.69 us | 19,064.73 us | 56,212.80 us | 36,206.188 us |
40+
| 'Publish 100 256k length payload QoS 0 messages' | 139.16 us | 94.26 us | 277.94 us | 101.584 us |
41+
| 'Publish 100 256k length payload QoS 1 messages' | 157,189.65 us | 41,857.20 us | 123,416.91 us | 139,608.958 us |
42+
| 'Publish 100 256k length payload QoS 2 messages' | 180,717.82 us | 49,526.73 us | 146,030.67 us | 154,019.188 us |
43+
| 'Publish 1k QoS 1 messages' | 163,134.95 us | 19,365.29 us | 57,099.01 us | 160,983.167 us |
44+
| 'Publish 1k QoS 2 messages' | 325,461.87 us | 26,343.40 us | 77,674.12 us | 314,379.667 us |
45+
46+
47+
## October 28, 2025 v0.30.0
48+
49+
### Performance Improvements from Mar 22, 2024
50+
51+
Major performance improvements across all benchmarks, with substantial gains in bulk message publishing:
52+
53+
- **100x 256b QoS 1 messages**: ~43% faster (45,813.98μs → 26,283.55μs)
54+
- **100x 256b QoS 2 messages**: ~50% faster (88,589.38μs → 44,557.05μs)
55+
- **100x 256k QoS 1 messages**: ~43% faster (270,043.05μs → 155,177.99μs)
56+
- **100x 256k QoS 2 messages**: ~43% faster (300,923.38μs → 172,109.05μs)
57+
58+
**New benchmarks introduced**: Added 1k message bulk publishing tests for QoS 1 and QoS 2, providing better insights into high-volume scenarios.
59+
60+
The improvements demonstrate significant optimization in batch processing and acknowledgment handling, particularly for larger payloads and bulk operations.
61+
62+
| Method | Mean | Error | StdDev | Median |
63+
|------------------------------------------------- |--------------:|-------------:|--------------:|---------------:|
64+
| 'Publish a QoS 0 message' | 39.67 us | 89.35 us | 263.45 us | 8.521 us |
65+
| 'Publish a QoS 1 message' | 5,892.86 us | 16,632.25 us | 49,040.57 us | 911.083 us |
66+
| 'Publish a QoS 2 message' | 7,369.47 us | 16,676.10 us | 49,169.85 us | 1,457.687 us |
67+
| 'Publish 100 256b length payload QoS 0 messages' | 137.26 us | 78.49 us | 231.44 us | 103.145 us |
68+
| 'Publish 100 256b length payload QoS 1 messages' | 26,283.55 us | 19,855.80 us | 58,545.26 us | 19,256.166 us |
69+
| 'Publish 100 256b length payload QoS 2 messages' | 44,557.05 us | 21,810.68 us | 64,309.29 us | 36,249.938 us |
70+
| 'Publish 100 256k length payload QoS 0 messages' | 141.71 us | 96.62 us | 284.88 us | 102.645 us |
71+
| 'Publish 100 256k length payload QoS 1 messages' | 155,177.99 us | 39,395.25 us | 116,157.79 us | 138,491.062 us |
72+
| 'Publish 100 256k length payload QoS 2 messages' | 172,109.05 us | 44,912.98 us | 132,426.93 us | 149,029.541 us |
73+
| 'Publish 1k QoS 1 messages' | 287,802.82 us | 90,041.54 us | 265,489.51 us | 248,344.521 us |
74+
| 'Publish 1k QoS 2 messages' | 533,574.86 us | 62,147.67 us | 183,243.82 us | 475,369.771 us |
75+
2176
## Mar 22, 2024
2277

78+
**New comprehensive benchmarks**: Introduced bulk message publishing tests with different payload sizes (256b and 256k), providing detailed performance insights for real-world scenarios.
79+
80+
The results demonstrate excellent scalability and efficiency in handling bulk operations, with particularly strong performance for larger payloads and QoS 2 message handling.
81+
2382
| Method | Mean | Error | StdDev | Median |
2483
|------------------------------------------------- |--------------:|------------:|-------------:|---------------:|
2584
| 'Publish a QoS 0 message' | 57.27 us | 158.55 us | 467.50 us | 9.084 us |

Source/HiveMQtt/Client/internal/PacketIDManager.cs

Lines changed: 63 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,52 @@ namespace HiveMQtt.Client.Internal;
44

55
public class PacketIDManager
66
{
7-
private HashSet<int> PacketIDsInUse { get; } = new();
8-
7+
// Use only BitArray for O(1) operations - more memory efficient than HashSet
98
private BitArray PacketIDBitArray { get; } = new BitArray(65536);
109

1110
private SemaphoreSlim SemLock { get; } = new(1, 1);
1211

13-
private int LastPacketId { get; set; } = 1;
12+
// Circular allocation starting from 1 (0 is reserved)
13+
private int NextPacketId { get; set; } = 1;
14+
15+
// Queue for recently freed packet IDs to enable immediate reuse
16+
private Queue<int> FreedPacketIds { get; } = new();
1417

1518
public PacketIDManager() => this.PacketIDBitArray.SetAll(false);
1619

1720
/// <summary>
18-
/// Gets the next available packet ID.
21+
/// Gets the next available packet ID with O(1) performance.
1922
/// </summary>
2023
/// <returns>The next available packet ID.</returns>
2124
public async Task<int> GetAvailablePacketIDAsync()
2225
{
2326
// Obtain the lock
2427
await this.SemLock.WaitAsync().ConfigureAwait(false);
2528

26-
var candidate = this.FindNextAvailablePacketID();
27-
this.PacketIDsInUse.Add(candidate);
28-
this.PacketIDBitArray[candidate] = true;
29-
30-
// Release the lock
31-
this.SemLock.Release();
29+
try
30+
{
31+
// First, try to reuse a recently freed packet ID
32+
if (this.FreedPacketIds.Count > 0)
33+
{
34+
var reusedId = this.FreedPacketIds.Dequeue();
35+
this.PacketIDBitArray[reusedId] = true;
36+
return reusedId;
37+
}
3238

33-
return candidate;
39+
// Otherwise, find the next available packet ID using circular allocation
40+
var candidate = this.FindNextAvailablePacketID();
41+
this.PacketIDBitArray[candidate] = true;
42+
return candidate;
43+
}
44+
finally
45+
{
46+
// Release the lock
47+
this.SemLock.Release();
48+
}
3449
}
3550

3651
/// <summary>
37-
/// Marks a packet ID as available.
52+
/// Marks a packet ID as available and adds it to the reuse queue.
3853
/// </summary>
3954
/// <param name="packetId">The packet ID to mark as available.</param>
4055
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
@@ -43,36 +58,44 @@ public async Task MarkPacketIDAsAvailableAsync(int packetId)
4358
// Obtain the lock
4459
await this.SemLock.WaitAsync().ConfigureAwait(false);
4560

46-
this.PacketIDsInUse.Remove(packetId);
47-
this.PacketIDBitArray[packetId] = false;
61+
try
62+
{
63+
// Mark as available in the bit array
64+
this.PacketIDBitArray[packetId] = false;
4865

49-
// Release the lock
50-
this.SemLock.Release();
66+
// Add to reuse queue for immediate availability
67+
this.FreedPacketIds.Enqueue(packetId);
68+
}
69+
finally
70+
{
71+
// Release the lock
72+
this.SemLock.Release();
73+
}
5174
}
5275

5376
/// <summary>
54-
/// Finds the next available packet ID.
77+
/// Finds the next available packet ID using efficient circular allocation.
5578
/// </summary>
5679
/// <returns>The next available packet ID.</returns>
5780
/// <exception cref="InvalidOperationException">Thrown when no available packet IDs are available.</exception>
5881
internal int FindNextAvailablePacketID()
5982
{
60-
// Loop through starting at the last served packet ID
61-
for (var i = this.LastPacketId; i <= 65535; i++)
83+
// Start from the last allocated packet ID and search forward
84+
for (var i = this.NextPacketId; i <= 65535; i++)
6285
{
63-
if (!this.PacketIDsInUse.Contains(i) && !this.PacketIDBitArray[i])
86+
if (!this.PacketIDBitArray[i])
6487
{
65-
this.LastPacketId = i;
88+
this.NextPacketId = i + 1;
6689
return i;
6790
}
6891
}
6992

70-
// We hit the end of the range, loop from the beginning
71-
for (var i = 1; i < this.LastPacketId; i++)
93+
// Wrap around and search from 1 to the last allocated ID
94+
for (var i = 1; i < this.NextPacketId; i++)
7295
{
73-
if (!this.PacketIDsInUse.Contains(i) && !this.PacketIDBitArray[i])
96+
if (!this.PacketIDBitArray[i])
7497
{
75-
this.LastPacketId = i;
98+
this.NextPacketId = i + 1;
7699
return i;
77100
}
78101
}
@@ -83,5 +106,19 @@ internal int FindNextAvailablePacketID()
83106
/// <summary>
84107
/// Gets the number of packet IDs in use.
85108
/// </summary>
86-
public int Count => this.PacketIDsInUse.Count;
109+
public int Count
110+
{
111+
get
112+
{
113+
var count = 0;
114+
for (var i = 1; i <= 65535; i++)
115+
{
116+
if (this.PacketIDBitArray[i])
117+
{
118+
count++;
119+
}
120+
}
121+
return count;
122+
}
123+
}
87124
}

Tests/HiveMQtt.Test/HiveMQClient/Plan/PacketIDManagerTest.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@ public async Task Send_1Mio_QoS1_QoS2_Messages_All_Ids_Released_Async()
2323
var packetIdManager = client.Connection.GetPacketIDManager(); // Assuming the client exposes the manager for validation
2424
Assert.Equal(0, packetIdManager.Count);
2525

26-
// Manually tested with 1M messages, 500k QoS1 and 500k QoS2
27-
// Lower the count for the test suite to remain manageable
28-
var qos1Messages = 5000;
29-
var qos2Messages = 5000;
26+
// Testing with 500k messages, 250k QoS1 and 250k QoS2
27+
var qos1Messages = 250000;
28+
var qos2Messages = 250000;
3029
var totalMessages = qos1Messages + qos2Messages;
3130

3231
// Act

0 commit comments

Comments
 (0)