Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit ff2388c

Browse files
committed
Converter interface added
1 parent 3783d6b commit ff2388c

File tree

10 files changed

+253
-1153
lines changed

10 files changed

+253
-1153
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ GOLINT ?= golangci-lint run
1818
GOTOOL ?= go tool
1919
GOTEST ?= gotestsum --junitfile $(TEST_OUTPUT_DIR)/$(PACKAGE)-unit-tests.xml --format pkgname-and-test-fails --
2020

21-
GOPKGS = ./pkg/events/...
21+
GOPKGS = ./pkg/...
2222
LDFLAGS = -extldflags=-static -w -s
2323

2424
HAS_GOTESTSUM := $(shell command -v gotestsum;)

go.mod

-4
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@ module github.com/triggermesh/aws-custom-runtime
33
go 1.15
44

55
require (
6-
github.com/aws/aws-lambda-go v1.13.3
7-
github.com/cloudevents/sdk-go v1.2.0 // indirect
8-
github.com/cloudevents/sdk-go/v2 v2.3.1
96
github.com/google/uuid v1.2.0
107
github.com/kelseyhightower/envconfig v1.4.0
11-
knative.dev/eventing v0.22.1 // indirect
128
)

go.sum

-1,107
Large diffs are not rendered by default.

main.go

+37-25
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
// Copyright 2018 TriggerMesh, Inc
2-
//
3-
// Licensed under the Apache License, Version 2.0 (the "License");
4-
// you may not use this file except in compliance with the License.
5-
// You may obtain a copy of the License at
6-
//
7-
// http://www.apache.org/licenses/LICENSE-2.0
8-
//
9-
// Unless required by applicable law or agreed to in writing, software
10-
// distributed under the License is distributed on an "AS IS" BASIS,
11-
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12-
// See the License for the specific language governing permissions and
13-
// limitations under the License.
1+
/*
2+
Copyright 2021 Triggermesh Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
1416

1517
package main
1618

@@ -27,6 +29,8 @@ import (
2729
"time"
2830

2931
"github.com/kelseyhightower/envconfig"
32+
33+
"github.com/triggermesh/aws-custom-runtime/pkg/converter"
3034
"github.com/triggermesh/aws-custom-runtime/pkg/sender"
3135
)
3236

@@ -69,7 +73,8 @@ type Specification struct {
6973
}
7074

7175
type Handler struct {
72-
*sender.Handler
76+
Sender *sender.Sender
77+
Converter converter.Converter
7378

7479
requestSizeLimit int64
7580
functionTTL int64
@@ -139,17 +144,17 @@ func (h *Handler) newTask(w http.ResponseWriter, r *http.Request) {
139144
select {
140145
case <-time.After(time.Duration(functionTTLInNanoSeconds)):
141146
log.Printf("-> ! %s Deadline is reached\n", task.id)
142-
// w.WriteHeader(http.StatusGone)
143-
// w.Write([]byte(fmt.Sprintf("Deadline is reached, data %s", task.data)))
144147
resp := []byte(fmt.Sprintf("Deadline is reached, data %s", task.data))
145-
if err := h.Send(resp, http.StatusGone, w); err != nil {
148+
if err := h.Sender.Send(resp, http.StatusGone, w); err != nil {
146149
log.Printf("! %s %v\n", task.id, err)
147150
}
148151
case result := <-resultsChannel:
149152
log.Printf("-> %s %d %s\n", result.id, result.statusCode, result.data)
150-
// w.WriteHeader(result.statusCode)
151-
// w.Write(result.data)
152-
if err := h.Send(result.data, result.statusCode, w); err != nil {
153+
body, err := h.Converter.Convert(result.data)
154+
if err != nil {
155+
log.Printf("! %s %v\n", result.id, err)
156+
}
157+
if err := h.Sender.Send(body, result.statusCode, w); err != nil {
153158
log.Printf("! %s %v\n", result.id, err)
154159
}
155160
}
@@ -275,9 +280,16 @@ func main() {
275280
log.Fatalf("Cannot setup runime env: %v", err)
276281
}
277282

278-
// create sender
279-
sender := Handler{
280-
Handler: sender.New(spec.Sink, "text/plain"),
283+
// create converter
284+
conv, err := converter.New(spec.ResponseFormat)
285+
if err != nil {
286+
log.Fatalf("Cannot create converter: %v", err)
287+
}
288+
289+
// setup sender
290+
handler := Handler{
291+
Sender: sender.New(spec.Sink, conv.ContentType()),
292+
Converter: conv,
281293
requestSizeLimit: spec.RequestSizeLimit,
282294
functionTTL: spec.FunctionTTL,
283295
}
@@ -311,10 +323,10 @@ func main() {
311323

312324
// start external API
313325
taskRouter := http.NewServeMux()
314-
taskHandler := http.HandlerFunc(sender.newTask)
326+
taskHandler := http.HandlerFunc(handler.newTask)
315327
taskRouter.Handle("/", taskHandler)
316328
log.Println("Listening...")
317-
err := http.ListenAndServe(":"+spec.ExternalAPIport, taskRouter)
329+
err = http.ListenAndServe(":"+spec.ExternalAPIport, taskRouter)
318330
if err != nil && err != http.ErrServerClosed {
319331
log.Fatalf("Runtime external API error: %v", err)
320332
}

main_test.go

+18-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"fmt"
66
"io/ioutil"
7+
"log"
78
"net/http"
89
"net/http/httptest"
910
"os"
@@ -14,6 +15,8 @@ import (
1415
"time"
1516

1617
"github.com/kelseyhightower/envconfig"
18+
"github.com/triggermesh/aws-custom-runtime/pkg/converter"
19+
"github.com/triggermesh/aws-custom-runtime/pkg/sender"
1720
)
1821

1922
func TestSetupEnv(t *testing.T) {
@@ -23,7 +26,7 @@ func TestSetupEnv(t *testing.T) {
2326
t.Fatal(err)
2427
}
2528

26-
if err := s.setupEnv(); err != nil {
29+
if err := setupEnv(s.InternalAPIport); err != nil {
2730
t.Errorf("Setup Env error %s\n", err)
2831
}
2932

@@ -41,21 +44,33 @@ func TestNewTask(t *testing.T) {
4144
t.Fatal(err)
4245
}
4346

47+
conv, err := converter.New(s.ResponseFormat)
48+
if err != nil {
49+
log.Fatalf("Cannot create converter: %v", err)
50+
}
51+
52+
handler := Handler{
53+
Sender: sender.New(s.Sink, conv.ContentType()),
54+
Converter: conv,
55+
requestSizeLimit: s.RequestSizeLimit,
56+
functionTTL: s.FunctionTTL,
57+
}
58+
4459
payload := []byte(`{"payload": "test"}`)
4560

4661
tasks = make(chan message, 100)
4762
results = make(map[string]chan message)
4863
defer close(tasks)
4964

5065
recorder := httptest.NewRecorder()
51-
handler := http.HandlerFunc(s.newTask)
66+
h := http.HandlerFunc(handler.newTask)
5267

5368
req, err := http.NewRequest("POST", "/", bytes.NewBuffer(payload))
5469
if err != nil {
5570
t.Fatal(err)
5671
}
5772

58-
go handler.ServeHTTP(recorder, req)
73+
go h.ServeHTTP(recorder, req)
5974
task := <-tasks
6075
results[task.id] <- task
6176
time.Sleep(time.Millisecond * 100)
+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
Copyright 2021 Triggermesh Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package apigateway
18+
19+
// TODO: move logic from
20+
// https://github.com/triggermesh/aws-custom-runtime/blob/b3ec30de10c29b761fc1d89e9dd828d554c1ce0f/pkg/events/apigateway/mapper.go
+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
Copyright 2021 Triggermesh Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cloudevents
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"time"
23+
24+
"github.com/google/uuid"
25+
"github.com/kelseyhightower/envconfig"
26+
)
27+
28+
type ceBody struct {
29+
ID string `json:"id"`
30+
Type string `json:"type"`
31+
Time string `json:"time"`
32+
Source string `json:"source"`
33+
Specversion string `json:"specversion"`
34+
Data string `json:"data"`
35+
}
36+
37+
const contentType = "application/cloudevents+json"
38+
39+
// CloudEvent is a data structure required to map KLR responses to cloudevents
40+
type CloudEvent struct {
41+
EventType string `envconfig:"type" default:"ce.klr.triggermesh.io"`
42+
Source string `envconfig:"source" default:"knative-lambda-runtime"`
43+
Subject string `envconfig:"subject" default:"klr-response"`
44+
}
45+
46+
func New() (*CloudEvent, error) {
47+
var ce CloudEvent
48+
if err := envconfig.Process("ce", &ce); err != nil {
49+
return nil, fmt.Errorf("Cannot process CloudEvent env variables: %v", err)
50+
}
51+
return &ce, nil
52+
}
53+
54+
func (ce *CloudEvent) Convert(data []byte) ([]byte, error) {
55+
b := ceBody{
56+
ID: uuid.NewString(),
57+
Type: ce.EventType,
58+
Time: time.Now().Format(time.RFC3339),
59+
Source: ce.Source,
60+
Specversion: "1.0",
61+
Data: string(data),
62+
}
63+
return json.Marshal(b)
64+
}
65+
66+
func (ce *CloudEvent) ContentType() string {
67+
return contentType
68+
}

pkg/converter/converter.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
Copyright 2021 Triggermesh Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package converter
18+
19+
import (
20+
"github.com/triggermesh/aws-custom-runtime/pkg/converter/cloudevents"
21+
"github.com/triggermesh/aws-custom-runtime/pkg/converter/plain"
22+
)
23+
24+
type Converter interface {
25+
Convert([]byte) ([]byte, error)
26+
ContentType() string
27+
}
28+
29+
func New(format string) (Converter, error) {
30+
switch format {
31+
case "API_GATEWAY":
32+
case "CLOUDEVENTS":
33+
return cloudevents.New()
34+
}
35+
return plain.New()
36+
}

pkg/converter/plain/cloudevents.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright 2021 Triggermesh Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package plain
18+
19+
type Plain struct{}
20+
21+
const contentType = "plain/text"
22+
23+
func New() (*Plain, error) {
24+
return &Plain{}, nil
25+
}
26+
27+
func (p *Plain) Convert(data []byte) ([]byte, error) {
28+
return data, nil
29+
}
30+
31+
func (p *Plain) ContentType() string {
32+
return contentType
33+
}

0 commit comments

Comments
 (0)