diff --git a/README.md b/README.md index ffa42e3..87309fa 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,9 @@ myTable: - AttributeName: id KeyType: HASH region: us-east-1 + delete: true + stream: true #optional + streamViewType: NEW_IMAGE #optional ``` ### 4. Deploy diff --git a/serverless.js b/serverless.js index a862c2e..fec34f8 100644 --- a/serverless.js +++ b/serverless.js @@ -1,9 +1,9 @@ const { mergeDeepRight, pick, equals } = require('ramda') const AWS = require('aws-sdk') const { Component } = require('@serverless/core') -const { createTable, deleteTable, describeTable, updateTable, configChanged } = require('./utils') +const { createTable, deleteTable, describeTable, updateTable, configChanged, validate, getStreamArn } = require('./utils') -const outputsList = ['name', 'arn', 'region'] +const outputsList = ['name', 'arn', 'region', 'streamArn'] const defaults = { attributeDefinitions: [ @@ -18,8 +18,11 @@ const defaults = { KeyType: 'HASH' } ], + + region: 'us-east-1', name: false, - region: 'us-east-1' + stream: false, + streamViewType: 'NEW_IMAGE' } const setTableName = (component, inputs, config) => { @@ -61,30 +64,39 @@ class AwsDynamoDb extends Component { setTableName(this, inputs, config) const prevTable = await describeTable({ dynamodb, name: this.state.name }) - if (!prevTable) { + validate.streamViewType(inputs) this.context.status('Creating') this.context.debug(`Table ${config.name} does not exist. Creating...`) - config.arn = await createTable({ dynamodb, ...config }) + const { tableArn, streamArn } = await createTable({ dynamodb, ...config }) + config.arn = tableArn + config.streamArn = streamArn } else { + validate.streamViewType(inputs) + validate.streamViewTypeUpdate(this, prevTable, inputs) this.context.debug(`Table ${config.name} already exists. Comparing config changes...`) config.arn = prevTable.arn - + config.streamArn = prevTable.streamArn + config.streamEnabled = inputs.stream + config.streamViewType = inputs.streamViewType + if (configChanged(prevTable, config)) { this.context.status('Updating') this.context.debug(`Config changed for table ${config.name}. Updating...`) - + if (!equals(prevTable.name, config.name)) { // If "delete: false", don't delete the table if (config.delete === false) { throw new Error(`You're attempting to change your table name from ${this.state.name} to ${config.name} which will result in you deleting your table, but you've specified the "delete" input to "false" which prevents your original table from being deleted.`) } + await deleteTable({ dynamodb, name: prevTable.name }) config.arn = await createTable({ dynamodb, ...config }) } else { - await updateTable({ dynamodb, ...config }) + const { streamArn } = await updateTable({ prevTable, dynamodb, ...config }) + config.streamArn = streamArn } } } @@ -95,12 +107,13 @@ class AwsDynamoDb extends Component { this.state.arn = config.arn this.state.name = config.name + this.state.stream = config.streamArn + this.state.streamViewType = config.streamViewType this.state.region = config.region this.state.delete = config.delete === false ? config.delete : true await this.save() const outputs = pick(outputsList, config) - return outputs } diff --git a/testProjects/serverless.yml b/testProjects/serverless.yml index 15c3acc..7c09ff4 100644 --- a/testProjects/serverless.yml +++ b/testProjects/serverless.yml @@ -4,4 +4,6 @@ table: component: ../ inputs: name: test-table - delete: false + delete: true + stream: true + streamViewType: NEW_IMAGE diff --git a/utils.js b/utils.js index 5b2943c..6054236 100644 --- a/utils.js +++ b/utils.js @@ -1,15 +1,69 @@ const { not, equals, pick } = require('ramda') -async function createTable({ dynamodb, name, attributeDefinitions, keySchema }) { +const validate = { + streamViewType: (inputs) => { + if (!inputs.streamViewType) { + return + } + + const validStreamTypes = ['NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES', 'KEYS_ONLY'] + if (!validStreamTypes.includes(inputs.streamViewType)) { + throw Error(`${inputs.streamViewType} is not a valid streamViewType.`) + } + }, + + streamViewTypeUpdate: (comp, previousTable, inputs) => { + if (!previousTable.streamArn || !inputs.streamViewType) { + return + } + + if (comp.state.stream && inputs.stream && comp.state.streamViewType !== inputs.streamViewType) { + throw Error(`You cannot change the view type of an existing DynamoDB stream.`) + } + } +} + +async function getStreamArn ({dynamodb, name}) { + const maxTries = 5 + let tries = 0 + + const getStreamArn = async () => { + if (tries > maxTries) { + throw Error(`There was a problem getting the arn for your DynamoDB stream. Please try again.`) + } + + const {streamArn } = await describeTable({ dynamodb, name}) + if (!streamArn && tries <= maxTries) { + tries++ + const sleep = ms => new Promise(r => setTimeout(r,ms)) + await sleep(3000) + return await getStreamArn() + } + return streamArn + } + + return await getStreamArn() +} + +async function createTable({ dynamodb, name, attributeDefinitions, keySchema, stream, streamViewType = false }) { const res = await dynamodb .createTable({ TableName: name, AttributeDefinitions: attributeDefinitions, KeySchema: keySchema, - BillingMode: 'PAY_PER_REQUEST' + BillingMode: 'PAY_PER_REQUEST', + ...(stream && { + StreamSpecification: { + StreamEnabled: true, + StreamViewType: streamViewType + } + }) }) .promise() - return res.TableDescription.TableArn + return { + tableArn: res.TableDescription.TableArn, + streamArn: res.TableDescription.LatestStreamArn || false + } } async function describeTable({ dynamodb, name }) { @@ -21,7 +75,14 @@ async function describeTable({ dynamodb, name }) { arn: data.Table.TableArn, name: data.Table.TableName, attributeDefinitions: data.Table.AttributeDefinitions, - keySchema: data.Table.KeySchema + keySchema: data.Table.KeySchema, + streamArn: data.Table.LatestStreamArn, + streamEnabled: data.Table.StreamSpecification + ? data.Table.StreamSpecification.StreamEnabled + : false, + streamViewType: data.Table.StreamSpecification + ? data.Table.StreamSpecification.StreamViewType + : false } } catch (error) { if (error.code === 'ResourceNotFoundException') { @@ -32,15 +93,41 @@ async function describeTable({ dynamodb, name }) { } } -async function updateTable({ dynamodb, name, attributeDefinitions }) { +async function updateTable({prevTable, dynamodb, name, attributeDefinitions, stream, streamViewType }) { + const disableStream = prevTable.streamEnabled && !stream + const enableStream = !prevTable.streamEnabled && stream + const res = await dynamodb .updateTable({ TableName: name, AttributeDefinitions: attributeDefinitions, - BillingMode: 'PAY_PER_REQUEST' + BillingMode: 'PAY_PER_REQUEST', + + ...(disableStream && { + StreamSpecification: { + StreamEnabled: false + } + }), + + ...(enableStream && { + StreamSpecification: { + StreamEnabled: true, + StreamViewType: streamViewType + } + }) }) .promise() - return res.TableDescription.TableArn + + + let streamArn = res.TableDescription.LatestStreamArn || false + if (stream && !res.TableDescription.LatestStreamArn) { + streamArn = await getStreamArn({dynamodb, name}) + } + + return { + tableArn: res.TableDescription.TableArn, + streamArn + } } async function deleteTable({ dynamodb, name }) { @@ -60,16 +147,20 @@ async function deleteTable({ dynamodb, name }) { } function configChanged(prevTable, table) { - const prevInputs = pick(['name', 'attributeDefinitions'], prevTable) - const inputs = pick(['name', 'attributeDefinitions'], table) + const prevInputs = pick(['name', 'attributeDefinitions', 'streamArn', 'streamViewType', 'streamEnabled'], prevTable) + const inputs = pick(['name', 'attributeDefinitions', 'streamArn', 'streamViewType', 'streamEnabled'], table) return not(equals(inputs, prevInputs)) } + + module.exports = { createTable, describeTable, updateTable, deleteTable, - configChanged + configChanged, + validate, + getStreamArn }