🔧 API 参考¶
agnflow 的完整 API 文档。
🧩 核心类¶
🔧 Node¶
工作流的基本构建块。
参数: - name
(str): 节点标识符 - exec
(callable): 同步执行函数 - aexec
(callable): 异步执行函数 - max_retries
(int): 最大重试次数,默认1 - wait
(int): 重试间隔时间(秒),默认0
方法: - run(state)
: 同步执行节点 - arun(state)
: 异步执行节点
🌊 Flow¶
用于组织和执行节点的容器。
参数: - name
(str): 工作流名称
方法: - run(state, entry_action=None, max_steps=None)
: 同步执行工作流 - arun(state, entry_action=None, max_steps=None)
: 异步执行工作流 - render_mermaid(saved_file=None, title=None)
: 生成 Mermaid 流程图 - render_dot(saved_file=None)
: 生成 DOT 流程图
操作符: - flow[node]
: 添加节点 - flow += node
: 添加节点 - flow -= node
: 移除节点
🐝 Swarm¶
多智能体协调模式,所有节点都相互连接。
参数: - name
(str): 蜂群名称
👨💼 Supervisor¶
监督模式,第一个节点监督其他节点。
参数: - name
(str): 监督者名称
🔗 连接操作符¶
➡️ 前向连接¶
将节点 a
连接到节点 b
,方向向前。
⬅️ 反向连接¶
将节点 b
连接到节点 a
,方向向后。
🔀 并行连接¶
将节点 a
并行连接到多个节点 b
、c
和 d
。
❌ 断开连接¶
移除节点之间的连接。
⚡ 执行函数¶
🔄 同步执行¶
参数: - state
(dict): 当前工作流状态
返回: - dict
: 更新后的状态 - tuple
: (action_name, state) 用于流程控制
⚡ 异步执行¶
参数: - state
(dict): 当前工作流状态
返回: - dict
: 更新后的状态 - tuple
: (action_name, state) 用于流程控制
🎮 流程控制动作¶
🚪 退出工作流¶
终止工作流执行。
🎯 跳转到节点¶
跳转到指定节点。
⚠️ 错误处理¶
处理工作流中的错误。
💾 状态管理¶
📋 状态结构¶
🔄 状态更新¶
🎨 可视化¶
📊 Mermaid 配置¶
参数: - saved_file
(str): 保存文件路径 - title
(str): 图表标题
🔷 DOT 配置¶
参数: - saved_file
(str): 保存文件路径
🚀 高级功能¶
🔧 动态节点管理¶
# 添加节点
flow += Node("new_node", exec=my_function)
# 移除节点
flow -= existing_node
# 批量操作
flow += [node1, node2, node3]
flow -= [old_node1, old_node2]
🎲 条件连接¶
# 基于条件的连接
if condition:
flow = Flow(a >> b)
else:
flow = Flow(a >> c)
# 动态连接
flow = Flow(a >> (b if condition else c))
🛡️ 错误处理¶
def robust_function(state):
try:
# 可能失败的操作
result = risky_operation()
return {"success": True, "result": result}
except Exception as e:
return "error", {"error": str(e), "step": "robust_function"}
def error_handler(state):
print(f"Error in {state['step']}: {state['error']}")
return {"handled": True}
⚡ 性能优化¶
🔄 异步执行¶
# 使用异步执行提高性能
async def async_workflow():
flow = Flow(async_node1 >> async_node2 >> async_node3)
return await flow.arun(initial_state)
# 运行异步工作流
result = asyncio.run(async_workflow())
🔀 并行处理¶
# 并行执行多个节点
parallel_nodes = [Node(f"task_{i}", exec=task_function) for i in range(5)]
workflow = Flow(parallel_nodes >> combine_node)
🗄️ 缓存机制¶
def cached_function(state):
cache_key = hash(str(state))
if cache_key in cache:
return cache[cache_key]
result = expensive_operation(state)
cache[cache_key] = result
return result
📚 最佳实践¶
💾 状态设计¶
# 好的状态设计
state = {
"data": "actual data",
"metadata": {
"created_at": "2024-01-01",
"version": "1.0"
},
"results": [],
"errors": []
}
# 避免在状态中存储大量数据
# 避免在状态中存储函数或复杂对象
🛡️ 错误处理¶
def safe_function(state):
try:
return process_safely(state)
except ValueError as e:
return {"error": "Invalid input", "details": str(e)}
except Exception as e:
return "error", {"error": "Unexpected error", "details": str(e)}
🔧 资源管理¶
def resource_aware_function(state):
# 检查资源可用性
if not check_resources():
return "error", {"error": "Insufficient resources"}
# 使用资源
result = use_resources(state)
# 清理资源
cleanup_resources()
return result
🔌 扩展和自定义¶
🧩 自定义节点类型¶
class CustomNode(Node):
def __init__(self, name, custom_param, **kwargs):
super().__init__(name, **kwargs)
self.custom_param = custom_param
def run(self, state):
# 自定义逻辑
return {"custom_result": self.custom_param}
🌊 自定义工作流类型¶
class CustomFlow(Flow):
def __init__(self, name=None, custom_config=None):
super().__init__(name=name)
self.custom_config = custom_config
def custom_method(self):
# 自定义方法
pass