From 06ce51a9c12bb0765d9c9300f9f3dce8086188d3 Mon Sep 17 00:00:00 2001 From: Jordan Diaz Date: Wed, 6 May 2026 07:07:57 +0000 Subject: [PATCH] Mas cosas --- agents/acai/system.md | 4 + mcp-server/auth/mcpTokens.js | 7 +- mcp-server/httpServer.js | 58 +++++- src/adapters/base.py | 21 ++- src/adapters/claude_adapter.py | 314 +++++++++++++++++++++++++++++--- src/context/compactor.py | 110 +++++++++-- src/context/engine.py | 28 ++- src/orchestrator/agents/base.py | 176 ++++++++++++++---- src/orchestrator/engine.py | 5 +- 9 files changed, 643 insertions(+), 80 deletions(-) diff --git a/agents/acai/system.md b/agents/acai/system.md index 570491c..6dd6850 100644 --- a/agents/acai/system.md +++ b/agents/acai/system.md @@ -1,5 +1,9 @@ Eres el asistente de desarrollo de Acai CMS. Ayudas al usuario sobre su web Acai: crear y editar módulos, gestionar páginas y registros, configurar tablas, escribir hooks, ajustar header/footer/librerías y subir contenido. Hablas y respondes **siempre en español**. +# Mecanismo de tools (CRÍTICO) + +Para invocar herramientas usa **EXCLUSIVAMENTE el mecanismo nativo de tool_use** del API. NUNCA escribas tool calls como texto en tu respuesta. En particular NO escribas marcadores como ``, `[TOOL_CALL]`, ``, ``, `{tool => ...}`, `{name: ..., parameters: ...}` ni cualquier pseudocódigo similar dentro del campo `content` de texto. El sistema tiene soporte de tools incorporado — invócalas directamente. Si escribes una tool call como texto, **no se ejecutará** y el usuario solo verá el markup crudo. + # Identidad y rol Actúas como un desarrollador senior experto en Acai CMS. Antes de cualquier acción no trivial: diff --git a/mcp-server/auth/mcpTokens.js b/mcp-server/auth/mcpTokens.js index 53ad5a7..c389aa9 100644 --- a/mcp-server/auth/mcpTokens.js +++ b/mcp-server/auth/mcpTokens.js @@ -77,14 +77,17 @@ export async function validateMcpToken(secret) { } catch { return null; } - if (!meta || !meta.user || !meta.project) return null; + // Solo exigimos `user`. `project` puede ser "" (token user-wide que + // autoriza todos los proyectos del usuario, ver handlers/mcp_tokens.py + // del backend Python para los detalles del modelo). + if (!meta || !meta.user) return null; // Actualizacion asincrona de lastUsedAt — no bloqueamos la request. updateLastUsedAt(key, meta).catch((e) => { console.error("[mcp-tokens] lastUsedAt update failed:", e.message); }); - return { user: meta.user, project: meta.project, id: meta.id || "" }; + return { user: meta.user, project: meta.project || "", id: meta.id || "" }; } async function updateLastUsedAt(key, meta) { diff --git a/mcp-server/httpServer.js b/mcp-server/httpServer.js index ab0f9a7..375b266 100644 --- a/mcp-server/httpServer.js +++ b/mcp-server/httpServer.js @@ -195,6 +195,13 @@ export function startHttpServer() { // identifica manualmente con X-Acai-User + X-Project-Name). //============================================================================= app.use(async (req, res, next) => { + // DEBUG temporal: loguear TODA request que llegue. Quitar cuando este + // claro el flujo del cliente. + const secretPresent = !!req.headers["x-mcp-secret"]; + const authPresent = !!req.headers["authorization"]; + console.error( + `[MCP req] ${req.method} ${req.url} - X-MCP-Secret=${secretPresent ? "yes" : "MISSING"}, Authorization=${authPresent ? "yes" : "MISSING"}, UA=${(req.headers["user-agent"] || "").substring(0, 60)}`, + ); const secret = req.headers["x-mcp-secret"]; if (!secret) { return next(); @@ -202,6 +209,7 @@ export function startHttpServer() { try { const auth = await validateMcpToken(secret); if (!auth) { + console.error("[MCP middleware] Invalid X-MCP-Secret rejected"); res.status(401) .setHeader("Content-Type", "application/json") .end(JSON.stringify({ error: "Invalid MCP token" })); @@ -209,7 +217,18 @@ export function startHttpServer() { } // Sobrescribe los headers de identidad con los del token validado. req.headers["x-acai-user"] = auth.user; - req.headers["x-project-name"] = auth.project; + // `auth.project` solo se sobrescribe si el token es project-scoped. + // Si es user-wide (auth.project === ""), preservamos el + // `X-Project-Name` que el cliente envio (la extension VS Code + // Acai Forge lo manda con el slug del proyecto descargado). + if (auth.project) { + req.headers["x-project-name"] = auth.project; + } + console.error( + `[MCP middleware] Auth OK user=${auth.user} ` + + `tokenScope=${auth.project || "user-wide"} ` + + `clientProject=${req.headers["x-project-name"] || "(none)"}`, + ); return next(); } catch (err) { console.error("[MCP] mcpSecretMiddleware error:", err.message); @@ -580,11 +599,42 @@ export function startHttpServer() { }); //============================================================================= - // OAUTH2 ENDPOINTS + // OAUTH2 ENDPOINTS — DESHABILITADOS + //============================================================================= + // El flujo OAuth se diseno a medida (client_secret = nombre de proyecto) + // y no funciona con clientes MCP estandar (Claude Code, etc.) que usan + // PKCE puro. El unico cliente "oficial" es la extension VS Code Acai + // Forge, que NO usa OAuth — autentica con header X-MCP-Secret directo. + // + // Devolver 404 en `.well-known/oauth-authorization-server` hace que los + // clientes que hacen OAuth discovery hagan fallback a header auth, lo + // cual usa X-MCP-Secret (validado en el middleware de las lineas ~197). + // Los handlers `/register`, `/authorize`, `/token` y los helpers `signJwt` + // / `verifyJwt` / `resolveProjectCredentials` se mantienen porque son + // usados internamente por el transport SSE legacy (lineas ~113, ~265). //============================================================================= - // OAuth2 Authorization Server Metadata endpoint (per RFC8414) - app.get('/.well-known/oauth-authorization-server', (req, res) => { + // Rutas OAuth/OIDC discovery deshabilitadas — devuelven 404 JSON limpio + // para que el cliente fallback a header auth (X-MCP-Secret) en vez de + // intentar OAuth flow. Cubrimos ambas paths comunes y sus variantes + // anidadas bajo /mcp/ porque algunos clientes (Claude Code) prueban + // ambas: en la raiz Y bajo el endpoint MCP. + const _disabledOauthPaths = [ + '/.well-known/oauth-authorization-server', + '/.well-known/openid-configuration', + '/.well-known/oauth-protected-resource', + '/mcp/.well-known/oauth-authorization-server', + '/mcp/.well-known/openid-configuration', + '/mcp/.well-known/oauth-protected-resource', + ]; + for (const _p of _disabledOauthPaths) { + app.get(_p, (req, res) => { + res.status(404).json({ error: "OAuth not available; use X-MCP-Secret header" }); + }); + } + + // OAuth2 Authorization Server Metadata endpoint (per RFC8414) — REMOVED + app.get('/.well-known/oauth-authorization-server-DISABLED', (req, res) => { const baseUrl = `https://${req.headers.host}`; res.json({ issuer: baseUrl, diff --git a/src/adapters/base.py b/src/adapters/base.py index fc38b9c..ac62f50 100644 --- a/src/adapters/base.py +++ b/src/adapters/base.py @@ -9,7 +9,16 @@ from typing import Any, AsyncIterator @dataclass class StreamChunk: - """A single chunk from a streaming model response.""" + """A single chunk from a streaming model response. + + Campos legacy (`delta`, `tool_*`, `finish_reason`, `usage`) cubren todo el + flujo OpenAI/Anthropic original. Los `thinking_*` + `block_type`/`block_index` + se anaden para el interleaved thinking de MiniMax M2: el adapter Claude + los emite cuando ve bloques `type=thinking` y los `signature_delta` que el + SDK Anthropic devuelve al cerrar el bloque. El orquestador acumula esos + bloques con su `signature` para reenviarlos en el siguiente turno (sin sig, + MiniMax rechaza el assistant message). + """ delta: str = "" tool_call_id: str = "" @@ -18,6 +27,16 @@ class StreamChunk: finish_reason: str = "" usage: dict[str, int] = field(default_factory=dict) + # Interleaved thinking (MiniMax M2). Default vacios → no-op para callers + # que no los miran (OpenAI adapter, codigo legacy del orquestador). + thinking_delta: str = "" + thinking_signature: str = "" + # block_type ∈ {"text", "thinking", "tool_use", ""} — "" = chunk sin bloque + # asociado (p.ej. solo lleva `usage` o `finish_reason`). + block_type: str = "" + # 0-based, posicion del bloque en el turno. -1 = no aplica. + block_index: int = -1 + @dataclass class ModelResponse: diff --git a/src/adapters/claude_adapter.py b/src/adapters/claude_adapter.py index 7890186..acd5d8b 100644 --- a/src/adapters/claude_adapter.py +++ b/src/adapters/claude_adapter.py @@ -18,18 +18,21 @@ logger = logging.getLogger(__name__) # Algunos fine-tunes (sobre todo MiniMax) ocasionalmente emiten las tool calls -# como texto literal en lugar de usar los `tool_use` blocks nativos: -# -# -# ... -# -# +# como texto literal en lugar de usar los `tool_use` blocks nativos. Vistos +# tres formatos: +# 1) V +# 2) V (sin minimax wrapper) +# 3) {"name":"X","parameters":{...}}{"name":"Y","parameters":{...}} +# (multiples tool calls JSON-encoded dentro de un solo wrapper) # # Cuando eso pasa el orquestador ve "texto" y la tool nunca se ejecuta — el -# usuario ve el XML crudo en el chat. Detectamos y convertimos a tool_use +# usuario ve el markup crudo en el chat. Detectamos y convertimos a tool_use # sintetico mientras streameamos. Es un parche defensivo: el caso normal # (tool_use blocks) sigue por el camino estandar. -_TOOL_CALL_OPEN_RE = re.compile(r"<(?:minimax:tool_call|invoke\s+name)", re.IGNORECASE) +_TOOL_CALL_OPEN_RE = re.compile( + r"<(?:minimax:tool_call|invoke\s+name|tool_call\s*>)|\[TOOL_CALL\]", + re.IGNORECASE, +) _INVOKE_RE = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL, @@ -38,18 +41,48 @@ _PARAM_RE = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL, ) +# Formato 3: ...JSON.... El cuerpo puede contener uno +# o varios objetos JSON consecutivos (con o sin commas/newlines entre ellos). +_TOOL_CALL_JSON_BLOCK_RE = re.compile( + r"(.*?)", + re.IGNORECASE | re.DOTALL, +) +# Formato 4: [TOOL_CALL]\n{tool => "X", args => {--key "v" --k2 12}}\n[/TOOL_CALL] +# Sintaxis Perl-ish que MiniMax tambien improvisa. Cada bloque puede contener +# uno o varios "{tool => ..., args => {...}}" consecutivos. +_TOOL_CALL_BRACKET_BLOCK_RE = re.compile( + r"\[TOOL_CALL\](.*?)\[/TOOL_CALL\]", + re.DOTALL, +) +_PERL_TOOL_NAME_RE = re.compile( + r"tool\s*=>\s*[\"']([^\"']+)[\"']", +) +_PERL_ARGS_BLOCK_RE = re.compile( + r"args\s*=>\s*\{(.*?)\}\s*\}\s*(?=\{|\[|$)", + re.DOTALL, +) +# Args estilo `--key "value"` o `--key 12` o `--key true`. +_PERL_KV_RE = re.compile( + r"--([a-zA-Z_][a-zA-Z0-9_]*)\s+(\"[^\"]*\"|\'[^\']*\'|-?\d+(?:\.\d+)?|true|false|null)", +) def _safe_emit_split(buf: str) -> str: """Devuelve el prefijo del buffer que es seguro emitir como texto sin - perder un posible inicio de tag XML que esta llegando fragmentado. + perder un posible inicio de tag de tool_call que esta llegando fragmentado. - Mantenemos en hold los ultimos 30 chars si terminan con `<` o con un - prefijo parcial de ` str: # Si el tail ya tiene `>` cerrado, es un tag normal — emitir todo. if ">" in tail: return buf - # Si el tail puede ser inicio de tool_call/invoke, retenerlo. - candidates = (" str: return buf +def _parse_json_objects(text: str) -> list[dict[str, Any]]: + """Parsea uno o varios objetos JSON consecutivos en `text`. Tolerante a + espacios, newlines y commas entre objetos. Devuelve los que se pudieron + decodificar; salta los malformados.""" + objs: list[dict[str, Any]] = [] + decoder = json.JSONDecoder() + i = 0 + n = len(text) + while i < n: + # Saltar separadores no-JSON + while i < n and text[i] in " \t\r\n,": + i += 1 + if i >= n: + break + try: + obj, end = decoder.raw_decode(text, i) + except json.JSONDecodeError: + # Avanzar 1 char y reintentar; defensivo ante markup raro. + i += 1 + continue + if isinstance(obj, dict): + objs.append(obj) + i = end + return objs + + def _parse_xml_tool_calls(text: str) -> list[dict[str, Any]]: - """Extrae tool calls del texto. Devuelve lista de {id, name, arguments}. - Si no encuentra patrones validos devuelve [].""" - calls = [] + """Extrae tool calls del texto. Cubre tres formatos de fine-tunes: + - V + - ... + - {"name":"X","parameters":{...}}... + + Devuelve lista de {id, name, arguments}. Si no encuentra patrones validos + devuelve [].""" + calls: list[dict[str, Any]] = [] + + # Formato 1+2: ... for m in _INVOKE_RE.finditer(text): name = m.group(1).strip() body = m.group(2) - args = {} + args: dict[str, Any] = {} for p in _PARAM_RE.finditer(body): args[p.group(1).strip()] = p.group(2).strip() if name: @@ -83,6 +149,69 @@ def _parse_xml_tool_calls(text: str) -> list[dict[str, Any]]: "name": name, "arguments": args, }) + + # Formato 3: {json}...{json} + for m in _TOOL_CALL_JSON_BLOCK_RE.finditer(text): + body = m.group(1) + for obj in _parse_json_objects(body): + name = obj.get("name", "") or "" + # Algunos fine-tunes usan "parameters", otros "arguments", otros "input" + args_val = ( + obj.get("parameters") + or obj.get("arguments") + or obj.get("input") + or {} + ) + if isinstance(args_val, str): + # Si llega stringificado, intentar parsearlo + try: + args_val = json.loads(args_val) + except (json.JSONDecodeError, TypeError): + args_val = {"_raw": args_val} + if not isinstance(args_val, dict): + args_val = {"_raw": str(args_val)} + if name: + calls.append({ + "id": "xml_{}".format(uuid.uuid4().hex[:12]), + "name": str(name), + "arguments": args_val, + }) + + # Formato 4: [TOOL_CALL]{tool => "X", args => {--k "v" --k2 12}}{...}[/TOOL_CALL] + for m in _TOOL_CALL_BRACKET_BLOCK_RE.finditer(text): + body = m.group(1) + # Extraer pares (name, args_block). Recorremos por nombre y el bloque + # de args lo extraemos por proximidad textual. + names = list(_PERL_TOOL_NAME_RE.finditer(body)) + for i, nm in enumerate(names): + name = nm.group(1).strip() + # Cuerpo de args entre la posicion de este nombre y el siguiente + # (o final del bloque). + start = nm.end() + end = names[i + 1].start() if i + 1 < len(names) else len(body) + segment = body[start:end] + args: dict[str, Any] = {} + for kv in _PERL_KV_RE.finditer(segment): + k = kv.group(1) + v = kv.group(2) + if v.startswith('"') or v.startswith("'"): + args[k] = v[1:-1] + elif v in ("true", "false"): + args[k] = (v == "true") + elif v == "null": + args[k] = None + else: + try: + args[k] = int(v) if "." not in v else float(v) + except ValueError: + args[k] = v + if name: + calls.append({ + "id": "xml_{}".format(uuid.uuid4().hex[:12]), + "name": name, + "arguments": args, + }) + return calls @@ -113,6 +242,20 @@ class ClaudeAdapter(ModelAdapter): def __init__(self, api_key: str | None = None, base_url: str | None = None) -> None: kwargs: dict[str, Any] = { "api_key": api_key or settings.anthropic_api_key, + # Timeout granular: el endpoint MiniMax a veces se queda colgado sin + # devolver respuesta ni cerrar la conexion. Sin timeout explicito el + # stream queda pendiente para siempre. 120s total por request es + # generoso (M2 con thinking puede tardar 30-60s en respuestas largas) + # pero acota el peor caso. + "timeout": anthropic.Timeout( + connect=10.0, + read=120.0, + write=30.0, + pool=10.0, + ), + # Cero retries internos del SDK — manejamos retries en stream() con + # backoff propio (_RETRY_DELAYS). + "max_retries": 0, } url = base_url or settings.anthropic_base_url if url: @@ -178,6 +321,16 @@ class ClaudeAdapter(ModelAdapter): in_xml_capture = False xml_buffer = "" + # Interleaved thinking (MiniMax M2): el SDK emite un block + # con type=thinking, le siguen thinking_delta y al cerrar + # devuelve un signature criptografico. Trackeamos el indice + # de bloque actual para que el orquestador pueda reconstruir + # el assistant turn en orden. + current_block_index = -1 + current_block_type = "" + current_thinking_chars = 0 # solo para log al cerrar + current_thinking_sig_emitted = False + async for event in stream: yielded_any = True if event.type == "message_start" and hasattr(event, "message"): @@ -187,14 +340,30 @@ class ClaudeAdapter(ModelAdapter): if event.type == "content_block_start": block = event.content_block - if block.type == "tool_use": + current_block_index += 1 + current_block_type = getattr(block, "type", "") + if current_block_type == "tool_use": current_tool_id = block.id current_tool_name = block.name accumulated_args = "" yield StreamChunk( tool_call_id=current_tool_id, tool_name=current_tool_name, + block_type="tool_use", + block_index=current_block_index, ) + elif current_block_type == "thinking": + # Reset contadores y emitimos un "header" para + # que el orquestador registre que arranca un + # bloque thinking en este indice. + current_thinking_chars = 0 + current_thinking_sig_emitted = False + yield StreamChunk( + block_type="thinking", + block_index=current_block_index, + ) + # block_type == "text" no necesita header — los + # text_delta ya llevaran el indice. continue if event.type == "content_block_delta": @@ -211,7 +380,11 @@ class ClaudeAdapter(ModelAdapter): # legitimo que el modelo escribio antes del XML). prev = text_buffer[:m.start()] if prev: - yield StreamChunk(delta=prev) + yield StreamChunk( + delta=prev, + block_type="text", + block_index=current_block_index, + ) in_xml_capture = True xml_buffer = text_buffer[m.start():] text_buffer = "" @@ -221,7 +394,11 @@ class ClaudeAdapter(ModelAdapter): # esperamos al siguiente delta antes de emitir. safe = _safe_emit_split(text_buffer) if safe: - yield StreamChunk(delta=safe) + yield StreamChunk( + delta=safe, + block_type="text", + block_index=current_block_index, + ) text_buffer = text_buffer[len(safe):] elif delta.type == "input_json_delta": accumulated_args += delta.partial_json @@ -229,26 +406,81 @@ class ClaudeAdapter(ModelAdapter): tool_call_id=current_tool_id, tool_name=current_tool_name, tool_arguments=delta.partial_json, + block_type="tool_use", + block_index=current_block_index, ) + elif delta.type == "thinking_delta": + txt = getattr(delta, "thinking", "") or "" + current_thinking_chars += len(txt) + yield StreamChunk( + thinking_delta=txt, + block_type="thinking", + block_index=current_block_index, + ) + elif delta.type == "signature_delta": + sig = getattr(delta, "signature", "") or "" + if sig: + current_thinking_sig_emitted = True + yield StreamChunk( + thinking_signature=sig, + block_type="thinking", + block_index=current_block_index, + ) continue if event.type == "content_block_stop": + # Si el bloque cerrado es thinking y el signature + # no llego como signature_delta, intentar leerlo + # del content_block ya completo (algunos SDK lo + # exponen aqui). + if current_block_type == "thinking": + if not current_thinking_sig_emitted: + cb = getattr(event, "content_block", None) + sig = getattr(cb, "signature", "") if cb else "" + if sig: + yield StreamChunk( + thinking_signature=sig, + block_type="thinking", + block_index=current_block_index, + ) + current_thinking_sig_emitted = True + else: + logger.warning( + "Thinking block #%d cerrado sin signature (%d chars). " + "MiniMax rechazara el siguiente turno si lo reenviamos.", + current_block_index, current_thinking_chars, + ) + logger.info( + "[adapter] thinking block #%d: %d chars, sig=%s", + current_block_index, current_thinking_chars, + "yes" if current_thinking_sig_emitted else "MISSING", + ) if current_tool_id and accumulated_args: yield StreamChunk( tool_call_id=current_tool_id, tool_name=current_tool_name, tool_arguments=accumulated_args, finish_reason="tool_use", + block_type="tool_use", + block_index=current_block_index, ) current_tool_id = "" current_tool_name = "" accumulated_args = "" + current_block_type = "" + current_thinking_chars = 0 + current_thinking_sig_emitted = False continue if event.type == "message_delta": # Antes de cerrar, vaciar buffers. if in_xml_capture and xml_buffer: # Parsear el XML capturado y emitir tool_use sinteticos. + # Asignamos block_index sintetico a cada XML tool call + # para que el orquestador pueda registrarlo en + # turn_blocks_by_index. Si no, el assistant message + # iria sin el tool_use pero el tool_result sí lo + # referenciaria → MiniMax devuelve 400. calls = _parse_xml_tool_calls(xml_buffer) if calls: logger.info( @@ -256,24 +488,38 @@ class ClaudeAdapter(ModelAdapter): len(calls), ) for c in calls: + current_block_index += 1 + synthetic_idx = current_block_index yield StreamChunk( tool_call_id=c["id"], tool_name=c["name"], + block_type="tool_use", + block_index=synthetic_idx, ) yield StreamChunk( tool_call_id=c["id"], tool_name=c["name"], tool_arguments=json.dumps(c["arguments"]), finish_reason="tool_use", + block_type="tool_use", + block_index=synthetic_idx, ) else: # No se pudo parsear — devolver al usuario el # texto crudo para no perderlo silenciosamente. - yield StreamChunk(delta=xml_buffer) + yield StreamChunk( + delta=xml_buffer, + block_type="text", + block_index=current_block_index, + ) xml_buffer = "" in_xml_capture = False elif text_buffer: - yield StreamChunk(delta=text_buffer) + yield StreamChunk( + delta=text_buffer, + block_type="text", + block_index=current_block_index, + ) text_buffer = "" output_tokens = getattr(event.usage, "output_tokens", 0) if event.usage else 0 # Si convertimos XML a tool_use, override el stop_reason. @@ -404,11 +650,33 @@ class ClaudeAdapter(ModelAdapter): - role=tool → role=user with tool_result content blocks - assistant with tool_calls → assistant with tool_use content blocks - Consecutive same-role messages get merged (Claude requires alternating) + - Fast-path: si content ya viene como list (Anthropic-style nativo, p.ej. + messages emitidos por BaseAgent con interleaved thinking de M2), pasa + tal cual y solo hace merge con el anterior si toca. """ converted: list[dict[str, Any]] = [] for m in messages: role = m.get("role", "") + content = m.get("content") + + # Fast-path Anthropic-style: content ya es lista de blocks. + if isinstance(content, list) and role in ("user", "assistant"): + if converted and converted[-1]["role"] == role: + prev = converted[-1]["content"] + if isinstance(prev, list): + prev.extend(content) + elif isinstance(prev, str): + merged: list[dict[str, Any]] = [] + if prev: + merged.append({"type": "text", "text": prev}) + merged.extend(content) + converted[-1]["content"] = merged + else: + converted[-1]["content"] = list(content) + else: + converted.append({"role": role, "content": list(content)}) + continue if role == "tool": # Convert to user message with tool_result block diff --git a/src/context/compactor.py b/src/context/compactor.py index 3a9389f..595b902 100644 --- a/src/context/compactor.py +++ b/src/context/compactor.py @@ -187,19 +187,55 @@ class ContextCompactor: (i for i, m in enumerate(compacted) if m.get("role") == "user"), default=-1, ) + # Tool messages legacy (role=tool) y nuevos (role=user con tool_result blocks) tool_indexes = [i for i, m in enumerate(compacted) if m.get("role") == "tool"] + # Indices de user messages que contienen tool_result blocks (Anthropic-style) + user_tool_result_indexes = [ + i for i, m in enumerate(compacted) + if m.get("role") == "user" + and isinstance(m.get("content"), list) + and any( + isinstance(b, dict) and b.get("type") == "tool_result" + for b in m["content"] + ) + ] + # Combinamos para aplicar la misma politica de "preservar los ultimos N raw" + all_tool_carriers = tool_indexes + user_tool_result_indexes + all_tool_carriers.sort() keep_raw_tool_indexes = ( - set(tool_indexes[-recent_raw_limit:]) + set(all_tool_carriers[-recent_raw_limit:]) if recent_raw_limit > 0 else set() ) + def _truncate_tool_result_blocks(msg: dict[str, Any], char_limit: int) -> bool: + """Trunca el campo `content` de los tool_result blocks de un user + message con content list. Devuelve True si modifico algo.""" + modified = False + content = msg.get("content") + if not isinstance(content, list): + return False + for block in content: + if not isinstance(block, dict) or block.get("type") != "tool_result": + continue + bc = block.get("content", "") + if isinstance(bc, str) and len(bc) > char_limit: + block["content"] = bc[:char_limit] + modified = True + return modified + for idx in keep_raw_tool_indexes: - content = compacted[idx].get("content", "") + msg = compacted[idx] + content = msg.get("content", "") if isinstance(content, str) and content: truncated = content[:raw_char_limit] if truncated != content: - compacted[idx]["content"] = truncated + msg["content"] = truncated + meta["messages_compacted"] += 1 + meta["tool_messages_compacted"] += 1 + meta["raw_tool_results_kept"] += 1 + elif isinstance(content, list): + if _truncate_tool_result_blocks(msg, raw_char_limit): meta["messages_compacted"] += 1 meta["tool_messages_compacted"] += 1 meta["raw_tool_results_kept"] += 1 @@ -271,20 +307,49 @@ class ContextCompactor: if total <= max_tokens: break + # Last-resort: drop thinking blocks (M2 interleaved) de assistant + # messages que NO sean los 2 ultimos turnos. Ahorra muchisimo sin + # perder utilidad — los thinking de turnos lejanos ya cumplieron. + if total > max_tokens: + assistant_indexes = [ + i for i, m in enumerate(compacted) + if m.get("role") == "assistant" and isinstance(m.get("content"), list) + ] + # Conservar los thinking de los ultimos 2 assistants; descartar el resto. + droppable = assistant_indexes[:-2] if len(assistant_indexes) > 2 else [] + for idx in droppable: + content = compacted[idx]["content"] + new_content = [b for b in content if not (isinstance(b, dict) and b.get("type") == "thinking")] + if len(new_content) != len(content): + compacted[idx]["content"] = new_content + meta["messages_compacted"] += 1 + meta["assistant_messages_compacted"] += 1 + total = sum(self._estimate_message_tokens(m) for m in compacted) + if total <= max_tokens: + break + if total > max_tokens: for idx, message in enumerate(compacted): if idx == last_user_idx: continue role = message.get("role", "") content = message.get("content", "") - if not isinstance(content, str) or not content: + if isinstance(content, str) and content: + if role == "tool": + message["content"] = "[TOOL RESULT COMPACTADO]" + elif role == "assistant": + message["content"] = "[ASSISTANT COMPACTADO]" + elif role == "user": + message["content"] = "[USER CONTEXT COMPACTADO]" + elif isinstance(content, list) and content: + # Anthropic-style: reemplazar lista entera por placeholder string. + # Nota: pierde tool_use ids — solo aplicar al final como ultimo recurso. + if role == "assistant": + message["content"] = "[ASSISTANT COMPACTADO]" + elif role == "user": + message["content"] = "[USER CONTEXT COMPACTADO]" + else: continue - if role == "tool": - message["content"] = "[TOOL RESULT COMPACTADO]" - elif role == "assistant": - message["content"] = "[ASSISTANT COMPACTADO]" - elif role == "user": - message["content"] = "[USER CONTEXT COMPACTADO]" total = sum(self._estimate_message_tokens(m) for m in compacted) if total <= max_tokens: break @@ -575,7 +640,30 @@ class ContextCompactor: @staticmethod def _estimate_message_tokens(message: dict[str, Any]) -> int: content = message.get("content", "") - tokens = estimate_tokens(content if isinstance(content, str) else str(content)) + if isinstance(content, str): + tokens = estimate_tokens(content) + elif isinstance(content, list): + # Anthropic-style content blocks (interleaved thinking M2). + tokens = 0 + for block in content: + if not isinstance(block, dict): + tokens += estimate_tokens(str(block)) + continue + btype = block.get("type", "") + if btype == "text": + tokens += estimate_tokens(block.get("text", "")) + elif btype == "thinking": + tokens += estimate_tokens(block.get("thinking", "")) + elif btype == "tool_use": + tokens += estimate_tokens(block.get("name", "")) + tokens += estimate_tokens(str(block.get("input", ""))) + elif btype == "tool_result": + tc = block.get("content", "") + tokens += estimate_tokens(tc if isinstance(tc, str) else str(tc)) + else: + tokens += estimate_tokens(str(block)) + else: + tokens = estimate_tokens(str(content)) if message.get("tool_calls"): tokens += estimate_tokens(json.dumps(message.get("tool_calls", []), ensure_ascii=False)) return tokens diff --git a/src/context/engine.py b/src/context/engine.py index 3f4939b..8e06d98 100644 --- a/src/context/engine.py +++ b/src/context/engine.py @@ -825,7 +825,11 @@ class ContextEngine: sanitized: dict[str, Any] = {"role": role} content = message.get("content", "") - if isinstance(content, str) and content: + # Anthropic-style content list (blocks: thinking/text/tool_use/tool_result) + # se preserva tal cual — necesario para interleaved thinking de M2. + if isinstance(content, list) and content: + sanitized["content"] = content + elif isinstance(content, str) and content: sanitized["content"] = content if role == "assistant": @@ -848,6 +852,28 @@ class ContextEngine: content = message.get("content", "") if isinstance(content, str): return estimate_tokens(content) + if isinstance(content, list): + # Sumar tokens de cada bloque por su campo correspondiente. + total = 0 + for block in content: + if not isinstance(block, dict): + total += estimate_tokens(str(block)) + continue + btype = block.get("type", "") + if btype == "text": + total += estimate_tokens(block.get("text", "")) + elif btype == "thinking": + total += estimate_tokens(block.get("thinking", "")) + # signature es opaque — no cuenta tokens significativos + elif btype == "tool_use": + total += estimate_tokens(block.get("name", "")) + total += estimate_tokens(str(block.get("input", ""))) + elif btype == "tool_result": + tc = block.get("content", "") + total += estimate_tokens(tc if isinstance(tc, str) else str(tc)) + else: + total += estimate_tokens(str(block)) + return total return estimate_tokens(str(content)) @staticmethod diff --git a/src/orchestrator/agents/base.py b/src/orchestrator/agents/base.py index 1888897..056432a 100644 --- a/src/orchestrator/agents/base.py +++ b/src/orchestrator/agents/base.py @@ -89,6 +89,13 @@ class BaseAgent: full_text = "" tool_calls: list[dict[str, Any]] = [] active_tools: dict[str, dict[str, Any]] = {} + # Acumuladores Anthropic-style por turno (interleaved thinking M2). + # Por cada block_index guardamos un dict block parcial. Al cerrar el + # turno, lo serializamos en orden. + turn_blocks_by_index: dict[int, dict[str, Any]] = {} + # Cuando text_delta llega sin block_index (p.ej. via OpenAI adapter + # legacy), asignamos un sintetico para no perder el texto. + synthetic_text_idx = 10_000 async for chunk in self.model.stream( messages=ctx.to_messages(), @@ -97,6 +104,16 @@ class BaseAgent: ): if chunk.delta: full_text += chunk.delta + # Acumular por block_index para reconstruir blocks. + idx = chunk.block_index + if idx < 0: + idx = synthetic_text_idx + blk = turn_blocks_by_index.get(idx) + if blk is None: + blk = {"type": "text", "text": ""} + turn_blocks_by_index[idx] = blk + if blk.get("type") == "text": + blk["text"] = blk.get("text", "") + chunk.delta if self.profile.stream_deltas: await self.sse.emit( EventType.AGENT_DELTA, @@ -108,12 +125,31 @@ class BaseAgent: session_id=session.session_id, ) + # Thinking deltas (MiniMax M2 interleaved). El adapter ya viene + # con block_index correcto; solo acumulamos. + if chunk.thinking_delta and chunk.block_index >= 0: + blk = turn_blocks_by_index.get(chunk.block_index) + if blk is None: + blk = {"type": "thinking", "thinking": "", "signature": ""} + turn_blocks_by_index[chunk.block_index] = blk + if blk.get("type") == "thinking": + blk["thinking"] = blk.get("thinking", "") + chunk.thinking_delta + + if chunk.thinking_signature and chunk.block_index >= 0: + blk = turn_blocks_by_index.get(chunk.block_index) + if blk is None: + blk = {"type": "thinking", "thinking": "", "signature": ""} + turn_blocks_by_index[chunk.block_index] = blk + if blk.get("type") == "thinking": + blk["signature"] = chunk.thinking_signature + if chunk.tool_name and chunk.tool_call_id: if chunk.tool_call_id not in active_tools: active_tools[chunk.tool_call_id] = { "id": chunk.tool_call_id, "name": chunk.tool_name, "arguments": "", + "block_index": chunk.block_index, } await self.sse.emit( EventType.TOOL_STARTED, @@ -144,6 +180,7 @@ class BaseAgent: "id": chunk.tool_call_id, "name": chunk.tool_name or "", "arguments": "", + "block_index": chunk.block_index, } final_args = tool["arguments"] or chunk.tool_arguments or "" try: @@ -168,6 +205,16 @@ class BaseAgent: tool["parsed_arguments"] = args tool_calls.append(tool) + # Registrar tool_use block en su posicion del turno. + bidx = tool.get("block_index", -1) + if bidx >= 0: + turn_blocks_by_index[bidx] = { + "type": "tool_use", + "id": tool["id"], + "name": tool["name"], + "input": args, + } + # Accumulate token usage from any chunk that has it if chunk.usage: total_input_tokens += chunk.usage.get("input_tokens", 0) @@ -178,39 +225,80 @@ class BaseAgent: accumulated_content += full_text + # Materializar blocks del turno en orden por block_index. + # Filtra thinking blocks sin signature: MiniMax los rechazaria al + # reenviarlos. Mejor descartar el thinking entero que mandar uno + # corrupto y ver un 400. + turn_blocks: list[dict[str, Any]] = [] + for idx in sorted(turn_blocks_by_index.keys()): + b = turn_blocks_by_index[idx] + if b.get("type") == "thinking": + if not b.get("signature"): + logger.warning( + "Drop thinking block at idx=%d (no signature) — chars=%d", + idx, len(b.get("thinking", "")), + ) + continue + # Limpiar texto vacio defensivo. + if not b.get("thinking"): + continue + turn_blocks.append(b) + + # Backstop: garantizar que CADA tool_call tenga su tool_use block + # en turn_blocks. Si no lo tiene (chunks sin block_index, adapter + # legacy, etc.), apendearlo al final. Sin esto, MiniMax devuelve + # 400 ("tool result's tool id not found") en el siguiente request. + tool_use_ids_in_blocks = { + b.get("id") for b in turn_blocks + if b.get("type") == "tool_use" and b.get("id") + } + for tc in tool_calls: + if tc["id"] not in tool_use_ids_in_blocks: + turn_blocks.append({ + "type": "tool_use", + "id": tc["id"], + "name": tc["name"], + "input": tc.get("parsed_arguments", {}), + }) + tool_use_ids_in_blocks.add(tc["id"]) + # If no tool calls, we're done if not tool_calls: - # Add final assistant message to conversation - if full_text: + if turn_blocks: + conversation.append({"role": "assistant", "content": turn_blocks}) + elif full_text: + # Fallback (no debiera ocurrir si el adapter emite block_index). conversation.append({"role": "assistant", "content": full_text}) break - # Add assistant message with tool calls to conversation - # (OpenAI format: assistant message carries tool_calls) - assistant_msg: dict[str, Any] = {"role": "assistant"} - if full_text: - assistant_msg["content"] = full_text - assistant_msg["tool_calls"] = [ - { - "id": tc["id"], - "type": "function", - "function": { - "name": tc["name"], - "arguments": json.dumps(tc.get("parsed_arguments", {})), - }, - } - for tc in tool_calls - ] - conversation.append(assistant_msg) + # Push del assistant turn con TODOS los blocks (thinking+text+tool_use). + # Esto preserva la cadena de razonamiento de M2 entre turnos. + if turn_blocks: + conversation.append({"role": "assistant", "content": turn_blocks}) + else: + # Fallback OpenAI-style si no hay blocks (modelo legacy o sin + # block_index). Mantenemos compat con OpenAIAdapter / cualquier + # adapter que no propague block_index. + assistant_msg: dict[str, Any] = {"role": "assistant"} + if full_text: + assistant_msg["content"] = full_text + assistant_msg["tool_calls"] = [ + { + "id": tc["id"], + "type": "function", + "function": { + "name": tc["name"], + "arguments": json.dumps(tc.get("parsed_arguments", {})), + }, + } + for tc in tool_calls + ] + conversation.append(assistant_msg) - # Execute tool calls and add COMPLETE results to conversation. - # Antes habia dos capas anti-duplicado: (a) cachear resultado y - # devolver "[DUPLICADO]" en lugar de re-ejecutar y (b) cortar el - # step si TODAS las llamadas del paso eran duplicadas. Las quitamos - # porque en conversaciones largas el agente puede LEGITIMAMENTE - # repetir una llamada (p.ej. re-leer un fichero tras editarlo) y - # las heuristicas bloqueaban acciones validas. El usuario prefiere - # libertad — runaway loops se mitigan con limit de steps externo. + # Execute tool calls. Los results se agrupan en UN solo user message + # con array de tool_result blocks (formato Anthropic). Anteriormente + # se hacian N appends `{"role":"tool",...}` en formato OpenAI. + tool_result_blocks: list[dict[str, Any]] = [] for tc in tool_calls: # Si los args no se pudieron parsear (p.ej. truncados por max_tokens), # NO ejecutamos la tool. En su lugar devolvemos un mensaje al modelo @@ -218,9 +306,9 @@ class BaseAgent: # (dividir el contenido, acortar, etc.). if tc.get("parse_error"): pe = tc["parse_error"] - conversation.append({ - "role": "tool", - "tool_call_id": tc["id"], + tool_result_blocks.append({ + "type": "tool_result", + "tool_use_id": tc["id"], "content": ( f"[ERROR] No se pudieron parsear los argumentos del tool " f"'{tc['name']}'. Los argumentos llegaron truncados o mal " @@ -230,6 +318,7 @@ class BaseAgent: f"Reintenta dividiendo el contenido en varios tool calls mas " f"pequenos o reduciendo el tamano del argumento 'content'." ), + "is_error": True, }) continue @@ -242,10 +331,9 @@ class BaseAgent: ) tool_executions.append(tool_exec) - # COMPLETE result in conversation (truncated to safe limit) - conversation.append({ - "role": "tool", - "tool_call_id": tc["id"], + tool_result_blocks.append({ + "type": "tool_result", + "tool_use_id": tc["id"], "content": ( tool_exec.raw_output[:settings.tool_raw_output_max_chars] if tool_exec.raw_output @@ -253,6 +341,9 @@ class BaseAgent: ), }) + if tool_result_blocks: + conversation.append({"role": "user", "content": tool_result_blocks}) + return { "content": accumulated_content, "artifacts": artifacts, @@ -285,9 +376,20 @@ class BaseAgent: start = time.monotonic() try: - if self.mcp.is_running and tool_name in self.mcp.tools: - result = await self.mcp.call_tool(tool_name, arguments) - raw_output = self._extract_mcp_output(result) + if self.mcp.is_running: + # Intentar llamada directa: call_tool tiene fallback bare-name + # via _resolve_tool, asi que aunque venga sin prefijo + # `acai_code__` (caso comun cuando el modelo emite XML inline) + # se resuelve solo. El check `tool_name in self.mcp.tools` que + # haciamos antes era demasiado estricto y rechazaba bare names. + try: + result = await self.mcp.call_tool(tool_name, arguments) + raw_output = self._extract_mcp_output(result) + except Exception as resolve_err: + raw_output = ( + f"Tool '{tool_name}' no disponible o fallo al resolver: " + f"{str(resolve_err)[:200]}" + ) else: raw_output = f"Tool '{tool_name}' not available via MCP." diff --git a/src/orchestrator/engine.py b/src/orchestrator/engine.py index e08edf5..8ec2633 100644 --- a/src/orchestrator/engine.py +++ b/src/orchestrator/engine.py @@ -253,7 +253,10 @@ class OrchestratorEngine: sanitized: dict[str, Any] = {"role": role} content = message.get("content", "") - if isinstance(content, str) and content: + # Anthropic-style content list (interleaved thinking) → preservar tal cual. + if isinstance(content, list) and content: + sanitized["content"] = content + elif isinstance(content, str) and content: sanitized["content"] = content if role == "assistant":