Skip to content

Commit 9c40bf9

Browse files
authored
[FSSDK-8501] fix: gracefully close client to dispatch queued events (#376)
* wait for dispatching events on close * update tests * use context * add unit test * update tests * refactor code * refactor code * fix linter * update test * update license header
1 parent 7ffed83 commit 9c40bf9

File tree

4 files changed

+82
-9
lines changed

4 files changed

+82
-9
lines changed

pkg/event/dispatcher.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/****************************************************************************
2-
* Copyright 2019, Optimizely, Inc. and contributors *
2+
* Copyright 2019,2023 Optimizely, Inc. and contributors *
33
* *
44
* Licensed under the Apache License, Version 2.0 (the "License"); *
55
* you may not use this file except in compliance with the License. *
@@ -18,6 +18,7 @@
1818
package event
1919

2020
import (
21+
"context"
2122
"fmt"
2223
"net/http"
2324
"time"
@@ -100,9 +101,26 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
100101
return true, nil
101102
}
102103

104+
// waitForDispatchingEventsOnClose will wait until all the event are dispatched
105+
func (ed *QueueEventDispatcher) waitForDispatchingEventsOnClose(timeout time.Duration) {
106+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
107+
defer cancel()
108+
109+
for {
110+
select {
111+
case <-ctx.Done():
112+
return
113+
default:
114+
if ed.eventQueue.Size() == 0 {
115+
return
116+
}
117+
time.Sleep(CloseEventDispatchWaitTime)
118+
}
119+
}
120+
}
121+
103122
// flush the events
104123
func (ed *QueueEventDispatcher) flushEvents() {
105-
106124
// Limit flushing to a single worker
107125
if !ed.processing.TryAcquire(1) {
108126
return

pkg/event/dispatcher_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/****************************************************************************
2-
* Copyright 2019-2020,2022 Optimizely, Inc. and contributors *
2+
* Copyright 2019-2020,2022-2023 Optimizely, Inc. and contributors *
33
* *
44
* Licensed under the Apache License, Version 2.0 (the "License"); *
55
* you may not use this file except in compliance with the License. *
@@ -197,3 +197,40 @@ func TestQueueEventDispatcher_FailDispath(t *testing.T) {
197197
assert.Equal(t, float64(1), metricsRegistry.GetGauge(metrics.DispatcherQueueSize).(*MetricsGauge).Get())
198198
assert.Equal(t, float64(0), metricsRegistry.GetCounter(metrics.DispatcherSuccessFlush).(*MetricsCounter).Get())
199199
}
200+
201+
func TestQueueEventDispatcher_WaitForDispatchingEventsOnClose(t *testing.T) {
202+
metricsRegistry := NewMetricsRegistry()
203+
204+
q := NewQueueEventDispatcher("", metricsRegistry)
205+
206+
assert.True(t, q.Dispatcher != nil)
207+
if d, ok := q.Dispatcher.(*httpEventDispatcher); ok {
208+
assert.True(t, d.requester != nil && d.logger != nil)
209+
} else {
210+
assert.True(t, false)
211+
}
212+
sender := &MockDispatcher{Events: NewInMemoryQueue(100), eventsQueue: NewInMemoryQueue(100)}
213+
q.Dispatcher = sender
214+
215+
eventTags := map[string]interface{}{"revenue": 55.0, "value": 25.1}
216+
config := TestConfig{}
217+
218+
for i := 0; i < 10; i++ {
219+
conversionUserEvent := CreateConversionUserEvent(config, entities.Event{ExperimentIds: []string{"15402980349"}, ID: "15368860886", Key: "sample_conversion"}, userContext, eventTags)
220+
221+
batch := createBatchEvent(conversionUserEvent, createVisitorFromUserEvent(conversionUserEvent))
222+
assert.Equal(t, conversionUserEvent.Timestamp, batch.Visitors[0].Snapshots[0].Events[0].Timestamp)
223+
224+
logEvent := createLogEvent(batch, DefaultEventEndPoint)
225+
226+
success, _ := q.DispatchEvent(logEvent)
227+
228+
assert.True(t, success)
229+
}
230+
231+
// wait for the events to be dispatched
232+
q.waitForDispatchingEventsOnClose(10 * time.Second)
233+
234+
// check the queue
235+
assert.Equal(t, 0, q.eventQueue.Size())
236+
}

pkg/event/processor.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ const DefaultEventQueueSize = 2000
6565
// DefaultEventFlushInterval holds the default value for the event flush interval
6666
const DefaultEventFlushInterval = 30 * time.Second
6767

68+
// CloseEventDispatchWaitTime holds the checking interval for the dispatching events on client close
69+
const CloseEventDispatchWaitTime = 500 * time.Millisecond
70+
71+
// CloseEventDispatchTimeout holds the timeout value for the waiting for the dispatching events on client close
72+
const CloseEventDispatchTimeout = 30 * time.Second
73+
6874
// DefaultEventEndPoint is used as the default endpoint for sending events.
6975
const DefaultEventEndPoint = "https://logx.optimizely.com/v1/events"
7076

@@ -245,6 +251,7 @@ func (p *BatchEventProcessor) startTicker(ctx context.Context) {
245251
d, ok := p.EventDispatcher.(*QueueEventDispatcher)
246252
if ok {
247253
d.flushEvents()
254+
d.waitForDispatchingEventsOnClose(CloseEventDispatchTimeout)
248255
}
249256
p.Ticker.Stop()
250257
return

pkg/event/processor_test.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/****************************************************************************
2-
* Copyright 2019-2020, Optimizely, Inc. and contributors *
2+
* Copyright 2019-2020,2023 Optimizely, Inc. and contributors *
33
* *
44
* Licensed under the Apache License, Version 2.0 (the "License"); *
55
* you may not use this file except in compliance with the License. *
@@ -43,8 +43,9 @@ func (c *CountingDispatcher) DispatchEvent(event LogEvent) (bool, error) {
4343
}
4444

4545
type MockDispatcher struct {
46-
ShouldFail bool
47-
Events Queue
46+
ShouldFail bool
47+
Events Queue
48+
eventsQueue Queue // dispatch events from this queue
4849
}
4950

5051
func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
@@ -53,11 +54,22 @@ func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
5354
}
5455

5556
m.Events.Add(event)
57+
if m.eventsQueue != nil {
58+
m.eventsQueue.Add(event)
59+
go m.flushEvents()
60+
}
5661
return true, nil
5762
}
5863

64+
func (m *MockDispatcher) flushEvents() {
65+
queueSize := m.eventsQueue.Size()
66+
for ; queueSize > 0; queueSize = m.eventsQueue.Size() {
67+
m.eventsQueue.Remove(1)
68+
}
69+
}
70+
5971
func NewMockDispatcher(queueSize int, shouldFail bool) *MockDispatcher {
60-
return &MockDispatcher{Events: NewInMemoryQueue(queueSize), ShouldFail: shouldFail}
72+
return &MockDispatcher{Events: NewInMemoryQueue(queueSize), eventsQueue: NewInMemoryQueue(queueSize), ShouldFail: shouldFail}
6173
}
6274

6375
func newExecutionContext() *utils.ExecGroup {
@@ -180,7 +192,6 @@ func TestDefaultEventProcessor_BatchSizes(t *testing.T) {
180192
assert.Equal(t, 50, len(logEvent.Event.Visitors))
181193
logEvent, _ = evs[1].(LogEvent)
182194
assert.Equal(t, 50, len(logEvent.Event.Visitors))
183-
184195
}
185196
eg.TerminateAndWait()
186197
}
@@ -493,7 +504,7 @@ func (l *NoOpLogger) SetLogLevel(level logging.LogLevel) {
493504

494505
}
495506

496-
/**
507+
/*
497508
goos: darwin
498509
goarch: amd64
499510
pkg: github.com/optimizely/go-sdk/pkg/event

0 commit comments

Comments
 (0)