diff --git a/livekit/livekit_webhook.pb.go b/livekit/livekit_webhook.pb.go index b4f0f0f42..eb2202f98 100644 --- a/livekit/livekit_webhook.pb.go +++ b/livekit/livekit_webhook.pb.go @@ -56,6 +56,7 @@ type WebhookEvent struct { Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` // timestamp in seconds CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` + DequeuedAt int64 `protobuf:"varint,12,opt,name=dequeued_at,json=dequeuedAt,proto3" json:"dequeued_at,omitempty"` NumDropped int32 `protobuf:"varint,11,opt,name=num_dropped,json=numDropped,proto3" json:"num_dropped,omitempty"` } @@ -147,6 +148,13 @@ func (x *WebhookEvent) GetCreatedAt() int64 { return 0 } +func (x *WebhookEvent) GetDequeuedAt() int64 { + if x != nil { + return x.DequeuedAt + } + return 0 +} + func (x *WebhookEvent) GetNumDropped() int32 { if x != nil { return x.NumDropped @@ -154,6 +162,69 @@ func (x *WebhookEvent) GetNumDropped() int32 { return 0 } +type BatchedWebhookEvents struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Events []*WebhookEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + NumDropped int32 `protobuf:"varint,2,opt,name=num_dropped,json=numDropped,proto3" json:"num_dropped,omitempty"` + DequeuedAt int64 `protobuf:"varint,3,opt,name=dequeued_at,json=dequeuedAt,proto3" json:"dequeued_at,omitempty"` +} + +func (x *BatchedWebhookEvents) Reset() { + *x = BatchedWebhookEvents{} + if protoimpl.UnsafeEnabled { + mi := &file_livekit_webhook_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchedWebhookEvents) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchedWebhookEvents) ProtoMessage() {} + +func (x *BatchedWebhookEvents) ProtoReflect() protoreflect.Message { + mi := &file_livekit_webhook_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchedWebhookEvents.ProtoReflect.Descriptor instead. +func (*BatchedWebhookEvents) Descriptor() ([]byte, []int) { + return file_livekit_webhook_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchedWebhookEvents) GetEvents() []*WebhookEvent { + if x != nil { + return x.Events + } + return nil +} + +func (x *BatchedWebhookEvents) GetNumDropped() int32 { + if x != nil { + return x.NumDropped + } + return 0 +} + +func (x *BatchedWebhookEvents) GetDequeuedAt() int64 { + if x != nil { + return x.DequeuedAt + } + return 0 +} + var File_livekit_webhook_proto protoreflect.FileDescriptor var file_livekit_webhook_proto_rawDesc = []byte{ @@ -163,7 +234,7 @@ var file_livekit_webhook_proto_rawDesc = []byte{ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x15, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x22, 0xec, 0x02, 0x0a, 0x0c, 0x57, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, + 0x6f, 0x74, 0x6f, 0x22, 0x8d, 0x03, 0x0a, 0x0c, 0x57, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, @@ -184,14 +255,24 @@ var file_livekit_webhook_proto_rawDesc = []byte{ 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, - 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, - 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, 0x70, - 0x65, 0x64, 0x42, 0x46, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, - 0x6c, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0xaa, 0x02, 0x0d, 0x4c, 0x69, 0x76, 0x65, - 0x4b, 0x69, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0xea, 0x02, 0x0e, 0x4c, 0x69, 0x76, 0x65, - 0x4b, 0x69, 0x74, 0x3a, 0x3a, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x61, 0x74, + 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, + 0x41, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, + 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, + 0x70, 0x65, 0x64, 0x22, 0x87, 0x01, 0x0a, 0x14, 0x42, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, 0x57, + 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x2d, 0x0a, 0x06, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6c, + 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x57, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, + 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x12, 0x1f, 0x0a, 0x0b, + 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0a, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x41, 0x74, 0x42, 0x46, 0x5a, + 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, + 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, 0x6c, 0x69, 0x76, + 0x65, 0x6b, 0x69, 0x74, 0xaa, 0x02, 0x0d, 0x4c, 0x69, 0x76, 0x65, 0x4b, 0x69, 0x74, 0x2e, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0xea, 0x02, 0x0e, 0x4c, 0x69, 0x76, 0x65, 0x4b, 0x69, 0x74, 0x3a, 0x3a, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -206,26 +287,28 @@ func file_livekit_webhook_proto_rawDescGZIP() []byte { return file_livekit_webhook_proto_rawDescData } -var file_livekit_webhook_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_livekit_webhook_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_livekit_webhook_proto_goTypes = []any{ - (*WebhookEvent)(nil), // 0: livekit.WebhookEvent - (*Room)(nil), // 1: livekit.Room - (*ParticipantInfo)(nil), // 2: livekit.ParticipantInfo - (*EgressInfo)(nil), // 3: livekit.EgressInfo - (*IngressInfo)(nil), // 4: livekit.IngressInfo - (*TrackInfo)(nil), // 5: livekit.TrackInfo + (*WebhookEvent)(nil), // 0: livekit.WebhookEvent + (*BatchedWebhookEvents)(nil), // 1: livekit.BatchedWebhookEvents + (*Room)(nil), // 2: livekit.Room + (*ParticipantInfo)(nil), // 3: livekit.ParticipantInfo + (*EgressInfo)(nil), // 4: livekit.EgressInfo + (*IngressInfo)(nil), // 5: livekit.IngressInfo + (*TrackInfo)(nil), // 6: livekit.TrackInfo } var file_livekit_webhook_proto_depIdxs = []int32{ - 1, // 0: livekit.WebhookEvent.room:type_name -> livekit.Room - 2, // 1: livekit.WebhookEvent.participant:type_name -> livekit.ParticipantInfo - 3, // 2: livekit.WebhookEvent.egress_info:type_name -> livekit.EgressInfo - 4, // 3: livekit.WebhookEvent.ingress_info:type_name -> livekit.IngressInfo - 5, // 4: livekit.WebhookEvent.track:type_name -> livekit.TrackInfo - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 2, // 0: livekit.WebhookEvent.room:type_name -> livekit.Room + 3, // 1: livekit.WebhookEvent.participant:type_name -> livekit.ParticipantInfo + 4, // 2: livekit.WebhookEvent.egress_info:type_name -> livekit.EgressInfo + 5, // 3: livekit.WebhookEvent.ingress_info:type_name -> livekit.IngressInfo + 6, // 4: livekit.WebhookEvent.track:type_name -> livekit.TrackInfo + 0, // 5: livekit.BatchedWebhookEvents.events:type_name -> livekit.WebhookEvent + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_livekit_webhook_proto_init() } @@ -249,6 +332,18 @@ func file_livekit_webhook_proto_init() { return nil } } + file_livekit_webhook_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*BatchedWebhookEvents); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -256,7 +351,7 @@ func file_livekit_webhook_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_livekit_webhook_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 0, }, diff --git a/protobufs/livekit_webhook.proto b/protobufs/livekit_webhook.proto index e13ef45ed..473d6a9dc 100644 --- a/protobufs/livekit_webhook.proto +++ b/protobufs/livekit_webhook.proto @@ -49,7 +49,17 @@ message WebhookEvent { // timestamp in seconds int64 created_at = 7; + int64 dequeued_at = 12; + int32 num_dropped = 11; - // NEXT_ID: 12 + // NEXT_ID: 13 +} + +message BatchedWebhookEvents { + repeated WebhookEvent events = 1; + + int32 num_dropped = 2; + + int64 dequeued_at = 3; } diff --git a/webhook/batch_url_notifier.go b/webhook/batch_url_notifier.go new file mode 100644 index 000000000..5c1f593c5 --- /dev/null +++ b/webhook/batch_url_notifier.go @@ -0,0 +1,155 @@ +package webhook + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/base64" + "github.com/hashicorp/go-retryablehttp" + "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "google.golang.org/protobuf/encoding/protojson" + "sync" + "time" +) + +const ( + DefaultBatchSendInterval = 100 * time.Millisecond + DefaultMaxBatchSize = 10000 +) + +type BatchURLNotifierParams struct { + Logger logger.Logger + URL string + Interval time.Duration + MaxSize int + APIKey string + APISecret string +} + +type BatchURLNotifier struct { + cancelFunc context.CancelFunc + client *retryablehttp.Client + mu sync.RWMutex + params BatchURLNotifierParams + batch []*livekit.WebhookEvent + dropped int // it's operated inside a mutex scope so no need for atomic type +} + +func NewBatchURLNotifier(ctx context.Context, params BatchURLNotifierParams) URLNotifier { + if params.Interval == 0 { + params.Interval = DefaultBatchSendInterval + } + if params.MaxSize == 0 { + params.MaxSize = DefaultMaxBatchSize + } + + ctx, cancel := context.WithCancel(ctx) + notifier := &BatchURLNotifier{ + cancelFunc: cancel, + params: params, + client: retryablehttp.NewClient(), + } + + go notifier.runner(ctx) + + return notifier +} + +func (b *BatchURLNotifier) runner(ctx context.Context) { + ticker := time.NewTicker(b.params.Interval) + for { + select { + case <-ticker.C: + b.mu.Lock() + b.sendBatch() + b.mu.Unlock() + case <-ctx.Done(): + return + } + } +} + +func (b *BatchURLNotifier) sendBatch() { + if len(b.batch) == 0 { + return + } + raw := &livekit.BatchedWebhookEvents{ + Events: b.batch, + NumDropped: int32(b.dropped), + DequeuedAt: time.Now().Unix(), + } + defer func() { + b.batch = nil + }() + b.dropped = 0 + + encoded, err := protojson.Marshal(raw) + if err != nil { + b.params.Logger.Warnw("Failed to marshal event", err) + b.dropped += len(b.batch) + return + } + + // sign payload + sum := sha256.Sum256(encoded) + b64 := base64.StdEncoding.EncodeToString(sum[:]) + at := auth.NewAccessToken(b.params.APIKey, b.params.APISecret). + SetValidFor(5 * time.Minute). + SetSha256(b64) + token, err := at.ToJWT() + if err != nil { + b.params.Logger.Warnw("Failed to generate jwt token", err) + b.dropped += len(b.batch) + return + } + + req, err := retryablehttp.NewRequest("POST", b.params.URL, bytes.NewReader(encoded)) + if err != nil { + b.params.Logger.Warnw("Failed to create http req", err) + b.dropped += len(b.batch) + return + } + + req.Header.Set(authHeader, token) + req.Header.Set("batched", "true") + req.Header.Set("content-type", "application/webhook+json") + resp, err := b.client.Do(req) + if err != nil { + b.params.Logger.Errorw("Failed to send request", err) + b.dropped += len(b.batch) + return + } + _ = resp.Body.Close() + + return +} + +func (b *BatchURLNotifier) SetKeys(apiKey, apiSecret string) { + b.mu.Lock() + defer b.mu.Unlock() + b.params.APIKey = apiKey + b.params.APISecret = apiSecret +} + +func (b *BatchURLNotifier) QueueNotify(event *livekit.WebhookEvent) error { + b.mu.Lock() + defer b.mu.Unlock() + b.batch = append(b.batch, event) + + if len(b.batch) >= b.params.MaxSize { + b.sendBatch() + } + + return nil +} + +func (b *BatchURLNotifier) Stop(force bool) { + b.cancelFunc() + if !force { + b.mu.Lock() + b.sendBatch() + b.mu.Unlock() + } +} diff --git a/webhook/notifier.go b/webhook/notifier.go index da1a91de4..5598fc17b 100644 --- a/webhook/notifier.go +++ b/webhook/notifier.go @@ -16,26 +16,53 @@ package webhook import ( "context" + "github.com/livekit/protocol/logger" "sync" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" ) type QueuedNotifier interface { QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error + Stop(force bool) +} + +type NotifierParams struct { + ApiKey string + ApiSecret string + Urls []string + IncludeEvents []string // when IncludeEvents is not empty ExcludeEvents will be ignored + ExcludeEvents []string // needs IncludeEvents to be empty, otherwise it won't take effect + Batched bool // when Batched is true, DropWhenFull is ignored + DropWhenFull bool // only works when Batched is disabled +} + +func NewNotifier(ctx context.Context, params NotifierParams) QueuedNotifier { + if len(params.IncludeEvents) > 0 { + params.ExcludeEvents = nil + } + if params.Batched { + return NewBatchedNotifier(ctx, params.ApiKey, params.ApiSecret, params.Urls, params.IncludeEvents, params.ExcludeEvents) + } else { + return NewDefaultNotifier(params.ApiKey, params.ApiSecret, params.Urls, params.DropWhenFull, params.IncludeEvents, params.ExcludeEvents) + } } type DefaultNotifier struct { - urlNotifiers []*URLNotifier + urlNotifiers []URLNotifier + includedEvents []string + excludedEvents []string } -func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier { - n := &DefaultNotifier{} +func NewBatchedNotifier(ctx context.Context, apiKey, apiSecret string, urls []string, includedEvents []string, excludedEvents []string) QueuedNotifier { + n := &DefaultNotifier{ + includedEvents: includedEvents, + excludedEvents: excludedEvents, + } for _, url := range urls { - u := NewURLNotifier(URLNotifierParams{ - URL: url, + u := NewBatchURLNotifier(ctx, BatchURLNotifierParams{ Logger: logger.GetLogger().WithComponent("webhook"), + URL: url, APIKey: apiKey, APISecret: apiSecret, }) @@ -44,11 +71,29 @@ func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier return n } +func NewDefaultNotifier(apiKey, apiSecret string, urls []string, dropWhenFull bool, includedEvents []string, excludedEvents []string) QueuedNotifier { + n := &DefaultNotifier{ + includedEvents: includedEvents, + excludedEvents: excludedEvents, + } + for _, url := range urls { + u := NewDefaultURLNotifier(URLNotifierParams{ + URL: url, + Logger: logger.GetLogger().WithComponent("webhook"), + APIKey: apiKey, + APISecret: apiSecret, + DropWhenFull: dropWhenFull, + }) + n.urlNotifiers = append(n.urlNotifiers, u) + } + return n +} + func (n *DefaultNotifier) Stop(force bool) { wg := sync.WaitGroup{} for _, u := range n.urlNotifiers { wg.Add(1) - go func(u *URLNotifier) { + go func(u URLNotifier) { defer wg.Done() u.Stop(force) }(u) @@ -57,6 +102,24 @@ func (n *DefaultNotifier) Stop(force bool) { } func (n *DefaultNotifier) QueueNotify(_ context.Context, event *livekit.WebhookEvent) error { + for _, ev := range n.includedEvents { + if event.Event == ev { + return n.queueNotify(event) + } + } + if len(n.includedEvents) > 0 { + return nil + } + + for _, ev := range n.excludedEvents { + if event.Event == ev { + return nil + } + } + return n.queueNotify(event) +} + +func (n *DefaultNotifier) queueNotify(event *livekit.WebhookEvent) error { for _, u := range n.urlNotifiers { if err := u.QueueNotify(event); err != nil { return err diff --git a/webhook/url_notifier.go b/webhook/url_notifier.go index ad956f8db..b600b706b 100644 --- a/webhook/url_notifier.go +++ b/webhook/url_notifier.go @@ -31,19 +31,26 @@ import ( "github.com/livekit/protocol/logger" ) +type URLNotifier interface { + SetKeys(apiKey, apiSecret string) + QueueNotify(event *livekit.WebhookEvent) error + Stop(force bool) +} + type URLNotifierParams struct { - Logger logger.Logger - QueueSize int - URL string - APIKey string - APISecret string + Logger logger.Logger + QueueSize int + DropWhenFull bool + URL string + APIKey string + APISecret string } const defaultQueueSize = 100 -// URLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL. +// DefaultURLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL. // It will retry on failure, and will drop events if notification fall too far behind -type URLNotifier struct { +type DefaultURLNotifier struct { mu sync.RWMutex params URLNotifierParams client *retryablehttp.Client @@ -51,7 +58,7 @@ type URLNotifier struct { worker core.QueueWorker } -func NewURLNotifier(params URLNotifierParams) *URLNotifier { +func NewDefaultURLNotifier(params URLNotifierParams) URLNotifier { if params.QueueSize == 0 { params.QueueSize = defaultQueueSize } @@ -59,27 +66,27 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier { params.Logger = logger.GetLogger() } - n := &URLNotifier{ + n := &DefaultURLNotifier{ params: params, client: retryablehttp.NewClient(), } n.client.Logger = &logAdapter{} n.worker = core.NewQueueWorker(core.QueueWorkerParams{ QueueSize: params.QueueSize, - DropWhenFull: true, + DropWhenFull: params.DropWhenFull, OnDropped: func() { n.dropped.Inc() }, }) return n } -func (n *URLNotifier) SetKeys(apiKey, apiSecret string) { +func (n *DefaultURLNotifier) SetKeys(apiKey, apiSecret string) { n.mu.Lock() defer n.mu.Unlock() n.params.APIKey = apiKey n.params.APISecret = apiSecret } -func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error { +func (n *DefaultURLNotifier) QueueNotify(event *livekit.WebhookEvent) error { n.worker.Submit(func() { if err := n.send(event); err != nil { n.params.Logger.Warnw("failed to send webhook", err, "url", n.params.URL, "event", event.Event) @@ -91,7 +98,7 @@ func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error { return nil } -func (n *URLNotifier) Stop(force bool) { +func (n *DefaultURLNotifier) Stop(force bool) { if force { n.worker.Kill() } else { @@ -99,9 +106,10 @@ func (n *URLNotifier) Stop(force bool) { } } -func (n *URLNotifier) send(event *livekit.WebhookEvent) error { +func (n *DefaultURLNotifier) send(event *livekit.WebhookEvent) error { // set dropped count event.NumDropped = n.dropped.Swap(0) + event.DequeuedAt = time.Now().Unix() encoded, err := protojson.Marshal(event) if err != nil { return err diff --git a/webhook/verifier.go b/webhook/verifier.go index 32964b8ec..f0b193e9c 100644 --- a/webhook/verifier.go +++ b/webhook/verifier.go @@ -82,3 +82,21 @@ func ReceiveWebhookEvent(r *http.Request, provider auth.KeyProvider) (*livekit.W } return &event, nil } + +func ReceiveWebhookEventBatched(r *http.Request, provider auth.KeyProvider) ([]*livekit.WebhookEvent, error) { + data, err := Receive(r, provider) + if err != nil { + return nil, err + } + unmarshalOpts := protojson.UnmarshalOptions{ + DiscardUnknown: true, + AllowPartial: true, + } + + events := livekit.BatchedWebhookEvents{} + if err = unmarshalOpts.Unmarshal(data, &events); err != nil { + return nil, err + } + + return events.Events, nil +} diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go index 1ec111c2e..281fd984f 100644 --- a/webhook/webhook_test.go +++ b/webhook/webhook_test.go @@ -41,12 +41,61 @@ var authProvider = auth.NewSimpleKeyProvider( apiKey, apiSecret, ) +var sampleEvents = []*livekit.WebhookEvent{ + { + Event: EventTrackPublished, + Participant: &livekit.ParticipantInfo{ + Identity: "test", + }, + Track: &livekit.TrackInfo{ + Sid: "TR_abcde", + }, + }, + { + Event: EventParticipantJoined, + Participant: &livekit.ParticipantInfo{ + Identity: "test", + }, + Track: &livekit.TrackInfo{ + Sid: "TR_abcde", + }, + }, + { + Event: EventParticipantJoined, + Participant: &livekit.ParticipantInfo{ + Identity: "test", + }, + Track: &livekit.TrackInfo{ + Sid: "TR_abcde", + }, + }, + { + Event: EventRoomFinished, + Participant: &livekit.ParticipantInfo{ + Identity: "test", + }, + Track: &livekit.TrackInfo{ + Sid: "TR_abcde", + }, + }, + { + Event: EventRoomStarted, + Participant: &livekit.ParticipantInfo{ + Identity: "test", + }, + Track: &livekit.TrackInfo{ + Sid: "TR_abcde", + }, + }, +} + func TestWebHook(t *testing.T) { s := newServer(testAddr) require.NoError(t, s.Start()) defer s.Stop() - notifier := NewDefaultNotifier(apiKey, apiSecret, []string{testUrl}) + notifier := NewDefaultNotifier(apiKey, apiSecret, []string{testUrl}, true, nil, nil) + defer notifier.Stop(true) t.Run("test event payload", func(t *testing.T) { event := &livekit.WebhookEvent{ @@ -74,33 +123,214 @@ func TestWebHook(t *testing.T) { } +func TestBatchWebHook(t *testing.T) { + s := newServer(testAddr) + require.NoError(t, s.Start()) + defer s.Stop() + + notifier := NewBatchedNotifier(context.Background(), apiKey, apiSecret, []string{testUrl}, nil, nil) + defer notifier.Stop(true) + + t.Run("test events payload", func(t *testing.T) { + event := &livekit.WebhookEvent{ + Event: EventTrackPublished, + Participant: &livekit.ParticipantInfo{ + Identity: "test", + }, + Track: &livekit.TrackInfo{ + Sid: "TR_abcde", + }, + } + + wg := sync.WaitGroup{} + wg.Add(1) + s.handler = func(r *http.Request) { + defer wg.Done() + decodedEvent, err := ReceiveWebhookEventBatched(r, authProvider) + require.NoError(t, err) + + require.Equal(t, 3, len(decodedEvent)) + for _, ev := range decodedEvent { + require.EqualValues(t, event, ev) + } + } + // send 3 times + require.NoError(t, notifier.QueueNotify(context.Background(), event)) + require.NoError(t, notifier.QueueNotify(context.Background(), event)) + require.NoError(t, notifier.QueueNotify(context.Background(), event)) + wg.Wait() + }) + +} + +func TestWebHookWithIncludeFilter(t *testing.T) { + s := newServer(testAddr) + require.NoError(t, s.Start()) + defer s.Stop() + + notifier := NewDefaultNotifier(apiKey, apiSecret, []string{testUrl}, true, []string{EventParticipantJoined, EventTrackPublished}, nil) + defer notifier.Stop(true) + + t.Run("test event payload", func(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(3) + counter := 0 + s.handler = func(r *http.Request) { + defer wg.Done() + decodedEvent, err := ReceiveWebhookEvent(r, authProvider) + require.NoError(t, err) + + require.EqualValues(t, sampleEvents[counter], decodedEvent) + counter++ + } + for _, ev := range sampleEvents { + require.NoError(t, notifier.QueueNotify(context.Background(), ev)) + } + wg.Wait() + require.Equal(t, 3, counter) + }) +} + +func TestBatchWebHookWithIncludeFilter(t *testing.T) { + s := newServer(testAddr) + require.NoError(t, s.Start()) + defer s.Stop() + + notifier := NewBatchedNotifier(context.Background(), apiKey, apiSecret, []string{testUrl}, []string{EventParticipantJoined, EventTrackPublished}, nil) + defer notifier.Stop(true) + + t.Run("test events payload", func(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(1) + s.handler = func(r *http.Request) { + defer wg.Done() + decodedEvent, err := ReceiveWebhookEventBatched(r, authProvider) + require.NoError(t, err) + + require.Equal(t, 3, len(decodedEvent)) + for idx, ev := range decodedEvent { + require.EqualValues(t, sampleEvents[idx], ev) + } + } + for _, ev := range sampleEvents { + require.NoError(t, notifier.QueueNotify(context.Background(), ev)) + } + wg.Wait() + }) + +} + +func TestWebHookWithExcludeFilter(t *testing.T) { + s := newServer(testAddr) + require.NoError(t, s.Start()) + defer s.Stop() + + notifier := NewDefaultNotifier(apiKey, apiSecret, []string{testUrl}, true, nil, []string{EventParticipantJoined, EventTrackPublished}) + defer notifier.Stop(true) + + t.Run("test event payload", func(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(2) + counter := 0 + s.handler = func(r *http.Request) { + defer wg.Done() + decodedEvent, err := ReceiveWebhookEvent(r, authProvider) + require.NoError(t, err) + + require.EqualValues(t, sampleEvents[counter+3], decodedEvent) + counter++ + } + for _, ev := range sampleEvents { + require.NoError(t, notifier.QueueNotify(context.Background(), ev)) + } + wg.Wait() + require.Equal(t, 2, counter) + }) +} + +func TestBatchWebHookWithExcludeFilter(t *testing.T) { + s := newServer(testAddr) + require.NoError(t, s.Start()) + defer s.Stop() + + notifier := NewBatchedNotifier(context.Background(), apiKey, apiSecret, []string{testUrl}, nil, []string{EventParticipantJoined, EventTrackPublished}) + defer notifier.Stop(true) + + t.Run("test events payload", func(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(1) + s.handler = func(r *http.Request) { + defer wg.Done() + decodedEvent, err := ReceiveWebhookEventBatched(r, authProvider) + require.NoError(t, err) + + require.Equal(t, 2, len(decodedEvent)) + for idx, ev := range decodedEvent { + require.EqualValues(t, sampleEvents[idx+3], ev) + } + } + for _, ev := range sampleEvents { + require.NoError(t, notifier.QueueNotify(context.Background(), ev)) + } + wg.Wait() + }) + +} + func TestURLNotifierDropped(t *testing.T) { s := newServer(testAddr) require.NoError(t, s.Start()) defer s.Stop() - urlNotifier := newTestNotifier() - defer urlNotifier.Stop(true) - totalDropped := atomic.Int32{} - totalReceived := atomic.Int32{} - s.handler = func(r *http.Request) { - decodedEvent, err := ReceiveWebhookEvent(r, authProvider) - require.NoError(t, err) - totalReceived.Inc() - totalDropped.Add(decodedEvent.NumDropped) - } - // send multiple notifications - for i := 0; i < 10; i++ { - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) - } + t.Run("DropWhenFull = true", func(t *testing.T) { + urlNotifier := newTestNotifier(true) + defer urlNotifier.Stop(true) + totalDropped := atomic.Int32{} + totalReceived := atomic.Int32{} + s.handler = func(r *http.Request) { + decodedEvent, err := ReceiveWebhookEvent(r, authProvider) + require.NoError(t, err) + totalReceived.Inc() + totalDropped.Add(decodedEvent.NumDropped) + } + // send multiple notifications + for i := 0; i < 10; i++ { + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) + } - time.Sleep(webhookCheckInterval) + time.Sleep(webhookCheckInterval) + + require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) + // at least one request dropped + require.Less(t, int32(0), totalDropped.Load()) + }) - require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) - // at least one request dropped - require.Less(t, int32(0), totalDropped.Load()) + t.Run("DropWhenFull = false", func(t *testing.T) { + urlNotifier := newTestNotifier(false) + defer urlNotifier.Stop(true) + totalDropped := atomic.Int32{} + totalReceived := atomic.Int32{} + s.handler = func(r *http.Request) { + decodedEvent, err := ReceiveWebhookEvent(r, authProvider) + require.NoError(t, err) + totalReceived.Inc() + totalDropped.Add(decodedEvent.NumDropped) + } + // send multiple notifications + for i := 0; i < 10; i++ { + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) + } + + time.Sleep(webhookCheckInterval) + + require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) + // at least one request dropped + require.Equal(t, int32(0), totalDropped.Load()) + }) } func TestURLNotifierLifecycle(t *testing.T) { @@ -109,12 +339,12 @@ func TestURLNotifierLifecycle(t *testing.T) { defer s.Stop() t.Run("start/stop without use", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) urlNotifier.Stop(false) }) t.Run("stop allowing to drain", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) numCalled := atomic.Int32{} s.handler = func(r *http.Request) { numCalled.Inc() @@ -128,7 +358,7 @@ func TestURLNotifierLifecycle(t *testing.T) { }) t.Run("force stop", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) numCalled := atomic.Int32{} s.handler = func(r *http.Request) { numCalled.Inc() @@ -143,12 +373,13 @@ func TestURLNotifierLifecycle(t *testing.T) { }) } -func newTestNotifier() *URLNotifier { - return NewURLNotifier(URLNotifierParams{ - QueueSize: 20, - URL: testUrl, - APIKey: apiKey, - APISecret: apiSecret, +func newTestNotifier(dropWhenFull bool) URLNotifier { + return NewDefaultURLNotifier(URLNotifierParams{ + QueueSize: 20, + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + DropWhenFull: dropWhenFull, }) }