Ajustes de estructura

This commit is contained in:
Jordan Diaz
2026-04-28 20:25:09 +00:00
parent 6881d64a08
commit 3af875ed11
6 changed files with 279 additions and 84 deletions

View File

@@ -5,6 +5,8 @@ from __future__ import annotations
import asyncio
import json
import logging
import re
import uuid
from typing import Any, AsyncIterator
import anthropic
@@ -15,6 +17,75 @@ from .base import ModelAdapter, ModelConfig, ModelResponse, StreamChunk
logger = logging.getLogger(__name__)
# Algunos fine-tunes (sobre todo MiniMax) ocasionalmente emiten las tool calls
# como texto literal en lugar de usar los `tool_use` blocks nativos:
# <minimax:tool_call>
# <invoke name="acai_code__acai_view">
# <parameter name="file_path">...</parameter>
# </invoke>
# </minimax:tool_call>
#
# Cuando eso pasa el orquestador ve "texto" y la tool nunca se ejecuta — el
# usuario ve el XML crudo en el chat. Detectamos y convertimos a tool_use
# sintetico mientras streameamos. Es un parche defensivo: el caso normal
# (tool_use blocks) sigue por el camino estandar.
_TOOL_CALL_OPEN_RE = re.compile(r"<(?:minimax:tool_call|invoke\s+name)", re.IGNORECASE)
_INVOKE_RE = re.compile(
r"<invoke\s+name=\"([^\"]+)\"\s*>(.*?)</invoke>",
re.IGNORECASE | re.DOTALL,
)
_PARAM_RE = re.compile(
r"<parameter\s+name=\"([^\"]+)\"\s*>(.*?)</parameter>",
re.IGNORECASE | re.DOTALL,
)
def _safe_emit_split(buf: str) -> str:
"""Devuelve el prefijo del buffer que es seguro emitir como texto sin
perder un posible inicio de tag XML que esta llegando fragmentado.
Mantenemos en hold los ultimos 30 chars si terminan con `<` o con un
prefijo parcial de `<minimax:tool_call` / `<invoke`. Si el buffer es
largo y no termina con `<`, todo es seguro.
"""
if not buf:
return ""
# Buscar el ultimo `<` y comprobar si lo que sigue puede ser apertura.
idx = buf.rfind("<")
if idx == -1:
return buf
tail = buf[idx:]
# Si el tail ya tiene `>` cerrado, es un tag normal — emitir todo.
if ">" in tail:
return buf
# Si el tail puede ser inicio de tool_call/invoke, retenerlo.
candidates = ("<minimax:tool_call", "<invoke")
for cand in candidates:
if cand.startswith(tail.lower()) or tail.lower().startswith(cand[:len(tail)].lower()):
return buf[:idx]
# No coincide con ninguna apertura sospechosa — emitir todo.
return buf
def _parse_xml_tool_calls(text: str) -> list[dict[str, Any]]:
"""Extrae tool calls del texto. Devuelve lista de {id, name, arguments}.
Si no encuentra patrones validos devuelve []."""
calls = []
for m in _INVOKE_RE.finditer(text):
name = m.group(1).strip()
body = m.group(2)
args = {}
for p in _PARAM_RE.finditer(body):
args[p.group(1).strip()] = p.group(2).strip()
if name:
calls.append({
"id": "xml_{}".format(uuid.uuid4().hex[:12]),
"name": name,
"arguments": args,
})
return calls
# Errores transitorios del proxy del modelo (MiniMax/Anthropic). Reintentamos
# con backoff exponencial: 1s, 3s, 9s. 529 es overloaded_error de Anthropic;
# 429 rate-limit; 503 service unavailable.
@@ -98,6 +169,14 @@ class ClaudeAdapter(ModelAdapter):
current_tool_name = ""
accumulated_args = ""
input_tokens = 0
# Buffer + flag para detectar XML tool calls inline (MiniMax).
# En modo "text", emitimos delta directamente. Si vemos `<invoke`
# o `<minimax:tool_call`, pasamos a modo "buffer" y dejamos de
# emitir hasta cerrar el bloque o terminar el mensaje. Al final
# parseamos y emitimos un tool_use sintetico.
text_buffer = ""
in_xml_capture = False
xml_buffer = ""
async for event in stream:
yielded_any = True
@@ -121,7 +200,29 @@ class ClaudeAdapter(ModelAdapter):
if event.type == "content_block_delta":
delta = event.delta
if delta.type == "text_delta":
yield StreamChunk(delta=delta.text)
text_buffer += delta.text
if in_xml_capture:
xml_buffer += delta.text
else:
# Detectar inicio del bloque XML antes de emitir.
m = _TOOL_CALL_OPEN_RE.search(text_buffer)
if m:
# Emitir el texto previo al match (texto
# legitimo que el modelo escribio antes del XML).
prev = text_buffer[:m.start()]
if prev:
yield StreamChunk(delta=prev)
in_xml_capture = True
xml_buffer = text_buffer[m.start():]
text_buffer = ""
else:
# Holdback: si el final del buffer parece
# arrancar una apertura ('<', '<i', '<inv'...)
# esperamos al siguiente delta antes de emitir.
safe = _safe_emit_split(text_buffer)
if safe:
yield StreamChunk(delta=safe)
text_buffer = text_buffer[len(safe):]
elif delta.type == "input_json_delta":
accumulated_args += delta.partial_json
yield StreamChunk(
@@ -145,9 +246,40 @@ class ClaudeAdapter(ModelAdapter):
continue
if event.type == "message_delta":
# Antes de cerrar, vaciar buffers.
if in_xml_capture and xml_buffer:
# Parsear el XML capturado y emitir tool_use sinteticos.
calls = _parse_xml_tool_calls(xml_buffer)
if calls:
logger.info(
"Detected %d inline XML tool call(s) — converting to tool_use",
len(calls),
)
for c in calls:
yield StreamChunk(
tool_call_id=c["id"],
tool_name=c["name"],
)
yield StreamChunk(
tool_call_id=c["id"],
tool_name=c["name"],
tool_arguments=json.dumps(c["arguments"]),
finish_reason="tool_use",
)
else:
# No se pudo parsear — devolver al usuario el
# texto crudo para no perderlo silenciosamente.
yield StreamChunk(delta=xml_buffer)
xml_buffer = ""
in_xml_capture = False
elif text_buffer:
yield StreamChunk(delta=text_buffer)
text_buffer = ""
output_tokens = getattr(event.usage, "output_tokens", 0) if event.usage else 0
# Si convertimos XML a tool_use, override el stop_reason.
stop_reason = event.delta.stop_reason or ""
yield StreamChunk(
finish_reason=event.delta.stop_reason or "",
finish_reason=stop_reason,
usage={
"input_tokens": input_tokens,
"output_tokens": output_tokens,

View File

@@ -64,8 +64,6 @@ class BaseAgent:
total_output_tokens = 0
# Real conversation history: assistant messages + tool results
conversation: list[dict[str, Any]] = []
tool_fingerprints: dict[str, ToolExecution] = {}
all_duplicates_streak = 0 # consecutive steps where ALL calls are duplicates
for step in range(max_steps):
# Build context with real conversation
@@ -205,14 +203,19 @@ class BaseAgent:
]
conversation.append(assistant_msg)
# Execute tool calls and add COMPLETE results to conversation
duplicates_this_step = 0
# Execute tool calls and add COMPLETE results to conversation.
# Antes habia dos capas anti-duplicado: (a) cachear resultado y
# devolver "[DUPLICADO]" en lugar de re-ejecutar y (b) cortar el
# step si TODAS las llamadas del paso eran duplicadas. Las quitamos
# porque en conversaciones largas el agente puede LEGITIMAMENTE
# repetir una llamada (p.ej. re-leer un fichero tras editarlo) y
# las heuristicas bloqueaban acciones validas. El usuario prefiere
# libertad — runaway loops se mitigan con limit de steps externo.
for tc in tool_calls:
# Si los args no se pudieron parsear (p.ej. truncados por max_tokens),
# NO ejecutamos la tool. En su lugar devolvemos un mensaje al modelo
# explicando el problema para que pueda ajustar el siguiente intento
# (dividir el contenido, acortar, etc.). Fingerprint incluye el hash
# del raw para distinguir fallos distintos.
# (dividir el contenido, acortar, etc.).
if tc.get("parse_error"):
pe = tc["parse_error"]
conversation.append({
@@ -229,24 +232,6 @@ class BaseAgent:
),
})
continue
fp_raw = f"{tc['name']}:{json.dumps(tc.get('parsed_arguments', {}), sort_keys=True)}"
fp = hashlib.md5(fp_raw.encode()).hexdigest()
if fp in tool_fingerprints:
prev_exec = tool_fingerprints[fp]
tool_executions.append(prev_exec)
duplicates_this_step += 1
# Return cached result as tool message
conversation.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": (
"[DUPLICADO] Ya ejecutada con mismos argumentos. Resultado: "
f"{prev_exec.raw_output[:settings.tool_raw_output_max_chars]}"
),
})
logger.warning("Duplicate tool call skipped: %s (fingerprint: %s)", tc["name"], fp[:8])
continue
tool_exec = await self._execute_tool(
session=session,
@@ -255,7 +240,6 @@ class BaseAgent:
artifacts=artifacts,
tool_call_id=tc["id"],
)
tool_fingerprints[fp] = tool_exec
tool_executions.append(tool_exec)
# COMPLETE result in conversation (truncated to safe limit)
@@ -269,32 +253,6 @@ class BaseAgent:
),
})
# Loop detection: if ALL tool calls in this step were duplicates
if duplicates_this_step == len(tool_calls):
all_duplicates_streak += 1
if all_duplicates_streak >= 2:
logger.warning("Loop detected: %d consecutive steps with all duplicate calls. Breaking.", all_duplicates_streak)
conversation.append({
"role": "user",
"content": "[SISTEMA] Se detectaron llamadas repetidas. Ya tienes toda la información necesaria. Genera tu respuesta final ahora.",
})
# One more chance to generate a final response
ctx = await self.context.build_context(
session=session, agent=self.profile,
artifacts=artifacts, conversation=conversation,
)
async for chunk in self.model.stream(
messages=ctx.to_messages(),
config=config,
):
if chunk.delta:
accumulated_content += chunk.delta
if chunk.finish_reason:
break
break
else:
all_duplicates_streak = 0
return {
"content": accumulated_content,
"artifacts": artifacts,