Skip to content

Latest commit

 

History

History
163 lines (121 loc) · 3.67 KB

README.md

File metadata and controls

163 lines (121 loc) · 3.67 KB

🔩 Go Flow

Go Flow is a lightweight workflow framework based on state machines, designed to simplify the creation and management of workflows in Go.

✨ Getting Started

go get github.com/basenana/go-flow

Sample Task

You can create a workflow directly from functions:

builder := flow.NewFlowBuilder("sample-flow-01")

builder.Task(flow.NewFuncTask("task-1", func(ctx context.Context) error {
    fmt.Println("do something in task 1")
    return nil
}))
builder.Task(flow.NewFuncTask("task-2", func(ctx context.Context) error {
    fmt.Println("do something in task 2")
    return nil
}))
builder.Task(flow.NewFuncTask("task-3", func(ctx context.Context) error {
    fmt.Println("do something in task 3")
    return nil
}))

sampleFlow := builder.Finish()

runner := flow.NewRunner(sampleFlow)
_ = runner.Start(context.TODO())

Observer

You can customize the observer to track the status of flows and tasks, enabling operations such as persistence and logging.

type storageObserver struct {
	flow map[string]*flow.Flow
	task map[string]flow.Task

	sync.Mutex
}

func (s *storageObserver) Handle(event flow.UpdateEvent) {
	s.Lock()
	defer s.Unlock()

	fmt.Printf("update flow %s", event.Flow.ID)
	s.flow[event.Flow.ID] = event.Flow

	if event.Task != nil {
		fmt.Printf("update flow %s task %s", event.Flow.ID, event.Task.GetName())
		s.task[event.Task.GetName()] = event.Task
	}
}

var _ flow.Observer = &storageObserver{}

Register the observer in the builder:

builder := flow.NewFlowBuilder("sample-flow-01").Observer(&storageObserver{
		flow: make(map[string]*flow.Flow),
		task: make(map[string]flow.Task),
	})

DAG

For more complex tasks, workflows based on DAGs are also supported:

builder := flow.NewFlowBuilder("dag-flow-01").Coordinator(flow.NewDAGCoordinator())

task1 := flow.NewFuncTask("task-1", func(ctx context.Context) error {
    fmt.Println("do something in task 1")
    return nil
})
builder.Task(flow.WithDirector(task1, flow.NextTask{
    OnSucceed: "task-3",
    OnFailed:  "task-fail",
}))

builder.Task(flow.NewFuncTask("task-2", func(ctx context.Context) error {
    fmt.Println("do something in task 2")
    return nil
}))
builder.Task(flow.NewFuncTask("task-3", func(ctx context.Context) error {
    fmt.Println("do something in task 3")
    return nil
}))

builder.Task(flow.NewFuncTask("task-fail", func(ctx context.Context) error {
    fmt.Println("ops")
    return nil
}))

dagFlow := builder.Finish()

runner := flow.NewRunner(dagFlow)
_ = runner.Start(context.TODO())

🧩 Extensible

My Task

You can also define your own Task types and corresponding Executors:

type MyTask struct {
	flow.BasicTask
	parameters map[string]string
}

func NewMyTask(name string, parameters map[string]string) flow.Task {
	return &MyTask{BasicTask: flow.BasicTask{Name: name}, parameters: parameters}
}

type MyExecutor struct{}

var _ flow.Executor = &MyExecutor{}

func (m *MyExecutor) Setup(ctx context.Context) error {
	return nil
}

func (m *MyExecutor) Exec(ctx context.Context, flow *flow.Flow, task flow.Task) error {
	myTask, ok := task.(*MyTask)
	if !ok {
		return fmt.Errorf("not my task")
	}

	fmt.Printf("exec my task: %s\n", myTask.Name)
	return nil
}

func (m *MyExecutor) Teardown(ctx context.Context) error {
	return nil
}

Load the custom Tasks into the Flow:

builder := flow.NewFlowBuilder("my-flow-01").Executor(&MyExecutor{})

builder.Task(NewMyTask("my-task-1", make(map[string]string)))
builder.Task(NewMyTask("my-task-2", make(map[string]string)))
builder.Task(NewMyTask("my-task-3", make(map[string]string)))

myFlow := builder.Finish()

runner := flow.NewRunner(myFlow)
_ = runner.Start(context.TODO())