diff --git a/src/common/lib/client/baserealtime.ts b/src/common/lib/client/baserealtime.ts index af3bdc78b..78f539aca 100644 --- a/src/common/lib/client/baserealtime.ts +++ b/src/common/lib/client/baserealtime.ts @@ -171,20 +171,41 @@ class Channels extends EventEmitter { /* Connection interruptions (ie when the connection will no longer queue * events) imply connection state changes for any channel which is either * attached, pending, or will attempt to become attached in the future */ - propogateConnectionInterruption(connectionState: string, reason: ErrorInfo) { - const connectionStateToChannelState: Record = { - closing: 'detached', - closed: 'detached', - failed: 'failed', - suspended: 'suspended', - }; - const fromChannelStates = ['attaching', 'attached', 'detaching', 'suspended']; - const toChannelState = connectionStateToChannelState[connectionState]; - + propagateConnectionInterruption(connectionState: string, reason: ErrorInfo) { for (const channelId in this.all) { const channel = this.all[channelId]; - if (fromChannelStates.includes(channel.state)) { - channel.notifyState(toChannelState, reason); + + let toChannelState: API.ChannelState | null = null; + let channelErrorReason: ErrorInfo | null = reason; + + switch (connectionState) { + case 'failed': + if (['attaching', 'attached', 'detaching'].includes(channel.state)) { + // RTL3a, RTL3g + toChannelState = 'failed'; + } + break; + case 'closed': + if (['attaching', 'attached', 'detaching'].includes(channel.state)) { + // RTL3b, RTL3h + toChannelState = 'detached'; + } + break; + case 'suspended': + if (['attaching', 'attached'].includes(channel.state)) { + // RTL3c + toChannelState = 'suspended'; + } else if (channel.state === 'detaching') { + // RTL3h + toChannelState = 'detached'; + // Don't propagate the error + channelErrorReason = null; + } + break; + } + + if (toChannelState !== null) { + channel.notifyState(toChannelState, channelErrorReason); } } } diff --git a/src/common/lib/transport/connectionmanager.ts b/src/common/lib/transport/connectionmanager.ts index 3017dd7c7..17f99be43 100644 --- a/src/common/lib/transport/connectionmanager.ts +++ b/src/common/lib/transport/connectionmanager.ts @@ -1303,7 +1303,7 @@ class ConnectionManager extends EventEmitter { if (this.state.sendEvents) { this.sendQueuedMessages(); } else if (!this.state.queueEvents) { - this.realtime.channels.propogateConnectionInterruption(state, change.reason); + this.realtime.channels.propagateConnectionInterruption(state, change.reason); this.failQueuedMessages(change.reason as ErrorInfo); // RTN7c } } diff --git a/test/realtime/auth.test.js b/test/realtime/auth.test.js index 6af1a7a31..3ba8b2028 100644 --- a/test/realtime/auth.test.js +++ b/test/realtime/auth.test.js @@ -768,7 +768,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }); /** - * Check state change reason is propogated during a disconnect + * Check state change reason is propagated during a disconnect * (when connecting with a token that expires while connected) * * @spec RSA4b1 diff --git a/test/realtime/channel.test.js b/test/realtime/channel.test.js index 2f3280e20..839bf3587 100644 --- a/test/realtime/channel.test.js +++ b/test/realtime/channel.test.js @@ -1258,7 +1258,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async realtime.options.timeouts.realtimeRequestTimeout = 100; channel.once(function (stateChange) { expect(stateChange.current).to.equal('attaching', 'Channel reattach attempt happens immediately'); - expect(stateChange.reason.code).to.equal(50000, 'check error is propogated in the reason'); + expect(stateChange.reason.code).to.equal(50000, 'check error is propagated in the reason'); cb(); }); helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); @@ -1326,7 +1326,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }; Helper.whenPromiseSettles(channel.attach(), function (err) { try { - expect(err.code).to.equal(50000, 'check error is propogated to the attach callback'); + expect(err.code).to.equal(50000, 'check error is propagated to the attach callback'); expect(channel.state).to.equal('suspended', 'check channel goes into suspended'); helper.closeAndFinish(done, realtime); } catch (err) { @@ -1356,7 +1356,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async channel.on('failed', function (stateChange) { try { - expect(stateChange.reason.code).to.equal(50000, 'check error is propogated'); + expect(stateChange.reason.code).to.equal(50000, 'check error is propagated'); helper.closeAndFinish(done, realtime); } catch (err) { helper.closeAndFinish(done, realtime, err); @@ -1406,7 +1406,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async expect(stateChange.current).to.equal('attached', 'check current'); expect(stateChange.previous).to.equal('attached', 'check previous'); expect(stateChange.resumed).to.equal(false, 'check resumed'); - expect(stateChange.reason.code).to.equal(50000, 'check error propogated'); + expect(stateChange.reason.code).to.equal(50000, 'check error propagated'); expect(channel.state).to.equal('attached', 'check channel still attached'); cb(); }); @@ -1583,6 +1583,145 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async ); }); + /** @spec RTL3g **/ + it('detaching_channel_when_connection_enters_failed', async function () { + // Given: A channel in the DETACHING state + const helper = this.test.helper; + const realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }); + const channel = realtime.channels.get('detached_channel_when_connection_enters_failed'); + + await channel.attach(); + + helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); + const transport = realtime.connection.connectionManager.activeProtocol.getTransport(); + const onProtocolMessageOriginal = transport.onProtocolMessage; + + helper.recordPrivateApi('replace.transport.onProtocolMessage'); + transport.onProtocolMessage = function (msg) { + if (msg.action === 13) { + // Drop the incoming DETACHED so that the channel stays in DETACHING + return; + } + + helper.recordPrivateApi('call.transport.onProtocolMessage'); + onProtocolMessageOriginal.call(this, msg); + }; + + const channelDetachPromise = channel.detach(); + + expect(channel.state).to.equal('detaching'); + + // When: The connection enters FAILED + const channelFailedPromise = channel.whenState('failed'); + + // Inject a connection-level ERROR to make connection enter FAILED per RTN15i + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); + helper.recordPrivateApi('call.transport.onProtocolMessage'); + transport.onProtocolMessage( + createPM({ action: 9, error: { code: 40000, statusCode: 400, message: 'Some error' } }), + ); + + // Then: The channel transitions to FAILED and the call to `detach()` fails + await channelFailedPromise; + + try { + await channelDetachPromise; + expect.fail('Expected channel.detach() to throw'); + } catch { + expect(channel.errorReason.code).to.equal(40000); + expect(channel.errorReason.statusCode).to.equal(400); + expect(channel.errorReason.message).to.equal('Some error'); + } + + // Teardown + await helper.closeAndFinishAsync(realtime); + }); + + /** @specpartial RTL3h - Tests the CLOSED case **/ + it('detaching_channel_when_connection_enters_closed', async function () { + // Given: A channel in the DETACHING state + const helper = this.test.helper; + const realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }); + const channel = realtime.channels.get('detached_channel_when_connection_enters_failed'); + + await channel.attach(); + + helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); + const transport = realtime.connection.connectionManager.activeProtocol.getTransport(); + const onProtocolMessageOriginal = transport.onProtocolMessage; + + helper.recordPrivateApi('replace.transport.onProtocolMessage'); + transport.onProtocolMessage = function (msg) { + if (msg.action === 13) { + // Drop the incoming DETACHED so that the channel stays in DETACHING + return; + } + + helper.recordPrivateApi('call.transport.onProtocolMessage'); + onProtocolMessageOriginal.call(this, msg); + }; + + const channelDetachPromise = channel.detach(); + + expect(channel.state).to.equal('detaching'); + + // When: The connection enters CLOSED + const channelDetachedPromise = channel.whenState('detached'); + + realtime.close(); + + // Then: The channel transitions to DETACHED and the call to `detach()` succeeds + await channelDetachedPromise; + await channelDetachPromise; + + // Teardown + await helper.closeAndFinishAsync(realtime); + }); + + /** @specpartial RTL3h - Tests the SUSPENDED case **/ + it('detaching_channel_when_connection_enters_suspended', async function () { + // Given: A channel in the DETACHING state + const helper = this.test.helper; + const realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }); + const channel = realtime.channels.get('detached_channel_when_connection_enters_failed'); + + await channel.attach(); + + helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); + const transport = realtime.connection.connectionManager.activeProtocol.getTransport(); + const onProtocolMessageOriginal = transport.onProtocolMessage; + + helper.recordPrivateApi('replace.transport.onProtocolMessage'); + transport.onProtocolMessage = function (msg) { + if (msg.action === 13) { + // Drop the incoming DETACHED so that the channel stays in DETACHING + return; + } + + helper.recordPrivateApi('call.transport.onProtocolMessage'); + onProtocolMessageOriginal.call(this, msg); + }; + + const channelDetachPromise = channel.detach(); + + expect(channel.state).to.equal('detaching'); + + // When: The connection enters SUSPENDED + const channelDetachedPromise = channel.whenState('detached'); + + await new Promise((resolve) => { + helper.becomeSuspended(realtime, resolve); + }); + + // Then: The channel transitions to DETACHED and the call to `detach()` succeeds + await channelDetachedPromise; + expect(channel.errorReason).to.be.null; + await channelDetachPromise; + + // Teardown + await helper.closeAndFinishAsync(realtime); + }); + /** @spec RTL5i */ it('attached_while_detaching', function (done) { var helper = this.test.helper, diff --git a/test/realtime/resume.test.js b/test/realtime/resume.test.js index 7fd47def5..3b51cfb3a 100644 --- a/test/realtime/resume.test.js +++ b/test/realtime/resume.test.js @@ -448,7 +448,7 @@ define(['shared_helper', 'async', 'chai'], function (Helper, async, chai) { function (cb) { connection.once('failed', function (stateChange) { try { - expect(stateChange.reason.code).to.equal(40101, 'check correct code propogated'); + expect(stateChange.reason.code).to.equal(40101, 'check correct code propagated'); } catch (err) { cb(err); return;