Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 39 additions & 17 deletions Runtime/Scripts/DataStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public abstract class ReadIncrementalInstructionBase<TContent> : StreamYieldInst
private readonly Queue<TContent> _pendingChunks = new();
private TContent _latestChunk;

// Chunk events arrive on the FFI thread; Reset() and the LatestChunk getter
// run on the main-thread coroutine. _gate serializes mutations of the queue,
// _latestChunk, IsCurrentReadDone, IsEos, and Error across both sides.
private readonly object _gate = new();

/// <summary>
/// Error that occurred on the last read, if any.
/// </summary>
Expand All @@ -94,8 +99,11 @@ protected TContent LatestChunk
{
get
{
if (Error != null) throw Error;
return _latestChunk;
lock (_gate)
{
if (Error != null) throw Error;
return _latestChunk;
}
}
}

Expand All @@ -108,34 +116,48 @@ protected ReadIncrementalInstructionBase(FfiHandle readerHandle)

protected void OnChunk(TContent content)
{
if (IsCurrentReadDone)
{
// Consumer hasn't yielded since the last chunk; buffer until Reset().
_pendingChunks.Enqueue(content);
}
else
lock (_gate)
{
_latestChunk = content;
IsCurrentReadDone = true;
if (IsCurrentReadDone)
{
// Consumer hasn't yielded since the last chunk; buffer until Reset().
_pendingChunks.Enqueue(content);
}
else
{
_latestChunk = content;
IsCurrentReadDone = true;
}
}
}

public override void Reset()
{
base.Reset();
if (_pendingChunks.Count > 0)
// base.Reset() must run under the same lock as OnChunk, otherwise the
// window between IsCurrentReadDone=false (from base) and the dequeue
// below lets a producer race in, write _latestChunk, and have its
// chunk immediately overwritten by the dequeue. That race lost ~4% of
// chunks under stress before this fix.
lock (_gate)
{
_latestChunk = _pendingChunks.Dequeue();
IsCurrentReadDone = true;
base.Reset();
if (_pendingChunks.Count > 0)
{
_latestChunk = _pendingChunks.Dequeue();
IsCurrentReadDone = true;
}
}
}

protected void OnEos(Proto.StreamError protoError)
{
IsEos = true;
if (protoError != null)
lock (_gate)
{
Error = new StreamError(protoError);
IsEos = true;
if (protoError != null)
{
Error = new StreamError(protoError);
}
}
}
}
Expand Down
103 changes: 87 additions & 16 deletions Runtime/Scripts/Internal/FFIClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ internal void RegisterPendingCallback<TCallback>(
ulong requestAsyncId,
Func<FfiEvent, TCallback?> selector,
Action<TCallback> onComplete,
Action? onCancel = null
Action? onCancel = null,
bool dispatchToMainThread = true
) where TCallback : class
{
// Request registration must happen before the request is sent. That ordering is what
Expand All @@ -198,7 +199,13 @@ internal void RegisterPendingCallback<TCallback>(
//
// Duplicate IDs are treated as a hard error because they would allow two unrelated
// requests to compete for the same completion slot.
var pending = new PendingCallback<TCallback>(selector, onComplete, onCancel);
//
// dispatchToMainThread defaults to true: completion runs on Unity's main thread via
// SynchronizationContext.Post, which is safe for any onComplete that touches Unity
// APIs or fires user events. Pass false when the onComplete only mutates volatile
// YieldInstruction state — RouteFfiEvent will then run it inline on the FFI callback
// thread instead of paying a frame of latency for the post.
var pending = new PendingCallback<TCallback>(selector, onComplete, onCancel, dispatchToMainThread);
if (!pendingCallbacks.TryAdd(requestAsyncId, pending))
{
throw new InvalidOperationException($"Duplicate pending callback for request_async_id={requestAsyncId}");
Expand Down Expand Up @@ -278,6 +285,15 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size)

var respData = new Span<byte>(data.ToPointer()!, (int)size.ToUInt64());
var response = FfiEvent.Parser!.ParseFrom(respData);
RouteFfiEvent(response);
}

// Routing logic split out from FFICallback so tests can drive it from a
// chosen thread without going through the P/Invoke entry point. Running
// production traffic still always lands here via FFICallback above.
internal static void RouteFfiEvent(FfiEvent response)
{
if (_isDisposed) return;

// Audio stream events are handled directly on the FFI callback thread
// to bypass the main thread, since the audio thread consumes the data
Expand All @@ -287,6 +303,47 @@ static unsafe void FFICallback(UIntPtr data, UIntPtr size)
return;
}

// Log batches are forwarded directly. UnityEngine.Debug.unityLogger is
// documented thread-safe; Unity's logger queues to its console drain
// internally. Skipping the main-thread post means logs reach the
// console without a one-frame delay — useful during error storms,
// panics, or LK_VERBOSE noise where the post queue could otherwise
// back up.
if (response.MessageCase == FfiEvent.MessageOneofCase.Logs)
{
Utils.HandleLogBatch(response.Logs);
return;
}

// Byte stream reader events feed an internal incremental-read buffer that
// already serializes mutations under its own lock. Skipping the main-thread
// post lets chunks land in the buffer immediately rather than waiting for
// the next frame drain.
if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent)
{
Instance.ByteStreamReaderEventReceived?.Invoke(response.ByteStreamReaderEvent!);
return;
}

// Same treatment for text stream readers — they share
// ReadIncrementalInstructionBase<TContent> with the byte path, so the
// lock added there already protects all state mutations.
if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent)
{
Instance.TextStreamReaderEventReceived?.Invoke(response.TextStreamReaderEvent!);
return;
}

// One-shot completions registered with dispatchToMainThread:false also bypass the
// main thread. The pending callback's onComplete only mutates volatile
// YieldInstruction fields, so resolving it here saves up to one frame of latency
// on async ops like SetMetadata / UnpublishTrack / stream Read/Write/Close.
var requestAsyncId = ExtractRequestAsyncId(response);
if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response))
{
return;
}

// Run on the main thread, the order of execution is guaranteed by Unity
// It uses a Queue internally
Instance._context?.Post(static (resp) =>
Expand Down Expand Up @@ -316,9 +373,6 @@ private static void DispatchEvent(FfiEvent ffiEvent)

switch (ffiEvent.MessageCase)
{
case FfiEvent.MessageOneofCase.Logs:
Utils.HandleLogBatch(ffiEvent.Logs);
break;
case FfiEvent.MessageOneofCase.PublishData:
break;
case FfiEvent.MessageOneofCase.RoomEvent:
Expand All @@ -338,15 +392,6 @@ private static void DispatchEvent(FfiEvent ffiEvent)
case FfiEvent.MessageOneofCase.VideoStreamEvent:
Instance.VideoStreamEventReceived?.Invoke(ffiEvent.VideoStreamEvent!);
break;
case FfiEvent.MessageOneofCase.AudioStreamEvent:
Instance.AudioStreamEventReceived?.Invoke(ffiEvent.AudioStreamEvent!);
break;
case FfiEvent.MessageOneofCase.ByteStreamReaderEvent:
Instance.ByteStreamReaderEventReceived?.Invoke(ffiEvent.ByteStreamReaderEvent!);
break;
case FfiEvent.MessageOneofCase.TextStreamReaderEvent:
Instance.TextStreamReaderEventReceived?.Invoke(ffiEvent.TextStreamReaderEvent!);
break;
case FfiEvent.MessageOneofCase.DataTrackStreamEvent:
Instance.DataTrackStreamEventReceived?.Invoke(ffiEvent.DataTrackStreamEvent!);
break;
Expand All @@ -357,7 +402,7 @@ private static void DispatchEvent(FfiEvent ffiEvent)
}
}

private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
internal bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
{
// Remove-first dispatch is the key race-proofing step.
//
Expand Down Expand Up @@ -385,6 +430,26 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)
return false;
}

// Inline-completion fast path for one-shot callbacks whose onComplete only
// mutates volatile YieldInstruction state (no Unity APIs, no user-event
// invocations). Returning true means the caller has been fully handled here
// — no further dispatch needed. Returning false means the caller should fall
// through to its normal main-thread post path. Same race model as
// TryDispatchPendingCallback: the side that wins TryRemove is the only side
// that may invoke the completion.
internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent)
{
if (!pendingCallbacks.TryGetValue(requestAsyncId, out var pending))
{
return false;
}
if (pending.DispatchToMainThread)
{
return false;
}
return TryDispatchPendingCallback(requestAsyncId, ffiEvent);
}

private static ulong? ExtractRequestAsyncId(FfiEvent ffiEvent)
{
// This switch is only concerned with one-shot async completion callbacks that echo
Expand Down Expand Up @@ -420,6 +485,7 @@ private bool TryDispatchPendingCallback(ulong requestAsyncId, FfiEvent ffiEvent)

private abstract class PendingCallbackBase
{
public abstract bool DispatchToMainThread { get; }
public abstract bool TryComplete(FfiEvent ffiEvent);
public abstract void Cancel();
}
Expand All @@ -429,16 +495,21 @@ private sealed class PendingCallback<TCallback> : PendingCallbackBase where TCal
private readonly Func<FfiEvent, TCallback?> selector;
private readonly Action<TCallback> onComplete;
private readonly Action? onCancel;
private readonly bool dispatchToMainThread;

public override bool DispatchToMainThread => dispatchToMainThread;

public PendingCallback(
Func<FfiEvent, TCallback?> selector,
Action<TCallback> onComplete,
Action? onCancel
Action? onCancel,
bool dispatchToMainThread
)
{
this.selector = selector;
this.onComplete = onComplete;
this.onCancel = onCancel;
this.dispatchToMainThread = dispatchToMainThread;
}

public override bool TryComplete(FfiEvent ffiEvent)
Expand Down
9 changes: 6 additions & 3 deletions Runtime/Scripts/Internal/FfiInstruction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ internal FfiInstruction(
{
IsError = true;
IsDone = true;
});
},
dispatchToMainThread: false);
}
}

Expand Down Expand Up @@ -63,7 +64,8 @@ internal FfiStreamInstruction(
Error = new StreamError("Canceled");
IsError = true;
IsDone = true;
});
},
dispatchToMainThread: false);
}
}

Expand Down Expand Up @@ -116,7 +118,8 @@ internal FfiStreamResultInstruction(
Error = new StreamError("Canceled");
IsError = true;
IsDone = true;
});
},
dispatchToMainThread: false);
}
}
}
31 changes: 23 additions & 8 deletions Runtime/Scripts/Internal/YieldInstruction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,37 @@ namespace LiveKit
{
public class YieldInstruction : CustomYieldInstruction
{
public bool IsDone { protected set; get; }
public bool IsError { protected set; get; }
// Backing fields are volatile because completion may run on the FFI callback
// thread (pending callbacks registered with dispatchToMainThread:false bypass
// the main-thread post). The release semantics of a volatile write ensure any
// state mutated by the completion (Error, ResultValue, etc.) is visible to the
// main thread before it observes IsDone == true.
private volatile bool _isDone;
private volatile bool _isError;

public override bool keepWaiting => !IsDone;
public bool IsDone { get => _isDone; protected set => _isDone = value; }
public bool IsError { get => _isError; protected set => _isError = value; }

public override bool keepWaiting => !_isDone;
}

public class StreamYieldInstruction : CustomYieldInstruction
{
// Volatile so the main-thread coroutine's keepWaiting poll sees writes
// performed by the FFI-thread chunk dispatch (which goes through a lock
// that provides release semantics, but the unlocked reader still needs
// acquire semantics to observe the updated value promptly).
private volatile bool _isEos;
private volatile bool _isCurrentReadDone;

/// <summary>
/// True if the stream has reached the end.
/// </summary>
public bool IsEos { protected set; get; }
public bool IsEos { get => _isEos; protected set => _isEos = value; }

internal bool IsCurrentReadDone { get; set; }
internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; }

public override bool keepWaiting => !IsCurrentReadDone && !IsEos;
public override bool keepWaiting => !_isCurrentReadDone && !_isEos;

/// <summary>
/// Resets the yield instruction for the next read.
Expand All @@ -30,11 +45,11 @@ public class StreamYieldInstruction : CustomYieldInstruction
/// </remarks>
public override void Reset()
{
if (IsEos)
if (_isEos)
{
throw new InvalidOperationException("Cannot reset after end of stream");
}
IsCurrentReadDone = false;
_isCurrentReadDone = false;
}
}
}
Loading
Loading