Skip to content

Commit

Permalink
feat: 更好的低代码支持
Browse files Browse the repository at this point in the history
  • Loading branch information
MarleneJiang committed Sep 2, 2023
1 parent 7e67126 commit 664163f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 13 deletions.
36 changes: 24 additions & 12 deletions package/__tests__/pipe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
DynamicExecutor,
} from "@idealeap/pipeline"; // 请替换成你的模块导入方式

test("Pipe", async () => {
test("Pipeline", async () => {
const pipe1 = new Pipe<number, number>(
(input) => {
return input + 1;
Expand All @@ -25,22 +25,19 @@ test("Pipe", async () => {
},
);

const pipeline = new Pipeline([pipe1], {
const pipeline = new Pipeline([pipe1, pipe2], {
onProgress: (completed, total) => {
console.log(`Progress: ${completed}/${total}`);
},
});

// 动态添加管道
pipeline.addPipe(pipe2);

// 执行管道
await pipeline.execute(1).then((results) => {
console.log("Final results:", results);
});
});

test("并行执行——测测你的", async () => {
test("Pipeline with 并行&串行&动态代码", async () => {
const pipe1 = new Pipe(
(input: string) => {
return input + "——————(被我测了";
Expand Down Expand Up @@ -94,8 +91,6 @@ test("并行执行——测测你的", async () => {
});

test("Pipeline with 链式调用", async () => {
// 示例代码
// 示例
const pipeline = Pipeline.create()
.addPipe(
Pipe.create((input: number) => input + 1, {
Expand Down Expand Up @@ -138,7 +133,7 @@ test("Pipeline with JSON", async () => {
await pipeline.execute("我饿").then(console.log);
});

test("pipeRegistry", async () => {
test("Pipeline with pipeRegistry", async () => {
const pipeRegistry = PipeRegistry.init();
// 注册预定义的 Pipe 类型
pipeRegistry.register("FetchData", async () => {
Expand All @@ -154,6 +149,12 @@ test("pipeRegistry", async () => {
return "transformed data";
});

pipeRegistry.register("postProcess", (input: string) => {
// 这里只是简单地返回一个字符串,实际情况可能涉及到更复杂的数据转换
// console.log(input, context);
return input + "\nBy the way, I'm postProcess";
});

const pipelineJson = {
pipes: [
{
Expand All @@ -163,16 +164,27 @@ test("pipeRegistry", async () => {
{
id: "TransformData",
type: "TransformData",
postProcessType: "postProcess",
},
],
};

const pipeline = Pipeline.fromJSON(pipelineJson, {}, pipeRegistry);
await pipeline.execute(undefined).then((result) => {
console.log("Final result:", result);
});

// 序列化为 JSON
const jsonConfig = JSON.stringify(pipeline.toJSON());
console.log("Serialized config:", jsonConfig);

// 导入 JSON
const newPipeline = Pipeline.fromJSON(
JSON.parse(jsonConfig),
{},
pipeRegistry,
);

// 执行
await newPipeline.execute().then(console.log);
expect(jsonConfig).toEqual(
`{"pipes":[{"id":"FetchData","type":"FetchData"},{"id":"TransformData","type":"TransformData","postProcessType":"postProcess"}]}`,
);
});
32 changes: 31 additions & 1 deletion package/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@ export interface PipeOptions<T, R> extends BatchOptions<T, R> {
retries?: number;
timeout?: number;
preProcess?: (input: T, context: PipelineContext) => MaybePromise<T>;
preProcessType?: string;
postProcess?: (result: R, context: PipelineContext) => MaybePromise<R>;
postProcessType?: string;
errProcess?: (error: any, context: PipelineContext) => MaybePromise<boolean>;
errProcessType?: string;
destroyProcess?: () => void;
destroyProcessType?: string;
batch?: boolean;
type?: string;
params?: Record<string, any>;
Expand Down Expand Up @@ -176,6 +180,26 @@ export class Pipe<T, R> {
callback: (input: T, context: PipelineContext) => MaybePromise<R>,
predefinedTypes?: PipeRegistryType,
): Pipe<T, R> {
if (json.preProcessType) {
(json as PipeOptions<T, R>).preProcess = predefinedTypes?.get(
json.preProcessType,
);
}
if (json.postProcessType) {
(json as PipeOptions<T, R>).postProcess = predefinedTypes?.get(
json.postProcessType,
);
}
if (json.errProcessType) {
(json as PipeOptions<T, R>).errProcess = predefinedTypes?.get(
json.errProcessType,
);
}
if (json.destroyProcessType) {
(json as PipeOptions<T, R>).destroyProcess = predefinedTypes?.get(
json.destroyProcessType,
) as () => void;
}
if (json.type && predefinedTypes) {
const predefinedCallback = predefinedTypes.get(json.type);
if (predefinedCallback) {
Expand Down Expand Up @@ -265,7 +289,7 @@ export class Pipeline {
return this;
}

async execute(input: any): Promise<Map<string, any> | Map<string, any>[]> {
async execute(input?: any): Promise<Map<string, any> | Map<string, any>[]> {
this.verifyDependencies(); // 在执行前验证依赖关系
const emitter = this.options.emitter || new EventEmitter();
const abortController = new AbortController();
Expand Down Expand Up @@ -356,6 +380,12 @@ export class Pipeline {
retries: pipe.options.retries,
timeout: pipe.options.timeout,
batch: pipe.options.batch,
type: pipe.options.type,
params: pipe.options.params,
preProcessType: pipe.options.preProcessType,
postProcessType: pipe.options.postProcessType,
errProcessType: pipe.options.errProcessType,
destroyProcessType: pipe.options.destroyProcessType,
})),
};
}
Expand Down

0 comments on commit 664163f

Please sign in to comment.