Add .gitignore, remove __pycache__ from tracking, and update MCP/orchestrator modules

- Add .gitignore to exclude .env, __pycache__, node_modules, and IDE files
- Remove all __pycache__ bytecode files from version control
- Add MCP config files (mcp.json, mcp.json.example)
- Add MCP manager, registry, and config modules
- Update routes, orchestrator engine, and agent base with latest changes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Jordan
2026-04-01 23:47:26 +01:00
parent 91cfdaee72
commit 264acc90b4
49 changed files with 749 additions and 68 deletions

.gitignore (vendored, new file, 28 lines)

@@ -0,0 +1,28 @@
# Environment variables
.env
.env.local
.env.*.local
# Python
__pycache__/
*.pyc
*.pyo
*.egg-info/
dist/
build/
.venv/
venv/
# Node
node_modules/
npm-debug.log*
# OS
.DS_Store
Thumbs.db
# IDE
.vscode/
.idea/
*.swp
*.swo

README.md (105 changed lines)

@@ -227,20 +227,93 @@ message → planner → [step₁ → step₂ → ... → stepₙ] → reviewer
## MCP (Model Context Protocol)
A stdio client that connects to an MCP server at startup.
**Per-session** architecture: the microservice runs as a single Docker container, and each session starts its own MCP subprocesses with the env vars of the user's project.
### Operating model
```
Docker (microservice — a single running instance)
└── mcp.json              ← template: WHICH servers to start (global)
└── mcp-server/           ← MCP server code (baked into the image)

POST /sessions { mcp_env: { ACAI_WEB_URL: "https://tienda.com", ... } }
  → Starts MCP subprocesses with those env vars
  → Isolated session with its own MCPManager
  → 33 tools discovered automatically

DELETE /sessions/{id}
  → Kills that session's MCP subprocesses
```
### Configuration
**1. Global template** (`mcp.json` at the repo root; defines WHICH servers exist):
```json
{
"mcpServers": {
"acai-code": {
"command": "node",
"args": ["mcp-server/stdio.js"],
"env": {},
"timeout": 30,
"startup_timeout": 10
}
}
}
```
```bash
# In .env
AGENTIC_MCP_SERVER_COMMAND=node
AGENTIC_MCP_SERVER_ARGS=["mcp-server/stdio.js"]
# MCP server variables (inherited by the subprocess)
ACAI_WEB_URL=http://localhost:8080
ACAI_WEBSITE=mi-sitio
ACAI_PROJECT_DIR=/ruta/al/proyecto
AGENTIC_MCP_CONFIG_PATH=mcp.json
```
The client discovers tools automatically via `tools/list`, and agents use them during execution. Tool results **never** enter the context as raw output; they are summarized as artifacts.
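The exact summarization logic lives in the orchestrator; as a rough, hypothetical sketch of the idea (the `summarize_tool_result` helper, the field names, and the 200-character preview are illustrative, not the real implementation):

```python
# Hypothetical sketch: raw tool output is reduced to a compact artifact
# before it reaches the model context. Names and threshold are illustrative.
from dataclasses import dataclass

@dataclass
class ArtifactSummary:
    tool: str
    preview: str      # first characters of the raw output
    size_bytes: int   # full size, kept out of the context window

def summarize_tool_result(tool: str, raw_output: str, preview_chars: int = 200) -> ArtifactSummary:
    return ArtifactSummary(
        tool=tool,
        preview=raw_output[:preview_chars],
        size_bytes=len(raw_output.encode("utf-8")),
    )

summary = summarize_tool_result("acai-code.compile_module", "x" * 10_000)
```

The agent then sees only the small `ArtifactSummary`, not the 10 KB of raw output.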
**2. Per-session env vars** (each user/project passes its own when creating a session):
```bash
curl -X POST http://localhost:8001/api/v1/sessions \
-H "Content-Type: application/json" \
-d '{
"project_profile": {"name": "tienda-online"},
"mcp_env": {
"ACAI_WEB_URL": "https://superadmin_tienda.forge.acaisuite.com/",
"ACAI_WEBSITE": "tienda.com",
"ACAI_PROJECT_DIR": "/projects/tienda"
}
}'
```
The env vars from `mcp_env` are merged with the template's and passed to the MCP subprocess. The MCP server reads the token from the project's `.acai` automatically.
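The merge order matters: session-provided values win over template values for the same key, while keys defined only in the template are kept. A minimal sketch of that semantics (the example values are illustrative):

```python
# Plain dict merge: later entries win, so the session's mcp_env
# overrides the template's env on key collisions.
template_env = {"ACAI_WEB_URL": "http://localhost:8080", "DEBUG": "0"}
mcp_env = {"ACAI_WEB_URL": "https://tienda.com", "ACAI_PROJECT_DIR": "/projects/tienda"}

merged = {**template_env, **mcp_env}
# merged == {"ACAI_WEB_URL": "https://tienda.com", "DEBUG": "0",
#            "ACAI_PROJECT_DIR": "/projects/tienda"}
```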
**3. Legacy** (a single global server, no `mcp.json`):
```bash
AGENTIC_MCP_SERVER_COMMAND=node
AGENTIC_MCP_SERVER_ARGS=["mcp-server/stdio.js"]
```
### Tool namespacing
In multi-server mode, tools are namespaced: `acai-code.compile_module`, `filesystem.read_file`. In single-server mode, tools keep their original names.
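The routing can be pictured as a split on the first dot when more than one server is configured. This is a simplified sketch, not the real `MCPManager` (which also falls back to searching all servers for a bare tool name):

```python
# Simplified tool-name resolution: "server.tool" in multi-server mode,
# bare name in single-server mode.
def resolve_tool(name: str, servers: dict[str, set[str]]) -> tuple[str, str]:
    """Return (server_name, raw_tool_name) for a possibly namespaced tool name."""
    if len(servers) == 1:                 # single-server mode: no prefix
        (server,) = servers
        return server, name
    server, _, raw = name.partition(".")  # multi-server: split on first dot
    if server in servers and raw in servers[server]:
        return server, raw
    raise KeyError(name)

servers = {"acai-code": {"compile_module"}, "filesystem": {"read_file"}}
```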
### MCP management API
```bash
# Status of all active MCP servers (per session)
curl http://localhost:8001/api/v1/mcp/status

# Hot-reload the template (re-reads mcp.json; does not affect active sessions)
curl -X POST http://localhost:8001/api/v1/mcp/reload
```
### Lifecycle
1. `POST /sessions` with `mcp_env` → starts MCP subprocesses for that session
2. `POST /sessions/{id}/messages` → the agent uses that session's MCP tools
3. If the server restarts and the session is still in Redis → the MCP reconnects automatically on the next message
4. `DELETE /sessions/{id}` → kills that session's MCP subprocesses

Tool results **never** enter the context as raw output; they are summarized as artifacts.
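Step 3 (reconnect after a restart) works because `mcp_env` is persisted in the session metadata; the registry's in-memory map is empty after a restart, so the manager is rebuilt from the saved env vars on the next message. A simplified sketch with plain dicts standing in for the registry and Redis:

```python
# In-memory stand-ins: `registry` is lost on restart, `sessions` survives
# (it represents session metadata persisted in Redis).
registry: dict[str, dict] = {}                                   # session_id -> "manager"
sessions = {"s1": {"mcp_env": {"ACAI_WEBSITE": "tienda.com"}}}   # persisted metadata

def get_or_create(session_id: str) -> dict:
    manager = registry.get(session_id)
    if manager is None:                   # e.g. after a server restart
        env = sessions[session_id].get("mcp_env", {})
        manager = {"env": env}            # stands in for a started MCPManager
        registry[session_id] = manager
    return manager

m = get_or_create("s1")
```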
## Configuration
@@ -261,8 +334,9 @@ Environment variables with the `AGENTIC_` prefix:
| `AGENTIC_MAX_EXECUTION_STEPS` | `25` | Max steps per task |
| `AGENTIC_MAX_EXECUTION_TIMEOUT_SECONDS` | `300` | Global timeout (5 min) |
| `AGENTIC_SUBAGENT_MAX_STEPS` | `10` | Max iterations per subagent |
| `AGENTIC_MCP_SERVER_COMMAND` | — | MCP server command |
| `AGENTIC_MCP_SERVER_ARGS` | `[]` | MCP server arguments |
| `AGENTIC_MCP_CONFIG_PATH` | — | Path to `mcp.json` (per-session multi-MCP) |
| `AGENTIC_MCP_SERVER_COMMAND` | — | Legacy: MCP server command (single) |
| `AGENTIC_MCP_SERVER_ARGS` | `[]` | Legacy: MCP server arguments |
| `AGENTIC_MCP_TIMEOUT_SECONDS` | `30` | Timeout per tool call |
| `AGENTIC_DEBUG` | `false` | Verbose logging |
@@ -302,8 +376,11 @@ agentic:memory:_type:{type} → Set of doc IDs per type
│ │ ├── base.py # ModelAdapter interface
│ │ ├── claude_adapter.py # Anthropic Claude (streaming)
│ │ └── openai_adapter.py # OpenAI GPT (streaming)
│ ├── mcp/ # MCP client
│ │ ├── client.py # stdio transport, tool registry
│ ├── mcp/ # MCP (per-session, multi-server)
│ │ ├── client.py # stdio transport, tool registry
│ │ ├── config.py # mcp.json parser (Pydantic)
│ │ ├── manager.py # MCPManager (aggregates tools, routes calls)
│ │ └── registry.py # Per-session MCP lifecycle
│ ├── orchestrator/ # Agent orchestration
│ │ ├── engine.py # Pipeline + error recovery + timeout
│ │ ├── router.py # Step-to-agent routing
@@ -328,6 +405,8 @@ agentic:memory:_type:{type} → Set de doc IDs por tipo
│ └── components/ # Sidebar, chat, event log, inspector, timeline
├── mcp-server/ # Acai MCP server (Node.js, stdio)
├── docs/ # Knowledge base documents (*.md)
├── mcp.json # MCP server template (which servers to start)
├── mcp.json.example # Example with multiple servers
├── Dockerfile
├── docker-compose.yml
├── requirements.txt

mcp.json (new file, 11 lines)

@@ -0,0 +1,11 @@
{
"mcpServers": {
"acai-code": {
"command": "node",
"args": ["mcp-server/stdio.js"],
"env": {},
"timeout": 30,
"startup_timeout": 10
}
}
}

mcp.json.example (new file, 19 lines)

@@ -0,0 +1,19 @@
{
"mcpServers": {
"acai-code": {
"command": "node",
"args": ["mcp-server/stdio.js"],
"env": {
"ACAI_WEB_URL": "http://localhost:8080",
"ACAI_WEBSITE": "mi-sitio",
"ACAI_PROJECT_DIR": "/ruta/al/proyecto"
},
"timeout": 30,
"startup_timeout": 10
},
"filesystem": {
"command": "npx",
"args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
}
}
}

Binary file not shown.


@@ -13,6 +13,7 @@ from pydantic import BaseModel, Field
from ..models.context import MemoryDocument, MemoryType
from ..models.session import SessionState, SessionStatus
from ..orchestrator.engine import OrchestratorEngine
from ..streaming.sse import EventType
logger = logging.getLogger(__name__)
@@ -28,6 +29,10 @@ class CreateSessionRequest(BaseModel):
project_profile: dict[str, Any] = Field(default_factory=dict)
immutable_rules: list[str] = Field(default_factory=list)
metadata: dict[str, Any] = Field(default_factory=dict)
mcp_env: dict[str, str] = Field(
default_factory=dict,
description="Per-project env vars for MCP servers (e.g. ACAI_WEB_URL, ACAI_PROJECT_DIR)",
)
class CreateSessionResponse(BaseModel):
@@ -59,32 +64,43 @@ _deps: dict[str, Any] = {}
def set_dependencies(
storage: Any,
orchestrator: Any,
model_adapter: Any,
context_engine: Any,
memory_store: Any,
sse_emitter: Any,
context_engine: Any = None,
memory_store: Any = None,
mcp_registry: Any,
) -> None:
_deps["storage"] = storage
_deps["orchestrator"] = orchestrator
_deps["sse"] = sse_emitter
if context_engine:
_deps["model_adapter"] = model_adapter
_deps["context_engine"] = context_engine
if memory_store:
_deps["memory_store"] = memory_store
_deps["sse"] = sse_emitter
_deps["mcp_registry"] = mcp_registry
def _get_storage():
return _deps["storage"]
def _get_orchestrator():
return _deps["orchestrator"]
def _get_sse():
return _deps["sse"]
def _get_mcp_registry():
return _deps["mcp_registry"]
def _build_orchestrator(mcp_manager) -> OrchestratorEngine:
"""Build an orchestrator with a session-specific MCPManager."""
return OrchestratorEngine(
model_adapter=_deps["model_adapter"],
context_engine=_deps["context_engine"],
mcp_client=mcp_manager,
memory_store=_deps["memory_store"],
sse_emitter=_deps["sse"],
)
# ------------------------------------------------------------------
# POST /sessions
# ------------------------------------------------------------------
@@ -97,8 +113,17 @@ async def create_session(body: CreateSessionRequest) -> CreateSessionResponse:
immutable_rules=body.immutable_rules,
metadata=body.metadata,
)
# Store mcp_env in session metadata for reconnection
if body.mcp_env:
session.metadata["mcp_env"] = body.mcp_env
await storage.create_session(session)
# Start per-session MCP servers with project-specific env
registry = _get_mcp_registry()
if registry.has_config:
await registry.create_for_session(session.session_id, body.mcp_env)
sse = _get_sse()
await sse.emit(
EventType.SESSION_CREATED,
@@ -126,7 +151,16 @@ async def send_message(
if not session:
raise HTTPException(status_code=404, detail="Session not found")
orchestrator = _get_orchestrator()
# Get or create session's MCP manager
registry = _get_mcp_registry()
mcp_manager = registry.get_for_session(session_id)
if not mcp_manager and registry.has_config:
# Reconnect MCP (e.g. after server restart)
mcp_env = session.metadata.get("mcp_env", {})
mcp_manager = await registry.create_for_session(session_id, mcp_env)
from ..mcp.manager import MCPManager
orchestrator = _build_orchestrator(mcp_manager or MCPManager())
if body.stream:
asyncio.create_task(_execute_and_persist(orchestrator, storage, session, body.message))
@@ -225,6 +259,10 @@ async def delete_session(session_id: str) -> dict[str, str]:
if not deleted:
raise HTTPException(status_code=404, detail="Session not found")
# Stop session's MCP servers
registry = _get_mcp_registry()
await registry.destroy_for_session(session_id)
sse = _get_sse()
sse.cleanup_session(session_id)
@@ -373,3 +411,28 @@ async def delete_knowledge(doc_id: str) -> dict[str, str]:
if not deleted:
raise HTTPException(status_code=404, detail="Document not found")
return {"status": "deleted", "id": doc_id}
# ------------------------------------------------------------------
# MCP Management
# ------------------------------------------------------------------
@router.get("/mcp/status")
async def mcp_status() -> dict[str, Any]:
"""Status of all MCP servers across sessions."""
registry = _get_mcp_registry()
return registry.get_status()
@router.post("/mcp/reload")
async def mcp_reload() -> dict[str, Any]:
"""Hot-reload MCP config template (does not affect running sessions)."""
registry = _get_mcp_registry()
try:
registry.load_config()
return {
"status": "reloaded",
"servers": registry.server_names,
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))


@@ -42,7 +42,8 @@ class Settings(BaseSettings):
working_context_max_items: int = 20
# --- MCP ---
mcp_server_command: str = ""
mcp_config_path: str = "" # Path to mcp.json; empty = legacy single-server mode
mcp_server_command: str = "" # Legacy: single server command
mcp_server_args: list[str] = Field(default_factory=list)
mcp_timeout_seconds: float = 30.0
mcp_startup_timeout_seconds: float = 10.0


@@ -1,7 +1,10 @@
"""Agentic Microservice — FastAPI application entry point.
Wires together all components: Redis storage, model adapters, MCP client,
Wires together all components: Redis storage, model adapters, MCP registry,
context engine, orchestrator, and SSE streaming.
MCP servers are per-session: the global mcp.json defines WHAT servers
to run, and each session provides project-specific env vars.
"""
from __future__ import annotations
@@ -20,7 +23,7 @@ from .adapters.openai_adapter import OpenAIAdapter
from .api.routes import router, set_dependencies
from .config import settings
from .context.engine import ContextEngine
from .mcp.client import MCPClient
from .mcp.registry import MCPRegistry
from .memory.store import MemoryStore
from .orchestrator.engine import OrchestratorEngine
from .storage.redis import RedisStorage
@@ -34,8 +37,8 @@ logger = logging.getLogger(__name__)
# Global instances (initialized in lifespan)
redis_storage = RedisStorage()
mcp_client = MCPClient()
sse_emitter = SSEEmitter(redis_storage=redis_storage)
mcp_registry = MCPRegistry()
@asynccontextmanager
@@ -45,11 +48,9 @@ async def lifespan(app: FastAPI):
# 1. Connect Redis
await redis_storage.connect()
# Wire SSE emitter to Redis for event persistence (re-set after connect)
sse_emitter.set_storage(redis_storage)
# 2. Initialize model adapter (based on configured provider)
# 2. Initialize model adapter
if settings.default_model_provider == "openai":
model_adapter = OpenAIAdapter()
logger.info("Using OpenAI adapter (model: %s)", settings.default_model_id)
@@ -57,36 +58,37 @@ async def lifespan(app: FastAPI):
model_adapter = ClaudeAdapter()
logger.info("Using Claude adapter (model: %s)", settings.default_model_id)
# 3. Initialize memory store (uses same Redis connection)
# 3. Initialize memory store
memory_store = MemoryStore(redis_storage.client)
# 4. Initialize context engine (with memory store for knowledge base)
# 4. Initialize context engine
context_engine = ContextEngine(memory_store=memory_store)
# 5. Start MCP client (if configured)
if settings.mcp_server_command:
try:
await mcp_client.start()
logger.info("MCP client started with %d tools", len(mcp_client.tools))
except Exception as e:
logger.warning("MCP client failed to start: %s — continuing without MCP", e)
# 6. Initialize orchestrator
orchestrator = OrchestratorEngine(
model_adapter=model_adapter,
context_engine=context_engine,
mcp_client=mcp_client,
memory_store=memory_store,
sse_emitter=sse_emitter,
# 5. Load MCP config template (servers are started per-session)
if settings.mcp_config_path:
config_path = pathlib.Path(settings.mcp_config_path)
if not config_path.is_absolute():
config_path = pathlib.Path(__file__).resolve().parent.parent / settings.mcp_config_path
mcp_registry._config_path = config_path
elif settings.mcp_server_command:
# Legacy: create a synthetic config from env vars
from .mcp.config import MCPConfigFile, MCPServerConfig
mcp_registry._config = MCPConfigFile(mcpServers={
"default": MCPServerConfig(
command=settings.mcp_server_command,
args=list(settings.mcp_server_args),
)
})
mcp_registry.load_config()
# 7. Wire dependencies into API routes
# 6. Wire dependencies (orchestrator is created per-message with session's MCP)
set_dependencies(
storage=redis_storage,
orchestrator=orchestrator,
sse_emitter=sse_emitter,
model_adapter=model_adapter,
context_engine=context_engine,
memory_store=memory_store,
sse_emitter=sse_emitter,
mcp_registry=mcp_registry,
)
logger.info("All systems initialized. Serving on %s:%d", settings.host, settings.port)
@@ -95,7 +97,7 @@ async def lifespan(app: FastAPI):
# Shutdown
logger.info("Shutting down...")
await mcp_client.stop()
await mcp_registry.stop_all()
await redis_storage.disconnect()
logger.info("Shutdown complete.")
@@ -106,7 +108,6 @@ app = FastAPI(
lifespan=lifespan,
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
@@ -115,23 +116,19 @@ app.add_middleware(
allow_headers=["*"],
)
# Mount API routes
app.include_router(router, prefix="/api/v1")
# Health check
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok", "service": settings.service_name}
# Root redirect
@app.get("/")
async def root():
return RedirectResponse(url="/dashboard/")
# Dashboard static files (mounted AFTER API routes)
_dashboard_dir = pathlib.Path(__file__).resolve().parent.parent / "dashboard"
if _dashboard_dir.is_dir():
app.mount("/dashboard", StaticFiles(directory=str(_dashboard_dir), html=True), name="dashboard")


@@ -1,3 +1,5 @@
from .client import MCPClient
from .client import MCPClient, MCPClientError
from .manager import MCPManager
from .registry import MCPRegistry
__all__ = ["MCPClient"]
__all__ = ["MCPClient", "MCPClientError", "MCPManager", "MCPRegistry"]


@@ -33,7 +33,9 @@ class MCPClient:
timeout: float | None = None,
startup_timeout: float | None = None,
env: dict[str, str] | None = None,
name: str = "mcp",
) -> None:
self.name = name
self._command = command or settings.mcp_server_command
self._args = args if args is not None else list(settings.mcp_server_args)
self._timeout = timeout or settings.mcp_timeout_seconds
@@ -64,7 +66,7 @@ class MCPClient:
logger.warning("No MCP server command configured — skipping start")
return
logger.info("Starting MCP server: %s %s", self._command, self._args)
logger.info("Starting MCP server [%s]: %s %s", self.name, self._command, self._args)
self._process = await asyncio.create_subprocess_exec(
self._command,
*self._args,
@@ -287,5 +289,5 @@ class MCPClient:
name=name,
description=t.get("description", ""),
input_schema=t.get("inputSchema", {}),
server_name="mcp",
server_name=self.name,
)

src/mcp/config.py (new file, 55 lines)

@@ -0,0 +1,55 @@
"""Multi-MCP configuration — parses mcp.json files (Claude Code format)."""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
class MCPServerConfig(BaseModel):
"""Configuration for a single MCP server."""
command: str
args: list[str] = Field(default_factory=list)
env: dict[str, str] = Field(default_factory=dict)
timeout: float = 30.0
startup_timeout: float = 10.0
class MCPConfigFile(BaseModel):
"""Root config — mirrors Claude Code's .mcp.json format."""
mcpServers: dict[str, MCPServerConfig] = Field(default_factory=dict)
def load_mcp_config(path: str | Path) -> MCPConfigFile:
"""Read and validate a mcp.json config file.
Returns an empty config if the file doesn't exist.
Raises on parse/validation errors.
"""
config_path = Path(path)
if not config_path.is_file():
logger.warning("MCP config not found at %s — no servers configured", config_path)
return MCPConfigFile()
try:
raw = json.loads(config_path.read_text(encoding="utf-8"))
config = MCPConfigFile.model_validate(raw)
logger.info(
"Loaded MCP config from %s (%d servers): %s",
config_path,
len(config.mcpServers),
", ".join(config.mcpServers.keys()),
)
return config
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in {config_path}: {e}") from e
except Exception as e:
raise ValueError(f"Invalid MCP config at {config_path}: {e}") from e

src/mcp/manager.py (new file, 289 lines)

@@ -0,0 +1,289 @@
"""MCPManager — aggregates multiple MCP servers with namespaced tools.
Sits between agents and MCPClient instances. Routes tool calls to the
correct server, handles hot-reload, and isolates failures.
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import Any
from ..models.tools import ToolDefinition
from .client import MCPClient, MCPClientError
from .config import MCPServerConfig, load_mcp_config
logger = logging.getLogger(__name__)
class MCPManager:
"""Manages multiple MCP servers with unified tool access.
Exposes the same interface as MCPClient (tools, is_running,
call_tool, get_tool_definitions) so the orchestrator doesn't
need to know if it's talking to one server or many.
"""
def __init__(self, config_path: str | Path | None = None) -> None:
self._config_path = Path(config_path) if config_path else None
self._clients: dict[str, MCPClient] = {}
self._tool_index: dict[str, str] = {} # namespaced_name → server_name
self._single_server_mode = False
# ------------------------------------------------------------------
# Public interface (mirrors MCPClient)
# ------------------------------------------------------------------
@property
def tools(self) -> dict[str, ToolDefinition]:
"""Combined tool registry across all servers."""
result: dict[str, ToolDefinition] = {}
for server_name, client in self._clients.items():
for tool_name, tool_def in client.tools.items():
ns_name = self._namespace(server_name, tool_name)
result[ns_name] = ToolDefinition(
name=ns_name,
description=tool_def.description,
input_schema=tool_def.input_schema,
server_name=server_name,
)
return result
@property
def is_running(self) -> bool:
return any(c.is_running for c in self._clients.values())
def get_tool_definitions(self) -> list[dict[str, Any]]:
"""Aggregated tool definitions for model adapters."""
definitions: list[dict[str, Any]] = []
for server_name, client in self._clients.items():
if not client.is_running:
continue
for tool in client.get_tool_definitions():
ns_name = self._namespace(server_name, tool["name"])
definitions.append({
"name": ns_name,
"description": f"[{server_name}] {tool.get('description', '')}",
"input_schema": tool.get("input_schema", {}),
})
return definitions
async def call_tool(
self, namespaced_name: str, arguments: dict[str, Any]
) -> dict[str, Any]:
"""Route a tool call to the correct MCP server."""
server_name, raw_name = self._resolve_tool(namespaced_name)
client = self._clients.get(server_name)
if not client:
raise MCPClientError(f"MCP server '{server_name}' not found")
if not client.is_running:
raise MCPClientError(f"MCP server '{server_name}' is not running")
return await client.call_tool(raw_name, arguments)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self) -> dict[str, Any]:
"""Start all configured MCP servers.
If a config_path is set, reads from it. Otherwise uses any
manually added clients.
"""
if self._config_path:
config = load_mcp_config(self._config_path)
for name, server_cfg in config.mcpServers.items():
if name not in self._clients:
self._clients[name] = self._create_client(name, server_cfg)
# Detect single-server mode for backward compat (no namespacing)
self._single_server_mode = len(self._clients) == 1
results: dict[str, Any] = {}
start_tasks = []
for name, client in self._clients.items():
start_tasks.append(self._start_client(name, client))
completed = await asyncio.gather(*start_tasks, return_exceptions=True)
for (name, _), result in zip(self._clients.items(), completed):
if isinstance(result, Exception):
results[name] = {"status": "failed", "error": str(result)}
else:
results[name] = result
self._rebuild_tool_index()
total_tools = len(self._tool_index)
running = sum(1 for c in self._clients.values() if c.is_running)
logger.info(
"MCPManager started: %d/%d servers running, %d tools available",
running, len(self._clients), total_tools,
)
return results
async def stop(self) -> None:
"""Stop all MCP servers."""
stop_tasks = [client.stop() for client in self._clients.values()]
await asyncio.gather(*stop_tasks, return_exceptions=True)
self._clients.clear()
self._tool_index.clear()
logger.info("MCPManager stopped all servers")
async def reload_config(self) -> dict[str, Any]:
"""Hot-reload: re-read config, start new servers, stop removed ones."""
if not self._config_path:
return {"error": "No config path set"}
config = load_mcp_config(self._config_path)
new_names = set(config.mcpServers.keys())
current_names = set(self._clients.keys())
to_add = new_names - current_names
to_remove = current_names - new_names
to_keep = new_names & current_names
summary: dict[str, Any] = {"added": [], "removed": [], "kept": []}
# Stop removed servers
for name in to_remove:
client = self._clients.pop(name)
await client.stop()
summary["removed"].append(name)
logger.info("Removed MCP server: %s", name)
# Start new servers
for name in to_add:
server_cfg = config.mcpServers[name]
client = self._create_client(name, server_cfg)
self._clients[name] = client
result = await self._start_client(name, client)
summary["added"].append({"name": name, **result})
logger.info("Added MCP server: %s", name)
# Check if kept servers need restart (config changed)
for name in to_keep:
new_cfg = config.mcpServers[name]
client = self._clients[name]
if self._config_changed(client, new_cfg):
await client.stop()
new_client = self._create_client(name, new_cfg)
self._clients[name] = new_client
result = await self._start_client(name, new_client)
summary["kept"].append({"name": name, "restarted": True, **result})
logger.info("Restarted MCP server (config changed): %s", name)
else:
summary["kept"].append({"name": name, "restarted": False})
self._single_server_mode = len(self._clients) == 1
self._rebuild_tool_index()
return summary
def add_client(self, name: str, client: MCPClient) -> None:
"""Manually add a client (used for legacy single-server mode)."""
self._clients[name] = client
self._single_server_mode = len(self._clients) == 1
def get_status(self) -> dict[str, Any]:
"""Return status of all MCP servers."""
servers: list[dict[str, Any]] = []
for name, client in self._clients.items():
servers.append({
"name": name,
"running": client.is_running,
"tools_count": len(client.tools),
"tools": list(client.tools.keys()),
})
return {
"config_path": str(self._config_path) if self._config_path else None,
"single_server_mode": self._single_server_mode,
"total_servers": len(self._clients),
"running_servers": sum(1 for c in self._clients.values() if c.is_running),
"total_tools": len(self._tool_index),
"servers": servers,
}
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _namespace(self, server_name: str, tool_name: str) -> str:
"""Build namespaced tool name. Skip prefix in single-server mode."""
if self._single_server_mode:
return tool_name
return f"{server_name}.{tool_name}"
def _resolve_tool(self, namespaced_name: str) -> tuple[str, str]:
"""Resolve a namespaced tool name to (server_name, raw_tool_name)."""
# Direct lookup in index
if namespaced_name in self._tool_index:
server_name = self._tool_index[namespaced_name]
raw_name = namespaced_name
if not self._single_server_mode and "." in namespaced_name:
raw_name = namespaced_name.split(".", 1)[1]
return server_name, raw_name
# Try splitting on first dot
if "." in namespaced_name:
server_name, raw_name = namespaced_name.split(".", 1)
if server_name in self._clients:
return server_name, raw_name
# Fallback: search all servers for the bare name
for server_name, client in self._clients.items():
if namespaced_name in client.tools:
return server_name, namespaced_name
raise MCPClientError(
f"Tool '{namespaced_name}' not found in any MCP server. "
f"Available: {list(self._tool_index.keys())[:20]}"
)
def _rebuild_tool_index(self) -> None:
"""Rebuild the unified tool index from all clients."""
self._tool_index.clear()
for server_name, client in self._clients.items():
for tool_name in client.tools:
ns_name = self._namespace(server_name, tool_name)
self._tool_index[ns_name] = server_name
@staticmethod
def _create_client(name: str, cfg: MCPServerConfig) -> MCPClient:
return MCPClient(
name=name,
command=cfg.command,
args=cfg.args,
timeout=cfg.timeout,
startup_timeout=cfg.startup_timeout,
env=cfg.env,
)
@staticmethod
async def _start_client(name: str, client: MCPClient) -> dict[str, Any]:
try:
await client.start()
return {
"status": "running",
"tools_count": len(client.tools),
}
except Exception as e:
logger.error("Failed to start MCP server [%s]: %s", name, e)
return {
"status": "failed",
"error": str(e),
}
@staticmethod
def _config_changed(client: MCPClient, new_cfg: MCPServerConfig) -> bool:
"""Check if a server's config has changed (needs restart)."""
return (
client._command != new_cfg.command
or client._args != new_cfg.args
or client._timeout != new_cfg.timeout
)

src/mcp/registry.py (new file, 135 lines)

@@ -0,0 +1,135 @@
"""Per-session MCP registry.
Each session gets its own MCPManager with project-specific env vars.
The global mcp.json defines WHAT servers to run. The session's mcp_env
defines the project-specific variables (ACAI_WEB_URL, etc.).
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
from .client import MCPClient
from .config import MCPConfigFile, load_mcp_config
from .manager import MCPManager
logger = logging.getLogger(__name__)
class MCPRegistry:
"""Manages per-session MCPManager instances.
Uses a global mcp.json as template. Each session gets its own
set of MCP subprocesses with session-specific env vars merged in.
"""
def __init__(self, config_path: str | Path | None = None) -> None:
self._config_path = Path(config_path) if config_path else None
self._config: MCPConfigFile | None = None
self._sessions: dict[str, MCPManager] = {} # session_id → MCPManager
def load_config(self) -> None:
"""Load the global MCP config template."""
if self._config_path:
self._config = load_mcp_config(self._config_path)
logger.info(
"MCP registry loaded template: %d server(s) — %s",
len(self._config.mcpServers),
", ".join(self._config.mcpServers.keys()),
)
else:
self._config = MCPConfigFile()
logger.info("MCP registry: no config path, no servers")
@property
def has_config(self) -> bool:
return self._config is not None and len(self._config.mcpServers) > 0
@property
def server_names(self) -> list[str]:
if not self._config:
return []
return list(self._config.mcpServers.keys())
async def create_for_session(
self,
session_id: str,
mcp_env: dict[str, str] | None = None,
) -> MCPManager:
"""Create and start MCP servers for a session.
The global config defines the servers. The mcp_env overrides
are merged into each server's env (so ACAI_WEB_URL etc. are
project-specific).
"""
# Clean up existing if any
await self.destroy_for_session(session_id)
if not self._config or not self._config.mcpServers:
# No MCP configured — return empty manager
manager = MCPManager()
self._sessions[session_id] = manager
return manager
manager = MCPManager()
for server_name, server_cfg in self._config.mcpServers.items():
# Merge: server-defined env + session-specific env
merged_env = {**server_cfg.env, **(mcp_env or {})}
client = MCPClient(
name=server_name,
command=server_cfg.command,
args=server_cfg.args,
timeout=server_cfg.timeout,
startup_timeout=server_cfg.startup_timeout,
env=merged_env,
)
manager.add_client(server_name, client)
results = await manager.start()
self._sessions[session_id] = manager
logger.info(
"MCP started for session %s: %s",
session_id[:12],
{k: v.get("status") for k, v in results.items()},
)
return manager
async def destroy_for_session(self, session_id: str) -> None:
"""Stop and clean up MCP servers for a session."""
manager = self._sessions.pop(session_id, None)
if manager:
await manager.stop()
logger.info("MCP stopped for session %s", session_id[:12])
def get_for_session(self, session_id: str) -> MCPManager | None:
"""Get the MCPManager for a session, if any."""
return self._sessions.get(session_id)
async def stop_all(self) -> None:
"""Stop all sessions' MCP servers (shutdown)."""
for sid in list(self._sessions.keys()):
await self.destroy_for_session(sid)
logger.info("MCP registry: all sessions stopped")
def get_status(self) -> dict[str, Any]:
"""Global status of the registry."""
sessions_status: list[dict[str, Any]] = []
for sid, manager in self._sessions.items():
sessions_status.append({
"session_id": sid[:12],
"running": manager.is_running,
"tools_count": len(manager.tools),
})
return {
"config_path": str(self._config_path) if self._config_path else None,
"template_servers": self.server_names,
"active_sessions": len(self._sessions),
"sessions": sessions_status,
}


@@ -10,7 +10,7 @@ from typing import Any, AsyncIterator
from ...adapters.base import ModelAdapter, ModelConfig, StreamChunk
from ...context.engine import ContextEngine
from ...mcp.client import MCPClient
from ...mcp.manager import MCPManager
from ...memory.store import MemoryStore
from ...models.agent import AgentProfile
from ...models.artifacts import ArtifactSummary
@@ -29,7 +29,7 @@ class BaseAgent:
profile: AgentProfile,
model_adapter: ModelAdapter,
context_engine: ContextEngine,
mcp_client: MCPClient,
mcp_client: MCPManager,
memory_store: MemoryStore,
sse_emitter: SSEEmitter,
) -> None:


@@ -13,7 +13,7 @@ from typing import Any
from ..adapters.base import ModelAdapter
from ..config import settings
from ..context.engine import ContextEngine
from ..mcp.client import MCPClient
from ..mcp.manager import MCPManager
from ..memory.store import MemoryStore
from ..models.agent import AgentRole
from ..models.session import SessionState, SessionStatus, TaskStatus
@@ -34,7 +34,7 @@ class OrchestratorEngine:
self,
model_adapter: ModelAdapter,
context_engine: ContextEngine,
mcp_client: MCPClient,
mcp_client: MCPManager,
memory_store: MemoryStore,
sse_emitter: SSEEmitter,
) -> None: