Support for large RPC messages using data streams#977
Conversation
The general direction looks good, need a deeper review for possible breaking changes, etc.
@pblazej @hiroshihorie Two things:
```swift
/// Defaults to ``ClientProtocol/v1``, which enables RPC v2 (data-stream-based payloads
/// with no 15 KB size limit). Generally, it's not recommended to change this.
public let clientProtocol: ClientProtocol
```
Hm, just thinking out loud: should we even expose this? (Applies to `protocol` as well, given the doc comment's own "Generally, it's not recommended to change this.")
Yea, I had the same thought. Originally clientProtocol wasn't in here but I moved it in here as a secondary step to be consistent with protocol.
Some more thinking out loud: IMO, they both should be in the same place ideally since conceptually they communicate the same type of state. Would removing protocol from here be a breaking change? If not, then these could both become constants (which was what clientProtocol was originally before I moved it in here).
One (maybe) advantage to exposing it could be that you could configure a client in an older clientProtocol version for end to end tests to test unusual scenarios? That's a pretty weak argument though and if it's going to actively confuse users then definitely not a good idea.
If there's no clear best path forward, I'm also open to making protocol/clientProtocol work inconsistently in the short term and making a follow up task for you or somebody else to clean it up.
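For illustration, the "both become constants" option floated above could look something like this minimal sketch. The `ClientProtocol` enum shape and the `ConnectOptions` placement are assumptions for the example, not the actual SDK types:

```swift
// Hypothetical sketch: pin both protocol versions as internal constants instead
// of exposing them as public configurable properties. Type names are assumed.
enum ClientProtocol { case v0, v1 }

struct ConnectOptions {
    // Not public API surface: users can't misconfigure it, while internal
    // end-to-end tests could still reach it via @testable import.
    static let clientProtocol: ClientProtocol = .v1
}
```

This keeps `protocol`/`clientProtocol` conceptually in the same place without growing the public API.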
pblazej left a comment
Generally a fan of the split into RpcClientManager/RpcServerManager + clientProtocol negotiation — wire format matches the JS reference (rpc-with-compression-data-streams) byte-for-byte 🎉. There's a P0 build break on Xcode 16.2 (trailing comma) plus a few concurrency bugs around the caller-side pending state (race + potential double-resume on CheckedContinuation); also a 15 KB cap accidentally being applied on the v2 response path. Inline comments below cover those plus a bunch of test-side polish — should be a much tighter v2 once those are addressed.
```swift
extension Room {
    private static let reservedStreamTopics: Set<String> = [
```
P1: Could we make lk. a global reserved prefix instead of a hand-maintained set? Server convention already documents lk. as LiveKit's namespace, and a single topic.hasPrefix("lk.") check (applied on register and unregister 👇) would cover any future internal topic without piecemeal additions:
```swift
static func checkReserved(topic: String) throws {
    guard !topic.hasPrefix("lk.") else {
        throw LiveKitError(.invalidParameter,
                           message: "Stream topic prefix 'lk.' is reserved for internal SDK use")
    }
}
```

Worth applying to `unregisterByteStreamHandler`/`unregisterTextStreamHandler` as well — currently a user can call `unregisterTextStreamHandler(for: "lk.rpc_request")` and silently disable our internal RPC v2 dispatch.
It is technically a breaking change for users who registered on lk.* topics, but they were against the convention anyway — single CHANGELOG line.
@pblazej I mentioned this to @lukasIO and he wasn't a fan.
LLM summary of lukas's perspective from our chat:
- Breaking change cost is high — that alone is a strong reason not to do it.
- Dependencies complicate a hard block — other SDKs that build on the core SDK (e.g., the agents SDK) legitimately register lk.* listeners, so a blanket prohibition doesn't cleanly work.
- A suppression mechanism is the wrong fix — letting some callers opt out of the warning/block "because they know what they're doing" doesn't sit right with him.
- Prefer a structural solution — if the SDK registers its own lk.* listeners at room construction time, any later user attempt to register the same name would naturally fail through the existing duplicate-registration path, without needing a special rule.
- Low priority absent real reports — he asks whether anyone has actually hit this, implying the change isn't worth the cost without evidence of the problem.
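To make the "structural solution" concrete, here's a minimal sketch under assumed names (`StreamHandlerRegistry` and `StreamError` are hypothetical stand-ins, not SDK types): if the Room claims its internal lk.* topics at construction time, a later user registration of the same topic fails through the ordinary duplicate-registration path, with no special lk.* rule.

```swift
import Foundation

// Hypothetical stand-in for the SDK's error type.
struct StreamError: Error { let message: String }

// Hypothetical sketch: internal lk.* handlers are registered first, at
// construction, so user registrations of the same topic hit the normal
// duplicate-registration check rather than a special reserved-prefix rule.
final class StreamHandlerRegistry {
    private var handlers: [String: (Data) -> Void] = [:]

    init() {
        // Internal SDK handlers claim their topics up front.
        handlers["lk.rpc_request"] = { _ in /* internal RPC v2 dispatch */ }
    }

    func register(topic: String, handler: @escaping (Data) -> Void) throws {
        guard handlers[topic] == nil else {
            throw StreamError(message: "A handler is already registered for topic '\(topic)'")
        }
        handlers[topic] = handler
    }
}
```

A plugin like the agents SDK would still be free to register its own lk.* topics, as long as they don't collide with ones the core SDK already owns.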
I don't think I really have a strong perspective here. @pblazej, if this is important to you, there are some ways to do this that could still permit the agents-sdk usage patterns, like warnings rather than errors. Feel free to weigh in more here if desired.
the LLM's summary sounds a lot harsher than my standing on this feels 👀
And yeah, I think it's mostly important to consider the agents-sdk registering lk topics.
Potentially this could also be other plugins of course that would do this.
my take on that is: if we're gonna break that, we should break it forever for all lk.* topics rather than re-breaking for each small feature 😄
It's early Mon morning, but I think LLM is exaggerating that as well 👍
@pblazej The thing that I am still a bit conflicted on: while I think it would be good to reserve the lk.* prefix for internal client SDK use, the agents SDK uses this prefix already, and since the agents SDK is indistinguishable from any other client SDK as far as an end user's application is concerned, there's not really a good way to leave the interface open for it without other clients also being able to take advantage of it.
So I think we'd either need some way for a client on registration to be able to indicate it's actually a "first party livekit client" or something (brought this up to lukas last week, he also was not a fan of this), or the filtering has to either be by topic or just outright removed.
Other possible idea, for data streams at least: add logic so internal SDK uses can register a data stream handler via a different mechanism, and when that mechanism is used, the data is "tee"d and sent to both the external handler and the internal SDK feature. That way an external consumer can add/remove a handler and, no matter what happens, can't break internal SDK features.
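The "tee" idea could be sketched roughly like this (`TeeingStreamDispatcher` and its method names are hypothetical, assuming one external handler per topic as in the current API):

```swift
import Foundation

// Hypothetical sketch of the "tee": internal features attach through a separate,
// non-public list, and incoming data is fanned out to both sides, so external
// add/remove can never detach an internal SDK feature.
final class TeeingStreamDispatcher {
    private var externalHandlers: [String: (Data) -> Void] = [:]
    private var internalHandlers: [String: [(Data) -> Void]] = [:]

    // Public surface: external consumers may set or clear a handler freely.
    func setExternalHandler(for topic: String, _ handler: ((Data) -> Void)?) {
        externalHandlers[topic] = handler
    }

    // Internal-only path, not exposed outside the SDK module.
    func addInternalHandler(for topic: String, _ handler: @escaping (Data) -> Void) {
        internalHandlers[topic, default: []].append(handler)
    }

    func dispatch(topic: String, data: Data) {
        internalHandlers[topic]?.forEach { $0(data) }  // always runs
        externalHandlers[topic]?(data)                 // runs only if a handler is set
    }
}
```

Removing the external handler leaves the internal fan-out untouched, which is the property that protects RPC v2 dispatch.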
```swift
let destination = Participant.Identity(from: "v2-destination")
try await RpcTestSupport.installV2Remote(in: room, identity: destination)

// Do nothing in response — let the connection timeout (7s max round-trip) fire
```
P3: Nit: comment says "let the connection timeout (7s max round-trip) fire", but with responseTimeout: 0.05 the outer withThrowingTimeout fires first — the 7 s ack-timeout path isn't actually exercised here. Worth either fixing the comment or splitting into two tests (one for the outer timeout mapping, one for the actual ack-timeout).
…est is sent, not before. Because of this, a very fast response could come back before the ack was registered, leaving a garbage ack gumming up the works.
Ensure that the timeout and the completion can't accidentally fire the same callback multiple times.
… payloads, NOT v2 rpc requests
…onnect. Otherwise, it is possible for messages to be dropped, since there is a window of time during Room construction where the rpc client/server managers aren't yet fully initialized, and a connect could occur during this period.
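The double-resume fix in the commit list above ("the timeout and the completion can't accidentally fire the same callback multiple times") is commonly achieved by making the handoff of the pending continuation a one-shot operation. A sketch, not the PR's actual implementation (`PendingRpcStore` is a hypothetical name):

```swift
// Hypothetical sketch: removeValue(forKey:) returns nil on every call after the
// first, so a CheckedContinuation stored here can only ever be resumed once.
actor PendingRpcStore {
    private var pending: [String: CheckedContinuation<String, Error>] = [:]

    var count: Int { pending.count }

    func add(_ requestId: String, _ continuation: CheckedContinuation<String, Error>) {
        pending[requestId] = continuation
    }

    // At-most-once handoff: the timeout path and the response path both call
    // this, and only the first caller gets a non-nil continuation back.
    func take(_ requestId: String) -> CheckedContinuation<String, Error>? {
        pending.removeValue(forKey: requestId)
    }
}
```

The timeout path would then do `await store.take(id)?.resume(throwing:)` and the response path `await store.take(id)?.resume(returning:)`; the loser of the race simply gets `nil` and does nothing. The same `take` call in an `onCancel` handler also covers the cancellation-cleanup case.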
1. reservedPrefixRejectedOnRegisterAndUnregister — asserts that both register{Text,Byte}StreamHandler and unregister{Text,Byte}StreamHandler throw LiveKitError on any lk.* topic, closing the previously-flagged gap where users could silently disable internal RPC v2 dispatch by unregistering lk.rpc_request.
2. performRpcCleansUpOnCancellation — starts a performRpc that hangs (no mock response), waits for pending state to register (pendingCount == 1), cancels the awaiting Task, asserts the call throws and that pending state is cleared (pendingCount == 0).
3. performRpcConcurrentRequestsHaveDistinctIds — runs 5 concurrent performRpc calls to the same destination, asserts each gets a distinct requestId (Set(observedIds).count == 5) and that no pending state leaks afterward.
4. v2ResponseStreamFromWrongSenderIsIgnored — installs a v2 remote, performs an RPC, then injects a v2 response stream from a different identity (v2-imposter). Asserts the spoofed response is ignored and the call ultimately times out with connectionTimeout rather than resolving with the spoofed payload.
Solid second pass — P0/P1 all closed with deterministic tests. A handful of polish items still outstanding:
P2:
P3: parameterize the v2 handler error tests. Bigger thinking-out-loud item, unit vs e2e: the v2 suite has grown a lot of mock-side plumbing (
…PassthroughViaPacket into a single parameterized test
@1egoman I don't see anything "unresolved" atm, let me take a final read and resolve the conflicts 🌮 BTW any manual (x-platform) tests worth performing now?
@pblazej A few platform combination tests come to mind: 1) old client-sdk-js (i.e., current npm release) <-> new swift, 2) new client-sdk-js (i.e., livekit/client-sdk-js#1832) <-> new swift. Going through all the happy-path cases in the spec document is worthwhile at a minimum.
See this pull request for more info: livekit/client-sdk-js#1832
@pblazej Sent you a slack message about this with some of the context!