Skip to content

Commit efbb023

Browse files
committed
test: WIP
1 parent d9e1667 commit efbb023

File tree

5 files changed

+481
-9
lines changed

5 files changed

+481
-9
lines changed

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
5656
using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun);
5757
activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId);
5858

59+
bool hadException = false;
60+
bool hadCancellation = false;
61+
5962
try
6063
{
6164
this.RunStatus = RunStatus.Running;
@@ -77,14 +80,21 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
7780
}
7881
catch (OperationCanceledException)
7982
{
83+
hadCancellation = true;
8084
}
81-
catch (Exception ex) when (activity is not null)
85+
catch (Exception ex)
8286
{
83-
activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
84-
{ Tags.ErrorType, ex.GetType().FullName },
85-
{ Tags.BuildErrorMessage, ex.Message },
86-
}));
87-
activity.CaptureException(ex);
87+
hadException = true;
88+
89+
if (activity != null)
90+
{
91+
activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
92+
{ Tags.ErrorType, ex.GetType().FullName },
93+
{ Tags.BuildErrorMessage, ex.Message },
94+
}));
95+
activity.CaptureException(ex);
96+
}
97+
8898
throw;
8999
}
90100

@@ -136,7 +146,19 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
136146
}
137147
finally
138148
{
139-
this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
149+
if (hadException || hadCancellation || linkedSource.Token.IsCancellationRequested)
150+
{
151+
this.RunStatus = RunStatus.Ended;
152+
}
153+
else if (this._stepRunner.HasUnservicedRequests)
154+
{
155+
this.RunStatus = RunStatus.PendingRequests;
156+
}
157+
else
158+
{
159+
this.RunStatus = RunStatus.Idle;
160+
}
161+
140162
this._stepRunner.OutgoingEvents.EventRaised -= OnWorkflowEventAsync;
141163
}
142164

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,12 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
121121
}
122122
finally
123123
{
124+
// Mark as ended when run loop exits
125+
this._runStatus = RunStatus.Ended;
126+
124127
this._stepRunner.OutgoingEvents.EventRaised -= OnEventRaisedAsync;
125128
this._eventChannel.Writer.Complete();
126129

127-
// Mark as ended when run loop exits
128-
this._runStatus = RunStatus.Ended;
129130
activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
130131
}
131132

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System;
4+
using System.Threading.Tasks;
5+
using System.Threading.Tasks.Sources;
6+
7+
namespace Microsoft.Agents.AI.Workflows.UnitTests;
8+
9+
internal sealed class DelayValueTaskSource<T> : IValueTaskSource<T>
10+
{
11+
private readonly TestValueTaskSource<T> _innerSource = new();
12+
private readonly T _value;
13+
14+
public DelayValueTaskSource(T value)
15+
{
16+
this._value = value;
17+
}
18+
19+
public ValueTask ReleaseSucceededAsync() => this._innerSource.SetSucceededAsync(this._value);
20+
public ValueTask ReleaseFaultedAsync(Exception exception) => this._innerSource.SetFaultedAsync(exception);
21+
public ValueTask ReleaseCanceledAsync() => this._innerSource.SetCanceledAsync();
22+
23+
public T GetResult(short token) => this._innerSource.GetResult(token);
24+
25+
public ValueTaskSourceStatus GetStatus(short token) => this._innerSource.GetStatus(token);
26+
27+
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
28+
=> this._innerSource.OnCompleted(continuation, state, token, flags);
29+
}
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System;
4+
using System.Diagnostics;
5+
using System.Runtime.CompilerServices;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using FluentAssertions;
9+
using Microsoft.Agents.AI.Workflows.Execution;
10+
11+
namespace Microsoft.Agents.AI.Workflows.UnitTests;
12+
13+
public static class RunStatusTests
14+
{
15+
internal sealed class TestStepRunner : ISuperStepRunner
16+
{
17+
public TestStepRunner([CallerMemberName] string? name = null)
18+
{
19+
Console.WriteLine($"Starting test {name}");
20+
}
21+
22+
public string RunId { get; } = Guid.NewGuid().ToString("N");
23+
24+
public string StartExecutorId { get; } = "start";
25+
26+
public bool HasUnservicedRequests { get; set; }
27+
public bool HasUnprocessedMessages { get; set; }
28+
29+
public ConcurrentEventSink OutgoingEvents { get; } = new();
30+
31+
public ValueTask<bool> EnqueueMessageAsync<T>(T message, CancellationToken cancellationToken = default)
32+
{
33+
this.HasUnprocessedMessages = true;
34+
return new(true);
35+
}
36+
37+
ValueTask<bool> ISuperStepRunner.EnqueueMessageUntypedAsync(object message, Type declaredType, CancellationToken cancellationToken)
38+
{
39+
this.HasUnprocessedMessages = true;
40+
return new(true);
41+
}
42+
43+
public async ValueTask EnqueueResponseAsync(ExternalResponse response, CancellationToken cancellationToken = default)
44+
{
45+
this.HasUnservicedRequests = false;
46+
await this.EnqueueMessageAsync(response, cancellationToken);
47+
}
48+
49+
public ValueTask<bool> IsValidInputTypeAsync<T>(CancellationToken cancellationToken = default) => new(true);
50+
51+
public ValueTask RequestEndRunAsync()
52+
{
53+
if (this._currentStepSource != null)
54+
{
55+
return this.CancelStepAsync();
56+
}
57+
58+
return new();
59+
}
60+
61+
private DelayValueTaskSource<bool>? _currentStepSource;
62+
private CancellationTokenRegistration? _registration;
63+
64+
ValueTask<bool> ISuperStepRunner.RunSuperStepAsync(CancellationToken cancellationToken)
65+
{
66+
Debug.Assert(Interlocked.CompareExchange(ref this._currentStepSource,
67+
value: new DelayValueTaskSource<bool>(this.HasUnprocessedMessages),
68+
null) is null);
69+
70+
this._registration = cancellationToken.Register(() => _ = this._currentStepSource == null
71+
? Task.CompletedTask
72+
: this._currentStepSource.ReleaseCanceledAsync().AsTask());
73+
this.HasUnprocessedMessages = false;
74+
75+
return new(this._currentStepSource, 0);
76+
}
77+
78+
private DelayValueTaskSource<bool> TakeCurrentStepSource()
79+
{
80+
DelayValueTaskSource<bool>? currentStepSource = Interlocked.Exchange(ref this._currentStepSource, null);
81+
Debug.Assert(currentStepSource is not null);
82+
this._registration?.Dispose();
83+
this._registration = null;
84+
85+
return currentStepSource;
86+
}
87+
88+
public ValueTask CompleteStepAsync() => this.TakeCurrentStepSource().ReleaseSucceededAsync();
89+
90+
public ValueTask CompleteStepWithPendingAsync()
91+
{
92+
this.HasUnservicedRequests = true;
93+
return this.CompleteStepAsync();
94+
}
95+
96+
public ValueTask CancelStepAsync() => this.TakeCurrentStepSource().ReleaseCanceledAsync();
97+
98+
public ValueTask FailStepAsync(Exception exception) => this.TakeCurrentStepSource().ReleaseFaultedAsync(exception);
99+
}
100+
101+
public enum EventStreamKind
102+
{
103+
OffThread,
104+
Lockstep
105+
}
106+
107+
private static IRunEventStream GetRunStreamForKind(EventStreamKind kind, ISuperStepRunner stepRunner)
108+
{
109+
IRunEventStream result;
110+
switch (kind)
111+
{
112+
case EventStreamKind.OffThread:
113+
result = new StreamingRunEventStream(stepRunner);
114+
break;
115+
case EventStreamKind.Lockstep:
116+
result = new LockstepRunEventStream(stepRunner);
117+
break;
118+
default:
119+
throw new NotSupportedException($"Unsupported RunStream kind: {kind}");
120+
}
121+
122+
result.Start();
123+
return result;
124+
}
125+
126+
[Theory]
127+
[InlineData(EventStreamKind.OffThread)]
128+
[InlineData(EventStreamKind.Lockstep)]
129+
public static async Task Test_RunStatus_NotStartedWhenStartingAsync(EventStreamKind mode)
130+
{
131+
TestStepRunner runner = new();
132+
IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
133+
134+
RunStatus status = await eventStream.GetStatusAsync();
135+
status.Should().Be(RunStatus.NotStarted);
136+
}
137+
138+
[Theory]
139+
[InlineData(EventStreamKind.OffThread)]
140+
[InlineData(EventStreamKind.Lockstep)]
141+
public static async Task Test_RunStatus_RunningWhenInSuperstepAsync(EventStreamKind mode)
142+
{
143+
TestStepRunner runner = new();
144+
IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
145+
146+
await runner.EnqueueMessageAsync(new object());
147+
eventStream.SignalInput();
148+
149+
_ = WatchStreamAsync();
150+
151+
RunStatus status = await eventStream.GetStatusAsync();
152+
status.Should().Be(RunStatus.Running);
153+
154+
await eventStream.DisposeAsync();
155+
156+
async Task WatchStreamAsync()
157+
{
158+
await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
159+
}
160+
}
161+
162+
[Theory]
163+
[InlineData(EventStreamKind.OffThread)]
164+
[InlineData(EventStreamKind.Lockstep)]
165+
public static async Task Test_RunStatus_IdleWhenFinishedSuperstepsAsync(EventStreamKind mode)
166+
{
167+
TestStepRunner runner = new();
168+
IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
169+
170+
await runner.EnqueueMessageAsync(new object());
171+
eventStream.SignalInput();
172+
173+
Task watchTask = WatchStreamAsync();
174+
await runner.CompleteStepAsync();
175+
await watchTask;
176+
177+
RunStatus status = await eventStream.GetStatusAsync();
178+
status.Should().Be(RunStatus.Idle);
179+
180+
await eventStream.DisposeAsync();
181+
182+
async Task WatchStreamAsync()
183+
{
184+
await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
185+
}
186+
}
187+
188+
[Theory]
189+
[InlineData(EventStreamKind.OffThread)]
190+
[InlineData(EventStreamKind.Lockstep)]
191+
public static async Task Test_RunStatus_EndedWhenCancelledAsync(EventStreamKind mode)
192+
{
193+
TestStepRunner runner = new();
194+
IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
195+
196+
await runner.EnqueueMessageAsync(new object());
197+
eventStream.SignalInput();
198+
199+
Task watchTask = WatchStreamAsync();
200+
await runner.CancelStepAsync();
201+
await watchTask;
202+
203+
RunStatus status = await eventStream.GetStatusAsync();
204+
status.Should().Be(RunStatus.Ended);
205+
206+
await eventStream.DisposeAsync();
207+
208+
async Task WatchStreamAsync()
209+
{
210+
await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
211+
}
212+
}
213+
214+
[Theory]
215+
[InlineData(EventStreamKind.OffThread)]
216+
[InlineData(EventStreamKind.Lockstep)]
217+
public static async Task Test_RunStatus_ExceptionWhenFaultedAsync(EventStreamKind mode)
218+
{
219+
TestStepRunner runner = new();
220+
IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
221+
222+
await runner.EnqueueMessageAsync(new object());
223+
eventStream.SignalInput();
224+
225+
Task watchTask = WatchStreamAsync();
226+
await runner.FailStepAsync(new InvalidOperationException());
227+
await watchTask;
228+
229+
RunStatus status = await eventStream.GetStatusAsync();
230+
status.Should().Be(RunStatus.Ended);
231+
232+
await eventStream.DisposeAsync();
233+
234+
async Task WatchStreamAsync()
235+
{
236+
await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
237+
}
238+
}
239+
240+
//[Theory]
241+
//[InlineData(EventStreamKind.OffThread)]
242+
//[InlineData(EventStreamKind.Lockstep)]
243+
internal static async Task Test_RunStatus_PendingRequestsAsync(EventStreamKind mode)
244+
{
245+
TestStepRunner runner = new();
246+
IRunEventStream eventStream = GetRunStreamForKind(mode, runner);
247+
248+
// Act 1: Send the input object, and run the step to PendingRequest
249+
await runner.EnqueueMessageAsync(new object());
250+
eventStream.SignalInput();
251+
252+
Task watchTask = WatchStreamAsync();
253+
await runner.CompleteStepWithPendingAsync();
254+
await watchTask;
255+
256+
// Assert 1
257+
RunStatus status = await eventStream.GetStatusAsync();
258+
status.Should().Be(RunStatus.PendingRequests);
259+
260+
// Act 2: Send the response, check running state
261+
await runner.EnqueueResponseAsync(
262+
new ExternalResponse(
263+
new Checkpointing.RequestPortInfo(new(typeof(object)), new(typeof(object)), "_"),
264+
Guid.NewGuid().ToString("N"),
265+
new(new())));
266+
eventStream.SignalInput();
267+
268+
watchTask = WatchStreamAsync();
269+
270+
// Assert 2
271+
status = await eventStream.GetStatusAsync();
272+
status.Should().Be(RunStatus.Running);
273+
274+
// Act 3: Process the response, check state is idle
275+
await runner.CompleteStepAsync();
276+
await watchTask; status = await eventStream.GetStatusAsync();
277+
status.Should().Be(RunStatus.Running);
278+
279+
// Assert 3
280+
status = await eventStream.GetStatusAsync();
281+
status.Should().Be(RunStatus.Idle);
282+
283+
await eventStream.DisposeAsync();
284+
285+
async Task WatchStreamAsync()
286+
{
287+
await foreach (var _ in eventStream.TakeEventStreamAsync(false)) { }
288+
}
289+
}
290+
}

0 commit comments

Comments
 (0)