Files
agenticSystem/src/main.py
Jordan Diaz df7dfbc280 SSE en formato Claude Code CLI via ?format=claude
Nuevo ClaudeFormatEmitter traduce eventos nativos al formato exacto
que produce Claude Code CLI: content_block_start/delta/stop, tool_result,
assistant snapshots, result con usage/cost, done.

- streaming/claude_format.py: ClaudeFormatEmitter + DualEmitter
- base.py: enriquecer eventos con tool_call_id, raw_output, tool_arguments
- engine.py: usage/cost en EXECUTION_COMPLETED
- routes.py: ?format=claude en /sessions/{id}/stream
- main.py: DualEmitter wiring (emite a ambos formatos)

El frontend puede consumir el stream sin cambios — mismos event types
que Claude Code CLI. El formato nativo sigue disponible para el dashboard.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 18:48:07 +00:00

139 lines
4.4 KiB
Python

"""Agentic Microservice — FastAPI application entry point.
Wires together all components: Redis storage, model adapters, MCP registry,
context engine, orchestrator, and SSE streaming.
MCP servers are per-session: the global mcp.json defines WHAT servers
to run, and each session provides project-specific env vars.
"""
from __future__ import annotations
import logging
import pathlib
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from .adapters.claude_adapter import ClaudeAdapter
from .adapters.openai_adapter import OpenAIAdapter
from .api.routes import router, set_dependencies
from .config import settings
from .context.engine import ContextEngine
from .mcp.registry import MCPRegistry
from .memory.store import MemoryStore
from .orchestrator.engine import OrchestratorEngine
from .storage.redis import RedisStorage
from .streaming.claude_format import ClaudeFormatEmitter, DualEmitter
from .streaming.sse import SSEEmitter
logging.basicConfig(
level=logging.DEBUG if settings.debug else logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
# Global instances (initialized in lifespan)
redis_storage = RedisStorage()
sse_emitter = SSEEmitter(redis_storage=redis_storage)
claude_emitter = ClaudeFormatEmitter()
dual_emitter = DualEmitter(sse_emitter, claude_emitter)
mcp_registry = MCPRegistry()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifecycle: startup and shutdown."""
logger.info("Starting %s v%s", settings.service_name, settings.service_version)
# 1. Connect Redis
await redis_storage.connect()
# 2. Initialize model adapter
if settings.default_model_provider == "openai":
model_adapter = OpenAIAdapter()
logger.info("Using OpenAI adapter (model: %s)", settings.default_model_id)
else:
model_adapter = ClaudeAdapter()
logger.info("Using Claude adapter (model: %s)", settings.default_model_id)
# 3. Initialize memory store
memory_store = MemoryStore(redis_storage.client)
# 4. Initialize context engine
context_engine = ContextEngine(memory_store=memory_store)
# 5. Load MCP config template (servers are started per-session)
if settings.mcp_config_path:
config_path = pathlib.Path(settings.mcp_config_path)
if not config_path.is_absolute():
config_path = pathlib.Path(__file__).resolve().parent.parent / settings.mcp_config_path
mcp_registry._config_path = config_path
elif settings.mcp_server_command:
# Legacy: create a synthetic config from env vars
from .mcp.config import MCPConfigFile, MCPServerConfig
mcp_registry._config = MCPConfigFile(mcpServers={
"default": MCPServerConfig(
command=settings.mcp_server_command,
args=list(settings.mcp_server_args),
)
})
mcp_registry.load_config()
# 6. Wire dependencies (orchestrator is created per-message with session's MCP)
dual_emitter.set_storage(redis_storage)
set_dependencies(
storage=redis_storage,
model_adapter=model_adapter,
context_engine=context_engine,
memory_store=memory_store,
sse_emitter=dual_emitter,
claude_emitter=claude_emitter,
mcp_registry=mcp_registry,
)
logger.info("All systems initialized. Serving on %s:%d", settings.host, settings.port)
yield
# Shutdown
logger.info("Shutting down...")
await mcp_registry.stop_all()
await redis_storage.disconnect()
logger.info("Shutdown complete.")
app = FastAPI(
title=settings.service_name,
version=settings.service_version,
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(router, prefix="/api/v1")
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok", "service": settings.service_name}
@app.get("/")
async def root():
return RedirectResponse(url="/dashboard/")
_dashboard_dir = pathlib.Path(__file__).resolve().parent.parent / "dashboard"
if _dashboard_dir.is_dir():
app.mount("/dashboard", StaticFiles(directory=str(_dashboard_dir), html=True), name="dashboard")