🧠 Core Concepts¶
Learn the fundamental concepts of agnflow.
🔧 Node Types¶
🔧 Basic Nodes¶
Nodes are the fundamental building blocks of workflows:
from agnflow import Node
# Synchronous node
sync_node = Node("sync", exec=lambda state: {"result": "done"})
# Asynchronous node
async def async_func(state):
await asyncio.sleep(1)
return {"result": "async done"}
async_node = Node("async", aexec=async_func)
🎯 Special Node Types¶
🌊 Flow Nodes¶
Container nodes that can hold other nodes:
from agnflow import Flow
# Create a flow container
flow = Flow()
flow[node1, node2, node3]
# Or create with initial nodes
flow = Flow(node1 >> node2 >> node3)
🤖 Swarm Nodes¶
Special nodes for multi-agent coordination:
from agnflow import Swarm, Supervisor
# Swarm - all nodes connected to each other
swarm = Swarm[node1, node2, node3]
# Supervisor - first node supervises others
supervisor = Supervisor[supervisor_node, worker1, worker2]
🔗 Connection Patterns¶
➡️ Linear Connections¶
🔀 Branching Connections¶
🔄 Looping Connections¶
🔄 Runtime Dynamic Management¶
➕ Adding Nodes¶
# Add single node
flow += new_node
# Add using bracket syntax
flow[new_node]
# Add multiple nodes
flow += [node1, node2, node3]
➖ Removing Nodes¶
🔗 Symmetric Operations¶
💾 State Management¶
🌊 State Flow¶
State flows through nodes and can be modified:
def process_node(state):
# Modify state
state["processed"] = True
state["timestamp"] = time.time()
return state
node = Node("process", exec=process_node)
💾 State Persistence¶
State is automatically passed between nodes:
# Initial state
initial_state = {"data": "hello", "step": 0}
# State flows through nodes
node1 = Node("step1", exec=lambda s: {**s, "step": s["step"] + 1})
node2 = Node("step2", exec=lambda s: {**s, "step": s["step"] + 1})
flow = Flow(node1 >> node2)
result = flow.run(initial_state)
# result: {"data": "hello", "step": 2}
🎮 Execution Control¶
🚪 Entry Points¶
Control where execution starts:
# Start from specific node
flow.run(state, entry_action="node_name")
# Start from first node (default)
flow.run(state)
⏱️ Execution Limits¶
# Limit execution steps
flow.run(state, max_steps=10)
# Async execution with limits
await flow.arun(state, max_steps=10)
🎨 Visualization¶
📊 Mermaid Flowcharts¶
# Generate Mermaid code
mermaid = flow.render_mermaid()
# Save as image
flow.render_mermaid(saved_file="workflow.png", title="My Workflow")
🔷 DOT Flowcharts¶
# Generate DOT code
dot = flow.render_dot()
# Save as image
flow.render_dot(saved_file="workflow.png")
Advanced Patterns¶
Human-in-the-Loop (HITL)¶
from agnflow.agent.hitl.cli import human_in_the_loop
def review_node(state):
result, approved = human_in_the_loop(
"Please review this data",
input_data=state
)
if approved:
return {"reviewed": True, "result": result}
else:
return "exit", {"reviewed": False}
review = Node("review", exec=review_node)
Error Handling¶
def robust_node(state):
try:
# Your logic here
return {"success": True}
except Exception as e:
return "error", {"error": str(e)}
node = Node("robust", exec=robust_node)