Skip to content

Commit ee70835

Browse files
authored
Ensure FFI events are processed sequentially (#547)
* Ensure FFI events are processed sequentially * naming * remove setImmediate * Create healthy-cherries-allow.md
1 parent 4d0a090 commit ee70835

File tree

3 files changed

+280
-236
lines changed

3 files changed

+280
-236
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@livekit/rtc-node": patch
3+
---
4+
5+
Ensure FFI events are processed sequentially

packages/livekit-rtc/src/participant.ts

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,15 @@ export type DataPublishOptions = {
155155
export class LocalParticipant extends Participant {
156156
private rpcHandlers: Map<string, (data: RpcInvocationData) => Promise<string>> = new Map();
157157

158+
private roomEventLock: Mutex;
159+
158160
trackPublications: Map<string, LocalTrackPublication> = new Map();
159161

162+
constructor(info: OwnedParticipant, roomEventLock: Mutex) {
163+
super(info);
164+
this.roomEventLock = roomEventLock;
165+
}
166+
160167
async publishData(data: Uint8Array, options: DataPublishOptions) {
161168
const req = new PublishDataRequest({
162169
localParticipantHandle: this.ffi_handle.handle,
@@ -655,51 +662,62 @@ export class LocalParticipant extends Participant {
655662
options: options,
656663
});
657664

665+
const unlock = await this.roomEventLock.lock();
666+
658667
const res = FfiClient.instance.request<PublishTrackResponse>({
659668
message: { case: 'publishTrack', value: req },
660669
});
661670

662-
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>((ev) => {
663-
return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId;
664-
});
671+
try {
672+
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>((ev) => {
673+
return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId;
674+
});
665675

666-
switch (cb.message.case) {
667-
case 'publication':
668-
const track_publication = new LocalTrackPublication(cb.message.value!);
669-
track_publication.track = track;
670-
this.trackPublications.set(track_publication.sid!, track_publication);
676+
switch (cb.message.case) {
677+
case 'publication':
678+
const track_publication = new LocalTrackPublication(cb.message.value!);
679+
track_publication.track = track;
680+
this.trackPublications.set(track_publication.sid!, track_publication);
671681

672-
return track_publication;
673-
case 'error':
674-
default:
675-
throw new Error(cb.message.value);
682+
return track_publication;
683+
case 'error':
684+
default:
685+
throw new Error(cb.message.value);
686+
}
687+
} finally {
688+
unlock();
676689
}
677690
}
678691

679692
async unpublishTrack(trackSid: string, stopOnUnpublish?: boolean) {
680-
const req = new UnpublishTrackRequest({
681-
localParticipantHandle: this.ffi_handle.handle,
682-
trackSid: trackSid,
683-
stopOnUnpublish: stopOnUnpublish ?? true,
684-
});
693+
const unlock = await this.roomEventLock.lock();
694+
try {
695+
const req = new UnpublishTrackRequest({
696+
localParticipantHandle: this.ffi_handle.handle,
697+
trackSid: trackSid,
698+
stopOnUnpublish: stopOnUnpublish ?? true,
699+
});
685700

686-
const res = FfiClient.instance.request<UnpublishTrackResponse>({
687-
message: { case: 'unpublishTrack', value: req },
688-
});
701+
const res = FfiClient.instance.request<UnpublishTrackResponse>({
702+
message: { case: 'unpublishTrack', value: req },
703+
});
689704

690-
const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>((ev) => {
691-
return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId;
692-
});
705+
const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>((ev) => {
706+
return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId;
707+
});
693708

694-
if (cb.error) {
695-
throw new Error(cb.error);
696-
}
709+
if (cb.error) {
710+
throw new Error(cb.error);
711+
}
697712

698-
const pub = this.trackPublications.get(trackSid);
699-
if (pub) {
700-
pub.track = undefined;
713+
const pub = this.trackPublications.get(trackSid);
714+
if (pub) {
715+
pub.track = undefined;
716+
}
717+
this.trackPublications.delete(trackSid);
718+
} finally {
719+
unlock();
701720
}
702-
this.trackPublications.delete(trackSid);
703721
}
704722

705723
/**

0 commit comments

Comments
 (0)