"""REST API endpoints for the agentic microservice.""" from __future__ import annotations import asyncio import logging import pathlib from typing import Any from fastapi import APIRouter, HTTPException, Request from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from ..models.context import MemoryDocument, MemoryType from ..models.session import SessionState, SessionStatus from ..orchestrator.engine import OrchestratorEngine from ..streaming.sse import EventType logger = logging.getLogger(__name__) router = APIRouter() # ------------------------------------------------------------------ # Request / Response schemas # ------------------------------------------------------------------ class CreateSessionRequest(BaseModel): project_profile: dict[str, Any] = Field(default_factory=dict) immutable_rules: list[str] = Field(default_factory=list) metadata: dict[str, Any] = Field(default_factory=dict) mcp_env: dict[str, str] = Field( default_factory=dict, description="Per-project env vars for MCP servers (e.g. 
ACAI_WEB_URL, ACAI_PROJECT_DIR)", ) agent_id: str = "acai" class CreateSessionResponse(BaseModel): session_id: str status: str class SendMessageRequest(BaseModel): message: str stream: bool = False agent_id: str | None = None class SessionResponse(BaseModel): session_id: str status: str turn_count: int current_task: dict[str, Any] | None = None completed_tasks: list[str] = Field(default_factory=list) created_at: str updated_at: str agent_id: str = "acai" # ------------------------------------------------------------------ # Dependency helpers (set by main.py at startup) # ------------------------------------------------------------------ _deps: dict[str, Any] = {} def set_dependencies( storage: Any, model_adapter: Any, context_engine: Any, memory_store: Any, sse_emitter: Any, claude_emitter: Any = None, mcp_registry: Any = None, agent_registry: Any = None, ) -> None: _deps["storage"] = storage _deps["model_adapter"] = model_adapter _deps["context_engine"] = context_engine _deps["memory_store"] = memory_store _deps["sse"] = sse_emitter _deps["claude_sse"] = claude_emitter _deps["mcp_registry"] = mcp_registry _deps["agent_registry"] = agent_registry def _get_storage(): return _deps["storage"] def _get_sse(): return _deps["sse"] def _get_mcp_registry(): return _deps["mcp_registry"] def _get_agent_registry(): return _deps["agent_registry"] def _build_orchestrator(mcp_manager, agent_profile) -> OrchestratorEngine: """Build an orchestrator with a session-specific MCPManager.""" return OrchestratorEngine( model_adapter=_deps["model_adapter"], context_engine=_deps["context_engine"], mcp_client=mcp_manager, memory_store=_deps["memory_store"], sse_emitter=_deps["sse"], agent_profile=agent_profile, ) # ------------------------------------------------------------------ # POST /sessions # ------------------------------------------------------------------ @router.post("/sessions", response_model=CreateSessionResponse, status_code=201) async def create_session(body: 
# ------------------------------------------------------------------
# POST /sessions
# ------------------------------------------------------------------
@router.post("/sessions", response_model=CreateSessionResponse, status_code=201)
async def create_session(body: CreateSessionRequest) -> CreateSessionResponse:
    """Create a session and optionally boot its per-project MCP servers.

    Raises:
        HTTPException(400): if the requested agent_id is unknown.
    """
    # Validate agent_id against the registry (when one is configured).
    agent_reg = _get_agent_registry()
    if agent_reg and not agent_reg.get(body.agent_id):
        raise HTTPException(status_code=400, detail="Agent not found")

    storage = _get_storage()
    session = SessionState(
        project_profile=body.project_profile,
        immutable_rules=body.immutable_rules,
        metadata=body.metadata,
    )
    session.agent_id = body.agent_id

    # Store mcp_env in session metadata so MCP servers can be reconnected
    # with the same environment after a service restart.
    if body.mcp_env:
        session.metadata["mcp_env"] = body.mcp_env

    await storage.create_session(session)

    # Start per-session MCP servers with project-specific env.
    # Guard against a missing registry: set_dependencies() allows
    # mcp_registry=None, and the unguarded `registry.has_config` raised
    # AttributeError in that case.
    registry = _get_mcp_registry()
    if registry and registry.has_config:
        await registry.create_for_session(session.session_id, body.mcp_env)

    sse = _get_sse()
    await sse.emit(
        EventType.SESSION_CREATED,
        {"session_id": session.session_id},
        session_id=session.session_id,
    )
    logger.info("Session created: %s", session.session_id)
    return CreateSessionResponse(
        session_id=session.session_id,
        status=session.status.value,
    )


# ------------------------------------------------------------------
# POST /sessions/{id}/messages
# ------------------------------------------------------------------

# Strong references to fire-and-forget executions: asyncio.create_task()
# keeps only a weak reference to its task, so without this set a streaming
# run could be garbage-collected mid-flight.
_background_tasks: set[asyncio.Task] = set()


@router.post("/sessions/{session_id}/messages")
async def send_message(session_id: str, body: SendMessageRequest) -> dict[str, Any]:
    """Process a user message, synchronously or as a streamed background task.

    Raises:
        HTTPException(404): if the session does not exist.
    """
    storage = _get_storage()
    session = await storage.get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Get or create the session's MCP manager. The registry may be None
    # when the service was wired without MCP support.
    registry = _get_mcp_registry()
    mcp_manager = registry.get_for_session(session_id) if registry else None
    if not mcp_manager and registry and registry.has_config:
        # Reconnect MCP (e.g. after server restart)
        mcp_env = session.metadata.get("mcp_env", {})
        mcp_manager = await registry.create_for_session(session_id, mcp_env)

    # Switch agents mid-session when requested and the agent exists.
    if body.agent_id and body.agent_id != session.agent_id:
        agent_reg_check = _get_agent_registry()
        if agent_reg_check and agent_reg_check.get(body.agent_id):
            session.agent_id = body.agent_id

    # Resolve the agent profile from the registry, falling back to the
    # registry's default agent when the session's agent is unknown.
    agent_reg = _get_agent_registry()
    agent_profile = None
    if agent_reg:
        agent_profile = agent_reg.get(session.agent_id)
        if not agent_profile:
            agent_profile = agent_reg.get(agent_reg.default_agent_id)

    from ..mcp.manager import MCPManager

    orchestrator = _build_orchestrator(mcp_manager or MCPManager(), agent_profile)

    if body.stream:
        # Fire-and-forget: the client follows progress via the SSE stream.
        task = asyncio.create_task(
            _execute_and_persist(orchestrator, storage, session, body.message)
        )
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        return {
            "session_id": session_id,
            "status": "executing",
            "stream_url": f"/sessions/{session_id}/stream",
        }

    result = await _execute_and_persist(orchestrator, storage, session, body.message)
    return result


async def _execute_and_persist(orchestrator, storage, session, message) -> dict[str, Any]:
    """Run the orchestrator under the session lock and persist session state.

    Returns the orchestrator result dict; on failure the session is marked
    ERROR and an error payload is returned instead of raising.
    """
    # Acquire exclusive lock — prevents concurrent execution on same session
    async with storage.session_lock(session.session_id) as acquired:
        if not acquired:
            return {
                "session_id": session.session_id,
                "content": "Error: session is busy — another request is executing",
                "status": "busy",
            }
        try:
            result = await orchestrator.process_message(session, message)
            return result
        except Exception as e:
            session.status = SessionStatus.ERROR
            logger.exception("Execution failed for session %s", session.session_id)
            return {
                "session_id": session.session_id,
                "content": f"Error: {e}",
                "status": "error",
            }
        finally:
            # Always persist — even on failure — so the ERROR status and any
            # partial progress survive.
            try:
                await storage.update_session(session)
            except Exception as e:
                logger.error("Failed to persist session state: %s", e)


# ------------------------------------------------------------------
# GET /sessions/{id}/stream
# ------------------------------------------------------------------
@router.get("/sessions/{session_id}/stream")
async def stream_session(session_id: str, format: str = "native") -> StreamingResponse:
    """Subscribe to a session's SSE stream.

    Args:
        session_id: session to stream.
        format: "native" (default) or "claude" for the Claude-compatible
            event format.

    Raises:
        HTTPException(404): unknown session.
        HTTPException(501): "claude" requested but no Claude emitter wired.
    """
    storage = _get_storage()
    session = await storage.get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Disable proxy/client buffering so events are delivered immediately.
    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }

    if format == "claude":
        claude_sse = _deps.get("claude_sse")
        if not claude_sse:
            raise HTTPException(status_code=501, detail="Claude format emitter not available")
        return StreamingResponse(
            claude_sse.subscribe(session_id),
            media_type="text/event-stream",
            headers=headers,
        )

    sse = _get_sse()
    return StreamingResponse(
        sse.subscribe(session_id),
        media_type="text/event-stream",
        headers=headers,
    )


# ------------------------------------------------------------------
# GET /sessions/{id}
# ------------------------------------------------------------------
@router.get("/sessions/{session_id}", response_model=SessionResponse)
async def get_session(session_id: str) -> SessionResponse:
    """Return a summary snapshot of a session's state.

    Raises:
        HTTPException(404): unknown session.
    """
    storage = _get_storage()
    session = await storage.get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    return SessionResponse(
        session_id=session.session_id,
        status=session.status.value,
        turn_count=session.turn_count,
        current_task=session.current_task.model_dump() if session.current_task else None,
        completed_tasks=session.completed_tasks,
        created_at=session.created_at.isoformat(),
        updated_at=session.updated_at.isoformat(),
        agent_id=session.agent_id,
    )


# ------------------------------------------------------------------
# DELETE /sessions/{id}
# ------------------------------------------------------------------
@router.delete("/sessions/{session_id}")
async def delete_session(session_id: str) -> dict[str, str]:
    """Delete a session and tear down its MCP servers and SSE state.

    Raises:
        HTTPException(404): unknown session.
    """
    storage = _get_storage()
    deleted = await storage.delete_session(session_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Session not found")

    # Stop session's MCP servers. Guard against a missing registry:
    # set_dependencies() allows mcp_registry=None, and the unconditional
    # call raised AttributeError in that case.
    registry = _get_mcp_registry()
    if registry:
        await registry.destroy_for_session(session_id)

    sse = _get_sse()
    sse.cleanup_session(session_id)
    return {"status": "deleted", "session_id": session_id}


# ------------------------------------------------------------------
# GET /sessions/{id}/events
# ------------------------------------------------------------------
@router.get("/sessions/{session_id}/events")
async def get_session_events(session_id: str) -> list[dict[str, Any]]:
    """Return the recorded SSE event history for a session."""
    sse = _get_sse()
    return await sse.get_history(session_id)


# ------------------------------------------------------------------
# GET /sessions/{id}/context-debug
# ------------------------------------------------------------------
""" ctx_engine = _deps.get("context_engine") if not ctx_engine: raise HTTPException(status_code=501, detail="Context engine not available") history = ctx_engine.get_debug_history(session_id) last = ctx_engine.get_last_context_debug(session_id) full_context = ctx_engine.get_last_full_context(session_id) return { "session_id": session_id, "total_builds": len(history), "last_build": last, "full_context": full_context, "history": history, } # ------------------------------------------------------------------ # GET /agents # ------------------------------------------------------------------ @router.get("/agents") async def list_agents() -> dict[str, Any]: """Lista todos los agentes disponibles.""" registry = _get_agent_registry() return { "agents": registry.list_agents(), "default": registry.default_agent_id, } # ------------------------------------------------------------------ # GET /agents/{agent_id} # ------------------------------------------------------------------ @router.get("/agents/{agent_id}") async def get_agent(agent_id: str) -> dict[str, Any]: """Detalle completo de un agente, incluyendo system prompt.""" registry = _get_agent_registry() profile = registry.get(agent_id) if not profile: raise HTTPException(status_code=404, detail="Agent not found") return { "id": profile.name, "display_name": profile.display_name, "description": profile.description, "icon": profile.icon, "category": profile.category, "temperature": profile.temperature, "max_tokens": profile.max_tokens, "model_id": profile.model_id, "system_prompt": profile.system_prompt, "context_sections": profile.context_sections, "stream_deltas": profile.stream_deltas, } # ------------------------------------------------------------------ # Knowledge Base # ------------------------------------------------------------------ class LoadKnowledgeRequest(BaseModel): docs_path: str = "docs" async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]: """Load knowledge docs from directory. 
Used by endpoint and startup.""" memory = _deps.get("memory_store") if not memory: return {"status": "error", "message": "Memory store not available"} docs_dir = pathlib.Path(docs_path) if not docs_dir.is_absolute(): docs_dir = pathlib.Path(__file__).resolve().parent.parent.parent / docs_path if not docs_dir.is_dir(): return {"status": "error", "message": f"Directory not found: {docs_dir}"} # Read all docs docs_data: list[tuple[str, str, str, str, list[str]]] = [] # (id, title, content, summary, tags) for md_file in sorted(docs_dir.glob("*.md")): content = md_file.read_text(encoding="utf-8") doc_id = md_file.stem lines = content.strip().splitlines() title = lines[0].lstrip("#").strip() if lines else doc_id summary_lines = [] for line in lines[:30]: line = line.strip() if line and not line.startswith("#"): summary_lines.append(line) if len(" ".join(summary_lines)) > 500: break summary = " ".join(summary_lines)[:500] tags = [] for line in lines: if line.startswith("## "): tags.append(line.lstrip("#").strip().lower()[:30]) docs_data.append((doc_id, title, content, summary, tags[:10])) # Generate embeddings in batch from ..memory.embeddings import EmbeddingService embed_service = EmbeddingService() embed_texts = [f"{title}\n{summary}\n{content[:2000]}" for _, title, content, summary, _ in docs_data] try: embeddings = await embed_service.embed_batch(embed_texts) has_embeddings = True logger.info("Generated %d embeddings for knowledge base", len(embeddings)) except Exception as e: logger.warning("Failed to generate embeddings: %s — loading without semantic search", e) embeddings = [None] * len(docs_data) has_embeddings = False # Store docs + embeddings loaded = [] for i, (doc_id, title, content, summary, tags) in enumerate(docs_data): doc = MemoryDocument( memory_id=doc_id, memory_type=MemoryType.DOCUMENT, namespace="knowledge", title=title, content=content, summary=summary, tags=tags, ) await memory.store_document(doc) if embeddings[i] is not None: await 
memory.store_embedding(doc_id, embeddings[i], namespace="knowledge") loaded.append({ "id": doc_id, "title": title, "chars": len(content), "tags": tags[:5], "embedded": embeddings[i] is not None, }) logger.info("Loaded %d knowledge documents from %s (embeddings: %s)", len(loaded), docs_dir, has_embeddings) return { "status": "loaded", "count": len(loaded), "embeddings": has_embeddings, "documents": loaded, } @router.post("/knowledge/load") async def load_knowledge(body: LoadKnowledgeRequest) -> dict[str, Any]: """Load markdown docs from a directory into the knowledge base. Generates embeddings for semantic search via OpenAI text-embedding-3-small. """ result = await _load_knowledge_from_dir(body.docs_path) if result.get("status") == "error": raise HTTPException(status_code=501, detail=result["message"]) return result @router.get("/knowledge") async def list_knowledge() -> dict[str, Any]: """List all documents in the knowledge base.""" memory = _deps.get("memory_store") if not memory: raise HTTPException(status_code=501, detail="Memory store not available") docs = await memory.list_documents(namespace="knowledge") return { "count": len(docs), "documents": [ { "id": d.memory_id, "title": d.title, "chars": len(d.content), "summary": d.summary[:200], "tags": d.tags, "updated_at": d.updated_at.isoformat(), } for d in docs ], } @router.delete("/knowledge/{doc_id}") async def delete_knowledge(doc_id: str) -> dict[str, str]: """Remove a document from the knowledge base.""" memory = _deps.get("memory_store") if not memory: raise HTTPException(status_code=501, detail="Memory store not available") deleted = await memory.delete_document(doc_id, namespace="knowledge") if not deleted: raise HTTPException(status_code=404, detail="Document not found") return {"status": "deleted", "id": doc_id} # ------------------------------------------------------------------ # MCP Management # ------------------------------------------------------------------ @router.get("/mcp/status") async def 
@router.get("/mcp/status")
async def mcp_status() -> dict[str, Any]:
    """Status of all MCP servers across sessions."""
    registry = _get_mcp_registry()
    # set_dependencies() allows mcp_registry=None; fail clearly instead of
    # crashing with AttributeError (HTTP 500).
    if not registry:
        raise HTTPException(status_code=501, detail="MCP registry not available")
    return registry.get_status()


@router.post("/mcp/reload")
async def mcp_reload() -> dict[str, Any]:
    """Hot-reload MCP config template (does not affect running sessions).

    Raises:
        HTTPException(400): invalid config.
        HTTPException(501): no MCP registry wired.
    """
    registry = _get_mcp_registry()
    if not registry:
        raise HTTPException(status_code=501, detail="MCP registry not available")
    try:
        registry.load_config()
    except ValueError as e:
        # Chain the original error so tracebacks keep the root cause (B904).
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {
        "status": "reloaded",
        "servers": registry.server_names,
    }