🔧 API Reference¶
Complete API documentation for agnflow.
🧩 Core Classes¶
🔧 Node¶
The fundamental building block of workflows.
Parameters:
- name (str): Node identifier
- exec (callable): Synchronous execution function
- aexec (callable): Asynchronous execution function
- max_retries (int): Maximum retry attempts, default 1
- wait (int): Retry interval in seconds, default 0
Methods:
- run(state): Execute node synchronously
- arun(state): Execute node asynchronously
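The `max_retries` and `wait` parameters suggest retry-on-failure behavior. The sketch below is not agnflow's implementation: `run_with_retries` and `flaky` are hypothetical names, and it assumes `max_retries` counts total attempts and `wait` is the pause in seconds between them.

```python
import time


def run_with_retries(func, state, max_retries=1, wait=0):
    """Call func(state), retrying on exceptions (hypothetical helper, not agnflow API).

    Assumption: max_retries is the total number of attempts and wait is the
    pause in seconds between two attempts.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return func(state)
        except Exception as exc:  # any failure triggers a retry
            last_error = exc
            if attempt < max_retries and wait:
                time.sleep(wait)
    raise last_error


calls = {"count": 0}


def flaky(state):
    """Fail on the first two calls, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return {**state, "ok": True}


result = run_with_retries(flaky, {"step": 0}, max_retries=3, wait=0)
```

With `max_retries=3` the third attempt succeeds. With the default of 1, the first exception would propagate to the caller.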
🌊 Flow¶
Container for organizing and executing nodes.
Parameters:
- name (str): Flow name
Methods:
- run(state, entry_action=None, max_steps=None): Execute flow synchronously
- arun(state, entry_action=None, max_steps=None): Execute flow asynchronously
- render_mermaid(saved_file=None, title=None): Generate Mermaid flowchart
- render_dot(saved_file=None): Generate DOT flowchart
Operators:
- flow[node]: Add node
- flow += node: Add node
- flow -= node: Remove node
🐝 Swarm¶
Multi-agent coordination pattern where all nodes are connected to each other.
Parameters:
- name (str): Swarm name
👨‍💼 Supervisor¶
Supervision pattern where the first node supervises others.
Parameters:
- name (str): Supervisor name
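The two topologies described above can be pictured as edge lists. This is only an illustrative sketch: `swarm_edges` and `supervisor_edges` are hypothetical helpers, and the assumption that a supervisor is linked to each worker in both directions is not confirmed by this reference.

```python
from itertools import permutations


def swarm_edges(names):
    """Every node is connected to every other node, in both directions."""
    return list(permutations(names, 2))


def supervisor_edges(names):
    """The first name supervises the rest (assumed two-way links)."""
    supervisor, *workers = names
    edges = []
    for worker in workers:
        edges.append((supervisor, worker))  # supervisor delegates to worker
        edges.append((worker, supervisor))  # worker reports back
    return edges


swarm = swarm_edges(["a", "b", "c"])
supervised = supervisor_edges(["boss", "w1", "w2"])
```

A swarm of n nodes has n * (n - 1) directed edges, while the supervisor layout under this assumption has 2 * (n - 1).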
🔗 Connection Operators¶
➡️ Forward Connection¶
Connects node a to node b in the forward direction.
⬅️ Reverse Connection¶
Connects node b to node a in the reverse direction.
🔀 Parallel Connection¶
Connects node a to multiple nodes b, c, and d in parallel.
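To show how forward, reverse, and parallel connections could be expressed with Python operators, here is a minimal sketch. `MiniNode` is a toy stand-in written for this example, not agnflow's `Node` class, and the choice of `>>`, `<<`, and a list for fan-out is an assumption based on the descriptions above.

```python
class MiniNode:
    """Toy stand-in for a workflow node (not agnflow's Node class)."""

    def __init__(self, name):
        self.name = name
        self.successors = []

    @staticmethod
    def _link(source, target):
        # Record a directed edge once
        if target not in source.successors:
            source.successors.append(target)

    def __rshift__(self, other):
        # a >> b, or a >> [b, c, d] for a fan-out
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self._link(self, target)
        return other

    def __lshift__(self, other):
        # a << b records an edge from b to a
        self._link(other, self)
        return other


a, b, c, d = (MiniNode(n) for n in "abcd")
a >> b        # forward: a -> b
c << b        # reverse: b -> c
a >> [c, d]   # parallel: a -> c and a -> d

edges = sorted(
    (src.name, dst.name) for src in (a, b, c, d) for dst in src.successors
)
```

Returning the right-hand operand from `__rshift__` is what lets a chain such as `a >> b >> c` read left to right in this toy version.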
❌ Disconnection¶
Removes connections between nodes.
⚡ Execution Functions¶
🔄 Synchronous Execution¶
Parameters:
- state (dict): Current workflow state
Returns:
- dict: Updated state
- tuple: (action_name, state) for flow control
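The two return shapes listed above can be illustrated with plain functions. The action names `retry` and `done` and the helper `split_result` are hypothetical, chosen only for this sketch. How agnflow itself interprets the return value is not shown in this section.

```python
def process(state):
    """Return only the updated state (a dict)."""
    return {**state, "result": "processed"}


def route(state):
    """Return an (action_name, state) tuple to steer the flow."""
    attempts = state.get("attempts", 0)
    action = "retry" if attempts < 2 else "done"
    return action, {**state, "attempts": attempts + 1}


def split_result(result, default_action=None):
    """Normalize either return shape into (action, state) (hypothetical helper)."""
    if isinstance(result, tuple):
        action, state = result
        return action, state
    return default_action, result


plain = split_result(process({"x": 1}))
routed = split_result(route({"attempts": 0}))
```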
⚡ Asynchronous Execution¶
import asyncio

async def async_function(state):
    # Async processing
    await asyncio.sleep(1)
    return {"result": "async processed"}
Parameters:
- state (dict): Current workflow state
Returns:
- dict: Updated state
- tuple: (action_name, state) for flow control
🎮 Flow Control Actions¶
🚪 Exit Flow¶
Terminates workflow execution.
🎯 Jump to Node¶
Jumps to specified node.
⚠️ Error Handling¶
Handles errors in workflow.
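The three actions above can be pictured with a tiny dispatcher. This is a sketch under stated assumptions, not agnflow's `Flow`: `run_steps` is a hypothetical function, and the literal action names `exit` and `error`, as well as using a node name as a jump target, are guesses made for illustration.

```python
def run_steps(nodes, edges, start, state, max_steps=10):
    """Follow returned action names from node to node (toy dispatcher).

    nodes: name -> function(state) returning a dict or (action, dict)
    edges: name -> {action: next_name}; "default" is used for plain dict returns
    """
    current = start
    for _ in range(max_steps):
        result = nodes[current](state)
        action, state = result if isinstance(result, tuple) else ("default", result)
        if action == "exit":  # assumed name for terminating the flow
            break
        current = edges.get(current, {}).get(action)
        if current is None:  # no matching edge: stop
            break
    return state


def check(state):
    if state["value"] < 0:
        return "error", {**state, "error": "negative value"}
    if state["value"] > 100:
        return "exit", state
    return "double", state  # jump to the node named "double"


def double(state):
    return {**state, "value": state["value"] * 2}


def handle_error(state):
    return "exit", {**state, "handled": True}


nodes = {"check": check, "double": double, "handle_error": handle_error}
edges = {
    "check": {"double": "double", "error": "handle_error"},
    "double": {"default": "check"},
}

finished = run_steps(nodes, edges, "check", {"value": 30})
failed = run_steps(nodes, edges, "check", {"value": -1})
```

The `max_steps` guard mirrors the parameter of the same name on `Flow.run` and keeps a cyclic graph from looping forever.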
💾 State Management¶
📋 State Structure¶
🔄 State Updates¶
def update_state(state):
    # Immutable update
    new_state = {**state, "step": state["step"] + 1}
    return new_state
Visualization¶
📊 Mermaid Configuration¶
Parameters:
- saved_file (str): Save file path
- title (str): Chart title
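To give a sense of what a Mermaid rendering with these two parameters might produce, the sketch below builds flowchart text from an edge list. `edges_to_mermaid` is a hypothetical helper and does not reproduce the output of `render_mermaid`. The title is written as Mermaid front matter.

```python
from pathlib import Path


def edges_to_mermaid(edges, title=None, saved_file=None):
    """Build Mermaid flowchart text from (source, target) pairs (illustrative helper)."""
    lines = []
    if title:
        # Mermaid front matter carries the chart title
        lines += ["---", f"title: {title}", "---"]
    lines.append("flowchart TD")
    lines += [f"    {src} --> {dst}" for src, dst in edges]
    text = "\n".join(lines)
    if saved_file:
        Path(saved_file).write_text(text, encoding="utf-8")
    return text


chart = edges_to_mermaid([("a", "b"), ("b", "c")], title="Demo")
```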
🔷 DOT Configuration¶
Parameters:
- saved_file (str): Save file path
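For comparison, the same kind of edge list can be written in Graphviz DOT syntax. `edges_to_dot` and the graph name `flow` are hypothetical, and the text that `render_dot` actually emits may differ.

```python
def edges_to_dot(edges, graph_name="flow"):
    """Build a DOT digraph from (source, target) pairs (illustrative helper)."""
    lines = [f"digraph {graph_name} {{"]
    for src, dst in edges:
        # Quote node names so identifiers with spaces or dashes stay valid
        lines.append(f'    "{src}" -> "{dst}";')
    lines.append("}")
    return "\n".join(lines)


dot = edges_to_dot([("a", "b"), ("b", "c")])
```

The resulting text can be passed to the Graphviz `dot` command to produce an image.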
🚀 Advanced Features¶
🔧 Dynamic Node Management¶
# Add nodes
flow += Node("new_node", exec=my_function)
# Remove nodes
flow -= existing_node
# Batch operations
flow += [node1, node2, node3]
flow -= [old_node1, old_node2]
🎲 Conditional Connections¶
# Condition-based connections
if condition:
    flow = Flow(a >> b)
else:
    flow = Flow(a >> c)
# Dynamic connections
flow = Flow(a >> (b if condition else c))
🛡️ Error Handling¶
def robust_function(state):
    try:
        # Risky operation
        result = risky_operation()
        return {"success": True, "result": result}
    except Exception as e:
        return "error", {"error": str(e), "step": "robust_function"}

def error_handler(state):
    print(f"Error in {state['step']}: {state['error']}")
    return {"handled": True}
⚡ Performance Optimization¶
🔄 Async Execution¶
import asyncio

# Use async execution so that I/O-bound nodes do not block each other
async def async_workflow():
    flow = Flow(async_node1 >> async_node2 >> async_node3)
    return await flow.arun(initial_state)

# Run async workflow
result = asyncio.run(async_workflow())
🔀 Parallel Processing¶
# Execute multiple nodes in parallel
parallel_nodes = [Node(f"task_{i}", exec=task_function) for i in range(5)]
workflow = Flow(parallel_nodes >> combine_node)
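As a point of reference for what parallel execution means at the Python level, the sketch below runs several coroutines concurrently with `asyncio.gather`. It does not claim that agnflow schedules parallel nodes this way. `task` and `fan_out` are hypothetical names.

```python
import asyncio


async def task(index, state):
    """Simulate an I/O-bound step that derives a value from the shared state."""
    await asyncio.sleep(0.01)
    return {"task": index, "value": state["value"] * index}


async def fan_out(state, count=5):
    """Run `count` tasks concurrently and collect results in submission order."""
    return await asyncio.gather(*(task(i, state) for i in range(count)))


results = asyncio.run(fan_out({"value": 10}, count=3))
```

Because the tasks overlap while sleeping, the total wall time stays close to a single task's delay rather than the sum of all of them.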
🗄️ Caching Mechanism¶
# Module-level cache shared across calls
cache = {}

def cached_function(state):
    cache_key = hash(str(state))
    if cache_key in cache:
        return cache[cache_key]
    result = expensive_operation(state)
    cache[cache_key] = result
    return result
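One caveat with `hash(str(state))` is that the key depends on dictionary insertion order, and Python randomizes string hashes per process by default, so the key is not stable across runs. A possible alternative, sketched here with a hypothetical helper `state_cache_key`, serializes the state with sorted keys and hashes the result. It assumes the state is JSON-serializable, falling back to `str` for other values.

```python
import hashlib
import json


def state_cache_key(state):
    """Derive an order-independent, process-stable key from a state dict."""
    payload = json.dumps(state, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = state_cache_key({"x": 1, "y": 2})
key_b = state_cache_key({"y": 2, "x": 1})
same_key = key_a == key_b
```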
📚 Best Practices¶
💾 State Design¶
# Good state design
state = {
    "data": "actual data",
    "metadata": {
        "created_at": "2024-01-01",
        "version": "1.0"
    },
    "results": [],
    "errors": []
}
# Avoid storing large data in state
# Avoid storing functions or complex objects in state
🛡️ Error Handling¶
def safe_function(state):
    try:
        return process_safely(state)
    except ValueError as e:
        return {"error": "Invalid input", "details": str(e)}
    except Exception as e:
        return "error", {"error": "Unexpected error", "details": str(e)}
🔧 Resource Management¶
def resource_aware_function(state):
    # Check resource availability
    if not check_resources():
        return "error", {"error": "Insufficient resources"}
    try:
        # Use resources
        return use_resources(state)
    finally:
        # Clean up resources even if use_resources raises
        cleanup_resources()
🔌 Extension and Customization¶
🧩 Custom Node Types¶
class CustomNode(Node):
    def __init__(self, name, custom_param, **kwargs):
        super().__init__(name, **kwargs)
        self.custom_param = custom_param

    def run(self, state):
        # Custom logic
        return {"custom_result": self.custom_param}
🌊 Custom Flow Types¶
class CustomFlow(Flow):
    def __init__(self, name=None, custom_config=None):
        super().__init__(name=name)
        self.custom_config = custom_config

    def custom_method(self):
        # Custom method
        pass
🔌 Plugin System¶
# Registry mapping plugin names to functions
plugins = {}

def register_plugin(plugin_name, plugin_function):
    """Register plugin function"""
    plugins[plugin_name] = plugin_function

def use_plugin(plugin_name, state):
    """Use plugin function"""
    if plugin_name in plugins:
        return plugins[plugin_name](state)
    else:
        raise ValueError(f"Plugin {plugin_name} not found")