fix(mcp): el read loop ya no muere con respuestas grandes (screenshots)
Sintoma: "el agente se para cuando hace acciones". MCPClient._read_loop lee las
respuestas JSON-RPC con stdout.readline(), cuyo StreamReader tenia buffer de 1MB.
Una respuesta llega en UNA linea; playwright__browser_take_screenshot({fullPage:
true}) devuelve la imagen en base64 en esa linea y supera el limite →
asyncio.LimitOverrunError → el except Exception mataba el read loop y dejaba la
sesion MCP inservible (los turnos siguientes ejecutaban 0 tools).
Fix en dos capas:
- MCP_STREAM_LIMIT=64MB en create_subprocess_exec(limit=...) — cubre cualquier
screenshot real.
- Read loop tolerante: captura (ValueError, LimitOverrunError), descarta solo esa
respuesta re-sincronizando el stream hasta el \n (_drain_until_newline) y sigue
vivo, en vez de matar toda la sesion MCP.
Validado: navegar + screenshot fullPage + glob ejecuta las 4 tools sin "read loop
error" y sin colapsar el contexto.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -18,6 +18,15 @@ from ..models.tools import ToolDefinition
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Buffer maximo (bytes) del StreamReader para leer las respuestas JSON-RPC del
|
||||
# MCP por stdout. Una respuesta llega en UNA sola linea; tools como el
|
||||
# screenshot fullPage de Playwright devuelven la imagen en base64 en esa linea
|
||||
# y superan de largo el 64KB por defecto de asyncio (y el 1MB que teniamos),
|
||||
# lanzando LimitOverrunError que mataba el read loop y dejaba la sesion MCP
|
||||
# inservible (el agente "se paraba" al hacer acciones). 64MB cubre cualquier
|
||||
# screenshot real; por encima, el read loop descarta esa respuesta y sigue vivo.
|
||||
MCP_STREAM_LIMIT = 64 * 1024 * 1024
|
||||
|
||||
|
||||
class MCPClientError(Exception):
|
||||
pass
|
||||
@@ -74,7 +83,7 @@ class MCPClient:
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=self._env,
|
||||
limit=1024 * 1024, # 1MB buffer for large MCP responses
|
||||
limit=MCP_STREAM_LIMIT, # buffer grande para respuestas MCP (screenshots base64)
|
||||
)
|
||||
self._running = True
|
||||
self._reader_task = asyncio.create_task(self._read_loop())
|
||||
@@ -225,14 +234,30 @@ class MCPClient:
|
||||
if not self._process or not self._process.stdout:
|
||||
return
|
||||
|
||||
stdout = self._process.stdout
|
||||
try:
|
||||
while self._running:
|
||||
line = await self._process.stdout.readline()
|
||||
try:
|
||||
line = await stdout.readline()
|
||||
except (ValueError, asyncio.LimitOverrunError):
|
||||
# Una respuesta JSON-RPC supero el buffer (p.ej. screenshot
|
||||
# fullPage de Playwright en base64 por encima de 64MB). Antes
|
||||
# esto mataba el read loop y dejaba TODA la sesion MCP muerta
|
||||
# (el agente se "paraba" en la siguiente accion). Ahora
|
||||
# descartamos solo esa respuesta, re-sincronizamos el stream
|
||||
# y seguimos vivos para las demas tools.
|
||||
logger.warning(
|
||||
"MCP [%s]: respuesta supera el buffer (%d MB), se descarta y se continua",
|
||||
self.name, MCP_STREAM_LIMIT // (1024 * 1024),
|
||||
)
|
||||
await self._drain_until_newline(stdout)
|
||||
continue
|
||||
|
||||
if not line:
|
||||
logger.warning("MCP server stdout closed")
|
||||
break
|
||||
|
||||
line_str = line.decode().strip()
|
||||
line_str = line.decode(errors="replace").strip()
|
||||
if not line_str:
|
||||
continue
|
||||
|
||||
@@ -251,6 +276,21 @@ class MCPClient:
|
||||
finally:
|
||||
self._running = False
|
||||
|
||||
async def _drain_until_newline(self, stdout: asyncio.StreamReader) -> None:
|
||||
"""Consume bytes del stream hasta el proximo salto de linea para
|
||||
re-sincronizar tras un LimitOverrunError (la respuesta sobredimensionada
|
||||
se descarta). `read()` no usa separador, asi que no vuelve a disparar el
|
||||
overrun y va vaciando el buffer hasta liberar la linea gigante."""
|
||||
while self._running:
|
||||
try:
|
||||
chunk = await stdout.read(65536)
|
||||
except Exception:
|
||||
return
|
||||
if not chunk:
|
||||
return
|
||||
if b"\n" in chunk:
|
||||
return
|
||||
|
||||
def _handle_message(self, message: dict[str, Any]) -> None:
|
||||
"""Route an incoming JSON-RPC message."""
|
||||
msg_id = message.get("id")
|
||||
|
||||
Reference in New Issue
Block a user