Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add signals middleware #1220

Merged
merged 25 commits into from
Feb 3, 2025
Prev Previous commit
Next Next commit
wip
silesky committed Jan 30, 2025
commit a2c5825bb56b41d3b07b0522eb54176cb6c28187
38 changes: 16 additions & 22 deletions packages/signals/signals/src/core/emitter/index.ts
Original file line number Diff line number Diff line change
@@ -17,31 +17,26 @@ const logSignal = (signal: Signal) => {
)
}

export type LoadContext = {
export interface SignalsMiddlewareContext {
settings: SignalGlobalSettings
writeKey: string
}

interface SignalPlugin {
export interface SignalsMiddleware {
/**
* Wait for this to complete before emitting signals
* Like a 'before' plugin, blocks the event pipeline
* Wait for .load to complete before emitting signals
* This blocks the signal emitter until all plugins are loaded.
*/
load(ctx: LoadContext): Promise<void> | void
load(ctx: SignalsMiddlewareContext): Promise<void> | void
process(signal: Signal): Signal | null
}

export class SignalEmitter implements EmitSignal {
private listeners = new Set<(signal: Signal) => void>()
private middlewares: SignalPlugin[] = []
private middlewares: SignalsMiddleware[] = []
private initialized = false // Controls buffering vs eager signal processing
private signalQueue: Signal[] = [] // Buffer for signals emitted before initialization

// Add a plugin
addPlugin(plugin: SignalPlugin): void {
this.middlewares.push(plugin)
}

// Emit a signal
emit(signal: Signal): void {
logSignal(signal)
@@ -55,6 +50,11 @@ export class SignalEmitter implements EmitSignal {
this.processAndEmit(signal)
}

// Register custom signals middleware, to drop signals or modify them before they are emitted.
register(middleware: SignalsMiddleware): void {
this.middlewares.push(middleware)
}

// Process and emit a signal
private processAndEmit(signal: Signal): void {
// Apply plugin; drop the signal if any plugin returns null
@@ -70,7 +70,7 @@ export class SignalEmitter implements EmitSignal {
}

// Initialize the emitter, load plugin, flush the buffer, and enable eager processing
async initialize(settings: LoadContext): Promise<void> {
async initialize(settings: SignalsMiddlewareContext): Promise<void> {
if (this.initialized) return

// Wait for all plugin to complete their load method
@@ -85,7 +85,10 @@ export class SignalEmitter implements EmitSignal {
}
}

// Subscribe a listener to signals -- equivilant to a destination plugin?
/**
* Listen to signals emitted, once they have travelled through the plugin pipeline.
* This is equivalent to a destination plugin.
*/
subscribe(listener: (signal: Signal) => void): void {
if (!this.listeners.has(listener)) {
logger.debug('subscribed')
@@ -99,13 +102,4 @@ export class SignalEmitter implements EmitSignal {
logger.debug('unsubscribed')
}
}

// Subscribe a listener to a single signal
once(listener: (signal: Signal) => void): void {
const wrappedListener = (signal: Signal) => {
this.unsubscribe(wrappedListener)
listener(signal)
}
this.subscribe(wrappedListener)
}
}