Skip to content

Commit

Permalink
feat: add worker actor
Browse files Browse the repository at this point in the history
Signed-off-by: aabidsofi19 <[email protected]>
  • Loading branch information
aabidsofi19 committed Jun 5, 2024
1 parent 3ae0b3a commit 82be540
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 2 deletions.
21 changes: 21 additions & 0 deletions src/actors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,28 @@ export {
selectValidationResults
} from './validators/dataValidator';

export * from './worker';

export {
REDUX_COMMANDS,
REDUX_EVENTS,
reduxActor,
reduxCommands,
reduxEvents,
type REXUX_ACTOR_EVENTS
} from './reduxActor';

export {
RTK_EVENTS,
rtkQueryActor,
rtkQueryActorCommands,
rtkQueryActorEvents
} from './rtkQueryActor';

export const REEE = 'xxx';

export {
DeferEvents,
XSTATE_DEBUG_EVENT,
deadLetter,
forwardToActors,
Expand Down
34 changes: 33 additions & 1 deletion src/actors/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// disbale stict no any for now for full file
/* eslint-disable @typescript-eslint/no-explicit-any */

import { AnyActorRef, AnyEventObject, enqueueActions, sendTo } from 'xstate';
import { AnyActorRef, AnyEventObject, assign, enqueueActions, sendTo } from 'xstate';
import { AnyActorSystem } from 'xstate/dist/declarations/src/system';

type ContextWithReturnAddress = { returnAddress: AnyActorRef };
Expand Down Expand Up @@ -43,3 +43,35 @@ export const reply = (eventFn: (actionArgs: any, params: any) => AnyEventObject)
sendTo(({ context }: { context: ContextWithReturnAddress }) => context.returnAddress, eventFn);

export const XSTATE_DEBUG_EVENT = 'XSTATE_DEBUG_EVENT';

type deferredEventsQueue = AnyEventObject[];

interface DeferredEventsQueueContext {
deferredEventsQueue: deferredEventsQueue;
}

interface deferActionParams {
event: AnyEventObject;
context: DeferredEventsQueueContext;
}

const defer = assign({
deferredEventsQueue: ({ context: { deferredEventsQueue }, event }: deferActionParams) => [
...deferredEventsQueue,
event
]
});

const recall = enqueueActions(({ context: { deferredEventsQueue }, enqueue }) => {
enqueue.assign({
deferredEventsQueue: []
});
for (const event of deferredEventsQueue) {
enqueue.raise(event);
}
});

export const DeferEvents = {
defer,
recall
};
2 changes: 1 addition & 1 deletion src/actors/validators/dataValidator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ export const dataValidatorMachine = setup({
type ValidationMachineSnapshot = SnapshotFrom<typeof dataValidatorMachine>;

export const selectValidationResults = (state: ValidationMachineSnapshot) =>
state.context.validationResults;
state.context?.validationResults;

export const selectIsValidating = (state: ValidationMachineSnapshot) =>
state.matches('validatingData');
Empty file added src/actors/worker/README.md
Empty file.
47 changes: 47 additions & 0 deletions src/actors/worker/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { AnyEventObject } from 'xstate';

export const WORKER_COMMANDS = {
START_ACTOR: 'START_ACTOR',
STOP_ACTOR: 'STOP_ACTOR',
SEND_EVENT: 'SEND_EVENT',
GET_STATE: 'GET_STATE'
};

export const workerCommands = {
startActor: () => ({ type: WORKER_COMMANDS.START_ACTOR }),
stopActor: () => ({ type: WORKER_COMMANDS.STOP_ACTOR }),
sendEvent: (event: AnyEventObject) => ({ type: WORKER_COMMANDS.SEND_EVENT, event }),
getState: () => ({ type: WORKER_COMMANDS.GET_STATE })
};

export interface PROXY_EVENT {
type: 'PROXY_EVENT';
data: {
event: AnyEventObject;
to: string;
};
}

export interface STATE_SNAPSHOT_EVENT {
type: 'STATE_SNAPSHOT';
data: {
snapshot: unknown;
};
}

export const workerEvents = {
proxyEvent: (event: AnyEventObject, to: string) => ({
type: 'PROXY_EVENT',
data: { event, to }
}),

stateSnapshot: (snapshot: unknown) => ({
type: 'STATE_SNAPSHOT',
data: { snapshot }
})
};

export const WORKER_EVENTS = {
STATE_SNAPSHOT: 'STATE_SNAPSHOT',
PROXY_EVENT: 'PROXY_EVENT'
};
132 changes: 132 additions & 0 deletions src/actors/worker/fromWorkerfiedActor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import {
ActorLogic,
AnyEventObject,
AnyMachineSnapshot,
EventObject,
NonReducibleUnknown,
StateValue,
matchesState
} from 'xstate';
import { AnyActorSystem } from 'xstate/dist/declarations/src/system';
import { STATE_SNAPSHOT_EVENT, WORKER_EVENTS, workerCommands } from './events';

const instanceStates = /* #__PURE__ */ new WeakMap();

type WorkerInput = NonReducibleUnknown;

type WorkerSnapshot = AnyMachineSnapshot;
type WorkerActorLogic<TEvent extends EventObject, TInput = NonReducibleUnknown> = ActorLogic<
WorkerSnapshot,
TEvent,
TInput,
AnyActorSystem,
EventObject // TEmitted
>;

interface ProxyEvent {
type: 'PROXY_EVENT';
data: {
event: AnyEventObject;
to: string;
};
}

export const fromWorkerfiedActor = (
worker: Worker
): WorkerActorLogic<EventObject, WorkerInput> => ({
config: Worker,

start: (state, actorScope) => {
const { self, system } = actorScope;
console.log('Starting fromWorkerActor [+]', state, actorScope);
worker.postMessage(workerCommands.startActor());
console.log('Worker created fromWorkerActor [-]', worker);
const workerState = {
worker,
snapshot: null
};

worker.addEventListener('message', (event) => {
const eventFromWorker = event.data as AnyEventObject;

console.log('message received from worker', eventFromWorker, event);
console.log(
'matching',
eventFromWorker.type,
WORKER_EVENTS.STATE_SNAPSHOT,
eventFromWorker.type === WORKER_EVENTS.STATE_SNAPSHOT
);
if (eventFromWorker.type == 'STATE_SNAPSHOT') {
// workerState.snapshot = event.data.snapshot;
console.log('passing snapshot', eventFromWorker.data.snapshot);
self.send(eventFromWorker);
return state;
}

if (event.type === WORKER_EVENTS.PROXY_EVENT) {
const proxyEvent = event as ProxyEvent;
if (proxyEvent.data.to === 'parent' && self._parent) {
console.log('Relaying to parent', proxyEvent.data);
self._parent.send(proxyEvent.data.event);
return state;
}

system.get(proxyEvent.data.to).send(proxyEvent.data.event);
return state;
}
});

instanceStates.set(self, workerState);
},
transition: (state, event, actorScope) => {
const { self } = actorScope;
const workerState = instanceStates.get(self);
console.log('Transitioning fromWorkerActor...', state, event, actorScope, workerState);
if (event.type === 'xstate.stop') {
console.log('Stopping fromWorkerActor...', state, event, actorScope);
workerState.worker.postMessage(workerCommands.stopActor());
workerState.worker.terminate();
return {
...state,
status: 'stopped',
error: undefined
};
}
if (event.type == WORKER_EVENTS.STATE_SNAPSHOT) {
const snapshot = (event as STATE_SNAPSHOT_EVENT).data.snapshot;
console.log('Syncing snapshot with worker', snapshot);
return {
...state,
...(snapshot || {})
};
}

workerState.worker.postMessage(workerCommands.sendEvent(event));
const nextState = {
...state
// matches: (value: StateValue) => matchesState(value, state?.value)
};
console.log('Transitioned fromWorkerActor...', workerState, nextState);
return nextState;
},
getInitialSnapshot: (_, input) => {
return {
status: 'active',
output: undefined,
error: undefined,
value: 'created',
input,
tags: [],
historyValue: undefined,
context: {},
matches: function (value: StateValue) {
const currentValue = (this as WorkerSnapshot).value;
console.log('Matching', currentValue, value);
return matchesState(currentValue, value);
}
} as unknown as AnyMachineSnapshot;
},

getPersistedSnapshot: (snapshot) => snapshot,
restoreSnapshot: (snapshot) => snapshot as WorkerSnapshot
});
2 changes: 2 additions & 0 deletions src/actors/worker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { fromWorkerfiedActor } from './fromWorkerfiedActor';
export { workerfyActor } from './workerfy';
92 changes: 92 additions & 0 deletions src/actors/worker/workerfy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { AnyActorLogic, AnyActorRef, Subscription, createActor, setup } from 'xstate';
import { WORKER_COMMANDS, workerEvents } from './events';

interface ProxyActorContext {
proxyToId: string;
}

interface ProxyActorInput {
proxyToId: string;
}

const ProxyActor = setup({
types: {
context: {} as ProxyActorContext,
input: {} as ProxyActorInput
}
}).createMachine({
id: 'proxy-actor',
initial: 'idle',
context: ({ input }) => ({
proxyToId: input.proxyToId
}),

states: {
idle: {
on: {
'*': {
actions: [
({ event, context }) => console.log('Proxying event', event, 'to', context.proxyToId),
({ event, context }) => postMessage(workerEvents.proxyEvent(event, context.proxyToId))
]
}
}
}
}
});

const syncSnapshot = (actorRef: AnyActorRef) => {
return actorRef.subscribe((snapshot) => {
console.log('Worker sending state snapshot...', snapshot.toJSON());
postMessage(workerEvents.stateSnapshot(snapshot.toJSON()));
});
};

export const workerfyActor = (actor: AnyActorLogic) => {
let actorRef: AnyActorRef | null = null;
let snapshotSubscription: Subscription | null = null;
const parentProxy = createActor(ProxyActor, {
input: {
proxyToId: 'parent'
}
}).start();

addEventListener('message', (event) => {
console.log('Worker received message', event.data);

if (event.data.type === WORKER_COMMANDS.START_ACTOR) {
actorRef = createActor(actor, {
input: event.data.input,
parent: parentProxy
});

console.log('Worker created actor', actor);
snapshotSubscription = syncSnapshot(actorRef);
actorRef.start();
console.log('Worker starting actor...');
console.log('Worker started state Subscription', snapshotSubscription);
}

if (event.data.type === WORKER_COMMANDS.STOP_ACTOR) {
snapshotSubscription?.unsubscribe && snapshotSubscription.unsubscribe();
actorRef?.stop && actorRef.stop();
}

if (event.data.type === WORKER_COMMANDS.SEND_EVENT) {
if (!actorRef) {
throw new Error('Cannot send event to uninitialized actor');
}
actorRef.send(event.data.event);
}

if (event.data.type === WORKER_COMMANDS.GET_STATE) {
if (!actorRef) {
throw new Error('Cannot get state of uninitialized actor');
}
const snapshot = actorRef.getSnapshot().toJSON();
postMessage(workerEvents.stateSnapshot(snapshot));
}
});

return actorRef;
};

0 comments on commit 82be540

Please sign in to comment.