From 96b454291858ec695a2f98ce5d4ddfe648ac5dc2 Mon Sep 17 00:00:00 2001 From: Jordan Diaz Date: Fri, 5 Jun 2026 17:38:19 +0000 Subject: [PATCH] fix(mcp): el read loop ya no muere con respuestas grandes (screenshots) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sintoma: "el agente se para cuando hace acciones". MCPClient._read_loop lee las respuestas JSON-RPC con stdout.readline(), cuyo StreamReader tenia buffer de 1MB. Una respuesta llega en UNA linea; playwright__browser_take_screenshot({fullPage: true}) devuelve la imagen en base64 en esa linea y supera el limite → asyncio.LimitOverrunError → el except Exception mataba el read loop y dejaba la sesion MCP inservible (los turnos siguientes ejecutaban 0 tools). Fix en dos capas: - MCP_STREAM_LIMIT=64MB en create_subprocess_exec(limit=...) — cubre cualquier screenshot real. - Read loop tolerante: captura (ValueError, LimitOverrunError), descarta solo esa respuesta re-sincronizando el stream hasta el \n (_drain_until_newline) y sigue vivo, en vez de matar toda la sesion MCP. Validado: navegar + screenshot fullPage + glob ejecuta las 4 tools sin "read loop error" y sin colapsar el contexto. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/mcp/client.py | 46 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/src/mcp/client.py b/src/mcp/client.py index 319d8ca..266228f 100644 --- a/src/mcp/client.py +++ b/src/mcp/client.py @@ -18,6 +18,15 @@ from ..models.tools import ToolDefinition logger = logging.getLogger(__name__) +# Buffer maximo (bytes) del StreamReader para leer las respuestas JSON-RPC del +# MCP por stdout. Una respuesta llega en UNA sola linea; tools como el +# screenshot fullPage de Playwright devuelven la imagen en base64 en esa linea +# y superan de largo el 64KB por defecto de asyncio (y el 1MB que teniamos), +# lanzando LimitOverrunError que mataba el read loop y dejaba la sesion MCP +# inservible (el agente "se paraba" al hacer acciones). 64MB cubre cualquier +# screenshot real; por encima, el read loop descarta esa respuesta y sigue vivo. +MCP_STREAM_LIMIT = 64 * 1024 * 1024 + class MCPClientError(Exception): pass @@ -74,7 +83,7 @@ class MCPClient: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self._env, - limit=1024 * 1024, # 1MB buffer for large MCP responses + limit=MCP_STREAM_LIMIT, # buffer grande para respuestas MCP (screenshots base64) ) self._running = True self._reader_task = asyncio.create_task(self._read_loop()) @@ -225,14 +234,30 @@ class MCPClient: if not self._process or not self._process.stdout: return + stdout = self._process.stdout try: while self._running: - line = await self._process.stdout.readline() + try: + line = await stdout.readline() + except (ValueError, asyncio.LimitOverrunError): + # Una respuesta JSON-RPC supero el buffer (p.ej. screenshot + # fullPage de Playwright en base64 por encima de 64MB). Antes + # esto mataba el read loop y dejaba TODA la sesion MCP muerta + # (el agente se "paraba" en la siguiente accion). Ahora + # descartamos solo esa respuesta, re-sincronizamos el stream + # y seguimos vivos para las demas tools. + logger.warning( + "MCP [%s]: respuesta supera el buffer (%d MB), se descarta y se continua", + self.name, MCP_STREAM_LIMIT // (1024 * 1024), + ) + await self._drain_until_newline(stdout) + continue + if not line: logger.warning("MCP server stdout closed") break - line_str = line.decode().strip() + line_str = line.decode(errors="replace").strip() if not line_str: continue @@ -251,6 +276,21 @@ class MCPClient: finally: self._running = False + async def _drain_until_newline(self, stdout: asyncio.StreamReader) -> None: + """Consume bytes del stream hasta el proximo salto de linea para + re-sincronizar tras un LimitOverrunError (la respuesta sobredimensionada + se descarta). `read()` no usa separador, asi que no vuelve a disparar el + overrun y va vaciando el buffer hasta liberar la linea gigante.""" + while self._running: + try: + chunk = await stdout.read(65536) + except Exception: + return + if not chunk: + return + if b"\n" in chunk: + return + def _handle_message(self, message: dict[str, Any]) -> None: """Route an incoming JSON-RPC message.""" msg_id = message.get("id")