Skip to content

Reduce main thread dispatch if not needed#269

Open
MaxHeimbrock wants to merge 13 commits intomainfrom
max/threading/internal-raw-dispatch
Open

Reduce main thread dispatch if not needed#269
MaxHeimbrock wants to merge 13 commits intomainfrom
max/threading/internal-raw-dispatch

Conversation

@MaxHeimbrock
Copy link
Copy Markdown
Contributor

@MaxHeimbrock MaxHeimbrock commented Apr 30, 2026

Move latency-sensitive FFI events off the main thread

Summary

Internal threading change. FFI events whose handlers don't touch Unity APIs now run directly on the FFI callback thread instead of being marshaled to Unity's main thread via SynchronizationContext.Post. Saves up to one frame of latency on async ops, log delivery, and stream chunk arrival, and removes a small but real allocation per event from the main-thread queue.

No public API changes. The user-facing threading contract is unchanged: Room.ParticipantConnected, TrackPublished, VideoStream.TextureReceived, RPC handlers, Disconnect, etc. all still fire on the main thread.

Why

FFICallback previously routed every Rust event (except AudioStreamEvent) through _context.Post to Unity's main thread. That's the safe default for handlers that touch Unity APIs (Texture2D, GameObject, Transform, …) but it costs one frame of latency for handlers that don't. Three categories of events are silently paying that cost without benefit:

  1. One-shot async completions that only flip IsDone on a YieldInstructionSetMetadata, UnpublishTrack, all stream Read/Write/Close ops.
  2. Stream reader chunk events that just append bytes/strings to an internal buffer.
  3. Log batchesUnityEngine.Debug.unityLogger is documented thread-safe; the post hop adds latency without benefit, especially during error storms or LK_VERBOSE noise.

This PR moves only those off the main thread. Bespoke completion handlers (anything that fires user events or constructs Unity objects) keep main-thread dispatch.

Event dispatch matrix

After this PR:

Event Where it runs Why
AudioStreamEvent FFI thread (unchanged) Audio thread consumes the data; main-thread latency would hurt timing
Logs FFI thread (new) Debug.unityLogger is thread-safe; logs reach console immediately during panics / errors
ByteStreamReaderEvent FFI thread (new) Internal buffer is now lock-protected; chunks land without frame delay
TextStreamReaderEvent FFI thread (new) Same lock as byte path (shared ReadIncrementalInstructionBase)
One-shot completions via FfiInstruction<T> FFI thread (new) SetLocalMetadata, SetLocalName, SetLocalAttributes, UnpublishTrack — only flip IsDone/IsError
One-shot completions via FfiStreamInstruction<T> FFI thread (new) ByteStreamWriter.Write/Close, TextStreamWriter.Write/Close
One-shot completions via FfiStreamResultInstruction<T,U> FFI thread (new) ByteStreamReader.ReadAll/WriteToFile, TextStreamReader.ReadAll
RoomEvent Main thread Fires user-facing ParticipantConnected, TrackPublished, etc.
TrackEvent Main thread (No subscribers today; main-thread default for safety)
RpcMethodInvocation Main thread User RPC handlers commonly touch game state
Disconnect Main thread UI updates typical
VideoStreamEvent Main thread Internal buffering is fast; user-facing raw delivery deferred (see follow-ups)
DataTrackStreamEvent Main thread Deferred until a concrete consumer asks
Connect (one-shot) Main thread Bespoke handler fires participant-connected events
PublishTrack (one-shot) Main thread Bespoke handler
GetStats (one-shot) Main thread Bespoke handler
CaptureAudioFrame (one-shot) Main thread Bespoke handler
PerformRpc (one-shot) Main thread Bespoke handler surfaces response
SendText / SendFile (one-shot) Main thread Bespoke handlers
TextStreamOpen / ByteStreamOpen (one-shot) Main thread Bespoke handlers return writer objects
PublishDataTrack (one-shot) Main thread Bespoke handler

Decision logic at runtime

FFICallback now only parses bytes and delegates to RouteFfiEvent. The routing decision lives in one place (Runtime/Scripts/Internal/FFIClient.cs):

internal static void RouteFfiEvent(FfiEvent response)
{
    if (_isDisposed) return;

    // 1. Per-event-type fast paths — invoke handler directly on FFI thread.
    if (response.MessageCase == FfiEvent.MessageOneofCase.AudioStreamEvent)      { ...; return; }
    if (response.MessageCase == FfiEvent.MessageOneofCase.Logs)                  { ...; return; }
    if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent) { ...; return; }
    if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent) { ...; return; }

    // 2. One-shot completion fast path — opted-in pending callbacks complete inline.
    var requestAsyncId = ExtractRequestAsyncId(response);
    if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response))
        return;

    // 3. Fallback — post to Unity's main-thread sync context.
    Instance._context?.Post(static (resp) =>
    {
        var r = resp as FfiEvent;
        if (r == null) return;
        DispatchEvent(r);
    }, response);
}

TrySkipDispatch looks up the registered pending callback and only handles the event inline if the callback opted out of main-thread dispatch:

internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent)
{
    if (!pendingCallbacks.TryGetValue(requestAsyncId, out var pending)) return false;
    if (pending.DispatchToMainThread) return false;
    return TryDispatchPendingCallback(requestAsyncId, ffiEvent);
}

Three exit points:

  1. No matching pending entry → false. (Either not a one-shot completion, or already cancelled.)
  2. Entry exists with DispatchToMainThread == true → false. Caller falls through to _context.Post.
  3. Entry exists with DispatchToMainThread == false → run completion inline, return true.

Opt-in flow at registration

The default is conservative — every RegisterPendingCallback callsite implicitly requires main-thread dispatch:

internal void RegisterPendingCallback<TCallback>(
    ulong requestAsyncId,
    Func<FfiEvent, TCallback?> selector,
    Action<TCallback> onComplete,
    Action? onCancel = null,
    bool dispatchToMainThread = true   // safe default
) where TCallback : class

The three generic FfiInstruction* classes opt out, because their onComplete only mutates volatile YieldInstruction state:

// FfiInstruction.cs
FfiClient.Instance.RegisterPendingCallback(
    asyncId,
    selector,
    onComplete,
    onCancel,
    dispatchToMainThread: false);

The ten bespoke registration sites (Connect, PublishTrack, GetStats, …) keep the default. They fire user events, construct C# objects, or otherwise touch Unity-visible state.

To make YieldInstruction safe to read across threads, IsDone and IsError (and the corresponding IsEos/IsCurrentReadDone on StreamYieldInstruction) are backed by volatile fields. The release semantics of the IsDone write make any preceding state mutations (Error, _result, …) visible to the main-thread reader after it observes IsDone == true.

Tests

Eleven new EditMode tests, all bounded with explicit DateTime.UtcNow deadlines and Thread.Join(TimeSpan)/ManualResetEventSlim.Wait(TimeSpan) so a deadlock or volatile regression fails fast with a descriptive Assert.Fail rather than hanging the runner.

Test file Tests What it guards
Tests/EditMode/YieldInstructionThreadingTests.cs 3 Cross-thread visibility of volatile IsDone/IsError; keepWaiting flips after a background completion
Tests/EditMode/DataStreamIncrementalReadTests.cs (extended) +3 Concurrent producer/consumer over 5000 chunks; OnEos racing with chunks; Reset racing with OnChunk (200 trials)
Tests/EditMode/SkipDispatchTests.cs 5 Inline dispatch lands on the dispatching thread; main-thread entries reach SynchronizationContext.Post; integration via RouteFfiEvent

The RouteFfiEvent integration tests are the load-bearing ones. They drive the same code path FFICallback uses, so a future refactor that drops the TrySkipDispatch short-circuit fails them with a clear message:

Completion did not run — RouteFfiEvent may be missing the TrySkipDispatch call for inline entries.

Verified by negative control: temporarily commenting out the TrySkipDispatch call in RouteFfiEvent reproduced exactly that failure on the first run.

The non-raw integration test (RouteFfiEvent_MainThreadCallback_PostsToSynchronizationContext_AndDrainsCompletion) installs a RecordingSynchronizationContext as FfiClient._context for the test duration, calls RouteFfiEvent from a background thread, and asserts: (a) the completion did not run on the dispatcher thread, (b) exactly one item was posted, (c) draining the queue from the test thread runs the completion on the drainer thread.

MaxHeimbrock and others added 11 commits April 29, 2026 20:04
Adds an opt-in `rawSafe` flag to `RegisterPendingCallback`. When set,
`FFICallback` resolves the pending callback directly on the FFI callback
thread instead of marshaling through Unity's main-thread
SynchronizationContext. Saves up to one frame of latency on async ops
whose completion only mutates volatile YieldInstruction state.

`YieldInstruction.IsDone`/`IsError` are converted to volatile-backed
properties so the release semantics of the completion's volatile write
make any preceding state mutations visible to the main-thread reader
once it observes IsDone == true.

Opted in: the three generic instruction classes
(`FfiInstruction<T>`, `FfiStreamInstruction<T>`,
`FfiStreamResultInstruction<T,U>`) which only set IsDone/IsError/Error/
result and do not touch Unity APIs. This covers SetLocalName/Metadata/
Attributes, UnpublishTrack, all stream Read/Write/Close/WriteToFile.

Not opted in: bespoke completion handlers (Connect, PublishTrack,
GetStats, CaptureAudioFrame, PerformRpc, SendText, SendFile,
TextStreamOpen, ByteStreamOpen, PublishDataTrack). Those create C#
objects and/or fire user events and need a separate audit before they
can move off the main thread.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Short-circuits ByteStreamReaderEvent in FFICallback so chunk-received
events land in the incremental read buffer without waiting for the
next main-thread frame drain. Mirrors the AudioStreamEvent fast path.

The internal subscriber (ByteStreamReader.ReadIncrementalInstruction)
forwards into ReadIncrementalInstructionBase, which now serializes
mutations of the chunk queue, latest chunk, error, and the IsEos /
IsCurrentReadDone flags under a single lock. The flag fields on
StreamYieldInstruction are made volatile so the main-thread coroutine's
keepWaiting poll observes the FFI thread's writes without acquiring the
lock.

Only ReadIncremental consumers benefit; ReadAll still goes through
FfiStreamResultInstruction (already raw-safe via the previous commit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors the byte-stream fast path. TextStreamReader.ReadIncrementalInstruction
shares ReadIncrementalInstructionBase<TContent> with the byte reader,
so the lock added in the previous commit already covers text chunk
mutations — only the FFICallback short-circuit needs to be added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three new EditMode tests in DataStreamIncrementalReadTests covering the
ReadIncrementalInstructionBase lock added for raw chunk dispatch:
  - producer/consumer with 5000 chunks, asserts FIFO and no loss
  - OnEos racing with chunks, asserts consistent post-race state
  - Reset racing with OnChunk, asserts no chunk lost (200 trials)

YieldInstructionThreadingTests is a new file with three tests that
guard the volatile semantics of YieldInstruction.IsDone / IsError:
  - background thread sets IsDone, foreground spin observes it
  - keepWaiting flips after a background completion
  - IsError write before IsDone is visible after the volatile read

Each test has a Timeout attribute so a regression that breaks the
volatile read or the lock fails the runner instead of hanging it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The [Timeout] attribute does not behave reliably in our CI runner.
Removed it from all six threading tests and replaced the only
unbounded Task.Wait() with a Wait(TimeSpan) + Assert.Fail pair so a
genuine deadlock still produces a clean test failure rather than
hanging until the outer CI wall-clock kills the runner.

The in-test DateTime.UtcNow deadlines on the spin loops already
convert hang regressions into descriptive Assert.Fail messages, so
the [Timeout] attribute was redundant safety on top of those.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
base.Reset() writes IsCurrentReadDone=false outside the chunk lock
because StreamYieldInstruction owns the field but doesn't know about
the lock the subclass added. That left a window where a producer's
OnChunk could acquire the lock between base.Reset() and Reset()'s
re-acquisition, write _latestChunk and set IsCurrentReadDone=true,
and have its chunk immediately overwritten by Reset()'s subsequent
queue dequeue. The producer's chunk was lost.

Empirically the race fired ~4% of the time under stress
(150-300 chunks lost out of 5000 across 5 iterations of
OnChunk_ConcurrentProducerAndConsumer_AllChunksObservedInOrder).

Fix: move base.Reset() inside the lock so the entire IsCurrentReadDone
false-then-maybe-true sequence is atomic against OnChunk.

After fix: 5/5 iterations × 6 tests = 30/30 passing via
Scripts~/run_unity.sh test -m EditMode -n 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The earlier raw-safe commit only flipped FfiInstruction<T> and missed
the two sibling generic classes despite the commit message claiming
all three were opted in. They follow the same pattern: only mutate
volatile YieldInstruction state plus an Error / _result reference set
strictly before IsDone, no Unity APIs touched.

Adds rawSafe: true to:
  - FfiStreamInstruction<TCallback>          — covers
    ByteStreamWriter.WriteInstruction / CloseInstruction and
    TextStreamWriter.WriteInstruction / CloseInstruction
  - FfiStreamResultInstruction<TCallback,TResult> — covers
    ByteStreamReader.ReadAllInstruction / WriteToFileInstruction and
    TextStreamReader.ReadAllInstruction

Verified: 5/5 iterations × 6 threading tests = 30/30 passing via
Scripts~/run_unity.sh test -m EditMode -n 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The existing threading tests verify cross-thread state visibility
(volatile IsDone, lock around chunk mutations) but cannot detect a
regression that silently routed raw-safe completions back through
SynchronizationContext.Post — the volatile/lock work is correct in
either case.

Adds three EditMode tests that prove the FFI-thread fast path is
actually taken:

  RawSafeTrueCallback_RunsOnDispatchingBackgroundThread
    Registers a rawSafe:true pending callback, dispatches via
    TryDispatchRawSafe from a freshly-created Thread, asserts the
    completion captured the dispatcher's ManagedThreadId — not the
    test main thread's. This is the load-bearing test.

  RawSafeFalseCallback_TryDispatchRawSafe_ReturnsFalseAndDoesNotComplete
    Negative control: TryDispatchRawSafe returns false for entries
    registered with rawSafe:false and does not invoke the completion.

  TryDispatchPendingCallback_RunsCompletionSynchronouslyOnCallerThread
    Sanity check that the underlying race-proof dispatch is
    synchronous on the caller's thread regardless of rawSafe — what
    makes the rawSafe path "raw" is that FFICallback (the FFI thread)
    is the caller, not the dispatch mechanism itself.

Two FfiClient methods (TryDispatchRawSafe, TryDispatchPendingCallback)
go from private to internal so the tests can invoke them directly.
The race-semantics comments are unchanged.

Verified: 3 tests x 5 iterations = 15/15 passing via
Scripts~/run_unity.sh test -m EditMode -n 5 -f RawSafeDispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The earlier RawSafeDispatchTests called TryDispatchRawSafe directly,
which proves the inner method is synchronous on the caller's thread
but does not prove FFICallback actually invokes it for raw-safe
entries. Commenting out the if-block in FFICallback would not have
made any of those tests fail.

Refactor: extract everything in FFICallback after the byte parse into
RouteFfiEvent(FfiEvent). FFICallback shrinks to disposed-check, parse,
RouteFfiEvent. Production traffic still always lands in FFICallback;
tests can now drive RouteFfiEvent directly from a chosen thread
without P/Invoke pointer juggling.

Two new integration tests:

  RouteFfiEvent_RawSafeTrue_CompletesOnDispatchingThread
    Calls RouteFfiEvent from a fresh Thread, asserts the rawSafe
    completion captured the dispatcher thread's id (not the test
    main thread). Verified to fail with a clear message when the
    TryDispatchRawSafe call is removed from RouteFfiEvent.

  RouteFfiEvent_RawSafeFalse_PostsToSynchronizationContext_AndDrainsCompletion
    Swaps FfiClient.Instance._context for a RecordingSynchronizationContext,
    dispatches a non-raw entry, asserts the completion was NOT run on
    the dispatcher thread, asserts exactly one item was posted to the
    SC, then drains the queue from the test thread and asserts the
    completion runs on the drainer thread.

Verified: 5 tests x 5 iterations = 25/25 passing via
Scripts~/run_unity.sh test -m EditMode -n 5 -f RawSafeDispatch.

Negative control: with the rawSafe block in RouteFfiEvent commented
out, the new integration test fails with:
  "Completion did not run — RouteFfiEvent may be missing the
   TryDispatchRawSafe call for rawSafe entries."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
AudioStreamEvent, ByteStreamReaderEvent, and TextStreamReaderEvent
all early-return inside RouteFfiEvent before reaching the
SynchronizationContext.Post that drains into DispatchEvent. The
switch arms for those three events were therefore unreachable —
DispatchEvent is private and only the post lambda calls it.

Replaced with a single comment pointing readers at the fast-path
returns. AudioStreamEvent's case had been dead since the audio
fast-path was added (predates this branch); the other two became
dead in cc2ed4f / 967d0b9.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds Logs to RouteFfiEvent's fast-path early-returns alongside the
audio and stream-reader events. UnityEngine.Debug.unityLogger is one
of the few Unity APIs documented thread-safe — it queues to the
console drain internally — so calling Utils.HandleLogBatch from the
FFI thread is safe.

Skipping the main-thread post matters most during error storms,
panics, and LK_VERBOSE noise where Rust may emit log batches faster
than Unity drains its post queue. Logs now reach the console without
a one-frame delay even if the main thread is busy or wedged. There
are no public subscribers for the Logs event, so this has no effect
on user-facing surfaces.

The unreachable Logs case in DispatchEvent's switch is removed (now
covered by the comment that already documents the other fast-path
events).

Verified: 11 tests x 5 iterations = 55/55 passing via
Scripts~/run_unity.sh test -m EditMode -n 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@MaxHeimbrock MaxHeimbrock changed the title Reduce main thread dispatch for data streams Reduce main thread dispatch if not needed Apr 30, 2026
MaxHeimbrock and others added 2 commits April 30, 2026 12:48
The rawSafe flag described a capability ("safe to run on the FFI
thread") but its name was opaque, and TryDispatchRawSafe was a
misnomer — the method doesn't dispatch anything; it runs the
completion inline on the caller's thread. Rename for clarity:

  rawSafe: false (default) → dispatchToMainThread: true (default)
  rawSafe: true            → dispatchToMainThread: false

  TryDispatchRawSafe       → TrySkipDispatch
  PendingCallbackBase.RawSafe → DispatchToMainThread

The default flips polarity but the meaning is the same: by default,
completions go through SynchronizationContext.Post (safe for any
handler that touches Unity APIs); the four generic FfiInstruction*
classes opt out by passing dispatchToMainThread:false because their
onComplete only mutates volatile state.

TrySkipDispatch reads correctly at the call site:

  if (Instance.TrySkipDispatch(...)) return;
  // fall through to dispatch
  Instance._context?.Post(...);

— "try to skip the dispatch; if we can't, dispatch normally."

TryDispatchPendingCallback keeps its name: "dispatch" there means
function-dispatch (route the event to its registered handler), a
different sense of the word and at a different abstraction level.

Test file renamed RawSafeDispatchTests.cs → SkipDispatchTests.cs
(GUID preserved so Unity sees the rename, not delete+add). Test
class and method names updated to match the new vocabulary.

Verified: 11 tests x 5 iterations = 55/55 passing via
Scripts~/run_unity.sh test -m EditMode -n 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@MaxHeimbrock MaxHeimbrock marked this pull request as ready for review May 4, 2026 11:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant