Files
agenticSystem/src/orchestrator/agents/base.py
Jordan Diaz 43337e8554 Hardening: lock de sesion atomico, monitor off por defecto, fix DeepSeek reasoning-only
- session_lock: token uuid + compare-and-delete (Lua), TTL > timeout de
  ejecucion; abort solo limpia el lock tras cancelacion confirmada.
  Evita doble ejecucion concurrente sobre la misma sesion.
- monitor HTTP (puerto 4545) deshabilitado salvo MCP_MONITOR_ENABLED=true
  y atado a 127.0.0.1; no se acumula historial en memoria si esta off.
- DeepSeek/LiteLLM: turnos que llegan solo con reasoning_content (sin
  content ni tool_calls) ya no rompen la sesion (400 'Invalid assistant
  message') ni se pintan como 'pensando': se promueven a texto en el
  historial y en el snapshot persistido.
- litellm pinneado a ==1.80.0 (builds reproducibles).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 15:17:52 +00:00

1089 lines
48 KiB
Python

"""Base subagent class with shared execution logic."""
from __future__ import annotations
import hashlib
import json
import logging
import time
import uuid
from typing import Any, AsyncIterator
from ...adapters.base import ModelAdapter, ModelConfig, StreamChunk
from ...config import settings
from ...context.engine import ContextEngine
from ...mcp.manager import MCPManager
from ...memory.store import MemoryStore
from ...models.agent import AgentProfile
from ...models.artifacts import ArtifactSummary
from ...models.session import SessionState
from ...models.tools import ToolExecution, ToolExecutionStatus
from ...streaming.sse import SSEEmitter, EventType
from ..planner import run_planner_subloop
from ..plan_judge import judge_plan_progress
from ..tool_groups import is_plan_internal_tool, strip_namespace
logger = logging.getLogger(__name__)
class BaseAgent:
    """Base class for all subagents.

    Owns the shared step loop (`execute`): build the context, stream one model
    turn, rebuild the turn's Anthropic-style content blocks, run the requested
    tools and feed their results back into the conversation. It also hosts the
    orchestrator-internal planner tools (`acai_plan`, `acai_plan_advance`),
    which are intercepted in `_execute_tool` instead of going through MCP.
    """

    def __init__(
        self,
        profile: AgentProfile,
        model_adapter: ModelAdapter,
        context_engine: ContextEngine,
        mcp_client: MCPManager,
        memory_store: MemoryStore,
        sse_emitter: SSEEmitter,
    ) -> None:
        self.profile = profile
        self.model = model_adapter
        self.context = context_engine
        self.mcp = mcp_client
        self.memory = memory_store
        self.sse = sse_emitter
        # Conversation of the run in progress; rebound at the start of every
        # `execute()` call and read by the internal `acai_plan` tool.
        self._current_conversation: list[dict[str, Any]] = []

    async def execute(
        self,
        session: SessionState,
        max_steps: int = 30,
    ) -> dict[str, Any]:
        """Run the agent's execution loop.

        Keeps a real conversation (assistant turns plus complete tool results),
        like professional agentic tools (Claude Code, Cursor). Compaction
        happens at the step level, not per tool result.

        Each step builds the context, streams one model turn, and rebuilds the
        turn's content blocks (thinking/text/tool_use) in `block_index` order.
        It then executes the requested tools and appends their results as a
        single user message. The loop stops when a turn ends without tool
        calls, or after `max_steps` steps.

        Args:
            session: Session being served. `session.metadata` is read for
                `followup_mode` / `plan_mode`, and the planner tools may
                mutate it.
            max_steps: Upper bound on model turns for this call.

        Returns:
            Dict with keys `content` (concatenated assistant text), `artifacts`,
            `tool_executions`, `conversation` and `usage`
            (`input_tokens` / `output_tokens` summed over all chunks).
        """
        artifacts: list[ArtifactSummary] = await self.memory.list_artifacts(
            session.session_id
        )
        tool_executions: list[ToolExecution] = []
        accumulated_content = ""
        total_input_tokens = 0
        total_output_tokens = 0
        # Real conversation history: assistant messages + tool results.
        conversation: list[dict[str, Any]] = []
        # Exposed so internal tools (acai_plan) can summarise the main agent's
        # accumulated thinking without threading it explicitly through every
        # `_execute_tool` call.
        self._current_conversation = conversation

        for step in range(max_steps):
            # Build the context with the real conversation.
            ctx = await self.context.build_context(
                session=session,
                agent=self.profile,
                artifacts=artifacts,
                conversation=conversation,
            )
            # Tool definitions. plan_mode "off" hides acai_plan from the model
            # (UI toggle disabled); "force" exposes it normally.
            tool_defs = self._get_allowed_tools(
                followup_mode=str(session.metadata.get("followup_mode", "none")),
                plan_mode=str(session.metadata.get("plan_mode", "off") or "off"),
            )
            # NOTE(review): `or` also replaces an explicit 0 / 0.0 with the
            # default (temperature=0.0 becomes 0.3). Confirm this is intended.
            config = ModelConfig(
                model_id=self.profile.model_id or "",
                max_tokens=self.profile.max_tokens or 4096,
                temperature=self.profile.temperature or 0.3,
            )
            # Number of tool executions accumulated BEFORE this step. The plan
            # judge only needs this step's executions, which are exactly
            # `tool_executions[exec_offset:]`.
            exec_offset = len(tool_executions)
            full_text = ""
            tool_calls: list[dict[str, Any]] = []
            active_tools: dict[str, dict[str, Any]] = {}
            # Anthropic-style per-turn accumulators (interleaved thinking, M2):
            # one partial block dict per block_index. When the turn closes they
            # are serialised in index order.
            turn_blocks_by_index: dict[int, dict[str, Any]] = {}
            # Text deltas without a block_index (e.g. the legacy OpenAI adapter)
            # go under this synthetic index so no text is lost.
            synthetic_text_idx = 10_000

            async for chunk in self.model.stream(
                messages=ctx.to_messages(),
                tools=tool_defs if tool_defs else None,
                config=config,
            ):
                if chunk.delta:
                    full_text += chunk.delta
                    # Accumulate per block_index so the blocks can be rebuilt.
                    idx = chunk.block_index
                    if idx < 0:
                        idx = synthetic_text_idx
                    blk = turn_blocks_by_index.get(idx)
                    if blk is None:
                        blk = {"type": "text", "text": ""}
                        turn_blocks_by_index[idx] = blk
                    if blk.get("type") == "text":
                        blk["text"] = blk.get("text", "") + chunk.delta
                    if self.profile.stream_deltas:
                        await self.sse.emit(
                            EventType.AGENT_DELTA,
                            {
                                "agent": self.profile.role,
                                "delta": chunk.delta,
                                "step": step,
                            },
                            session_id=session.session_id,
                        )

                # Thinking deltas (MiniMax M2 interleaved). The adapter already
                # supplies the right block_index; we only accumulate.
                if chunk.thinking_delta and chunk.block_index >= 0:
                    blk = turn_blocks_by_index.get(chunk.block_index)
                    if blk is None:
                        blk = {"type": "thinking", "thinking": "", "signature": ""}
                        turn_blocks_by_index[chunk.block_index] = blk
                    if blk.get("type") == "thinking":
                        blk["thinking"] = blk.get("thinking", "") + chunk.thinking_delta
                    if self.profile.stream_deltas:
                        await self.sse.emit(
                            EventType.AGENT_DELTA,
                            {
                                "agent": self.profile.role,
                                "thinking_delta": chunk.thinking_delta,
                                "block_index": chunk.block_index,
                                "step": step,
                            },
                            session_id=session.session_id,
                        )

                if chunk.thinking_signature and chunk.block_index >= 0:
                    blk = turn_blocks_by_index.get(chunk.block_index)
                    if blk is None:
                        blk = {"type": "thinking", "thinking": "", "signature": ""}
                        turn_blocks_by_index[chunk.block_index] = blk
                    if blk.get("type") == "thinking":
                        blk["signature"] = chunk.thinking_signature

                if chunk.tool_name and chunk.tool_call_id:
                    if chunk.tool_call_id not in active_tools:
                        active_tools[chunk.tool_call_id] = {
                            "id": chunk.tool_call_id,
                            "name": chunk.tool_name,
                            "arguments": "",
                            "block_index": chunk.block_index,
                        }
                        await self.sse.emit(
                            EventType.TOOL_STARTED,
                            {"tool": chunk.tool_name, "tool_call_id": chunk.tool_call_id, "step": step},
                            session_id=session.session_id,
                        )

                if chunk.tool_arguments and chunk.tool_call_id and not chunk.finish_reason:
                    tool = active_tools.get(chunk.tool_call_id)
                    if tool:
                        tool["arguments"] += chunk.tool_arguments
                        await self.sse.emit(
                            EventType.AGENT_DELTA,
                            {
                                "agent": self.profile.role,
                                "delta": "",
                                "tool_arguments": chunk.tool_arguments,
                                "tool_call_id": chunk.tool_call_id,
                                "step": step,
                            },
                            session_id=session.session_id,
                        )

                if chunk.finish_reason == "tool_use" and chunk.tool_call_id:
                    tool = active_tools.pop(chunk.tool_call_id, None)
                    if not tool:
                        tool = {
                            "id": chunk.tool_call_id,
                            "name": chunk.tool_name or "",
                            "arguments": "",
                            "block_index": chunk.block_index,
                        }
                    final_args = tool["arguments"] or chunk.tool_arguments or ""
                    try:
                        args = json.loads(final_args) if final_args else {}
                        tool["parse_error"] = None
                    except json.JSONDecodeError as e:
                        # Truncated or malformed args. Typical cause: the model
                        # hit max_tokens halfway through serialising the
                        # tool_use JSON (e.g. while writing a large file).
                        logger.warning(
                            "Failed to parse tool args for %s (%d chars): %s... | err: %s",
                            tool.get("name", "?"), len(final_args), final_args[:200], str(e)[:100],
                        )
                        args = {}
                        # Keep the raw payload so the failure gets its own
                        # fingerprint and the model gets a useful message.
                        tool["parse_error"] = {
                            "raw": final_args,
                            "raw_hash": hashlib.md5(final_args.encode()).hexdigest()[:8],
                            "message": str(e)[:200],
                        }
                    tool["parsed_arguments"] = args
                    tool_calls.append(tool)
                    # Record the tool_use block at its position in the turn.
                    bidx = tool.get("block_index", -1)
                    if bidx >= 0:
                        turn_blocks_by_index[bidx] = {
                            "type": "tool_use",
                            "id": tool["id"],
                            "name": tool["name"],
                            "input": args,
                        }

                # Accumulate token usage from any chunk that carries it.
                if chunk.usage:
                    total_input_tokens += chunk.usage.get("input_tokens", 0)
                    total_output_tokens += chunk.usage.get("output_tokens", 0)

                if chunk.finish_reason == "end_turn":
                    break

            accumulated_content += full_text

            # Materialise the turn's blocks in block_index order. Thinking
            # blocks without a signature are dropped: MiniMax would reject them
            # when resent, and losing the thinking beats a 400.
            turn_blocks: list[dict[str, Any]] = []
            for idx in sorted(turn_blocks_by_index.keys()):
                b = turn_blocks_by_index[idx]
                if b.get("type") == "thinking":
                    if not b.get("signature"):
                        logger.warning(
                            "Drop thinking block at idx=%d (no signature) — chars=%d",
                            idx, len(b.get("thinking", "")),
                        )
                        continue
                    # Defensive: skip empty thinking.
                    if not b.get("thinking"):
                        continue
                turn_blocks.append(b)

            # Backstop: guarantee that EVERY tool_call has its tool_use block.
            # Missing ones (chunks without block_index, legacy adapters, ...)
            # are appended at the end. Without this MiniMax answers
            # 400 ("tool result's tool id not found") on the next request.
            tool_use_ids_in_blocks = {
                b.get("id") for b in turn_blocks
                if b.get("type") == "tool_use" and b.get("id")
            }
            for tc in tool_calls:
                if tc["id"] not in tool_use_ids_in_blocks:
                    turn_blocks.append({
                        "type": "tool_use",
                        "id": tc["id"],
                        "name": tc["name"],
                        "input": tc.get("parsed_arguments", {}),
                    })
                    tool_use_ids_in_blocks.add(tc["id"])

            # No tool calls: the turn (and the run) is over.
            if not tool_calls:
                # DeepSeek thinking quirk: the model sometimes emits its WHOLE
                # answer as reasoning and closes the turn with no text and no
                # tool_use. If the turn holds ONLY thinking blocks, they are
                # promoted to a text block in the persisted snapshot. The UI
                # then does not render it as "thinking" on reload, and the next
                # turn does not fail with "content or tool_calls must be set".
                if turn_blocks and all(b.get("type") == "thinking" for b in turn_blocks):
                    promoted = "\n".join(
                        b.get("thinking", "") for b in turn_blocks if b.get("thinking")
                    )
                    turn_blocks = [{"type": "text", "text": promoted}]
                    accumulated_content += promoted
                    if promoted and self.profile.stream_deltas:
                        # Live emission via a normal AGENT_DELTA. The
                        # ClaudeFormatEmitter closes the open thinking block
                        # (content_block_stop) and opens a new text block with
                        # its own index (start/delta/stop), so the block
                        # protocol stays intact.
                        await self.sse.emit(
                            EventType.AGENT_DELTA,
                            {
                                "agent": self.profile.role,
                                "delta": promoted,
                                "step": step,
                            },
                            session_id=session.session_id,
                        )
                if turn_blocks:
                    conversation.append({"role": "assistant", "content": turn_blocks})
                elif full_text:
                    # Fallback (should not happen if the adapter emits block_index).
                    conversation.append({"role": "assistant", "content": full_text})
                # The agent finished without more tool calls. The judge is not
                # consulted (no tools to evaluate). With
                # `no_tool_calls_this_step=True` the plan is only closed if all
                # of its steps were already completed.
                try:
                    await self._auto_advance_plan_cursor(
                        session,
                        [],
                        no_tool_calls_this_step=True,
                    )
                except Exception as e:
                    logger.warning("[plan-advance] failed at end_turn: %s", e)
                break

            # Push the assistant turn with ALL its blocks (thinking+text+tool_use)
            # to preserve M2's reasoning chain across turns.
            if turn_blocks:
                conversation.append({"role": "assistant", "content": turn_blocks})
            else:
                # OpenAI-style fallback when there are no blocks (legacy model
                # or adapter that does not propagate block_index).
                assistant_msg: dict[str, Any] = {"role": "assistant"}
                if full_text:
                    assistant_msg["content"] = full_text
                assistant_msg["tool_calls"] = [
                    {
                        "id": tc["id"],
                        "type": "function",
                        "function": {
                            "name": tc["name"],
                            "arguments": json.dumps(tc.get("parsed_arguments", {})),
                        },
                    }
                    for tc in tool_calls
                ]
                conversation.append(assistant_msg)

            # Execute the tool calls. Results are grouped in ONE user message
            # holding an array of tool_result blocks (Anthropic format).
            tool_result_blocks: list[dict[str, Any]] = []
            for tc in tool_calls:
                # Unparseable args (e.g. truncated by max_tokens): do NOT run
                # the tool. Tell the model what went wrong so it can adjust the
                # next attempt (split the content, shorten it, ...).
                if tc.get("parse_error"):
                    pe = tc["parse_error"]
                    tool_result_blocks.append({
                        "type": "tool_result",
                        "tool_use_id": tc["id"],
                        "content": (
                            f"[ERROR] No se pudieron parsear los argumentos del tool "
                            f"'{tc['name']}'. Los argumentos llegaron truncados o mal "
                            f"formados (probablemente excediste el limite de max_tokens "
                            f"al serializar el tool_use). Recibido {len(pe['raw'])} chars. "
                            f"Error: {pe['message']}. "
                            f"Reintenta dividiendo el contenido en varios tool calls mas "
                            f"pequenos o reduciendo el tamano del argumento 'content'."
                        ),
                        "is_error": True,
                    })
                    continue
                tool_exec = await self._execute_tool(
                    session=session,
                    tool_name=tc["name"],
                    arguments=tc.get("parsed_arguments", {}),
                    artifacts=artifacts,
                    tool_call_id=tc["id"],
                )
                tool_executions.append(tool_exec)
                tool_result_blocks.append(self._tool_result_block(tc["id"], tool_exec))
            if tool_result_blocks:
                conversation.append({"role": "user", "content": tool_result_blocks})

            # Advance the plan cursor AFTER EVERY INTERNAL STEP (not only at the
            # end of the turn) so the frontend sees steps tick off live.
            try:
                await self._auto_advance_plan_cursor(
                    session,
                    tool_executions[exec_offset:],
                )
            except Exception as e:
                logger.warning("Auto-advance plan cursor failed: %s", e)

        return {
            "content": accumulated_content,
            "artifacts": artifacts,
            "tool_executions": tool_executions,
            "conversation": conversation,
            "usage": {
                "input_tokens": total_input_tokens,
                "output_tokens": total_output_tokens,
            },
        }

    @staticmethod
    def _tool_result_block(tool_use_id: str, tool_exec: ToolExecution) -> dict[str, Any]:
        """Build the Anthropic-style `tool_result` block for an executed tool.

        Content preference: the raw output (truncated to
        `settings.tool_raw_output_max_chars`), then the summary. Failed
        executions are flagged with `is_error`, matching the parse-error path
        in `execute`. When a failure carries neither raw output nor a summary,
        the error text is sent, so the model learns why the call failed
        instead of receiving an empty result.
        """
        if tool_exec.raw_output:
            content = tool_exec.raw_output[:settings.tool_raw_output_max_chars]
        elif tool_exec.result_summary:
            content = tool_exec.result_summary
        elif tool_exec.error:
            content = f"[ERROR] La tool fallo: {tool_exec.error}"
        else:
            content = ""
        block: dict[str, Any] = {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": content,
        }
        if tool_exec.status == ToolExecutionStatus.FAILED:
            block["is_error"] = True
        return block

    async def _execute_tool(
        self,
        session: SessionState,
        tool_name: str,
        arguments: dict[str, Any],
        artifacts: list[ArtifactSummary],
        tool_call_id: str = "",
    ) -> ToolExecution:
        """Execute a tool and summarise the result.

        Internal planner tools are dispatched to Python handlers. Every other
        tool goes through MCP. The output is summarised into an artifact,
        stored in memory and appended to `artifacts` (mutated in place), and a
        TOOL_COMPLETED event is emitted. Exceptions are caught and reported as
        a FAILED `ToolExecution` rather than propagated.
        """
        exec_id = uuid.uuid4().hex[:12]
        tool_exec = ToolExecution(
            execution_id=exec_id,
            tool_name=tool_name,
            arguments=arguments,
            status=ToolExecutionStatus.RUNNING,
        )
        logger.info("Tool call: %s(%s)", tool_name, json.dumps(arguments)[:200])

        # Interception: orchestrator-internal tools (phase 5: acai_plan). They
        # do not go through MCP; they run directly in Python.
        if is_plan_internal_tool(tool_name):
            raw_name = strip_namespace(tool_name)
            await self.sse.emit(
                EventType.TOOL_STARTED,
                {"tool": raw_name, "tool_call_id": tool_call_id},
                session_id=session.session_id,
            )
            handlers = {
                "acai_plan": self._execute_acai_plan,
                "acai_plan_advance": self._execute_acai_plan_advance,
            }
            handler = handlers.get(raw_name)
            if handler is not None:
                try:
                    return await handler(session, arguments, tool_call_id, tool_exec)
                except Exception as e:
                    # Internal tools run outside the MCP try/except below.
                    # Without this guard, a malformed plan or argument would
                    # abort the whole agent run instead of becoming a failed
                    # tool result.
                    logger.exception("Internal tool %s failed: %s", raw_name, e)
                    tool_exec.status = ToolExecutionStatus.FAILED
                    tool_exec.error = str(e)
                    tool_exec.result_summary = f"{raw_name} FALLO: {str(e)[:200]}"
                    await self.sse.emit(
                        EventType.TOOL_COMPLETED,
                        {"tool": raw_name, "status": "failed", "error": str(e), "tool_call_id": tool_call_id},
                        session_id=session.session_id,
                    )
                    return tool_exec

        start = time.monotonic()
        try:
            if self.mcp.is_running:
                # Call directly. call_tool falls back to the bare name (via
                # _resolve_tool), so names without the `acai_code__` prefix
                # still resolve. That is the common case when the model emits
                # inline XML. The old `tool_name in self.mcp.tools` check was
                # too strict and rejected bare names.
                try:
                    result = await self.mcp.call_tool(tool_name, arguments)
                    raw_output = self._extract_mcp_output(result)
                except Exception as resolve_err:
                    raw_output = (
                        f"Tool '{tool_name}' no disponible o fallo al resolver: "
                        f"{str(resolve_err)[:200]}"
                    )
            else:
                raw_output = f"Tool '{tool_name}' not available via MCP."
            duration = (time.monotonic() - start) * 1000

            # Summarise: the raw output is NEVER put into the context directly.
            task_id = session.current_task.task_id if session.current_task else "none"
            artifact = self.context.summarize_tool_output(
                tool_name=tool_name,
                raw_output=raw_output,
                session_id=session.session_id,
                task_id=task_id,
            )
            # Store the artifact.
            await self.memory.store_artifact(session.session_id, artifact)
            artifacts.append(artifact)

            tool_exec.status = ToolExecutionStatus.COMPLETED
            tool_exec.result_summary = artifact.summary
            tool_exec.raw_output = raw_output[:settings.tool_raw_output_max_chars]
            tool_exec.duration_ms = duration

            await self.sse.emit(
                EventType.TOOL_COMPLETED,
                {
                    "tool": tool_name,
                    "status": "completed",
                    "summary": artifact.summary[:200],
                    "raw_output": raw_output[:min(4000, settings.tool_raw_output_max_chars)],
                    "tool_call_id": tool_call_id,
                },
                session_id=session.session_id,
            )
        except Exception as e:
            tool_exec.status = ToolExecutionStatus.FAILED
            tool_exec.error = str(e)
            tool_exec.duration_ms = (time.monotonic() - start) * 1000
            logger.exception("Tool execution failed: %s: %s", tool_name, e)
            await self.sse.emit(
                EventType.TOOL_COMPLETED,
                {"tool": tool_name, "status": "failed", "error": str(e), "tool_call_id": tool_call_id},
                session_id=session.session_id,
            )
        return tool_exec

    # ---- Orchestrator-internal tools (phase 5) --------------------------------

    @staticmethod
    def _summarize_parent_thinking(conversation: list[dict[str, Any]], max_chars: int = 1200) -> str:
        """Summarise the main agent's accumulated thinking up to this turn.

        Walks the Anthropic-style assistant messages newest-first, collecting
        the text of their `type=thinking` content blocks until `max_chars` is
        reached. The result is returned oldest-to-newest, with `"\\n---\\n"`
        separators, and keeps only the last `max_chars` characters (prefixed
        with `"[...] "`) when longer. Used to give the planner sub-loop
        compressed context without flooding it with the whole thinking.
        """
        chunks: list[str] = []
        total = 0
        for msg in reversed(conversation):
            if msg.get("role") != "assistant":
                continue
            content = msg.get("content")
            if not isinstance(content, list):
                continue
            # Walk blocks newest-first too, so reversing `chunks` below yields
            # a consistent oldest-to-newest order and truncation keeps the
            # most recent thinking.
            for block in reversed(content):
                if isinstance(block, dict) and block.get("type") == "thinking":
                    txt = block.get("thinking", "") or ""
                    if not txt:
                        continue
                    chunks.append(txt)
                    total += len(txt)
                    if total >= max_chars:
                        break
            if total >= max_chars:
                break
        # Join oldest to newest to keep a logical order.
        joined = "\n---\n".join(reversed(chunks))
        if len(joined) > max_chars:
            joined = "[...] " + joined[-max_chars:]
        return joined

    async def _execute_acai_plan(
        self,
        session: SessionState,
        arguments: dict[str, Any],
        tool_call_id: str,
        tool_exec: ToolExecution,
    ) -> ToolExecution:
        """Implementation of the synthetic `acai_plan` tool.

        Delegates to `run_planner_subloop` (per the original notes: a sub-loop
        driven by `system.planner.md` with read-only tools). A valid plan is
        persisted in `session.metadata["current_plan"]`. A previously active
        plan is moved to `plan_history` as `superseded`. At most two
        invocations per turn are allowed, counted via
        `session.metadata["plan_call_count_in_turn"]`.
        """
        # Per-turn invocation limit: at most 2. After that the model must
        # execute directly or abandon.
        count = int(session.metadata.get("plan_call_count_in_turn", 0))
        if count >= 2:
            tool_exec.status = ToolExecutionStatus.COMPLETED
            tool_exec.result_summary = (
                "Ya invocaste acai_plan dos veces este turno. "
                "Ejecuta directo o usa acai_plan_advance({abandon:true}) para resetear."
            )
            tool_exec.raw_output = json.dumps({"error": "max_plan_calls_per_turn"})
            await self.sse.emit(
                EventType.TOOL_COMPLETED,
                {"tool": "acai_plan", "status": "completed", "summary": tool_exec.result_summary, "tool_call_id": tool_call_id},
                session_id=session.session_id,
            )
            return tool_exec
        session.metadata["plan_call_count_in_turn"] = count + 1

        objective = str(arguments.get("objective") or "").strip()
        scope = str(arguments.get("scope") or "").strip()
        if not objective:
            tool_exec.status = ToolExecutionStatus.FAILED
            tool_exec.error = "Falta el campo 'objective'"
            tool_exec.result_summary = "acai_plan FALLO: falta objective."
            tool_exec.raw_output = json.dumps({"error": "missing_objective"})
            await self.sse.emit(
                EventType.TOOL_COMPLETED,
                {"tool": "acai_plan", "status": "failed", "error": tool_exec.error, "tool_call_id": tool_call_id},
                session_id=session.session_id,
            )
            return tool_exec

        # Summary of the thinking accumulated so far (if any).
        # `self._current_conversation` is set at the start of `execute()`.
        parent_summary = self._summarize_parent_thinking(
            getattr(self, "_current_conversation", []) or [],
        )

        start = time.monotonic()
        try:
            result = await run_planner_subloop(
                objective=objective,
                scope=scope,
                agent_profile=self.profile,
                model_adapter=self.model,
                mcp=self.mcp,
                parent_thinking_summary=parent_summary,
            )
        except Exception as e:
            logger.error("Planner sub-loop crashed: %s", e)
            tool_exec.status = ToolExecutionStatus.FAILED
            tool_exec.error = str(e)
            tool_exec.duration_ms = (time.monotonic() - start) * 1000
            tool_exec.result_summary = f"acai_plan FALLO: {str(e)[:200]}"
            tool_exec.raw_output = json.dumps({"error": str(e)[:500]})
            await self.sse.emit(
                EventType.TOOL_COMPLETED,
                {"tool": "acai_plan", "status": "failed", "error": str(e), "tool_call_id": tool_call_id},
                session_id=session.session_id,
            )
            return tool_exec
        tool_exec.duration_ms = (time.monotonic() - start) * 1000

        if not result.plan:
            err = result.error or "Plan vacio"
            logger.warning(
                "[acai_plan] Plan FAILED: %s (raw_preview=%r)",
                err, (result.raw_text or "")[:200],
            )
            tool_exec.status = ToolExecutionStatus.FAILED
            tool_exec.error = err
            tool_exec.result_summary = (
                f"acai_plan FALLO: {err}. Procede en modo directo o reintenta con scope distinto."
            )
            tool_exec.raw_output = json.dumps({
                "error": err,
                "raw_text_preview": (result.raw_text or "")[:500],
            })
            await self.sse.emit(
                EventType.TOOL_COMPLETED,
                {"tool": "acai_plan", "status": "failed", "error": err, "tool_call_id": tool_call_id},
                session_id=session.session_id,
            )
            return tool_exec

        # Valid plan: persist it in metadata. A previously active plan is
        # moved to history as `superseded`.
        old_plan = session.metadata.get("current_plan")
        if old_plan and old_plan.get("status") == "active":
            old_plan["status"] = "superseded"
            session.metadata.setdefault("plan_history", []).append(old_plan)

        plan = dict(result.plan)
        plan["cursor"] = 0
        plan["completed_step_ids"] = []
        plan["status"] = "active"
        plan["created_at"] = int(time.time())
        session.metadata["current_plan"] = plan

        steps = plan.get("steps") or []
        # Tolerate steps with a missing/None description: the plan is already
        # persisted at this point, so crashing here would leave it orphaned.
        next_desc = str(steps[0].get("description") or "") if steps else "(plan vacio)"
        n_steps = len(steps)
        n_risks = len(plan.get("risks") or [])
        tool_exec.status = ToolExecutionStatus.COMPLETED
        tool_exec.result_summary = (
            f"Plan generado: {n_steps} step(s), {n_risks} risk(s). "
            f"Proximo: step 1 — {next_desc[:200]}"
        )
        logger.info(
            "[acai_plan] Plan persisted: %d steps, %d risks, objective=%r",
            n_steps, n_risks, objective[:120],
        )
        # raw_output for the model: the full plan JSON, truncated to 4000 chars.
        plan_json = json.dumps(plan, ensure_ascii=False)
        if len(plan_json) > 4000:
            tool_exec.raw_output = plan_json[:4000] + "\n[...truncated]"
        else:
            tool_exec.raw_output = plan_json
        await self.sse.emit(
            EventType.TOOL_COMPLETED,
            {
                "tool": "acai_plan",
                "status": "completed",
                "summary": tool_exec.result_summary[:200],
                "raw_output": tool_exec.raw_output[:4000],
                "tool_call_id": tool_call_id,
            },
            session_id=session.session_id,
        )
        # PlanStepper UI: tell the frontend there is a new active plan.
        await self.sse.emit(
            EventType.PLAN_CREATED,
            {
                "objective": plan.get("objective", ""),
                "steps": [
                    {
                        "id": s.get("id"),
                        "description": str(s.get("description") or "")[:300],
                        "agent_action": str(s.get("agent_action") or "")[:200],
                        "files_touched": (s.get("files_touched") or [])[:10],
                        "tables_touched": (s.get("tables_touched") or [])[:10],
                    }
                    for s in steps
                ],
                "risks": (plan.get("risks") or [])[:10],
                "cursor": plan.get("cursor", 0),
                "completed_step_ids": plan.get("completed_step_ids", []),
                "status": plan.get("status", "active"),
            },
            session_id=session.session_id,
        )
        return tool_exec

    async def _execute_acai_plan_advance(
        self,
        session: SessionState,
        arguments: dict[str, Any],
        tool_call_id: str,
        tool_exec: ToolExecution,
    ) -> ToolExecution:
        """Advance or abandon the active plan.

        Arguments (all optional):
            abandon: move the plan to `plan_history` as `abandoned`.
            completed_ids: integer step ids to mark as completed.
            next_cursor: explicit cursor, clamped to `[0, len(steps)]`. A value
                that cannot be converted to int falls back to auto-advance.

        Without `next_cursor`, the cursor moves to the first step that is not
        completed. The plan becomes `done` when every step is completed, or
        when the cursor reaches the end of the plan.
        """
        plan = session.metadata.get("current_plan")
        if not plan or plan.get("status") != "active":
            tool_exec.status = ToolExecutionStatus.COMPLETED
            tool_exec.result_summary = "No hay plan activo."
            tool_exec.raw_output = json.dumps({"status": "no_active_plan"})
            await self.sse.emit(
                EventType.TOOL_COMPLETED,
                {"tool": "acai_plan_advance", "status": "completed", "summary": tool_exec.result_summary, "tool_call_id": tool_call_id},
                session_id=session.session_id,
            )
            return tool_exec

        if arguments.get("abandon"):
            plan["status"] = "abandoned"
            session.metadata.setdefault("plan_history", []).append(plan)
            session.metadata["current_plan"] = None
            tool_exec.status = ToolExecutionStatus.COMPLETED
            tool_exec.result_summary = "Plan abandonado."
            tool_exec.raw_output = json.dumps({"status": "abandoned"})
            await self.sse.emit(
                EventType.TOOL_COMPLETED,
                {"tool": "acai_plan_advance", "status": "completed", "summary": tool_exec.result_summary, "tool_call_id": tool_call_id},
                session_id=session.session_id,
            )
            await self.sse.emit(
                EventType.PLAN_ENDED,
                {"status": "abandoned", "objective": plan.get("objective", "")},
                session_id=session.session_id,
            )
            return tool_exec

        # Apply completed_ids (integer ids only; ignore a malformed payload).
        completed_in = arguments.get("completed_ids") or []
        if not isinstance(completed_in, (list, tuple)):
            completed_in = []
        completed_set = set(plan.get("completed_step_ids", []))
        for cid in completed_in:
            if isinstance(cid, int) and cid not in completed_set:
                plan.setdefault("completed_step_ids", []).append(cid)
                completed_set.add(cid)

        # Apply the cursor.
        steps = plan.get("steps") or []
        requested_cursor = None
        if "next_cursor" in arguments:
            try:
                requested_cursor = int(arguments["next_cursor"])
            except (TypeError, ValueError, OverflowError):
                logger.warning(
                    "[acai_plan_advance] invalid next_cursor=%r, auto-advancing",
                    arguments["next_cursor"],
                )
        if requested_cursor is not None:
            plan["cursor"] = max(0, min(requested_cursor, len(steps)))
            if plan["cursor"] >= len(steps):
                # Jumping past the last step finishes the plan. Keep the status
                # consistent with the "Plan completado" summary below.
                plan["status"] = "done"
        else:
            # Auto-advance to the first step that is not completed.
            for i, st in enumerate(steps):
                if st.get("id") not in completed_set:
                    plan["cursor"] = i
                    break
            else:
                plan["status"] = "done"

        cursor = plan.get("cursor", 0)
        if plan.get("status") == "done" or cursor >= len(steps):
            tool_exec.result_summary = f"Plan completado ({len(completed_set)}/{len(steps)} steps)."
        else:
            next_desc = steps[cursor].get("description", "(?)") if cursor < len(steps) else "(?)"
            tool_exec.result_summary = (
                f"Plan avanzado a step {cursor + 1}/{len(steps)}: {next_desc[:200]}"
            )
        tool_exec.status = ToolExecutionStatus.COMPLETED
        tool_exec.raw_output = json.dumps({
            "cursor": plan.get("cursor", 0),
            "completed_step_ids": plan.get("completed_step_ids", []),
            "status": plan.get("status", "active"),
        })
        await self.sse.emit(
            EventType.TOOL_COMPLETED,
            {"tool": "acai_plan_advance", "status": "completed", "summary": tool_exec.result_summary, "tool_call_id": tool_call_id},
            session_id=session.session_id,
        )
        # Emit PLAN_ENDED or PLAN_ADVANCED depending on the outcome.
        if plan.get("status") == "done":
            await self.sse.emit(
                EventType.PLAN_ENDED,
                {"status": "done", "objective": plan.get("objective", "")},
                session_id=session.session_id,
            )
        else:
            await self.sse.emit(
                EventType.PLAN_ADVANCED,
                {
                    "cursor": plan.get("cursor", 0),
                    "completed_step_ids": plan.get("completed_step_ids", []),
                    "status": plan.get("status", "active"),
                },
                session_id=session.session_id,
            )
        return tool_exec

    @staticmethod
    def _match_step_to_executions(
        step: dict[str, Any],
        tool_executions: list[ToolExecution],
    ) -> bool:
        """Heuristic: match a step's `agent_action` against real tool calls.

        Returns True when any COMPLETED execution matches the step by:
        1) tool name appearing in `agent_action` (dash/underscore
           insensitive: `acai-write` matches `acai_write`);
        2) a `files_touched` entry appearing in the JSON-serialised tool
           arguments;
        3) a `tables_touched` entry appearing in the JSON-serialised tool
           arguments.
        All comparisons are lower-cased. A step with no action, files or
        tables never matches.
        """
        action = (step.get("agent_action") or "").lower()
        files_touched = [str(f).lower() for f in (step.get("files_touched") or [])]
        tables_touched = [str(t).lower() for t in (step.get("tables_touched") or [])]
        if not action and not files_touched and not tables_touched:
            return False
        for te in tool_executions:
            if te.status != ToolExecutionStatus.COMPLETED:
                continue
            raw_name = strip_namespace(te.tool_name).lower()
            # Normalise dashes/underscores for tool-name <-> action matching.
            tool_variants = {raw_name, raw_name.replace("-", "_"), raw_name.replace("_", "-")}
            # Match 1: tool name appears in the action.
            if any(v and v in action for v in tool_variants):
                return True
            # Match 2/3: path or table name in the tool arguments.
            try:
                args_str = json.dumps(te.arguments or {}, ensure_ascii=False).lower()
            except Exception:
                args_str = str(te.arguments or "").lower()
            for f in files_touched:
                if f and f in args_str:
                    return True
            for t in tables_touched:
                if t and t in args_str:
                    return True
        return False

    async def _auto_advance_plan_cursor(
        self,
        session: SessionState,
        tool_executions_this_step: list[ToolExecution],
        no_tool_calls_this_step: bool = False,
    ) -> None:
        """Advance the plan cursor after one internal agent step.

        Uses LLM-as-judge (`plan_judge.judge_plan_progress`) to decide which
        plan steps the current step's tool executions completed. If the judge
        fails, nothing advances.

        With `no_tool_calls_this_step=True` (the agent ended its turn), no new
        steps are marked as completed. The plan is closed as `done` only if
        every step was already completed; otherwise it stays `active`.
        PLAN_ADVANCED / PLAN_ENDED is emitted only when the cursor, the
        completed set or the status actually changed.
        """
        plan = session.metadata.get("current_plan")
        if not plan or plan.get("status") != "active":
            return

        steps = plan.get("steps") or []
        prev_cursor = int(plan.get("cursor", 0))
        prev_completed = list(plan.get("completed_step_ids", []))
        completed_set = set(prev_completed)
        rationale = ""

        # When the agent ends the turn without tools, pending steps are NOT
        # marked as completed. That would be a false positive (real case: the
        # agent gets stuck and replies with a chat message without doing the
        # work). The plan is only closed if the previous completed set already
        # covers every step.
        if no_tool_calls_this_step:
            if steps and len(completed_set) >= len(steps):
                rationale = "agente termino el turn; todos los steps ya completados"
            else:
                rationale = "agente termino el turn con steps pendientes (no cerrado)"
            # completed_set untouched: keep what the judge said in earlier steps.
        elif tool_executions_this_step:
            # Ask the judge which steps were just completed.
            try:
                completed_ids, judge_rationale = await judge_plan_progress(
                    plan=plan,
                    tool_executions_this_step=tool_executions_this_step,
                    model_adapter=self.model,
                    model_id=self.profile.model_id,
                )
                for cid in completed_ids:
                    completed_set.add(cid)
                rationale = judge_rationale or ""
            except Exception as e:
                logger.warning("[plan-judge] failed, no advance this step: %s", e)
                # Without the judge, do not advance: better to leave the plan
                # as is than to risk heuristic false positives.
                return

        # Cursor: first step that is not completed; all completed -> done.
        cursor = len(steps)
        for i, step in enumerate(steps):
            if step.get("id") not in completed_set:
                cursor = i
                break
        plan["cursor"] = cursor
        plan["completed_step_ids"] = sorted(completed_set)

        ended = False
        if cursor >= len(steps) and steps:
            plan["status"] = "done"
            ended = True

        # Only emit on a real change. An active -> done transition counts even
        # if the cursor and the completed set were already final, otherwise
        # the UI would never receive PLAN_ENDED.
        changed = (
            ended
            or cursor != prev_cursor
            or set(plan["completed_step_ids"]) != set(prev_completed)
        )
        logger.info(
            "[plan-advance] tools_in_step=%d prev_cursor=%d new_cursor=%d completed=%s changed=%s rationale=%r",
            len(tool_executions_this_step), prev_cursor, cursor,
            plan["completed_step_ids"], changed, rationale[:160],
        )
        if not changed:
            return
        try:
            if ended:
                await self.sse.emit(
                    EventType.PLAN_ENDED,
                    {"status": "done", "objective": plan.get("objective", "")},
                    session_id=session.session_id,
                )
            else:
                await self.sse.emit(
                    EventType.PLAN_ADVANCED,
                    {
                        "cursor": plan["cursor"],
                        "completed_step_ids": plan["completed_step_ids"],
                        "status": plan.get("status", "active"),
                    },
                    session_id=session.session_id,
                )
        except Exception as e:
            logger.warning("PLAN_ADVANCED/ENDED emit failed: %s", e)

    # ---- Allowed tools --------------------------------------------------------

    def _get_allowed_tools(
        self,
        followup_mode: str = "none",
        plan_mode: str = "force",
    ) -> list[dict[str, Any]]:
        """Return the tool definitions filtered by this agent's `allowed_tools`.

        Returns no tools in `followup_mode == "transform"` or when MCP is not
        running. If the agent has `has_planner_tool=True` AND
        `plan_mode == "force"`, synthetic definitions for `acai_plan` and
        `acai_plan_advance` are appended. These tools never go through MCP;
        `_execute_tool` intercepts them. With any other `plan_mode` (UI toggle
        disabled), the planner tools are not exposed and the agent executes
        directly.
        """
        if followup_mode == "transform":
            return []
        if not self.mcp.is_running:
            return []
        all_tools = self.mcp.get_tool_definitions()
        allowed = self.profile.allowed_tools
        if allowed:
            # Build the membership set once instead of scanning per tool.
            allowed_names = set(allowed)
            tool_defs = [t for t in all_tools if t["name"] in allowed_names]
        else:
            tool_defs = list(all_tools)

        if self.profile.has_planner_tool and plan_mode == "force":
            tool_defs.append({
                "name": "acai_plan",
                "description": (
                    "Genera un plan estructurado de ejecucion. Usa esta tool al recibir "
                    "una peticion compuesta (landing entera, tienda, refactor amplio, modulo "
                    "con tabla+hook+frontend). NO la uses para tareas triviales (cambiar un titulo, "
                    "ajustar un color, leer datos). Devuelve JSON con steps, risks, files_touched, "
                    "tables_touched."
                ),
                "input_schema": {
                    "type": "object",
                    "required": ["objective"],
                    "properties": {
                        "objective": {
                            "type": "string",
                            "description": "Descripcion en español de lo que hay que conseguir.",
                        },
                        "scope": {
                            "type": "string",
                            "description": "Restricciones opcionales (ej. 'no toques el header').",
                        },
                    },
                },
            })
            tool_defs.append({
                "name": "acai_plan_advance",
                "description": (
                    "Avanza/abandona el plan activo. Llama con `abandon: true` si el "
                    "usuario corrige y el plan ya no es valido, o con `next_cursor` para "
                    "saltar al siguiente step pendiente."
                ),
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "abandon": {"type": "boolean"},
                        "completed_ids": {"type": "array", "items": {"type": "integer"}},
                        "next_cursor": {"type": "integer"},
                    },
                },
            })
        return tool_defs

    @staticmethod
    def _extract_mcp_output(result: dict[str, Any]) -> str:
        """Extract the text content from an MCP tool result.

        Text items are joined with newlines. The model (MiniMax M2.7) is
        text-only, so `type=image` items cannot be forwarded. Instead of
        silently dropping them, which left the agent with an empty tool_result
        and made it repeat the call, an explicit placeholder tells it to use
        `browser_snapshot`. A list with no usable items falls back to the JSON
        of the whole result; a non-list `content` is stringified.
        """
        content = result.get("content", [])
        if isinstance(content, list):
            parts: list[str] = []
            image_count = 0
            for item in content:
                if not isinstance(item, dict):
                    continue
                itype = item.get("type")
                if itype == "text":
                    # `or ""` guards against an explicit `"text": None`,
                    # which would break the join below.
                    parts.append(item.get("text") or "")
                elif itype == "image":
                    image_count += 1
            if image_count and not parts:
                return (
                    f"[{image_count} imagen(es) no procesada(s) — el modelo es "
                    f"text-only. Para inspeccionar la pagina usa "
                    f"`browser_snapshot` (devuelve accessibility tree en texto). "
                    f"`browser_take_screenshot` solo sirve para que el usuario "
                    f"vea la captura, no para tu analisis.]"
                )
            if image_count and parts:
                parts.append(
                    f"\n[Adicionalmente {image_count} imagen(es) no incluida(s): "
                    f"el modelo no las procesa.]"
                )
            return "\n".join(parts) if parts else json.dumps(result)
        return str(content)