From 50bd267e484b392c6a85bf7bfac0e3b246ec4afb Mon Sep 17 00:00:00 2001 From: Tim Behrsin Date: Mon, 18 Jan 2021 09:26:38 +0000 Subject: [PATCH] #130 AsyncIterator.return() --- .../src/DynamoDBConnectionManager.ts | 9 ++++- .../src/DynamoDBSubscriptionManager.ts | 10 ++++- .../src/MemorySubscriptionManager.ts | 2 +- .../src/RedisConnectionManager.ts | 5 ++- .../src/RedisSubscriptionManager.ts | 3 ++ packages/aws-lambda-graphql/src/Server.ts | 40 ++++++++++++++++++- .../src/WebSocketConnectionManager.ts | 3 +- .../src/__tests__/Server.test.ts | 5 ++- .../src/types/connection.ts | 28 +++++++++++++ .../src/types/connections.ts | 32 ++------------- .../src/types/subscriptions.ts | 4 +- 11 files changed, 100 insertions(+), 41 deletions(-) create mode 100644 packages/aws-lambda-graphql/src/types/connection.ts diff --git a/packages/aws-lambda-graphql/src/DynamoDBConnectionManager.ts b/packages/aws-lambda-graphql/src/DynamoDBConnectionManager.ts index 0962946d..2e3aa58a 100644 --- a/packages/aws-lambda-graphql/src/DynamoDBConnectionManager.ts +++ b/packages/aws-lambda-graphql/src/DynamoDBConnectionManager.ts @@ -7,6 +7,7 @@ import { IConnectionManager, ISubscriptionManager, IConnectionData, + ISubscriber, HydrateConnectionOptions, } from './types'; import { computeTTL } from './helpers'; @@ -211,8 +212,10 @@ export class DynamoDBConnectionManager implements IConnectionManager { } }; - unregisterConnection = async ({ id }: DynamoDBConnection): Promise => { - await Promise.all([ + unregisterConnection = async ({ + id, + }: DynamoDBConnection): Promise => { + const [, subscribers] = await Promise.all([ this.db .delete({ Key: { @@ -223,6 +226,8 @@ export class DynamoDBConnectionManager implements IConnectionManager { .promise(), this.subscriptions.unsubscribeAllByConnectionId(id), ]); + + return subscribers; }; closeConnection = async ({ id, data }: DynamoDBConnection): Promise => { diff --git a/packages/aws-lambda-graphql/src/DynamoDBSubscriptionManager.ts b/packages/aws-lambda-graphql/src/DynamoDBSubscriptionManager.ts index 025510cf..4811b088 100644 --- a/packages/aws-lambda-graphql/src/DynamoDBSubscriptionManager.ts +++ b/packages/aws-lambda-graphql/src/DynamoDBSubscriptionManager.ts @@ -309,7 +309,10 @@ export class DynamoDBSubscriptionManager implements ISubscriptionManager { } }; - unsubscribeAllByConnectionId = async (connectionId: string) => { + unsubscribeAllByConnectionId = async ( + connectionId: string, + ): Promise => { + const subscribers: ISubscriber[] = []; let cursor: DynamoDB.DocumentClient.Key | undefined; do { @@ -324,9 +327,10 @@ export class DynamoDBSubscriptionManager implements ISubscriptionManager { Limit: 12, // Maximum of 25 request items sent to DynamoDB a time }) .promise(); + subscribers.push(...(Items as ISubscriber[])); if (Items == null || (LastEvaluatedKey == null && Items.length === 0)) { - return; + return subscribers; } if (Items.length > 0) { @@ -353,6 +357,8 @@ export class DynamoDBSubscriptionManager implements ISubscriptionManager { cursor = LastEvaluatedKey; } while (cursor); + + return subscribers; }; generateSubscriptionId = ( diff --git a/packages/aws-lambda-graphql/src/MemorySubscriptionManager.ts b/packages/aws-lambda-graphql/src/MemorySubscriptionManager.ts index d1cd7235..898f4ef6 100644 --- a/packages/aws-lambda-graphql/src/MemorySubscriptionManager.ts +++ b/packages/aws-lambda-graphql/src/MemorySubscriptionManager.ts @@ -136,6 +136,6 @@ export class MemorySubscriptionManager implements ISubscriptionManager { ); } - return Promise.resolve(); + return Promise.resolve([]); }; } diff --git a/packages/aws-lambda-graphql/src/RedisConnectionManager.ts b/packages/aws-lambda-graphql/src/RedisConnectionManager.ts index b2598c28..08a9316c 100644 --- a/packages/aws-lambda-graphql/src/RedisConnectionManager.ts +++ b/packages/aws-lambda-graphql/src/RedisConnectionManager.ts @@ -144,12 +144,13 @@ export class RedisConnectionManager implements IConnectionManager { } }; - unregisterConnection = async ({ id }: IConnection): Promise => { + unregisterConnection = async ({ id }: IConnection) => { const key = prefixRedisKey(`connection:${id}`); - await Promise.all([ + const [, subscribers] = await Promise.all([ this.redisClient.del(key), this.subscriptions.unsubscribeAllByConnectionId(id), ]); + return subscribers; }; closeConnection = async ({ id, data }: IConnection): Promise => { diff --git a/packages/aws-lambda-graphql/src/RedisSubscriptionManager.ts b/packages/aws-lambda-graphql/src/RedisSubscriptionManager.ts index ecbf80a9..1d401e88 100644 --- a/packages/aws-lambda-graphql/src/RedisSubscriptionManager.ts +++ b/packages/aws-lambda-graphql/src/RedisSubscriptionManager.ts @@ -219,6 +219,7 @@ export class RedisSubscriptionManager implements ISubscriptionManager { }; unsubscribeAllByConnectionId = async (connectionId: string) => { + const subscribers: ISubscriber[] = []; let done = false; const limit = 50; let offset = 0; @@ -244,6 +245,7 @@ export class RedisSubscriptionManager implements ISubscriptionManager { const result = await this.redisClient.get(key); if (result) { subscriber = JSON.parse(result); + subscribers.push(subscriber as ISubscriber); const subscriptionId = this.generateSubscriptionId( connectionId, subscriber.operationId, @@ -270,6 +272,7 @@ export class RedisSubscriptionManager implements ISubscriptionManager { } } while (!done); await this.redisClient.del(subscriptionListKey); + return subscribers; }; generateSubscriptionId = ( diff --git a/packages/aws-lambda-graphql/src/Server.ts b/packages/aws-lambda-graphql/src/Server.ts index b7d4e025..21267d60 100644 --- a/packages/aws-lambda-graphql/src/Server.ts +++ b/packages/aws-lambda-graphql/src/Server.ts @@ -11,7 +11,7 @@ import { Context as LambdaContext, Handler as LambdaHandler, } from 'aws-lambda'; -import { isAsyncIterable } from 'iterall'; +import { isAsyncIterable, getAsyncIterator } from 'iterall'; import { ExecutionResult } from 'graphql'; import { PubSub } from 'graphql-subscriptions'; import { @@ -357,7 +357,43 @@ export class Server< onDisconnect(connection); } - await this.connectionManager.unregisterConnection(connection); + const subscribers = await this.connectionManager.unregisterConnection( + connection, + ); + + const promises = subscribers.map(async (subscriber) => { + const pubSub = new PubSub(); + + const options = await this.createGraphQLServerOptions( + event, + lambdaContext, + { + connection, + operation: subscriber.operation, + pubSub, + }, + ); + + const iterable = await execute({ + ...options, + connection, + connectionManager: this.connectionManager, + event, + lambdaContext, + operation: subscriber.operation, + pubSub, + registerSubscriptions: false, + subscriptionManager: this.subscriptionManager, + }); + + if (!isAsyncIterable(iterable)) { + return; + } + + const iterator = getAsyncIterator(iterable); + await iterator.return?.(); + }); + await Promise.all(promises); return { body: '', diff --git a/packages/aws-lambda-graphql/src/WebSocketConnectionManager.ts b/packages/aws-lambda-graphql/src/WebSocketConnectionManager.ts index 2b8a8ba4..225b0d37 100644 --- a/packages/aws-lambda-graphql/src/WebSocketConnectionManager.ts +++ b/packages/aws-lambda-graphql/src/WebSocketConnectionManager.ts @@ -77,8 +77,9 @@ export class WebSocketConnectionManager implements IConnectionManager { }); }; - unregisterConnection = async (connection: IConnection): Promise => { + unregisterConnection = async (connection: IConnection) => { this.connections.delete(connection.id); + return []; }; closeConnection = async (connection: WSConnection): Promise => { diff --git a/packages/aws-lambda-graphql/src/__tests__/Server.test.ts b/packages/aws-lambda-graphql/src/__tests__/Server.test.ts index 7831c10e..afac97a7 100644 --- a/packages/aws-lambda-graphql/src/__tests__/Server.test.ts +++ b/packages/aws-lambda-graphql/src/__tests__/Server.test.ts @@ -437,7 +437,7 @@ describe('Server', () => { {}, ); (connectionManager.unregisterConnection as jest.Mock).mockResolvedValueOnce( - undefined, + [], ); await expect( @@ -478,6 +478,9 @@ describe('Server', () => { (connectionManager.hydrateConnection as jest.Mock).mockResolvedValueOnce( {}, ); + (connectionManager.unregisterConnection as jest.Mock).mockResolvedValueOnce( + [], + ); await expect( handlerWithOnDisconnect( diff --git a/packages/aws-lambda-graphql/src/types/connection.ts b/packages/aws-lambda-graphql/src/types/connection.ts new file mode 100644 index 00000000..e6fda489 --- /dev/null +++ b/packages/aws-lambda-graphql/src/types/connection.ts @@ -0,0 +1,28 @@ +export interface IConnection { + /** + * Unique connection id + */ + readonly id: string; + + /** + * Extra connection data, this data is stored only upon registration + * All values should be JSON serializable + */ + readonly data: IConnectionData; +} + +export interface IConnectionData { + [key: string]: any; + + /** + * Connection context data provided from GQL_CONNECTION_INIT message or from onConnect method + * This data is passed to graphql resolvers' context + * All values should be JSON serializable + */ + context: Object; + + /** + * Indicates whether connection sent GQL_CONNECTION_INIT message or + */ + readonly isInitialized: boolean; +} diff --git a/packages/aws-lambda-graphql/src/types/connections.ts b/packages/aws-lambda-graphql/src/types/connections.ts index 8c83d35a..78421bd3 100644 --- a/packages/aws-lambda-graphql/src/types/connections.ts +++ b/packages/aws-lambda-graphql/src/types/connections.ts @@ -1,31 +1,7 @@ -export interface IConnection { - /** - * Unique connection id - */ - readonly id: string; - - /** - * Extra connection data, this data is stored only upon registration - * All values should be JSON serializable - */ - readonly data: IConnectionData; -} - -export interface IConnectionData { - [key: string]: any; +import { IConnection, IConnectionData } from './connection'; +import { ISubscriber } from './subscriptions'; - /** - * Connection context data provided from GQL_CONNECTION_INIT message or from onConnect method - * This data is passed to graphql resolvers' context - * All values should be JSON serializable - */ - context: Object; - - /** - * Indicates whether connection sent GQL_CONNECTION_INIT message or - */ - readonly isInitialized: boolean; -} +export { IConnection, IConnectionData }; export interface HydrateConnectionOptions { /** @@ -58,6 +34,6 @@ export interface IConnectionManager { connection: IConnection, payload: string | Buffer, ): Promise; - unregisterConnection(connection: IConnection): Promise; + unregisterConnection(connection: IConnection): Promise; closeConnection(connection: IConnection): Promise; } diff --git a/packages/aws-lambda-graphql/src/types/subscriptions.ts b/packages/aws-lambda-graphql/src/types/subscriptions.ts index 4ff90896..6a572ea4 100644 --- a/packages/aws-lambda-graphql/src/types/subscriptions.ts +++ b/packages/aws-lambda-graphql/src/types/subscriptions.ts @@ -1,4 +1,4 @@ -import { IConnection } from './connections'; +import { IConnection } from './connection'; import { IdentifiedOperationRequest, OperationRequest } from './operations'; export interface ISubscriptionEvent { @@ -51,5 +51,5 @@ export interface ISubscriptionManager { * * @param connectionId */ - unsubscribeAllByConnectionId(connectionId: string): Promise; + unsubscribeAllByConnectionId(connectionId: string): Promise; }