-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhypercorn_server.py
More file actions
208 lines (182 loc) · 6.6 KB
/
hypercorn_server.py
File metadata and controls
208 lines (182 loc) · 6.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# hypercorn_server.py - MCP Server with Hypercorn (HTTP/2)
"""
Hypercorn-based MCP server for ASGI performance benchmarking.
Optimized for HTTP/2 multiplexed agent communication patterns.
"""
import asyncio
from hypercorn.config import Config
from hypercorn.asyncio import serve
from mcp.server.fastmcp import FastMCP
import mcp.types as types
import time
import json
# Create MCP server optimized for HTTP/2 agent communication.
# stateless_http=True avoids per-session server state so repeated benchmark
# requests are independent; debug off and WARNING-level logging keep the
# request path as quiet as possible during measurement.
# NOTE(review): FastMCP forwards extra kwargs to its settings object —
# confirm this mcp version accepts "description" here (some releases use
# "instructions" instead).
mcp_app = FastMCP(
name="hypercorn-mcp-benchmark",
description="MCP server running on Hypercorn for HTTP/2 performance testing",
stateless_http=True, # Essential for benchmarking
debug=False, # Production mode
log_level="WARNING" # Minimal logging for performance
)
@mcp_app.tool()
def agent_task(task_id: str, data: str, complexity: str = "simple") -> dict:
    """Simulate typical agent task processing with varying complexity.

    Args:
        task_id: Caller-supplied identifier, echoed back unchanged.
        data: Payload to "process"; only its length and token count matter.
        complexity: "simple", "medium", or anything else (treated as complex).

    Returns:
        Response dict with the task id, the simulated result, elapsed
        milliseconds, a completion timestamp, status, and server name.
    """
    t0 = time.time()
    if complexity == "simple":
        outcome = f"Processed {len(data)} bytes"
    elif complexity == "medium":
        # Slightly richer summary stands in for medium CPU work.
        outcome = f"Analyzed {len(data)} bytes with {len(data.split())} tokens"
    else:
        # Anything else counts as "complex": a structured analysis payload.
        detected = ["pattern1", "pattern2"] if len(data) > 50 else ["pattern1"]
        outcome = {
            "analysis": f"Deep analysis of {len(data)} bytes",
            "tokens": len(data.split()),
            "patterns": detected,
            "confidence": 0.95,
        }
    elapsed_ms = round((time.time() - t0) * 1000, 2)
    return {
        "task_id": task_id,
        "result": outcome,
        "processing_time_ms": elapsed_ms,
        "timestamp": time.time(),
        "status": "complete",
        "server": "hypercorn",
    }
@mcp_app.tool()
def batch_process(tasks: list) -> dict:
    """Process multiple tasks in a batch - common agent pattern.

    Each task is summarized by its index and stringified length; no real
    work is performed beyond building the summary list.
    """
    t0 = time.time()
    summaries = [
        {"task_index": idx, "data_length": len(str(item)), "processed": True}
        for idx, item in enumerate(tasks)
    ]
    elapsed_ms = round((time.time() - t0) * 1000, 2)
    return {
        "batch_id": f"batch_{int(time.time())}",
        "total_tasks": len(tasks),
        "results": summaries,
        "processing_time_ms": elapsed_ms,
        "server": "hypercorn",
    }
@mcp_app.tool()
def parallel_agent_tasks(task_configs: list) -> dict:
    """Process parallel agent tasks - showcases HTTP/2 multiplexing.

    Each config dict may carry "type" and "data" keys; missing keys fall
    back to "unknown" / "" respectively.
    """
    t0 = time.time()
    summaries = [
        {
            "task_index": idx,
            "task_type": cfg.get("type", "unknown"),
            "data_processed": len(str(cfg.get("data", ""))),
            "status": "completed",
        }
        for idx, cfg in enumerate(task_configs)
    ]
    elapsed_ms = round((time.time() - t0) * 1000, 2)
    return {
        "batch_id": f"parallel_{int(time.time())}",
        "total_tasks": len(task_configs),
        "results": summaries,
        "processing_time_ms": elapsed_ms,
        "server": "hypercorn",
    }
@mcp_app.resource("agent://{agent_id}/status")
async def agent_status(agent_id: str) -> dict:
    """Agent status check - enhanced for HTTP/2.

    Returns a static "active" report for the requested agent, advertising
    the server's capability list and HTTP/2 feature flags.
    """
    status_report = {
        "agent_id": agent_id,
        "status": "active",
        "last_seen": time.time(),
        "server": "hypercorn",
        "protocol": "HTTP/2",
    }
    status_report["capabilities"] = [
        "tool_calling",
        "resource_access",
        "batch_processing",
        "parallel_multiplexing",
        "header_compression",
    ]
    status_report["http2_features"] = {
        "multiplexing": True,
        "server_push": True,
        "header_compression": True,
    }
    return status_report
@mcp_app.resource("benchmark://{test_type}/data")
async def benchmark_data(test_type: str) -> dict:
    """Provide test data optimized for HTTP/2 testing.

    Args:
        test_type: One of "small", "medium", "large", or "xlarge"; any
            other value falls back to the "small" payload.

    Returns:
        Dict with the requested payload, its byte length, and server info.
    """
    test_data = {
        "small": "x" * 100,       # 100 bytes
        "medium": "x" * 1000,     # 1KB
        "large": "x" * 10000,     # 10KB
        "xlarge": "x" * 100000,   # 100KB - HTTP/2 compression test
    }
    # Resolve the payload once: the original looked it up twice (for "data"
    # and again for "size_bytes"), which also guarantees both fields always
    # describe the same payload.
    payload = test_data.get(test_type, test_data["small"])
    return {
        "test_type": test_type,
        "data": payload,
        "size_bytes": len(payload),
        "server": "hypercorn",
        "compression_enabled": True,
    }
@mcp_app.resource("stream://{stream_id}/data")
async def stream_data(stream_id: str) -> dict:
    """Simulate HTTP/2 stream-specific data.

    Echoes the stream id back with a synthetic payload and a timestamp.
    """
    response = {
        "stream_id": stream_id,
        "data": f"Stream {stream_id} data payload",
        "timestamp": time.time(),
    }
    response["server"] = "hypercorn"
    response["protocol"] = "HTTP/2"
    return response
@mcp_app.prompt()
async def agent_communication(
    source_agent: str,
    target_agent: str,
    task_type: str = "data_analysis"
) -> list[types.PromptMessage]:
    """Generate structured prompts for A2A communication with HTTP/2 optimization.

    Args:
        source_agent: Name of the agent issuing the communication.
        target_agent: Name of the agent being addressed.
        task_type: Task the prompt asks the target to perform.

    Returns:
        A two-message prompt: a framing message followed by the user request.
    """
    # BUG FIX: MCP's PromptMessage only permits role "user" or "assistant";
    # the original used role="system", which fails pydantic validation when
    # the prompt is rendered. The framing message is now sent as "assistant".
    return [
        types.PromptMessage(
            role="assistant",
            content=types.TextContent(
                text=f"You are {source_agent} communicating with {target_agent} over HTTP/2",
                type="text"
            )
        ),
        types.PromptMessage(
            role="user",
            content=types.TextContent(
                text=f"Perform {task_type} using HTTP/2 multiplexing for optimal performance",
                type="text"
            )
        )
    ]
# Create the streamable HTTP app at module level (same pattern as Uvicorn).
# Building the ASGI app here (rather than inside __main__) lets external
# servers import `streamable_http_app` directly, e.g.
# `hypercorn hypercorn_server:streamable_http_app`.
streamable_http_app = mcp_app.streamable_http_app()
if __name__ == "__main__":
    # Banner describing the benchmark configuration being launched.
    print("🚀 Starting MCP Server with Hypercorn (HTTP/2)")
    print("=" * 50)
    print("Server: Hypercorn")
    print("Protocol: HTTP/2 + WebSockets")
    print("Optimization: Multiplexing, header compression")
    print("Target: Parallel agent communication")
    print("=" * 50)
    # Hypercorn configuration optimized for HTTP/2 benchmarking.
    config = Config()
    config.bind = ["0.0.0.0:8001"]  # Different port to avoid conflict
    config.workers = 1  # Single worker for consistent benchmarking
    # HTTP/2 specific settings.
    # NOTE(review): recent Hypercorn versions have no `http2` attribute on
    # Config — HTTP/2 is negotiated via ALPN/upgrade automatically, so this
    # assignment may be a silent no-op; confirm against the installed version.
    config.http2 = True
    config.alpn_protocols = ["h2", "http/1.1"]
    # Logging (minimal for performance).
    config.loglevel = "warning"
    # NOTE(review): Hypercorn disables access logging via `accesslog = None`;
    # `access_log_format` only controls the line format — verify this actually
    # suppresses access logs rather than leaving the default behavior.
    config.access_log_format = None  # Disable access logs
    # Run the module-level ASGI app with the same pattern as Uvicorn:
    # serve() drives the app on the asyncio event loop until shutdown.
    asyncio.run(serve(streamable_http_app, config))