Skip to content

Commit 75f0cba

Browse files
Enhancement clear queue flush (#1071)
* feat: expose methods to clear queue and get event count * feat: added test cases for clear queue and get count * refactor: refactored method names and some logic * fix: fixed test cases * feat: changed the logic to touch minimum file changes
1 parent 9bd10c1 commit 75f0cba

File tree

4 files changed

+122
-1
lines changed

4 files changed

+122
-1
lines changed

packages/core/src/analytics.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ import {
7171
SegmentError,
7272
translateHTTPError,
7373
} from './errors';
74+
import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin';
7475

7576
type OnPluginAddedCallback = (plugin: Plugin) => void;
7677

@@ -982,4 +983,46 @@ export class SegmentClient {
982983
userId: userInfo.userId,
983984
};
984985
};
986+
/* Method for clearing flush queue */
987+
clear() {
988+
const plugins = this.getPlugins();
989+
990+
plugins.forEach(async (plugin) => {
991+
if (plugin instanceof SegmentDestination) {
992+
const timelinePlugins = plugin.timeline?.plugins?.after ?? [];
993+
994+
for (const subPlugin of timelinePlugins) {
995+
if (subPlugin instanceof QueueFlushingPlugin) {
996+
await subPlugin.dequeueEvents();
997+
}
998+
}
999+
}
1000+
});
1001+
1002+
this.flushPolicyExecuter.reset();
1003+
}
1004+
1005+
/**
1006+
* Method to get count of events in flush queue.
1007+
*/
1008+
async pendingEvents() {
1009+
const plugins = this.getPlugins();
1010+
let totalEventsCount = 0;
1011+
1012+
for (const plugin of plugins) {
1013+
// We're looking inside SegmentDestination's `after` plugins
1014+
if (plugin instanceof SegmentDestination) {
1015+
const timelinePlugins = plugin.timeline?.plugins?.after ?? [];
1016+
1017+
for (const subPlugin of timelinePlugins) {
1018+
if (subPlugin instanceof QueueFlushingPlugin) {
1019+
const eventsCount = await subPlugin.pendingEvents();
1020+
totalEventsCount += eventsCount;
1021+
}
1022+
}
1023+
}
1024+
}
1025+
1026+
return totalEventsCount;
1027+
}
9851028
}

packages/core/src/plugins/QueueFlushingPlugin.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,20 @@ export class QueueFlushingPlugin extends UtilityPlugin {
130130
return { events: filteredEvents };
131131
});
132132
}
133+
/**
134+
* Clear all events from the queue
135+
*/
136+
async dequeueEvents() {
137+
await this.queueStore?.dispatch(() => {
138+
return { events: [] };
139+
});
140+
}
141+
142+
/**
143+
* * Returns the count of items in the queue
144+
*/
145+
async pendingEvents() {
146+
const events = (await this.queueStore?.getState(true))?.events ?? [];
147+
return events.length;
148+
}
133149
}

packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,67 @@ describe('QueueFlushingPlugin', () => {
7777
// @ts-ignore
7878
expect(queuePlugin.queueStore?.getState().events).toHaveLength(0);
7979
});
80+
it('should clear all events from the queue', async () => {
81+
const onFlush = jest.fn().mockResolvedValue(undefined);
82+
const queuePlugin = setupQueuePlugin(onFlush, 10);
83+
const event1: SegmentEvent = {
84+
type: EventType.TrackEvent,
85+
event: 'test1',
86+
properties: {
87+
test: 'test1',
88+
},
89+
};
90+
const event2: SegmentEvent = {
91+
type: EventType.TrackEvent,
92+
event: 'test2',
93+
properties: {
94+
test: 'test2',
95+
},
96+
};
97+
await queuePlugin.execute(event1);
98+
await queuePlugin.execute(event2);
99+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
100+
// @ts-ignore
101+
expect(queuePlugin.queueStore?.getState().events).toHaveLength(2);
102+
await queuePlugin.dequeueEvents();
103+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
104+
// @ts-ignore
105+
expect(queuePlugin.queueStore?.getState().events).toHaveLength(0);
106+
});
107+
it('should return the count of items in the queue', async () => {
108+
const onFlush = jest.fn().mockResolvedValue(undefined);
109+
const queuePlugin = setupQueuePlugin(onFlush, 10);
110+
111+
const event1: SegmentEvent = {
112+
type: EventType.TrackEvent,
113+
event: 'test1',
114+
properties: {
115+
test: 'test1',
116+
},
117+
};
118+
119+
const event2: SegmentEvent = {
120+
type: EventType.TrackEvent,
121+
event: 'test2',
122+
properties: {
123+
test: 'test2',
124+
},
125+
};
126+
127+
await queuePlugin.execute(event1);
128+
await queuePlugin.execute(event2);
129+
130+
let eventsCount = await queuePlugin.pendingEvents();
131+
expect(eventsCount).toBe(2);
132+
133+
await queuePlugin.dequeue(event1);
134+
135+
eventsCount = await queuePlugin.pendingEvents();
136+
expect(eventsCount).toBe(1);
137+
138+
await queuePlugin.dequeueEvents();
139+
140+
eventsCount = await queuePlugin.pendingEvents();
141+
expect(eventsCount).toBe(0);
142+
});
80143
});

packages/core/src/storage/types.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ export interface Storage {
9292
readonly pendingEvents: Watchable<SegmentEvent[]> &
9393
Settable<SegmentEvent[]> &
9494
Queue<SegmentEvent, SegmentEvent[]>;
95-
9695
readonly enabled: Watchable<boolean> & Settable<boolean>;
9796
}
9897
export type DeepLinkData = {

0 commit comments

Comments
 (0)