diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4da07d4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,28 @@ +# Environment variables +.env +.env.local +.env.*.local + +# Python +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +dist/ +build/ +.venv/ +venv/ + +# Node +node_modules/ +npm-debug.log* + +# OS +.DS_Store +Thumbs.db + +# IDE +.vscode/ +.idea/ +*.swp +*.swo diff --git a/README.md b/README.md index ce29043..cc9355a 100644 --- a/README.md +++ b/README.md @@ -227,20 +227,93 @@ mensaje → planner → [step₁ → step₂ → ... → stepₙ] → reviewer ## MCP (Model Context Protocol) -Cliente stdio que se conecta a un servidor MCP al arrancar: +Arquitectura **per-session**: el microservicio corre como un Docker único, y cada sesión arranca sus propios subprocesos MCP con las env vars del proyecto del usuario. + +### Modelo de operación + +``` +Docker (microservicio — uno solo corriendo) + └── mcp.json ← template: QUÉ servers arrancar (global) + └── mcp-server/ ← código MCP (baked-in) + +POST /sessions { mcp_env: { ACAI_WEB_URL: "https://tienda.com", ... } } + → Arranca subprocesos MCP con esas env vars + → Session aislada con su propio MCPManager + → 33 tools descubiertas automáticamente + +DELETE /sessions/{id} + → Mata los subprocesos MCP de esa sesión +``` + +### Configuración + +**1. Template global** (`mcp.json` en la raíz — define QUÉ servers existen): + +```json +{ + "mcpServers": { + "acai-code": { + "command": "node", + "args": ["mcp-server/stdio.js"], + "env": {}, + "timeout": 30, + "startup_timeout": 10 + } + } +} +``` ```bash # En .env -AGENTIC_MCP_SERVER_COMMAND=node -AGENTIC_MCP_SERVER_ARGS=["mcp-server/stdio.js"] - -# Variables del MCP server (se heredan al subproceso) -ACAI_WEB_URL=http://localhost:8080 -ACAI_WEBSITE=mi-sitio -ACAI_PROJECT_DIR=/ruta/al/proyecto +AGENTIC_MCP_CONFIG_PATH=mcp.json ``` -El cliente descubre tools automáticamente vía `tools/list`, y los agentes las usan durante la ejecución. Los resultados de tools **nunca** entran al contexto como raw output — se resumen como artifacts. +**2. Per-session env vars** (cada usuario/proyecto pasa las suyas al crear sesión): + +```bash +curl -X POST http://localhost:8001/api/v1/sessions \ + -H "Content-Type: application/json" \ + -d '{ + "project_profile": {"name": "tienda-online"}, + "mcp_env": { + "ACAI_WEB_URL": "https://superadmin_tienda.forge.acaisuite.com/", + "ACAI_WEBSITE": "tienda.com", + "ACAI_PROJECT_DIR": "/projects/tienda" + } + }' +``` + +Las env vars de `mcp_env` se fusionan con las del template y se pasan al subproceso MCP. El MCP server lee el token desde el `.acai` del proyecto automáticamente. + +**3. Legacy** (un solo server global, sin `mcp.json`): + +```bash +AGENTIC_MCP_SERVER_COMMAND=node +AGENTIC_MCP_SERVER_ARGS=["mcp-server/stdio.js"] +``` + +### Tool namespacing + +En modo multi-server, los tools se namespean: `acai-code.compile_module`, `filesystem.read_file`. En modo single-server los tools mantienen su nombre original. + +### API de gestión MCP + +```bash +# Ver estado de todos los MCP servers activos (por sesión) +curl http://localhost:8001/api/v1/mcp/status + +# Hot-reload del template (re-lee mcp.json, no afecta sesiones activas) +curl -X POST http://localhost:8001/api/v1/mcp/reload +``` + +### Ciclo de vida + +1. `POST /sessions` con `mcp_env` → arranca subprocesos MCP para esa sesión +2. `POST /sessions/{id}/messages` → el agente usa los tools del MCP de esa sesión +3. Si el server se reinicia y la sesión sigue en Redis → el MCP se reconecta automáticamente al siguiente mensaje +4. `DELETE /sessions/{id}` → mata subprocesos MCP de esa sesión + +Los resultados de tools **nunca** entran al contexto como raw output — se resumen como artifacts. ## Configuración @@ -261,8 +334,9 @@ Variables de entorno con prefijo `AGENTIC_`: | `AGENTIC_MAX_EXECUTION_STEPS` | `25` | Max steps por task | | `AGENTIC_MAX_EXECUTION_TIMEOUT_SECONDS` | `300` | Timeout global (5 min) | | `AGENTIC_SUBAGENT_MAX_STEPS` | `10` | Max iterations por subagent | -| `AGENTIC_MCP_SERVER_COMMAND` | — | Comando del servidor MCP | -| `AGENTIC_MCP_SERVER_ARGS` | `[]` | Argumentos del servidor MCP | +| `AGENTIC_MCP_CONFIG_PATH` | — | Ruta a `mcp.json` (multi-MCP per-session) | +| `AGENTIC_MCP_SERVER_COMMAND` | — | Legacy: comando del servidor MCP (single) | +| `AGENTIC_MCP_SERVER_ARGS` | `[]` | Legacy: argumentos del servidor MCP | | `AGENTIC_MCP_TIMEOUT_SECONDS` | `30` | Timeout por tool call | | `AGENTIC_DEBUG` | `false` | Logging verbose | @@ -302,8 +376,11 @@ agentic:memory:_type:{type} → Set de doc IDs por tipo │ │ ├── base.py # ModelAdapter interface │ │ ├── claude_adapter.py # Anthropic Claude (streaming) │ │ └── openai_adapter.py # OpenAI GPT (streaming) -│ ├── mcp/ # MCP client -│ │ └── client.py # stdio transport, tool registry +│ ├── mcp/ # MCP (per-session, multi-server) +│ │ ├── client.py # stdio transport, tool registry +│ │ ├── config.py # mcp.json parser (Pydantic) +│ │ ├── manager.py # MCPManager (aggregates tools, routes calls) +│ │ └── registry.py # Per-session MCP lifecycle │ ├── orchestrator/ # Agent orchestration │ │ ├── engine.py # Pipeline + error recovery + timeout │ │ ├── router.py # Step-to-agent routing @@ -328,6 +405,8 @@ agentic:memory:_type:{type} → Set de doc IDs por tipo │ └── components/ # Sidebar, chat, event log, inspector, timeline ├── mcp-server/ # Acai MCP server (Node.js, stdio) ├── docs/ # Knowledge base documents (*.md) +├── mcp.json # MCP server template (qué servers arrancar) +├── mcp.json.example # Ejemplo con múltiples servers ├── Dockerfile ├── docker-compose.yml ├── requirements.txt diff --git a/mcp.json b/mcp.json new file mode 100644 index 0000000..2a212c6 --- /dev/null +++ b/mcp.json @@ -0,0 +1,11 @@ +{ + "mcpServers": { + "acai-code": { + "command": "node", + "args": ["mcp-server/stdio.js"], + "env": {}, + "timeout": 30, + "startup_timeout": 10 + } + } +} diff --git a/mcp.json.example b/mcp.json.example new file mode 100644 index 0000000..de634f0 --- /dev/null +++ b/mcp.json.example @@ -0,0 +1,19 @@ +{ + "mcpServers": { + "acai-code": { + "command": "node", + "args": ["mcp-server/stdio.js"], + "env": { + "ACAI_WEB_URL": "http://localhost:8080", + "ACAI_WEBSITE": "mi-sitio", + "ACAI_PROJECT_DIR": "/ruta/al/proyecto" + }, + "timeout": 30, + "startup_timeout": 10 + }, + "filesystem": { + "command": "npx", + "args": ["@modelcontextprotocol/server-filesystem", "/tmp"] + } + } +} diff --git a/src/__pycache__/__init__.cpython-312.pyc b/src/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 4b00cdb..0000000 Binary files a/src/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/__pycache__/config.cpython-312.pyc b/src/__pycache__/config.cpython-312.pyc deleted file mode 100644 index 1d2b6af..0000000 Binary files a/src/__pycache__/config.cpython-312.pyc and /dev/null differ diff --git a/src/__pycache__/main.cpython-312.pyc b/src/__pycache__/main.cpython-312.pyc deleted file mode 100644 index 9288ce7..0000000 Binary files a/src/__pycache__/main.cpython-312.pyc and /dev/null differ diff --git a/src/adapters/__pycache__/__init__.cpython-312.pyc b/src/adapters/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index fa93f7a..0000000 Binary files a/src/adapters/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/adapters/__pycache__/base.cpython-312.pyc b/src/adapters/__pycache__/base.cpython-312.pyc deleted file mode 100644 index 761ccca..0000000 Binary files a/src/adapters/__pycache__/base.cpython-312.pyc and /dev/null differ diff --git a/src/adapters/__pycache__/claude_adapter.cpython-312.pyc b/src/adapters/__pycache__/claude_adapter.cpython-312.pyc deleted file mode 100644 index 37df90b..0000000 Binary files a/src/adapters/__pycache__/claude_adapter.cpython-312.pyc and /dev/null differ diff --git a/src/adapters/__pycache__/openai_adapter.cpython-312.pyc b/src/adapters/__pycache__/openai_adapter.cpython-312.pyc deleted file mode 100644 index 502e051..0000000 Binary files a/src/adapters/__pycache__/openai_adapter.cpython-312.pyc and /dev/null differ diff --git a/src/api/__pycache__/__init__.cpython-312.pyc b/src/api/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 3502dc9..0000000 Binary files a/src/api/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/api/__pycache__/routes.cpython-312.pyc b/src/api/__pycache__/routes.cpython-312.pyc deleted file mode 100644 index 55086ff..0000000 Binary files a/src/api/__pycache__/routes.cpython-312.pyc and /dev/null differ diff --git a/src/api/routes.py b/src/api/routes.py index d2db28a..1012e96 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -13,6 +13,7 @@ from pydantic import BaseModel, Field from ..models.context import MemoryDocument, MemoryType from ..models.session import SessionState, SessionStatus +from ..orchestrator.engine import OrchestratorEngine from ..streaming.sse import EventType logger = logging.getLogger(__name__) @@ -28,6 +29,10 @@ class CreateSessionRequest(BaseModel): project_profile: dict[str, Any] = Field(default_factory=dict) immutable_rules: list[str] = Field(default_factory=list) metadata: dict[str, Any] = Field(default_factory=dict) + mcp_env: dict[str, str] = Field( + default_factory=dict, + description="Per-project env vars for MCP servers (e.g. ACAI_WEB_URL, ACAI_PROJECT_DIR)", + ) class CreateSessionResponse(BaseModel): @@ -59,32 +64,43 @@ _deps: dict[str, Any] = {} def set_dependencies( storage: Any, - orchestrator: Any, + model_adapter: Any, + context_engine: Any, + memory_store: Any, sse_emitter: Any, - context_engine: Any = None, - memory_store: Any = None, + mcp_registry: Any, ) -> None: _deps["storage"] = storage - _deps["orchestrator"] = orchestrator + _deps["model_adapter"] = model_adapter + _deps["context_engine"] = context_engine + _deps["memory_store"] = memory_store _deps["sse"] = sse_emitter - if context_engine: - _deps["context_engine"] = context_engine - if memory_store: - _deps["memory_store"] = memory_store + _deps["mcp_registry"] = mcp_registry def _get_storage(): return _deps["storage"] -def _get_orchestrator(): - return _deps["orchestrator"] - - def _get_sse(): return _deps["sse"] +def _get_mcp_registry(): + return _deps["mcp_registry"] + + +def _build_orchestrator(mcp_manager) -> OrchestratorEngine: + """Build an orchestrator with a session-specific MCPManager.""" + return OrchestratorEngine( + model_adapter=_deps["model_adapter"], + context_engine=_deps["context_engine"], + mcp_client=mcp_manager, + memory_store=_deps["memory_store"], + sse_emitter=_deps["sse"], + ) + + # ------------------------------------------------------------------ # POST /sessions # ------------------------------------------------------------------ @@ -97,8 +113,17 @@ async def create_session(body: CreateSessionRequest) -> CreateSessionResponse: immutable_rules=body.immutable_rules, metadata=body.metadata, ) + # Store mcp_env in session metadata for reconnection + if body.mcp_env: + session.metadata["mcp_env"] = body.mcp_env + await storage.create_session(session) + # Start per-session MCP servers with project-specific env + registry = _get_mcp_registry() + if registry.has_config: + await registry.create_for_session(session.session_id, body.mcp_env) + sse = _get_sse() await sse.emit( EventType.SESSION_CREATED, @@ -126,7 +151,16 @@ async def send_message( if not session: raise HTTPException(status_code=404, detail="Session not found") - orchestrator = _get_orchestrator() + # Get or create session's MCP manager + registry = _get_mcp_registry() + mcp_manager = registry.get_for_session(session_id) + if not mcp_manager and registry.has_config: + # Reconnect MCP (e.g. after server restart) + mcp_env = session.metadata.get("mcp_env", {}) + mcp_manager = await registry.create_for_session(session_id, mcp_env) + + from ..mcp.manager import MCPManager + orchestrator = _build_orchestrator(mcp_manager or MCPManager()) if body.stream: asyncio.create_task(_execute_and_persist(orchestrator, storage, session, body.message)) @@ -225,6 +259,10 @@ async def delete_session(session_id: str) -> dict[str, str]: if not deleted: raise HTTPException(status_code=404, detail="Session not found") + # Stop session's MCP servers + registry = _get_mcp_registry() + await registry.destroy_for_session(session_id) + sse = _get_sse() sse.cleanup_session(session_id) @@ -373,3 +411,28 @@ async def delete_knowledge(doc_id: str) -> dict[str, str]: if not deleted: raise HTTPException(status_code=404, detail="Document not found") return {"status": "deleted", "id": doc_id} + + +# ------------------------------------------------------------------ +# MCP Management +# ------------------------------------------------------------------ + +@router.get("/mcp/status") +async def mcp_status() -> dict[str, Any]: + """Status of all MCP servers across sessions.""" + registry = _get_mcp_registry() + return registry.get_status() + + +@router.post("/mcp/reload") +async def mcp_reload() -> dict[str, Any]: + """Hot-reload MCP config template (does not affect running sessions).""" + registry = _get_mcp_registry() + try: + registry.load_config() + return { + "status": "reloaded", + "servers": registry.server_names, + } + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) diff --git a/src/config.py b/src/config.py index d8e7f00..dd47241 100644 --- a/src/config.py +++ b/src/config.py @@ -42,7 +42,8 @@ class Settings(BaseSettings): working_context_max_items: int = 20 # --- MCP --- - mcp_server_command: str = "" + mcp_config_path: str = "" # Path to mcp.json; empty = legacy single-server mode + mcp_server_command: str = "" # Legacy: single server command mcp_server_args: list[str] = Field(default_factory=list) mcp_timeout_seconds: float = 30.0 mcp_startup_timeout_seconds: float = 10.0 diff --git a/src/context/__pycache__/__init__.cpython-312.pyc b/src/context/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 7559ba6..0000000 Binary files a/src/context/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/context/__pycache__/compactor.cpython-312.pyc b/src/context/__pycache__/compactor.cpython-312.pyc deleted file mode 100644 index 720b58a..0000000 Binary files a/src/context/__pycache__/compactor.cpython-312.pyc and /dev/null differ diff --git a/src/context/__pycache__/engine.cpython-312.pyc b/src/context/__pycache__/engine.cpython-312.pyc deleted file mode 100644 index a6afedc..0000000 Binary files a/src/context/__pycache__/engine.cpython-312.pyc and /dev/null differ diff --git a/src/main.py b/src/main.py index 6d4a06a..8f3d583 100644 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,10 @@ """Agentic Microservice — FastAPI application entry point. -Wires together all components: Redis storage, model adapters, MCP client, +Wires together all components: Redis storage, model adapters, MCP registry, context engine, orchestrator, and SSE streaming. + +MCP servers are per-session: the global mcp.json defines WHAT servers +to run, and each session provides project-specific env vars. """ from __future__ import annotations @@ -20,7 +23,7 @@ from .adapters.openai_adapter import OpenAIAdapter from .api.routes import router, set_dependencies from .config import settings from .context.engine import ContextEngine -from .mcp.client import MCPClient +from .mcp.registry import MCPRegistry from .memory.store import MemoryStore from .orchestrator.engine import OrchestratorEngine from .storage.redis import RedisStorage @@ -34,8 +37,8 @@ logger = logging.getLogger(__name__) # Global instances (initialized in lifespan) redis_storage = RedisStorage() -mcp_client = MCPClient() sse_emitter = SSEEmitter(redis_storage=redis_storage) +mcp_registry = MCPRegistry() @asynccontextmanager @@ -45,11 +48,9 @@ async def lifespan(app: FastAPI): # 1. Connect Redis await redis_storage.connect() - - # Wire SSE emitter to Redis for event persistence (re-set after connect) sse_emitter.set_storage(redis_storage) - # 2. Initialize model adapter (based on configured provider) + # 2. Initialize model adapter if settings.default_model_provider == "openai": model_adapter = OpenAIAdapter() logger.info("Using OpenAI adapter (model: %s)", settings.default_model_id) @@ -57,36 +58,37 @@ async def lifespan(app: FastAPI): model_adapter = ClaudeAdapter() logger.info("Using Claude adapter (model: %s)", settings.default_model_id) - # 3. Initialize memory store (uses same Redis connection) + # 3. Initialize memory store memory_store = MemoryStore(redis_storage.client) - # 4. Initialize context engine (with memory store for knowledge base) + # 4. Initialize context engine context_engine = ContextEngine(memory_store=memory_store) - # 5. Start MCP client (if configured) - if settings.mcp_server_command: - try: - await mcp_client.start() - logger.info("MCP client started with %d tools", len(mcp_client.tools)) - except Exception as e: - logger.warning("MCP client failed to start: %s — continuing without MCP", e) + # 5. Load MCP config template (servers are started per-session) + if settings.mcp_config_path: + config_path = pathlib.Path(settings.mcp_config_path) + if not config_path.is_absolute(): + config_path = pathlib.Path(__file__).resolve().parent.parent / settings.mcp_config_path + mcp_registry._config_path = config_path + elif settings.mcp_server_command: + # Legacy: create a synthetic config from env vars + from .mcp.config import MCPConfigFile, MCPServerConfig + mcp_registry._config = MCPConfigFile(mcpServers={ + "default": MCPServerConfig( + command=settings.mcp_server_command, + args=list(settings.mcp_server_args), + ) + }) + mcp_registry.load_config() - # 6. Initialize orchestrator - orchestrator = OrchestratorEngine( - model_adapter=model_adapter, - context_engine=context_engine, - mcp_client=mcp_client, - memory_store=memory_store, - sse_emitter=sse_emitter, - ) - - # 7. Wire dependencies into API routes + # 6. Wire dependencies (orchestrator is created per-message with session's MCP) set_dependencies( storage=redis_storage, - orchestrator=orchestrator, - sse_emitter=sse_emitter, + model_adapter=model_adapter, context_engine=context_engine, memory_store=memory_store, + sse_emitter=sse_emitter, + mcp_registry=mcp_registry, ) logger.info("All systems initialized. Serving on %s:%d", settings.host, settings.port) @@ -95,7 +97,7 @@ async def lifespan(app: FastAPI): # Shutdown logger.info("Shutting down...") - await mcp_client.stop() + await mcp_registry.stop_all() await redis_storage.disconnect() logger.info("Shutdown complete.") @@ -106,7 +108,6 @@ app = FastAPI( lifespan=lifespan, ) -# CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -115,23 +116,19 @@ app.add_middleware( allow_headers=["*"], ) -# Mount API routes app.include_router(router, prefix="/api/v1") -# Health check @app.get("/health") async def health() -> dict[str, str]: return {"status": "ok", "service": settings.service_name} -# Root redirect @app.get("/") async def root(): return RedirectResponse(url="/dashboard/") -# Dashboard static files (mounted AFTER API routes) _dashboard_dir = pathlib.Path(__file__).resolve().parent.parent / "dashboard" if _dashboard_dir.is_dir(): app.mount("/dashboard", StaticFiles(directory=str(_dashboard_dir), html=True), name="dashboard") diff --git a/src/mcp/__init__.py b/src/mcp/__init__.py index aea2731..d6059b4 100644 --- a/src/mcp/__init__.py +++ b/src/mcp/__init__.py @@ -1,3 +1,5 @@ -from .client import MCPClient +from .client import MCPClient, MCPClientError +from .manager import MCPManager +from .registry import MCPRegistry -__all__ = ["MCPClient"] +__all__ = ["MCPClient", "MCPClientError", "MCPManager", "MCPRegistry"] diff --git a/src/mcp/__pycache__/__init__.cpython-312.pyc b/src/mcp/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 36a1c73..0000000 Binary files a/src/mcp/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/mcp/__pycache__/client.cpython-312.pyc b/src/mcp/__pycache__/client.cpython-312.pyc deleted file mode 100644 index 3c76a86..0000000 Binary files a/src/mcp/__pycache__/client.cpython-312.pyc and /dev/null differ diff --git a/src/mcp/client.py b/src/mcp/client.py index f88354e..97c4ed6 100644 --- a/src/mcp/client.py +++ b/src/mcp/client.py @@ -33,7 +33,9 @@ class MCPClient: timeout: float | None = None, startup_timeout: float | None = None, env: dict[str, str] | None = None, + name: str = "mcp", ) -> None: + self.name = name self._command = command or settings.mcp_server_command self._args = args if args is not None else list(settings.mcp_server_args) self._timeout = timeout or settings.mcp_timeout_seconds @@ -64,7 +66,7 @@ class MCPClient: logger.warning("No MCP server command configured — skipping start") return - logger.info("Starting MCP server: %s %s", self._command, self._args) + logger.info("Starting MCP server [%s]: %s %s", self.name, self._command, self._args) self._process = await asyncio.create_subprocess_exec( self._command, *self._args, @@ -287,5 +289,5 @@ class MCPClient: name=name, description=t.get("description", ""), input_schema=t.get("inputSchema", {}), - server_name="mcp", + server_name=self.name, ) diff --git a/src/mcp/config.py b/src/mcp/config.py new file mode 100644 index 0000000..2df047f --- /dev/null +++ b/src/mcp/config.py @@ -0,0 +1,55 @@ +"""Multi-MCP configuration — parses mcp.json files (Claude Code format).""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class MCPServerConfig(BaseModel): + """Configuration for a single MCP server.""" + + command: str + args: list[str] = Field(default_factory=list) + env: dict[str, str] = Field(default_factory=dict) + timeout: float = 30.0 + startup_timeout: float = 10.0 + + +class MCPConfigFile(BaseModel): + """Root config — mirrors Claude Code's .mcp.json format.""" + + mcpServers: dict[str, MCPServerConfig] = Field(default_factory=dict) + + +def load_mcp_config(path: str | Path) -> MCPConfigFile: + """Read and validate a mcp.json config file. + + Returns an empty config if the file doesn't exist. + Raises on parse/validation errors. + """ + config_path = Path(path) + if not config_path.is_file(): + logger.warning("MCP config not found at %s — no servers configured", config_path) + return MCPConfigFile() + + try: + raw = json.loads(config_path.read_text(encoding="utf-8")) + config = MCPConfigFile.model_validate(raw) + logger.info( + "Loaded MCP config from %s — %d server(s): %s", + config_path, + len(config.mcpServers), + ", ".join(config.mcpServers.keys()), + ) + return config + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in {config_path}: {e}") from e + except Exception as e: + raise ValueError(f"Invalid MCP config at {config_path}: {e}") from e diff --git a/src/mcp/manager.py b/src/mcp/manager.py new file mode 100644 index 0000000..5b66632 --- /dev/null +++ b/src/mcp/manager.py @@ -0,0 +1,289 @@ +"""MCPManager — aggregates multiple MCP servers with namespaced tools. + +Sits between agents and MCPClient instances. Routes tool calls to the +correct server, handles hot-reload, and isolates failures. +""" + +from __future__ import annotations + +import asyncio +import logging +from pathlib import Path +from typing import Any + +from ..models.tools import ToolDefinition +from .client import MCPClient, MCPClientError +from .config import MCPServerConfig, load_mcp_config + +logger = logging.getLogger(__name__) + + +class MCPManager: + """Manages multiple MCP servers with unified tool access. + + Exposes the same interface as MCPClient (tools, is_running, + call_tool, get_tool_definitions) so the orchestrator doesn't + need to know if it's talking to one server or many. + """ + + def __init__(self, config_path: str | Path | None = None) -> None: + self._config_path = Path(config_path) if config_path else None + self._clients: dict[str, MCPClient] = {} + self._tool_index: dict[str, str] = {} # namespaced_name → server_name + self._single_server_mode = False + + # ------------------------------------------------------------------ + # Public interface (mirrors MCPClient) + # ------------------------------------------------------------------ + + @property + def tools(self) -> dict[str, ToolDefinition]: + """Combined tool registry across all servers.""" + result: dict[str, ToolDefinition] = {} + for server_name, client in self._clients.items(): + for tool_name, tool_def in client.tools.items(): + ns_name = self._namespace(server_name, tool_name) + result[ns_name] = ToolDefinition( + name=ns_name, + description=tool_def.description, + input_schema=tool_def.input_schema, + server_name=server_name, + ) + return result + + @property + def is_running(self) -> bool: + return any(c.is_running for c in self._clients.values()) + + def get_tool_definitions(self) -> list[dict[str, Any]]: + """Aggregated tool definitions for model adapters.""" + definitions: list[dict[str, Any]] = [] + for server_name, client in self._clients.items(): + if not client.is_running: + continue + for tool in client.get_tool_definitions(): + ns_name = self._namespace(server_name, tool["name"]) + definitions.append({ + "name": ns_name, + "description": f"[{server_name}] {tool.get('description', '')}", + "input_schema": tool.get("input_schema", {}), + }) + return definitions + + async def call_tool( + self, namespaced_name: str, arguments: dict[str, Any] + ) -> dict[str, Any]: + """Route a tool call to the correct MCP server.""" + server_name, raw_name = self._resolve_tool(namespaced_name) + + client = self._clients.get(server_name) + if not client: + raise MCPClientError(f"MCP server '{server_name}' not found") + if not client.is_running: + raise MCPClientError(f"MCP server '{server_name}' is not running") + + return await client.call_tool(raw_name, arguments) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> dict[str, Any]: + """Start all configured MCP servers. + + If a config_path is set, reads from it. Otherwise uses any + manually added clients. + """ + if self._config_path: + config = load_mcp_config(self._config_path) + for name, server_cfg in config.mcpServers.items(): + if name not in self._clients: + self._clients[name] = self._create_client(name, server_cfg) + + # Detect single-server mode for backward compat (no namespacing) + self._single_server_mode = len(self._clients) == 1 + + results: dict[str, Any] = {} + start_tasks = [] + + for name, client in self._clients.items(): + start_tasks.append(self._start_client(name, client)) + + completed = await asyncio.gather(*start_tasks, return_exceptions=True) + for (name, _), result in zip(self._clients.items(), completed): + if isinstance(result, Exception): + results[name] = {"status": "failed", "error": str(result)} + else: + results[name] = result + + self._rebuild_tool_index() + + total_tools = len(self._tool_index) + running = sum(1 for c in self._clients.values() if c.is_running) + logger.info( + "MCPManager started: %d/%d servers running, %d tools available", + running, len(self._clients), total_tools, + ) + + return results + + async def stop(self) -> None: + """Stop all MCP servers.""" + stop_tasks = [client.stop() for client in self._clients.values()] + await asyncio.gather(*stop_tasks, return_exceptions=True) + self._clients.clear() + self._tool_index.clear() + logger.info("MCPManager stopped all servers") + + async def reload_config(self) -> dict[str, Any]: + """Hot-reload: re-read config, start new servers, stop removed ones.""" + if not self._config_path: + return {"error": "No config path set"} + + config = load_mcp_config(self._config_path) + new_names = set(config.mcpServers.keys()) + current_names = set(self._clients.keys()) + + to_add = new_names - current_names + to_remove = current_names - new_names + to_keep = new_names & current_names + + summary: dict[str, Any] = {"added": [], "removed": [], "kept": []} + + # Stop removed servers + for name in to_remove: + client = self._clients.pop(name) + await client.stop() + summary["removed"].append(name) + logger.info("Removed MCP server: %s", name) + + # Start new servers + for name in to_add: + server_cfg = config.mcpServers[name] + client = self._create_client(name, server_cfg) + self._clients[name] = client + result = await self._start_client(name, client) + summary["added"].append({"name": name, **result}) + logger.info("Added MCP server: %s", name) + + # Check if kept servers need restart (config changed) + for name in to_keep: + new_cfg = config.mcpServers[name] + client = self._clients[name] + if self._config_changed(client, new_cfg): + await client.stop() + new_client = self._create_client(name, new_cfg) + self._clients[name] = new_client + result = await self._start_client(name, new_client) + summary["kept"].append({"name": name, "restarted": True, **result}) + logger.info("Restarted MCP server (config changed): %s", name) + else: + summary["kept"].append({"name": name, "restarted": False}) + + self._single_server_mode = len(self._clients) == 1 + self._rebuild_tool_index() + + return summary + + def add_client(self, name: str, client: MCPClient) -> None: + """Manually add a client (used for legacy single-server mode).""" + self._clients[name] = client + self._single_server_mode = len(self._clients) == 1 + + def get_status(self) -> dict[str, Any]: + """Return status of all MCP servers.""" + servers: list[dict[str, Any]] = [] + for name, client in self._clients.items(): + servers.append({ + "name": name, + "running": client.is_running, + "tools_count": len(client.tools), + "tools": list(client.tools.keys()), + }) + return { + "config_path": str(self._config_path) if self._config_path else None, + "single_server_mode": self._single_server_mode, + "total_servers": len(self._clients), + "running_servers": sum(1 for c in self._clients.values() if c.is_running), + "total_tools": len(self._tool_index), + "servers": servers, + } + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _namespace(self, server_name: str, tool_name: str) -> str: + """Build namespaced tool name. Skip prefix in single-server mode.""" + if self._single_server_mode: + return tool_name + return f"{server_name}.{tool_name}" + + def _resolve_tool(self, namespaced_name: str) -> tuple[str, str]: + """Resolve a namespaced tool name to (server_name, raw_tool_name).""" + # Direct lookup in index + if namespaced_name in self._tool_index: + server_name = self._tool_index[namespaced_name] + raw_name = namespaced_name + if not self._single_server_mode and "." in namespaced_name: + raw_name = namespaced_name.split(".", 1)[1] + return server_name, raw_name + + # Try splitting on first dot + if "." in namespaced_name: + server_name, raw_name = namespaced_name.split(".", 1) + if server_name in self._clients: + return server_name, raw_name + + # Fallback: search all servers for the bare name + for server_name, client in self._clients.items(): + if namespaced_name in client.tools: + return server_name, namespaced_name + + raise MCPClientError( + f"Tool '{namespaced_name}' not found in any MCP server. " + f"Available: {list(self._tool_index.keys())[:20]}" + ) + + def _rebuild_tool_index(self) -> None: + """Rebuild the unified tool index from all clients.""" + self._tool_index.clear() + for server_name, client in self._clients.items(): + for tool_name in client.tools: + ns_name = self._namespace(server_name, tool_name) + self._tool_index[ns_name] = server_name + + @staticmethod + def _create_client(name: str, cfg: MCPServerConfig) -> MCPClient: + return MCPClient( + name=name, + command=cfg.command, + args=cfg.args, + timeout=cfg.timeout, + startup_timeout=cfg.startup_timeout, + env=cfg.env, + ) + + @staticmethod + async def _start_client(name: str, client: MCPClient) -> dict[str, Any]: + try: + await client.start() + return { + "status": "running", + "tools_count": len(client.tools), + } + except Exception as e: + logger.error("Failed to start MCP server [%s]: %s", name, e) + return { + "status": "failed", + "error": str(e), + } + + @staticmethod + def _config_changed(client: MCPClient, new_cfg: MCPServerConfig) -> bool: + """Check if a server's config has changed (needs restart).""" + return ( + client._command != new_cfg.command + or client._args != new_cfg.args + or client._timeout != new_cfg.timeout + ) diff --git a/src/mcp/registry.py b/src/mcp/registry.py new file mode 100644 index 0000000..5e6d047 --- /dev/null +++ b/src/mcp/registry.py @@ -0,0 +1,135 @@ +"""Per-session MCP registry. + +Each session gets its own MCPManager with project-specific env vars. +The global mcp.json defines WHAT servers to run. The session's mcp_env +defines the project-specific variables (ACAI_WEB_URL, etc.). +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Any + +from .client import MCPClient +from .config import MCPConfigFile, load_mcp_config +from .manager import MCPManager + +logger = logging.getLogger(__name__) + + +class MCPRegistry: + """Manages per-session MCPManager instances. + + Uses a global mcp.json as template. Each session gets its own + set of MCP subprocesses with session-specific env vars merged in. + """ + + def __init__(self, config_path: str | Path | None = None) -> None: + self._config_path = Path(config_path) if config_path else None + self._config: MCPConfigFile | None = None + self._sessions: dict[str, MCPManager] = {} # session_id → MCPManager + + def load_config(self) -> None: + """Load the global MCP config template.""" + if self._config_path: + self._config = load_mcp_config(self._config_path) + logger.info( + "MCP registry loaded template: %d server(s) — %s", + len(self._config.mcpServers), + ", ".join(self._config.mcpServers.keys()), + ) + else: + self._config = MCPConfigFile() + logger.info("MCP registry: no config path, no servers") + + @property + def has_config(self) -> bool: + return self._config is not None and len(self._config.mcpServers) > 0 + + @property + def server_names(self) -> list[str]: + if not self._config: + return [] + return list(self._config.mcpServers.keys()) + + async def create_for_session( + self, + session_id: str, + mcp_env: dict[str, str] | None = None, + ) -> MCPManager: + """Create and start MCP servers for a session. + + The global config defines the servers. The mcp_env overrides + are merged into each server's env (so ACAI_WEB_URL etc. are + project-specific). + """ + # Clean up existing if any + await self.destroy_for_session(session_id) + + if not self._config or not self._config.mcpServers: + # No MCP configured — return empty manager + manager = MCPManager() + self._sessions[session_id] = manager + return manager + + manager = MCPManager() + + for server_name, server_cfg in self._config.mcpServers.items(): + # Merge: server-defined env + session-specific env + merged_env = {**server_cfg.env, **(mcp_env or {})} + + client = MCPClient( + name=server_name, + command=server_cfg.command, + args=server_cfg.args, + timeout=server_cfg.timeout, + startup_timeout=server_cfg.startup_timeout, + env=merged_env, + ) + manager.add_client(server_name, client) + + results = await manager.start() + self._sessions[session_id] = manager + + logger.info( + "MCP started for session %s: %s", + session_id[:12], + {k: v.get("status") for k, v in results.items()}, + ) + + return manager + + async def destroy_for_session(self, session_id: str) -> None: + """Stop and clean up MCP servers for a session.""" + manager = self._sessions.pop(session_id, None) + if manager: + await manager.stop() + logger.info("MCP stopped for session %s", session_id[:12]) + + def get_for_session(self, session_id: str) -> MCPManager | None: + """Get the MCPManager for a session, if any.""" + return self._sessions.get(session_id) + + async def stop_all(self) -> None: + """Stop all sessions' MCP servers (shutdown).""" + for sid in list(self._sessions.keys()): + await self.destroy_for_session(sid) + logger.info("MCP registry: all sessions stopped") + + def get_status(self) -> dict[str, Any]: + """Global status of the registry.""" + sessions_status: list[dict[str, Any]] = [] + for sid, manager in self._sessions.items(): + sessions_status.append({ + "session_id": sid[:12], + "running": manager.is_running, + "tools_count": len(manager.tools), + }) + + return { + "config_path": str(self._config_path) if self._config_path else None, + "template_servers": self.server_names, + "active_sessions": len(self._sessions), + "sessions": sessions_status, + } diff --git a/src/memory/__pycache__/__init__.cpython-312.pyc b/src/memory/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 3661d7a..0000000 Binary files a/src/memory/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/memory/__pycache__/store.cpython-312.pyc b/src/memory/__pycache__/store.cpython-312.pyc deleted file mode 100644 index 35fe2c8..0000000 Binary files a/src/memory/__pycache__/store.cpython-312.pyc and /dev/null differ diff --git a/src/models/__pycache__/__init__.cpython-312.pyc b/src/models/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 74fb6df..0000000 Binary files a/src/models/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/models/__pycache__/agent.cpython-312.pyc b/src/models/__pycache__/agent.cpython-312.pyc deleted file mode 100644 index 500fa55..0000000 Binary files a/src/models/__pycache__/agent.cpython-312.pyc and /dev/null differ diff --git a/src/models/__pycache__/artifacts.cpython-312.pyc b/src/models/__pycache__/artifacts.cpython-312.pyc deleted file mode 100644 index efe4a23..0000000 Binary files a/src/models/__pycache__/artifacts.cpython-312.pyc and /dev/null differ diff --git a/src/models/__pycache__/context.cpython-312.pyc b/src/models/__pycache__/context.cpython-312.pyc deleted file mode 100644 index 3fbf22f..0000000 Binary files a/src/models/__pycache__/context.cpython-312.pyc and /dev/null differ diff --git a/src/models/__pycache__/session.cpython-312.pyc b/src/models/__pycache__/session.cpython-312.pyc deleted file mode 100644 index 1145a36..0000000 Binary files a/src/models/__pycache__/session.cpython-312.pyc and /dev/null differ diff --git a/src/models/__pycache__/tools.cpython-312.pyc b/src/models/__pycache__/tools.cpython-312.pyc deleted file mode 100644 index 62313b6..0000000 Binary files a/src/models/__pycache__/tools.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/__pycache__/__init__.cpython-312.pyc b/src/orchestrator/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index b1c2d37..0000000 Binary files a/src/orchestrator/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/__pycache__/engine.cpython-312.pyc b/src/orchestrator/__pycache__/engine.cpython-312.pyc deleted file mode 100644 index 1302896..0000000 Binary files a/src/orchestrator/__pycache__/engine.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/__pycache__/router.cpython-312.pyc b/src/orchestrator/__pycache__/router.cpython-312.pyc deleted file mode 100644 index 4bca5e0..0000000 Binary files a/src/orchestrator/__pycache__/router.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/agents/__pycache__/__init__.cpython-312.pyc b/src/orchestrator/agents/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index cc0d921..0000000 Binary files a/src/orchestrator/agents/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/agents/__pycache__/base.cpython-312.pyc b/src/orchestrator/agents/__pycache__/base.cpython-312.pyc deleted file mode 100644 index 8765bdd..0000000 Binary files a/src/orchestrator/agents/__pycache__/base.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/agents/__pycache__/coder.cpython-312.pyc b/src/orchestrator/agents/__pycache__/coder.cpython-312.pyc deleted file mode 100644 index c19e409..0000000 Binary files a/src/orchestrator/agents/__pycache__/coder.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/agents/__pycache__/collector.cpython-312.pyc b/src/orchestrator/agents/__pycache__/collector.cpython-312.pyc deleted file mode 100644 index 7395236..0000000 Binary files a/src/orchestrator/agents/__pycache__/collector.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/agents/__pycache__/planner.cpython-312.pyc b/src/orchestrator/agents/__pycache__/planner.cpython-312.pyc deleted file mode 100644 index 1a4e9b7..0000000 Binary files a/src/orchestrator/agents/__pycache__/planner.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/agents/__pycache__/reviewer.cpython-312.pyc b/src/orchestrator/agents/__pycache__/reviewer.cpython-312.pyc deleted file mode 100644 index 487971d..0000000 Binary files a/src/orchestrator/agents/__pycache__/reviewer.cpython-312.pyc and /dev/null differ diff --git a/src/orchestrator/agents/base.py b/src/orchestrator/agents/base.py index 3da4724..4cf5da1 100644 --- a/src/orchestrator/agents/base.py +++ b/src/orchestrator/agents/base.py @@ -10,7 +10,7 @@ from typing import Any, AsyncIterator from ...adapters.base import ModelAdapter, ModelConfig, StreamChunk from ...context.engine import ContextEngine -from ...mcp.client import MCPClient +from ...mcp.manager import MCPManager from ...memory.store import MemoryStore from ...models.agent import AgentProfile from ...models.artifacts import ArtifactSummary @@ -29,7 +29,7 @@ class BaseAgent: profile: AgentProfile, model_adapter: ModelAdapter, context_engine: ContextEngine, - mcp_client: MCPClient, + mcp_client: MCPManager, memory_store: MemoryStore, sse_emitter: SSEEmitter, ) -> None: diff --git a/src/orchestrator/engine.py b/src/orchestrator/engine.py index 909d47f..c5b23f1 100644 --- a/src/orchestrator/engine.py +++ b/src/orchestrator/engine.py @@ -13,7 +13,7 @@ from typing import Any from ..adapters.base import ModelAdapter from ..config import settings from ..context.engine import ContextEngine -from ..mcp.client import MCPClient +from ..mcp.manager import MCPManager from ..memory.store import MemoryStore from ..models.agent import AgentRole from ..models.session import SessionState, SessionStatus, TaskStatus @@ -34,7 +34,7 @@ class OrchestratorEngine: self, model_adapter: ModelAdapter, context_engine: ContextEngine, - mcp_client: MCPClient, + mcp_client: MCPManager, memory_store: MemoryStore, sse_emitter: SSEEmitter, ) -> None: diff --git a/src/storage/__pycache__/__init__.cpython-312.pyc b/src/storage/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index ccec8cd..0000000 Binary files a/src/storage/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/storage/__pycache__/redis.cpython-312.pyc b/src/storage/__pycache__/redis.cpython-312.pyc deleted file mode 100644 index 956679c..0000000 Binary files a/src/storage/__pycache__/redis.cpython-312.pyc and /dev/null differ diff --git a/src/streaming/__pycache__/__init__.cpython-312.pyc b/src/streaming/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 82807bd..0000000 Binary files a/src/streaming/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/src/streaming/__pycache__/sse.cpython-312.pyc b/src/streaming/__pycache__/sse.cpython-312.pyc deleted file mode 100644 index 7d0b974..0000000 Binary files a/src/streaming/__pycache__/sse.cpython-312.pyc and /dev/null differ