diff --git a/src/mcp/registry.py b/src/mcp/registry.py index 5e6d047..aa1460a 100644 --- a/src/mcp/registry.py +++ b/src/mcp/registry.py @@ -8,6 +8,7 @@ defines the project-specific variables (ACAI_WEB_URL, etc.). from __future__ import annotations import logging +import time from pathlib import Path from typing import Any @@ -17,6 +18,15 @@ from .manager import MCPManager logger = logging.getLogger(__name__) +# Máximo de sesiones MCP vivas a la vez. Cada sesión arranca N subprocesos +# stdio (acai-code, playwright+chromium, fetch) con sus read-loops en el event +# loop; si se acumulan sin límite, saturan recursos y DEGRADAN progresivamente +# las sesiones nuevas (menos contexto/tools al modelo, hasta que el agente deja +# de recibir tools y emite los tool calls como texto). Evictamos por LRU las +# menos usadas — es seguro porque send_message reconecta el MCP de una sesión +# si vuelve a usarse y ya no está viva. +MAX_ACTIVE_MCP_SESSIONS = 2 + class MCPRegistry: """Manages per-session MCPManager instances. @@ -29,6 +39,7 @@ class MCPRegistry: self._config_path = Path(config_path) if config_path else None self._config: MCPConfigFile | None = None self._sessions: dict[str, MCPManager] = {} # session_id → MCPManager + self._last_used: dict[str, float] = {} # session_id → monotonic ts (LRU) def load_config(self) -> None: """Load the global MCP config template.""" @@ -91,6 +102,7 @@ class MCPRegistry: results = await manager.start() self._sessions[session_id] = manager + self._last_used[session_id] = time.monotonic() logger.info( "MCP started for session %s: %s", @@ -98,18 +110,41 @@ class MCPRegistry: {k: v.get("status") for k, v in results.items()}, ) + # Evictar por LRU si superamos el máximo (evita la degradación por + # acumulación de subprocesos MCP). Seguro: send_message reconecta. + await self._evict_lru() + return manager + async def _evict_lru(self) -> None: + """Destruye las sesiones MCP menos usadas recientemente hasta no superar + MAX_ACTIVE_MCP_SESSIONS.""" + while len(self._sessions) > MAX_ACTIVE_MCP_SESSIONS: + # sesión con last_used más antiguo (las que no tienen ts van primero) + oldest = min( + self._sessions.keys(), + key=lambda sid: self._last_used.get(sid, 0.0), + ) + logger.info( + "MCP registry: evicting LRU session %s (%d active > max %d)", + oldest[:12], len(self._sessions), MAX_ACTIVE_MCP_SESSIONS, + ) + await self.destroy_for_session(oldest) + async def destroy_for_session(self, session_id: str) -> None: """Stop and clean up MCP servers for a session.""" manager = self._sessions.pop(session_id, None) + self._last_used.pop(session_id, None) if manager: await manager.stop() logger.info("MCP stopped for session %s", session_id[:12]) def get_for_session(self, session_id: str) -> MCPManager | None: """Get the MCPManager for a session, if any.""" - return self._sessions.get(session_id) + manager = self._sessions.get(session_id) + if manager is not None: + self._last_used[session_id] = time.monotonic() # touch LRU + return manager async def stop_all(self) -> None: """Stop all sessions' MCP servers (shutdown)."""