@@ -3,12 +3,20 @@ package io.github.trueangle.knative.lambda.runtime
3
3
import io.github.trueangle.knative.lambda.runtime.LambdaEnvironmentException.NonRecoverableStateException
4
4
import io.github.trueangle.knative.lambda.runtime.api.Context
5
5
import io.github.trueangle.knative.lambda.runtime.api.LambdaClient
6
+ import io.github.trueangle.knative.lambda.runtime.api.LambdaClientImpl
6
7
import io.github.trueangle.knative.lambda.runtime.handler.LambdaBufferedHandler
7
8
import io.github.trueangle.knative.lambda.runtime.handler.LambdaHandler
8
9
import io.github.trueangle.knative.lambda.runtime.handler.LambdaStreamHandler
9
10
import io.github.trueangle.knative.lambda.runtime.log.KtorLogger
11
+ import io.github.trueangle.knative.lambda.runtime.log.LambdaLogger
10
12
import io.github.trueangle.knative.lambda.runtime.log.Log
13
+ import io.github.trueangle.knative.lambda.runtime.log.debug
14
+ import io.github.trueangle.knative.lambda.runtime.log.error
15
+ import io.github.trueangle.knative.lambda.runtime.log.fatal
16
+ import io.github.trueangle.knative.lambda.runtime.log.info
17
+ import io.github.trueangle.knative.lambda.runtime.log.warn
11
18
import io.ktor.client.HttpClient
19
+ import io.ktor.client.engine.HttpClientEngine
12
20
import io.ktor.client.engine.curl.Curl
13
21
import io.ktor.client.plugins.HttpTimeout
14
22
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
@@ -22,13 +30,20 @@ import io.ktor.utils.io.writeStringUtf8
22
30
import kotlinx.coroutines.runBlocking
23
31
import kotlinx.serialization.ExperimentalSerializationApi
24
32
import kotlinx.serialization.json.Json
25
- import kotlin.system.exitProcess
26
33
27
34
object LambdaRuntime {
28
35
@OptIn(ExperimentalSerializationApi ::class )
29
36
internal val json = Json { explicitNulls = false }
30
37
31
- private val httpClient = HttpClient (Curl ) {
38
+ inline fun <reified I , reified O > run (crossinline initHandler : () -> LambdaHandler <I , O >) = runBlocking {
39
+ val curlHttpClient = createHttpClient(Curl .create())
40
+ val lambdaClient = LambdaClientImpl (curlHttpClient)
41
+
42
+ Runner (client = lambdaClient, log = Log ).run (false , initHandler)
43
+ }
44
+
45
+ @PublishedApi
46
+ internal fun createHttpClient (engine : HttpClientEngine ) = HttpClient (engine) {
32
47
install(HttpTimeout )
33
48
install(ContentNegotiation ) { json(json) }
34
49
install(Logging ) {
@@ -39,86 +54,93 @@ object LambdaRuntime {
39
54
filter { ! it.headers.contains(" Lambda-Runtime-Function-Response-Mode" , " streaming" ) }
40
55
}
41
56
}
57
+ }
42
58
43
- @PublishedApi
44
- internal val client = LambdaClient (httpClient)
45
-
46
- inline fun <reified I , reified O > run (crossinline initHandler : () -> LambdaHandler <I , O >) = runBlocking {
59
+ @PublishedApi
60
+ internal class Runner (
61
+ val client : LambdaClient ,
62
+ val log : LambdaLogger ,
63
+ val env : LambdaEnvironment = LambdaEnvironment ()
64
+ ) {
65
+ suspend inline fun <reified I , reified O > run (singleEventMode : Boolean = false, crossinline initHandler : () -> LambdaHandler <I , O >) {
47
66
val handler = try {
48
- Log .info(" Initializing Kotlin Native Lambda Runtime" )
67
+ log .info(" Initializing Kotlin Native Lambda Runtime" )
49
68
50
69
initHandler()
51
70
} catch (e: Exception ) {
52
- Log .fatal(e)
71
+ log .fatal(e)
53
72
54
73
client.reportError(e.asInitError())
55
- exitProcess(1 )
74
+
75
+ env.terminate()
56
76
}
57
77
58
78
val handlerName = handler::class .simpleName
59
79
val inputTypeInfo = typeInfo<I >()
60
80
val outputTypeInfo = typeInfo<O >()
61
81
62
- while (true ) {
82
+ var shouldExit = false
83
+ while (! shouldExit) {
63
84
try {
64
- Log .info(" Runtime is ready for a new event" )
85
+ log .info(" Runtime is ready for a new event" )
65
86
66
87
val (event, context) = client.retrieveNextEvent<I >(inputTypeInfo)
67
88
68
- with (Log ) {
89
+ with (log ) {
69
90
setContext(context)
70
91
71
92
debug(event)
72
93
debug(context)
94
+ info(" $handlerName invocation started" )
73
95
}
74
96
75
- Log .info(" $handlerName invocation started" )
76
-
77
97
if (handler is LambdaStreamHandler <I , * >) {
78
98
val response = streamingResponse { handler.handleRequest(event, it, context) }
79
99
80
- Log .info(" $handlerName started response streaming" )
100
+ log .info(" $handlerName started response streaming" )
81
101
82
102
client.streamResponse(context, response)
83
103
} else {
84
104
handler as LambdaBufferedHandler <I , O >
85
105
val response = bufferedResponse(context) { handler.handleRequest(event, context) }
86
106
87
- Log .info(" $handlerName invocation completed" )
88
- Log .debug(response)
107
+ log .info(" $handlerName invocation completed" )
108
+ log .debug(response)
89
109
90
110
client.sendResponse(context, response, outputTypeInfo)
91
111
}
92
112
} catch (e: LambdaRuntimeException ) {
93
- Log .error(e)
113
+ log.error(e)
114
+
94
115
client.reportError(e)
95
116
} catch (e: LambdaEnvironmentException ) {
96
117
when (e) {
97
118
is NonRecoverableStateException -> {
98
- Log .fatal(e)
119
+ log .fatal(e)
99
120
100
- exitProcess( 1 )
121
+ env.terminate( )
101
122
}
102
123
103
- else -> Log .error(e)
124
+ else -> log .error(e)
104
125
}
105
126
} catch (e: Throwable ) {
106
- Log .fatal(e)
127
+ log.fatal(e)
128
+
129
+ env.terminate()
130
+ }
107
131
108
- exitProcess(1 )
132
+ if (singleEventMode) {
133
+ shouldExit = singleEventMode
109
134
}
110
135
}
111
136
}
112
- }
113
137
114
- @PublishedApi
115
- internal inline fun streamingResponse (crossinline handler : suspend (ByteWriteChannel ) -> Unit ) =
116
- object : WriteChannelContent () {
138
+ inline fun streamingResponse (crossinline handler : suspend (ByteWriteChannel ) -> Unit ) = object : WriteChannelContent () {
117
139
override suspend fun writeTo (channel : ByteWriteChannel ) {
118
140
try {
119
141
handler(channel)
120
142
} catch (e: Exception ) {
121
- Log .warn(" Exception occurred on streaming: " + e.message)
143
+ log .warn(" Exception occurred on streaming: " + e.message)
122
144
123
145
channel.writeStringUtf8(e.toTrailer())
124
146
}
@@ -128,9 +150,9 @@ internal inline fun streamingResponse(crossinline handler: suspend (ByteWriteCha
128
150
" Lambda-Runtime-Function-Error-Type: Runtime.StreamError\r\n Lambda-Runtime-Function-Error-Body: ${stackTraceToString().encodeBase64()} \r\n "
129
151
}
130
152
131
- @PublishedApi
132
- internal inline fun < T , R > T. bufferedResponse ( context : Context , block : T .() -> R ): R = try {
133
- block()
134
- } catch (e : Exception ) {
135
- throw e.asHandlerError(context)
153
+ inline fun < T , R > T. bufferedResponse ( context : Context , block : T .() -> R ): R = try {
154
+ block()
155
+ } catch (e : Exception ) {
156
+ throw e.asHandlerError(context)
157
+ }
136
158
}
0 commit comments