Skip to content

feat(realtime): subscribe retry improvements #747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/Helpers/Task+withTimeout.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Foundation
@discardableResult
package func withTimeout<R: Sendable>(
interval: TimeInterval,
@_inheritActorContext operation: @escaping @Sendable () async throws -> R
@_inheritActorContext operation: @escaping @Sendable () async -> R
) async throws -> R {
try await withThrowingTaskGroup(of: R.self) { group in
defer {
Expand All @@ -20,7 +20,7 @@ package func withTimeout<R: Sendable>(
let deadline = Date(timeIntervalSinceNow: interval)

group.addTask {
try await operation()
await operation()
}

group.addTask {
Expand Down
134 changes: 109 additions & 25 deletions Sources/Realtime/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,101 @@ public final class RealtimeChannelV2: Sendable {
callbackManager.reset()
}

/// Subscribes to the channel
/// Subscribes to the channel.
public func subscribeWithError() async throws {
logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(socket.options.maxRetryAttempts))")

status = .subscribing

defer {
// If the subscription fails, we need to set the status to unsubscribed
// to avoid the channel being stuck in a subscribing state.
if status != .subscribed {
status = .unsubscribed
}
}

var attempts = 0

while attempts < socket.options.maxRetryAttempts {
attempts += 1

do {
logger?.debug(
"Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(socket.options.maxRetryAttempts))"
)

try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
await _subscribe()
}

logger?.debug("Successfully subscribed to channel '\(topic)'")
return

} catch is TimeoutError {
logger?.debug(
"Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(socket.options.maxRetryAttempts))"
)

if attempts < socket.options.maxRetryAttempts {
// Add exponential backoff with jitter
let delay = calculateRetryDelay(for: attempts)
logger?.debug(
"Retrying subscription to channel '\(topic)' in \(String(format: "%.2f", delay)) seconds..."
)

do {
try await _clock.sleep(for: delay)
} catch {
// If sleep is cancelled, break out of retry loop
logger?.debug("Subscription retry cancelled for channel '\(topic)'")
throw CancellationError()
}
} else {
logger?.error(
"Failed to subscribe to channel '\(topic)' after \(socket.options.maxRetryAttempts) attempts due to timeout"
)
}
} catch is CancellationError {
logger?.debug("Subscription retry cancelled for channel '\(topic)'")
throw CancellationError()
} catch {
preconditionFailure(
"The only possible error here is TimeoutError or CancellationError, this should never happen."
)
}
}

logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts")
throw RealtimeError.maxRetryAttemptsReached
}

/// Subscribes to the channel.
@available(*, deprecated, message: "Use `subscribeWithError` instead")
@MainActor
public func subscribe() async {
try? await subscribeWithError()
}

/// Calculates retry delay with exponential backoff and jitter
private func calculateRetryDelay(for attempt: Int) -> TimeInterval {
let baseDelay: TimeInterval = 1.0
let maxDelay: TimeInterval = 30.0
let backoffMultiplier: Double = 2.0

let exponentialDelay = baseDelay * pow(backoffMultiplier, Double(attempt - 1))
let cappedDelay = min(exponentialDelay, maxDelay)

// Add jitter (±25% random variation) to prevent thundering herd
let jitterRange = cappedDelay * 0.25
let jitter = Double.random(in: -jitterRange...jitterRange)

return max(0.1, cappedDelay + jitter)
}

/// Subscribes to the channel
@MainActor
private func _subscribe() async {
if socket.status != .connected {
if socket.options.connectOnSubscribe != true {
reportIssue(
Expand All @@ -96,7 +188,6 @@ public final class RealtimeChannelV2: Sendable {
await socket.connect()
}

status = .subscribing
logger?.debug("Subscribing to channel \(topic)")

config.presence.enabled = callbackManager.callbacks.contains(where: { $0.isPresence })
Expand Down Expand Up @@ -125,18 +216,7 @@ public final class RealtimeChannelV2: Sendable {
payload: try! JSONObject(payload)
)

do {
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
if error is TimeoutError {
logger?.debug("Subscribe timed out.")
await subscribe()
} else {
logger?.error("Subscribe failed: \(error)")
}
}
_ = await statusChange.first { @Sendable in $0 == .subscribed }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some consideration on what await subscribe() should wait for exactly. @filipecabaco explained that both the broadcast and the presence parts should be considered as subscribed on the message acknoledgement success, which is what is currently being done:

case .system:
if message.status == .ok {
logger?.debug("Subscribed to channel \(message.topic)")
status = .subscribed
} else {
logger?.debug(
"Failed to subscribe to channel \(message.topic): \(message.payload)"
)
}

but the postgres_changes part is only available when it receives the system message saying you're subscribed, which may take some time. This is already the current behavior, but maybe it is worth it adding this distinction in the RealtimeSubscribedStates, as a client may only be subscribed to broadcast, or presence or postgres_changes at any given time.

}

public func unsubscribe() async {
Expand Down Expand Up @@ -175,13 +255,6 @@ public final class RealtimeChannelV2: Sendable {
@MainActor
public func broadcast(event: String, message: JSONObject) async {
if status != .subscribed {
struct Message: Encodable {
let topic: String
let event: String
let payload: JSONObject
let `private`: Bool
}

var headers: HTTPFields = [.contentType: "application/json"]
if let apiKey = socket.options.apikey {
headers[.apiKey] = apiKey
Expand All @@ -190,23 +263,34 @@ public final class RealtimeChannelV2: Sendable {
headers[.authorization] = "Bearer \(accessToken)"
}

struct BroadcastMessagePayload: Encodable {
let messages: [Message]

struct Message: Encodable {
let topic: String
let event: String
let payload: JSONObject
let `private`: Bool
}
}

let task = Task { [headers] in
_ = try? await socket.http.send(
HTTPRequest(
url: socket.broadcastURL,
method: .post,
headers: headers,
body: JSONEncoder().encode(
[
"messages": [
Message(
BroadcastMessagePayload(
messages: [
BroadcastMessagePayload.Message(
topic: topic,
event: event,
payload: message,
private: config.isPrivate
)
]
]
)
)
)
)
Expand Down
7 changes: 7 additions & 0 deletions Sources/Realtime/RealtimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,10 @@ struct RealtimeError: LocalizedError {
self.errorDescription = errorDescription
}
}

extension RealtimeError {
/// The maximum retry attempts reached.
static var maxRetryAttemptsReached: Self {
Self("Maximum retry attempts reached.")
}
}
4 changes: 4 additions & 0 deletions Sources/Realtime/Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public struct RealtimeClientOptions: Sendable {
var timeoutInterval: TimeInterval
var disconnectOnSessionLoss: Bool
var connectOnSubscribe: Bool
var maxRetryAttempts: Int

/// Sets the log level for Realtime
var logLevel: LogLevel?
Expand All @@ -32,6 +33,7 @@ public struct RealtimeClientOptions: Sendable {
public static let defaultTimeoutInterval: TimeInterval = 10
public static let defaultDisconnectOnSessionLoss = true
public static let defaultConnectOnSubscribe: Bool = true
public static let defaultMaxRetryAttempts: Int = 5

public init(
headers: [String: String] = [:],
Expand All @@ -40,6 +42,7 @@ public struct RealtimeClientOptions: Sendable {
timeoutInterval: TimeInterval = Self.defaultTimeoutInterval,
disconnectOnSessionLoss: Bool = Self.defaultDisconnectOnSessionLoss,
connectOnSubscribe: Bool = Self.defaultConnectOnSubscribe,
maxRetryAttempts: Int = Self.defaultMaxRetryAttempts,
logLevel: LogLevel? = nil,
fetch: (@Sendable (_ request: URLRequest) async throws -> (Data, URLResponse))? = nil,
accessToken: (@Sendable () async throws -> String?)? = nil,
Expand All @@ -51,6 +54,7 @@ public struct RealtimeClientOptions: Sendable {
self.timeoutInterval = timeoutInterval
self.disconnectOnSessionLoss = disconnectOnSessionLoss
self.connectOnSubscribe = connectOnSubscribe
self.maxRetryAttempts = maxRetryAttempts
self.logLevel = logLevel
self.fetch = fetch
self.accessToken = accessToken
Expand Down
18 changes: 15 additions & 3 deletions Tests/IntegrationTests/RealtimeIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ struct TestLogger: SupabaseLogger {

await Task.yield()

await channel.subscribe()
do {
try await channel.subscribeWithError()
} catch {
XCTFail("Expected .subscribed but got error: \(error)")
}

struct Message: Codable {
var value: Int
Expand Down Expand Up @@ -141,7 +145,11 @@ struct TestLogger: SupabaseLogger {

await Task.yield()

await channel.subscribe()
do {
try await channel.subscribeWithError()
} catch {
XCTFail("Expected .subscribed but got error: \(error)")
}

struct UserState: Codable, Equatable {
let email: String
Expand Down Expand Up @@ -201,7 +209,11 @@ struct TestLogger: SupabaseLogger {
}

await Task.yield()
await channel.subscribe()
do {
try await channel.subscribeWithError()
} catch {
XCTFail("Expected .subscribed but got error: \(error)")
}

struct Entry: Codable, Equatable {
let key: String
Expand Down
5 changes: 4 additions & 1 deletion Tests/RealtimeTests/RealtimeChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ final class RealtimeChannelTests: XCTestCase {
XCTAssertTrue(channel.callbackManager.callbacks.contains(where: { $0.isPresence }))

// Start subscription process
Task {
let subscribeTask = Task {
await channel.subscribe()
}

Expand Down Expand Up @@ -191,5 +191,8 @@ final class RealtimeChannelTests: XCTestCase {
presenceSubscription.cancel()
await channel.unsubscribe()
socket.disconnect()

// Note: We don't assert the subscribe status here because the test doesn't wait for completion
// The subscription is still in progress when we clean up
}
}
Loading
Loading