Skip to content

Comments

Data tracks - outgoing manager#1810

Merged
1egoman merged 43 commits intomainfrom
data-track-outgoing-manager
Feb 20, 2026
Merged

Data tracks - outgoing manager#1810
1egoman merged 43 commits intomainfrom
data-track-outgoing-manager

Conversation

@1egoman
Copy link
Contributor

@1egoman 1egoman commented Feb 10, 2026

This pull request introduces a new OutgoingDataTrackManager and the associated support infrastructure like LocalDataTrack to support sending data track packets from the web to the SFU.

…oming, which is incorrect and needs to be fixed!)
Biggest fixed isssue: packetizer implementation payload seeking needed
some updates because DataTrackFrame["payload"] is now Uint8Array, not ArrayBuffer.
ie, like:
try {
  // ...
} catch (e) {
  throw new WrapperErrorType(e);
}
@changeset-bot
Copy link

changeset-bot bot commented Feb 10, 2026

🦋 Changeset detected

Latest commit: 2b70c4c

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
Name Type
livekit-client Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@github-actions
Copy link
Contributor

github-actions bot commented Feb 10, 2026

size-limit report 📦

Path Size
dist/livekit-client.esm.mjs 86.78 KB (+0.1% 🔺)
dist/livekit-client.umd.js 97.3 KB (+0.17% 🔺)

@1egoman 1egoman marked this pull request as ready for review February 10, 2026 19:09
@1egoman
Copy link
Contributor Author

1egoman commented Feb 10, 2026

@coderabbitai review

Comment on lines 27 to 29
// FIXME: this was introduced by web / there isn't a corresponding case in the rust version.
// Upon further reflection though I think this should exist in rust.
Cancelled = 6,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a new DataTrackPublishErrorReason.Cancelled reason which is used when publishing a data track is aborted via an AbortSignal.

I think this might be potentially a good idea to add into the rust implementation as well, as I could see allowing cancellation of publishing a data track to be a nice behavior to pass up to the caller on the associated data track creation room method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From 1:1 - this will be a web specific case for now, and the rust implicit cancellation behavior will be documented.

Comment on lines 126 to 163
/** Used by attached {@link LocalDataTrack} instances to broadcast data track packets to other
* subscribers.
* @internal
*/
tryProcessAndSend(
handle: DataTrackHandle,
payload: Uint8Array,
options?: { signal?: AbortSignal },
): Throws<
void,
| DataTrackPushFrameError<DataTrackPushFrameErrorReason.Dropped>
| DataTrackPushFrameError<DataTrackPushFrameErrorReason.TrackUnpublished>
> {
const descriptor = this.getDescriptor(handle);
if (descriptor?.type !== 'active') {
throw DataTrackPushFrameError.trackUnpublished();
}

const frame: DataTrackFrame = {
payload,
extensions: new DataTrackExtensions(),
};

try {
for (const packet of descriptor.pipeline.processFrame(frame)) {
this.emit('packetsAvailable', { bytes: packet.toBinary(), signal: options?.signal });
}
} catch (err) {
// FIXME: catch and log errors instead of rethrowing? That is what the rust implementation
// is doing instead.
// process_frame(...).inspect_err(|err| log::debug!("Process failed: {}", err))
// event_out_tx.try_send(...).inspect_err(|err| log::debug!("Cannot send packet to transport: {}", err));
//
// In the rust implementation this "dropped" error means something different (not enough room
// in the track mpsc channel)
throw DataTrackPushFrameError.dropped(err);
}
}
Copy link
Contributor Author

@1egoman 1egoman Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior of this function is a bit different from the rust implementation right now, and I'm unsure exactly how it would be best to standardize this. The closest equivilent rust code is in DataTrack<Local>::try_push here and TrackTask::process_and_send here.

In rust, all the error cases here are logged (see the comments in the above code + the TrackTask::process_and_send link). The dropped error is returned when the frame_tx.try_send(...) in try_push fails.

In js, there is no frame_tx / TrackTask / etc because there isn't a need to run this across multiple processes (io is being handled in a non-blocking fashion and the computational requirements are minimal). So instead I opted to expose this dropped error as a wrapper around the debug logged errors.

I think the behavior should be aligned here, but if I go with the rust implemenattion's approach then I think there would never be a case where dropped gets thrown in the js implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I think the Rust SDK needs to have better error reporting at the call site. Let's discuss this more in our 1:1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From 1:1 -

  • @ladvoc rename dropped -> queueFull or something similar in rust
  • Consider renaming try_push

Comment on lines 44 to 61
@@ -57,24 +56,20 @@ export class DataTrackHandle {
if (raw > U16_MAX_SIZE) {
throw DataTrackHandleError.tooLarge();
}
return new DataTrackHandle(raw);
}

constructor(raw: number) {
this.value = raw;
}
}
return raw;
},
};
Copy link
Contributor Author

@1egoman 1egoman Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may prove controversial, but got rid of DataTrackHandle as a wrapper class and instead defined a type shadowing a const (docs about this) so I could continue to get the fromNumber static method, and could get the documentation benefits of being able to label the type as DataTrackHandle rather than number.

The reason I did it is because I wanted to key a map by a track handle, and I couldn't look up by this given the reference of the wrapper object would change every time it was newly constructed. ie:

class Wrapper { constructor(inner) { this.inner = inner; } }

const map = new Map();
map.set(new Wrapper(5), 123);
map.get(new Wrapper(5)); // -> undefined

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me. Generally, I think it is ok to diverge from the patterns in the Rust implementation if they introduce complications here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

FWIW, this is more of a comment for @lukasIO, last time I used this pattern he wasn't a fan.

Comment on lines 465 to 468
export class Future<T, E extends Error> {
promise: Promise<T>;
promise: Promise<Throws<T, E>>;

resolve?: (arg: T) => void;
Copy link
Contributor Author

@1egoman 1egoman Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to call out explicitly since it's a fairly important change - Future now uses the Throws<...> brand type internally, and anywhere a future.promise is awaited (assuming you are in a function which returns a Throws branded type) will now result in a linting error.

Because of that last limitation (throws-transformer is opt in within functions that return Throws<...>) this is scoped to data tracks code / doesn't cause errors all over the place.

/** How long to wait when attempting to publish before timing out. */
const PUBLISH_TIMEOUT_MILLISECONDS = 10_000;

export default class OutgoingDataTrackManager extends (EventEmitter as new () => TypedEmitter<DataTrackOutgoingManagerCallbacks>) {
Copy link
Contributor Author

@1egoman 1egoman Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I called it OutgoingDataTrackManager to match with the OutgoingDataStreamManager. Also @ladvoc discussed this in a 1:1 and he was good with this name. For reference, the rust implementation (in the livekit-datatrack crate) calls this local::Manager.

I am not sure what the end name should be, but IMO given these two entities have very close to the same behavior, it would be good to align on a name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional context for other reviewers: the only potential downside of this naming convention is the terms "local" and "remote" are still used in the public API (e.g. LocalTrack, RemoteTrack). However, since the track managers are internal components, I don't think this necessarily needs to be consistent. Will plan on renaming in Rust if there are no objections.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both sound fine to me.
I like Outgoing for being very explicit about what it handles and like Local mainly for consistency with the public constructs

Copy link
Contributor

@ladvoc ladvoc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good to me!

/** How long to wait when attempting to publish before timing out. */
const PUBLISH_TIMEOUT_MILLISECONDS = 10_000;

export default class OutgoingDataTrackManager extends (EventEmitter as new () => TypedEmitter<DataTrackOutgoingManagerCallbacks>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional context for other reviewers: the only potential downside of this naming convention is the terms "local" and "remote" are still used in the public API (e.g. LocalTrack, RemoteTrack). However, since the track managers are internal components, I don't think this necessarily needs to be consistent. Will plan on renaming in Rust if there are no objections.

Comment on lines 44 to 61
@@ -57,24 +56,20 @@ export class DataTrackHandle {
if (raw > U16_MAX_SIZE) {
throw DataTrackHandleError.tooLarge();
}
return new DataTrackHandle(raw);
}

constructor(raw: number) {
this.value = raw;
}
}
return raw;
},
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me. Generally, I think it is ok to diverge from the patterns in the Rust implementation if they introduce complications here.

Comment on lines 126 to 163
/** Used by attached {@link LocalDataTrack} instances to broadcast data track packets to other
* subscribers.
* @internal
*/
tryProcessAndSend(
handle: DataTrackHandle,
payload: Uint8Array,
options?: { signal?: AbortSignal },
): Throws<
void,
| DataTrackPushFrameError<DataTrackPushFrameErrorReason.Dropped>
| DataTrackPushFrameError<DataTrackPushFrameErrorReason.TrackUnpublished>
> {
const descriptor = this.getDescriptor(handle);
if (descriptor?.type !== 'active') {
throw DataTrackPushFrameError.trackUnpublished();
}

const frame: DataTrackFrame = {
payload,
extensions: new DataTrackExtensions(),
};

try {
for (const packet of descriptor.pipeline.processFrame(frame)) {
this.emit('packetsAvailable', { bytes: packet.toBinary(), signal: options?.signal });
}
} catch (err) {
// FIXME: catch and log errors instead of rethrowing? That is what the rust implementation
// is doing instead.
// process_frame(...).inspect_err(|err| log::debug!("Process failed: {}", err))
// event_out_tx.try_send(...).inspect_err(|err| log::debug!("Cannot send packet to transport: {}", err));
//
// In the rust implementation this "dropped" error means something different (not enough room
// in the track mpsc channel)
throw DataTrackPushFrameError.dropped(err);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I think the Rust SDK needs to have better error reporting at the call site. Let's discuss this more in our 1:1.

@1egoman 1egoman merged commit 25418c8 into main Feb 20, 2026
6 checks passed
@1egoman 1egoman deleted the data-track-outgoing-manager branch February 20, 2026 15:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants