Compare commits

..

8 Commits

Author SHA1 Message Date
Jordan Diaz
9277862e56 read_doc: resolver docs por ACAI_PROJECT_DIR + knowledge load idempotente
- mcp-server _docsReader.js: resolveDocsDir → ACAI_DOCS_DIR /
  $ACAI_PROJECT_DIR/docs / /app/docs. Arregla DOC_NOT_FOUND en VSCode
  (HTTP MCP) y local; el .mcp.json ya inyecta ACAI_PROJECT_DIR
- routes.py: /knowledge/load idempotente — salta embeddings si el hash
  de contenido no cambió (clave Redis kbhash), para dispararlo libremente
  desde el botón de scaffold sin re-embeber

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 17:23:53 +00:00
Jordan Diaz
79ec267aa6 Compactor: garantizar emparejamiento tool_use/tool_result (sesiones largas bloqueadas)
Las sesiones largas con DeepSeek quedaban bloqueadas permanentemente con
400 "Messages with role 'tool' must be a response to a preceding message
with 'tool_calls'": el paso de ultimo recurso del compactor colapsaba
assistants con tool_use a un string placeholder dejando huerfanos los
tool_result del user siguiente.

- compactor: paso de ultimo recurso pair-aware + _enforce_tool_pairing
  como invariante final (matching por IDs, ambas direcciones, repara
  tambien historiales ya corruptos persistidos).
- openai_adapter: _repair_tool_sequence como guard defensivo del contrato
  del proveedor (tool huerfano -> user; tool_call sin respuesta -> fuera),
  con warning para detectar regresiones.
- recent_messages: trim por presupuesto de tokens al persistir
  (AGENTIC_RECENT_MESSAGES_MAX_TOKENS, default 60k) sin cortar pares;
  cierra el crecimiento sin limite que empujaba al paso destructivo.
- tests/test_tool_pairing_real.py: 23 tests que importan el codigo REAL
  (a diferencia de los tests standalone existentes). Suite completa: 92 ok.

Verificado offline contra los recent_messages reales de la sesion
bloqueada en prod: 0 violaciones con presupuesto normal y agresivo.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 19:08:53 +00:00
Jordan Diaz
43337e8554 Hardening: lock de sesion atomico, monitor off por defecto, fix DeepSeek reasoning-only
- session_lock: token uuid + compare-and-delete (Lua), TTL > timeout de
  ejecucion; abort solo limpia el lock tras cancelacion confirmada.
  Evita doble ejecucion concurrente sobre la misma sesion.
- monitor HTTP (puerto 4545) deshabilitado salvo MCP_MONITOR_ENABLED=true
  y atado a 127.0.0.1; no se acumula historial en memoria si esta off.
- DeepSeek/LiteLLM: turnos que llegan solo con reasoning_content (sin
  content ni tool_calls) ya no rompen la sesion (400 'Invalid assistant
  message') ni se pintan como 'pensando': se promueven a texto en el
  historial y en el snapshot persistido.
- litellm pinneado a ==1.80.0 (builds reproducibles).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 15:17:52 +00:00
Jordan Diaz
6a03fdf284 Harden DeepSeek agent: LiteLLM adapter, DSML/reasoning/embeddings/error fixes
- LiteLLMAdapter (subclasses OpenAIAdapter via _acreate hook): routes DeepSeek
  through LiteLLM. Opt-in AGENTIC_DEFAULT_MODEL_PROVIDER=litellm. A/B beat the
  hand-rolled adapter (0 DSML, 0 parse-fails). Defensive chunk.usage getattr,
  token-estimate usage fallback for billing, quiet litellm logs.
- DSML parser: tolerate single/multi fullwidth pipes, honor string="true/false"
  typed args (openai_adapter fallback when DeepSeek leaks tool calls as text).
- Thinking mode: capture and round-trip reasoning_content across turns.
- Embeddings: dedicated AGENTIC_EMBEDDINGS_API_KEY (DeepSeek has no embeddings);
  disable cleanly when unset to avoid per-turn 401.
- claude_format: friendly generic error messages to the chat, raw only in logs.
- acai agent max_tokens 4096->16384 (whole-file writes no longer truncate);
  system.md size-based edit policy; strict tools opt-in (off).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:49:48 +00:00
Jordan Diaz
e34a39e3bf fix(adapter): ejecutar tool calls que DeepSeek emite como texto DSML
Tercer modo de fallo del conector OpenAI (distinto de followup_mode y de
finish_reason=stop): DeepSeek a veces emite las tool calls en su formato interno
DSML (<||DSML||tool_calls>…, con U+FF5C) como TEXTO en el content, en vez de
como tool_calls nativos. El endpoint OpenAI no lo convierte, asi que el adapter
lo trataba como texto y el agente "se paraba" mostrando DSML inerte (0 tools).

Fix en OpenAIAdapter.stream: reutiliza el parser del claude_adapter
(_parse_xml_tool_calls / _TOOL_CALL_OPEN_RE). Acumula el content; si detecta el
inicio de un tool call en texto deja de emitirlo al usuario (DSML no debe verse);
al cerrar el turno, si no hubo tool_calls nativos, parsea el content y emite los
tool calls encontrados como tool_use para que el engine los ejecute.

Validado: el DSML real de la sesion (2x acai_grep) se parsea correctamente.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 20:15:49 +00:00
Jordan Diaz
d6b04e4122 fix(adapter): no perder tool_calls cuando DeepSeek cierra con finish_reason=stop
Sintoma (solo con el conector OpenAI): el agente anuncia la accion en texto
("Voy a crear los modulos…") y se PARA sin ejecutarla — 0 tools.

Causa: el stream del OpenAIAdapter solo emitia los tool_calls acumulados cuando
choice.finish_reason == "tool_calls". Pero DeepSeek (endpoint OpenAI) a veces
cierra el stream con finish_reason="stop" AUNQUE haya emitido tool_calls; en ese
caso caiamos en el branch else (end_turn) y los tool_calls acumulados se
descartaban. base.py solo ejecuta al recibir finish_reason="tool_use", asi que
nunca se ejecutaban. Con el adapter Claude (Anthropic) el finish_reason venia
distinto, por eso solo aparecia tras el cambio de conector.

Fix: disparar los tool_use SIEMPRE que haya tool_calls acumulados al cerrar el
stream, sea cual sea el finish_reason.

Validado: "crea un modulo…" ahora ejecuta acai_write + check_module y completa,
en vez de pararse tras anunciar.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 17:55:40 +00:00
Jordan Diaz
96b4542918 fix(mcp): el read loop ya no muere con respuestas grandes (screenshots)
Sintoma: "el agente se para cuando hace acciones". MCPClient._read_loop lee las
respuestas JSON-RPC con stdout.readline(), cuyo StreamReader tenia buffer de 1MB.
Una respuesta llega en UNA linea; playwright__browser_take_screenshot({fullPage:
true}) devuelve la imagen en base64 en esa linea y supera el limite →
asyncio.LimitOverrunError → el except Exception mataba el read loop y dejaba la
sesion MCP inservible (los turnos siguientes ejecutaban 0 tools).

Fix en dos capas:
- MCP_STREAM_LIMIT=64MB en create_subprocess_exec(limit=...) — cubre cualquier
  screenshot real.
- Read loop tolerante: captura (ValueError, LimitOverrunError), descarta solo esa
  respuesta re-sincronizando el stream hasta el \n (_drain_until_newline) y sigue
  vivo, en vez de matar toda la sesion MCP.

Validado: navegar + screenshot fullPage + glob ejecuta las 4 tools sin "read loop
error" y sin colapsar el contexto.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 17:38:19 +00:00
Jordan Diaz
454b51b45d fix(agentic): DeepSeek llama tools de forma fiable (conector OpenAI + followup_mode)
Dos bugs encadenados impedían que el agente ejecutara tools (emitía los tool
calls como texto sin ejecutarlos, y degradaba el contexto):

1. Conector: el OpenAIAdapter pasaba los mensajes en formato Anthropic (bloques
   tool_use/tool_result) que la API OpenAI de DeepSeek rechaza, y defaulteaba el
   modelo a "gpt-4o". Añade `_to_openai_messages()` (assistant.tool_use →
   tool_calls; user.tool_result → role:tool con tool_call_id) y `_blocks_text()`,
   y usa `settings.default_model_id`. Con esto DeepSeek devuelve tool_calls
   nativos vía https://api.deepseek.com (endpoint OpenAI), sin parsear texto y
   sin la degradación que sufría el endpoint Anthropic-compat.

2. followup_mode: `_classify_followup_mode` marcaba como "transform" cualquier
   PRIMER mensaje que contuviera un marker ("resumen", "estructura", "busca",
   "adapta"…), y `_get_allowed_tools` devuelve [] en modo transform → el agente
   se quedaba SIN tools. Un follow-up no tiene sentido sin turno anterior, así
   que ahora solo se clasifica si hay task_history/recent_messages.

claude_adapter: parser DSML/DeepSeek para tool calls como texto (fallback del
endpoint Anthropic-compat, ya no es la vía principal).

Validado: el prompt de análisis de estilos ("Guarda un resumen…") ahora explora
los módulos y escribe docs/project-styles.md vía save_project_styles.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 11:01:54 +00:00
23 changed files with 1801 additions and 109 deletions

View File

@@ -4,7 +4,11 @@ description: "Agente genérico de Acai CMS: crea módulos, edita contenido, gest
icon: "code"
category: "development"
temperature: 0.2
max_tokens: 4096
# 16K de salida: cubre escribir un fichero entero (acai_write) + el razonamiento
# (thinking) en un solo turno. Con 4096 el JSON del tool_use se truncaba a mitad
# en ficheros medianos y el agente caia en micro-ediciones lentas. v4-pro soporta
# hasta 384K de salida, asi que 16K es conservador.
max_tokens: 16384
context_sections:
- immutable_rules
- project_profile

View File

@@ -74,6 +74,26 @@ cms/data/schema/ # .ini.php — SOLO con tools de schema
14. **URL del proyecto**: `get_web_url` + `?pruebas=1` siempre.
15. **Operaciones destructivas**: confirma con el usuario antes de ejecutar.
# Eficiencia de edición (menos pasos Y menos tokens)
Elige la herramienta por el TAMAÑO del cambio. Ni micro-editar todo (muchos
pasos), ni reescribir el fichero entero por cada retoque (muchos tokens):
1. **Cambio pequeño o localizado** (un color, un valor, una regla, pocas zonas)
`acai-line-replace`. Barato: solo emites las líneas que cambian. NO
reescribas el fichero entero por un retoque.
2. **Creación o reescritura mayor** (cambias casi todo el fichero o lo creas de
cero) → UN solo `acai-write` del fichero completo. Reescribir entero por un
cambio pequeño desperdicia tokens; hazlo solo cuando de verdad cambia casi todo.
3. **Itera con `line-replace`, no con writes repetidos.** Tras ver el resultado
en el navegador, aplica los ajustes con `line-replace` puntuales. NO reescribas
el fichero completo en cada iteración de diseño.
4. **Cap de micro-ediciones.** Si te ves haciendo >4-5 `line-replace` sobre el
mismo fichero en un turno, para y reescríbelo entero de una vez (`acai-write`).
5. **NO hagas `acai-view` tras cada edición.** Ya tienes el contenido en contexto;
reléelo solo si una edición falló o dudas del estado real.
6. **Verificación visual al final, una sola pasada** — no tras cada retoque.
# Patrones canónicos (aplica por defecto)
- **Detalle de registro**: sección `custom-{tableName}` con `thisrecord.*`.

View File

@@ -15,6 +15,13 @@ export const CONFIG_FILE_PATH =
export const MCP_PORT = Number(process.env.MCP_PORT || 3000);
export const MONITOR_PORT = Number(process.env.MCP_MONITOR_PORT || 4545);
// El monitor HTTP (UI + POST /retry) queda DESACTIVADO por defecto. Solo se
// arranca si MCP_MONITOR_ENABLED === 'true' de forma explicita.
export const MONITOR_ENABLED =
String(process.env.MCP_MONITOR_ENABLED || "").toLowerCase() === "true";
// Por seguridad escucha solo en loopback salvo que se defina MCP_MONITOR_HOST.
export const MONITOR_HOST = process.env.MCP_MONITOR_HOST || "127.0.0.1";
// Compatibilidad: si alguien fuerza MCP_MONITOR_DISABLED tambien lo respetamos.
export const MONITOR_DISABLED =
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "1" ||
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "true";

View File

@@ -6,7 +6,7 @@
*/
// Load configuration first
import { loadLocalConfigProfile, applyProfileToEnv } from "./config/index.js";
import { loadLocalConfigProfile, applyProfileToEnv, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js";
// Load and apply config profile (backward compatibility)
const selectedProfile = loadLocalConfigProfile();
@@ -30,8 +30,11 @@ import { registerResources } from "./resources/index.js";
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
setRegistrationFunctions({ registerPrompts, registerTools, registerResources });
// Create the shared request monitor (will be applied to each session server)
const requestMonitor = createRequestMonitor();
// Create the shared request monitor (will be applied to each session server).
// Solo se crea si el monitor esta habilitado: asi no acumulamos historial en
// memoria ni envolvemos los handlers cuando la UI esta apagada (por defecto).
const monitorActive = MONITOR_ENABLED && !MONITOR_DISABLED;
const requestMonitor = monitorActive ? createRequestMonitor() : null;
// Create a server instance for retry functionality in the monitor UI
const server = createMcpServer();

View File

@@ -2,7 +2,7 @@ import http from "node:http";
import fsPromises from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { MONITOR_PORT, MONITOR_DISABLED } from "./config/index.js";
import { MONITOR_PORT, MONITOR_HOST, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js";
import { sessionCredentials } from "./auth/credentials.js";
import { activeSessions } from "./httpServer.js";
@@ -84,8 +84,8 @@ export function broadcastSessionsUpdate() {
* Start the monitor HTTP server
*/
export function startMonitorServer(requestMonitor, toolHandlers) {
if (MONITOR_DISABLED) {
console.error("MCP monitor UI deshabilitada (MCP_MONITOR_DISABLED=1).");
if (!MONITOR_ENABLED || MONITOR_DISABLED) {
console.error("[monitor] deshabilitado (MCP_MONITOR_ENABLED!=true)");
return null;
}
@@ -202,12 +202,12 @@ export function startMonitorServer(requestMonitor, toolHandlers) {
monitorServer.on("error", (error) => {
console.warn(
`[monitor] No se pudo iniciar la UI en el puerto ${MONITOR_PORT}: ${error.message}. Establece MCP_MONITOR_DISABLED=1 para ocultar este aviso.`
`[monitor] No se pudo iniciar la UI en ${MONITOR_HOST}:${MONITOR_PORT}: ${error.message}. Desactiva MCP_MONITOR_ENABLED para ocultar este aviso.`
);
});
monitorServer.listen(MONITOR_PORT, '0.0.0.0', () => {
console.error(`MCP monitor UI: http://0.0.0.0:${MONITOR_PORT}/monitor`);
monitorServer.listen(MONITOR_PORT, MONITOR_HOST, () => {
console.error(`MCP monitor UI: http://${MONITOR_HOST}:${MONITOR_PORT}/monitor`);
});
// Broadcast sessions + stats update every 2 seconds for real-time monitoring

View File

@@ -1,20 +1,41 @@
import fs from "node:fs/promises";
import { existsSync } from "node:fs";
import path from "node:path";
/**
* Lectura directa de los markdown del knowledge base desde el filesystem.
*
* El MCP server corre dentro del container `agentic` junto al FastAPI, asi
* que los .md viven en `/app/docs/` (la imagen los copia ahi).
*
* En caso de override por entorno, respeta `ACAI_DOCS_DIR`. En desarrollo
* fuera del container, fallback a paths relativos al cwd.
* Orden de resolucion del directorio de docs:
* 1. `ACAI_DOCS_DIR` — override explicito por entorno (si esta definido y no vacio).
* 2. `<ACAI_PROJECT_DIR>/docs` — caso principal: cada proyecto/web tiene su
* propio `docs/`. El `.mcp.json` inyecta `ACAI_PROJECT_DIR` (p.ej.
* `/opt/acai/webs/<user>/<site>`), funciona tanto en local (VSCode) como
* en cloud (agentic).
* 3. `/app/docs` — fallback final: container `agentic` donde esta horneada la
* copia canonica de los .md.
*/
function dirExists(p) {
try {
return existsSync(p);
} catch {
return false;
}
}
function resolveDocsDir() {
// 1. Override explicito
const override = process.env.ACAI_DOCS_DIR;
if (override) return override;
// Container path
if (override && override.trim() !== "") return override;
// 2. Docs del proyecto/web
const projectDir = process.env.ACAI_PROJECT_DIR;
if (projectDir && projectDir.trim() !== "") {
const projectDocs = path.join(projectDir, "docs");
if (dirExists(projectDocs)) return projectDocs;
}
// 3. Fallback al container agentic
return "/app/docs";
}

View File

@@ -5,6 +5,7 @@ pydantic-settings>=2.7.0,<3.0.0
redis[hiredis]>=5.2.0,<6.0.0
anthropic>=0.42.0,<1.0.0
openai>=1.60.0,<2.0.0
litellm==1.80.0
httpx>=0.28.0,<1.0.0
sse-starlette>=2.2.0,<3.0.0
tiktoken>=0.7.0,<1.0.0

View File

@@ -17,20 +17,24 @@ from .base import ModelAdapter, ModelConfig, ModelResponse, StreamChunk
logger = logging.getLogger(__name__)
# Algunos fine-tunes (sobre todo MiniMax) ocasionalmente emiten las tool calls
# como texto literal en lugar de usar los `tool_use` blocks nativos. Vistos
# tres formatos:
# Algunos fine-tunes (sobre todo MiniMax y DeepSeek) ocasionalmente emiten las
# tool calls como texto literal en lugar de usar los `tool_use` blocks nativos.
# Vistos cuatro formatos:
# 1) <minimax:tool_call><invoke name="X"><parameter name="P">V</parameter></invoke></minimax:tool_call>
# 2) <invoke name="X"><parameter name="P">V</parameter></invoke> (sin minimax wrapper)
# 3) <tool_call>{"name":"X","parameters":{...}}{"name":"Y","parameters":{...}}</tool_call>
# (multiples tool calls JSON-encoded dentro de un solo wrapper)
# 4) <DSMLtool_calls><DSMLinvoke name="X"><DSMLparameter name="P" string="true">V</DSMLparameter></DSMLinvoke></DSMLtool_calls>
# (formato DSML de DeepSeek — usa U+FF5C fullwidth vertical line como separador)
#
# Cuando eso pasa el orquestador ve "texto" y la tool nunca se ejecuta — el
# usuario ve el markup crudo en el chat. Detectamos y convertimos a tool_use
# sintetico mientras streameamos. Es un parche defensivo: el caso normal
# (tool_use blocks) sigue por el camino estandar.
_TOOL_CALL_OPEN_RE = re.compile(
r"<(?:minimax:tool_call|invoke\s+name|tool_call\s*>)|\[TOOL_CALL\]",
# `<` (U+FF5C) cubre cualquier special-token DeepSeek (DSML): <DSMLinvoke,
# <tool_calls, etc. Tolerante a 1+ pipes y a la presencia/ausencia de "DSML".
r"<(?:minimax:tool_call|invoke\s+name|tool_call[\s>]|use_mcp_tool|mm_special)|\[TOOL_CALL\]|<",
re.IGNORECASE,
)
_INVOKE_RE = re.compile(
@@ -65,6 +69,21 @@ _PERL_ARGS_BLOCK_RE = re.compile(
_PERL_KV_RE = re.compile(
r"--([a-zA-Z_][a-zA-Z0-9_]*)\s+(\"[^\"]*\"|\'[^\']*\'|-?\d+(?:\.\d+)?|true|false|null)",
)
# Formato 5 (DeepSeek DSML). Formato oficial V4-Pro: el marcador es `DSML`
# con UN pipe fullwidth (U+FF5C) a cada lado — <DSMLinvoke name="X"> ...
# <DSMLparameter name="P" string="true|false">V</DSMLparameter> ...
# </DSMLinvoke>. Hacemos el regex TOLERANTE: 1+ pipes y "DSML" opcional,
# para cubrir variantes entre versiones del modelo. El atributo `string`
# decide el tipo del valor: "true" = string crudo, "false" = valor JSON.
_DSML_INVOKE_RE = re.compile(
r"<+(?:DSML+)?invoke\s+name=\"([^\"]+)\"[^>]*>(.*?)</+(?:DSML+)?invoke\s*>",
re.IGNORECASE | re.DOTALL,
)
_DSML_PARAM_RE = re.compile(
r"<+(?:DSML+)?parameter\s+name=\"([^\"]+)\"([^>]*)>(.*?)</+(?:DSML+)?parameter\s*>",
re.IGNORECASE | re.DOTALL,
)
_DSML_STRING_ATTR_RE = re.compile(r"string\s*=\s*\"(true|false)\"", re.IGNORECASE)
def _safe_emit_split(buf: str) -> str:
@@ -91,8 +110,8 @@ def _safe_emit_split(buf: str) -> str:
# Si el tail ya tiene `>` cerrado, es un tag normal — emitir todo.
if ">" in tail:
return buf
# Si el tail puede ser inicio de tool_call/invoke/tool_call_json, retenerlo.
candidates = ("<minimax:tool_call", "<invoke", "<tool_call")
# Si el tail puede ser inicio de tool_call/invoke/tool_call_json/dsml, retenerlo.
candidates = ("<minimax:tool_call", "<invoke", "<tool_call", "<")
for cand in candidates:
if cand.startswith(tail.lower()) or tail.lower().startswith(cand[:len(tail)].lower()):
return buf[:idx]
@@ -212,6 +231,35 @@ def _parse_xml_tool_calls(text: str) -> list[dict[str, Any]]:
"arguments": args,
})
# Formato 5 (DeepSeek DSML):
# <DSMLinvoke name="X"><DSMLparameter name="P" string="true">V</DSMLparameter></DSMLinvoke>
for m in _DSML_INVOKE_RE.finditer(text):
name = m.group(1).strip()
body = m.group(2)
args_dsml: dict[str, Any] = {}
for p in _DSML_PARAM_RE.finditer(body):
pname = p.group(1).strip()
attrs = p.group(2) or ""
raw_val = p.group(3)
sm = _DSML_STRING_ATTR_RE.search(attrs)
if sm and sm.group(1).lower() == "true":
# string="true": valor es string crudo — NO strip (preserva
# whitespace significativo, p.ej. contenido de ficheros).
args_dsml[pname] = raw_val
else:
# string="false" (o ausente): valor JSON (num/bool/array/obj/string).
# Si no parsea, cae a string sin tocar.
try:
args_dsml[pname] = json.loads(raw_val.strip())
except (json.JSONDecodeError, ValueError):
args_dsml[pname] = raw_val.strip()
if name:
calls.append({
"id": "xml_{}".format(uuid.uuid4().hex[:12]),
"name": name,
"arguments": args_dsml,
})
return calls

View File

@@ -0,0 +1,67 @@
"""LiteLLM model adapter — spike para A/B contra el adapter OpenAI/DeepSeek nativo.
Reutiliza TODO el flujo de OpenAIAdapter (procesado de chunks, conversión de
mensajes, tools, fallback DSML) y solo cambia la llamada al modelo: en vez del
SDK de OpenAI, enruta por LiteLLM, que trae handling específico por proveedor
(DeepSeek incluido) y podría resolver de fábrica el DSML / reasoning_content que
hoy parcheamos a mano.
Activar con `AGENTIC_DEFAULT_MODEL_PROVIDER=litellm`. Modelo via
`AGENTIC_LITELLM_MODEL` (p.ej. "deepseek/deepseek-v4-pro"); si vacío, deriva de
`AGENTIC_DEFAULT_MODEL_ID`. Reusa `openai_api_key` / `openai_base_url` como
credenciales.
"""
from __future__ import annotations
import logging
from typing import Any
import litellm
from ..config import settings
from .openai_adapter import OpenAIAdapter
logger = logging.getLogger(__name__)
# Que LiteLLM descarte params no soportados por el proveedor en vez de petar.
litellm.drop_params = True
# Silenciar el spam INFO de litellm ("LiteLLM completion() model=...").
litellm.suppress_debug_info = True
logging.getLogger("LiteLLM").setLevel(logging.WARNING)
class LiteLLMAdapter(OpenAIAdapter):
"""Enruta las llamadas por LiteLLM, reutilizando el pipeline de OpenAIAdapter."""
def __init__(
self,
model: str | None = None,
api_key: str | None = None,
base_url: str | None = None,
) -> None:
# NO llamamos a super().__init__: no necesitamos el cliente AsyncOpenAI.
self._litellm_model = model or settings.litellm_model or self._derive_model()
self._api_key = api_key or settings.openai_api_key or None
self._api_base = base_url or settings.openai_base_url or None
# LiteLLM no entrega usage fiable en streaming → estimar para billing.
self._estimate_usage_fallback = True
logger.info(
"LiteLLMAdapter: model=%s api_base=%s",
self._litellm_model, self._api_base or "(default)",
)
@staticmethod
def _derive_model() -> str:
mid = settings.default_model_id or "deepseek-chat"
# Si ya trae prefijo de proveedor ("deepseek/...", "openai/..."), respetar.
return mid if "/" in mid else f"deepseek/{mid}"
async def _acreate(self, kwargs: dict[str, Any]):
kwargs = dict(kwargs)
kwargs["model"] = self._litellm_model
if self._api_key:
kwargs["api_key"] = self._api_key
if self._api_base:
kwargs["api_base"] = self._api_base
return await litellm.acompletion(**kwargs)

View File

@@ -14,6 +14,24 @@ from .base import ModelAdapter, ModelConfig, ModelResponse, StreamChunk
logger = logging.getLogger(__name__)
def _estimate_usage(messages: list[dict[str, Any]], output_text: str) -> dict[str, int]:
"""Estimacion de tokens cuando el proveedor no entrega usage (p.ej. LiteLLM
streaming). Aproximada pero evita billing 0."""
from ..context.compactor import estimate_tokens
inp = 0
for m in messages:
c = m.get("content")
if isinstance(c, str):
inp += estimate_tokens(c)
elif isinstance(c, list):
for b in c:
if isinstance(b, dict):
inp += estimate_tokens(
b.get("text") or b.get("thinking") or str(b.get("content") or "")
)
return {"input_tokens": inp, "output_tokens": estimate_tokens(output_text or "")}
class OpenAIAdapter(ModelAdapter):
"""Adapter for the OpenAI API (GPT-4o, o1, etc.)."""
@@ -25,6 +43,15 @@ class OpenAIAdapter(ModelAdapter):
if url:
kwargs["base_url"] = url
self._client = AsyncOpenAI(**kwargs)
# El path nativo conserva el usage real del proveedor; subclases que no
# reciben usage fiable en streaming (LiteLLM) lo ponen a True para estimar.
self._estimate_usage_fallback = False
async def _acreate(self, kwargs: dict[str, Any]):
"""Hook de la llamada al modelo. Subclases (p.ej. LiteLLMAdapter) lo
sobreescriben para enrutar por otra librería sin tocar el resto del
flujo (procesado de chunks, tools, mensajes)."""
return await self._client.chat.completions.create(**kwargs)
# ------------------------------------------------------------------
# Streaming
@@ -43,43 +70,90 @@ class OpenAIAdapter(ModelAdapter):
)
kwargs: dict[str, Any] = {
"model": config.model_id or "gpt-4o",
"model": config.model_id or settings.default_model_id or "gpt-4o",
"max_tokens": config.max_tokens,
"temperature": config.temperature,
"messages": messages,
"messages": self._to_openai_messages(messages),
"stream": True,
"stream_options": {"include_usage": True},
}
if tools:
kwargs["tools"] = self._format_tools(tools)
stream = await self._client.chat.completions.create(**kwargs)
stream = await self._acreate(kwargs)
# Fallback de tool-calls-en-texto: DeepSeek a veces emite las tool calls
# en su formato interno DSML como TEXTO (en el content) en vez de como
# tool_calls nativos. El endpoint OpenAI no lo convierte, asi que sin
# esto el agente "se para" mostrando DSML inerte. Reutilizamos el parser
# del claude_adapter.
from .claude_adapter import _parse_xml_tool_calls, _TOOL_CALL_OPEN_RE
tool_calls_acc: dict[int, dict[str, str]] = {}
final_usage: dict[str, int] = {}
usage_emitted = False # evita doble conteo si llega usage tras estimar
full_content = "" # content acumulado (para el fallback DSML)
full_reasoning = "" # razonamiento acumulado (para estimar usage)
emitted_chars = 0 # cuanto de full_content ya se emitio como delta
suppress_text = False # tras detectar un tool-call-en-texto, no emitir mas
# DeepSeek thinking mode: el razonamiento llega en `delta.reasoning_content`
# (antes del content). Lo acumulamos como un bloque `thinking` (block_index 0)
# para que el orquestador lo persista y `_to_openai_messages` lo reenvie como
# `reasoning_content` en el siguiente turno — DeepSeek lo exige en multi-turno
# con tool calls ("reasoning_content ... must be passed back to the API").
reasoning_seen = False
reasoning_sig_emitted = False
async for chunk in stream:
# With include_usage, the last chunk has usage but no choices
if chunk.usage:
# With include_usage, the last chunk has usage but no choices.
# getattr: el chunk de LiteLLM (ModelResponseStream) no siempre trae
# el atributo `usage`; el del SDK OpenAI sí (None salvo el ultimo).
chunk_usage = getattr(chunk, "usage", None)
if chunk_usage:
final_usage = {
"input_tokens": chunk.usage.prompt_tokens or 0,
"output_tokens": chunk.usage.completion_tokens or 0,
"input_tokens": getattr(chunk_usage, "prompt_tokens", 0) or 0,
"output_tokens": getattr(chunk_usage, "completion_tokens", 0) or 0,
}
choice = chunk.choices[0] if chunk.choices else None
if not choice:
# Usage-only chunk (last one with include_usage) — emit it
if final_usage:
if final_usage and not usage_emitted:
yield StreamChunk(usage=final_usage)
final_usage = {} # Only emit once
usage_emitted = True
continue
delta = choice.delta
# Reasoning content (DeepSeek thinking mode). Llega como campo extra
# del delta; lo emitimos como thinking_delta en el bloque index 0.
reasoning_txt = getattr(delta, "reasoning_content", None) if delta else None
if reasoning_txt:
reasoning_seen = True
full_reasoning += reasoning_txt
yield StreamChunk(
thinking_delta=reasoning_txt,
block_type="thinking",
block_index=0,
)
# Text content
if delta and delta.content:
yield StreamChunk(delta=delta.content)
full_content += delta.content
if not suppress_text:
# Si arranca un tool call en texto (DSML/XML), emitimos lo
# previo y dejamos de emitir el resto (el DSML no debe verse).
m = _TOOL_CALL_OPEN_RE.search(full_content, emitted_chars)
if m:
suppress_text = True
if m.start() > emitted_chars:
yield StreamChunk(delta=full_content[emitted_chars:m.start()])
emitted_chars = len(full_content)
else:
yield StreamChunk(delta=full_content[emitted_chars:])
emitted_chars = len(full_content)
# Tool calls
if delta and delta.tool_calls:
@@ -109,7 +183,31 @@ class OpenAIAdapter(ModelAdapter):
# Finish
if choice.finish_reason:
if choice.finish_reason == "tool_calls":
# Cerrar el bloque de razonamiento (si lo hubo) con un signature
# sintetico: el orquestador descarta thinking blocks sin signature
# (proteccion para MiniMax/Anthropic). DeepSeek no usa signatures;
# este marcador solo evita el descarte y NUNCA se reenvia — en
# `_to_openai_messages` el bloque se mapea a `reasoning_content`.
if reasoning_seen and not reasoning_sig_emitted:
reasoning_sig_emitted = True
yield StreamChunk(
thinking_signature="deepseek-reasoning",
block_type="thinking",
block_index=0,
)
# Fallback de usage: algunos proveedores via LiteLLM no entregan el
# chunk de usage (o llega tras el break del orquestador) → billing 0.
# Estimamos por tokens para no infra-cobrar. Solo si el adapter lo
# pide (LiteLLM); el path nativo conserva el usage real del proveedor.
if self._estimate_usage_fallback and not final_usage and not usage_emitted:
final_usage = _estimate_usage(messages, full_content + "\n" + full_reasoning)
# IMPORTANTE: DeepSeek (endpoint OpenAI) a veces cierra el stream
# con finish_reason="stop" AUNQUE haya emitido tool_calls. Si nos
# fiamos solo de =="tool_calls" perdemos esos tool calls: el agente
# anuncia la accion en texto y "se para" sin ejecutarla. Por eso
# disparamos los tool_use SIEMPRE que haya tool calls acumulados,
# sea cual sea el finish_reason.
if tool_calls_acc:
for acc in tool_calls_acc.values():
yield StreamChunk(
tool_call_id=acc["id"],
@@ -118,15 +216,33 @@ class OpenAIAdapter(ModelAdapter):
finish_reason="tool_use",
)
# Emit usage after tool_use chunks
if final_usage:
if final_usage and not usage_emitted:
yield StreamChunk(usage=final_usage)
usage_emitted = True
else:
yield StreamChunk(
finish_reason="end_turn"
if choice.finish_reason == "stop"
else choice.finish_reason,
usage=final_usage,
)
# Fallback: DeepSeek pudo emitir las tool calls como TEXTO
# (DSML/XML) en vez de nativas. Parseamos el content y, si hay
# tool calls, las ejecutamos igual; si no, cerramos el turno.
text_calls = _parse_xml_tool_calls(full_content) if full_content else []
if text_calls:
for c in text_calls:
yield StreamChunk(
tool_call_id=c["id"],
tool_name=c["name"],
tool_arguments=json.dumps(c.get("arguments", {}), ensure_ascii=False),
finish_reason="tool_use",
)
if final_usage and not usage_emitted:
yield StreamChunk(usage=final_usage)
usage_emitted = True
else:
yield StreamChunk(
finish_reason="end_turn"
if choice.finish_reason in ("stop", "tool_calls")
else choice.finish_reason,
usage=final_usage if not usage_emitted else {},
)
usage_emitted = True
# ------------------------------------------------------------------
# Non-streaming
@@ -145,10 +261,10 @@ class OpenAIAdapter(ModelAdapter):
)
kwargs: dict[str, Any] = {
"model": config.model_id or "gpt-4o",
"model": config.model_id or settings.default_model_id or "gpt-4o",
"max_tokens": config.max_tokens,
"temperature": config.temperature,
"messages": messages,
"messages": self._to_openai_messages(messages),
}
if tools:
kwargs["tools"] = self._format_tools(tools)
@@ -161,7 +277,7 @@ class OpenAIAdapter(ModelAdapter):
"function": {"name": force_tool},
}
response = await self._client.chat.completions.create(**kwargs)
response = await self._acreate(kwargs)
choice = response.choices[0]
content = choice.message.content or ""
@@ -204,19 +320,230 @@ class OpenAIAdapter(ModelAdapter):
@staticmethod
def _format_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Convert internal tool definitions to OpenAI function calling format."""
"""Convert internal tool definitions to OpenAI function calling format.
Si `deepseek_strict_tools`, marca cada funcion con `strict: true` y limpia
del schema los keywords que DeepSeek strict NO soporta (minLength/maxLength/
minItems/maxItems), que de otro modo darian 400."""
strict = settings.deepseek_strict_tools
formatted: list[dict[str, Any]] = []
for tool in tools:
formatted.append(
{
"type": "function",
"function": {
"name": tool["name"],
"description": tool.get("description", ""),
"parameters": tool.get(
"input_schema", tool.get("parameters", {"type": "object"})
),
},
}
)
params = tool.get("input_schema", tool.get("parameters", {"type": "object"}))
fn: dict[str, Any] = {
"name": tool["name"],
"description": tool.get("description", ""),
"parameters": OpenAIAdapter._sanitize_strict_schema(params) if strict else params,
}
if strict:
fn["strict"] = True
formatted.append({"type": "function", "function": fn})
return formatted
# Keywords no soportados por DeepSeek strict mode (segun docs oficiales).
_STRICT_UNSUPPORTED_KEYS = ("minLength", "maxLength", "minItems", "maxItems")
@staticmethod
def _sanitize_strict_schema(schema: Any) -> Any:
"""Elimina recursivamente keywords no soportados por DeepSeek strict."""
if isinstance(schema, dict):
return {
k: OpenAIAdapter._sanitize_strict_schema(v)
for k, v in schema.items()
if k not in OpenAIAdapter._STRICT_UNSUPPORTED_KEYS
}
if isinstance(schema, list):
return [OpenAIAdapter._sanitize_strict_schema(x) for x in schema]
return schema
@staticmethod
def _blocks_text(content: Any) -> str:
"""Extrae texto plano de un content que puede ser str o lista de bloques."""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for b in content:
if isinstance(b, dict):
parts.append(b.get("text") or b.get("content") or "")
else:
parts.append(str(b))
return "\n".join(p for p in parts if p)
return str(content)
def _to_openai_messages(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Convierte los mensajes del formato interno (Anthropic-style, con bloques
`tool_use` / `tool_result`) al formato de la API OpenAI (`tool_calls` en el
assistant, mensajes `role: tool` con `tool_call_id`). El contexto se construye
en formato Anthropic, así que sin esto la API OpenAI de DeepSeek rechaza el
body ('unknown variant tool_use')."""
out: list[dict[str, Any]] = []
for msg in messages:
role = msg.get("role")
content = msg.get("content")
if role == "system":
out.append({"role": "system", "content": content if isinstance(content, str) else self._blocks_text(content)})
continue
if not isinstance(content, list):
out.append({"role": role, "content": content if isinstance(content, str) else str(content or "")})
continue
if role == "assistant":
text_parts: list[str] = []
tool_calls: list[dict[str, Any]] = []
reasoning_parts: list[str] = []
for b in content:
if not isinstance(b, dict):
continue
t = b.get("type")
if t == "text":
text_parts.append(b.get("text", ""))
elif t == "thinking":
# DeepSeek thinking mode: el razonamiento del turno debe
# reenviarse como `reasoning_content` (no como signature).
rc = b.get("thinking", "")
if rc:
reasoning_parts.append(rc)
elif t == "tool_use":
tool_calls.append({
"id": b.get("id", ""),
"type": "function",
"function": {
"name": b.get("name", ""),
"arguments": json.dumps(b.get("input", {}), ensure_ascii=False),
},
})
text_joined = "\n".join(p for p in text_parts if p)
m: dict[str, Any] = {"role": "assistant", "content": (text_joined or None)}
if reasoning_parts:
if not text_joined and not tool_calls:
# Quirk DeepSeek thinking: a veces emite TODA la respuesta
# en reasoning_content y cierra sin content ni tool_calls.
# Reenviar content=None sin tool_calls rompe la API
# ("content or tool_calls must be set"), asi que promovemos
# el reasoning a content (sin duplicarlo como reasoning_content).
m["content"] = "\n".join(reasoning_parts)
else:
m["reasoning_content"] = "\n".join(reasoning_parts)
if tool_calls:
m["tool_calls"] = tool_calls
out.append(m)
else: # user (puede traer tool_result blocks)
text_parts = []
for b in content:
if not isinstance(b, dict):
continue
t = b.get("type")
if t == "tool_result":
out.append({
"role": "tool",
"tool_call_id": b.get("tool_use_id", ""),
"content": self._blocks_text(b.get("content")),
})
elif t == "text":
text_parts.append(b.get("text", ""))
if text_parts:
out.append({"role": "user", "content": "\n".join(text_parts)})
# Guard defensivo: el compactor ya garantiza el invariante tool_use ↔
# tool_result (`_enforce_tool_pairing`), pero si algo se escapa el
# proveedor devuelve 400 y la sesion queda bloqueada. Cinturon y tirantes.
return self._repair_tool_sequence(out)
@staticmethod
def _repair_tool_sequence(out: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Garantiza el contrato OpenAI sobre la secuencia ya convertida:
- Todo `role: tool` debe responder a un tool_call_id del assistant
inmediatamente anterior (o de su bloque contiguo de tool messages).
Si no → se convierte a user con placeholder.
- Todo assistant con `tool_calls` debe tener respuesta para CADA id.
Los tool_calls sin respuesta se eliminan; si la lista queda vacia se
elimina la key (y se asegura `content` no-None — "content or
tool_calls must be set").
No deberia activarse nunca (el compactor repara antes); si se activa,
loguea warning para detectar regresiones del compactor.
"""
repaired: list[dict[str, Any]] = []
i = 0
n = len(out)
while i < n:
msg = out[i]
role = msg.get("role")
if role == "assistant" and msg.get("tool_calls"):
# Bloque contiguo de tool messages que responden a este assistant.
j = i + 1
block: list[dict[str, Any]] = []
while j < n and out[j].get("role") == "tool":
block.append(out[j])
j += 1
answered = {t.get("tool_call_id", "") for t in block}
kept_calls = [
tc for tc in msg["tool_calls"] if tc.get("id", "") in answered
]
dropped = [
tc for tc in msg["tool_calls"] if tc.get("id", "") not in answered
]
new_msg = dict(msg)
if dropped:
for tc in dropped:
logger.warning(
"repaired unanswered tool_call at index %d (tool_call_id=%s)",
i,
tc.get("id", ""),
)
if kept_calls:
new_msg["tool_calls"] = kept_calls
else:
new_msg.pop("tool_calls", None)
if new_msg.get("content") is None:
# Promover reasoning a content si existe (mismo
# criterio que el quirk DeepSeek de arriba); si no,
# placeholder para no enviar content=None sin tools.
rc = new_msg.pop("reasoning_content", None)
new_msg["content"] = rc or "[ASSISTANT COMPACTADO]"
repaired.append(new_msg)
valid_ids = {tc.get("id", "") for tc in kept_calls}
converted: list[dict[str, Any]] = []
for t in block:
if t.get("tool_call_id", "") in valid_ids:
repaired.append(t)
else:
logger.warning(
"repaired orphan tool message (tool_call_id=%s)",
t.get("tool_call_id", ""),
)
converted.append(
{
"role": "user",
"content": "[Resultado de herramienta (contexto compactado)]: "
+ str(t.get("content", ""))[:500],
}
)
# Los huerfanos convertidos van DESPUES del bloque de tools
# validos para no romper la contiguidad assistant → tools.
repaired.extend(converted)
i = j
continue
if role == "tool":
# Tool message sin assistant con tool_calls delante → huerfano.
logger.warning(
"repaired orphan tool message at index %d (tool_call_id=%s)",
i,
msg.get("tool_call_id", ""),
)
repaired.append(
{
"role": "user",
"content": "[Resultado de herramienta (contexto compactado)]: "
+ str(msg.get("content", ""))[:500],
}
)
i += 1
continue
repaired.append(msg)
i += 1
return repaired

View File

@@ -427,9 +427,9 @@ async def _execute_and_persist(orchestrator, storage, session, message) -> dict[
async def abort_session(session_id: str) -> dict[str, Any]:
"""Cancela la ejecución en curso de una sesión (botón Stop del chat).
Cancela la tarea detached (liberando el session_lock), cierra el stream SSE
de los suscriptores y limpia un posible lock huérfano. Idempotente: si no
hay nada en curso devuelve `no_active_execution` sin error.
Cancela la tarea detached (liberando el session_lock) y cierra el stream
SSE de los suscriptores. Idempotente: si no hay nada en curso devuelve
`no_active_execution` sin error.
"""
storage = _get_storage()
session = await storage.get_session(session_id)
@@ -452,12 +452,17 @@ async def abort_session(session_id: str) -> dict[str, Any]:
except Exception as e:
logger.warning("Failed to close SSE stream on abort for %s: %s", session_id, e)
# Defensa: liberar un lock huérfano (p.ej. de una ejecución previa que crasheó
# antes de soltarlo) para no bloquear el siguiente mensaje hasta el TTL.
try:
await storage.clear_session_lock(session_id)
except Exception as e:
logger.warning("Failed to clear session lock on abort for %s: %s", session_id, e)
# Limpiar el lock SOLO si cancelamos una ejecución de verdad: el `finally`
# de la tarea cancelada puede no llegar a liberar el lock de forma fiable.
# `clear_session_lock` borra incondicional (sin conocer el token del lock),
# así que invocarlo sin cancelación confirmada borraría el lock de una
# ejecución síncrona (stream=false) aún viva — que no se registra en
# _running_executions — y permitiría una segunda ejecución concurrente.
if cancelled:
try:
await storage.clear_session_lock(session_id)
except Exception as e:
logger.warning("Failed to clear session lock on abort for %s: %s", session_id, e)
return {
"session_id": session_id,
@@ -781,22 +786,64 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
docs_data.append((doc_id, title, content, summary, tags, priority, load_when))
# Generate embeddings in batch
from ..memory.embeddings import EmbeddingService
embed_service = EmbeddingService()
embed_texts = [
f"{title}\n{summary}\n{content[:2000]}"
for _, title, content, summary, _, _, _ in docs_data
]
# Hash de contenido por doc — base del skip idempotente de embeddings.
import hashlib
try:
embeddings = await embed_service.embed_batch(embed_texts)
has_embeddings = True
logger.info("Generated %d embeddings for knowledge base", len(embeddings))
except Exception as e:
logger.warning("Failed to generate embeddings: %s — loading without semantic search", e)
embeddings = [None] * len(docs_data)
has_embeddings = False
def _embed_text(title, summary, content):
return f"{title}\n{summary}\n{content[:2000]}"
def _doc_hash(title, summary, content):
return hashlib.md5(_embed_text(title, summary, content).encode("utf-8")).hexdigest()
new_hashes = [_doc_hash(t, s, c) for _, t, c, s, _, _, _ in docs_data]
# Generate embeddings SOLO para docs nuevos o cuyo contenido cambió (skip
# idempotente): si el hash coincide con el guardado y ya existe el embedding
# en Redis, se reutiliza y NO se vuelve a llamar a la API. Esto permite que
# /knowledge/load se dispare libremente (botón de scaffold, etc.) sin re-embeber.
embeddings: list[Any] = [None] * len(docs_data)
already_embedded = [False] * len(docs_data)
has_embeddings = False
if settings.embeddings_enabled:
to_embed = [] # indices que hay que (re)embeber
for i, (doc_id, title, content, summary, _, _, _) in enumerate(docs_data):
try:
prev = await memory._r.get(memory._key("kbhash", "knowledge", doc_id))
if isinstance(prev, bytes):
prev = prev.decode("utf-8")
has_embed = await memory._r.exists(memory._key("embeddings", "knowledge", doc_id))
except Exception:
prev, has_embed = None, 0
if prev == new_hashes[i] and has_embed:
already_embedded[i] = True # sin cambios → reutiliza el embedding existente
else:
to_embed.append(i)
if to_embed:
from ..memory.embeddings import EmbeddingService
embed_service = EmbeddingService()
embed_texts = [
_embed_text(docs_data[i][1], docs_data[i][3], docs_data[i][2])
for i in to_embed
]
try:
fresh = await embed_service.embed_batch(embed_texts)
for j, i in enumerate(to_embed):
embeddings[i] = fresh[j]
has_embeddings = True
logger.info(
"Generated %d embeddings (%d sin cambios, omitidos)",
len(to_embed), len(docs_data) - len(to_embed),
)
except Exception as e:
logger.warning("Failed to generate embeddings: %s — loading without semantic search", e)
embeddings = [None] * len(docs_data)
has_embeddings = False
else:
has_embeddings = True
logger.info("Knowledge sin cambios — no se regeneraron embeddings (%d docs)", len(docs_data))
else:
logger.info("Embeddings disabled (no AGENTIC_EMBEDDINGS_API_KEY) — KB loaded without semantic search")
# Limpia entradas huérfanas: docs que ya no existen en el filesystem.
# Sin esto, los IDs antiguos (e.g. tras renombrar 'builder-fields' →
@@ -807,9 +854,10 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
for existing in existing_docs:
if existing.memory_id not in current_ids:
await memory.delete_document(existing.memory_id, namespace="knowledge")
# Borra también el embedding asociado
# Borra también el embedding asociado y el hash de contenido
embed_key = memory._key("embeddings", "knowledge", existing.memory_id)
await memory._r.delete(embed_key)
await memory._r.delete(memory._key("kbhash", "knowledge", existing.memory_id))
removed.append(existing.memory_id)
if removed:
logger.info("Removed %d stale knowledge docs: %s", len(removed), removed)
@@ -832,6 +880,11 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
if embeddings[i] is not None:
await memory.store_embedding(doc_id, embeddings[i], namespace="knowledge")
# Guarda el hash de contenido para el skip idempotente del próximo load
try:
await memory._r.set(memory._key("kbhash", "knowledge", doc_id), new_hashes[i])
except Exception:
pass
loaded.append({
"id": doc_id,
@@ -840,7 +893,7 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
"tags": tags[:5],
"priority": priority,
"load_when": load_when,
"embedded": embeddings[i] is not None,
"embedded": embeddings[i] is not None or already_embedded[i],
})
logger.info("Loaded %d knowledge documents from %s (embeddings: %s)", len(loaded), docs_dir, has_embeddings)

View File

@@ -32,6 +32,33 @@ class Settings(BaseSettings):
anthropic_base_url: str = "" # Custom base URL (for MiniMax Anthropic-compatible, etc.)
openai_api_key: str = ""
openai_base_url: str = "" # Custom base URL (for MiniMax, DeepInfra, etc.)
# --- Embeddings (semantic search) ---
# Credenciales DEDICADAS para embeddings. Necesarias porque el chat usa
# `openai_api_key` apuntando a un endpoint compatible (p.ej. DeepSeek, que NO
# tiene API de embeddings). Si vacio, cae a `openai_api_key` por compat. El
# base_url vacio => OpenAI real (api.openai.com); NO hereda `openai_base_url`.
embeddings_api_key: str = ""
embeddings_base_url: str = ""
embeddings_model: str = "text-embedding-3-small"
# Spike LiteLLM: si default_model_provider=litellm, modelo a usar (formato
# litellm, p.ej. "deepseek/deepseek-v4-pro"). Vacío → deriva de default_model_id.
litellm_model: str = ""
@property
def effective_embeddings_key(self) -> str:
"""Key a usar para embeddings. Prioriza la dedicada; reutiliza la del
chat SOLO si el chat es OpenAI real (sin `openai_base_url` custom) — si
apunta a DeepSeek u otro proveedor, esa key no sirve para embeddings."""
if self.embeddings_api_key:
return self.embeddings_api_key
if not self.openai_base_url:
return self.openai_api_key
return ""
@property
def embeddings_enabled(self) -> bool:
return bool(self.effective_embeddings_key or self.embeddings_base_url)
default_model_provider: str = "claude"
default_model_id: str = "claude-sonnet-4-20250514"
# Modelo override SOLO para el sub-loop del planner (acai_plan). Si vacio,
@@ -43,6 +70,11 @@ class Settings(BaseSettings):
planner_max_tokens: int = 16000
max_tokens: int = 4096
temperature: float = 0.3
# DeepSeek strict function calling (beta). OPT-IN (default False): exige schemas
# tipo OpenAI (additionalProperties:false, todos required, etc.) que los tools MCP
# actuales NO cumplen → da 400. Para activarlo: schemas compatibles + base_url
# https://api.deepseek.com/beta + AGENTIC_DEEPSEEK_STRICT_TOOLS=true.
deepseek_strict_tools: bool = False
# --- Context engine ---
model_context_window: int = 0 # 0 = use legacy fixed budget / explicit override
@@ -70,6 +102,10 @@ class Settings(BaseSettings):
conversation_recent_raw_limit: int = 2
task_history_max_entries: int = 20
task_history_max_tokens: int = 1500
# Presupuesto de tokens para la ventana de recent_messages persistida en
# sesion. Sin esto crece sin limite y empuja al compactor a su paso
# destructivo (colapsar bloques perdiendo tool_use ids). 0 = sin limite.
recent_messages_max_tokens: int = 60_000
# --- MCP ---
mcp_config_path: str = "" # Path to mcp.json; empty = legacy single-server mode

View File

@@ -180,7 +180,13 @@ class ContextCompactor:
"raw_tool_results_kept": 0,
}
if total <= max_tokens:
return messages, meta
# Aunque no haga falta compactar, garantizamos el invariante
# tool_use/tool_result (repara historiales ya rotos persistidos).
repaired = self._enforce_tool_pairing([dict(m) for m in messages])
meta["output_tokens"] = sum(
self._estimate_message_tokens(m) for m in repaired
)
return repaired, meta
compacted = [dict(m) for m in messages]
last_user_idx = max(
@@ -343,20 +349,241 @@ class ContextCompactor:
message["content"] = "[USER CONTEXT COMPACTADO]"
elif isinstance(content, list) and content:
# Anthropic-style: reemplazar lista entera por placeholder string.
# Nota: pierde tool_use ids — solo aplicar al final como ultimo recurso.
# Nota: colapsar pierde los tool_use/tool_result ids, asi que
# lo hacemos PAIR-AWARE (colapsar un lado del par colapsa el
# otro en la misma iteracion) y ademas `_enforce_tool_pairing`
# al final garantiza el invariante aunque algo se escape.
if role == "assistant":
message["content"] = "[ASSISTANT COMPACTADO]"
# Si este assistant tenia tool_use, colapsar tambien el
# user de tool_results que lo sigue (mismo par).
if self._blocks_have_type(content, "tool_use"):
nxt = idx + 1
if (
nxt < len(compacted)
and nxt != last_user_idx
and compacted[nxt].get("role") == "user"
and self._blocks_have_type(
compacted[nxt].get("content"), "tool_result"
)
):
compacted[nxt]["content"] = "[USER CONTEXT COMPACTADO]"
elif role == "user":
message["content"] = "[USER CONTEXT COMPACTADO]"
# Si este user llevaba tool_results, colapsar tambien el
# assistant anterior con sus tool_use (mismo par).
if self._blocks_have_type(content, "tool_result"):
prv = idx - 1
if (
prv >= 0
and compacted[prv].get("role") == "assistant"
and self._blocks_have_type(
compacted[prv].get("content"), "tool_use"
)
):
compacted[prv]["content"] = "[ASSISTANT COMPACTADO]"
else:
continue
total = sum(self._estimate_message_tokens(m) for m in compacted)
if total <= max_tokens:
break
# Invariante final: tras toda la compactacion, reparar cualquier par
# tool_use/tool_result roto. Sin esto, un tool_result huerfano se emite
# como `role: tool` sin `tool_calls` previo y el proveedor devuelve 400
# ("Messages with role 'tool' must be a response to a preceding message
# with 'tool_calls'").
compacted = self._enforce_tool_pairing(compacted)
total = sum(self._estimate_message_tokens(m) for m in compacted)
meta["output_tokens"] = total
return compacted, meta
# ------------------------------------------------------------------
# Invariante tool_use ↔ tool_result
# ------------------------------------------------------------------
@staticmethod
def _blocks_have_type(content: Any, block_type: str) -> bool:
"""True si `content` es una lista de bloques con alguno del tipo dado."""
if not isinstance(content, list):
return False
return any(
isinstance(b, dict) and b.get("type") == block_type for b in content
)
@staticmethod
def _tool_use_ids(message: dict[str, Any]) -> set[str]:
"""IDs de tool calls emitidos por un assistant (bloques `tool_use`
estilo Anthropic y/o `tool_calls` estilo OpenAI legacy)."""
ids: set[str] = set()
content = message.get("content")
if isinstance(content, list):
for b in content:
if isinstance(b, dict) and b.get("type") == "tool_use":
ids.add(str(b.get("id", "")))
for tc in message.get("tool_calls") or []:
if isinstance(tc, dict):
ids.add(str(tc.get("id", "")))
ids.discard("")
return ids
def _enforce_tool_pairing(
self, messages: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Repara el invariante tool_use ↔ tool_result en ambas direcciones.
La compactacion puede colapsar el content de un assistant (perdiendo sus
bloques `tool_use`) mientras el user siguiente conserva sus `tool_result`,
o al reves. El matching es por IDs (`tool_use.id` vs `tool_result.tool_use_id`
y `tool_calls[].id` vs `tool_call_id`), no solo por adyacencia, asi que
tambien repara desajustes parciales (p.ej. 3 tool_use vs 2 tool_result).
- tool_result sin tool_use previo → bloque text placeholder.
- tool_use sin tool_result siguiente → se elimina el bloque (thinking/text
se conservan; si el content queda vacio, placeholder string).
- `role: tool` legacy sin assistant con `tool_calls` → user placeholder.
"""
repaired: list[dict[str, Any]] = []
for idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content")
if role == "assistant":
tool_ids = self._tool_use_ids(msg)
if not tool_ids:
repaired.append(msg)
continue
# IDs respondidos: user con tool_results inmediato y/o run
# contiguo de mensajes legacy `role: tool`.
answered: set[str] = set()
j = idx + 1
if (
j < len(messages)
and messages[j].get("role") == "user"
and isinstance(messages[j].get("content"), list)
):
for b in messages[j]["content"]:
if isinstance(b, dict) and b.get("type") == "tool_result":
answered.add(str(b.get("tool_use_id", "")))
j += 1
while j < len(messages) and messages[j].get("role") == "tool":
answered.add(str(messages[j].get("tool_call_id", "")))
j += 1
unanswered = tool_ids - answered
if not unanswered:
repaired.append(msg)
continue
# Eliminar los tool_use/tool_calls sin respuesta.
new_msg = dict(msg)
if isinstance(content, list):
new_content = [
b
for b in content
if not (
isinstance(b, dict)
and b.get("type") == "tool_use"
and str(b.get("id", "")) in unanswered
)
]
if not new_content:
new_msg["content"] = "[ASSISTANT COMPACTADO]"
else:
new_msg["content"] = new_content
if isinstance(new_msg.get("tool_calls"), list):
kept_calls = [
tc
for tc in new_msg["tool_calls"]
if isinstance(tc, dict)
and str(tc.get("id", "")) not in unanswered
]
if kept_calls:
new_msg["tool_calls"] = kept_calls
else:
new_msg.pop("tool_calls", None)
if not new_msg.get("content"):
new_msg["content"] = "[ASSISTANT COMPACTADO]"
repaired.append(new_msg)
continue
if role == "user" and self._blocks_have_type(content, "tool_result"):
# IDs disponibles en el assistant inmediatamente anterior
# (YA reparado — usar `repaired[-1]` refleja los tool_use que
# sobrevivieron, no los del mensaje original).
available: set[str] = set()
if repaired and repaired[-1].get("role") == "assistant":
available = self._tool_use_ids(repaired[-1])
new_content: list[Any] = []
orphaned = False
for b in content:
if (
isinstance(b, dict)
and b.get("type") == "tool_result"
and str(b.get("tool_use_id", "")) not in available
):
orphaned = True
# Fusionar placeholders consecutivos en un unico bloque text.
if not (
new_content
and isinstance(new_content[-1], dict)
and new_content[-1].get("type") == "text"
and new_content[-1].get("text")
== "[Resultado de herramienta compactado]"
):
new_content.append(
{
"type": "text",
"text": "[Resultado de herramienta compactado]",
}
)
continue
new_content.append(b)
if not orphaned:
repaired.append(msg)
continue
new_msg = dict(msg)
only_placeholders = all(
isinstance(b, dict)
and b.get("type") == "text"
and b.get("text") == "[Resultado de herramienta compactado]"
for b in new_content
)
if not new_content or only_placeholders:
new_msg["content"] = "[Resultado de herramienta compactado]"
else:
new_msg["content"] = new_content
repaired.append(new_msg)
continue
if role == "tool":
# Legacy: el assistant anterior (saltando otros `role: tool`
# contiguos) debe tener este tool_call_id en sus tool_calls.
prev_assistant: dict[str, Any] | None = None
for prev in reversed(repaired):
if prev.get("role") == "tool":
continue
if prev.get("role") == "assistant":
prev_assistant = prev
break
call_id = str(msg.get("tool_call_id", ""))
valid = (
prev_assistant is not None
and call_id in self._tool_use_ids(prev_assistant)
)
if valid:
repaired.append(msg)
else:
repaired.append(
{
"role": "user",
"content": "[Resultado de herramienta compactado]",
}
)
continue
repaired.append(msg)
return repaired
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------

View File

@@ -583,6 +583,16 @@ class ContextEngine:
async def _semantic_rank(self, query: str) -> list[tuple[str, float]]:
"""Rank knowledge docs by cosine similarity. Returns (doc_id, score)."""
# Sin credencial de embeddings no tiene sentido intentar la llamada (daria
# 401 en cada turno). Se desactiva limpiamente con un aviso unico.
if not settings.embeddings_enabled:
if not getattr(self, "_embed_disabled_warned", False):
logger.warning(
"Embeddings disabled (no AGENTIC_EMBEDDINGS_API_KEY) — "
"semantic search off, loading all docs"
)
self._embed_disabled_warned = True
return []
try:
if not self._embed_service:
self._embed_service = EmbeddingService()
@@ -936,7 +946,22 @@ class ContextEngine:
else:
base_user_content = "Awaiting task assignment."
followup_mode = self._classify_followup_mode(base_user_content)
# Un follow-up (transform/fetch_more/ambiguous) SOLO tiene sentido si hay
# un turno anterior al que referirse. En una sesión fresca / primer mensaje
# no hay nada que transformar, así que NO clasificamos: de lo contrario un
# primer prompt que casualmente contenga un marker ("resumen", "estructura",
# "busca", "adapta"…) se marcaría como `transform` y `_get_allowed_tools`
# devolvería [] — el agente se quedaría SIN tools y emitiría los tool calls
# como texto sin ejecutarlos (caso real: el prompt de análisis de estilos
# que dice "Guarda un resumen…").
has_prior_turn = bool(session.task_history) or bool(
getattr(session, "recent_messages", [])
)
followup_mode = (
self._classify_followup_mode(base_user_content)
if has_prior_turn
else "none"
)
resolved_context = ""
if session.task_history and followup_mode != "none":
resolved_context = self._build_followup_resolution(session.task_history[-1])

View File

@@ -54,7 +54,11 @@ async def lifespan(app: FastAPI):
await redis_storage.connect()
# 2. Initialize model adapter
if settings.default_model_provider == "openai":
if settings.default_model_provider == "litellm":
from .adapters.litellm_adapter import LiteLLMAdapter
model_adapter = LiteLLMAdapter()
logger.info("Using LiteLLM adapter (model: %s)", settings.litellm_model or settings.default_model_id)
elif settings.default_model_provider == "openai":
model_adapter = OpenAIAdapter()
logger.info("Using OpenAI adapter (model: %s)", settings.default_model_id)
else:

View File

@@ -18,6 +18,15 @@ from ..models.tools import ToolDefinition
logger = logging.getLogger(__name__)
# Buffer maximo (bytes) del StreamReader para leer las respuestas JSON-RPC del
# MCP por stdout. Una respuesta llega en UNA sola linea; tools como el
# screenshot fullPage de Playwright devuelven la imagen en base64 en esa linea
# y superan de largo el 64KB por defecto de asyncio (y el 1MB que teniamos),
# lanzando LimitOverrunError que mataba el read loop y dejaba la sesion MCP
# inservible (el agente "se paraba" al hacer acciones). 64MB cubre cualquier
# screenshot real; por encima, el read loop descarta esa respuesta y sigue vivo.
MCP_STREAM_LIMIT = 64 * 1024 * 1024
class MCPClientError(Exception):
pass
@@ -74,7 +83,7 @@ class MCPClient:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
limit=1024 * 1024, # 1MB buffer for large MCP responses
limit=MCP_STREAM_LIMIT, # buffer grande para respuestas MCP (screenshots base64)
)
self._running = True
self._reader_task = asyncio.create_task(self._read_loop())
@@ -225,14 +234,30 @@ class MCPClient:
if not self._process or not self._process.stdout:
return
stdout = self._process.stdout
try:
while self._running:
line = await self._process.stdout.readline()
try:
line = await stdout.readline()
except (ValueError, asyncio.LimitOverrunError):
# Una respuesta JSON-RPC supero el buffer (p.ej. screenshot
# fullPage de Playwright en base64 por encima de 64MB). Antes
# esto mataba el read loop y dejaba TODA la sesion MCP muerta
# (el agente se "paraba" en la siguiente accion). Ahora
# descartamos solo esa respuesta, re-sincronizamos el stream
# y seguimos vivos para las demas tools.
logger.warning(
"MCP [%s]: respuesta supera el buffer (%d MB), se descarta y se continua",
self.name, MCP_STREAM_LIMIT // (1024 * 1024),
)
await self._drain_until_newline(stdout)
continue
if not line:
logger.warning("MCP server stdout closed")
break
line_str = line.decode().strip()
line_str = line.decode(errors="replace").strip()
if not line_str:
continue
@@ -251,6 +276,21 @@ class MCPClient:
finally:
self._running = False
async def _drain_until_newline(self, stdout: asyncio.StreamReader) -> None:
"""Consume bytes del stream hasta el proximo salto de linea para
re-sincronizar tras un LimitOverrunError (la respuesta sobredimensionada
se descarta). `read()` no usa separador, asi que no vuelve a disparar el
overrun y va vaciando el buffer hasta liberar la linea gigante."""
while self._running:
try:
chunk = await stdout.read(65536)
except Exception:
return
if not chunk:
return
if b"\n" in chunk:
return
def _handle_message(self, message: dict[str, Any]) -> None:
"""Route an incoming JSON-RPC message."""
msg_id = message.get("id")

View File

@@ -25,12 +25,19 @@ class EmbeddingService:
def __init__(
self,
api_key: str | None = None,
model: str = DEFAULT_MODEL,
model: str | None = None,
) -> None:
self._client = AsyncOpenAI(
api_key=api_key or settings.openai_api_key,
)
self._model = model
# Credenciales dedicadas de embeddings. Fallback a openai_api_key por
# compat. El base_url solo se aplica si se configura explicitamente
# `embeddings_base_url`; vacio => OpenAI real (api.openai.com). NO se
# hereda `openai_base_url` (que apunta al chat, p.ej. DeepSeek sin
# endpoint de embeddings).
key = api_key or settings.effective_embeddings_key
kwargs: dict[str, Any] = {"api_key": key}
if settings.embeddings_base_url:
kwargs["base_url"] = settings.embeddings_base_url
self._client = AsyncOpenAI(**kwargs)
self._model = model or settings.embeddings_model or DEFAULT_MODEL
async def embed(self, text: str) -> list[float]:
"""Generate embedding for a single text."""

View File

@@ -289,6 +289,34 @@ class BaseAgent:
# If no tool calls, we're done
if not tool_calls:
# Quirk DeepSeek thinking: a veces el modelo emite TODA su
# respuesta como reasoning y cierra el turno sin text ni
# tool_use. Si el turno termina SOLO con bloques thinking,
# promovemos el thinking a un bloque text en el snapshot que
# se persiste — asi el UI no lo muestra como "pensando" al
# recargar y el siguiente turno no rompe con
# "content or tool_calls must be set".
if turn_blocks and all(b.get("type") == "thinking" for b in turn_blocks):
promoted = "\n".join(
b.get("thinking", "") for b in turn_blocks if b.get("thinking")
)
turn_blocks = [{"type": "text", "text": promoted}]
accumulated_content += promoted
if promoted and self.profile.stream_deltas:
# Emision en vivo via AGENT_DELTA normal: el
# ClaudeFormatEmitter cierra el thinking block abierto
# (content_block_stop) y abre un text block nuevo con
# su propio indice (start/delta/stop), asi que el
# protocolo de bloques no se rompe.
await self.sse.emit(
EventType.AGENT_DELTA,
{
"agent": self.profile.role,
"delta": promoted,
"step": step,
},
session_id=session.session_id,
)
if turn_blocks:
conversation.append({"role": "assistant", "content": turn_blocks})
elif full_text:

View File

@@ -14,7 +14,7 @@ from typing import Any
from ..adapters.base import ModelAdapter
from ..config import settings
from ..context.engine import ContextEngine
from ..context.compactor import estimate_tokens
from ..context.compactor import ContextCompactor, estimate_tokens
from ..mcp.manager import MCPManager
from ..memory.store import MemoryStore
from ..models.agent import AgentProfile
@@ -260,7 +260,76 @@ class OrchestratorEngine:
current_turn.append(sanitized)
merged.extend(current_turn)
return merged
return OrchestratorEngine._trim_recent_messages(merged)
@staticmethod
def _trim_recent_messages(
messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Recorta recent_messages a un presupuesto de tokens eliminando
mensajes ENTEROS desde el principio (los mas antiguos).
Dos reglas para no romper el invariante tool_use ↔ tool_result:
- Nunca cortar dentro de un par: si se elimina un assistant con
tool_use, se eliminan tambien sus tool_results (user carrier o run
de mensajes legacy `role: tool`).
- El primer mensaje resultante nunca puede ser un carrier de
tool_result ni un `role: tool`.
Mantiene siempre al menos los ultimos 4 mensajes aunque excedan el
presupuesto.
"""
budget = settings.recent_messages_max_tokens
if budget <= 0 or not messages:
return messages
estimate = ContextCompactor._estimate_message_tokens
total = sum(estimate(m) for m in messages)
if total <= budget:
return messages
def _is_tool_result_carrier(msg: dict[str, Any]) -> bool:
if msg.get("role") == "tool":
return True
if msg.get("role") != "user":
return False
content = msg.get("content")
return isinstance(content, list) and any(
isinstance(b, dict) and b.get("type") == "tool_result"
for b in content
)
def _has_tool_use(msg: dict[str, Any]) -> bool:
if msg.get("role") != "assistant":
return False
if msg.get("tool_calls"):
return True
content = msg.get("content")
return isinstance(content, list) and any(
isinstance(b, dict) and b.get("type") == "tool_use"
for b in content
)
min_keep = 4
n = len(messages)
start = 0
while total > budget and start < n - min_keep:
end = start + 1
if _has_tool_use(messages[start]):
# Arrastrar los tool_results del par (no cortar dentro de el).
while end < n and _is_tool_result_carrier(messages[end]):
end += 1
if n - end < min_keep:
break # Eliminar el par completo invadiria los ultimos min_keep
for k in range(start, end):
total -= estimate(messages[k])
start = end
trimmed = messages[start:]
# El primer mensaje nunca puede ser un tool_result sin su tool_use.
while trimmed and _is_tool_result_carrier(trimmed[0]):
trimmed.pop(0)
return trimmed
@staticmethod
def _sanitize_recent_message(message: dict[str, Any]) -> dict[str, Any]:

View File

@@ -12,6 +12,7 @@ from __future__ import annotations
import json
import logging
import uuid
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator
@@ -127,14 +128,26 @@ class RedisStorage:
# Execution lock (prevents concurrent messages on same session)
# ------------------------------------------------------------------
# Compare-and-delete atómico: solo borra el lock si el valor coincide con
# el token de quien lo adquirió. Evita que una ejecución cuyo lock expiró
# por TTL borre en su `finally` el lock que ya adquirió otra petición.
_UNLOCK_LUA = (
"if redis.call('get', KEYS[1]) == ARGV[1] then "
"return redis.call('del', KEYS[1]) else return 0 end"
)
@asynccontextmanager
async def session_lock(
self, session_id: str, timeout: int = 300
self, session_id: str, timeout: int | None = None
) -> AsyncIterator[bool]:
"""Acquire an exclusive execution lock for a session.
Uses SETNX with auto-expiry to prevent deadlocks if the process
crashes mid-execution.
crashes mid-execution. El TTL es mayor que el timeout global de
ejecución para que el lock no expire (y otra petición lo robe)
mientras la ejecución original sigue viva. Cada adquisición guarda
un token único como valor y la liberación es compare-and-delete
(Lua), de modo que solo el dueño puede borrar el lock.
Usage:
async with storage.session_lock(session_id) as acquired:
@@ -142,20 +155,34 @@ class RedisStorage:
raise HTTPException(409, "Session busy")
# ... execute ...
"""
if timeout is None:
timeout = int(settings.max_execution_timeout_seconds) + 60
key = self._key("session", session_id, "lock")
acquired = await self.client.set(key, "1", nx=True, ex=timeout)
token = uuid.uuid4().hex
acquired = await self.client.set(key, token, nx=True, ex=timeout)
try:
yield bool(acquired)
finally:
if acquired:
await self.client.delete(key)
released = await self.client.eval(self._UNLOCK_LUA, 1, key, token)
if not released:
# El lock expiró por TTL y/o lo posee otra petición — no
# tocamos nada, pero lo dejamos registrado.
logger.warning(
"session_lock for %s no longer owned at release "
"(expired or taken over)",
session_id,
)
async def clear_session_lock(self, session_id: str) -> None:
"""Borra el lock de ejecución de una sesión de forma incondicional.
Usado por el endpoint de abort para liberar un lock huérfano (de una
ejecución previa que crasheó antes de soltarlo) y no bloquear el
siguiente mensaje hasta que expire el TTL.
OJO: borra sin conocer el token del dueño, así que se salta el
compare-and-delete de `session_lock`. SOLO debe invocarse cuando se
ha confirmado que la ejecución dueña del lock fue cancelada (ver
`abort_session` en routes.py): la tarea cancelada puede no ejecutar
su `finally` de liberación de forma fiable, y en ese caso no hay
riesgo de borrar el lock de una ejecución viva.
"""
key = self._key("session", session_id, "lock")
await self.client.delete(key)

View File

@@ -19,6 +19,71 @@ from .sse import EventType, SSEEmitter
logger = logging.getLogger(__name__)
_GENERIC_ERROR = (
"Ha ocurrido un error procesando tu mensaje. Vuelve a intentarlo en unos momentos."
)
# Patrones que el frontend interpreta por sí mismo (login / sesión expirada).
# No los genericamos para no romper esas detecciones.
_PASSTHROUGH_PATTERNS = (
"not logged in",
"login required",
"authentication required",
"no conversation found",
)
def friendly_error_message(raw: str, code: str = "") -> str:
"""Traduce un error crudo (proveedor/excepción) a un mensaje genérico y
localizado para el usuario final, sin filtrar detalles internos.
Devuelve el texto original sin tocar para los casos de auth/sesión que el
frontend ya gestiona por contenido.
"""
raw = raw or ""
text = "{} {}".format(code or "", raw).lower()
# Auth / sesión: dejar pasar el texto original (lo maneja el frontend)
if any(p in text for p in _PASSTHROUGH_PATTERNS):
return raw
# Timeout de ejecución
if "timeout" in text or "timed out" in text:
return (
"La tarea tardó demasiado en completarse. Prueba a dividirla en "
"pasos más pequeños o vuelve a intentarlo."
)
# Saldo insuficiente / facturación del proveedor (402)
if (
"402" in text
or "insufficient balance" in text
or "insufficient_quota" in text
or "billing" in text
):
return (
"El asistente no está disponible en este momento. Inténtalo de "
"nuevo en unos minutos."
)
# Credenciales del proveedor inválidas (401)
if (
"401" in text
or "invalid_api_key" in text
or "incorrect api key" in text
or "invalid api key" in text
):
return (
"El asistente no está disponible temporalmente por un problema de "
"configuración. Estamos trabajando en ello."
)
# Límite de peticiones (429)
if "429" in text or "rate limit" in text or "rate_limit" in text:
return (
"Hay mucha demanda en este momento. Espera unos segundos y vuelve "
"a intentarlo."
)
return _GENERIC_ERROR
class ClaudeFormatEmitter:
"""Emits events in Claude Code CLI SSE format.
@@ -304,7 +369,10 @@ class ClaudeFormatEmitter:
self._push(session_id, {"type": "done"})
elif event_type == EventType.ERROR:
error_msg = data.get("message", str(data.get("error", "Unknown error")))
raw_msg = data.get("message", str(data.get("error", "Unknown error")))
user_msg = friendly_error_message(raw_msg, str(data.get("error", "")))
# El error real (detalles del proveedor) solo va al log, nunca al cliente.
logger.warning("Session %s error (raw): %s", session_id, raw_msg)
# Close any open block
self._close_text_block(session_id)
@@ -312,7 +380,7 @@ class ClaudeFormatEmitter:
self._push(session_id, {
"type": "result",
"is_error": True,
"result": error_msg,
"result": user_msg,
"usage": {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0},
"total_cost_usd": 0,
})

View File

@@ -294,11 +294,27 @@ class TestTaskHistoryTrim:
class TestConversationCompaction:
def test_compactor_preserves_last_user_and_compacts_old_tool_results(self):
compactor = ContextCompactor(max_tokens=999999)
# Los assistants llevan sus tool_calls: sin ellos los `role: tool`
# serian huerfanos y `_enforce_tool_pairing` los convertiria a user.
messages = [
{"role": "user", "content": "Contexto anterior " * 10},
{"role": "assistant", "content": "Voy a revisar el modulo ahora mismo. " * 6},
{
"role": "assistant",
"content": "Voy a revisar el modulo ahora mismo. " * 6,
"tool_calls": [
{"id": "tool-1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "tool-1", "content": "resultado antiguo\n" * 80},
{"role": "assistant", "content": "He visto el resultado anterior. " * 6},
{
"role": "assistant",
"content": "He visto el resultado anterior. " * 6,
"tool_calls": [
{"id": "tool-2", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "tool-2", "content": "resultado reciente\n" * 80},
{"role": "user", "content": "Este es el ultimo mensaje del usuario y debe quedar intacto."},
]
@@ -358,9 +374,18 @@ class TestConversationCompaction:
def test_compactor_only_touches_user_messages_as_last_resort(self):
compactor = ContextCompactor(max_tokens=999999)
# tool_calls en el assistant para que el `role: tool` no sea huerfano
# (el invariante `_enforce_tool_pairing` convertiria un huerfano a user).
messages = [
{"role": "user", "content": "Contexto previo del usuario " * 8},
{"role": "assistant", "content": "Respuesta previa del asistente " * 6},
{
"role": "assistant",
"content": "Respuesta previa del asistente " * 6,
"tool_calls": [
{"id": "tool-1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "tool-1", "content": "resultado viejo\n" * 80},
{"role": "user", "content": "Ultimo mensaje del usuario"},
]

View File

@@ -0,0 +1,585 @@
"""Tests de REGRESION REAL del invariante tool_use ↔ tool_result.
A diferencia del resto de tests (que replican logica), este archivo importa el
codigo REAL de src/. Cubre el bug de produccion: sesiones largas (~130k tokens)
donde `compact_conversation` colapsaba assistants a "[ASSISTANT COMPACTADO]"
perdiendo los bloques `tool_use`, dejando tool_results huerfanos que el adapter
emitia como `role: tool` sin `tool_calls` → 400 del proveedor en cada reintento.
Requiere las dependencias de src/ (pydantic, Python 3.11+). Si no estan
disponibles (p.ej. host con Python 3.10), el modulo entero se salta — ejecutar
dentro del container: `docker exec acai-agentic python3 -m pytest ...`.
"""
import pytest
try:
from src.context.compactor import ContextCompactor
except Exception as e: # pragma: no cover - entorno sin deps de src/
pytest.skip(f"src/ no importable en este entorno: {e}", allow_module_level=True)
# =====================================================================
# Helper de validacion reutilizable
# =====================================================================
def collect_tool_use_ids(message: dict) -> set:
"""IDs de tool calls de un assistant (Anthropic blocks + OpenAI legacy)."""
ids = set()
content = message.get("content")
if isinstance(content, list):
for b in content:
if isinstance(b, dict) and b.get("type") == "tool_use":
ids.add(str(b.get("id", "")))
for tc in message.get("tool_calls") or []:
if isinstance(tc, dict):
ids.add(str(tc.get("id", "")))
ids.discard("")
return ids
def assert_tool_pairing_ok(messages: list) -> None:
"""Valida el invariante completo sobre una lista de mensajes internos:
- Todo tool_result (block) referencia un tool_use del assistant anterior.
- Todo tool_use (block) tiene su tool_result en el mensaje siguiente.
- Todo `role: tool` legacy responde a un tool_call del assistant previo.
"""
for i, msg in enumerate(messages):
role = msg.get("role")
content = msg.get("content")
if role == "user" and isinstance(content, list):
result_ids = {
str(b.get("tool_use_id", ""))
for b in content
if isinstance(b, dict) and b.get("type") == "tool_result"
}
if result_ids:
assert i > 0, f"msg[{i}]: tool_result al inicio de la conversacion"
prev = messages[i - 1]
assert prev.get("role") == "assistant", (
f"msg[{i}]: tool_result sin assistant inmediatamente anterior"
)
available = collect_tool_use_ids(prev)
orphans = result_ids - available
assert not orphans, (
f"msg[{i}]: tool_result huerfanos {orphans} "
f"(assistant previo solo tiene {available})"
)
if role == "assistant":
tool_ids = collect_tool_use_ids(msg)
if tool_ids:
answered = set()
j = i + 1
if (
j < len(messages)
and messages[j].get("role") == "user"
and isinstance(messages[j].get("content"), list)
):
for b in messages[j]["content"]:
if isinstance(b, dict) and b.get("type") == "tool_result":
answered.add(str(b.get("tool_use_id", "")))
j += 1
while j < len(messages) and messages[j].get("role") == "tool":
answered.add(str(messages[j].get("tool_call_id", "")))
j += 1
unanswered = tool_ids - answered
assert not unanswered, (
f"msg[{i}]: tool_use sin respuesta {unanswered}"
)
if role == "tool":
prev_assistant = None
for k in range(i - 1, -1, -1):
if messages[k].get("role") == "tool":
continue
if messages[k].get("role") == "assistant":
prev_assistant = messages[k]
break
assert prev_assistant is not None, (
f"msg[{i}]: role tool sin assistant previo"
)
call_id = str(msg.get("tool_call_id", ""))
assert call_id in collect_tool_use_ids(prev_assistant), (
f"msg[{i}]: role tool con tool_call_id={call_id} no presente "
f"en el assistant previo"
)
def make_turn(n: int, payload_chars: int = 4000) -> list:
"""Genera un turno completo: user → assistant(thinking+text+tool_use) →
user(tool_result). Payloads grandes para forzar la compactacion."""
tid = f"call_{n}"
return [
{"role": "user", "content": f"Peticion {n}: " + ("x" * payload_chars)},
{
"role": "assistant",
"content": [
{"type": "thinking", "thinking": "razonando " * (payload_chars // 10)},
{"type": "text", "text": f"Voy a ejecutar la tool del turno {n}."},
{
"type": "tool_use",
"id": tid,
"name": "acai_get_records",
"input": {"tableName": f"tabla_{n}"},
},
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tid,
"content": "resultado " * (payload_chars // 10),
}
],
},
]
# =====================================================================
# (a) compact_conversation end-to-end: el paso de ultimo recurso ya no
# deja tool_results huerfanos ni tool_use sin respuesta
# =====================================================================
class TestCompactConversationPairing:
def test_last_resort_does_not_orphan_tool_results(self):
compactor = ContextCompactor()
messages = []
for n in range(12):
messages.extend(make_turn(n, payload_chars=6000))
messages.append({"role": "user", "content": "ultima peticion del usuario"})
# Presupuesto minusculo: fuerza TODOS los pasos incluida la colapsa
# de listas a placeholder string (el paso que causaba el bug).
compacted, meta = compactor.compact_conversation(messages, max_tokens=300)
assert meta["output_tokens"] < meta["input_tokens"]
assert_tool_pairing_ok(compacted)
def test_moderate_budget_keeps_pairing(self):
compactor = ContextCompactor()
messages = []
for n in range(8):
messages.extend(make_turn(n, payload_chars=3000))
messages.append({"role": "user", "content": "peticion final"})
compacted, _ = compactor.compact_conversation(messages, max_tokens=2000)
assert_tool_pairing_ok(compacted)
def test_under_budget_passthrough_keeps_pairing(self):
compactor = ContextCompactor()
messages = make_turn(1, payload_chars=50)
compacted, meta = compactor.compact_conversation(messages, max_tokens=100_000)
assert meta["messages_compacted"] == 0
assert_tool_pairing_ok(compacted)
# Los tool_use/tool_result originales se conservan intactos
assert collect_tool_use_ids(compacted[1]) == {"call_1"}
def test_last_user_message_preserved(self):
compactor = ContextCompactor()
messages = []
for n in range(10):
messages.extend(make_turn(n, payload_chars=5000))
final = "esta es la peticion actual que NO debe perderse"
messages.append({"role": "user", "content": final})
compacted, _ = compactor.compact_conversation(messages, max_tokens=300)
assert compacted[-1]["content"] == final
# =====================================================================
# (b) _enforce_tool_pairing directo
# =====================================================================
class TestEnforceToolPairing:
def setup_method(self):
self.compactor = ContextCompactor()
def test_collapsed_assistant_with_orphan_tool_results(self):
"""Assistant colapsado a string + user con tool_results → los
tool_result se convierten en placeholder."""
messages = [
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "call_a", "content": "datos"},
{"type": "tool_result", "tool_use_id": "call_b", "content": "mas datos"},
],
},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
# Solo placeholders → content string (fusionados en uno)
assert repaired[1]["role"] == "user"
assert repaired[1]["content"] == "[Resultado de herramienta compactado]"
def test_orphan_tool_results_mixed_with_text(self):
"""tool_result huerfano junto a un bloque text → placeholder en lista,
el text se conserva."""
messages = [
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "call_a", "content": "datos"},
{"type": "text", "text": "y ademas haz esto"},
],
},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
content = repaired[1]["content"]
assert isinstance(content, list)
types = [b.get("type") for b in content]
assert types == ["text", "text"]
assert content[0]["text"] == "[Resultado de herramienta compactado]"
assert content[1]["text"] == "y ademas haz esto"
def test_partial_id_mismatch_drops_unanswered_tool_use(self):
"""Assistant con 3 tool_use, user con solo 2 tool_result → se elimina
el tool_use sin respuesta, thinking/text intactos."""
messages = [
{
"role": "assistant",
"content": [
{"type": "thinking", "thinking": "pensando"},
{"type": "text", "text": "ejecuto tres tools"},
{"type": "tool_use", "id": "c1", "name": "t1", "input": {}},
{"type": "tool_use", "id": "c2", "name": "t2", "input": {}},
{"type": "tool_use", "id": "c3", "name": "t3", "input": {}},
],
},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "c1", "content": "r1"},
{"type": "tool_result", "tool_use_id": "c3", "content": "r3"},
],
},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
assert collect_tool_use_ids(repaired[0]) == {"c1", "c3"}
types = [b.get("type") for b in repaired[0]["content"]]
assert "thinking" in types and "text" in types
def test_assistant_tool_use_with_no_results_at_all(self):
"""Assistant con tool_use y SIN user de resultados detras → se
eliminan los tool_use; si el content queda vacio, placeholder."""
messages = [
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "c9", "name": "t", "input": {}},
],
},
{"role": "user", "content": "otra cosa"},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
assert repaired[0]["content"] == "[ASSISTANT COMPACTADO]"
def test_legacy_orphan_role_tool_converted_to_user(self):
"""role:tool legacy cuyo assistant anterior no tiene tool_calls →
se convierte a user placeholder."""
messages = [
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
{"role": "tool", "tool_call_id": "call_x", "content": "salida tool"},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
assert repaired[1]["role"] == "user"
assert repaired[1]["content"] == "[Resultado de herramienta compactado]"
def test_legacy_valid_role_tool_untouched(self):
messages = [
{
"role": "assistant",
"content": "lanzo tool",
"tool_calls": [
{"id": "call_x", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "call_x", "content": "salida"},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
assert repaired[1]["role"] == "tool"
def test_well_paired_history_is_noop(self):
messages = make_turn(7, payload_chars=50)
repaired = self.compactor._enforce_tool_pairing(messages)
assert repaired == messages
# =====================================================================
# (c) Trim de recent_messages (OrchestratorEngine._trim_recent_messages)
# =====================================================================
orchestrator_engine = pytest.importorskip(
"src.orchestrator.engine",
reason="deps del orquestador (mcp, sse, redis) no disponibles",
)
OrchestratorEngine = orchestrator_engine.OrchestratorEngine
class TestTrimRecentMessages:
def _set_budget(self, monkeypatch, tokens: int):
from src.config import settings
monkeypatch.setattr(settings, "recent_messages_max_tokens", tokens)
def test_under_budget_untouched(self, monkeypatch):
self._set_budget(monkeypatch, 100_000)
messages = make_turn(0, payload_chars=100)
assert OrchestratorEngine._trim_recent_messages(list(messages)) == messages
def test_trims_oldest_whole_pairs(self, monkeypatch):
self._set_budget(monkeypatch, 500)
messages = []
for n in range(10):
messages.extend(make_turn(n, payload_chars=1000))
trimmed = OrchestratorEngine._trim_recent_messages(messages)
assert len(trimmed) < len(messages)
# Nunca se corta dentro de un par
assert_tool_pairing_ok(trimmed)
# El primer mensaje nunca es un carrier de tool_result ni role tool
first = trimmed[0]
assert first.get("role") != "tool"
if isinstance(first.get("content"), list):
assert not any(
isinstance(b, dict) and b.get("type") == "tool_result"
for b in first["content"]
)
# Se eliminan los mas antiguos: el final se conserva
assert trimmed[-1] == messages[-1]
def test_keeps_last_four_even_over_budget(self, monkeypatch):
self._set_budget(monkeypatch, 10) # presupuesto imposible
messages = []
for n in range(5):
messages.extend(make_turn(n, payload_chars=2000))
trimmed = OrchestratorEngine._trim_recent_messages(messages)
assert len(trimmed) >= 4
def test_pair_dragging_includes_legacy_tool_run(self, monkeypatch):
"""Un assistant legacy con tool_calls arrastra su run de role:tool."""
self._set_budget(monkeypatch, 300)
big = "y" * 3000
messages = [
{
"role": "assistant",
"content": big,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
{"id": "c2", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "c1", "content": big},
{"role": "tool", "tool_call_id": "c2", "content": big},
{"role": "user", "content": "pregunta"},
{"role": "assistant", "content": "respuesta"},
{"role": "user", "content": "otra pregunta"},
{"role": "assistant", "content": "otra respuesta"},
]
trimmed = OrchestratorEngine._trim_recent_messages(messages)
# El par legacy entero (assistant + 2 tools) se elimino junto
assert trimmed[0] == {"role": "user", "content": "pregunta"}
assert_tool_pairing_ok(trimmed)
def test_append_recent_messages_applies_trim(self, monkeypatch):
self._set_budget(monkeypatch, 500)
existing = []
for n in range(10):
existing.extend(make_turn(n, payload_chars=1000))
merged = OrchestratorEngine._append_recent_messages(
existing, message="nueva peticion", conversation=[
{"role": "assistant", "content": "ok hecho"},
],
)
assert len(merged) < len(existing) + 2
assert merged[-1] == {"role": "assistant", "content": "ok hecho"}
assert_tool_pairing_ok(merged)
# =====================================================================
# (d) Guard defensivo del adapter (_repair_tool_sequence)
# =====================================================================
openai_mod = pytest.importorskip("openai", reason="SDK openai no instalado")
class TestRepairToolSequence:
@property
def repair(self):
from src.adapters.openai_adapter import OpenAIAdapter
return OpenAIAdapter._repair_tool_sequence
def test_valid_sequence_untouched(self):
msgs = [
{"role": "system", "content": "sys"},
{"role": "user", "content": "hola"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "c1", "content": "resultado"},
{"role": "assistant", "content": "listo"},
]
assert self.repair(list(msgs)) == msgs
def test_orphan_tool_message_converted_to_user(self):
msgs = [
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
{"role": "tool", "tool_call_id": "c_orphan", "content": "datos " * 200},
]
out = self.repair(msgs)
assert out[1]["role"] == "user"
assert out[1]["content"].startswith(
"[Resultado de herramienta (contexto compactado)]: "
)
# Content truncado a 500 chars (+ prefijo)
assert len(out[1]["content"]) <= 500 + len(
"[Resultado de herramienta (contexto compactado)]: "
)
assert not any(m.get("role") == "tool" for m in out)
def test_unanswered_tool_calls_removed(self):
msgs = [
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
{"id": "c2", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
{"role": "user", "content": "sigue"},
]
out = self.repair(msgs)
assert [tc["id"] for tc in out[0]["tool_calls"]] == ["c1"]
assert out[1] == {"role": "tool", "tool_call_id": "c1", "content": "r1"}
def test_all_tool_calls_unanswered_drops_key_and_sets_content(self):
msgs = [
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "user", "content": "sigue"},
]
out = self.repair(msgs)
assert "tool_calls" not in out[0]
assert out[0]["content"] # nunca None sin tool_calls
def test_reasoning_promoted_when_tool_calls_dropped(self):
"""No romper la promocion de reasoning a content del fix anterior."""
msgs = [
{
"role": "assistant",
"content": None,
"reasoning_content": "razonamiento del modelo",
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "user", "content": "sigue"},
]
out = self.repair(msgs)
assert "tool_calls" not in out[0]
assert out[0]["content"] == "razonamiento del modelo"
assert "reasoning_content" not in out[0]
def test_mixed_orphan_in_tool_block(self):
"""Un huerfano en medio de un bloque de tools validos se convierte a
user DESPUES del bloque (no rompe la contiguidad assistant→tools)."""
msgs = [
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
{"id": "c2", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
{"role": "tool", "tool_call_id": "huerfano", "content": "rx"},
{"role": "tool", "tool_call_id": "c2", "content": "r2"},
{"role": "user", "content": "sigue"},
]
out = self.repair(msgs)
roles = [m["role"] for m in out]
assert roles == ["assistant", "tool", "tool", "user", "user"]
assert out[1]["tool_call_id"] == "c1"
assert out[2]["tool_call_id"] == "c2"
assert out[3]["content"].startswith("[Resultado de herramienta")
class TestAdapterEndToEnd:
"""_to_openai_messages + guard sobre un historial roto realista."""
def test_collapsed_assistant_history_produces_valid_openai_sequence(self):
from src.adapters.openai_adapter import OpenAIAdapter
adapter = OpenAIAdapter.__new__(OpenAIAdapter) # sin cliente real
internal = [
{"role": "system", "content": "eres un agente"},
{"role": "user", "content": "haz algo"},
# Assistant colapsado por el compactor (perdio sus tool_use)
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
# …pero el user conserva sus tool_results (el bug de produccion)
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "call_1", "content": "datos"},
],
},
{"role": "assistant", "content": "termine"},
{"role": "user", "content": "siguiente peticion"},
]
out = adapter._to_openai_messages(internal)
# Contrato OpenAI: ningun role:tool sin tool_calls previo
for i, m in enumerate(out):
if m.get("role") == "tool":
assert i > 0
prev = out[i - 1]
prev_ids = set()
k = i - 1
while k >= 0 and out[k].get("role") == "tool":
k -= 1
if k >= 0 and out[k].get("role") == "assistant":
prev_ids = {
tc.get("id") for tc in out[k].get("tool_calls") or []
}
assert m.get("tool_call_id") in prev_ids, (
f"role tool huerfano en out[{i}]"
)
# El tool_result huerfano acabo como user, no como role tool
assert not any(m.get("role") == "tool" for m in out)