Skip to content

Commit 506e656

Browse files
authored
adjust innerEvent (#37)
Signed-off-by: laminar <[email protected]>
1 parent 515f7b3 commit 506e656

File tree

4 files changed

+43
-59
lines changed

4 files changed

+43
-59
lines changed

context/context.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,7 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
339339
var err error
340340
var output *Output
341341
var response *dapr.BindingEvent
342-
var payload interface{}
343-
var payloadBytes []byte
342+
var payload []byte
344343

345344
if v, ok := ctx.Outputs[outputName]; ok {
346345
output = v
@@ -349,14 +348,12 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
349348
}
350349

351350
payload = data
352-
payloadBytes = data
353351

354352
if traceable(output.ComponentType) {
355353
ie := NewInnerEvent(ctx)
356354
ie.MergeMetadata(ctx.GetInnerEvent())
357355
ie.SetUserData(data)
358-
payload = ie.GetCloudEvent()
359-
payloadBytes = ie.GetCloudEventJSON()
356+
payload = ie.GetCloudEventJSON()
360357
}
361358

362359
switch output.GetType() {
@@ -366,7 +363,7 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
366363
in := &dapr.InvokeBindingRequest{
367364
Name: output.ComponentName,
368365
Operation: output.Operation,
369-
Data: payloadBytes,
366+
Data: payload,
370367
Metadata: output.Metadata,
371368
}
372369
response, err = ctx.daprClient.InvokeBinding(context.Background(), in)
@@ -484,7 +481,7 @@ func (ctx *FunctionContext) SetEvent(inputName string, event interface{}) {
484481
ctx.setEvent(inputName, be, nil, nil, ie)
485482
case *common.TopicEvent:
486483
te := event.(*common.TopicEvent)
487-
ie := convertEvent(ctx, inputName, te.Data)
484+
ie := convertEvent(ctx, inputName, ConvertUserDataToBytes(te.Data))
488485
ctx.setEvent(inputName, nil, te, nil, ie)
489486
case *cloudevents.Event:
490487
ce := event.(*cloudevents.Event)
@@ -803,3 +800,14 @@ func getBuildingBlockType(componentType string) (ResourceType, error) {
803800
}
804801
return "", errors.New("invalid component type")
805802
}
803+
804+
func ConvertUserDataToBytes(data interface{}) []byte {
805+
if d, ok := data.([]byte); ok {
806+
return d
807+
}
808+
if d, err := json.Marshal(data); err != nil {
809+
return nil
810+
} else {
811+
return d
812+
}
813+
}

context/innerevent.go

+17-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package context
22

33
import (
4+
"encoding/base64"
45
"encoding/json"
56
"fmt"
67
"sync"
@@ -24,7 +25,7 @@ type InnerEvent interface {
2425
SetUserData(data interface{})
2526

2627
// GetUserData returns the userData in innerEventData.
27-
GetUserData() interface{}
28+
GetUserData() []byte
2829

2930
// GetCloudEvent returns the cloudevent object in innerEvent.
3031
GetCloudEvent() cloudevents.Event
@@ -50,7 +51,7 @@ type innerEvent struct {
5051

5152
type innerEventData struct {
5253
Metadata map[string]string `json:"metadata,omitempty"`
53-
UserData interface{} `json:"userData,omitempty"`
54+
UserData []byte `json:"userData,omitempty"`
5455
}
5556

5657
func NewInnerEvent(ctx RuntimeContext) InnerEvent {
@@ -79,12 +80,13 @@ func (inner *innerEvent) GetMetadata() map[string]string {
7980
}
8081

8182
func (inner *innerEvent) SetUserData(data interface{}) {
83+
rawData := ConvertUserDataToBytes(data)
8284
inner.mu.Lock()
8385
defer func() {
8486
inner.save()
8587
inner.mu.Unlock()
8688
}()
87-
inner.data.UserData = data
89+
inner.data.UserData = rawData
8890
}
8991

9092
func (inner *innerEvent) SetSubject(s string) {
@@ -93,7 +95,7 @@ func (inner *innerEvent) SetSubject(s string) {
9395
inner.cloudevent.SetSubject(s)
9496
}
9597

96-
func (inner *innerEvent) GetUserData() interface{} {
98+
func (inner *innerEvent) GetUserData() []byte {
9799
return inner.data.UserData
98100
}
99101

@@ -152,26 +154,33 @@ func (inner *innerEvent) Clone(event *cloudevents.Event) {
152154
inner.mu.Unlock()
153155
}()
154156

157+
var ud []byte
155158
inner.cloudevent = event
156159

157160
d := &innerEventData{}
158161
if event.Data() != nil {
159162
if err := event.DataAs(d); err == nil {
160163
inner.data.Metadata = d.Metadata
161-
inner.data.UserData = d.UserData
164+
ud = d.UserData
162165
} else {
163-
inner.data.UserData = event.Data()
166+
ud = event.Data()
164167
}
168+
if event.DataBase64 {
169+
if rawUserData, err := base64.StdEncoding.DecodeString(string(ud)); err == nil {
170+
inner.data.UserData = rawUserData
171+
return
172+
}
173+
}
174+
inner.data.UserData = ud
165175
}
166176
}
167177

168178
func (inner *innerEvent) save() {
169179
if inner.cloudevent == nil || (inner.data != nil && len(inner.data.Metadata) > 0 && inner.data.UserData == nil) {
170-
fmt.Println(inner.data.UserData)
171180
return
172181
}
173182

174-
if err := inner.cloudevent.SetData(cloudevents.ApplicationJSON, *inner.data); err != nil {
183+
if err := inner.cloudevent.SetData(cloudevents.ApplicationJSON, ConvertUserDataToBytes(*inner.data)); err != nil {
175184
klog.Errorf("failed to set cloudevent data: %v\n", err)
176185
}
177186
}
@@ -190,9 +199,6 @@ func convertEvent(ctx RuntimeContext, inputName string, data interface{}) InnerE
190199
inner.Clone(ce)
191200
return inner
192201
}
193-
case cloudevents.Event:
194-
inner.Clone(&data)
195-
return inner
196202
default:
197203
inner.SetSubject(inputName)
198204
inner.SetUserData(data)

context/innerevent_test.go

+10-28
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func TestInnerEvent(t *testing.T) {
102102
eventTest(t, ctx, te1, byteData)
103103

104104
te2 := &common.TopicEvent{
105-
Data: ie.GetCloudEvent(),
105+
Data: ie.GetCloudEventJSON(),
106106
}
107107
eventTest(t, ctx, te2, byteData)
108108

@@ -126,11 +126,10 @@ func TestInnerEvent(t *testing.T) {
126126
}
127127

128128
func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byte) {
129-
130129
// receive test
131130
ctx.SetEvent("cron", event)
132131
ie := ctx.GetInnerEvent()
133-
if !bytes.Equal(convertToByte(ie.GetUserData()), target) {
132+
if !bytes.Equal(ie.GetUserData(), target) {
134133
t.Fatal("Error get user data in innerEvent")
135134
}
136135
ie.SetMetadata("k1", "v1")
@@ -156,17 +155,13 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt
156155

157156
udInEvent := map[string]string{}
158157
if ie2.GetUserData() != nil {
159-
ud := ie2.GetUserData()
160-
switch ud := ud.(type) {
161-
case []byte:
162-
if err := json.Unmarshal(ud, &udInEvent); err != nil {
163-
t.Fatal("Error unmarshal user data in inner event")
164-
}
165-
if v, exist := udInEvent["foo2"]; exist && v == "bar2" {
166-
167-
} else {
168-
t.Fatal("Error set inner event userdata")
169-
}
158+
if err := json.Unmarshal(ie2.GetUserData(), &udInEvent); err != nil {
159+
t.Fatal("Error unmarshal user data in inner event")
160+
}
161+
if v, exist := udInEvent["foo2"]; exist && v == "bar2" {
162+
163+
} else {
164+
t.Fatal("Error set inner event userdata")
170165
}
171166
} else {
172167
t.Fatal("Error set inner event userdata")
@@ -184,8 +179,7 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt
184179
}
185180

186181
ud := map[string]string{}
187-
udByte, _ := json.Marshal(ieData.UserData)
188-
if err := json.Unmarshal(udByte, &ud); err != nil {
182+
if err := json.Unmarshal(ieData.UserData, &ud); err != nil {
189183
t.Fatal("Error unmarshal user data in inner event")
190184
}
191185

@@ -194,16 +188,4 @@ func eventTest(t *testing.T, ctx RuntimeContext, event interface{}, target []byt
194188
} else {
195189
t.Fatal("Error save inner event userdata")
196190
}
197-
198-
}
199-
200-
func convertToByte(data interface{}) []byte {
201-
if d, ok := data.([]byte); ok {
202-
return d
203-
}
204-
if d, err := json.Marshal(data); err != nil {
205-
return nil
206-
} else {
207-
return d
208-
}
209191
}

runtime/runtime.go

+1-13
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package runtime
22

33
import (
44
"context"
5-
"encoding/json"
65
"io/ioutil"
76
"net/http"
87

@@ -129,7 +128,7 @@ func (rm *RuntimeManager) FunctionRunWrapperWithHooks(fn interface{}) {
129128
userData := rm.FuncContext.GetInnerEvent().GetUserData()
130129

131130
// pass user data to user function
132-
out, err := function(functionContext, convertUserDataToBytes(userData))
131+
out, err := function(functionContext, userData)
133132

134133
rm.FuncContext.WithOut(out.GetOut())
135134
rm.FuncContext.WithError(err)
@@ -152,14 +151,3 @@ func (rm *RuntimeManager) FunctionRunWrapperWithHooks(fn interface{}) {
152151

153152
rm.ProcessPostHooks()
154153
}
155-
156-
func convertUserDataToBytes(data interface{}) []byte {
157-
if d, ok := data.([]byte); ok {
158-
return d
159-
}
160-
if d, err := json.Marshal(data); err != nil {
161-
return nil
162-
} else {
163-
return d
164-
}
165-
}

0 commit comments

Comments
 (0)