Mas cosas

This commit is contained in:
Jordan Diaz
2026-05-06 07:07:57 +00:00
parent 8875cb29cb
commit 06ce51a9c1
9 changed files with 643 additions and 80 deletions

View File

@@ -89,6 +89,13 @@ class BaseAgent:
full_text = ""
tool_calls: list[dict[str, Any]] = []
active_tools: dict[str, dict[str, Any]] = {}
# Acumuladores Anthropic-style por turno (interleaved thinking M2).
# Por cada block_index guardamos un dict block parcial. Al cerrar el
# turno, lo serializamos en orden.
turn_blocks_by_index: dict[int, dict[str, Any]] = {}
# Cuando text_delta llega sin block_index (p.ej. via OpenAI adapter
# legacy), asignamos un sintetico para no perder el texto.
synthetic_text_idx = 10_000
async for chunk in self.model.stream(
messages=ctx.to_messages(),
@@ -97,6 +104,16 @@ class BaseAgent:
):
if chunk.delta:
full_text += chunk.delta
# Acumular por block_index para reconstruir blocks.
idx = chunk.block_index
if idx < 0:
idx = synthetic_text_idx
blk = turn_blocks_by_index.get(idx)
if blk is None:
blk = {"type": "text", "text": ""}
turn_blocks_by_index[idx] = blk
if blk.get("type") == "text":
blk["text"] = blk.get("text", "") + chunk.delta
if self.profile.stream_deltas:
await self.sse.emit(
EventType.AGENT_DELTA,
@@ -108,12 +125,31 @@ class BaseAgent:
session_id=session.session_id,
)
# Thinking deltas (MiniMax M2 interleaved). El adapter ya viene
# con block_index correcto; solo acumulamos.
if chunk.thinking_delta and chunk.block_index >= 0:
blk = turn_blocks_by_index.get(chunk.block_index)
if blk is None:
blk = {"type": "thinking", "thinking": "", "signature": ""}
turn_blocks_by_index[chunk.block_index] = blk
if blk.get("type") == "thinking":
blk["thinking"] = blk.get("thinking", "") + chunk.thinking_delta
if chunk.thinking_signature and chunk.block_index >= 0:
blk = turn_blocks_by_index.get(chunk.block_index)
if blk is None:
blk = {"type": "thinking", "thinking": "", "signature": ""}
turn_blocks_by_index[chunk.block_index] = blk
if blk.get("type") == "thinking":
blk["signature"] = chunk.thinking_signature
if chunk.tool_name and chunk.tool_call_id:
if chunk.tool_call_id not in active_tools:
active_tools[chunk.tool_call_id] = {
"id": chunk.tool_call_id,
"name": chunk.tool_name,
"arguments": "",
"block_index": chunk.block_index,
}
await self.sse.emit(
EventType.TOOL_STARTED,
@@ -144,6 +180,7 @@ class BaseAgent:
"id": chunk.tool_call_id,
"name": chunk.tool_name or "",
"arguments": "",
"block_index": chunk.block_index,
}
final_args = tool["arguments"] or chunk.tool_arguments or ""
try:
@@ -168,6 +205,16 @@ class BaseAgent:
tool["parsed_arguments"] = args
tool_calls.append(tool)
# Registrar tool_use block en su posicion del turno.
bidx = tool.get("block_index", -1)
if bidx >= 0:
turn_blocks_by_index[bidx] = {
"type": "tool_use",
"id": tool["id"],
"name": tool["name"],
"input": args,
}
# Accumulate token usage from any chunk that has it
if chunk.usage:
total_input_tokens += chunk.usage.get("input_tokens", 0)
@@ -178,39 +225,80 @@ class BaseAgent:
accumulated_content += full_text
# Materializar blocks del turno en orden por block_index.
# Filtra thinking blocks sin signature: MiniMax los rechazaria al
# reenviarlos. Mejor descartar el thinking entero que mandar uno
# corrupto y ver un 400.
turn_blocks: list[dict[str, Any]] = []
for idx in sorted(turn_blocks_by_index.keys()):
b = turn_blocks_by_index[idx]
if b.get("type") == "thinking":
if not b.get("signature"):
logger.warning(
"Drop thinking block at idx=%d (no signature) — chars=%d",
idx, len(b.get("thinking", "")),
)
continue
# Limpiar texto vacio defensivo.
if not b.get("thinking"):
continue
turn_blocks.append(b)
# Backstop: garantizar que CADA tool_call tenga su tool_use block
# en turn_blocks. Si no lo tiene (chunks sin block_index, adapter
# legacy, etc.), apendearlo al final. Sin esto, MiniMax devuelve
# 400 ("tool result's tool id not found") en el siguiente request.
tool_use_ids_in_blocks = {
b.get("id") for b in turn_blocks
if b.get("type") == "tool_use" and b.get("id")
}
for tc in tool_calls:
if tc["id"] not in tool_use_ids_in_blocks:
turn_blocks.append({
"type": "tool_use",
"id": tc["id"],
"name": tc["name"],
"input": tc.get("parsed_arguments", {}),
})
tool_use_ids_in_blocks.add(tc["id"])
# If no tool calls, we're done
if not tool_calls:
# Add final assistant message to conversation
if full_text:
if turn_blocks:
conversation.append({"role": "assistant", "content": turn_blocks})
elif full_text:
# Fallback (no debiera ocurrir si el adapter emite block_index).
conversation.append({"role": "assistant", "content": full_text})
break
# Add assistant message with tool calls to conversation
# (OpenAI format: assistant message carries tool_calls)
assistant_msg: dict[str, Any] = {"role": "assistant"}
if full_text:
assistant_msg["content"] = full_text
assistant_msg["tool_calls"] = [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc.get("parsed_arguments", {})),
},
}
for tc in tool_calls
]
conversation.append(assistant_msg)
# Push del assistant turn con TODOS los blocks (thinking+text+tool_use).
# Esto preserva la cadena de razonamiento de M2 entre turnos.
if turn_blocks:
conversation.append({"role": "assistant", "content": turn_blocks})
else:
# Fallback OpenAI-style si no hay blocks (modelo legacy o sin
# block_index). Mantenemos compat con OpenAIAdapter / cualquier
# adapter que no propague block_index.
assistant_msg: dict[str, Any] = {"role": "assistant"}
if full_text:
assistant_msg["content"] = full_text
assistant_msg["tool_calls"] = [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc.get("parsed_arguments", {})),
},
}
for tc in tool_calls
]
conversation.append(assistant_msg)
# Execute tool calls and add COMPLETE results to conversation.
# Antes habia dos capas anti-duplicado: (a) cachear resultado y
# devolver "[DUPLICADO]" en lugar de re-ejecutar y (b) cortar el
# step si TODAS las llamadas del paso eran duplicadas. Las quitamos
# porque en conversaciones largas el agente puede LEGITIMAMENTE
# repetir una llamada (p.ej. re-leer un fichero tras editarlo) y
# las heuristicas bloqueaban acciones validas. El usuario prefiere
# libertad — runaway loops se mitigan con limit de steps externo.
# Execute tool calls. Los results se agrupan en UN solo user message
# con array de tool_result blocks (formato Anthropic). Anteriormente
# se hacian N appends `{"role":"tool",...}` en formato OpenAI.
tool_result_blocks: list[dict[str, Any]] = []
for tc in tool_calls:
# Si los args no se pudieron parsear (p.ej. truncados por max_tokens),
# NO ejecutamos la tool. En su lugar devolvemos un mensaje al modelo
@@ -218,9 +306,9 @@ class BaseAgent:
# (dividir el contenido, acortar, etc.).
if tc.get("parse_error"):
pe = tc["parse_error"]
conversation.append({
"role": "tool",
"tool_call_id": tc["id"],
tool_result_blocks.append({
"type": "tool_result",
"tool_use_id": tc["id"],
"content": (
f"[ERROR] No se pudieron parsear los argumentos del tool "
f"'{tc['name']}'. Los argumentos llegaron truncados o mal "
@@ -230,6 +318,7 @@ class BaseAgent:
f"Reintenta dividiendo el contenido en varios tool calls mas "
f"pequenos o reduciendo el tamano del argumento 'content'."
),
"is_error": True,
})
continue
@@ -242,10 +331,9 @@ class BaseAgent:
)
tool_executions.append(tool_exec)
# COMPLETE result in conversation (truncated to safe limit)
conversation.append({
"role": "tool",
"tool_call_id": tc["id"],
tool_result_blocks.append({
"type": "tool_result",
"tool_use_id": tc["id"],
"content": (
tool_exec.raw_output[:settings.tool_raw_output_max_chars]
if tool_exec.raw_output
@@ -253,6 +341,9 @@ class BaseAgent:
),
})
if tool_result_blocks:
conversation.append({"role": "user", "content": tool_result_blocks})
return {
"content": accumulated_content,
"artifacts": artifacts,
@@ -285,9 +376,20 @@ class BaseAgent:
start = time.monotonic()
try:
if self.mcp.is_running and tool_name in self.mcp.tools:
result = await self.mcp.call_tool(tool_name, arguments)
raw_output = self._extract_mcp_output(result)
if self.mcp.is_running:
# Intentar llamada directa: call_tool tiene fallback bare-name
# via _resolve_tool, asi que aunque venga sin prefijo
# `acai_code__` (caso comun cuando el modelo emite XML inline)
# se resuelve solo. El check `tool_name in self.mcp.tools` que
# haciamos antes era demasiado estricto y rechazaba bare names.
try:
result = await self.mcp.call_tool(tool_name, arguments)
raw_output = self._extract_mcp_output(result)
except Exception as resolve_err:
raw_output = (
f"Tool '{tool_name}' no disponible o fallo al resolver: "
f"{str(resolve_err)[:200]}"
)
else:
raw_output = f"Tool '{tool_name}' not available via MCP."

View File

@@ -253,7 +253,10 @@ class OrchestratorEngine:
sanitized: dict[str, Any] = {"role": role}
content = message.get("content", "")
if isinstance(content, str) and content:
# Anthropic-style content list (interleaved thinking) → preservar tal cual.
if isinstance(content, list) and content:
sanitized["content"] = content
elif isinstance(content, str) and content:
sanitized["content"] = content
if role == "assistant":