Compare commits
8 Commits
9854960c7c
...
9277862e56
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9277862e56 | ||
|
|
79ec267aa6 | ||
|
|
43337e8554 | ||
|
|
6a03fdf284 | ||
|
|
e34a39e3bf | ||
|
|
d6b04e4122 | ||
|
|
96b4542918 | ||
|
|
454b51b45d |
@@ -4,7 +4,11 @@ description: "Agente genérico de Acai CMS: crea módulos, edita contenido, gest
|
||||
icon: "code"
|
||||
category: "development"
|
||||
temperature: 0.2
|
||||
max_tokens: 4096
|
||||
# 16K de salida: cubre escribir un fichero entero (acai_write) + el razonamiento
|
||||
# (thinking) en un solo turno. Con 4096 el JSON del tool_use se truncaba a mitad
|
||||
# en ficheros medianos y el agente caia en micro-ediciones lentas. v4-pro soporta
|
||||
# hasta 384K de salida, asi que 16K es conservador.
|
||||
max_tokens: 16384
|
||||
context_sections:
|
||||
- immutable_rules
|
||||
- project_profile
|
||||
|
||||
@@ -74,6 +74,26 @@ cms/data/schema/ # .ini.php — SOLO con tools de schema
|
||||
14. **URL del proyecto**: `get_web_url` + `?pruebas=1` siempre.
|
||||
15. **Operaciones destructivas**: confirma con el usuario antes de ejecutar.
|
||||
|
||||
# Eficiencia de edición (menos pasos Y menos tokens)
|
||||
|
||||
Elige la herramienta por el TAMAÑO del cambio. Ni micro-editar todo (muchos
|
||||
pasos), ni reescribir el fichero entero por cada retoque (muchos tokens):
|
||||
|
||||
1. **Cambio pequeño o localizado** (un color, un valor, una regla, pocas zonas)
|
||||
→ `acai-line-replace`. Barato: solo emites las líneas que cambian. NO
|
||||
reescribas el fichero entero por un retoque.
|
||||
2. **Creación o reescritura mayor** (cambias casi todo el fichero o lo creas de
|
||||
cero) → UN solo `acai-write` del fichero completo. Reescribir entero por un
|
||||
cambio pequeño desperdicia tokens; hazlo solo cuando de verdad cambia casi todo.
|
||||
3. **Itera con `line-replace`, no con writes repetidos.** Tras ver el resultado
|
||||
en el navegador, aplica los ajustes con `line-replace` puntuales. NO reescribas
|
||||
el fichero completo en cada iteración de diseño.
|
||||
4. **Cap de micro-ediciones.** Si te ves haciendo >4-5 `line-replace` sobre el
|
||||
mismo fichero en un turno, para y reescríbelo entero de una vez (`acai-write`).
|
||||
5. **NO hagas `acai-view` tras cada edición.** Ya tienes el contenido en contexto;
|
||||
reléelo solo si una edición falló o dudas del estado real.
|
||||
6. **Verificación visual al final, una sola pasada** — no tras cada retoque.
|
||||
|
||||
# Patrones canónicos (aplica por defecto)
|
||||
|
||||
- **Detalle de registro**: sección `custom-{tableName}` con `thisrecord.*`.
|
||||
|
||||
@@ -15,6 +15,13 @@ export const CONFIG_FILE_PATH =
|
||||
|
||||
export const MCP_PORT = Number(process.env.MCP_PORT || 3000);
|
||||
export const MONITOR_PORT = Number(process.env.MCP_MONITOR_PORT || 4545);
|
||||
// El monitor HTTP (UI + POST /retry) queda DESACTIVADO por defecto. Solo se
|
||||
// arranca si MCP_MONITOR_ENABLED === 'true' de forma explicita.
|
||||
export const MONITOR_ENABLED =
|
||||
String(process.env.MCP_MONITOR_ENABLED || "").toLowerCase() === "true";
|
||||
// Por seguridad escucha solo en loopback salvo que se defina MCP_MONITOR_HOST.
|
||||
export const MONITOR_HOST = process.env.MCP_MONITOR_HOST || "127.0.0.1";
|
||||
// Compatibilidad: si alguien fuerza MCP_MONITOR_DISABLED tambien lo respetamos.
|
||||
export const MONITOR_DISABLED =
|
||||
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "1" ||
|
||||
String(process.env.MCP_MONITOR_DISABLED || "").toLowerCase() === "true";
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
*/
|
||||
|
||||
// Load configuration first
|
||||
import { loadLocalConfigProfile, applyProfileToEnv } from "./config/index.js";
|
||||
import { loadLocalConfigProfile, applyProfileToEnv, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js";
|
||||
|
||||
// Load and apply config profile (backward compatibility)
|
||||
const selectedProfile = loadLocalConfigProfile();
|
||||
@@ -30,8 +30,11 @@ import { registerResources } from "./resources/index.js";
|
||||
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
setRegistrationFunctions({ registerPrompts, registerTools, registerResources });
|
||||
|
||||
// Create the shared request monitor (will be applied to each session server)
|
||||
const requestMonitor = createRequestMonitor();
|
||||
// Create the shared request monitor (will be applied to each session server).
|
||||
// Solo se crea si el monitor esta habilitado: asi no acumulamos historial en
|
||||
// memoria ni envolvemos los handlers cuando la UI esta apagada (por defecto).
|
||||
const monitorActive = MONITOR_ENABLED && !MONITOR_DISABLED;
|
||||
const requestMonitor = monitorActive ? createRequestMonitor() : null;
|
||||
|
||||
// Create a server instance for retry functionality in the monitor UI
|
||||
const server = createMcpServer();
|
||||
|
||||
@@ -2,7 +2,7 @@ import http from "node:http";
|
||||
import fsPromises from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { MONITOR_PORT, MONITOR_DISABLED } from "./config/index.js";
|
||||
import { MONITOR_PORT, MONITOR_HOST, MONITOR_ENABLED, MONITOR_DISABLED } from "./config/index.js";
|
||||
import { sessionCredentials } from "./auth/credentials.js";
|
||||
import { activeSessions } from "./httpServer.js";
|
||||
|
||||
@@ -84,8 +84,8 @@ export function broadcastSessionsUpdate() {
|
||||
* Start the monitor HTTP server
|
||||
*/
|
||||
export function startMonitorServer(requestMonitor, toolHandlers) {
|
||||
if (MONITOR_DISABLED) {
|
||||
console.error("MCP monitor UI deshabilitada (MCP_MONITOR_DISABLED=1).");
|
||||
if (!MONITOR_ENABLED || MONITOR_DISABLED) {
|
||||
console.error("[monitor] deshabilitado (MCP_MONITOR_ENABLED!=true)");
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -202,12 +202,12 @@ export function startMonitorServer(requestMonitor, toolHandlers) {
|
||||
|
||||
monitorServer.on("error", (error) => {
|
||||
console.warn(
|
||||
`[monitor] No se pudo iniciar la UI en el puerto ${MONITOR_PORT}: ${error.message}. Establece MCP_MONITOR_DISABLED=1 para ocultar este aviso.`
|
||||
`[monitor] No se pudo iniciar la UI en ${MONITOR_HOST}:${MONITOR_PORT}: ${error.message}. Desactiva MCP_MONITOR_ENABLED para ocultar este aviso.`
|
||||
);
|
||||
});
|
||||
|
||||
monitorServer.listen(MONITOR_PORT, '0.0.0.0', () => {
|
||||
console.error(`MCP monitor UI: http://0.0.0.0:${MONITOR_PORT}/monitor`);
|
||||
monitorServer.listen(MONITOR_PORT, MONITOR_HOST, () => {
|
||||
console.error(`MCP monitor UI: http://${MONITOR_HOST}:${MONITOR_PORT}/monitor`);
|
||||
});
|
||||
|
||||
// Broadcast sessions + stats update every 2 seconds for real-time monitoring
|
||||
|
||||
@@ -1,20 +1,41 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { existsSync } from "node:fs";
|
||||
import path from "node:path";
|
||||
|
||||
/**
|
||||
* Lectura directa de los markdown del knowledge base desde el filesystem.
|
||||
*
|
||||
* El MCP server corre dentro del container `agentic` junto al FastAPI, asi
|
||||
* que los .md viven en `/app/docs/` (la imagen los copia ahi).
|
||||
*
|
||||
* En caso de override por entorno, respeta `ACAI_DOCS_DIR`. En desarrollo
|
||||
* fuera del container, fallback a paths relativos al cwd.
|
||||
* Orden de resolucion del directorio de docs:
|
||||
* 1. `ACAI_DOCS_DIR` — override explicito por entorno (si esta definido y no vacio).
|
||||
* 2. `<ACAI_PROJECT_DIR>/docs` — caso principal: cada proyecto/web tiene su
|
||||
* propio `docs/`. El `.mcp.json` inyecta `ACAI_PROJECT_DIR` (p.ej.
|
||||
* `/opt/acai/webs/<user>/<site>`), funciona tanto en local (VSCode) como
|
||||
* en cloud (agentic).
|
||||
* 3. `/app/docs` — fallback final: container `agentic` donde esta horneada la
|
||||
* copia canonica de los .md.
|
||||
*/
|
||||
|
||||
function dirExists(p) {
|
||||
try {
|
||||
return existsSync(p);
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveDocsDir() {
|
||||
// 1. Override explicito
|
||||
const override = process.env.ACAI_DOCS_DIR;
|
||||
if (override) return override;
|
||||
// Container path
|
||||
if (override && override.trim() !== "") return override;
|
||||
|
||||
// 2. Docs del proyecto/web
|
||||
const projectDir = process.env.ACAI_PROJECT_DIR;
|
||||
if (projectDir && projectDir.trim() !== "") {
|
||||
const projectDocs = path.join(projectDir, "docs");
|
||||
if (dirExists(projectDocs)) return projectDocs;
|
||||
}
|
||||
|
||||
// 3. Fallback al container agentic
|
||||
return "/app/docs";
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ pydantic-settings>=2.7.0,<3.0.0
|
||||
redis[hiredis]>=5.2.0,<6.0.0
|
||||
anthropic>=0.42.0,<1.0.0
|
||||
openai>=1.60.0,<2.0.0
|
||||
litellm==1.80.0
|
||||
httpx>=0.28.0,<1.0.0
|
||||
sse-starlette>=2.2.0,<3.0.0
|
||||
tiktoken>=0.7.0,<1.0.0
|
||||
|
||||
@@ -17,20 +17,24 @@ from .base import ModelAdapter, ModelConfig, ModelResponse, StreamChunk
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Algunos fine-tunes (sobre todo MiniMax) ocasionalmente emiten las tool calls
|
||||
# como texto literal en lugar de usar los `tool_use` blocks nativos. Vistos
|
||||
# tres formatos:
|
||||
# Algunos fine-tunes (sobre todo MiniMax y DeepSeek) ocasionalmente emiten las
|
||||
# tool calls como texto literal en lugar de usar los `tool_use` blocks nativos.
|
||||
# Vistos cuatro formatos:
|
||||
# 1) <minimax:tool_call><invoke name="X"><parameter name="P">V</parameter></invoke></minimax:tool_call>
|
||||
# 2) <invoke name="X"><parameter name="P">V</parameter></invoke> (sin minimax wrapper)
|
||||
# 3) <tool_call>{"name":"X","parameters":{...}}{"name":"Y","parameters":{...}}</tool_call>
|
||||
# (multiples tool calls JSON-encoded dentro de un solo wrapper)
|
||||
# 4) <||DSML||tool_calls><||DSML||invoke name="X"><||DSML||parameter name="P" string="true">V</||DSML||parameter></||DSML||invoke></||DSML||tool_calls>
|
||||
# (formato DSML de DeepSeek — usa U+FF5C fullwidth vertical line como separador)
|
||||
#
|
||||
# Cuando eso pasa el orquestador ve "texto" y la tool nunca se ejecuta — el
|
||||
# usuario ve el markup crudo en el chat. Detectamos y convertimos a tool_use
|
||||
# sintetico mientras streameamos. Es un parche defensivo: el caso normal
|
||||
# (tool_use blocks) sigue por el camino estandar.
|
||||
_TOOL_CALL_OPEN_RE = re.compile(
|
||||
r"<(?:minimax:tool_call|invoke\s+name|tool_call\s*>)|\[TOOL_CALL\]",
|
||||
# `<|` (U+FF5C) cubre cualquier special-token DeepSeek (DSML): <|DSML|invoke,
|
||||
# <|tool_calls, etc. Tolerante a 1+ pipes y a la presencia/ausencia de "DSML".
|
||||
r"<(?:minimax:tool_call|invoke\s+name|tool_call[\s>]|use_mcp_tool|mm_special)|\[TOOL_CALL\]|<|",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_INVOKE_RE = re.compile(
|
||||
@@ -65,6 +69,21 @@ _PERL_ARGS_BLOCK_RE = re.compile(
|
||||
_PERL_KV_RE = re.compile(
|
||||
r"--([a-zA-Z_][a-zA-Z0-9_]*)\s+(\"[^\"]*\"|\'[^\']*\'|-?\d+(?:\.\d+)?|true|false|null)",
|
||||
)
|
||||
# Formato 5 (DeepSeek DSML). Formato oficial V4-Pro: el marcador es `|DSML|`
|
||||
# con UN pipe fullwidth (U+FF5C) a cada lado — <|DSML|invoke name="X"> ...
|
||||
# <|DSML|parameter name="P" string="true|false">V</|DSML|parameter> ...
|
||||
# </|DSML|invoke>. Hacemos el regex TOLERANTE: 1+ pipes y "DSML" opcional,
|
||||
# para cubrir variantes entre versiones del modelo. El atributo `string`
|
||||
# decide el tipo del valor: "true" = string crudo, "false" = valor JSON.
|
||||
_DSML_INVOKE_RE = re.compile(
|
||||
r"<|+(?:DSML|+)?invoke\s+name=\"([^\"]+)\"[^>]*>(.*?)</|+(?:DSML|+)?invoke\s*>",
|
||||
re.IGNORECASE | re.DOTALL,
|
||||
)
|
||||
_DSML_PARAM_RE = re.compile(
|
||||
r"<|+(?:DSML|+)?parameter\s+name=\"([^\"]+)\"([^>]*)>(.*?)</|+(?:DSML|+)?parameter\s*>",
|
||||
re.IGNORECASE | re.DOTALL,
|
||||
)
|
||||
_DSML_STRING_ATTR_RE = re.compile(r"string\s*=\s*\"(true|false)\"", re.IGNORECASE)
|
||||
|
||||
|
||||
def _safe_emit_split(buf: str) -> str:
|
||||
@@ -91,8 +110,8 @@ def _safe_emit_split(buf: str) -> str:
|
||||
# Si el tail ya tiene `>` cerrado, es un tag normal — emitir todo.
|
||||
if ">" in tail:
|
||||
return buf
|
||||
# Si el tail puede ser inicio de tool_call/invoke/tool_call_json, retenerlo.
|
||||
candidates = ("<minimax:tool_call", "<invoke", "<tool_call")
|
||||
# Si el tail puede ser inicio de tool_call/invoke/tool_call_json/dsml, retenerlo.
|
||||
candidates = ("<minimax:tool_call", "<invoke", "<tool_call", "<|")
|
||||
for cand in candidates:
|
||||
if cand.startswith(tail.lower()) or tail.lower().startswith(cand[:len(tail)].lower()):
|
||||
return buf[:idx]
|
||||
@@ -212,6 +231,35 @@ def _parse_xml_tool_calls(text: str) -> list[dict[str, Any]]:
|
||||
"arguments": args,
|
||||
})
|
||||
|
||||
# Formato 5 (DeepSeek DSML):
|
||||
# <|DSML|invoke name="X"><|DSML|parameter name="P" string="true">V</|DSML|parameter></|DSML|invoke>
|
||||
for m in _DSML_INVOKE_RE.finditer(text):
|
||||
name = m.group(1).strip()
|
||||
body = m.group(2)
|
||||
args_dsml: dict[str, Any] = {}
|
||||
for p in _DSML_PARAM_RE.finditer(body):
|
||||
pname = p.group(1).strip()
|
||||
attrs = p.group(2) or ""
|
||||
raw_val = p.group(3)
|
||||
sm = _DSML_STRING_ATTR_RE.search(attrs)
|
||||
if sm and sm.group(1).lower() == "true":
|
||||
# string="true": valor es string crudo — NO strip (preserva
|
||||
# whitespace significativo, p.ej. contenido de ficheros).
|
||||
args_dsml[pname] = raw_val
|
||||
else:
|
||||
# string="false" (o ausente): valor JSON (num/bool/array/obj/string).
|
||||
# Si no parsea, cae a string sin tocar.
|
||||
try:
|
||||
args_dsml[pname] = json.loads(raw_val.strip())
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
args_dsml[pname] = raw_val.strip()
|
||||
if name:
|
||||
calls.append({
|
||||
"id": "xml_{}".format(uuid.uuid4().hex[:12]),
|
||||
"name": name,
|
||||
"arguments": args_dsml,
|
||||
})
|
||||
|
||||
return calls
|
||||
|
||||
|
||||
|
||||
67
src/adapters/litellm_adapter.py
Normal file
67
src/adapters/litellm_adapter.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""LiteLLM model adapter — spike para A/B contra el adapter OpenAI/DeepSeek nativo.
|
||||
|
||||
Reutiliza TODO el flujo de OpenAIAdapter (procesado de chunks, conversión de
|
||||
mensajes, tools, fallback DSML) y solo cambia la llamada al modelo: en vez del
|
||||
SDK de OpenAI, enruta por LiteLLM, que trae handling específico por proveedor
|
||||
(DeepSeek incluido) y podría resolver de fábrica el DSML / reasoning_content que
|
||||
hoy parcheamos a mano.
|
||||
|
||||
Activar con `AGENTIC_DEFAULT_MODEL_PROVIDER=litellm`. Modelo via
|
||||
`AGENTIC_LITELLM_MODEL` (p.ej. "deepseek/deepseek-v4-pro"); si vacío, deriva de
|
||||
`AGENTIC_DEFAULT_MODEL_ID`. Reusa `openai_api_key` / `openai_base_url` como
|
||||
credenciales.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import litellm
|
||||
|
||||
from ..config import settings
|
||||
from .openai_adapter import OpenAIAdapter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Que LiteLLM descarte params no soportados por el proveedor en vez de petar.
|
||||
litellm.drop_params = True
|
||||
# Silenciar el spam INFO de litellm ("LiteLLM completion() model=...").
|
||||
litellm.suppress_debug_info = True
|
||||
logging.getLogger("LiteLLM").setLevel(logging.WARNING)
|
||||
|
||||
|
||||
class LiteLLMAdapter(OpenAIAdapter):
|
||||
"""Enruta las llamadas por LiteLLM, reutilizando el pipeline de OpenAIAdapter."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model: str | None = None,
|
||||
api_key: str | None = None,
|
||||
base_url: str | None = None,
|
||||
) -> None:
|
||||
# NO llamamos a super().__init__: no necesitamos el cliente AsyncOpenAI.
|
||||
self._litellm_model = model or settings.litellm_model or self._derive_model()
|
||||
self._api_key = api_key or settings.openai_api_key or None
|
||||
self._api_base = base_url or settings.openai_base_url or None
|
||||
# LiteLLM no entrega usage fiable en streaming → estimar para billing.
|
||||
self._estimate_usage_fallback = True
|
||||
logger.info(
|
||||
"LiteLLMAdapter: model=%s api_base=%s",
|
||||
self._litellm_model, self._api_base or "(default)",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _derive_model() -> str:
|
||||
mid = settings.default_model_id or "deepseek-chat"
|
||||
# Si ya trae prefijo de proveedor ("deepseek/...", "openai/..."), respetar.
|
||||
return mid if "/" in mid else f"deepseek/{mid}"
|
||||
|
||||
async def _acreate(self, kwargs: dict[str, Any]):
|
||||
kwargs = dict(kwargs)
|
||||
kwargs["model"] = self._litellm_model
|
||||
if self._api_key:
|
||||
kwargs["api_key"] = self._api_key
|
||||
if self._api_base:
|
||||
kwargs["api_base"] = self._api_base
|
||||
return await litellm.acompletion(**kwargs)
|
||||
@@ -14,6 +14,24 @@ from .base import ModelAdapter, ModelConfig, ModelResponse, StreamChunk
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _estimate_usage(messages: list[dict[str, Any]], output_text: str) -> dict[str, int]:
|
||||
"""Estimacion de tokens cuando el proveedor no entrega usage (p.ej. LiteLLM
|
||||
streaming). Aproximada pero evita billing 0."""
|
||||
from ..context.compactor import estimate_tokens
|
||||
inp = 0
|
||||
for m in messages:
|
||||
c = m.get("content")
|
||||
if isinstance(c, str):
|
||||
inp += estimate_tokens(c)
|
||||
elif isinstance(c, list):
|
||||
for b in c:
|
||||
if isinstance(b, dict):
|
||||
inp += estimate_tokens(
|
||||
b.get("text") or b.get("thinking") or str(b.get("content") or "")
|
||||
)
|
||||
return {"input_tokens": inp, "output_tokens": estimate_tokens(output_text or "")}
|
||||
|
||||
|
||||
class OpenAIAdapter(ModelAdapter):
|
||||
"""Adapter for the OpenAI API (GPT-4o, o1, etc.)."""
|
||||
|
||||
@@ -25,6 +43,15 @@ class OpenAIAdapter(ModelAdapter):
|
||||
if url:
|
||||
kwargs["base_url"] = url
|
||||
self._client = AsyncOpenAI(**kwargs)
|
||||
# El path nativo conserva el usage real del proveedor; subclases que no
|
||||
# reciben usage fiable en streaming (LiteLLM) lo ponen a True para estimar.
|
||||
self._estimate_usage_fallback = False
|
||||
|
||||
async def _acreate(self, kwargs: dict[str, Any]):
|
||||
"""Hook de la llamada al modelo. Subclases (p.ej. LiteLLMAdapter) lo
|
||||
sobreescriben para enrutar por otra librería sin tocar el resto del
|
||||
flujo (procesado de chunks, tools, mensajes)."""
|
||||
return await self._client.chat.completions.create(**kwargs)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Streaming
|
||||
@@ -43,43 +70,90 @@ class OpenAIAdapter(ModelAdapter):
|
||||
)
|
||||
|
||||
kwargs: dict[str, Any] = {
|
||||
"model": config.model_id or "gpt-4o",
|
||||
"model": config.model_id or settings.default_model_id or "gpt-4o",
|
||||
"max_tokens": config.max_tokens,
|
||||
"temperature": config.temperature,
|
||||
"messages": messages,
|
||||
"messages": self._to_openai_messages(messages),
|
||||
"stream": True,
|
||||
"stream_options": {"include_usage": True},
|
||||
}
|
||||
if tools:
|
||||
kwargs["tools"] = self._format_tools(tools)
|
||||
|
||||
stream = await self._client.chat.completions.create(**kwargs)
|
||||
stream = await self._acreate(kwargs)
|
||||
|
||||
# Fallback de tool-calls-en-texto: DeepSeek a veces emite las tool calls
|
||||
# en su formato interno DSML como TEXTO (en el content) en vez de como
|
||||
# tool_calls nativos. El endpoint OpenAI no lo convierte, asi que sin
|
||||
# esto el agente "se para" mostrando DSML inerte. Reutilizamos el parser
|
||||
# del claude_adapter.
|
||||
from .claude_adapter import _parse_xml_tool_calls, _TOOL_CALL_OPEN_RE
|
||||
|
||||
tool_calls_acc: dict[int, dict[str, str]] = {}
|
||||
|
||||
final_usage: dict[str, int] = {}
|
||||
usage_emitted = False # evita doble conteo si llega usage tras estimar
|
||||
full_content = "" # content acumulado (para el fallback DSML)
|
||||
full_reasoning = "" # razonamiento acumulado (para estimar usage)
|
||||
emitted_chars = 0 # cuanto de full_content ya se emitio como delta
|
||||
suppress_text = False # tras detectar un tool-call-en-texto, no emitir mas
|
||||
|
||||
# DeepSeek thinking mode: el razonamiento llega en `delta.reasoning_content`
|
||||
# (antes del content). Lo acumulamos como un bloque `thinking` (block_index 0)
|
||||
# para que el orquestador lo persista y `_to_openai_messages` lo reenvie como
|
||||
# `reasoning_content` en el siguiente turno — DeepSeek lo exige en multi-turno
|
||||
# con tool calls ("reasoning_content ... must be passed back to the API").
|
||||
reasoning_seen = False
|
||||
reasoning_sig_emitted = False
|
||||
|
||||
async for chunk in stream:
|
||||
# With include_usage, the last chunk has usage but no choices
|
||||
if chunk.usage:
|
||||
# With include_usage, the last chunk has usage but no choices.
|
||||
# getattr: el chunk de LiteLLM (ModelResponseStream) no siempre trae
|
||||
# el atributo `usage`; el del SDK OpenAI sí (None salvo el ultimo).
|
||||
chunk_usage = getattr(chunk, "usage", None)
|
||||
if chunk_usage:
|
||||
final_usage = {
|
||||
"input_tokens": chunk.usage.prompt_tokens or 0,
|
||||
"output_tokens": chunk.usage.completion_tokens or 0,
|
||||
"input_tokens": getattr(chunk_usage, "prompt_tokens", 0) or 0,
|
||||
"output_tokens": getattr(chunk_usage, "completion_tokens", 0) or 0,
|
||||
}
|
||||
|
||||
choice = chunk.choices[0] if chunk.choices else None
|
||||
if not choice:
|
||||
# Usage-only chunk (last one with include_usage) — emit it
|
||||
if final_usage:
|
||||
if final_usage and not usage_emitted:
|
||||
yield StreamChunk(usage=final_usage)
|
||||
final_usage = {} # Only emit once
|
||||
usage_emitted = True
|
||||
continue
|
||||
|
||||
delta = choice.delta
|
||||
|
||||
# Reasoning content (DeepSeek thinking mode). Llega como campo extra
|
||||
# del delta; lo emitimos como thinking_delta en el bloque index 0.
|
||||
reasoning_txt = getattr(delta, "reasoning_content", None) if delta else None
|
||||
if reasoning_txt:
|
||||
reasoning_seen = True
|
||||
full_reasoning += reasoning_txt
|
||||
yield StreamChunk(
|
||||
thinking_delta=reasoning_txt,
|
||||
block_type="thinking",
|
||||
block_index=0,
|
||||
)
|
||||
|
||||
# Text content
|
||||
if delta and delta.content:
|
||||
yield StreamChunk(delta=delta.content)
|
||||
full_content += delta.content
|
||||
if not suppress_text:
|
||||
# Si arranca un tool call en texto (DSML/XML), emitimos lo
|
||||
# previo y dejamos de emitir el resto (el DSML no debe verse).
|
||||
m = _TOOL_CALL_OPEN_RE.search(full_content, emitted_chars)
|
||||
if m:
|
||||
suppress_text = True
|
||||
if m.start() > emitted_chars:
|
||||
yield StreamChunk(delta=full_content[emitted_chars:m.start()])
|
||||
emitted_chars = len(full_content)
|
||||
else:
|
||||
yield StreamChunk(delta=full_content[emitted_chars:])
|
||||
emitted_chars = len(full_content)
|
||||
|
||||
# Tool calls
|
||||
if delta and delta.tool_calls:
|
||||
@@ -109,7 +183,31 @@ class OpenAIAdapter(ModelAdapter):
|
||||
|
||||
# Finish
|
||||
if choice.finish_reason:
|
||||
if choice.finish_reason == "tool_calls":
|
||||
# Cerrar el bloque de razonamiento (si lo hubo) con un signature
|
||||
# sintetico: el orquestador descarta thinking blocks sin signature
|
||||
# (proteccion para MiniMax/Anthropic). DeepSeek no usa signatures;
|
||||
# este marcador solo evita el descarte y NUNCA se reenvia — en
|
||||
# `_to_openai_messages` el bloque se mapea a `reasoning_content`.
|
||||
if reasoning_seen and not reasoning_sig_emitted:
|
||||
reasoning_sig_emitted = True
|
||||
yield StreamChunk(
|
||||
thinking_signature="deepseek-reasoning",
|
||||
block_type="thinking",
|
||||
block_index=0,
|
||||
)
|
||||
# Fallback de usage: algunos proveedores via LiteLLM no entregan el
|
||||
# chunk de usage (o llega tras el break del orquestador) → billing 0.
|
||||
# Estimamos por tokens para no infra-cobrar. Solo si el adapter lo
|
||||
# pide (LiteLLM); el path nativo conserva el usage real del proveedor.
|
||||
if self._estimate_usage_fallback and not final_usage and not usage_emitted:
|
||||
final_usage = _estimate_usage(messages, full_content + "\n" + full_reasoning)
|
||||
# IMPORTANTE: DeepSeek (endpoint OpenAI) a veces cierra el stream
|
||||
# con finish_reason="stop" AUNQUE haya emitido tool_calls. Si nos
|
||||
# fiamos solo de =="tool_calls" perdemos esos tool calls: el agente
|
||||
# anuncia la accion en texto y "se para" sin ejecutarla. Por eso
|
||||
# disparamos los tool_use SIEMPRE que haya tool calls acumulados,
|
||||
# sea cual sea el finish_reason.
|
||||
if tool_calls_acc:
|
||||
for acc in tool_calls_acc.values():
|
||||
yield StreamChunk(
|
||||
tool_call_id=acc["id"],
|
||||
@@ -118,15 +216,33 @@ class OpenAIAdapter(ModelAdapter):
|
||||
finish_reason="tool_use",
|
||||
)
|
||||
# Emit usage after tool_use chunks
|
||||
if final_usage:
|
||||
if final_usage and not usage_emitted:
|
||||
yield StreamChunk(usage=final_usage)
|
||||
usage_emitted = True
|
||||
else:
|
||||
# Fallback: DeepSeek pudo emitir las tool calls como TEXTO
|
||||
# (DSML/XML) en vez de nativas. Parseamos el content y, si hay
|
||||
# tool calls, las ejecutamos igual; si no, cerramos el turno.
|
||||
text_calls = _parse_xml_tool_calls(full_content) if full_content else []
|
||||
if text_calls:
|
||||
for c in text_calls:
|
||||
yield StreamChunk(
|
||||
tool_call_id=c["id"],
|
||||
tool_name=c["name"],
|
||||
tool_arguments=json.dumps(c.get("arguments", {}), ensure_ascii=False),
|
||||
finish_reason="tool_use",
|
||||
)
|
||||
if final_usage and not usage_emitted:
|
||||
yield StreamChunk(usage=final_usage)
|
||||
usage_emitted = True
|
||||
else:
|
||||
yield StreamChunk(
|
||||
finish_reason="end_turn"
|
||||
if choice.finish_reason == "stop"
|
||||
if choice.finish_reason in ("stop", "tool_calls")
|
||||
else choice.finish_reason,
|
||||
usage=final_usage,
|
||||
usage=final_usage if not usage_emitted else {},
|
||||
)
|
||||
usage_emitted = True
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Non-streaming
|
||||
@@ -145,10 +261,10 @@ class OpenAIAdapter(ModelAdapter):
|
||||
)
|
||||
|
||||
kwargs: dict[str, Any] = {
|
||||
"model": config.model_id or "gpt-4o",
|
||||
"model": config.model_id or settings.default_model_id or "gpt-4o",
|
||||
"max_tokens": config.max_tokens,
|
||||
"temperature": config.temperature,
|
||||
"messages": messages,
|
||||
"messages": self._to_openai_messages(messages),
|
||||
}
|
||||
if tools:
|
||||
kwargs["tools"] = self._format_tools(tools)
|
||||
@@ -161,7 +277,7 @@ class OpenAIAdapter(ModelAdapter):
|
||||
"function": {"name": force_tool},
|
||||
}
|
||||
|
||||
response = await self._client.chat.completions.create(**kwargs)
|
||||
response = await self._acreate(kwargs)
|
||||
choice = response.choices[0]
|
||||
|
||||
content = choice.message.content or ""
|
||||
@@ -204,19 +320,230 @@ class OpenAIAdapter(ModelAdapter):
|
||||
|
||||
@staticmethod
|
||||
def _format_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
"""Convert internal tool definitions to OpenAI function calling format."""
|
||||
"""Convert internal tool definitions to OpenAI function calling format.
|
||||
|
||||
Si `deepseek_strict_tools`, marca cada funcion con `strict: true` y limpia
|
||||
del schema los keywords que DeepSeek strict NO soporta (minLength/maxLength/
|
||||
minItems/maxItems), que de otro modo darian 400."""
|
||||
strict = settings.deepseek_strict_tools
|
||||
formatted: list[dict[str, Any]] = []
|
||||
for tool in tools:
|
||||
formatted.append(
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
params = tool.get("input_schema", tool.get("parameters", {"type": "object"}))
|
||||
fn: dict[str, Any] = {
|
||||
"name": tool["name"],
|
||||
"description": tool.get("description", ""),
|
||||
"parameters": tool.get(
|
||||
"input_schema", tool.get("parameters", {"type": "object"})
|
||||
),
|
||||
"parameters": OpenAIAdapter._sanitize_strict_schema(params) if strict else params,
|
||||
}
|
||||
if strict:
|
||||
fn["strict"] = True
|
||||
formatted.append({"type": "function", "function": fn})
|
||||
return formatted
|
||||
|
||||
# Keywords no soportados por DeepSeek strict mode (segun docs oficiales).
|
||||
_STRICT_UNSUPPORTED_KEYS = ("minLength", "maxLength", "minItems", "maxItems")
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_strict_schema(schema: Any) -> Any:
|
||||
"""Elimina recursivamente keywords no soportados por DeepSeek strict."""
|
||||
if isinstance(schema, dict):
|
||||
return {
|
||||
k: OpenAIAdapter._sanitize_strict_schema(v)
|
||||
for k, v in schema.items()
|
||||
if k not in OpenAIAdapter._STRICT_UNSUPPORTED_KEYS
|
||||
}
|
||||
if isinstance(schema, list):
|
||||
return [OpenAIAdapter._sanitize_strict_schema(x) for x in schema]
|
||||
return schema
|
||||
|
||||
@staticmethod
|
||||
def _blocks_text(content: Any) -> str:
|
||||
"""Extrae texto plano de un content que puede ser str o lista de bloques."""
|
||||
if content is None:
|
||||
return ""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
parts = []
|
||||
for b in content:
|
||||
if isinstance(b, dict):
|
||||
parts.append(b.get("text") or b.get("content") or "")
|
||||
else:
|
||||
parts.append(str(b))
|
||||
return "\n".join(p for p in parts if p)
|
||||
return str(content)
|
||||
|
||||
def _to_openai_messages(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
"""Convierte los mensajes del formato interno (Anthropic-style, con bloques
|
||||
`tool_use` / `tool_result`) al formato de la API OpenAI (`tool_calls` en el
|
||||
assistant, mensajes `role: tool` con `tool_call_id`). El contexto se construye
|
||||
en formato Anthropic, así que sin esto la API OpenAI de DeepSeek rechaza el
|
||||
body ('unknown variant tool_use')."""
|
||||
out: list[dict[str, Any]] = []
|
||||
for msg in messages:
|
||||
role = msg.get("role")
|
||||
content = msg.get("content")
|
||||
if role == "system":
|
||||
out.append({"role": "system", "content": content if isinstance(content, str) else self._blocks_text(content)})
|
||||
continue
|
||||
if not isinstance(content, list):
|
||||
out.append({"role": role, "content": content if isinstance(content, str) else str(content or "")})
|
||||
continue
|
||||
if role == "assistant":
|
||||
text_parts: list[str] = []
|
||||
tool_calls: list[dict[str, Any]] = []
|
||||
reasoning_parts: list[str] = []
|
||||
for b in content:
|
||||
if not isinstance(b, dict):
|
||||
continue
|
||||
t = b.get("type")
|
||||
if t == "text":
|
||||
text_parts.append(b.get("text", ""))
|
||||
elif t == "thinking":
|
||||
# DeepSeek thinking mode: el razonamiento del turno debe
|
||||
# reenviarse como `reasoning_content` (no como signature).
|
||||
rc = b.get("thinking", "")
|
||||
if rc:
|
||||
reasoning_parts.append(rc)
|
||||
elif t == "tool_use":
|
||||
tool_calls.append({
|
||||
"id": b.get("id", ""),
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": b.get("name", ""),
|
||||
"arguments": json.dumps(b.get("input", {}), ensure_ascii=False),
|
||||
},
|
||||
})
|
||||
text_joined = "\n".join(p for p in text_parts if p)
|
||||
m: dict[str, Any] = {"role": "assistant", "content": (text_joined or None)}
|
||||
if reasoning_parts:
|
||||
if not text_joined and not tool_calls:
|
||||
# Quirk DeepSeek thinking: a veces emite TODA la respuesta
|
||||
# en reasoning_content y cierra sin content ni tool_calls.
|
||||
# Reenviar content=None sin tool_calls rompe la API
|
||||
# ("content or tool_calls must be set"), asi que promovemos
|
||||
# el reasoning a content (sin duplicarlo como reasoning_content).
|
||||
m["content"] = "\n".join(reasoning_parts)
|
||||
else:
|
||||
m["reasoning_content"] = "\n".join(reasoning_parts)
|
||||
if tool_calls:
|
||||
m["tool_calls"] = tool_calls
|
||||
out.append(m)
|
||||
else: # user (puede traer tool_result blocks)
|
||||
text_parts = []
|
||||
for b in content:
|
||||
if not isinstance(b, dict):
|
||||
continue
|
||||
t = b.get("type")
|
||||
if t == "tool_result":
|
||||
out.append({
|
||||
"role": "tool",
|
||||
"tool_call_id": b.get("tool_use_id", ""),
|
||||
"content": self._blocks_text(b.get("content")),
|
||||
})
|
||||
elif t == "text":
|
||||
text_parts.append(b.get("text", ""))
|
||||
if text_parts:
|
||||
out.append({"role": "user", "content": "\n".join(text_parts)})
|
||||
# Guard defensivo: el compactor ya garantiza el invariante tool_use ↔
|
||||
# tool_result (`_enforce_tool_pairing`), pero si algo se escapa el
|
||||
# proveedor devuelve 400 y la sesion queda bloqueada. Cinturon y tirantes.
|
||||
return self._repair_tool_sequence(out)
|
||||
|
||||
@staticmethod
|
||||
def _repair_tool_sequence(out: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
"""Garantiza el contrato OpenAI sobre la secuencia ya convertida:
|
||||
|
||||
- Todo `role: tool` debe responder a un tool_call_id del assistant
|
||||
inmediatamente anterior (o de su bloque contiguo de tool messages).
|
||||
Si no → se convierte a user con placeholder.
|
||||
- Todo assistant con `tool_calls` debe tener respuesta para CADA id.
|
||||
Los tool_calls sin respuesta se eliminan; si la lista queda vacia se
|
||||
elimina la key (y se asegura `content` no-None — "content or
|
||||
tool_calls must be set").
|
||||
|
||||
No deberia activarse nunca (el compactor repara antes); si se activa,
|
||||
loguea warning para detectar regresiones del compactor.
|
||||
"""
|
||||
repaired: list[dict[str, Any]] = []
|
||||
i = 0
|
||||
n = len(out)
|
||||
while i < n:
|
||||
msg = out[i]
|
||||
role = msg.get("role")
|
||||
|
||||
if role == "assistant" and msg.get("tool_calls"):
|
||||
# Bloque contiguo de tool messages que responden a este assistant.
|
||||
j = i + 1
|
||||
block: list[dict[str, Any]] = []
|
||||
while j < n and out[j].get("role") == "tool":
|
||||
block.append(out[j])
|
||||
j += 1
|
||||
answered = {t.get("tool_call_id", "") for t in block}
|
||||
kept_calls = [
|
||||
tc for tc in msg["tool_calls"] if tc.get("id", "") in answered
|
||||
]
|
||||
dropped = [
|
||||
tc for tc in msg["tool_calls"] if tc.get("id", "") not in answered
|
||||
]
|
||||
new_msg = dict(msg)
|
||||
if dropped:
|
||||
for tc in dropped:
|
||||
logger.warning(
|
||||
"repaired unanswered tool_call at index %d (tool_call_id=%s)",
|
||||
i,
|
||||
tc.get("id", ""),
|
||||
)
|
||||
if kept_calls:
|
||||
new_msg["tool_calls"] = kept_calls
|
||||
else:
|
||||
new_msg.pop("tool_calls", None)
|
||||
if new_msg.get("content") is None:
|
||||
# Promover reasoning a content si existe (mismo
|
||||
# criterio que el quirk DeepSeek de arriba); si no,
|
||||
# placeholder para no enviar content=None sin tools.
|
||||
rc = new_msg.pop("reasoning_content", None)
|
||||
new_msg["content"] = rc or "[ASSISTANT COMPACTADO]"
|
||||
repaired.append(new_msg)
|
||||
valid_ids = {tc.get("id", "") for tc in kept_calls}
|
||||
converted: list[dict[str, Any]] = []
|
||||
for t in block:
|
||||
if t.get("tool_call_id", "") in valid_ids:
|
||||
repaired.append(t)
|
||||
else:
|
||||
logger.warning(
|
||||
"repaired orphan tool message (tool_call_id=%s)",
|
||||
t.get("tool_call_id", ""),
|
||||
)
|
||||
converted.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": "[Resultado de herramienta (contexto compactado)]: "
|
||||
+ str(t.get("content", ""))[:500],
|
||||
}
|
||||
)
|
||||
return formatted
|
||||
# Los huerfanos convertidos van DESPUES del bloque de tools
|
||||
# validos para no romper la contiguidad assistant → tools.
|
||||
repaired.extend(converted)
|
||||
i = j
|
||||
continue
|
||||
|
||||
if role == "tool":
|
||||
# Tool message sin assistant con tool_calls delante → huerfano.
|
||||
logger.warning(
|
||||
"repaired orphan tool message at index %d (tool_call_id=%s)",
|
||||
i,
|
||||
msg.get("tool_call_id", ""),
|
||||
)
|
||||
repaired.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": "[Resultado de herramienta (contexto compactado)]: "
|
||||
+ str(msg.get("content", ""))[:500],
|
||||
}
|
||||
)
|
||||
i += 1
|
||||
continue
|
||||
|
||||
repaired.append(msg)
|
||||
i += 1
|
||||
return repaired
|
||||
|
||||
@@ -427,9 +427,9 @@ async def _execute_and_persist(orchestrator, storage, session, message) -> dict[
|
||||
async def abort_session(session_id: str) -> dict[str, Any]:
|
||||
"""Cancela la ejecución en curso de una sesión (botón Stop del chat).
|
||||
|
||||
Cancela la tarea detached (liberando el session_lock), cierra el stream SSE
|
||||
de los suscriptores y limpia un posible lock huérfano. Idempotente: si no
|
||||
hay nada en curso devuelve `no_active_execution` sin error.
|
||||
Cancela la tarea detached (liberando el session_lock) y cierra el stream
|
||||
SSE de los suscriptores. Idempotente: si no hay nada en curso devuelve
|
||||
`no_active_execution` sin error.
|
||||
"""
|
||||
storage = _get_storage()
|
||||
session = await storage.get_session(session_id)
|
||||
@@ -452,8 +452,13 @@ async def abort_session(session_id: str) -> dict[str, Any]:
|
||||
except Exception as e:
|
||||
logger.warning("Failed to close SSE stream on abort for %s: %s", session_id, e)
|
||||
|
||||
# Defensa: liberar un lock huérfano (p.ej. de una ejecución previa que crasheó
|
||||
# antes de soltarlo) para no bloquear el siguiente mensaje hasta el TTL.
|
||||
# Limpiar el lock SOLO si cancelamos una ejecución de verdad: el `finally`
|
||||
# de la tarea cancelada puede no llegar a liberar el lock de forma fiable.
|
||||
# `clear_session_lock` borra incondicional (sin conocer el token del lock),
|
||||
# así que invocarlo sin cancelación confirmada borraría el lock de una
|
||||
# ejecución síncrona (stream=false) aún viva — que no se registra en
|
||||
# _running_executions — y permitiría una segunda ejecución concurrente.
|
||||
if cancelled:
|
||||
try:
|
||||
await storage.clear_session_lock(session_id)
|
||||
except Exception as e:
|
||||
@@ -781,22 +786,64 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
|
||||
|
||||
docs_data.append((doc_id, title, content, summary, tags, priority, load_when))
|
||||
|
||||
# Generate embeddings in batch
|
||||
# Hash de contenido por doc — base del skip idempotente de embeddings.
|
||||
import hashlib
|
||||
|
||||
def _embed_text(title, summary, content):
|
||||
return f"{title}\n{summary}\n{content[:2000]}"
|
||||
|
||||
def _doc_hash(title, summary, content):
|
||||
return hashlib.md5(_embed_text(title, summary, content).encode("utf-8")).hexdigest()
|
||||
|
||||
new_hashes = [_doc_hash(t, s, c) for _, t, c, s, _, _, _ in docs_data]
|
||||
|
||||
# Generate embeddings SOLO para docs nuevos o cuyo contenido cambió (skip
|
||||
# idempotente): si el hash coincide con el guardado y ya existe el embedding
|
||||
# en Redis, se reutiliza y NO se vuelve a llamar a la API. Esto permite que
|
||||
# /knowledge/load se dispare libremente (botón de scaffold, etc.) sin re-embeber.
|
||||
embeddings: list[Any] = [None] * len(docs_data)
|
||||
already_embedded = [False] * len(docs_data)
|
||||
has_embeddings = False
|
||||
if settings.embeddings_enabled:
|
||||
to_embed = [] # indices que hay que (re)embeber
|
||||
for i, (doc_id, title, content, summary, _, _, _) in enumerate(docs_data):
|
||||
try:
|
||||
prev = await memory._r.get(memory._key("kbhash", "knowledge", doc_id))
|
||||
if isinstance(prev, bytes):
|
||||
prev = prev.decode("utf-8")
|
||||
has_embed = await memory._r.exists(memory._key("embeddings", "knowledge", doc_id))
|
||||
except Exception:
|
||||
prev, has_embed = None, 0
|
||||
if prev == new_hashes[i] and has_embed:
|
||||
already_embedded[i] = True # sin cambios → reutiliza el embedding existente
|
||||
else:
|
||||
to_embed.append(i)
|
||||
|
||||
if to_embed:
|
||||
from ..memory.embeddings import EmbeddingService
|
||||
embed_service = EmbeddingService()
|
||||
embed_texts = [
|
||||
f"{title}\n{summary}\n{content[:2000]}"
|
||||
for _, title, content, summary, _, _, _ in docs_data
|
||||
_embed_text(docs_data[i][1], docs_data[i][3], docs_data[i][2])
|
||||
for i in to_embed
|
||||
]
|
||||
|
||||
try:
|
||||
embeddings = await embed_service.embed_batch(embed_texts)
|
||||
fresh = await embed_service.embed_batch(embed_texts)
|
||||
for j, i in enumerate(to_embed):
|
||||
embeddings[i] = fresh[j]
|
||||
has_embeddings = True
|
||||
logger.info("Generated %d embeddings for knowledge base", len(embeddings))
|
||||
logger.info(
|
||||
"Generated %d embeddings (%d sin cambios, omitidos)",
|
||||
len(to_embed), len(docs_data) - len(to_embed),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to generate embeddings: %s — loading without semantic search", e)
|
||||
embeddings = [None] * len(docs_data)
|
||||
has_embeddings = False
|
||||
else:
|
||||
has_embeddings = True
|
||||
logger.info("Knowledge sin cambios — no se regeneraron embeddings (%d docs)", len(docs_data))
|
||||
else:
|
||||
logger.info("Embeddings disabled (no AGENTIC_EMBEDDINGS_API_KEY) — KB loaded without semantic search")
|
||||
|
||||
# Limpia entradas huérfanas: docs que ya no existen en el filesystem.
|
||||
# Sin esto, los IDs antiguos (e.g. tras renombrar 'builder-fields' →
|
||||
@@ -807,9 +854,10 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
|
||||
for existing in existing_docs:
|
||||
if existing.memory_id not in current_ids:
|
||||
await memory.delete_document(existing.memory_id, namespace="knowledge")
|
||||
# Borra también el embedding asociado
|
||||
# Borra también el embedding asociado y el hash de contenido
|
||||
embed_key = memory._key("embeddings", "knowledge", existing.memory_id)
|
||||
await memory._r.delete(embed_key)
|
||||
await memory._r.delete(memory._key("kbhash", "knowledge", existing.memory_id))
|
||||
removed.append(existing.memory_id)
|
||||
if removed:
|
||||
logger.info("Removed %d stale knowledge docs: %s", len(removed), removed)
|
||||
@@ -832,6 +880,11 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
|
||||
|
||||
if embeddings[i] is not None:
|
||||
await memory.store_embedding(doc_id, embeddings[i], namespace="knowledge")
|
||||
# Guarda el hash de contenido para el skip idempotente del próximo load
|
||||
try:
|
||||
await memory._r.set(memory._key("kbhash", "knowledge", doc_id), new_hashes[i])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
loaded.append({
|
||||
"id": doc_id,
|
||||
@@ -840,7 +893,7 @@ async def _load_knowledge_from_dir(docs_path: str = "docs") -> dict[str, Any]:
|
||||
"tags": tags[:5],
|
||||
"priority": priority,
|
||||
"load_when": load_when,
|
||||
"embedded": embeddings[i] is not None,
|
||||
"embedded": embeddings[i] is not None or already_embedded[i],
|
||||
})
|
||||
|
||||
logger.info("Loaded %d knowledge documents from %s (embeddings: %s)", len(loaded), docs_dir, has_embeddings)
|
||||
|
||||
@@ -32,6 +32,33 @@ class Settings(BaseSettings):
|
||||
anthropic_base_url: str = "" # Custom base URL (for MiniMax Anthropic-compatible, etc.)
|
||||
openai_api_key: str = ""
|
||||
openai_base_url: str = "" # Custom base URL (for MiniMax, DeepInfra, etc.)
|
||||
# --- Embeddings (semantic search) ---
|
||||
# Credenciales DEDICADAS para embeddings. Necesarias porque el chat usa
|
||||
# `openai_api_key` apuntando a un endpoint compatible (p.ej. DeepSeek, que NO
|
||||
# tiene API de embeddings). Si vacio, cae a `openai_api_key` por compat. El
|
||||
# base_url vacio => OpenAI real (api.openai.com); NO hereda `openai_base_url`.
|
||||
embeddings_api_key: str = ""
|
||||
embeddings_base_url: str = ""
|
||||
embeddings_model: str = "text-embedding-3-small"
|
||||
# Spike LiteLLM: si default_model_provider=litellm, modelo a usar (formato
|
||||
# litellm, p.ej. "deepseek/deepseek-v4-pro"). Vacío → deriva de default_model_id.
|
||||
litellm_model: str = ""
|
||||
|
||||
@property
|
||||
def effective_embeddings_key(self) -> str:
|
||||
"""Key a usar para embeddings. Prioriza la dedicada; reutiliza la del
|
||||
chat SOLO si el chat es OpenAI real (sin `openai_base_url` custom) — si
|
||||
apunta a DeepSeek u otro proveedor, esa key no sirve para embeddings."""
|
||||
if self.embeddings_api_key:
|
||||
return self.embeddings_api_key
|
||||
if not self.openai_base_url:
|
||||
return self.openai_api_key
|
||||
return ""
|
||||
|
||||
@property
|
||||
def embeddings_enabled(self) -> bool:
|
||||
return bool(self.effective_embeddings_key or self.embeddings_base_url)
|
||||
|
||||
default_model_provider: str = "claude"
|
||||
default_model_id: str = "claude-sonnet-4-20250514"
|
||||
# Modelo override SOLO para el sub-loop del planner (acai_plan). Si vacio,
|
||||
@@ -43,6 +70,11 @@ class Settings(BaseSettings):
|
||||
planner_max_tokens: int = 16000
|
||||
max_tokens: int = 4096
|
||||
temperature: float = 0.3
|
||||
# DeepSeek strict function calling (beta). OPT-IN (default False): exige schemas
|
||||
# tipo OpenAI (additionalProperties:false, todos required, etc.) que los tools MCP
|
||||
# actuales NO cumplen → da 400. Para activarlo: schemas compatibles + base_url
|
||||
# https://api.deepseek.com/beta + AGENTIC_DEEPSEEK_STRICT_TOOLS=true.
|
||||
deepseek_strict_tools: bool = False
|
||||
|
||||
# --- Context engine ---
|
||||
model_context_window: int = 0 # 0 = use legacy fixed budget / explicit override
|
||||
@@ -70,6 +102,10 @@ class Settings(BaseSettings):
|
||||
conversation_recent_raw_limit: int = 2
|
||||
task_history_max_entries: int = 20
|
||||
task_history_max_tokens: int = 1500
|
||||
# Presupuesto de tokens para la ventana de recent_messages persistida en
|
||||
# sesion. Sin esto crece sin limite y empuja al compactor a su paso
|
||||
# destructivo (colapsar bloques perdiendo tool_use ids). 0 = sin limite.
|
||||
recent_messages_max_tokens: int = 60_000
|
||||
|
||||
# --- MCP ---
|
||||
mcp_config_path: str = "" # Path to mcp.json; empty = legacy single-server mode
|
||||
|
||||
@@ -180,7 +180,13 @@ class ContextCompactor:
|
||||
"raw_tool_results_kept": 0,
|
||||
}
|
||||
if total <= max_tokens:
|
||||
return messages, meta
|
||||
# Aunque no haga falta compactar, garantizamos el invariante
|
||||
# tool_use/tool_result (repara historiales ya rotos persistidos).
|
||||
repaired = self._enforce_tool_pairing([dict(m) for m in messages])
|
||||
meta["output_tokens"] = sum(
|
||||
self._estimate_message_tokens(m) for m in repaired
|
||||
)
|
||||
return repaired, meta
|
||||
|
||||
compacted = [dict(m) for m in messages]
|
||||
last_user_idx = max(
|
||||
@@ -343,20 +349,241 @@ class ContextCompactor:
|
||||
message["content"] = "[USER CONTEXT COMPACTADO]"
|
||||
elif isinstance(content, list) and content:
|
||||
# Anthropic-style: reemplazar lista entera por placeholder string.
|
||||
# Nota: pierde tool_use ids — solo aplicar al final como ultimo recurso.
|
||||
# Nota: colapsar pierde los tool_use/tool_result ids, asi que
|
||||
# lo hacemos PAIR-AWARE (colapsar un lado del par colapsa el
|
||||
# otro en la misma iteracion) y ademas `_enforce_tool_pairing`
|
||||
# al final garantiza el invariante aunque algo se escape.
|
||||
if role == "assistant":
|
||||
message["content"] = "[ASSISTANT COMPACTADO]"
|
||||
# Si este assistant tenia tool_use, colapsar tambien el
|
||||
# user de tool_results que lo sigue (mismo par).
|
||||
if self._blocks_have_type(content, "tool_use"):
|
||||
nxt = idx + 1
|
||||
if (
|
||||
nxt < len(compacted)
|
||||
and nxt != last_user_idx
|
||||
and compacted[nxt].get("role") == "user"
|
||||
and self._blocks_have_type(
|
||||
compacted[nxt].get("content"), "tool_result"
|
||||
)
|
||||
):
|
||||
compacted[nxt]["content"] = "[USER CONTEXT COMPACTADO]"
|
||||
elif role == "user":
|
||||
message["content"] = "[USER CONTEXT COMPACTADO]"
|
||||
# Si este user llevaba tool_results, colapsar tambien el
|
||||
# assistant anterior con sus tool_use (mismo par).
|
||||
if self._blocks_have_type(content, "tool_result"):
|
||||
prv = idx - 1
|
||||
if (
|
||||
prv >= 0
|
||||
and compacted[prv].get("role") == "assistant"
|
||||
and self._blocks_have_type(
|
||||
compacted[prv].get("content"), "tool_use"
|
||||
)
|
||||
):
|
||||
compacted[prv]["content"] = "[ASSISTANT COMPACTADO]"
|
||||
else:
|
||||
continue
|
||||
total = sum(self._estimate_message_tokens(m) for m in compacted)
|
||||
if total <= max_tokens:
|
||||
break
|
||||
|
||||
# Invariante final: tras toda la compactacion, reparar cualquier par
|
||||
# tool_use/tool_result roto. Sin esto, un tool_result huerfano se emite
|
||||
# como `role: tool` sin `tool_calls` previo y el proveedor devuelve 400
|
||||
# ("Messages with role 'tool' must be a response to a preceding message
|
||||
# with 'tool_calls'").
|
||||
compacted = self._enforce_tool_pairing(compacted)
|
||||
total = sum(self._estimate_message_tokens(m) for m in compacted)
|
||||
|
||||
meta["output_tokens"] = total
|
||||
return compacted, meta
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Invariante tool_use ↔ tool_result
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _blocks_have_type(content: Any, block_type: str) -> bool:
|
||||
"""True si `content` es una lista de bloques con alguno del tipo dado."""
|
||||
if not isinstance(content, list):
|
||||
return False
|
||||
return any(
|
||||
isinstance(b, dict) and b.get("type") == block_type for b in content
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _tool_use_ids(message: dict[str, Any]) -> set[str]:
|
||||
"""IDs de tool calls emitidos por un assistant (bloques `tool_use`
|
||||
estilo Anthropic y/o `tool_calls` estilo OpenAI legacy)."""
|
||||
ids: set[str] = set()
|
||||
content = message.get("content")
|
||||
if isinstance(content, list):
|
||||
for b in content:
|
||||
if isinstance(b, dict) and b.get("type") == "tool_use":
|
||||
ids.add(str(b.get("id", "")))
|
||||
for tc in message.get("tool_calls") or []:
|
||||
if isinstance(tc, dict):
|
||||
ids.add(str(tc.get("id", "")))
|
||||
ids.discard("")
|
||||
return ids
|
||||
|
||||
def _enforce_tool_pairing(
|
||||
self, messages: list[dict[str, Any]]
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Repara el invariante tool_use ↔ tool_result en ambas direcciones.
|
||||
|
||||
La compactacion puede colapsar el content de un assistant (perdiendo sus
|
||||
bloques `tool_use`) mientras el user siguiente conserva sus `tool_result`,
|
||||
o al reves. El matching es por IDs (`tool_use.id` vs `tool_result.tool_use_id`
|
||||
y `tool_calls[].id` vs `tool_call_id`), no solo por adyacencia, asi que
|
||||
tambien repara desajustes parciales (p.ej. 3 tool_use vs 2 tool_result).
|
||||
|
||||
- tool_result sin tool_use previo → bloque text placeholder.
|
||||
- tool_use sin tool_result siguiente → se elimina el bloque (thinking/text
|
||||
se conservan; si el content queda vacio, placeholder string).
|
||||
- `role: tool` legacy sin assistant con `tool_calls` → user placeholder.
|
||||
"""
|
||||
repaired: list[dict[str, Any]] = []
|
||||
for idx, msg in enumerate(messages):
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content")
|
||||
|
||||
if role == "assistant":
|
||||
tool_ids = self._tool_use_ids(msg)
|
||||
if not tool_ids:
|
||||
repaired.append(msg)
|
||||
continue
|
||||
# IDs respondidos: user con tool_results inmediato y/o run
|
||||
# contiguo de mensajes legacy `role: tool`.
|
||||
answered: set[str] = set()
|
||||
j = idx + 1
|
||||
if (
|
||||
j < len(messages)
|
||||
and messages[j].get("role") == "user"
|
||||
and isinstance(messages[j].get("content"), list)
|
||||
):
|
||||
for b in messages[j]["content"]:
|
||||
if isinstance(b, dict) and b.get("type") == "tool_result":
|
||||
answered.add(str(b.get("tool_use_id", "")))
|
||||
j += 1
|
||||
while j < len(messages) and messages[j].get("role") == "tool":
|
||||
answered.add(str(messages[j].get("tool_call_id", "")))
|
||||
j += 1
|
||||
unanswered = tool_ids - answered
|
||||
if not unanswered:
|
||||
repaired.append(msg)
|
||||
continue
|
||||
# Eliminar los tool_use/tool_calls sin respuesta.
|
||||
new_msg = dict(msg)
|
||||
if isinstance(content, list):
|
||||
new_content = [
|
||||
b
|
||||
for b in content
|
||||
if not (
|
||||
isinstance(b, dict)
|
||||
and b.get("type") == "tool_use"
|
||||
and str(b.get("id", "")) in unanswered
|
||||
)
|
||||
]
|
||||
if not new_content:
|
||||
new_msg["content"] = "[ASSISTANT COMPACTADO]"
|
||||
else:
|
||||
new_msg["content"] = new_content
|
||||
if isinstance(new_msg.get("tool_calls"), list):
|
||||
kept_calls = [
|
||||
tc
|
||||
for tc in new_msg["tool_calls"]
|
||||
if isinstance(tc, dict)
|
||||
and str(tc.get("id", "")) not in unanswered
|
||||
]
|
||||
if kept_calls:
|
||||
new_msg["tool_calls"] = kept_calls
|
||||
else:
|
||||
new_msg.pop("tool_calls", None)
|
||||
if not new_msg.get("content"):
|
||||
new_msg["content"] = "[ASSISTANT COMPACTADO]"
|
||||
repaired.append(new_msg)
|
||||
continue
|
||||
|
||||
if role == "user" and self._blocks_have_type(content, "tool_result"):
|
||||
# IDs disponibles en el assistant inmediatamente anterior
|
||||
# (YA reparado — usar `repaired[-1]` refleja los tool_use que
|
||||
# sobrevivieron, no los del mensaje original).
|
||||
available: set[str] = set()
|
||||
if repaired and repaired[-1].get("role") == "assistant":
|
||||
available = self._tool_use_ids(repaired[-1])
|
||||
new_content: list[Any] = []
|
||||
orphaned = False
|
||||
for b in content:
|
||||
if (
|
||||
isinstance(b, dict)
|
||||
and b.get("type") == "tool_result"
|
||||
and str(b.get("tool_use_id", "")) not in available
|
||||
):
|
||||
orphaned = True
|
||||
# Fusionar placeholders consecutivos en un unico bloque text.
|
||||
if not (
|
||||
new_content
|
||||
and isinstance(new_content[-1], dict)
|
||||
and new_content[-1].get("type") == "text"
|
||||
and new_content[-1].get("text")
|
||||
== "[Resultado de herramienta compactado]"
|
||||
):
|
||||
new_content.append(
|
||||
{
|
||||
"type": "text",
|
||||
"text": "[Resultado de herramienta compactado]",
|
||||
}
|
||||
)
|
||||
continue
|
||||
new_content.append(b)
|
||||
if not orphaned:
|
||||
repaired.append(msg)
|
||||
continue
|
||||
new_msg = dict(msg)
|
||||
only_placeholders = all(
|
||||
isinstance(b, dict)
|
||||
and b.get("type") == "text"
|
||||
and b.get("text") == "[Resultado de herramienta compactado]"
|
||||
for b in new_content
|
||||
)
|
||||
if not new_content or only_placeholders:
|
||||
new_msg["content"] = "[Resultado de herramienta compactado]"
|
||||
else:
|
||||
new_msg["content"] = new_content
|
||||
repaired.append(new_msg)
|
||||
continue
|
||||
|
||||
if role == "tool":
|
||||
# Legacy: el assistant anterior (saltando otros `role: tool`
|
||||
# contiguos) debe tener este tool_call_id en sus tool_calls.
|
||||
prev_assistant: dict[str, Any] | None = None
|
||||
for prev in reversed(repaired):
|
||||
if prev.get("role") == "tool":
|
||||
continue
|
||||
if prev.get("role") == "assistant":
|
||||
prev_assistant = prev
|
||||
break
|
||||
call_id = str(msg.get("tool_call_id", ""))
|
||||
valid = (
|
||||
prev_assistant is not None
|
||||
and call_id in self._tool_use_ids(prev_assistant)
|
||||
)
|
||||
if valid:
|
||||
repaired.append(msg)
|
||||
else:
|
||||
repaired.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": "[Resultado de herramienta compactado]",
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
repaired.append(msg)
|
||||
return repaired
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internals
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@@ -583,6 +583,16 @@ class ContextEngine:
|
||||
|
||||
async def _semantic_rank(self, query: str) -> list[tuple[str, float]]:
|
||||
"""Rank knowledge docs by cosine similarity. Returns (doc_id, score)."""
|
||||
# Sin credencial de embeddings no tiene sentido intentar la llamada (daria
|
||||
# 401 en cada turno). Se desactiva limpiamente con un aviso unico.
|
||||
if not settings.embeddings_enabled:
|
||||
if not getattr(self, "_embed_disabled_warned", False):
|
||||
logger.warning(
|
||||
"Embeddings disabled (no AGENTIC_EMBEDDINGS_API_KEY) — "
|
||||
"semantic search off, loading all docs"
|
||||
)
|
||||
self._embed_disabled_warned = True
|
||||
return []
|
||||
try:
|
||||
if not self._embed_service:
|
||||
self._embed_service = EmbeddingService()
|
||||
@@ -936,7 +946,22 @@ class ContextEngine:
|
||||
else:
|
||||
base_user_content = "Awaiting task assignment."
|
||||
|
||||
followup_mode = self._classify_followup_mode(base_user_content)
|
||||
# Un follow-up (transform/fetch_more/ambiguous) SOLO tiene sentido si hay
|
||||
# un turno anterior al que referirse. En una sesión fresca / primer mensaje
|
||||
# no hay nada que transformar, así que NO clasificamos: de lo contrario un
|
||||
# primer prompt que casualmente contenga un marker ("resumen", "estructura",
|
||||
# "busca", "adapta"…) se marcaría como `transform` y `_get_allowed_tools`
|
||||
# devolvería [] — el agente se quedaría SIN tools y emitiría los tool calls
|
||||
# como texto sin ejecutarlos (caso real: el prompt de análisis de estilos
|
||||
# que dice "Guarda un resumen…").
|
||||
has_prior_turn = bool(session.task_history) or bool(
|
||||
getattr(session, "recent_messages", [])
|
||||
)
|
||||
followup_mode = (
|
||||
self._classify_followup_mode(base_user_content)
|
||||
if has_prior_turn
|
||||
else "none"
|
||||
)
|
||||
resolved_context = ""
|
||||
if session.task_history and followup_mode != "none":
|
||||
resolved_context = self._build_followup_resolution(session.task_history[-1])
|
||||
|
||||
@@ -54,7 +54,11 @@ async def lifespan(app: FastAPI):
|
||||
await redis_storage.connect()
|
||||
|
||||
# 2. Initialize model adapter
|
||||
if settings.default_model_provider == "openai":
|
||||
if settings.default_model_provider == "litellm":
|
||||
from .adapters.litellm_adapter import LiteLLMAdapter
|
||||
model_adapter = LiteLLMAdapter()
|
||||
logger.info("Using LiteLLM adapter (model: %s)", settings.litellm_model or settings.default_model_id)
|
||||
elif settings.default_model_provider == "openai":
|
||||
model_adapter = OpenAIAdapter()
|
||||
logger.info("Using OpenAI adapter (model: %s)", settings.default_model_id)
|
||||
else:
|
||||
|
||||
@@ -18,6 +18,15 @@ from ..models.tools import ToolDefinition
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Buffer maximo (bytes) del StreamReader para leer las respuestas JSON-RPC del
|
||||
# MCP por stdout. Una respuesta llega en UNA sola linea; tools como el
|
||||
# screenshot fullPage de Playwright devuelven la imagen en base64 en esa linea
|
||||
# y superan de largo el 64KB por defecto de asyncio (y el 1MB que teniamos),
|
||||
# lanzando LimitOverrunError que mataba el read loop y dejaba la sesion MCP
|
||||
# inservible (el agente "se paraba" al hacer acciones). 64MB cubre cualquier
|
||||
# screenshot real; por encima, el read loop descarta esa respuesta y sigue vivo.
|
||||
MCP_STREAM_LIMIT = 64 * 1024 * 1024
|
||||
|
||||
|
||||
class MCPClientError(Exception):
|
||||
pass
|
||||
@@ -74,7 +83,7 @@ class MCPClient:
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=self._env,
|
||||
limit=1024 * 1024, # 1MB buffer for large MCP responses
|
||||
limit=MCP_STREAM_LIMIT, # buffer grande para respuestas MCP (screenshots base64)
|
||||
)
|
||||
self._running = True
|
||||
self._reader_task = asyncio.create_task(self._read_loop())
|
||||
@@ -225,14 +234,30 @@ class MCPClient:
|
||||
if not self._process or not self._process.stdout:
|
||||
return
|
||||
|
||||
stdout = self._process.stdout
|
||||
try:
|
||||
while self._running:
|
||||
line = await self._process.stdout.readline()
|
||||
try:
|
||||
line = await stdout.readline()
|
||||
except (ValueError, asyncio.LimitOverrunError):
|
||||
# Una respuesta JSON-RPC supero el buffer (p.ej. screenshot
|
||||
# fullPage de Playwright en base64 por encima de 64MB). Antes
|
||||
# esto mataba el read loop y dejaba TODA la sesion MCP muerta
|
||||
# (el agente se "paraba" en la siguiente accion). Ahora
|
||||
# descartamos solo esa respuesta, re-sincronizamos el stream
|
||||
# y seguimos vivos para las demas tools.
|
||||
logger.warning(
|
||||
"MCP [%s]: respuesta supera el buffer (%d MB), se descarta y se continua",
|
||||
self.name, MCP_STREAM_LIMIT // (1024 * 1024),
|
||||
)
|
||||
await self._drain_until_newline(stdout)
|
||||
continue
|
||||
|
||||
if not line:
|
||||
logger.warning("MCP server stdout closed")
|
||||
break
|
||||
|
||||
line_str = line.decode().strip()
|
||||
line_str = line.decode(errors="replace").strip()
|
||||
if not line_str:
|
||||
continue
|
||||
|
||||
@@ -251,6 +276,21 @@ class MCPClient:
|
||||
finally:
|
||||
self._running = False
|
||||
|
||||
async def _drain_until_newline(self, stdout: asyncio.StreamReader) -> None:
|
||||
"""Consume bytes del stream hasta el proximo salto de linea para
|
||||
re-sincronizar tras un LimitOverrunError (la respuesta sobredimensionada
|
||||
se descarta). `read()` no usa separador, asi que no vuelve a disparar el
|
||||
overrun y va vaciando el buffer hasta liberar la linea gigante."""
|
||||
while self._running:
|
||||
try:
|
||||
chunk = await stdout.read(65536)
|
||||
except Exception:
|
||||
return
|
||||
if not chunk:
|
||||
return
|
||||
if b"\n" in chunk:
|
||||
return
|
||||
|
||||
def _handle_message(self, message: dict[str, Any]) -> None:
|
||||
"""Route an incoming JSON-RPC message."""
|
||||
msg_id = message.get("id")
|
||||
|
||||
@@ -25,12 +25,19 @@ class EmbeddingService:
|
||||
def __init__(
|
||||
self,
|
||||
api_key: str | None = None,
|
||||
model: str = DEFAULT_MODEL,
|
||||
model: str | None = None,
|
||||
) -> None:
|
||||
self._client = AsyncOpenAI(
|
||||
api_key=api_key or settings.openai_api_key,
|
||||
)
|
||||
self._model = model
|
||||
# Credenciales dedicadas de embeddings. Fallback a openai_api_key por
|
||||
# compat. El base_url solo se aplica si se configura explicitamente
|
||||
# `embeddings_base_url`; vacio => OpenAI real (api.openai.com). NO se
|
||||
# hereda `openai_base_url` (que apunta al chat, p.ej. DeepSeek sin
|
||||
# endpoint de embeddings).
|
||||
key = api_key or settings.effective_embeddings_key
|
||||
kwargs: dict[str, Any] = {"api_key": key}
|
||||
if settings.embeddings_base_url:
|
||||
kwargs["base_url"] = settings.embeddings_base_url
|
||||
self._client = AsyncOpenAI(**kwargs)
|
||||
self._model = model or settings.embeddings_model or DEFAULT_MODEL
|
||||
|
||||
async def embed(self, text: str) -> list[float]:
|
||||
"""Generate embedding for a single text."""
|
||||
|
||||
@@ -289,6 +289,34 @@ class BaseAgent:
|
||||
|
||||
# If no tool calls, we're done
|
||||
if not tool_calls:
|
||||
# Quirk DeepSeek thinking: a veces el modelo emite TODA su
|
||||
# respuesta como reasoning y cierra el turno sin text ni
|
||||
# tool_use. Si el turno termina SOLO con bloques thinking,
|
||||
# promovemos el thinking a un bloque text en el snapshot que
|
||||
# se persiste — asi el UI no lo muestra como "pensando" al
|
||||
# recargar y el siguiente turno no rompe con
|
||||
# "content or tool_calls must be set".
|
||||
if turn_blocks and all(b.get("type") == "thinking" for b in turn_blocks):
|
||||
promoted = "\n".join(
|
||||
b.get("thinking", "") for b in turn_blocks if b.get("thinking")
|
||||
)
|
||||
turn_blocks = [{"type": "text", "text": promoted}]
|
||||
accumulated_content += promoted
|
||||
if promoted and self.profile.stream_deltas:
|
||||
# Emision en vivo via AGENT_DELTA normal: el
|
||||
# ClaudeFormatEmitter cierra el thinking block abierto
|
||||
# (content_block_stop) y abre un text block nuevo con
|
||||
# su propio indice (start/delta/stop), asi que el
|
||||
# protocolo de bloques no se rompe.
|
||||
await self.sse.emit(
|
||||
EventType.AGENT_DELTA,
|
||||
{
|
||||
"agent": self.profile.role,
|
||||
"delta": promoted,
|
||||
"step": step,
|
||||
},
|
||||
session_id=session.session_id,
|
||||
)
|
||||
if turn_blocks:
|
||||
conversation.append({"role": "assistant", "content": turn_blocks})
|
||||
elif full_text:
|
||||
|
||||
@@ -14,7 +14,7 @@ from typing import Any
|
||||
from ..adapters.base import ModelAdapter
|
||||
from ..config import settings
|
||||
from ..context.engine import ContextEngine
|
||||
from ..context.compactor import estimate_tokens
|
||||
from ..context.compactor import ContextCompactor, estimate_tokens
|
||||
from ..mcp.manager import MCPManager
|
||||
from ..memory.store import MemoryStore
|
||||
from ..models.agent import AgentProfile
|
||||
@@ -260,7 +260,76 @@ class OrchestratorEngine:
|
||||
current_turn.append(sanitized)
|
||||
|
||||
merged.extend(current_turn)
|
||||
return merged
|
||||
return OrchestratorEngine._trim_recent_messages(merged)
|
||||
|
||||
@staticmethod
|
||||
def _trim_recent_messages(
|
||||
messages: list[dict[str, Any]],
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Recorta recent_messages a un presupuesto de tokens eliminando
|
||||
mensajes ENTEROS desde el principio (los mas antiguos).
|
||||
|
||||
Dos reglas para no romper el invariante tool_use ↔ tool_result:
|
||||
- Nunca cortar dentro de un par: si se elimina un assistant con
|
||||
tool_use, se eliminan tambien sus tool_results (user carrier o run
|
||||
de mensajes legacy `role: tool`).
|
||||
- El primer mensaje resultante nunca puede ser un carrier de
|
||||
tool_result ni un `role: tool`.
|
||||
|
||||
Mantiene siempre al menos los ultimos 4 mensajes aunque excedan el
|
||||
presupuesto.
|
||||
"""
|
||||
budget = settings.recent_messages_max_tokens
|
||||
if budget <= 0 or not messages:
|
||||
return messages
|
||||
|
||||
estimate = ContextCompactor._estimate_message_tokens
|
||||
total = sum(estimate(m) for m in messages)
|
||||
if total <= budget:
|
||||
return messages
|
||||
|
||||
def _is_tool_result_carrier(msg: dict[str, Any]) -> bool:
|
||||
if msg.get("role") == "tool":
|
||||
return True
|
||||
if msg.get("role") != "user":
|
||||
return False
|
||||
content = msg.get("content")
|
||||
return isinstance(content, list) and any(
|
||||
isinstance(b, dict) and b.get("type") == "tool_result"
|
||||
for b in content
|
||||
)
|
||||
|
||||
def _has_tool_use(msg: dict[str, Any]) -> bool:
|
||||
if msg.get("role") != "assistant":
|
||||
return False
|
||||
if msg.get("tool_calls"):
|
||||
return True
|
||||
content = msg.get("content")
|
||||
return isinstance(content, list) and any(
|
||||
isinstance(b, dict) and b.get("type") == "tool_use"
|
||||
for b in content
|
||||
)
|
||||
|
||||
min_keep = 4
|
||||
n = len(messages)
|
||||
start = 0
|
||||
while total > budget and start < n - min_keep:
|
||||
end = start + 1
|
||||
if _has_tool_use(messages[start]):
|
||||
# Arrastrar los tool_results del par (no cortar dentro de el).
|
||||
while end < n and _is_tool_result_carrier(messages[end]):
|
||||
end += 1
|
||||
if n - end < min_keep:
|
||||
break # Eliminar el par completo invadiria los ultimos min_keep
|
||||
for k in range(start, end):
|
||||
total -= estimate(messages[k])
|
||||
start = end
|
||||
|
||||
trimmed = messages[start:]
|
||||
# El primer mensaje nunca puede ser un tool_result sin su tool_use.
|
||||
while trimmed and _is_tool_result_carrier(trimmed[0]):
|
||||
trimmed.pop(0)
|
||||
return trimmed
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_recent_message(message: dict[str, Any]) -> dict[str, Any]:
|
||||
|
||||
@@ -12,6 +12,7 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, AsyncIterator
|
||||
|
||||
@@ -127,14 +128,26 @@ class RedisStorage:
|
||||
# Execution lock (prevents concurrent messages on same session)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
# Compare-and-delete atómico: solo borra el lock si el valor coincide con
|
||||
# el token de quien lo adquirió. Evita que una ejecución cuyo lock expiró
|
||||
# por TTL borre en su `finally` el lock que ya adquirió otra petición.
|
||||
_UNLOCK_LUA = (
|
||||
"if redis.call('get', KEYS[1]) == ARGV[1] then "
|
||||
"return redis.call('del', KEYS[1]) else return 0 end"
|
||||
)
|
||||
|
||||
@asynccontextmanager
|
||||
async def session_lock(
|
||||
self, session_id: str, timeout: int = 300
|
||||
self, session_id: str, timeout: int | None = None
|
||||
) -> AsyncIterator[bool]:
|
||||
"""Acquire an exclusive execution lock for a session.
|
||||
|
||||
Uses SETNX with auto-expiry to prevent deadlocks if the process
|
||||
crashes mid-execution.
|
||||
crashes mid-execution. El TTL es mayor que el timeout global de
|
||||
ejecución para que el lock no expire (y otra petición lo robe)
|
||||
mientras la ejecución original sigue viva. Cada adquisición guarda
|
||||
un token único como valor y la liberación es compare-and-delete
|
||||
(Lua), de modo que solo el dueño puede borrar el lock.
|
||||
|
||||
Usage:
|
||||
async with storage.session_lock(session_id) as acquired:
|
||||
@@ -142,20 +155,34 @@ class RedisStorage:
|
||||
raise HTTPException(409, "Session busy")
|
||||
# ... execute ...
|
||||
"""
|
||||
if timeout is None:
|
||||
timeout = int(settings.max_execution_timeout_seconds) + 60
|
||||
key = self._key("session", session_id, "lock")
|
||||
acquired = await self.client.set(key, "1", nx=True, ex=timeout)
|
||||
token = uuid.uuid4().hex
|
||||
acquired = await self.client.set(key, token, nx=True, ex=timeout)
|
||||
try:
|
||||
yield bool(acquired)
|
||||
finally:
|
||||
if acquired:
|
||||
await self.client.delete(key)
|
||||
released = await self.client.eval(self._UNLOCK_LUA, 1, key, token)
|
||||
if not released:
|
||||
# El lock expiró por TTL y/o lo posee otra petición — no
|
||||
# tocamos nada, pero lo dejamos registrado.
|
||||
logger.warning(
|
||||
"session_lock for %s no longer owned at release "
|
||||
"(expired or taken over)",
|
||||
session_id,
|
||||
)
|
||||
|
||||
async def clear_session_lock(self, session_id: str) -> None:
|
||||
"""Borra el lock de ejecución de una sesión de forma incondicional.
|
||||
|
||||
Usado por el endpoint de abort para liberar un lock huérfano (de una
|
||||
ejecución previa que crasheó antes de soltarlo) y no bloquear el
|
||||
siguiente mensaje hasta que expire el TTL.
|
||||
OJO: borra sin conocer el token del dueño, así que se salta el
|
||||
compare-and-delete de `session_lock`. SOLO debe invocarse cuando se
|
||||
ha confirmado que la ejecución dueña del lock fue cancelada (ver
|
||||
`abort_session` en routes.py): la tarea cancelada puede no ejecutar
|
||||
su `finally` de liberación de forma fiable, y en ese caso no hay
|
||||
riesgo de borrar el lock de una ejecución viva.
|
||||
"""
|
||||
key = self._key("session", session_id, "lock")
|
||||
await self.client.delete(key)
|
||||
|
||||
@@ -19,6 +19,71 @@ from .sse import EventType, SSEEmitter
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_GENERIC_ERROR = (
|
||||
"Ha ocurrido un error procesando tu mensaje. Vuelve a intentarlo en unos momentos."
|
||||
)
|
||||
|
||||
# Patrones que el frontend interpreta por sí mismo (login / sesión expirada).
|
||||
# No los genericamos para no romper esas detecciones.
|
||||
_PASSTHROUGH_PATTERNS = (
|
||||
"not logged in",
|
||||
"login required",
|
||||
"authentication required",
|
||||
"no conversation found",
|
||||
)
|
||||
|
||||
|
||||
def friendly_error_message(raw: str, code: str = "") -> str:
|
||||
"""Traduce un error crudo (proveedor/excepción) a un mensaje genérico y
|
||||
localizado para el usuario final, sin filtrar detalles internos.
|
||||
|
||||
Devuelve el texto original sin tocar para los casos de auth/sesión que el
|
||||
frontend ya gestiona por contenido.
|
||||
"""
|
||||
raw = raw or ""
|
||||
text = "{} {}".format(code or "", raw).lower()
|
||||
|
||||
# Auth / sesión: dejar pasar el texto original (lo maneja el frontend)
|
||||
if any(p in text for p in _PASSTHROUGH_PATTERNS):
|
||||
return raw
|
||||
|
||||
# Timeout de ejecución
|
||||
if "timeout" in text or "timed out" in text:
|
||||
return (
|
||||
"La tarea tardó demasiado en completarse. Prueba a dividirla en "
|
||||
"pasos más pequeños o vuelve a intentarlo."
|
||||
)
|
||||
# Saldo insuficiente / facturación del proveedor (402)
|
||||
if (
|
||||
"402" in text
|
||||
or "insufficient balance" in text
|
||||
or "insufficient_quota" in text
|
||||
or "billing" in text
|
||||
):
|
||||
return (
|
||||
"El asistente no está disponible en este momento. Inténtalo de "
|
||||
"nuevo en unos minutos."
|
||||
)
|
||||
# Credenciales del proveedor inválidas (401)
|
||||
if (
|
||||
"401" in text
|
||||
or "invalid_api_key" in text
|
||||
or "incorrect api key" in text
|
||||
or "invalid api key" in text
|
||||
):
|
||||
return (
|
||||
"El asistente no está disponible temporalmente por un problema de "
|
||||
"configuración. Estamos trabajando en ello."
|
||||
)
|
||||
# Límite de peticiones (429)
|
||||
if "429" in text or "rate limit" in text or "rate_limit" in text:
|
||||
return (
|
||||
"Hay mucha demanda en este momento. Espera unos segundos y vuelve "
|
||||
"a intentarlo."
|
||||
)
|
||||
return _GENERIC_ERROR
|
||||
|
||||
|
||||
class ClaudeFormatEmitter:
|
||||
"""Emits events in Claude Code CLI SSE format.
|
||||
|
||||
@@ -304,7 +369,10 @@ class ClaudeFormatEmitter:
|
||||
self._push(session_id, {"type": "done"})
|
||||
|
||||
elif event_type == EventType.ERROR:
|
||||
error_msg = data.get("message", str(data.get("error", "Unknown error")))
|
||||
raw_msg = data.get("message", str(data.get("error", "Unknown error")))
|
||||
user_msg = friendly_error_message(raw_msg, str(data.get("error", "")))
|
||||
# El error real (detalles del proveedor) solo va al log, nunca al cliente.
|
||||
logger.warning("Session %s error (raw): %s", session_id, raw_msg)
|
||||
|
||||
# Close any open block
|
||||
self._close_text_block(session_id)
|
||||
@@ -312,7 +380,7 @@ class ClaudeFormatEmitter:
|
||||
self._push(session_id, {
|
||||
"type": "result",
|
||||
"is_error": True,
|
||||
"result": error_msg,
|
||||
"result": user_msg,
|
||||
"usage": {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0},
|
||||
"total_cost_usd": 0,
|
||||
})
|
||||
|
||||
@@ -294,11 +294,27 @@ class TestTaskHistoryTrim:
|
||||
class TestConversationCompaction:
|
||||
def test_compactor_preserves_last_user_and_compacts_old_tool_results(self):
|
||||
compactor = ContextCompactor(max_tokens=999999)
|
||||
# Los assistants llevan sus tool_calls: sin ellos los `role: tool`
|
||||
# serian huerfanos y `_enforce_tool_pairing` los convertiria a user.
|
||||
messages = [
|
||||
{"role": "user", "content": "Contexto anterior " * 10},
|
||||
{"role": "assistant", "content": "Voy a revisar el modulo ahora mismo. " * 6},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "Voy a revisar el modulo ahora mismo. " * 6,
|
||||
"tool_calls": [
|
||||
{"id": "tool-1", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "tool-1", "content": "resultado antiguo\n" * 80},
|
||||
{"role": "assistant", "content": "He visto el resultado anterior. " * 6},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "He visto el resultado anterior. " * 6,
|
||||
"tool_calls": [
|
||||
{"id": "tool-2", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "tool-2", "content": "resultado reciente\n" * 80},
|
||||
{"role": "user", "content": "Este es el ultimo mensaje del usuario y debe quedar intacto."},
|
||||
]
|
||||
@@ -358,9 +374,18 @@ class TestConversationCompaction:
|
||||
|
||||
def test_compactor_only_touches_user_messages_as_last_resort(self):
|
||||
compactor = ContextCompactor(max_tokens=999999)
|
||||
# tool_calls en el assistant para que el `role: tool` no sea huerfano
|
||||
# (el invariante `_enforce_tool_pairing` convertiria un huerfano a user).
|
||||
messages = [
|
||||
{"role": "user", "content": "Contexto previo del usuario " * 8},
|
||||
{"role": "assistant", "content": "Respuesta previa del asistente " * 6},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "Respuesta previa del asistente " * 6,
|
||||
"tool_calls": [
|
||||
{"id": "tool-1", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "tool-1", "content": "resultado viejo\n" * 80},
|
||||
{"role": "user", "content": "Ultimo mensaje del usuario"},
|
||||
]
|
||||
|
||||
585
tests/test_tool_pairing_real.py
Normal file
585
tests/test_tool_pairing_real.py
Normal file
@@ -0,0 +1,585 @@
|
||||
"""Tests de REGRESION REAL del invariante tool_use ↔ tool_result.
|
||||
|
||||
A diferencia del resto de tests (que replican logica), este archivo importa el
|
||||
codigo REAL de src/. Cubre el bug de produccion: sesiones largas (~130k tokens)
|
||||
donde `compact_conversation` colapsaba assistants a "[ASSISTANT COMPACTADO]"
|
||||
perdiendo los bloques `tool_use`, dejando tool_results huerfanos que el adapter
|
||||
emitia como `role: tool` sin `tool_calls` → 400 del proveedor en cada reintento.
|
||||
|
||||
Requiere las dependencias de src/ (pydantic, Python 3.11+). Si no estan
|
||||
disponibles (p.ej. host con Python 3.10), el modulo entero se salta — ejecutar
|
||||
dentro del container: `docker exec acai-agentic python3 -m pytest ...`.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from src.context.compactor import ContextCompactor
|
||||
except Exception as e: # pragma: no cover - entorno sin deps de src/
|
||||
pytest.skip(f"src/ no importable en este entorno: {e}", allow_module_level=True)
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Helper de validacion reutilizable
|
||||
# =====================================================================
|
||||
|
||||
|
||||
def collect_tool_use_ids(message: dict) -> set:
|
||||
"""IDs de tool calls de un assistant (Anthropic blocks + OpenAI legacy)."""
|
||||
ids = set()
|
||||
content = message.get("content")
|
||||
if isinstance(content, list):
|
||||
for b in content:
|
||||
if isinstance(b, dict) and b.get("type") == "tool_use":
|
||||
ids.add(str(b.get("id", "")))
|
||||
for tc in message.get("tool_calls") or []:
|
||||
if isinstance(tc, dict):
|
||||
ids.add(str(tc.get("id", "")))
|
||||
ids.discard("")
|
||||
return ids
|
||||
|
||||
|
||||
def assert_tool_pairing_ok(messages: list) -> None:
|
||||
"""Valida el invariante completo sobre una lista de mensajes internos:
|
||||
|
||||
- Todo tool_result (block) referencia un tool_use del assistant anterior.
|
||||
- Todo tool_use (block) tiene su tool_result en el mensaje siguiente.
|
||||
- Todo `role: tool` legacy responde a un tool_call del assistant previo.
|
||||
"""
|
||||
for i, msg in enumerate(messages):
|
||||
role = msg.get("role")
|
||||
content = msg.get("content")
|
||||
|
||||
if role == "user" and isinstance(content, list):
|
||||
result_ids = {
|
||||
str(b.get("tool_use_id", ""))
|
||||
for b in content
|
||||
if isinstance(b, dict) and b.get("type") == "tool_result"
|
||||
}
|
||||
if result_ids:
|
||||
assert i > 0, f"msg[{i}]: tool_result al inicio de la conversacion"
|
||||
prev = messages[i - 1]
|
||||
assert prev.get("role") == "assistant", (
|
||||
f"msg[{i}]: tool_result sin assistant inmediatamente anterior"
|
||||
)
|
||||
available = collect_tool_use_ids(prev)
|
||||
orphans = result_ids - available
|
||||
assert not orphans, (
|
||||
f"msg[{i}]: tool_result huerfanos {orphans} "
|
||||
f"(assistant previo solo tiene {available})"
|
||||
)
|
||||
|
||||
if role == "assistant":
|
||||
tool_ids = collect_tool_use_ids(msg)
|
||||
if tool_ids:
|
||||
answered = set()
|
||||
j = i + 1
|
||||
if (
|
||||
j < len(messages)
|
||||
and messages[j].get("role") == "user"
|
||||
and isinstance(messages[j].get("content"), list)
|
||||
):
|
||||
for b in messages[j]["content"]:
|
||||
if isinstance(b, dict) and b.get("type") == "tool_result":
|
||||
answered.add(str(b.get("tool_use_id", "")))
|
||||
j += 1
|
||||
while j < len(messages) and messages[j].get("role") == "tool":
|
||||
answered.add(str(messages[j].get("tool_call_id", "")))
|
||||
j += 1
|
||||
unanswered = tool_ids - answered
|
||||
assert not unanswered, (
|
||||
f"msg[{i}]: tool_use sin respuesta {unanswered}"
|
||||
)
|
||||
|
||||
if role == "tool":
|
||||
prev_assistant = None
|
||||
for k in range(i - 1, -1, -1):
|
||||
if messages[k].get("role") == "tool":
|
||||
continue
|
||||
if messages[k].get("role") == "assistant":
|
||||
prev_assistant = messages[k]
|
||||
break
|
||||
assert prev_assistant is not None, (
|
||||
f"msg[{i}]: role tool sin assistant previo"
|
||||
)
|
||||
call_id = str(msg.get("tool_call_id", ""))
|
||||
assert call_id in collect_tool_use_ids(prev_assistant), (
|
||||
f"msg[{i}]: role tool con tool_call_id={call_id} no presente "
|
||||
f"en el assistant previo"
|
||||
)
|
||||
|
||||
|
||||
def make_turn(n: int, payload_chars: int = 4000) -> list:
|
||||
"""Genera un turno completo: user → assistant(thinking+text+tool_use) →
|
||||
user(tool_result). Payloads grandes para forzar la compactacion."""
|
||||
tid = f"call_{n}"
|
||||
return [
|
||||
{"role": "user", "content": f"Peticion {n}: " + ("x" * payload_chars)},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "thinking", "thinking": "razonando " * (payload_chars // 10)},
|
||||
{"type": "text", "text": f"Voy a ejecutar la tool del turno {n}."},
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": tid,
|
||||
"name": "acai_get_records",
|
||||
"input": {"tableName": f"tabla_{n}"},
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": tid,
|
||||
"content": "resultado " * (payload_chars // 10),
|
||||
}
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# (a) compact_conversation end-to-end: el paso de ultimo recurso ya no
|
||||
# deja tool_results huerfanos ni tool_use sin respuesta
|
||||
# =====================================================================
|
||||
|
||||
|
||||
class TestCompactConversationPairing:
|
||||
def test_last_resort_does_not_orphan_tool_results(self):
|
||||
compactor = ContextCompactor()
|
||||
messages = []
|
||||
for n in range(12):
|
||||
messages.extend(make_turn(n, payload_chars=6000))
|
||||
messages.append({"role": "user", "content": "ultima peticion del usuario"})
|
||||
|
||||
# Presupuesto minusculo: fuerza TODOS los pasos incluida la colapsa
|
||||
# de listas a placeholder string (el paso que causaba el bug).
|
||||
compacted, meta = compactor.compact_conversation(messages, max_tokens=300)
|
||||
|
||||
assert meta["output_tokens"] < meta["input_tokens"]
|
||||
assert_tool_pairing_ok(compacted)
|
||||
|
||||
def test_moderate_budget_keeps_pairing(self):
|
||||
compactor = ContextCompactor()
|
||||
messages = []
|
||||
for n in range(8):
|
||||
messages.extend(make_turn(n, payload_chars=3000))
|
||||
messages.append({"role": "user", "content": "peticion final"})
|
||||
|
||||
compacted, _ = compactor.compact_conversation(messages, max_tokens=2000)
|
||||
assert_tool_pairing_ok(compacted)
|
||||
|
||||
def test_under_budget_passthrough_keeps_pairing(self):
|
||||
compactor = ContextCompactor()
|
||||
messages = make_turn(1, payload_chars=50)
|
||||
compacted, meta = compactor.compact_conversation(messages, max_tokens=100_000)
|
||||
assert meta["messages_compacted"] == 0
|
||||
assert_tool_pairing_ok(compacted)
|
||||
# Los tool_use/tool_result originales se conservan intactos
|
||||
assert collect_tool_use_ids(compacted[1]) == {"call_1"}
|
||||
|
||||
def test_last_user_message_preserved(self):
|
||||
compactor = ContextCompactor()
|
||||
messages = []
|
||||
for n in range(10):
|
||||
messages.extend(make_turn(n, payload_chars=5000))
|
||||
final = "esta es la peticion actual que NO debe perderse"
|
||||
messages.append({"role": "user", "content": final})
|
||||
|
||||
compacted, _ = compactor.compact_conversation(messages, max_tokens=300)
|
||||
assert compacted[-1]["content"] == final
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# (b) _enforce_tool_pairing directo
|
||||
# =====================================================================
|
||||
|
||||
|
||||
class TestEnforceToolPairing:
|
||||
def setup_method(self):
|
||||
self.compactor = ContextCompactor()
|
||||
|
||||
def test_collapsed_assistant_with_orphan_tool_results(self):
|
||||
"""Assistant colapsado a string + user con tool_results → los
|
||||
tool_result se convierten en placeholder."""
|
||||
messages = [
|
||||
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "tool_result", "tool_use_id": "call_a", "content": "datos"},
|
||||
{"type": "tool_result", "tool_use_id": "call_b", "content": "mas datos"},
|
||||
],
|
||||
},
|
||||
]
|
||||
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||
assert_tool_pairing_ok(repaired)
|
||||
# Solo placeholders → content string (fusionados en uno)
|
||||
assert repaired[1]["role"] == "user"
|
||||
assert repaired[1]["content"] == "[Resultado de herramienta compactado]"
|
||||
|
||||
def test_orphan_tool_results_mixed_with_text(self):
|
||||
"""tool_result huerfano junto a un bloque text → placeholder en lista,
|
||||
el text se conserva."""
|
||||
messages = [
|
||||
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "tool_result", "tool_use_id": "call_a", "content": "datos"},
|
||||
{"type": "text", "text": "y ademas haz esto"},
|
||||
],
|
||||
},
|
||||
]
|
||||
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||
assert_tool_pairing_ok(repaired)
|
||||
content = repaired[1]["content"]
|
||||
assert isinstance(content, list)
|
||||
types = [b.get("type") for b in content]
|
||||
assert types == ["text", "text"]
|
||||
assert content[0]["text"] == "[Resultado de herramienta compactado]"
|
||||
assert content[1]["text"] == "y ademas haz esto"
|
||||
|
||||
def test_partial_id_mismatch_drops_unanswered_tool_use(self):
|
||||
"""Assistant con 3 tool_use, user con solo 2 tool_result → se elimina
|
||||
el tool_use sin respuesta, thinking/text intactos."""
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "thinking", "thinking": "pensando"},
|
||||
{"type": "text", "text": "ejecuto tres tools"},
|
||||
{"type": "tool_use", "id": "c1", "name": "t1", "input": {}},
|
||||
{"type": "tool_use", "id": "c2", "name": "t2", "input": {}},
|
||||
{"type": "tool_use", "id": "c3", "name": "t3", "input": {}},
|
||||
],
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "tool_result", "tool_use_id": "c1", "content": "r1"},
|
||||
{"type": "tool_result", "tool_use_id": "c3", "content": "r3"},
|
||||
],
|
||||
},
|
||||
]
|
||||
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||
assert_tool_pairing_ok(repaired)
|
||||
assert collect_tool_use_ids(repaired[0]) == {"c1", "c3"}
|
||||
types = [b.get("type") for b in repaired[0]["content"]]
|
||||
assert "thinking" in types and "text" in types
|
||||
|
||||
def test_assistant_tool_use_with_no_results_at_all(self):
|
||||
"""Assistant con tool_use y SIN user de resultados detras → se
|
||||
eliminan los tool_use; si el content queda vacio, placeholder."""
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "tool_use", "id": "c9", "name": "t", "input": {}},
|
||||
],
|
||||
},
|
||||
{"role": "user", "content": "otra cosa"},
|
||||
]
|
||||
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||
assert_tool_pairing_ok(repaired)
|
||||
assert repaired[0]["content"] == "[ASSISTANT COMPACTADO]"
|
||||
|
||||
def test_legacy_orphan_role_tool_converted_to_user(self):
|
||||
"""role:tool legacy cuyo assistant anterior no tiene tool_calls →
|
||||
se convierte a user placeholder."""
|
||||
messages = [
|
||||
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||
{"role": "tool", "tool_call_id": "call_x", "content": "salida tool"},
|
||||
]
|
||||
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||
assert_tool_pairing_ok(repaired)
|
||||
assert repaired[1]["role"] == "user"
|
||||
assert repaired[1]["content"] == "[Resultado de herramienta compactado]"
|
||||
|
||||
def test_legacy_valid_role_tool_untouched(self):
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "lanzo tool",
|
||||
"tool_calls": [
|
||||
{"id": "call_x", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "call_x", "content": "salida"},
|
||||
]
|
||||
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||
assert_tool_pairing_ok(repaired)
|
||||
assert repaired[1]["role"] == "tool"
|
||||
|
||||
def test_well_paired_history_is_noop(self):
|
||||
messages = make_turn(7, payload_chars=50)
|
||||
repaired = self.compactor._enforce_tool_pairing(messages)
|
||||
assert repaired == messages
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# (c) Trim de recent_messages (OrchestratorEngine._trim_recent_messages)
|
||||
# =====================================================================
|
||||
|
||||
|
||||
orchestrator_engine = pytest.importorskip(
|
||||
"src.orchestrator.engine",
|
||||
reason="deps del orquestador (mcp, sse, redis) no disponibles",
|
||||
)
|
||||
OrchestratorEngine = orchestrator_engine.OrchestratorEngine
|
||||
|
||||
|
||||
class TestTrimRecentMessages:
|
||||
def _set_budget(self, monkeypatch, tokens: int):
|
||||
from src.config import settings
|
||||
monkeypatch.setattr(settings, "recent_messages_max_tokens", tokens)
|
||||
|
||||
def test_under_budget_untouched(self, monkeypatch):
|
||||
self._set_budget(monkeypatch, 100_000)
|
||||
messages = make_turn(0, payload_chars=100)
|
||||
assert OrchestratorEngine._trim_recent_messages(list(messages)) == messages
|
||||
|
||||
def test_trims_oldest_whole_pairs(self, monkeypatch):
|
||||
self._set_budget(monkeypatch, 500)
|
||||
messages = []
|
||||
for n in range(10):
|
||||
messages.extend(make_turn(n, payload_chars=1000))
|
||||
trimmed = OrchestratorEngine._trim_recent_messages(messages)
|
||||
|
||||
assert len(trimmed) < len(messages)
|
||||
# Nunca se corta dentro de un par
|
||||
assert_tool_pairing_ok(trimmed)
|
||||
# El primer mensaje nunca es un carrier de tool_result ni role tool
|
||||
first = trimmed[0]
|
||||
assert first.get("role") != "tool"
|
||||
if isinstance(first.get("content"), list):
|
||||
assert not any(
|
||||
isinstance(b, dict) and b.get("type") == "tool_result"
|
||||
for b in first["content"]
|
||||
)
|
||||
# Se eliminan los mas antiguos: el final se conserva
|
||||
assert trimmed[-1] == messages[-1]
|
||||
|
||||
def test_keeps_last_four_even_over_budget(self, monkeypatch):
|
||||
self._set_budget(monkeypatch, 10) # presupuesto imposible
|
||||
messages = []
|
||||
for n in range(5):
|
||||
messages.extend(make_turn(n, payload_chars=2000))
|
||||
trimmed = OrchestratorEngine._trim_recent_messages(messages)
|
||||
assert len(trimmed) >= 4
|
||||
|
||||
def test_pair_dragging_includes_legacy_tool_run(self, monkeypatch):
|
||||
"""Un assistant legacy con tool_calls arrastra su run de role:tool."""
|
||||
self._set_budget(monkeypatch, 300)
|
||||
big = "y" * 3000
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": big,
|
||||
"tool_calls": [
|
||||
{"id": "c1", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
{"id": "c2", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "c1", "content": big},
|
||||
{"role": "tool", "tool_call_id": "c2", "content": big},
|
||||
{"role": "user", "content": "pregunta"},
|
||||
{"role": "assistant", "content": "respuesta"},
|
||||
{"role": "user", "content": "otra pregunta"},
|
||||
{"role": "assistant", "content": "otra respuesta"},
|
||||
]
|
||||
trimmed = OrchestratorEngine._trim_recent_messages(messages)
|
||||
# El par legacy entero (assistant + 2 tools) se elimino junto
|
||||
assert trimmed[0] == {"role": "user", "content": "pregunta"}
|
||||
assert_tool_pairing_ok(trimmed)
|
||||
|
||||
def test_append_recent_messages_applies_trim(self, monkeypatch):
|
||||
self._set_budget(monkeypatch, 500)
|
||||
existing = []
|
||||
for n in range(10):
|
||||
existing.extend(make_turn(n, payload_chars=1000))
|
||||
merged = OrchestratorEngine._append_recent_messages(
|
||||
existing, message="nueva peticion", conversation=[
|
||||
{"role": "assistant", "content": "ok hecho"},
|
||||
],
|
||||
)
|
||||
assert len(merged) < len(existing) + 2
|
||||
assert merged[-1] == {"role": "assistant", "content": "ok hecho"}
|
||||
assert_tool_pairing_ok(merged)
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# (d) Guard defensivo del adapter (_repair_tool_sequence)
|
||||
# =====================================================================
|
||||
|
||||
|
||||
openai_mod = pytest.importorskip("openai", reason="SDK openai no instalado")
|
||||
|
||||
|
||||
class TestRepairToolSequence:
|
||||
@property
|
||||
def repair(self):
|
||||
from src.adapters.openai_adapter import OpenAIAdapter
|
||||
return OpenAIAdapter._repair_tool_sequence
|
||||
|
||||
def test_valid_sequence_untouched(self):
|
||||
msgs = [
|
||||
{"role": "system", "content": "sys"},
|
||||
{"role": "user", "content": "hola"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"tool_calls": [
|
||||
{"id": "c1", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "c1", "content": "resultado"},
|
||||
{"role": "assistant", "content": "listo"},
|
||||
]
|
||||
assert self.repair(list(msgs)) == msgs
|
||||
|
||||
def test_orphan_tool_message_converted_to_user(self):
|
||||
msgs = [
|
||||
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||
{"role": "tool", "tool_call_id": "c_orphan", "content": "datos " * 200},
|
||||
]
|
||||
out = self.repair(msgs)
|
||||
assert out[1]["role"] == "user"
|
||||
assert out[1]["content"].startswith(
|
||||
"[Resultado de herramienta (contexto compactado)]: "
|
||||
)
|
||||
# Content truncado a 500 chars (+ prefijo)
|
||||
assert len(out[1]["content"]) <= 500 + len(
|
||||
"[Resultado de herramienta (contexto compactado)]: "
|
||||
)
|
||||
assert not any(m.get("role") == "tool" for m in out)
|
||||
|
||||
def test_unanswered_tool_calls_removed(self):
|
||||
msgs = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"tool_calls": [
|
||||
{"id": "c1", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
{"id": "c2", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
|
||||
{"role": "user", "content": "sigue"},
|
||||
]
|
||||
out = self.repair(msgs)
|
||||
assert [tc["id"] for tc in out[0]["tool_calls"]] == ["c1"]
|
||||
assert out[1] == {"role": "tool", "tool_call_id": "c1", "content": "r1"}
|
||||
|
||||
def test_all_tool_calls_unanswered_drops_key_and_sets_content(self):
|
||||
msgs = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"tool_calls": [
|
||||
{"id": "c1", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "user", "content": "sigue"},
|
||||
]
|
||||
out = self.repair(msgs)
|
||||
assert "tool_calls" not in out[0]
|
||||
assert out[0]["content"] # nunca None sin tool_calls
|
||||
|
||||
def test_reasoning_promoted_when_tool_calls_dropped(self):
|
||||
"""No romper la promocion de reasoning a content del fix anterior."""
|
||||
msgs = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"reasoning_content": "razonamiento del modelo",
|
||||
"tool_calls": [
|
||||
{"id": "c1", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "user", "content": "sigue"},
|
||||
]
|
||||
out = self.repair(msgs)
|
||||
assert "tool_calls" not in out[0]
|
||||
assert out[0]["content"] == "razonamiento del modelo"
|
||||
assert "reasoning_content" not in out[0]
|
||||
|
||||
def test_mixed_orphan_in_tool_block(self):
|
||||
"""Un huerfano en medio de un bloque de tools validos se convierte a
|
||||
user DESPUES del bloque (no rompe la contiguidad assistant→tools)."""
|
||||
msgs = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"tool_calls": [
|
||||
{"id": "c1", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
{"id": "c2", "type": "function",
|
||||
"function": {"name": "t", "arguments": "{}"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
|
||||
{"role": "tool", "tool_call_id": "huerfano", "content": "rx"},
|
||||
{"role": "tool", "tool_call_id": "c2", "content": "r2"},
|
||||
{"role": "user", "content": "sigue"},
|
||||
]
|
||||
out = self.repair(msgs)
|
||||
roles = [m["role"] for m in out]
|
||||
assert roles == ["assistant", "tool", "tool", "user", "user"]
|
||||
assert out[1]["tool_call_id"] == "c1"
|
||||
assert out[2]["tool_call_id"] == "c2"
|
||||
assert out[3]["content"].startswith("[Resultado de herramienta")
|
||||
|
||||
|
||||
class TestAdapterEndToEnd:
|
||||
"""_to_openai_messages + guard sobre un historial roto realista."""
|
||||
|
||||
def test_collapsed_assistant_history_produces_valid_openai_sequence(self):
|
||||
from src.adapters.openai_adapter import OpenAIAdapter
|
||||
adapter = OpenAIAdapter.__new__(OpenAIAdapter) # sin cliente real
|
||||
|
||||
internal = [
|
||||
{"role": "system", "content": "eres un agente"},
|
||||
{"role": "user", "content": "haz algo"},
|
||||
# Assistant colapsado por el compactor (perdio sus tool_use)
|
||||
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
|
||||
# …pero el user conserva sus tool_results (el bug de produccion)
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "tool_result", "tool_use_id": "call_1", "content": "datos"},
|
||||
],
|
||||
},
|
||||
{"role": "assistant", "content": "termine"},
|
||||
{"role": "user", "content": "siguiente peticion"},
|
||||
]
|
||||
out = adapter._to_openai_messages(internal)
|
||||
# Contrato OpenAI: ningun role:tool sin tool_calls previo
|
||||
for i, m in enumerate(out):
|
||||
if m.get("role") == "tool":
|
||||
assert i > 0
|
||||
prev = out[i - 1]
|
||||
prev_ids = set()
|
||||
k = i - 1
|
||||
while k >= 0 and out[k].get("role") == "tool":
|
||||
k -= 1
|
||||
if k >= 0 and out[k].get("role") == "assistant":
|
||||
prev_ids = {
|
||||
tc.get("id") for tc in out[k].get("tool_calls") or []
|
||||
}
|
||||
assert m.get("tool_call_id") in prev_ids, (
|
||||
f"role tool huerfano en out[{i}]"
|
||||
)
|
||||
# El tool_result huerfano acabo como user, no como role tool
|
||||
assert not any(m.get("role") == "tool" for m in out)
|
||||
Reference in New Issue
Block a user