Chain 开发设计文档
1. 概述
在 Tinyflow AI 工作流编排框架中,Chain 是执行引擎的核心抽象,负责管理整个工作流的生命周期、状态协调、节点调度、错误处理与事件通知。它将静态的 ChainDefinition(工作流定义)与动态的运行时状态(ChainState)解耦,实现了可恢复、可监控、可并行、可持久化的 AI 工作流执行模型。
本文档深入解析 Chain 的设计哲学、架构组成、状态管理机制与扩展点,帮助开发者理解其内部运作原理,并基于此构建高可靠、高可维护的 AI 应用。
2. 设计哲学
2.1 状态与定义分离(State-Definition Separation)
ChainDefinition:只读的、静态的工作流蓝图,包含节点(Node)、边(Edge)、条件(Condition)等拓扑结构。ChainState:动态的、可变的运行时状态,记录执行进度、内存变量、错误信息、挂起状态等。
特点:同一
ChainDefinition可被多个实例(stateInstanceId)并行执行,彼此状态隔离。
2.2 事件驱动(Event-Driven)
Chain 通过 EventManager 发布生命周期事件(如 ChainStartEvent、NodeEndEvent),支持:
- 日志记录
- 指标上报(Prometheus)
- 外部系统通知(Webhook)
- 自定义监控逻辑
2.3 乐观并发控制(Optimistic Concurrency Control)
采用版本号 + 重试机制保证状态更新的原子性与一致性,避免分布式环境下的竞态条件。
2.4 可恢复执行(Resumable Execution)
支持挂起(Suspend)与恢复(Resume),适用于需人工干预、异步回调或长时间等待的场景(如审批、支付回调)。
3. 核心架构
Chain 的执行依赖以下关键组件:
| 组件 | 职责 | 扩展接口 |
|---|---|---|
ChainDefinition | 工作流静态定义(只读) | ChainDefinitionRepository |
ChainStateRepository | 持久化/加载 ChainState | 可替换(如 Redis、DB) |
NodeStateRepository | 持久化/加载 NodeState | 可替换 |
TriggerScheduler | 调度节点执行(支持延迟、重试、循环) | 可替换(如 Quartz、Redis Streams) |
EventManager | 事件发布与监听 | 可注册自定义监听器 |
📌 所有组件均可通过 setter 注入,实现依赖注入友好与测试可 mock。
4. 状态管理机制
4.1 状态模型
ChainState:链级别状态status:RUNNING/SUSPEND/SUCCEEDED/FAILEDmemory:全局变量存储(Map<String, Object>)executeResult:最近一次节点输出suspendForParameters:挂起时需外部提供的参数suspendNodeIds:挂起的节点 ID 集合version:用于乐观锁的版本号
NodeState:节点级别状态status:PENDING/RUNNING/SUCCEEDED/ERROR/SUSPENDexecuteCount/retryCount/loopCounterror:异常摘要(ExceptionSummary)version:隐式通过ChainState.version保证一致性
4.2 安全状态更新:updateStateSafely
public ChainState updateStateSafely(ChainStateModifier modifier) {
// 1. 加载当前状态
ChainState current = repository.load(instanceId);
// 2. 应用修改,返回变更字段
EnumSet<ChainStateField> fields = modifier.modify(current);
// 3. 尝试 CAS 更新(基于 version)
if (repository.tryUpdate(current, fields)) {
return current; // 成功
}
// 4. 失败则重试(指数退避 + jitter)
// ... 最多重试 30 秒
}- 字段级更新:
modifier返回EnumSet<ChainStateField>,仅持久化变更字段,提升性能。 - 超时控制:30 秒内无法成功则抛出
ChainUpdateTimeoutException。 - 退避策略:指数退避 + 随机抖动(jitter),避免“惊群效应”。
5. 执行流程
5.1 启动(start)
- 初始化
ChainState(状态 →RUNNING,加载初始变量) - 发布
ChainStartEvent - 调度所有入口节点(
definition.getStartNodes())
5.2 节点执行(executeNode)
- 检查条件
node.getCondition(),决定是否跳过 - 调用
node.execute(chain)(实际执行逻辑在Node中) - 捕获异常,记录错误
- 调用
handleNodeResult处理结果
5.3 结果处理(handleNodeResult)
成功路径:
- 更新
NodeState→SUCCEEDED - 将输出写入
ChainState.memory(键为nodeId.outputKey) - 若启用
resetRetryCountAfterNormal,重置重试计数 - 调度下游节点(或循环/挂起)
失败路径:
- 更新
NodeState→ERROR - 若启用重试且未达上限 → 延迟重试
- 否则 → 链终止(
ChainStatus.FAILED)
挂起路径:
- 抛出
ChainSuspendException - 更新状态 →
SUSPEND - 记录需外部提供的参数(
suspendForParameters)
5.4 节点调度(scheduleNode)
- 生成
Trigger对象(含stateInstanceId、nodeId、edgeId、delayMs等) - 交由
TriggerScheduler异步调度 - 支持:
- 普通执行(
TriggerType.NEXT) - 重试(
RETRY) - 循环(
LOOP) - 手动恢复(
MANUAL)
- 普通执行(
6. 高级特性
6.1 循环(Loop)
if (node.isLoopEnable()) {
// 检查:是否超最大循环次数?
// 检查:是否满足 breakCondition?
// 否则:loopCount++,重新调度自身
}- 通过
loopIntervalMs控制循环间隔 - 通过
loopBreakCondition动态退出
6.2 条件分支(Conditional Edges)
- 节点级条件:
node.getCondition()决定节点是否执行 - 边级条件:
edge.getCondition()决定是否走向下游节点
条件实现需实现
NodeCondition/EdgeCondition接口,可访问ChainState、NodeState、上一节点输出等。
6.3 挂起与恢复(Suspend & Resume)
挂起:
throw new ChainSuspendException(Map.of("requiredParam", "description"));恢复:
chain.resume(Map.of("requiredParam", "value"));
// 自动调度所有 suspendNodeIds适用于人机协作、异步审批、外部系统回调等场景。
6.4 上下文感知(ThreadLocal)
public static Chain currentChain() {
return EXECUTION_THREAD_LOCAL.get();
}- 在
Node.execute()中可随时获取当前Chain实例 - 便于日志追踪(MDC)、链路传递
7. 扩展点
7.1 自定义状态存储
实现 ChainStateRepository / NodeStateRepository:
- 使用数据库(MySQL、PostgreSQL)
- 使用内存(
InMemoryChainStateRepository,用于测试) - 使用 Redis(支持 TTL、分布式锁)
7.2 自定义调度器
实现 TriggerScheduler:
- 基于线程池(
ExecutorTriggerScheduler) - 基于消息队列(RabbitMQ、Kafka)
- 基于分布式调度(Quartz Cluster)
7.3 事件监听
注册 EventListener 到 EventManager:
eventManager.addEventListener(ChainEndEvent.class, event -> {
// 发送通知、记录审计日志等
});8. 性能与可靠性
| 机制 | 说明 |
|---|---|
| 乐观锁 | 避免分布式写冲突 |
| 指数退避 | 降低重试风暴风险 |
| 字段级更新 | 减少 I/O 开销 |
| 异步调度 | 解耦执行与调度 |
| 状态快照 | 支持持久化与恢复 |
💡 生产建议:搭配 Redis 实现
ChainStateRepository+TriggerScheduler,获得高性能与高可用。
9. 总结
Chain 不仅是一个工作流执行器,更是一个面向 AI 应用的运行时引擎。它通过状态管理、事件驱动、乐观并发、可恢复执行等机制,解决了 AI 工作流中常见的长时运行、异步交互、错误恢复、可观测性等难题。
开发者可通过扩展其组件,轻松构建企业级 AI Agent、自动化工作流、智能客服等复杂系统。