Skip to content

Commit

Permalink
Merge branch 'main' into brace/run-ci-prs
Browse files Browse the repository at this point in the history
  • Loading branch information
bracesproul authored Jan 17, 2024
2 parents 92682c4 + 9e814f2 commit c14eb6b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 29 deletions.
2 changes: 1 addition & 1 deletion langgraph/src/graph/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export class Graph<

const nodes: Record<
string,
ChannelInvoke<RunInput, RunOutput> | ChannelBatch<RunInput, RunOutput>
ChannelInvoke<RunInput, RunOutput> | ChannelBatch
> = {};
for (const [key, node] of Object.entries(this.nodes)) {
nodes[key] = Channel.subscribeTo(`${key}:inbox`)
Expand Down
55 changes: 28 additions & 27 deletions langgraph/src/pregel/read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ interface ChannelInvokeArgs<RunInput, RunOutput>
when?: (args: any) => boolean;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ChannelInvokeInputType = any;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ChannelInvokeOutputType = any;

export class ChannelInvoke<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
RunInput = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
RunOutput = any
RunInput = ChannelInvokeInputType,
RunOutput = ChannelInvokeOutputType
> extends RunnableBinding<RunInput, RunOutput, RunnableConfig> {
lc_graph_name = "ChannelInvoke";

Expand Down Expand Up @@ -147,30 +151,33 @@ export class ChannelInvoke<
}
}

interface ChannelBatchArgs<RunInput, RunOutput> {
interface ChannelBatchArgs {
channel: string;
key?: string;
bound?: Runnable<RunInput, RunOutput, RunnableConfig>;
bound?: Runnable;
}

export class ChannelBatch<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
RunInput = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
RunOutput = any
> extends RunnableEach<RunInput, RunOutput, RunnableConfig> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ChannelBatchInputType = any;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ChannelBatchOutputType = any;

export class ChannelBatch extends RunnableEach<
ChannelBatchInputType,
ChannelBatchOutputType,
RunnableConfig
> {
lc_graph_name = "ChannelBatch";

channel: string;

key?: string;

constructor(fields: ChannelBatchArgs<RunInput, RunOutput>) {
constructor(fields: ChannelBatchArgs) {
super({
...fields,
bound:
fields.bound ??
(defaultRunnableBound as unknown as Runnable<RunInput, RunOutput>),
bound: fields.bound ?? defaultRunnableBound,
});

this.channel = fields.channel;
Expand Down Expand Up @@ -199,28 +206,22 @@ export class ChannelBatch<
return new ChannelBatch({
channel: this.channel,
key: this.key,
bound: this.bound.pipe(
joiner as unknown as RunnableLike<
RunOutput,
Record<string, unknown> & { [x: string]: unknown }
>
),
bound: this.bound.pipe(joiner),
});
}
}

pipe<NewRunOutput>(
coerceable: RunnableLike<RunOutput, NewRunOutput>
): ChannelBatch<RunInput, NewRunOutput> {
// @ts-expect-error TODO: fix later
pipe(coerceable: RunnableLike): ChannelBatch {
if (this.bound === defaultRunnableBound) {
return new ChannelBatch<RunInput, RunOutput>({
return new ChannelBatch({
channel: this.channel,
key: this.key,
bound: _coerceToRunnable(coerceable),
});
} else {
// Delegate to `or` in `this.bound`
return new ChannelBatch<RunInput, RunOutput>({
return new ChannelBatch({
channel: this.channel,
key: this.key,
bound: this.bound.pipe(coerceable),
Expand Down
4 changes: 3 additions & 1 deletion langgraph/src/tests/pregel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ it("should invoke two processes and get correct output", async () => {
nodes: { one, two },
});

await expect(app.invoke(2, { recursionLimit: 1 })).rejects.toThrow(GraphRecursionError);
await expect(app.invoke(2, { recursionLimit: 1 })).rejects.toThrow(
GraphRecursionError
);

expect(await app.invoke(2)).toEqual(4);

Expand Down

0 comments on commit c14eb6b

Please sign in to comment.