Skip to content

Commit 4d2a4e1

Browse files
authored
[FSSDK-11513] limit number of events in the eventStore (#1053)
1 parent 3c63d79 commit 4d2a4e1

File tree

3 files changed

+214
-21
lines changed

3 files changed

+214
-21
lines changed

lib/event_processor/batch_event_processor.spec.ts

+157-2
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@
1616
import { expect, describe, it, vi, beforeEach, afterEach, MockInstance } from 'vitest';
1717

1818
import { EventWithId, BatchEventProcessor, LOGGER_NAME } from './batch_event_processor';
19-
import { getMockSyncCache } from '../tests/mock/mock_cache';
19+
import { getMockAsyncCache, getMockSyncCache } from '../tests/mock/mock_cache';
2020
import { createImpressionEvent } from '../tests/mock/create_event';
2121
import { ProcessableEvent } from './event_processor';
2222
import { buildLogEvent } from './event_builder/log_event';
23-
import { resolvablePromise } from '../utils/promise/resolvablePromise';
23+
import { ResolvablePromise, resolvablePromise } from '../utils/promise/resolvablePromise';
2424
import { advanceTimersByTime } from '../tests/testUtils';
2525
import { getMockLogger } from '../tests/mock/mock_logger';
2626
import { getMockRepeater } from '../tests/mock/mock_repeater';
2727
import * as retry from '../utils/executor/backoff_retry_runner';
2828
import { ServiceState, StartupLog } from '../service';
2929
import { LogLevel } from '../logging/logger';
30+
import { IdGenerator } from '../utils/id_generator';
3031

3132
const getMockDispatcher = () => {
3233
return {
@@ -366,6 +367,160 @@ describe('BatchEventProcessor', async () => {
366367

367368
expect(events).toEqual(eventsInStore);
368369
});
370+
371+
it('should not store the event in the eventStore but still dispatch if the \
372+
number of pending events is greater than the limit', async () => {
373+
const eventDispatcher = getMockDispatcher();
374+
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
375+
mockDispatch.mockResolvedValue(resolvablePromise().promise);
376+
377+
const eventStore = getMockSyncCache<EventWithId>();
378+
379+
const idGenerator = new IdGenerator();
380+
381+
for (let i = 0; i < 505; i++) {
382+
const event = createImpressionEvent(`id-${i}`);
383+
const cacheId = idGenerator.getId();
384+
await eventStore.set(cacheId, { id: cacheId, event });
385+
}
386+
387+
expect(eventStore.size()).toEqual(505);
388+
389+
const processor = new BatchEventProcessor({
390+
eventDispatcher,
391+
dispatchRepeater: getMockRepeater(),
392+
batchSize: 1,
393+
eventStore,
394+
});
395+
396+
processor.start();
397+
await processor.onRunning();
398+
399+
const events: ProcessableEvent[] = [];
400+
for(let i = 0; i < 2; i++) {
401+
const event = createImpressionEvent(`id-${i}`);
402+
events.push(event);
403+
await processor.process(event)
404+
}
405+
406+
expect(eventStore.size()).toEqual(505);
407+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507);
408+
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]]));
409+
expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]]));
410+
});
411+
412+
it('should store events in the eventStore when the number of events in the store\
413+
becomes lower than the limit', async () => {
414+
const eventDispatcher = getMockDispatcher();
415+
416+
const dispatchResponses: ResolvablePromise<any>[] = [];
417+
418+
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
419+
mockDispatch.mockImplementation((arg) => {
420+
const dispatchResponse = resolvablePromise();
421+
dispatchResponses.push(dispatchResponse);
422+
return dispatchResponse.promise;
423+
});
424+
425+
const eventStore = getMockSyncCache<EventWithId>();
426+
427+
const idGenerator = new IdGenerator();
428+
429+
for (let i = 0; i < 502; i++) {
430+
const event = createImpressionEvent(`id-${i}`);
431+
const cacheId = String(i);
432+
await eventStore.set(cacheId, { id: cacheId, event });
433+
}
434+
435+
expect(eventStore.size()).toEqual(502);
436+
437+
const processor = new BatchEventProcessor({
438+
eventDispatcher,
439+
dispatchRepeater: getMockRepeater(),
440+
batchSize: 1,
441+
eventStore,
442+
});
443+
444+
processor.start();
445+
await processor.onRunning();
446+
447+
let events: ProcessableEvent[] = [];
448+
for(let i = 0; i < 2; i++) {
449+
const event = createImpressionEvent(`id-${i + 502}`);
450+
events.push(event);
451+
await processor.process(event)
452+
}
453+
454+
expect(eventStore.size()).toEqual(502);
455+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504);
456+
457+
expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]]));
458+
expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]]));
459+
460+
// resolve the dispatch for events not saved in the store
461+
dispatchResponses[502].resolve({ statusCode: 200 });
462+
dispatchResponses[503].resolve({ statusCode: 200 });
463+
464+
await exhaustMicrotasks();
465+
expect(eventStore.size()).toEqual(502);
466+
467+
// resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit
468+
dispatchResponses[0].resolve({ statusCode: 200 });
469+
dispatchResponses[1].resolve({ statusCode: 200 });
470+
dispatchResponses[2].resolve({ statusCode: 200 });
471+
472+
await exhaustMicrotasks();
473+
expect(eventStore.size()).toEqual(499);
474+
475+
// process 2 more events
476+
events = [];
477+
for(let i = 0; i < 2; i++) {
478+
const event = createImpressionEvent(`id-${i + 504}`);
479+
events.push(event);
480+
await processor.process(event)
481+
}
482+
483+
expect(eventStore.size()).toEqual(500);
484+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506);
485+
expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]]));
486+
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]]));
487+
});
488+
489+
it('should still dispatch events even if the store save fails', async () => {
490+
const eventDispatcher = getMockDispatcher();
491+
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
492+
mockDispatch.mockResolvedValue({});
493+
494+
const eventStore = getMockAsyncCache<EventWithId>();
495+
// Simulate failure in saving to store
496+
eventStore.set = vi.fn().mockRejectedValue(new Error('Failed to save'));
497+
498+
const dispatchRepeater = getMockRepeater();
499+
500+
const processor = new BatchEventProcessor({
501+
eventDispatcher,
502+
dispatchRepeater,
503+
batchSize: 100,
504+
eventStore,
505+
});
506+
507+
processor.start();
508+
await processor.onRunning();
509+
510+
const events: ProcessableEvent[] = [];
511+
for(let i = 0; i < 10; i++) {
512+
const event = createImpressionEvent(`id-${i}`);
513+
events.push(event);
514+
await processor.process(event)
515+
}
516+
517+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);
518+
519+
await dispatchRepeater.execute(0);
520+
521+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
522+
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
523+
});
369524
});
370525

371526
it('should dispatch events when dispatchRepeater is triggered', async () => {

lib/event_processor/batch_event_processor.ts

+56-19
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ import { EventProcessor, ProcessableEvent } from "./event_processor";
1818
import { getBatchedAsync, getBatchedSync, Store } from "../utils/cache/store";
1919
import { EventDispatcher, EventDispatcherResponse, LogEvent } from "./event_dispatcher/event_dispatcher";
2020
import { buildLogEvent } from "./event_builder/log_event";
21-
import { BackoffController, ExponentialBackoff, IntervalRepeater, Repeater } from "../utils/repeater/repeater";
21+
import { BackoffController, ExponentialBackoff, Repeater } from "../utils/repeater/repeater";
2222
import { LoggerFacade } from '../logging/logger';
2323
import { BaseService, ServiceState, StartupLog } from "../service";
24-
import { Consumer, Fn, Producer } from "../utils/type";
24+
import { Consumer, Fn, Maybe, Producer } from "../utils/type";
2525
import { RunResult, runWithRetry } from "../utils/executor/backoff_retry_runner";
2626
import { isSuccessStatusCode } from "../utils/http_request_handler/http_util";
2727
import { EventEmitter } from "../utils/event_emitter/event_emitter";
@@ -31,13 +31,16 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message";
3131
import { OptimizelyError } from "../error/optimizly_error";
3232
import { sprintf } from "../utils/fns";
3333
import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service";
34+
import { EVENT_STORE_FULL } from "../message/log_message";
3435

3536
export const DEFAULT_MIN_BACKOFF = 1000;
3637
export const DEFAULT_MAX_BACKOFF = 32000;
38+
export const MAX_EVENTS_IN_STORE = 500;
3739

3840
export type EventWithId = {
3941
id: string;
4042
event: ProcessableEvent;
43+
notStored?: boolean;
4144
};
4245

4346
export type RetryConfig = {
@@ -59,7 +62,7 @@ export type BatchEventProcessorConfig = {
5962

6063
type EventBatch = {
6164
request: LogEvent,
62-
ids: string[],
65+
events: EventWithId[],
6366
}
6467

6568
export const LOGGER_NAME = 'BatchEventProcessor';
@@ -70,11 +73,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
7073
private eventQueue: EventWithId[] = [];
7174
private batchSize: number;
7275
private eventStore?: Store<EventWithId>;
76+
private eventCountInStore: Maybe<number> = undefined;
77+
private maxEventsInStore: number = MAX_EVENTS_IN_STORE;
7378
private dispatchRepeater: Repeater;
7479
private failedEventRepeater?: Repeater;
7580
private idGenerator: IdGenerator = new IdGenerator();
7681
private runningTask: Map<string, RunResult<EventDispatcherResponse>> = new Map();
77-
private dispatchingEventIds: Set<string> = new Set();
82+
private dispatchingEvents: Map<string, EventWithId> = new Map();
7883
private eventEmitter: EventEmitter<{ dispatch: LogEvent }> = new EventEmitter();
7984
private retryConfig?: RetryConfig;
8085

@@ -84,11 +89,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
8489
this.closingEventDispatcher = config.closingEventDispatcher;
8590
this.batchSize = config.batchSize;
8691
this.eventStore = config.eventStore;
92+
8793
this.retryConfig = config.retryConfig;
8894

8995
this.dispatchRepeater = config.dispatchRepeater;
9096
this.dispatchRepeater.setTask(() => this.flush());
9197

98+
this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE);
9299
this.failedEventRepeater = config.failedEventRepeater;
93100
this.failedEventRepeater?.setTask(() => this.retryFailedEvents());
94101
if (config.logger) {
@@ -111,7 +118,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
111118
}
112119

113120
const keys = (await this.eventStore.getKeys()).filter(
114-
(k) => !this.dispatchingEventIds.has(k) && !this.eventQueue.find((e) => e.id === k)
121+
(k) => !this.dispatchingEvents.has(k) && !this.eventQueue.find((e) => e.id === k)
115122
);
116123

117124
const events = await (this.eventStore.operation === 'sync' ?
@@ -138,7 +145,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
138145
(currentBatch.length > 0 && !areEventContextsEqual(currentBatch[0].event, event.event))) {
139146
batches.push({
140147
request: buildLogEvent(currentBatch.map((e) => e.event)),
141-
ids: currentBatch.map((e) => e.id),
148+
events: currentBatch,
142149
});
143150
currentBatch = [];
144151
}
@@ -148,7 +155,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
148155
if (currentBatch.length > 0) {
149156
batches.push({
150157
request: buildLogEvent(currentBatch.map((e) => e.event)),
151-
ids: currentBatch.map((e) => e.id),
158+
events: currentBatch,
152159
});
153160
}
154161

@@ -163,15 +170,15 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
163170
}
164171

165172
const events: ProcessableEvent[] = [];
166-
const ids: string[] = [];
173+
const eventWithIds: EventWithId[] = [];
167174

168175
this.eventQueue.forEach((event) => {
169176
events.push(event.event);
170-
ids.push(event.id);
177+
eventWithIds.push(event);
171178
});
172179

173180
this.eventQueue = [];
174-
return { request: buildLogEvent(events), ids };
181+
return { request: buildLogEvent(events), events: eventWithIds };
175182
}
176183

177184
private async executeDispatch(request: LogEvent, closing = false): Promise<EventDispatcherResponse> {
@@ -185,10 +192,10 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
185192
}
186193

187194
private dispatchBatch(batch: EventBatch, closing: boolean): void {
188-
const { request, ids } = batch;
195+
const { request, events } = batch;
189196

190-
ids.forEach((id) => {
191-
this.dispatchingEventIds.add(id);
197+
events.forEach((event) => {
198+
this.dispatchingEvents.set(event.id, event);
192199
});
193200

194201
const runResult: RunResult<EventDispatcherResponse> = this.retryConfig
@@ -205,9 +212,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
205212
this.runningTask.set(taskId, runResult);
206213

207214
runResult.result.then((res) => {
208-
ids.forEach((id) => {
209-
this.dispatchingEventIds.delete(id);
210-
this.eventStore?.remove(id);
215+
events.forEach((event) => {
216+
this.eventStore?.remove(event.id);
217+
if (!event.notStored && this.eventCountInStore) {
218+
this.eventCountInStore--;
219+
}
211220
});
212221
return Promise.resolve();
213222
}).catch((err) => {
@@ -216,7 +225,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
216225
this.logger?.error(err);
217226
}).finally(() => {
218227
this.runningTask.delete(taskId);
219-
ids.forEach((id) => this.dispatchingEventIds.delete(id));
228+
events.forEach((event) => this.dispatchingEvents.delete(event.id));
220229
});
221230
}
222231

@@ -235,12 +244,12 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
235244
return Promise.reject(new OptimizelyError(SERVICE_NOT_RUNNING, 'BatchEventProcessor'));
236245
}
237246

238-
const eventWithId = {
247+
const eventWithId: EventWithId = {
239248
id: this.idGenerator.getId(),
240249
event: event,
241250
};
242251

243-
await this.eventStore?.set(eventWithId.id, eventWithId);
252+
await this.storeEvent(eventWithId);
244253

245254
if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
246255
this.flush();
@@ -253,7 +262,35 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
253262
} else if (!this.dispatchRepeater.isRunning()) {
254263
this.dispatchRepeater.start();
255264
}
265+
}
266+
267+
private async findEventCountInStore(): Promise<void> {
268+
if (this.eventStore && this.eventCountInStore === undefined) {
269+
try {
270+
const keys = await this.eventStore.getKeys();
271+
this.eventCountInStore = keys.length;
272+
} catch (e) {
273+
this.logger?.error(e);
274+
}
275+
}
276+
}
256277

278+
private async storeEvent(eventWithId: EventWithId): Promise<void> {
279+
await this.findEventCountInStore();
280+
if (this.eventCountInStore !== undefined && this.eventCountInStore >= this.maxEventsInStore) {
281+
this.logger?.info(EVENT_STORE_FULL, eventWithId.event.uuid);
282+
eventWithId.notStored = true;
283+
return;
284+
}
285+
286+
await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).then(() => {
287+
if (this.eventCountInStore !== undefined) {
288+
this.eventCountInStore++;
289+
}
290+
}).catch((e) => {
291+
eventWithId.notStored = true;
292+
this.logger?.error(e);
293+
});
257294
}
258295

259296
start(): void {

lib/message/log_message.ts

+1
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,6 @@ export const USER_HAS_NO_FORCED_VARIATION_FOR_EXPERIMENT =
6060
'No experiment %s mapped to user %s in the forced variation map.';
6161
export const INVALID_EXPERIMENT_KEY_INFO =
6262
'Experiment key %s is not in datafile. It is either invalid, paused, or archived.';
63+
export const EVENT_STORE_FULL = 'Event store is full. Not saving event with id %d.';
6364

6465
export const messages: string[] = [];

0 commit comments

Comments
 (0)