跳到主要内容
版本:Next

ParallelEnd

分类: 通用步骤
命名空间: CMS.Plugin.FlowManagement.Abstractions.FlowBusiness.Activitys
基类: ParallelEndActivity (SYC.Flow.Kernel)
模块: FlowManagement.Abstractions

概述

ParallelEnd(并行流程结束)是用于结束并行流程的特殊节点。当运行到并行流程结束步骤时,通过其内部的逻辑运算规则,汇聚所有并行分支并结束整个并行流程。它继承自 SYC.Flow.Kernel.ParallelEndActivity,提供了灵活的等待模式和自动完成未完成分支的功能。

ParallelEnd 必须与 ParallelStart 配对使用,它负责等待并行分支的完成,并根据配置的等待模式决定何时继续执行后续流程。

业务场景

适用场景

  • 汇聚并行检测结果: 等待所有检测分支完成后汇总结果
  • 同步并行处理: 确保所有并行任务都完成后再继续
  • 快速响应: 只要有一个分支完成就继续执行(WaitOne 模式)
  • 超时控制: 配合超时机制,避免无限等待
  • 分支结果整合: 收集所有并行分支的输出数据

在系统中的作用

ParallelEnd 在 LMES 流程系统中扮演着并行流程同步器的角色:

  • 汇聚所有并行分支
  • 根据等待模式决定继续执行的时机
  • 自动完成未完成的并行分支
  • 管理并行流程的级别代码
  • 提供并行流程的完成信号

与其他节点的协作

  • ParallelStart: 必须配对使用,ParallelStart 负责启动并行分支
  • BusinessActivity: 并行分支中的业务节点,执行完成后流向 ParallelEnd
  • 后续节点: ParallelEnd 完成后,流程继续执行后续节点

配置说明

基本配置

属性名类型必填默认值说明
WaitModeWaitModeWaitAll等待模式(WaitOne/WaitAll)
ExtendedPropertyBusinessPropertyCollection空集合步骤扩展属性

配置项详解

WaitMode

说明: 等待模式,决定 ParallelEnd 何时继续执行后续流程。

取值范围:

  • WaitOne: 只要有一个并行分支完成就继续执行
  • WaitAll: 等待所有并行分支都完成才继续执行

注意事项:

  • WaitOne 模式: 适用于快速响应场景,第一个完成的分支触发继续执行,其他分支会被自动完成
  • WaitAll 模式: 适用于需要所有结果的场景,必须等待所有分支都完成
  • 选择合适的等待模式对流程性能和正确性至关重要

ExtendedProperty

说明: 扩展属性集合,允许在节点上添加自定义属性,这些属性可以写入流程上下文。

取值范围: BusinessPropertyCollection 对象

注意事项:

  • 扩展属性会在并行流程结束时写入流程上下文
  • 可以用于记录并行流程的汇总信息

流程上下文

输入参数

参数名类型说明
并行分支输出any各个并行分支写入的数据
LevelCodestring并行分支的级别代码
LevelKeystring级别键信息
LevelLabelstring级别标签信息

输出参数

参数名类型说明
扩展属性值anyExtendedProperty 中配置的属性值
汇总数据any在 ProcessAsync 中写入的汇总数据

数据流转说明

  1. 数据汇聚: ParallelEnd 可以访问所有并行分支写入的流程上下文数据
  2. 级别代码: 使用级别代码识别和管理并行分支
  3. 自动完成: 未完成的分支会被自动标记为完成状态
  4. 数据整合: 可以在 ParallelEnd 之后的节点中整合所有分支的结果

业务逻辑说明

处理流程

ParallelEnd 的执行流程如下:

  1. CanEnterAsync: 判断是否可以进入节点

    • 获取并行分支的数量(从 ParallelStart 的出口转换数量)
    • 根据等待模式判断:
      • WaitOne 模式:
        • 第一个到达的分支可以进入
        • 后续到达的分支不能进入
        • 当所有分支都到达时,重置计数器
      • WaitAll 模式:
        • 只有当所有分支都到达时才能进入
        • 使用原子操作计数到达的分支数
        • 最后一个到达的分支触发进入
    • 使用信号量(SemaphoreSlim)进行线程同步
    • 记录等待模式和分支数量日志
  2. EnterAsync: 进入并行流程结束节点

    • 等待信号量释放(由 CanEnterAsync 控制)
    • 响应流程取消令牌
    • 创建新的工作项(FlowItem)
    • 设置工作项属性:
      • StartTime 和 FinishTime:当前时间
      • TaskStat:FlowItemStatus.AutoFinished(自动完成)
      • AutoFinish:1(自动完成标志)
      • EnableCalculate:1(参与计算)
      • LevelCode:当前级别代码
      • KeyInfo 和 KeyLabel:从事件参数获取
      • TaskName:节点名称
    • 将工作项添加到流程实例
    • 更新当前工作项 ID
    • 记录进入日志
  3. EndingParallelProcess: 结束并行流程

    • 查找所有属于当前并行流程的工作项
    • 根据级别代码(LevelCode)过滤工作项
    • 将未完成的工作项标记为自动完成:
      • 设置 AutoFinish = 1
      • 设置 TaskStat = FlowItemStatus.AutoFinished
      • 设置 FinishTime = 当前时间
  4. ExitAsync: 退出并行流程结束节点

    • 计算执行耗时
    • 记录退出日志

流程图

[并行分支1] ┐
[并行分支2] ├→ [ParallelEnd] → [后续节点]
[并行分支3] ┘

等待模式详解

WaitOne 模式

时间线: ─────────────────────────────→

分支1: ──────●──────────────────── (第1个完成,触发继续)
分支2: ────────────●────────────── (第2个完成,被自动完成)
分支3: ──────────────────●──────── (第3个完成,被自动完成)

ParallelEnd: ──────●──────────────── (第1个分支完成时继续)

WaitAll 模式

时间线: ─────────────────────────────→

分支1: ──────●──────────────────── (第1个完成,等待)
分支2: ────────────●────────────── (第2个完成,等待)
分支3: ──────────────────●──────── (第3个完成,触发继续)

ParallelEnd: ──────────────────●──── (所有分支完成时继续)

级别代码机制

ParallelEnd 使用级别代码来识别和管理并行分支:

  • AllSubflowCode: 获取所有子流程的级别代码

    • 从流程实例的工作项中查找
    • 根据父级别代码和当前键过滤
    • 返回所有匹配的子流程级别代码
  • GetParentInfo: 获取父流程的信息

    • 根据级别代码查找父流程工作项
    • 返回父流程的 KeyInfo 和 KeyLabel

依赖服务

服务接口用途说明
Flow流程实例访问流程上下文和工作项
Flow.Logger日志记录器记录并行流程的执行日志
SemaphoreSlim信号量用于线程同步和等待控制

异常处理

ParallelEnd 节点的异常处理:

  • 取消令牌: 在等待信号量时响应流程取消令牌
  • 线程安全: 使用原子操作(Interlocked)确保计数器的线程安全
  • 空引用: 检查流程实例和工作项是否为空

日志记录

ParallelEnd 记录以下关键日志:

  • 进入判断: CanEnter={canEnter},WaitMode={WaitMode},TransitionCount={transitionCount}
  • 进入节点: Enter,TaskID={taskId}
  • 子流程信息: CurrentLevel={currentLevel},ParentLevel={parentLevel},CurrentKey={key},Subflows={subflows}
  • 退出节点: Exit,耗时={milliseconds}毫秒

使用示例

基本示例:WaitAll 模式

{
"Name": "并行检测流程",
"Activities": [
{
"Type": "ParallelStart",
"Name": "开始并行检测",
"Alias": "ParallelStart1"
},
{
"Type": "BusinessActivity",
"Name": "外观检测",
"Alias": "AppearanceCheck"
},
{
"Type": "BusinessActivity",
"Name": "尺寸检测",
"Alias": "DimensionCheck"
},
{
"Type": "BusinessActivity",
"Name": "功能检测",
"Alias": "FunctionCheck"
},
{
"Type": "ParallelEnd",
"Name": "结束并行检测",
"Alias": "ParallelEnd1",
"WaitMode": "WaitAll"
},
{
"Type": "BusinessActivity",
"Name": "汇总结果",
"Alias": "SummaryResult"
}
],
"Transitions": [
{
"From": "ParallelStart1",
"To": "AppearanceCheck"
},
{
"From": "ParallelStart1",
"To": "DimensionCheck"
},
{
"From": "ParallelStart1",
"To": "FunctionCheck"
},
{
"From": "AppearanceCheck",
"To": "ParallelEnd1"
},
{
"From": "DimensionCheck",
"To": "ParallelEnd1"
},
{
"From": "FunctionCheck",
"To": "ParallelEnd1"
},
{
"From": "ParallelEnd1",
"To": "SummaryResult"
}
]
}

高级示例:WaitOne 模式

{
"Name": "快速响应流程",
"Activities": [
{
"Type": "ParallelStart",
"Name": "开始并行查询",
"Alias": "ParallelStart1"
},
{
"Type": "ApiCallActivity",
"Name": "查询主数据库",
"Alias": "QueryMainDB"
},
{
"Type": "ApiCallActivity",
"Name": "查询备份数据库",
"Alias": "QueryBackupDB"
},
{
"Type": "ApiCallActivity",
"Name": "查询缓存",
"Alias": "QueryCache"
},
{
"Type": "ParallelEnd",
"Name": "结束并行查询",
"Alias": "ParallelEnd1",
"WaitMode": "WaitOne"
},
{
"Type": "BusinessActivity",
"Name": "处理数据",
"Alias": "ProcessData"
}
],
"Transitions": [
{
"From": "ParallelStart1",
"To": "QueryMainDB"
},
{
"From": "ParallelStart1",
"To": "QueryBackupDB"
},
{
"From": "ParallelStart1",
"To": "QueryCache"
},
{
"From": "QueryMainDB",
"To": "ParallelEnd1"
},
{
"From": "QueryBackupDB",
"To": "ParallelEnd1"
},
{
"From": "QueryCache",
"To": "ParallelEnd1"
},
{
"From": "ParallelEnd1",
"To": "ProcessData"
}
]
}

完整流程示例:结果汇总

// 在 ParallelEnd 之后的节点中汇总结果
[Serializable]
[Design("汇总检测结果", "汇总所有并行检测的结果", Sort = 1)]
[Category("自定义")]
public class SummaryResultActivity : BusinessActivity
{
public override async Task ProcessAsync(ProcessflowEventArgs args)
{
// 读取各个并行分支的结果
var appearanceResult = Flow.DataItems["AppearanceCheck_Result"]?.ToString();
var dimensionResult = Flow.DataItems["DimensionCheck_Result"]?.ToString();
var functionResult = Flow.DataItems["FunctionCheck_Result"]?.ToString();

// 汇总结果
var allPassed = appearanceResult == "Pass"
&& dimensionResult == "Pass"
&& functionResult == "Pass";

// 写入汇总结果
Flow.DataItems["FinalResult_Value"] = allPassed ? "Pass" : "Fail";

Flow.Logger.LogMessage(
$"检测结果汇总: 外观={appearanceResult}, 尺寸={dimensionResult}, 功能={functionResult}, 最终={allPassed}",
Name);

await base.ProcessAsync(args);
}
}

扩展开发指南

继承层次

Activity (SYC.Flow.Kernel)
└── ParallelEndActivity (SYC.Flow.Kernel)
└── ParallelEnd

可重写方法

方法名用途何时重写
CanEnterAsync判断是否可以进入节点需要自定义等待逻辑
EnterAsync进入节点时执行需要自定义并行流程结束逻辑
ExitAsync退出节点时执行需要自定义并行流程完成后的处理
EndingParallelProcess结束并行流程需要自定义未完成分支的处理逻辑
AllSubflowCode获取所有子流程代码需要自定义子流程识别逻辑
GetParentInfo获取父流程信息需要自定义父流程信息获取逻辑

自定义并行结束节点示例

[Serializable]
[Design("自定义并行结束", "带结果验证的并行结束节点", Sort = 1)]
[Category("自定义")]
public class CustomParallelEnd : ParallelEnd
{
[Design("最小成功数", "至少需要成功的分支数量", Sort = 1)]
[Category("配置")]
[DataMember]
public int MinSuccessCount { get; set; } = 2;

public override async Task<bool> CanEnterAsync(ProcessflowEventArgs args)
{
// 先调用基类的等待逻辑
var canEnter = await base.CanEnterAsync(args);

if (canEnter)
{
// 检查成功的分支数量
var successCount = 0;
var subflows = AllSubflowCode(args.LevelCode);

foreach (var subflow in subflows)
{
var result = Flow.DataItems[$"{subflow}_Result"]?.ToString();
if (result == "Success")
{
successCount++;
}
}

if (successCount < MinSuccessCount)
{
Flow.Logger.LogWarningMessage(
$"成功分支数 {successCount} 少于最小要求 {MinSuccessCount}",
Name);
return false;
}
}

return canEnter;
}

public override async Task EnterAsync(ProcessflowEventArgs e)
{
// 记录所有分支的结果
var subflows = AllSubflowCode(e.LevelCode);
var results = new List<string>();

foreach (var subflow in subflows)
{
var result = Flow.DataItems[$"{subflow}_Result"]?.ToString();
results.Add($"{subflow}={result}");
}

Flow.Logger.LogMessage($"并行分支结果: {string.Join(", ", results)}", Name);

await base.EnterAsync(e);
}
}

注册和集成

  1. 编译节点:

    • 继承 ParallelEnd 类
    • 添加 Design 和 Category 特性
    • 编译为 DLL
  2. 部署节点:

    • 将 DLL 放到 LMES 的插件目录
    • 重启 LMES 服务
  3. 使用节点:

    • 在流程设计器中选择自定义的并行结束节点
    • 配置节点属性(特别是 WaitMode)
    • 连接所有并行分支到此节点

注意事项

  • ⚠️ 必须配对: ParallelEnd 必须与 ParallelStart 配对使用
  • ⚠️ 等待模式: 根据业务需求选择合适的等待模式(WaitOne 或 WaitAll)
  • ⚠️ 分支数量: 流向 ParallelEnd 的转换线数量必须与 ParallelStart 的出口数量一致
  • ⚠️ 线程安全: ParallelEnd 使用线程安全的机制,但并行分支中的代码需要自行保证线程安全
  • ⚠️ 数据冲突: 注意并行分支可能同时写入相同的流程上下文键,导致数据覆盖
  • ⚠️ 取消令牌: ParallelEnd 会响应流程取消令牌,可以中断等待
  • ⚠️ 性能考虑: WaitAll 模式的性能取决于最慢的分支
  • 💡 最佳实践:
    • 为每个并行分支使用不同的输出键名
    • 在 ParallelEnd 之后的节点中汇总所有分支的结果
    • 使用 WaitOne 模式时,确保第一个完成的分支能提供足够的信息
    • 考虑添加超时机制,避免无限等待
    • 记录详细的日志,便于排查并行流程问题

相关节点

常见问题

Q1: WaitOne 和 WaitAll 有什么区别?

A:

  • WaitOne: 只要有一个并行分支完成就继续执行,适用于快速响应场景(如多数据源查询,取最快的结果)
  • WaitAll: 必须等待所有并行分支都完成才继续执行,适用于需要所有结果的场景(如多项检测都要完成)

Q2: 如果某个并行分支执行失败怎么办?

A:

  • 如果分支抛出异常,该分支会停止执行
  • WaitAll 模式:会等待其他分支完成,失败的分支会被自动标记为完成
  • WaitOne 模式:如果第一个完成的是失败分支,流程会继续,需要在后续节点中检查结果
  • 建议在并行分支中捕获异常并写入结果标志,在 ParallelEnd 之后检查

Q3: 如何获取所有并行分支的结果?

A: 在 ParallelEnd 之后的节点中,通过流程上下文读取各个分支写入的数据:

var result1 = Flow.DataItems["Branch1_Result"];
var result2 = Flow.DataItems["Branch2_Result"];
var result3 = Flow.DataItems["Branch3_Result"];

Q4: ParallelEnd 会自动完成未完成的分支吗?

A: 是的,ParallelEnd 的 EndingParallelProcess 方法会自动将所有未完成的并行分支标记为自动完成状态(AutoFinished)。

Q5: 如何避免并行分支之间的数据冲突?

A:

  • 为每个分支使用不同的流程上下文键名
  • 使用分支标识作为键名前缀,如 Branch1_ResultBranch2_Result
  • 避免在并行分支中修改共享数据
  • 如果必须共享数据,使用线程安全的数据结构

Q6: 可以嵌套使用并行流程吗?

A: 可以,但要注意:

  • 每对 ParallelStart/ParallelEnd 必须正确配对
  • 级别代码会自动管理嵌套层次
  • 嵌套并行会增加流程复杂度,建议谨慎使用

更新历史

日期版本说明
2025-11-281.0初始版本

本文档最后更新时间: 2025-11-28