diff --git a/docs/queue.md b/docs/queue.md
index 96498f35..59f457a3 100644
--- a/docs/queue.md
+++ b/docs/queue.md
@@ -362,6 +362,21 @@ Note you can use [partial batch failures](#partial-batch-failures) to avoid fail
 
 It is possible to set the batch size between 1 and 10.
 
+### Max Concurrency
+
+```yaml
+constructs:
+    my-queue:
+        # ...
+        maxConcurrency: 10 # The SQS event source will invoke at most 10 concurrent function instances
+```
+
+Maximum concurrency for SQS as an event source lets you control Lambda function concurrency per source. You set the maximum concurrency on the event source mapping, not on the Lambda function.
+
+This setting does not change the scaling or batching behavior of Lambda with SQS: you can still batch messages with a custom batch size and window. Instead, it caps the number of concurrent function invocations per SQS event source. Once Lambda scales up to the maximum concurrency configured on the event source, it stops reading more messages from the queue. When a Lambda function has multiple event sources, this also lets you define a separate maximum concurrency for each one.
+
+It is possible to set the `maxConcurrency` between 2 and 1000.
+
 ### Maximum Batching Window
 
 ```yaml
diff --git a/package.json b/package.json
index 12637b69..b4a20995 100644
--- a/package.json
+++ b/package.json
@@ -23,7 +23,7 @@
   },
   "devDependencies": {
     "@serverless/test": "^11.0.1",
-    "@serverless/typescript": "^3.21.0",
+    "@serverless/typescript": "^3.27.0",
     "@types/chai": "^4.2.21",
     "@types/inquirer": "^7.3.3",
     "@types/jest": "^27.0.1",
@@ -49,7 +49,7 @@
     "lint-staged": "^11.0.0",
     "nodemon": "^2.0.10",
     "prettier": "^2.3.2",
-    "serverless": "^3.21.0",
+    "serverless": "^3.28.0",
     "sinon": "^11.1.1",
     "stdout-stderr": "^0.1.13",
     "ts-jest": "^27.0.3",
diff --git a/src/constructs/aws/Queue.ts b/src/constructs/aws/Queue.ts
index 8bf0f4d5..22b3aaee 100644
--- a/src/constructs/aws/Queue.ts
+++ b/src/constructs/aws/Queue.ts
@@ -50,6 +50,11 @@ const QUEUE_DEFINITION = {
             minimum: 0,
             maximum: 300,
         },
+        maxConcurrency: {
+            type: "number",
+            minimum: 2,
+            maximum: 1000,
+        },
         fifo: { type: "boolean" },
         delay: { type: "number" },
         encryption: { type: "string" },
@@ -301,6 +306,7 @@ export class Queue extends AwsConstruct {
         // The default batch size is 1
         const batchSize = this.configuration.batchSize ?? 1;
         const maximumBatchingWindow = this.getMaximumBatchingWindow();
+        const maximumConcurrency = this.configuration.maxConcurrency;
 
         // Override events for the worker
         this.configuration.worker.events = [
@@ -310,6 +316,7 @@
                     arn: this.queue.queueArn,
                     batchSize: batchSize,
                     maximumBatchingWindow: maximumBatchingWindow,
+                    maximumConcurrency: maximumConcurrency,
                     functionResponseType: "ReportBatchItemFailures",
                 },
             },
diff --git a/test/unit/queues.test.ts b/test/unit/queues.test.ts
index 35076fd1..0d837c45 100644
--- a/test/unit/queues.test.ts
+++ b/test/unit/queues.test.ts
@@ -226,6 +226,32 @@ describe("queues", () => {
         });
     });
 
+    it("allows changing the max concurrency", async () => {
+        const { cfTemplate, serverless } = await runServerless({
+            fixture: "queues",
+            configExt: merge({}, pluginConfigExt, {
+                constructs: {
+                    emails: {
+                        maxConcurrency: 10,
+                    },
+                },
+            }),
+            command: "package",
+        });
+        const serverlessVersion = serverless.version as string;
+        if (serverlessVersion.startsWith("3")) {
+            expect(cfTemplate.Resources.EmailsWorkerEventSourceMappingSQSEmailsQueueF057328A).toMatchObject({
+                Properties: {
+                    ScalingConfig: {
+                        MaximumConcurrency: 10,
+                    },
+                },
+            });
+        } else {
+            expect(true).toEqual(true);
+        }
+    });
+
     it("allows changing the delivery delay", async () => {
         const { cfTemplate, computeLogicalId } = await runServerless({
             fixture: "queues",
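For readers of this patch, here is a minimal sketch of how the new option would sit alongside the existing batching options in a project's `serverless.yml`. The service name, handler path, surrounding values, and the `maxBatchingWindow` key (inferred from the adjacent "Maximum Batching Window" docs section) are illustrative assumptions, not part of this change.

```yaml
# Hypothetical serverless.yml — names and values are examples only.
service: my-app

provider:
    name: aws

plugins:
    - serverless-lift

constructs:
    my-queue:
        type: queue
        worker:
            handler: src/worker.handler
        batchSize: 5           # up to 5 messages per invocation (1-10)
        maxBatchingWindow: 30  # wait up to 30 seconds to fill a batch (0-300)
        maxConcurrency: 10     # at most 10 concurrent worker invocations (2-1000)
```

Under the hood, `maxConcurrency` is forwarded to the worker's SQS event as `maximumConcurrency`, which is deployed as `ScalingConfig.MaximumConcurrency` on the event source mapping, as the unit test above asserts.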