diff --git a/.changeset/healthy-cherries-allow.md b/.changeset/healthy-cherries-allow.md new file mode 100644 index 00000000..3e432267 --- /dev/null +++ b/.changeset/healthy-cherries-allow.md @@ -0,0 +1,5 @@ +--- +"@livekit/rtc-node": patch +--- + +Ensure FFI events are processed sequentially diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index bfff8abe..be12ea96 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -155,8 +155,15 @@ export type DataPublishOptions = { export class LocalParticipant extends Participant { private rpcHandlers: Map Promise> = new Map(); + private roomEventLock: Mutex; + trackPublications: Map = new Map(); + constructor(info: OwnedParticipant, roomEventLock: Mutex) { + super(info); + this.roomEventLock = roomEventLock; + } + async publishData(data: Uint8Array, options: DataPublishOptions) { const req = new PublishDataRequest({ localParticipantHandle: this.ffi_handle.handle, @@ -655,51 +662,62 @@ export class LocalParticipant extends Participant { options: options, }); + const unlock = await this.roomEventLock.lock(); + const res = FfiClient.instance.request({ message: { case: 'publishTrack', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId; - }); + try { + const cb = await FfiClient.instance.waitFor((ev) => { + return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId; + }); - switch (cb.message.case) { - case 'publication': - const track_publication = new LocalTrackPublication(cb.message.value!); - track_publication.track = track; - this.trackPublications.set(track_publication.sid!, track_publication); + switch (cb.message.case) { + case 'publication': + const track_publication = new LocalTrackPublication(cb.message.value!); + track_publication.track = track; + this.trackPublications.set(track_publication.sid!, track_publication); - return track_publication; - case 'error': - default: - throw new Error(cb.message.value); + return track_publication; + case 'error': + default: + throw new Error(cb.message.value); + } + } finally { + unlock(); } } async unpublishTrack(trackSid: string, stopOnUnpublish?: boolean) { - const req = new UnpublishTrackRequest({ - localParticipantHandle: this.ffi_handle.handle, - trackSid: trackSid, - stopOnUnpublish: stopOnUnpublish ?? true, - }); + const unlock = await this.roomEventLock.lock(); + try { + const req = new UnpublishTrackRequest({ + localParticipantHandle: this.ffi_handle.handle, + trackSid: trackSid, + stopOnUnpublish: stopOnUnpublish ?? true, + }); - const res = FfiClient.instance.request({ - message: { case: 'unpublishTrack', value: req }, - }); + const res = FfiClient.instance.request({ + message: { case: 'unpublishTrack', value: req }, + }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor((ev) => { + return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId; + }); - if (cb.error) { - throw new Error(cb.error); - } + if (cb.error) { + throw new Error(cb.error); + } - const pub = this.trackPublications.get(trackSid); - if (pub) { - pub.track = undefined; + const pub = this.trackPublications.get(trackSid); + if (pub) { + pub.track = undefined; + } + this.trackPublications.delete(trackSid); + } finally { + unlock(); } - this.trackPublications.delete(trackSid); } /** diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index a9721b26..24ce9a27 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { Mutex } from '@livekit/mutex'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; import { ByteStreamReader, TextStreamReader } from './data_streams/stream_reader.js'; @@ -76,6 +77,12 @@ export const defaultRoomOptions = new FfiRoomOptions({ export class Room extends (EventEmitter as new () => TypedEmitter) { private info?: RoomInfo; private ffiHandle?: FfiHandle; + /** + * used to ensure events are processed sequentially and allow for + * the local participant to acquire the lock while doing state updates related to FFI events + * before processing the next events + */ + private roomEventLock = new Mutex(); private byteStreamControllers = new Map>(); private textStreamControllers = new Map>(); @@ -204,7 +211,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.info = cb.message.value.room!.info; this.connectionState = ConnectionState.CONN_CONNECTED; - this.localParticipant = new LocalParticipant(cb.message.value.localParticipant!); + this.localParticipant = new LocalParticipant( + cb.message.value.localParticipant!, + this.roomEventLock, + ); for (const pt of cb.message.value.participants) { const rp = this.createRemoteParticipant(pt.participant!); @@ -279,7 +289,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.byteStreamHandlers.delete(topic); } - private onFfiEvent = (ffiEvent: FfiEvent) => { + private onFfiEvent = async (ffiEvent: FfiEvent) => { if (!this.localParticipant || !this.ffiHandle || !this.info) { this.preConnectEvents.push(ffiEvent); return; @@ -287,234 +297,245 @@ export class Room extends (EventEmitter as new () => TypedEmitter // process preConnectEvents if we received the connectCallback after the events were queued for (const ev of this.preConnectEvents) { - this.processFfiEvent(ev); + await this.processFfiEvent(ev); } this.preConnectEvents = []; - this.processFfiEvent(ffiEvent); + await this.processFfiEvent(ffiEvent); }; - private processFfiEvent = (ffiEvent: FfiEvent) => { + private processFfiEvent = async (ffiEvent: FfiEvent) => { if (!this.localParticipant || !this.ffiHandle || !this.info) { throw new Error('processFfiEvent called before connect'); } - if (ffiEvent.message.case == 'rpcMethodInvocation') { - if ( - ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle + const unlock = await this.roomEventLock.lock(); + + try { + if (ffiEvent.message.case == 'rpcMethodInvocation') { + if ( + ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle + ) { + this.localParticipant.handleRpcMethodInvocation( + ffiEvent.message.value.invocationId!, + ffiEvent.message.value.method!, + ffiEvent.message.value.requestId!, + ffiEvent.message.value.callerIdentity!, + ffiEvent.message.value.payload!, + ffiEvent.message.value.responseTimeoutMs!, + ); + } + return; + } else if ( + ffiEvent.message.case != 'roomEvent' || + ffiEvent.message.value.roomHandle != this.ffiHandle.handle ) { - this.localParticipant.handleRpcMethodInvocation( - ffiEvent.message.value.invocationId!, - ffiEvent.message.value.method!, - ffiEvent.message.value.requestId!, - ffiEvent.message.value.callerIdentity!, - ffiEvent.message.value.payload!, - ffiEvent.message.value.responseTimeoutMs!, - ); + return; } - return; - } else if ( - ffiEvent.message.case != 'roomEvent' || - ffiEvent.message.value.roomHandle != this.ffiHandle.handle - ) { - return; - } - const ev = ffiEvent.message.value.message; - if (process.env.LIVEKIT_DEBUG_LOG_ROOM_EVENTS) { - console.log('Room event:', ev); - } - if (ev.case == 'participantConnected') { - const participant = this.createRemoteParticipant(ev.value.info!); - this.remoteParticipants.set(participant.identity!, participant); - this.emit(RoomEvent.ParticipantConnected, participant); - } else if (ev.case == 'participantDisconnected') { - const participant = this.remoteParticipants.get(ev.value.participantIdentity!); - this.remoteParticipants.delete(participant!.identity); - participant!.info.disconnectReason = ev.value.disconnectReason; - this.emit(RoomEvent.ParticipantDisconnected, participant!); - } else if (ev.case == 'localTrackPublished') { - const publication = this.localParticipant!.trackPublications.get(ev.value.trackSid!); - this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant!); - } else if (ev.case == 'localTrackUnpublished') { - const publication = this.localParticipant!.trackPublications.get(ev.value.publicationSid!); - this.localParticipant!.trackPublications.delete(ev.value.publicationSid!); - this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); - } else if (ev.case == 'localTrackSubscribed') { - const publication = this.localParticipant!.trackPublications.get(ev.value.trackSid!); - publication!.resolveFirstSubscription(); - this.emit(RoomEvent.LocalTrackSubscribed, publication!.track!); - } else if (ev.case == 'trackPublished') { - const participant = this.remoteParticipants.get(ev.value.participantIdentity!); - const publication = new RemoteTrackPublication(ev.value.publication!); - participant!.trackPublications.set(publication.sid!, publication); - this.emit(RoomEvent.TrackPublished, publication, participant!); - } else if (ev.case == 'trackUnpublished') { - const participant = this.requireRemoteParticipant(ev.value.participantIdentity!); - const publication = participant.trackPublications.get(ev.value.publicationSid!); - participant.trackPublications.delete(ev.value.publicationSid!); - if (publication) { - this.emit(RoomEvent.TrackUnpublished, publication, participant); - } - } else if (ev.case == 'trackSubscribed') { - const ownedTrack = ev.value.track!; - const trackInfo = ownedTrack.info!; - const { participant, publication } = this.requirePublicationOfRemoteParticipant( - ev.value.participantIdentity!, - trackInfo.sid!, - ); - publication.subscribed = true; - if (trackInfo.kind == TrackKind.KIND_VIDEO) { - publication.track = new RemoteVideoTrack(ownedTrack); - } else if (trackInfo.kind == TrackKind.KIND_AUDIO) { - publication.track = new RemoteAudioTrack(ownedTrack); + const ev = ffiEvent.message.value.message; + if (process.env.LIVEKIT_DEBUG_LOG_ROOM_EVENTS) { + console.log('Room event:', ev); } + if (ev.case == 'participantConnected') { + const participant = this.createRemoteParticipant(ev.value.info!); + this.remoteParticipants.set(participant.identity!, participant); + this.emit(RoomEvent.ParticipantConnected, participant); + } else if (ev.case == 'participantDisconnected') { + const participant = this.remoteParticipants.get(ev.value.participantIdentity!); + this.remoteParticipants.delete(participant!.identity); + participant!.info.disconnectReason = ev.value.disconnectReason; + this.emit(RoomEvent.ParticipantDisconnected, participant!); + } else if (ev.case == 'localTrackPublished') { + const publication = this.localParticipant!.trackPublications.get(ev.value.trackSid!); + this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant!); + } else if (ev.case == 'localTrackUnpublished') { + const publication = this.localParticipant!.trackPublications.get(ev.value.publicationSid!); + this.localParticipant!.trackPublications.delete(ev.value.publicationSid!); + this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); + } else if (ev.case == 'localTrackSubscribed') { + const publication = this.localParticipant!.trackPublications.get(ev.value.trackSid!); + publication!.resolveFirstSubscription(); + this.emit(RoomEvent.LocalTrackSubscribed, publication!.track!); + } else if (ev.case == 'trackPublished') { + const participant = this.remoteParticipants.get(ev.value.participantIdentity!); + const publication = new RemoteTrackPublication(ev.value.publication!); + participant!.trackPublications.set(publication.sid!, publication); + this.emit(RoomEvent.TrackPublished, publication, participant!); + } else if (ev.case == 'trackUnpublished') { + const participant = this.requireRemoteParticipant(ev.value.participantIdentity!); + const publication = participant.trackPublications.get(ev.value.publicationSid!); + participant.trackPublications.delete(ev.value.publicationSid!); + if (publication) { + this.emit(RoomEvent.TrackUnpublished, publication, participant); + } + } else if (ev.case == 'trackSubscribed') { + const ownedTrack = ev.value.track!; + const trackInfo = ownedTrack.info!; + const { participant, publication } = this.requirePublicationOfRemoteParticipant( + ev.value.participantIdentity!, + trackInfo.sid!, + ); + publication.subscribed = true; + if (trackInfo.kind == TrackKind.KIND_VIDEO) { + publication.track = new RemoteVideoTrack(ownedTrack); + } else if (trackInfo.kind == TrackKind.KIND_AUDIO) { + publication.track = new RemoteAudioTrack(ownedTrack); + } - this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); - } else if (ev.case == 'trackUnsubscribed') { - const { participant, publication } = this.requirePublicationOfRemoteParticipant( - ev.value.participantIdentity!, - ev.value.trackSid!, - ); - const track = publication.track!; - publication.track = undefined; - publication.subscribed = false; - this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); - } else if (ev.case == 'trackSubscriptionFailed') { - const participant = this.requireRemoteParticipant(ev.value.participantIdentity!); - this.emit(RoomEvent.TrackSubscriptionFailed, ev.value.trackSid!, participant, ev.value.error); - } else if (ev.case == 'trackMuted') { - const { participant, publication } = this.requirePublicationOfParticipant( - ev.value.participantIdentity!, - ev.value.trackSid!, - ); - publication.info!.muted = true; - if (publication.track) { - publication.track.info!.muted = true; - } - this.emit(RoomEvent.TrackMuted, publication, participant); - } else if (ev.case == 'trackUnmuted') { - const { participant, publication } = this.requirePublicationOfParticipant( - ev.value.participantIdentity!, - ev.value.trackSid!, - ); - publication.info!.muted = false; - if (publication.track) { - publication.track.info!.muted = false; - } - this.emit(RoomEvent.TrackUnmuted, publication, participant); - } else if (ev.case == 'activeSpeakersChanged') { - const activeSpeakers = ev.value.participantIdentities.map((identity) => - this.requireParticipantByIdentity(identity), - ); - this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers); - } else if (ev.case == 'roomMetadataChanged') { - this.info!.metadata = ev.value.metadata; - this.emit(RoomEvent.RoomMetadataChanged, this.info!.metadata!); - } else if (ev.case == 'participantMetadataChanged') { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); - participant.info.metadata = ev.value.metadata; - this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant); - } else if (ev.case == 'participantNameChanged') { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); - participant.info.name = ev.value.name; - this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant); - } else if (ev.case == 'participantAttributesChanged') { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); - participant.info.attributes = ev.value.attributes.reduce( - (acc, value) => { - acc[value.key!] = value.value!; - return acc; - }, - {} as Record, - ); - if (Object.keys(ev.value.changedAttributes).length > 0) { - const changedAttributes = ev.value.changedAttributes.reduce( + this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); + } else if (ev.case == 'trackUnsubscribed') { + const { participant, publication } = this.requirePublicationOfRemoteParticipant( + ev.value.participantIdentity!, + ev.value.trackSid!, + ); + const track = publication.track!; + publication.track = undefined; + publication.subscribed = false; + this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); + } else if (ev.case == 'trackSubscriptionFailed') { + const participant = this.requireRemoteParticipant(ev.value.participantIdentity!); + this.emit( + RoomEvent.TrackSubscriptionFailed, + ev.value.trackSid!, + participant, + ev.value.error, + ); + } else if (ev.case == 'trackMuted') { + const { participant, publication } = this.requirePublicationOfParticipant( + ev.value.participantIdentity!, + ev.value.trackSid!, + ); + publication.info!.muted = true; + if (publication.track) { + publication.track.info!.muted = true; + } + this.emit(RoomEvent.TrackMuted, publication, participant); + } else if (ev.case == 'trackUnmuted') { + const { participant, publication } = this.requirePublicationOfParticipant( + ev.value.participantIdentity!, + ev.value.trackSid!, + ); + publication.info!.muted = false; + if (publication.track) { + publication.track.info!.muted = false; + } + this.emit(RoomEvent.TrackUnmuted, publication, participant); + } else if (ev.case == 'activeSpeakersChanged') { + const activeSpeakers = ev.value.participantIdentities.map((identity) => + this.requireParticipantByIdentity(identity), + ); + this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers); + } else if (ev.case == 'roomMetadataChanged') { + this.info!.metadata = ev.value.metadata; + this.emit(RoomEvent.RoomMetadataChanged, this.info!.metadata!); + } else if (ev.case == 'participantMetadataChanged') { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + participant.info.metadata = ev.value.metadata; + this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant); + } else if (ev.case == 'participantNameChanged') { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + participant.info.name = ev.value.name; + this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant); + } else if (ev.case == 'participantAttributesChanged') { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + participant.info.attributes = ev.value.attributes.reduce( (acc, value) => { acc[value.key!] = value.value!; return acc; }, {} as Record, ); - this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant); - } - } else if (ev.case == 'connectionQualityChanged') { - const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); - this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant); - } else if (ev.case == 'chatMessage') { - const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!); - const { id, message: messageText, timestamp, editTimestamp, generated } = ev.value.message!; - const message: ChatMessage = { - id: id!, - message: messageText!, - timestamp: Number(timestamp), - editTimestamp: Number(editTimestamp), - generated, - }; - this.emit(RoomEvent.ChatMessage, message, participant); - } else if (ev.case == 'dataPacketReceived') { - // Can be undefined if the data is sent from a Server SDK - const participant = this.remoteParticipants.get(ev.value.participantIdentity!); - const dataPacket = ev.value.value; - switch (dataPacket.case) { - case 'user': - const buffer = FfiClient.instance.copyBuffer( - dataPacket.value.data!.data!.dataPtr!, - Number(dataPacket.value.data!.data!.dataLen), + if (Object.keys(ev.value.changedAttributes).length > 0) { + const changedAttributes = ev.value.changedAttributes.reduce( + (acc, value) => { + acc[value.key!] = value.value!; + return acc; + }, + {} as Record, ); - new FfiHandle(dataPacket.value.data!.handle!.id!).dispose(); - this.emit( - RoomEvent.DataReceived, - buffer, - participant, - ev.value.kind, - dataPacket.value.topic, - ); - break; - case 'sipDtmf': - const { code, digit } = dataPacket.value; - this.emit(RoomEvent.DtmfReceived, code!, digit!, participant!); - break; - default: - break; - } - } else if (ev.case == 'e2eeStateChanged') { - if (ev.value.state == EncryptionState.INTERNAL_ERROR) { - // throw generic error until Rust SDK is updated to supply the error alongside INTERNAL_ERROR - this.emit(RoomEvent.EncryptionError, new Error('internal server error')); - } - } else if (ev.case == 'connectionStateChanged') { - this.connectionState = ev.value.state!; - this.emit(RoomEvent.ConnectionStateChanged, this.connectionState); - /*} else if (ev.case == 'connected') { + this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant); + } + } else if (ev.case == 'connectionQualityChanged') { + const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); + this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant); + } else if (ev.case == 'chatMessage') { + const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!); + const { id, message: messageText, timestamp, editTimestamp, generated } = ev.value.message!; + const message: ChatMessage = { + id: id!, + message: messageText!, + timestamp: Number(timestamp), + editTimestamp: Number(editTimestamp), + generated, + }; + this.emit(RoomEvent.ChatMessage, message, participant); + } else if (ev.case == 'dataPacketReceived') { + // Can be undefined if the data is sent from a Server SDK + const participant = this.remoteParticipants.get(ev.value.participantIdentity!); + const dataPacket = ev.value.value; + switch (dataPacket.case) { + case 'user': + const buffer = FfiClient.instance.copyBuffer( + dataPacket.value.data!.data!.dataPtr!, + Number(dataPacket.value.data!.data!.dataLen), + ); + new FfiHandle(dataPacket.value.data!.handle!.id!).dispose(); + this.emit( + RoomEvent.DataReceived, + buffer, + participant, + ev.value.kind, + dataPacket.value.topic, + ); + break; + case 'sipDtmf': + const { code, digit } = dataPacket.value; + this.emit(RoomEvent.DtmfReceived, code!, digit!, participant!); + break; + default: + break; + } + } else if (ev.case == 'e2eeStateChanged') { + if (ev.value.state == EncryptionState.INTERNAL_ERROR) { + // throw generic error until Rust SDK is updated to supply the error alongside INTERNAL_ERROR + this.emit(RoomEvent.EncryptionError, new Error('internal server error')); + } + } else if (ev.case == 'connectionStateChanged') { + this.connectionState = ev.value.state!; + this.emit(RoomEvent.ConnectionStateChanged, this.connectionState); + /*} else if (ev.case == 'connected') { this.emit(RoomEvent.Connected);*/ - } else if (ev.case == 'disconnected') { - this.emit(RoomEvent.Disconnected, ev.value.reason!); - } else if (ev.case == 'reconnecting') { - this.emit(RoomEvent.Reconnecting); - } else if (ev.case == 'reconnected') { - this.emit(RoomEvent.Reconnected); - } else if (ev.case == 'roomSidChanged') { - this.emit(RoomEvent.RoomSidChanged, ev.value.sid!); - } else if (ev.case === 'streamHeaderReceived' && ev.value.header) { - this.handleStreamHeader(ev.value.header, ev.value.participantIdentity!); - } else if (ev.case === 'streamChunkReceived' && ev.value.chunk) { - this.handleStreamChunk(ev.value.chunk); - } else if (ev.case === 'streamTrailerReceived' && ev.value.trailer) { - this.handleStreamTrailer(ev.value.trailer); - } else if (ev.case === 'roomUpdated') { - this.info = ev.value; - this.emit(RoomEvent.RoomUpdated); - } else if (ev.case === 'moved') { - this.info = ev.value; - this.emit(RoomEvent.Moved); - } else if (ev.case === 'participantsUpdated') { - for (const info of ev.value.participants) { - const participant = this.retrieveParticipantByIdentity(info.identity!); - if (participant) { - participant.info = info; + } else if (ev.case == 'disconnected') { + this.emit(RoomEvent.Disconnected, ev.value.reason!); + } else if (ev.case == 'reconnecting') { + this.emit(RoomEvent.Reconnecting); + } else if (ev.case == 'reconnected') { + this.emit(RoomEvent.Reconnected); + } else if (ev.case == 'roomSidChanged') { + this.emit(RoomEvent.RoomSidChanged, ev.value.sid!); + } else if (ev.case === 'streamHeaderReceived' && ev.value.header) { + this.handleStreamHeader(ev.value.header, ev.value.participantIdentity!); + } else if (ev.case === 'streamChunkReceived' && ev.value.chunk) { + this.handleStreamChunk(ev.value.chunk); + } else if (ev.case === 'streamTrailerReceived' && ev.value.trailer) { + this.handleStreamTrailer(ev.value.trailer); + } else if (ev.case === 'roomUpdated') { + this.info = ev.value; + this.emit(RoomEvent.RoomUpdated); + } else if (ev.case === 'moved') { + this.info = ev.value; + this.emit(RoomEvent.Moved); + } else if (ev.case === 'participantsUpdated') { + for (const info of ev.value.participants) { + const participant = this.retrieveParticipantByIdentity(info.identity!); + if (participant) { + participant.info = info; + } } } + } finally { + unlock(); } };