Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/healthy-cherries-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

Ensure FFI events are processed sequentially
78 changes: 48 additions & 30 deletions packages/livekit-rtc/src/participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,15 @@ export type DataPublishOptions = {
export class LocalParticipant extends Participant {
private rpcHandlers: Map<string, (data: RpcInvocationData) => Promise<string>> = new Map();

private roomEventLock: Mutex;

trackPublications: Map<string, LocalTrackPublication> = new Map();

constructor(info: OwnedParticipant, roomEventLock: Mutex) {
super(info);
this.roomEventLock = roomEventLock;
}

async publishData(data: Uint8Array, options: DataPublishOptions) {
const req = new PublishDataRequest({
localParticipantHandle: this.ffi_handle.handle,
Expand Down Expand Up @@ -655,51 +662,62 @@ export class LocalParticipant extends Participant {
options: options,
});

const unlock = await this.roomEventLock.lock();

const res = FfiClient.instance.request<PublishTrackResponse>({
message: { case: 'publishTrack', value: req },
});

const cb = await FfiClient.instance.waitFor<PublishTrackCallback>((ev) => {
return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId;
});
try {
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>((ev) => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought(non-blocking)
curiously, does FfiClient.instance.request guarantee that it will trigger a PublishTrackCallback later ?

There is no need to change it now, but just an idea, I wonder if it makes sense to do things like

  1. prepare a waiter
  2. roomEventLock.lock()
  3. FfiClient.instance.request
  4. waiter.setAsyncId(res.asyncId)
  5. waiter will wait with some timeout, like await waiter.wait({ timeoutMs: 50 }); // not sure if that will bring more flakiness or less here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curiously, does FfiClient.instance.request guarantee that it will trigger a PublishTrackCallback later ?

I think so, yes. Not sure what the actual guarantees are though or rather how they are enforced.

return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId;
});

switch (cb.message.case) {
case 'publication':
const track_publication = new LocalTrackPublication(cb.message.value!);
track_publication.track = track;
this.trackPublications.set(track_publication.sid!, track_publication);
switch (cb.message.case) {
case 'publication':
const track_publication = new LocalTrackPublication(cb.message.value!);
track_publication.track = track;
this.trackPublications.set(track_publication.sid!, track_publication);

return track_publication;
case 'error':
default:
throw new Error(cb.message.value);
return track_publication;
case 'error':
default:
throw new Error(cb.message.value);
}
} finally {
unlock();
}
}

async unpublishTrack(trackSid: string, stopOnUnpublish?: boolean) {
const req = new UnpublishTrackRequest({
localParticipantHandle: this.ffi_handle.handle,
trackSid: trackSid,
stopOnUnpublish: stopOnUnpublish ?? true,
});
const unlock = await this.roomEventLock.lock();
try {
const req = new UnpublishTrackRequest({
localParticipantHandle: this.ffi_handle.handle,
trackSid: trackSid,
stopOnUnpublish: stopOnUnpublish ?? true,
});

const res = FfiClient.instance.request<UnpublishTrackResponse>({
message: { case: 'unpublishTrack', value: req },
});
const res = FfiClient.instance.request<UnpublishTrackResponse>({
message: { case: 'unpublishTrack', value: req },
});

const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>((ev) => {
return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>((ev) => {
return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId;
});

if (cb.error) {
throw new Error(cb.error);
}
if (cb.error) {
throw new Error(cb.error);
}

const pub = this.trackPublications.get(trackSid);
if (pub) {
pub.track = undefined;
const pub = this.trackPublications.get(trackSid);
if (pub) {
pub.track = undefined;
}
this.trackPublications.delete(trackSid);
} finally {
unlock();
}
this.trackPublications.delete(trackSid);
}

/**
Expand Down
Loading
Loading