diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index 0ca3e991a..41bc27ebc 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -27,7 +27,9 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a return err } - event, err := chipingress.NewEvent(sourceDomain, entityType, body) + attributes := ExtractAttributes(attrKVs...) + + event, err := chipingress.NewEvent(sourceDomain, entityType, body, attributes) if err != nil { return err } @@ -73,3 +75,14 @@ func ExtractSourceAndType(attrKVs ...any) (string, string, error) { return sourceDomain, entityType, nil } + +func ExtractAttributes(attrKVs ...any) map[string]any { + attributes := newAttributes(attrKVs...) + + attributesMap := make(map[string]any) + for key, value := range attributes { + attributesMap[key] = value + } + + return attributesMap +} diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index 9798b6fff..9c572d493 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -2,6 +2,7 @@ package beholder_test import ( "testing" + "time" "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" @@ -30,6 +31,12 @@ func TestChipIngressEmit(t *testing.T) { body := []byte("test body") domain := "test-domain" entity := "test-entity" + attributes := map[string]any{ + "datacontenttype": "application/protobuf", + "dataschema": "/schemas/ids/1001", + "subject": "example-subject", + "time": time.Now(), + } t.Run("happy path", func(t *testing.T) { @@ -42,7 +49,7 @@ func TestChipIngressEmit(t *testing.T) { emitter, err := beholder.NewChipIngressEmitter(clientMock) require.NoError(t, err) - err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity) + err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity, attributes) require.NoError(t, err) clientMock.AssertExpectations(t) @@ -217,3 +224,45 @@ func TestExtractSourceAndType(t *testing.T) { }) } } + +func TestExtractAttributes(t *testing.T) { + now := time.Now() + tests := []struct { + name string + attrs []any + wantAttributes map[string]any + wantErr bool + expectedError string + }{ + { + name: "valid attributes with specific keys", + attrs: []any{map[string]any{ + "datacontenttype": "application/protobuf", + "dataschema": "/schemas/ids/1001", + "subject": "example-subject", + "time": now, + }}, + wantAttributes: map[string]any{ + "datacontenttype": "application/protobuf", + "dataschema": "/schemas/ids/1001", + "subject": "example-subject", + "time": now, + }, + wantErr: false, + }, + { + name: "happy path - empty attributes", + attrs: []any{}, + wantAttributes: map[string]any{}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotAttributes := beholder.ExtractAttributes(tt.attrs...) + + assert.Equal(t, tt.wantAttributes, gotAttributes) + }) + } +} diff --git a/pkg/chipingress/client.go b/pkg/chipingress/client.go index 7236c27fd..da7bee74b 100644 --- a/pkg/chipingress/client.go +++ b/pkg/chipingress/client.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "strings" + t "time" "github.com/google/uuid" "go.uber.org/zap" @@ -248,13 +249,37 @@ func validateEvents(events []ce.Event) error { return nil } -func NewEvent(domain, entity string, payload []byte) (ce.Event, error) { +func NewEvent(domain, entity string, payload []byte, attributes map[string]any) (ce.Event, error) { event := ce.NewEvent() event.SetSource(domain) event.SetType(entity) event.SetID(uuid.New().String()) + now := t.Now().UTC() + event.SetExtension("recordedtime", ce.Timestamp{Time: now}) + + // Set optional attributes if provided + if attributes == nil { + attributes = make(map[string]any) + } + + if val, ok := attributes["time"].(t.Time); ok { + event.SetTime(val.UTC()) + } else { + event.SetTime(now) + } + + if val, ok := attributes["datacontenttype"].(string); ok { + event.SetDataContentType(val) + } + if val, ok := attributes["dataschema"].(string); ok { + event.SetDataSchema(val) + } + if val, ok := attributes["subject"].(string); ok { + event.SetSubject(val) + } + err := event.SetData(ceformat.ContentTypeProtobuf, payload) if err != nil { return ce.Event{}, fmt.Errorf("could not set data on event: %w", err) diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index 5639a32a7..8a609ec00 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -3,6 +3,7 @@ package chipingress import ( "context" "testing" + "time" ce "github.com/cloudevents/sdk-go/v2" "github.com/stretchr/testify/assert" @@ -67,7 +68,7 @@ func TestClient(t *testing.T) { testProto := pb.PingResponse{Message: "testing"} protoBytes, err := proto.Marshal(&testProto) require.NoError(t, err) - event, err := NewEvent("some-domain_here", "platform.on_chain.forwarder.ReportProcessed", protoBytes) + event, err := NewEvent("some-domain_here", "platform.on_chain.forwarder.ReportProcessed", protoBytes, nil) require.NoError(t, err) // Publish event @@ -160,8 +161,14 @@ func TestNewEvent(t *testing.T) { // Create new event testProto := pb.PingResponse{Message: "testing"} protoBytes, err := proto.Marshal(&testProto) + attributes := map[string]any{ + "datacontenttype": "application/protobuf", + "dataschema": "https://example.com/schema", + "subject": "example-subject", + "time": time.Now(), + } assert.NoError(t, err) - event, err := NewEvent("some-domain_here", "platform.on_chain.forwarder.ReportProcessed", protoBytes) + event, err := NewEvent("some-domain_here", "platform.on_chain.forwarder.ReportProcessed", protoBytes, attributes) assert.NoError(t, err) // There should be no validation errors @@ -172,6 +179,11 @@ func TestNewEvent(t *testing.T) { assert.Equal(t, "some-domain_here", event.Source()) assert.Equal(t, "platform.on_chain.forwarder.ReportProcessed", event.Type()) assert.NotEmpty(t, event.ID()) + assert.Equal(t, "application/protobuf", event.DataContentType()) + assert.Equal(t, "https://example.com/schema", event.DataSchema()) + assert.Equal(t, "example-subject", event.Subject()) + assert.Equal(t, attributes["time"].(time.Time).UTC(), event.Time()) + assert.NotEmpty(t, event.Extensions()["recordedtime"]) // Assert the event data was set as expected var resultProto pb.PingResponse @@ -220,13 +232,13 @@ func TestPublishBatch(t *testing.T) { testProto1 := pb.PingResponse{Message: "testing1"} protoBytes1, err := proto.Marshal(&testProto1) require.NoError(t, err) - event1, err := NewEvent("domain1", "entity.event1", protoBytes1) + event1, err := NewEvent("domain1", "entity.event1", protoBytes1, nil) require.NoError(t, err) testProto2 := pb.PingResponse{Message: "testing2"} protoBytes2, err := proto.Marshal(&testProto2) require.NoError(t, err) - event2, err := NewEvent("domain2", "entity.event2", protoBytes2) + event2, err := NewEvent("domain2", "entity.event2", protoBytes2, nil) require.NoError(t, err) events := []ce.Event{event1, event2} @@ -270,7 +282,7 @@ func TestPublishBatch(t *testing.T) { testProto := pb.PingResponse{Message: "testing"} protoBytes, err := proto.Marshal(&testProto) require.NoError(t, err) - event, err := NewEvent("domain", "entity.event", protoBytes) + event, err := NewEvent("domain", "entity.event", protoBytes, nil) require.NoError(t, err) events := []ce.Event{event}