Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add signals middleware #1220

Merged
merged 25 commits into from
Feb 3, 2025
Prev Previous commit
Next Next commit
add plugin support
silesky committed Jan 29, 2025
commit d0bebb8222b22795a99139b4db7275246aa26947
90 changes: 77 additions & 13 deletions packages/signals/signals/src/core/emitter/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Emitter } from '@segment/analytics-generic-utils'
import { logger } from '../../lib/logger'
import { Signal } from '@segment/analytics-signals-runtime'
import { SignalGlobalSettings } from '../signals'

export interface EmitSignal {
emit: (signal: Signal) => void
@@ -17,31 +17,95 @@ const logSignal = (signal: Signal) => {
)
}

export type LoadContext = {
settings: SignalGlobalSettings
writeKey: string
}

interface SignalPlugin {
/**
* Wait for this to complete before emitting signals
* Like a 'before' plugin, blocks the event pipeline
*/
load(ctx: LoadContext): Promise<void> | void
process(signal: Signal): Signal | null
}

export class SignalEmitter implements EmitSignal {
private emitter = new Emitter<{ add: [Signal] }>()
private listeners = new Set<(signal: Signal) => void>()
private middlewares: SignalPlugin[] = []
private initialized = false // Controls buffering vs eager signal processing
private signalQueue: Signal[] = [] // Buffer for signals emitted before initialization

// Add a plugin
addPlugin(plugin: SignalPlugin): void {
this.middlewares.push(plugin)
}

emit(signal: Signal) {
// Emit a signal
emit(signal: Signal): void {
logSignal(signal)
this.emitter.emit('add', signal)
if (!this.initialized) {
// Buffer the signal if not initialized
this.signalQueue.push(signal)
return
}

// Process and notify listeners
this.processAndEmit(signal)
}

subscribe(listener: (signal: Signal) => void) {
// Prevent duplicate subscriptions
// Process and emit a signal
private processAndEmit(signal: Signal): void {
// Apply plugin; drop the signal if any plugin returns null
for (const plugin of this.middlewares) {
const processed = plugin.process(signal)
if (processed === null) return // Drop the signal
}

// Notify listeners
for (const listener of this.listeners) {
listener(signal)
}
}

// Initialize the emitter, load plugin, flush the buffer, and enable eager processing
async initialize(settings: LoadContext): Promise<void> {
if (this.initialized) return

// Wait for all plugin to complete their load method
await Promise.all(this.middlewares.map((mw) => mw.load(settings)))

this.initialized = true

// Process and emit all buffered signals
while (this.signalQueue.length > 0) {
const signal = this.signalQueue.shift() as Signal
this.processAndEmit(signal)
}
}

// Subscribe a listener to signals
subscribe(listener: (signal: Signal) => void): void {
if (!this.listeners.has(listener)) {
logger.debug('subscribed')
this.listeners.add(listener)
}
this.emitter.on('add', listener)
}

unsubscribe(listener: (signal: Signal) => void) {
this.listeners.delete(listener)
logger.debug('unsubscribed')
this.emitter.off('add', listener)
// Unsubscribe a listener
unsubscribe(listener: (signal: Signal) => void): void {
if (this.listeners.delete(listener)) {
logger.debug('unsubscribed')
}
}

once(listener: (signal: Signal) => void) {
this.emitter.once('add', listener)
// Subscribe a listener to a single signal
once(listener: (signal: Signal) => void): void {
const wrappedListener = (signal: Signal) => {
this.unsubscribe(wrappedListener)
listener(signal)
}
this.subscribe(wrappedListener)
}
}
Original file line number Diff line number Diff line change
@@ -6,9 +6,10 @@ describe('OnChangeGenerator', () => {
let onChangeGenerator: OnChangeGenerator
let emitter: SignalEmitter
let unregister: () => void
beforeEach(() => {
beforeEach(async () => {
onChangeGenerator = new OnChangeGenerator()
emitter = new SignalEmitter()
await emitter.initialize({} as any)
})

afterEach(() => {
17 changes: 16 additions & 1 deletion packages/signals/signals/src/core/signals/signals.ts
Original file line number Diff line number Diff line change
@@ -39,13 +39,21 @@ export class Signals implements ISignals {
private globalSettings: SignalGlobalSettings
constructor(settingsConfig: SignalsSettingsConfig = {}) {
this.globalSettings = new SignalGlobalSettings(settingsConfig)
/**
* TODO: add an event queue inside the signal emitter
*/
this.signalEmitter = new SignalEmitter()
this.signalsClient = new SignalsIngestClient(
this.globalSettings.ingestClient
)

this.buffer = getSignalBuffer(this.globalSettings.signalBuffer)

/**
* TODO: support middleweware chain should be able to modify the signal before it is added to the buffer. This middleware chain should be something that will wait for cdn settings before it dispatches
* (e.g, you can implement a disallow list that waits for the instance and then drops the signal)
* It can be set at the emitter level, so that no signals actually get emitted until the middleware has initialized.
*/
this.signalEmitter.subscribe((signal) => {
void this.signalsClient.send(signal)
void this.buffer.add(signal)
@@ -86,6 +94,7 @@ export class Signals implements ISignals {
*/
async start(analytics: AnyAnalytics): Promise<void> {
const analyticsService = new AnalyticsService(analytics)

analyticsService.instance.on('reset', () => {
this.clearStorage()
})
@@ -101,6 +110,12 @@ export class Signals implements ISignals {
.autoInstrumentationSettings?.sampleRate ?? 0,
})

// promise will resolve once all the middleware has been initialized.
void this.signalEmitter.initialize({
settings: this.globalSettings,
writeKey: analyticsService.instance.settings.writeKey,
})

const sandbox = new Sandbox(
new SandboxSettings(this.globalSettings.sandbox)
)
@@ -110,8 +125,8 @@ export class Signals implements ISignals {
sandbox
)

// flush pre start buffer and then actually process signals
void this.flushPreStartBuffer(processor)

this.signalEmitter.subscribe(async (signal) => {
void processor.process(signal, await this.buffer.getAll())
})