Skip to content

💡 Examples

Practical examples demonstrating agnflow capabilities.

🎯 Basic Examples

📝 Simple Linear Workflow

from agnflow import Node, Flow

# Define nodes
def step1(state):
    return {"data": "processed", "step": 1}

def step2(state):
    return {"data": state["data"], "step": 2, "final": True}

# Create nodes
node1 = Node("step1", exec=step1)
node2 = Node("step2", exec=step2)

# Build workflow
workflow = Flow(node1 >> node2)

# Execute
result = workflow.run({"initial": "data"})
print(result)  # {'data': 'processed', 'step': 2, 'final': True}

🔄 Parallel Processing

from agnflow import Node, Flow

def process_a(state):
    return {"result_a": "A processed"}

def process_b(state):
    return {"result_b": "B processed"}

def combine(state):
    return {
        **state,
        "combined": f"{state['result_a']} + {state['result_b']}"
    }

# Create workflow with parallel branches
a = Node("process_a", exec=process_a)
b = Node("process_b", exec=process_b)
c = Node("combine", exec=combine)

workflow = Flow(a >> [b, c] >> c)
result = workflow.run({})

🚀 Advanced Examples

🔧 Dynamic Node Management

from agnflow import Node, Flow

def add_data(state):
    return {"data": "new data"}

def process_data(state):
    return {"processed": state["data"]}

# Create initial workflow
workflow = Flow()
workflow[Node("start", exec=lambda s: {"step": "started"})]

# Add nodes dynamically
workflow += Node("add", exec=add_data)
workflow += Node("process", exec=process_data)

# Connect nodes
workflow["start"] >> workflow["add"] >> workflow["process"]

# Execute
result = workflow.run({})

🎲 Conditional Workflows

from agnflow import Node, Flow

def check_condition(state):
    if state.get("condition"):
        return "branch_a"
    else:
        return "branch_b"

def branch_a(state):
    return {"path": "A", "result": "A processed"}

def branch_b(state):
    return {"path": "B", "result": "B processed"}

def finalize(state):
    return {"final": f"Completed via {state['path']}"}

# Create workflow with conditional branching
check = Node("check", exec=check_condition)
a = Node("branch_a", exec=branch_a)
b = Node("branch_b", exec=branch_b)
final = Node("finalize", exec=finalize)

workflow = Flow(check >> (a if True else b) >> final)
result = workflow.run({"condition": True})

👥 Human-in-the-Loop

from agnflow import Node, Flow
from agnflow.agent.hitl.cli import human_in_the_loop

def generate_content(state):
    return {"content": "Generated content for review"}

def human_review(state):
    result, approved = human_in_the_loop(
        "Please review this content:",
        input_data=state["content"],
        options=["approve", "reject", "modify"]
    )

    if approved:
        return {"reviewed": True, "content": result}
    else:
        return "exit", {"reviewed": False}

def publish(state):
    return {"published": True, "content": state["content"]}

# Create HITL workflow
generate = Node("generate", exec=generate_content)
review = Node("review", exec=human_review)
publish = Node("publish", exec=publish)

workflow = Flow(generate >> review >> publish)
result = workflow.run({})

🤖 Multi-Agent Examples

🐝 Swarm Pattern

from agnflow import Node, Swarm

def agent1(state):
    return {"agent1_result": "Task 1 completed"}

def agent2(state):
    return {"agent2_result": "Task 2 completed"}

def agent3(state):
    return {"agent3_result": "Task 3 completed"}

# Create swarm of agents
agent1_node = Node("agent1", exec=agent1)
agent2_node = Node("agent2", exec=agent2)
agent3_node = Node("agent3", exec=agent3)

swarm = Swarm[agent1_node, agent2_node, agent3_node]
result = swarm.run({"task": "collaborative task"})

👨‍💼 Supervisor Pattern

from agnflow import Node, Supervisor

def supervisor(state):
    # Supervisor coordinates workers
    return {"supervision": "coordinating", "tasks": ["task1", "task2"]}

def worker1(state):
    return {"worker1_result": "Task 1 done"}

def worker2(state):
    return {"worker2_result": "Task 2 done"}

# Create supervisor-worker pattern
supervisor_node = Node("supervisor", exec=supervisor)
worker1_node = Node("worker1", exec=worker1)
worker2_node = Node("worker2", exec=worker2)

supervisor_flow = Supervisor[supervisor_node, worker1_node, worker2_node]
result = supervisor_flow.run({"project": "supervised project"})

⚠️ Error Handling Examples

🛡️ Robust Workflow

from agnflow import Node, Flow

def risky_operation(state):
    try:
        # Risky operation that might fail
        result = 1 / 0
        return {"result": result}
    except Exception as e:
        return "error", {"error": str(e)}

def error_handler(state):
    print(f"Handling error: {state['error']}")
    return {"handled": True, "error": state["error"]}

def success_handler(state):
    return {"success": True, "result": state["result"]}

# Create error handling workflow
risky = Node("risky", exec=risky_operation)
error = Node("error", exec=error_handler)
success = Node("success", exec=success_handler)

workflow = Flow(risky >> (error if "error" in state else success))
result = workflow.run({})

🔄 Retry Mechanism

from agnflow import Node, Flow
import time

def retry_operation(state, max_retries=3):
    retries = state.get("retries", 0)

    if retries >= max_retries:
        return "error", {"error": "Max retries exceeded"}

    try:
        # Simulate operation that might fail
        if time.time() % 2 < 1:  # 50% failure rate
            raise Exception("Random failure")

        return {"success": True, "attempts": retries + 1}
    except Exception as e:
        return {
            "retries": retries + 1,
            "last_error": str(e)
        }

def retry_node(state):
    return retry_operation(state)

# Create retry workflow
retry = Node("retry", exec=retry_node)
workflow = Flow(retry >> retry)  # Self-loop for retries

result = workflow.run({"retries": 0})

⚡ Async Examples

🔄 Async Nodes

import asyncio
from agnflow import Node, Flow

async def async_operation(state):
    await asyncio.sleep(1)
    return {"async_result": "completed"}

async def async_combine(state):
    await asyncio.sleep(0.5)
    return {"combined": f"Async: {state['async_result']}"}

# Create async workflow
async_node = Node("async_op", aexec=async_operation)
async_combine_node = Node("async_combine", aexec=async_combine)

workflow = Flow(async_node >> async_combine_node)

# Execute asynchronously
result = asyncio.run(workflow.arun({}))

🔀 Mixed Sync/Async

import asyncio
from agnflow import Node, Flow

def sync_operation(state):
    return {"sync_data": "processed"}

async def async_operation(state):
    await asyncio.sleep(1)
    return {"async_data": "processed"}

def combine_results(state):
    return {
        "combined": f"{state['sync_data']} + {state['async_data']}"
    }

# Create mixed workflow
sync_node = Node("sync", exec=sync_operation)
async_node = Node("async", aexec=async_operation)
combine_node = Node("combine", exec=combine_results)

workflow = Flow(sync_node >> async_node >> combine_node)
result = asyncio.run(workflow.arun({}))

🎨 Visualization Examples

📊 Generate Flowcharts

from agnflow import Node, Flow

def step1(state):
    return {"step": 1}

def step2(state):
    return {"step": 2}

def step3(state):
    return {"step": 3}

# Create complex workflow
a = Node("step1", exec=step1)
b = Node("step2", exec=step2)
c = Node("step3", exec=step3)

workflow = Flow(a >> [b, c] >> b)

# Generate Mermaid chart
workflow.render_mermaid(saved_file="workflow.png", title="Complex Workflow")

# Generate DOT chart
workflow.render_dot(saved_file="workflow.dot")

💾 State Management Examples

🔢 Complex State Operations

from agnflow import Node, Flow

def initialize_state(state):
    return {
        **state,
        "counter": 0,
        "history": [],
        "metadata": {"created": "now"}
    }

def increment_counter(state):
    new_counter = state["counter"] + 1
    new_history = state["history"] + [new_counter]

    return {
        **state,
        "counter": new_counter,
        "history": new_history
    }

def analyze_history(state):
    history = state["history"]
    return {
        **state,
        "analysis": {
            "total": len(history),
            "sum": sum(history),
            "average": sum(history) / len(history) if history else 0
        }
    }

# Create state management workflow
init = Node("init", exec=initialize_state)
increment = Node("increment", exec=increment_counter)
analyze = Node("analyze", exec=analyze_history)

workflow = Flow(init >> increment >> increment >> increment >> analyze)
result = workflow.run({})

💾 State Persistence

import json
from agnflow import Node, Flow

def save_state(state):
    with open("workflow_state.json", "w") as f:
        json.dump(state, f)
    return state

def load_state(state):
    try:
        with open("workflow_state.json", "r") as f:
            loaded_state = json.load(f)
        return {**state, **loaded_state}
    except FileNotFoundError:
        return state

def process_with_persistence(state):
    return {"processed": True, "data": state.get("data", "default")}

# Create persistence workflow
load = Node("load", exec=load_state)
process = Node("process", exec=process_with_persistence)
save = Node("save", exec=save_state)

workflow = Flow(load >> process >> save)
result = workflow.run({"data": "important data"})

⚡ Performance Optimization Examples

🗄️ Caching Mechanism

from agnflow import Node, Flow
import hashlib
import json

class Cache:
    def __init__(self):
        self._cache = {}

    def get(self, key):
        return self._cache.get(key)

    def set(self, key, value):
        self._cache[key] = value

cache = Cache()

def expensive_operation(state):
    # Generate cache key
    cache_key = hashlib.md5(
        json.dumps(state, sort_keys=True).encode()
    ).hexdigest()

    # Check cache
    cached_result = cache.get(cache_key)
    if cached_result:
        return {"result": cached_result, "cached": True}

    # Execute expensive operation
    result = sum(i**2 for i in range(10000))

    # Cache result
    cache.set(cache_key, result)

    return {"result": result, "cached": False}

# Create caching workflow
expensive = Node("expensive", exec=expensive_operation)
workflow = Flow(expensive)

# First execution (no cache)
result1 = workflow.run({"input": "data1"})

# Second execution (with cache)
result2 = workflow.run({"input": "data1"})

🔄 Parallel Optimization

from agnflow import Node, Flow
import asyncio

async def parallel_task1(state):
    await asyncio.sleep(2)
    return {"task1": "completed"}

async def parallel_task2(state):
    await asyncio.sleep(3)
    return {"task2": "completed"}

async def parallel_task3(state):
    await asyncio.sleep(1)
    return {"task3": "completed"}

def combine_parallel_results(state):
    return {
        "all_tasks": [
            state.get("task1"),
            state.get("task2"),
            state.get("task3")
        ]
    }

# Create parallel optimization workflow
task1 = Node("task1", aexec=parallel_task1)
task2 = Node("task2", aexec=parallel_task2)
task3 = Node("task3", aexec=parallel_task3)
combine = Node("combine", exec=combine_parallel_results)

# Execute all tasks in parallel
workflow = Flow([task1, task2, task3] >> combine)
result = asyncio.run(workflow.arun({}))