From 195913ac1cf8fc9b271b752253d8dd0c8d26fd1f Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Wed, 29 Apr 2026 20:04:46 +0200 Subject: [PATCH 01/15] Dispatch raw-safe one-shot completions on the FFI thread Adds an opt-in `rawSafe` flag to `RegisterPendingCallback`. When set, `FFICallback` resolves the pending callback directly on the FFI callback thread instead of marshaling through Unity's main-thread SynchronizationContext. Saves up to one frame of latency on async ops whose completion only mutates volatile YieldInstruction state. `YieldInstruction.IsDone`/`IsError` are converted to volatile-backed properties so the release semantics of the completion's volatile write make any preceding state mutations visible to the main-thread reader once it observes IsDone == true. Opted in: the three generic instruction classes (`FfiInstruction`, `FfiStreamInstruction`, `FfiStreamResultInstruction`) which only set IsDone/IsError/Error/result and do not touch Unity APIs. This covers SetLocalName/Metadata/Attributes, UnpublishTrack, all stream Read/Write/Close/WriteToFile. Not opted in: bespoke completion handlers (Connect, PublishTrack, GetStats, CaptureAudioFrame, PerformRpc, SendText, SendFile, TextStreamOpen, ByteStreamOpen, PublishDataTrack). Those create C# objects and/or fire user events and need a separate audit before they can move off the main thread. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/FFIClient.cs | 45 ++++++++++++++++++-- Runtime/Scripts/Internal/FfiInstruction.cs | 3 +- Runtime/Scripts/Internal/YieldInstruction.cs | 14 ++++-- 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index 28c99f5f..17fe1454 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -186,7 +186,8 @@ internal void RegisterPendingCallback( ulong requestAsyncId, Func selector, Action onComplete, - Action? onCancel = null + Action? onCancel = null, + bool rawSafe = false ) where TCallback : class { // Request registration must happen before the request is sent. That ordering is what @@ -198,7 +199,11 @@ internal void RegisterPendingCallback( // // Duplicate IDs are treated as a hard error because they would allow two unrelated // requests to compete for the same completion slot. - var pending = new PendingCallback(selector, onComplete, onCancel); + // + // rawSafe == true means the onComplete is safe to invoke directly on the FFI callback + // thread (no Unity APIs touched, only volatile state mutations). The dispatcher will + // then bypass the main-thread SynchronizationContext post. + var pending = new PendingCallback(selector, onComplete, onCancel, rawSafe); if (!pendingCallbacks.TryAdd(requestAsyncId, pending)) { throw new InvalidOperationException($"Duplicate pending callback for request_async_id={requestAsyncId}"); @@ -287,6 +292,16 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size) return; } + // Raw-safe one-shot completions also bypass the main thread. The pending + // callback's onComplete only mutates volatile YieldInstruction fields, so + // resolving it here saves up to one frame of latency on async ops like + // SetMetadata / UnpublishTrack / stream Read/Write/Close. 
+ var requestAsyncId = ExtractRequestAsyncId(response); + if (requestAsyncId.HasValue && Instance.TryDispatchRawSafe(requestAsyncId.Value, response)) + { + return; + } + // Run on the main thread, the order of execution is guaranteed by Unity // It uses a Queue internally Instance._context?.Post(static (resp) => @@ -385,6 +400,24 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) return false; } + // FFI-thread fast path for one-shot completions whose onComplete only mutates + // volatile YieldInstruction state (no Unity APIs, no user-event invocations). + // Same race model as TryDispatchPendingCallback: the side that wins TryRemove + // is the only side that may invoke completion. If the entry isn't raw-safe we + // return false and let the caller fall through to the main-thread post. + private bool TryDispatchRawSafe(ulong requestAsyncId, FfiEvent ffiEvent) + { + if (!pendingCallbacks.TryGetValue(requestAsyncId, out var pending)) + { + return false; + } + if (!pending.RawSafe) + { + return false; + } + return TryDispatchPendingCallback(requestAsyncId, ffiEvent); + } + private static ulong? ExtractRequestAsyncId(FfiEvent ffiEvent) { // This switch is only concerned with one-shot async completion callbacks that echo @@ -420,6 +453,7 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) private abstract class PendingCallbackBase { + public abstract bool RawSafe { get; } public abstract bool TryComplete(FfiEvent ffiEvent); public abstract void Cancel(); } @@ -429,16 +463,21 @@ private sealed class PendingCallback : PendingCallbackBase where TCal private readonly Func selector; private readonly Action onComplete; private readonly Action? onCancel; + private readonly bool rawSafe; + + public override bool RawSafe => rawSafe; public PendingCallback( Func selector, Action onComplete, - Action? onCancel + Action? 
onCancel, + bool rawSafe ) { this.selector = selector; this.onComplete = onComplete; this.onCancel = onCancel; + this.rawSafe = rawSafe; } public override bool TryComplete(FfiEvent ffiEvent) diff --git a/Runtime/Scripts/Internal/FfiInstruction.cs b/Runtime/Scripts/Internal/FfiInstruction.cs index 98853e1e..17c0766d 100644 --- a/Runtime/Scripts/Internal/FfiInstruction.cs +++ b/Runtime/Scripts/Internal/FfiInstruction.cs @@ -28,7 +28,8 @@ internal FfiInstruction( { IsError = true; IsDone = true; - }); + }, + rawSafe: true); } } diff --git a/Runtime/Scripts/Internal/YieldInstruction.cs b/Runtime/Scripts/Internal/YieldInstruction.cs index 97cbb807..38273aca 100644 --- a/Runtime/Scripts/Internal/YieldInstruction.cs +++ b/Runtime/Scripts/Internal/YieldInstruction.cs @@ -5,10 +5,18 @@ namespace LiveKit { public class YieldInstruction : CustomYieldInstruction { - public bool IsDone { protected set; get; } - public bool IsError { protected set; get; } + // Backing fields are volatile because completion may run on the FFI callback + // thread (raw-safe pending callbacks bypass the main-thread post). The release + // semantics of a volatile write ensure any state mutated by the completion + // (Error, ResultValue, etc.) is visible to the main thread before it observes + // IsDone == true. 
+ private volatile bool _isDone; + private volatile bool _isError; - public override bool keepWaiting => !IsDone; + public bool IsDone { get => _isDone; protected set => _isDone = value; } + public bool IsError { get => _isError; protected set => _isError = value; } + + public override bool keepWaiting => !_isDone; } public class StreamYieldInstruction : CustomYieldInstruction From cc2ed4f240a62ea54c9d2fa6709885de7967e9c7 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Wed, 29 Apr 2026 20:06:27 +0200 Subject: [PATCH 02/15] Dispatch ByteStreamReaderEvent on the FFI thread Short-circuits ByteStreamReaderEvent in FFICallback so chunk-received events land in the incremental read buffer without waiting for the next main-thread frame drain. Mirrors the AudioStreamEvent fast path. The internal subscriber (ByteStreamReader.ReadIncrementalInstruction) forwards into ReadIncrementalInstructionBase, which now serializes mutations of the chunk queue, latest chunk, error, and the IsEos / IsCurrentReadDone flags under a single lock. The flag fields on StreamYieldInstruction are made volatile so the main-thread coroutine's keepWaiting poll observes the FFI thread's writes without acquiring the lock. Only ReadIncremental consumers benefit; ReadAll still goes through FfiStreamResultInstruction (already raw-safe via the previous commit). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/DataStream.cs | 49 +++++++++++++------- Runtime/Scripts/Internal/FFIClient.cs | 10 ++++ Runtime/Scripts/Internal/YieldInstruction.cs | 17 +++++-- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/Runtime/Scripts/DataStream.cs b/Runtime/Scripts/DataStream.cs index 534edb58..a4af3eb4 100644 --- a/Runtime/Scripts/DataStream.cs +++ b/Runtime/Scripts/DataStream.cs @@ -80,6 +80,11 @@ public abstract class ReadIncrementalInstructionBase : StreamYieldInst private readonly Queue _pendingChunks = new(); private TContent _latestChunk; + // Chunk events arrive on the FFI thread; Reset() and the LatestChunk getter + // run on the main-thread coroutine. _gate serializes mutations of the queue, + // _latestChunk, IsCurrentReadDone, IsEos, and Error across both sides. + private readonly object _gate = new(); + /// /// Error that occurred on the last read, if any. /// @@ -94,8 +99,11 @@ protected TContent LatestChunk { get { - if (Error != null) throw Error; - return _latestChunk; + lock (_gate) + { + if (Error != null) throw Error; + return _latestChunk; + } } } @@ -108,34 +116,43 @@ protected ReadIncrementalInstructionBase(FfiHandle readerHandle) protected void OnChunk(TContent content) { - if (IsCurrentReadDone) - { - // Consumer hasn't yielded since the last chunk; buffer until Reset(). - _pendingChunks.Enqueue(content); - } - else + lock (_gate) { - _latestChunk = content; - IsCurrentReadDone = true; + if (IsCurrentReadDone) + { + // Consumer hasn't yielded since the last chunk; buffer until Reset(). 
+ _pendingChunks.Enqueue(content); + } + else + { + _latestChunk = content; + IsCurrentReadDone = true; + } } } public override void Reset() { base.Reset(); - if (_pendingChunks.Count > 0) + lock (_gate) { - _latestChunk = _pendingChunks.Dequeue(); - IsCurrentReadDone = true; + if (_pendingChunks.Count > 0) + { + _latestChunk = _pendingChunks.Dequeue(); + IsCurrentReadDone = true; + } } } protected void OnEos(Proto.StreamError protoError) { - IsEos = true; - if (protoError != null) + lock (_gate) { - Error = new StreamError(protoError); + IsEos = true; + if (protoError != null) + { + Error = new StreamError(protoError); + } } } } diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index 17fe1454..117a2a07 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -292,6 +292,16 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size) return; } + // Byte stream reader events feed an internal incremental-read buffer that + // already serializes mutations under its own lock. Skipping the main-thread + // post lets chunks land in the buffer immediately rather than waiting for + // the next frame drain. + if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent) + { + Instance.ByteStreamReaderEventReceived?.Invoke(response.ByteStreamReaderEvent!); + return; + } + // Raw-safe one-shot completions also bypass the main thread. 
The pending // callback's onComplete only mutates volatile YieldInstruction fields, so // resolving it here saves up to one frame of latency on async ops like diff --git a/Runtime/Scripts/Internal/YieldInstruction.cs b/Runtime/Scripts/Internal/YieldInstruction.cs index 38273aca..6beacf59 100644 --- a/Runtime/Scripts/Internal/YieldInstruction.cs +++ b/Runtime/Scripts/Internal/YieldInstruction.cs @@ -21,14 +21,21 @@ public class YieldInstruction : CustomYieldInstruction public class StreamYieldInstruction : CustomYieldInstruction { + // Volatile so the main-thread coroutine's keepWaiting poll sees writes + // performed by the FFI-thread chunk dispatch (which goes through a lock + // that provides release semantics, but the unlocked reader still needs + // acquire semantics to observe the updated value promptly). + private volatile bool _isEos; + private volatile bool _isCurrentReadDone; + /// <summary> /// True if the stream has reached the end. /// </summary> - public bool IsEos { protected set; get; } + public bool IsEos { get => _isEos; protected set => _isEos = value; } - internal bool IsCurrentReadDone { get; set; } + internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; } - public override bool keepWaiting => !IsCurrentReadDone && !IsEos; + public override bool keepWaiting => !_isCurrentReadDone && !_isEos; /// <summary> /// Resets the yield instruction for the next read. 
@@ -38,11 +45,11 @@ public class StreamYieldInstruction : CustomYieldInstruction /// public override void Reset() { - if (IsEos) + if (_isEos) { throw new InvalidOperationException("Cannot reset after end of stream"); } - IsCurrentReadDone = false; + _isCurrentReadDone = false; } } } From 967d0b936312ab4a6634d91e6e78531231ceb8ee Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Wed, 29 Apr 2026 20:07:09 +0200 Subject: [PATCH 03/15] Dispatch TextStreamReaderEvent on the FFI thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the byte-stream fast path. TextStreamReader.ReadIncrementalInstruction shares ReadIncrementalInstructionBase with the byte reader, so the lock added in the previous commit already covers text chunk mutations — only the FFICallback short-circuit needs to be added. Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/FFIClient.cs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index 117a2a07..b6ed8961 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -302,6 +302,15 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size) return; } + // Same treatment for text stream readers — they share + // ReadIncrementalInstructionBase with the byte path, so the + // lock added there already protects all state mutations. + if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent) + { + Instance.TextStreamReaderEventReceived?.Invoke(response.TextStreamReaderEvent!); + return; + } + // Raw-safe one-shot completions also bypass the main thread. 
The pending // callback's onComplete only mutates volatile YieldInstruction fields, so // resolving it here saves up to one frame of latency on async ops like From f120dcc424a099595c853ff60f17cd0877f55c2a Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 10:07:44 +0200 Subject: [PATCH 04/15] Add threading tests for raw-safe dispatch and chunk lock Three new EditMode tests in DataStreamIncrementalReadTests covering the ReadIncrementalInstructionBase lock added for raw chunk dispatch: - producer/consumer with 5000 chunks, asserts FIFO and no loss - OnEos racing with chunks, asserts consistent post-race state - Reset racing with OnChunk, asserts no chunk lost (200 trials) YieldInstructionThreadingTests is a new file with three tests that guard the volatile semantics of YieldInstruction.IsDone / IsError: - background thread sets IsDone, foreground spin observes it - keepWaiting flips after a background completion - IsError write before IsDone is visible after the volatile read Each test has a Timeout attribute so a regression that breaks the volatile read or the lock fails the runner instead of hanging it. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../DataStreamIncrementalReadTests.cs | 116 +++++++++++++++++ .../YieldInstructionThreadingTests.cs | 117 ++++++++++++++++++ .../YieldInstructionThreadingTests.cs.meta | 11 ++ 3 files changed, 244 insertions(+) create mode 100644 Tests/EditMode/YieldInstructionThreadingTests.cs create mode 100644 Tests/EditMode/YieldInstructionThreadingTests.cs.meta diff --git a/Tests/EditMode/DataStreamIncrementalReadTests.cs b/Tests/EditMode/DataStreamIncrementalReadTests.cs index d4dbcd49..980ee960 100644 --- a/Tests/EditMode/DataStreamIncrementalReadTests.cs +++ b/Tests/EditMode/DataStreamIncrementalReadTests.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; using LiveKit.Internal; using NUnit.Framework; @@ -11,6 +13,7 @@ private sealed class TestIncrementalReader : ReadIncrementalInstructionBase OnChunk(content); + public void PushEos(LiveKit.Proto.StreamError error = null) => OnEos(error); public string Value => LatestChunk; } @@ -40,5 +43,118 @@ public void OnChunk_MultipleChunksBeforeConsumerDrains_AllChunksAreObserved() "ReadIncrementalInstructionBase dropped chunks when multiple " + "ChunkReceived events arrived before the consumer could yield."); } + + // Stream-reader chunk events now run on the FFI callback thread (no main-thread + // marshal). The base class serializes its mutations under a lock; this test + // exercises that path: a producer thread pushes chunks while the test thread + // drains via the public Reset / LatestChunk API. All chunks must arrive in + // FIFO order with no loss, no duplication, and no exception. 
+ [Test, Timeout(15_000)] + public void OnChunk_ConcurrentProducerAndConsumer_AllChunksObservedInOrder() + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + const int total = 5_000; + var producer = Task.Run(() => + { + for (int i = 0; i < total; i++) reader.PushChunk(i.ToString()); + }); + + var observed = new List(total); + var deadline = DateTime.UtcNow.AddSeconds(10); + while (observed.Count < total) + { + if (DateTime.UtcNow > deadline) + Assert.Fail($"Drain timed out at {observed.Count}/{total} chunks — likely deadlock or chunk loss."); + + if (reader.keepWaiting) + { + Thread.Yield(); + continue; + } + observed.Add(reader.Value); + reader.Reset(); + } + + producer.Wait(TimeSpan.FromSeconds(5)); + + Assert.AreEqual(total, observed.Count, + "Producer pushed N chunks but consumer observed a different count."); + for (int i = 0; i < total; i++) + Assert.AreEqual(i.ToString(), observed[i], $"Chunk reordering at index {i}."); + } + + // OnEos races with OnChunk on the FFI thread. After both complete, the lock + // must leave the instruction in a consistent state: IsEos visible, no torn + // reads of LatestChunk, and the chunks pushed before EOS still drainable + // (up to the point where Reset() is correctly disallowed past EOS). 
+ [Test, Timeout(10_000)] + public void OnEos_RacingWithChunks_FinalStateConsistent() + { + for (int trial = 0; trial < 50; trial++) + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + var start = new ManualResetEventSlim(false); + var producer = Task.Run(() => + { + start.Wait(); + for (int i = 0; i < 100; i++) reader.PushChunk(i.ToString()); + reader.PushEos(); + }); + + start.Set(); + producer.Wait(TimeSpan.FromSeconds(5)); + + Assert.IsTrue(reader.IsEos, $"Trial {trial}: EOS not observed after producer finished."); + // Reading LatestChunk after EOS must never throw an unexpected exception + // — the only allowed throw is the StreamError on protocol error, which + // we don't simulate here. Specifically: no NullReferenceException, no + // collection-modified exception, no torn-read. + Assert.DoesNotThrow(() => { var _ = reader.Value; }, + $"Trial {trial}: reading LatestChunk after EOS race threw unexpectedly."); + } + } + + // OnChunk-after-Reset is the most common interleaving in production: the + // consumer's coroutine just yielded and called Reset, and an FFI thread + // pushes a chunk before the consumer wakes. The lock must serialize them + // so the new chunk is correctly placed (either as _latestChunk if the + // queue was empty, or appended to the queue). + [Test, Timeout(5_000)] + public void OnChunk_RacingWithReset_NoChunkLost() + { + for (int trial = 0; trial < 200; trial++) + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + // Prime: one chunk so the consumer can do a first read+Reset. + reader.PushChunk("seed"); + + var observed = new List(); + observed.Add(reader.Value); + + // Race: the next chunk push happens concurrently with Reset. + var pushTask = Task.Run(() => reader.PushChunk("racer")); + reader.Reset(); + pushTask.Wait(); + + // After both operations, draining must yield "racer" exactly once. 
+ var deadline = DateTime.UtcNow.AddMilliseconds(500); + while (reader.keepWaiting) + { + if (DateTime.UtcNow > deadline) + Assert.Fail($"Trial {trial}: 'racer' chunk not observable after race."); + Thread.Yield(); + } + observed.Add(reader.Value); + + CollectionAssert.AreEqual(new[] { "seed", "racer" }, observed, + $"Trial {trial}: chunk order or content corrupted by Reset/OnChunk race."); + } + } } } diff --git a/Tests/EditMode/YieldInstructionThreadingTests.cs b/Tests/EditMode/YieldInstructionThreadingTests.cs new file mode 100644 index 00000000..d73110b8 --- /dev/null +++ b/Tests/EditMode/YieldInstructionThreadingTests.cs @@ -0,0 +1,117 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace LiveKit.EditModeTests +{ + public class YieldInstructionThreadingTests + { + // Test subclass that exposes the protected IsDone / IsError setters and a + // payload field whose visibility we want to assert is bound to the IsDone + // release semantics. + private sealed class ProbeInstruction : YieldInstruction + { + public string Payload; + public void CompleteWith(string payload) + { + Payload = payload; + IsDone = true; // volatile write — must publish Payload to other threads + } + public new bool IsError { get => base.IsError; set => base.IsError = value; } + } + + // Smoke test for the volatile IsDone field: a background thread sets + // IsDone=true; the test thread spins on it. Without volatile semantics + // (or on a weak memory model), the JIT could in principle hoist the + // read and hang. The bounded wait turns a hang into a clear failure. 
+ [Test, Timeout(5_000)] + public void IsDone_SetFromBackgroundThread_ObservedByForegroundSpin() + { + for (int trial = 0; trial < 200; trial++) + { + var instruction = new ProbeInstruction(); + var ready = new ManualResetEventSlim(false); + var setter = Task.Run(() => + { + ready.Wait(); + instruction.CompleteWith("payload-" + trial); + }); + + ready.Set(); + + var spinDeadline = DateTime.UtcNow.AddMilliseconds(500); + while (!instruction.IsDone) + { + if (DateTime.UtcNow > spinDeadline) + Assert.Fail($"Trial {trial}: IsDone not observed within 500ms — volatile read may not be honored."); + } + + // After observing IsDone == true via the volatile read, the Payload write + // that happened-before the IsDone write on the setter thread must be visible. + Assert.AreEqual("payload-" + trial, instruction.Payload, + $"Trial {trial}: IsDone observed but Payload was stale. Release/acquire pairing broken."); + + setter.Wait(TimeSpan.FromSeconds(1)); + } + } + + // keepWaiting reads the same volatile field; a coroutine yielding on the + // instruction needs to terminate once a background completion fires. + [Test, Timeout(5_000)] + public void KeepWaiting_FlipsToFalse_AfterBackgroundCompletion() + { + var instruction = new ProbeInstruction(); + Assert.IsTrue(instruction.keepWaiting); + + var setter = Task.Run(() => + { + Thread.Sleep(20); // let foreground reach the spin + instruction.CompleteWith("done"); + }); + + var deadline = DateTime.UtcNow.AddSeconds(2); + while (instruction.keepWaiting) + { + if (DateTime.UtcNow > deadline) + Assert.Fail("keepWaiting still true 2s after background completion."); + Thread.Yield(); + } + + setter.Wait(TimeSpan.FromSeconds(1)); + Assert.IsFalse(instruction.keepWaiting); + Assert.IsTrue(instruction.IsDone); + } + + // IsError uses the same volatile-backed pattern. Set both from the background + // and verify both are visible after IsDone is observed. 
+ [Test, Timeout(5_000)] + public void IsError_VisibleAcrossThreadsOnceIsDoneIsObserved() + { + for (int trial = 0; trial < 100; trial++) + { + var instruction = new ProbeInstruction(); + var ready = new ManualResetEventSlim(false); + var setter = Task.Run(() => + { + ready.Wait(); + instruction.IsError = true; + instruction.CompleteWith("err"); + }); + + ready.Set(); + + var deadline = DateTime.UtcNow.AddMilliseconds(500); + while (!instruction.IsDone) + { + if (DateTime.UtcNow > deadline) + Assert.Fail($"Trial {trial}: IsDone not observed."); + } + + Assert.IsTrue(instruction.IsError, + $"Trial {trial}: IsError write before IsDone was not visible."); + setter.Wait(TimeSpan.FromSeconds(1)); + } + } + } +} diff --git a/Tests/EditMode/YieldInstructionThreadingTests.cs.meta b/Tests/EditMode/YieldInstructionThreadingTests.cs.meta new file mode 100644 index 00000000..dc9a2efc --- /dev/null +++ b/Tests/EditMode/YieldInstructionThreadingTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: f715ab54c0ad467299007c654aa63a06 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: From 33b08afecc88302828615f3fe66890a4d229765c Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 10:10:49 +0200 Subject: [PATCH 05/15] Drop NUnit [Timeout] from threading tests The [Timeout] attribute does not behave reliably in our CI runner. Removed it from all six threading tests and replaced the only unbounded Task.Wait() with a Wait(TimeSpan) + Assert.Fail pair so a genuine deadlock still produces a clean test failure rather than hanging until the outer CI wall-clock kills the runner. The in-test DateTime.UtcNow deadlines on the spin loops already convert hang regressions into descriptive Assert.Fail messages, so the [Timeout] attribute was redundant safety on top of those. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Tests/EditMode/DataStreamIncrementalReadTests.cs | 9 +++++---- Tests/EditMode/YieldInstructionThreadingTests.cs | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/Tests/EditMode/DataStreamIncrementalReadTests.cs b/Tests/EditMode/DataStreamIncrementalReadTests.cs index 980ee960..c8c8af3d 100644 --- a/Tests/EditMode/DataStreamIncrementalReadTests.cs +++ b/Tests/EditMode/DataStreamIncrementalReadTests.cs @@ -49,7 +49,7 @@ public void OnChunk_MultipleChunksBeforeConsumerDrains_AllChunksAreObserved() // exercises that path: a producer thread pushes chunks while the test thread // drains via the public Reset / LatestChunk API. All chunks must arrive in // FIFO order with no loss, no duplication, and no exception. - [Test, Timeout(15_000)] + [Test] public void OnChunk_ConcurrentProducerAndConsumer_AllChunksObservedInOrder() { using var handle = new FfiHandle(IntPtr.Zero); @@ -89,7 +89,7 @@ public void OnChunk_ConcurrentProducerAndConsumer_AllChunksObservedInOrder() // must leave the instruction in a consistent state: IsEos visible, no torn // reads of LatestChunk, and the chunks pushed before EOS still drainable // (up to the point where Reset() is correctly disallowed past EOS). - [Test, Timeout(10_000)] + [Test] public void OnEos_RacingWithChunks_FinalStateConsistent() { for (int trial = 0; trial < 50; trial++) @@ -123,7 +123,7 @@ public void OnEos_RacingWithChunks_FinalStateConsistent() // pushes a chunk before the consumer wakes. The lock must serialize them // so the new chunk is correctly placed (either as _latestChunk if the // queue was empty, or appended to the queue). - [Test, Timeout(5_000)] + [Test] public void OnChunk_RacingWithReset_NoChunkLost() { for (int trial = 0; trial < 200; trial++) @@ -140,7 +140,8 @@ public void OnChunk_RacingWithReset_NoChunkLost() // Race: the next chunk push happens concurrently with Reset. 
var pushTask = Task.Run(() => reader.PushChunk("racer")); reader.Reset(); - pushTask.Wait(); + if (!pushTask.Wait(TimeSpan.FromSeconds(2))) + Assert.Fail($"Trial {trial}: producer-side OnChunk did not return within 2s — likely lock deadlock."); // After both operations, draining must yield "racer" exactly once. var deadline = DateTime.UtcNow.AddMilliseconds(500); diff --git a/Tests/EditMode/YieldInstructionThreadingTests.cs b/Tests/EditMode/YieldInstructionThreadingTests.cs index d73110b8..136e9d43 100644 --- a/Tests/EditMode/YieldInstructionThreadingTests.cs +++ b/Tests/EditMode/YieldInstructionThreadingTests.cs @@ -25,7 +25,7 @@ public void CompleteWith(string payload) // IsDone=true; the test thread spins on it. Without volatile semantics // (or on a weak memory model), the JIT could in principle hoist the // read and hang. The bounded wait turns a hang into a clear failure. - [Test, Timeout(5_000)] + [Test] public void IsDone_SetFromBackgroundThread_ObservedByForegroundSpin() { for (int trial = 0; trial < 200; trial++) @@ -58,7 +58,7 @@ public void IsDone_SetFromBackgroundThread_ObservedByForegroundSpin() // keepWaiting reads the same volatile field; a coroutine yielding on the // instruction needs to terminate once a background completion fires. - [Test, Timeout(5_000)] + [Test] public void KeepWaiting_FlipsToFalse_AfterBackgroundCompletion() { var instruction = new ProbeInstruction(); @@ -85,7 +85,7 @@ public void KeepWaiting_FlipsToFalse_AfterBackgroundCompletion() // IsError uses the same volatile-backed pattern. Set both from the background // and verify both are visible after IsDone is observed. 
- [Test, Timeout(5_000)] + [Test] public void IsError_VisibleAcrossThreadsOnceIsDoneIsObserved() { for (int trial = 0; trial < 100; trial++) From 2d7e1380f699bec510905e0799a3d7798c8f9d5d Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 10:18:19 +0200 Subject: [PATCH 06/15] Fix Reset/OnChunk race in ReadIncrementalInstructionBase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit base.Reset() writes IsCurrentReadDone=false outside the chunk lock because StreamYieldInstruction owns the field but doesn't know about the lock the subclass added. That left a window where a producer's OnChunk could acquire the lock between base.Reset() and Reset()'s re-acquisition, write _latestChunk and set IsCurrentReadDone=true, and have its chunk immediately overwritten by Reset()'s subsequent queue dequeue. The producer's chunk was lost. Empirically the race fired ~4% of the time under stress (150-300 chunks lost out of 5000 across 5 iterations of OnChunk_ConcurrentProducerAndConsumer_AllChunksObservedInOrder). Fix: move base.Reset() inside the lock so the entire IsCurrentReadDone false-then-maybe-true sequence is atomic against OnChunk. After fix: 5/5 iterations × 6 tests = 30/30 passing via Scripts~/run_unity.sh test -m EditMode -n 5. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/DataStream.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Runtime/Scripts/DataStream.cs b/Runtime/Scripts/DataStream.cs index a4af3eb4..68b18b5a 100644 --- a/Runtime/Scripts/DataStream.cs +++ b/Runtime/Scripts/DataStream.cs @@ -133,9 +133,14 @@ protected void OnChunk(TContent content) public override void Reset() { - base.Reset(); + // base.Reset() must run under the same lock as OnChunk, otherwise the + // window between IsCurrentReadDone=false (from base) and the dequeue + // below lets a producer race in, write _latestChunk, and have its + // chunk immediately overwritten by the dequeue. That race lost ~4% of + // chunks under stress before this fix. lock (_gate) { + base.Reset(); if (_pendingChunks.Count > 0) { _latestChunk = _pendingChunks.Dequeue(); From 7d88dcc157abaea6f1d0fa68c685f3969dd2af6a Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 10:52:52 +0200 Subject: [PATCH 07/15] Opt remaining FfiStream*Instruction classes into raw-safe dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The earlier raw-safe commit only flipped FfiInstruction and missed the two sibling generic classes despite the commit message claiming all three were opted in. They follow the same pattern: only mutate volatile YieldInstruction state plus an Error / _result reference set strictly before IsDone, no Unity APIs touched. Adds rawSafe: true to: - FfiStreamInstruction — covers ByteStreamWriter.WriteInstruction / CloseInstruction and TextStreamWriter.WriteInstruction / CloseInstruction - FfiStreamResultInstruction — covers ByteStreamReader.ReadAllInstruction / WriteToFileInstruction and TextStreamReader.ReadAllInstruction Verified: 5/5 iterations × 6 threading tests = 30/30 passing via Scripts~/run_unity.sh test -m EditMode -n 5. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/FfiInstruction.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Runtime/Scripts/Internal/FfiInstruction.cs b/Runtime/Scripts/Internal/FfiInstruction.cs index 17c0766d..3516eff0 100644 --- a/Runtime/Scripts/Internal/FfiInstruction.cs +++ b/Runtime/Scripts/Internal/FfiInstruction.cs @@ -64,7 +64,8 @@ internal FfiStreamInstruction( Error = new StreamError("Canceled"); IsError = true; IsDone = true; - }); + }, + rawSafe: true); } } @@ -117,7 +118,8 @@ internal FfiStreamResultInstruction( Error = new StreamError("Canceled"); IsError = true; IsDone = true; - }); + }, + rawSafe: true); } } } From 43c43cb89c3441e5c2a0cb42ea74ae1f7824b75f Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 11:02:32 +0200 Subject: [PATCH 08/15] Test that rawSafe dispatch actually moves off the main thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing threading tests verify cross-thread state visibility (volatile IsDone, lock around chunk mutations) but cannot detect a regression that silently routed raw-safe completions back through SynchronizationContext.Post — the volatile/lock work is correct in either case. Adds three EditMode tests that prove the FFI-thread fast path is actually taken: RawSafeTrueCallback_RunsOnDispatchingBackgroundThread Registers a rawSafe:true pending callback, dispatches via TryDispatchRawSafe from a freshly-created Thread, asserts the completion captured the dispatcher's ManagedThreadId — not the test main thread's. This is the load-bearing test. RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComplete Negative control: TryDispatchRawSafe returns false for entries registered with rawSafe:false and does not invoke the completion. 
TryDispatchPendingCallback_RunsCompletionSynchronouslyOnCallerThread Sanity check that the underlying race-proof dispatch is synchronous on the caller's thread regardless of rawSafe — what makes the rawSafe path "raw" is that FFICallback (the FFI thread) is the caller, not the dispatch mechanism itself. Two FfiClient methods (TryDispatchRawSafe, TryDispatchPendingCallback) go from private to internal so the tests can invoke them directly. The race-semantics comments are unchanged. Verified: 3 tests x 5 iterations = 15/15 passing via Scripts~/run_unity.sh test -m EditMode -n 5 -f RawSafeDispatch. Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/FFIClient.cs | 4 +- Tests/EditMode/RawSafeDispatchTests.cs | 135 ++++++++++++++++++++ Tests/EditMode/RawSafeDispatchTests.cs.meta | 11 ++ 3 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 Tests/EditMode/RawSafeDispatchTests.cs create mode 100644 Tests/EditMode/RawSafeDispatchTests.cs.meta diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index b6ed8961..1244b1e4 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -391,7 +391,7 @@ private static void DispatchEvent(FfiEvent ffiEvent) } } - private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) + internal bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) { // Remove-first dispatch is the key race-proofing step. // @@ -424,7 +424,7 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent) // Same race model as TryDispatchPendingCallback: the side that wins TryRemove // is the only side that may invoke completion. If the entry isn't raw-safe we // return false and let the caller fall through to the main-thread post. 
- private bool TryDispatchRawSafe(ulong requestAsyncId, FfiEvent ffiEvent) + internal bool TryDispatchRawSafe(ulong requestAsyncId, FfiEvent ffiEvent) { if (!pendingCallbacks.TryGetValue(requestAsyncId, out var pending)) { diff --git a/Tests/EditMode/RawSafeDispatchTests.cs b/Tests/EditMode/RawSafeDispatchTests.cs new file mode 100644 index 00000000..8604a1ca --- /dev/null +++ b/Tests/EditMode/RawSafeDispatchTests.cs @@ -0,0 +1,135 @@ +using System; +using System.Threading; +using LiveKit.Internal; +using LiveKit.Proto; +using NUnit.Framework; + +namespace LiveKit.EditModeTests +{ + public class RawSafeDispatchTests + { + // Each test draws a fresh asyncId from this counter so concurrent tests + // don't collide in FfiClient.Instance's shared pendingCallbacks map. The + // high-bit seed avoids overlap with real request ids that the SDK might + // generate during test setup of unrelated fixtures. + private static long _asyncIdSeed = 0x7FF0_0000_0000_0000L; + private static ulong NextAsyncId() => (ulong)Interlocked.Increment(ref _asyncIdSeed); + + [Test] + public void RawSafeTrueCallback_RunsOnDispatchingBackgroundThread() + { + var asyncId = NextAsyncId(); + var testThreadId = Thread.CurrentThread.ManagedThreadId; + int completionThreadId = -1; + var done = new ManualResetEventSlim(false); + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => + { + completionThreadId = Thread.CurrentThread.ManagedThreadId; + done.Set(); + }, + rawSafe: true); + + int dispatchThreadId = -1; + // Use an explicit Thread (not Task.Run) so we know the dispatcher is + // a fresh OS thread that the runtime has no reason to alias to the + // test thread. A thread-pool worker happens to differ in practice but + // is not guaranteed to. 
+ var dispatcher = new Thread(() => + { + dispatchThreadId = Thread.CurrentThread.ManagedThreadId; + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + FfiClient.Instance.TryDispatchRawSafe(asyncId, ev); + }); + dispatcher.Start(); + Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), + "Dispatcher thread did not finish within 2s."); + + Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(1)), + "Completion did not run within 1s — TryDispatchRawSafe may have failed silently."); + Assert.AreEqual(dispatchThreadId, completionThreadId, + "rawSafe completion did not run on the dispatching thread — the FFI-thread fast path is not being taken."); + Assert.AreNotEqual(testThreadId, completionThreadId, + "rawSafe completion ran on the test main thread — it was marshalled rather than dispatched raw."); + } + + [Test] + public void RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComplete() + { + var asyncId = NextAsyncId(); + var completionRan = false; + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => { completionRan = true; }, + rawSafe: false); + + try + { + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + var dispatched = FfiClient.Instance.TryDispatchRawSafe(asyncId, ev); + + Assert.IsFalse(dispatched, + "TryDispatchRawSafe should return false for entries registered with rawSafe: false."); + Assert.IsFalse(completionRan, + "Completion ran via TryDispatchRawSafe even though rawSafe was false."); + } + finally + { + // Clean up: the pending entry would otherwise leak into other tests. + FfiClient.Instance.CancelPendingCallback(asyncId); + } + } + + [Test] + public void TryDispatchPendingCallback_RunsCompletionSynchronouslyOnCallerThread() + { + // Sanity check: TryDispatchPendingCallback itself is synchronous on the + // caller's thread regardless of rawSafe. 
What makes the rawSafe path + // "raw" is that FFICallback (the FFI thread) is the caller, vs. the + // SynchronizationContext-posted lambda (the main thread) for non-raw. + var asyncId = NextAsyncId(); + int completionThreadId = -1; + var done = new ManualResetEventSlim(false); + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => + { + completionThreadId = Thread.CurrentThread.ManagedThreadId; + done.Set(); + }, + rawSafe: false); + + int dispatchThreadId = -1; + var dispatcher = new Thread(() => + { + dispatchThreadId = Thread.CurrentThread.ManagedThreadId; + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + FfiClient.Instance.TryDispatchPendingCallback(asyncId, ev); + }); + dispatcher.Start(); + Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), + "Dispatcher thread did not finish within 2s."); + + Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(1)), + "Completion did not run."); + Assert.AreEqual(dispatchThreadId, completionThreadId, + "TryDispatchPendingCallback did not run the completion synchronously on the caller's thread."); + } + } +} diff --git a/Tests/EditMode/RawSafeDispatchTests.cs.meta b/Tests/EditMode/RawSafeDispatchTests.cs.meta new file mode 100644 index 00000000..54216adf --- /dev/null +++ b/Tests/EditMode/RawSafeDispatchTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 3316d69b784c4ecdb4925ccf857d2661 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: From 48462cf2f513f6187b6092fb37fbd5fa0169c6f9 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 11:28:57 +0200 Subject: [PATCH 09/15] Test rawSafe wiring end-to-end via RouteFfiEvent extraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The earlier 
RawSafeDispatchTests called TryDispatchRawSafe directly, which proves the inner method is synchronous on the caller's thread but does not prove FFICallback actually invokes it for raw-safe entries. Commenting out the if-block in FFICallback would not have made any of those tests fail. Refactor: extract everything in FFICallback after the byte parse into RouteFfiEvent(FfiEvent). FFICallback shrinks to disposed-check, parse, RouteFfiEvent. Production traffic still always lands in FFICallback; tests can now drive RouteFfiEvent directly from a chosen thread without P/Invoke pointer juggling. Two new integration tests: RouteFfiEvent_RawSafeTrue_CompletesOnDispatchingThread Calls RouteFfiEvent from a fresh Thread, asserts the rawSafe completion captured the dispatcher thread's id (not the test main thread). Verified to fail with a clear message when the TryDispatchRawSafe call is removed from RouteFfiEvent. RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCompletion Swaps FfiClient.Instance._context for a RecordingSyncContext, dispatches a non-raw entry, asserts the completion was NOT run on the dispatcher thread, asserts exactly one item was posted to the SC, then drains the queue from the test thread and asserts the completion runs on the drainer thread. Verified: 5 tests x 5 iterations = 25/25 passing via Scripts~/run_unity.sh test -m EditMode -n 5 -f RawSafeDispatch. Negative control: with the rawSafe block in RouteFfiEvent commented out, the new integration test fails with: "Completion did not run — RouteFfiEvent may be missing the TryDispatchRawSafe call for rawSafe entries." 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/FFIClient.cs | 9 ++ Tests/EditMode/RawSafeDispatchTests.cs | 128 +++++++++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index 1244b1e4..e12729ae 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -283,6 +283,15 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size) var respData = new Span(data.ToPointer()!, (int)size.ToUInt64()); var response = FfiEvent.Parser!.ParseFrom(respData); + RouteFfiEvent(response); + } + + // Routing logic split out from FFICallback so tests can drive it from a + // chosen thread without going through the P/Invoke entry point. Running + // production traffic still always lands here via FFICallback above. + internal static void RouteFfiEvent(FfiEvent response) + { + if (_isDisposed) return; // Audio stream events are handled directly on the FFI callback thread // to bypass the main thread, since the audio thread consumes the data diff --git a/Tests/EditMode/RawSafeDispatchTests.cs b/Tests/EditMode/RawSafeDispatchTests.cs index 8604a1ca..10a00d34 100644 --- a/Tests/EditMode/RawSafeDispatchTests.cs +++ b/Tests/EditMode/RawSafeDispatchTests.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using LiveKit.Internal; using LiveKit.Proto; @@ -8,6 +9,24 @@ namespace LiveKit.EditModeTests { public class RawSafeDispatchTests { + // Captures Post calls so a test can verify FFIClient sent work to the + // main-thread sync context for non-raw entries, and (optionally) drain + // the queued lambda synchronously to verify the completion would have + // run on the main thread. 
+ private sealed class RecordingSyncContext : SynchronizationContext + { + public readonly List<(SendOrPostCallback callback, object state)> Posts = new(); + public override void Post(SendOrPostCallback d, object state) + { + Posts.Add((d, state)); + } + public void DrainOnce() + { + foreach (var (cb, st) in Posts) cb(st); + Posts.Clear(); + } + } + // Each test draws a fresh asyncId from this counter so concurrent tests // don't collide in FfiClient.Instance's shared pendingCallbacks map. The // high-bit seed avoids overlap with real request ids that the SDK might @@ -91,6 +110,115 @@ public void RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComple } } + // Integration test: drives the same code path FFICallback uses, so this + // fails if a future refactor removes the rawSafe short-circuit from + // RouteFfiEvent. Calling RouteFfiEvent from a fresh Thread simulates + // Rust calling FFICallback from its own worker thread. + [Test] + public void RouteFfiEvent_RawSafeTrue_CompletesOnDispatchingThread() + { + var asyncId = NextAsyncId(); + var testThreadId = Thread.CurrentThread.ManagedThreadId; + int completionThreadId = -1; + var done = new ManualResetEventSlim(false); + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => + { + completionThreadId = Thread.CurrentThread.ManagedThreadId; + done.Set(); + }, + rawSafe: true); + + int dispatchThreadId = -1; + var dispatcher = new Thread(() => + { + dispatchThreadId = Thread.CurrentThread.ManagedThreadId; + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + FfiClient.RouteFfiEvent(ev); + }); + dispatcher.Start(); + Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), + "Dispatcher thread did not finish within 2s."); + + Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(1)), + "Completion did not run — RouteFfiEvent may be missing the TryDispatchRawSafe call for rawSafe entries."); + 
Assert.AreEqual(dispatchThreadId, completionThreadId, + "rawSafe completion did not run on the dispatching thread — RouteFfiEvent marshalled it instead."); + Assert.AreNotEqual(testThreadId, completionThreadId, + "rawSafe completion ran on the test main thread."); + } + + // Integration test: non-raw entries must reach the main-thread + // SynchronizationContext via Post. We swap _context for a recording + // SC so we can observe the post (and drain it to verify the queued + // lambda actually invokes the completion). + [Test] + public void RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCompletion() + { + var asyncId = NextAsyncId(); + int completionThreadId = -1; + var completionRan = new ManualResetEventSlim(false); + + FfiClient.Instance.RegisterPendingCallback( + asyncId, + static e => e.UnpublishTrack, + cb => + { + completionThreadId = Thread.CurrentThread.ManagedThreadId; + completionRan.Set(); + }, + rawSafe: false); + + var recording = new RecordingSyncContext(); + var originalContext = FfiClient.Instance._context; + FfiClient.Instance._context = recording; + try + { + var ev = new FfiEvent + { + UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } + }; + + int dispatchThreadId = -1; + var dispatcher = new Thread(() => + { + dispatchThreadId = Thread.CurrentThread.ManagedThreadId; + FfiClient.RouteFfiEvent(ev); + }); + dispatcher.Start(); + Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), + "Dispatcher thread did not finish within 2s."); + + Assert.IsFalse(completionRan.IsSet, + "Non-raw completion ran synchronously on the dispatcher thread — it should have been posted, not executed."); + Assert.AreEqual(1, recording.Posts.Count, + "RouteFfiEvent should have posted exactly one work item to the sync context for a non-raw pending entry."); + + // Drain the queued lambda on this (test) thread, simulating Unity's + // main-thread queue drain. The completion must now run, and on this thread. 
+ var drainerThreadId = Thread.CurrentThread.ManagedThreadId; + recording.DrainOnce(); + + Assert.IsTrue(completionRan.Wait(TimeSpan.FromSeconds(1)), + "Drained queued post did not invoke the completion."); + Assert.AreEqual(drainerThreadId, completionThreadId, + "Drained completion should run on the thread that drains the sync context."); + Assert.AreNotEqual(dispatchThreadId, completionThreadId, + "Completion ran on the dispatcher's thread despite rawSafe being false."); + } + finally + { + FfiClient.Instance._context = originalContext; + FfiClient.Instance.CancelPendingCallback(asyncId); + } + } + [Test] public void TryDispatchPendingCallback_RunsCompletionSynchronouslyOnCallerThread() { From f9612e108f7335564006387a70bf96db7739211c Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 11:33:26 +0200 Subject: [PATCH 10/15] Remove unreachable switch cases for fast-path events in DispatchEvent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AudioStreamEvent, ByteStreamReaderEvent, and TextStreamReaderEvent all early-return inside RouteFfiEvent before reaching the SynchronizationContext.Post that drains into DispatchEvent. The switch arms for those three events were therefore unreachable — DispatchEvent is private and only the post lambda calls it. Replaced with a single comment pointing readers at the fast-path returns. AudioStreamEvent's case had been dead since the audio fast-path was added (predates this branch); the other two became dead in cc2ed4f / 967d0b9. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/FFIClient.cs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index e12729ae..c16fb12c 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -381,15 +381,9 @@ private static void DispatchEvent(FfiEvent ffiEvent) case FfiEvent.MessageOneofCase.VideoStreamEvent: Instance.VideoStreamEventReceived?.Invoke(ffiEvent.VideoStreamEvent!); break; - case FfiEvent.MessageOneofCase.AudioStreamEvent: - Instance.AudioStreamEventReceived?.Invoke(ffiEvent.AudioStreamEvent!); - break; - case FfiEvent.MessageOneofCase.ByteStreamReaderEvent: - Instance.ByteStreamReaderEventReceived?.Invoke(ffiEvent.ByteStreamReaderEvent!); - break; - case FfiEvent.MessageOneofCase.TextStreamReaderEvent: - Instance.TextStreamReaderEventReceived?.Invoke(ffiEvent.TextStreamReaderEvent!); - break; + // AudioStreamEvent, ByteStreamReaderEvent, and TextStreamReaderEvent are + // dispatched directly on the FFI callback thread by RouteFfiEvent and + // never reach this switch — see the fast-path early-returns there. case FfiEvent.MessageOneofCase.DataTrackStreamEvent: Instance.DataTrackStreamEventReceived?.Invoke(ffiEvent.DataTrackStreamEvent!); break; From 9f63aa97063cf3dd408debcc8e5d26269157a779 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 11:39:37 +0200 Subject: [PATCH 11/15] Dispatch Logs on the FFI callback thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds Logs to RouteFfiEvent's fast-path early-returns alongside the audio and stream-reader events. UnityEngine.Debug.unityLogger is one of the few Unity APIs documented thread-safe — it queues to the console drain internally — so calling Utils.HandleLogBatch from the FFI thread is safe. 
Skipping the main-thread post matters most during error storms, panics, and LK_VERBOSE noise where Rust may emit log batches faster than Unity drains its post queue. Logs now reach the console without a one-frame delay even if the main thread is busy or wedged. There are no public subscribers for the Logs event, so this has no effect on user-facing surfaces. The unreachable Logs case in DispatchEvent's switch is removed (now covered by the comment that already documents the other fast-path events). Verified: 11 tests x 5 iterations = 55/55 passing via Scripts~/run_unity.sh test -m EditMode -n 5. Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/FFIClient.cs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index c16fb12c..028a8a20 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -301,6 +301,18 @@ internal static void RouteFfiEvent(FfiEvent response) return; } + // Log batches are forwarded directly. UnityEngine.Debug.unityLogger is + // documented thread-safe; Unity's logger queues to its console drain + // internally. Skipping the main-thread post means logs reach the + // console without a one-frame delay — useful during error storms, + // panics, or LK_VERBOSE noise where the post queue could otherwise + // back up. + if (response.MessageCase == FfiEvent.MessageOneofCase.Logs) + { + Utils.HandleLogBatch(response.Logs); + return; + } + // Byte stream reader events feed an internal incremental-read buffer that // already serializes mutations under its own lock. 
Skipping the main-thread // post lets chunks land in the buffer immediately rather than waiting for @@ -359,9 +371,6 @@ private static void DispatchEvent(FfiEvent ffiEvent) switch (ffiEvent.MessageCase) { - case FfiEvent.MessageOneofCase.Logs: - Utils.HandleLogBatch(ffiEvent.Logs); - break; case FfiEvent.MessageOneofCase.PublishData: break; case FfiEvent.MessageOneofCase.RoomEvent: @@ -381,8 +390,8 @@ private static void DispatchEvent(FfiEvent ffiEvent) case FfiEvent.MessageOneofCase.VideoStreamEvent: Instance.VideoStreamEventReceived?.Invoke(ffiEvent.VideoStreamEvent!); break; - // AudioStreamEvent, ByteStreamReaderEvent, and TextStreamReaderEvent are - // dispatched directly on the FFI callback thread by RouteFfiEvent and + // Logs, AudioStreamEvent, ByteStreamReaderEvent, and TextStreamReaderEvent + // are dispatched directly on the FFI callback thread by RouteFfiEvent and // never reach this switch — see the fast-path early-returns there. case FfiEvent.MessageOneofCase.DataTrackStreamEvent: Instance.DataTrackStreamEventReceived?.Invoke(ffiEvent.DataTrackStreamEvent!); From 26782009af3ed99023381221f665d8248f3ab2e9 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:48:27 +0200 Subject: [PATCH 12/15] Rename rawSafe to dispatchToMainThread / TrySkipDispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The rawSafe flag described a capability ("safe to run on the FFI thread") but its name was opaque, and TryDispatchRawSafe was a misnomer — the method doesn't dispatch anything; it runs the completion inline on the caller's thread. 
Rename for clarity: rawSafe: false (default) → dispatchToMainThread: true (default) rawSafe: true → dispatchToMainThread: false TryDispatchRawSafe → TrySkipDispatch PendingCallbackBase.RawSafe → DispatchToMainThread The default flips polarity but the meaning is the same: by default, completions go through SynchronizationContext.Post (safe for any handler that touches Unity APIs); the three generic FfiInstruction* classes opt out by passing dispatchToMainThread:false because their onComplete only mutates volatile state. TrySkipDispatch reads correctly at the call site: if (Instance.TrySkipDispatch(...)) return; // fall through to dispatch Instance._context?.Post(...); — "try to skip the dispatch; if we can't, dispatch normally." TryDispatchPendingCallback keeps its name: "dispatch" there means function-dispatch (route the event to its registered handler), a different sense of the word and at a different abstraction level. Test file renamed RawSafeDispatchTests.cs → SkipDispatchTests.cs (GUID preserved so Unity sees the rename, not delete+add). Test class and method names updated to match the new vocabulary. Verified: 11 tests x 5 iterations = 55/55 passing via Scripts~/run_unity.sh test -m EditMode -n 5. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/FFIClient.cs | 48 ++++++------ Runtime/Scripts/Internal/FfiInstruction.cs | 6 +- Runtime/Scripts/Internal/YieldInstruction.cs | 8 +- ...eDispatchTests.cs => SkipDispatchTests.cs} | 73 ++++++++++--------- ...ests.cs.meta => SkipDispatchTests.cs.meta} | 0 5 files changed, 70 insertions(+), 65 deletions(-) rename Tests/EditMode/{RawSafeDispatchTests.cs => SkipDispatchTests.cs} (76%) rename Tests/EditMode/{RawSafeDispatchTests.cs.meta => SkipDispatchTests.cs.meta} (100%) diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index 028a8a20..ad2fd42d 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -187,7 +187,7 @@ internal void RegisterPendingCallback( Func selector, Action onComplete, Action? onCancel = null, - bool rawSafe = false + bool dispatchToMainThread = true ) where TCallback : class { // Request registration must happen before the request is sent. That ordering is what @@ -200,10 +200,12 @@ internal void RegisterPendingCallback( // Duplicate IDs are treated as a hard error because they would allow two unrelated // requests to compete for the same completion slot. // - // rawSafe == true means the onComplete is safe to invoke directly on the FFI callback - // thread (no Unity APIs touched, only volatile state mutations). The dispatcher will - // then bypass the main-thread SynchronizationContext post. - var pending = new PendingCallback(selector, onComplete, onCancel, rawSafe); + // dispatchToMainThread defaults to true: completion runs on Unity's main thread via + // SynchronizationContext.Post, which is safe for any onComplete that touches Unity + // APIs or fires user events. Pass false when the onComplete only mutates volatile + // YieldInstruction state — RouteFfiEvent will then run it inline on the FFI callback + // thread instead of paying a frame of latency for the post. 
+ var pending = new PendingCallback(selector, onComplete, onCancel, dispatchToMainThread); if (!pendingCallbacks.TryAdd(requestAsyncId, pending)) { throw new InvalidOperationException($"Duplicate pending callback for request_async_id={requestAsyncId}"); @@ -332,12 +334,12 @@ internal static void RouteFfiEvent(FfiEvent response) return; } - // Raw-safe one-shot completions also bypass the main thread. The pending - // callback's onComplete only mutates volatile YieldInstruction fields, so - // resolving it here saves up to one frame of latency on async ops like - // SetMetadata / UnpublishTrack / stream Read/Write/Close. + // One-shot completions registered with dispatchToMainThread:false also bypass the + // main thread. The pending callback's onComplete only mutates volatile + // YieldInstruction fields, so resolving it here saves up to one frame of latency + // on async ops like SetMetadata / UnpublishTrack / stream Read/Write/Close. var requestAsyncId = ExtractRequestAsyncId(response); - if (requestAsyncId.HasValue && Instance.TryDispatchRawSafe(requestAsyncId.Value, response)) + if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response)) { return; } @@ -431,18 +433,20 @@ internal bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent return false; } - // FFI-thread fast path for one-shot completions whose onComplete only mutates - // volatile YieldInstruction state (no Unity APIs, no user-event invocations). - // Same race model as TryDispatchPendingCallback: the side that wins TryRemove - // is the only side that may invoke completion. If the entry isn't raw-safe we - // return false and let the caller fall through to the main-thread post. - internal bool TryDispatchRawSafe(ulong requestAsyncId, FfiEvent ffiEvent) + // Inline-completion fast path for one-shot callbacks whose onComplete only + // mutates volatile YieldInstruction state (no Unity APIs, no user-event + // invocations). 
Returning true means the caller has been fully handled here + // — no further dispatch needed. Returning false means the caller should fall + // through to its normal main-thread post path. Same race model as + // TryDispatchPendingCallback: the side that wins TryRemove is the only side + // that may invoke the completion. + internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent) { if (!pendingCallbacks.TryGetValue(requestAsyncId, out var pending)) { return false; } - if (!pending.RawSafe) + if (pending.DispatchToMainThread) { return false; } @@ -484,7 +488,7 @@ internal bool TryDispatchRawSafe(ulong requestAsyncId, FfiEvent ffiEvent) private abstract class PendingCallbackBase { - public abstract bool RawSafe { get; } + public abstract bool DispatchToMainThread { get; } public abstract bool TryComplete(FfiEvent ffiEvent); public abstract void Cancel(); } @@ -494,21 +498,21 @@ private sealed class PendingCallback : PendingCallbackBase where TCal private readonly Func selector; private readonly Action onComplete; private readonly Action? onCancel; - private readonly bool rawSafe; + private readonly bool dispatchToMainThread; - public override bool RawSafe => rawSafe; + public override bool DispatchToMainThread => dispatchToMainThread; public PendingCallback( Func selector, Action onComplete, Action? 
onCancel, - bool rawSafe + bool dispatchToMainThread ) { this.selector = selector; this.onComplete = onComplete; this.onCancel = onCancel; - this.rawSafe = rawSafe; + this.dispatchToMainThread = dispatchToMainThread; } public override bool TryComplete(FfiEvent ffiEvent) diff --git a/Runtime/Scripts/Internal/FfiInstruction.cs b/Runtime/Scripts/Internal/FfiInstruction.cs index 3516eff0..93e89074 100644 --- a/Runtime/Scripts/Internal/FfiInstruction.cs +++ b/Runtime/Scripts/Internal/FfiInstruction.cs @@ -29,7 +29,7 @@ internal FfiInstruction( IsError = true; IsDone = true; }, - rawSafe: true); + dispatchToMainThread: false); } } @@ -65,7 +65,7 @@ internal FfiStreamInstruction( IsError = true; IsDone = true; }, - rawSafe: true); + dispatchToMainThread: false); } } @@ -119,7 +119,7 @@ internal FfiStreamResultInstruction( IsError = true; IsDone = true; }, - rawSafe: true); + dispatchToMainThread: false); } } } diff --git a/Runtime/Scripts/Internal/YieldInstruction.cs b/Runtime/Scripts/Internal/YieldInstruction.cs index 6beacf59..748e884d 100644 --- a/Runtime/Scripts/Internal/YieldInstruction.cs +++ b/Runtime/Scripts/Internal/YieldInstruction.cs @@ -6,10 +6,10 @@ namespace LiveKit public class YieldInstruction : CustomYieldInstruction { // Backing fields are volatile because completion may run on the FFI callback - // thread (raw-safe pending callbacks bypass the main-thread post). The release - // semantics of a volatile write ensure any state mutated by the completion - // (Error, ResultValue, etc.) is visible to the main thread before it observes - // IsDone == true. + // thread (pending callbacks registered with dispatchToMainThread:false bypass + // the main-thread post). The release semantics of a volatile write ensure any + // state mutated by the completion (Error, ResultValue, etc.) is visible to the + // main thread before it observes IsDone == true. 
private volatile bool _isDone; private volatile bool _isError; diff --git a/Tests/EditMode/RawSafeDispatchTests.cs b/Tests/EditMode/SkipDispatchTests.cs similarity index 76% rename from Tests/EditMode/RawSafeDispatchTests.cs rename to Tests/EditMode/SkipDispatchTests.cs index 10a00d34..6b2aa042 100644 --- a/Tests/EditMode/RawSafeDispatchTests.cs +++ b/Tests/EditMode/SkipDispatchTests.cs @@ -7,12 +7,12 @@ namespace LiveKit.EditModeTests { - public class RawSafeDispatchTests + public class SkipDispatchTests { // Captures Post calls so a test can verify FFIClient sent work to the - // main-thread sync context for non-raw entries, and (optionally) drain - // the queued lambda synchronously to verify the completion would have - // run on the main thread. + // main-thread sync context for entries registered with + // dispatchToMainThread:true, and (optionally) drain the queued lambda + // synchronously to verify the completion would have run on the main thread. private sealed class RecordingSyncContext : SynchronizationContext { public readonly List<(SendOrPostCallback callback, object state)> Posts = new(); @@ -35,7 +35,7 @@ public void DrainOnce() private static ulong NextAsyncId() => (ulong)Interlocked.Increment(ref _asyncIdSeed); [Test] - public void RawSafeTrueCallback_RunsOnDispatchingBackgroundThread() + public void InlineCallback_RunsOnDispatchingBackgroundThread() { var asyncId = NextAsyncId(); var testThreadId = Thread.CurrentThread.ManagedThreadId; @@ -50,7 +50,7 @@ public void RawSafeTrueCallback_RunsOnDispatchingBackgroundThread() completionThreadId = Thread.CurrentThread.ManagedThreadId; done.Set(); }, - rawSafe: true); + dispatchToMainThread: false); int dispatchThreadId = -1; // Use an explicit Thread (not Task.Run) so we know the dispatcher is @@ -64,22 +64,22 @@ public void RawSafeTrueCallback_RunsOnDispatchingBackgroundThread() { UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } }; - FfiClient.Instance.TryDispatchRawSafe(asyncId, ev); + 
FfiClient.Instance.TrySkipDispatch(asyncId, ev); }); dispatcher.Start(); Assert.IsTrue(dispatcher.Join(TimeSpan.FromSeconds(2)), "Dispatcher thread did not finish within 2s."); Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(1)), - "Completion did not run within 1s — TryDispatchRawSafe may have failed silently."); + "Completion did not run within 1s — TrySkipDispatch may have failed silently."); Assert.AreEqual(dispatchThreadId, completionThreadId, - "rawSafe completion did not run on the dispatching thread — the FFI-thread fast path is not being taken."); + "Inline completion did not run on the dispatching thread — the FFI-thread fast path is not being taken."); Assert.AreNotEqual(testThreadId, completionThreadId, - "rawSafe completion ran on the test main thread — it was marshalled rather than dispatched raw."); + "Inline completion ran on the test main thread — it was marshalled rather than completed inline."); } [Test] - public void RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComplete() + public void MainThreadCallback_TrySkipDispatch_ReturnsFalseAndDoesNotComplete() { var asyncId = NextAsyncId(); var completionRan = false; @@ -88,7 +88,7 @@ public void RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComple asyncId, static e => e.UnpublishTrack, cb => { completionRan = true; }, - rawSafe: false); + dispatchToMainThread: true); try { @@ -96,12 +96,12 @@ public void RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComple { UnpublishTrack = new UnpublishTrackCallback { AsyncId = asyncId } }; - var dispatched = FfiClient.Instance.TryDispatchRawSafe(asyncId, ev); + var skipped = FfiClient.Instance.TrySkipDispatch(asyncId, ev); - Assert.IsFalse(dispatched, - "TryDispatchRawSafe should return false for entries registered with rawSafe: false."); + Assert.IsFalse(skipped, + "TrySkipDispatch should return false for entries registered with dispatchToMainThread:true."); Assert.IsFalse(completionRan, - "Completion ran via 
TryDispatchRawSafe even though rawSafe was false."); + "Completion ran via TrySkipDispatch even though the entry requires main-thread dispatch."); } finally { @@ -111,11 +111,11 @@ public void RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComple } // Integration test: drives the same code path FFICallback uses, so this - // fails if a future refactor removes the rawSafe short-circuit from - // RouteFfiEvent. Calling RouteFfiEvent from a fresh Thread simulates + // fails if a future refactor removes the TrySkipDispatch short-circuit + // from RouteFfiEvent. Calling RouteFfiEvent from a fresh Thread simulates // Rust calling FFICallback from its own worker thread. [Test] - public void RouteFfiEvent_RawSafeTrue_CompletesOnDispatchingThread() + public void RouteFfiEvent_InlineCallback_CompletesOnDispatchingThread() { var asyncId = NextAsyncId(); var testThreadId = Thread.CurrentThread.ManagedThreadId; @@ -130,7 +130,7 @@ public void RouteFfiEvent_RawSafeTrue_CompletesOnDispatchingThread() completionThreadId = Thread.CurrentThread.ManagedThreadId; done.Set(); }, - rawSafe: true); + dispatchToMainThread: false); int dispatchThreadId = -1; var dispatcher = new Thread(() => @@ -147,19 +147,19 @@ public void RouteFfiEvent_RawSafeTrue_CompletesOnDispatchingThread() "Dispatcher thread did not finish within 2s."); Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(1)), - "Completion did not run — RouteFfiEvent may be missing the TryDispatchRawSafe call for rawSafe entries."); + "Completion did not run — RouteFfiEvent may be missing the TrySkipDispatch call for inline entries."); Assert.AreEqual(dispatchThreadId, completionThreadId, - "rawSafe completion did not run on the dispatching thread — RouteFfiEvent marshalled it instead."); + "Inline completion did not run on the dispatching thread — RouteFfiEvent marshalled it instead."); Assert.AreNotEqual(testThreadId, completionThreadId, - "rawSafe completion ran on the test main thread."); + "Inline completion ran on the 
test main thread."); } - // Integration test: non-raw entries must reach the main-thread - // SynchronizationContext via Post. We swap _context for a recording - // SC so we can observe the post (and drain it to verify the queued - // lambda actually invokes the completion). + // Integration test: entries registered with dispatchToMainThread:true must + // reach the main-thread SynchronizationContext via Post. We swap _context + // for a recording SC so we can observe the post (and drain it to verify + // the queued lambda actually invokes the completion). [Test] - public void RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCompletion() + public void RouteFfiEvent_MainThreadCallback_PostsToSynchronizationContext_AndDrainsCompletion() { var asyncId = NextAsyncId(); int completionThreadId = -1; @@ -173,7 +173,7 @@ public void RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCo completionThreadId = Thread.CurrentThread.ManagedThreadId; completionRan.Set(); }, - rawSafe: false); + dispatchToMainThread: true); var recording = new RecordingSyncContext(); var originalContext = FfiClient.Instance._context; @@ -196,9 +196,9 @@ public void RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCo "Dispatcher thread did not finish within 2s."); Assert.IsFalse(completionRan.IsSet, - "Non-raw completion ran synchronously on the dispatcher thread — it should have been posted, not executed."); + "Main-thread completion ran synchronously on the dispatcher thread — it should have been posted, not executed."); Assert.AreEqual(1, recording.Posts.Count, - "RouteFfiEvent should have posted exactly one work item to the sync context for a non-raw pending entry."); + "RouteFfiEvent should have posted exactly one work item to the sync context for a main-thread pending entry."); // Drain the queued lambda on this (test) thread, simulating Unity's // main-thread queue drain. The completion must now run, and on this thread. 
@@ -210,7 +210,7 @@ public void RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCo Assert.AreEqual(drainerThreadId, completionThreadId, "Drained completion should run on the thread that drains the sync context."); Assert.AreNotEqual(dispatchThreadId, completionThreadId, - "Completion ran on the dispatcher's thread despite rawSafe being false."); + "Completion ran on the dispatcher's thread despite dispatchToMainThread:true."); } finally { @@ -223,9 +223,10 @@ public void RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCo public void TryDispatchPendingCallback_RunsCompletionSynchronouslyOnCallerThread() { // Sanity check: TryDispatchPendingCallback itself is synchronous on the - // caller's thread regardless of rawSafe. What makes the rawSafe path - // "raw" is that FFICallback (the FFI thread) is the caller, vs. the - // SynchronizationContext-posted lambda (the main thread) for non-raw. + // caller's thread regardless of dispatchToMainThread. What makes the + // inline path "skip dispatch" is that FFICallback (the FFI thread) is + // the caller, vs. the SynchronizationContext-posted lambda (the main + // thread) for entries that require main-thread dispatch. 
var asyncId = NextAsyncId(); int completionThreadId = -1; var done = new ManualResetEventSlim(false); @@ -238,7 +239,7 @@ public void TryDispatchPendingCallback_RunsCompletionSynchronouslyOnCallerThread completionThreadId = Thread.CurrentThread.ManagedThreadId; done.Set(); }, - rawSafe: false); + dispatchToMainThread: true); int dispatchThreadId = -1; var dispatcher = new Thread(() => diff --git a/Tests/EditMode/RawSafeDispatchTests.cs.meta b/Tests/EditMode/SkipDispatchTests.cs.meta similarity index 100% rename from Tests/EditMode/RawSafeDispatchTests.cs.meta rename to Tests/EditMode/SkipDispatchTests.cs.meta From 24968038982467cf5ca2038029aa5ca4253e01a6 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 30 Apr 2026 13:24:57 +0200 Subject: [PATCH 13/15] Cleanup --- Runtime/Scripts/Internal/FFIClient.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index ad2fd42d..55c140c0 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -392,9 +392,6 @@ private static void DispatchEvent(FfiEvent ffiEvent) case FfiEvent.MessageOneofCase.VideoStreamEvent: Instance.VideoStreamEventReceived?.Invoke(ffiEvent.VideoStreamEvent!); break; - // Logs, AudioStreamEvent, ByteStreamReaderEvent, and TextStreamReaderEvent - // are dispatched directly on the FFI callback thread by RouteFfiEvent and - // never reach this switch — see the fast-path early-returns there. 
case FfiEvent.MessageOneofCase.DataTrackStreamEvent: Instance.DataTrackStreamEventReceived?.Invoke(ffiEvent.DataTrackStreamEvent!); break; From 2d644d06f034e75c33fe2d8a4bf578ab2bee73d8 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Wed, 6 May 2026 09:02:35 +0200 Subject: [PATCH 14/15] Adds the EventThreadingModel.md to document FFI event dispatch behaviour --- EventThreadingModel.md | 64 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 EventThreadingModel.md diff --git a/EventThreadingModel.md b/EventThreadingModel.md new file mode 100644 index 00000000..eccbafaf --- /dev/null +++ b/EventThreadingModel.md @@ -0,0 +1,64 @@ +## Summary + +FFI events whose handlers don't touch Unity APIs run directly on the FFI callback thread instead of being marshaled to Unity's main thread via `SynchronizationContext.Post`. + +`FFICallback` previously routed most Rust events (all except `AudioStreamEvent`) through `_context.Post` to Unity's main thread. That's the safe default for handlers that touch Unity APIs (`Texture2D`, `GameObject`, `Transform`, …) but it costs up to one frame of latency for handlers that don't. Four categories of events can skip that: + +1. **Audio stream events** that are written to the audio stream ring buffer and consumed on the audio thread. +2. **One-shot async completions** that only flip `IsDone` on a `YieldInstruction` — `SetMetadata`, `UnpublishTrack`, all stream `Read/Write/Close` ops. +3. **Stream reader chunk events** that just append bytes/strings to an internal buffer. +4. **Log batches** — `UnityEngine.Debug.unityLogger` is documented as thread-safe; the post hop adds latency without benefit, especially during error storms or `LK_VERBOSE` noise. + +## Logic in code: + +```csharp +internal static void RouteFfiEvent(FfiEvent response) +{ + if (_isDisposed) return; + + // 1. Per-event-type fast paths — invoke handler directly on FFI thread.
+ if (response.MessageCase == FfiEvent.MessageOneofCase.AudioStreamEvent) { ...; return; } + if (response.MessageCase == FfiEvent.MessageOneofCase.Logs) { ...; return; } + if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent) { ...; return; } + if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent) { ...; return; } + + // 2. One-shot completion fast path — opted-in pending callbacks complete inline. + var requestAsyncId = ExtractRequestAsyncId(response); + if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response)) + return; + + // 3. Fallback — post to Unity's main-thread sync context. + Instance._context?.Post(static (resp) => + { + var r = resp as FfiEvent; + if (r == null) return; + DispatchEvent(r); + }, response); +} +``` + +## Event Table + +| Event | Where it runs | Why | +| --- | --- | --- | +| `AudioStreamEvent` | **FFI thread** (unchanged) | Audio thread consumes the data; main-thread latency would hurt timing | +| `Logs` | **FFI thread** (new) | `Debug.unityLogger` is thread-safe; logs reach console immediately during panics / errors | +| `ByteStreamReaderEvent` | **FFI thread** (new) | Internal buffer is now lock-protected; chunks land without frame delay | +| `TextStreamReaderEvent` | **FFI thread** (new) | Same lock as byte path (shared `ReadIncrementalInstructionBase`) | +| One-shot completions via `FfiInstruction` | **FFI thread** (new) | `SetLocalMetadata`, `SetLocalName`, `SetLocalAttributes`, `UnpublishTrack` — only flip `IsDone`/`IsError` | +| One-shot completions via `FfiStreamInstruction` | **FFI thread** (new) | `ByteStreamWriter.Write/Close`, `TextStreamWriter.Write/Close` | +| One-shot completions via `FfiStreamResultInstruction` | **FFI thread** (new) | `ByteStreamReader.ReadAll/WriteToFile`, `TextStreamReader.ReadAll` | +| `RoomEvent` | Main thread | Fires user-facing `ParticipantConnected`, `TrackPublished`, etc. 
| +| `TrackEvent` | Main thread | (No subscribers today; main-thread default for safety) | +| `RpcMethodInvocation` | Main thread | User RPC handlers commonly touch game state | +| `Disconnect` | Main thread | UI updates typical | +| `VideoStreamEvent` | Main thread | Internal buffering is fast; user-facing raw delivery deferred (see follow-ups) | +| `DataTrackStreamEvent` | Main thread | Deferred until a concrete consumer asks | +| `Connect` (one-shot) | Main thread | Bespoke handler fires participant-connected events | +| `PublishTrack` (one-shot) | Main thread | Bespoke handler | +| `GetStats` (one-shot) | Main thread | Bespoke handler | +| `CaptureAudioFrame` (one-shot) | Main thread | Bespoke handler | +| `PerformRpc` (one-shot) | Main thread | Bespoke handler surfaces response | +| `SendText` / `SendFile` (one-shot) | Main thread | Bespoke handlers | +| `TextStreamOpen` / `ByteStreamOpen` (one-shot) | Main thread | Bespoke handlers return writer objects | +| `PublishDataTrack` (one-shot) | Main thread | Bespoke handler | From 070e9daef16e0f064b23ebd6c328bee6efd122ea Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Wed, 6 May 2026 09:18:06 +0200 Subject: [PATCH 15/15] Adding .meta file for new doc file --- EventThreadingModel.md.meta | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 EventThreadingModel.md.meta diff --git a/EventThreadingModel.md.meta b/EventThreadingModel.md.meta new file mode 100644 index 00000000..3e4a076b --- /dev/null +++ b/EventThreadingModel.md.meta @@ -0,0 +1,7 @@ +fileFormatVersion: 2 +guid: c67584a1ec7ca416a82bb60874ed7513 +TextScriptImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: