From 9526052974a80f6a28a4f6fb1f24fdb4d183ad52 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 8 Jul 2025 08:40:25 -0300 Subject: [PATCH 1/8] feat(realtime): add retry limit to subscribe --- Sources/Helpers/Task+withTimeout.swift | 4 +- Sources/Realtime/RealtimeChannelV2.swift | 51 ++++++++++++++++++------ 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/Sources/Helpers/Task+withTimeout.swift b/Sources/Helpers/Task+withTimeout.swift index 5477806b..80ab8fb7 100644 --- a/Sources/Helpers/Task+withTimeout.swift +++ b/Sources/Helpers/Task+withTimeout.swift @@ -10,7 +10,7 @@ import Foundation @discardableResult package func withTimeout( interval: TimeInterval, - @_inheritActorContext operation: @escaping @Sendable () async throws -> R + @_inheritActorContext operation: @escaping @Sendable () async -> R ) async throws -> R { try await withThrowingTaskGroup(of: R.self) { group in defer { @@ -20,7 +20,7 @@ package func withTimeout( let deadline = Date(timeIntervalSinceNow: interval) group.addTask { - try await operation() + await operation() } group.addTask { diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 9b6efa14..07d3367f 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -40,6 +40,7 @@ public final class RealtimeChannelV2: Sendable { let logger: (any SupabaseLogger)? let socket: RealtimeClientV2 + let maxRetryAttempt = 5 @MainActor var joinRef: String? { mutableState.joinRef } @@ -86,6 +87,43 @@ public final class RealtimeChannelV2: Sendable { /// Subscribes to the channel @MainActor public func subscribe() async { + logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(maxRetryAttempt))") + + var attempts = 0 + + while attempts < maxRetryAttempt { + attempts += 1 + + do { + logger?.debug( + "Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))" + ) + try await withTimeout(interval: socket.options.timeoutInterval) { [self] in + await _subscribe() + } + logger?.debug("Successfully subscribed to channel '\(topic)'") + return + } catch is TimeoutError { + logger?.debug( + "Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))" + ) + if attempts < maxRetryAttempt { + logger?.debug("Retrying subscription to channel '\(topic)'...") + } else { + logger?.error( + "Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout" + ) + } + } catch { + preconditionFailure("The only error possibly thrown is TimeoutError.") + } + } + + logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts") + } + + /// Subscribes to the channel + private func _subscribe() async { if socket.status != .connected { if socket.options.connectOnSubscribe != true { reportIssue( @@ -125,18 +163,7 @@ public final class RealtimeChannelV2: Sendable { payload: try! JSONObject(payload) ) - do { - try await withTimeout(interval: socket.options.timeoutInterval) { [self] in - _ = await statusChange.first { @Sendable in $0 == .subscribed } - } - } catch { - if error is TimeoutError { - logger?.debug("Subscribe timed out.") - await subscribe() - } else { - logger?.error("Subscribe failed: \(error)") - } - } + _ = await statusChange.first { @Sendable in $0 == .subscribed } } public func unsubscribe() async { From edb270ac410a45a23f793c73c31b093e86c862fa Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 8 Jul 2025 08:54:31 -0300 Subject: [PATCH 2/8] feat(realtime): add exponential backoff with jitter and cancellation support --- Sources/Realtime/RealtimeChannelV2.swift | 39 ++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 07d3367f..a030eb99 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -98,28 +98,63 @@ public final class RealtimeChannelV2: Sendable { logger?.debug( "Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))" ) + try await withTimeout(interval: socket.options.timeoutInterval) { [self] in await _subscribe() } + logger?.debug("Successfully subscribed to channel '\(topic)'") return + } catch is TimeoutError { logger?.debug( "Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))" ) + if attempts < maxRetryAttempt { - logger?.debug("Retrying subscription to channel '\(topic)'...") + // Add exponential backoff with jitter + let delay = calculateRetryDelay(for: attempts) + logger?.debug( + "Retrying subscription to channel '\(topic)' in \(String(format: "%.2f", delay)) seconds..." + ) + + do { + try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) + } catch { + // If sleep is cancelled, break out of retry loop + logger?.debug("Subscription retry cancelled for channel '\(topic)'") + break + } } else { logger?.error( "Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout" ) } + } catch { - preconditionFailure("The only error possibly thrown is TimeoutError.") + preconditionFailure( + "The only possible error here is TimeoutError, this should never happen.") } } logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts") + status = .unsubscribed + } + + /// Calculates retry delay with exponential backoff and jitter + private func calculateRetryDelay(for attempt: Int) -> TimeInterval { + let baseDelay: TimeInterval = 1.0 + let maxDelay: TimeInterval = 30.0 + let backoffMultiplier: Double = 2.0 + + let exponentialDelay = baseDelay * pow(backoffMultiplier, Double(attempt - 1)) + let cappedDelay = min(exponentialDelay, maxDelay) + + // Add jitter (±25% random variation) to prevent thundering herd + let jitterRange = cappedDelay * 0.25 + let jitter = Double.random(in: -jitterRange...jitterRange) + + return max(0.1, cappedDelay + jitter) } /// Subscribes to the channel From cec37661f9cec9a8552f8ffa64dfffe8d261b774 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 14 Jul 2025 12:24:23 -0300 Subject: [PATCH 3/8] use Encodable type for broadcast --- Sources/Realtime/RealtimeChannelV2.swift | 27 ++++++++++++++---------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index a030eb99..abfa3940 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -158,6 +158,7 @@ public final class RealtimeChannelV2: Sendable { } /// Subscribes to the channel + @MainActor private func _subscribe() async { if socket.status != .connected { if socket.options.connectOnSubscribe != true { @@ -237,13 +238,6 @@ public final class RealtimeChannelV2: Sendable { @MainActor public func broadcast(event: String, message: JSONObject) async { if status != .subscribed { - struct Message: Encodable { - let topic: String - let event: String - let payload: JSONObject - let `private`: Bool - } - var headers: HTTPFields = [.contentType: "application/json"] if let apiKey = socket.options.apikey { headers[.apiKey] = apiKey @@ -252,6 +246,17 @@ public final class RealtimeChannelV2: Sendable { headers[.authorization] = "Bearer \(accessToken)" } + struct BroadcastMessagePayload: Encodable { + let messages: [Message] + + struct Message: Encodable { + let topic: String + let event: String + let payload: JSONObject + let `private`: Bool + } + } + let task = Task { [headers] in _ = try? await socket.http.send( HTTPRequest( @@ -259,16 +264,16 @@ public final class RealtimeChannelV2: Sendable { method: .post, headers: headers, body: JSONEncoder().encode( - [ - "messages": [ - Message( + BroadcastMessagePayload( + messages: [ + BroadcastMessagePayload.Message( topic: topic, event: event, payload: message, private: config.isPrivate ) ] - ] + ) ) ) ) From 0e967d257fcdaa86f2dd119f9c130966dabd7c7b Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 14 Jul 2025 12:34:51 -0300 Subject: [PATCH 4/8] fix tests --- Sources/Realtime/RealtimeChannelV2.swift | 2 +- Tests/RealtimeTests/RealtimeTests.swift | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index abfa3940..7b6b30a3 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -119,7 +119,7 @@ public final class RealtimeChannelV2: Sendable { ) do { - try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) + try await _clock.sleep(for: delay) } catch { // If sleep is cancelled, break out of retry loop logger?.debug("Subscription retry cancelled for channel '\(topic)'") diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 59cb9ff5..b2af44be 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -221,6 +221,12 @@ final class RealtimeTests: XCTestCase { // Wait for the timeout for rejoining. await testClock.advance(by: .seconds(timeoutInterval)) + + // Wait for the retry delay (base delay is 1.0s, but we need to account for jitter) + // The retry delay is calculated as: baseDelay * pow(2, attempt-1) + jitter + // For attempt 2: 1.0 * pow(2, 1) = 2.0s + jitter (up to ±25% = ±0.5s) + // So we need to wait at least 2.5s to ensure the retry happens + await testClock.advance(by: .seconds(2.5)) let events = client.sentEvents.compactMap { $0.realtimeMessage }.filter { $0.event == "phx_join" From 0976218d9754541efb5f5c18d10a0cd13052b22e Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 14 Jul 2025 12:49:48 -0300 Subject: [PATCH 5/8] test: revamp realtime tests --- Tests/RealtimeTests/RealtimeTests.swift | 189 ++++++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index b2af44be..b9690899 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -287,6 +287,195 @@ final class RealtimeTests: XCTestCase { } } + // Succeeds after 2 retries (on 3rd attempt) + func testSubscribeTimeout_successAfterRetries() async throws { + let successAttempt = 3 + let channel = sut.channel("public:messages") + let joinEventCount = LockIsolated(0) + + server.onEvent = { @Sendable [server] event in + guard let msg = event.realtimeMessage else { return } + + if msg.event == "heartbeat" { + server?.send( + RealtimeMessageV2( + joinRef: msg.joinRef, + ref: msg.ref, + topic: "phoenix", + event: "phx_reply", + payload: ["response": [:]] + ) + ) + } else if msg.event == "phx_join" { + joinEventCount.withValue { $0 += 1 } + // Respond on the 3rd attempt + if joinEventCount.value == successAttempt { + server?.send(.messagesSubscribed) + } + } + } + + await sut.connect() + await testClock.advance(by: .seconds(heartbeatInterval)) + + let subscribeTask = Task { + await channel.subscribe() + } + + // Wait for each attempt and retry delay + for attempt in 1.. Date: Mon, 14 Jul 2025 12:54:20 -0300 Subject: [PATCH 6/8] remove redundant test case --- Tests/RealtimeTests/RealtimeTests.swift | 51 ------------------------- 1 file changed, 51 deletions(-) diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index b9690899..56235504 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -339,57 +339,6 @@ final class RealtimeTests: XCTestCase { XCTAssertEqual(channel.status, .subscribed) } - // Succeeds after 3 retries (on 4th attempt) - func testSubscribeTimeout_successAfterMultipleRetries() async throws { - let channel = sut.channel("public:messages") - let joinEventCount = LockIsolated(0) - - server.onEvent = { @Sendable [server] event in - guard let msg = event.realtimeMessage else { return } - - if msg.event == "heartbeat" { - server?.send( - RealtimeMessageV2( - joinRef: msg.joinRef, - ref: msg.ref, - topic: "phoenix", - event: "phx_reply", - payload: ["response": [:]] - ) - ) - } else if msg.event == "phx_join" { - joinEventCount.withValue { $0 += 1 } - // Only respond to the 4th join attempt (after 3 timeouts) - if joinEventCount.value == 4 { - server?.send(.messagesSubscribed) - } - } - } - - await sut.connect() - await testClock.advance(by: .seconds(heartbeatInterval)) - - Task { - await channel.subscribe() - } - - // Wait for first timeout - await testClock.advance(by: .seconds(timeoutInterval)) - await testClock.advance(by: .seconds(2.5)) - await testClock.advance(by: .seconds(timeoutInterval)) - await testClock.advance(by: .seconds(5.0)) - await testClock.advance(by: .seconds(timeoutInterval)) - await testClock.advance(by: .seconds(10.0)) - - let events = client.sentEvents.compactMap { $0.realtimeMessage }.filter { - $0.event == "phx_join" - } - XCTAssertEqual(events.count, 4) - for (index, event) in events.enumerated() { - XCTAssertEqual(event.ref, "\(index + 1)") - } - } - // Fails after max retries (should unsubscribe) func testSubscribeTimeout_failsAfterMaxRetries() async throws { let channel = sut.channel("public:messages") From c9ea0466e3980f431d2fa15136f61ee740506407 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 14 Jul 2025 16:25:06 -0300 Subject: [PATCH 7/8] fix(realtime): fix subscription retry logic and test failures - Add missing return statement in subscribeWithError() to prevent infinite retries - Add heartbeat response handling in testBehavior test - Fix test expectations for retry attempts - Ensure proper cleanup on subscription failure This fixes the issue where successful subscriptions would continue retrying and tests would fail due to incorrect retry attempt counting. --- Sources/Realtime/RealtimeChannelV2.swift | 34 +++++++++--- Sources/Realtime/RealtimeError.swift | 7 +++ .../RealtimeIntegrationTests.swift | 18 +++++-- .../RealtimeTests/RealtimeChannelTests.swift | 5 +- Tests/RealtimeTests/RealtimeTests.swift | 53 ++++++++++++++++--- 5 files changed, 97 insertions(+), 20 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 7b6b30a3..4f097208 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -84,11 +84,20 @@ public final class RealtimeChannelV2: Sendable { callbackManager.reset() } - /// Subscribes to the channel - @MainActor - public func subscribe() async { + /// Subscribes to the channel. + public func subscribeWithError() async throws { logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(maxRetryAttempt))") + status = .subscribing + + defer { + // If the subscription fails, we need to set the status to unsubscribed + // to avoid the channel being stuck in a subscribing state. + if status != .subscribed { + status = .unsubscribed + } + } + var attempts = 0 while attempts < maxRetryAttempt { @@ -123,22 +132,32 @@ public final class RealtimeChannelV2: Sendable { } catch { // If sleep is cancelled, break out of retry loop logger?.debug("Subscription retry cancelled for channel '\(topic)'") - break + throw CancellationError() } } else { logger?.error( "Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout" ) } - + } catch is CancellationError { + logger?.debug("Subscription retry cancelled for channel '\(topic)'") + throw CancellationError() } catch { preconditionFailure( - "The only possible error here is TimeoutError, this should never happen.") + "The only possible error here is TimeoutError or CancellationError, this should never happen." + ) } } logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts") - status = .unsubscribed + throw RealtimeError.maxRetryAttemptsReached + } + + /// Subscribes to the channel. + @available(*, deprecated, message: "Use `subscribeWithError` instead") + @MainActor + public func subscribe() async { + try? await subscribeWithError() } /// Calculates retry delay with exponential backoff and jitter @@ -170,7 +189,6 @@ public final class RealtimeChannelV2: Sendable { await socket.connect() } - status = .subscribing logger?.debug("Subscribing to channel \(topic)") config.presence.enabled = callbackManager.callbacks.contains(where: { $0.isPresence }) diff --git a/Sources/Realtime/RealtimeError.swift b/Sources/Realtime/RealtimeError.swift index db0d3770..675ca27e 100644 --- a/Sources/Realtime/RealtimeError.swift +++ b/Sources/Realtime/RealtimeError.swift @@ -14,3 +14,10 @@ struct RealtimeError: LocalizedError { self.errorDescription = errorDescription } } + +extension RealtimeError { + /// The maximum retry attempts reached. + static var maxRetryAttemptsReached: Self { + Self("Maximum retry attempts reached.") + } +} diff --git a/Tests/IntegrationTests/RealtimeIntegrationTests.swift b/Tests/IntegrationTests/RealtimeIntegrationTests.swift index e641154e..5ad82f26 100644 --- a/Tests/IntegrationTests/RealtimeIntegrationTests.swift +++ b/Tests/IntegrationTests/RealtimeIntegrationTests.swift @@ -70,7 +70,11 @@ struct TestLogger: SupabaseLogger { await Task.yield() - await channel.subscribe() + do { + try await channel.subscribeWithError() + } catch { + XCTFail("Expected .subscribed but got error: \(error)") + } struct Message: Codable { var value: Int @@ -141,7 +145,11 @@ struct TestLogger: SupabaseLogger { await Task.yield() - await channel.subscribe() + do { + try await channel.subscribeWithError() + } catch { + XCTFail("Expected .subscribed but got error: \(error)") + } struct UserState: Codable, Equatable { let email: String @@ -201,7 +209,11 @@ struct TestLogger: SupabaseLogger { } await Task.yield() - await channel.subscribe() + do { + try await channel.subscribeWithError() + } catch { + XCTFail("Expected .subscribed but got error: \(error)") + } struct Entry: Codable, Equatable { let key: String diff --git a/Tests/RealtimeTests/RealtimeChannelTests.swift b/Tests/RealtimeTests/RealtimeChannelTests.swift index 8589519d..9362513a 100644 --- a/Tests/RealtimeTests/RealtimeChannelTests.swift +++ b/Tests/RealtimeTests/RealtimeChannelTests.swift @@ -161,7 +161,7 @@ final class RealtimeChannelTests: XCTestCase { XCTAssertTrue(channel.callbackManager.callbacks.contains(where: { $0.isPresence })) // Start subscription process - Task { + let subscribeTask = Task { await channel.subscribe() } @@ -191,5 +191,8 @@ final class RealtimeChannelTests: XCTestCase { presenceSubscription.cancel() await channel.unsubscribe() socket.disconnect() + + // Note: We don't assert the subscribe status here because the test doesn't wait for completion + // The subscription is still in progress when we clean up } } diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 56235504..f24aec6f 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -108,6 +108,23 @@ final class RealtimeTests: XCTestCase { } .store(in: &subscriptions) + // Set up server to respond to heartbeats + server.onEvent = { @Sendable [server] event in + guard let msg = event.realtimeMessage else { return } + + if msg.event == "heartbeat" { + server?.send( + RealtimeMessageV2( + joinRef: msg.joinRef, + ref: msg.ref, + topic: "phoenix", + event: "phx_reply", + payload: ["response": [:]] + ) + ) + } + } + await sut.connect() XCTAssertEqual(socketStatuses.value, [.disconnected, .connecting, .connected]) @@ -127,14 +144,17 @@ final class RealtimeTests: XCTestCase { .store(in: &subscriptions) let subscribeTask = Task { - await channel.subscribe() + try await channel.subscribeWithError() } await Task.yield() server.send(.messagesSubscribed) // Wait until it subscribes to assert WS events - await subscribeTask.value - + do { + try await subscribeTask.value + } catch { + XCTFail("Expected .subscribed but got error: \(error)") + } XCTAssertEqual(channelStatuses.value, [.unsubscribed, .subscribing, .subscribed]) assertInlineSnapshot(of: client.sentEvents.map(\.json), as: .json) { @@ -216,7 +236,7 @@ final class RealtimeTests: XCTestCase { await testClock.advance(by: .seconds(heartbeatInterval)) Task { - await channel.subscribe() + try await channel.subscribeWithError() } // Wait for the timeout for rejoining. @@ -319,7 +339,7 @@ final class RealtimeTests: XCTestCase { await testClock.advance(by: .seconds(heartbeatInterval)) let subscribeTask = Task { - await channel.subscribe() + _ = try? await channel.subscribeWithError() } // Wait for each attempt and retry delay @@ -365,8 +385,8 @@ final class RealtimeTests: XCTestCase { await sut.connect() await testClock.advance(by: .seconds(heartbeatInterval)) - Task { - await channel.subscribe() + let subscribeTask = Task { + try await channel.subscribeWithError() } for attempt in 1...5 { @@ -377,6 +397,13 @@ final class RealtimeTests: XCTestCase { } } + do { + try await subscribeTask.value + XCTFail("Expected error but got success") + } catch { + XCTAssertTrue(error is RealtimeError) + } + let events = client.sentEvents.compactMap { $0.realtimeMessage }.filter { $0.event == "phx_join" } @@ -411,16 +438,26 @@ final class RealtimeTests: XCTestCase { await testClock.advance(by: .seconds(heartbeatInterval)) let subscribeTask = Task { - await channel.subscribe() + try await channel.subscribeWithError() } await testClock.advance(by: .seconds(timeoutInterval)) subscribeTask.cancel() + + do { + try await subscribeTask.value + XCTFail("Expected cancellation error but got success") + } catch is CancellationError { + // Expected + } catch { + XCTFail("Expected CancellationError but got: \(error)") + } await testClock.advance(by: .seconds(5.0)) let events = client.sentEvents.compactMap { $0.realtimeMessage }.filter { $0.event == "phx_join" } + XCTAssertEqual(events.count, 1) XCTAssertEqual(channel.status, .unsubscribed) } From d7029030d81b4732f27ba446079f05118e310660 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 22 Jul 2025 05:00:47 -0300 Subject: [PATCH 8/8] add configurable option for maxRetryAttempts --- Sources/Realtime/RealtimeChannelV2.swift | 13 ++++++------- Sources/Realtime/Types.swift | 4 ++++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 4f097208..746ef43b 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -40,7 +40,6 @@ public final class RealtimeChannelV2: Sendable { let logger: (any SupabaseLogger)? let socket: RealtimeClientV2 - let maxRetryAttempt = 5 @MainActor var joinRef: String? { mutableState.joinRef } @@ -86,7 +85,7 @@ public final class RealtimeChannelV2: Sendable { /// Subscribes to the channel. public func subscribeWithError() async throws { - logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(maxRetryAttempt))") + logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(socket.options.maxRetryAttempts))") status = .subscribing @@ -100,12 +99,12 @@ public final class RealtimeChannelV2: Sendable { var attempts = 0 - while attempts < maxRetryAttempt { + while attempts < socket.options.maxRetryAttempts { attempts += 1 do { logger?.debug( - "Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))" + "Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(socket.options.maxRetryAttempts))" ) try await withTimeout(interval: socket.options.timeoutInterval) { [self] in @@ -117,10 +116,10 @@ public final class RealtimeChannelV2: Sendable { } catch is TimeoutError { logger?.debug( - "Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(maxRetryAttempt))" + "Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(socket.options.maxRetryAttempts))" ) - if attempts < maxRetryAttempt { + if attempts < socket.options.maxRetryAttempts { // Add exponential backoff with jitter let delay = calculateRetryDelay(for: attempts) logger?.debug( @@ -136,7 +135,7 @@ public final class RealtimeChannelV2: Sendable { } } else { logger?.error( - "Failed to subscribe to channel '\(topic)' after \(maxRetryAttempt) attempts due to timeout" + "Failed to subscribe to channel '\(topic)' after \(socket.options.maxRetryAttempts) attempts due to timeout" ) } } catch is CancellationError { diff --git a/Sources/Realtime/Types.swift b/Sources/Realtime/Types.swift index f1bd073e..30d625e0 100644 --- a/Sources/Realtime/Types.swift +++ b/Sources/Realtime/Types.swift @@ -20,6 +20,7 @@ public struct RealtimeClientOptions: Sendable { var timeoutInterval: TimeInterval var disconnectOnSessionLoss: Bool var connectOnSubscribe: Bool + var maxRetryAttempts: Int /// Sets the log level for Realtime var logLevel: LogLevel? @@ -32,6 +33,7 @@ public struct RealtimeClientOptions: Sendable { public static let defaultTimeoutInterval: TimeInterval = 10 public static let defaultDisconnectOnSessionLoss = true public static let defaultConnectOnSubscribe: Bool = true + public static let defaultMaxRetryAttempts: Int = 5 public init( headers: [String: String] = [:], @@ -40,6 +42,7 @@ public struct RealtimeClientOptions: Sendable { timeoutInterval: TimeInterval = Self.defaultTimeoutInterval, disconnectOnSessionLoss: Bool = Self.defaultDisconnectOnSessionLoss, connectOnSubscribe: Bool = Self.defaultConnectOnSubscribe, + maxRetryAttempts: Int = Self.defaultMaxRetryAttempts, logLevel: LogLevel? = nil, fetch: (@Sendable (_ request: URLRequest) async throws -> (Data, URLResponse))? = nil, accessToken: (@Sendable () async throws -> String?)? = nil, @@ -51,6 +54,7 @@ public struct RealtimeClientOptions: Sendable { self.timeoutInterval = timeoutInterval self.disconnectOnSessionLoss = disconnectOnSessionLoss self.connectOnSubscribe = connectOnSubscribe + self.maxRetryAttempts = maxRetryAttempts self.logLevel = logLevel self.fetch = fetch self.accessToken = accessToken