From fadc8b8ea0b5c1170d1e4f1e2838761d7c8b094a Mon Sep 17 00:00:00 2001 From: Alex Yang <himself65@outlook.com> Date: Wed, 13 Nov 2024 01:15:50 -0800 Subject: [PATCH] feat: recoverable data with error handling (#1476) --- .changeset/cool-cows-wonder.md | 5 ++ packages/workflow/src/workflow-context.ts | 3 + unit/workflow/workflow.test.ts | 80 +++++++++++++++++++++++ 3 files changed, 88 insertions(+) create mode 100644 .changeset/cool-cows-wonder.md diff --git a/.changeset/cool-cows-wonder.md b/.changeset/cool-cows-wonder.md new file mode 100644 index 000000000..5bfd72fda --- /dev/null +++ b/.changeset/cool-cows-wonder.md @@ -0,0 +1,5 @@ +--- +"@llamaindex/workflow": patch +--- + +feat: recoverable context with error handling diff --git a/packages/workflow/src/workflow-context.ts b/packages/workflow/src/workflow-context.ts index bd8bdf2ad..624916f8b 100644 --- a/packages/workflow/src/workflow-context.ts +++ b/packages/workflow/src/workflow-context.ts @@ -453,6 +453,9 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown> }), ) .catch((err) => { + // when the step raise an error, should go back to the previous step + this.#sendEvent(event); + isPendingEvents.add(event); controller.error(err); }); } diff --git a/unit/workflow/workflow.test.ts b/unit/workflow/workflow.test.ts index 4cc1d0787..4d2104ef4 100644 --- a/unit/workflow/workflow.test.ts +++ b/unit/workflow/workflow.test.ts @@ -884,3 +884,83 @@ describe("snapshot", async () => { expect(fn).toHaveBeenCalledTimes(1); }); }); + +describe("error", () => { + test("error in handler", async () => { + const myFlow = new Workflow<boolean, string, string>({ verbose: true }); + myFlow.addStep( + { + inputs: [StartEvent<string>], + outputs: [StopEvent<string>], + }, + async ({ data }) => { + if (!data) { + throw new Error("Something went wrong"); + } else { + return new StopEvent(`Hello ${data}!`); + } + }, + ); + await expect(myFlow.run("world")).rejects.toThrow("Something went wrong"); + { + const context = myFlow.run("world"); + try { + for await (const _ of context) { + // do nothing + } + } catch (error) { + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toBe("Something went wrong"); + const snapshot = context.snapshot(); + const newContext = myFlow.recover(snapshot).with(true); + expect((await newContext).data).toBe("Hello true!"); + } + } + }); + + test("recover in the middle of workflow", async () => { + const myFlow = new Workflow<string | undefined, string, string>({ + verbose: true, + }); + + class AEvent extends WorkflowEvent<string> {} + + myFlow.addStep( + { + inputs: [StartEvent<string>], + outputs: [AEvent], + }, + async ({ data }) => { + if (data !== undefined) { + throw new Error("Something went wrong"); + } + return new AEvent("world"); + }, + ); + myFlow.addStep( + { + inputs: [AEvent], + outputs: [StopEvent], + }, + async ({ data }, ev) => { + if (data === undefined) { + throw new Error("Something went wrong"); + } + return new StopEvent(`Hello, ${data}!`); + }, + ); + // no context, so will throw error + const context = myFlow.run("world"); + try { + for await (const _ of context) { + // do nothing + } + } catch (error) { + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toBe("Something went wrong"); + const snapshot = context.snapshot(); + const newContext = myFlow.recover(snapshot).with("Recovered Data"); + expect((await newContext).data).toBe("Hello, Recovered Data!"); + } + }); +}); -- GitLab