|
16 | 16 | import { expect, describe, it, vi, beforeEach, afterEach, MockInstance } from 'vitest';
|
17 | 17 |
|
18 | 18 | import { EventWithId, BatchEventProcessor, LOGGER_NAME } from './batch_event_processor';
|
19 |
| -import { getMockSyncCache } from '../tests/mock/mock_cache'; |
| 19 | +import { getMockAsyncCache, getMockSyncCache } from '../tests/mock/mock_cache'; |
20 | 20 | import { createImpressionEvent } from '../tests/mock/create_event';
|
21 | 21 | import { ProcessableEvent } from './event_processor';
|
22 | 22 | import { buildLogEvent } from './event_builder/log_event';
|
23 |
| -import { resolvablePromise } from '../utils/promise/resolvablePromise'; |
| 23 | +import { ResolvablePromise, resolvablePromise } from '../utils/promise/resolvablePromise'; |
24 | 24 | import { advanceTimersByTime } from '../tests/testUtils';
|
25 | 25 | import { getMockLogger } from '../tests/mock/mock_logger';
|
26 | 26 | import { getMockRepeater } from '../tests/mock/mock_repeater';
|
27 | 27 | import * as retry from '../utils/executor/backoff_retry_runner';
|
28 | 28 | import { ServiceState, StartupLog } from '../service';
|
29 | 29 | import { LogLevel } from '../logging/logger';
|
| 30 | +import { IdGenerator } from '../utils/id_generator'; |
30 | 31 |
|
31 | 32 | const getMockDispatcher = () => {
|
32 | 33 | return {
|
@@ -366,6 +367,160 @@ describe('BatchEventProcessor', async () => {
|
366 | 367 |
|
367 | 368 | expect(events).toEqual(eventsInStore);
|
368 | 369 | });
|
| 370 | + |
| 371 | + it('should not store the event in the eventStore but still dispatch if the \ |
| 372 | + number of pending events is greater than the limit', async () => { |
| 373 | + const eventDispatcher = getMockDispatcher(); |
| 374 | + const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent; |
| 375 | + mockDispatch.mockResolvedValue(resolvablePromise().promise); |
| 376 | + |
| 377 | + const eventStore = getMockSyncCache<EventWithId>(); |
| 378 | + |
| 379 | + const idGenerator = new IdGenerator(); |
| 380 | + |
| 381 | + for (let i = 0; i < 505; i++) { |
| 382 | + const event = createImpressionEvent(`id-${i}`); |
| 383 | + const cacheId = idGenerator.getId(); |
| 384 | + await eventStore.set(cacheId, { id: cacheId, event }); |
| 385 | + } |
| 386 | + |
| 387 | + expect(eventStore.size()).toEqual(505); |
| 388 | + |
| 389 | + const processor = new BatchEventProcessor({ |
| 390 | + eventDispatcher, |
| 391 | + dispatchRepeater: getMockRepeater(), |
| 392 | + batchSize: 1, |
| 393 | + eventStore, |
| 394 | + }); |
| 395 | + |
| 396 | + processor.start(); |
| 397 | + await processor.onRunning(); |
| 398 | + |
| 399 | + const events: ProcessableEvent[] = []; |
| 400 | + for(let i = 0; i < 2; i++) { |
| 401 | + const event = createImpressionEvent(`id-${i}`); |
| 402 | + events.push(event); |
| 403 | + await processor.process(event) |
| 404 | + } |
| 405 | + |
| 406 | + expect(eventStore.size()).toEqual(505); |
| 407 | + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507); |
| 408 | + expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]])); |
| 409 | + expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]])); |
| 410 | + }); |
| 411 | + |
| 412 | + it('should store events in the eventStore when the number of events in the store\ |
| 413 | + becomes lower than the limit', async () => { |
| 414 | + const eventDispatcher = getMockDispatcher(); |
| 415 | + |
| 416 | + const dispatchResponses: ResolvablePromise<any>[] = []; |
| 417 | + |
| 418 | + const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent; |
| 419 | + mockDispatch.mockImplementation((arg) => { |
| 420 | + const dispatchResponse = resolvablePromise(); |
| 421 | + dispatchResponses.push(dispatchResponse); |
| 422 | + return dispatchResponse.promise; |
| 423 | + }); |
| 424 | + |
| 425 | + const eventStore = getMockSyncCache<EventWithId>(); |
| 426 | + |
| 427 | + const idGenerator = new IdGenerator(); |
| 428 | + |
| 429 | + for (let i = 0; i < 502; i++) { |
| 430 | + const event = createImpressionEvent(`id-${i}`); |
| 431 | + const cacheId = String(i); |
| 432 | + await eventStore.set(cacheId, { id: cacheId, event }); |
| 433 | + } |
| 434 | + |
| 435 | + expect(eventStore.size()).toEqual(502); |
| 436 | + |
| 437 | + const processor = new BatchEventProcessor({ |
| 438 | + eventDispatcher, |
| 439 | + dispatchRepeater: getMockRepeater(), |
| 440 | + batchSize: 1, |
| 441 | + eventStore, |
| 442 | + }); |
| 443 | + |
| 444 | + processor.start(); |
| 445 | + await processor.onRunning(); |
| 446 | + |
| 447 | + let events: ProcessableEvent[] = []; |
| 448 | + for(let i = 0; i < 2; i++) { |
| 449 | + const event = createImpressionEvent(`id-${i + 502}`); |
| 450 | + events.push(event); |
| 451 | + await processor.process(event) |
| 452 | + } |
| 453 | + |
| 454 | + expect(eventStore.size()).toEqual(502); |
| 455 | + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504); |
| 456 | + |
| 457 | + expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]])); |
| 458 | + expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]])); |
| 459 | + |
| 460 | + // resolve the dispatch for events not saved in the store |
| 461 | + dispatchResponses[502].resolve({ statusCode: 200 }); |
| 462 | + dispatchResponses[503].resolve({ statusCode: 200 }); |
| 463 | + |
| 464 | + await exhaustMicrotasks(); |
| 465 | + expect(eventStore.size()).toEqual(502); |
| 466 | + |
| 467 | + // resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit |
| 468 | + dispatchResponses[0].resolve({ statusCode: 200 }); |
| 469 | + dispatchResponses[1].resolve({ statusCode: 200 }); |
| 470 | + dispatchResponses[2].resolve({ statusCode: 200 }); |
| 471 | + |
| 472 | + await exhaustMicrotasks(); |
| 473 | + expect(eventStore.size()).toEqual(499); |
| 474 | + |
| 475 | + // process 2 more events |
| 476 | + events = []; |
| 477 | + for(let i = 0; i < 2; i++) { |
| 478 | + const event = createImpressionEvent(`id-${i + 504}`); |
| 479 | + events.push(event); |
| 480 | + await processor.process(event) |
| 481 | + } |
| 482 | + |
| 483 | + expect(eventStore.size()).toEqual(500); |
| 484 | + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506); |
| 485 | + expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]])); |
| 486 | + expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]])); |
| 487 | + }); |
| 488 | + |
| 489 | + it('should still dispatch events even if the store save fails', async () => { |
| 490 | + const eventDispatcher = getMockDispatcher(); |
| 491 | + const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent; |
| 492 | + mockDispatch.mockResolvedValue({}); |
| 493 | + |
| 494 | + const eventStore = getMockAsyncCache<EventWithId>(); |
| 495 | + // Simulate failure in saving to store |
| 496 | + eventStore.set = vi.fn().mockRejectedValue(new Error('Failed to save')); |
| 497 | + |
| 498 | + const dispatchRepeater = getMockRepeater(); |
| 499 | + |
| 500 | + const processor = new BatchEventProcessor({ |
| 501 | + eventDispatcher, |
| 502 | + dispatchRepeater, |
| 503 | + batchSize: 100, |
| 504 | + eventStore, |
| 505 | + }); |
| 506 | + |
| 507 | + processor.start(); |
| 508 | + await processor.onRunning(); |
| 509 | + |
| 510 | + const events: ProcessableEvent[] = []; |
| 511 | + for(let i = 0; i < 10; i++) { |
| 512 | + const event = createImpressionEvent(`id-${i}`); |
| 513 | + events.push(event); |
| 514 | + await processor.process(event) |
| 515 | + } |
| 516 | + |
| 517 | + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0); |
| 518 | + |
| 519 | + await dispatchRepeater.execute(0); |
| 520 | + |
| 521 | + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1); |
| 522 | + expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events)); |
| 523 | + }); |
369 | 524 | });
|
370 | 525 |
|
371 | 526 | it('should dispatch events when dispatchRepeater is triggered', async () => {
|
|
0 commit comments