Skip to content

Commit 1705607

Browse files
committed
Makes URLNotifier more configurable
* Add a new constructor that accepts 'URLNotifierParams' directly * Add 'DropWhenFull' as parameter * Add 'DeqeuedAt' for better monitoring at hooked backends * Fixes tag name being off by 1 * Sets DequeuedAt
1 parent 1dd95af commit 1705607

File tree

4 files changed

+86
-40
lines changed

4 files changed

+86
-40
lines changed

protobufs/livekit_webhook.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ message WebhookEvent {
4949
// timestamp in seconds
5050
int64 created_at = 7;
5151

52+
int64 dequeued_at = 12;
53+
5254
int32 num_dropped = 11;
5355

54-
// NEXT_ID: 12
56+
// NEXT_ID: 13
5557
}

webhook/notifier.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ package webhook
1616

1717
import (
1818
"context"
19+
"github.com/livekit/protocol/logger"
1920
"sync"
2021

2122
"github.com/livekit/protocol/livekit"
22-
"github.com/livekit/protocol/logger"
2323
)
2424

2525
type QueuedNotifier interface {
@@ -34,16 +34,30 @@ func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier
3434
n := &DefaultNotifier{}
3535
for _, url := range urls {
3636
u := NewURLNotifier(URLNotifierParams{
37-
URL: url,
38-
Logger: logger.GetLogger().WithComponent("webhook"),
39-
APIKey: apiKey,
40-
APISecret: apiSecret,
37+
URL: url,
38+
Logger: logger.GetLogger().WithComponent("webhook"),
39+
APIKey: apiKey,
40+
APISecret: apiSecret,
41+
DropWhenFull: true,
4142
})
4243
n.urlNotifiers = append(n.urlNotifiers, u)
4344
}
4445
return n
4546
}
4647

48+
func NewDefaultNotifierByParams(params []URLNotifierParams) QueuedNotifier {
49+
n := &DefaultNotifier{}
50+
for _, p := range params {
51+
if p.Logger == nil {
52+
p.Logger = logger.GetLogger().WithComponent("webhook")
53+
}
54+
55+
u := NewURLNotifier(p)
56+
n.urlNotifiers = append(n.urlNotifiers, u)
57+
}
58+
return n
59+
}
60+
4761
func (n *DefaultNotifier) Stop(force bool) {
4862
wg := sync.WaitGroup{}
4963
for _, u := range n.urlNotifiers {

webhook/url_notifier.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ import (
3232
)
3333

3434
type URLNotifierParams struct {
35-
Logger logger.Logger
36-
QueueSize int
37-
URL string
38-
APIKey string
39-
APISecret string
35+
Logger logger.Logger
36+
QueueSize int
37+
DropWhenFull bool
38+
URL string
39+
APIKey string
40+
APISecret string
4041
}
4142

4243
const defaultQueueSize = 100
@@ -66,7 +67,7 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier {
6667
n.client.Logger = &logAdapter{}
6768
n.worker = core.NewQueueWorker(core.QueueWorkerParams{
6869
QueueSize: params.QueueSize,
69-
DropWhenFull: true,
70+
DropWhenFull: params.DropWhenFull,
7071
OnDropped: func() { n.dropped.Inc() },
7172
})
7273
return n
@@ -102,6 +103,7 @@ func (n *URLNotifier) Stop(force bool) {
102103
func (n *URLNotifier) send(event *livekit.WebhookEvent) error {
103104
// set dropped count
104105
event.NumDropped = n.dropped.Swap(0)
106+
event.DequeuedAt = time.Now().Unix()
105107
encoded, err := protojson.Marshal(event)
106108
if err != nil {
107109
return err

webhook/webhook_test.go

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,55 @@ func TestURLNotifierDropped(t *testing.T) {
7979
require.NoError(t, s.Start())
8080
defer s.Stop()
8181

82-
urlNotifier := newTestNotifier()
83-
defer urlNotifier.Stop(true)
84-
totalDropped := atomic.Int32{}
85-
totalReceived := atomic.Int32{}
86-
s.handler = func(r *http.Request) {
87-
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
88-
require.NoError(t, err)
89-
totalReceived.Inc()
90-
totalDropped.Add(decodedEvent.NumDropped)
91-
}
92-
// send multiple notifications
93-
for i := 0; i < 10; i++ {
94-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
95-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
96-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
97-
}
82+
t.Run("DropWhenFull = true", func(t *testing.T) {
83+
urlNotifier := newTestNotifier(true)
84+
defer urlNotifier.Stop(true)
85+
totalDropped := atomic.Int32{}
86+
totalReceived := atomic.Int32{}
87+
s.handler = func(r *http.Request) {
88+
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
89+
require.NoError(t, err)
90+
totalReceived.Inc()
91+
totalDropped.Add(decodedEvent.NumDropped)
92+
}
93+
// send multiple notifications
94+
for i := 0; i < 10; i++ {
95+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
96+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
97+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
98+
}
99+
100+
time.Sleep(webhookCheckInterval)
98101

99-
time.Sleep(webhookCheckInterval)
102+
require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
103+
// at least one request dropped
104+
require.Less(t, int32(0), totalDropped.Load())
105+
})
100106

101-
require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
102-
// at least one request dropped
103-
require.Less(t, int32(0), totalDropped.Load())
107+
t.Run("DropWhenFull = false", func(t *testing.T) {
108+
urlNotifier := newTestNotifier(false)
109+
defer urlNotifier.Stop(true)
110+
totalDropped := atomic.Int32{}
111+
totalReceived := atomic.Int32{}
112+
s.handler = func(r *http.Request) {
113+
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
114+
require.NoError(t, err)
115+
totalReceived.Inc()
116+
totalDropped.Add(decodedEvent.NumDropped)
117+
}
118+
// send multiple notifications
119+
for i := 0; i < 10; i++ {
120+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
121+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
122+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
123+
}
124+
125+
time.Sleep(webhookCheckInterval)
126+
127+
require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
128+
// at least one request dropped
129+
require.Equal(t, int32(0), totalDropped.Load())
130+
})
104131
}
105132

106133
func TestURLNotifierLifecycle(t *testing.T) {
@@ -109,12 +136,12 @@ func TestURLNotifierLifecycle(t *testing.T) {
109136
defer s.Stop()
110137

111138
t.Run("start/stop without use", func(t *testing.T) {
112-
urlNotifier := newTestNotifier()
139+
urlNotifier := newTestNotifier(true)
113140
urlNotifier.Stop(false)
114141
})
115142

116143
t.Run("stop allowing to drain", func(t *testing.T) {
117-
urlNotifier := newTestNotifier()
144+
urlNotifier := newTestNotifier(true)
118145
numCalled := atomic.Int32{}
119146
s.handler = func(r *http.Request) {
120147
numCalled.Inc()
@@ -128,7 +155,7 @@ func TestURLNotifierLifecycle(t *testing.T) {
128155
})
129156

130157
t.Run("force stop", func(t *testing.T) {
131-
urlNotifier := newTestNotifier()
158+
urlNotifier := newTestNotifier(true)
132159
numCalled := atomic.Int32{}
133160
s.handler = func(r *http.Request) {
134161
numCalled.Inc()
@@ -143,12 +170,13 @@ func TestURLNotifierLifecycle(t *testing.T) {
143170
})
144171
}
145172

146-
func newTestNotifier() *URLNotifier {
173+
func newTestNotifier(dropWhenFull bool) *URLNotifier {
147174
return NewURLNotifier(URLNotifierParams{
148-
QueueSize: 20,
149-
URL: testUrl,
150-
APIKey: apiKey,
151-
APISecret: apiSecret,
175+
QueueSize: 20,
176+
URL: testUrl,
177+
APIKey: apiKey,
178+
APISecret: apiSecret,
179+
DropWhenFull: dropWhenFull,
152180
})
153181
}
154182

0 commit comments

Comments
 (0)