1+ using System . Collections . Concurrent ;
12using JustSaying . Messaging . MessageHandling ;
23using JustSaying . Models ;
34using JustSaying . TestingFramework ;
@@ -41,37 +42,38 @@ public async Task Then_The_Messages_Are_Handled_With_Throttle()
4142 c . WithSubscriptionGroup ( "group" ) ) ) ) )
4243 . AddSingleton < IHandlerAsync < WaitingMessage > > ( handler ) ;
4344
44- var baseSleep = TimeSpan . FromMilliseconds ( 50 ) ;
45-
4645 await WhenAsync (
4746 services ,
4847 async ( publisher , listener , cancellationToken ) =>
4948 {
50- var waitForElevenMessages = handler . WaitForMessageCountAsync ( 11 , cancellationToken ) ;
51-
5249 await listener . StartAsync ( cancellationToken ) ;
5350 await publisher . StartAsync ( cancellationToken ) ;
5451
5552 // Publish the message with a long-running handler
5653 await publisher . PublishAsync ( messagesToSend . First ( ) , cancellationToken ) ;
5754
58- // Give some time to AWS to schedule the first long-running message
59- await Task . Delay ( baseSleep , cancellationToken ) ;
55+ // Wait until the first (long-running) message has started processing
56+ // This ensures it occupies one of the two concurrency slots
57+ await handler . WaitForFirstMessageStartedAsync ( cancellationToken ) ;
6058
61- foreach ( var msg in messagesToSend . Skip ( 1 ) . SkipLast ( 1 ) )
59+ // Now publish the remaining messages
60+ foreach ( var msg in messagesToSend . Skip ( 1 ) )
6261 {
6362 await publisher . PublishAsync ( msg , cancellationToken ) ;
6463 }
6564
66- // Publish the last message after a couple of seconds to guarantee it was scheduled after all the rest
67- await Task . Delay ( baseSleep , cancellationToken ) ;
68- await publisher . PublishAsync ( messagesToSend . Last ( ) , cancellationToken ) ;
65+ // Wait for all short messages (messages 1-29) to complete
66+ // Message 0 is still running and won't complete during this test
67+ await handler . WaitForCompletedCountAsync ( 29 , cancellationToken ) ;
6968
70- // Wait for more than 10 messages.
71- await waitForElevenMessages ;
69+ // Get the completed messages (excluding the still-running message 0)
70+ var completedMessages = handler . CompletedMessages . ToList ( ) ;
71+ completedMessages . Count . ShouldBeGreaterThanOrEqualTo ( 29 ) ;
7272
73- handler . ReceivedMessages . Count . ShouldBeGreaterThan ( 10 ) ;
74- handler . ReceivedMessages . ShouldBeInOrder ( SortDirection . Ascending ) ;
73+ // Verify that messages 1-29 were processed in order
74+ // (they should be, since only one slot was available after message 0 started)
75+ var shortMessages = completedMessages . Where ( m => m . Order > 0 ) . ToList ( ) ;
76+ shortMessages . ShouldBeInOrder ( SortDirection . Ascending ) ;
7577 } ) ;
7678 }
7779
@@ -90,11 +92,40 @@ public int CompareTo(WaitingMessage other)
9092
9193 private class InspectableWaitingHandler ( ITestOutputHelper outputHelper ) : InspectableHandler < WaitingMessage >
9294 {
95+ private readonly TaskCompletionSource _firstMessageStarted = new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
96+ private readonly ConcurrentQueue < WaitingMessage > _completedMessages = new ( ) ;
97+ private int _completedCount ;
98+
99+ public ConcurrentQueue < WaitingMessage > CompletedMessages => _completedMessages ;
100+
101+ public Task WaitForFirstMessageStartedAsync ( CancellationToken cancellationToken )
102+ {
103+ cancellationToken . Register ( ( ) => _firstMessageStarted . TrySetCanceled ( cancellationToken ) ) ;
104+ return _firstMessageStarted . Task ;
105+ }
106+
107+ public async Task WaitForCompletedCountAsync ( int count , CancellationToken cancellationToken )
108+ {
109+ while ( Volatile . Read ( ref _completedCount ) < count )
110+ {
111+ cancellationToken . ThrowIfCancellationRequested ( ) ;
112+ await Task . Delay ( 10 , cancellationToken ) ;
113+ }
114+ }
115+
93116 public override async Task < bool > Handle ( WaitingMessage message )
94117 {
95118 await base . Handle ( message ) ;
96119 outputHelper . WriteLine ( $ "Running task { message . Order } which will wait for { message . TimeToWait . TotalMilliseconds } ms") ;
120+
121+ // Signal that the first message has started (this allows the test to proceed)
122+ _firstMessageStarted . TrySetResult ( ) ;
123+
97124 await Task . Delay ( message . TimeToWait ) ;
125+
126+ _completedMessages . Enqueue ( message ) ;
127+ Interlocked . Increment ( ref _completedCount ) ;
128+
98129 return true ;
99130 }
100131 }
0 commit comments