-
Notifications
You must be signed in to change notification settings - Fork 8.8k
parallel task design
Seata Saga 状态机执行目前只支持串行化,因此本次 PR 是主要是通过 Fork 和 Join 两种状态来实现状态机的并行能力。
其中,Fork代表并行分支的开始,Join代表并行分支的结束。
Fork 状态的可配置属性如下:
"Fork": {
"Type": "Fork",
"Branches": [
"SecondState", // First state name in one branch
"ThirdState"
],
"Parallel": 0, // limit on parallelism,0 stands for no limit
"Timeout": 60000 // waiting time in milliseconds
},
支持的功能:
- 并行分支嵌套,即允许并行分支内包含子分支
- 并行分支内支持任意状态组合
- 并行度(同时运行的并行分支数量)控制
- 正确的补偿、重试机制
行为约束:
- 不同并行分支内不能包含相同的状态,比如图中 S1, S2, S3 必须互不相同
- Fork 状态和 Join 状态不能对应,确保相同层次下所有的并行分支都始于一个 Fork 节点并终于一个 Join 节点
设计器样式:
代码实现
ForkStateParser
:
- 对 Fork 的可配置属性进行解析。
- 还需要根据状态机的连接关系来生成
ForkState.pairedJoinState
和ForkState.allBranchStates
属性:分别代表 Fork 状态匹配的 Join 状态,Fork 状态所有分支的子状态。这部分主要实现于StateMachineUtils
中的静态方法。
ForkStateHandler#process
实现:
- 对当前的流程上下文
ProcessContext
进行拷贝实现线程封闭,拷贝后的ProcessContext
使用SideEffectFreeProcessContext
作为实现,这个实现的特点在于是:对其进行修改不会影响到父级的上下文 - 在异步事件总线中发布分支(如果处于向前重试状态,则将异步分支进行向前操作)
- 并发控制方式:
- Fork 发布并行分支后直接结束当前分支
- 在该 Fork 的一个并行分支到达 Join 后判断所有并行分支是否已经全部到达,是则转向 Join 下一个状态,否则结束分支
ForkStateRouter#route
实现:
- 直接 route 到
null
,结束执行
JoinStateHandler#process
实现:
- 获取当前分支下的
ParallelContextHolder
,调用ParallelContextHolder#complete
方法代表该分支已完成。
JoinStateRouter#route
实现:
- 获取当前分支下的
ParallelContextHolder
,调用ParallelContextHolder#next
方法来获取下一步要 route 到的状态。ParallelContextHolder#next
方法实现的逻辑是:- 如果并行分支还有没发布的,则 route 到没发布的并行分支第一个状态
- 如果所有并行分支都已经完成且没有 route 到 Join 的下一个状态,则 route 到 Join 的下一个状态
向前重试
由于原先每个状态机都是串行执行,一个时间点之会有一个状态执行。而引入并行执行情况则发生了改变,存在状态机正在执行多个状态时收到重试的指令。
首先对 ProcessCtrlStateMachineEngine#forwardInternal
进行适配。原先的逻辑是通过 EngineUtils#findOutLastForwardStateInstance
找到最后一个执行(且未被补偿)的状态并对其进行重试;现在则更改为通过新增方法 EngineUtils#findOutLastSerailState
找到最后一个串行执行的状态进行重试。这里的区别在于,最后一个执行的状态如果是处于并行分支内,则最后一个串行执行的状态将会是其最外层的 Fork 状态。
其次,需要对 ForkStateHandler
做一定的处理。当状态机引擎处理到 Fork 状态时需要判断该状态机是否处于重试中,如果是则会调用 ParallelTaskUtils.forwardBranch
方法,将每个并行分支都向前到最后一个执行(且未被补偿)的状态上。
结合上述两点变更,实现了向前重试只会对那些未被执行或执行失败的状态进行重试。
向后补偿 无需特意适配,找到已完成的状态列表进行反向进行补偿即可。