The Model Context Protocol (MCP) enables BotForge RAG to integrate with external tool servers, extending the system's capabilities beyond information retrieval to include dynamic tool execution. This guide covers how to build, deploy, and integrate MCP servers with BotForge RAG.
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ BotForge │────▶│ MCP Manager │────▶│ External MCP │
│ Agent │ │ │ │ Servers │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
│ │ │
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Tool │ │ Server │ │ HTTP │
│Selection│ │Registry │ │ API │
└─────────┘ └─────────┘ └─────────┘
Every MCP server must implement these endpoints:
`GET /capabilities` — Returns server metadata and available tools.
Response Format:
{
"server": {
"name": "Calculator Server",
"version": "1.0.0",
"description": "Mathematical calculations and string operations"
},
"tools": [
{
"name": "calculator",
"description": "Perform basic mathematical calculations",
"schema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression to evaluate (e.g., '25 * 17 + 100')"
}
},
"required": ["expression"]
}
}
],
"protocol_version": "1.0"
}
`GET /tools` — Returns available tools (a simplified version of the capabilities response).
Response Format:
{
"tools": [
{
"name": "calculator",
"description": "Perform basic mathematical calculations",
"schema": {...}
}
]
}
`POST /execute` — Executes a specific tool with parameters.
Request Format:
{
"tool_name": "calculator",
"parameters": {
"expression": "25 * 17 + 100"
}
}
Response Format:
{
"success": true,
"result": {
"calculation": "25 * 17 + 100",
"answer": 525
},
"error": null
}
`GET /health` — Health check endpoint.
{
"status": "healthy",
"server": "Calculator Server",
"uptime": 3600,
"tools_available": 3
}
`GET /` — Root endpoint with server information.
{
"message": "Calculator MCP Server",
"version": "1.0.0",
"endpoints": {
"capabilities": "/capabilities",
"tools": "/tools",
"execute": "/execute",
"health": "/health"
}
}
Here's a complete example of a simple MCP server:
#!/usr/bin/env python3
"""
Simple MCP server implementation using FastAPI
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import uvicorn
import json
app = FastAPI(title="Calculator MCP Server", version="1.0.0")
# Tool catalogue served by /capabilities and /tools. Every entry has a name,
# a human-readable description, and a schema describing its parameters.
AVAILABLE_TOOLS = [
    # Arithmetic on a single expression string.
    {
        "name": "calculator",
        "description": "Perform basic mathematical calculations",
        "schema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "Mathematical expression to evaluate",
                },
            },
            "required": ["expression"],
        },
    },
    # Simple transformations of a text value.
    {
        "name": "string_operations",
        "description": "Perform string operations like uppercase, lowercase, reverse",
        "schema": {
            "type": "object",
            "properties": {
                "text": {
                    "type": "string",
                    "description": "Text to process",
                },
                "operation": {
                    "type": "string",
                    "description": "Operation: 'upper', 'lower', 'reverse', 'length'",
                },
            },
            "required": ["text", "operation"],
        },
    },
]
class ToolExecuteRequest(BaseModel):
    # Request body of POST /execute.
    # Name of the tool to run; compared against the known tool names by the handler.
    tool_name: str
    # Tool-specific arguments, passed through to the tool implementation.
    parameters: Dict[str, Any]
class ToolExecuteResponse(BaseModel):
    # Response body of POST /execute.
    # True when the tool ran and produced a result.
    success: bool
    # Tool output; the handlers in this file set it to None on failure.
    result: Any
    # Failure description; left as None on success.
    error: Optional[str] = None
@app.get("/capabilities")
async def get_capabilities():
    """Return server capabilities and available tools"""
    # Static server metadata, kept separate from the tool catalogue.
    server_info = {
        "name": "Calculator MCP Server",
        "version": "1.0.0",
        "description": "Mathematical calculations and string operations",
    }
    return {
        "server": server_info,
        "tools": AVAILABLE_TOOLS,
        "protocol_version": "1.0",
    }
@app.get("/tools")
async def list_tools():
    """List all available tools"""
    # Same catalogue as /capabilities, without the server metadata.
    payload = {"tools": AVAILABLE_TOOLS}
    return payload
@app.post("/execute", response_model=ToolExecuteResponse)
async def execute_tool(request: ToolExecuteRequest):
    """Execute a tool with given parameters"""
    try:
        # Map each supported tool name to the function that implements it.
        handlers = {
            "calculator": execute_calculator,
            "string_operations": execute_string_operations,
        }
        handler = handlers.get(request.tool_name)
        if handler is None:
            return ToolExecuteResponse(
                success=False,
                result=None,
                error=f"Unknown tool: {request.tool_name}",
            )
        return handler(request.parameters)
    except Exception as exc:
        # Any failure inside a tool is reported in the response body
        # instead of propagating out of the handler.
        return ToolExecuteResponse(
            success=False,
            result=None,
            error=f"Tool execution failed: {str(exc)}",
        )
def execute_calculator(parameters: Dict[str, Any]) -> ToolExecuteResponse:
    """Evaluate the arithmetic expression in ``parameters["expression"]``.

    The expression is parsed with ``ast`` and evaluated by walking the tree,
    so only numeric literals, parentheses, unary ``+``/``-`` and the binary
    operators ``+ - * / // **`` are ever executed. This replaces ``eval`` on
    request data: the character whitelist alone still admitted inputs such as
    ``9**9**9**9``, which can tie up the server.

    Args:
        parameters: Tool parameters; must hold a non-empty string under
            ``"expression"``.

    Returns:
        A ``ToolExecuteResponse``. On success ``result`` holds the original
        expression under ``"calculation"`` and its value under ``"answer"``;
        otherwise ``success`` is ``False`` and ``error`` describes the problem.
    """
    # Imported locally so this block does not depend on file-level imports.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Upper bound on the size (in bits) of an integer power result.
    max_power_bits = 10000

    def evaluate(node):
        """Return the numeric value of an arithmetic AST node."""
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if (
                isinstance(node.op, ast.Pow)
                and isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and left.bit_length() * right > max_power_bits
            ):
                # Refuse integer powers whose result would be enormous.
                raise ValueError("Result of exponentiation is too large")
            return binary_ops[type(node.op)](left, right)
        raise ValueError("Unsupported expression")

    expression = parameters.get("expression", "")
    if not expression:
        return ToolExecuteResponse(
            success=False,
            result=None,
            error="Expression parameter is required",
        )
    if not isinstance(expression, str):
        return ToolExecuteResponse(
            success=False,
            result=None,
            error="Expression parameter must be a string",
        )

    # Kept as a cheap first filter so the error message for stray characters
    # stays the same as before.
    allowed_chars = set('0123456789+-*/(). ')
    if not all(c in allowed_chars for c in expression):
        return ToolExecuteResponse(
            success=False,
            result=None,
            error="Invalid characters in mathematical expression",
        )

    try:
        # eval() ignored surrounding whitespace; strip so " 1 + 1" still parses.
        tree = ast.parse(expression.strip(), mode="eval")
        answer = evaluate(tree)
    except Exception as e:
        # Syntax errors, division by zero, overflow and rejected constructs
        # are all reported in the response rather than raised.
        return ToolExecuteResponse(
            success=False,
            result=None,
            error=f"Calculation error: {str(e)}",
        )

    return ToolExecuteResponse(
        success=True,
        result={
            "calculation": expression,
            "answer": answer,
        },
    )
def execute_string_operations(parameters: Dict[str, Any]) -> ToolExecuteResponse:
    """Apply one of the supported string operations to ``parameters["text"]``.

    The operation name is lower-cased before it is matched. A missing or
    empty text, or an unrecognised operation, yields a failure response.
    """
    source_text = parameters.get("text", "")
    op_name = parameters.get("operation", "").lower()

    # The text is checked before the operation, so an empty text wins over
    # an invalid operation name.
    if not source_text:
        return ToolExecuteResponse(
            success=False,
            result=None,
            error="Text parameter is required",
        )

    transforms = {
        "upper": lambda value: value.upper(),
        "lower": lambda value: value.lower(),
        "reverse": lambda value: value[::-1],
        "length": lambda value: len(value),
    }
    transform = transforms.get(op_name)
    if transform is None:
        return ToolExecuteResponse(
            success=False,
            result=None,
            error="Invalid operation. Use: upper, lower, reverse, length",
        )

    outcome = transform(source_text)
    return ToolExecuteResponse(
        success=True,
        result={
            "original": source_text,
            "operation": op_name,
            "result": outcome,
        },
    )
import time

# Taken once when the module is loaded; /health reports seconds elapsed since
# then. A monotonic clock is used so system clock changes cannot skew it.
_SERVER_STARTED_AT = time.monotonic()


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "server": "Calculator MCP Server",
        # Whole seconds since module load, matching the "uptime" field of the
        # documented /health response format.
        "uptime": int(time.monotonic() - _SERVER_STARTED_AT),
        "tools_available": len(AVAILABLE_TOOLS),
    }
@app.get("/")
async def root():
    """Root endpoint with server info"""
    # Paths of the other endpoints, keyed by a short name.
    endpoint_paths = {
        "capabilities": "/capabilities",
        "tools": "/tools",
        "execute": "/execute",
        "health": "/health",
    }
    return {
        "message": "Calculator MCP Server",
        "version": "1.0.0",
        "endpoints": endpoint_paths,
    }
if __name__ == "__main__":
print("🚀 Starting Calculator MCP Server")
print("📋 Available endpoints:")
print(" • Capabilities: http://localhost:3001/capabilities")
print(" • Tools: http://localhost:3001/tools")
print(" • Execute: http://localhost:3001/execute")
print(" • Health: http://localhost:3001/health")
uvicorn.run(app, host="0.0.0.0", port=3001)const express = require('express');
const app = express();
const port = 3001;

// Parse JSON request bodies; the /execute handler reads req.body.
app.use(express.json());

// Tool catalogue returned by /capabilities and /tools. Each entry has a
// name, a description, and a schema describing its parameters.
const AVAILABLE_TOOLS = [
  // Fetches a page by URL.
  {
    name: "web_scraper",
    description: "Scrape content from web pages",
    schema: {
      type: "object",
      properties: {
        url: {
          type: "string",
          description: "URL to scrape"
        },
        selector: {
          type: "string",
          description: "CSS selector for content extraction"
        }
      },
      required: ["url"]
    }
  },
  // Reads or inspects a file by path.
  {
    name: "file_processor",
    description: "Process and analyze files",
    schema: {
      type: "object",
      properties: {
        file_path: {
          type: "string",
          description: "Path to the file"
        },
        operation: {
          type: "string",
          description: "Operation: 'read', 'analyze', 'summarize'"
        }
      },
      required: ["file_path", "operation"]
    }
  }
];
// Capabilities endpoint: server metadata plus the tool catalogue.
app.get('/capabilities', (req, res) => {
  const server = {
    name: "Web Tools MCP Server",
    version: "1.0.0",
    description: "Web scraping and file processing tools"
  };
  res.json({
    server: server,
    tools: AVAILABLE_TOOLS,
    protocol_version: "1.0"
  });
});

// Tools endpoint: the tool catalogue on its own.
app.get('/tools', (req, res) => {
  const payload = { tools: AVAILABLE_TOOLS };
  res.json(payload);
});
// Execute endpoint: runs the tool named in the request body.
app.post('/execute', async (req, res) => {
  try {
    const { tool_name, parameters } = req.body;

    // A Map is used so that only the two listed names can ever match.
    const toolHandlers = new Map([
      ['web_scraper', executeWebScraper],
      ['file_processor', executeFileProcessor]
    ]);
    const handler = toolHandlers.get(tool_name);

    if (!handler) {
      return res.json({
        success: false,
        result: null,
        error: `Unknown tool: ${tool_name}`
      });
    }

    const result = await handler(parameters);
    res.json(result);
  } catch (error) {
    // Failures are reported in the response body, not as an HTTP error.
    res.json({
      success: false,
      result: null,
      error: `Tool execution failed: ${error.message}`
    });
  }
});
// Tool implementations

/**
 * Fetch a page and return a truncated copy of its body.
 *
 * @param {{url: string, selector?: string}} parameters Tool parameters.
 *   `selector` is part of the tool schema but is not applied by this
 *   simplified example.
 * @returns {Promise<{success: boolean, result: ?object, error?: string}>}
 *   On success, `result` holds the url, the first 1000 characters of the
 *   body, and the full body length. Failures are returned, never thrown.
 */
async function executeWebScraper(parameters) {
  // Tolerate a missing parameters object instead of throwing on destructure.
  const { url } = parameters || {};

  // Reject missing or malformed URLs before any network call is made.
  let target;
  try {
    target = new URL(url);
  } catch (error) {
    return {
      success: false,
      result: null,
      error: `Scraping failed: Invalid URL: ${url}`
    };
  }
  if (target.protocol !== 'http:' && target.protocol !== 'https:') {
    return {
      success: false,
      result: null,
      error: `Scraping failed: Unsupported protocol: ${target.protocol}`
    };
  }

  // SECURITY: the URL comes straight from the request, so this tool can be
  // pointed at internal hosts (SSRF). Restrict allowed hosts before exposing
  // it outside a trusted network.
  try {
    const response = await fetch(url);
    if (!response.ok) {
      // An error page is not scraped content; report the status instead.
      return {
        success: false,
        result: null,
        error: `Scraping failed: HTTP ${response.status}`
      };
    }
    const html = await response.text();
    return {
      success: true,
      result: {
        url: url,
        content: html.slice(0, 1000), // Truncated for example
        length: html.length
      }
    };
  } catch (error) {
    return {
      success: false,
      result: null,
      error: `Scraping failed: ${error.message}`
    };
  }
}
/**
 * Read or inspect a file on the server.
 *
 * @param {{file_path: string, operation: string}} parameters Tool parameters.
 *   Supported operations are 'read' and 'analyze'. The tool schema also
 *   lists 'summarize', which is not implemented here and is reported as an
 *   unknown operation.
 * @returns {Promise<{success: boolean, result: ?object, error?: string}>}
 *   Failures are returned in the object, never thrown.
 */
async function executeFileProcessor(parameters) {
  const fs = require('fs').promises;
  // Tolerate a missing parameters object instead of throwing on destructure.
  const { file_path, operation } = parameters || {};

  if (typeof file_path !== 'string' || file_path.length === 0) {
    return {
      success: false,
      result: null,
      error: 'File processing failed: file_path parameter is required'
    };
  }

  // SECURITY: file_path comes straight from the request and is not confined
  // to any directory, so any file the process can read is reachable.
  // Resolve it against an allow-listed root before exposing this tool.
  try {
    // Each case has its own braces so its `const` stays scoped to that case.
    switch (operation) {
      case 'read': {
        const content = await fs.readFile(file_path, 'utf8');
        return {
          success: true,
          result: {
            file_path: file_path,
            content: content,
            // NOTE(review): length of the decoded string, not bytes on disk
            // (unlike `size` from 'analyze').
            size: content.length
          }
        };
      }
      case 'analyze': {
        const stats = await fs.stat(file_path);
        return {
          success: true,
          result: {
            file_path: file_path,
            size: stats.size,
            modified: stats.mtime,
            type: file_path.split('.').pop()
          }
        };
      }
      default:
        return {
          success: false,
          result: null,
          error: `Unknown operation: ${operation}`
        };
    }
  } catch (error) {
    return {
      success: false,
      result: null,
      error: `File processing failed: ${error.message}`
    };
  }
}
// Health check: fixed status plus the number of registered tools.
app.get('/health', (req, res) => {
  const payload = {
    status: 'healthy',
    server: 'Web Tools MCP Server',
    tools_available: AVAILABLE_TOOLS.length
  };
  res.json(payload);
});

// Root endpoint: server name, version, and the paths of the other endpoints.
app.get('/', (req, res) => {
  const endpoints = {
    capabilities: '/capabilities',
    tools: '/tools',
    execute: '/execute',
    health: '/health'
  };
  res.json({
    message: 'Web Tools MCP Server',
    version: '1.0.0',
    endpoints: endpoints
  });
});
app.listen(port, () => {
console.log(`🚀 Web Tools MCP Server listening at http://localhost:${port}`);
console.log('📋 Available endpoints:');
console.log(` • Capabilities: http://localhost:${port}/capabilities`);
console.log(` • Tools: http://localhost:${port}/tools`);
console.log(` • Execute: http://localhost:${port}/execute`);
console.log(` • Health: http://localhost:${port}/health`);
});curl -X POST "http://localhost:8000/api/mcp/register" \
-H "Content-Type: application/json" \
-d '{
"bot_id": "550e8400-e29b-41d4-a716-446655440000",
"name": "Calculator Server",
"endpoint_url": "http://localhost:3001",
"description": "Mathematical calculations and string operations",
"timeout_seconds": 30,
"retry_attempts": 3
}'
import requests
def register_mcp_server(
    bot_id,
    server_config,
    api_base_url="http://localhost:8000",
    request_timeout=30,
):
    """Register an MCP server for a bot.

    Args:
        bot_id: Identifier of the bot the server is registered for.
        server_config: Mapping with required keys ``"name"`` and
            ``"endpoint_url"``; ``"description"``, ``"timeout_seconds"`` and
            ``"retry_attempts"`` are optional and default to ``""``, ``30``
            and ``3``.
        api_base_url: Base URL of the API that exposes
            ``/api/mcp/register``. Defaults to the previously hard-coded
            local address.
        request_timeout: Seconds to wait for the registration request
            itself. Without a timeout ``requests`` waits indefinitely.

    Returns:
        The decoded JSON body of the registration response.

    Raises:
        KeyError: If ``server_config`` lacks ``"name"`` or ``"endpoint_url"``.
        requests.HTTPError: If the API answers with a 4xx/5xx status, so a
            failed registration is not mistaken for a successful one.
        requests.RequestException: On connection failures or timeouts.
    """
    payload = {
        "bot_id": bot_id,
        "name": server_config["name"],
        "endpoint_url": server_config["endpoint_url"],
        "description": server_config.get("description", ""),
        "timeout_seconds": server_config.get("timeout_seconds", 30),
        "retry_attempts": server_config.get("retry_attempts", 3),
    }
    response = requests.post(
        f"{api_base_url.rstrip('/')}/api/mcp/register",
        json=payload,
        timeout=request_timeout,
    )
    response.raise_for_status()
    return response.json()
# Example usage: register the calculator server for one bot and print the
# id found under "mcp_server_id" in the response.
bot_id = "550e8400-e29b-41d4-a716-446655440000"
server_config = dict(
    [
        ("name", "Calculator Server"),
        ("endpoint_url", "http://localhost:3001"),
        ("description", "Mathematical calculations"),
    ]
)
result = register_mcp_server(bot_id, server_config)
print(f"Server registered: {result['mcp_server_id']}")
# Test server capabilities
curl "http://localhost:3001/capabilities"
# Test tool execution
curl -X POST "http://localhost:3001/execute" \
-H "Content-Type: application/json" \
-d '{
"tool_name": "calculator",
"parameters": {
"expression": "25 * 17 + 100"
}
}'
# Test health check
curl "http://localhost:3001/health"
import pytest
import requests
import asyncio
from typing import Dict, Any
class MCPServerTester:
    """Smoke-test client for the HTTP endpoints of an MCP server.

    Each ``test_*`` method sends one request to ``base_url`` and raises
    ``AssertionError`` when the response does not have the expected shape.
    """

    def __init__(self, base_url: str, timeout: float = 10.0):
        """Store the server address and the per-request timeout.

        Args:
            base_url: Root URL of the server under test.
            timeout: Seconds to wait for each HTTP request. Without a
                timeout, a server that never answers would block the run
                indefinitely.
        """
        self.base_url = base_url
        self.timeout = timeout

    def test_capabilities(self):
        """Test capabilities endpoint"""
        response = requests.get(
            f"{self.base_url}/capabilities", timeout=self.timeout
        )
        assert response.status_code == 200
        data = response.json()
        assert "server" in data
        assert "tools" in data
        assert "protocol_version" in data
        # Validate server info
        server = data["server"]
        assert "name" in server
        assert "version" in server
        # Validate tools
        tools = data["tools"]
        assert isinstance(tools, list)
        for tool in tools:
            assert "name" in tool
            assert "description" in tool
            assert "schema" in tool

    def test_tools_endpoint(self):
        """Test tools endpoint"""
        response = requests.get(f"{self.base_url}/tools", timeout=self.timeout)
        assert response.status_code == 200
        data = response.json()
        assert "tools" in data
        assert isinstance(data["tools"], list)

    def test_tool_execution(self, tool_name: str, parameters: Dict[str, Any]):
        """Test tool execution"""
        response = requests.post(
            f"{self.base_url}/execute",
            json={
                "tool_name": tool_name,
                "parameters": parameters,
            },
            timeout=self.timeout,
        )
        assert response.status_code == 200
        data = response.json()
        assert "success" in data
        # Only the response shape is checked here; a tool-level failure
        # still passes as long as it carries an "error" field.
        if data["success"]:
            assert "result" in data
        else:
            assert "error" in data

    def test_health_check(self):
        """Test health endpoint"""
        response = requests.get(f"{self.base_url}/health", timeout=self.timeout)
        assert response.status_code == 200
        data = response.json()
        assert "status" in data
        assert data["status"] == "healthy"

    def run_all_tests(self):
        """Run the endpoint checks, then exercise every advertised tool.

        A failing endpoint check raises immediately. Per-tool failures are
        printed and do not stop the remaining tools from being tested.
        """
        print("Testing MCP Server...")
        # Test basic endpoints
        self.test_capabilities()
        self.test_tools_endpoint()
        self.test_health_check()
        # Get available tools for testing
        tools_response = requests.get(
            f"{self.base_url}/tools", timeout=self.timeout
        )
        tools = tools_response.json()["tools"]
        # Test each tool with sample parameters
        for tool in tools:
            tool_name = tool["name"]
            schema = tool["schema"]
            # Generate sample parameters based on schema
            sample_params = self.generate_sample_parameters(schema)
            try:
                self.test_tool_execution(tool_name, sample_params)
                print(f"✅ Tool '{tool_name}' test passed")
            except Exception as e:
                print(f"❌ Tool '{tool_name}' test failed: {e}")
        print("All tests completed!")

    def generate_sample_parameters(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Generate sample parameters based on tool schema.

        Every property in ``schema["properties"]`` with a recognised type
        gets a value: strings are chosen by property name, ``number`` and
        ``integer`` become ``42``, ``boolean`` becomes ``True``. Properties
        of any other type are left out. A property without a ``"type"`` is
        treated as a string.
        """
        properties = schema.get("properties", {})
        params: Dict[str, Any] = {}
        for prop_name, prop_def in properties.items():
            prop_type = prop_def.get("type", "string")
            lowered_name = prop_name.lower()
            if prop_type == "string":
                if "expression" in lowered_name:
                    params[prop_name] = "2 + 2"
                elif "text" in lowered_name:
                    params[prop_name] = "hello world"
                elif "operation" in lowered_name:
                    params[prop_name] = "upper"
                else:
                    params[prop_name] = "test"
            elif prop_type in ("number", "integer"):
                # JSON Schema has both numeric types; 42 is valid for either.
                params[prop_name] = 42
            elif prop_type == "boolean":
                params[prop_name] = True
        return params
# Usage
if __name__ == "__main__":
tester = MCPServerTester("http://localhost:3001")
tester.run_all_tests()Dockerfile for MCP server:
FROM python:3.9-slim
WORKDIR /app

# Install dependencies first so this layer is reused when only code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Expose port
EXPOSE 3001

# Health check
# The slim Python base image does not include curl, so the probe uses the
# interpreter that is already in the image. urlopen raises, giving a
# non-zero exit, on connection failure or an HTTP error status.
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:3001/health', timeout=5)" || exit 1

# Run server
CMD ["python", "mcp_server.py"]
docker-compose.yml:
version: '3.8'
services:
  # Python calculator server, built from the Dockerfile in this directory.
  calculator-mcp:
    build: .
    ports:
      - "3001:3001"
    environment:
      - SERVER_HOST=0.0.0.0
      - SERVER_PORT=3001
      - LOG_LEVEL=INFO
    healthcheck:
      # The slim Python base image does not include curl, so probe /health
      # with the interpreter that is already in the image.
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/health', timeout=5)"]
      interval: 30s
      timeout: 10s
      retries: 3
    restart: unless-stopped
  # Second server built from ./web-tools; host port 3002 maps to container port 3001.
  web-tools-mcp:
    build: ./web-tools
    ports:
      - "3002:3001"
    environment:
      - SERVER_HOST=0.0.0.0
      - SERVER_PORT=3001
    restart: unless-stopped
apiVersion: apps/v1
kind: Deployment
metadata:
  name: calculator-mcp
  labels:
    app: calculator-mcp
spec:
  replicas: 2
  selector:
    matchLabels:
      app: calculator-mcp
  template:
    metadata:
      labels:
        app: calculator-mcp
    spec:
      containers:
        - name: calculator-mcp
          image: botforge/calculator-mcp:latest
          ports:
            - containerPort: 3001
          # Both probes call the server's /health endpoint on port 3001.
          livenessProbe:
            httpGet:
              path: /health
              port: 3001
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 3001
            initialDelaySeconds: 5
            periodSeconds: 5
---
# Cluster-internal Service: port 80 forwards to container port 3001.
apiVersion: v1
kind: Service
metadata:
  name: calculator-mcp-service
spec:
  selector:
    app: calculator-mcp
  ports:
    - port: 80
      targetPort: 3001
  type: ClusterIP
from fastapi import Depends, HTTPException, Header
from typing import Optional
async def verify_api_key(x_api_key: Optional[str] = Header(None)):
    """Reject requests whose ``X-API-Key`` header does not match the server key.

    The expected key is read from the ``MCP_API_KEY`` environment variable.
    When it is unset, the placeholder ``"your-secret-key"`` is used so the
    example behaves as before; set the variable in any real deployment.

    Args:
        x_api_key: Value of the ``X-API-Key`` request header, if present.

    Returns:
        The supplied key when it is valid.

    Raises:
        HTTPException: With status 401 when the header is missing, empty,
            or does not match.
    """
    # Imported locally so this block does not depend on file-level imports.
    import os
    import secrets

    expected_key = os.environ.get("MCP_API_KEY", "your-secret-key")
    # compare_digest takes time independent of where the values differ, so
    # response timing does not leak how much of the key was right. Bytes are
    # compared because the str form only accepts ASCII.
    if not x_api_key or not secrets.compare_digest(
        x_api_key.encode("utf-8"), expected_key.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key
@app.post("/execute", dependencies=[Depends(verify_api_key)])
async def execute_tool(request: ToolExecuteRequest):
    # Tool execution with authentication: verify_api_key is declared as a
    # route dependency above. The body is a placeholder that returns None.
    return None
from slowapi import Limiter, _rate_limit_exceeded_handler
from fastapi import Request
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Rate limits are tracked per key returned by get_remote_address.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/execute")
@limiter.limit("10/minute")
async def execute_tool(request: Request, tool_request: ToolExecuteRequest):
    # `Request` is imported above: it is used in this signature, and without
    # the import the definition fails with NameError.
    # Rate-limited tool execution
    pass
import redis
from typing import Optional
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def _cache_key(tool_name: str, parameters: dict) -> str:
    """Build a cache key that is stable across processes.

    The built-in ``hash()`` of a string is salted per interpreter process,
    and ``str(dict)`` depends on insertion order, so keys built from them do
    not match between workers or after a restart. A SHA-256 digest of the
    key-sorted JSON form is the same everywhere.
    """
    import hashlib

    canonical = json.dumps(parameters, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"mcp:{tool_name}:{digest}"


def get_cached_result(tool_name: str, parameters: dict) -> Optional[dict]:
    """Get cached tool execution result.

    Returns:
        The decoded cached result, or ``None`` when nothing is stored for
        this tool and parameter combination.
    """
    cached = redis_client.get(_cache_key(tool_name, parameters))
    if not cached:
        return None
    return json.loads(cached)


def cache_result(tool_name: str, parameters: dict, result: dict, ttl: int = 3600):
    """Cache tool execution result.

    Args:
        tool_name: Name of the executed tool.
        parameters: Parameters the tool was called with; part of the key.
        result: JSON-serialisable result to store.
        ttl: Lifetime of the cache entry in seconds.
    """
    redis_client.setex(_cache_key(tool_name, parameters), ttl, json.dumps(result))
@app.post("/execute")
async def execute_tool(request: ToolExecuteRequest):
    # Serve a stored result when the cache has one for this tool and
    # these parameters.
    cache_hit = get_cached_result(request.tool_name, request.parameters)
    if cache_hit:
        return cache_hit

    # Cache miss: run the tool.
    outcome = await perform_tool_execution(request)

    # Only successful results are cached.
    if outcome.success:
        cache_result(request.tool_name, request.parameters, outcome.dict())
    return outcome
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@app.post("/execute")
async def execute_tool(request: ToolExecuteRequest):
start_time = datetime.utcnow()
try:
logger.info(f"Executing tool: {request.tool_name}")
result = await perform_tool_execution(request)
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.info(f"Tool execution completed in {execution_time:.3f}s")
return result
except Exception as e:
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(f"Tool execution failed after {execution_time:.3f}s: {str(e)}")
raise- Always return proper error responses
- Include helpful error messages
- Log errors for debugging
- Handle timeouts gracefully
- Implement caching for expensive operations
- Use async/await for I/O operations
- Optimize tool execution algorithms
- Monitor memory usage
- Validate all input parameters
- Implement rate limiting
- Use authentication when needed
- Sanitize user inputs
- Implement health checks
- Handle network failures
- Use circuit breakers for external dependencies
- Implement graceful shutdowns
- Document all tools and parameters
- Provide usage examples
- Include error codes and messages
- Maintain API versioning
Server Not Responding:
# Check if server is running
curl http://localhost:3001/health
# Check server logs
docker logs calculator-mcp
# Verify port binding
netstat -tlnp | grep 3001
Tool Execution Failures:
# Add debug logging
logger.debug(f"Tool parameters: {parameters}")
logger.debug(f"Tool execution result: {result}")
# Validate parameters
def validate_parameters(tool_name: str, parameters: dict):
tool_schema = get_tool_schema(tool_name)
# Implement validation logicRegistration Issues:
# Test server capabilities
curl http://localhost:3001/capabilities
# Check BotForge logs
docker logs botforge-api | grep -i mcp
# Verify network connectivity
telnet localhost 3001
# MCP Server Debug Endpoint
@app.get("/debug")
async def debug_info():
    # Helper calls are made in the same order as the keys appear in the
    # response: uptime, memory usage, recent executions, recent errors.
    server_info = {
        "name": "Calculator MCP Server",
        "version": "1.0.0",
        "uptime": get_uptime(),
        "memory_usage": get_memory_usage(),
    }
    recent_executions = get_recent_executions()
    recent_errors = get_recent_errors()
    return {
        "server_info": server_info,
        "tools": AVAILABLE_TOOLS,
        "recent_executions": recent_executions,
        "error_log": recent_errors,
    }
# Health Check with Details
@app.get("/health/detailed")
async def detailed_health():
return {
"status": "healthy",
"components": {
"database": check_database_health(),
"external_apis": check_external_apis(),
"memory": check_memory_usage(),
"disk": check_disk_usage()
},
"tools": [
{
"name": tool["name"],
"status": "active",
"last_execution": get_last_execution_time(tool["name"])
}
for tool in AVAILABLE_TOOLS
]
}This comprehensive guide provides everything needed to build, deploy, and integrate MCP servers with BotForge RAG, enabling powerful tool execution capabilities alongside information retrieval.