diff --git a/src/mcp/client.py b/src/mcp/client.py index 319d8ca..266228f 100644 --- a/src/mcp/client.py +++ b/src/mcp/client.py @@ -18,6 +18,15 @@ from ..models.tools import ToolDefinition logger = logging.getLogger(__name__) +# Buffer maximo (bytes) del StreamReader para leer las respuestas JSON-RPC del +# MCP por stdout. Una respuesta llega en UNA sola linea; tools como el +# screenshot fullPage de Playwright devuelven la imagen en base64 en esa linea +# y superan de largo el 64KB por defecto de asyncio (y el 1MB que teniamos), +# lanzando LimitOverrunError que mataba el read loop y dejaba la sesion MCP +# inservible (el agente "se paraba" al hacer acciones). 64MB cubre cualquier +# screenshot real; por encima, el read loop descarta esa respuesta y sigue vivo. +MCP_STREAM_LIMIT = 64 * 1024 * 1024 + class MCPClientError(Exception): pass @@ -74,7 +83,7 @@ class MCPClient: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self._env, - limit=1024 * 1024, # 1MB buffer for large MCP responses + limit=MCP_STREAM_LIMIT, # buffer grande para respuestas MCP (screenshots base64) ) self._running = True self._reader_task = asyncio.create_task(self._read_loop()) @@ -225,14 +234,30 @@ class MCPClient: if not self._process or not self._process.stdout: return + stdout = self._process.stdout try: while self._running: - line = await self._process.stdout.readline() + try: + line = await stdout.readline() + except (ValueError, asyncio.LimitOverrunError): + # Una respuesta JSON-RPC supero el buffer (p.ej. screenshot + # fullPage de Playwright en base64 por encima de 64MB). Antes + # esto mataba el read loop y dejaba TODA la sesion MCP muerta + # (el agente se "paraba" en la siguiente accion). Ahora + # descartamos solo esa respuesta, re-sincronizamos el stream + # y seguimos vivos para las demas tools. + logger.warning( + "MCP [%s]: respuesta supera el buffer (%d MB), se descarta y se continua", + self.name, MCP_STREAM_LIMIT // (1024 * 1024), + ) + await self._drain_until_newline(stdout) + continue + if not line: logger.warning("MCP server stdout closed") break - line_str = line.decode().strip() + line_str = line.decode(errors="replace").strip() if not line_str: continue @@ -251,6 +276,21 @@ class MCPClient: finally: self._running = False + async def _drain_until_newline(self, stdout: asyncio.StreamReader) -> None: + """Consume bytes del stream hasta el proximo salto de linea para + re-sincronizar tras un LimitOverrunError (la respuesta sobredimensionada + se descarta). `read()` no usa separador, asi que no vuelve a disparar el + overrun y va vaciando el buffer hasta liberar la linea gigante.""" + while self._running: + try: + chunk = await stdout.read(65536) + except Exception: + return + if not chunk: + return + if b"\n" in chunk: + return + def _handle_message(self, message: dict[str, Any]) -> None: """Route an incoming JSON-RPC message.""" msg_id = message.get("id")