fix(mcp): límite LRU de sesiones MCP — evita degradación del contexto
Las sesiones MCP no se limpiaban (registry._sessions crecía sin límite); cada una arranca 3 subprocesos stdio (acai-code, playwright+chromium, fetch) con sus read-loops. Con varias vivas a la vez, las sesiones nuevas recibían cada vez menos contexto/tools al modelo, hasta que el modelo dejaba de recibir tools y emitía los tool calls como texto sin ejecutarlos (~300 input_tokens). Esto degradaba el chat a lo largo del día hasta reiniciar el container. Fix: MAX_ACTIVE_MCP_SESSIONS=2 con evicción LRU (touch last_used en create/get_for_session, _evict_lru destruye las menos usadas). Seguro porque send_message reconecta el MCP de una sesión evictada si vuelve a usarse. Validado: 1 sesión viva era estable, 6 colapsaban; con cap=2, 7 sesiones secuenciales se mantienen estables (40-115K tokens, tools OK). Mitigación, no cura de fondo: el motivo por el que N managers vivos degradan (probable: chromium de playwright) queda pendiente para subir el umbral. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -8,6 +8,7 @@ defines the project-specific variables (ACAI_WEB_URL, etc.).
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@@ -17,6 +18,15 @@ from .manager import MCPManager
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Máximo de sesiones MCP vivas a la vez. Cada sesión arranca N subprocesos
|
||||||
|
# stdio (acai-code, playwright+chromium, fetch) con sus read-loops en el event
|
||||||
|
# loop; si se acumulan sin límite, saturan recursos y DEGRADAN progresivamente
|
||||||
|
# las sesiones nuevas (menos contexto/tools al modelo, hasta que el agente deja
|
||||||
|
# de recibir tools y emite los tool calls como texto). Evictamos por LRU las
|
||||||
|
# menos usadas — es seguro porque send_message reconecta el MCP de una sesión
|
||||||
|
# si vuelve a usarse y ya no está viva.
|
||||||
|
MAX_ACTIVE_MCP_SESSIONS = 2
|
||||||
|
|
||||||
|
|
||||||
class MCPRegistry:
|
class MCPRegistry:
|
||||||
"""Manages per-session MCPManager instances.
|
"""Manages per-session MCPManager instances.
|
||||||
@@ -29,6 +39,7 @@ class MCPRegistry:
|
|||||||
self._config_path = Path(config_path) if config_path else None
|
self._config_path = Path(config_path) if config_path else None
|
||||||
self._config: MCPConfigFile | None = None
|
self._config: MCPConfigFile | None = None
|
||||||
self._sessions: dict[str, MCPManager] = {} # session_id → MCPManager
|
self._sessions: dict[str, MCPManager] = {} # session_id → MCPManager
|
||||||
|
self._last_used: dict[str, float] = {} # session_id → monotonic ts (LRU)
|
||||||
|
|
||||||
def load_config(self) -> None:
|
def load_config(self) -> None:
|
||||||
"""Load the global MCP config template."""
|
"""Load the global MCP config template."""
|
||||||
@@ -91,6 +102,7 @@ class MCPRegistry:
|
|||||||
|
|
||||||
results = await manager.start()
|
results = await manager.start()
|
||||||
self._sessions[session_id] = manager
|
self._sessions[session_id] = manager
|
||||||
|
self._last_used[session_id] = time.monotonic()
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"MCP started for session %s: %s",
|
"MCP started for session %s: %s",
|
||||||
@@ -98,18 +110,41 @@ class MCPRegistry:
|
|||||||
{k: v.get("status") for k, v in results.items()},
|
{k: v.get("status") for k, v in results.items()},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Evictar por LRU si superamos el máximo (evita la degradación por
|
||||||
|
# acumulación de subprocesos MCP). Seguro: send_message reconecta.
|
||||||
|
await self._evict_lru()
|
||||||
|
|
||||||
return manager
|
return manager
|
||||||
|
|
||||||
|
async def _evict_lru(self) -> None:
|
||||||
|
"""Destruye las sesiones MCP menos usadas recientemente hasta no superar
|
||||||
|
MAX_ACTIVE_MCP_SESSIONS."""
|
||||||
|
while len(self._sessions) > MAX_ACTIVE_MCP_SESSIONS:
|
||||||
|
# sesión con last_used más antiguo (las que no tienen ts van primero)
|
||||||
|
oldest = min(
|
||||||
|
self._sessions.keys(),
|
||||||
|
key=lambda sid: self._last_used.get(sid, 0.0),
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"MCP registry: evicting LRU session %s (%d active > max %d)",
|
||||||
|
oldest[:12], len(self._sessions), MAX_ACTIVE_MCP_SESSIONS,
|
||||||
|
)
|
||||||
|
await self.destroy_for_session(oldest)
|
||||||
|
|
||||||
async def destroy_for_session(self, session_id: str) -> None:
|
async def destroy_for_session(self, session_id: str) -> None:
|
||||||
"""Stop and clean up MCP servers for a session."""
|
"""Stop and clean up MCP servers for a session."""
|
||||||
manager = self._sessions.pop(session_id, None)
|
manager = self._sessions.pop(session_id, None)
|
||||||
|
self._last_used.pop(session_id, None)
|
||||||
if manager:
|
if manager:
|
||||||
await manager.stop()
|
await manager.stop()
|
||||||
logger.info("MCP stopped for session %s", session_id[:12])
|
logger.info("MCP stopped for session %s", session_id[:12])
|
||||||
|
|
||||||
def get_for_session(self, session_id: str) -> MCPManager | None:
|
def get_for_session(self, session_id: str) -> MCPManager | None:
|
||||||
"""Get the MCPManager for a session, if any."""
|
"""Get the MCPManager for a session, if any."""
|
||||||
return self._sessions.get(session_id)
|
manager = self._sessions.get(session_id)
|
||||||
|
if manager is not None:
|
||||||
|
self._last_used[session_id] = time.monotonic() # touch LRU
|
||||||
|
return manager
|
||||||
|
|
||||||
async def stop_all(self) -> None:
|
async def stop_all(self) -> None:
|
||||||
"""Stop all sessions' MCP servers (shutdown)."""
|
"""Stop all sessions' MCP servers (shutdown)."""
|
||||||
|
|||||||
Reference in New Issue
Block a user