diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index bfff8abe..731e8c87 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -353,15 +353,7 @@ export class LocalParticipant extends Participant { return writer.info; } - async streamBytes(options?: { - name?: string; - topic?: string; - attributes?: Record; - destinationIdentities?: Array; - streamId?: string; - mimeType?: string; - totalSize?: number; - }) { + async streamBytes(options?: ByteStreamOptions & { streamId?: string; totalSize?: number }) { const senderIdentity = this.identity; const streamId = options?.streamId ?? crypto.randomUUID(); const destinationIdentities = options?.destinationIdentities; @@ -481,6 +473,101 @@ export class LocalParticipant extends Participant { } } + /** Sends raw bytes or byte-like data to specified recipients */ + async sendBytes( + data: + | Uint8Array + | ArrayBuffer + | Blob + | ReadableStream + | NodeJS.ReadableStream, + options?: ByteStreamOptions, + ) { + const streamId = crypto.randomUUID(); + const destinationIdentities = options?.destinationIdentities; + + // Determine total size if available and infer mime when possible + let totalSize: number | undefined; + let inferredMime: string | undefined; + + if (data instanceof Uint8Array) { + totalSize = data.byteLength; + } else if (data instanceof ArrayBuffer) { + totalSize = data.byteLength; + } else if (typeof Blob !== 'undefined' && data instanceof Blob) { + totalSize = data.size; + inferredMime = (data as Blob).type || undefined; + } + + const writer = await this.streamBytes({ + streamId, + name: options?.name ?? 'unknown', + totalSize, + destinationIdentities, + topic: options?.topic, + mimeType: options?.mimeType ?? inferredMime, + attributes: options?.attributes, + }); + + let bytesSent = 0; + const maybeReportProgress = (increment: number) => { + bytesSent += increment; + if (options?.onProgress && totalSize !== undefined && totalSize > 0) { + const progress = Math.min(1, bytesSent / totalSize); + options.onProgress(progress); + } + }; + + // Helper to write from a Web ReadableStream + const writeFromWebStream = async (rs: ReadableStream) => { + const reader = rs.getReader(); + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (value) { + await writer.write(value); + maybeReportProgress(value.byteLength); + } + } + } finally { + reader.releaseLock(); + } + }; + + if (data instanceof Uint8Array) { + await writer.write(data); + maybeReportProgress(data.byteLength); + } else if (data instanceof ArrayBuffer) { + const bytes = new Uint8Array(data); + await writer.write(bytes); + maybeReportProgress(bytes.byteLength); + } else if (typeof Blob !== 'undefined' && data instanceof Blob) { + await writeFromWebStream(data.stream() as ReadableStream); + } else if ( + typeof data === 'object' && + data !== null && + 'getReader' in data && + typeof (data as ReadableStream).getReader === 'function' + ) { + // Treat as Web ReadableStream + await writeFromWebStream(data as ReadableStream); + } else if (typeof data === 'object' && data !== null && Symbol.asyncIterator in data) { + // Treat as AsyncIterable (e.g., Node.js Readable of Uint8Array/Buffer) + for await (const chunk of data as AsyncIterable) { + await writer.write(chunk); + maybeReportProgress(chunk.byteLength); + } + } else { + throw new Error('Unsupported data type for sendBytes'); + } + + await writer.close(); + if (options?.onProgress && totalSize !== undefined) { + options.onProgress(1); + } + } + private async sendStreamHeader(req: SendStreamHeaderRequest) { const type = 'sendStreamHeader'; const res = FfiClient.instance.request({