Skip to content

Commit 746f999

Browse files
committed
fix(realtime): make realtime default to MainActor
1 parent 2564588 commit 746f999

File tree

3 files changed

+89
-158
lines changed

3 files changed

+89
-158
lines changed

Sources/Realtime/RealtimeChannelV2.swift

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,33 @@ public struct RealtimeChannelConfig: Sendable {
2424
public var isPrivate: Bool
2525
}
2626

27-
protocol RealtimeChannelProtocol: AnyObject, Sendable {
28-
@MainActor var config: RealtimeChannelConfig { get }
27+
@MainActor
28+
protocol RealtimeChannelProtocol: AnyObject {
29+
var config: RealtimeChannelConfig { get }
2930
var topic: String { get }
3031
var logger: (any SupabaseLogger)? { get }
3132

3233
var socket: any RealtimeClientProtocol { get }
3334
}
3435

36+
@MainActor
3537
public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
3638
struct MutableState {
3739
var clientChanges: [PostgresJoinConfig] = []
3840
var joinRef: String?
3941
var pushes: [String: PushV2] = [:]
4042
}
4143

42-
@MainActor
4344
private var mutableState = MutableState()
4445

4546
let topic: String
4647

47-
@MainActor var config: RealtimeChannelConfig
48+
var config: RealtimeChannelConfig
4849

4950
let logger: (any SupabaseLogger)?
5051
let socket: any RealtimeClientProtocol
5152

52-
@MainActor var joinRef: String? { mutableState.joinRef }
53+
var joinRef: String? { mutableState.joinRef }
5354

5455
let callbackManager = CallbackManager()
5556
private let statusSubject = AsyncValueSubject<RealtimeChannelStatus>(.unsubscribed)
@@ -162,13 +163,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
162163
throw RealtimeError.maxRetryAttemptsReached
163164
}
164165

165-
/// Subscribes to the channel.
166-
@available(*, deprecated, message: "Use `subscribeWithError` instead")
167-
@MainActor
168-
public func subscribe() async {
169-
try? await subscribeWithError()
170-
}
171-
172166
/// Calculates retry delay with exponential backoff and jitter
173167
private func calculateRetryDelay(for attempt: Int) -> TimeInterval {
174168
let baseDelay: TimeInterval = 1.0
@@ -186,7 +180,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
186180
}
187181

188182
/// Subscribes to the channel
189-
@MainActor
190183
private func _subscribe() async {
191184
if socket.status != .connected {
192185
if socket.options.connectOnSubscribe != true {
@@ -236,20 +229,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
236229
await push(ChannelEvent.leave)
237230
}
238231

239-
@available(
240-
*,
241-
deprecated,
242-
message:
243-
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
244-
)
245-
public func updateAuth(jwt: String?) async {
246-
logger?.debug("Updating auth token for channel \(topic)")
247-
await push(
248-
ChannelEvent.accessToken,
249-
payload: ["access_token": jwt.map { .string($0) } ?? .null]
250-
)
251-
}
252-
253232
/// Sends a broadcast message explicitly via REST API.
254233
///
255234
/// This method always uses the REST API endpoint regardless of WebSocket connection state.
@@ -295,7 +274,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
295274
}
296275
headers[.authorization] = "Bearer \(accessToken)"
297276

298-
let body = try await JSONEncoder.supabase().encode(
277+
let body = try JSONEncoder.supabase().encode(
299278
BroadcastMessagePayload(
300279
messages: [
301280
BroadcastMessagePayload.Message(
@@ -317,7 +296,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
317296

318297
let response = try await withTimeout(interval: timeout ?? socket.options.timeoutInterval) {
319298
[self] in
320-
await Result {
299+
await Result { @Sendable in
321300
try await socket.http.send(request)
322301
}
323302
}.get()
@@ -475,7 +454,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
475454
throw RealtimeError("Received a reply with unexpected payload: \(message)")
476455
}
477456

478-
await didReceiveReply(ref: ref, status: status)
457+
didReceiveReply(ref: ref, status: status)
479458

480459
if message.payload["response"]?.objectValue?.keys
481460
.contains(ChannelEvent.postgresChanges) == true
@@ -692,9 +671,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
692671
filter: filter
693672
)
694673

695-
Task { @MainActor in
696-
mutableState.clientChanges.append(config)
697-
}
674+
mutableState.clientChanges.append(config)
698675

699676
let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
700677
return RealtimeSubscription { [weak callbackManager, logger] in
@@ -733,7 +710,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
733710
self.onSystem { _ in callback() }
734711
}
735712

736-
@MainActor
737713
@discardableResult
738714
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
739715
let message = RealtimeMessageV2(
@@ -752,7 +728,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
752728
return await push.send()
753729
}
754730

755-
@MainActor
756731
private func didReceiveReply(ref: String, status: String) {
757732
let push = mutableState.pushes.removeValue(forKey: ref)
758733
push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)

0 commit comments

Comments
 (0)