Skip to content

Chain 开发设计文档

1. 概述

在 Tinyflow AI 工作流编排框架中,Chain执行引擎的核心抽象,负责管理整个工作流的生命周期、状态协调、节点调度、错误处理与事件通知。它将静态的 ChainDefinition(工作流定义)与动态的运行时状态(ChainState)解耦,实现了可恢复、可监控、可并行、可持久化的 AI 工作流执行模型。

本文档深入解析 Chain设计哲学、架构组成、状态管理机制与扩展点,帮助开发者理解其内部运作原理,并基于此构建高可靠、高可维护的 AI 应用。

2. 设计哲学

2.1 状态与定义分离(State-Definition Separation)

  • ChainDefinition:只读的、静态的工作流蓝图,包含节点(Node)、边(Edge)、条件(Condition)等拓扑结构。
  • ChainState:动态的、可变的运行时状态,记录执行进度、内存变量、错误信息、挂起状态等。

特点:同一 ChainDefinition 可被多个实例(stateInstanceId)并行执行,彼此状态隔离。

2.2 事件驱动(Event-Driven)

Chain 通过 EventManager 发布生命周期事件(如 ChainStartEventNodeEndEvent),支持:

  • 日志记录
  • 指标上报(Prometheus)
  • 外部系统通知(Webhook)
  • 自定义监控逻辑

2.3 乐观并发控制(Optimistic Concurrency Control)

采用版本号 + 重试机制保证状态更新的原子性与一致性,避免分布式环境下的竞态条件。

2.4 可恢复执行(Resumable Execution)

支持挂起(Suspend)与恢复(Resume),适用于需人工干预、异步回调或长时间等待的场景(如审批、支付回调)。

3. 核心架构

Chain 的执行依赖以下关键组件:

组件职责扩展接口
ChainDefinition工作流静态定义(只读)ChainDefinitionRepository
ChainStateRepository持久化/加载 ChainState可替换(如 Redis、DB)
NodeStateRepository持久化/加载 NodeState可替换
TriggerScheduler调度节点执行(支持延迟、重试、循环)可替换(如 Quartz、Redis Streams)
EventManager事件发布与监听可注册自定义监听器

📌 所有组件均可通过 setter 注入,实现依赖注入友好测试可 mock

4. 状态管理机制

4.1 状态模型

  • ChainState:链级别状态

    • statusRUNNING / SUSPEND / SUCCEEDED / FAILED
    • memory:全局变量存储(Map<String, Object>
    • executeResult:最近一次节点输出
    • suspendForParameters:挂起时需外部提供的参数
    • suspendNodeIds:挂起的节点 ID 集合
    • version:用于乐观锁的版本号
  • NodeState:节点级别状态

    • statusPENDING / RUNNING / SUCCEEDED / ERROR / SUSPEND
    • executeCount / retryCount / loopCount
    • error:异常摘要(ExceptionSummary
    • version:隐式通过 ChainState.version 保证一致性

4.2 安全状态更新:updateStateSafely

java
public ChainState updateStateSafely(ChainStateModifier modifier) {
    // 1. 加载当前状态
    ChainState current = repository.load(instanceId);
    
    // 2. 应用修改,返回变更字段
    EnumSet<ChainStateField> fields = modifier.modify(current);
    
    // 3. 尝试 CAS 更新(基于 version)
    if (repository.tryUpdate(current, fields)) {
        return current; // 成功
    }
    
    // 4. 失败则重试(指数退避 + jitter)
    // ... 最多重试 30 秒
}
  • 字段级更新modifier 返回 EnumSet<ChainStateField>,仅持久化变更字段,提升性能。
  • 超时控制:30 秒内无法成功则抛出 ChainUpdateTimeoutException
  • 退避策略:指数退避 + 随机抖动(jitter),避免“惊群效应”。

5. 执行流程

5.1 启动(start

  1. 初始化 ChainState(状态 → RUNNING,加载初始变量)
  2. 发布 ChainStartEvent
  3. 调度所有入口节点definition.getStartNodes()

5.2 节点执行(executeNode

  1. 检查条件 node.getCondition(),决定是否跳过
  2. 调用 node.execute(chain)(实际执行逻辑在 Node 中)
  3. 捕获异常,记录错误
  4. 调用 handleNodeResult 处理结果

5.3 结果处理(handleNodeResult

成功路径:

  • 更新 NodeStateSUCCEEDED
  • 将输出写入 ChainState.memory(键为 nodeId.outputKey
  • 若启用 resetRetryCountAfterNormal,重置重试计数
  • 调度下游节点(或循环/挂起)

失败路径:

  • 更新 NodeStateERROR
  • 若启用重试且未达上限 → 延迟重试
  • 否则 → 链终止(ChainStatus.FAILED

挂起路径:

  • 抛出 ChainSuspendException
  • 更新状态 → SUSPEND
  • 记录需外部提供的参数(suspendForParameters

5.4 节点调度(scheduleNode

  • 生成 Trigger 对象(含 stateInstanceIdnodeIdedgeIddelayMs 等)
  • 交由 TriggerScheduler 异步调度
  • 支持
    • 普通执行(TriggerType.NEXT
    • 重试(RETRY
    • 循环(LOOP
    • 手动恢复(MANUAL

6. 高级特性

6.1 循环(Loop)

java
if (node.isLoopEnable()) {
    // 检查:是否超最大循环次数?
    // 检查:是否满足 breakCondition?
    // 否则:loopCount++,重新调度自身
}
  • 通过 loopIntervalMs 控制循环间隔
  • 通过 loopBreakCondition 动态退出

6.2 条件分支(Conditional Edges)

  • 节点级条件node.getCondition() 决定节点是否执行
  • 边级条件edge.getCondition() 决定是否走向下游节点

条件实现需实现 NodeCondition / EdgeCondition 接口,可访问 ChainStateNodeState、上一节点输出等。

6.3 挂起与恢复(Suspend & Resume)

挂起:

java
throw new ChainSuspendException(Map.of("requiredParam", "description"));

恢复:

java
chain.resume(Map.of("requiredParam", "value"));
// 自动调度所有 suspendNodeIds

适用于人机协作、异步审批、外部系统回调等场景。

6.4 上下文感知(ThreadLocal)

java
public static Chain currentChain() {
    return EXECUTION_THREAD_LOCAL.get();
}
  • Node.execute() 中可随时获取当前 Chain 实例
  • 便于日志追踪(MDC)、链路传递

7. 扩展点

7.1 自定义状态存储

实现 ChainStateRepository / NodeStateRepository

  • 使用数据库(MySQL、PostgreSQL)
  • 使用内存(InMemoryChainStateRepository,用于测试)
  • 使用 Redis(支持 TTL、分布式锁)

7.2 自定义调度器

实现 TriggerScheduler

  • 基于线程池(ExecutorTriggerScheduler
  • 基于消息队列(RabbitMQ、Kafka)
  • 基于分布式调度(Quartz Cluster)

7.3 事件监听

注册 EventListenerEventManager

java
eventManager.addEventListener(ChainEndEvent.class, event -> {
    // 发送通知、记录审计日志等
});

8. 性能与可靠性

机制说明
乐观锁避免分布式写冲突
指数退避降低重试风暴风险
字段级更新减少 I/O 开销
异步调度解耦执行与调度
状态快照支持持久化与恢复

💡 生产建议:搭配 Redis 实现 ChainStateRepository + TriggerScheduler,获得高性能与高可用。

9. 总结

Chain 不仅是一个工作流执行器,更是一个面向 AI 应用的运行时引擎。它通过状态管理、事件驱动、乐观并发、可恢复执行等机制,解决了 AI 工作流中常见的长时运行、异步交互、错误恢复、可观测性等难题。

开发者可通过扩展其组件,轻松构建企业级 AI Agent、自动化工作流、智能客服等复杂系统。