💡 示例¶
展示 agnflow 功能的实用示例。
🎯 基础示例¶
📝 简单线性工作流¶
from agnflow import Node, Flow
# 定义节点
def step1(state):
    return {"data": "processed", "step": 1}
def step2(state):
    return {"data": state["data"], "step": 2, "final": True}
# 创建节点
node1 = Node("step1", exec=step1)
node2 = Node("step2", exec=step2)
# 构建工作流
workflow = Flow(node1 >> node2)
# 执行
result = workflow.run({"initial": "data"})
print(result)  # {'data': 'processed', 'step': 2, 'final': True}
🔄 并行处理¶
from agnflow import Node, Flow
def process_a(state):
    return {"result_a": "A processed"}
def process_b(state):
    return {"result_b": "B processed"}
def combine(state):
    return {
        **state,
        "combined": f"{state['result_a']} + {state['result_b']}"
    }
# 创建并行分支工作流
a = Node("process_a", exec=process_a)
b = Node("process_b", exec=process_b)
c = Node("combine", exec=combine)
workflow = Flow(a >> [b, c] >> c)
result = workflow.run({})
🚀 高级示例¶
🔧 动态节点管理¶
from agnflow import Node, Flow
def add_data(state):
    return {"data": "new data"}
def process_data(state):
    return {"processed": state["data"]}
# 创建初始工作流
workflow = Flow()
workflow[Node("start", exec=lambda s: {"step": "started"})]
# 动态添加节点
workflow += Node("add", exec=add_data)
workflow += Node("process", exec=process_data)
# 连接节点
workflow["start"] >> workflow["add"] >> workflow["process"]
# 执行
result = workflow.run({})
🎲 条件工作流¶
from agnflow import Node, Flow
def check_condition(state):
    if state.get("condition"):
        return "branch_a"
    else:
        return "branch_b"
def branch_a(state):
    return {"path": "A", "result": "A processed"}
def branch_b(state):
    return {"path": "B", "result": "B processed"}
def finalize(state):
    return {"final": f"Completed via {state['path']}"}
# 创建条件分支工作流
check = Node("check", exec=check_condition)
a = Node("branch_a", exec=branch_a)
b = Node("branch_b", exec=branch_b)
final = Node("finalize", exec=finalize)
workflow = Flow(check >> (a if True else b) >> final)
result = workflow.run({"condition": True})
👥 人工介入循环¶
from agnflow import Node, Flow
from agnflow.agent.hitl.cli import human_in_the_loop
def generate_content(state):
    return {"content": "Generated content for review"}
def human_review(state):
    result, approved = human_in_the_loop(
        "Please review this content:",
        input_data=state["content"],
        options=["approve", "reject", "modify"]
    )
    if approved:
        return {"reviewed": True, "content": result}
    else:
        return "exit", {"reviewed": False}
def publish(state):
    return {"published": True, "content": state["content"]}
# 创建 HITL 工作流
generate = Node("generate", exec=generate_content)
review = Node("review", exec=human_review)
publish = Node("publish", exec=publish)
workflow = Flow(generate >> review >> publish)
result = workflow.run({})
🤖 多智能体示例¶
🐝 蜂群模式¶
from agnflow import Node, Swarm
def agent1(state):
    return {"agent1_result": "Task 1 completed"}
def agent2(state):
    return {"agent2_result": "Task 2 completed"}
def agent3(state):
    return {"agent3_result": "Task 3 completed"}
# 创建智能体蜂群
agent1_node = Node("agent1", exec=agent1)
agent2_node = Node("agent2", exec=agent2)
agent3_node = Node("agent3", exec=agent3)
swarm = Swarm[agent1_node, agent2_node, agent3_node]
result = swarm.run({"task": "collaborative task"})
👨💼 监督者模式¶
from agnflow import Node, Supervisor
def supervisor(state):
    # 监督者协调工作者
    return {"supervision": "coordinating", "tasks": ["task1", "task2"]}
def worker1(state):
    return {"worker1_result": "Task 1 done"}
def worker2(state):
    return {"worker2_result": "Task 2 done"}
# 创建监督者-工作者模式
supervisor_node = Node("supervisor", exec=supervisor)
worker1_node = Node("worker1", exec=worker1)
worker2_node = Node("worker2", exec=worker2)
supervisor_flow = Supervisor[supervisor_node, worker1_node, worker2_node]
result = supervisor_flow.run({"project": "supervised project"})
⚠️ 错误处理示例¶
🛡️ 健壮工作流¶
from agnflow import Node, Flow
def risky_operation(state):
    try:
        # 可能失败的操作
        result = 1 / 0
        return {"result": result}
    except Exception as e:
        return "error", {"error": str(e)}
def error_handler(state):
    print(f"Handling error: {state['error']}")
    return {"handled": True, "error": state["error"]}
def success_handler(state):
    return {"success": True, "result": state["result"]}
# 创建错误处理工作流
risky = Node("risky", exec=risky_operation)
error = Node("error", exec=error_handler)
success = Node("success", exec=success_handler)
workflow = Flow(risky >> (error if "error" in state else success))
result = workflow.run({})
🔄 重试机制¶
from agnflow import Node, Flow
import time
def retry_operation(state, max_retries=3):
    retries = state.get("retries", 0)
    if retries >= max_retries:
        return "error", {"error": "Max retries exceeded"}
    try:
        # 模拟可能失败的操作
        if time.time() % 2 < 1:  # 50% 失败率
            raise Exception("Random failure")
        return {"success": True, "attempts": retries + 1}
    except Exception as e:
        return {
            "retries": retries + 1,
            "last_error": str(e)
        }
def retry_node(state):
    return retry_operation(state)
# 创建重试工作流
retry = Node("retry", exec=retry_node)
workflow = Flow(retry >> retry)  # 自循环重试
result = workflow.run({"retries": 0})
⚡ 异步示例¶
🔄 异步节点¶
import asyncio
from agnflow import Node, Flow
async def async_operation(state):
    await asyncio.sleep(1)
    return {"async_result": "completed"}
async def async_combine(state):
    await asyncio.sleep(0.5)
    return {"combined": f"Async: {state['async_result']}"}
# 创建异步工作流
async_node = Node("async_op", aexec=async_operation)
async_combine_node = Node("async_combine", aexec=async_combine)
workflow = Flow(async_node >> async_combine_node)
# 异步执行
result = asyncio.run(workflow.arun({}))
🔀 混合同步异步¶
import asyncio
from agnflow import Node, Flow
def sync_operation(state):
    return {"sync_data": "processed"}
async def async_operation(state):
    await asyncio.sleep(1)
    return {"async_data": "processed"}
def combine_results(state):
    return {
        "combined": f"{state['sync_data']} + {state['async_data']}"
    }
# 创建混合工作流
sync_node = Node("sync", exec=sync_operation)
async_node = Node("async", aexec=async_operation)
combine_node = Node("combine", exec=combine_results)
workflow = Flow(sync_node >> async_node >> combine_node)
result = asyncio.run(workflow.arun({}))
🎨 可视化示例¶
📊 生成流程图¶
from agnflow import Node, Flow
def step1(state):
    return {"step": 1}
def step2(state):
    return {"step": 2}
def step3(state):
    return {"step": 3}
# 创建复杂工作流
a = Node("step1", exec=step1)
b = Node("step2", exec=step2)
c = Node("step3", exec=step3)
workflow = Flow(a >> [b, c] >> b)
# 生成 Mermaid 图表
workflow.render_mermaid(saved_file="workflow.png", title="复杂工作流")
# 生成 DOT 图表
workflow.render_dot(saved_file="workflow.dot")
💾 状态管理示例¶
🔢 复杂状态操作¶
from agnflow import Node, Flow
def initialize_state(state):
    return {
        **state,
        "counter": 0,
        "history": [],
        "metadata": {"created": "now"}
    }
def increment_counter(state):
    new_counter = state["counter"] + 1
    new_history = state["history"] + [new_counter]
    return {
        **state,
        "counter": new_counter,
        "history": new_history
    }
def analyze_history(state):
    history = state["history"]
    return {
        **state,
        "analysis": {
            "total": len(history),
            "sum": sum(history),
            "average": sum(history) / len(history) if history else 0
        }
    }
# 创建状态管理工作流
init = Node("init", exec=initialize_state)
increment = Node("increment", exec=increment_counter)
analyze = Node("analyze", exec=analyze_history)
workflow = Flow(init >> increment >> increment >> increment >> analyze)
result = workflow.run({})
💾 状态持久化¶
import json
from agnflow import Node, Flow
def save_state(state):
    with open("workflow_state.json", "w") as f:
        json.dump(state, f)
    return state
def load_state(state):
    try:
        with open("workflow_state.json", "r") as f:
            loaded_state = json.load(f)
        return {**state, **loaded_state}
    except FileNotFoundError:
        return state
def process_with_persistence(state):
    return {"processed": True, "data": state.get("data", "default")}
# 创建持久化工作流
load = Node("load", exec=load_state)
process = Node("process", exec=process_with_persistence)
save = Node("save", exec=save_state)
workflow = Flow(load >> process >> save)
result = workflow.run({"data": "important data"})
⚡ 性能优化示例¶
🗄️ 缓存机制¶
from agnflow import Node, Flow
import hashlib
import json
class Cache:
    def __init__(self):
        self._cache = {}
    def get(self, key):
        return self._cache.get(key)
    def set(self, key, value):
        self._cache[key] = value
cache = Cache()
def expensive_operation(state):
    # 生成缓存键
    cache_key = hashlib.md5(
        json.dumps(state, sort_keys=True).encode()
    ).hexdigest()
    # 检查缓存
    cached_result = cache.get(cache_key)
    if cached_result:
        return {"result": cached_result, "cached": True}
    # 执行昂贵操作
    result = sum(i**2 for i in range(10000))
    # 缓存结果
    cache.set(cache_key, result)
    return {"result": result, "cached": False}
# 创建缓存工作流
expensive = Node("expensive", exec=expensive_operation)
workflow = Flow(expensive)
# 第一次执行(无缓存)
result1 = workflow.run({"input": "data1"})
# 第二次执行(有缓存)
result2 = workflow.run({"input": "data1"})
🔄 并行优化¶
from agnflow import Node, Flow
import asyncio
async def parallel_task1(state):
    await asyncio.sleep(2)
    return {"task1": "completed"}
async def parallel_task2(state):
    await asyncio.sleep(3)
    return {"task2": "completed"}
async def parallel_task3(state):
    await asyncio.sleep(1)
    return {"task3": "completed"}
def combine_parallel_results(state):
    return {
        "all_tasks": [
            state.get("task1"),
            state.get("task2"),
            state.get("task3")
        ]
    }
# 创建并行优化工作流
task1 = Node("task1", aexec=parallel_task1)
task2 = Node("task2", aexec=parallel_task2)
task3 = Node("task3", aexec=parallel_task3)
combine = Node("combine", exec=combine_parallel_results)
# 并行执行所有任务
workflow = Flow([task1, task2, task3] >> combine)
result = asyncio.run(workflow.arun({}))