Skip to content

Commit ccf1813

Browse files
author
Sebastien Stormacq
committed
Allow multiple invocation of streaming Lambda functions with the local http server
1 parent 9487a09 commit ccf1813

File tree

2 files changed

+356
-0
lines changed

2 files changed

+356
-0
lines changed

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,14 @@ internal struct LambdaHTTPServer {
301301
await self.responsePool.push(
302302
LocalServerResponse(id: requestId, final: true)
303303
)
304+
305+
// Send acknowledgment back to Lambda runtime client for streaming END
306+
// This is the single HTTP response to the chunked HTTP request
307+
try await self.sendResponse(
308+
.init(id: requestId, status: .accepted, final: true),
309+
outbound: outbound,
310+
logger: logger
311+
)
304312
} else {
305313
// process the buffered response for non streaming requests
306314
try await self.processRequestAndSendResponse(
Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIOCore
17+
import NIOHTTP1
18+
import NIOPosix
19+
import Testing
20+
21+
@testable import AWSLambdaRuntime
22+
23+
#if canImport(FoundationEssentials)
24+
import FoundationEssentials
25+
#else
26+
import Foundation
27+
#endif
28+
29+
#if canImport(FoundationNetworking)
30+
import FoundationNetworking
31+
#else
32+
import Foundation
33+
#endif
34+
35+
@Suite(.serialized)
36+
struct LambdaStreamingTests {
37+
38+
@Test("Streaming handler sends multiple chunks and completes successfully")
39+
@available(LambdaSwift 2.0, *)
40+
func testStreamingHandlerMultipleChunks() async throws {
41+
let customPort = 8090
42+
43+
// Set environment variable
44+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
45+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
46+
47+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
48+
49+
// Start the Lambda runtime with streaming handler
50+
group.addTask {
51+
struct StreamingTestHandler: StreamingLambdaHandler {
52+
func handle(
53+
_ event: ByteBuffer,
54+
responseWriter: some LambdaResponseStreamWriter,
55+
context: LambdaContext
56+
) async throws {
57+
// Send multiple chunks with delays to test streaming
58+
for i in 1...3 {
59+
try await responseWriter.write(ByteBuffer(string: "Chunk \(i)\n"))
60+
try await Task.sleep(for: .milliseconds(50))
61+
}
62+
try await responseWriter.finish()
63+
}
64+
}
65+
66+
let runtime = LambdaRuntime(
67+
handler: StreamingTestHandler()
68+
)
69+
70+
try await runtime._run()
71+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
72+
}
73+
74+
// Start HTTP client to make streaming request
75+
group.addTask {
76+
// Give server time to start
77+
try await Task.sleep(for: .milliseconds(200))
78+
79+
return try await self.makeStreamingInvokeRequest(
80+
host: "127.0.0.1",
81+
port: customPort,
82+
payload: "\"test-event\""
83+
)
84+
}
85+
86+
// Get the first result (streaming response) and cancel the runtime
87+
let first = try await group.next()
88+
group.cancelAll()
89+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
90+
}
91+
92+
// Verify streaming response
93+
#expect(results.statusCode == 200, "Expected 200 OK, got \(results.statusCode)")
94+
#expect(results.completed, "Streaming response should be completed")
95+
#expect(results.chunks.count >= 1, "Expected at least 1 chunk, got \(results.chunks.count)")
96+
97+
// The streaming chunks are concatenated in the HTTP response
98+
let fullResponse = results.chunks.joined()
99+
let expectedContent = "Chunk 1\nChunk 2\nChunk 3\n"
100+
#expect(fullResponse == expectedContent, "Response was '\(fullResponse)', expected '\(expectedContent)'")
101+
}
102+
103+
@Test("Multiple streaming invocations work correctly")
104+
@available(LambdaSwift 2.0, *)
105+
func testMultipleStreamingInvocations() async throws {
106+
let customPort = 8091
107+
108+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
109+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
110+
111+
let results = try await withThrowingTaskGroup(of: [StreamingTestResult].self) { group in
112+
113+
// Start the Lambda runtime
114+
group.addTask {
115+
struct MultiStreamingHandler: StreamingLambdaHandler {
116+
func handle(
117+
_ event: ByteBuffer,
118+
responseWriter: some LambdaResponseStreamWriter,
119+
context: LambdaContext
120+
) async throws {
121+
let eventString = String(buffer: event)
122+
try await responseWriter.write(ByteBuffer(string: "Echo: \(eventString)\n"))
123+
try await responseWriter.finish()
124+
}
125+
}
126+
127+
let runtime = LambdaRuntime(
128+
handler: MultiStreamingHandler()
129+
)
130+
131+
try await runtime._run()
132+
return []
133+
}
134+
135+
// Make multiple streaming requests
136+
group.addTask {
137+
try await Task.sleep(for: .milliseconds(200))
138+
139+
var results: [StreamingTestResult] = []
140+
141+
// Make 3 sequential streaming requests
142+
for i in 1...3 {
143+
let result = try await self.makeStreamingInvokeRequest(
144+
host: "127.0.0.1",
145+
port: customPort,
146+
payload: "\"request-\(i)\""
147+
)
148+
results.append(result)
149+
150+
// Small delay between requests
151+
try await Task.sleep(for: .milliseconds(100))
152+
}
153+
154+
return results
155+
}
156+
157+
let first = try await group.next()
158+
group.cancelAll()
159+
return first ?? []
160+
}
161+
162+
// Verify all requests completed successfully
163+
#expect(results.count == 3, "Expected 3 responses, got \(results.count)")
164+
165+
for (index, result) in results.enumerated() {
166+
#expect(result.statusCode == 200, "Request \(index + 1) returned \(result.statusCode), expected 200")
167+
#expect(result.completed, "Request \(index + 1) should be completed")
168+
#expect(result.chunks.count == 1, "Request \(index + 1) should have 1 chunk, got \(result.chunks.count)")
169+
170+
let expectedContent = "Echo: \"request-\(index + 1)\"\n"
171+
#expect(result.chunks.first == expectedContent, "Request \(index + 1) content mismatch")
172+
}
173+
}
174+
175+
@Test("Streaming handler with custom headers works correctly")
176+
@available(LambdaSwift 2.0, *)
177+
func testStreamingHandlerWithCustomHeaders() async throws {
178+
let customPort = 8092
179+
180+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
181+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
182+
183+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
184+
185+
group.addTask {
186+
struct HeaderStreamingHandler: StreamingLambdaHandler {
187+
func handle(
188+
_ event: ByteBuffer,
189+
responseWriter: some LambdaResponseStreamWriter,
190+
context: LambdaContext
191+
) async throws {
192+
// Send custom headers
193+
try await responseWriter.writeStatusAndHeaders(
194+
StreamingLambdaStatusAndHeadersResponse(
195+
statusCode: 201,
196+
headers: [
197+
"Content-Type": "text/plain",
198+
"X-Custom-Header": "streaming-test",
199+
]
200+
)
201+
)
202+
203+
try await responseWriter.write(ByteBuffer(string: "Custom response"))
204+
try await responseWriter.finish()
205+
}
206+
}
207+
208+
let runtime = LambdaRuntime(
209+
handler: HeaderStreamingHandler()
210+
)
211+
212+
try await runtime._run()
213+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
214+
}
215+
216+
group.addTask {
217+
try await Task.sleep(for: .milliseconds(200))
218+
219+
return try await self.makeStreamingInvokeRequest(
220+
host: "127.0.0.1",
221+
port: customPort,
222+
payload: "\"header-test\""
223+
)
224+
}
225+
226+
let first = try await group.next()
227+
group.cancelAll()
228+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
229+
}
230+
231+
// Verify response (custom headers are returned as JSON in the response body)
232+
#expect(results.statusCode == 200, "Expected 200 OK, got \(results.statusCode)")
233+
#expect(results.completed, "Streaming response should be completed")
234+
#expect(results.chunks.count >= 1, "Expected at least 1 chunk, got \(results.chunks.count)")
235+
236+
// The response contains both the headers JSON and the content
237+
let fullResponse = results.chunks.joined()
238+
#expect(fullResponse.contains("\"statusCode\":201"), "Response should contain custom status code")
239+
#expect(
240+
fullResponse.contains("\"X-Custom-Header\":\"streaming-test\""),
241+
"Response should contain custom header"
242+
)
243+
#expect(fullResponse.contains("Custom response"), "Response should contain custom content")
244+
}
245+
246+
@Test("Streaming handler error handling works correctly")
247+
@available(LambdaSwift 2.0, *)
248+
func testStreamingHandlerErrorHandling() async throws {
249+
let customPort = 8093
250+
251+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
252+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
253+
254+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
255+
256+
group.addTask {
257+
struct ErrorStreamingHandler: StreamingLambdaHandler {
258+
func handle(
259+
_ event: ByteBuffer,
260+
responseWriter: some LambdaResponseStreamWriter,
261+
context: LambdaContext
262+
) async throws {
263+
let eventString = String(buffer: event)
264+
265+
if eventString.contains("error") {
266+
throw TestStreamingError.intentionalError
267+
}
268+
269+
try await responseWriter.write(ByteBuffer(string: "Success"))
270+
try await responseWriter.finish()
271+
}
272+
}
273+
274+
let runtime = LambdaRuntime(
275+
handler: ErrorStreamingHandler()
276+
)
277+
278+
try await runtime._run()
279+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
280+
}
281+
282+
group.addTask {
283+
try await Task.sleep(for: .milliseconds(200))
284+
285+
return try await self.makeStreamingInvokeRequest(
286+
host: "127.0.0.1",
287+
port: customPort,
288+
payload: "\"trigger-error\""
289+
)
290+
}
291+
292+
let first = try await group.next()
293+
group.cancelAll()
294+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
295+
}
296+
297+
// Verify error response
298+
#expect(results.statusCode == 500, "Expected 500 Internal Server Error, got \(results.statusCode)")
299+
#expect(results.completed, "Error response should be completed")
300+
}
301+
302+
// MARK: - Helper Methods
303+
304+
private func makeStreamingInvokeRequest(
305+
host: String,
306+
port: Int,
307+
payload: String
308+
) async throws -> StreamingTestResult {
309+
let url = URL(string: "http://\(host):\(port)/invoke")!
310+
var request = URLRequest(url: url)
311+
request.httpMethod = "POST"
312+
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
313+
request.httpBody = payload.data(using: .utf8)
314+
request.timeoutInterval = 10.0
315+
316+
let (data, response) = try await URLSession.shared.data(for: request)
317+
318+
guard let httpResponse = response as? HTTPURLResponse else {
319+
// On Linux, create a custom error since URLError might not be available
320+
struct HTTPError: Error {
321+
let message: String
322+
}
323+
throw HTTPError(message: "Bad server response")
324+
}
325+
326+
// Parse the streaming response
327+
let responseString = String(data: data, encoding: .utf8) ?? ""
328+
let chunks = responseString.isEmpty ? [] : [responseString]
329+
330+
return StreamingTestResult(
331+
chunks: chunks,
332+
statusCode: httpResponse.statusCode,
333+
completed: true
334+
)
335+
}
336+
}
337+
338+
// MARK: - Test Support Types
339+
340+
struct StreamingTestResult {
341+
let chunks: [String]
342+
let statusCode: Int
343+
let completed: Bool
344+
}
345+
346+
enum TestStreamingError: Error {
347+
case intentionalError
348+
}

0 commit comments

Comments
 (0)