Hardening: lock de sesion atomico, monitor off por defecto, fix DeepSeek reasoning-only

- session_lock: token uuid + compare-and-delete (Lua), TTL > timeout de
  ejecucion; abort solo limpia el lock tras cancelacion confirmada.
  Evita doble ejecucion concurrente sobre la misma sesion.
- monitor HTTP (puerto 4545) deshabilitado salvo MCP_MONITOR_ENABLED=true
  y atado a 127.0.0.1; no se acumula historial en memoria si esta off.
- DeepSeek/LiteLLM: turnos que llegan solo con reasoning_content (sin
  content ni tool_calls) ya no rompen la sesion (400 'Invalid assistant
  message') ni se pintan como 'pensando': se promueven a texto en el
  historial y en el snapshot persistido.
- litellm pinneado a ==1.80.0 (builds reproducibles).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Jordan Diaz
2026-06-10 15:17:52 +00:00
parent 6a03fdf284
commit 43337e8554
8 changed files with 107 additions and 28 deletions

View File

@@ -15,6 +15,13 @@ export const CONFIG_FILE_PATH =
export const MCP_PORT = Number(process.env.MCP_PORT || 3000); export const MCP_PORT = Number(process.env.MCP_PORT || 3000);
export const MONITOR_PORT = Number(process.env.MCP_MONITOR_PORT || 4545); export const MONITOR_PORT = Number(process.env.MCP_MONITOR_PORT || 4545);
// El monitor HTTP (UI + POST /retry) queda DESACTIVADO por defecto. Solo se
// arranca si MCP_MONITOR_ENABLED === 'true' de forma explicita.
export const MONITOR_ENABLED =
String(process.env.MCP_MONITOR_ENABLED || "").toLowerCase() === "true";
// Por seguridad escucha solo en loopback salvo que se defina MCP_MONITOR_HOST.
export const MONITOR_HOST = process.env.MCP_MONITOR_HOST || "127.0.0.1";
// Compatibilidad: si alguien fuerza MCP_MONITOR_DISABLED tambien lo respetamos.
export const MONITOR_DISABLED = export const MONITOR_DISABLED =
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "1" || String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "1" ||
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "true"; String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "true";

View File

@@ -6,7 +6,7 @@
*/ */
// Load configuration first // Load configuration first
import { loadLocalConfigProfile, applyProfileToEnv } from "./config/index.js"; import { loadLocalConfigProfile, applyProfileToEnv, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js";
// Load and apply config profile (backward compatibility) // Load and apply config profile (backward compatibility)
const selectedProfile = loadLocalConfigProfile(); const selectedProfile = loadLocalConfigProfile();
@@ -30,8 +30,11 @@ import { registerResources } from "./resources/index.js";
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
setRegistrationFunctions({ registerPrompts, registerTools, registerResources }); setRegistrationFunctions({ registerPrompts, registerTools, registerResources });
// Create the shared request monitor (will be applied to each session server) // Create the shared request monitor (will be applied to each session server).
const requestMonitor = createRequestMonitor(); // Solo se crea si el monitor esta habilitado: asi no acumulamos historial en
// memoria ni envolvemos los handlers cuando la UI esta apagada (por defecto).
const monitorActive = MONITOR_ENABLED && !MONITOR_DISABLED;
const requestMonitor = monitorActive ? createRequestMonitor() : null;
// Create a server instance for retry functionality in the monitor UI // Create a server instance for retry functionality in the monitor UI
const server = createMcpServer(); const server = createMcpServer();

View File

@@ -2,7 +2,7 @@ import http from "node:http";
import fsPromises from "node:fs/promises"; import fsPromises from "node:fs/promises";
import path from "node:path"; import path from "node:path";
import { fileURLToPath } from "node:url"; import { fileURLToPath } from "node:url";
import { MONITOR_PORT, MONITOR_DISABLED } from "./config/index.js"; import { MONITOR_PORT, MONITOR_HOST, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js";
import { sessionCredentials } from "./auth/credentials.js"; import { sessionCredentials } from "./auth/credentials.js";
import { activeSessions } from "./httpServer.js"; import { activeSessions } from "./httpServer.js";
@@ -84,8 +84,8 @@ export function broadcastSessionsUpdate() {
* Start the monitor HTTP server * Start the monitor HTTP server
*/ */
export function startMonitorServer(requestMonitor, toolHandlers) { export function startMonitorServer(requestMonitor, toolHandlers) {
if (MONITOR_DISABLED) { if (!MONITOR_ENABLED || MONITOR_DISABLED) {
console.error("MCP monitor UI deshabilitada (MCP_MONITOR_DISABLED=1)."); console.error("[monitor] deshabilitado (MCP_MONITOR_ENABLED!=true)");
return null; return null;
} }
@@ -202,12 +202,12 @@ export function startMonitorServer(requestMonitor, toolHandlers) {
monitorServer.on("error", (error) => { monitorServer.on("error", (error) => {
console.warn( console.warn(
`[monitor] No se pudo iniciar la UI en el puerto ${MONITOR_PORT}: ${error.message}. Establece MCP_MONITOR_DISABLED=1 para ocultar este aviso.` `[monitor] No se pudo iniciar la UI en ${MONITOR_HOST}:${MONITOR_PORT}: ${error.message}. Desactiva MCP_MONITOR_ENABLED para ocultar este aviso.`
); );
}); });
monitorServer.listen(MONITOR_PORT, '0.0.0.0', () => { monitorServer.listen(MONITOR_PORT, MONITOR_HOST, () => {
console.error(`MCP monitor UI: http://0.0.0.0:${MONITOR_PORT}/monitor`); console.error(`MCP monitor UI: http://${MONITOR_HOST}:${MONITOR_PORT}/monitor`);
}); });
// Broadcast sessions + stats update every 2 seconds for real-time monitoring // Broadcast sessions + stats update every 2 seconds for real-time monitoring

View File

@@ -5,7 +5,7 @@ pydantic-settings>=2.7.0,<3.0.0
redis[hiredis]>=5.2.0,<6.0.0 redis[hiredis]>=5.2.0,<6.0.0
anthropic>=0.42.0,<1.0.0 anthropic>=0.42.0,<1.0.0
openai>=1.60.0,<2.0.0 openai>=1.60.0,<2.0.0
litellm>=1.50.0,<2.0.0 litellm==1.80.0
httpx>=0.28.0,<1.0.0 httpx>=0.28.0,<1.0.0
sse-starlette>=2.2.0,<3.0.0 sse-starlette>=2.2.0,<3.0.0
tiktoken>=0.7.0,<1.0.0 tiktoken>=0.7.0,<1.0.0

View File

@@ -413,8 +413,17 @@ class OpenAIAdapter(ModelAdapter):
"arguments": json.dumps(b.get("input", {}), ensure_ascii=False), "arguments": json.dumps(b.get("input", {}), ensure_ascii=False),
}, },
}) })
m: dict[str, Any] = {"role": "assistant", "content": ("\n".join(p for p in text_parts if p) or None)} text_joined = "\n".join(p for p in text_parts if p)
m: dict[str, Any] = {"role": "assistant", "content": (text_joined or None)}
if reasoning_parts: if reasoning_parts:
if not text_joined and not tool_calls:
# Quirk DeepSeek thinking: a veces emite TODA la respuesta
# en reasoning_content y cierra sin content ni tool_calls.
# Reenviar content=None sin tool_calls rompe la API
# ("content or tool_calls must be set"), asi que promovemos
# el reasoning a content (sin duplicarlo como reasoning_content).
m["content"] = "\n".join(reasoning_parts)
else:
m["reasoning_content"] = "\n".join(reasoning_parts) m["reasoning_content"] = "\n".join(reasoning_parts)
if tool_calls: if tool_calls:
m["tool_calls"] = tool_calls m["tool_calls"] = tool_calls

View File

@@ -427,9 +427,9 @@ async def _execute_and_persist(orchestrator, storage, session, message) -> dict[
async def abort_session(session_id: str) -> dict[str, Any]: async def abort_session(session_id: str) -> dict[str, Any]:
"""Cancela la ejecución en curso de una sesión (botón Stop del chat). """Cancela la ejecución en curso de una sesión (botón Stop del chat).
Cancela la tarea detached (liberando el session_lock), cierra el stream SSE Cancela la tarea detached (liberando el session_lock) y cierra el stream
de los suscriptores y limpia un posible lock huérfano. Idempotente: si no SSE de los suscriptores. Idempotente: si no hay nada en curso devuelve
hay nada en curso devuelve `no_active_execution` sin error. `no_active_execution` sin error.
""" """
storage = _get_storage() storage = _get_storage()
session = await storage.get_session(session_id) session = await storage.get_session(session_id)
@@ -452,8 +452,13 @@ async def abort_session(session_id: str) -> dict[str, Any]:
except Exception as e: except Exception as e:
logger.warning("Failed to close SSE stream on abort for %s: %s", session_id, e) logger.warning("Failed to close SSE stream on abort for %s: %s", session_id, e)
# Defensa: liberar un lock huérfano (p.ej. de una ejecución previa que crasheó # Limpiar el lock SOLO si cancelamos una ejecución de verdad: el `finally`
# antes de soltarlo) para no bloquear el siguiente mensaje hasta el TTL. # de la tarea cancelada puede no llegar a liberar el lock de forma fiable.
# `clear_session_lock` borra incondicional (sin conocer el token del lock),
# así que invocarlo sin cancelación confirmada borraría el lock de una
# ejecución síncrona (stream=false) aún viva — que no se registra en
# _running_executions — y permitiría una segunda ejecución concurrente.
if cancelled:
try: try:
await storage.clear_session_lock(session_id) await storage.clear_session_lock(session_id)
except Exception as e: except Exception as e:

View File

@@ -289,6 +289,34 @@ class BaseAgent:
# If no tool calls, we're done # If no tool calls, we're done
if not tool_calls: if not tool_calls:
# Quirk DeepSeek thinking: a veces el modelo emite TODA su
# respuesta como reasoning y cierra el turno sin text ni
# tool_use. Si el turno termina SOLO con bloques thinking,
# promovemos el thinking a un bloque text en el snapshot que
# se persiste — asi el UI no lo muestra como "pensando" al
# recargar y el siguiente turno no rompe con
# "content or tool_calls must be set".
if turn_blocks and all(b.get("type") == "thinking" for b in turn_blocks):
promoted = "\n".join(
b.get("thinking", "") for b in turn_blocks if b.get("thinking")
)
turn_blocks = [{"type": "text", "text": promoted}]
accumulated_content += promoted
if promoted and self.profile.stream_deltas:
# Emision en vivo via AGENT_DELTA normal: el
# ClaudeFormatEmitter cierra el thinking block abierto
# (content_block_stop) y abre un text block nuevo con
# su propio indice (start/delta/stop), asi que el
# protocolo de bloques no se rompe.
await self.sse.emit(
EventType.AGENT_DELTA,
{
"agent": self.profile.role,
"delta": promoted,
"step": step,
},
session_id=session.session_id,
)
if turn_blocks: if turn_blocks:
conversation.append({"role": "assistant", "content": turn_blocks}) conversation.append({"role": "assistant", "content": turn_blocks})
elif full_text: elif full_text:

View File

@@ -12,6 +12,7 @@ from __future__ import annotations
import json import json
import logging import logging
import uuid
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Any, AsyncIterator from typing import Any, AsyncIterator
@@ -127,14 +128,26 @@ class RedisStorage:
# Execution lock (prevents concurrent messages on same session) # Execution lock (prevents concurrent messages on same session)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Compare-and-delete atómico: solo borra el lock si el valor coincide con
# el token de quien lo adquirió. Evita que una ejecución cuyo lock expiró
# por TTL borre en su `finally` el lock que ya adquirió otra petición.
_UNLOCK_LUA = (
"if redis.call('get', KEYS[1]) == ARGV[1] then "
"return redis.call('del', KEYS[1]) else return 0 end"
)
@asynccontextmanager @asynccontextmanager
async def session_lock( async def session_lock(
self, session_id: str, timeout: int = 300 self, session_id: str, timeout: int | None = None
) -> AsyncIterator[bool]: ) -> AsyncIterator[bool]:
"""Acquire an exclusive execution lock for a session. """Acquire an exclusive execution lock for a session.
Uses SETNX with auto-expiry to prevent deadlocks if the process Uses SETNX with auto-expiry to prevent deadlocks if the process
crashes mid-execution. crashes mid-execution. El TTL es mayor que el timeout global de
ejecución para que el lock no expire (y otra petición lo robe)
mientras la ejecución original sigue viva. Cada adquisición guarda
un token único como valor y la liberación es compare-and-delete
(Lua), de modo que solo el dueño puede borrar el lock.
Usage: Usage:
async with storage.session_lock(session_id) as acquired: async with storage.session_lock(session_id) as acquired:
@@ -142,20 +155,34 @@ class RedisStorage:
raise HTTPException(409, "Session busy") raise HTTPException(409, "Session busy")
# ... execute ... # ... execute ...
""" """
if timeout is None:
timeout = int(settings.max_execution_timeout_seconds) + 60
key = self._key("session", session_id, "lock") key = self._key("session", session_id, "lock")
acquired = await self.client.set(key, "1", nx=True, ex=timeout) token = uuid.uuid4().hex
acquired = await self.client.set(key, token, nx=True, ex=timeout)
try: try:
yield bool(acquired) yield bool(acquired)
finally: finally:
if acquired: if acquired:
await self.client.delete(key) released = await self.client.eval(self._UNLOCK_LUA, 1, key, token)
if not released:
# El lock expiró por TTL y/o lo posee otra petición — no
# tocamos nada, pero lo dejamos registrado.
logger.warning(
"session_lock for %s no longer owned at release "
"(expired or taken over)",
session_id,
)
async def clear_session_lock(self, session_id: str) -> None: async def clear_session_lock(self, session_id: str) -> None:
"""Borra el lock de ejecución de una sesión de forma incondicional. """Borra el lock de ejecución de una sesión de forma incondicional.
Usado por el endpoint de abort para liberar un lock huérfano (de una OJO: borra sin conocer el token del dueño, así que se salta el
ejecución previa que crasheó antes de soltarlo) y no bloquear el compare-and-delete de `session_lock`. SOLO debe invocarse cuando se
siguiente mensaje hasta que expire el TTL. ha confirmado que la ejecución dueña del lock fue cancelada (ver
`abort_session` en routes.py): la tarea cancelada puede no ejecutar
su `finally` de liberación de forma fiable, y en ese caso no hay
riesgo de borrar el lock de una ejecución viva.
""" """
key = self._key("session", session_id, "lock") key = self._key("session", session_id, "lock")
await self.client.delete(key) await self.client.delete(key)