Model Context Protocol (MCP) Integration Guide

Overview

The Model Context Protocol (MCP) enables BotForge RAG to integrate with external tool servers, extending the system's capabilities beyond information retrieval to include dynamic tool execution. This guide covers how to build, deploy, and integrate MCP servers with BotForge RAG.

🏗️ MCP Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   BotForge      │────▶│ MCP Manager     │────▶│ External MCP    │
│   Agent         │    │                 │    │ Servers         │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         │                       │                       │
    ┌─────────┐             ┌─────────┐             ┌─────────┐
    │ Tool    │             │ Server  │             │ HTTP    │
    │Selection│             │Registry │             │ API     │
    └─────────┘             └─────────┘             └─────────┘

📋 MCP Server Requirements

1. Required Endpoints

Every MCP server must implement these endpoints:

GET /capabilities

Returns server metadata and available tools.

Response Format:

{
  "server": {
    "name": "Calculator Server",
    "version": "1.0.0",
    "description": "Mathematical calculations and string operations"
  },
  "tools": [
    {
      "name": "calculator",
      "description": "Perform basic mathematical calculations",
      "schema": {
        "type": "object",
        "properties": {
          "expression": {
            "type": "string",
            "description": "Mathematical expression to evaluate (e.g., '25 * 17 + 100')"
          }
        },
        "required": ["expression"]
      }
    }
  ],
  "protocol_version": "1.0"
}
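
A client may want to check a `/capabilities` payload before trusting it. The following is a minimal sketch, assuming the fields shown above (`server.name`, `server.version`, `tools[].name`, `tools[].description`, `tools[].schema`, `protocol_version`) are the ones that matter; the function name `validate_capabilities` is illustrative, not part of BotForge.

```python
from typing import Any, Dict, List


def validate_capabilities(payload: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a /capabilities payload (empty if none)."""
    problems: List[str] = []

    server = payload.get("server")
    if not isinstance(server, dict):
        problems.append("missing 'server' object")
    else:
        for key in ("name", "version"):
            if key not in server:
                problems.append(f"server is missing '{key}'")

    if "protocol_version" not in payload:
        problems.append("missing 'protocol_version'")

    tools = payload.get("tools")
    if not isinstance(tools, list):
        problems.append("'tools' must be a list")
        return problems

    for index, tool in enumerate(tools):
        if not isinstance(tool, dict):
            problems.append(f"tool #{index} is not an object")
            continue
        for key in ("name", "description", "schema"):
            if key not in tool:
                problems.append(f"tool #{index} is missing '{key}'")

    return problems
```

Returning a list of problems rather than raising on the first one makes it easier to report everything wrong with a server in a single pass.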

GET /tools

Returns available tools (simplified version of capabilities).

Response Format:

{
  "tools": [
    {
      "name": "calculator",
      "description": "Perform basic mathematical calculations",
      "schema": {...}
    }
  ]
}

POST /execute

Executes a specific tool with parameters.

Request Format:

{
  "tool_name": "calculator",
  "parameters": {
    "expression": "25 * 17 + 100"
  }
}

Response Format:

{
  "success": true,
  "result": {
    "calculation": "25 * 17 + 100",
    "answer": 525
  },
  "error": null
}
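
Because failures are reported inside the response body through `success` and `error`, callers need to inspect the envelope rather than rely on the HTTP status alone. One way to do that is sketched below; `ToolExecutionError` and `unwrap_execute_response` are hypothetical names introduced for illustration.

```python
from typing import Any, Dict


class ToolExecutionError(Exception):
    """Raised when an /execute response reports success=false."""


def unwrap_execute_response(envelope: Dict[str, Any]) -> Any:
    """Return envelope['result'] on success, otherwise raise ToolExecutionError."""
    if envelope.get("success") is True:
        return envelope.get("result")
    raise ToolExecutionError(envelope.get("error") or "unknown error")
```

For the response shown above, `unwrap_execute_response(...)` would return the `result` object containing `calculation` and `answer`.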

2. Optional Endpoints

GET /health

Health check endpoint.

{
  "status": "healthy",
  "server": "Calculator Server",
  "uptime": 3600,
  "tools_available": 3
}

GET /

Root endpoint with server information.

{
  "message": "Calculator MCP Server",
  "version": "1.0.0",
  "endpoints": {
    "capabilities": "/capabilities",
    "tools": "/tools",
    "execute": "/execute",
    "health": "/health"
  }
}

🛠️ Building MCP Servers

Python/FastAPI Example

Here's a complete example of a simple MCP server:

#!/usr/bin/env python3
"""
Simple MCP server implementation using FastAPI
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import uvicorn
import json

app = FastAPI(title="Calculator MCP Server", version="1.0.0")

# Tool definitions
AVAILABLE_TOOLS = [
    {
        "name": "calculator",
        "description": "Perform basic mathematical calculations",
        "schema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "Mathematical expression to evaluate"
                }
            },
            "required": ["expression"]
        }
    },
    {
        "name": "string_operations",
        "description": "Perform string operations like uppercase, lowercase, reverse",
        "schema": {
            "type": "object",
            "properties": {
                "text": {
                    "type": "string",
                    "description": "Text to process"
                },
                "operation": {
                    "type": "string",
                    "description": "Operation: 'upper', 'lower', 'reverse', 'length'"
                }
            },
            "required": ["text", "operation"]
        }
    }
]

class ToolExecuteRequest(BaseModel):
    tool_name: str
    parameters: Dict[str, Any]

class ToolExecuteResponse(BaseModel):
    success: bool
    result: Any
    error: Optional[str] = None

@app.get("/capabilities")
async def get_capabilities():
    """Return server capabilities and available tools"""
    return {
        "server": {
            "name": "Calculator MCP Server",
            "version": "1.0.0",
            "description": "Mathematical calculations and string operations"
        },
        "tools": AVAILABLE_TOOLS,
        "protocol_version": "1.0"
    }

@app.get("/tools")
async def list_tools():
    """List all available tools"""
    return {"tools": AVAILABLE_TOOLS}

@app.post("/execute", response_model=ToolExecuteResponse)
async def execute_tool(request: ToolExecuteRequest):
    """Execute a tool with given parameters"""
    try:
        tool_name = request.tool_name
        parameters = request.parameters
        
        if tool_name == "calculator":
            return execute_calculator(parameters)
        elif tool_name == "string_operations":
            return execute_string_operations(parameters)
        else:
            return ToolExecuteResponse(
                success=False,
                result=None,
                error=f"Unknown tool: {tool_name}"
            )
    
    except Exception as e:
        return ToolExecuteResponse(
            success=False,
            result=None,
            error=f"Tool execution failed: {str(e)}"
        )

def execute_calculator(parameters: Dict[str, Any]) -> ToolExecuteResponse:
    """Execute calculator tool"""
    expression = parameters.get("expression", "")
    if not expression:
        return ToolExecuteResponse(
            success=False,
            result=None,
            error="Expression parameter is required"
        )
    
    try:
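        # Character allowlist: digits, arithmetic operators, parentheses, spaces.
        # This blocks names and calls, but eval() still accepts costly input
        # such as chained '**', so treat it as a demo, not a hardened sandbox.
        allowed_chars = set('0123456789+-*/(). ')
        if all(c in allowed_chars for c in expression):
            result = eval(expression, {"__builtins__": {}}, {})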
            return ToolExecuteResponse(
                success=True,
                result={
                    "calculation": expression,
                    "answer": result
                }
            )
        else:
            return ToolExecuteResponse(
                success=False,
                result=None,
                error="Invalid characters in mathematical expression"
            )
    except Exception as e:
        return ToolExecuteResponse(
            success=False,
            result=None,
            error=f"Calculation error: {str(e)}"
        )

def execute_string_operations(parameters: Dict[str, Any]) -> ToolExecuteResponse:
    """Execute string operations tool"""
    text = parameters.get("text", "")
    operation = parameters.get("operation", "").lower()
    
    if not text:
        return ToolExecuteResponse(
            success=False,
            result=None,
            error="Text parameter is required"
        )
    
    if operation == "upper":
        result = text.upper()
    elif operation == "lower":
        result = text.lower()
    elif operation == "reverse":
        result = text[::-1]
    elif operation == "length":
        result = len(text)
    else:
        return ToolExecuteResponse(
            success=False,
            result=None,
            error="Invalid operation. Use: upper, lower, reverse, length"
        )
    
    return ToolExecuteResponse(
        success=True,
        result={
            "original": text,
            "operation": operation,
            "result": result
        }
    )

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "server": "Calculator MCP Server",
        "tools_available": len(AVAILABLE_TOOLS)
    }

@app.get("/")
async def root():
    """Root endpoint with server info"""
    return {
        "message": "Calculator MCP Server",
        "version": "1.0.0",
        "endpoints": {
            "capabilities": "/capabilities",
            "tools": "/tools",
            "execute": "/execute",
            "health": "/health"
        }
    }

if __name__ == "__main__":
    print("🚀 Starting Calculator MCP Server")
    print("📋 Available endpoints:")
    print("   • Capabilities: http://localhost:3001/capabilities")
    print("   • Tools:        http://localhost:3001/tools")
    print("   • Execute:      http://localhost:3001/execute")
    print("   • Health:       http://localhost:3001/health")
    
    uvicorn.run(app, host="0.0.0.0", port=3001)
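
The calculator above filters characters and then calls `eval()`. An alternative that avoids `eval()` entirely is to parse the expression and walk its syntax tree, accepting only the node types you intend to support. The sketch below uses the standard-library `ast` module; `safe_eval` is a hypothetical helper, not part of the server above, and it deliberately supports only `+`, `-`, `*`, `/`, unary signs, and parentheses.

```python
import ast
import operator
from typing import Union

Number = Union[int, float]

_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str) -> Number:
    """Evaluate basic arithmetic over numeric literals.

    Raises ValueError for unsupported syntax and SyntaxError for malformed input.
    """

    def _eval(node: ast.AST) -> Number:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        # Accept only int/float literals (this also rejects bools and strings).
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError("Unsupported expression")

    return _eval(ast.parse(expression.strip(), mode="eval"))
```

For example, `safe_eval("25 * 17 + 100")` returns `525`, while `safe_eval("2 ** 8")` raises `ValueError` because exponentiation is not in the allowlist. Division by zero still raises `ZeroDivisionError`, which the existing `except Exception` handler in `execute_calculator` would turn into an error response.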

Node.js/Express Example

const express = require('express');
const app = express();
const port = 3001;

app.use(express.json());

// Tool definitions
const AVAILABLE_TOOLS = [
    {
        name: "web_scraper",
        description: "Scrape content from web pages",
        schema: {
            type: "object",
            properties: {
                url: {
                    type: "string",
                    description: "URL to scrape"
                },
                selector: {
                    type: "string",
                    description: "CSS selector for content extraction"
                }
            },
            required: ["url"]
        }
    },
    {
        name: "file_processor",
        description: "Process and analyze files",
        schema: {
            type: "object",
            properties: {
                file_path: {
                    type: "string",
                    description: "Path to the file"
                },
                operation: {
                    type: "string",
                    description: "Operation: 'read' or 'analyze'"
                }
            },
            required: ["file_path", "operation"]
        }
    }
];

// Capabilities endpoint
app.get('/capabilities', (req, res) => {
    res.json({
        server: {
            name: "Web Tools MCP Server",
            version: "1.0.0",
            description: "Web scraping and file processing tools"
        },
        tools: AVAILABLE_TOOLS,
        protocol_version: "1.0"
    });
});

// Tools endpoint
app.get('/tools', (req, res) => {
    res.json({
        tools: AVAILABLE_TOOLS
    });
});

// Execute endpoint
app.post('/execute', async (req, res) => {
    try {
        const { tool_name, parameters } = req.body;
        
        let result;
        switch (tool_name) {
            case 'web_scraper':
                result = await executeWebScraper(parameters);
                break;
            case 'file_processor':
                result = await executeFileProcessor(parameters);
                break;
            default:
                return res.json({
                    success: false,
                    result: null,
                    error: `Unknown tool: ${tool_name}`
                });
        }
        
        res.json(result);
    } catch (error) {
        res.json({
            success: false,
            result: null,
            error: `Tool execution failed: ${error.message}`
        });
    }
});

// Tool implementations
async function executeWebScraper(parameters) {
    const { url, selector } = parameters;
    
    // Implement web scraping logic here.
    // Simplified example: uses the global fetch available in Node.js 18+
    // and ignores the optional selector parameter.
    try {
        const response = await fetch(url);
        const html = await response.text();
        
        return {
            success: true,
            result: {
                url: url,
                content: html.slice(0, 1000), // Truncated for example
                length: html.length
            }
        };
    } catch (error) {
        return {
            success: false,
            result: null,
            error: `Scraping failed: ${error.message}`
        };
    }
}

async function executeFileProcessor(parameters) {
    const { file_path, operation } = parameters;
    
    // Implement file processing logic here
    try {
        const fs = require('fs').promises;
        
        switch (operation) {
            case 'read':
                const content = await fs.readFile(file_path, 'utf8');
                return {
                    success: true,
                    result: {
                        file_path: file_path,
                        content: content,
                        size: content.length
                    }
                };
            case 'analyze':
                const stats = await fs.stat(file_path);
                return {
                    success: true,
                    result: {
                        file_path: file_path,
                        size: stats.size,
                        modified: stats.mtime,
                        type: file_path.split('.').pop()
                    }
                };
            default:
                return {
                    success: false,
                    result: null,
                    error: `Unknown operation: ${operation}`
                };
        }
    } catch (error) {
        return {
            success: false,
            result: null,
            error: `File processing failed: ${error.message}`
        };
    }
}

// Health check
app.get('/health', (req, res) => {
    res.json({
        status: 'healthy',
        server: 'Web Tools MCP Server',
        tools_available: AVAILABLE_TOOLS.length
    });
});

// Root endpoint
app.get('/', (req, res) => {
    res.json({
        message: 'Web Tools MCP Server',
        version: '1.0.0',
        endpoints: {
            capabilities: '/capabilities',
            tools: '/tools',
            execute: '/execute',
            health: '/health'
        }
    });
});

app.listen(port, () => {
    console.log(`🚀 Web Tools MCP Server listening at http://localhost:${port}`);
    console.log('📋 Available endpoints:');
    console.log(`   • Capabilities: http://localhost:${port}/capabilities`);
    console.log(`   • Tools:        http://localhost:${port}/tools`);
    console.log(`   • Execute:      http://localhost:${port}/execute`);
    console.log(`   • Health:       http://localhost:${port}/health`);
});

🔗 Registering MCP Servers

Manual Registration

curl -X POST "http://localhost:8000/api/mcp/register" \
  -H "Content-Type: application/json" \
  -d '{
    "bot_id": "550e8400-e29b-41d4-a716-446655440000",
    "name": "Calculator Server",
    "endpoint_url": "http://localhost:3001",
    "description": "Mathematical calculations and string operations",
    "timeout_seconds": 30,
    "retry_attempts": 3
  }'

Programmatic Registration

import requests

def register_mcp_server(bot_id, server_config):
    """Register an MCP server for a bot"""
    response = requests.post(
        "http://localhost:8000/api/mcp/register",
        json={
            "bot_id": bot_id,
            "name": server_config["name"],
            "endpoint_url": server_config["endpoint_url"],
            "description": server_config.get("description", ""),
            "timeout_seconds": server_config.get("timeout_seconds", 30),
            "retry_attempts": server_config.get("retry_attempts", 3)
        },
        timeout=10,  # Avoid hanging indefinitely if the API is unreachable
    )
    response.raise_for_status()  # Surface HTTP errors before parsing the body
    return response.json()

# Example usage
bot_id = "550e8400-e29b-41d4-a716-446655440000"
server_config = {
    "name": "Calculator Server",
    "endpoint_url": "http://localhost:3001",
    "description": "Mathematical calculations"
}

result = register_mcp_server(bot_id, server_config)
print(f"Server registered: {result['mcp_server_id']}")
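
The registration payload carries `timeout_seconds` and `retry_attempts`, but this guide does not spell out how BotForge applies them. As a rough mental model only, a caller might retry a failed request with exponential backoff as sketched here; `call_with_retries`, `base_delay_seconds`, and the backoff schedule are assumptions made for illustration.

```python
import time
from typing import Any, Callable, Optional


def call_with_retries(
    func: Callable[[], Any],
    retry_attempts: int = 3,
    base_delay_seconds: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call func once, then retry up to retry_attempts more times on failure."""
    last_error: Optional[Exception] = None
    for attempt in range(retry_attempts + 1):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            if attempt < retry_attempts:
                # Back off 0.5s, 1s, 2s, ... between attempts (assumed schedule).
                sleep(base_delay_seconds * (2 ** attempt))
    assert last_error is not None
    raise last_error
```

The `sleep` parameter is injectable so the behavior can be tested without waiting.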

🧪 Testing MCP Servers

Manual Testing

# Test server capabilities
curl "http://localhost:3001/capabilities"

# Test tool execution
curl -X POST "http://localhost:3001/execute" \
  -H "Content-Type: application/json" \
  -d '{
    "tool_name": "calculator",
    "parameters": {
      "expression": "25 * 17 + 100"
    }
  }'

# Test health check
curl "http://localhost:3001/health"

Automated Testing

import requests
from typing import Dict, Any

class MCPServerTester:
    def __init__(self, base_url: str):
        self.base_url = base_url
    
    def test_capabilities(self):
        """Test capabilities endpoint"""
        response = requests.get(f"{self.base_url}/capabilities")
        assert response.status_code == 200
        
        data = response.json()
        assert "server" in data
        assert "tools" in data
        assert "protocol_version" in data
        
        # Validate server info
        server = data["server"]
        assert "name" in server
        assert "version" in server
        
        # Validate tools
        tools = data["tools"]
        assert isinstance(tools, list)
        
        for tool in tools:
            assert "name" in tool
            assert "description" in tool
            assert "schema" in tool
    
    def test_tools_endpoint(self):
        """Test tools endpoint"""
        response = requests.get(f"{self.base_url}/tools")
        assert response.status_code == 200
        
        data = response.json()
        assert "tools" in data
        assert isinstance(data["tools"], list)
    
    def test_tool_execution(self, tool_name: str, parameters: Dict[str, Any]):
        """Test tool execution"""
        response = requests.post(
            f"{self.base_url}/execute",
            json={
                "tool_name": tool_name,
                "parameters": parameters
            }
        )
        assert response.status_code == 200
        
        data = response.json()
        assert "success" in data
        
        if data["success"]:
            assert "result" in data
        else:
            assert "error" in data
    
    def test_health_check(self):
        """Test health endpoint"""
        response = requests.get(f"{self.base_url}/health")
        assert response.status_code == 200
        
        data = response.json()
        assert "status" in data
        assert data["status"] == "healthy"
    
    def run_all_tests(self):
        """Run all tests"""
        print("Testing MCP Server...")
        
        # Test basic endpoints
        self.test_capabilities()
        self.test_tools_endpoint()
        self.test_health_check()
        
        # Get available tools for testing
        tools_response = requests.get(f"{self.base_url}/tools")
        tools = tools_response.json()["tools"]
        
        # Test each tool with sample parameters
        for tool in tools:
            tool_name = tool["name"]
            schema = tool["schema"]
            
            # Generate sample parameters based on schema
            sample_params = self.generate_sample_parameters(schema)
            
            try:
                self.test_tool_execution(tool_name, sample_params)
                print(f"✅ Tool '{tool_name}' test passed")
            except Exception as e:
                print(f"❌ Tool '{tool_name}' test failed: {e}")
        
        print("All tests completed!")
    
    def generate_sample_parameters(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Generate sample parameters based on tool schema"""
        properties = schema.get("properties", {})
        required = schema.get("required", [])
        
        params = {}
        for prop_name, prop_def in properties.items():
            prop_type = prop_def.get("type", "string")
            
            if prop_type == "string":
                if "expression" in prop_name.lower():
                    params[prop_name] = "2 + 2"
                elif "text" in prop_name.lower():
                    params[prop_name] = "hello world"
                elif "operation" in prop_name.lower():
                    params[prop_name] = "upper"
                else:
                    params[prop_name] = "test"
            elif prop_type == "number":
                params[prop_name] = 42
            elif prop_type == "boolean":
                params[prop_name] = True
        
        return params

# Usage
if __name__ == "__main__":
    tester = MCPServerTester("http://localhost:3001")
    tester.run_all_tests()

📦 Deploying MCP Servers

Docker Deployment

Dockerfile for MCP server:

FROM python:3.9-slim

WORKDIR /app

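# Install curl (used by HEALTHCHECK; the slim base image does not include it)
RUN apt-get update \
    && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt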

# Copy application
COPY . .

# Expose port
EXPOSE 3001

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
  CMD curl -f http://localhost:3001/health || exit 1

# Run server
CMD ["python", "mcp_server.py"]

docker-compose.yml:

version: '3.8'

services:
  calculator-mcp:
    build: .
    ports:
      - "3001:3001"
    environment:
      - SERVER_HOST=0.0.0.0
      - SERVER_PORT=3001
      - LOG_LEVEL=INFO
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3001/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    restart: unless-stopped

  web-tools-mcp:
    build: ./web-tools
    ports:
      - "3002:3001"
    environment:
      - SERVER_HOST=0.0.0.0
      - SERVER_PORT=3001
    restart: unless-stopped

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: calculator-mcp
  labels:
    app: calculator-mcp
spec:
  replicas: 2
  selector:
    matchLabels:
      app: calculator-mcp
  template:
    metadata:
      labels:
        app: calculator-mcp
    spec:
      containers:
      - name: calculator-mcp
        image: botforge/calculator-mcp:latest
        ports:
        - containerPort: 3001
        livenessProbe:
          httpGet:
            path: /health
            port: 3001
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 3001
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: calculator-mcp-service
spec:
  selector:
    app: calculator-mcp
  ports:
  - port: 80
    targetPort: 3001
  type: ClusterIP

🔧 Advanced MCP Features

Authentication

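import os
import secrets
from fastapi import Depends, HTTPException, Header
from typing import Optional

# MCP_API_KEY is an assumed variable name; load the key from your own secret store
API_KEY = os.environ.get("MCP_API_KEY", "")

async def verify_api_key(x_api_key: Optional[str] = Header(None)):
    # Constant-time comparison avoids leaking key contents through timing
    if not API_KEY or not x_api_key or not secrets.compare_digest(
        x_api_key.encode(), API_KEY.encode()
    ):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key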

@app.post("/execute", dependencies=[Depends(verify_api_key)])
async def execute_tool(request: ToolExecuteRequest):
    # Tool execution with authentication
    pass

Rate Limiting

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/execute")
@limiter.limit("10/minute")
async def execute_tool(request: Request, tool_request: ToolExecuteRequest):
    # Rate-limited tool execution
    pass

Caching

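import hashlib
import json
import redis
from typing import Optional

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def make_cache_key(tool_name: str, parameters: dict) -> str:
    """Build a cache key that is stable across processes and restarts.

    The built-in hash() is randomized per process for strings, so it cannot
    be used for keys shared through Redis.
    """
    payload = json.dumps(parameters, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"mcp:{tool_name}:{digest}"

def get_cached_result(tool_name: str, parameters: dict) -> Optional[dict]:
    """Get cached tool execution result"""
    cached = redis_client.get(make_cache_key(tool_name, parameters))
    if cached:
        return json.loads(cached)
    return None

def cache_result(tool_name: str, parameters: dict, result: dict, ttl: int = 3600):
    """Cache tool execution result"""
    redis_client.setex(make_cache_key(tool_name, parameters), ttl, json.dumps(result))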

@app.post("/execute")
async def execute_tool(request: ToolExecuteRequest):
    # Check cache first
    cached_result = get_cached_result(request.tool_name, request.parameters)
    if cached_result:
        return cached_result
    
    # Execute tool
    result = await perform_tool_execution(request)
    
    # Cache result
    if result.success:
        cache_result(request.tool_name, request.parameters, result.dict())
    
    return result

Logging and Monitoring

import logging
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

@app.post("/execute")
async def execute_tool(request: ToolExecuteRequest):
    # perf_counter is monotonic, so durations are unaffected by clock changes
    start_time = time.perf_counter()
    
    try:
        logger.info(f"Executing tool: {request.tool_name}")
        result = await perform_tool_execution(request)
        
        execution_time = time.perf_counter() - start_time
        logger.info(f"Tool execution completed in {execution_time:.3f}s")
        
        return result
    
    except Exception as e:
        execution_time = time.perf_counter() - start_time
        logger.error(f"Tool execution failed after {execution_time:.3f}s: {str(e)}")
        raise

🎯 Best Practices

1. Error Handling

  • Always return proper error responses
  • Include helpful error messages
  • Log errors for debugging
  • Handle timeouts gracefully
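
As one way to handle timeouts gracefully, a slow tool can be wrapped with `asyncio.wait_for` so the caller still receives the standard `success`/`result`/`error` envelope. This is a minimal sketch; `run_with_timeout` is a hypothetical helper name.

```python
import asyncio
from typing import Any, Awaitable, Dict


async def run_with_timeout(work: Awaitable[Any], timeout_seconds: float) -> Dict[str, Any]:
    """Await `work`, converting a timeout into the standard error envelope."""
    try:
        result = await asyncio.wait_for(work, timeout=timeout_seconds)
        return {"success": True, "result": result, "error": None}
    except asyncio.TimeoutError:
        # wait_for cancels the underlying task before raising.
        return {
            "success": False,
            "result": None,
            "error": f"Tool execution timed out after {timeout_seconds}s",
        }
```

Note that this only bounds awaitable work; a CPU-bound tool that never yields to the event loop would need to run in a separate thread or process to be interruptible.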

2. Performance

  • Implement caching for expensive operations
  • Use async/await for I/O operations
  • Optimize tool execution algorithms
  • Monitor memory usage

3. Security

  • Validate all input parameters
  • Implement rate limiting
  • Use authentication when needed
  • Sanitize user inputs

4. Reliability

  • Implement health checks
  • Handle network failures
  • Use circuit breakers for external dependencies
  • Implement graceful shutdowns
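
A circuit breaker stops calling a failing dependency for a cool-down period instead of letting every request wait on it. The sketch below is a deliberately small, single-threaded version; the class name, thresholds, and the injectable `clock` are illustrative choices rather than anything BotForge prescribes.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 3,
        reset_after_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after_seconds = reset_after_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_after_seconds:
                raise CircuitOpenError("circuit is open; call rejected")
            # Cool-down elapsed: allow one trial call (half-open state).
            # A single further failure will reopen the circuit.
            self._opened_at = None
            self._failures = self.failure_threshold - 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        self._failures = 0  # Any success closes the circuit fully.
        return result
```

A tool that depends on an external API could route its outbound calls through `breaker.call(...)` and translate `CircuitOpenError` into a fast error response.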

5. Documentation

  • Document all tools and parameters
  • Provide usage examples
  • Include error codes and messages
  • Maintain API versioning

🚨 Troubleshooting

Common Issues

Server Not Responding:

# Check if server is running
curl http://localhost:3001/health

# Check server logs
docker logs calculator-mcp

# Verify port binding
netstat -tlnp | grep 3001

Tool Execution Failures:

# Add debug logging
logger.debug(f"Tool parameters: {parameters}")
logger.debug(f"Tool execution result: {result}")

# Validate parameters
def validate_parameters(tool_name: str, parameters: dict):
    tool_schema = get_tool_schema(tool_name)
    # Implement validation logic
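
The validation logic left as a placeholder above could start from something like the following sketch. It checks required keys and basic types against the tool's `schema` using only the standard library; `find_parameter_errors` is a hypothetical helper, it covers only a small subset of JSON Schema, and rejecting undeclared parameters is a stricter choice than JSON Schema's default.

```python
from typing import Any, Dict, List

# Map JSON Schema type names to Python types (only the subset used in this guide).
_JSON_TYPES = {
    "string": str,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def find_parameter_errors(schema: Dict[str, Any], parameters: Dict[str, Any]) -> List[str]:
    """Return human-readable problems with `parameters` (empty list if valid)."""
    errors: List[str] = []
    properties = schema.get("properties", {})

    for name in schema.get("required", []):
        if name not in parameters:
            errors.append(f"missing required parameter '{name}'")

    for name, value in parameters.items():
        if name not in properties:
            errors.append(f"unexpected parameter '{name}'")
            continue
        type_name = properties[name].get("type")
        expected = _JSON_TYPES.get(type_name)
        if expected is None:
            continue  # Unknown or absent type: nothing to check.
        # bool is a subclass of int in Python, so reject it for non-boolean types.
        is_bool_mismatch = isinstance(value, bool) and type_name != "boolean"
        if is_bool_mismatch or not isinstance(value, expected):
            errors.append(f"parameter '{name}' must be of type {type_name}")

    return errors
```

For fuller coverage (enums, nested objects, formats), a dedicated JSON Schema validation library would be a better fit than extending this by hand.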

Registration Issues:

# Test server capabilities
curl http://localhost:3001/capabilities

# Check BotForge logs
docker logs botforge-api | grep -i mcp

# Verify network connectivity
telnet localhost 3001

Debugging Tools

# MCP Server Debug Endpoint
@app.get("/debug")
async def debug_info():
    return {
        "server_info": {
            "name": "Calculator MCP Server",
            "version": "1.0.0",
            "uptime": get_uptime(),
            "memory_usage": get_memory_usage()
        },
        "tools": AVAILABLE_TOOLS,
        "recent_executions": get_recent_executions(),
        "error_log": get_recent_errors()
    }

# Health Check with Details
@app.get("/health/detailed")
async def detailed_health():
    return {
        "status": "healthy",
        "components": {
            "database": check_database_health(),
            "external_apis": check_external_apis(),
            "memory": check_memory_usage(),
            "disk": check_disk_usage()
        },
        "tools": [
            {
                "name": tool["name"],
                "status": "active",
                "last_execution": get_last_execution_time(tool["name"])
            }
            for tool in AVAILABLE_TOOLS
        ]
    }

This guide covers the endpoint contract, example servers, registration, testing, deployment, and troubleshooting needed to integrate MCP servers with BotForge RAG, so that bots can execute tools alongside information retrieval.