From 6dfd145b5fe07b19731de3a0b4d7022c5e8e3387 Mon Sep 17 00:00:00 2001 From: Gary Jennings Date: Mon, 26 Aug 2019 13:53:35 -0600 Subject: [PATCH 1/6] added stream support --- serverless.js | 12 ++++++++---- utils.js | 33 ++++++++++++++++++++++++++------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/serverless.js b/serverless.js index d177b1f..34e3bfe 100644 --- a/serverless.js +++ b/serverless.js @@ -3,7 +3,7 @@ const AWS = require('aws-sdk') const { Component } = require('@serverless/core') const { createTable, deleteTable, describeTable, updateTable, configChanged } = require('./utils') -const outputsList = ['name', 'arn', 'region'] +const outputsList = ['name', 'arn', 'region', 'stream'] const defaults = { attributeDefinitions: [ @@ -18,7 +18,8 @@ const defaults = { KeyType: 'HASH' } ], - region: 'us-east-1' + region: 'us-east-1', + stream: false } class AwsDynamoDb extends Component { @@ -47,11 +48,14 @@ class AwsDynamoDb extends Component { this.context.status('Creating') this.context.debug(`Table ${config.name} does not exist. Creating...`) - config.arn = await createTable({ dynamodb, ...config }) + const createResponse = await createTable({ dynamodb, ...config }) + config.arn = createResponse.tableArn + config.stream = createResponse.streamArn } else { this.context.debug(`Table ${config.name} already exists. Comparing config changes...`) config.arn = prevTable.arn + config.stream = prevTable.streamArn if (configChanged(prevTable, config)) { this.context.status('Updating') @@ -72,11 +76,11 @@ class AwsDynamoDb extends Component { this.state.arn = config.arn this.state.name = config.name + this.state.stream = config.stream this.state.region = config.region await this.save() const outputs = pick(outputsList, config) - return outputs } diff --git a/utils.js b/utils.js index 5b2943c..9e92614 100644 --- a/utils.js +++ b/utils.js @@ -1,15 +1,24 @@ const { not, equals, pick } = require('ramda') -async function createTable({ dynamodb, name, attributeDefinitions, keySchema }) { +async function createTable({ dynamodb, name, attributeDefinitions, keySchema, stream }) { const res = await dynamodb .createTable({ TableName: name, AttributeDefinitions: attributeDefinitions, KeySchema: keySchema, - BillingMode: 'PAY_PER_REQUEST' + BillingMode: 'PAY_PER_REQUEST', + ...(stream && { + StreamSpecification: { + StreamEnabled: true, + StreamViewType: 'NEW_IMAGE' + } + }) }) .promise() - return res.TableDescription.TableArn + return { + tableArn: res.TableDescription.TableArn, + streamArn: res.TableDescription.LatestStreamArn || false + } } async function describeTable({ dynamodb, name }) { @@ -21,7 +30,8 @@ async function describeTable({ dynamodb, name }) { arn: data.Table.TableArn, name: data.Table.TableName, attributeDefinitions: data.Table.AttributeDefinitions, - keySchema: data.Table.KeySchema + keySchema: data.Table.KeySchema, + streamArn: data.Table.LatestStreamArn } } catch (error) { if (error.code === 'ResourceNotFoundException') { @@ -32,15 +42,24 @@ async function describeTable({ dynamodb, name }) { } } -async function updateTable({ dynamodb, name, attributeDefinitions }) { +async function updateTable({ dynamodb, name, attributeDefinitions, stream }) { const res = await dynamodb .updateTable({ TableName: name, AttributeDefinitions: attributeDefinitions, - BillingMode: 'PAY_PER_REQUEST' + BillingMode: 'PAY_PER_REQUEST', + ...(stream && { + StreamSpecification: { + StreamEnabled: true, + StreamViewType: 'NEW_IMAGE' + } + }) }) .promise() - return res.TableDescription.TableArn + return { + tableArn: res.TableDescription.TableArn, + streamArn: res.TableDescription.LatestStreamArn || false + } } async function deleteTable({ dynamodb, name }) { From acf543d331a68a0f76d57aa699ded86dc35b4441 Mon Sep 17 00:00:00 2001 From: Gary Jennings Date: Tue, 27 Aug 2019 10:05:58 -0600 Subject: [PATCH 2/6] added stream turnerary in updateTable --- serverless.js | 1 + utils.js | 16 ++++++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/serverless.js b/serverless.js index 34e3bfe..b2c4041 100644 --- a/serverless.js +++ b/serverless.js @@ -65,6 +65,7 @@ class AwsDynamoDb extends Component { await deleteTable({ dynamodb, name: prevTable.name }) config.arn = await createTable({ dynamodb, ...config }) } else { + this.context.status('IN UPDATE BRANCH - Updating') await updateTable({ dynamodb, ...config }) } } diff --git a/utils.js b/utils.js index 9e92614..2957b73 100644 --- a/utils.js +++ b/utils.js @@ -48,12 +48,16 @@ async function updateTable({ dynamodb, name, attributeDefinitions, stream }) { TableName: name, AttributeDefinitions: attributeDefinitions, BillingMode: 'PAY_PER_REQUEST', - ...(stream && { - StreamSpecification: { - StreamEnabled: true, - StreamViewType: 'NEW_IMAGE' - } - }) + StreamSpecification: { + ...(stream + ? { + StreamEnabled: true, + StreamViewType: 'NEW_IMAGE' + } + : { + StreamEnabled: false + }) + } }) .promise() return { From aa4bf7ff1f6d05e1454ab0859b76660c183b5028 Mon Sep 17 00:00:00 2001 From: Gary Jennings Date: Mon, 2 Dec 2019 19:22:56 -0700 Subject: [PATCH 3/6] commit-before-merge --- serverless.js | 1 - 1 file changed, 1 deletion(-) diff --git a/serverless.js b/serverless.js index b2c4041..34e3bfe 100644 --- a/serverless.js +++ b/serverless.js @@ -65,7 +65,6 @@ class AwsDynamoDb extends Component { await deleteTable({ dynamodb, name: prevTable.name }) config.arn = await createTable({ dynamodb, ...config }) } else { - this.context.status('IN UPDATE BRANCH - Updating') await updateTable({ dynamodb, ...config }) } } From 46a675e08f1b1693052987d5574eb2cc6f5c0070 Mon Sep 17 00:00:00 2001 From: Gary Jennings Date: Thu, 5 Dec 2019 20:02:33 -0700 Subject: [PATCH 4/6] cleaned up stream code, added example to README --- testProjects/serverless.yml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/testProjects/serverless.yml b/testProjects/serverless.yml index 712a806..c69a995 100644 --- a/testProjects/serverless.yml +++ b/testProjects/serverless.yml @@ -4,6 +4,16 @@ table: component: ../ inputs: name: test-table + attributeDefinitions: + - AttributeName: ID + AttributeType: S + - AttributeName: SORT + AttributeType: S + keySchema: + - AttributeName: ID + KeyType: HASH + - AttributeName: SORT + KeyType: RANGE delete: true stream: true - streamViewType: OLD_IMAGE + streamViewType: NEW_IMAGE From 68a7b58edc6ff5905bfcf89fae0986cade0ca930 Mon Sep 17 00:00:00 2001 From: Gary Jennings Date: Thu, 5 Dec 2019 20:06:43 -0700 Subject: [PATCH 5/6] updated test yml file --- testProjects/serverless.yml | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/testProjects/serverless.yml b/testProjects/serverless.yml index c69a995..7c09ff4 100644 --- a/testProjects/serverless.yml +++ b/testProjects/serverless.yml @@ -4,16 +4,6 @@ table: component: ../ inputs: name: test-table - attributeDefinitions: - - AttributeName: ID - AttributeType: S - - AttributeName: SORT - AttributeType: S - keySchema: - - AttributeName: ID - KeyType: HASH - - AttributeName: SORT - KeyType: RANGE delete: true stream: true streamViewType: NEW_IMAGE From 7ac573aa098d65330094ffb4ff06f2ad4120a33c Mon Sep 17 00:00:00 2001 From: Gary Jennings Date: Thu, 5 Dec 2019 20:13:58 -0700 Subject: [PATCH 6/6] added serverless.js and utils to commit --- README.md | 3 ++ serverless.js | 18 +++++------ utils.js | 86 +++++++++++++++++++++++++++------------------------ 3 files changed, 57 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index ffa42e3..87309fa 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,9 @@ myTable: - AttributeName: id KeyType: HASH region: us-east-1 + delete: true + stream: true #optional + streamViewType: NEW_IMAGE #optional ``` ### 4. Deploy diff --git a/serverless.js b/serverless.js index 8bc2efd..fec34f8 100644 --- a/serverless.js +++ b/serverless.js @@ -65,16 +65,16 @@ class AwsDynamoDb extends Component { const prevTable = await describeTable({ dynamodb, name: this.state.name }) if (!prevTable) { - validate.stream(inputs) + validate.streamViewType(inputs) this.context.status('Creating') this.context.debug(`Table ${config.name} does not exist. Creating...`) - const createResponse = await createTable({ dynamodb, ...config }) - config.arn = createResponse.tableArn - config.streamArn = createResponse.streamArn + const { tableArn, streamArn } = await createTable({ dynamodb, ...config }) + config.arn = tableArn + config.streamArn = streamArn } else { - validate.stream(inputs) - validate.streamViewType(this, prevTable, inputs) + validate.streamViewType(inputs) + validate.streamViewTypeUpdate(this, prevTable, inputs) this.context.debug(`Table ${config.name} already exists. Comparing config changes...`) config.arn = prevTable.arn @@ -95,10 +95,8 @@ class AwsDynamoDb extends Component { await deleteTable({ dynamodb, name: prevTable.name }) config.arn = await createTable({ dynamodb, ...config }) } else { - const {streamArn} = await updateTable({ prevTable, dynamodb, ...config }) - config.streamArn = !config.streamEnabled - ? false - : streamArn || await getStreamArn({dynamodb, name: this.state.name, config}) + const { streamArn } = await updateTable({ prevTable, dynamodb, ...config }) + config.streamArn = streamArn } } } diff --git a/utils.js b/utils.js index 57f4a86..6054236 100644 --- a/utils.js +++ b/utils.js @@ -1,7 +1,7 @@ const { not, equals, pick } = require('ramda') const validate = { - stream: (inputs) => { + streamViewType: (inputs) => { if (!inputs.streamViewType) { return } @@ -12,7 +12,7 @@ const validate = { } }, - streamViewType: (comp, previousTable, inputs) => { + streamViewTypeUpdate: (comp, previousTable, inputs) => { if (!previousTable.streamArn || !inputs.streamViewType) { return } @@ -23,6 +23,28 @@ const validate = { } } +async function getStreamArn ({dynamodb, name}) { + const maxTries = 5 + let tries = 0 + + const getStreamArn = async () => { + if (tries > maxTries) { + throw Error(`There was a problem getting the arn for your DynamoDB stream. Please try again.`) + } + + const {streamArn } = await describeTable({ dynamodb, name}) + if (!streamArn && tries <= maxTries) { + tries++ + const sleep = ms => new Promise(r => setTimeout(r,ms)) + await sleep(3000) + return await getStreamArn() + } + return streamArn + } + + return await getStreamArn() +} + async function createTable({ dynamodb, name, attributeDefinitions, keySchema, stream, streamViewType = false }) { const res = await dynamodb .createTable({ @@ -72,31 +94,39 @@ async function describeTable({ dynamodb, name }) { } async function updateTable({prevTable, dynamodb, name, attributeDefinitions, stream, streamViewType }) { - const enableStream = prevTable.streamArn && !stream - ? false - : true + const disableStream = prevTable.streamEnabled && !stream + const enableStream = !prevTable.streamEnabled && stream const res = await dynamodb .updateTable({ TableName: name, AttributeDefinitions: attributeDefinitions, BillingMode: 'PAY_PER_REQUEST', - StreamSpecification: { - ...(enableStream - ? { - StreamEnabled: true, - StreamViewType: streamViewType - } - : { - StreamEnabled: false - }) - } + + ...(disableStream && { + StreamSpecification: { + StreamEnabled: false + } + }), + + ...(enableStream && { + StreamSpecification: { + StreamEnabled: true, + StreamViewType: streamViewType + } + }) }) .promise() + + let streamArn = res.TableDescription.LatestStreamArn || false + if (stream && !res.TableDescription.LatestStreamArn) { + streamArn = await getStreamArn({dynamodb, name}) + } + return { tableArn: res.TableDescription.TableArn, - streamArn: res.TableDescription.LatestStreamArn || false + streamArn } } @@ -123,31 +153,7 @@ function configChanged(prevTable, table) { return not(equals(inputs, prevInputs)) } -async function getStreamArn ({dynamodb, name, config}) { - if (!config.streamEnabled) { - return false - } - - const maxTries = 5 - let tries = 0 - const getStreamArn = async () => { - if (tries > maxTries) { - throw Error(`There was a problem getting the arn for your DynamoDB stream. Please try again.`) - } - - const {streamArn } = await describeTable({ dynamodb, name}) - if (!streamArn && tries <= maxTries) { - tries++ - const sleep = ms => new Promise(r => setTimeout(r,ms)) - await sleep(3000) - return await getStreamArn() - } - return streamArn - } - - return await getStreamArn() -} module.exports = { createTable,