🔧 API 参考¶
agnflow 的完整 API 文档。
🧩 核心类¶
🔧 Node¶
工作流的基本构建块。
参数: - name (str): 节点标识符 - exec (callable): 同步执行函数 - aexec (callable): 异步执行函数 - max_retries (int): 最大重试次数,默认1 - wait (int): 重试间隔时间(秒),默认0
方法: - run(state): 同步执行节点 - arun(state): 异步执行节点
🌊 Flow¶
用于组织和执行节点的容器。
参数: - name (str): 工作流名称
方法: - run(state, entry_action=None, max_steps=None): 同步执行工作流 - arun(state, entry_action=None, max_steps=None): 异步执行工作流 - render_mermaid(saved_file=None, title=None): 生成 Mermaid 流程图 - render_dot(saved_file=None): 生成 DOT 流程图
操作符: - flow[node]: 添加节点 - flow += node: 添加节点 - flow -= node: 移除节点
🐝 Swarm¶
多智能体协调模式,所有节点都相互连接。
参数: - name (str): 蜂群名称
👨💼 Supervisor¶
监督模式,第一个节点监督其他节点。
参数: - name (str): 监督者名称
🔗 连接操作符¶
➡️ 前向连接¶
将节点 a 连接到节点 b,方向向前。
⬅️ 反向连接¶
将节点 b 连接到节点 a,方向向后。
🔀 并行连接¶
将节点 a 并行连接到多个节点 b、c 和 d。
❌ 断开连接¶
移除节点之间的连接。
⚡ 执行函数¶
🔄 同步执行¶
参数: - state (dict): 当前工作流状态
返回: - dict: 更新后的状态 - tuple: (action_name, state) 用于流程控制
⚡ 异步执行¶
参数: - state (dict): 当前工作流状态
返回: - dict: 更新后的状态 - tuple: (action_name, state) 用于流程控制
🎮 流程控制动作¶
🚪 退出工作流¶
终止工作流执行。
🎯 跳转到节点¶
跳转到指定节点。
⚠️ 错误处理¶
处理工作流中的错误。
💾 状态管理¶
📋 状态结构¶
🔄 状态更新¶
🎨 可视化¶
📊 Mermaid 配置¶
参数: - saved_file (str): 保存文件路径 - title (str): 图表标题
🔷 DOT 配置¶
参数: - saved_file (str): 保存文件路径
🚀 高级功能¶
🔧 动态节点管理¶
# 添加节点
flow += Node("new_node", exec=my_function)
# 移除节点
flow -= existing_node
# 批量操作
flow += [node1, node2, node3]
flow -= [old_node1, old_node2]
🎲 条件连接¶
# 基于条件的连接
if condition:
    flow = Flow(a >> b)
else:
    flow = Flow(a >> c)
# 动态连接
flow = Flow(a >> (b if condition else c))
🛡️ 错误处理¶
def robust_function(state):
    try:
        # 可能失败的操作
        result = risky_operation()
        return {"success": True, "result": result}
    except Exception as e:
        return "error", {"error": str(e), "step": "robust_function"}
def error_handler(state):
    print(f"Error in {state['step']}: {state['error']}")
    return {"handled": True}
⚡ 性能优化¶
🔄 异步执行¶
# 使用异步执行提高性能
async def async_workflow():
    flow = Flow(async_node1 >> async_node2 >> async_node3)
    return await flow.arun(initial_state)
# 运行异步工作流
result = asyncio.run(async_workflow())
🔀 并行处理¶
# 并行执行多个节点
parallel_nodes = [Node(f"task_{i}", exec=task_function) for i in range(5)]
workflow = Flow(parallel_nodes >> combine_node)
🗄️ 缓存机制¶
def cached_function(state):
    cache_key = hash(str(state))
    if cache_key in cache:
        return cache[cache_key]
    result = expensive_operation(state)
    cache[cache_key] = result
    return result
📚 最佳实践¶
💾 状态设计¶
# 好的状态设计
state = {
    "data": "actual data",
    "metadata": {
        "created_at": "2024-01-01",
        "version": "1.0"
    },
    "results": [],
    "errors": []
}
# 避免在状态中存储大量数据
# 避免在状态中存储函数或复杂对象
🛡️ 错误处理¶
def safe_function(state):
    try:
        return process_safely(state)
    except ValueError as e:
        return {"error": "Invalid input", "details": str(e)}
    except Exception as e:
        return "error", {"error": "Unexpected error", "details": str(e)}
🔧 资源管理¶
def resource_aware_function(state):
    # 检查资源可用性
    if not check_resources():
        return "error", {"error": "Insufficient resources"}
    # 使用资源
    result = use_resources(state)
    # 清理资源
    cleanup_resources()
    return result
🔌 扩展和自定义¶
🧩 自定义节点类型¶
class CustomNode(Node):
    def __init__(self, name, custom_param, **kwargs):
        super().__init__(name, **kwargs)
        self.custom_param = custom_param
    def run(self, state):
        # 自定义逻辑
        return {"custom_result": self.custom_param}
🌊 自定义工作流类型¶
class CustomFlow(Flow):
    def __init__(self, name=None, custom_config=None):
        super().__init__(name=name)
        self.custom_config = custom_config
    def custom_method(self):
        # 自定义方法
        pass