Skip to content
Snippets Groups Projects
Commit 4d1d0100 authored by Alex Yang's avatar Alex Yang
Browse files

feat(workflow): remove `new` syntax

parent bf56fc08
No related branches found
No related tags found
No related merge requests found
import { CustomEvent, randomUUID } from "@llamaindex/env";
import {
type AnyWorkflowEventConstructor,
StartEvent,
type StartEventConstructor,
StopEvent,
type StopEventConstructor,
WorkflowEvent,
startEvent,
stopEvent,
type StartEvent,
type StartEventInstance,
type StopEventInstance,
type WorkflowEvent,
type WorkflowEventInstance,
} from "./workflow-event";
export type StepHandler<
Data = unknown,
Inputs extends [
AnyWorkflowEventConstructor | StartEventConstructor,
...(AnyWorkflowEventConstructor | StopEventConstructor)[],
] = [AnyWorkflowEventConstructor | StartEventConstructor],
Out extends (AnyWorkflowEventConstructor | StopEventConstructor)[] = [],
Inputs extends [WorkflowEvent, ...WorkflowEvent[]] = [WorkflowEvent],
Out extends WorkflowEvent[] = [],
> = (
context: HandlerContext<Data>,
...events: {
[K in keyof Inputs]: InstanceType<Inputs[K]>;
[K in keyof Inputs]: ReturnType<Inputs[K]>;
}
) => Promise<
Out extends []
? void
: {
[K in keyof Out]: InstanceType<Out[K]>;
[K in keyof Out]: ReturnType<Out[K]>;
}[number]
>;
export type ReadonlyStepMap<Data> = ReadonlyMap<
StepHandler<Data, never, never>,
{
inputs: AnyWorkflowEventConstructor[];
outputs: AnyWorkflowEventConstructor[];
inputs: WorkflowEvent[];
outputs: WorkflowEvent[];
}
>;
......@@ -41,7 +39,7 @@ type GlobalEvent = typeof globalThis.Event;
export type Wait = () => Promise<void>;
export type ContextParams<Start, Stop, Data> = {
startEvent: StartEvent<Start>;
startEvent: StartEventInstance<Start>;
contextData: Data;
steps: ReadonlyStepMap<Data>;
timeout: number | null;
......@@ -49,19 +47,16 @@ export type ContextParams<Start, Stop, Data> = {
wait: Wait;
queue: QueueProtocol[] | undefined;
pendingInputQueue: WorkflowEvent<unknown>[] | undefined;
resolved: StopEvent<Stop> | null | undefined;
pendingInputQueue: WorkflowEventInstance[] | undefined;
resolved: StopEventInstance<Stop> | null | undefined;
rejected: Error | null | undefined;
};
function flattenEvents(
acceptEventTypes: AnyWorkflowEventConstructor[],
inputEvents: WorkflowEvent<unknown>[],
): WorkflowEvent<unknown>[] {
const eventMap = new Map<
AnyWorkflowEventConstructor,
WorkflowEvent<unknown>
>();
acceptEventTypes: WorkflowEvent[],
inputEvents: WorkflowEventInstance[],
): WorkflowEventInstance[] {
const eventMap = new Map<WorkflowEvent, WorkflowEventInstance>();
for (const event of inputEvents) {
for (const acceptType of acceptEventTypes) {
......@@ -77,31 +72,29 @@ function flattenEvents(
export type HandlerContext<Data = unknown> = {
get data(): Data;
sendEvent(event: WorkflowEvent<unknown>): void;
requireEvent<T extends AnyWorkflowEventConstructor>(
event: T,
): Promise<InstanceType<T>>;
sendEvent(event: WorkflowEventInstance): void;
requireEvent<T extends WorkflowEvent>(event: T): Promise<ReturnType<T>>;
};
export type QueueProtocol =
| {
type: "event";
event: WorkflowEvent<unknown>;
event: WorkflowEventInstance;
}
| {
type: "requestEvent";
id: string;
requestEvent: AnyWorkflowEventConstructor;
requestEvent: WorkflowEvent;
};
export class WorkflowContext<Start = string, Stop = string, Data = unknown>
implements
AsyncIterable<WorkflowEvent<unknown>, unknown, void>,
Promise<StopEvent<Stop>>
AsyncIterable<WorkflowEventInstance, unknown, void>,
Promise<StopEventInstance<Stop>>
{
readonly #steps: ReadonlyStepMap<Data>;
readonly #startEvent: StartEvent<Start>;
readonly #startEvent: ReturnType<StartEvent>;
readonly #queue: QueueProtocol[] = [];
readonly #queueEventTarget = new EventTarget();
readonly #wait: Wait;
......@@ -111,32 +104,20 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
#data: Data;
#stepCache: WeakMap<
WorkflowEvent<unknown>,
WorkflowEventInstance,
[
step: Set<StepHandler<Data, never, never>>,
stepInputs: WeakMap<
StepHandler<Data, never, never>,
AnyWorkflowEventConstructor[]
>,
stepOutputs: WeakMap<
StepHandler<Data, never, never>,
AnyWorkflowEventConstructor[]
>,
stepInputs: WeakMap<StepHandler<Data, never, never>, WorkflowEvent[]>,
stepOutputs: WeakMap<StepHandler<Data, never, never>, WorkflowEvent[]>,
]
> = new Map();
#getStepFunction(
event: WorkflowEvent<unknown>,
event: WorkflowEventInstance,
): [
step: Set<StepHandler<Data, never, never>>,
stepInputs: WeakMap<
StepHandler<Data, never, never>,
AnyWorkflowEventConstructor[]
>,
stepOutputs: WeakMap<
StepHandler<Data, never, never>,
AnyWorkflowEventConstructor[]
>,
stepInputs: WeakMap<StepHandler<Data, never, never>, WorkflowEvent[]>,
stepOutputs: WeakMap<StepHandler<Data, never, never>, WorkflowEvent[]>,
] {
if (this.#stepCache.has(event)) {
return this.#stepCache.get(event)!;
......@@ -144,22 +125,16 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
const set = new Set<StepHandler<Data, never, never>>();
const stepInputs = new WeakMap<
StepHandler<Data, never, never>,
AnyWorkflowEventConstructor[]
WorkflowEvent[]
>();
const stepOutputs = new WeakMap<
StepHandler<Data, never, never>,
AnyWorkflowEventConstructor[]
WorkflowEvent[]
>();
const res: [
step: Set<StepHandler<Data, never, never>>,
stepInputs: WeakMap<
StepHandler<Data, never, never>,
AnyWorkflowEventConstructor[]
>,
stepOutputs: WeakMap<
StepHandler<Data, never, never>,
AnyWorkflowEventConstructor[]
>,
stepInputs: WeakMap<StepHandler<Data, never, never>, WorkflowEvent[]>,
stepOutputs: WeakMap<StepHandler<Data, never, never>, WorkflowEvent[]>,
] = [set, stepInputs, stepOutputs];
this.#stepCache.set(event, res);
for (const [step, { inputs, outputs }] of this.#steps) {
......@@ -208,30 +183,30 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
}
// make sure it will only be called once
#iterator: AsyncIterableIterator<WorkflowEvent<unknown>> | null = null;
#iterator: AsyncIterableIterator<WorkflowEventInstance> | null = null;
#signal: AbortSignal | null = null;
get #iteratorSingleton(): AsyncIterableIterator<WorkflowEvent<unknown>> {
get #iteratorSingleton(): AsyncIterableIterator<WorkflowEventInstance> {
if (this.#iterator === null) {
this.#iterator = this.#createStreamEvents();
}
return this.#iterator;
}
[Symbol.asyncIterator](): AsyncIterableIterator<WorkflowEvent<unknown>> {
[Symbol.asyncIterator](): AsyncIterableIterator<WorkflowEventInstance> {
return this.#iteratorSingleton;
}
#sendEvent = (event: WorkflowEvent<unknown>): void => {
#sendEvent = (event: WorkflowEventInstance): void => {
this.#queue.push({
type: "event",
event,
});
};
#requireEvent = async <T extends AnyWorkflowEventConstructor>(
#requireEvent = async <T extends WorkflowEvent>(
event: T,
): Promise<InstanceType<T>> => {
): Promise<ReturnType<T>> => {
const requestId = randomUUID();
this.#queue.push({
type: "requestEvent",
......@@ -252,7 +227,7 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
});
};
#pendingInputQueue: WorkflowEvent<unknown>[] = [];
#pendingInputQueue: WorkflowEventInstance[] = [];
// if strict mode is enabled, it will throw an error if there's input or output events are not expected
#strict = false;
......@@ -273,11 +248,11 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
* if you want stop immediately once reach a StopEvent, you should handle it in the other side.
* @private
*/
#createStreamEvents(): AsyncIterableIterator<WorkflowEvent<unknown>> {
const isPendingEvents = new WeakSet<WorkflowEvent<unknown>>();
const pendingTasks = new Set<Promise<WorkflowEvent<unknown> | void>>();
const enqueuedEvents = new Set<WorkflowEvent<unknown>>();
const stream = new ReadableStream<WorkflowEvent<unknown>>({
#createStreamEvents(): AsyncIterableIterator<WorkflowEventInstance> {
const isPendingEvents = new WeakSet<WorkflowEventInstance>();
const pendingTasks = new Set<Promise<WorkflowEventInstance | void>>();
const enqueuedEvents = new Set<WorkflowEventInstance>();
const stream = new ReadableStream<WorkflowEventInstance>({
start: async (controller) => {
while (true) {
const eventProtocol = this.#queue.shift();
......@@ -325,15 +300,15 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
}
const [steps, inputsMap, outputsMap] =
this.#getStepFunction(event);
const nextEventPromises: Promise<WorkflowEvent<unknown> | void>[] =
const nextEventPromises: Promise<WorkflowEventInstance | void>[] =
[...steps]
.map((step) => {
const inputs = [...(inputsMap.get(step) ?? [])];
const acceptableInputs: WorkflowEvent<unknown>[] =
const acceptableInputs: WorkflowEventInstance[] =
this.#pendingInputQueue.filter((event) =>
inputs.some((input) => event instanceof input),
);
const events: WorkflowEvent<unknown>[] = flattenEvents(
const events: WorkflowEventInstance[] = flattenEvents(
inputs,
[event, ...acceptableInputs],
);
......@@ -380,16 +355,12 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
},
// @ts-expect-error IDK why
...events.sort((a, b) => {
const aIndex = inputs.indexOf(
a.constructor as AnyWorkflowEventConstructor,
);
const bIndex = inputs.indexOf(
b.constructor as AnyWorkflowEventConstructor,
);
const aIndex = inputs.findIndex((i) => i.same(a));
const bIndex = inputs.findIndex((i) => i.same(b));
return aIndex - bIndex;
}),
)
.then((nextEvent: void | WorkflowEvent<unknown>) => {
.then((nextEvent: void | WorkflowEventInstance) => {
if (nextEvent === undefined) {
return;
}
......@@ -415,7 +386,7 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
);
}
}
if (!(nextEvent instanceof StopEvent)) {
if (!stopEvent.same(nextEvent)) {
this.#pendingInputQueue.unshift(nextEvent);
this.#sendEvent(nextEvent);
}
......@@ -506,12 +477,12 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
// PromiseLike implementation, this is following the Promise/A+ spec
// It will consume the iterator and resolve the promise once it reaches the StopEvent
// If you want to customize the behavior, you can use the async iterator directly
#resolved: StopEvent<Stop> | null = null;
#resolved: StopEventInstance<Stop> | null = null;
#rejected: Error | null = null;
async then<TResult1, TResult2 = never>(
onfulfilled?:
| ((value: StopEvent<Stop>) => TResult1 | PromiseLike<TResult1>)
| ((value: StopEventInstance<Stop>) => TResult1 | PromiseLike<TResult1>)
| null
| undefined,
onrejected?:
......@@ -545,12 +516,12 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
if (this.#rejected !== null) {
return onrejected?.(this.#rejected);
}
if (event instanceof StartEvent) {
if (startEvent.same<Start>(event)) {
if (this.#verbose) {
console.log(`Starting workflow with event ${event}`);
}
}
if (event instanceof StopEvent) {
if (stopEvent.same<Stop>(event)) {
if (this.#verbose && this.#pendingInputQueue.length > 0) {
// fixme: #pendingInputQueue might should be cleanup correctly?
}
......@@ -609,16 +580,12 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
const jsonString = JSON.stringify(state, (_, value) => {
// If value is an instance of a class, serialize only its properties
if (value instanceof WorkflowEvent) {
return { data: value.data, constructor: value.constructor.name };
if ("type" in value && "data" in value) {
return { data: value.data, type: value.type };
}
// value is Subtype of WorkflowEvent
if (
typeof value === "object" &&
value !== null &&
value?.prototype instanceof WorkflowEvent
) {
return { constructor: value.prototype.constructor.name };
if (typeof value === "object" && value !== null && "type" in value) {
return { type: value.type };
}
return value;
});
......
export class WorkflowEvent<Data> {
displayName: string;
data: Data;
constructor(data: Data) {
this.data = data;
this.displayName = this.constructor.name;
}
toString() {
return this.displayName;
}
static or<
A extends AnyWorkflowEventConstructor,
B extends AnyWorkflowEventConstructor,
>(AEvent: A, BEvent: B): A | B {
function OrEvent() {
throw new Error("Cannot instantiate OrEvent");
export type WorkflowEventInstance<
Data = unknown,
Type extends string = string,
> = {
type: Type;
__data: Data;
};
export type WorkflowEvent<Type extends string = string> = {
<Data>(data: Data): WorkflowEventInstance<Data, Type>;
same: <Data>(
instance: unknown,
) => instance is WorkflowEventInstance<Data, Type>;
type: Type;
};
export const workflowEvent = <Type extends string>(
eventType: Type,
): WorkflowEvent<Type> => {
const event = <Data>(data: Data): WorkflowEventInstance<Data, Type> => {
return {
type: eventType,
__data: data,
};
};
event.same = <Data>(
instance: unknown,
): instance is WorkflowEventInstance<Data, Type> => {
if (typeof instance !== "object" || instance === null) {
return false;
}
OrEvent.prototype = Object.create(AEvent.prototype);
Object.getOwnPropertyNames(BEvent.prototype).forEach((property) => {
if (!(property in OrEvent.prototype)) {
Object.defineProperty(
OrEvent.prototype,
property,
Object.getOwnPropertyDescriptor(BEvent.prototype, property)!,
);
}
});
OrEvent.prototype.constructor = OrEvent;
Object.defineProperty(OrEvent, Symbol.hasInstance, {
value: function (instance: unknown) {
return instance instanceof AEvent || instance instanceof BEvent;
},
});
return OrEvent as unknown as A | B;
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type AnyWorkflowEventConstructor = new (data: any) => WorkflowEvent<any>;
export type StartEventConstructor<T = string> = new (data: T) => StartEvent<T>;
export type StopEventConstructor<T = string> = new (data: T) => StopEvent<T>;
if (!("__data" in instance)) {
return false;
}
if (!("type" in instance)) {
return false;
}
return instance.type === eventType;
};
event.type = eventType;
return event;
};
// These are special events that are used to control the workflow
export class StartEvent<T = string> extends WorkflowEvent<T> {
constructor(data: T) {
super(data);
}
}
export type StartEvent = WorkflowEvent<"start">;
export type StartEventInstance<D> = WorkflowEventInstance<D, "start">;
export const startEvent = workflowEvent("start");
export class StopEvent<T = string> extends WorkflowEvent<T> {
constructor(data: T) {
super(data);
}
}
export type StopEvent = WorkflowEvent<"stop">;
export type StopEventInstance<D> = WorkflowEventInstance<D, "stop">;
export const stopEvent = workflowEvent("stop");
......@@ -6,16 +6,17 @@ import {
type Wait,
} from "./workflow-context.js";
import {
StartEvent,
StopEvent,
type AnyWorkflowEventConstructor,
type StartEventConstructor,
type StopEventConstructor,
startEvent,
stopEvent,
type StartEvent,
type StartEventInstance,
type StopEvent,
type WorkflowEvent,
} from "./workflow-event.js";
export type StepParameters<
In extends AnyWorkflowEventConstructor[],
Out extends AnyWorkflowEventConstructor[],
In extends WorkflowEvent[],
Out extends WorkflowEvent[],
> = {
inputs: In;
outputs: Out;
......@@ -25,8 +26,8 @@ export class Workflow<ContextData, Start, Stop> {
#steps: Map<
StepHandler<ContextData, never, never>,
{
inputs: AnyWorkflowEventConstructor[];
outputs: AnyWorkflowEventConstructor[];
inputs: WorkflowEvent[];
outputs: WorkflowEvent[];
}
> = new Map();
#verbose: boolean = false;
......@@ -53,23 +54,20 @@ export class Workflow<ContextData, Start, Stop> {
}
addStep<
const In extends [
AnyWorkflowEventConstructor | StartEventConstructor,
...(AnyWorkflowEventConstructor | StopEventConstructor)[],
],
const Out extends (AnyWorkflowEventConstructor | StopEventConstructor)[],
const In extends [WorkflowEvent | StartEvent, ...WorkflowEvent[]],
const Out extends (WorkflowEvent | StopEvent)[],
>(
parameters: StepParameters<In, Out>,
stepFn: (
context: HandlerContext<ContextData>,
...events: {
[K in keyof In]: InstanceType<In[K]>;
[K in keyof In]: ReturnType<In[K]>;
}
) => Promise<
Out extends []
? void
: {
[K in keyof Out]: InstanceType<Out[K]>;
[K in keyof Out]: ReturnType<Out[K]>;
}[number]
>,
): this {
......@@ -88,23 +86,20 @@ export class Workflow<ContextData, Start, Stop> {
}
run(
event: StartEvent<Start> | Start,
event: StartEventInstance<Start> | Start,
): unknown extends ContextData
? WorkflowContext<Start, Stop, ContextData>
: WorkflowContext<Start, Stop, ContextData | undefined>;
run<Data extends ContextData>(
event: StartEvent<Start> | Start,
event: StartEventInstance<Start> | Start,
data: Data,
): WorkflowContext<Start, Stop, Data>;
run<Data extends ContextData>(
event: StartEvent<Start> | Start,
event: StartEventInstance<Start> | Start,
data?: Data,
): WorkflowContext<Start, Stop, Data> {
const startEvent: StartEvent<Start> =
event instanceof StartEvent ? event : new StartEvent(event);
return new WorkflowContext<Start, Stop, Data>({
startEvent,
startEvent: startEvent.same<Start>(event) ? event : startEvent(event),
wait: this.#nextTick,
contextData: data!,
steps: new Map(this.#steps),
......@@ -122,7 +117,7 @@ export class Workflow<ContextData, Start, Stop> {
const state = JSON.parse(jsonString);
const reconstructedStartEvent = new StartEvent<Start>(state.startEvent);
const reconstructedStartEvent = startEvent(state.startEvent);
const AllEvents = [...this.#steps]
.map(([, { inputs, outputs }]) => [...inputs, ...(outputs ?? [])])
.flat();
......@@ -159,7 +154,7 @@ export class Workflow<ContextData, Start, Stop> {
}
return {
type: "event",
event: new EventType(event.data),
event: EventType(event.__data),
};
}
}
......@@ -174,7 +169,7 @@ export class Workflow<ContextData, Start, Stop> {
if (!EventType) {
throw new TypeError(`Event type not found: ${event.constructor}`);
}
return new EventType(event.data);
return EventType(event.data);
},
);
......@@ -187,7 +182,7 @@ export class Workflow<ContextData, Start, Stop> {
verbose: state.verbose,
queue: reconstructedQueue,
pendingInputQueue: reconstructedPendingInputQueue,
resolved: state.resolved ? new StopEvent<Stop>(state.resolved) : null,
resolved: state.resolved ? stopEvent(state.resolved) : null,
rejected: state.rejected ? new Error(state.rejected) : null,
});
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment