diff --git a/.changes/connection-credentials b/.changes/connection-credentials new file mode 100644 index 000000000..37cbf36c7 --- /dev/null +++ b/.changes/connection-credentials @@ -0,0 +1 @@ +patch type="added" "Abstract token source for easier token fetching in production and faster integration with sandbox environment" diff --git a/Package.swift b/Package.swift index eb1ec22fc..7d4ba7199 100644 --- a/Package.swift +++ b/Package.swift @@ -23,10 +23,9 @@ let package = Package( .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.29.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.2"), .package(url: "https://github.com/apple/swift-collections.git", from: "1.1.0"), + .package(url: "https://github.com/vapor/jwt-kit.git", from: "4.13.5"), // Only used for DocC generation .package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.3.0"), - // Only used for Testing - .package(url: "https://github.com/vapor/jwt-kit.git", from: "4.13.4"), ], targets: [ .target( @@ -41,6 +40,7 @@ let package = Package( .product(name: "DequeModule", package: "swift-collections"), .product(name: "OrderedCollections", package: "swift-collections"), .product(name: "Logging", package: "swift-log"), + .product(name: "JWTKit", package: "jwt-kit"), "LKObjCHelpers", ], exclude: [ @@ -57,14 +57,12 @@ let package = Package( name: "LiveKitTests", dependencies: [ "LiveKit", - .product(name: "JWTKit", package: "jwt-kit"), ] ), .testTarget( name: "LiveKitTestsObjC", dependencies: [ "LiveKit", - .product(name: "JWTKit", package: "jwt-kit"), ] ), ], diff --git a/Package@swift-6.0.swift b/Package@swift-6.0.swift index b9076ef75..ab1f670ef 100644 --- a/Package@swift-6.0.swift +++ b/Package@swift-6.0.swift @@ -24,10 +24,9 @@ let package = Package( .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.29.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.2"), .package(url: 
"https://github.com/apple/swift-collections.git", from: "1.1.0"), + .package(url: "https://github.com/vapor/jwt-kit.git", from: "4.13.5"), // Only used for DocC generation .package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.3.0"), - // Only used for Testing - .package(url: "https://github.com/vapor/jwt-kit.git", from: "4.13.4"), ], targets: [ .target( @@ -42,6 +41,7 @@ let package = Package( .product(name: "DequeModule", package: "swift-collections"), .product(name: "OrderedCollections", package: "swift-collections"), .product(name: "Logging", package: "swift-log"), + .product(name: "JWTKit", package: "jwt-kit"), "LKObjCHelpers", ], exclude: [ @@ -58,14 +58,12 @@ let package = Package( name: "LiveKitTests", dependencies: [ "LiveKit", - .product(name: "JWTKit", package: "jwt-kit"), ] ), .testTarget( name: "LiveKitTestsObjC", dependencies: [ "LiveKit", - .product(name: "JWTKit", package: "jwt-kit"), ] ), ], diff --git a/Sources/LiveKit/Agent/Agent.swift b/Sources/LiveKit/Agent/Agent.swift new file mode 100644 index 000000000..f2a23bf0b --- /dev/null +++ b/Sources/LiveKit/Agent/Agent.swift @@ -0,0 +1,54 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +@MainActor +open class Agent: ObservableObject { + @Published public private(set) var state: AgentState = .idle + + @Published public private(set) var audioTrack: (any AudioTrack)? 
+ @Published public private(set) var avatarVideoTrack: (any VideoTrack)? + + public let participant: Participant + + public init(participant: Participant) { + self.participant = participant + observe(participant) + } + + private func observe(_ participant: Participant) { + Task { [weak self] in + for try await _ in participant.changes { + guard let self else { return } + + state = participant.agentState + updateTracks(of: participant) + } + } + } + + private func updateTracks(of participant: Participant) { + audioTrack = participant.audioTracks.first(where: { $0.source == .microphone })?.track as? AudioTrack + avatarVideoTrack = participant.avatarWorker?.firstCameraVideoTrack + } +} + +extension AgentState: CustomStringConvertible { + public var description: String { + rawValue.capitalized + } +} diff --git a/Sources/LiveKit/Agent/Chat/Message.swift b/Sources/LiveKit/Agent/Chat/Message.swift new file mode 100644 index 000000000..529728c1c --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Message.swift @@ -0,0 +1,41 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// A message received from the agent. 
+public struct ReceivedMessage: Identifiable, Equatable, Codable, Sendable { + public let id: String + public let timestamp: Date + public let content: Content + + public enum Content: Equatable, Codable, Sendable { + case agentTranscript(String) + case userTranscript(String) + case userInput(String) + } +} + +/// A message sent to the agent. +public struct SentMessage: Identifiable, Equatable, Codable, Sendable { + public let id: String + public let timestamp: Date + public let content: Content + + public enum Content: Equatable, Codable, Sendable { + case userInput(String) + } +} diff --git a/Sources/LiveKit/Agent/Chat/Receive/MessageReceiver.swift b/Sources/LiveKit/Agent/Chat/Receive/MessageReceiver.swift new file mode 100644 index 000000000..2344be30e --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Receive/MessageReceiver.swift @@ -0,0 +1,27 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// A protocol that defines a message receiver. +/// +/// A message receiver is responsible for creating a stream of messages from the agent. +/// It is used to receive messages from the agent and update the message feed. 
+/// +/// - SeeAlso: ``ReceivedMessage`` +public protocol MessageReceiver: Sendable { + func messages() async throws -> AsyncStream +} diff --git a/Sources/LiveKit/Agent/Chat/Receive/TranscriptionDelegateReceiver.swift b/Sources/LiveKit/Agent/Chat/Receive/TranscriptionDelegateReceiver.swift new file mode 100644 index 000000000..43c8bfe1a --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Receive/TranscriptionDelegateReceiver.swift @@ -0,0 +1,68 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// An actor that receives transcription messages from the room and yields them as messages. +/// +/// Room delegate methods are called multiple times for each message, with a stable message ID +/// that can be directly used for diffing. +/// +/// Example: +/// ``` +/// { id: "1", content: "Hello" } +/// { id: "1", content: "Hello world!" } +/// ``` +@available(*, deprecated, message: "Use TranscriptionStreamReceiver compatible with livekit-agents 1.0") +actor TranscriptionDelegateReceiver: MessageReceiver, RoomDelegate { + private let room: Room + private var continuation: AsyncStream.Continuation? + + init(room: Room) { + self.room = room + room.add(delegate: self) + } + + deinit { + room.remove(delegate: self) + } + + /// Creates a new message stream for the transcription delegate receiver.
+ func messages() -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream(of: ReceivedMessage.self) + self.continuation = continuation + return stream + } + + nonisolated func room(_: Room, participant: Participant, trackPublication _: TrackPublication, didReceiveTranscriptionSegments segments: [TranscriptionSegment]) { + segments + .filter { !$0.text.isEmpty } + .forEach { segment in + let message = ReceivedMessage( + id: segment.id, + timestamp: segment.lastReceivedTime, + content: participant.isAgent ? .agentTranscript(segment.text) : .userTranscript(segment.text) + ) + Task { + await yield(message) + } + } + } + + private func yield(_ message: ReceivedMessage) { + continuation?.yield(message) + } +} diff --git a/Sources/LiveKit/Agent/Chat/Receive/TranscriptionStreamReceiver.swift b/Sources/LiveKit/Agent/Chat/Receive/TranscriptionStreamReceiver.swift new file mode 100644 index 000000000..758ce5043 --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Receive/TranscriptionStreamReceiver.swift @@ -0,0 +1,173 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// An actor that converts raw text streams from the LiveKit `Room` into `ReceivedMessage` objects. +/// - Note: Streams are supported by `livekit-agents` >= 1.0.0. +/// - SeeAlso: ``TranscriptionDelegateReceiver`` +/// +/// For agent messages, a new text stream is emitted for each message, and the stream is closed when the message is finalized.
+/// Each agent message is delivered in chunks, that are accumulated and published into the message stream. +/// +/// For user messages, the full transcription is sent each time, but may be updated until finalized. +/// +/// The ID of the segment is stable and unique across the lifetime of the message. +/// This ID can be used directly for `Identifiable` conformance. +/// +/// Example text stream for agent messages: +/// ``` +/// { segment_id: "1", content: "Hello" } +/// { segment_id: "1", content: " world" } +/// { segment_id: "1", content: "!" } +/// { segment_id: "2", content: "Hello" } +/// { segment_id: "2", content: " Apple" } +/// { segment_id: "2", content: "!" } +/// ``` +/// +/// Example text stream for user messages: +/// ``` +/// { segment_id: "3", content: "Hello" } +/// { segment_id: "3", content: "Hello world!" } +/// { segment_id: "4", content: "Hello" } +/// { segment_id: "4", content: "Hello Apple!" } +/// ``` +/// +/// Example output: +/// ``` +/// Message(id: "1", timestamp: 2025-01-01 12:00:00 +0000, content: .agentTranscript("Hello world!")) +/// Message(id: "2", timestamp: 2025-01-01 12:00:10 +0000, content: .agentTranscript("Hello Apple!")) +/// Message(id: "3", timestamp: 2025-01-01 12:00:20 +0000, content: .userTranscript("Hello world!")) +/// Message(id: "4", timestamp: 2025-01-01 12:00:30 +0000, content: .userTranscript("Hello Apple!")) +/// ``` +/// +actor TranscriptionStreamReceiver: MessageReceiver, Loggable { + private struct PartialMessageID: Hashable { + let segmentID: String + let participantID: Participant.Identity + } + + private struct PartialMessage { + var content: String + let timestamp: Date + var streamID: String + + mutating func appendContent(_ newContent: String) { + content += newContent + } + + mutating func replaceContent(_ newContent: String, streamID: String) { + content = newContent + self.streamID = streamID + } + } + + private let room: Room + private let topic: String + + private lazy var partialMessages: 
[PartialMessageID: PartialMessage] = [:] + + init(room: Room, topic: String = "lk.transcription") { + self.room = room + self.topic = topic + } + + /// Creates a new message stream for the chat topic. + func messages() async throws -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream(of: ReceivedMessage.self) + + try await room.registerTextStreamHandler(for: topic) { [weak self] reader, participantIdentity in + guard let self else { return } + for try await message in reader where !message.isEmpty { + await continuation.yield(processIncoming(partialMessage: message, reader: reader, participantIdentity: participantIdentity)) + } + } + + continuation.onTermination = { [weak self] _ in + Task { + guard let self else { return } + await self.room.unregisterTextStreamHandler(for: self.topic) + } + } + + return stream + } + + /// Aggregates the incoming text into a message, storing the partial content in the `partialMessages` dictionary. + /// - Note: When the message is finalized, or a new message is started, the dictionary is purged to limit memory usage. + private func processIncoming(partialMessage message: String, reader: TextStreamReader, participantIdentity: Participant.Identity) -> ReceivedMessage { + let attributes = reader.info.attributes.mapped(to: TranscriptionAttributes.self) + if attributes == nil { + log("Unable to read message attributes from \(reader.info.attributes)", .error) + } + + let segmentID = attributes?.lkSegmentID ?? 
reader.info.id + let participantID = participantIdentity + let partialID = PartialMessageID(segmentID: segmentID, participantID: participantID) + + let currentStreamID = reader.info.id + + let timestamp: Date + let updatedContent: String + + if var existingMessage = partialMessages[partialID] { + // Update existing message + if existingMessage.streamID == currentStreamID { + // Same stream, append content + existingMessage.appendContent(message) + } else { + // Different stream for same segment, replace content + existingMessage.replaceContent(message, streamID: currentStreamID) + } + updatedContent = existingMessage.content + timestamp = existingMessage.timestamp + partialMessages[partialID] = existingMessage + } else { + // This is a new message + updatedContent = message + timestamp = reader.info.timestamp + partialMessages[partialID] = PartialMessage( + content: updatedContent, + timestamp: timestamp, + streamID: currentStreamID + ) + cleanupPreviousTurn(participantIdentity, exceptSegmentID: segmentID) + } + + let isFinal = attributes?.lkTranscriptionFinal ?? false + if isFinal { + partialMessages[partialID] = nil + } + + let newOrUpdatedMessage = ReceivedMessage( + id: segmentID, + timestamp: timestamp, + content: participantIdentity == room.localParticipant.identity ? 
.userTranscript(updatedContent) : .agentTranscript(updatedContent) + ) + + return newOrUpdatedMessage + } + + private func cleanupPreviousTurn(_ participantID: Participant.Identity, exceptSegmentID: String) { + let keysToRemove = partialMessages.keys.filter { + $0.participantID == participantID && $0.segmentID != exceptSegmentID + } + + for key in keysToRemove { + partialMessages[key] = nil + } + } +} diff --git a/Sources/LiveKit/Agent/AgentState+.swift b/Sources/LiveKit/Agent/Chat/Send/MessageSender.swift similarity index 63% rename from Sources/LiveKit/Agent/AgentState+.swift rename to Sources/LiveKit/Agent/Chat/Send/MessageSender.swift index 9bb45b096..fe78232c0 100644 --- a/Sources/LiveKit/Agent/AgentState+.swift +++ b/Sources/LiveKit/Agent/Chat/Send/MessageSender.swift @@ -14,8 +14,14 @@ * limitations under the License. */ -extension AgentState: CustomStringConvertible { - public var description: String { - rawValue.capitalized - } +import Foundation + +/// A protocol that defines a message sender. +/// +/// A message sender is responsible for sending messages to the agent. +/// It is used to send messages to the agent and update the message feed. +/// +/// - SeeAlso: ``SentMessage`` +public protocol MessageSender: Sendable { + func send(_ message: SentMessage) async throws } diff --git a/Sources/LiveKit/Agent/Chat/Send/TextMessageSender.swift b/Sources/LiveKit/Agent/Chat/Send/TextMessageSender.swift new file mode 100644 index 000000000..3fcfc87e0 --- /dev/null +++ b/Sources/LiveKit/Agent/Chat/Send/TextMessageSender.swift @@ -0,0 +1,55 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// An actor that sends local messages to the agent. +/// Currently, it only supports sending text messages. +/// +/// It also serves as the loopback for the local messages, +/// so that they can be displayed in the message feed +/// without relying on the agent-side transcription. +actor TextMessageSender: MessageSender, MessageReceiver { + private let room: Room + private let topic: String + + private var messageContinuation: AsyncStream.Continuation? + + init(room: Room, topic: String = "lk.chat") { + self.room = room + self.topic = topic + } + + func send(_ message: SentMessage) async throws { + guard case let .userInput(text) = message.content else { return } + + try await room.localParticipant.sendText(text, for: topic) + + let loopbackMessage = ReceivedMessage( + id: message.id, + timestamp: message.timestamp, + content: .userInput(text) + ) + + messageContinuation?.yield(loopbackMessage) + } + + func messages() async throws -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream() + messageContinuation = continuation + return stream + } +} diff --git a/Sources/LiveKit/Agent/Conversation+Environment.swift b/Sources/LiveKit/Agent/Conversation+Environment.swift new file mode 100644 index 000000000..d0f64cde7 --- /dev/null +++ b/Sources/LiveKit/Agent/Conversation+Environment.swift @@ -0,0 +1,80 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import SwiftUI + +#if swift(>=6.0) +public extension EnvironmentValues { + @Entry var agentName: String? = nil +} +#else +public struct AgentNameKey: EnvironmentKey { + public static let defaultValue: String? = nil +} + +public extension EnvironmentValues { + var agentName: String? { + get { self[AgentNameKey.self] } + set { self[AgentNameKey.self] = newValue } + } +} +#endif + +@MainActor +@propertyWrapper +public struct LiveKitConversation: DynamicProperty { + @EnvironmentObject private var conversation: Conversation + + public init() {} + + public var wrappedValue: Conversation { + conversation + } +} + +@MainActor +@propertyWrapper +public struct LiveKitLocalMedia: DynamicProperty { + @EnvironmentObject private var localMedia: LocalMedia + + public init() {} + + public var wrappedValue: LocalMedia { + localMedia + } +} + +@MainActor +@propertyWrapper +public struct LiveKitAgent: DynamicProperty { + @EnvironmentObject private var conversation: Conversation + @Environment(\.agentName) private var environmentName + + let agentName: String? + + public init(_ agentName: String? = nil) { + self.agentName = agentName + } + + public var wrappedValue: Agent? 
{ + if let agentName { + return conversation.agent(named: agentName) + } else if let environmentName { + return conversation.agent(named: environmentName) + } + return conversation.agents.values.first + } +} diff --git a/Sources/LiveKit/Agent/Conversation.swift b/Sources/LiveKit/Agent/Conversation.swift new file mode 100644 index 000000000..cac682780 --- /dev/null +++ b/Sources/LiveKit/Agent/Conversation.swift @@ -0,0 +1,199 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +import OrderedCollections + +@MainActor +open class Conversation: ObservableObject { + // MARK: - Error + + public enum Error: LocalizedError { + case agentNotConnected + case failedToConnect(Swift.Error) + case failedToSend(Swift.Error) + + public var errorDescription: String? { + "TODO" + } + } + + // MARK: - State + + @Published public private(set) var error: Error? 
+ + @Published public private(set) var connectionState: ConnectionState = .disconnected + @Published public private(set) var isListening = false + public var isReady: Bool { + switch connectionState { + case .disconnected where isListening, + .connecting where isListening, + .connected, + .reconnecting: + true + default: + false + } + } + + @Published public private(set) var agents: [Participant.Identity: Agent] = [:] + public var hasAgents: Bool { !agents.isEmpty } + + @Published public private(set) var messages: OrderedDictionary = [:] + + // MARK: - Dependencies + + public let room: Room + + private let tokenSource: any TokenSource + private let senders: [any MessageSender] + private let receivers: [any MessageReceiver] + + // MARK: - Internal state + + private var waitForAgentTask: Task? + + // MARK: - Init + + public init(tokenSource: TokenSource, room: Room = .init(), agentName: String? = nil, senders: [any MessageSender]? = nil, receivers: [any MessageReceiver]? = nil) { + self.tokenSource = tokenSource + self.room = room + + let textMessageSender = TextMessageSender(room: room) + let senders = senders ?? [textMessageSender] + let receivers = receivers ?? [textMessageSender, TranscriptionStreamReceiver(room: room)] + + self.senders = senders + self.receivers = receivers + + observe(room: room, agentName: agentName) + observe(receivers: receivers) + } + + private func observe(room: Room, agentName _: String?) 
{ + Task { [weak self] in + for try await _ in room.changes { + guard let self else { return } + + connectionState = room.connectionState + updateAgents(in: room) + } + } + } + + private func updateAgents(in room: Room) { + let agentParticipants = room.agentParticipants + + var newAgents: [Participant.Identity: Agent] = [:] + + for (identity, participant) in agentParticipants { + if let existingAgent = agents[identity] { + newAgents[identity] = existingAgent + } else { + let newAgent = Agent(participant: participant) + newAgents[identity] = newAgent + } + } + + agents = newAgents + } + + private func observe(receivers: [any MessageReceiver]) { + for receiver in receivers { + Task { [weak self] in + for await message in try await receiver.messages() { + guard let self else { return } + messages.updateValue(message, forKey: message.id) + } + } + } + } + + // MARK: - Agents + + public func agent(named name: String) -> Agent? { + agents.values.first { $0.participant.attributes["lk.agent_name"] == name || $0.participant.identity?.stringValue == name } + } + + public subscript(name: String) -> Agent? { + agent(named: name) + } + + // MARK: - Lifecycle + + public func start(preConnectAudio: Bool = true, waitForAgent: TimeInterval = 20, options: ConnectOptions? = nil, roomOptions: RoomOptions? 
= nil) async { + guard connectionState == .disconnected else { return } + + error = nil + waitForAgentTask?.cancel() + + defer { + waitForAgentTask = Task { + try await Task.sleep(nanoseconds: UInt64(TimeInterval(NSEC_PER_SEC) * waitForAgent)) + try Task.checkCancellation() + if connectionState == .connected, agents.isEmpty { + await end() + self.error = .agentNotConnected + } + } + } + + do { + if preConnectAudio { + try await room.withPreConnectAudio(timeout: waitForAgent) { + await MainActor.run { self.isListening = true } + try await self.room.connect(tokenSource: self.tokenSource, connectOptions: options, roomOptions: roomOptions) + await MainActor.run { self.isListening = false } + } + } else { + try await room.connect(tokenSource: tokenSource, connectOptions: options, roomOptions: roomOptions) + } + } catch { + self.error = .failedToConnect(error) + } + } + + public func end() async { + await room.disconnect() + } + + public func resetError() { + error = nil + } + + // MARK: - Messages + + @discardableResult + public func send(text: String) async -> SentMessage { + let message = SentMessage(id: UUID().uuidString, timestamp: Date(), content: .userInput(text)) + do { + for sender in senders { + try await sender.send(message) + } + } catch { + self.error = .failedToSend(error) + } + return message + } + + public func getMessageHistory() -> [ReceivedMessage] { + messages.values.elements + } + + public func restoreMessageHistory(_ messages: [ReceivedMessage]) { + self.messages = .init(uniqueKeysWithValues: messages.sorted(by: { $0.timestamp < $1.timestamp }).map { ($0.id, $0) }) + } +} diff --git a/Sources/LiveKit/Agent/LocalMedia.swift b/Sources/LiveKit/Agent/LocalMedia.swift new file mode 100644 index 000000000..e0d7775ab --- /dev/null +++ b/Sources/LiveKit/Agent/LocalMedia.swift @@ -0,0 +1,172 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@preconcurrency import AVFoundation + +@MainActor +open class LocalMedia: ObservableObject { + // MARK: - Error + + public enum Error: LocalizedError { + case mediaDevice(Swift.Error) + } + + // MARK: - Devices + + @Published public private(set) var error: Error? + + @Published public private(set) var microphoneTrack: (any AudioTrack)? + @Published public private(set) var cameraTrack: (any VideoTrack)? + @Published public private(set) var screenShareTrack: (any VideoTrack)? + + @Published public private(set) var isMicrophoneEnabled: Bool = false + @Published public private(set) var isCameraEnabled: Bool = false + @Published public private(set) var isScreenShareEnabled: Bool = false + + @Published public private(set) var audioDevices: [AudioDevice] = AudioManager.shared.inputDevices + @Published public private(set) var selectedAudioDeviceID: String = AudioManager.shared.inputDevice.deviceId + + @Published public private(set) var videoDevices: [AVCaptureDevice] = [] + @Published public private(set) var selectedVideoDeviceID: String? 
+ + @Published public private(set) var canSwitchCamera = false + + // MARK: - Dependencies + + private var localParticipant: LocalParticipant + + // MARK: - Initialization + + public init(localParticipant: LocalParticipant) { + self.localParticipant = localParticipant + + observe(localParticipant) + observeDevices() + } + + public convenience init(room: Room) { + self.init(localParticipant: room.localParticipant) + } + + public convenience init(conversation: Conversation) { + self.init(room: conversation.room) + } + + private func observe(_ localParticipant: LocalParticipant) { + Task { [weak self] in + for try await _ in localParticipant.changes { + guard let self else { return } + + microphoneTrack = localParticipant.firstAudioTrack + cameraTrack = localParticipant.firstCameraVideoTrack + screenShareTrack = localParticipant.firstScreenShareVideoTrack + + isMicrophoneEnabled = localParticipant.isMicrophoneEnabled() + isCameraEnabled = localParticipant.isCameraEnabled() + isScreenShareEnabled = localParticipant.isScreenShareEnabled() + } + } + } + + private func observeDevices() { + try? 
AudioManager.shared.set(microphoneMuteMode: .inputMixer) // don't play mute sound effect + Task { + try await AudioManager.shared.setRecordingAlwaysPreparedMode(true) + } + + AudioManager.shared.onDeviceUpdate = { [weak self] _ in + Task { @MainActor in + self?.audioDevices = AudioManager.shared.inputDevices + self?.selectedAudioDeviceID = AudioManager.shared.defaultInputDevice.deviceId + } + } + + Task { + canSwitchCamera = try await CameraCapturer.canSwitchPosition() + videoDevices = try await CameraCapturer.captureDevices() + selectedVideoDeviceID = videoDevices.first?.uniqueID + } + } + + deinit { + AudioManager.shared.onDeviceUpdate = nil + } + + // MARK: - Toggle + + public func toggleMicrophone() async { + do { + try await localParticipant.setMicrophone(enabled: !isMicrophoneEnabled) + } catch { + self.error = .mediaDevice(error) + } + } + + public func toggleCamera(disableScreenShare: Bool = false) async { + let enable = !isCameraEnabled + do { + if enable, disableScreenShare, isScreenShareEnabled { + try await localParticipant.setScreenShare(enabled: false) + } + + let device = try await CameraCapturer.captureDevices().first(where: { $0.uniqueID == selectedVideoDeviceID }) + try await localParticipant.setCamera(enabled: enable, captureOptions: CameraCaptureOptions(device: device)) + } catch { + self.error = .mediaDevice(error) + } + } + + public func toggleScreenShare(disableCamera: Bool = false) async { + let enable = !isScreenShareEnabled + do { + if enable, disableCamera, isCameraEnabled { + try await localParticipant.setCamera(enabled: false) + } + try await localParticipant.setScreenShare(enabled: enable) + } catch { + self.error = .mediaDevice(error) + } + } + + // MARK: - Select + + public func select(audioDevice: AudioDevice) { + selectedAudioDeviceID = audioDevice.deviceId + + let device = AudioManager.shared.inputDevices.first(where: { $0.deviceId == selectedAudioDeviceID }) ?? 
AudioManager.shared.defaultInputDevice + AudioManager.shared.inputDevice = device + } + + public func select(videoDevice: AVCaptureDevice) async { + selectedVideoDeviceID = videoDevice.uniqueID + + guard let cameraCapturer = getCameraCapturer() else { return } + let captureOptions = CameraCaptureOptions(device: videoDevice) + _ = try? await cameraCapturer.set(options: captureOptions) + } + + public func switchCamera() async { + guard let cameraCapturer = getCameraCapturer() else { return } + _ = try? await cameraCapturer.switchCameraPosition() + } + + // MARK: - Private + + private func getCameraCapturer() -> CameraCapturer? { + guard let cameraTrack = localParticipant.firstCameraVideoTrack as? LocalVideoTrack else { return nil } + return cameraTrack.capturer as? CameraCapturer + } +} diff --git a/Sources/LiveKit/Auth/JWT.swift b/Sources/LiveKit/Auth/JWT.swift new file mode 100644 index 000000000..b0531594f --- /dev/null +++ b/Sources/LiveKit/Auth/JWT.swift @@ -0,0 +1,99 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import JWTKit + +public struct LiveKitJWTPayload: JWTPayload, Codable, Equatable { + public struct VideoGrant: Codable, Equatable { + /// Name of the room, must be set for admin or join permissions + public let room: String? + /// Permission to create a room + public let roomCreate: Bool? + /// Permission to join a room as a participant, room must be set + public let roomJoin: Bool? 
+ /// Permission to list rooms + public let roomList: Bool? + /// Permission to start a recording + public let roomRecord: Bool? + /// Permission to control a specific room, room must be set + public let roomAdmin: Bool? + + /// Allow participant to publish. If neither canPublish or canSubscribe is set, both publish and subscribe are enabled + public let canPublish: Bool? + /// Allow participant to subscribe to other tracks + public let canSubscribe: Bool? + /// Allow participants to publish data, defaults to true if not set + public let canPublishData: Bool? + /// Allowed sources for publishing + public let canPublishSources: [String]? + /// Participant isn't visible to others + public let hidden: Bool? + /// Participant is recording the room, when set, allows room to indicate it's being recorded + public let recorder: Bool? + + public init(room: String? = nil, + roomCreate: Bool? = nil, + roomJoin: Bool? = nil, + roomList: Bool? = nil, + roomRecord: Bool? = nil, + roomAdmin: Bool? = nil, + canPublish: Bool? = nil, + canSubscribe: Bool? = nil, + canPublishData: Bool? = nil, + canPublishSources: [String]? = nil, + hidden: Bool? = nil, + recorder: Bool? = nil) + { + self.room = room + self.roomCreate = roomCreate + self.roomJoin = roomJoin + self.roomList = roomList + self.roomRecord = roomRecord + self.roomAdmin = roomAdmin + self.canPublish = canPublish + self.canSubscribe = canSubscribe + self.canPublishData = canPublishData + self.canPublishSources = canPublishSources + self.hidden = hidden + self.recorder = recorder + } + } + + /// Expiration time claim + public let exp: ExpirationClaim + /// Issuer claim + public let iss: IssuerClaim + /// Not before claim + public let nbf: NotBeforeClaim + /// Subject claim + public let sub: SubjectClaim + + /// Participant name + public let name: String? + /// Participant metadata + public let metadata: String? + /// Video grants for the participant + public let video: VideoGrant? 
+ + public func verify(using _: JWTSigner) throws { + try nbf.verifyNotBefore() + try exp.verifyNotExpired() + } + + static func fromUnverified(token: String) -> Self? { + try? JWTSigners().unverified(token, as: Self.self) + } +} diff --git a/Sources/LiveKit/Auth/Sandbox.swift b/Sources/LiveKit/Auth/Sandbox.swift new file mode 100644 index 000000000..e090d625a --- /dev/null +++ b/Sources/LiveKit/Auth/Sandbox.swift @@ -0,0 +1,35 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// `Sandbox` queries LiveKit Sandbox token server for credentials, +/// which supports quick prototyping/getting started types of use cases. +/// - Warning: This token endpoint is **INSECURE** and should **NOT** be used in production. +public struct Sandbox: TokenEndpoint { + public let url = URL(string: "https://cloud-api.livekit.io/api/sandbox/connection-details")! + public var headers: [String: String] { + ["X-Sandbox-ID": id] + } + + /// The sandbox ID provided by LiveKit Cloud. + public let id: String + + /// Initialize with a sandbox ID from LiveKit Cloud. 
+ public init(id: String) { + self.id = id.trimmingCharacters(in: .alphanumerics.inverted) + } +} diff --git a/Sources/LiveKit/Auth/TokenSource.swift b/Sources/LiveKit/Auth/TokenSource.swift new file mode 100644 index 000000000..866f2e176 --- /dev/null +++ b/Sources/LiveKit/Auth/TokenSource.swift @@ -0,0 +1,271 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +#warning("Fix camel case after deploying backend") + +// MARK: - Token + +/// `Token` represent the credentials needed for connecting to a new Room. +/// - SeeAlso: [LiveKit's Authentication Documentation](https://docs.livekit.io/home/get-started/authentication/) for more information. +public enum Token { + /// Request parameters for generating connection credentials. + public struct Request: Encodable, Sendable, Equatable { + /// The name of the room being requested when generating credentials. + public let roomName: String? + /// The name of the participant being requested for this client when generating credentials. + public let participantName: String? + /// The identity of the participant being requested for this client when generating credentials. + public let participantIdentity: String? + /// Any participant metadata being included along with the credentials generation operation. + public let participantMetadata: String? + /// Any participant attributes being included along with the credentials generation operation. 
+ public let participantAttributes: [String: String]? + /// A `RoomConfiguration` object can be passed to request extra parameters when generating connection credentials. + /// Used for advanced room configuration like dispatching agents, setting room limits, etc. + /// - SeeAlso: [Room Configuration Documentation](https://docs.livekit.io/home/get-started/authentication/#room-configuration) for more info. + public let roomConfiguration: RoomConfiguration? + + // enum CodingKeys: String, CodingKey { + // case roomName = "room_name" + // case participantName = "participant_name" + // case participantIdentity = "participant_identity" + // case participantMetadata = "participant_metadata" + // case participantAttributes = "participant_attributes" + // case roomConfiguration = "room_configuration" + // } + + public init( + roomName: String? = nil, + participantName: String? = nil, + participantIdentity: String? = nil, + participantMetadata: String? = nil, + participantAttributes: [String: String]? = nil, + roomConfiguration: RoomConfiguration? = nil + ) { + self.roomName = roomName + self.participantName = participantName + self.participantIdentity = participantIdentity + self.participantMetadata = participantMetadata + self.participantAttributes = participantAttributes + self.roomConfiguration = roomConfiguration + } + } + + /// Response containing the credentials needed to connect to a room. + public struct Response: Decodable, Sendable { + /// The WebSocket URL for the LiveKit server. + public let serverURL: URL + /// The JWT token containing participant permissions and metadata. 
+ public let participantToken: String + + enum CodingKeys: String, CodingKey { + case serverURL = "serverUrl" + case participantToken + } + + public init(serverURL: URL, participantToken: String) { + self.serverURL = serverURL + self.participantToken = participantToken + } + } + + public typealias Options = Request + public typealias Literal = Response +} + +// MARK: - Source + +/// Protocol for types that can provide connection credentials. +/// Implement this protocol to create custom credential providers (e.g., fetching from your backend API). +public protocol TokenSource: Sendable { + /// Fetch connection credentials for the given request. + /// - Parameter request: The token request containing room and participant information + /// - Returns: A token response containing the server URL and participant token + /// - Throws: An error if the token generation fails + func fetch(_ request: Token.Request) async throws -> Token.Response +} + +/// `Token.Literal` contains a single set of credentials, hard-coded or acquired from a static source. +extension Token.Literal: TokenSource { + public func fetch(_: Token.Request) async throws -> Token.Response { + self + } +} + +// MARK: - Endpoint + +/// Protocol for token servers that fetch credentials via HTTP requests. +/// Provides a default implementation of `fetch` that can be used to integrate with custom backend token generation endpoints. +/// - Note: The response is expected to be a `Token.Response` object. +public protocol TokenEndpoint: TokenSource { + /// The URL endpoint for token generation. + var url: URL { get } + /// The HTTP method to use (defaults to "POST"). + var method: String { get } + /// Additional HTTP headers to include with the request. 
+ var headers: [String: String] { get } +} + +public extension TokenEndpoint { + var method: String { "POST" } + var headers: [String: String] { [:] } + + func fetch(_ request: Token.Request) async throws -> Token.Response { + var urlRequest = URLRequest(url: url) + + urlRequest.httpMethod = method + for (key, value) in headers { + urlRequest.addValue(value, forHTTPHeaderField: key) + } + urlRequest.httpBody = try JSONEncoder().encode(request) + + let (data, response) = try await URLSession.shared.data(for: urlRequest) + + guard let httpResponse = response as? HTTPURLResponse else { + throw LiveKitError(.network, message: "Error generating token from the token server, no response") + } + + guard (200 ..< 300).contains(httpResponse.statusCode) else { + throw LiveKitError(.network, message: "Error generating token from the token server, received \(httpResponse)") + } + + return try JSONDecoder().decode(Token.Response.self, from: data) + } +} + +// MARK: - Cache + +/// `CachingTokenSource` handles caching of credentials from any other `TokenSource` using configurable store. +public actor CachingTokenSource: TokenSource, Loggable { + /// A tuple containing the request and response that were cached. + public typealias Cached = (Token.Request, Token.Response) + /// A closure that validates whether cached credentials are still valid. + /// - Parameters: + /// - request: The original token request + /// - response: The cached token response + /// - Returns: `true` if the cached credentials are still valid, `false` otherwise + public typealias TokenValidator = (Token.Request, Token.Response) -> Bool + + private let source: TokenSource + private let store: TokenStore + private let validator: TokenValidator + + /// Initialize a caching wrapper around any credentials provider. 
+ /// - Parameters: + /// - source: The underlying token source to wrap + /// - store: The store implementation to use for caching (defaults to in-memory store) + /// - validator: A closure to determine if cached credentials are still valid (defaults to JWT expiration check) + public init( + _ source: TokenSource, + store: TokenStore = InMemoryTokenStore(), + validator: @escaping TokenValidator = { _, response in response.hasValidToken() } + ) { + self.source = source + self.store = store + self.validator = validator + } + + public func fetch(_ request: Token.Request) async throws -> Token.Response { + if let (cachedRequest, cachedResponse) = await store.retrieve(), + cachedRequest == request, + validator(cachedRequest, cachedResponse) + { + log("Using cached credentials", .debug) + return cachedResponse + } + + log("Requesting new credentials", .debug) + let response = try await source.fetch(request) + await store.store((request, response)) + return response + } + + /// Invalidate the cached credentials, forcing a fresh fetch on the next request. + public func invalidate() async { + await store.clear() + } + + /// Get the cached credentials + /// - Returns: The cached token if found, nil otherwise + public func cachedToken() async -> Token.Response? { + await store.retrieve()?.1 + } +} + +// MARK: - Store + +/// Protocol for abstract store that can persist and retrieve a single cached credential pair. +/// Implement this protocol to create custom store implementations e.g. for Keychain. +public protocol TokenStore: Sendable { + /// Store credentials in the store (replaces any existing credentials) + func store(_ credentials: CachingTokenSource.Cached) async + + /// Retrieve the cached credentials + /// - Returns: The cached credentials if found, nil otherwise + func retrieve() async -> CachingTokenSource.Cached? 
+ + /// Clear the stored credentials + func clear() async +} + +/// Simple in-memory store implementation +public actor InMemoryTokenStore: TokenStore { + private var cached: CachingTokenSource.Cached? + + public init() {} + + public func store(_ credentials: CachingTokenSource.Cached) async { + cached = credentials + } + + public func retrieve() async -> CachingTokenSource.Cached? { + cached + } + + public func clear() async { + cached = nil + } +} + +// MARK: - Validation + +public extension Token.Response { + /// Validates whether the JWT token is still valid. + /// - Parameter tolerance: Time tolerance in seconds for token expiration check (default: 60 seconds) + /// - Returns: `true` if the token is valid and not expired, `false` otherwise + func hasValidToken(withTolerance tolerance: TimeInterval = 60) -> Bool { + guard let jwt = jwt() else { + return false + } + + do { + try jwt.nbf.verifyNotBefore() + try jwt.exp.verifyNotExpired(currentDate: Date().addingTimeInterval(tolerance)) + } catch { + return false + } + + return true + } + + /// Extracts the JWT payload from the participant token. + /// - Returns: The JWT payload if found, nil otherwise + func jwt() -> LiveKitJWTPayload? { + LiveKitJWTPayload.fromUnverified(token: participantToken) + } +} diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 2d1675494..95379ff52 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -82,6 +82,9 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { @objc public var publishersCount: Int { _state.numPublishers } + // Credentials + public var tokenSource: (any TokenSource)? + // expose engine's vars @objc public var url: String? 
{ _state.url?.absoluteString } @@ -408,6 +411,17 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { log("Connected to \(String(describing: self))", .info) } + public func connect(tokenSource: TokenSource, + tokenOptions: Token.Options = .init(), + connectOptions: ConnectOptions? = nil, + roomOptions: RoomOptions? = nil) async throws + { + self.tokenSource = tokenSource + + let token = try await tokenSource.fetch(tokenOptions) + try await connect(url: token.serverURL.absoluteString, token: token.participantToken, connectOptions: connectOptions, roomOptions: roomOptions) + } + @objc public func disconnect() async { // Return if already disconnected state diff --git a/Sources/LiveKit/Support/ObservableObject+.swift b/Sources/LiveKit/Support/ObservableObject+.swift new file mode 100644 index 000000000..68a3d7a18 --- /dev/null +++ b/Sources/LiveKit/Support/ObservableObject+.swift @@ -0,0 +1,37 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@preconcurrency import Combine + +extension ObservableObject { + /// An async sequence that emits the `objectWillChange` events. + var changes: any AsyncSequence { + if #available(macOS 12.0, iOS 15.0, tvOS 15.0, *) { + // This is necessary due to ObservableObjectPublisher not respecting the demand. 
+ // See: https://forums.swift.org/t/asyncpublisher-causes-crash-in-rather-simple-situation + objectWillChange.buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest).values + } else { + AsyncStream { continuation in + let cancellable = objectWillChange.sink { _ in + continuation.yield() + } + continuation.onTermination = { _ in + cancellable.cancel() + } + } + } + } +} diff --git a/Sources/LiveKit/Track/VideoTrack.swift b/Sources/LiveKit/Track/VideoTrack.swift index 07bd636f3..49bf94520 100644 --- a/Sources/LiveKit/Track/VideoTrack.swift +++ b/Sources/LiveKit/Track/VideoTrack.swift @@ -69,3 +69,11 @@ extension VideoTrack { return missingCodecs } } + +public extension VideoTrack { + /// The aspect ratio of the video track or 1 if the dimensions are not available. + var aspectRatio: CGFloat { + guard let dimensions else { return 1 } + return CGFloat(dimensions.width) / CGFloat(dimensions.height) + } +} diff --git a/Sources/LiveKit/Types/Attributes/AttributeTypings.swift b/Sources/LiveKit/Types/Attributes/AttributeTypings.swift index 35dbc8f0b..9fc56e609 100644 --- a/Sources/LiveKit/Types/Attributes/AttributeTypings.swift +++ b/Sources/LiveKit/Types/Attributes/AttributeTypings.swift @@ -20,6 +20,35 @@ import Foundation extension AgentAttributes: Hashable {} extension AgentAttributes: Equatable {} +// Bool as String encoding +extension TranscriptionAttributes { + init(from decoder: Decoder) throws { + let container = try decoder.container(keyedBy: CodingKeys.self) + lkSegmentID = try container.decodeIfPresent(String.self, forKey: .lkSegmentID) + lkTranscribedTrackID = try container.decodeIfPresent(String.self, forKey: .lkTranscribedTrackID) + + // Decode as Bool first, fallback to String + if let boolValue = try? container.decodeIfPresent(Bool.self, forKey: .lkTranscriptionFinal) { + lkTranscriptionFinal = boolValue + } else if let stringValue = try? 
container.decodeIfPresent(String.self, forKey: .lkTranscriptionFinal) { + lkTranscriptionFinal = (stringValue as NSString).boolValue + } else { + lkTranscriptionFinal = nil + } + } + + func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: CodingKeys.self) + try container.encodeIfPresent(lkSegmentID, forKey: .lkSegmentID) + try container.encodeIfPresent(lkTranscribedTrackID, forKey: .lkTranscribedTrackID) + + // Always encode Bool as a string if it exists + if let boolValue = lkTranscriptionFinal { + try container.encode(boolValue ? "true" : "false", forKey: .lkTranscriptionFinal) + } + } +} + // MARK: - AgentAttributes struct AgentAttributes: Codable, Sendable { diff --git a/Sources/LiveKit/Types/RoomConfiguration.swift b/Sources/LiveKit/Types/RoomConfiguration.swift new file mode 100644 index 000000000..6e54a768b --- /dev/null +++ b/Sources/LiveKit/Types/RoomConfiguration.swift @@ -0,0 +1,102 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +public struct RoomConfiguration: Encodable, Sendable, Equatable { + /// Room name, used as ID, must be unique + public let name: String? + + /// Number of seconds to keep the room open if no one joins + public let emptyTimeout: UInt32? + + /// Number of seconds to keep the room open after everyone leaves + public let departureTimeout: UInt32? 
+ + /// Limit number of participants that can be in a room, excluding Egress and Ingress participants + public let maxParticipants: UInt32? + + /// Metadata of room + public let metadata: String? + + /// Minimum playout delay of subscriber + public let minPlayoutDelay: UInt32? + + /// Maximum playout delay of subscriber + public let maxPlayoutDelay: UInt32? + + /// Improves A/V sync when playout_delay set to a value larger than 200ms. + /// It will disable transceiver re-use so not recommended for rooms with frequent subscription changes + public let syncStreams: Bool? + + /// Define agents that should be dispatched to this room + public let agents: [RoomAgentDispatch]? + + enum CodingKeys: String, CodingKey { + case name + case emptyTimeout = "empty_timeout" + case departureTimeout = "departure_timeout" + case maxParticipants = "max_participants" + case metadata + case minPlayoutDelay = "min_playout_delay" + case maxPlayoutDelay = "max_playout_delay" + case syncStreams = "sync_streams" + case agents + } + + public init( + name: String? = nil, + emptyTimeout: UInt32? = nil, + departureTimeout: UInt32? = nil, + maxParticipants: UInt32? = nil, + metadata: String? = nil, + minPlayoutDelay: UInt32? = nil, + maxPlayoutDelay: UInt32? = nil, + syncStreams: Bool? = nil, + agents: [RoomAgentDispatch]? = nil + ) { + self.name = name + self.emptyTimeout = emptyTimeout + self.departureTimeout = departureTimeout + self.maxParticipants = maxParticipants + self.metadata = metadata + self.minPlayoutDelay = minPlayoutDelay + self.maxPlayoutDelay = maxPlayoutDelay + self.syncStreams = syncStreams + self.agents = agents + } +} + +public struct RoomAgentDispatch: Encodable, Sendable, Equatable { + /// Name of the agent to dispatch + public let agentName: String? + + /// Metadata for the agent + public let metadata: String? + + enum CodingKeys: String, CodingKey { + case agentName = "agent_name" + case metadata + } + + public init( + agentName: String? = nil, + metadata: String? 
= nil + ) { + self.agentName = agentName + self.metadata = metadata + } +} diff --git a/Tests/LiveKitTests/Agent/TranscriptionTests.swift b/Tests/LiveKitTests/Agent/TranscriptionTests.swift new file mode 100644 index 000000000..1090d8532 --- /dev/null +++ b/Tests/LiveKitTests/Agent/TranscriptionTests.swift @@ -0,0 +1,185 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import OrderedCollections +import XCTest + +actor MessageCollector { + private var updates: [ReceivedMessage] = [] + private var messages: OrderedDictionary = [:] + + func add(_ message: ReceivedMessage) { + updates.append(message) + messages[message.id] = message + } + + func getUpdates() -> [ReceivedMessage] { + updates + } + + func getMessages() -> OrderedDictionary { + messages + } +} + +class TranscriptionTests: LKTestCase, @unchecked Sendable { + private var rooms: [Room] = [] + private var receiver: TranscriptionStreamReceiver! + private var senderRoom: Room! + private var messageCollector: MessageCollector! + private var collectionTask: Task! + private var messageExpectation: XCTestExpectation! + + // Same segment, same stream + func testUpdates() async throws { + let segmentID = "test-segment" + let streamID = UUID().uuidString + let testChunks = ["Hey", " there!", " What's up?"] + let expectedContent = ["Hey", "Hey there!", "Hey there! 
What's up?"] + + try await runTranscriptionTest( + chunks: testChunks, + segmentID: segmentID, + streamID: streamID, + expectedContent: expectedContent + ) + } + + // Same segment, different stream + func testReplace() async throws { + let segmentID = "test-segment" + let testChunks = ["Hey", "Hey there!", "Hey there! What's up?"] + let expectedContent = ["Hey", "Hey there!", "Hey there! What's up?"] + + try await runTranscriptionTest( + chunks: testChunks, + segmentID: segmentID, + streamID: nil, + expectedContent: expectedContent + ) + } + + private func setupTestEnvironment(expectedCount: Int) async throws { + messageExpectation = expectation(description: "Receives all message updates") + messageExpectation.expectedFulfillmentCount = expectedCount + + receiver = TranscriptionStreamReceiver(room: rooms[0]) + let messageStream = try await receiver.messages() + messageCollector = MessageCollector() + senderRoom = rooms[1] + + collectionTask = Task { @Sendable in + var iterator = messageStream.makeAsyncIterator() + while let message = await iterator.next() { + await self.messageCollector.add(message) + self.messageExpectation.fulfill() + } + } + } + + private func sendTranscriptionChunks( + chunks: [String], + segmentID: String, + streamID: String? = nil, + to room: Room + ) async throws { + let topic = "lk.transcription" + + for (index, chunk) in chunks.enumerated() { + let isLast = index == chunks.count - 1 + + var attributes: [String: String] = [ + "lk.segment_id": segmentID, + "lk.transcription_final": "false", + ] + + if isLast { + attributes["lk.transcription_final"] = "true" + } + + let options = StreamTextOptions( + topic: topic, + attributes: attributes, + id: streamID ?? 
UUID().uuidString + ) + + try await room.localParticipant.sendText(chunk, options: options) + try await Task.sleep(nanoseconds: 10_000_000) + } + } + + private func validateTranscriptionResults( + updates: [ReceivedMessage], + messages: OrderedDictionary, + segmentID: String, + expectedContent: [String] + ) { + // Validate updates + XCTAssertEqual(updates.count, expectedContent.count) + for (index, expected) in expectedContent.enumerated() { + XCTAssertEqual(updates[index].content, .agentTranscript(expected)) + XCTAssertEqual(updates[index].id, segmentID) + } + + // Validate timestamps are consistent + let firstTimestamp = updates[0].timestamp + for update in updates { + XCTAssertEqual(update.timestamp, firstTimestamp) + } + + // Validate final message + XCTAssertEqual(messages.count, 1) + XCTAssertEqual(messages.keys[0], segmentID) + XCTAssertEqual(messages.values[0].content, .agentTranscript(expectedContent.last!)) + XCTAssertEqual(messages.values[0].id, segmentID) + XCTAssertEqual(messages.values[0].timestamp, firstTimestamp) + } + + private func runTranscriptionTest( + chunks: [String], + segmentID: String, + streamID: String? 
= nil, + expectedContent: [String] + ) async throws { + try await withRooms([ + RoomTestingOptions(canSubscribe: true), + RoomTestingOptions(canPublishData: true), + ]) { rooms in + self.rooms = rooms + try await self.setupTestEnvironment(expectedCount: expectedContent.count) + try await self.sendTranscriptionChunks( + chunks: chunks, + segmentID: segmentID, + streamID: streamID, + to: self.senderRoom + ) + + await self.fulfillment(of: [self.messageExpectation], timeout: 5) + self.collectionTask.cancel() + + let updates = await self.messageCollector.getUpdates() + let messages = await self.messageCollector.getMessages() + + self.validateTranscriptionResults( + updates: updates, + messages: messages, + segmentID: segmentID, + expectedContent: expectedContent + ) + } + } +} diff --git a/Tests/LiveKitTests/Auth/TokenSourceTests.swift b/Tests/LiveKitTests/Auth/TokenSourceTests.swift new file mode 100644 index 000000000..37841af17 --- /dev/null +++ b/Tests/LiveKitTests/Auth/TokenSourceTests.swift @@ -0,0 +1,267 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +@testable import LiveKit +import XCTest + +class TokenSourceTests: LKTestCase { + actor MockValidJWTSource: TokenSource { + let serverURL = URL(string: "wss://test.livekit.io")! 
+ let participantName: String + var callCount = 0 + + init(participantName: String = "test-participant") { + self.participantName = participantName + } + + func fetch(_ request: Token.Request) async throws -> Token.Response { + callCount += 1 + + let tokenGenerator = TokenGenerator( + apiKey: "test-api-key", + apiSecret: "test-api-secret", + identity: request.participantIdentity ?? "test-identity" + ) + tokenGenerator.name = request.participantName ?? participantName + tokenGenerator.videoGrant = LiveKitJWTPayload.VideoGrant(room: request.roomName ?? "test-room", roomJoin: true) + + let token = try tokenGenerator.sign() + + return Token.Response( + serverURL: serverURL, + participantToken: token + ) + } + } + + actor MockInvalidJWTSource: TokenSource { + let serverURL = URL(string: "wss://test.livekit.io")! + var callCount = 0 + + func fetch(_: Token.Request) async throws -> Token.Response { + callCount += 1 + + return Token.Response( + serverURL: serverURL, + participantToken: "invalid.jwt.token" + ) + } + } + + actor MockExpiredJWTSource: TokenSource { + let serverURL = URL(string: "wss://test.livekit.io")! + var callCount = 0 + + func fetch(_ request: Token.Request) async throws -> Token.Response { + callCount += 1 + + let tokenGenerator = TokenGenerator( + apiKey: "test-api-key", + apiSecret: "test-api-secret", + identity: request.participantIdentity ?? "test-identity", + ttl: -60 + ) + tokenGenerator.name = request.participantName ?? "test-participant" + tokenGenerator.videoGrant = LiveKitJWTPayload.VideoGrant(room: request.roomName ?? 
"test-room", roomJoin: true) + + let token = try tokenGenerator.sign() + + return Token.Response( + serverURL: serverURL, + participantToken: token + ) + } + } + + func testValidJWTCaching() async throws { + let mockSource = MockValidJWTSource(participantName: "alice") + let cachingSource = CachingTokenSource(mockSource) + + let request = Token.Request( + roomName: "test-room", + participantName: "alice", + participantIdentity: "alice-id" + ) + + let response1 = try await cachingSource.fetch(request) + let callCount1 = await mockSource.callCount + XCTAssertEqual(callCount1, 1) + XCTAssertEqual(response1.serverURL.absoluteString, "wss://test.livekit.io") + XCTAssertTrue(response1.hasValidToken(), "Generated token should be valid") + + let response2 = try await cachingSource.fetch(request) + let callCount2 = await mockSource.callCount + XCTAssertEqual(callCount2, 1) + XCTAssertEqual(response2.participantToken, response1.participantToken) + XCTAssertEqual(response2.serverURL, response1.serverURL) + + let differentRequest = Token.Request( + roomName: "different-room", + participantName: "alice", + participantIdentity: "alice-id" + ) + let response3 = try await cachingSource.fetch(differentRequest) + let callCount3 = await mockSource.callCount + XCTAssertEqual(callCount3, 2) + XCTAssertNotEqual(response3.participantToken, response1.participantToken) + + await cachingSource.invalidate() + _ = try await cachingSource.fetch(request) + let callCount4 = await mockSource.callCount + XCTAssertEqual(callCount4, 3) + } + + func testInvalidJWTHandling() async throws { + let mockInvalidSource = MockInvalidJWTSource() + let cachingSource = CachingTokenSource(mockInvalidSource) + + let request = Token.Request( + roomName: "test-room", + participantName: "bob", + participantIdentity: "bob-id" + ) + + let response1 = try await cachingSource.fetch(request) + let callCount1 = await mockInvalidSource.callCount + XCTAssertEqual(callCount1, 1) + XCTAssertFalse(response1.hasValidToken(), 
"Invalid token should not be considered valid") + + let response2 = try await cachingSource.fetch(request) + let callCount2 = await mockInvalidSource.callCount + XCTAssertEqual(callCount2, 2) + XCTAssertEqual(response2.participantToken, response1.participantToken) + + let mockExpiredSource = MockExpiredJWTSource() + let cachingSourceExpired = CachingTokenSource(mockExpiredSource) + + let response3 = try await cachingSourceExpired.fetch(request) + let expiredCallCount1 = await mockExpiredSource.callCount + XCTAssertEqual(expiredCallCount1, 1) + XCTAssertFalse(response3.hasValidToken(), "Expired token should not be considered valid") + + _ = try await cachingSourceExpired.fetch(request) + let expiredCallCount2 = await mockExpiredSource.callCount + XCTAssertEqual(expiredCallCount2, 2) + } + + func testCustomValidator() async throws { + let mockSource = MockValidJWTSource(participantName: "charlie") + + let customValidator: CachingTokenSource.TokenValidator = { request, response in + request.participantName == "charlie" && response.hasValidToken() + } + + let cachingSource = CachingTokenSource(mockSource, validator: customValidator) + + let charlieRequest = Token.Request( + roomName: "test-room", + participantName: "charlie", + participantIdentity: "charlie-id" + ) + + let response1 = try await cachingSource.fetch(charlieRequest) + let callCount1 = await mockSource.callCount + XCTAssertEqual(callCount1, 1) + XCTAssertTrue(response1.hasValidToken()) + + let response2 = try await cachingSource.fetch(charlieRequest) + let callCount2 = await mockSource.callCount + XCTAssertEqual(callCount2, 1) + XCTAssertEqual(response2.participantToken, response1.participantToken) + + let aliceRequest = Token.Request( + roomName: "test-room", + participantName: "alice", + participantIdentity: "alice-id" + ) + + _ = try await cachingSource.fetch(aliceRequest) + let callCount3 = await mockSource.callCount + XCTAssertEqual(callCount3, 2) + + _ = try await cachingSource.fetch(aliceRequest) + 
let callCount4 = await mockSource.callCount + XCTAssertEqual(callCount4, 3) + + let tokenMockSource = MockValidJWTSource(participantName: "dave") + let tokenContentValidator: CachingTokenSource.TokenValidator = { request, response in + request.roomName == "test-room" && response.hasValidToken() + } + + let tokenCachingSource = CachingTokenSource(tokenMockSource, validator: tokenContentValidator) + + let roomRequest = Token.Request( + roomName: "test-room", + participantName: "dave", + participantIdentity: "dave-id" + ) + + _ = try await tokenCachingSource.fetch(roomRequest) + let tokenCallCount1 = await tokenMockSource.callCount + XCTAssertEqual(tokenCallCount1, 1) + + _ = try await tokenCachingSource.fetch(roomRequest) + let tokenCallCount2 = await tokenMockSource.callCount + XCTAssertEqual(tokenCallCount2, 1) + + let differentRoomRequest = Token.Request( + roomName: "different-room", + participantName: "dave", + participantIdentity: "dave-id" + ) + + _ = try await tokenCachingSource.fetch(differentRoomRequest) + let tokenCallCount3 = await tokenMockSource.callCount + XCTAssertEqual(tokenCallCount3, 2) + + _ = try await tokenCachingSource.fetch(differentRoomRequest) + let tokenCallCount4 = await tokenMockSource.callCount + XCTAssertEqual(tokenCallCount4, 3) + } + + func testConcurrentAccess() async throws { + let mockSource = MockValidJWTSource(participantName: "concurrent-test") + let cachingSource = CachingTokenSource(mockSource) + + let request = Token.Request( + roomName: "concurrent-room", + participantName: "concurrent-user", + participantIdentity: "concurrent-id" + ) + + let initialResponse = try await cachingSource.fetch(request) + let initialCallCount = await mockSource.callCount + XCTAssertEqual(initialCallCount, 1) + + async let fetch1 = cachingSource.fetch(request) + async let fetch2 = cachingSource.fetch(request) + async let fetch3 = cachingSource.fetch(request) + + let responses = try await [fetch1, fetch2, fetch3] + + 
XCTAssertEqual(responses[0].participantToken, initialResponse.participantToken) + XCTAssertEqual(responses[1].participantToken, initialResponse.participantToken) + XCTAssertEqual(responses[2].participantToken, initialResponse.participantToken) + + XCTAssertEqual(responses[0].serverURL, initialResponse.serverURL) + XCTAssertEqual(responses[1].serverURL, initialResponse.serverURL) + XCTAssertEqual(responses[2].serverURL, initialResponse.serverURL) + + let finalCallCount = await mockSource.callCount + XCTAssertEqual(finalCallCount, 1) + } +} diff --git a/Tests/LiveKitTests/Support/Room.swift b/Tests/LiveKitTests/Support/Room.swift index b9cdc33d4..3493e97a2 100644 --- a/Tests/LiveKitTests/Support/Room.swift +++ b/Tests/LiveKitTests/Support/Room.swift @@ -76,12 +76,12 @@ extension LKTestCase { apiSecret: apiSecret, identity: identity) - tokenGenerator.videoGrant = VideoGrant(room: room, - roomJoin: true, - canPublish: canPublish, - canSubscribe: canSubscribe, - canPublishData: canPublishData, - canPublishSources: canPublishSources.map(String.init)) + tokenGenerator.videoGrant = LiveKitJWTPayload.VideoGrant(room: room, + roomJoin: true, + canPublish: canPublish, + canSubscribe: canSubscribe, + canPublishData: canPublishData, + canPublishSources: canPublishSources.map(String.init)) return try tokenGenerator.sign() } diff --git a/Tests/LiveKitTests/Support/TokenGenerator.swift b/Tests/LiveKitTests/Support/TokenGenerator.swift index db304b27d..ccd22ed8c 100644 --- a/Tests/LiveKitTests/Support/TokenGenerator.swift +++ b/Tests/LiveKitTests/Support/TokenGenerator.swift @@ -16,83 +16,9 @@ import Foundation import JWTKit - -public struct VideoGrant: Codable, Equatable { - /** name of the room, must be set for admin or join permissions */ - let room: String? - /** permission to create a room */ - let roomCreate: Bool? - /** permission to join a room as a participant, room must be set */ - let roomJoin: Bool? - /** permission to list rooms */ - let roomList: Bool? 
- /** permission to start a recording */ - let roomRecord: Bool? - /** permission to control a specific room, room must be set */ - let roomAdmin: Bool? - - /** - * allow participant to publish. If neither canPublish or canSubscribe is set, - * both publish and subscribe are enabled - */ - let canPublish: Bool? - /** allow participant to subscribe to other tracks */ - let canSubscribe: Bool? - /** - * allow participants to publish data, defaults to true if not set - */ - let canPublishData: Bool? - /** allowed sources for publishing */ - let canPublishSources: [String]? // String as returned in the JWT - /** participant isn't visible to others */ - let hidden: Bool? - /** participant is recording the room, when set, allows room to indicate it's being recorded */ - let recorder: Bool? - - init(room: String? = nil, - roomCreate: Bool? = nil, - roomJoin: Bool? = nil, - roomList: Bool? = nil, - roomRecord: Bool? = nil, - roomAdmin: Bool? = nil, - canPublish: Bool? = nil, - canSubscribe: Bool? = nil, - canPublishData: Bool? = nil, - canPublishSources: [String]? = nil, - hidden: Bool? = nil, - recorder: Bool? = nil) - { - self.room = room - self.roomCreate = roomCreate - self.roomJoin = roomJoin - self.roomList = roomList - self.roomRecord = roomRecord - self.roomAdmin = roomAdmin - self.canPublish = canPublish - self.canSubscribe = canSubscribe - self.canPublishData = canPublishData - self.canPublishSources = canPublishSources - self.hidden = hidden - self.recorder = recorder - } -} +@testable import LiveKit public class TokenGenerator { - private struct Payload: JWTPayload, Equatable { - let exp: ExpirationClaim - let iss: IssuerClaim - let nbf: NotBeforeClaim - let sub: SubjectClaim - - let name: String? - let metadata: String? - let video: VideoGrant? 
- - func verify(using _: JWTSigner) throws { - fatalError("not implemented") - } - } - // 30 mins static let defaultTTL: TimeInterval = 30 * 60 @@ -104,7 +30,7 @@ public class TokenGenerator { public var ttl: TimeInterval public var name: String? public var metadata: String? - public var videoGrant: VideoGrant? + public var videoGrant: LiveKitJWTPayload.VideoGrant? // MARK: - Private @@ -127,13 +53,13 @@ public class TokenGenerator { let n = Date().timeIntervalSince1970 - let p = Payload(exp: .init(value: Date(timeIntervalSince1970: floor(n + ttl))), - iss: .init(stringLiteral: apiKey), - nbf: .init(value: Date(timeIntervalSince1970: floor(n))), - sub: .init(stringLiteral: identity), - name: name, - metadata: metadata, - video: videoGrant) + let p = LiveKitJWTPayload(exp: .init(value: Date(timeIntervalSince1970: floor(n + ttl))), + iss: .init(stringLiteral: apiKey), + nbf: .init(value: Date(timeIntervalSince1970: floor(n))), + sub: .init(stringLiteral: identity), + name: name, + metadata: metadata, + video: videoGrant) return try signers.sign(p) }