The Graph Analytics AI Platform now supports parallel execution of independent workflow steps, providing 40-60% performance improvement over sequential execution.
This guide explains how to use parallel execution, what performance gains to expect, and how the system works under the hood.
import asyncio
from graph_analytics_ai.ai.agents import AgenticWorkflowRunner
async def main():
# Create runner with parallel execution enabled
runner = AgenticWorkflowRunner(
graph_name="my_graph",
enable_tracing=True
)
# Run workflow with parallelism (default)
state = await runner.run_async(
input_documents=[{"path": "requirements.md"}],
enable_parallelism=True # This is the default
)
print(f"Generated {len(state.reports)} reports")
runner.print_trace_summary() # View performance metrics
# Run async workflow
asyncio.run(main())import asyncio
from graph_analytics_ai.ai.agents import run_agentic_workflow_async
async def main():
state = await run_agentic_workflow_async(
graph_name="my_graph",
max_executions=5,
enable_parallelism=True
)
asyncio.run(main())The system parallelizes independent workflow phases:
| Phase | Execution Mode | Speedup |
|---|---|---|
| Phase 1: Schema Analysis + Requirements | Parallel | ~2x faster |
| Phase 2: Use Case Generation | Sequential | No change |
| Phase 3: Template Generation | Sequential | No change |
| Phase 4: Execute Templates | Parallel (N templates) | ~Nx faster |
| Phase 5: Generate Reports | Parallel (N reports) | ~Nx faster |
Typical Workflow (5 templates, 5 reports):
- Sequential: ~180 seconds
- Parallel: ~70-90 seconds
- Improvement: 40-60% faster
Breakdown:
- Phase 1 (Schema + Requirements): 40s → 20s (2x speedup)
- Phase 4 (5 executions): 60s → 15s (4x speedup)
- Phase 5 (5 reports): 30s → 8s (4x speedup)
The parallel execution system intelligently orchestrates agents to run independent tasks concurrently:
graph TD
A[Orchestrator Agent] --> V[Industry Vertical Resolver<br/>built-in/custom/auto-generate]
V --> B[Schema Analysis Agent]
V --> C[Requirements Extraction Agent]
B --> D[Use Case Generation Agent]
C --> D
D --> E[Template Agent]
E --> F1[Execute Template 1]
E --> F2[Execute Template 2]
E --> F3[Execute Template 3]
E --> F4[Execute Template N]
F1 --> G1[Generate Report 1]
F2 --> G2[Generate Report 2]
F3 --> G3[Generate Report 3]
F4 --> G4[Generate Report N]
G1 --> H[Complete]
G2 --> H
G3 --> H
G4 --> H
K[(Analytics Catalog<br/>epoch + lineage)]:::catalog
C -.-> K
D -.-> K
E -.-> K
F1 -.-> K
F2 -.-> K
F3 -.-> K
F4 -.-> K
style A fill:#e1f5ff,stroke:#01579b,stroke-width:2px
style V fill:#bbdefb,stroke:#1565c0,stroke-width:2px
style B fill:#fff9c4,stroke:#f57f17,stroke-width:2px
style C fill:#fff9c4,stroke:#f57f17,stroke-width:2px
style D fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
style E fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
style F1 fill:#ffe0b2,stroke:#e65100,stroke-width:2px
style F2 fill:#ffe0b2,stroke:#e65100,stroke-width:2px
style F3 fill:#ffe0b2,stroke:#e65100,stroke-width:2px
style F4 fill:#ffe0b2,stroke:#e65100,stroke-width:2px
style G1 fill:#c5e1a5,stroke:#33691e,stroke-width:2px
style G2 fill:#c5e1a5,stroke:#33691e,stroke-width:2px
style G3 fill:#c5e1a5,stroke:#33691e,stroke-width:2px
style G4 fill:#c5e1a5,stroke:#33691e,stroke-width:2px
style H fill:#e1f5ff,stroke:#01579b,stroke-width:2px
classDef catalog fill:#ede7f6,stroke:#4527a0,stroke-width:2px
Key Features:
- Yellow boxes (Phase 1): Schema + Requirements run in parallel
- Orange boxes (Phase 4): All templates execute concurrently
- Green boxes (Phase 5): All reports generate in parallel
Sequential Execution (v3.0.0):
graph LR
V[Industry Vertical] --> A[Schema]
V --> B[Requirements]
B --> C[Use Cases]
C --> D[Templates]
D --> E[Execute 1]
E --> F[Execute 2]
F --> G[Execute 3]
G --> H[Report 1]
H --> I[Report 2]
I --> J[Report 3]
style V fill:#ffcdd2,stroke:#c62828
style A fill:#ffcdd2,stroke:#c62828
style B fill:#ffcdd2,stroke:#c62828
style E fill:#ffcdd2,stroke:#c62828
style F fill:#ffcdd2,stroke:#c62828
style G fill:#ffcdd2,stroke:#c62828
style H fill:#ffcdd2,stroke:#c62828
style I fill:#ffcdd2,stroke:#c62828
style J fill:#ffcdd2,stroke:#c62828
Parallel Execution (v3.1.0):
graph TB
subgraph Phase0[" Phase 0: Industry Vertical (auto)"]
V[Resolve industry prompt + patterns]
end
subgraph Phase1[" Phase 1: Parallel (2x speedup)"]
A[Schema Analysis]
B[Requirements Extraction]
end
subgraph Phase2["Phase 2: Sequential"]
C[Use Case Generation]
end
subgraph Phase3["Phase 3: Sequential"]
D[Template Generation]
end
subgraph Phase4[" Phase 4: Parallel (Nx speedup)"]
E1[Execute Template 1]
E2[Execute Template 2]
E3[Execute Template 3]
end
subgraph Phase5[" Phase 5: Parallel (Nx speedup)"]
R1[Generate Report 1]
R2[Generate Report 2]
R3[Generate Report 3]
end
K[(Analytics Catalog)]:::catalog
V --> A
V --> B
A --> C
B --> C
C --> D
D --> E1
D --> E2
D --> E3
E1 --> R1
E2 --> R2
E3 --> R3
C -.-> K
D -.-> K
E1 -.-> K
E2 -.-> K
E3 -.-> K
style Phase0 fill:#bbdefb,stroke:#1565c0,stroke-width:3px
style Phase1 fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px
style Phase4 fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px
style Phase5 fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px
classDef catalog fill:#ede7f6,stroke:#4527a0,stroke-width:2px
All agents now support both synchronous and asynchronous execution:
# Synchronous (backward compatible)
response = agent.process(message, state)
# Asynchronous (new)
response = await agent.process_async(message, state)The orchestrator identifies independent steps and runs them concurrently:
# Phase 1: Run schema + requirements in parallel
schema_task = execute_step_async("schema_analysis", state)
requirements_task = execute_step_async("requirements_extraction", state)
await asyncio.gather(schema_task, requirements_task)
# Phase 4: Execute all templates in parallel
tasks = [execute_template(t) for t in templates]
results = await asyncio.gather(*tasks)
# Phase 5: Generate all reports in parallel
tasks = [generate_report(r) for r in results]
reports = await asyncio.gather(*tasks)AgentState now includes async-safe methods for concurrent access:
# Async state updates (thread-safe)
await state.add_message_async(message)
await state.add_error_async(agent_name, error)
await state.mark_step_complete_async(step)LLM providers support native async HTTP requests using aiohttp:
# Synchronous (uses requests)
response = llm.generate("prompt")
# Asynchronous (uses aiohttp)
response = await llm.generate_async("prompt")# Enable parallel execution (default, recommended)
state = await runner.run_async(enable_parallelism=True)
# Disable parallel execution (sequential, for debugging)
state = await runner.run_async(enable_parallelism=False)The system automatically manages concurrency, but you can control the number of concurrent executions:
runner = AgenticWorkflowRunner(
max_executions=5 # Limit to 5 concurrent template executions
)# Good: Use async for production workflows
state = await runner.run_async(enable_parallelism=True)# Good: Use sequential for debugging
state = await runner.run_async(enable_parallelism=False)
# Or use synchronous API
state = runner.run()runner = AgenticWorkflowRunner(enable_tracing=True)
state = await runner.run_async()
# View performance metrics
runner.print_trace_summary()
# Export detailed trace
runner.export_trace("output/traces")# Good: Use asyncio.run() at top level
import asyncio
async def main():
state = await runner.run_async()
if __name__ == "__main__":
asyncio.run(main())The parallel execution feature is fully backward compatible:
# Old synchronous API still works
runner = AgenticWorkflowRunner()
state = runner.run() # Sequential execution
# New async API for better performance
state = await runner.run_async() # Parallel executionAll existing code continues to work without changes.
runner = AgenticWorkflowRunner(enable_tracing=True)
state = await runner.run_async()
# Print performance summary
runner.print_trace_summary()Example Output:
======================================================================
Workflow Trace Summary
======================================================================
Trace ID: workflow-1234567890
Total Events: 42
Performance:
Total Time: 72.45s
Steps Completed: 6
Avg Time/Step: 12075ms
LLM Usage:
Total Calls: 15
Total Time: 28.30s
Total Tokens: 45,230
Estimated Cost: $0.0453 USD
Communication:
Messages Exchanged: 24
Errors: 0
Slowest Agents:
1. Execution Specialist: 30000ms
2. Requirements Analyst: 15000ms
3. Reporting Specialist: 12000ms
# Export trace in multiple formats
runner.export_trace(
output_dir="output/traces",
formats=["json", "html", "markdown"]
)Solution: Ensure you're using asyncio.run() at the top level:
import asyncio
async def main():
state = await runner.run_async()
asyncio.run(main()) # CorrectSolution: Install the required dependency:
pip install aiohttp>=3.9.0Or install from requirements:
pip install -r requirements.txtPossible causes:
- Database bottleneck: If your database connection is slow, parallel execution won't help much
- Small workflow: For workflows with 1-2 templates, overhead may negate benefits
- Sequential fallback: Check that
enable_parallelism=Trueis set
Debug:
# Enable debug mode to see detailed execution flow
runner = AgenticWorkflowRunner(
enable_tracing=True,
enable_debug_mode=True
)Parallel execution requires:
- Python 3.7+ (asyncio support)
aiohttp>=3.9.0(async HTTP requests)
- Base Agent: Added
process_async()method - LLM Provider: Added
generate_async(),chat_async()methods - AgentState: Added async-safe state update methods
- Orchestrator: Added parallel workflow orchestration
- Runner: Added
run_async()method
CPU-Bound Operations (minimal improvement):
- Template generation
- Use case generation
I/O-Bound Operations (major improvement):
- LLM API calls (HTTP requests)
- Database queries (schema extraction)
- Template execution (GAE API calls)
Result: 40-60% overall improvement due to I/O parallelization.
import asyncio
from graph_analytics_ai.ai.agents import run_agentic_workflow_async
async def main():
state = await run_agentic_workflow_async(
graph_name="ecommerce_graph",
max_executions=3
)
print(f" Generated {len(state.reports)} reports")
for report in state.reports:
print(f" - {report.title}")
asyncio.run(main())import asyncio
from graph_analytics_ai.ai.agents import AgenticWorkflowRunner
async def main():
runner = AgenticWorkflowRunner(
graph_name="social_network",
core_collections=["users", "posts", "follows"],
enable_tracing=True,
enable_debug_mode=False
)
documents = [
{"path": "docs/requirements.md"},
{"content": "Find influential users in our platform."}
]
state = await runner.run_async(
input_documents=documents,
max_executions=5,
enable_parallelism=True
)
# Export results
runner.export_reports(state, "output/reports")
runner.export_trace("output/traces")
# Print metrics
runner.print_trace_summary()
asyncio.run(main())# In Jupyter, you can use await directly in async cells
from graph_analytics_ai.ai.agents import AgenticWorkflowRunner
runner = AgenticWorkflowRunner(enable_tracing=True)
# Use await directly (no asyncio.run() needed in Jupyter)
state = await runner.run_async(enable_parallelism=True)
print(f"Generated {len(state.reports)} reports")
runner.print_trace_summary()Test Case: E-commerce graph, 5 use cases, 5 templates
| Metric | Sequential | Parallel | Improvement |
|---|---|---|---|
| Total Time | 180s | 78s | 56% faster |
| Phase 1 (Schema + Reqs) | 40s | 22s | 45% faster |
| Phase 4 (Execution) | 75s | 20s | 73% faster |
| Phase 5 (Reporting) | 35s | 10s | 71% faster |
| LLM Calls | 15 | 15 | Same |
| Cost | $0.045 | $0.045 | Same |
Conclusion: Parallel execution provides significant time savings without increasing cost.
Q: Does parallel execution increase costs? A: No. The same LLM calls and database queries are made, just executed in parallel.
Q: Is it safe to use in production? A: Yes. All state updates are thread-safe, and error handling is preserved.
Q: What if I don't have aiohttp installed?
A: The system automatically falls back to synchronous execution using requests.
Q: Can I mix sync and async agents?
A: Yes. Async methods automatically wrap sync agents using run_in_executor().
Q: How much memory does parallel execution use? A: Slightly more (5-10% increase) due to concurrent task management. Still well within reasonable limits.
Before (synchronous):
from graph_analytics_ai.ai.agents import AgenticWorkflowRunner
runner = AgenticWorkflowRunner()
state = runner.run()After (parallel):
import asyncio
from graph_analytics_ai.ai.agents import AgenticWorkflowRunner
async def main():
runner = AgenticWorkflowRunner()
state = await runner.run_async(enable_parallelism=True)
return state
state = asyncio.run(main())No Breaking Changes: Old code continues to work without modification!
For issues or questions about parallel execution:
- Check this guide and troubleshooting section
- Review trace output with
enable_debug_mode=True - File an issue on GitHub with trace details
- v3.1.0 (Current): Full parallel execution support
- v3.0.0: Sequential agentic workflow
- v2.1.0: Template-based analytics
Ready to get 40-60% faster workflows? Start using runner.run_async() today!