From 43337e8554123f1a575bf54ce9a8d073e5f3827b Mon Sep 17 00:00:00 2001 From: Jordan Diaz Date: Wed, 10 Jun 2026 15:17:52 +0000 Subject: [PATCH] Hardening: lock de sesion atomico, monitor off por defecto, fix DeepSeek reasoning-only - session_lock: token uuid + compare-and-delete (Lua), TTL > timeout de ejecucion; abort solo limpia el lock tras cancelacion confirmada. Evita doble ejecucion concurrente sobre la misma sesion. - monitor HTTP (puerto 4545) deshabilitado salvo MCP_MONITOR_ENABLED=true y atado a 127.0.0.1; no se acumula historial en memoria si esta off. - DeepSeek/LiteLLM: turnos que llegan solo con reasoning_content (sin content ni tool_calls) ya no rompen la sesion (400 'Invalid assistant message') ni se pintan como 'pensando': se promueven a texto en el historial y en el snapshot persistido. - litellm pinneado a ==1.80.0 (builds reproducibles). Co-Authored-By: Claude Fable 5 --- mcp-server/config/index.js | 7 ++++++ mcp-server/index.js | 9 +++++--- mcp-server/monitor.js | 12 +++++----- requirements.txt | 2 +- src/adapters/openai_adapter.py | 13 +++++++++-- src/api/routes.py | 23 ++++++++++-------- src/orchestrator/agents/base.py | 28 ++++++++++++++++++++++ src/storage/redis.py | 41 +++++++++++++++++++++++++++------ 8 files changed, 107 insertions(+), 28 deletions(-) diff --git a/mcp-server/config/index.js b/mcp-server/config/index.js index 6a7c7a0..275b3cf 100644 --- a/mcp-server/config/index.js +++ b/mcp-server/config/index.js @@ -15,6 +15,13 @@ export const CONFIG_FILE_PATH = export const MCP_PORT = Number(process.env.MCP_PORT || 3000); export const MONITOR_PORT = Number(process.env.MCP_MONITOR_PORT || 4545); +// El monitor HTTP (UI + POST /retry) queda DESACTIVADO por defecto. Solo se +// arranca si MCP_MONITOR_ENABLED === 'true' de forma explicita. +export const MONITOR_ENABLED = + String(process.env.MCP_MONITOR_ENABLED || "").toLowerCase() === "true"; +// Por seguridad escucha solo en loopback salvo que se defina MCP_MONITOR_HOST. +export const MONITOR_HOST = process.env.MCP_MONITOR_HOST || "127.0.0.1"; +// Compatibilidad: si alguien fuerza MCP_MONITOR_DISABLED tambien lo respetamos. export const MONITOR_DISABLED = String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "1" || String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "true"; diff --git a/mcp-server/index.js b/mcp-server/index.js index 07e54dc..3b4f660 100644 --- a/mcp-server/index.js +++ b/mcp-server/index.js @@ -6,7 +6,7 @@ */ // Load configuration first -import { loadLocalConfigProfile, applyProfileToEnv } from "./config/index.js"; +import { loadLocalConfigProfile, applyProfileToEnv, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js"; // Load and apply config profile (backward compatibility) const selectedProfile = loadLocalConfigProfile(); @@ -30,8 +30,11 @@ import { registerResources } from "./resources/index.js"; // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ setRegistrationFunctions({ registerPrompts, registerTools, registerResources }); -// Create the shared request monitor (will be applied to each session server) -const requestMonitor = createRequestMonitor(); +// Create the shared request monitor (will be applied to each session server). +// Solo se crea si el monitor esta habilitado: asi no acumulamos historial en +// memoria ni envolvemos los handlers cuando la UI esta apagada (por defecto). +const monitorActive = MONITOR_ENABLED && !MONITOR_DISABLED; +const requestMonitor = monitorActive ? createRequestMonitor() : null; // Create a server instance for retry functionality in the monitor UI const server = createMcpServer(); diff --git a/mcp-server/monitor.js b/mcp-server/monitor.js index c20c8d8..1e9ca9a 100644 --- a/mcp-server/monitor.js +++ b/mcp-server/monitor.js @@ -2,7 +2,7 @@ import http from "node:http"; import fsPromises from "node:fs/promises"; import path from "node:path"; import { fileURLToPath } from "node:url"; -import { MONITOR_PORT, MONITOR_DISABLED } from "./config/index.js"; +import { MONITOR_PORT, MONITOR_HOST, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js"; import { sessionCredentials } from "./auth/credentials.js"; import { activeSessions } from "./httpServer.js"; @@ -84,8 +84,8 @@ export function broadcastSessionsUpdate() { * Start the monitor HTTP server */ export function startMonitorServer(requestMonitor, toolHandlers) { - if (MONITOR_DISABLED) { - console.error("MCP monitor UI deshabilitada (MCP_MONITOR_DISABLED=1)."); + if (!MONITOR_ENABLED || MONITOR_DISABLED) { + console.error("[monitor] deshabilitado (MCP_MONITOR_ENABLED!=true)"); return null; } @@ -202,12 +202,12 @@ export function startMonitorServer(requestMonitor, toolHandlers) { monitorServer.on("error", (error) => { console.warn( - `[monitor] No se pudo iniciar la UI en el puerto ${MONITOR_PORT}: ${error.message}. Establece MCP_MONITOR_DISABLED=1 para ocultar este aviso.` + `[monitor] No se pudo iniciar la UI en ${MONITOR_HOST}:${MONITOR_PORT}: ${error.message}. Desactiva MCP_MONITOR_ENABLED para ocultar este aviso.` ); }); - monitorServer.listen(MONITOR_PORT, '0.0.0.0', () => { - console.error(`MCP monitor UI: http://0.0.0.0:${MONITOR_PORT}/monitor`); + monitorServer.listen(MONITOR_PORT, MONITOR_HOST, () => { + console.error(`MCP monitor UI: http://${MONITOR_HOST}:${MONITOR_PORT}/monitor`); }); // Broadcast sessions + stats update every 2 seconds for real-time monitoring diff --git a/requirements.txt b/requirements.txt index 5ebfd0c..53633a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ pydantic-settings>=2.7.0,<3.0.0 redis[hiredis]>=5.2.0,<6.0.0 anthropic>=0.42.0,<1.0.0 openai>=1.60.0,<2.0.0 -litellm>=1.50.0,<2.0.0 +litellm==1.80.0 httpx>=0.28.0,<1.0.0 sse-starlette>=2.2.0,<3.0.0 tiktoken>=0.7.0,<1.0.0 diff --git a/src/adapters/openai_adapter.py b/src/adapters/openai_adapter.py index 74775f5..9563cd0 100644 --- a/src/adapters/openai_adapter.py +++ b/src/adapters/openai_adapter.py @@ -413,9 +413,18 @@ class OpenAIAdapter(ModelAdapter): "arguments": json.dumps(b.get("input", {}), ensure_ascii=False), }, }) - m: dict[str, Any] = {"role": "assistant", "content": ("\n".join(p for p in text_parts if p) or None)} + text_joined = "\n".join(p for p in text_parts if p) + m: dict[str, Any] = {"role": "assistant", "content": (text_joined or None)} if reasoning_parts: - m["reasoning_content"] = "\n".join(reasoning_parts) + if not text_joined and not tool_calls: + # Quirk DeepSeek thinking: a veces emite TODA la respuesta + # en reasoning_content y cierra sin content ni tool_calls. + # Reenviar content=None sin tool_calls rompe la API + # ("content or tool_calls must be set"), asi que promovemos + # el reasoning a content (sin duplicarlo como reasoning_content). + m["content"] = "\n".join(reasoning_parts) + else: + m["reasoning_content"] = "\n".join(reasoning_parts) if tool_calls: m["tool_calls"] = tool_calls out.append(m) diff --git a/src/api/routes.py b/src/api/routes.py index 1143197..de6f29d 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -427,9 +427,9 @@ async def _execute_and_persist(orchestrator, storage, session, message) -> dict[ async def abort_session(session_id: str) -> dict[str, Any]: """Cancela la ejecución en curso de una sesión (botón Stop del chat). - Cancela la tarea detached (liberando el session_lock), cierra el stream SSE - de los suscriptores y limpia un posible lock huérfano. Idempotente: si no - hay nada en curso devuelve `no_active_execution` sin error. + Cancela la tarea detached (liberando el session_lock) y cierra el stream + SSE de los suscriptores. Idempotente: si no hay nada en curso devuelve + `no_active_execution` sin error. """ storage = _get_storage() session = await storage.get_session(session_id) @@ -452,12 +452,17 @@ async def abort_session(session_id: str) -> dict[str, Any]: except Exception as e: logger.warning("Failed to close SSE stream on abort for %s: %s", session_id, e) - # Defensa: liberar un lock huérfano (p.ej. de una ejecución previa que crasheó - # antes de soltarlo) para no bloquear el siguiente mensaje hasta el TTL. - try: - await storage.clear_session_lock(session_id) - except Exception as e: - logger.warning("Failed to clear session lock on abort for %s: %s", session_id, e) + # Limpiar el lock SOLO si cancelamos una ejecución de verdad: el `finally` + # de la tarea cancelada puede no llegar a liberar el lock de forma fiable. + # `clear_session_lock` borra incondicional (sin conocer el token del lock), + # así que invocarlo sin cancelación confirmada borraría el lock de una + # ejecución síncrona (stream=false) aún viva — que no se registra en + # _running_executions — y permitiría una segunda ejecución concurrente. + if cancelled: + try: + await storage.clear_session_lock(session_id) + except Exception as e: + logger.warning("Failed to clear session lock on abort for %s: %s", session_id, e) return { "session_id": session_id, diff --git a/src/orchestrator/agents/base.py b/src/orchestrator/agents/base.py index 388181b..533dd7e 100644 --- a/src/orchestrator/agents/base.py +++ b/src/orchestrator/agents/base.py @@ -289,6 +289,34 @@ class BaseAgent: # If no tool calls, we're done if not tool_calls: + # Quirk DeepSeek thinking: a veces el modelo emite TODA su + # respuesta como reasoning y cierra el turno sin text ni + # tool_use. Si el turno termina SOLO con bloques thinking, + # promovemos el thinking a un bloque text en el snapshot que + # se persiste — asi el UI no lo muestra como "pensando" al + # recargar y el siguiente turno no rompe con + # "content or tool_calls must be set". + if turn_blocks and all(b.get("type") == "thinking" for b in turn_blocks): + promoted = "\n".join( + b.get("thinking", "") for b in turn_blocks if b.get("thinking") + ) + turn_blocks = [{"type": "text", "text": promoted}] + accumulated_content += promoted + if promoted and self.profile.stream_deltas: + # Emision en vivo via AGENT_DELTA normal: el + # ClaudeFormatEmitter cierra el thinking block abierto + # (content_block_stop) y abre un text block nuevo con + # su propio indice (start/delta/stop), asi que el + # protocolo de bloques no se rompe. + await self.sse.emit( + EventType.AGENT_DELTA, + { + "agent": self.profile.role, + "delta": promoted, + "step": step, + }, + session_id=session.session_id, + ) if turn_blocks: conversation.append({"role": "assistant", "content": turn_blocks}) elif full_text: diff --git a/src/storage/redis.py b/src/storage/redis.py index ac0603c..7a75ef9 100644 --- a/src/storage/redis.py +++ b/src/storage/redis.py @@ -12,6 +12,7 @@ from __future__ import annotations import json import logging +import uuid from contextlib import asynccontextmanager from typing import Any, AsyncIterator @@ -127,14 +128,26 @@ class RedisStorage: # Execution lock (prevents concurrent messages on same session) # ------------------------------------------------------------------ + # Compare-and-delete atómico: solo borra el lock si el valor coincide con + # el token de quien lo adquirió. Evita que una ejecución cuyo lock expiró + # por TTL borre en su `finally` el lock que ya adquirió otra petición. + _UNLOCK_LUA = ( + "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) else return 0 end" + ) + @asynccontextmanager async def session_lock( - self, session_id: str, timeout: int = 300 + self, session_id: str, timeout: int | None = None ) -> AsyncIterator[bool]: """Acquire an exclusive execution lock for a session. Uses SETNX with auto-expiry to prevent deadlocks if the process - crashes mid-execution. + crashes mid-execution. El TTL es mayor que el timeout global de + ejecución para que el lock no expire (y otra petición lo robe) + mientras la ejecución original sigue viva. Cada adquisición guarda + un token único como valor y la liberación es compare-and-delete + (Lua), de modo que solo el dueño puede borrar el lock. Usage: async with storage.session_lock(session_id) as acquired: @@ -142,20 +155,34 @@ class RedisStorage: raise HTTPException(409, "Session busy") # ... execute ... """ + if timeout is None: + timeout = int(settings.max_execution_timeout_seconds) + 60 key = self._key("session", session_id, "lock") - acquired = await self.client.set(key, "1", nx=True, ex=timeout) + token = uuid.uuid4().hex + acquired = await self.client.set(key, token, nx=True, ex=timeout) try: yield bool(acquired) finally: if acquired: - await self.client.delete(key) + released = await self.client.eval(self._UNLOCK_LUA, 1, key, token) + if not released: + # El lock expiró por TTL y/o lo posee otra petición — no + # tocamos nada, pero lo dejamos registrado. + logger.warning( + "session_lock for %s no longer owned at release " + "(expired or taken over)", + session_id, + ) async def clear_session_lock(self, session_id: str) -> None: """Borra el lock de ejecución de una sesión de forma incondicional. - Usado por el endpoint de abort para liberar un lock huérfano (de una - ejecución previa que crasheó antes de soltarlo) y no bloquear el - siguiente mensaje hasta que expire el TTL. + OJO: borra sin conocer el token del dueño, así que se salta el + compare-and-delete de `session_lock`. SOLO debe invocarse cuando se + ha confirmado que la ejecución dueña del lock fue cancelada (ver + `abort_session` en routes.py): la tarea cancelada puede no ejecutar + su `finally` de liberación de forma fiable, y en ese caso no hay + riesgo de borrar el lock de una ejecución viva. """ key = self._key("session", session_id, "lock") await self.client.delete(key)