Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
07ae04b
feat: make improvements to how error generics are parsed
1egoman Feb 10, 2026
c5fdaa0
feat: make more modifications to throws-transformer to work with Erro…
1egoman Feb 10, 2026
2923ec2
fix: get rid of collapsing union function in throws-transformer
1egoman Feb 10, 2026
65cba12
feat: migrate whole data tracks implementation to get rid of reasons …
1egoman Feb 10, 2026
45b9ad5
feat: remove signal option from tryPush / tryProcessAndSend
1egoman Feb 11, 2026
85ee46e
feat: add some better handling of new Promise constructor / catch cha…
1egoman Feb 11, 2026
27ae3d0
fix: run npm run format
1egoman Feb 11, 2026
27105a8
refactor: break up data-track/track.ts into LocalDataTrack / types files
1egoman Feb 12, 2026
278a473
feat: add initial IncomingDataTrackManager implementation
1egoman Feb 12, 2026
4487bbe
feat: move RemoteDataTrack into its own file
1egoman Feb 13, 2026
85c5355
fix: address throw transform warnings
1egoman Feb 13, 2026
6dce9ec
fix: run npm run format and lint
1egoman Feb 13, 2026
e2323b7
fix: remove unused DataTrackPacketizerReason import
1egoman Feb 13, 2026
3878e4d
feat: apply same rethrow pattern as try_push to break the Throws erro…
1egoman Feb 17, 2026
e3c149a
feat: add first pass of track interfaces (idea from from lukas 1:1)
1egoman Feb 17, 2026
aa940eb
feat: add first part of tests for IncomingDataTrackManager
1egoman Feb 17, 2026
25de921
feat: add more tests and bugfixes to IncomingDataTrackManager to bett…
1egoman Feb 17, 2026
fd9a709
feat: add e2ee subscription test
1egoman Feb 17, 2026
725008a
fix: run npm run format
1egoman Feb 17, 2026
b09d811
refactor: rename methods / add docs comments
1egoman Feb 18, 2026
340846a
refactor: move incoming pipeline into its own file
1egoman Feb 20, 2026
6a6b2cb
refactor: get rid of unused DataTrackStreamReader
1egoman Feb 20, 2026
c7e6833
refactor: break errors out into separate file
1egoman Feb 20, 2026
1f3c6df
fix: clean up residual dead code left from rust port
1egoman Feb 20, 2026
36cc0cb
fix: delete dead code accidentally commited from development of new t…
1egoman Feb 20, 2026
8df9531
fix: add missing changeset file
1egoman Feb 20, 2026
6e287d0
fix: rework RemoteDataTrack signature to be better for future extension
1egoman Feb 20, 2026
498e181
feat: add new trackUnavailable event after jacob 1:1
1egoman Feb 20, 2026
df28392
feat: import incoming data track manager so linting / etc runs on it
1egoman Feb 20, 2026
7e0ec3b
fix: run format and lint
1egoman Feb 20, 2026
bbcbbd6
feat: update localitySymbol -> isLocal and add ITrack
1egoman Feb 20, 2026
f40f73b
fix: remove changeset based on lukas suggestion
1egoman Feb 23, 2026
9f72217
feat: remove exports for new track interface helpers
1egoman Feb 23, 2026
d7d0908
feat: convert to using abort signal polyfills
1egoman Feb 25, 2026
d32d628
feat: handle immediate abortsignal abort case in OutgoingDataTrackMan…
1egoman Feb 25, 2026
e3dccea
feat: alter the incoming manager subscription logic to fix some subtl…
1egoman Feb 25, 2026
d848d81
fix: run npm run format
1egoman Feb 25, 2026
710f20b
feat: add remote participant disconnected handler to terminate in fli…
1egoman Feb 25, 2026
5ff3c2e
fix: add omitted trackAvailable event to tests
1egoman Feb 25, 2026
6cbf73e
feat: add tests for remote participant disconnects cleaning up remote…
1egoman Feb 25, 2026
a9718cf
fix: run npm run format
1egoman Feb 25, 2026
4c7b39c
fix: clear descriptors after calling shutdown
1egoman Feb 25, 2026
6bdd819
feat: add explicit queuing strategy to IncomingDataTrackManager Reada…
1egoman Feb 25, 2026
edf51bf
fix: address comment typo
1egoman Feb 25, 2026
815e291
feat: add more data track publication tests
1egoman Feb 25, 2026
c5a8d77
feat: update default high water mark threshold for incoming `Readable…
1egoman Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type { ReconnectContext, ReconnectPolicy } from './room/ReconnectPolicy';
import Room, { ConnectionState, type RoomEventCallbacks } from './room/Room';
import * as attributes from './room/attribute-typings';
// FIXME: remove this import in a follow up data track pull request.
import './room/data-track/depacketizer';
import './room/data-track/incoming/IncomingDataTrackManager';
// FIXME: remove this import in a follow up data track pull request.
import './room/data-track/outgoing/OutgoingDataTrackManager';
import LocalParticipant from './room/participant/LocalParticipant';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import type { DataTrackFrame } from './frame';
import { type DataTrackHandle } from './handle';
import type OutgoingDataTrackManager from './outgoing/OutgoingDataTrackManager';
import {
DataTrackSymbol,
type IDataTrack,
type ILocalTrack,
TrackSymbol,
} from './track-interfaces';
import type { DataTrackInfo } from './types';

export type DataTrackSid = string;
export default class LocalDataTrack implements ILocalTrack, IDataTrack {
readonly trackSymbol = TrackSymbol;

/** Information about a published data track. */
export type DataTrackInfo = {
sid: DataTrackSid;
pubHandle: DataTrackHandle;
name: String;
usesE2ee: boolean;
};
readonly isLocal = true;

readonly typeSymbol = DataTrackSymbol;

export class LocalDataTrack {
info: DataTrackInfo;

protected manager: OutgoingDataTrackManager;

/** @internal */
constructor(info: DataTrackInfo, manager: OutgoingDataTrackManager) {
this.info = info;
this.manager = manager;
Expand Down
82 changes: 82 additions & 0 deletions src/room/data-track/RemoteDataTrack.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type Participant from '../participant/Participant';
import type { DataTrackFrame } from './frame';
import type IncomingDataTrackManager from './incoming/IncomingDataTrackManager';
import {
DataTrackSymbol,
type IDataTrack,
type IRemoteTrack,
TrackSymbol,
} from './track-interfaces';
import { type DataTrackInfo } from './types';

type RemoteDataTrackOptions = {
publisherIdentity: Participant['identity'];
};

export type RemoteDataTrackSubscribeOptions = {
signal?: AbortSignal;

/** The number of {@link DataTrackFrame}s to hold in the ReadableStream before disgarding extra
* frames. Defaults to 4, but this may not be good enough for especially high frequency data. */
highWaterMark?: number;
};

export default class RemoteDataTrack implements IRemoteTrack, IDataTrack {
readonly trackSymbol = TrackSymbol;

readonly isLocal = false;

readonly typeSymbol = DataTrackSymbol;

info: DataTrackInfo;

publisherIdentity: Participant['identity'];

protected manager: IncomingDataTrackManager;

/** @internal */
constructor(
info: DataTrackInfo,
manager: IncomingDataTrackManager,
options: RemoteDataTrackOptions,
) {
this.info = info;
this.manager = manager;
this.publisherIdentity = options.publisherIdentity;
}

/** Subscribes to the data track to receive frames.
*
* # Returns
*
* A stream that yields {@link DataTrackFrame}s as they arrive.
*
* # Multiple Subscriptions
*
* An application may call `subscribe` more than once to process frames in
* multiple places. For example, one async task might plot values on a graph
* while another writes them to a file.
*
* Internally, only the first call to `subscribe` communicates with the SFU and
* allocates the resources required to receive frames. Additional subscriptions
* reuse the same underlying pipeline and do not trigger additional signaling.
*
* Note that newly created subscriptions only receive frames published after
* the initial subscription is established.
*/
async subscribe(
options?: RemoteDataTrackSubscribeOptions,
): Promise<ReadableStream<DataTrackFrame>> {
try {
const stream = await this.manager.subscribeRequest(
this.info.sid,
options?.signal,
options?.highWaterMark,
);
return stream;
} catch (err) {
// NOTE: Rethrow errors to break Throws<...> type boundary
throw err;
}
}
}
18 changes: 3 additions & 15 deletions src/room/data-track/depacketizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type PartialFrame = {

/** An error indicating a frame was dropped. */
export class DataTrackDepacketizerDropError<
Reason extends DataTrackDepacketizerDropReason,
Reason extends DataTrackDepacketizerDropReason = DataTrackDepacketizerDropReason,
> extends LivekitReasonedError<Reason> {
readonly name = 'DataTrackDepacketizerDropError';

Expand Down Expand Up @@ -99,13 +99,7 @@ export default class DataTrackDepacketizer {
push(
packet: DataTrackPacket,
options?: PushOptions,
): Throws<
DataTrackFrame | null,
| DataTrackDepacketizerDropError<DataTrackDepacketizerDropReason.Interrupted>
| DataTrackDepacketizerDropError<DataTrackDepacketizerDropReason.BufferFull>
| DataTrackDepacketizerDropError<DataTrackDepacketizerDropReason.UnknownFrame>
| DataTrackDepacketizerDropError<DataTrackDepacketizerDropReason.Incomplete>
> {
): Throws<DataTrackFrame | null, DataTrackDepacketizerDropError> {
switch (packet.header.marker) {
case FrameMarker.Single:
return this.frameFromSingle(packet, options);
Expand Down Expand Up @@ -191,13 +185,7 @@ export default class DataTrackDepacketizer {
/** Push to the existing partial frame. */
private pushToPartial(
packet: DataTrackPacket,
): Throws<
DataTrackFrame | null,
| DataTrackDepacketizerDropError<DataTrackDepacketizerDropReason.Interrupted>
| DataTrackDepacketizerDropError<DataTrackDepacketizerDropReason.UnknownFrame>
| DataTrackDepacketizerDropError<DataTrackDepacketizerDropReason.BufferFull>
| DataTrackDepacketizerDropError<DataTrackDepacketizerDropReason.Incomplete>
> {
): Throws<DataTrackFrame | null, DataTrackDepacketizerDropError> {
if (packet.header.marker !== FrameMarker.Inter && packet.header.marker !== FrameMarker.Final) {
// @throws-transformer ignore - this should be treated as a "panic" and not be caught
throw new Error(
Expand Down
3 changes: 2 additions & 1 deletion src/room/data-track/e2ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ export type EncryptionProvider = {
};

export type DecryptionProvider = {
decrypt(payload: Uint8Array, senderIdentity: string): Uint8Array;
// FIXME: add in explicit `Throws<..., DecryptionError>`?
decrypt(payload: EncryptedPayload, senderIdentity: string): Uint8Array;
};
8 changes: 1 addition & 7 deletions src/room/data-track/handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,7 @@ export class DataTrackHandleError<

export type DataTrackHandle = number;
export const DataTrackHandle = {
fromNumber(
raw: number,
): Throws<
DataTrackHandle,
| DataTrackHandleError<DataTrackHandleErrorReason.TooLarge>
| DataTrackHandleError<DataTrackHandleErrorReason.Reserved>
> {
fromNumber(raw: number): Throws<DataTrackHandle, DataTrackHandleError> {
if (raw === 0) {
throw DataTrackHandleError.reserved(raw);
}
Expand Down
Loading
Loading