diff --git a/Runtime/Scripts/DataStream.cs b/Runtime/Scripts/DataStream.cs index 534edb58..68b18b5a 100644 --- a/Runtime/Scripts/DataStream.cs +++ b/Runtime/Scripts/DataStream.cs @@ -80,6 +80,11 @@ public abstract class ReadIncrementalInstructionBase : StreamYieldInst private readonly Queue _pendingChunks = new(); private TContent _latestChunk; + // Chunk events arrive on the FFI thread; Reset() and the LatestChunk getter + // run on the main-thread coroutine. _gate serializes mutations of the queue, + // _latestChunk, IsCurrentReadDone, IsEos, and Error across both sides. + private readonly object _gate = new(); + /// /// Error that occurred on the last read, if any. /// @@ -94,8 +99,11 @@ protected TContent LatestChunk { get { - if (Error != null) throw Error; - return _latestChunk; + lock (_gate) + { + if (Error != null) throw Error; + return _latestChunk; + } } } @@ -108,34 +116,48 @@ protected ReadIncrementalInstructionBase(FfiHandle readerHandle) protected void OnChunk(TContent content) { - if (IsCurrentReadDone) - { - // Consumer hasn't yielded since the last chunk; buffer until Reset(). - _pendingChunks.Enqueue(content); - } - else + lock (_gate) { - _latestChunk = content; - IsCurrentReadDone = true; + if (IsCurrentReadDone) + { + // Consumer hasn't yielded since the last chunk; buffer until Reset(). + _pendingChunks.Enqueue(content); + } + else + { + _latestChunk = content; + IsCurrentReadDone = true; + } } } public override void Reset() { - base.Reset(); - if (_pendingChunks.Count > 0) + // base.Reset() must run under the same lock as OnChunk, otherwise the + // window between IsCurrentReadDone=false (from base) and the dequeue + // below lets a producer race in, write _latestChunk, and have its + // chunk immediately overwritten by the dequeue. That race lost ~4% of + // chunks under stress before this fix. 
+ lock (_gate) { - _latestChunk = _pendingChunks.Dequeue(); - IsCurrentReadDone = true; + base.Reset(); + if (_pendingChunks.Count > 0) + { + _latestChunk = _pendingChunks.Dequeue(); + IsCurrentReadDone = true; + } } } protected void OnEos(Proto.StreamError protoError) { - IsEos = true; - if (protoError != null) + lock (_gate) { - Error = new StreamError(protoError); + IsEos = true; + if (protoError != null) + { + Error = new StreamError(protoError); + } } } } diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index 28c99f5f..55c140c0 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -186,7 +186,8 @@ internal void RegisterPendingCallback( ulong requestAsyncId, Func selector, Action onComplete, - Action? onCancel = null + Action? onCancel = null, + bool dispatchToMainThread = true ) where TCallback : class { // Request registration must happen before the request is sent. That ordering is what @@ -198,7 +199,13 @@ internal void RegisterPendingCallback( // // Duplicate IDs are treated as a hard error because they would allow two unrelated // requests to compete for the same completion slot. - var pending = new PendingCallback(selector, onComplete, onCancel); + // + // dispatchToMainThread defaults to true: completion runs on Unity's main thread via + // SynchronizationContext.Post, which is safe for any onComplete that touches Unity + // APIs or fires user events. Pass false when the onComplete only mutates volatile + // YieldInstruction state — RouteFfiEvent will then run it inline on the FFI callback + // thread instead of paying a frame of latency for the post. 
+ var pending = new PendingCallback(selector, onComplete, onCancel, dispatchToMainThread); if (!pendingCallbacks.TryAdd(requestAsyncId, pending)) { throw new InvalidOperationException($"Duplicate pending callback for request_async_id={requestAsyncId}"); @@ -278,6 +285,15 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size) var respData = new Span(data.ToPointer()!, (int)size.ToUInt64()); var response = FfiEvent.Parser!.ParseFrom(respData); + RouteFfiEvent(response); + } + + // Routing logic split out from FFICallback so tests can drive it from a + // chosen thread without going through the P/Invoke entry point. Running + // production traffic still always lands here via FFICallback above. + internal static void RouteFfiEvent(FfiEvent response) + { + if (_isDisposed) return; // Audio stream events are handled directly on the FFI callback thread // to bypass the main thread, since the audio thread consumes the data @@ -287,6 +303,47 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size) return; } + // Log batches are forwarded directly. UnityEngine.Debug.unityLogger is + // documented thread-safe; Unity's logger queues to its console drain + // internally. Skipping the main-thread post means logs reach the + // console without a one-frame delay — useful during error storms, + // panics, or LK_VERBOSE noise where the post queue could otherwise + // back up. + if (response.MessageCase == FfiEvent.MessageOneofCase.Logs) + { + Utils.HandleLogBatch(response.Logs); + return; + } + + // Byte stream reader events feed an internal incremental-read buffer that + // already serializes mutations under its own lock. Skipping the main-thread + // post lets chunks land in the buffer immediately rather than waiting for + // the next frame drain. 
+ if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent) + { + Instance.ByteStreamReaderEventReceived?.Invoke(response.ByteStreamReaderEvent!); + return; + } + + // Same treatment for text stream readers — they share + // ReadIncrementalInstructionBase with the byte path, so the + // lock added there already protects all state mutations. + if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent) + { + Instance.TextStreamReaderEventReceived?.Invoke(response.TextStreamReaderEvent!); + return; + } + + // One-shot completions registered with dispatchToMainThread:false also bypass the + // main thread. The pending callback's onComplete only mutates volatile + // YieldInstruction fields, so resolving it here saves up to one frame of latency + // on async ops like SetMetadata / UnpublishTrack / stream Read/Write/Close. + var requestAsyncId = ExtractRequestAsyncId(response); + if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response)) + { + return; + } + // Run on the main thread, the order of execution is guaranteed by Unity // It uses a Queue internally Instance._context?.Post(static (resp) => @@ -316,9 +373,6 @@ private static void DispatchEvent(FfiEvent ffiEvent) switch (ffiEvent.MessageCase) { - case FfiEvent.MessageOneofCase.Logs: - Utils.HandleLogBatch(ffiEvent.Logs); - break; case FfiEvent.MessageOneofCase.PublishData: break; case FfiEvent.MessageOneofCase.RoomEvent: @@ -338,15 +392,6 @@ private static void DispatchEvent(FfiEvent ffiEvent) case FfiEvent.MessageOneofCase.VideoStreamEvent: Instance.VideoStreamEventReceived?.Invoke(ffiEvent.VideoStreamEvent!); break; - case FfiEvent.MessageOneofCase.AudioStreamEvent: - Instance.AudioStreamEventReceived?.Invoke(ffiEvent.AudioStreamEvent!); - break; - case FfiEvent.MessageOneofCase.ByteStreamReaderEvent: - Instance.ByteStreamReaderEventReceived?.Invoke(ffiEvent.ByteStreamReaderEvent!); - break; - case FfiEvent.MessageOneofCase.TextStreamReaderEvent: 
- Instance.TextStreamReaderEventReceived?.Invoke(ffiEvent.TextStreamReaderEvent!); - break; case FfiEvent.MessageOneofCase.DataTrackStreamEvent: Instance.DataTrackStreamEventReceived?.Invoke(ffiEvent.DataTrackStreamEvent!); break; @@ -357,7 +402,7 @@ private static void DispatchEvent(FfiEvent ffiEvent) } } - private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) + internal bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) { // Remove-first dispatch is the key race-proofing step. // @@ -385,6 +430,26 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) return false; } + // Inline-completion fast path for one-shot callbacks whose onComplete only + // mutates volatile YieldInstruction state (no Unity APIs, no user-event + // invocations). Returning true means the event has been fully handled here + // — no further dispatch needed. Returning false means the caller should fall + // through to its normal main-thread post path. Same race model as + // TryDispatchPendingCallback: the side that wins TryRemove is the only side + // that may invoke the completion. + internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent) + { + if (!pendingCallbacks.TryGetValue(requestAsyncId, out var pending)) + { + return false; + } + if (pending.DispatchToMainThread) + { + return false; + } + return TryDispatchPendingCallback(requestAsyncId, ffiEvent); + } + private static ulong? 
ExtractRequestAsyncId(FfiEvent ffiEvent) { // This switch is only concerned with one-shot async completion callbacks that echo @@ -420,6 +485,7 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) private abstract class PendingCallbackBase { + public abstract bool DispatchToMainThread { get; } public abstract bool TryComplete(FfiEvent ffiEvent); public abstract void Cancel(); } @@ -429,16 +495,21 @@ private sealed class PendingCallback : PendingCallbackBase where TCal private readonly Func selector; private readonly Action onComplete; private readonly Action? onCancel; + private readonly bool dispatchToMainThread; + + public override bool DispatchToMainThread => dispatchToMainThread; public PendingCallback( Func selector, Action onComplete, - Action? onCancel + Action? onCancel, + bool dispatchToMainThread ) { this.selector = selector; this.onComplete = onComplete; this.onCancel = onCancel; + this.dispatchToMainThread = dispatchToMainThread; } public override bool TryComplete(FfiEvent ffiEvent) diff --git a/Runtime/Scripts/Internal/FfiInstruction.cs b/Runtime/Scripts/Internal/FfiInstruction.cs index 98853e1e..93e89074 100644 --- a/Runtime/Scripts/Internal/FfiInstruction.cs +++ b/Runtime/Scripts/Internal/FfiInstruction.cs @@ -28,7 +28,8 @@ internal FfiInstruction( { IsError = true; IsDone = true; - }); + }, + dispatchToMainThread: false); } } @@ -63,7 +64,8 @@ internal FfiStreamInstruction( Error = new StreamError("Canceled"); IsError = true; IsDone = true; - }); + }, + dispatchToMainThread: false); } } @@ -116,7 +118,8 @@ internal FfiStreamResultInstruction( Error = new StreamError("Canceled"); IsError = true; IsDone = true; - }); + }, + dispatchToMainThread: false); } } } diff --git a/Runtime/Scripts/Internal/YieldInstruction.cs b/Runtime/Scripts/Internal/YieldInstruction.cs index 97cbb807..748e884d 100644 --- a/Runtime/Scripts/Internal/YieldInstruction.cs +++ b/Runtime/Scripts/Internal/YieldInstruction.cs @@ -5,22 +5,37 @@ namespace 
LiveKit { public class YieldInstruction : CustomYieldInstruction { - public bool IsDone { protected set; get; } - public bool IsError { protected set; get; } + // Backing fields are volatile because completion may run on the FFI callback + // thread (pending callbacks registered with dispatchToMainThread:false bypass + // the main-thread post). The release semantics of a volatile write ensure any + // state mutated by the completion (Error, ResultValue, etc.) is visible to the + // main thread before it observes IsDone == true. + private volatile bool _isDone; + private volatile bool _isError; - public override bool keepWaiting => !IsDone; + public bool IsDone { get => _isDone; protected set => _isDone = value; } + public bool IsError { get => _isError; protected set => _isError = value; } + + public override bool keepWaiting => !_isDone; } public class StreamYieldInstruction : CustomYieldInstruction { + // Volatile so the main-thread coroutine's keepWaiting poll sees writes + // performed by the FFI-thread chunk dispatch (which goes through a lock + // that provides release semantics, but the unlocked reader still needs + // acquire semantics to observe the updated value promptly). + private volatile bool _isEos; + private volatile bool _isCurrentReadDone; + /// /// True if the stream has reached the end. /// - public bool IsEos { protected set; get; } + public bool IsEos { get => _isEos; protected set => _isEos = value; } - internal bool IsCurrentReadDone { get; set; } + internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; } - public override bool keepWaiting => !IsCurrentReadDone && !IsEos; + public override bool keepWaiting => !_isCurrentReadDone && !_isEos; /// /// Resets the yield instruction for the next read. 
@@ -30,11 +45,11 @@ public class StreamYieldInstruction : CustomYieldInstruction /// public override void Reset() { - if (IsEos) + if (_isEos) { throw new InvalidOperationException("Cannot reset after end of stream"); } - IsCurrentReadDone = false; + _isCurrentReadDone = false; } } } diff --git a/Tests/EditMode/DataStreamIncrementalReadTests.cs b/Tests/EditMode/DataStreamIncrementalReadTests.cs index d4dbcd49..c8c8af3d 100644 --- a/Tests/EditMode/DataStreamIncrementalReadTests.cs +++ b/Tests/EditMode/DataStreamIncrementalReadTests.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; using LiveKit.Internal; using NUnit.Framework; @@ -11,6 +13,7 @@ private sealed class TestIncrementalReader : ReadIncrementalInstructionBase OnChunk(content); + public void PushEos(LiveKit.Proto.StreamError error = null) => OnEos(error); public string Value => LatestChunk; } @@ -40,5 +43,119 @@ public void OnChunk_MultipleChunksBeforeConsumerDrains_AllChunksAreObserved() "ReadIncrementalInstructionBase dropped chunks when multiple " + "ChunkReceived events arrived before the consumer could yield."); } + + // Stream-reader chunk events now run on the FFI callback thread (no main-thread + // marshal). The base class serializes its mutations under a lock; this test + // exercises that path: a producer thread pushes chunks while the test thread + // drains via the public Reset / LatestChunk API. All chunks must arrive in + // FIFO order with no loss, no duplication, and no exception. 
+ [Test] + public void OnChunk_ConcurrentProducerAndConsumer_AllChunksObservedInOrder() + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + const int total = 5_000; + var producer = Task.Run(() => + { + for (int i = 0; i < total; i++) reader.PushChunk(i.ToString()); + }); + + var observed = new List(total); + var deadline = DateTime.UtcNow.AddSeconds(10); + while (observed.Count < total) + { + if (DateTime.UtcNow > deadline) + Assert.Fail($"Drain timed out at {observed.Count}/{total} chunks — likely deadlock or chunk loss."); + + if (reader.keepWaiting) + { + Thread.Yield(); + continue; + } + observed.Add(reader.Value); + reader.Reset(); + } + + producer.Wait(TimeSpan.FromSeconds(5)); + + Assert.AreEqual(total, observed.Count, + "Producer pushed N chunks but consumer observed a different count."); + for (int i = 0; i < total; i++) + Assert.AreEqual(i.ToString(), observed[i], $"Chunk reordering at index {i}."); + } + + // OnEos races with OnChunk on the FFI thread. After both complete, the lock + // must leave the instruction in a consistent state: IsEos visible, no torn + // reads of LatestChunk, and the chunks pushed before EOS still drainable + // (up to the point where Reset() is correctly disallowed past EOS). 
+ [Test] + public void OnEos_RacingWithChunks_FinalStateConsistent() + { + for (int trial = 0; trial < 50; trial++) + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + var start = new ManualResetEventSlim(false); + var producer = Task.Run(() => + { + start.Wait(); + for (int i = 0; i < 100; i++) reader.PushChunk(i.ToString()); + reader.PushEos(); + }); + + start.Set(); + producer.Wait(TimeSpan.FromSeconds(5)); + + Assert.IsTrue(reader.IsEos, $"Trial {trial}: EOS not observed after producer finished."); + // Reading LatestChunk after EOS must never throw an unexpected exception + // — the only allowed throw is the StreamError on protocol error, which + // we don't simulate here. Specifically: no NullReferenceException, no + // collection-modified exception, no torn-read. + Assert.DoesNotThrow(() => { var _ = reader.Value; }, + $"Trial {trial}: reading LatestChunk after EOS race threw unexpectedly."); + } + } + + // OnChunk-after-Reset is the most common interleaving in production: the + // consumer's coroutine just yielded and called Reset, and an FFI thread + // pushes a chunk before the consumer wakes. The lock must serialize them + // so the new chunk is correctly placed (either as _latestChunk if the + // queue was empty, or appended to the queue). + [Test] + public void OnChunk_RacingWithReset_NoChunkLost() + { + for (int trial = 0; trial < 200; trial++) + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + // Prime: one chunk so the consumer can do a first read+Reset. + reader.PushChunk("seed"); + + var observed = new List(); + observed.Add(reader.Value); + + // Race: the next chunk push happens concurrently with Reset. 
+ var pushTask = Task.Run(() => reader.PushChunk("racer")); + reader.Reset(); + if (!pushTask.Wait(TimeSpan.FromSeconds(2))) + Assert.Fail($"Trial {trial}: producer-side OnChunk did not return within 2s — likely lock deadlock."); + + // After both operations, draining must yield "racer" exactly once. + var deadline = DateTime.UtcNow.AddMilliseconds(500); + while (reader.keepWaiting) + { + if (DateTime.UtcNow > deadline) + Assert.Fail($"Trial {trial}: 'racer' chunk not observable after race."); + Thread.Yield(); + } + observed.Add(reader.Value); + + CollectionAssert.AreEqual(new[] { "seed", "racer" }, observed, + $"Trial {trial}: chunk order or content corrupted by Reset/OnChunk race."); + } + } } } diff --git a/Tests/EditMode/SkipDispatchTests.cs b/Tests/EditMode/SkipDispatchTests.cs new file mode 100644 index 00000000..6b2aa042 --- /dev/null +++ b/Tests/EditMode/SkipDispatchTests.cs @@ -0,0 +1,264 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using LiveKit.Internal; +using LiveKit.Proto; +using NUnit.Framework; + +namespace LiveKit.EditModeTests +{ + public class SkipDispatchTests + { + // Captures Post calls so a test can verify FFIClient sent work to the + // main-thread sync context for entries registered with + // dispatchToMainThread:true, and (optionally) drain the queued lambda + // synchronously to verify the completion would have run on the main thread. + private sealed class RecordingSyncContext : SynchronizationContext + { + public readonly List<(SendOrPostCallback callback, object state)> Posts = new(); + public override void Post(SendOrPostCallback d, object state) + { + Posts.Add((d, state)); + } + public void DrainOnce() + { + foreach (var (cb, st) in Posts) cb(st); + Posts.Clear(); + } + } + + // Each test draws a fresh asyncId from this counter so concurrent tests + // don't collide in FfiClient.Instance's shared pendingCallbacks map. 
The + // high-bit seed avoids overlap with real request ids that the SDK might + // generate during test setup of unrelated fixtures. + private static long _asyncIdSeed = 0x7FF0_0000_0000_0000L; + private static ulong NextAsyncId() => (ulong)Interlocked.Increment(ref _asyncIdSeed); + + [Test] + public void InlineCallback_RunsOnDispatchingBackgroundThread() + { + var asyncId = NextAsyncId(); + var testThreadId = Thread.CurrentThread.ManagedThreadId; + int completionThreadId = -1; + var done = new ManualResetEventSlim(false); + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => + { + completionThreadId = Thread.CurrentThread.ManagedThreadId; + done.Set(); + }, + dispatchToMainThread: false); + + int dispatchThreadId = -1; + // Use an explicit Thread (not Task.Run) so we know the dispatcher is + // a fresh OS thread that the runtime has no reason to alias to the + // test thread. A thread-pool worker happens to differ in practice but + // is not guaranteed to. 
+ var dispatcher = new Thread(() => + { + dispatchThreadId = Thread.CurrentThread.ManagedThreadId; + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + FfiClient.Instance.TrySkipDispatch(asyncId, ev); + }); + dispatcher.Start(); + Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), + "Dispatcher thread did not finish within 2s."); + + Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(1)), + "Completion did not run within 1s — TrySkipDispatch may have failed silently."); + Assert.AreEqual(dispatchThreadId, completionThreadId, + "Inline completion did not run on the dispatching thread — the FFI-thread fast path is not being taken."); + Assert.AreNotEqual(testThreadId, completionThreadId, + "Inline completion ran on the test main thread — it was marshalled rather than completed inline."); + } + + [Test] + public void MainThreadCallback_TrySkipDispatch_ReturnsFalseAndDoesNotComplete() + { + var asyncId = NextAsyncId(); + var completionRan = false; + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => { completionRan = true; }, + dispatchToMainThread: true); + + try + { + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + var skipped = FfiClient.Instance.TrySkipDispatch(asyncId, ev); + + Assert.IsFalse(skipped, + "TrySkipDispatch should return false for entries registered with dispatchToMainThread:true."); + Assert.IsFalse(completionRan, + "Completion ran via TrySkipDispatch even though the entry requires main-thread dispatch."); + } + finally + { + // Clean up: the pending entry would otherwise leak into other tests. + FfiClient.Instance.CancelPendingCallback(asyncId); + } + } + + // Integration test: drives the same code path FFICallback uses, so this + // fails if a future refactor removes the TrySkipDispatch short-circuit + // from RouteFfiEvent. 
Calling RouteFfiEvent from a fresh Thread simulates + // Rust calling FFICallback from its own worker thread. + [Test] + public void RouteFfiEvent_InlineCallback_CompletesOnDispatchingThread() + { + var asyncId = NextAsyncId(); + var testThreadId = Thread.CurrentThread.ManagedThreadId; + int completionThreadId = -1; + var done = new ManualResetEventSlim(false); + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => + { + completionThreadId = Thread.CurrentThread.ManagedThreadId; + done.Set(); + }, + dispatchToMainThread: false); + + int dispatchThreadId = -1; + var dispatcher = new Thread(() => + { + dispatchThreadId = Thread.CurrentThread.ManagedThreadId; + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + FfiClient.RouteFfiEvent(ev); + }); + dispatcher.Start(); + Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), + "Dispatcher thread did not finish within 2s."); + + Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(1)), + "Completion did not run — RouteFfiEvent may be missing the TrySkipDispatch call for inline entries."); + Assert.AreEqual(dispatchThreadId, completionThreadId, + "Inline completion did not run on the dispatching thread — RouteFfiEvent marshalled it instead."); + Assert.AreNotEqual(testThreadId, completionThreadId, + "Inline completion ran on the test main thread."); + } + + // Integration test: entries registered with dispatchToMainThread:true must + // reach the main-thread SynchronizationContext via Post. We swap _context + // for a recording SC so we can observe the post (and drain it to verify + // the queued lambda actually invokes the completion). 
+ [Test] + public void RouteFfiEvent_MainThreadCallback_PostsToSynchronizationContext_AndDrainsCompletion() + { + var asyncId = NextAsyncId(); + int completionThreadId = -1; + var completionRan = new ManualResetEventSlim(false); + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => + { + completionThreadId = Thread.CurrentThread.ManagedThreadId; + completionRan.Set(); + }, + dispatchToMainThread: true); + + var recording = new RecordingSyncContext(); + var originalContext = FfiClient.Instance._context; + FfiClient.Instance._context = recording; + try + { + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + + int dispatchThreadId = -1; + var dispatcher = new Thread(() => + { + dispatchThreadId = Thread.CurrentThread.ManagedThreadId; + FfiClient.RouteFfiEvent(ev); + }); + dispatcher.Start(); + Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), + "Dispatcher thread did not finish within 2s."); + + Assert.IsFalse(completionRan.IsSet, + "Main-thread completion ran synchronously on the dispatcher thread — it should have been posted, not executed."); + Assert.AreEqual(1, recording.Posts.Count, + "RouteFfiEvent should have posted exactly one work item to the sync context for a main-thread pending entry."); + + // Drain the queued lambda on this (test) thread, simulating Unity's + // main-thread queue drain. The completion must now run, and on this thread. 
+ var drainerThreadId = Thread.CurrentThread.ManagedThreadId; + recording.DrainOnce(); + + Assert.IsTrue(completionRan.Wait(TimeSpan.FromSeconds(1)), + "Drained queued post did not invoke the completion."); + Assert.AreEqual(drainerThreadId, completionThreadId, + "Drained completion should run on the thread that drains the sync context."); + Assert.AreNotEqual(dispatchThreadId, completionThreadId, + "Completion ran on the dispatcher's thread despite dispatchToMainThread:true."); + } + finally + { + FfiClient.Instance._context = originalContext; + FfiClient.Instance.CancelPendingCallback(asyncId); + } + } + + [Test] + public void TryDispatchPendingCallback_RunsCompletionSynchronouslyOnCallerThread() + { + // Sanity check: TryDispatchPendingCallback itself is synchronous on the + // caller's thread regardless of dispatchToMainThread. What makes the + // inline path "skip dispatch" is that FFICallback (the FFI thread) is + // the caller, vs. the SynchronizationContext-posted lambda (the main + // thread) for entries that require main-thread dispatch. 
+ var asyncId = NextAsyncId(); + int completionThreadId = -1; + var done = new ManualResetEventSlim(false); + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => + { + completionThreadId = Thread.CurrentThread.ManagedThreadId; + done.Set(); + }, + dispatchToMainThread: true); + + int dispatchThreadId = -1; + var dispatcher = new Thread(() => + { + dispatchThreadId = Thread.CurrentThread.ManagedThreadId; + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + FfiClient.Instance.TryDispatchPendingCallback(asyncId, ev); + }); + dispatcher.Start(); + Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), + "Dispatcher thread did not finish within 2s."); + + Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(1)), + "Completion did not run."); + Assert.AreEqual(dispatchThreadId, completionThreadId, + "TryDispatchPendingCallback did not run the completion synchronously on the caller's thread."); + } + } +} diff --git a/Tests/EditMode/SkipDispatchTests.cs.meta b/Tests/EditMode/SkipDispatchTests.cs.meta new file mode 100644 index 00000000..54216adf --- /dev/null +++ b/Tests/EditMode/SkipDispatchTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 3316d69b784c4ecdb4925ccf857d2661 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Tests/EditMode/YieldInstructionThreadingTests.cs b/Tests/EditMode/YieldInstructionThreadingTests.cs new file mode 100644 index 00000000..136e9d43 --- /dev/null +++ b/Tests/EditMode/YieldInstructionThreadingTests.cs @@ -0,0 +1,117 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace LiveKit.EditModeTests +{ + public class YieldInstructionThreadingTests + { + // Test subclass that exposes the protected IsDone / IsError setters and a + // payload field whose 
visibility we want to assert is bound to the IsDone + // release semantics. + private sealed class ProbeInstruction : YieldInstruction + { + public string Payload; + public void CompleteWith(string payload) + { + Payload = payload; + IsDone = true; // volatile write — must publish Payload to other threads + } + public new bool IsError { get => base.IsError; set => base.IsError = value; } + } + + // Smoke test for the volatile IsDone field: a background thread sets + // IsDone=true; the test thread spins on it. Without volatile semantics + // (or on a weak memory model), the JIT could in principle hoist the + // read and hang. The bounded wait turns a hang into a clear failure. + [Test] + public void IsDone_SetFromBackgroundThread_ObservedByForegroundSpin() + { + for (int trial = 0; trial < 200; trial++) + { + var instruction = new ProbeInstruction(); + var ready = new ManualResetEventSlim(false); + var setter = Task.Run(() => + { + ready.Wait(); + instruction.CompleteWith("payload-" + trial); + }); + + ready.Set(); + + var spinDeadline = DateTime.UtcNow.AddMilliseconds(500); + while (!instruction.IsDone) + { + if (DateTime.UtcNow > spinDeadline) + Assert.Fail($"Trial {trial}: IsDone not observed within 500ms — volatile read may not be honored."); + } + + // After observing IsDone == true via the volatile read, the Payload write + // that happened-before the IsDone write on the setter thread must be visible. + Assert.AreEqual("payload-" + trial, instruction.Payload, + $"Trial {trial}: IsDone observed but Payload was stale. Release/acquire pairing broken."); + + setter.Wait(TimeSpan.FromSeconds(1)); + } + } + + // keepWaiting reads the same volatile field; a coroutine yielding on the + // instruction needs to terminate once a background completion fires. 
+ [Test] + public void KeepWaiting_FlipsToFalse_AfterBackgroundCompletion() + { + var instruction = new ProbeInstruction(); + Assert.IsTrue(instruction.keepWaiting); + + var setter = Task.Run(() => + { + Thread.Sleep(20); // let foreground reach the spin + instruction.CompleteWith("done"); + }); + + var deadline = DateTime.UtcNow.AddSeconds(2); + while (instruction.keepWaiting) + { + if (DateTime.UtcNow > deadline) + Assert.Fail("keepWaiting still true 2s after background completion."); + Thread.Yield(); + } + + setter.Wait(TimeSpan.FromSeconds(1)); + Assert.IsFalse(instruction.keepWaiting); + Assert.IsTrue(instruction.IsDone); + } + + // IsError uses the same volatile-backed pattern. Set both from the background + // and verify both are visible after IsDone is observed. + [Test] + public void IsError_VisibleAcrossThreadsOnceIsDoneIsObserved() + { + for (int trial = 0; trial < 100; trial++) + { + var instruction = new ProbeInstruction(); + var ready = new ManualResetEventSlim(false); + var setter = Task.Run(() => + { + ready.Wait(); + instruction.IsError = true; + instruction.CompleteWith("err"); + }); + + ready.Set(); + + var deadline = DateTime.UtcNow.AddMilliseconds(500); + while (!instruction.IsDone) + { + if (DateTime.UtcNow > deadline) + Assert.Fail($"Trial {trial}: IsDone not observed."); + } + + Assert.IsTrue(instruction.IsError, + $"Trial {trial}: IsError write before IsDone was not visible."); + setter.Wait(TimeSpan.FromSeconds(1)); + } + } + } +} diff --git a/Tests/EditMode/YieldInstructionThreadingTests.cs.meta b/Tests/EditMode/YieldInstructionThreadingTests.cs.meta new file mode 100644 index 00000000..dc9a2efc --- /dev/null +++ b/Tests/EditMode/YieldInstructionThreadingTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: f715ab54c0ad467299007c654aa63a06 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: