Compare commits
8 Commits
9854960c7c
...
9277862e56
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9277862e56 | ||
|
|
79ec267aa6 | ||
|
|
43337e8554 | ||
|
|
6a03fdf284 | ||
|
|
e34a39e3bf | ||
|
|
d6b04e4122 | ||
|
|
96b4542918 | ||
|
|
454b51b45d |
@@ -4,7 +4,11 @@ description: "Agente genérico de Acai CMS: crea módulos, edita contenido, gest
|
|||||||
icon: "code"
|
icon: "code"
|
||||||
category: "development"
|
category: "development"
|
||||||
temperature: 0.2
|
temperature: 0.2
|
||||||
max_tokens: 4096
|
# 16K de salida: cubre escribir un fichero entero (acai_write) + el razonamiento
|
||||||
|
# (thinking) en un solo turno. Con 4096 el JSON del tool_use se truncaba a mitad
|
||||||
|
# en ficheros medianos y el agente caia en micro-ediciones lentas. v4-pro soporta
|
||||||
|
# hasta 384K de salida, asi que 16K es conservador.
|
||||||
|
max_tokens: 16384
|
||||||
context_sections:
|
context_sections:
|
||||||
- immutable_rules
|
- immutable_rules
|
||||||
- project_profile
|
- project_profile
|
||||||
|
|||||||
@@ -74,6 +74,26 @@ cms/data/schema/ # .ini.php — SOLO con tools de schema
|
|||||||
14. **URL del proyecto**: `get_web_url` + `?pruebas=1` siempre.
|
14. **URL del proyecto**: `get_web_url` + `?pruebas=1` siempre.
|
||||||
15. **Operaciones destructivas**: confirma con el usuario antes de ejecutar.
|
15. **Operaciones destructivas**: confirma con el usuario antes de ejecutar.
|
||||||
|
|
||||||
|
# Eficiencia de edición (menos pasos Y menos tokens)
|
||||||
|
|
||||||
|
Elige la herramienta por el TAMAÑO del cambio. Ni micro-editar todo (muchos
|
||||||
|
pasos), ni reescribir el fichero entero por cada retoque (muchos tokens):
|
||||||
|
|
||||||
|
1. **Cambio pequeño o localizado** (un color, un valor, una regla, pocas zonas)
|
||||||
|
→ `acai-line-replace`. Barato: solo emites las líneas que cambian. NO
|
||||||
|
reescribas el fichero entero por un retoque.
|
||||||
|
2. **Creación o reescritura mayor** (cambias casi todo el fichero o lo creas de
|
||||||
|
cero) → UN solo `acai-write` del fichero completo. Reescribir entero por un
|
||||||
|
cambio pequeño desperdicia tokens; hazlo solo cuando de verdad cambia casi todo.
|
||||||
|
3. **Itera con `line-replace`, no con writes repetidos.** Tras ver el resultado
|
||||||
|
en el navegador, aplica los ajustes con `line-replace` puntuales. NO reescribas
|
||||||
|
el fichero completo en cada iteración de diseño.
|
||||||
|
4. **Cap de micro-ediciones.** Si te ves haciendo >4-5 `line-replace` sobre el
|
||||||
|
mismo fichero en un turno, para y reescríbelo entero de una vez (`acai-write`).
|
||||||
|
5. **NO hagas `acai-view` tras cada edición.** Ya tienes el contenido en contexto;
|
||||||
|
reléelo solo si una edición falló o dudas del estado real.
|
||||||
|
6. **Verificación visual al final, una sola pasada** — no tras cada retoque.
|
||||||
|
|
||||||
# Patrones canónicos (aplica por defecto)
|
# Patrones canónicos (aplica por defecto)
|
||||||
|
|
||||||
- **Detalle de registro**: sección `custom-{tableName}` con `thisrecord.*`.
|
- **Detalle de registro**: sección `custom-{tableName}` con `thisrecord.*`.
|
||||||
|
|||||||
@@ -15,6 +15,13 @@ export const CONFIG_FILE_PATH =
|
|||||||
|
|
||||||
export const MCP_PORT = Number(process.env.MCP_PORT || 3000);
|
export const MCP_PORT = Number(process.env.MCP_PORT || 3000);
|
||||||
export const MONITOR_PORT = Number(process.env.MCP_MONITOR_PORT || 4545);
|
export const MONITOR_PORT = Number(process.env.MCP_MONITOR_PORT || 4545);
|
||||||
|
// El monitor HTTP (UI + POST /retry) queda DESACTIVADO por defecto. Solo se
|
||||||
|
// arranca si MCP_MONITOR_ENABLED === 'true' de forma explicita.
|
||||||
|
export const MONITOR_ENABLED =
|
||||||
|
String(process.env.MCP_MONITOR_ENABLED || "").toLowerCase() === "true";
|
||||||
|
// Por seguridad escucha solo en loopback salvo que se defina MCP_MONITOR_HOST.
|
||||||
|
export const MONITOR_HOST = process.env.MCP_MONITOR_HOST || "127.0.0.1";
|
||||||
|
// Compatibilidad: si alguien fuerza MCP_MONITOR_DISABLED tambien lo respetamos.
|
||||||
export const MONITOR_DISABLED =
|
export const MONITOR_DISABLED =
|
||||||
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "1" ||
|
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "1" ||
|
||||||
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "true";
|
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "true";
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
// Load configuration first
|
// Load configuration first
|
||||||
import { loadLocalConfigProfile, applyProfileToEnv } from "./config/index.js";
|
import { loadLocalConfigProfile, applyProfileToEnv, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js";
|
||||||
|
|
||||||
// Load and apply config profile (backward compatibility)
|
// Load and apply config profile (backward compatibility)
|
||||||
const selectedProfile = loadLocalConfigProfile();
|
const selectedProfile = loadLocalConfigProfile();
|
||||||
@@ -30,8 +30,11 @@ import { registerResources } from "./resources/index.js";
|
|||||||
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
setRegistrationFunctions({ registerPrompts, registerTools, registerResources });
|
setRegistrationFunctions({ registerPrompts, registerTools, registerResources });
|
||||||
|
|
||||||
// Create the shared request monitor (will be applied to each session server)
|
// Create the shared request monitor (will be applied to each session server).
|
||||||
const requestMonitor = createRequestMonitor();
|
// Solo se crea si el monitor esta habilitado: asi no acumulamos historial en
|
||||||
|
// memoria ni envolvemos los handlers cuando la UI esta apagada (por defecto).
|
||||||
|
const monitorActive = MONITOR_ENABLED && !MONITOR_DISABLED;
|
||||||
|
const requestMonitor = monitorActive ? createRequestMonitor() : null;
|
||||||
|
|
||||||
// Create a server instance for retry functionality in the monitor UI
|
// Create a server instance for retry functionality in the monitor UI
|
||||||
const server = createMcpServer();
|
const server = createMcpServer();
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import http from "node:http";
|
|||||||
import fsPromises from "node:fs/promises";
|
import fsPromises from "node:fs/promises";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import { fileURLToPath } from "node:url";
|
import { fileURLToPath } from "node:url";
|
||||||
import { MONITOR_PORT, MONITOR_DISABLED } from "./config/index.js";
|
import { MONITOR_PORT, MONITOR_HOST, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js";
|
||||||
import { sessionCredentials } from "./auth/credentials.js";
|
import { sessionCredentials } from "./auth/credentials.js";
|
||||||
import { activeSessions } from "./httpServer.js";
|
import { activeSessions } from "./httpServer.js";
|
||||||
|
|
||||||
@@ -84,8 +84,8 @@ export function broadcastSessionsUpdate() {
|
|||||||
* Start the monitor HTTP server
|
* Start the monitor HTTP server
|
||||||
*/
|
*/
|
||||||
export function startMonitorServer(requestMonitor, toolHandlers) {
|
export function startMonitorServer(requestMonitor, toolHandlers) {
|
||||||
if (MONITOR_DISABLED) {
|
if (!MONITOR_ENABLED || MONITOR_DISABLED) {
|
||||||
console.error("MCP monitor UI deshabilitada (MCP_MONITOR_DISABLED=1).");
|
console.error("[monitor] deshabilitado (MCP_MONITOR_ENABLED!=true)");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -202,12 +202,12 @@ export function startMonitorServer(requestMonitor, toolHandlers) {
|
|||||||
|
|
||||||
monitorServer.on("error", (error) => {
|
monitorServer.on("error", (error) => {
|
||||||
console.warn(
|
console.warn(
|
||||||
`[monitor] No se pudo iniciar la UI en el puerto ${MONITOR_PORT}: ${error.message}. Establece MCP_MONITOR_DISABLED=1 para ocultar este aviso.`
|
`[monitor] No se pudo iniciar la UI en ${MONITOR_HOST}:${MONITOR_PORT}: ${error.message}. Desactiva MCP_MONITOR_ENABLED para ocultar este aviso.`
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
monitorServer.listen(MONITOR_PORT, '0.0.0.0', () => {
|
monitorServer.listen(MONITOR_PORT, MONITOR_HOST, () => {
|
||||||
console.error(`MCP monitor UI: http://0.0.0.0:${MONITOR_PORT}/monitor`);
|
console.error(`MCP monitor UI: http://${MONITOR_HOST}:${MONITOR_PORT}/monitor`);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Broadcast sessions + stats update every 2 seconds for real-time monitoring
|
// Broadcast sessions + stats update every 2 seconds for real-time monitoring
|
||||||
|
|||||||
@@ -1,20 +1,41 @@
|
|||||||
import fs from "node:fs/promises";
|
import fs from "node:fs/promises";
|
||||||
|
import { existsSync } from "node:fs";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lectura directa de los markdown del knowledge base desde el filesystem.
|
* Lectura directa de los markdown del knowledge base desde el filesystem.
|
||||||
*
|
*
|
||||||
* El MCP server corre dentro del container `agentic` junto al FastAPI, asi
|
* Orden de resolucion del directorio de docs:
|
||||||
* que los .md viven en `/app/docs/` (la imagen los copia ahi).
|
* 1. `ACAI_DOCS_DIR` — override explicito por entorno (si esta definido y no vacio).
|
||||||
*
|
* 2. `<ACAI_PROJECT_DIR>/docs` — caso principal: cada proyecto/web tiene su
|
||||||
* En caso de override por entorno, respeta `ACAI_DOCS_DIR`. En desarrollo
|
* propio `docs/`. El `.mcp.json` inyecta `ACAI_PROJECT_DIR` (p.ej.
|
||||||
* fuera del container, fallback a paths relativos al cwd.
|
* `/opt/acai/webs/<user>/<site>`), funciona tanto en local (VSCode) como
|
||||||
|
* en cloud (agentic).
|
||||||
|
* 3. `/app/docs` — fallback final: container `agentic` donde esta horneada la
|
||||||
|
* copia canonica de los .md.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
function dirExists(p) {
|
||||||
|
try {
|
||||||
|
return existsSync(p);
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function resolveDocsDir() {
|
function resolveDocsDir() {
|
||||||
|
// 1. Override explicito
|
||||||
const override = process.env.ACAI_DOCS_DIR;
|
const override = process.env.ACAI_DOCS_DIR;
|
||||||
if (override) return override;
|
if (override && override.trim() !== "") return override;
|
||||||
// Container path
|
|
||||||
|
// 2. Docs del proyecto/web
|
||||||
|
const projectDir = process.env.ACAI_PROJECT_DIR;
|
||||||
|
if (projectDir && projectDir.trim() !== "") {
|
||||||
|
const projectDocs = path.join(projectDir, "docs");
|
||||||
|
if (dirExists(projectDocs)) return projectDocs;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Fallback al container agentic
|
||||||
return "/app/docs";
|
return "/app/docs";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ pydantic-settings>=2.7.0,<3.0.0
|
|||||||
redis[hiredis]>=5.2.0,<6.0.0
|
redis[hiredis]>=5.2.0,<6.0.0
|
||||||
anthropic>=0.42.0,<1.0.0
|
anthropic>=0.42.0,<1.0.0
|
||||||
openai>=1.60.0,<2.0.0
|
openai>=1.60.0,<2.0.0
|
||||||
|
litellm==1.80.0
|
||||||
httpx>=0.28.0,<1.0.0
|
httpx>=0.28.0,<1.0.0
|
||||||
sse-starlette>=2.2.0,<3.0.0
|
sse-starlette>=2.2.0,<3.0.0
|
||||||
tiktoken>=0.7.0,<1.0.0
|
tiktoken>=0.7.0,<1.0.0
|
||||||
|
|||||||
@@ -17,20 +17,24 @@ from .base import ModelAdapter, ModelConfig, ModelResponse, StreamChunk
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
# Algunos fine-tunes (sobre todo MiniMax) ocasionalmente emiten las tool calls
|
# Algunos fine-tunes (sobre todo MiniMax y DeepSeek) ocasionalmente emiten las
|
||||||
# como texto literal en lugar de usar los `tool_use` blocks nativos. Vistos
|
# tool calls como texto literal en lugar de usar los `tool_use` blocks nativos.
|
||||||
# tres formatos:
|
# Vistos cuatro formatos:
|
||||||
# 1) <minimax:tool_call><invoke name="X"><parameter name="P">V</parameter></invoke></minimax:tool_call>
|
# 1) <minimax:tool_call><invoke name="X"><parameter name="P">V</parameter></invoke></minimax:tool_call>
|
||||||
# 2) <invoke name="X"><parameter name="P">V</parameter></invoke> (sin minimax wrapper)
|
# 2) <invoke name="X"><parameter name="P">V</parameter></invoke> (sin minimax wrapper)
|
||||||
# 3) <tool_call>{"name":"X","parameters":{...}}{"name":"Y","parameters":{...}}</tool_call>
|
# 3) <tool_call>{"name":"X","parameters":{...}}{"name":"Y","parameters":{...}}</tool_call>
|
||||||
# (multiples tool calls JSON-encoded dentro de un solo wrapper)
|
# (multiples tool calls JSON-encoded dentro de un solo wrapper)
|
||||||
|
# 4) <||DSML||tool_calls><||DSML||invoke name="X"><||DSML||parameter name="P" string="true">V</||DSML||parameter></||DSML||invoke></||DSML||tool_calls>
|
||||||
|
# (formato DSML de DeepSeek — usa U+FF5C fullwidth vertical line como separador)
|
||||||
#
|
#
|
||||||
# Cuando eso pasa el orquestador ve "texto" y la tool nunca se ejecuta — el
|
# Cuando eso pasa el orquestador ve "texto" y la tool nunca se ejecuta — el
|
||||||
# usuario ve el markup crudo en el chat. Detectamos y convertimos a tool_use
|
# usuario ve el markup crudo en el chat. Detectamos y convertimos a tool_use
|
||||||
# sintetico mientras streameamos. Es un parche defensivo: el caso normal
|
# sintetico mientras streameamos. Es un parche defensivo: el caso normal
|
||||||
# (tool_use blocks) sigue por el camino estandar.
|
# (tool_use blocks) sigue por el camino estandar.
|
||||||
_TOOL_CALL_OPEN_RE = re.compile(
|
_TOOL_CALL_OPEN_RE = re.compile(
|
||||||
r"<(?:minimax:tool_call|invoke\s+name|tool_call\s*>)|\[TOOL_CALL\]",
|
# `<|` (U+FF5C) cubre cualquier special-token DeepSeek (DSML): <|DSML|invoke,
|
||||||
|
# <|tool_calls, etc. Tolerante a 1+ pipes y a la presencia/ausencia de "DSML".
|
||||||
|
r"<(?:minimax:tool_call|invoke\s+name|tool_call[\s>]|use_mcp_tool|mm_special)|\[TOOL_CALL\]|<|",
|
||||||
re.IGNORECASE,
|
re.IGNORECASE,
|
||||||
)
|
)
|
||||||
_INVOKE_RE = re.compile(
|
_INVOKE_RE = re.compile(
|
||||||
@@ -65,6 +69,21 @@ _PERL_ARGS_BLOCK_RE = re.compile(
|
|||||||
_PERL_KV_RE = re.compile(
|
_PERL_KV_RE = re.compile(
|
||||||
r"--([a-zA-Z_][a-zA-Z0-9_]*)\s+(\"[^\"]*\"|\'[^\']*\'|-?\d+(?:\.\d+)?|true|false|null)",
|
r"--([a-zA-Z_][a-zA-Z0-9_]*)\s+(\"[^\"]*\"|\'[^\']*\'|-?\d+(?:\.\d+)?|true|false|null)",
|
||||||
)
|
)
|
||||||
|
# Formato 5 (DeepSeek DSML). Formato oficial V4-Pro: el marcador es `|DSML|`
|
||||||
|
# con UN pipe fullwidth (U+FF5C) a cada lado — <|DSML|invoke name="X"> ...
|
||||||
|
# <|DSML|parameter name="P" string="true|false">V</|DSML|parameter> ...
|
||||||
|
# </|DSML|invoke>. Hacemos el regex TOLERANTE: 1+ pipes y "DSML" opcional,
|
||||||
|
# para cubrir variantes entre versiones del modelo. El atributo `string`
|
||||||
|
# decide el tipo del valor: "true" = string crudo, "false" = valor JSON.
|
||||||
|
_DSML_INVOKE_RE = re.compile(
|
||||||
|
r"<|+(?:DSML|+)?invoke\s+name=\"([^\"]+)\"[^>]*>(.*?)</|+(?:DSML|+)?invoke\s*>",
|
||||||
|
re.IGNORECASE | re.DOTALL,
|
||||||
|
)
|
||||||
|
_DSML_PARAM_RE = re.compile(
|
||||||
|
r"<|+(?:DSML|+)?parameter\s+name=\"([^\"]+)\"([^>]*)>(.*?)</|+(?:DSML|+)?parameter\s*>",
|
||||||
|
re.IGNORECASE | re.DOTALL,
|
||||||
|
)
|
||||||
|
_DSML_STRING_ATTR_RE = re.compile(r"string\s*=\s*\"(true|false)\"", re.IGNORECASE)
|
||||||
|
|
||||||
|
|
||||||
def _safe_emit_split(buf: str) -> str:
|
def _safe_emit_split(buf: str) -> str:
|
||||||
@@ -91,8 +110,8 @@ def _safe_emit_split(buf: str) -> str:
|
|||||||
# Si el tail ya tiene `>` cerrado, es un tag normal — emitir todo.
|
# Si el tail ya tiene `>` cerrado, es un tag normal — emitir todo.
|
||||||
if ">" in tail:
|
if ">" in tail:
|
||||||
return buf
|
return buf
|
||||||
# Si el tail puede ser inicio de tool_call/invoke/tool_call_json, retenerlo.
|
# Si el tail puede ser inicio de tool_call/invoke/tool_call_json/dsml, retenerlo.
|
||||||
candidates = ("<minimax:tool_call", "<invoke", "<tool_call")
|
candidates = ("<minimax:tool_call", "<invoke", "<tool_call", "<|")
|
||||||
for cand in candidates:
|
for cand in candidates:
|
||||||
if cand.startswith(tail.lower()) or tail.lower().startswith(cand[:len(tail)].lower()):
|
if cand.startswith(tail.lower()) or tail.lower().startswith(cand[:len(tail)].lower()):
|
||||||
return buf[:idx]
|
return buf[:idx]
|
||||||
@@ -212,6 +231,35 @@ def _parse_xml_tool_calls(text: str) -> list[dict[str, Any]]:
|
|||||||
"arguments": args,
|
"arguments": args,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# Formato 5 (DeepSeek DSML):
|
||||||
|
# <|DSML|invoke name="X"><|DSML|parameter name="P" string="true">V</|DSML|parameter></|DSML|invoke>
|
||||||
|
for m in _DSML_INVOKE_RE.finditer(text):
|
||||||
|
name = m.group(1).strip()
|
||||||
|
body = m.group(2)
|
||||||
|
args_dsml: dict[str, Any] = {}
|
||||||
|
for p in _DSML_PARAM_RE.finditer(body):
|
||||||
|
pname = p.group(1).strip()
|
||||||
|
attrs = p.group(2) or ""
|
||||||
|
raw_val = p.group(3)
|
||||||
|
sm = _DSML_STRING_ATTR_RE.search(attrs)
|
||||||
|
if sm and sm.group(1).lower() == "true":
|
||||||
|
# string="true": valor es string crudo — NO strip (preserva
|
||||||
|
# whitespace significativo, p.ej. contenido de ficheros).
|
||||||
|
args_dsml[pname] = raw_val
|
||||||
|
else:
|
||||||
|
# string="false" (o ausente): valor JSON (num/bool/array/obj/string).
|
||||||
|
# Si no parsea, cae a string sin tocar.
|
||||||
|
try:
|
||||||
|
args_dsml[pname] = json.loads(raw_val.strip())
|
||||||
|
except (json.JSONDecodeError, ValueError):
|
||||||
|
args_dsml[pname] = raw_val.strip()
|
||||||
|
if name:
|
||||||
|
calls.append({
|
||||||
|
"id": "xml_{}".format(uuid.uuid4().hex[:12]),
|
||||||
|
"name": name,
|
||||||
|
"arguments": args_dsml,
|
||||||
|
})
|
||||||
|
|
||||||
return calls
|
return calls
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
67
src/adapters/litellm_adapter.py
Normal file
67
src/adapters/litellm_adapter.py
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
"""LiteLLM model adapter — spike para A/B contra el adapter OpenAI/DeepSeek nativo.
|
||||||
|
|
||||||
|
Reutiliza TODO el flujo de OpenAIAdapter (procesado de chunks, conversión de
|
||||||
|
mensajes, tools, fallback DSML) y solo cambia la llamada al modelo: en vez del
|
||||||
|
SDK de OpenAI, enruta por LiteLLM, que trae handling específico por proveedor
|
||||||
|
(DeepSeek incluido) y podría resolver de fábrica el DSML / reasoning_content que
|
||||||
|
hoy parcheamos a mano.
|
||||||
|
|
||||||
|
Activar con `AGENTIC_DEFAULT_MODEL_PROVIDER=litellm`. Modelo via
|
||||||
|
`AGENTIC_LITELLM_MODEL` (p.ej. "deepseek/deepseek-v4-pro"); si vacío, deriva de
|
||||||
|
`AGENTIC_DEFAULT_MODEL_ID`. Reusa `openai_api_key` / `openai_base_url` como
|
||||||
|
credenciales.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import litellm
|
||||||
|
|
||||||
|
from ..config import settings
|
||||||
|
from .openai_adapter import OpenAIAdapter
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Que LiteLLM descarte params no soportados por el proveedor en vez de petar.
|
||||||
|
litellm.drop_params = True
|
||||||
|
# Silenciar el spam INFO de litellm ("LiteLLM completion() model=...").
|
||||||
|
litellm.suppress_debug_info = True
|
||||||
|
logging.getLogger("LiteLLM").setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
|
||||||
|
class LiteLLMAdapter(OpenAIAdapter):
|
||||||
|
"""Enruta las llamadas por LiteLLM, reutilizando el pipeline de OpenAIAdapter."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
model: str | None = None,
|
||||||
|
api_key: str | None = None,
|
||||||
|
base_url: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
# NO llamamos a super().__init__: no necesitamos el cliente AsyncOpenAI.
|
||||||
|
self._litellm_model = model or settings.litellm_model or self._derive_model()
|
||||||
|
self._api_key = api_key or settings.openai_api_key or None
|
||||||
|
self._api_base = base_url or settings.openai_base_url or None
|
||||||
|
# LiteLLM no entrega usage fiable en streaming → estimar para billing.
|
||||||
|
self._estimate_usage_fallback = True
|
||||||
|
logger.info(
|
||||||
|
"LiteLLMAdapter: model=%s api_base=%s",
|
||||||
|
self._litellm_model, self._api_base or "(default)",
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _derive_model() -> str:
|
||||||
|
mid = settings.default_model_id or "deepseek-chat"
|
||||||
|
# Si ya trae prefijo de proveedor ("deepseek/...", "openai/..."), respetar.
|
||||||
|
return mid if "/" in mid else f"deepseek/{mid}"
|
||||||
|
|
||||||
|
async def _acreate(self, kwargs: dict[str, Any]):
|
||||||
|
kwargs = dict(kwargs)
|
||||||
|
kwargs["model"] = self._litellm_model
|
||||||
|
if self._api_key:
|
||||||
|
kwargs["api_key"] = self._api_key
|
||||||
|
if self._api_base:
|
||||||
|
kwargs["api_base"] = self._api_base
|
||||||
|
return await litellm.acompletion(**kwargs)
|
||||||
@@ -14,6 +14,24 @@ from .base import ModelAdapter, ModelConfig, ModelResponse, StreamChunk
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _estimate_usage(messages: list[dict[str, Any]], output_text: str) -> dict[str, int]:
|
||||||
|
"""Estimacion de tokens cuando el proveedor no entrega usage (p.ej. LiteLLM
|
||||||
|
streaming). Aproximada pero evita billing 0."""
|
||||||
|
from ..context.compactor import estimate_tokens
|
||||||
|
inp = 0
|
||||||
|
for m in messages:
|
||||||
|
c = m.get("content")
|
||||||
|
if isinstance(c, str):
|
||||||
|
inp += estimate_tokens(c)
|
||||||
|
elif isinstance(c, list):
|
||||||
|
for b in c:
|
||||||
|
if isinstance(b, dict):
|
||||||
|
inp += estimate_tokens(
|
||||||
|
b.get("text") or b.get("thinking") or str(b.get("content") or "")
|
||||||
|
)
|
||||||
|
return {"input_tokens": inp, "output_tokens": estimate_tokens(output_text or "")}
|
||||||
|
|
||||||
|
|
||||||
class OpenAIAdapter(ModelAdapter):
|
class OpenAIAdapter(ModelAdapter):
|
||||||
"""Adapter for the OpenAI API (GPT-4o, o1, etc.)."""
|
"""Adapter for the OpenAI API (GPT-4o, o1, etc.)."""
|
||||||
|
|
||||||
@@ -25,6 +43,15 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
if url:
|
if url:
|
||||||
kwargs["base_url"] = url
|
kwargs["base_url"] = url
|
||||||
self._client = AsyncOpenAI(**kwargs)
|
self._client = AsyncOpenAI(**kwargs)
|
||||||
|
# El path nativo conserva el usage real del proveedor; subclases que no
|
||||||
|
# reciben usage fiable en streaming (LiteLLM) lo ponen a True para estimar.
|
||||||
|
self._estimate_usage_fallback = False
|
||||||
|
|
||||||
|
async def _acreate(self, kwargs: dict[str, Any]):
|
||||||
|
"""Hook de la llamada al modelo. Subclases (p.ej. LiteLLMAdapter) lo
|
||||||
|
sobreescriben para enrutar por otra librería sin tocar el resto del
|
||||||
|
flujo (procesado de chunks, tools, mensajes)."""
|
||||||
|
return await self._client.chat.completions.create(**kwargs)
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# Streaming
|
# Streaming
|
||||||
@@ -43,43 +70,90 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
)
|
)
|
||||||
|
|
||||||
kwargs: dict[str, Any] = {
|
kwargs: dict[str, Any] = {
|
||||||
"model": config.model_id or "gpt-4o",
|
"model": config.model_id or settings.default_model_id or "gpt-4o",
|
||||||
"max_tokens": config.max_tokens,
|
"max_tokens": config.max_tokens,
|
||||||
"temperature": config.temperature,
|
"temperature": config.temperature,
|
||||||
"messages": messages,
|
"messages": self._to_openai_messages(messages),
|
||||||
"stream": True,
|
"stream": True,
|
||||||
"stream_options": {"include_usage": True},
|
"stream_options": {"include_usage": True},
|
||||||
}
|
}
|
||||||
if tools:
|
if tools:
|
||||||
kwargs["tools"] = self._format_tools(tools)
|
kwargs["tools"] = self._format_tools(tools)
|
||||||
|
|
||||||
stream = await self._client.chat.completions.create(**kwargs)
|
stream = await self._acreate(kwargs)
|
||||||
|
|
||||||
|
# Fallback de tool-calls-en-texto: DeepSeek a veces emite las tool calls
|
||||||
|
# en su formato interno DSML como TEXTO (en el content) en vez de como
|
||||||
|
# tool_calls nativos. El endpoint OpenAI no lo convierte, asi que sin
|
||||||
|
# esto el agente "se para" mostrando DSML inerte. Reutilizamos el parser
|
||||||
|
# del claude_adapter.
|
||||||
|
from .claude_adapter import _parse_xml_tool_calls, _TOOL_CALL_OPEN_RE
|
||||||
|
|
||||||
tool_calls_acc: dict[int, dict[str, str]] = {}
|
tool_calls_acc: dict[int, dict[str, str]] = {}
|
||||||
|
|
||||||
final_usage: dict[str, int] = {}
|
final_usage: dict[str, int] = {}
|
||||||
|
usage_emitted = False # evita doble conteo si llega usage tras estimar
|
||||||
|
full_content = "" # content acumulado (para el fallback DSML)
|
||||||
|
full_reasoning = "" # razonamiento acumulado (para estimar usage)
|
||||||
|
emitted_chars = 0 # cuanto de full_content ya se emitio como delta
|
||||||
|
suppress_text = False # tras detectar un tool-call-en-texto, no emitir mas
|
||||||
|
|
||||||
|
# DeepSeek thinking mode: el razonamiento llega en `delta.reasoning_content`
|
||||||
|
# (antes del content). Lo acumulamos como un bloque `thinking` (block_index 0)
|
||||||
|
# para que el orquestador lo persista y `_to_openai_messages` lo reenvie como
|
||||||
|
# `reasoning_content` en el siguiente turno — DeepSeek lo exige en multi-turno
|
||||||
|
# con tool calls ("reasoning_content ... must be passed back to the API").
|
||||||
|
reasoning_seen = False
|
||||||
|
reasoning_sig_emitted = False
|
||||||
|
|
||||||
async for chunk in stream:
|
async for chunk in stream:
|
||||||
# With include_usage, the last chunk has usage but no choices
|
# With include_usage, the last chunk has usage but no choices.
|
||||||
if chunk.usage:
|
# getattr: el chunk de LiteLLM (ModelResponseStream) no siempre trae
|
||||||
|
# el atributo `usage`; el del SDK OpenAI sí (None salvo el ultimo).
|
||||||
|
chunk_usage = getattr(chunk, "usage", None)
|
||||||
|
if chunk_usage:
|
||||||
final_usage = {
|
final_usage = {
|
||||||
"input_tokens": chunk.usage.prompt_tokens or 0,
|
"input_tokens": getattr(chunk_usage, "prompt_tokens", 0) or 0,
|
||||||
"output_tokens": chunk.usage.completion_tokens or 0,
|
"output_tokens": getattr(chunk_usage, "completion_tokens", 0) or 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
choice = chunk.choices[0] if chunk.choices else None
|
choice = chunk.choices[0] if chunk.choices else None
|
||||||
if not choice:
|
if not choice:
|
||||||
# Usage-only chunk (last one with include_usage) — emit it
|
# Usage-only chunk (last one with include_usage) — emit it
|
||||||
if final_usage:
|
if final_usage and not usage_emitted:
|
||||||
yield StreamChunk(usage=final_usage)
|
yield StreamChunk(usage=final_usage)
|
||||||
final_usage = {} # Only emit once
|
usage_emitted = True
|
||||||
continue
|
continue
|
||||||
|
|
||||||
delta = choice.delta
|
delta = choice.delta
|
||||||
|
|
||||||
|
# Reasoning content (DeepSeek thinking mode). Llega como campo extra
|
||||||
|
# del delta; lo emitimos como thinking_delta en el bloque index 0.
|
||||||
|
reasoning_txt = getattr(delta, "reasoning_content", None) if delta else None
|
||||||
|
if reasoning_txt:
|
||||||
|
reasoning_seen = True
|
||||||
|
full_reasoning += reasoning_txt
|
||||||
|
yield StreamChunk(
|
||||||
|
thinking_delta=reasoning_txt,
|
||||||
|
block_type="thinking",
|
||||||
|
block_index=0,
|
||||||
|
)
|
||||||
|
|
||||||
# Text content
|
# Text content
|
||||||
if delta and delta.content:
|
if delta and delta.content:
|
||||||
yield StreamChunk(delta=delta.content)
|
full_content += delta.content
|
||||||
|
if not suppress_text:
|
||||||
|
# Si arranca un tool call en texto (DSML/XML), emitimos lo
|
||||||
|
# previo y dejamos de emitir el resto (el DSML no debe verse).
|
||||||
|
m = _TOOL_CALL_OPEN_RE.search(full_content, emitted_chars)
|
||||||
|
if m:
|
||||||
|
suppress_text = True
|
||||||
|
if m.start() > emitted_chars:
|
||||||
|
yield StreamChunk(delta=full_content[emitted_chars:m.start()])
|
||||||
|
emitted_chars = len(full_content)
|
||||||
|
else:
|
||||||
|
yield StreamChunk(delta=full_content[emitted_chars:])
|
||||||
|
emitted_chars = len(full_content)
|
||||||
|
|
||||||
# Tool calls
|
# Tool calls
|
||||||
if delta and delta.tool_calls:
|
if delta and delta.tool_calls:
|
||||||
@@ -109,7 +183,31 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
|
|
||||||
# Finish
|
# Finish
|
||||||
if choice.finish_reason:
|
if choice.finish_reason:
|
||||||
if choice.finish_reason == "tool_calls":
|
# Cerrar el bloque de razonamiento (si lo hubo) con un signature
|
||||||
|
# sintetico: el orquestador descarta thinking blocks sin signature
|
||||||
|
# (proteccion para MiniMax/Anthropic). DeepSeek no usa signatures;
|
||||||
|
# este marcador solo evita el descarte y NUNCA se reenvia — en
|
||||||
|
# `_to_openai_messages` el bloque se mapea a `reasoning_content`.
|
||||||
|
if reasoning_seen and not reasoning_sig_emitted:
|
||||||
|
reasoning_sig_emitted = True
|
||||||
|
yield StreamChunk(
|
||||||
|
thinking_signature="deepseek-reasoning",
|
||||||
|
block_type="thinking",
|
||||||
|
block_index=0,
|
||||||
|
)
|
||||||
|
# Fallback de usage: algunos proveedores via LiteLLM no entregan el
|
||||||
|
# chunk de usage (o llega tras el break del orquestador) → billing 0.
|
||||||
|
# Estimamos por tokens para no infra-cobrar. Solo si el adapter lo
|
||||||
|
# pide (LiteLLM); el path nativo conserva el usage real del proveedor.
|
||||||
|
if self._estimate_usage_fallback and not final_usage and not usage_emitted:
|
||||||
|
final_usage = _estimate_usage(messages, full_content + "\n" + full_reasoning)
|
||||||
|
# IMPORTANTE: DeepSeek (endpoint OpenAI) a veces cierra el stream
|
||||||
|
# con finish_reason="stop" AUNQUE haya emitido tool_calls. Si nos
|
||||||
|
# fiamos solo de =="tool_calls" perdemos esos tool calls: el agente
|
||||||
|
# anuncia la accion en texto y "se para" sin ejecutarla. Por eso
|
||||||
|
# disparamos los tool_use SIEMPRE que haya tool calls acumulados,
|
||||||
|
# sea cual sea el finish_reason.
|
||||||
|
if tool_calls_acc:
|
||||||
for acc in tool_calls_acc.values():
|
for acc in tool_calls_acc.values():
|
||||||
yield StreamChunk(
|
yield StreamChunk(
|
||||||
tool_call_id=acc["id"],
|
tool_call_id=acc["id"],
|
||||||
@@ -118,15 +216,33 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
finish_reason="tool_use",
|
finish_reason="tool_use",
|
||||||
)
|
)
|
||||||
# Emit usage after tool_use chunks
|
# Emit usage after tool_use chunks
|
||||||
if final_usage:
|
if final_usage and not usage_emitted:
|
||||||
yield StreamChunk(usage=final_usage)
|
yield StreamChunk(usage=final_usage)
|
||||||
|
usage_emitted = True
|
||||||
|
else:
|
||||||
|
# Fallback: DeepSeek pudo emitir las tool calls como TEXTO
|
||||||
|
# (DSML/XML) en vez de nativas. Parseamos el content y, si hay
|
||||||
|
# tool calls, las ejecutamos igual; si no, cerramos el turno.
|
||||||
|
text_calls = _parse_xml_tool_calls(full_content) if full_content else []
|
||||||
|
if text_calls:
|
||||||
|
for c in text_calls:
|
||||||
|
yield StreamChunk(
|
||||||
|
tool_call_id=c["id"],
|
||||||
|
tool_name=c["name"],
|
||||||
|
tool_arguments=json.dumps(c.get("arguments", {}), ensure_ascii=False),
|
||||||
|
finish_reason="tool_use",
|
||||||
|
)
|
||||||
|
if final_usage and not usage_emitted:
|
||||||
|
yield StreamChunk(usage=final_usage)
|
||||||
|
usage_emitted = True
|
||||||
else:
|
else:
|
||||||
yield StreamChunk(
|
yield StreamChunk(
|
||||||
finish_reason="end_turn"
|
finish_reason="end_turn"
|
||||||
if choice.finish_reason == "stop"
|
if choice.finish_reason in ("stop", "tool_calls")
|
||||||
else choice.finish_reason,
|
else choice.finish_reason,
|
||||||
usage=final_usage,
|
usage=final_usage if not usage_emitted else {},
|
||||||
)
|
)
|
||||||
|
usage_emitted = True
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# Non-streaming
|
# Non-streaming
|
||||||
@@ -145,10 +261,10 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
)
|
)
|
||||||
|
|
||||||
kwargs: dict[str, Any] = {
|
kwargs: dict[str, Any] = {
|
||||||
"model": config.model_id or "gpt-4o",
|
"model": config.model_id or settings.default_model_id or "gpt-4o",
|
||||||
"max_tokens": config.max_tokens,
|
"max_tokens": config.max_tokens,
|
||||||
"temperature": config.temperature,
|
"temperature": config.temperature,
|
||||||
"messages": messages,
|
"messages": self._to_openai_messages(messages),
|
||||||
}
|
}
|
||||||
if tools:
|
if tools:
|
||||||
kwargs["tools"] = self._format_tools(tools)
|
kwargs["tools"] = self._format_tools(tools)
|
||||||
@@ -161,7 +277,7 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
"function": {"name": force_tool},
|
"function": {"name": force_tool},
|
||||||
}
|
}
|
||||||
|
|
||||||
response = await self._client.chat.completions.create(**kwargs)
|
response = await self._acreate(kwargs)
|
||||||
choice = response.choices[0]
|
choice = response.choices[0]
|
||||||
|
|
||||||
content = choice.message.content or ""
|
content = choice.message.content or ""
|
||||||
@@ -204,19 +320,230 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
def _format_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||||
"""Convert internal tool definitions to OpenAI function calling format."""
|
"""Convert internal tool definitions to OpenAI function calling format.
|
||||||
|
|
||||||
|
Si `deepseek_strict_tools`, marca cada funcion con `strict: true` y limpia
|
||||||
|
del schema los keywords que DeepSeek strict NO soporta (minLength/maxLength/
|
||||||
|
minItems/maxItems), que de otro modo darian 400."""
|
||||||
|
strict = settings.deepseek_strict_tools
|
||||||
formatted: list[dict[str, Any]] = []
|
formatted: list[dict[str, Any]] = []
|
||||||
for tool in tools:
|
for tool in tools:
|
||||||
formatted.append(
|
params = tool.get("input_schema", tool.get("parameters", {"type": "object"}))
|
||||||
{
|
fn: dict[str, Any] = {
|
||||||
"type": "function",
|
|
||||||
"function": {
|
|
||||||
"name": tool["name"],
|
"name": tool["name"],
|
||||||
"description": tool.get("description", ""),
|
"description": tool.get("description", ""),
|
||||||
"parameters": tool.get(
|
"parameters": OpenAIAdapter._sanitize_strict_schema(params) if strict else params,
|
||||||
"input_schema", tool.get("parameters", {"type": "object"})
|
}
|
||||||
),
|
if strict:
|
||||||
|
fn["strict"] = True
|
||||||
|
formatted.append({"type": "function", "function": fn})
|
||||||
|
return formatted
|
||||||
|
|
||||||
|
# Keywords no soportados por DeepSeek strict mode (segun docs oficiales).
|
||||||
|
_STRICT_UNSUPPORTED_KEYS = ("minLength", "maxLength", "minItems", "maxItems")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _sanitize_strict_schema(schema: Any) -> Any:
|
||||||
|
"""Elimina recursivamente keywords no soportados por DeepSeek strict."""
|
||||||
|
if isinstance(schema, dict):
|
||||||
|
return {
|
||||||
|
k: OpenAIAdapter._sanitize_strict_schema(v)
|
||||||
|
for k, v in schema.items()
|
||||||
|
if k not in OpenAIAdapter._STRICT_UNSUPPORTED_KEYS
|
||||||
|
}
|
||||||
|
if isinstance(schema, list):
|
||||||
|
return [OpenAIAdapter._sanitize_strict_schema(x) for x in schema]
|
||||||
|
return schema
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _blocks_text(content: Any) -> str:
|
||||||
|
"""Extrae texto plano de un content que puede ser str o lista de bloques."""
|
||||||
|
if content is None:
|
||||||
|
return ""
|
||||||
|
if isinstance(content, str):
|
||||||
|
return content
|
||||||
|
if isinstance(content, list):
|
||||||
|
parts = []
|
||||||
|
for b in content:
|
||||||
|
if isinstance(b, dict):
|
||||||
|
parts.append(b.get("text") or b.get("content") or "")
|
||||||
|
else:
|
||||||
|
parts.append(str(b))
|
||||||
|
return "\n".join(p for p in parts if p)
|
||||||
|
return str(content)
|
||||||
|
|
||||||
|
def _to_openai_messages(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||||
|
"""Convierte los mensajes del formato interno (Anthropic-style, con bloques
|
||||||
|
`tool_use` / `tool_result`) al formato de la API OpenAI (`tool_calls` en el
|
||||||
|
assistant, mensajes `role: tool` con `tool_call_id`). El contexto se construye
|
||||||
|
en formato Anthropic, así que sin esto la API OpenAI de DeepSeek rechaza el
|
||||||
|
body ('unknown variant tool_use')."""
|
||||||
|
out: list[dict[str, Any]] = []
|
||||||
|
for msg in messages:
|
||||||
|
role = msg.get("role")
|
||||||
|
content = msg.get("content")
|
||||||
|
if role == "system":
|
||||||
|
out.append({"role": "system", "content": content if isinstance(content, str) else self._blocks_text(content)})
|
||||||
|
continue
|
||||||
|
if not isinstance(content, list):
|
||||||
|
out.append({"role": role, "content": content if isinstance(content, str) else str(content or "")})
|
||||||
|
continue
|
||||||
|
if role == "assistant":
|
||||||
|
text_parts: list[str] = []
|
||||||
|
tool_calls: list[dict[str, Any]] = []
|
||||||
|
reasoning_parts: list[str] = []
|
||||||
|
for b in content:
|
||||||
|
if not isinstance(b, dict):
|
||||||
|
continue
|
||||||
|
t = b.get("type")
|
||||||
|
if t == "text":
|
||||||
|
text_parts.append(b.get("text", ""))
|
||||||
|
elif t == "thinking":
|
||||||
|
# DeepSeek thinking mode: el razonamiento del turno debe
|
||||||
|
# reenviarse como `reasoning_content` (no como signature).
|
||||||
|
rc = b.get("thinking", "")
|
||||||
|
if rc:
|
||||||
|
reasoning_parts.append(rc)
|
||||||
|
elif t == "tool_use":
|
||||||
|
tool_calls.append({
|
||||||
|
"id": b.get("id", ""),
|
||||||
|
"type": "function",
|
||||||
|
"function": {
|
||||||
|
"name": b.get("name", ""),
|
||||||
|
"arguments": json.dumps(b.get("input", {}), ensure_ascii=False),
|
||||||
},
|
},
|
||||||
|
})
|
||||||
|
text_joined = "\n".join(p for p in text_parts if p)
|
||||||
|
m: dict[str, Any] = {"role": "assistant", "content": (text_joined or None)}
|
||||||
|
if reasoning_parts:
|
||||||
|
if not text_joined and not tool_calls:
|
||||||
|
# Quirk DeepSeek thinking: a veces emite TODA la respuesta
|
||||||
|
# en reasoning_content y cierra sin content ni tool_calls.
|
||||||
|
# Reenviar content=None sin tool_calls rompe la API
|
||||||
|
# ("content or tool_calls must be set"), asi que promovemos
|
||||||
|
# el reasoning a content (sin duplicarlo como reasoning_content).
|
||||||
|
m["content"] = "\n".join(reasoning_parts)
|
||||||
|
else:
|
||||||
|
m["reasoning_content"] = "\n".join(reasoning_parts)
|
||||||
|
if tool_calls:
|
||||||
|
m["tool_calls"] = tool_calls
|
||||||
|
out.append(m)
|
||||||
|
else: # user (puede traer tool_result blocks)
|
||||||
|
text_parts = []
|
||||||
|
for b in content:
|
||||||
|
if not isinstance(b, dict):
|
||||||
|
continue
|
||||||
|
t = b.get("type")
|
||||||
|
if t == "tool_result":
|
||||||
|
out.append({
|
||||||
|
"role": "tool",
|
||||||
|
"tool_call_id": b.get("tool_use_id", ""),
|
||||||
|
"content": self._blocks_text(b.get("content")),
|
||||||
|
})
|
||||||
|
elif t == "text":
|
||||||
|
text_parts.append(b.get("text", ""))
|
||||||
|
if text_parts:
|
||||||
|
out.append({"role": "user", "content": "\n".join(text_parts)})
|
||||||
|
# Guard defensivo: el compactor ya garantiza el invariante tool_use ↔
|
||||||
|
# tool_result (`_enforce_tool_pairing`), pero si algo se escapa el
|
||||||
|
# proveedor devuelve 400 y la sesion queda bloqueada. Cinturon y tirantes.
|
||||||
|
return self._repair_tool_sequence(out)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _repair_tool_sequence(out: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||||
|
"""Garantiza el contrato OpenAI sobre la secuencia ya convertida:
|
||||||
|
|
||||||
|
- Todo `role: tool` debe responder a un tool_call_id del assistant
|
||||||
|
inmediatamente anterior (o de su bloque contiguo de tool messages).
|
||||||
|
Si no → se convierte a user con placeholder.
|
||||||
|
- Todo assistant con `tool_calls` debe tener respuesta para CADA id.
|
||||||
|
Los tool_calls sin respuesta se eliminan; si la lista queda vacia se
|
||||||
|
elimina la key (y se asegura `content` no-None — "content or
|
||||||
|
tool_calls must be set").
|
||||||
|
|
||||||
|
No deberia activarse nunca (el compactor repara antes); si se activa,
|
||||||
|
loguea warning para detectar regresiones del compactor.
|
||||||
|
"""
|
||||||
|
repaired: list[dict[str, Any]] = []
|
||||||
|
i = 0
|
||||||
|
n = len(out)
|
||||||
|
while i < n:
|
||||||
|
msg = out[i]
|
||||||
|
role = msg.get("role")
|
||||||
|
|
||||||
|
if role == "assistant" and msg.get("tool_calls"):
|
||||||
|
# Bloque contiguo de tool messages que responden a este assistant.
|
||||||
|
j = i + 1
|
||||||
|
block: list[dict[str, Any]] = []
|
||||||
|
while j < n and out[j].get("role") == "tool":
|
||||||
|
block.append(out[j])
|
||||||
|
j += 1
|
||||||
|
answered = {t.get("tool_call_id", "") for t in block}
|
||||||
|
kept_calls = [
|
||||||
|
tc for tc in msg["tool_calls"] if tc.get("id", "") in answered
|
||||||
|
]
|
||||||
|
dropped = [
|
||||||
|
tc for tc in msg["tool_calls"] if tc.get("id", "") not in answered
|
||||||
|
]
|
||||||
|
new_msg = dict(msg)
|
||||||
|
if dropped:
|
||||||
|
for tc in dropped:
|
||||||
|
logger.warning(
|
||||||
|
"repaired unanswered tool_call at index %d (tool_call_id=%s)",
|
||||||
|
i,
|
||||||
|
tc.get("id", ""),
|
||||||
|
)
|
||||||
|
if kept_calls:
|
||||||
|
new_msg["tool_calls"] = kept_calls
|
||||||
|
else:
|
||||||
|
new_msg.pop("tool_calls", None)
|
||||||
|
if new_msg.get("content") is None:
|
||||||
|
# Promover reasoning a content si existe (mismo
|
||||||
|
# criterio que el quirk DeepSeek de arriba); si no,
|
||||||
|
# placeholder para no enviar content=None sin tools.
|
||||||
|
rc = new_msg.pop("reasoning_content", None)
|
||||||
|
new_msg["content"] = rc or "[ASSISTANT COMPACTADO]"
|
||||||
|
repaired.append(new_msg)
|
||||||
|
valid_ids = {tc.get("id", "") for tc in kept_calls}
|
||||||
|
converted: list[dict[str, Any]] = []
|
||||||
|
for t in block:
|
||||||
|
if t.get("tool_call_id", "") in valid_ids:
|
||||||
|
repaired.append(t)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"repaired orphan tool message (tool_call_id=%s)",
|
||||||
|
t.get("tool_call_id", ""),
|
||||||
|
)
|
||||||
|
converted.append(
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": "[Resultado de herramienta (contexto compactado)]: "
|
||||||
|
+ str(t.get("content", ""))[:500],
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
return formatted
|
# Los huerfanos convertidos van DESPUES del bloque de tools
|
||||||
|
# validos para no romper la contiguidad assistant → tools.
|
||||||
|
repaired.extend(converted)
|
||||||
|
i = j
|
||||||
|
continue
|
||||||
|
|
||||||
|
if role == "tool":
|
||||||
|
# Tool message sin assistant con tool_calls delante → huerfano.
|
||||||
|
logger.warning(
|
||||||
|
"repaired orphan tool message at index %d (tool_call_id=%s)",
|
||||||
|
i,
|
||||||
|
msg.get("tool_call_id", ""),
|
||||||
|
)
|
||||||
|
repaired.append(
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": "[Resultado de herramienta (contexto compactado)]: "
|
||||||
|
+ str(msg.get("content", ""))[:500],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
repaired.append(msg)
|
||||||
|
i += 1
|
||||||
|
return repaired
|
||||||
|
|||||||
@@ -427,9 +427,9 @@ async def _execute_and_persist(orchestrator, storage, session, message) -> dict[
|
|||||||
async def abort_session(session_id: str) -> dict[str, Any]:
|
async def abort_session(session_id: str) -> dict[str, Any]:
|
||||||
"""Cancela la ejecución en curso de una sesión (botón Stop del chat).
|
"""Cancela la ejecución en curso de una sesión (botón Stop del chat).
|
||||||
|
|
||||||
Cancela la tarea detached (liberando el session_lock), cierra el stream SSE
|
Cancela la tarea detached (liberando el session_lock) y cierra el stream
|
||||||
de los suscriptores y limpia un posible lock huérfano. Idempotente: si no
|
SSE de los suscriptores. Idempotente: si no hay nada en curso devuelve
|
||||||
hay nada en curso devuelve `no_active_execution` sin error.
|
`no_active_execution` sin error.
|
||||||
"""
|
"""
|
||||||
storage = _get_storage()
|
storage = _get_storage()
|
||||||
session = await storage.get_session(session_id)
|
session = await storage.get_session(session_id)
|
||||||
@@ -452,8 +452,13 @@ async def abort_session(session_id: str) -> dict[str, Any]:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Failed to close SSE stream on abort for %s: %s", session_id, e)
|
logger.warning("Failed to close SSE stream on abort for %s: %s", session_id, e)
|
||||||
|
|
||||||
# Defensa: liberar un lock huérfano (p.ej. de una ejecución previa que crasheó
|
# Limpiar el lock SOLO si cancelamos una ejecución de verdad: el `finally`
|
||||||
# antes de soltarlo) para no bloquear el siguiente mensaje hasta el TTL.
|
# de la tarea cancelada puede no llegar a liberar el lock de forma fiable.
|
||||||
|
# `clear_session_lock` borra incondicional (sin conocer el token del lock),
|
||||||
|
# así que invocarlo sin cancelación confirmada borraría el lock de una
|
||||||
|
# ejecución síncrona (stream=false) aún viva — que no se registra en
|
||||||
|
# _running_executions — y permitiría una segunda ejecución concurrente.
|
||||||
|
if cancelled:
|
||||||
try:
|
try:
|
||||||
await storage.clear_session_lock(session_id)
|
await storage.clear_session_lock(session_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -781,22 +786,64 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
|
|||||||
|
|
||||||
docs_data.append((doc_id, title, content, summary, tags, priority, load_when))
|
docs_data.append((doc_id, title, content, summary, tags, priority, load_when))
|
||||||
|
|
||||||
# Generate embeddings in batch
|
# Hash de contenido por doc — base del skip idempotente de embeddings.
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
def _embed_text(title, summary, content):
|
||||||
|
return f"{title}\n{summary}\n{content[:2000]}"
|
||||||
|
|
||||||
|
def _doc_hash(title, summary, content):
|
||||||
|
return hashlib.md5(_embed_text(title, summary, content).encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
new_hashes = [_doc_hash(t, s, c) for _, t, c, s, _, _, _ in docs_data]
|
||||||
|
|
||||||
|
# Generate embeddings SOLO para docs nuevos o cuyo contenido cambió (skip
|
||||||
|
# idempotente): si el hash coincide con el guardado y ya existe el embedding
|
||||||
|
# en Redis, se reutiliza y NO se vuelve a llamar a la API. Esto permite que
|
||||||
|
# /knowledge/load se dispare libremente (botón de scaffold, etc.) sin re-embeber.
|
||||||
|
embeddings: list[Any] = [None] * len(docs_data)
|
||||||
|
already_embedded = [False] * len(docs_data)
|
||||||
|
has_embeddings = False
|
||||||
|
if settings.embeddings_enabled:
|
||||||
|
to_embed = [] # indices que hay que (re)embeber
|
||||||
|
for i, (doc_id, title, content, summary, _, _, _) in enumerate(docs_data):
|
||||||
|
try:
|
||||||
|
prev = await memory._r.get(memory._key("kbhash", "knowledge", doc_id))
|
||||||
|
if isinstance(prev, bytes):
|
||||||
|
prev = prev.decode("utf-8")
|
||||||
|
has_embed = await memory._r.exists(memory._key("embeddings", "knowledge", doc_id))
|
||||||
|
except Exception:
|
||||||
|
prev, has_embed = None, 0
|
||||||
|
if prev == new_hashes[i] and has_embed:
|
||||||
|
already_embedded[i] = True # sin cambios → reutiliza el embedding existente
|
||||||
|
else:
|
||||||
|
to_embed.append(i)
|
||||||
|
|
||||||
|
if to_embed:
|
||||||
from ..memory.embeddings import EmbeddingService
|
from ..memory.embeddings import EmbeddingService
|
||||||
embed_service = EmbeddingService()
|
embed_service = EmbeddingService()
|
||||||
embed_texts = [
|
embed_texts = [
|
||||||
f"{title}\n{summary}\n{content[:2000]}"
|
_embed_text(docs_data[i][1], docs_data[i][3], docs_data[i][2])
|
||||||
for _, title, content, summary, _, _, _ in docs_data
|
for i in to_embed
|
||||||
]
|
]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
embeddings = await embed_service.embed_batch(embed_texts)
|
fresh = await embed_service.embed_batch(embed_texts)
|
||||||
|
for j, i in enumerate(to_embed):
|
||||||
|
embeddings[i] = fresh[j]
|
||||||
has_embeddings = True
|
has_embeddings = True
|
||||||
logger.info("Generated %d embeddings for knowledge base", len(embeddings))
|
logger.info(
|
||||||
|
"Generated %d embeddings (%d sin cambios, omitidos)",
|
||||||
|
len(to_embed), len(docs_data) - len(to_embed),
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Failed to generate embeddings: %s — loading without semantic search", e)
|
logger.warning("Failed to generate embeddings: %s — loading without semantic search", e)
|
||||||
embeddings = [None] * len(docs_data)
|
embeddings = [None] * len(docs_data)
|
||||||
has_embeddings = False
|
has_embeddings = False
|
||||||
|
else:
|
||||||
|
has_embeddings = True
|
||||||
|
logger.info("Knowledge sin cambios — no se regeneraron embeddings (%d docs)", len(docs_data))
|
||||||
|
else:
|
||||||
|
logger.info("Embeddings disabled (no AGENTIC_EMBEDDINGS_API_KEY) — KB loaded without semantic search")
|
||||||
|
|
||||||
# Limpia entradas huérfanas: docs que ya no existen en el filesystem.
|
# Limpia entradas huérfanas: docs que ya no existen en el filesystem.
|
||||||
# Sin esto, los IDs antiguos (e.g. tras renombrar 'builder-fields' →
|
# Sin esto, los IDs antiguos (e.g. tras renombrar 'builder-fields' →
|
||||||
@@ -807,9 +854,10 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
|
|||||||
for existing in existing_docs:
|
for existing in existing_docs:
|
||||||
if existing.memory_id not in current_ids:
|
if existing.memory_id not in current_ids:
|
||||||
await memory.delete_document(existing.memory_id, namespace="knowledge")
|
await memory.delete_document(existing.memory_id, namespace="knowledge")
|
||||||
# Borra también el embedding asociado
|
# Borra también el embedding asociado y el hash de contenido
|
||||||
embed_key = memory._key("embeddings", "knowledge", existing.memory_id)
|
embed_key = memory._key("embeddings", "knowledge", existing.memory_id)
|
||||||
await memory._r.delete(embed_key)
|
await memory._r.delete(embed_key)
|
||||||
|
await memory._r.delete(memory._key("kbhash", "knowledge", existing.memory_id))
|
||||||
removed.append(existing.memory_id)
|
removed.append(existing.memory_id)
|
||||||
if removed:
|
if removed:
|
||||||
logger.info("Removed %d stale knowledge docs: %s", len(removed), removed)
|
logger.info("Removed %d stale knowledge docs: %s", len(removed), removed)
|
||||||
@@ -832,6 +880,11 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
|
|||||||
|
|
||||||
if embeddings[i] is not None:
|
if embeddings[i] is not None:
|
||||||
await memory.store_embedding(doc_id, embeddings[i], namespace="knowledge")
|
await memory.store_embedding(doc_id, embeddings[i], namespace="knowledge")
|
||||||
|
# Guarda el hash de contenido para el skip idempotente del próximo load
|
||||||
|
try:
|
||||||
|
await memory._r.set(memory._key("kbhash", "knowledge", doc_id), new_hashes[i])
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
loaded.append({
|
loaded.append({
|
||||||
"id": doc_id,
|
"id": doc_id,
|
||||||
@@ -840,7 +893,7 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
|
|||||||
"tags": tags[:5],
|
"tags": tags[:5],
|
||||||
"priority": priority,
|
"priority": priority,
|
||||||
"load_when": load_when,
|
"load_when": load_when,
|
||||||
"embedded": embeddings[i] is not None,
|
"embedded": embeddings[i] is not None or already_embedded[i],
|
||||||
})
|
})
|
||||||
|
|
||||||
logger.info("Loaded %d knowledge documents from %s (embeddings: %s)", len(loaded), docs_dir, has_embeddings)
|
logger.info("Loaded %d knowledge documents from %s (embeddings: %s)", len(loaded), docs_dir, has_embeddings)
|
||||||
|
|||||||
@@ -32,6 +32,33 @@ class Settings(BaseSettings):
|
|||||||
anthropic_base_url: str = "" # Custom base URL (for MiniMax Anthropic-compatible, etc.)
|
anthropic_base_url: str = "" # Custom base URL (for MiniMax Anthropic-compatible, etc.)
|
||||||
openai_api_key: str = ""
|
openai_api_key: str = ""
|
||||||
openai_base_url: str = "" # Custom base URL (for MiniMax, DeepInfra, etc.)
|
openai_base_url: str = "" # Custom base URL (for MiniMax, DeepInfra, etc.)
|
||||||
|
# --- Embeddings (semantic search) ---
|
||||||
|
# Credenciales DEDICADAS para embeddings. Necesarias porque el chat usa
|
||||||
|
# `openai_api_key` apuntando a un endpoint compatible (p.ej. DeepSeek, que NO
|
||||||
|
# tiene API de embeddings). Si vacio, cae a `openai_api_key` por compat. El
|
||||||
|
# base_url vacio => OpenAI real (api.openai.com); NO hereda `openai_base_url`.
|
||||||
|
embeddings_api_key: str = ""
|
||||||
|
embeddings_base_url: str = ""
|
||||||
|
embeddings_model: str = "text-embedding-3-small"
|
||||||
|
# Spike LiteLLM: si default_model_provider=litellm, modelo a usar (formato
|
||||||
|
# litellm, p.ej. "deepseek/deepseek-v4-pro"). Vacío → deriva de default_model_id.
|
||||||
|
litellm_model: str = ""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def effective_embeddings_key(self) -> str:
|
||||||
|
"""Key a usar para embeddings. Prioriza la dedicada; reutiliza la del
|
||||||
|
chat SOLO si el chat es OpenAI real (sin `openai_base_url` custom) — si
|
||||||
|
apunta a DeepSeek u otro proveedor, esa key no sirve para embeddings."""
|
||||||
|
if self.embeddings_api_key:
|
||||||
|
return self.embeddings_api_key
|
||||||
|
if not self.openai_base_url:
|
||||||
|
return self.openai_api_key
|
||||||
|
return ""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def embeddings_enabled(self) -> bool:
|
||||||
|
return bool(self.effective_embeddings_key or self.embeddings_base_url)
|
||||||
|
|
||||||
default_model_provider: str = "claude"
|
default_model_provider: str = "claude"
|
||||||
default_model_id: str = "claude-sonnet-4-20250514"
|
default_model_id: str = "claude-sonnet-4-20250514"
|
||||||
# Modelo override SOLO para el sub-loop del planner (acai_plan). Si vacio,
|
# Modelo override SOLO para el sub-loop del planner (acai_plan). Si vacio,
|
||||||
@@ -43,6 +70,11 @@ class Settings(BaseSettings):
|
|||||||
planner_max_tokens: int = 16000
|
planner_max_tokens: int = 16000
|
||||||
max_tokens: int = 4096
|
max_tokens: int = 4096
|
||||||
temperature: float = 0.3
|
temperature: float = 0.3
|
||||||
|
# DeepSeek strict function calling (beta). OPT-IN (default False): exige schemas
|
||||||
|
# tipo OpenAI (additionalProperties:false, todos required, etc.) que los tools MCP
|
||||||
|
# actuales NO cumplen → da 400. Para activarlo: schemas compatibles + base_url
|
||||||
|
# https://api.deepseek.com/beta + AGENTIC_DEEPSEEK_STRICT_TOOLS=true.
|
||||||
|
deepseek_strict_tools: bool = False
|
||||||
|
|
||||||
# --- Context engine ---
|
# --- Context engine ---
|
||||||
model_context_window: int = 0 # 0 = use legacy fixed budget / explicit override
|
model_context_window: int = 0 # 0 = use legacy fixed budget / explicit override
|
||||||
@@ -70,6 +102,10 @@ class Settings(BaseSettings):
|
|||||||
conversation_recent_raw_limit: int = 2
|
conversation_recent_raw_limit: int = 2
|
||||||
task_history_max_entries: int = 20
|
task_history_max_entries: int = 20
|
||||||
task_history_max_tokens: int = 1500
|
task_history_max_tokens: int = 1500
|
||||||
|
# Presupuesto de tokens para la ventana de recent_messages persistida en
|
||||||
|
# sesion. Sin esto crece sin limite y empuja al compactor a su paso
|
||||||
|
# destructivo (colapsar bloques perdiendo tool_use ids). 0 = sin limite.
|
||||||
|
recent_messages_max_tokens: int = 60_000
|
||||||
|
|
||||||
# --- MCP ---
|
# --- MCP ---
|
||||||
mcp_config_path: str = "" # Path to mcp.json; empty = legacy single-server mode
|
mcp_config_path: str = "" # Path to mcp.json; empty = legacy single-server mode
|
||||||
|
|||||||
@@ -180,7 +180,13 @@ class ContextCompactor:
|
|||||||
"raw_tool_results_kept": 0,
|
"raw_tool_results_kept": 0,
|
||||||
}
|
}
|
||||||
if total <= max_tokens:
|
if total <= max_tokens:
|
||||||
return messages, meta
|
# Aunque no haga falta compactar, garantizamos el invariante
|
||||||
|
# tool_use/tool_result (repara historiales ya rotos persistidos).
|
||||||
|
repaired = self._enforce_tool_pairing([dict(m) for m in messages])
|
||||||
|
meta["output_tokens"] = sum(
|
||||||
|
self._estimate_message_tokens(m) for m in repaired
|
||||||
|
)
|
||||||
|
return repaired, meta
|
||||||
|
|
||||||
compacted = [dict(m) for m in messages]
|
compacted = [dict(m) for m in messages]
|
||||||
last_user_idx = max(
|
last_user_idx = max(
|
||||||
@@ -343,20 +349,241 @@ class ContextCompactor:
|
|||||||
message["content"] = "[USER CONTEXT COMPACTADO]"
|
message["content"] = "[USER CONTEXT COMPACTADO]"
|
||||||
elif isinstance(content, list) and content:
|
elif isinstance(content, list) and content:
|
||||||
# Anthropic-style: reemplazar lista entera por placeholder string.
|
# Anthropic-style: reemplazar lista entera por placeholder string.
|
||||||
# Nota: pierde tool_use ids — solo aplicar al final como ultimo recurso.
|
# Nota: colapsar pierde los tool_use/tool_result ids, asi que
|
||||||
|
# lo hacemos PAIR-AWARE (colapsar un lado del par colapsa el
|
||||||
|
# otro en la misma iteracion) y ademas `_enforce_tool_pairing`
|
||||||
|
# al final garantiza el invariante aunque algo se escape.
|
||||||
if role == "assistant":
|
if role == "assistant":
|
||||||
message["content"] = "[ASSISTANT COMPACTADO]"
|
message["content"] = "[ASSISTANT COMPACTADO]"
|
||||||
|
# Si este assistant tenia tool_use, colapsar tambien el
|
||||||
|
# user de tool_results que lo sigue (mismo par).
|
||||||
|
if self._blocks_have_type(content, "tool_use"):
|
||||||
|
nxt = idx + 1
|
||||||
|
if (
|
||||||
|
nxt < len(compacted)
|
||||||
|
and nxt != last_user_idx
|
||||||
|
and compacted[nxt].get("role") == "user"
|
||||||
|
and self._blocks_have_type(
|
||||||
|
compacted[nxt].get("content"), "tool_result"
|
||||||
|
)
|
||||||
|
):
|
||||||
|
compacted[nxt]["content"] = "[USER CONTEXT COMPACTADO]"
|
||||||
elif role == "user":
|
elif role == "user":
|
||||||
message["content"] = "[USER CONTEXT COMPACTADO]"
|
message["content"] = "[USER CONTEXT COMPACTADO]"
|
||||||
|
# Si este user llevaba tool_results, colapsar tambien el
|
||||||
|
# assistant anterior con sus tool_use (mismo par).
|
||||||
|
if self._blocks_have_type(content, "tool_result"):
|
||||||
|
prv = idx - 1
|
||||||
|
if (
|
||||||
|
prv >= 0
|
||||||
|
and compacted[prv].get("role") == "assistant"
|
||||||
|
and self._blocks_have_type(
|
||||||
|
compacted[prv].get("content"), "tool_use"
|
||||||
|
)
|
||||||
|
):
|
||||||
|
compacted[prv]["content"] = "[ASSISTANT COMPACTADO]"
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
total = sum(self._estimate_message_tokens(m) for m in compacted)
|
total = sum(self._estimate_message_tokens(m) for m in compacted)
|
||||||
if total <= max_tokens:
|
if total <= max_tokens:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Invariante final: tras toda la compactacion, reparar cualquier par
|
||||||
|
# tool_use/tool_result roto. Sin esto, un tool_result huerfano se emite
|
||||||
|
# como `role: tool` sin `tool_calls` previo y el proveedor devuelve 400
|
||||||
|
# ("Messages with role 'tool' must be a response to a preceding message
|
||||||
|
# with 'tool_calls'").
|
||||||
|
compacted = self._enforce_tool_pairing(compacted)
|
||||||
|
total = sum(self._estimate_message_tokens(m) for m in compacted)
|
||||||
|
|
||||||
meta["output_tokens"] = total
|
meta["output_tokens"] = total
|
||||||
return compacted, meta
|
return compacted, meta
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Invariante tool_use ↔ tool_result
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _blocks_have_type(content: Any, block_type: str) -> bool:
|
||||||
|
"""True si `content` es una lista de bloques con alguno del tipo dado."""
|
||||||
|
if not isinstance(content, list):
|
||||||
|
return False
|
||||||
|
return any(
|
||||||
|
isinstance(b, dict) and b.get("type") == block_type for b in content
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _tool_use_ids(message: dict[str, Any]) -> set[str]:
|
||||||
|
"""IDs de tool calls emitidos por un assistant (bloques `tool_use`
|
||||||
|
estilo Anthropic y/o `tool_calls` estilo OpenAI legacy)."""
|
||||||
|
ids: set[str] = set()
|
||||||
|
content = message.get("content")
|
||||||
|
if isinstance(content, list):
|
||||||
|
for b in content:
|
||||||
|
if isinstance(b, dict) and b.get("type") == "tool_use":
|
||||||
|
ids.add(str(b.get("id", "")))
|
||||||
|
for tc in message.get("tool_calls") or []:
|
||||||
|
if isinstance(tc, dict):
|
||||||
|
ids.add(str(tc.get("id", "")))
|
||||||
|
ids.discard("")
|
||||||
|
return ids
|
||||||
|
|
||||||
|
def _enforce_tool_pairing(
|
||||||
|
self, messages: list[dict[str, Any]]
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""Repara el invariante tool_use ↔ tool_result en ambas direcciones.
|
||||||
|
|
||||||
|
La compactacion puede colapsar el content de un assistant (perdiendo sus
|
||||||
|
bloques `tool_use`) mientras el user siguiente conserva sus `tool_result`,
|
||||||
|
o al reves. El matching es por IDs (`tool_use.id` vs `tool_result.tool_use_id`
|
||||||
|
y `tool_calls[].id` vs `tool_call_id`), no solo por adyacencia, asi que
|
||||||
|
tambien repara desajustes parciales (p.ej. 3 tool_use vs 2 tool_result).
|
||||||
|
|
||||||
|
- tool_result sin tool_use previo → bloque text placeholder.
|
||||||
|
- tool_use sin tool_result siguiente → se elimina el bloque (thinking/text
|
||||||
|
se conservan; si el content queda vacio, placeholder string).
|
||||||
|
- `role: tool` legacy sin assistant con `tool_calls` → user placeholder.
|
||||||
|
"""
|
||||||
|
repaired: list[dict[str, Any]] = []
|
||||||
|
for idx, msg in enumerate(messages):
|
||||||
|
role = msg.get("role", "")
|
||||||
|
content = msg.get("content")
|
||||||
|
|
||||||
|
if role == "assistant":
|
||||||
|
tool_ids = self._tool_use_ids(msg)
|
||||||
|
if not tool_ids:
|
||||||
|
repaired.append(msg)
|
||||||
|
continue
|
||||||
|
# IDs respondidos: user con tool_results inmediato y/o run
|
||||||
|
# contiguo de mensajes legacy `role: tool`.
|
||||||
|
answered: set[str] = set()
|
||||||
|
j = idx + 1
|
||||||
|
if (
|
||||||
|
j < len(messages)
|
||||||
|
and messages[j].get("role") == "user"
|
||||||
|
and isinstance(messages[j].get("content"), list)
|
||||||
|
):
|
||||||
|
for b in messages[j]["content"]:
|
||||||
|
if isinstance(b, dict) and b.get("type") == "tool_result":
|
||||||
|
answered.add(str(b.get("tool_use_id", "")))
|
||||||
|
j += 1
|
||||||
|
while j < len(messages) and messages[j].get("role") == "tool":
|
||||||
|
answered.add(str(messages[j].get("tool_call_id", "")))
|
||||||
|
j += 1
|
||||||
|
unanswered = tool_ids - answered
|
||||||
|
if not unanswered:
|
||||||
|
repaired.append(msg)
|
||||||
|
continue
|
||||||
|
# Eliminar los tool_use/tool_calls sin respuesta.
|
||||||
|
new_msg = dict(msg)
|
||||||
|
if isinstance(content, list):
|
||||||
|
new_content = [
|
||||||
|
b
|
||||||
|
for b in content
|
||||||
|
if not (
|
||||||
|
isinstance(b, dict)
|
||||||
|
and b.get("type") == "tool_use"
|
||||||
|
and str(b.get("id", "")) in unanswered
|
||||||
|
)
|
||||||
|
]
|
||||||
|
if not new_content:
|
||||||
|
new_msg["content"] = "[ASSISTANT COMPACTADO]"
|
||||||
|
else:
|
||||||
|
new_msg["content"] = new_content
|
||||||
|
if isinstance(new_msg.get("tool_calls"), list):
|
||||||
|
kept_calls = [
|
||||||
|
tc
|
||||||
|
for tc in new_msg["tool_calls"]
|
||||||
|
if isinstance(tc, dict)
|
||||||
|
and str(tc.get("id", "")) not in unanswered
|
||||||
|
]
|
||||||
|
if kept_calls:
|
||||||
|
new_msg["tool_calls"] = kept_calls
|
||||||
|
else:
|
||||||
|
new_msg.pop("tool_calls", None)
|
||||||
|
if not new_msg.get("content"):
|
||||||
|
new_msg["content"] = "[ASSISTANT COMPACTADO]"
|
||||||
|
repaired.append(new_msg)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if role == "user" and self._blocks_have_type(content, "tool_result"):
|
||||||
|
# IDs disponibles en el assistant inmediatamente anterior
|
||||||
|
# (YA reparado — usar `repaired[-1]` refleja los tool_use que
|
||||||
|
# sobrevivieron, no los del mensaje original).
|
||||||
|
available: set[str] = set()
|
||||||
|
if repaired and repaired[-1].get("role") == "assistant":
|
||||||
|
available = self._tool_use_ids(repaired[-1])
|
||||||
|
new_content: list[Any] = []
|
||||||
|
orphaned = False
|
||||||
|
for b in content:
|
||||||
|
if (
|
||||||
|
isinstance(b, dict)
|
||||||
|
and b.get("type") == "tool_result"
|
||||||
|
and str(b.get("tool_use_id", "")) not in available
|
||||||
|
):
|
||||||
|
orphaned = True
|
||||||
|
# Fusionar placeholders consecutivos en un unico bloque text.
|
||||||
|
if not (
|
||||||
|
new_content
|
||||||
|
and isinstance(new_content[-1], dict)
|
||||||
|
and new_content[-1].get("type") == "text"
|
||||||
|
and new_content[-1].get("text")
|
||||||
|
== "[Resultado de herramienta compactado]"
|
||||||
|
):
|
||||||
|
new_content.append(
|
||||||
|
{
|
||||||
|
"type": "text",
|
||||||
|
"text": "[Resultado de herramienta compactado]",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
new_content.append(b)
|
||||||
|
if not orphaned:
|
||||||
|
repaired.append(msg)
|
||||||
|
continue
|
||||||
|
new_msg = dict(msg)
|
||||||
|
only_placeholders = all(
|
||||||
|
isinstance(b, dict)
|
||||||
|
and b.get("type") == "text"
|
||||||
|
and b.get("text") == "[Resultado de herramienta compactado]"
|
||||||
|
for b in new_content
|
||||||
|
)
|
||||||
|
if not new_content or only_placeholders:
|
||||||
|
new_msg["content"] = "[Resultado de herramienta compactado]"
|
||||||
|
else:
|
||||||
|
new_msg["content"] = new_content
|
||||||
|
repaired.append(new_msg)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if role == "tool":
|
||||||
|
# Legacy: el assistant anterior (saltando otros `role: tool`
|
||||||
|
# contiguos) debe tener este tool_call_id en sus tool_calls.
|
||||||
|
prev_assistant: dict[str, Any] | None = None
|
||||||
|
for prev in reversed(repaired):
|
||||||
|
if prev.get("role") == "tool":
|
||||||
|
continue
|
||||||
|
if prev.get("role") == "assistant":
|
||||||
|
prev_assistant = prev
|
||||||
|
break
|
||||||
|
call_id = str(msg.get("tool_call_id", ""))
|
||||||
|
valid = (
|
||||||
|
prev_assistant is not None
|
||||||
|
and call_id in self._tool_use_ids(prev_assistant)
|
||||||
|
)
|
||||||
|
if valid:
|
||||||
|
repaired.append(msg)
|
||||||
|
else:
|
||||||
|
repaired.append(
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": "[Resultado de herramienta compactado]",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
repaired.append(msg)
|
||||||
|
return repaired
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# Internals
|
# Internals
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|||||||
@@ -583,6 +583,16 @@ class ContextEngine:
|
|||||||
|
|
||||||
async def _semantic_rank(self, query: str) -> list[tuple[str, float]]:
|
async def _semantic_rank(self, query: str) -> list[tuple[str, float]]:
|
||||||
"""Rank knowledge docs by cosine similarity. Returns (doc_id, score)."""
|
"""Rank knowledge docs by cosine similarity. Returns (doc_id, score)."""
|
||||||
|
# Sin credencial de embeddings no tiene sentido intentar la llamada (daria
|
||||||
|
# 401 en cada turno). Se desactiva limpiamente con un aviso unico.
|
||||||
|
if not settings.embeddings_enabled:
|
||||||
|
if not getattr(self, "_embed_disabled_warned", False):
|
||||||
|
logger.warning(
|
||||||
|
"Embeddings disabled (no AGENTIC_EMBEDDINGS_API_KEY) — "
|
||||||
|
"semantic search off, loading all docs"
|
||||||
|
)
|
||||||
|
self._embed_disabled_warned = True
|
||||||
|
return []
|
||||||
try:
|
try:
|
||||||
if not self._embed_service:
|
if not self._embed_service:
|
||||||
self._embed_service = EmbeddingService()
|
self._embed_service = EmbeddingService()
|
||||||
@@ -936,7 +946,22 @@ class ContextEngine:
|
|||||||
else:
|
else:
|
||||||
base_user_content = "Awaiting task assignment."
|
base_user_content = "Awaiting task assignment."
|
||||||
|
|
||||||
followup_mode = self._classify_followup_mode(base_user_content)
|
# Un follow-up (transform/fetch_more/ambiguous) SOLO tiene sentido si hay
|
||||||
|
# un turno anterior al que referirse. En una sesión fresca / primer mensaje
|
||||||
|
# no hay nada que transformar, así que NO clasificamos: de lo contrario un
|
||||||
|
# primer prompt que casualmente contenga un marker ("resumen", "estructura",
|
||||||
|
# "busca", "adapta"…) se marcaría como `transform` y `_get_allowed_tools`
|
||||||
|
# devolvería [] — el agente se quedaría SIN tools y emitiría los tool calls
|
||||||
|
# como texto sin ejecutarlos (caso real: el prompt de análisis de estilos
|
||||||
|
# que dice "Guarda un resumen…").
|
||||||
|
has_prior_turn = bool(session.task_history) or bool(
|
||||||
|
getattr(session, "recent_messages", [])
|
||||||
|
)
|
||||||
|
followup_mode = (
|
||||||
|
self._classify_followup_mode(base_user_content)
|
||||||
|
if has_prior_turn
|
||||||
|
else "none"
|
||||||
|
)
|
||||||
resolved_context = ""
|
resolved_context = ""
|
||||||
if session.task_history and followup_mode != "none":
|
if session.task_history and followup_mode != "none":
|
||||||
resolved_context = self._build_followup_resolution(session.task_history[-1])
|
resolved_context = self._build_followup_resolution(session.task_history[-1])
|
||||||
|
|||||||
@@ -54,7 +54,11 @@ async def lifespan(app: FastAPI):
|
|||||||
await redis_storage.connect()
|
await redis_storage.connect()
|
||||||
|
|
||||||
# 2. Initialize model adapter
|
# 2. Initialize model adapter
|
||||||
if settings.default_model_provider == "openai":
|
if settings.default_model_provider == "litellm":
|
||||||
|
from .adapters.litellm_adapter import LiteLLMAdapter
|
||||||
|
model_adapter = LiteLLMAdapter()
|
||||||
|
logger.info("Using LiteLLM adapter (model: %s)", settings.litellm_model or settings.default_model_id)
|
||||||
|
elif settings.default_model_provider == "openai":
|
||||||
model_adapter = OpenAIAdapter()
|
model_adapter = OpenAIAdapter()
|
||||||
logger.info("Using OpenAI adapter (model: %s)", settings.default_model_id)
|
logger.info("Using OpenAI adapter (model: %s)", settings.default_model_id)
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -18,6 +18,15 @@ from ..models.tools import ToolDefinition
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Buffer maximo (bytes) del StreamReader para leer las respuestas JSON-RPC del
|
||||||
|
# MCP por stdout. Una respuesta llega en UNA sola linea; tools como el
|
||||||
|
# screenshot fullPage de Playwright devuelven la imagen en base64 en esa linea
|
||||||
|
# y superan de largo el 64KB por defecto de asyncio (y el 1MB que teniamos),
|
||||||
|
# lanzando LimitOverrunError que mataba el read loop y dejaba la sesion MCP
|
||||||
|
# inservible (el agente "se paraba" al hacer acciones). 64MB cubre cualquier
|
||||||
|
# screenshot real; por encima, el read loop descarta esa respuesta y sigue vivo.
|
||||||
|
MCP_STREAM_LIMIT = 64 * 1024 * 1024
|
||||||
|
|
||||||
|
|
||||||
class MCPClientError(Exception):
|
class MCPClientError(Exception):
|
||||||
pass
|
pass
|
||||||
@@ -74,7 +83,7 @@ class MCPClient:
|
|||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE,
|
||||||
stderr=asyncio.subprocess.PIPE,
|
stderr=asyncio.subprocess.PIPE,
|
||||||
env=self._env,
|
env=self._env,
|
||||||
limit=1024 * 1024, # 1MB buffer for large MCP responses
|
limit=MCP_STREAM_LIMIT, # buffer grande para respuestas MCP (screenshots base64)
|
||||||
)
|
)
|
||||||
self._running = True
|
self._running = True
|
||||||
self._reader_task = asyncio.create_task(self._read_loop())
|
self._reader_task = asyncio.create_task(self._read_loop())
|
||||||
@@ -225,14 +234,30 @@ class MCPClient:
|
|||||||
if not self._process or not self._process.stdout:
|
if not self._process or not self._process.stdout:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
stdout = self._process.stdout
|
||||||
try:
|
try:
|
||||||
while self._running:
|
while self._running:
|
||||||
line = await self._process.stdout.readline()
|
try:
|
||||||
|
line = await stdout.readline()
|
||||||
|
except (ValueError, asyncio.LimitOverrunError):
|
||||||
|
# Una respuesta JSON-RPC supero el buffer (p.ej. screenshot
|
||||||
|
# fullPage de Playwright en base64 por encima de 64MB). Antes
|
||||||
|
# esto mataba el read loop y dejaba TODA la sesion MCP muerta
|
||||||
|
# (el agente se "paraba" en la siguiente accion). Ahora
|
||||||
|
# descartamos solo esa respuesta, re-sincronizamos el stream
|
||||||
|
# y seguimos vivos para las demas tools.
|
||||||
|
logger.warning(
|
||||||
|
"MCP [%s]: respuesta supera el buffer (%d MB), se descarta y se continua",
|
||||||
|
self.name, MCP_STREAM_LIMIT // (1024 * 1024),
|
||||||
|
)
|
||||||
|
await self._drain_until_newline(stdout)
|
||||||
|
continue
|
||||||
|
|
||||||
if not line:
|
if not line:
|
||||||
logger.warning("MCP server stdout closed")
|
logger.warning("MCP server stdout closed")
|
||||||
break
|
break
|
||||||
|
|
||||||
line_str = line.decode().strip()
|
line_str = line.decode(errors="replace").strip()
|
||||||
if not line_str:
|
if not line_str:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -251,6 +276,21 @@ class MCPClient:
|
|||||||
finally:
|
finally:
|
||||||
self._running = False
|
self._running = False
|
||||||
|
|
||||||
|
async def _drain_until_newline(self, stdout: asyncio.StreamReader) -> None:
|
||||||
|
"""Consume bytes del stream hasta el proximo salto de linea para
|
||||||
|
re-sincronizar tras un LimitOverrunError (la respuesta sobredimensionada
|
||||||
|
se descarta). `read()` no usa separador, asi que no vuelve a disparar el
|
||||||
|
overrun y va vaciando el buffer hasta liberar la linea gigante."""
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
chunk = await stdout.read(65536)
|
||||||
|
except Exception:
|
||||||
|
return
|
||||||
|
if not chunk:
|
||||||
|
return
|
||||||
|
if b"\n" in chunk:
|
||||||
|
return
|
||||||
|
|
||||||
def _handle_message(self, message: dict[str, Any]) -> None:
|
def _handle_message(self, message: dict[str, Any]) -> None:
|
||||||
"""Route an incoming JSON-RPC message."""
|
"""Route an incoming JSON-RPC message."""
|
||||||
msg_id = message.get("id")
|
msg_id = message.get("id")
|
||||||
|
|||||||
@@ -25,12 +25,19 @@ class EmbeddingService:
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
api_key: str | None = None,
|
api_key: str | None = None,
|
||||||
model: str = DEFAULT_MODEL,
|
model: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
self._client = AsyncOpenAI(
|
# Credenciales dedicadas de embeddings. Fallback a openai_api_key por
|
||||||
api_key=api_key or settings.openai_api_key,
|
# compat. El base_url solo se aplica si se configura explicitamente
|
||||||
)
|
# `embeddings_base_url`; vacio => OpenAI real (api.openai.com). NO se
|
||||||
self._model = model
|
# hereda `openai_base_url` (que apunta al chat, p.ej. DeepSeek sin
|
||||||
|
# endpoint de embeddings).
|
||||||
|
key = api_key or settings.effective_embeddings_key
|
||||||
|
kwargs: dict[str, Any] = {"api_key": key}
|
||||||
|
if settings.embeddings_base_url:
|
||||||
|
kwargs["base_url"] = settings.embeddings_base_url
|
||||||
|
self._client = AsyncOpenAI(**kwargs)
|
||||||
|
self._model = model or settings.embeddings_model or DEFAULT_MODEL
|
||||||
|
|
||||||
async def embed(self, text: str) -> list[float]:
|
async def embed(self, text: str) -> list[float]:
|
||||||
"""Generate embedding for a single text."""
|
"""Generate embedding for a single text."""
|
||||||
|
|||||||
@@ -289,6 +289,34 @@ class BaseAgent:
|
|||||||
|
|
||||||
# If no tool calls, we're done
|
# If no tool calls, we're done
|
||||||
if not tool_calls:
|
if not tool_calls:
|
||||||
|
# Quirk DeepSeek thinking: a veces el modelo emite TODA su
|
||||||
|
# respuesta como reasoning y cierra el turno sin text ni
|
||||||
|
# tool_use. Si el turno termina SOLO con bloques thinking,
|
||||||
|
# promovemos el thinking a un bloque text en el snapshot que
|
||||||
|
# se persiste — asi el UI no lo muestra como "pensando" al
|
||||||
|
# recargar y el siguiente turno no rompe con
|
||||||
|
# "content or tool_calls must be set".
|
||||||
|
if turn_blocks and all(b.get("type") == "thinking" for b in turn_blocks):
|
||||||
|
promoted = "\n".join(
|
||||||
|
b.get("thinking", "") for b in turn_blocks if b.get("thinking")
|
||||||
|
)
|
||||||
|
turn_blocks = [{"type": "text", "text": promoted}]
|
||||||
|
accumulated_content += promoted
|
||||||
|
if promoted and self.profile.stream_deltas:
|
||||||
|
# Emision en vivo via AGENT_DELTA normal: el
|
||||||
|
# ClaudeFormatEmitter cierra el thinking block abierto
|
||||||
|
# (content_block_stop) y abre un text block nuevo con
|
||||||
|
# su propio indice (start/delta/stop), asi que el
|
||||||
|
# protocolo de bloques no se rompe.
|
||||||
|
await self.sse.emit(
|
||||||
|
EventType.AGENT_DELTA,
|
||||||
|
{
|
||||||
|
"agent": self.profile.role,
|
||||||
|
"delta": promoted,
|
||||||
|
"step": step,
|
||||||
|
},
|
||||||
|
session_id=session.session_id,
|
||||||
|
)
|
||||||
if turn_blocks:
|
if turn_blocks:
|
||||||
conversation.append({"role": "assistant", "content": turn_blocks})
|
conversation.append({"role": "assistant", "content": turn_blocks})
|
||||||
elif full_text:
|
elif full_text:
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ from typing import Any
|
|||||||
from ..adapters.base import ModelAdapter
|
from ..adapters.base import ModelAdapter
|
||||||
from ..config import settings
|
from ..config import settings
|
||||||
from ..context.engine import ContextEngine
|
from ..context.engine import ContextEngine
|
||||||
from ..context.compactor import estimate_tokens
|
from ..context.compactor import ContextCompactor, estimate_tokens
|
||||||
from ..mcp.manager import MCPManager
|
from ..mcp.manager import MCPManager
|
||||||
from ..memory.store import MemoryStore
|
from ..memory.store import MemoryStore
|
||||||
from ..models.agent import AgentProfile
|
from ..models.agent import AgentProfile
|
||||||
@@ -260,7 +260,76 @@ class OrchestratorEngine:
|
|||||||
current_turn.append(sanitized)
|
current_turn.append(sanitized)
|
||||||
|
|
||||||
merged.extend(current_turn)
|
merged.extend(current_turn)
|
||||||
return merged
|
return OrchestratorEngine._trim_recent_messages(merged)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _trim_recent_messages(
|
||||||
|
messages: list[dict[str, Any]],
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""Recorta recent_messages a un presupuesto de tokens eliminando
|
||||||
|
mensajes ENTEROS desde el principio (los mas antiguos).
|
||||||
|
|
||||||
|
Dos reglas para no romper el invariante tool_use ↔ tool_result:
|
||||||
|
- Nunca cortar dentro de un par: si se elimina un assistant con
|
||||||
|
tool_use, se eliminan tambien sus tool_results (user carrier o run
|
||||||
|
de mensajes legacy `role: tool`).
|
||||||
|
- El primer mensaje resultante nunca puede ser un carrier de
|
||||||
|
tool_result ni un `role: tool`.
|
||||||
|
|
||||||
|
Mantiene siempre al menos los ultimos 4 mensajes aunque excedan el
|
||||||
|
presupuesto.
|
||||||
|
"""
|
||||||
|
budget = settings.recent_messages_max_tokens
|
||||||
|
if budget <= 0 or not messages:
|
||||||
|
return messages
|
||||||
|
|
||||||
|
estimate = ContextCompactor._estimate_message_tokens
|
||||||
|
total = sum(estimate(m) for m in messages)
|
||||||
|
if total <= budget:
|
||||||
|
return messages
|
||||||
|
|
||||||
|
def _is_tool_result_carrier(msg: dict[str, Any]) -> bool:
|
||||||
|
if msg.get("role") == "tool":
|
||||||
|
return True
|
||||||
|
if msg.get("role") != "user":
|
||||||
|
return False
|
||||||
|
content = msg.get("content")
|
||||||
|
return isinstance(content, list) and any(
|
||||||
|
isinstance(b, dict) and b.get("type") == "tool_result"
|
||||||
|
for b in content
|
||||||
|
)
|
||||||
|
|
||||||
|
def _has_tool_use(msg: dict[str, Any]) -> bool:
|
||||||
|
if msg.get("role") != "assistant":
|
||||||
|
return False
|
||||||
|
if msg.get("tool_calls"):
|
||||||
|
return True
|
||||||
|
content = msg.get("content")
|
||||||
|
return isinstance(content, list) and any(
|
||||||
|
isinstance(b, dict) and b.get("type") == "tool_use"
|
||||||
|
for b in content
|
||||||
|
)
|
||||||
|
|
||||||
|
min_keep = 4
|
||||||
|
n = len(messages)
|
||||||
|
start = 0
|
||||||
|
while total > budget and start < n - min_keep:
|
||||||
|
end = start + 1
|
||||||
|
if _has_tool_use(messages[start]):
|
||||||
|
# Arrastrar los tool_results del par (no cortar dentro de el).
|
||||||
|
while end < n and _is_tool_result_carrier(messages[end]):
|
||||||
|
end += 1
|
||||||
|
if n - end < min_keep:
|
||||||
|
break # Eliminar el par completo invadiria los ultimos min_keep
|
||||||
|
for k in range(start, end):
|
||||||
|
total -= estimate(messages[k])
|
||||||
|
start = end
|
||||||
|
|
||||||
|
trimmed = messages[start:]
|
||||||
|
# El primer mensaje nunca puede ser un tool_result sin su tool_use.
|
||||||
|
while trimmed and _is_tool_result_carrier(trimmed[0]):
|
||||||
|
trimmed.pop(0)
|
||||||
|
return trimmed
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _sanitize_recent_message(message: dict[str, Any]) -> dict[str, Any]:
|
def _sanitize_recent_message(message: dict[str, Any]) -> dict[str, Any]:
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import uuid
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Any, AsyncIterator
|
from typing import Any, AsyncIterator
|
||||||
|
|
||||||
@@ -127,14 +128,26 @@ class RedisStorage:
|
|||||||
# Execution lock (prevents concurrent messages on same session)
|
# Execution lock (prevents concurrent messages on same session)
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Compare-and-delete atómico: solo borra el lock si el valor coincide con
|
||||||
|
# el token de quien lo adquirió. Evita que una ejecución cuyo lock expiró
|
||||||
|
# por TTL borre en su `finally` el lock que ya adquirió otra petición.
|
||||||
|
_UNLOCK_LUA = (
|
||||||
|
"if redis.call('get', KEYS[1]) == ARGV[1] then "
|
||||||
|
"return redis.call('del', KEYS[1]) else return 0 end"
|
||||||
|
)
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def session_lock(
|
async def session_lock(
|
||||||
self, session_id: str, timeout: int = 300
|
self, session_id: str, timeout: int | None = None
|
||||||
) -> AsyncIterator[bool]:
|
) -> AsyncIterator[bool]:
|
||||||
"""Acquire an exclusive execution lock for a session.
|
"""Acquire an exclusive execution lock for a session.
|
||||||
|
|
||||||
Uses SETNX with auto-expiry to prevent deadlocks if the process
|
Uses SETNX with auto-expiry to prevent deadlocks if the process
|
||||||
crashes mid-execution.
|
crashes mid-execution. El TTL es mayor que el timeout global de
|
||||||
|
ejecución para que el lock no expire (y otra petición lo robe)
|
||||||
|
mientras la ejecución original sigue viva. Cada adquisición guarda
|
||||||
|
un token único como valor y la liberación es compare-and-delete
|
||||||
|
(Lua), de modo que solo el dueño puede borrar el lock.
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
async with storage.session_lock(session_id) as acquired:
|
async with storage.session_lock(session_id) as acquired:
|
||||||
@@ -142,20 +155,34 @@ class RedisStorage:
|
|||||||
raise HTTPException(409, "Session busy")
|
raise HTTPException(409, "Session busy")
|
||||||
# ... execute ...
|
# ... execute ...
|
||||||
"""
|
"""
|
||||||
|
if timeout is None:
|
||||||
|
timeout = int(settings.max_execution_timeout_seconds) + 60
|
||||||
key = self._key("session", session_id, "lock")
|
key = self._key("session", session_id, "lock")
|
||||||
acquired = await self.client.set(key, "1", nx=True, ex=timeout)
|
token = uuid.uuid4().hex
|
||||||
|
acquired = await self.client.set(key, token, nx=True, ex=timeout)
|
||||||
try:
|
try:
|
||||||
yield bool(acquired)
|
yield bool(acquired)
|
||||||
finally:
|
finally:
|
||||||
if acquired:
|
if acquired:
|
||||||
await self.client.delete(key)
|
released = await self.client.eval(self._UNLOCK_LUA, 1, key, token)
|
||||||
|
if not released:
|
||||||
|
# El lock expiró por TTL y/o lo posee otra petición — no
|
||||||
|
# tocamos nada, pero lo dejamos registrado.
|
||||||
|
logger.warning(
|
||||||
|
"session_lock for %s no longer owned at release "
|
||||||
|
"(expired or taken over)",
|
||||||
|
session_id,
|
||||||
|
)
|
||||||
|
|
||||||
async def clear_session_lock(self, session_id: str) -> None:
|
async def clear_session_lock(self, session_id: str) -> None:
|
||||||
"""Borra el lock de ejecución de una sesión de forma incondicional.
|
"""Borra el lock de ejecución de una sesión de forma incondicional.
|
||||||
|
|
||||||
Usado por el endpoint de abort para liberar un lock huérfano (de una
|
OJO: borra sin conocer el token del dueño, así que se salta el
|
||||||
ejecución previa que crasheó antes de soltarlo) y no bloquear el
|
compare-and-delete de `session_lock`. SOLO debe invocarse cuando se
|
||||||
siguiente mensaje hasta que expire el TTL.
|
ha confirmado que la ejecución dueña del lock fue cancelada (ver
|
||||||
|
`abort_session` en routes.py): la tarea cancelada puede no ejecutar
|
||||||
|
su `finally` de liberación de forma fiable, y en ese caso no hay
|
||||||
|
riesgo de borrar el lock de una ejecución viva.
|
||||||
"""
|
"""
|
||||||
key = self._key("session", session_id, "lock")
|
key = self._key("session", session_id, "lock")
|
||||||
await self.client.delete(key)
|
await self.client.delete(key)
|
||||||
|
|||||||
@@ -19,6 +19,71 @@ from .sse import EventType, SSEEmitter
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
_GENERIC_ERROR = (
|
||||||
|
"Ha ocurrido un error procesando tu mensaje. Vuelve a intentarlo en unos momentos."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Patrones que el frontend interpreta por sí mismo (login / sesión expirada).
|
||||||
|
# No los genericamos para no romper esas detecciones.
|
||||||
|
_PASSTHROUGH_PATTERNS = (
|
||||||
|
"not logged in",
|
||||||
|
"login required",
|
||||||
|
"authentication required",
|
||||||
|
"no conversation found",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def friendly_error_message(raw: str, code: str = "") -> str:
|
||||||
|
"""Traduce un error crudo (proveedor/excepción) a un mensaje genérico y
|
||||||
|
localizado para el usuario final, sin filtrar detalles internos.
|
||||||
|
|
||||||
|
Devuelve el texto original sin tocar para los casos de auth/sesión que el
|
||||||
|
frontend ya gestiona por contenido.
|
||||||
|
"""
|
||||||
|
raw = raw or ""
|
||||||
|
text = "{} {}".format(code or "", raw).lower()
|
||||||
|
|
||||||
|
# Auth / sesión: dejar pasar el texto original (lo maneja el frontend)
|
||||||
|
if any(p in text for p in _PASSTHROUGH_PATTERNS):
|
||||||
|
return raw
|
||||||
|
|
||||||
|
# Timeout de ejecución
|
||||||
|
if "timeout" in text or "timed out" in text:
|
||||||
|
return (
|
||||||
|
"La tarea tardó demasiado en completarse. Prueba a dividirla en "
|
||||||
|
"pasos más pequeños o vuelve a intentarlo."
|
||||||
|
)
|
||||||
|
# Saldo insuficiente / facturación del proveedor (402)
|
||||||
|
if (
|
||||||
|
"402" in text
|
||||||
|
or "insufficient balance" in text
|
||||||
|
or "insufficient_quota" in text
|
||||||
|
or "billing" in text
|
||||||
|
):
|
||||||
|
return (
|
||||||
|
"El asistente no está disponible en este momento. Inténtalo de "
|
||||||
|
"nuevo en unos minutos."
|
||||||
|
)
|
||||||
|
# Credenciales del proveedor inválidas (401)
|
||||||
|
if (
|
||||||
|
"401" in text
|
||||||
|
or "invalid_api_key" in text
|
||||||
|
or "incorrect api key" in text
|
||||||
|
or "invalid api key" in text
|
||||||
|
):
|
||||||
|
return (
|
||||||
|
"El asistente no está disponible temporalmente por un problema de "
|
||||||
|
"configuración. Estamos trabajando en ello."
|
||||||
|
)
|
||||||
|
# Límite de peticiones (429)
|
||||||
|
if "429" in text or "rate limit" in text or "rate_limit" in text:
|
||||||
|
return (
|
||||||
|
"Hay mucha demanda en este momento. Espera unos segundos y vuelve "
|
||||||
|
"a intentarlo."
|
||||||
|
)
|
||||||
|
return _GENERIC_ERROR
|
||||||
|
|
||||||
|
|
||||||
class ClaudeFormatEmitter:
|
class ClaudeFormatEmitter:
|
||||||
"""Emits events in Claude Code CLI SSE format.
|
"""Emits events in Claude Code CLI SSE format.
|
||||||
|
|
||||||
@@ -304,7 +369,10 @@ class ClaudeFormatEmitter:
|
|||||||
self._push(session_id, {"type": "done"})
|
self._push(session_id, {"type": "done"})
|
||||||
|
|
||||||
elif event_type == EventType.ERROR:
|
elif event_type == EventType.ERROR:
|
||||||
error_msg = data.get("message", str(data.get("error", "Unknown error")))
|
raw_msg = data.get("message", str(data.get("error", "Unknown error")))
|
||||||
|
user_msg = friendly_error_message(raw_msg, str(data.get("error", "")))
|
||||||
|
# El error real (detalles del proveedor) solo va al log, nunca al cliente.
|
||||||
|
logger.warning("Session %s error (raw): %s", session_id, raw_msg)
|
||||||
|
|
||||||
# Close any open block
|
# Close any open block
|
||||||
self._close_text_block(session_id)
|
self._close_text_block(session_id)
|
||||||
@@ -312,7 +380,7 @@ class ClaudeFormatEmitter:
|
|||||||
self._push(session_id, {
|
self._push(session_id, {
|
||||||
"type": "result",
|
"type": "result",
|
||||||
"is_error": True,
|
"is_error": True,
|
||||||
"result": error_msg,
|
"result": user_msg,
|
||||||
"usage": {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0},
|
"usage": {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0},
|
||||||
"total_cost_usd": 0,
|
"total_cost_usd": 0,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -294,11 +294,27 @@ class TestTaskHistoryTrim:
|
|||||||
class TestConversationCompaction:
|
class TestConversationCompaction:
|
||||||
def test_compactor_preserves_last_user_and_compacts_old_tool_results(self):
|
def test_compactor_preserves_last_user_and_compacts_old_tool_results(self):
|
||||||
compactor = ContextCompactor(max_tokens=999999)
|
compactor = ContextCompactor(max_tokens=999999)
|
||||||
|
# Los assistants llevan sus tool_calls: sin ellos los `role: tool`
|
||||||
|
# serian huerfanos y `_enforce_tool_pairing` los convertiria a user.
|
||||||
messages = [
|
messages = [
|
||||||
{"role": "user", "content": "Contexto anterior " * 10},
|
{"role": "user", "content": "Contexto anterior " * 10},
|
||||||
{"role": "assistant", "content": "Voy a revisar el modulo ahora mismo. " * 6},
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": "Voy a revisar el modulo ahora mismo. " * 6,
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "tool-1", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
{"role": "tool", "tool_call_id": "tool-1", "content": "resultado antiguo\n" * 80},
|
{"role": "tool", "tool_call_id": "tool-1", "content": "resultado antiguo\n" * 80},
|
||||||
{"role": "assistant", "content": "He visto el resultado anterior. " * 6},
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": "He visto el resultado anterior. " * 6,
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "tool-2", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
{"role": "tool", "tool_call_id": "tool-2", "content": "resultado reciente\n" * 80},
|
{"role": "tool", "tool_call_id": "tool-2", "content": "resultado reciente\n" * 80},
|
||||||
{"role": "user", "content": "Este es el ultimo mensaje del usuario y debe quedar intacto."},
|
{"role": "user", "content": "Este es el ultimo mensaje del usuario y debe quedar intacto."},
|
||||||
]
|
]
|
||||||
@@ -358,9 +374,18 @@ class TestConversationCompaction:
|
|||||||
|
|
||||||
def test_compactor_only_touches_user_messages_as_last_resort(self):
|
def test_compactor_only_touches_user_messages_as_last_resort(self):
|
||||||
compactor = ContextCompactor(max_tokens=999999)
|
compactor = ContextCompactor(max_tokens=999999)
|
||||||
|
# tool_calls en el assistant para que el `role: tool` no sea huerfano
|
||||||
|
# (el invariante `_enforce_tool_pairing` convertiria un huerfano a user).
|
||||||
messages = [
|
messages = [
|
||||||
{"role": "user", "content": "Contexto previo del usuario " * 8},
|
{"role": "user", "content": "Contexto previo del usuario " * 8},
|
||||||
{"role": "assistant", "content": "Respuesta previa del asistente " * 6},
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": "Respuesta previa del asistente " * 6,
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "tool-1", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
{"role": "tool", "tool_call_id": "tool-1", "content": "resultado viejo\n" * 80},
|
{"role": "tool", "tool_call_id": "tool-1", "content": "resultado viejo\n" * 80},
|
||||||
{"role": "user", "content": "Ultimo mensaje del usuario"},
|
{"role": "user", "content": "Ultimo mensaje del usuario"},
|
||||||
]
|
]
|
||||||
|
|||||||
585
tests/test_tool_pairing_real.py
Normal file
585
tests/test_tool_pairing_real.py
Normal file
@@ -0,0 +1,585 @@
|
|||||||
|
"""Tests de REGRESION REAL del invariante tool_use ↔ tool_result.
|
||||||
|
|
||||||
|
A diferencia del resto de tests (que replican logica), este archivo importa el
|
||||||
|
codigo REAL de src/. Cubre el bug de produccion: sesiones largas (~130k tokens)
|
||||||
|
donde `compact_conversation` colapsaba assistants a "[ASSISTANT COMPACTADO]"
|
||||||
|
perdiendo los bloques `tool_use`, dejando tool_results huerfanos que el adapter
|
||||||
|
emitia como `role: tool` sin `tool_calls` → 400 del proveedor en cada reintento.
|
||||||
|
|
||||||
|
Requiere las dependencias de src/ (pydantic, Python 3.11+). Si no estan
|
||||||
|
disponibles (p.ej. host con Python 3.10), el modulo entero se salta — ejecutar
|
||||||
|
dentro del container: `docker exec acai-agentic python3 -m pytest ...`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
try:
|
||||||
|
from src.context.compactor import ContextCompactor
|
||||||
|
except Exception as e: # pragma: no cover - entorno sin deps de src/
|
||||||
|
pytest.skip(f"src/ no importable en este entorno: {e}", allow_module_level=True)
|
||||||
|
|
||||||
|
|
||||||
|
# =====================================================================
|
||||||
|
# Helper de validacion reutilizable
|
||||||
|
# =====================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def collect_tool_use_ids(message: dict) -> set:
|
||||||
|
"""IDs de tool calls de un assistant (Anthropic blocks + OpenAI legacy)."""
|
||||||
|
ids = set()
|
||||||
|
content = message.get("content")
|
||||||
|
if isinstance(content, list):
|
||||||
|
for b in content:
|
||||||
|
if isinstance(b, dict) and b.get("type") == "tool_use":
|
||||||
|
ids.add(str(b.get("id", "")))
|
||||||
|
for tc in message.get("tool_calls") or []:
|
||||||
|
if isinstance(tc, dict):
|
||||||
|
ids.add(str(tc.get("id", "")))
|
||||||
|
ids.discard("")
|
||||||
|
return ids
|
||||||
|
|
||||||
|
|
||||||
|
def assert_tool_pairing_ok(messages: list) -> None:
|
||||||
|
"""Valida el invariante completo sobre una lista de mensajes internos:
|
||||||
|
|
||||||
|
- Todo tool_result (block) referencia un tool_use del assistant anterior.
|
||||||
|
- Todo tool_use (block) tiene su tool_result en el mensaje siguiente.
|
||||||
|
- Todo `role: tool` legacy responde a un tool_call del assistant previo.
|
||||||
|
"""
|
||||||
|
for i, msg in enumerate(messages):
|
||||||
|
role = msg.get("role")
|
||||||
|
content = msg.get("content")
|
||||||
|
|
||||||
|
if role == "user" and isinstance(content, list):
|
||||||
|
result_ids = {
|
||||||
|
str(b.get("tool_use_id", ""))
|
||||||
|
for b in content
|
||||||
|
if isinstance(b, dict) and b.get("type") == "tool_result"
|
||||||
|
}
|
||||||
|
if result_ids:
|
||||||
|
assert i > 0, f"msg[{i}]: tool_result al inicio de la conversacion"
|
||||||
|
prev = messages[i - 1]
|
||||||
|
assert prev.get("role") == "assistant", (
|
||||||
|
f"msg[{i}]: tool_result sin assistant inmediatamente anterior"
|
||||||
|
)
|
||||||
|
available = collect_tool_use_ids(prev)
|
||||||
|
orphans = result_ids - available
|
||||||
|
assert not orphans, (
|
||||||
|
f"msg[{i}]: tool_result huerfanos {orphans} "
|
||||||
|
f"(assistant previo solo tiene {available})"
|
||||||
|
)
|
||||||
|
|
||||||
|
if role == "assistant":
|
||||||
|
tool_ids = collect_tool_use_ids(msg)
|
||||||
|
if tool_ids:
|
||||||
|
answered = set()
|
||||||
|
j = i + 1
|
||||||
|
if (
|
||||||
|
j < len(messages)
|
||||||
|
and messages[j].get("role") == "user"
|
||||||
|
and isinstance(messages[j].get("content"), list)
|
||||||
|
):
|
||||||
|
for b in messages[j]["content"]:
|
||||||
|
if isinstance(b, dict) and b.get("type") == "tool_result":
|
||||||
|
answered.add(str(b.get("tool_use_id", "")))
|
||||||
|
j += 1
|
||||||
|
while j < len(messages) and messages[j].get("role") == "tool":
|
||||||
|
answered.add(str(messages[j].get("tool_call_id", "")))
|
||||||
|
j += 1
|
||||||
|
unanswered = tool_ids - answered
|
||||||
|
assert not unanswered, (
|
||||||
|
f"msg[{i}]: tool_use sin respuesta {unanswered}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if role == "tool":
|
||||||
|
prev_assistant = None
|
||||||
|
for k in range(i - 1, -1, -1):
|
||||||
|
if messages[k].get("role") == "tool":
|
||||||
|
continue
|
||||||
|
if messages[k].get("role") == "assistant":
|
||||||
|
prev_assistant = messages[k]
|
||||||
|
break
|
||||||
|
assert prev_assistant is not None, (
|
||||||
|
f"msg[{i}]: role tool sin assistant previo"
|
||||||
|
)
|
||||||
|
call_id = str(msg.get("tool_call_id", ""))
|
||||||
|
assert call_id in collect_tool_use_ids(prev_assistant), (
|
||||||
|
f"msg[{i}]: role tool con tool_call_id={call_id} no presente "
|
||||||
|
f"en el assistant previo"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def make_turn(n: int, payload_chars: int = 4000) -> list:
|
||||||
|
"""Genera un turno completo: user → assistant(thinking+text+tool_use) →
|
||||||
|
user(tool_result). Payloads grandes para forzar la compactacion."""
|
||||||
|
tid = f"call_{n}"
|
||||||
|
return [
|
||||||
|
{"role": "user", "content": f"Peticion {n}: " + ("x" * payload_chars)},
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": [
|
||||||
|
{"type": "thinking", "thinking": "razonando " * (payload_chars // 10)},
|
||||||
|
{"type": "text", "text": f"Voy a ejecutar la tool del turno {n}."},
|
||||||
|
{
|
||||||
|
"type": "tool_use",
|
||||||
|
"id": tid,
|
||||||
|
"name": "acai_get_records",
|
||||||
|
"input": {"tableName": f"tabla_{n}"},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"type": "tool_result",
|
||||||
|
"tool_use_id": tid,
|
||||||
|
"content": "resultado " * (payload_chars // 10),
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# =====================================================================
|
||||||
|
# (a) compact_conversation end-to-end: el paso de ultimo recurso ya no
|
||||||
|
# deja tool_results huerfanos ni tool_use sin respuesta
|
||||||
|
# =====================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class TestCompactConversationPairing:
|
||||||
|
def test_last_resort_does_not_orphan_tool_results(self):
|
||||||
|
compactor = ContextCompactor()
|
||||||
|
messages = []
|
||||||
|
for n in range(12):
|
||||||
|
messages.extend(make_turn(n, payload_chars=6000))
|
||||||
|
messages.append({"role": "user", "content": "ultima peticion del usuario"})
|
||||||
|
|
||||||
|
# Presupuesto minusculo: fuerza TODOS los pasos incluida la colapsa
|
||||||
|
# de listas a placeholder string (el paso que causaba el bug).
|
||||||
|
compacted, meta = compactor.compact_conversation(messages, max_tokens=300)
|
||||||
|
|
||||||
|
assert meta["output_tokens"] < meta["input_tokens"]
|
||||||
|
assert_tool_pairing_ok(compacted)
|
||||||
|
|
||||||
|
def test_moderate_budget_keeps_pairing(self):
|
||||||
|
compactor = ContextCompactor()
|
||||||
|
messages = []
|
||||||
|
for n in range(8):
|
||||||
|
messages.extend(make_turn(n, payload_chars=3000))
|
||||||
|
messages.append({"role": "user", "content": "peticion final"})
|
||||||
|
|
||||||
|
compacted, _ = compactor.compact_conversation(messages, max_tokens=2000)
|
||||||
|
assert_tool_pairing_ok(compacted)
|
||||||
|
|
||||||
|
def test_under_budget_passthrough_keeps_pairing(self):
|
||||||
|
compactor = ContextCompactor()
|
||||||
|
messages = make_turn(1, payload_chars=50)
|
||||||
|
compacted, meta = compactor.compact_conversation(messages, max_tokens=100_000)
|
||||||
|
assert meta["messages_compacted"] == 0
|
||||||
|
assert_tool_pairing_ok(compacted)
|
||||||
|
# Los tool_use/tool_result originales se conservan intactos
|
||||||
|
assert collect_tool_use_ids(compacted[1]) == {"call_1"}
|
||||||
|
|
||||||
|
def test_last_user_message_preserved(self):
|
||||||
|
compactor = ContextCompactor()
|
||||||
|
messages = []
|
||||||
|
for n in range(10):
|
||||||
|
messages.extend(make_turn(n, payload_chars=5000))
|
||||||
|
final = "esta es la peticion actual que NO debe perderse"
|
||||||
|
messages.append({"role": "user", "content": final})
|
||||||
|
|
||||||
|
compacted, _ = compactor.compact_conversation(messages, max_tokens=300)
|
||||||
|
assert compacted[-1]["content"] == final
|
||||||
|
|
||||||
|
|
||||||
|
# =====================================================================
|
||||||
|
# (b) _enforce_tool_pairing directo
|
||||||
|
# =====================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class TestEnforceToolPairing:
|
||||||
|
def setup_method(self):
|
||||||
|
self.compactor = ContextCompactor()
|
||||||
|
|
||||||
|
def test_collapsed_assistant_with_orphan_tool_results(self):
|
||||||
|
"""Assistant colapsado a string + user con tool_results → los
|
||||||
|
tool_result se convierten en placeholder."""
|
||||||
|
messages = [
|
||||||
|
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": [
|
||||||
|
{"type": "tool_result", "tool_use_id": "call_a", "content": "datos"},
|
||||||
|
{"type": "tool_result", "tool_use_id": "call_b", "content": "mas datos"},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||||
|
assert_tool_pairing_ok(repaired)
|
||||||
|
# Solo placeholders → content string (fusionados en uno)
|
||||||
|
assert repaired[1]["role"] == "user"
|
||||||
|
assert repaired[1]["content"] == "[Resultado de herramienta compactado]"
|
||||||
|
|
||||||
|
def test_orphan_tool_results_mixed_with_text(self):
|
||||||
|
"""tool_result huerfano junto a un bloque text → placeholder en lista,
|
||||||
|
el text se conserva."""
|
||||||
|
messages = [
|
||||||
|
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": [
|
||||||
|
{"type": "tool_result", "tool_use_id": "call_a", "content": "datos"},
|
||||||
|
{"type": "text", "text": "y ademas haz esto"},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||||
|
assert_tool_pairing_ok(repaired)
|
||||||
|
content = repaired[1]["content"]
|
||||||
|
assert isinstance(content, list)
|
||||||
|
types = [b.get("type") for b in content]
|
||||||
|
assert types == ["text", "text"]
|
||||||
|
assert content[0]["text"] == "[Resultado de herramienta compactado]"
|
||||||
|
assert content[1]["text"] == "y ademas haz esto"
|
||||||
|
|
||||||
|
def test_partial_id_mismatch_drops_unanswered_tool_use(self):
|
||||||
|
"""Assistant con 3 tool_use, user con solo 2 tool_result → se elimina
|
||||||
|
el tool_use sin respuesta, thinking/text intactos."""
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": [
|
||||||
|
{"type": "thinking", "thinking": "pensando"},
|
||||||
|
{"type": "text", "text": "ejecuto tres tools"},
|
||||||
|
{"type": "tool_use", "id": "c1", "name": "t1", "input": {}},
|
||||||
|
{"type": "tool_use", "id": "c2", "name": "t2", "input": {}},
|
||||||
|
{"type": "tool_use", "id": "c3", "name": "t3", "input": {}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": [
|
||||||
|
{"type": "tool_result", "tool_use_id": "c1", "content": "r1"},
|
||||||
|
{"type": "tool_result", "tool_use_id": "c3", "content": "r3"},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||||
|
assert_tool_pairing_ok(repaired)
|
||||||
|
assert collect_tool_use_ids(repaired[0]) == {"c1", "c3"}
|
||||||
|
types = [b.get("type") for b in repaired[0]["content"]]
|
||||||
|
assert "thinking" in types and "text" in types
|
||||||
|
|
||||||
|
def test_assistant_tool_use_with_no_results_at_all(self):
|
||||||
|
"""Assistant con tool_use y SIN user de resultados detras → se
|
||||||
|
eliminan los tool_use; si el content queda vacio, placeholder."""
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": [
|
||||||
|
{"type": "tool_use", "id": "c9", "name": "t", "input": {}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "user", "content": "otra cosa"},
|
||||||
|
]
|
||||||
|
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||||
|
assert_tool_pairing_ok(repaired)
|
||||||
|
assert repaired[0]["content"] == "[ASSISTANT COMPACTADO]"
|
||||||
|
|
||||||
|
def test_legacy_orphan_role_tool_converted_to_user(self):
|
||||||
|
"""role:tool legacy cuyo assistant anterior no tiene tool_calls →
|
||||||
|
se convierte a user placeholder."""
|
||||||
|
messages = [
|
||||||
|
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||||
|
{"role": "tool", "tool_call_id": "call_x", "content": "salida tool"},
|
||||||
|
]
|
||||||
|
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||||
|
assert_tool_pairing_ok(repaired)
|
||||||
|
assert repaired[1]["role"] == "user"
|
||||||
|
assert repaired[1]["content"] == "[Resultado de herramienta compactado]"
|
||||||
|
|
||||||
|
def test_legacy_valid_role_tool_untouched(self):
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": "lanzo tool",
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "call_x", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "tool", "tool_call_id": "call_x", "content": "salida"},
|
||||||
|
]
|
||||||
|
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||||
|
assert_tool_pairing_ok(repaired)
|
||||||
|
assert repaired[1]["role"] == "tool"
|
||||||
|
|
||||||
|
def test_well_paired_history_is_noop(self):
|
||||||
|
messages = make_turn(7, payload_chars=50)
|
||||||
|
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||||
|
assert repaired == messages
|
||||||
|
|
||||||
|
|
||||||
|
# =====================================================================
|
||||||
|
# (c) Trim de recent_messages (OrchestratorEngine._trim_recent_messages)
|
||||||
|
# =====================================================================
|
||||||
|
|
||||||
|
|
||||||
|
orchestrator_engine = pytest.importorskip(
|
||||||
|
"src.orchestrator.engine",
|
||||||
|
reason="deps del orquestador (mcp, sse, redis) no disponibles",
|
||||||
|
)
|
||||||
|
OrchestratorEngine = orchestrator_engine.OrchestratorEngine
|
||||||
|
|
||||||
|
|
||||||
|
class TestTrimRecentMessages:
|
||||||
|
def _set_budget(self, monkeypatch, tokens: int):
|
||||||
|
from src.config import settings
|
||||||
|
monkeypatch.setattr(settings, "recent_messages_max_tokens", tokens)
|
||||||
|
|
||||||
|
def test_under_budget_untouched(self, monkeypatch):
|
||||||
|
self._set_budget(monkeypatch, 100_000)
|
||||||
|
messages = make_turn(0, payload_chars=100)
|
||||||
|
assert OrchestratorEngine._trim_recent_messages(list(messages)) == messages
|
||||||
|
|
||||||
|
def test_trims_oldest_whole_pairs(self, monkeypatch):
|
||||||
|
self._set_budget(monkeypatch, 500)
|
||||||
|
messages = []
|
||||||
|
for n in range(10):
|
||||||
|
messages.extend(make_turn(n, payload_chars=1000))
|
||||||
|
trimmed = OrchestratorEngine._trim_recent_messages(messages)
|
||||||
|
|
||||||
|
assert len(trimmed) < len(messages)
|
||||||
|
# Nunca se corta dentro de un par
|
||||||
|
assert_tool_pairing_ok(trimmed)
|
||||||
|
# El primer mensaje nunca es un carrier de tool_result ni role tool
|
||||||
|
first = trimmed[0]
|
||||||
|
assert first.get("role") != "tool"
|
||||||
|
if isinstance(first.get("content"), list):
|
||||||
|
assert not any(
|
||||||
|
isinstance(b, dict) and b.get("type") == "tool_result"
|
||||||
|
for b in first["content"]
|
||||||
|
)
|
||||||
|
# Se eliminan los mas antiguos: el final se conserva
|
||||||
|
assert trimmed[-1] == messages[-1]
|
||||||
|
|
||||||
|
def test_keeps_last_four_even_over_budget(self, monkeypatch):
|
||||||
|
self._set_budget(monkeypatch, 10) # presupuesto imposible
|
||||||
|
messages = []
|
||||||
|
for n in range(5):
|
||||||
|
messages.extend(make_turn(n, payload_chars=2000))
|
||||||
|
trimmed = OrchestratorEngine._trim_recent_messages(messages)
|
||||||
|
assert len(trimmed) >= 4
|
||||||
|
|
||||||
|
def test_pair_dragging_includes_legacy_tool_run(self, monkeypatch):
|
||||||
|
"""Un assistant legacy con tool_calls arrastra su run de role:tool."""
|
||||||
|
self._set_budget(monkeypatch, 300)
|
||||||
|
big = "y" * 3000
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": big,
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "c1", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
{"id": "c2", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "tool", "tool_call_id": "c1", "content": big},
|
||||||
|
{"role": "tool", "tool_call_id": "c2", "content": big},
|
||||||
|
{"role": "user", "content": "pregunta"},
|
||||||
|
{"role": "assistant", "content": "respuesta"},
|
||||||
|
{"role": "user", "content": "otra pregunta"},
|
||||||
|
{"role": "assistant", "content": "otra respuesta"},
|
||||||
|
]
|
||||||
|
trimmed = OrchestratorEngine._trim_recent_messages(messages)
|
||||||
|
# El par legacy entero (assistant + 2 tools) se elimino junto
|
||||||
|
assert trimmed[0] == {"role": "user", "content": "pregunta"}
|
||||||
|
assert_tool_pairing_ok(trimmed)
|
||||||
|
|
||||||
|
def test_append_recent_messages_applies_trim(self, monkeypatch):
|
||||||
|
self._set_budget(monkeypatch, 500)
|
||||||
|
existing = []
|
||||||
|
for n in range(10):
|
||||||
|
existing.extend(make_turn(n, payload_chars=1000))
|
||||||
|
merged = OrchestratorEngine._append_recent_messages(
|
||||||
|
existing, message="nueva peticion", conversation=[
|
||||||
|
{"role": "assistant", "content": "ok hecho"},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
assert len(merged) < len(existing) + 2
|
||||||
|
assert merged[-1] == {"role": "assistant", "content": "ok hecho"}
|
||||||
|
assert_tool_pairing_ok(merged)
|
||||||
|
|
||||||
|
|
||||||
|
# =====================================================================
|
||||||
|
# (d) Guard defensivo del adapter (_repair_tool_sequence)
|
||||||
|
# =====================================================================
|
||||||
|
|
||||||
|
|
||||||
|
openai_mod = pytest.importorskip("openai", reason="SDK openai no instalado")
|
||||||
|
|
||||||
|
|
||||||
|
class TestRepairToolSequence:
|
||||||
|
@property
|
||||||
|
def repair(self):
|
||||||
|
from src.adapters.openai_adapter import OpenAIAdapter
|
||||||
|
return OpenAIAdapter._repair_tool_sequence
|
||||||
|
|
||||||
|
def test_valid_sequence_untouched(self):
|
||||||
|
msgs = [
|
||||||
|
{"role": "system", "content": "sys"},
|
||||||
|
{"role": "user", "content": "hola"},
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": None,
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "c1", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "tool", "tool_call_id": "c1", "content": "resultado"},
|
||||||
|
{"role": "assistant", "content": "listo"},
|
||||||
|
]
|
||||||
|
assert self.repair(list(msgs)) == msgs
|
||||||
|
|
||||||
|
def test_orphan_tool_message_converted_to_user(self):
|
||||||
|
msgs = [
|
||||||
|
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||||
|
{"role": "tool", "tool_call_id": "c_orphan", "content": "datos " * 200},
|
||||||
|
]
|
||||||
|
out = self.repair(msgs)
|
||||||
|
assert out[1]["role"] == "user"
|
||||||
|
assert out[1]["content"].startswith(
|
||||||
|
"[Resultado de herramienta (contexto compactado)]: "
|
||||||
|
)
|
||||||
|
# Content truncado a 500 chars (+ prefijo)
|
||||||
|
assert len(out[1]["content"]) <= 500 + len(
|
||||||
|
"[Resultado de herramienta (contexto compactado)]: "
|
||||||
|
)
|
||||||
|
assert not any(m.get("role") == "tool" for m in out)
|
||||||
|
|
||||||
|
def test_unanswered_tool_calls_removed(self):
|
||||||
|
msgs = [
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": None,
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "c1", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
{"id": "c2", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
|
||||||
|
{"role": "user", "content": "sigue"},
|
||||||
|
]
|
||||||
|
out = self.repair(msgs)
|
||||||
|
assert [tc["id"] for tc in out[0]["tool_calls"]] == ["c1"]
|
||||||
|
assert out[1] == {"role": "tool", "tool_call_id": "c1", "content": "r1"}
|
||||||
|
|
||||||
|
def test_all_tool_calls_unanswered_drops_key_and_sets_content(self):
|
||||||
|
msgs = [
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": None,
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "c1", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "user", "content": "sigue"},
|
||||||
|
]
|
||||||
|
out = self.repair(msgs)
|
||||||
|
assert "tool_calls" not in out[0]
|
||||||
|
assert out[0]["content"] # nunca None sin tool_calls
|
||||||
|
|
||||||
|
def test_reasoning_promoted_when_tool_calls_dropped(self):
|
||||||
|
"""No romper la promocion de reasoning a content del fix anterior."""
|
||||||
|
msgs = [
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": None,
|
||||||
|
"reasoning_content": "razonamiento del modelo",
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "c1", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "user", "content": "sigue"},
|
||||||
|
]
|
||||||
|
out = self.repair(msgs)
|
||||||
|
assert "tool_calls" not in out[0]
|
||||||
|
assert out[0]["content"] == "razonamiento del modelo"
|
||||||
|
assert "reasoning_content" not in out[0]
|
||||||
|
|
||||||
|
def test_mixed_orphan_in_tool_block(self):
|
||||||
|
"""Un huerfano en medio de un bloque de tools validos se convierte a
|
||||||
|
user DESPUES del bloque (no rompe la contiguidad assistant→tools)."""
|
||||||
|
msgs = [
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": None,
|
||||||
|
"tool_calls": [
|
||||||
|
{"id": "c1", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
{"id": "c2", "type": "function",
|
||||||
|
"function": {"name": "t", "arguments": "{}"}},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
|
||||||
|
{"role": "tool", "tool_call_id": "huerfano", "content": "rx"},
|
||||||
|
{"role": "tool", "tool_call_id": "c2", "content": "r2"},
|
||||||
|
{"role": "user", "content": "sigue"},
|
||||||
|
]
|
||||||
|
out = self.repair(msgs)
|
||||||
|
roles = [m["role"] for m in out]
|
||||||
|
assert roles == ["assistant", "tool", "tool", "user", "user"]
|
||||||
|
assert out[1]["tool_call_id"] == "c1"
|
||||||
|
assert out[2]["tool_call_id"] == "c2"
|
||||||
|
assert out[3]["content"].startswith("[Resultado de herramienta")
|
||||||
|
|
||||||
|
|
||||||
|
class TestAdapterEndToEnd:
|
||||||
|
"""_to_openai_messages + guard sobre un historial roto realista."""
|
||||||
|
|
||||||
|
def test_collapsed_assistant_history_produces_valid_openai_sequence(self):
|
||||||
|
from src.adapters.openai_adapter import OpenAIAdapter
|
||||||
|
adapter = OpenAIAdapter.__new__(OpenAIAdapter) # sin cliente real
|
||||||
|
|
||||||
|
internal = [
|
||||||
|
{"role": "system", "content": "eres un agente"},
|
||||||
|
{"role": "user", "content": "haz algo"},
|
||||||
|
# Assistant colapsado por el compactor (perdio sus tool_use)
|
||||||
|
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||||
|
# …pero el user conserva sus tool_results (el bug de produccion)
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": [
|
||||||
|
{"type": "tool_result", "tool_use_id": "call_1", "content": "datos"},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{"role": "assistant", "content": "termine"},
|
||||||
|
{"role": "user", "content": "siguiente peticion"},
|
||||||
|
]
|
||||||
|
out = adapter._to_openai_messages(internal)
|
||||||
|
# Contrato OpenAI: ningun role:tool sin tool_calls previo
|
||||||
|
for i, m in enumerate(out):
|
||||||
|
if m.get("role") == "tool":
|
||||||
|
assert i > 0
|
||||||
|
prev = out[i - 1]
|
||||||
|
prev_ids = set()
|
||||||
|
k = i - 1
|
||||||
|
while k >= 0 and out[k].get("role") == "tool":
|
||||||
|
k -= 1
|
||||||
|
if k >= 0 and out[k].get("role") == "assistant":
|
||||||
|
prev_ids = {
|
||||||
|
tc.get("id") for tc in out[k].get("tool_calls") or []
|
||||||
|
}
|
||||||
|
assert m.get("tool_call_id") in prev_ids, (
|
||||||
|
f"role tool huerfano en out[{i}]"
|
||||||
|
)
|
||||||
|
# El tool_result huerfano acabo como user, no como role tool
|
||||||
|
assert not any(m.get("role") == "tool" for m in out)
|
||||||
Reference in New Issue
Block a user