Skip to content

Commit 354e114

Browse files
author
Sebastien Stormacq
committed
Merge branch 'main' into sebsto/new_repo
2 parents 672d9e1 + 8116ba3 commit 354e114

File tree

3 files changed

+357
-2
lines changed

3 files changed

+357
-2
lines changed

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,14 @@ internal struct LambdaHTTPServer {
301301
await self.responsePool.push(
302302
LocalServerResponse(id: requestId, final: true)
303303
)
304+
305+
// Send acknowledgment back to Lambda runtime client for streaming END
306+
// This is the single HTTP response to the chunked HTTP request
307+
try await self.sendResponse(
308+
.init(id: requestId, status: .accepted, final: true),
309+
outbound: outbound,
310+
logger: logger
311+
)
304312
} else {
305313
// process the buffered response for non streaming requests
306314
try await self.processRequestAndSendResponse(
Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIOCore
17+
import NIOHTTP1
18+
import NIOPosix
19+
import Testing
20+
21+
@testable import AWSLambdaRuntime
22+
23+
#if canImport(FoundationEssentials)
24+
import FoundationEssentials
25+
#else
26+
import Foundation
27+
#endif
28+
29+
#if canImport(FoundationNetworking)
30+
import FoundationNetworking
31+
#else
32+
import Foundation
33+
#endif
34+
35+
extension LambdaLocalServerTest {
36+
@Test("Streaming handler sends multiple chunks and completes successfully")
37+
@available(LambdaSwift 2.0, *)
38+
func testStreamingHandlerMultipleChunks() async throws {
39+
let customPort = 8090
40+
41+
// Set environment variable
42+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
43+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
44+
45+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
46+
47+
// Start the Lambda runtime with streaming handler
48+
group.addTask {
49+
struct StreamingTestHandler: StreamingLambdaHandler {
50+
func handle(
51+
_ event: ByteBuffer,
52+
responseWriter: some LambdaResponseStreamWriter,
53+
context: LambdaContext
54+
) async throws {
55+
// Send multiple chunks with delays to test streaming
56+
for i in 1...3 {
57+
try await responseWriter.write(ByteBuffer(string: "Chunk \(i)\n"))
58+
try await Task.sleep(for: .milliseconds(50))
59+
}
60+
try await responseWriter.finish()
61+
}
62+
}
63+
64+
let runtime = LambdaRuntime(
65+
handler: StreamingTestHandler()
66+
)
67+
68+
try await runtime._run()
69+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
70+
}
71+
72+
// Start HTTP client to make streaming request
73+
group.addTask {
74+
// Give server time to start
75+
try await Task.sleep(for: .milliseconds(200))
76+
77+
return try await self.makeStreamingInvokeRequest(
78+
host: "127.0.0.1",
79+
port: customPort,
80+
payload: "\"test-event\""
81+
)
82+
}
83+
84+
// Get the first result (streaming response) and cancel the runtime
85+
let first = try await group.next()
86+
group.cancelAll()
87+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
88+
}
89+
90+
// Verify streaming response
91+
#expect(results.statusCode == 200, "Expected 200 OK, got \(results.statusCode)")
92+
#expect(results.completed, "Streaming response should be completed")
93+
#expect(results.chunks.count >= 1, "Expected at least 1 chunk, got \(results.chunks.count)")
94+
95+
// The streaming chunks are concatenated in the HTTP response
96+
let fullResponse = results.chunks.joined()
97+
let expectedContent = "Chunk 1\nChunk 2\nChunk 3\n"
98+
#expect(fullResponse == expectedContent, "Response was '\(fullResponse)', expected '\(expectedContent)'")
99+
}
100+
101+
@Test("Multiple streaming invocations work correctly")
102+
@available(LambdaSwift 2.0, *)
103+
func testMultipleStreamingInvocations() async throws {
104+
let customPort = 8091
105+
106+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
107+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
108+
109+
let results = try await withThrowingTaskGroup(of: [StreamingTestResult].self) { group in
110+
111+
// Start the Lambda runtime
112+
group.addTask {
113+
struct MultiStreamingHandler: StreamingLambdaHandler {
114+
func handle(
115+
_ event: ByteBuffer,
116+
responseWriter: some LambdaResponseStreamWriter,
117+
context: LambdaContext
118+
) async throws {
119+
let eventString = String(buffer: event)
120+
try await responseWriter.write(ByteBuffer(string: "Echo: \(eventString)\n"))
121+
try await responseWriter.finish()
122+
}
123+
}
124+
125+
let runtime = LambdaRuntime(
126+
handler: MultiStreamingHandler()
127+
)
128+
129+
try await runtime._run()
130+
return []
131+
}
132+
133+
// Make multiple streaming requests
134+
group.addTask {
135+
try await Task.sleep(for: .milliseconds(200))
136+
137+
var results: [StreamingTestResult] = []
138+
139+
// Make 3 sequential streaming requests
140+
for i in 1...3 {
141+
let result = try await self.makeStreamingInvokeRequest(
142+
host: "127.0.0.1",
143+
port: customPort,
144+
payload: "\"request-\(i)\""
145+
)
146+
results.append(result)
147+
148+
// Small delay between requests
149+
try await Task.sleep(for: .milliseconds(100))
150+
}
151+
152+
return results
153+
}
154+
155+
let first = try await group.next()
156+
group.cancelAll()
157+
return first ?? []
158+
}
159+
160+
// Verify all requests completed successfully
161+
#expect(results.count == 3, "Expected 3 responses, got \(results.count)")
162+
163+
for (index, result) in results.enumerated() {
164+
#expect(result.statusCode == 200, "Request \(index + 1) returned \(result.statusCode), expected 200")
165+
#expect(result.completed, "Request \(index + 1) should be completed")
166+
#expect(result.chunks.count == 1, "Request \(index + 1) should have 1 chunk, got \(result.chunks.count)")
167+
168+
let expectedContent = "Echo: \"request-\(index + 1)\"\n"
169+
#expect(result.chunks.first == expectedContent, "Request \(index + 1) content mismatch")
170+
}
171+
}
172+
173+
@Test("Streaming handler with custom headers works correctly")
174+
@available(LambdaSwift 2.0, *)
175+
func testStreamingHandlerWithCustomHeaders() async throws {
176+
let customPort = 8092
177+
178+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
179+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
180+
181+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
182+
183+
group.addTask {
184+
struct HeaderStreamingHandler: StreamingLambdaHandler {
185+
func handle(
186+
_ event: ByteBuffer,
187+
responseWriter: some LambdaResponseStreamWriter,
188+
context: LambdaContext
189+
) async throws {
190+
// Send custom headers
191+
try await responseWriter.writeStatusAndHeaders(
192+
StreamingLambdaStatusAndHeadersResponse(
193+
statusCode: 201,
194+
headers: [
195+
"Content-Type": "text/plain",
196+
"X-Custom-Header": "streaming-test",
197+
]
198+
)
199+
)
200+
201+
try await responseWriter.write(ByteBuffer(string: "Custom response"))
202+
try await responseWriter.finish()
203+
}
204+
}
205+
206+
let runtime = LambdaRuntime(
207+
handler: HeaderStreamingHandler()
208+
)
209+
210+
try await runtime._run()
211+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
212+
}
213+
214+
group.addTask {
215+
try await Task.sleep(for: .milliseconds(200))
216+
217+
return try await self.makeStreamingInvokeRequest(
218+
host: "127.0.0.1",
219+
port: customPort,
220+
payload: "\"header-test\""
221+
)
222+
}
223+
224+
let first = try await group.next()
225+
group.cancelAll()
226+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
227+
}
228+
229+
// Verify response (custom headers are returned as JSON in the response body)
230+
#expect(results.statusCode == 200, "Expected 200 OK, got \(results.statusCode)")
231+
#expect(results.completed, "Streaming response should be completed")
232+
#expect(results.chunks.count >= 1, "Expected at least 1 chunk, got \(results.chunks.count)")
233+
234+
// The response contains both the headers JSON and the content
235+
let fullResponse = results.chunks.joined()
236+
#expect(fullResponse.contains("\"statusCode\":201"), "Response should contain custom status code")
237+
#expect(
238+
fullResponse.contains("\"X-Custom-Header\":\"streaming-test\""),
239+
"Response should contain custom header"
240+
)
241+
#expect(fullResponse.contains("Custom response"), "Response should contain custom content")
242+
}
243+
244+
@Test("Streaming handler error handling works correctly")
245+
@available(LambdaSwift 2.0, *)
246+
func testStreamingHandlerErrorHandling() async throws {
247+
let customPort = 8093
248+
249+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
250+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
251+
252+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
253+
254+
group.addTask {
255+
struct ErrorStreamingHandler: StreamingLambdaHandler {
256+
func handle(
257+
_ event: ByteBuffer,
258+
responseWriter: some LambdaResponseStreamWriter,
259+
context: LambdaContext
260+
) async throws {
261+
let eventString = String(buffer: event)
262+
263+
if eventString.contains("error") {
264+
throw TestStreamingError.intentionalError
265+
}
266+
267+
try await responseWriter.write(ByteBuffer(string: "Success"))
268+
try await responseWriter.finish()
269+
}
270+
}
271+
272+
let runtime = LambdaRuntime(
273+
handler: ErrorStreamingHandler()
274+
)
275+
276+
try await runtime._run()
277+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
278+
}
279+
280+
group.addTask {
281+
try await Task.sleep(for: .milliseconds(200))
282+
283+
return try await self.makeStreamingInvokeRequest(
284+
host: "127.0.0.1",
285+
port: customPort,
286+
payload: "\"trigger-error\""
287+
)
288+
}
289+
290+
let first = try await group.next()
291+
group.cancelAll()
292+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
293+
}
294+
295+
// Verify error response
296+
#expect(results.statusCode == 500, "Expected 500 Internal Server Error, got \(results.statusCode)")
297+
#expect(results.completed, "Error response should be completed")
298+
}
299+
300+
// MARK: - Helper Methods
301+
302+
private func makeStreamingInvokeRequest(
303+
host: String,
304+
port: Int,
305+
payload: String
306+
) async throws -> StreamingTestResult {
307+
let url = URL(string: "http://\(host):\(port)/invoke")!
308+
var request = URLRequest(url: url)
309+
request.httpMethod = "POST"
310+
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
311+
request.httpBody = payload.data(using: .utf8)
312+
request.timeoutInterval = 10.0
313+
314+
let (data, response) = try await URLSession.shared.data(for: request)
315+
316+
guard let httpResponse = response as? HTTPURLResponse else {
317+
// On Linux, create a custom error since URLError might not be available
318+
struct HTTPError: Error {
319+
let message: String
320+
}
321+
throw HTTPError(message: "Bad server response")
322+
}
323+
324+
// Parse the streaming response
325+
let responseString = String(data: data, encoding: .utf8) ?? ""
326+
let chunks = responseString.isEmpty ? [] : [responseString]
327+
328+
return StreamingTestResult(
329+
chunks: chunks,
330+
statusCode: httpResponse.statusCode,
331+
completed: true
332+
)
333+
}
334+
}
335+
336+
// MARK: - Test Support Types
337+
338+
struct StreamingTestResult {
339+
let chunks: [String]
340+
let statusCode: Int
341+
let completed: Bool
342+
}
343+
344+
enum TestStreamingError: Error {
345+
case intentionalError
346+
}

Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ import Testing
1919

2020
@testable import AWSLambdaRuntime
2121

22-
extension LambdaRuntimeTests {
23-
22+
// serialized to start only one runtime at a time
23+
@Suite(.serialized)
24+
struct LambdaLocalServerTest {
2425
@Test("Local server respects LOCAL_LAMBDA_PORT environment variable")
2526
@available(LambdaSwift 2.0, *)
2627
func testLocalServerCustomPort() async throws {

0 commit comments

Comments
 (0)