diff --git a/mcp_server_code_execution_mode.py b/mcp_server_code_execution_mode.py index 1b8eabf..b8b6437 100644 --- a/mcp_server_code_execution_mode.py +++ b/mcp_server_code_execution_mode.py @@ -21,6 +21,7 @@ import inspect import io import tempfile +import threading import textwrap from asyncio import subprocess as aio_subprocess from contextlib import suppress @@ -130,7 +131,7 @@ def _check_pydantic_compatibility() -> None: BRIDGE_NAME = "mcp-server-code-execution-mode" DEFAULT_IMAGE = os.environ.get("MCP_BRIDGE_IMAGE", "python:3.14-slim") DEFAULT_RUNTIME = os.environ.get("MCP_BRIDGE_RUNTIME") -DEFAULT_TIMEOUT = int(os.environ.get("MCP_BRIDGE_TIMEOUT", "30")) +DEFAULT_TIMEOUT = int(os.environ.get("MCP_BRIDGE_TIMEOUT", "90")) MAX_TIMEOUT = int(os.environ.get("MCP_BRIDGE_MAX_TIMEOUT", "120")) DEFAULT_MEMORY = os.environ.get("MCP_BRIDGE_MEMORY", "512m") DEFAULT_PIDS = int(os.environ.get("MCP_BRIDGE_PIDS", "128")) @@ -638,10 +639,13 @@ async def start(self) -> None: if self._session: return + # Merge server-specific env with parent env so credentials + # (JIRA tokens, GitHub tokens, etc.) are inherited automatically. + merged_env = {**os.environ, **(self.server_info.env or {})} params = StdioServerParameters( command=self.server_info.command, args=self.server_info.args, - env=self.server_info.env or None, + env=merged_env, cwd=self.server_info.cwd or None, ) @@ -771,6 +775,7 @@ def __init__( self._share_lock = asyncio.Lock() self._shared_paths: set[str] = set() self._process: Optional[asyncio.subprocess.Process] = None + self._popen: Optional[Any] = None def _base_cmd(self) -> List[str]: if not self.runtime: @@ -1619,8 +1624,15 @@ async def _ensure_started( volume_mounts: Optional[Sequence[str]], host_dir: Path, ) -> None: - if self._process and self._process.returncode is None: - return + # Always create a fresh container. Prior containers may have + # stale IPC directories (cleaned up by SandboxInvocation). + if self._popen is not None: + try: + self._popen.terminate() + self._popen.wait(timeout=5) + except Exception: + pass + self._popen = None await self._ensure_runtime_ready() if not self.runtime: @@ -1643,13 +1655,19 @@ async def _ensure_started( for key, value in container_env.items(): cmd.extend(["--env", f"{key}={value}"]) cmd.extend([self.image, "python3", "-u", entrypoint_target]) + logger.info("Launching container, entrypoint=%s", entrypoint_path) - self._process = await asyncio.create_subprocess_exec( - *cmd, - stdin=aio_subprocess.PIPE, - stdout=aio_subprocess.PIPE, - stderr=aio_subprocess.PIPE, + import subprocess as _sp + + self._popen = _sp.Popen( + cmd, + stdin=_sp.PIPE, + stdout=_sp.PIPE, + stderr=_sp.PIPE, ) + # Also set the asyncio _process so returncode check works + self._process = self._popen + logger.info("Container process started, pid=%s", self._popen.pid) async def execute( self, @@ -1675,137 +1693,231 @@ async def execute( volume_mounts, host_dir, ) - process = self._process - assert process is not None - assert process.stdin is not None - assert process.stdout is not None - assert process.stderr is not None - - # Send code execution request - request = {"type": "execute", "code": code} - try: - process.stdin.write(json.dumps(request).encode("utf-8") + b"\n") - await process.stdin.drain() - except Exception as exc: - raise SandboxError(f"Failed to send code to sandbox: {exc}") from exc - + popen = self._popen + assert popen is not None + assert popen.stdin is not None + assert popen.stdout is not None + + # ----------------------------------------------------------- + # Run the entire Docker stdin/stdout exchange on a **separate + # thread with its own asyncio event loop**. + # + # Why: the main event loop is starved by MCP client forwarder + # tasks (`PersistentMCPClient._forward_read`) that hold an + # `async for item in raw_read_stream:` loop open for every + # bridged server. These tasks never yield long enough for + # Docker subprocess I/O to be scheduled — whether through + # asyncio StreamReader, os.read on raw fds, or even plain + # subprocess.Popen iteration. + # + # Even `loop.run_in_executor` fails because `wait_for` on + # the main loop never fires the "future done" callback. + # + # Solution: a dedicated thread creates its own event loop, + # does all Docker pipe I/O there, and signals completion via + # a threading.Event. The main loop polls the event with + # short sleeps — these `asyncio.sleep` calls DO get scheduled + # because they are simple timers, not I/O-dependent. + # ----------------------------------------------------------- + main_loop = asyncio.get_running_loop() stdout_chunks: List[str] = [] stderr_chunks: List[str] = [] + done_event = threading.Event() + thread_error: List[Optional[Exception]] = [None] - async def _handle_stdout() -> None: - assert process.stdout is not None - async for line in process.stdout: + def _isolated_thread() -> None: + """Run Docker I/O on a completely separate event loop.""" + try: + _loop = asyncio.new_event_loop() + asyncio.set_event_loop(_loop) try: - message = json.loads(line.decode()) - except Exception: - stderr_chunks.append(line.decode(errors="replace")) - continue - - msg_type = message.get("type") - if msg_type == "stdout": - stdout_chunks.append(message.get("data", "")) - elif msg_type == "stderr": - stderr_chunks.append(message.get("data", "")) - elif msg_type == "execution_done": - break - elif msg_type == "rpc_request": - if process.stdin is None: - continue - if rpc_handler is None: - response: Dict[str, object] = { - "success": False, - "error": "RPC handler unavailable", - } - else: - try: - payload = message.get("payload", {}) - response = await rpc_handler( - payload if isinstance(payload, dict) else {} - ) - except Exception as exc: - logger.debug("RPC handler failed", exc_info=True) - response = {"success": False, "error": str(exc)} - reply: Dict[str, object] = { - "type": "rpc_response", - "id": message.get("id"), - "success": response.get("success", True), - "payload": response, - } - if not reply["success"]: - reply["error"] = response.get("error", "RPC error") - try: - data = ( - json.dumps(reply, separators=(",", ":")).encode("utf-8") - + b"\n" + _loop.run_until_complete( + self._docker_exchange( + popen, code, stdout_chunks, stderr_chunks, + rpc_handler, main_loop, timeout, ) - process.stdin.write(data) - await process.stdin.drain() - except Exception: - stderr_chunks.append("Failed to deliver RPC response\n") - break - else: - stderr_chunks.append(json.dumps(message, separators=(",", ":"))) - - async def _read_stderr() -> None: - assert process.stderr is not None - while True: - # We can't just read until EOF because the process is persistent. - # We need to read what's available. - # But stderr from python usually comes line by line or buffered. - # If we read(4096), it might block if nothing is there? - # No, asyncio read blocks until *some* data is available. - # But we don't know when to STOP reading for this execution. - # The container script redirects sys.stderr to _StreamProxy which sends JSON over stdout. - # So REAL stderr should only contain runtime errors or C-level stderr. - # We should probably read it continuously in a background task that lives as long as the process? - # For now, let's just read line by line. - line = await process.stderr.readline() - if not line: - break - stderr_chunks.append(line.decode(errors="replace")) + ) + finally: + _loop.close() + except Exception as exc: + thread_error[0] = exc + finally: + done_event.set() - # We can't easily sync stderr reading with execution_done if it's not wrapped. - # But our entrypoint redirects sys.stderr to stdout (wrapped). - # So process.stderr will only have "hard" errors. - # We can just let it accumulate in the background? - # But we want to return it with the result. - # Let's just assume "hard" stderr is rare and maybe we miss some if it comes after execution_done. - # Actually, we can't block on stderr.readline() if there is no stderr. - # So we should probably NOT await stderr reading here, or use a non-blocking approach. - # But we are in an async function. - # Let's spawn a stderr reader task that runs forever (attached to the object?) - # Or just ignore raw stderr for now? - # The previous implementation read stderr until EOF. - # I'll stick to reading stdout loop. Raw stderr will be lost if I don't read it. - # I'll add a background task to self that consumes stderr? - pass + worker = threading.Thread(target=_isolated_thread, daemon=True) + worker.start() - stdout_task = asyncio.create_task(_handle_stdout()) + # Wait for the isolated thread using anyio's thread dispatcher. + import functools as _ft + completed = await anyio.to_thread.run_sync( + _ft.partial(done_event.wait, timeout=timeout), + abandon_on_cancel=True, + ) - try: - await asyncio.wait_for(stdout_task, timeout=timeout) - except asyncio.TimeoutError as exc: - # We don't kill the process on timeout, we just stop waiting? - # Or do we kill it? - # If we don't kill it, the loop is still running. - # We should probably kill it to clear state? - # Or send a "cancel" message? - # For now, let's kill it on timeout to be safe. - process.kill() - await process.wait() + if not completed: + popen.kill() + popen.wait() raise SandboxTimeout( f"Execution timed out after {timeout}s", stdout="".join(stdout_chunks), stderr="".join(stderr_chunks), - ) from exc + ) - stdout_text = "".join(stdout_chunks) - stderr_text = "".join(stderr_chunks) + if thread_error[0] is not None: + exc = thread_error[0] + if isinstance(exc, SandboxTimeout): + raise exc + raise SandboxError( + f"Sandbox execution failed: {exc}", + stdout="".join(stdout_chunks), + stderr="".join(stderr_chunks), + ) + + return SandboxResult( + True, 0, "".join(stdout_chunks), "".join(stderr_chunks) + ) - # We don't check return code because process is still running. - return SandboxResult(True, 0, stdout_text, stderr_text) + async def _docker_exchange( + self, + popen: Any, + code: str, + stdout_chunks: List[str], + stderr_chunks: List[str], + rpc_handler: Optional[ + Callable[[Dict[str, object]], Awaitable[Dict[str, object]]] + ], + main_loop: asyncio.AbstractEventLoop, + timeout: int, + ) -> None: + """Execute code inside the Docker container. + + Runs on a **separate event loop** in a dedicated thread, fully + isolated from the main loop's MCP client tasks. + """ + # Drain stderr in background to prevent pipe deadlock + async def _drain_stderr() -> None: + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._sync_drain_stderr, + popen, stderr_chunks) + + asyncio.ensure_future(_drain_stderr()) + + # Give the container time to initialize (parse entrypoint, + # set up asyncio, register stdin reader). + await asyncio.sleep(2) + + # Check if process is still alive before writing + rc = popen.poll() + if rc is not None: + remaining_err = popen.stderr.read().decode(errors="replace") if popen.stderr else "" + stderr_chunks.append(remaining_err) + all_stderr = "".join(stderr_chunks) + raise SandboxError( + f"Container exited during init (rc={rc}): {all_stderr[:500]}", + stdout="", + stderr=all_stderr, + ) + + # Send execute request (synchronous write is fine — stdin pipe + # is never congested for a single small JSON line). + popen.stdin.write( + json.dumps({"type": "execute", "code": code}).encode("utf-8") + + b"\n" + ) + popen.stdin.flush() + + # Read stdout lines on this isolated loop's executor + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, + self._sync_read_stdout, + popen, stdout_chunks, stderr_chunks, + rpc_handler, main_loop, + ) + + @staticmethod + def _sync_drain_stderr( + popen: Any, stderr_chunks: List[str] + ) -> None: + """Blocking stderr reader — runs in executor thread.""" + try: + for line in popen.stderr: + stderr_chunks.append(line.decode(errors="replace")) + except Exception: + pass + + @staticmethod + def _sync_read_stdout( + popen: Any, + stdout_chunks: List[str], + stderr_chunks: List[str], + rpc_handler: Optional[ + Callable[[Dict[str, object]], Awaitable[Dict[str, object]]] + ], + main_loop: asyncio.AbstractEventLoop, + ) -> None: + """Blocking stdout reader — runs in executor thread.""" + logger.info("_sync_read_stdout: waiting for lines from pid=%s", popen.pid) + for raw_line in popen.stdout: + logger.info("_sync_read_stdout: got line (%d bytes)", len(raw_line)) + raw_line = raw_line.rstrip(b"\n") + if not raw_line.strip(): + continue + try: + message = json.loads(raw_line.decode()) + except Exception: + stderr_chunks.append(raw_line.decode(errors="replace")) + continue + + msg_type = message.get("type") + if msg_type == "stdout": + stdout_chunks.append(message.get("data", "")) + elif msg_type == "stderr": + stderr_chunks.append(message.get("data", "")) + elif msg_type == "execution_done": + return + elif msg_type == "rpc_request": + # Bridge the RPC call back to the *main* loop where + # the MCP client sessions live. + if rpc_handler is not None: + future = asyncio.run_coroutine_threadsafe( + rpc_handler( + message.get("payload", {}) + if isinstance(message.get("payload"), dict) + else {} + ), + main_loop, + ) + try: + resp = future.result(timeout=30) + except Exception as exc: + resp = {"success": False, "error": str(exc)} + else: + resp = {"success": False, "error": "No RPC handler"} + reply: Dict[str, object] = { + "type": "rpc_response", + "id": message.get("id"), + "success": resp.get("success", True), + "payload": resp, + } + if not reply["success"]: + reply["error"] = resp.get("error", "RPC error") + popen.stdin.write( + json.dumps( + reply, separators=(",", ":") + ).encode("utf-8") + + b"\n" + ) + popen.stdin.flush() async def _stop_runtime(self) -> None: + if self._popen is not None: + try: + self._popen.terminate() + self._popen.wait(timeout=5) + except Exception: + pass + self._popen = None if self._process: try: self._process.terminate() @@ -2576,13 +2688,40 @@ async def execute_code( ) -> SandboxResult: await self.discover_servers() request_timeout = max(1, min(MAX_TIMEOUT, timeout)) + # Auto-load all discovered servers only when None (caller didn't specify). + # An explicit empty list [] means "no servers". + if servers is None and self.servers: + servers = list(self.servers.keys()) requested_servers = list(dict.fromkeys(servers or [])) + # Load each server and collect metadata. Keep clients alive + # during execution so their async generators stay in the same + # task context for clean shutdown. + prefetched_metadata: List[Dict[str, object]] = [] + actually_loaded: List[str] = [] for server_name in requested_servers: - await self.load_server(server_name) - - async with SandboxInvocation(self, requested_servers) as invocation: + try: + await self.load_server(server_name) + metadata = await self.get_cached_server_metadata(server_name) + prefetched_metadata.append(metadata) + actually_loaded.append(server_name) + except Exception as exc: + logger.warning("Failed to load bridged server %s: %s", server_name, exc) + + # Pass empty server list so __aenter__ skips metadata fetch. + async with SandboxInvocation(self, []) as invocation: + invocation.server_metadata = prefetched_metadata + invocation.allowed_servers = { + str(m.get("name")) for m in prefetched_metadata + if isinstance(m.get("name"), str) + } + # Fix up container env — __aenter__ set it before we populated + # server_metadata, so MCP_AVAILABLE_SERVERS would be empty. + invocation.container_env["MCP_AVAILABLE_SERVERS"] = json.dumps( + invocation.server_metadata, separators=(",", ":"), + ) sandbox_obj = cast(SandboxLike, self.sandbox) + result = await sandbox_obj.execute( code, timeout=request_timeout, @@ -2594,6 +2733,16 @@ async def execute_code( rpc_handler=invocation.handle_rpc, ) + # Clean up bridged clients properly (same task context as start). + for server_name in list(self.clients.keys()): + client = self.clients.pop(server_name, None) + if client: + try: + await client.stop() + except Exception: + pass + self.loaded_servers.discard(server_name) + if not result.success: raise SandboxError( f"Sandbox exited with code {result.exit_code}",