ParallelStart
分类: 通用步骤
命名空间: CMS.Plugin.FlowManagement.Abstractions.FlowBusiness.Activitys
基类: ParallelStartActivity (SYC.Flow.Kernel)
模块: FlowManagement.Abstractions
概述
ParallelStart(并行流程开始)是用于启动并行流程的特殊节点。当流程执行到此节点时,会创建一个并行分支点,允许多个流程分支同时执行。它继承自 SYC.Flow.Kernel.ParallelStartActivity,提供了并行流程的初始化和级别代码管理功能。
并行流程适用于需要同时执行多个独立任务的场景,如同时进行多个检测、并行处理多个工件等。ParallelStart 节点会自动管理并行分支的级别代码,确保每个分支都有唯一的标识。
业务场景
适用场景
- 并行检测: 同时对产品进行多项独立的质量检测
- 并行处理: 同时处理多个工件或批次
- 并行调用: 同时调用多个外部 API 或服务
- 并行打印: 同时打印多个标签或报告
- 分支流程: 根据条件创建多个并行执行的流程分支
在系统中的作用
ParallelStart 在 LMES 流程系统中扮演着并行流程控制器的角色:
- 标记并行流程的开始点
- 为每个并行分支分配唯一的级别代码
- 创建并行流程的工作项记录
- 管理并行流程的上下文信息
- 与 ParallelEnd 配合实现完整的并行流程控制
与其他节点的协作
- ParallelEnd: 必须配对使用,ParallelEnd 负责汇聚并行分支
- BusinessActivity: 并行分支中的业务节点,正常执行业务逻辑
- Transition: 从 ParallelStart 引出多条转换线,形成并行分支
配置说明
基本配置
| 属性名 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| ExtendedProperty | BusinessPropertyCollection | 否 | 空集合 | 步骤扩展属性 |
配置项详解
ExtendedProperty
说明: 扩展属性集合,允许在节点上添加自定义属性,这些属性可以写入流程上下文,供并行分支使用。
取值范围: BusinessPropertyCollection 对象
注意事项:
- 扩展属性会在并行流程开始时写入流程上下文
- 所有并行分支都可以访问这些扩展属性
- 扩展属性的值会自动添加 "_Value" 后缀作为流程上下文键
流程上下文
输入参数
| 参数名 | 类型 | 说明 |
|---|---|---|
| ApplicationData | object | 应用程序数据(如 ProcessModel) |
| 上游节点数据 | any | 由上游节点写入的业务数据 |
输出参数
| 参数名 | 类型 | 说明 |
|---|---|---|
| ParalleInfo | IDictionary | 并行流程信息(默认为 系统) |
| LevelCode | string | 当前并行分支的级别代码 |
| 扩展属性值 | any | ExtendedProperty 中配置的属性值 |
数据流转说明
- 级别代码管理: ParallelStart 会为每个并行分支生成唯一的级别代码(LevelCode)
- 上下文继承: 并行分支会继承 ParallelStart 之前的流程上下文数据
- 数据隔离: 每个并行分支有独立的工作项(FlowItem),但共享流程上下文
- 键信息传递: KeyInfo 和 KeyLabel 会从当前任务传递到并行分支
业务逻辑说明
处理流程
ParallelStart 的执行流程如下:
-
EnterAsync: 进入并行流程开始节点
- 获取当前任务信息(CurrentTask)
- 创建新的工作项(FlowItem)
- 设置工作项属性:
- StartTime 和 FinishTime:当前时间
- PartName:固定为 "系统"
- TaskStat:FlowItemStatus.AutoFinished(自动完成)
- AutoFinish:1(自动完成标志)
- EnableCalculate:1(参与计算)
- LevelCode:当前级 别代码
- KeyInfo 和 KeyLabel:从当前任务继承
- TaskName:节点名称
- 将工作项添加到流程实例
- 更新当前工作项 ID
- 记录进入日志
-
并行分支创建:
- 流程引擎根据 ParallelStart 的出口转换创建多个并行分支
- 每个分支获得唯一的级别代码
-
ExitAsync: 退出并行流程开始节点
- 计算执行耗时
- 记录退出日志
流程图
[上游节点]
↓
[ParallelStart]
├─→ [并行分支1] → [业务节点1] ┐
├─→ [并行分支2] → [业务节点2] ├→ [ParallelEnd]
└─→ [并行分支3] → [业务节点3] ┘
级别代码机制
级别代码(LevelCode)是并行流程的核心机制:
- 格式: 类似 "00-1:01-1:02-1:" 的层级结构
- 作用: 唯一标识每个并行分支和子流程
- 生成: 由 ParallelStart 根据当前任务的级别代码生成
- 用途: ParallelEnd 使用级别代码来识别和汇聚并行分支
依赖服务
| 服务接口 | 用途 | 说明 |
|---|---|---|
| Flow | 流程实例 | 访问流程上下文和工作项 |
| Flow.Logger | 日志记录器 | 记录并行流程的执行日志 |
异常处理
ParallelStart 节点本身不会抛出业务异常,但需要注意:
- 如果流程实例为空,可能导致空引用异常
- 级别代码生成失败可能影响并行流程的正确执行
- 工作项添加失败 会影响流程追踪
日志记录
ParallelStart 记录以下关键日志:
- 进入节点:
Enter,TaskID={taskId} - 退出节点:
Exit,耗时={milliseconds}毫秒
使用示例
基本示例
{
"Name": "并行检测流程",
"Activities": [
{
"Type": "BusinessActivity",
"Name": "准备数据",
"Alias": "PrepareData"
},
{
"Type": "ParallelStart",
"Name": "开始并行检测",
"Alias": "ParallelStart1"
},
{
"Type": "BusinessActivity",
"Name": "外观检测",
"Alias": "AppearanceCheck"
},
{
"Type": "BusinessActivity",
"Name": "尺寸检测",
"Alias": "DimensionCheck"
},
{
"Type": "BusinessActivity",
"Name": "功能检测",
"Alias": "FunctionCheck"
},
{
"Type": "ParallelEnd",
"Name": "结束并行检测",
"Alias": "ParallelEnd1",
"WaitMode": "WaitAll"
},
{
"Type": "BusinessActivity",
"Name": "汇总结果",
"Alias": "SummaryResult"
}
],
"Transitions": [
{
"From": "PrepareData",
"To": "ParallelStart1"
},
{
"From": "ParallelStart1",
"To": "AppearanceCheck"
},
{
"From": "ParallelStart1",
"To": "DimensionCheck"
},
{
"From": "ParallelStart1",
"To": "FunctionCheck"
},
{
"From": "AppearanceCheck",
"To": "ParallelEnd1"
},
{
"From": "DimensionCheck",
"To": "ParallelEnd1"
},
{
"From": "FunctionCheck",
"To": "ParallelEnd1"
},
{
"From": "ParallelEnd1",
"To": "SummaryResult"
}
]
}
高级示例:带扩展属性
{
"Type": "ParallelStart",
"Name": "开始并行打印",
"Alias": "ParallelPrintStart",
"ExtendedProperty": [
{
"Name": "PrintCount",
"Value": "3",
"WriteIntoDataItems": true
},
{
"Name": "PrinterGroup",
"Value": "LabelPrinters",
"WriteIntoDataItems": true
}
]
}
完整流程示例:并行 API 调用
{
"Name": "并行API调用流程",
"Activities": [
{
"Type": "BusinessActivity",
"Name": "获取订单信息",
"Alias": "GetOrder"
},
{
"Type": "ParallelStart",
"Name": "开始并行调用",
"Alias": "ParallelAPIStart"
},
{
"Type": "ApiCallActivity",
"Name": "查询库存",
"Alias": "CheckInventory",
"ApiId": "InventoryAPI"
},
{
"Type": "ApiCallActivity",
"Name": "查询价格",
"Alias": "CheckPrice",
"ApiId": "PriceAPI"
},
{
"Type": "ApiCallActivity",
"Name": "查询物流",
"Alias": "CheckLogistics",
"ApiId": "LogisticsAPI"
},
{
"Type": "ParallelEnd",
"Name": "结束并行调用",
"Alias": "ParallelAPIEnd",
"WaitMode": "WaitAll"
},
{
"Type": "BusinessActivity",
"Name": "处理结果",
"Alias": "ProcessResults"
}
],
"Transitions": [
{
"From": "GetOrder",
"To": "ParallelAPIStart"
},
{
"From": "ParallelAPIStart",
"To": "CheckInventory"
},
{
"From": "ParallelAPIStart",
"To": "CheckPrice"
},
{
"From": "ParallelAPIStart",
"To": "CheckLogistics"
},
{
"From": "CheckInventory",
"To": "ParallelAPIEnd"
},
{
"From": "CheckPrice",
"To": "ParallelAPIEnd"
},
{
"From": "CheckLogistics",
"To": "ParallelAPIEnd"
},
{
"From": "ParallelAPIEnd",
"To": "ProcessResults"
}
]
}
扩展开发指南
继承层次
Activity (SYC.Flow.Kernel)
└── ParallelStartActivity (SYC.Flow.Kernel)
└── ParallelStart
可重写方法
| 方法名 | 用途 | 何时重写 |
|---|---|---|
| EnterAsync | 进入节点时执行 | 需要自定义并行流程初始化逻辑 |
| ExitAsync | 退出节点时执行 | 需要自定义并行流程启动后的处理 |
| ParalleInfo | 并行流程信息 | 需要自定义并行分支的标识信息 |
| LevelCode | 级别代码 | 需要自定义级别代码生成逻辑 |
自定义并行开始节点示例
[Serializable]
[Design("自定义并行开始", "带自定义逻辑的并行开始节点", Sort = 1)]
[Category("自定义")]
public class CustomParallelStart : ParallelStart
{
[Design("并行分支数", "要创建的并行分支数量", Sort = 1)]
[Category("配置")]
[DataMember]
public int BranchCount { get; set; } = 3;
public override IDictionary ParalleInfo
{
get
{
var info = new SortedList();
for (int i = 0; i < BranchCount; i++)
{
info.Add($"分支{i + 1}", $"Branch{i + 1}");
}
return info;
}
}
public override async Task EnterAsync(ProcessflowEventArgs e)
{
// 自定义初始化逻辑
Flow.Logger.LogMessage($"创建 {BranchCount} 个并行分支", Name);
// 为每个分支准备数据
for (int i = 0; i < BranchCount; i++)
{
Flow.DataItems[$"Branch{i}_Data"] = $"分支{i + 1}的数据";
}
await base.EnterAsync(e);
}
}