Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions src/common/lib/client/baserealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,41 @@ class Channels extends EventEmitter {
/* Connection interruptions (ie when the connection will no longer queue
* events) imply connection state changes for any channel which is either
* attached, pending, or will attempt to become attached in the future */
propogateConnectionInterruption(connectionState: string, reason: ErrorInfo) {
const connectionStateToChannelState: Record<string, API.ChannelState> = {
closing: 'detached',
closed: 'detached',
failed: 'failed',
suspended: 'suspended',
};
const fromChannelStates = ['attaching', 'attached', 'detaching', 'suspended'];
const toChannelState = connectionStateToChannelState[connectionState];

propagateConnectionInterruption(connectionState: string, reason: ErrorInfo) {
for (const channelId in this.all) {
const channel = this.all[channelId];
if (fromChannelStates.includes(channel.state)) {
channel.notifyState(toChannelState, reason);

let toChannelState: API.ChannelState | null = null;
let channelErrorReason: ErrorInfo | null = reason;

switch (connectionState) {
case 'failed':
if (['attaching', 'attached', 'detaching'].includes(channel.state)) {
// RTL3a, RTL3g
toChannelState = 'failed';
}
break;
case 'closed':
if (['attaching', 'attached', 'detaching'].includes(channel.state)) {
// RTL3b, RTL3h
toChannelState = 'detached';
}
break;
case 'suspended':
if (['attaching', 'attached'].includes(channel.state)) {
// RTL3c
toChannelState = 'suspended';
} else if (channel.state === 'detaching') {
// RTL3h
toChannelState = 'detached';
// Don't propagate the error
channelErrorReason = null;
}
break;
}

if (toChannelState !== null) {
channel.notifyState(toChannelState, channelErrorReason);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ class ConnectionManager extends EventEmitter {
if (this.state.sendEvents) {
this.sendQueuedMessages();
} else if (!this.state.queueEvents) {
this.realtime.channels.propogateConnectionInterruption(state, change.reason);
this.realtime.channels.propagateConnectionInterruption(state, change.reason);
this.failQueuedMessages(change.reason as ErrorInfo); // RTN7c
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/auth.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
});

/**
* Check state change reason is propogated during a disconnect
* Check state change reason is propagated during a disconnect
* (when connecting with a token that expires while connected)
*
* @spec RSA4b1
Expand Down
147 changes: 143 additions & 4 deletions test/realtime/channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
realtime.options.timeouts.realtimeRequestTimeout = 100;
channel.once(function (stateChange) {
expect(stateChange.current).to.equal('attaching', 'Channel reattach attempt happens immediately');
expect(stateChange.reason.code).to.equal(50000, 'check error is propogated in the reason');
expect(stateChange.reason.code).to.equal(50000, 'check error is propagated in the reason');
cb();
});
helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport');
Expand Down Expand Up @@ -1326,7 +1326,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
};
Helper.whenPromiseSettles(channel.attach(), function (err) {
try {
expect(err.code).to.equal(50000, 'check error is propogated to the attach callback');
expect(err.code).to.equal(50000, 'check error is propagated to the attach callback');
expect(channel.state).to.equal('suspended', 'check channel goes into suspended');
helper.closeAndFinish(done, realtime);
} catch (err) {
Expand Down Expand Up @@ -1356,7 +1356,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async

channel.on('failed', function (stateChange) {
try {
expect(stateChange.reason.code).to.equal(50000, 'check error is propogated');
expect(stateChange.reason.code).to.equal(50000, 'check error is propagated');
helper.closeAndFinish(done, realtime);
} catch (err) {
helper.closeAndFinish(done, realtime, err);
Expand Down Expand Up @@ -1406,7 +1406,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
expect(stateChange.current).to.equal('attached', 'check current');
expect(stateChange.previous).to.equal('attached', 'check previous');
expect(stateChange.resumed).to.equal(false, 'check resumed');
expect(stateChange.reason.code).to.equal(50000, 'check error propogated');
expect(stateChange.reason.code).to.equal(50000, 'check error propagated');
expect(channel.state).to.equal('attached', 'check channel still attached');
cb();
});
Expand Down Expand Up @@ -1583,6 +1583,145 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
);
});

/** @spec RTL3g **/
it('detaching_channel_when_connection_enters_failed', async function () {
// Given: A channel in the DETACHING state
const helper = this.test.helper;
const realtime = helper.AblyRealtime({ transports: [helper.bestTransport] });
const channel = realtime.channels.get('detached_channel_when_connection_enters_failed');

await channel.attach();

helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport');
const transport = realtime.connection.connectionManager.activeProtocol.getTransport();
const onProtocolMessageOriginal = transport.onProtocolMessage;

helper.recordPrivateApi('replace.transport.onProtocolMessage');
transport.onProtocolMessage = function (msg) {
if (msg.action === 13) {
// Drop the incoming DETACHED so that the channel stays in DETACHING
return;
}

helper.recordPrivateApi('call.transport.onProtocolMessage');
onProtocolMessageOriginal.call(this, msg);
};

const channelDetachPromise = channel.detach();

expect(channel.state).to.equal('detaching');

// When: The connection enters FAILED
const channelFailedPromise = channel.whenState('failed');

// Inject a connection-level ERROR to make connection enter FAILED per RTN15i
helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized');
helper.recordPrivateApi('call.transport.onProtocolMessage');
transport.onProtocolMessage(
createPM({ action: 9, error: { code: 40000, statusCode: 400, message: 'Some error' } }),
);

// Then: The channel transitions to FAILED and the call to `detach()` fails
await channelFailedPromise;

try {
await channelDetachPromise;
expect.fail('Expected channel.detach() to throw');
} catch {
expect(channel.errorReason.code).to.equal(40000);
expect(channel.errorReason.statusCode).to.equal(400);
expect(channel.errorReason.message).to.equal('Some error');
}

// Teardown
await helper.closeAndFinishAsync(realtime);
});

/** @specpartial RTL3h - Tests the CLOSED case **/
it('detaching_channel_when_connection_enters_closed', async function () {
// Given: A channel in the DETACHING state
const helper = this.test.helper;
const realtime = helper.AblyRealtime({ transports: [helper.bestTransport] });
const channel = realtime.channels.get('detached_channel_when_connection_enters_failed');

await channel.attach();

helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport');
const transport = realtime.connection.connectionManager.activeProtocol.getTransport();
const onProtocolMessageOriginal = transport.onProtocolMessage;

helper.recordPrivateApi('replace.transport.onProtocolMessage');
transport.onProtocolMessage = function (msg) {
if (msg.action === 13) {
// Drop the incoming DETACHED so that the channel stays in DETACHING
return;
}

helper.recordPrivateApi('call.transport.onProtocolMessage');
onProtocolMessageOriginal.call(this, msg);
};

const channelDetachPromise = channel.detach();

expect(channel.state).to.equal('detaching');

// When: The connection enters CLOSED
const channelDetachedPromise = channel.whenState('detached');

realtime.close();

// Then: The channel transitions to DETACHED and the call to `detach()` succeeds
await channelDetachedPromise;
await channelDetachPromise;

// Teardown
await helper.closeAndFinishAsync(realtime);
});

/** @specpartial RTL3h - Tests the SUSPENDED case **/
it('detaching_channel_when_connection_enters_suspended', async function () {
// Given: A channel in the DETACHING state
const helper = this.test.helper;
const realtime = helper.AblyRealtime({ transports: [helper.bestTransport] });
const channel = realtime.channels.get('detached_channel_when_connection_enters_failed');

await channel.attach();

helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport');
const transport = realtime.connection.connectionManager.activeProtocol.getTransport();
const onProtocolMessageOriginal = transport.onProtocolMessage;

helper.recordPrivateApi('replace.transport.onProtocolMessage');
transport.onProtocolMessage = function (msg) {
if (msg.action === 13) {
// Drop the incoming DETACHED so that the channel stays in DETACHING
return;
}

helper.recordPrivateApi('call.transport.onProtocolMessage');
onProtocolMessageOriginal.call(this, msg);
};

const channelDetachPromise = channel.detach();

expect(channel.state).to.equal('detaching');

// When: The connection enters SUSPENDED
const channelDetachedPromise = channel.whenState('detached');

await new Promise((resolve) => {
helper.becomeSuspended(realtime, resolve);
});

// Then: The channel transitions to DETACHED and the call to `detach()` succeeds
await channelDetachedPromise;
expect(channel.errorReason).to.be.null;
await channelDetachPromise;

// Teardown
await helper.closeAndFinishAsync(realtime);
});

/** @spec RTL5i */
it('attached_while_detaching', function (done) {
var helper = this.test.helper,
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/resume.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ define(['shared_helper', 'async', 'chai'], function (Helper, async, chai) {
function (cb) {
connection.once('failed', function (stateChange) {
try {
expect(stateChange.reason.code).to.equal(40101, 'check correct code propogated');
expect(stateChange.reason.code).to.equal(40101, 'check correct code propagated');
} catch (err) {
cb(err);
return;
Expand Down
Loading