Skip to content

Commit

Permalink
fix: setTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
himself65 committed Nov 1, 2024
1 parent e625dcc commit 0a7b9a6
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 11 deletions.
16 changes: 13 additions & 3 deletions packages/workflow/src/workflow-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ export type ReadonlyStepMap<Data> = ReadonlyMap<

type GlobalEvent = typeof globalThis.Event;

export type Wait = () => Promise<void>;

export type ContextParams<Start, Stop, Data> = {
startEvent: StartEvent<Start>;
contextData: Data;
steps: ReadonlyStepMap<Data>;
timeout: number | null;
verbose: boolean;
wait: Wait;

queue: QueueProtocol[] | undefined;
pendingInputQueue: WorkflowEvent<unknown>[] | undefined;
Expand Down Expand Up @@ -101,6 +104,7 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
readonly #startEvent: StartEvent<Start>;
readonly #queue: QueueProtocol[] = [];
readonly #queueEventTarget = new EventTarget();
readonly #wait: Wait;

#timeout: number | null = null;
#verbose: boolean = false;
Expand Down Expand Up @@ -176,6 +180,7 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
}
this.#data = params.contextData;
this.#verbose = params.verbose ?? false;
this.#wait = params.wait;

// push start event to the queue
const [step] = this.#getStepFunction(this.#startEvent);
Expand Down Expand Up @@ -257,6 +262,10 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
return this;
}

get data(): Data {
return this.#data;
}

/**
* Stream events from the start event
*
Expand Down Expand Up @@ -335,7 +344,7 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
`Running step ${step.name} with inputs ${events}`,
);
}
const data = this.#data;
const data = this.data;
return (step as StepHandler<Data>)
.call(
null,
Expand Down Expand Up @@ -431,8 +440,8 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
}
break;
}
// fixme: use a better way to wait for next tick
await new Promise<void>((resolve) => setTimeout(resolve, 0));

await this.#wait();
}
controller.close();
},
Expand All @@ -445,6 +454,7 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
): WorkflowContext<Start, Stop, Initial> {
return new WorkflowContext({
startEvent: this.#startEvent,
wait: this.#wait,
contextData: data,
steps: this.#steps,
timeout: this.#timeout,
Expand Down
20 changes: 16 additions & 4 deletions packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
WorkflowContext,
type HandlerContext,
type StepHandler,
type Wait,
} from "./workflow-context.js";
import {
StartEvent,
Expand All @@ -28,15 +29,25 @@ export class Workflow<ContextData, Start, Stop> {
> = new Map();
#verbose: boolean = false;
#timeout: number | null = null;
// fixme: allow microtask
#nextTick: Wait = () => new Promise((resolve) => setTimeout(resolve, 0));

constructor(
params: {
verbose?: boolean;
timeout?: number | null;
wait?: Wait;
} = {},
) {
this.#verbose = params.verbose ?? false;
this.#timeout = params.timeout ?? null;
if (params.verbose) {
this.#verbose = params.verbose;
}
if (params.timeout) {
this.#timeout = params.timeout;
}
if (params.wait) {
this.#nextTick = params.wait;
}
}

addStep<
Expand Down Expand Up @@ -73,9 +84,9 @@ export class Workflow<ContextData, Start, Stop> {

run(
event: StartEvent<Start> | Start,
): never extends ContextData
): unknown extends ContextData
? WorkflowContext<Start, Stop, ContextData>
: never;
: WorkflowContext<Start, Stop, ContextData | undefined>;
run<Data extends ContextData>(
event: StartEvent<Start> | Start,
data: Data,
Expand All @@ -89,6 +100,7 @@ export class Workflow<ContextData, Start, Stop> {

return new WorkflowContext<Start, Stop, Data>({
startEvent,
wait: this.#nextTick,
contextData: data!,
steps: new Map(this.#steps),
timeout: this.#timeout,
Expand Down
12 changes: 12 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion unit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
},
"devDependencies": {
"@faker-js/faker": "^9.0.1",
"@types/react": "^18.3.12",
"@types/react-dom": "^18.3.1",
"msw": "^2.6.0",
"vitest": "^2.0.5"
},
Expand All @@ -17,6 +19,8 @@
"@llamaindex/openai": "workspace:*",
"@llamaindex/readers": "workspace:*",
"@llamaindex/workflow": "workspace:*",
"llamaindex": "workspace:*"
"llamaindex": "workspace:*",
"react": "^18.3.1",
"react-dom": "^18.3.1"
}
}
5 changes: 3 additions & 2 deletions unit/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
"outDir": "./lib",
"module": "node16",
"moduleResolution": "node16",
"target": "ESNext"
"target": "ESNext",
"jsx": "react-jsx"
},
"include": ["./**/*.ts"],
"include": ["./**/*.ts", "./**/*.tsx"],
"references": [
{
"path": "../packages/core/tsconfig.json"
Expand Down
103 changes: 103 additions & 0 deletions unit/workflow/workflow-ui.test.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import {
StartEvent,
StopEvent,
Workflow,
WorkflowEvent,
} from "@llamaindex/workflow";
import type { ReactNode } from "react";
import { describe, expect, test } from "vitest";

describe("workflow integration", () => {
type Context = {
pending: string[];
};
type Start = string;
type Stop = ReactNode;

test("nodejs", async () => {
const workflow = new Workflow<never, Start, Stop>({
wait: async () => await new Promise((resolve) => setTimeout(resolve, 0)),
});
workflow.addStep(
{
inputs: [StartEvent],
outputs: [StopEvent],
},
async (_, __) => {
await new Promise((resolve) => setTimeout(resolve, 100));
return new StopEvent("hello");
},
);

console.log("start");

const run = workflow.run("start");
await run.then((stop) => {
expect(stop.data).toBe("hello");
});
});

test("with jsx", async () => {
const workflow = new Workflow<never, Start, Stop>();
workflow.addStep(
{
inputs: [StartEvent],
outputs: [StopEvent],
},
async (_, __) => {
return new StopEvent(<div>Hey there!</div>);
},
);

const run = workflow.run("start");
const stop = await run;
expect(stop.data).toEqual(<div>Hey there!</div>);
});

test("with message channel", async () => {
const workflow = new Workflow<Context, Start, Stop>();

class AnalysisStartEvent extends WorkflowEvent<string> {}

class AnalysisStopEvent extends WorkflowEvent<string> {}

workflow.addStep(
{
inputs: [StartEvent],
outputs: [StopEvent],
},
async ({ data, sendEvent, requireEvent }) => {
data.pending.push("analyzing");
sendEvent(new AnalysisStartEvent("analysis my document"));
const event = await requireEvent(AnalysisStopEvent);
await new Promise((resolve) => setTimeout(resolve, 100));
data.pending.push("analysis complete");
return new StopEvent(event.data);
},
);

workflow.addStep(
{
inputs: [AnalysisStartEvent],
outputs: [AnalysisStopEvent],
},
async ({ data }) => {
data.pending.push("loading document");
await new Promise((resolve) => setTimeout(resolve, 100));
data.pending.push("document loaded");
return new AnalysisStopEvent("analysis complete");
},
);

const run = workflow.run("start").with({
pending: [],
});
await run;
expect(run.data.pending).toEqual([
"analyzing",
"loading document",
"document loaded",
"analysis complete",
]);
});
});
2 changes: 1 addition & 1 deletion unit/workflow/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ describe("workflow basic", () => {

expect(step1).toHaveBeenCalledTimes(1);
expect(step2).toHaveBeenCalledTimes(5);
expect(duration).toBeGreaterThan(500); // At least 5 * 100ms for step2
expect(duration).toBeGreaterThanOrEqual(500); // At least 5 * 100ms for step2
expect(duration).toBeLessThan(1000); // Less than 1 second
expect(result.data).toBe("Step 2 completed 5 times");
});
Expand Down

0 comments on commit 0a7b9a6

Please sign in to comment.