This commit is contained in:
Jordan Diaz
2026-05-08 21:31:28 +00:00
parent 0dabba5442
commit 44cb956f95
37 changed files with 2120 additions and 251 deletions

View File

@@ -19,6 +19,9 @@ from ...models.artifacts import ArtifactSummary
from ...models.session import SessionState
from ...models.tools import ToolExecution, ToolExecutionStatus
from ...streaming.sse import SSEEmitter, EventType
from ..planner import run_planner_subloop
from ..plan_judge import judge_plan_progress
from ..tool_groups import is_plan_internal_tool, strip_namespace
logger = logging.getLogger(__name__)
@@ -64,6 +67,10 @@ class BaseAgent:
total_output_tokens = 0
# Real conversation history: assistant messages + tool results
conversation: list[dict[str, Any]] = []
# Expuesta para que las tools internas (acai_plan) puedan resumir
# el thinking acumulado del agente principal sin que tengamos que
# pasarlo explicitamente por cada llamada a `_execute_tool`.
self._current_conversation = conversation
for step in range(max_steps):
# Build context with real conversation
@@ -86,6 +93,11 @@ class BaseAgent:
temperature=self.profile.temperature or 0.3,
)
# Snapshot del numero de tool_executions ya acumulados ANTES del
# step. El judge solo necesita las del step actual; el slice
# `tool_executions[exec_offset:]` da exactamente ese delta.
exec_offset = len(tool_executions)
full_text = ""
tool_calls: list[dict[str, Any]] = []
active_tools: dict[str, dict[str, Any]] = {}
@@ -269,6 +281,18 @@ class BaseAgent:
elif full_text:
# Fallback (no debiera ocurrir si el adapter emite block_index).
conversation.append({"role": "assistant", "content": full_text})
# El agente termino sin mas tool calls. El judge no se llama (no hay
# tools que evaluar); con `no_tool_calls_this_step=True` el plan solo se
# cierra como `done` si todos sus steps ya estaban completados. Los steps
# pendientes NO se marcan como completados.
try:
await self._auto_advance_plan_cursor(
session,
[],
no_tool_calls_this_step=True,
)
except Exception as e:
logger.warning("[plan-advance] failed at end_turn: %s", e)
break
# Push del assistant turn con TODOS los blocks (thinking+text+tool_use).
@@ -344,6 +368,17 @@ class BaseAgent:
if tool_result_blocks:
conversation.append({"role": "user", "content": tool_result_blocks})
# Auto-avance del cursor del plan TRAS CADA STEP INTERNO (no solo
# al final del turno). Asi el frontend ve los `✓` aparecer en vivo
# conforme el agente ejecuta tools, no de golpe al final.
try:
await self._auto_advance_plan_cursor(
session,
tool_executions[exec_offset:],
)
except Exception as e:
logger.warning("Auto-advance plan cursor failed: %s", e)
return {
"content": accumulated_content,
"artifacts": artifacts,
@@ -374,6 +409,20 @@ class BaseAgent:
logger.info("Tool call: %s(%s)", tool_name, json.dumps(arguments)[:200])
# Intercepcion: tools internas del orquestador (Fase 5: acai_plan).
# No atraviesan MCP — se ejecutan en Python directamente.
if is_plan_internal_tool(tool_name):
raw_name = strip_namespace(tool_name)
await self.sse.emit(
EventType.TOOL_STARTED,
{"tool": raw_name, "tool_call_id": tool_call_id},
session_id=session.session_id,
)
if raw_name == "acai_plan":
return await self._execute_acai_plan(session, arguments, tool_call_id, tool_exec)
if raw_name == "acai_plan_advance":
return await self._execute_acai_plan_advance(session, arguments, tool_call_id, tool_exec)
start = time.monotonic()
try:
if self.mcp.is_running:
@@ -439,25 +488,554 @@ class BaseAgent:
return tool_exec
# ---- Tools internas del orquestador (Fase 5) -----------------------------
@staticmethod
def _summarize_parent_thinking(conversation: list[dict[str, Any]], max_chars: int = 1200) -> str:
"""Resumen del thinking acumulado del agente principal hasta este turno.
Recorre los assistants Anthropic-style con content blocks `type=thinking`,
junta los textos y trunca a `max_chars`. Se usa para pasar contexto
comprimido al planner sub-loop sin contaminarlo con el thinking entero.
"""
chunks: list[str] = []
total = 0
for msg in reversed(conversation):
if msg.get("role") != "assistant":
continue
content = msg.get("content")
if not isinstance(content, list):
continue
for block in content:
if isinstance(block, dict) and block.get("type") == "thinking":
txt = block.get("thinking", "") or ""
if not txt:
continue
chunks.append(txt)
total += len(txt)
if total >= max_chars:
break
if total >= max_chars:
break
# Concatenamos del mas viejo al mas reciente para mantener orden logico.
joined = "\n---\n".join(reversed(chunks))
if len(joined) > max_chars:
joined = "[...] " + joined[-max_chars:]
return joined
async def _execute_acai_plan(
    self,
    session: SessionState,
    arguments: dict[str, Any],
    tool_call_id: str,
    tool_exec: ToolExecution,
) -> ToolExecution:
    """Run the synthetic ``acai_plan`` tool and store the resulting plan.

    Delegates to ``run_planner_subloop``. When a plan comes back it is saved
    in ``session.metadata["current_plan"]``; a previously active plan is
    marked ``superseded`` and appended to ``session.metadata["plan_history"]``.
    Every outcome is reported through a ``TOOL_COMPLETED`` SSE event, and a
    successful run additionally emits ``PLAN_CREATED``.

    Args:
        session: Session whose ``metadata`` holds the plan state and the
            per-turn invocation counter.
        arguments: Tool arguments; ``objective`` is required, ``scope`` is
            optional.
        tool_call_id: Identifier echoed in the emitted SSE events.
        tool_exec: Execution record that is filled in and returned.

    Returns:
        The same ``tool_exec`` object, updated with status, summary and raw
        output.
    """
    # Per-turn cap: at most two invocations. After that the model is told to
    # execute directly or abandon the plan.
    calls_so_far = int(session.metadata.get("plan_call_count_in_turn") or 0)
    if calls_so_far >= 2:
        tool_exec.status = ToolExecutionStatus.COMPLETED
        tool_exec.result_summary = (
            "Ya invocaste acai_plan dos veces este turno. "
            "Ejecuta directo o usa acai_plan_advance({abandon:true}) para resetear."
        )
        tool_exec.raw_output = json.dumps({"error": "max_plan_calls_per_turn"})
        await self.sse.emit(
            EventType.TOOL_COMPLETED,
            {"tool": "acai_plan", "status": "completed", "summary": tool_exec.result_summary, "tool_call_id": tool_call_id},
            session_id=session.session_id,
        )
        return tool_exec
    session.metadata["plan_call_count_in_turn"] = calls_so_far + 1

    objective = str(arguments.get("objective") or "").strip()
    scope = str(arguments.get("scope") or "").strip()
    if not objective:
        tool_exec.result_summary = "acai_plan FALLO: falta objective."
        tool_exec.raw_output = json.dumps({"error": "missing_objective"})
        return await self._fail_acai_plan(session, tool_exec, tool_call_id, "Falta el campo 'objective'")

    # Summary of the thinking accumulated so far in this turn (if any).
    # `self._current_conversation` is assigned at the start of the agent's
    # main loop; `getattr` covers the case where it has not been set yet.
    parent_summary = self._summarize_parent_thinking(
        getattr(self, "_current_conversation", []) or [],
    )

    started = time.monotonic()
    try:
        result = await run_planner_subloop(
            objective=objective,
            scope=scope,
            agent_profile=self.profile,
            model_adapter=self.model,
            mcp=self.mcp,
            parent_thinking_summary=parent_summary,
        )
    except Exception as exc:
        # Tool boundary: a planner crash becomes a failed tool result for the
        # model instead of propagating. The traceback is kept in the log.
        logger.exception("Planner sub-loop crashed: %s", exc)
        message = str(exc)
        tool_exec.duration_ms = (time.monotonic() - started) * 1000
        tool_exec.result_summary = f"acai_plan FALLO: {message[:200]}"
        tool_exec.raw_output = json.dumps({"error": message[:500]})
        return await self._fail_acai_plan(session, tool_exec, tool_call_id, message)
    tool_exec.duration_ms = (time.monotonic() - started) * 1000

    if not result.plan:
        err = result.error or "Plan vacio"
        logger.warning(
            "[acai_plan] Plan FAILED: %s (raw_preview=%r)",
            err, (result.raw_text or "")[:200],
        )
        tool_exec.result_summary = (
            f"acai_plan FALLO: {err}. Procede en modo directo o reintenta con scope distinto."
        )
        tool_exec.raw_output = json.dumps({
            "error": err,
            "raw_text_preview": (result.raw_text or "")[:500],
        })
        return await self._fail_acai_plan(session, tool_exec, tool_call_id, err)

    # Valid plan: persist it. A previously active plan moves to the history
    # as `superseded`.
    old_plan = session.metadata.get("current_plan")
    if old_plan and old_plan.get("status") == "active":
        old_plan["status"] = "superseded"
        session.metadata.setdefault("plan_history", []).append(old_plan)

    plan = dict(result.plan)
    plan["cursor"] = 0
    plan["completed_step_ids"] = []
    plan["status"] = "active"
    plan["created_at"] = int(time.time())
    session.metadata["current_plan"] = plan

    steps = plan.get("steps") or []
    # `.get` + `or`: a step without a description must not crash the tool.
    next_desc = str(steps[0].get("description") or "") if steps else "(plan vacio)"
    n_steps = len(steps)
    n_risks = len(plan.get("risks") or [])
    tool_exec.status = ToolExecutionStatus.COMPLETED
    tool_exec.result_summary = (
        f"Plan generado: {n_steps} step(s), {n_risks} risk(s). "
        f"Proximo: step 1 — {next_desc[:200]}"
    )
    logger.info(
        "[acai_plan] Plan persisted: %d steps, %d risks, objective=%r",
        n_steps, n_risks, objective[:120],
    )

    # raw_output for the model: the full plan JSON, truncated to 4000 chars.
    plan_json = json.dumps(plan, ensure_ascii=False)
    if len(plan_json) > 4000:
        tool_exec.raw_output = plan_json[:4000] + "\n[...truncated]"
    else:
        tool_exec.raw_output = plan_json

    await self.sse.emit(
        EventType.TOOL_COMPLETED,
        {
            "tool": "acai_plan",
            "status": "completed",
            "summary": tool_exec.result_summary[:200],
            "raw_output": tool_exec.raw_output[:4000],
            "tool_call_id": tool_call_id,
        },
        session_id=session.session_id,
    )
    # PlanStepper UI: tell the frontend that a new plan is active.
    await self.sse.emit(
        EventType.PLAN_CREATED,
        self._plan_created_payload(plan),
        session_id=session.session_id,
    )
    return tool_exec

async def _fail_acai_plan(
    self,
    session: SessionState,
    tool_exec: ToolExecution,
    tool_call_id: str,
    error: str,
) -> ToolExecution:
    """Mark ``tool_exec`` as failed with ``error`` and emit the matching ``TOOL_COMPLETED`` event."""
    tool_exec.status = ToolExecutionStatus.FAILED
    tool_exec.error = error
    await self.sse.emit(
        EventType.TOOL_COMPLETED,
        {"tool": "acai_plan", "status": "failed", "error": error, "tool_call_id": tool_call_id},
        session_id=session.session_id,
    )
    return tool_exec

@staticmethod
def _plan_created_payload(plan: dict[str, Any]) -> dict[str, Any]:
    """Build the size-capped ``PLAN_CREATED`` event payload for ``plan``."""
    return {
        "objective": plan.get("objective", ""),
        "steps": [
            {
                "id": s.get("id"),
                "description": (s.get("description") or "")[:300],
                "agent_action": (s.get("agent_action") or "")[:200],
                "files_touched": (s.get("files_touched") or [])[:10],
                "tables_touched": (s.get("tables_touched") or [])[:10],
            }
            for s in (plan.get("steps") or [])
        ],
        "risks": (plan.get("risks") or [])[:10],
        "cursor": plan.get("cursor", 0),
        "completed_step_ids": plan.get("completed_step_ids", []),
        "status": plan.get("status", "active"),
    }
async def _execute_acai_plan_advance(
    self,
    session: SessionState,
    arguments: dict[str, Any],
    tool_call_id: str,
    tool_exec: ToolExecution,
) -> ToolExecution:
    """Advance or abandon the active plan (synthetic ``acai_plan_advance`` tool).

    Behaviour, in order:

    * No active plan in ``session.metadata["current_plan"]``: report
      ``no_active_plan``.
    * ``arguments["abandon"]`` truthy: mark the plan ``abandoned``, move it
      to ``plan_history``, clear ``current_plan`` and emit ``PLAN_ENDED``.
    * Otherwise: record ``completed_ids``, then move the cursor either to
      ``next_cursor`` (clamped to ``0..len(steps)``) or to the first step
      that is not completed. The plan becomes ``done`` when no step is
      pending or the cursor lands past the last step.

    Args:
        session: Session whose ``metadata`` holds the plan state.
        arguments: Tool arguments (``abandon``, ``completed_ids``,
            ``next_cursor``); malformed values are ignored, not raised.
        tool_call_id: Identifier echoed in the emitted SSE events.
        tool_exec: Execution record that is filled in and returned.

    Returns:
        The same ``tool_exec`` object with status ``COMPLETED``.
    """
    plan = session.metadata.get("current_plan")
    if not plan or plan.get("status") != "active":
        tool_exec.result_summary = "No hay plan activo."
        tool_exec.raw_output = json.dumps({"status": "no_active_plan"})
        return await self._complete_plan_advance(session, tool_exec, tool_call_id)

    if arguments.get("abandon"):
        plan["status"] = "abandoned"
        session.metadata.setdefault("plan_history", []).append(plan)
        session.metadata["current_plan"] = None
        tool_exec.result_summary = "Plan abandonado."
        tool_exec.raw_output = json.dumps({"status": "abandoned"})
        await self._complete_plan_advance(session, tool_exec, tool_call_id)
        await self.sse.emit(
            EventType.PLAN_ENDED,
            {"status": "abandoned", "objective": plan.get("objective", "")},
            session_id=session.session_id,
        )
        return tool_exec

    # Record the ids reported as completed. Anything that is not a list of
    # real integers comes from a malformed tool call and is ignored.
    completed_ids = plan.setdefault("completed_step_ids", [])
    seen = set(completed_ids)
    reported = arguments.get("completed_ids")
    if isinstance(reported, (list, tuple)):
        for step_id in reported:
            # bool is a subclass of int; `true` must not count as step 1.
            if isinstance(step_id, int) and not isinstance(step_id, bool) and step_id not in seen:
                completed_ids.append(step_id)
                seen.add(step_id)

    steps = plan.get("steps") or []
    requested_cursor = None
    if "next_cursor" in arguments:
        try:
            requested_cursor = int(arguments["next_cursor"])
        except (TypeError, ValueError, OverflowError):
            # Unusable value: fall back to the automatic advance below.
            requested_cursor = None

    if requested_cursor is not None:
        plan["cursor"] = max(0, min(requested_cursor, len(steps)))
    else:
        # Auto-advance to the first step that is not completed.
        first_pending = next(
            (i for i, st in enumerate(steps) if st.get("id") not in seen),
            None,
        )
        if first_pending is None:
            plan["status"] = "done"
        else:
            plan["cursor"] = first_pending

    cursor = plan.get("cursor", 0)
    if cursor >= len(steps):
        # A cursor past the last step means there is nothing left to run.
        # Keep the stored status in line with the "Plan completado" summary,
        # as `_auto_advance_plan_cursor` does.
        plan["status"] = "done"

    if plan.get("status") == "done":
        tool_exec.result_summary = f"Plan completado ({len(seen)}/{len(steps)} steps)."
    else:
        next_desc = str(steps[cursor].get("description") or "(?)")
        tool_exec.result_summary = (
            f"Plan avanzado a step {cursor + 1}/{len(steps)}: {next_desc[:200]}"
        )
    tool_exec.raw_output = json.dumps({
        "cursor": plan.get("cursor", 0),
        "completed_step_ids": plan.get("completed_step_ids", []),
        "status": plan.get("status", "active"),
    })
    await self._complete_plan_advance(session, tool_exec, tool_call_id)

    # Emit PLAN_ENDED or PLAN_ADVANCED depending on the outcome.
    if plan.get("status") == "done":
        await self.sse.emit(
            EventType.PLAN_ENDED,
            {"status": "done", "objective": plan.get("objective", "")},
            session_id=session.session_id,
        )
    else:
        await self.sse.emit(
            EventType.PLAN_ADVANCED,
            {
                "cursor": plan.get("cursor", 0),
                "completed_step_ids": plan.get("completed_step_ids", []),
                "status": plan.get("status", "active"),
            },
            session_id=session.session_id,
        )
    return tool_exec

async def _complete_plan_advance(
    self,
    session: SessionState,
    tool_exec: ToolExecution,
    tool_call_id: str,
) -> ToolExecution:
    """Mark ``tool_exec`` as completed and emit its ``TOOL_COMPLETED`` event."""
    tool_exec.status = ToolExecutionStatus.COMPLETED
    await self.sse.emit(
        EventType.TOOL_COMPLETED,
        {"tool": "acai_plan_advance", "status": "completed", "summary": tool_exec.result_summary, "tool_call_id": tool_call_id},
        session_id=session.session_id,
    )
    return tool_exec
@staticmethod
def _match_step_to_executions(
step: dict[str, Any],
tool_executions: list[ToolExecution],
) -> bool:
"""Heuristica: matchea step.agent_action con tool calls reales.
Marca el step como completado si alguna de las tools ejecutadas
coincide con el `agent_action` del step. Compara:
1) nombre de la tool (normalizando guion/underscore: `acai-write`
matchea con `acai_write`).
2) si action menciona algun `files_touched` y la tool ejecutada
tiene ese path en sus argumentos.
3) si action menciona algun `tables_touched` y la tool ejecutada
tiene ese tableName en sus argumentos.
"""
action = (step.get("agent_action") or "").lower()
files_touched = [str(f).lower() for f in (step.get("files_touched") or [])]
tables_touched = [str(t).lower() for t in (step.get("tables_touched") or [])]
if not action and not files_touched and not tables_touched:
return False
for te in tool_executions:
if te.status != ToolExecutionStatus.COMPLETED:
continue
raw_name = strip_namespace(te.tool_name).lower()
# Normaliza guiones/underscores para matching tool name <-> action.
tool_variants = {raw_name, raw_name.replace("-", "_"), raw_name.replace("_", "-")}
# Match 1: nombre de la tool aparece en action
if any(v and v in action for v in tool_variants):
return True
# Match 2/3: path o tableName en los args de la tool
try:
args_str = json.dumps(te.arguments or {}, ensure_ascii=False).lower()
except Exception:
args_str = str(te.arguments or "").lower()
for f in files_touched:
if f and f in args_str:
return True
for t in tables_touched:
if t and t in args_str:
return True
return False
async def _auto_advance_plan_cursor(
    self,
    session: SessionState,
    tool_executions_this_step: list[ToolExecution],
    no_tool_calls_this_step: bool = False,
) -> None:
    """Recompute the active plan's cursor after one internal agent step.

    Does nothing unless ``session.metadata["current_plan"]`` has status
    ``active``. Otherwise:

    * ``no_tool_calls_this_step=True`` (the agent ended its turn): nothing
      new is marked as completed. Pending steps stay pending, because an
      agent that stops with a chat message has not necessarily done the
      work. The plan only closes if every step was already completed.
    * Tool executions present: ``judge_plan_progress`` (LLM-as-judge) is
      asked which pending steps they completed. If that call raises, the
      plan is left untouched.

    The cursor then moves to the first step that is not completed, and the
    plan becomes ``done`` when none is left. ``PLAN_ENDED`` or
    ``PLAN_ADVANCED`` is emitted only when something changed; emit failures
    are logged, not raised.

    Args:
        session: Session whose ``metadata`` holds the plan state.
        tool_executions_this_step: Executions produced by the current step
            only.
        no_tool_calls_this_step: True when the agent finished its turn
            without calling any tool.
    """
    plan = session.metadata.get("current_plan")
    if not plan or plan.get("status") != "active":
        return

    steps = plan.get("steps") or []
    prev_cursor = int(plan.get("cursor") or 0)
    prev_completed = list(plan.get("completed_step_ids") or [])
    completed_set = set(prev_completed)
    rationale = ""

    if no_tool_calls_this_step:
        # completed_set is deliberately left alone here: only what the judge
        # accepted in earlier steps counts.
        if steps and len(completed_set) >= len(steps):
            rationale = "agente termino el turn; todos los steps ya completados"
        else:
            rationale = "agente termino el turn con steps pendientes (no cerrado)"
    elif tool_executions_this_step:
        try:
            completed_ids, judge_rationale = await judge_plan_progress(
                plan=plan,
                tool_executions_this_step=tool_executions_this_step,
                model_adapter=self.model,
                model_id=self.profile.model_id,
            )
        except Exception as e:
            # Without a verdict the plan stays as it is; a stale cursor is
            # preferred over a heuristic false positive.
            logger.warning("[plan-judge] failed, no advance this step: %s", e)
            return
        completed_set.update(completed_ids)
        rationale = judge_rationale

    # Cursor: first step that is NOT completed; len(steps) when none is left.
    cursor = next(
        (i for i, step in enumerate(steps) if step.get("id") not in completed_set),
        len(steps),
    )
    plan["cursor"] = cursor
    plan["completed_step_ids"] = sorted(completed_set)

    ended = bool(steps) and cursor >= len(steps)
    if ended:
        plan["status"] = "done"

    # The plan was `active` on entry, so `ended` alone is a state change that
    # the frontend must hear about, even if cursor and ids are unchanged
    # (for example when the cursor was already parked past the last step).
    changed = ended or cursor != prev_cursor or completed_set != set(prev_completed)
    logger.info(
        "[plan-advance] tools_in_step=%d prev_cursor=%d new_cursor=%d completed=%s changed=%s rationale=%r",
        len(tool_executions_this_step), prev_cursor, cursor,
        plan["completed_step_ids"], changed, rationale[:160],
    )
    if not changed:
        return

    try:
        if ended:
            await self.sse.emit(
                EventType.PLAN_ENDED,
                {"status": "done", "objective": plan.get("objective", "")},
                session_id=session.session_id,
            )
        else:
            await self.sse.emit(
                EventType.PLAN_ADVANCED,
                {
                    "cursor": plan["cursor"],
                    "completed_step_ids": plan["completed_step_ids"],
                    "status": plan.get("status", "active"),
                },
                session_id=session.session_id,
            )
    except Exception as e:
        # Best effort: a failed UI notification must not break the agent loop.
        logger.warning("PLAN_ADVANCED/ENDED emit failed: %s", e)
# ---- Allowed tools --------------------------------------------------------
def _get_allowed_tools(self, followup_mode: str = "none") -> list[dict[str, Any]]:
    """Return the tool definitions this agent may call.

    Starts from the MCP tool definitions, filtered by
    ``self.profile.allowed_tools`` when that filter is non-empty. If the
    profile has ``has_planner_tool`` set, the synthetic ``acai_plan`` and
    ``acai_plan_advance`` definitions are appended; those tools do not go
    through MCP and are intercepted in ``_execute_tool``.

    Args:
        followup_mode: ``"transform"`` disables tools entirely.

    Returns:
        A new list of tool definition dicts. It is empty in transform mode
        or while MCP is not running.
    """
    if followup_mode == "transform":
        return []
    if not self.mcp.is_running:
        return []

    all_tools = self.mcp.get_tool_definitions()
    allowed = self.profile.allowed_tools
    if allowed:
        tool_defs = [t for t in all_tools if t["name"] in allowed]
    else:
        # No filter: every tool is allowed. Copy so the appends below never
        # mutate the list returned by the MCP manager.
        tool_defs = list(all_tools)

    if self.profile.has_planner_tool:
        tool_defs.append({
            "name": "acai_plan",
            "description": (
                "Genera un plan estructurado de ejecucion. Usa esta tool al recibir "
                "una peticion compuesta (landing entera, tienda, refactor amplio, modulo "
                "con tabla+hook+frontend). NO la uses para tareas triviales (cambiar un titulo, "
                "ajustar un color, leer datos). Devuelve JSON con steps, risks, files_touched, "
                "tables_touched."
            ),
            "input_schema": {
                "type": "object",
                "required": ["objective"],
                "properties": {
                    "objective": {
                        "type": "string",
                        "description": "Descripcion en español de lo que hay que conseguir.",
                    },
                    "scope": {
                        "type": "string",
                        "description": "Restricciones opcionales (ej. 'no toques el header').",
                    },
                },
            },
        })
        tool_defs.append({
            "name": "acai_plan_advance",
            "description": (
                "Avanza/abandona el plan activo. Llama con `abandon: true` si el "
                "usuario corrige y el plan ya no es valido, o con `next_cursor` para "
                "saltar al siguiente step pendiente."
            ),
            "input_schema": {
                "type": "object",
                "properties": {
                    "abandon": {"type": "boolean"},
                    "completed_ids": {"type": "array", "items": {"type": "integer"}},
                    "next_cursor": {"type": "integer"},
                },
            },
        })
    return tool_defs
@staticmethod
def _extract_mcp_output(result: dict[str, Any]) -> str:
"""Extract text content from MCP tool result."""
"""Extract text content from MCP tool result.
El modelo (MiniMax M2.7) es text-only — los blocks `type=image` no
pueden reenviarse. En lugar de descartar silenciosamente (lo que dejaba
al agente con un tool_result vacio y le hacia repetir la llamada),
emitimos un placeholder explicito que le dice que use `browser_snapshot`
si quiere inspeccionar la pagina.
"""
content = result.get("content", [])
if isinstance(content, list):
parts: list[str] = []
image_count = 0
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
if not isinstance(item, dict):
continue
itype = item.get("type")
if itype == "text":
parts.append(item.get("text", ""))
elif itype == "image":
image_count += 1
if image_count and not parts:
return (
f"[{image_count} imagen(es) no procesada(s) — el modelo es "
f"text-only. Para inspeccionar la pagina usa "
f"`browser_snapshot` (devuelve accessibility tree en texto). "
f"`browser_take_screenshot` solo sirve para que el usuario "
f"vea la captura, no para tu analisis.]"
)
if image_count and parts:
parts.append(
f"\n[Adicionalmente {image_count} imagen(es) no incluida(s): "
f"el modelo no las procesa.]"
)
return "\n".join(parts) if parts else json.dumps(result)
return str(content)

View File

@@ -99,10 +99,27 @@ class OrchestratorEngine:
session_id=session.session_id,
)
# Plan mode 'force': el usuario ha pulsado el toggle Plan en el chat.
# Prependeamos una directiva al mensaje para que el agente llame
# acai_plan ANTES de ejecutar nada. El system prompt ya conoce la tool;
# esto solo bypassa la heuristica trivial-vs-complex.
plan_mode = (session.metadata.get("plan_mode") or "auto").lower()
if plan_mode == "force":
message = (
"[modo Plan activo por el usuario] Tu PRIMERA accion debe ser "
"llamar a la tool `acai_plan` con un plan detallado del trabajo "
"que vas a hacer. No ejecutes ninguna otra tool antes. Despues "
"del plan, procede con la ejecucion normal.\n\n"
f"Peticion del usuario:\n{message}"
)
# Create task
task = session.begin_task(objective=message)
task.status = TaskStatus.EXECUTING
# Reset del contador de invocaciones de `acai_plan` por turno (Fase 5).
session.metadata["plan_call_count_in_turn"] = 0
# Execute with the selected agent
agent = BaseAgent(
profile=self.agent_profile,

View File

@@ -0,0 +1,206 @@
"""LLM-as-judge para tracking del progreso del plan.
Sustituye la heuristica string-matching de `_match_step_to_executions` por
una llamada al modelo que entiende semantica. Tras cada batch de tool calls
del agente principal, le preguntamos al judge "que steps acaba de completar"
con el plan + las tools como input. Devuelve JSON con `completed_ids`.
Diseno:
- Una sola llamada non-streaming. El JSON esperado es corto (~300 tokens), pero se reserva `max_tokens=1500` por si el modelo emite thinking antes del JSON.
- Solo evalua steps PENDIENTES (los ya completados no se envian — ahorra tokens).
- Falla en silencio si el modelo no devuelve JSON parseable. El caller decide
si caer al matcher heuristico o no avanzar el cursor.
"""
from __future__ import annotations
import json
import logging
import re
from typing import Any
from ..adapters.base import ModelAdapter, ModelConfig
from ..models.tools import ToolExecution, ToolExecutionStatus
from .tool_groups import strip_namespace
logger = logging.getLogger(__name__)
_SYSTEM_PROMPT = """\
Eres un revisor de progreso de un plan de ejecucion. Recibes:
1. El plan con sus steps PENDIENTES (id, description, agent_action, tables_touched, files_touched).
2. Las herramientas que el agente principal acaba de ejecutar en este step (nombre, args, success).
Tu unica salida es un objeto JSON con esta forma exacta:
{
"completed_ids": [1, 4],
"rationale": "una frase corta explicando por que"
}
Reglas:
- `completed_ids` contiene los IDs de los steps que han sido COMPLETAMENTE realizados por las tools ejecutadas en este step.
- Sé estricto: si un step requiere `create_or_update_record en builder_custom` y la tool ejecutada fue `create_or_update_record en apartados`, NO esta hecho.
- Si un step requiere `acai-write template/estandar/modulos/X/index-base.tpl` y la tool fue `acai-write` con un path distinto, NO esta hecho.
- Si un step menciona varias tools (ej. "create_or_update_record + add_module_to_record") solo lo marcas como done si TODAS las tools necesarias se ejecutaron.
- Si un step usa `ask_user` como agent_action, NUNCA lo marques como done — el agente debe preguntarle al usuario manualmente.
- Si dudas, NO incluyas el id. Mejor un falso negativo (que pase a otro step) que un falso positivo (que marque algo no hecho).
- Si ninguna tool corresponde a ningun step pendiente, devuelve `"completed_ids": []`.
- `rationale`: una frase concisa en español, max 200 chars.
Devuelve SOLO el JSON, sin texto alrededor."""
_FENCE_RE = re.compile(r"```(?:json)?\s*(\{.*?\})\s*```", re.DOTALL | re.IGNORECASE)


def _parse_judge_output(raw: str) -> dict[str, Any] | None:
    """Extract the JSON object from the judge's output.

    Tolerates Markdown fences and surrounding prose:

    1. If a fenced block holding an object is present and parses, it wins.
    2. Otherwise every ``{`` in the text is tried, left to right, as the
       start of a JSON object. The first one that decodes is returned, so a
       stray brace in the prose does not abort the search.

    Args:
        raw: Raw model text; may be empty.

    Returns:
        The decoded object, or ``None`` when no JSON object can be found.
    """
    if not raw:
        return None

    fenced = _FENCE_RE.search(raw)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            pass  # malformed fence content: fall through to the scan

    # raw_decode parses one JSON value starting at the given index and
    # ignores trailing text. That is what the hand-rolled brace counting
    # used to approximate.
    decoder = json.JSONDecoder()
    start = raw.find("{")
    while start >= 0:
        try:
            obj, _end = decoder.raw_decode(raw, start)
        except json.JSONDecodeError:
            start = raw.find("{", start + 1)
            continue
        return obj
    return None
def _serialize_tool_execs(tool_executions: list[ToolExecution]) -> list[dict[str, Any]]:
    """Reduce tool executions to the minimum the judge needs.

    Only executions that finished (``COMPLETED`` or ``FAILED``) are kept.
    Each becomes a dict with the namespace-stripped tool name, its arguments
    (``{}`` when empty) and a ``success`` flag that is true for ``COMPLETED``.
    """
    return [
        {
            "tool": strip_namespace(execution.tool_name),
            "args": execution.arguments or {},
            "success": execution.status == ToolExecutionStatus.COMPLETED,
        }
        for execution in tool_executions
        if execution.status in (ToolExecutionStatus.COMPLETED, ToolExecutionStatus.FAILED)
    ]
def _serialize_pending_steps(plan: dict[str, Any]) -> list[dict[str, Any]]:
    """Return a compact view of the plan steps that are not completed yet.

    A step is pending when its ``id`` is absent from
    ``plan["completed_step_ids"]``. ``description`` and ``agent_action`` are
    cut to 300 characters; missing fields become ``""`` or ``[]``.
    """
    done_ids = set(plan.get("completed_step_ids") or [])
    return [
        {
            "id": step.get("id"),
            "description": (step.get("description") or "")[:300],
            "agent_action": (step.get("agent_action") or "")[:300],
            "files_touched": step.get("files_touched") or [],
            "tables_touched": step.get("tables_touched") or [],
        }
        for step in (plan.get("steps") or [])
        if step.get("id") not in done_ids
    ]
async def judge_plan_progress(
    plan: dict[str, Any],
    tool_executions_this_step: list[ToolExecution],
    model_adapter: ModelAdapter,
    model_id: str | None = None,
) -> tuple[list[int], str]:
    """Ask the model which pending plan steps the latest tool batch completed.

    Sends the pending steps and the finished tool executions to the model in
    one non-streaming ``complete()`` call and parses the JSON verdict.

    Args:
        plan: Plan dict with ``steps`` and ``completed_step_ids``.
        tool_executions_this_step: Executions produced by the current step.
        model_adapter: Adapter used for the ``complete()`` call.
        model_id: Model identifier; ``""`` is sent when ``None``.

    Returns:
        ``(completed_ids, rationale)``. ``completed_ids`` holds only ids that
        were pending, without duplicates. When there is nothing to judge the
        list is empty and the rationale says why. When the model call fails
        or its output is unusable, the result is
        ``([], "judge_error: <message>")``; the caller decides how to react.
    """
    pending = _serialize_pending_steps(plan)
    if not pending:
        return [], "no pending steps"
    tools_payload = _serialize_tool_execs(tool_executions_this_step)
    if not tools_payload:
        return [], "no tools executed"

    user_msg = json.dumps({
        "plan_pending_steps": pending,
        "tools_executed_this_step": tools_payload,
    }, ensure_ascii=False)

    # Generous max_tokens: the model may emit thinking blocks even when asked
    # to disable them, and a truncated JSON answer cannot be parsed.
    config = ModelConfig(
        model_id=model_id or "",
        max_tokens=1500,
        temperature=0.0,
        extra={"thinking": {"type": "disabled"}},
    )
    try:
        response = await model_adapter.complete(
            messages=[
                {"role": "system", "content": _SYSTEM_PROMPT},
                {"role": "user", "content": user_msg},
            ],
            tools=None,
            config=config,
        )
    except Exception as e:
        # Boundary with the model provider: any failure becomes a
        # "judge_error" result instead of propagating.
        logger.warning("[plan_judge] model call failed: %s", e)
        return [], f"judge_error: {str(e)[:120]}"

    raw_text = (response.content or "").strip()
    parsed = _parse_judge_output(raw_text)
    if not parsed or not isinstance(parsed, dict):
        logger.warning("[plan_judge] could not parse JSON: %r", raw_text[:200])
        return [], "judge_error: parse_failed"

    raw_ids = parsed.get("completed_ids") or []
    if not isinstance(raw_ids, list):
        return [], "judge_error: completed_ids not a list"

    completed_ids = _filter_completed_ids(raw_ids, {s["id"] for s in pending})
    rationale = str(parsed.get("rationale") or "")[:300]
    return completed_ids, rationale


def _filter_completed_ids(raw_ids: list[Any], pending_ids: set[Any]) -> list[int]:
    """Keep well-formed ids that refer to a pending step, in order and without duplicates."""
    accepted: list[int] = []
    for candidate in raw_ids:
        # JSON `true` would otherwise coerce to step 1: a false positive.
        if isinstance(candidate, bool):
            continue
        # Fractional, NaN and infinite numbers are not step ids. Rejecting
        # them here also keeps int() from raising OverflowError on infinity.
        if isinstance(candidate, float) and not candidate.is_integer():
            continue
        try:
            step_id = int(candidate)
        except (TypeError, ValueError):
            continue
        # Only ids that were pending are accepted (guards against
        # hallucinated ids).
        if step_id in pending_ids and step_id not in accepted:
            accepted.append(step_id)
    return accepted

355
src/orchestrator/planner.py Normal file
View File

@@ -0,0 +1,355 @@
"""Sub-loop del planner — implementacion de la tool interna `acai_plan`.
La tool `acai_plan` se intercepta en `BaseAgent._execute_tool`. Cuando el
agente principal la llama, lanzamos `run_planner_subloop` que abre una
mini-conversacion con el modelo usando `system.planner.md` y solo tools de
lectura. Devuelve un plan JSON estructurado.
Diseno:
- El planner NO ve el thinking del agente principal directamente — recibe
un `parent_thinking_summary` reducido (~300 tokens) para no contaminar.
- `max_subloop_steps=6` turnos del modelo por defecto. Suficiente para varios lookups + emitir JSON.
- La salida es texto que se parsea a JSON. Si falla, retornamos error y
el agente principal decide si reintenta o pasa a modo directo.
"""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass
from typing import Any
from ..adapters.base import ModelAdapter, ModelConfig
from ..mcp.manager import MCPManager
from ..models.agent import AgentProfile
from .tool_groups import PLANNER_TOOLS, strip_namespace
logger = logging.getLogger(__name__)
@dataclass
class PlannerResult:
    """Outcome of one planner sub-loop run."""

    # Plan dict, or None when no plan was produced.
    plan: dict[str, Any] | None
    # Error description; empty by default.
    error: str = ""
    # Raw model text; empty by default.
    raw_text: str = ""
    # None is only a placeholder default (a mutable default cannot be shared
    # across instances); __post_init__ swaps it for a fresh list.
    # presumably a log of the tool calls made inside the sub-loop; confirm
    # against run_planner_subloop.
    tool_executions: list[dict[str, Any]] = None  # type: ignore

    def __post_init__(self) -> None:
        """Give the instance its own empty list when none was supplied."""
        if self.tool_executions is not None:
            return
        self.tool_executions = []
# Matches a JSON object wrapped in a Markdown fence (``` or ```json).
# Unfenced JSON is handled by the scan inside `parse_plan`.
_FENCE_RE = re.compile(r"```(?:json)?\s*(\{.*?\})\s*```", re.DOTALL | re.IGNORECASE)


def parse_plan(raw_text: str) -> dict[str, Any] | None:
    """Extract the plan JSON object from the planner's output.

    Strategy:

    1. If a fenced block holding an object is present and parses, it wins.
    2. Otherwise every ``{`` in the text is tried, left to right, as the
       start of a JSON object. The first one that decodes is returned, so a
       stray brace in the prose before the plan does not abort the search.

    Args:
        raw_text: Raw model text; may be empty.

    Returns:
        The decoded object, or ``None`` when no JSON object can be found.
    """
    if not raw_text:
        return None

    fenced = _FENCE_RE.search(raw_text)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            pass  # malformed fence content: fall through to the scan

    # raw_decode parses one JSON value starting at the given index and
    # ignores trailing text. That is what the hand-rolled brace counting
    # used to approximate.
    decoder = json.JSONDecoder()
    start = raw_text.find("{")
    while start >= 0:
        try:
            obj, _end = decoder.raw_decode(raw_text, start)
        except json.JSONDecodeError:
            start = raw_text.find("{", start + 1)
            continue
        return obj
    return None
def _normalize_plan(plan: dict[str, Any], objective: str) -> dict[str, Any]:
    """Coerce a parsed plan into the expected shape with sensible defaults.

    Model output is treated as untrusted. Wrong-typed fields are coerced or
    dropped instead of raising, and the aggregated ``files_touched`` and
    ``tables_touched`` keep first-seen order so the result is deterministic.

    Args:
        plan: Parsed plan object as produced by the model.
        objective: Fallback objective when the plan does not carry one.

    Returns:
        A new dict with the keys ``objective``, ``steps``, ``risks``,
        ``files_touched``, ``tables_touched``, ``estimated_steps`` and
        ``notes``. Each step has ``id``, ``description``, ``agent_action``,
        ``files_touched``, ``tables_touched`` and ``depends_on``.
    """

    def to_int(value: Any, fallback: int) -> int:
        """Return ``int(value)``, or ``fallback`` when value is falsy or not numeric."""
        if not value:
            return fallback
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return fallback

    def as_items(value: Any) -> list[Any]:
        """Return ``value`` as a list; a lone string becomes a one-item list."""
        if isinstance(value, str):
            # Iterating a string would yield single characters.
            return [value] if value else []
        if isinstance(value, (list, tuple)):
            return list(value)
        return []

    steps: list[dict[str, Any]] = []
    raw_steps = plan.get("steps") or []
    if isinstance(raw_steps, list):
        for index, raw in enumerate(raw_steps):
            if not isinstance(raw, dict):
                continue
            steps.append({
                # Missing or unusable ids fall back to the 1-based position.
                "id": to_int(raw.get("id"), index + 1),
                "description": str(raw.get("description") or "")[:500],
                "agent_action": str(raw.get("agent_action") or "")[:500],
                "files_touched": [str(x) for x in as_items(raw.get("files_touched")) if x][:20],
                "tables_touched": [str(x) for x in as_items(raw.get("tables_touched")) if x][:20],
                # isdecimal (not isdigit): everything it accepts is parseable
                # by int(); it also excludes bools and negative numbers.
                "depends_on": [
                    int(x)
                    for x in as_items(raw.get("depends_on"))
                    if isinstance(x, (int, str)) and str(x).isdecimal()
                ][:10],
            })

    return {
        "objective": str(plan.get("objective") or objective)[:500],
        "steps": steps,
        "risks": [str(r)[:300] for r in as_items(plan.get("risks")) if r][:10],
        # dict.fromkeys dedupes while keeping first-seen order. A set would
        # make both the order and the 30-item cut vary between runs.
        "files_touched": list(dict.fromkeys(f for s in steps for f in s["files_touched"]))[:30],
        "tables_touched": list(dict.fromkeys(t for s in steps for t in s["tables_touched"]))[:30],
        "estimated_steps": to_int(plan.get("estimated_steps"), len(steps)),
        "notes": str(plan.get("notes") or "")[:500],
    }
def _build_planner_tools(mcp: MCPManager | None) -> list[dict[str, Any]]:
    """Return only the MCP tool definitions the planner is allowed to use.

    A definition is kept when its name, with the namespace prefix removed,
    belongs to ``PLANNER_TOOLS``. Without a running MCP manager the result is
    an empty list.
    """
    if not (mcp and mcp.is_running):
        return []
    return [
        definition
        for definition in mcp.get_tool_definitions()
        if strip_namespace(definition["name"]) in PLANNER_TOOLS
    ]
async def run_planner_subloop(
    *,
    objective: str,
    scope: str,
    agent_profile: AgentProfile,
    model_adapter: ModelAdapter,
    mcp: MCPManager | None,
    parent_thinking_summary: str = "",
    max_subloop_steps: int = 6,
) -> PlannerResult:
    """Run a short conversation with the model to produce the plan.

    Emits NO user-facing SSE, persists NO artifacts and writes nothing. The
    main agent (the caller) integrates the result as a tool_result.

    Args:
        objective: What the plan must achieve; also the fallback objective
            when the model's plan omits one.
        scope: Optional scope text, added to the prompt when non-blank.
        agent_profile: Supplies the planner system prompt, model id and
            max_tokens.
        model_adapter: Adapter whose ``stream`` yields the model chunks.
        mcp: MCP manager used to run read-only tools; may be ``None``.
        parent_thinking_summary: Optional summary of the main agent's
            thinking, added to the prompt when non-blank.
        max_subloop_steps: Maximum number of model turns.

    Returns:
        A ``PlannerResult`` with ``plan`` set on success, or ``plan=None``
        and ``error`` set when the prompt is missing, the output cannot be
        parsed, or the step budget runs out.
    """
    system_prompt = agent_profile.system_prompt_planner or ""
    if not system_prompt.strip():
        return PlannerResult(plan=None, error="planner system prompt vacio")

    user_msg_parts = [
        f"Objetivo: {objective}",
    ]
    if scope.strip():
        user_msg_parts.append(f"Scope: {scope}")
    if parent_thinking_summary.strip():
        user_msg_parts.append(f"Contexto previo (resumen del thinking del agente principal):\n{parent_thinking_summary}")
    user_msg_parts.append("Produce el plan JSON segun la especificacion.")
    user_message = "\n\n".join(user_msg_parts)

    messages: list[dict[str, Any]] = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message},
    ]
    config = ModelConfig(
        model_id=agent_profile.model_id or "",
        max_tokens=agent_profile.max_tokens or 4096,
        # Lower temperature than the main agent: we want clean JSON.
        temperature=0.1,
    )
    tool_defs = _build_planner_tools(mcp)
    tool_executions_log: list[dict[str, Any]] = []
    accumulated_text = ""
    accumulated_thinking = ""

    for sub_step in range(max_subloop_steps):
        full_text = ""
        # Tool calls still receiving argument fragments, keyed by call id.
        active_tools: dict[str, dict[str, Any]] = {}
        tool_calls_this_step: list[dict[str, Any]] = []

        async for chunk in model_adapter.stream(
            messages=messages,
            tools=tool_defs if tool_defs else None,
            config=config,
        ):
            if chunk.delta:
                full_text += chunk.delta
            if chunk.thinking_delta:
                accumulated_thinking += chunk.thinking_delta
            if chunk.tool_name and chunk.tool_call_id:
                if chunk.tool_call_id not in active_tools:
                    active_tools[chunk.tool_call_id] = {
                        "id": chunk.tool_call_id,
                        "name": chunk.tool_name,
                        "arguments": "",
                    }
            if chunk.tool_arguments and chunk.tool_call_id and not chunk.finish_reason:
                tool = active_tools.get(chunk.tool_call_id)
                if tool:
                    tool["arguments"] += chunk.tool_arguments
            if chunk.finish_reason == "tool_use" and chunk.tool_call_id:
                tool = active_tools.pop(chunk.tool_call_id, None)
                if tool:
                    # Prefer the streamed fragments; fall back to arguments
                    # carried on the closing chunk itself.
                    final_args = tool["arguments"] or chunk.tool_arguments or ""
                    try:
                        tool["parsed_arguments"] = json.loads(final_args) if final_args else {}
                    except json.JSONDecodeError:
                        # Best effort: keep going with empty arguments, but
                        # leave a trace instead of swallowing the problem.
                        logger.warning(
                            "Planner tool %s sent malformed JSON arguments; using {}",
                            tool["name"],
                        )
                        tool["parsed_arguments"] = {}
                    tool_calls_this_step.append(tool)
            if chunk.finish_reason in ("end_turn", "stop_sequence"):
                break

        accumulated_text += full_text

        # No tool calls this turn: try to parse the plan from the text.
        if not tool_calls_this_step:
            plan_raw = parse_plan(full_text or accumulated_text)
            if plan_raw is not None:
                normalized = _normalize_plan(plan_raw, objective)
                # Use the start of the internal thinking as `notes` when the
                # plan did not provide any.
                if not normalized.get("notes") and accumulated_thinking:
                    normalized["notes"] = accumulated_thinking[:300]
                return PlannerResult(
                    plan=normalized,
                    raw_text=full_text,
                    tool_executions=tool_executions_log,
                )
            # Last turn, no tools and no parseable plan: give up.
            if sub_step >= max_subloop_steps - 1:
                return PlannerResult(
                    plan=None,
                    error="No se pudo parsear el JSON del plan",
                    raw_text=full_text or accumulated_text,
                    tool_executions=tool_executions_log,
                )
            # Retry with an explicit correction message.
            messages.append({"role": "assistant", "content": full_text or accumulated_text})
            messages.append({
                "role": "user",
                "content": (
                    "Tu output anterior no contenia un JSON parseable. "
                    "Emite UNICAMENTE el plan JSON segun la especificacion, "
                    "sin texto adicional alrededor."
                ),
            })
            continue

        # The model called tools: record the assistant turn (text plus
        # tool_use blocks), run the tools and feed the results back.
        assistant_blocks: list[dict[str, Any]] = []
        if full_text:
            assistant_blocks.append({"type": "text", "text": full_text})
        for tc in tool_calls_this_step:
            assistant_blocks.append({
                "type": "tool_use",
                "id": tc["id"],
                "name": tc["name"],
                "input": tc.get("parsed_arguments", {}),
            })
        messages.append({"role": "assistant", "content": assistant_blocks})

        # NOTE(review): on the final iteration these results are never sent
        # back to the model; the calls only show up in tool_executions_log.
        tool_result_blocks: list[dict[str, Any]] = []
        for tc in tool_calls_this_step:
            # Only read-only tools are executed. If a write tool gets here
            # through some bug, answer with an error instead of running it.
            tool_name_raw = tc["name"]
            if strip_namespace(tool_name_raw) not in PLANNER_TOOLS:
                tool_result_blocks.append({
                    "type": "tool_result",
                    "tool_use_id": tc["id"],
                    "content": f"[ERROR planner] tool '{tool_name_raw}' no permitida en planner sub-loop (solo lectura).",
                    "is_error": True,
                })
                continue
            try:
                if not mcp or not mcp.is_running:
                    raise RuntimeError("MCP no disponible")
                result = await mcp.call_tool(tool_name_raw, tc.get("parsed_arguments", {}))
                # Extract the text parts of the MCP result; if there are
                # none, fall back to the JSON dump of the whole result.
                content_parts: list[str] = []
                for c in (result.get("content") or []):
                    if isinstance(c, dict) and c.get("type") == "text":
                        content_parts.append(c.get("text", ""))
                raw_output = "\n".join(content_parts) if content_parts else json.dumps(result)
                tool_result_blocks.append({
                    "type": "tool_result",
                    "tool_use_id": tc["id"],
                    "content": raw_output[:4000],
                })
                tool_executions_log.append({
                    "name": tool_name_raw,
                    "arguments": tc.get("parsed_arguments", {}),
                    "raw_output_preview": raw_output[:300],
                })
            except Exception as e:
                # Boundary with external tools: report the failure to the
                # model as an error tool_result rather than aborting the plan.
                logger.warning("Planner tool %s failed: %s", tool_name_raw, e)
                tool_result_blocks.append({
                    "type": "tool_result",
                    "tool_use_id": tc["id"],
                    "content": f"[ERROR] {e}",
                    "is_error": True,
                })
        messages.append({"role": "user", "content": tool_result_blocks})

        # On the last two turns, force the model to stop investigating and
        # emit the JSON. M2.7 sometimes keeps requesting tools indefinitely,
        # so it has to be cut off.
        if sub_step >= max_subloop_steps - 2:
            messages.append({
                "role": "user",
                "content": (
                    "PARA. No llames mas tools. Ya tienes lo necesario. "
                    "Emite AHORA el plan JSON segun la especificacion del system prompt. "
                    "Solo el JSON, sin texto alrededor."
                ),
            })

    # The loop ended without a plan: fail.
    logger.warning(
        "Planner agotado: %d steps, %d tool calls totales, accumulated_text=%r",
        max_subloop_steps,
        len(tool_executions_log),
        accumulated_text[:300],
    )
    return PlannerResult(
        plan=None,
        error=f"Planner agotado tras {max_subloop_steps} steps sin emitir JSON",
        raw_text=accumulated_text,
        tool_executions=tool_executions_log,
    )

View File

@@ -25,6 +25,15 @@ class AgentRegistry:
self._agents: dict[str, AgentProfile] = {}
self._metadata: dict[str, dict[str, Any]] = {}
self._agents_dir = agents_dir
self._contract: str = ""
def _load_contract(self) -> str:
    """Return the text of the shared contract, or "" when it is missing.

    The contract is read from ``<agents_dir>/_shared/contract.md`` as UTF-8;
    it is the text that gets concatenated to each agent's system prompt.
    """
    shared_contract = self._agents_dir / "_shared" / "contract.md"
    if not shared_contract.is_file():
        return ""
    return shared_contract.read_text(encoding="utf-8")
# ------------------------------------------------------------------
# Carga
@@ -34,6 +43,7 @@ class AgentRegistry:
"""Escanea agents_dir y carga todos los agentes encontrados."""
self._agents.clear()
self._metadata.clear()
self._contract = self._load_contract()
if not self._agents_dir.is_dir():
logger.warning("Agents directory not found: %s", self._agents_dir)
@@ -42,6 +52,9 @@ class AgentRegistry:
for agent_dir in sorted(self._agents_dir.iterdir()):
if not agent_dir.is_dir():
continue
# Skip directorios especiales (`_shared`, etc).
if agent_dir.name.startswith("_"):
continue
yaml_path = agent_dir / "agent.yaml"
prompt_path = agent_dir / "system.md"
@@ -60,6 +73,26 @@ class AgentRegistry:
agent_id = meta.get("name", agent_dir.name)
# Concatena contract.md al system prompt del agente
# (Fase 3: las reglas comunes viven en _shared/contract.md).
# La identidad del agente va PRIMERO, las reglas de ambiente
# despues — separadas por linea horizontal.
if self._contract:
if system_prompt:
system_prompt = system_prompt.rstrip() + "\n\n---\n\n" + self._contract
else:
system_prompt = self._contract
# Planner system prompt (opcional, usado por la tool
# interna `acai_plan` cuando el agente lo expone).
# El planner tambien recibe el contract.
planner_path = agent_dir / "system.planner.md"
planner_prompt = ""
if planner_path.exists():
planner_prompt = planner_path.read_text(encoding="utf-8")
if self._contract:
planner_prompt = planner_prompt.rstrip() + "\n\n---\n\n" + self._contract
profile = AgentProfile(
role=agent_id,
name=agent_id,
@@ -79,6 +112,12 @@ class AgentRegistry:
"task_state",
]),
stream_deltas=meta.get("stream_deltas", True),
kb_load_strategy=meta.get("kb_load_strategy", "top_n"),
kb_tags=meta.get("kb_tags", []),
kb_max_tokens=meta.get("kb_max_tokens"),
kb_top_n=meta.get("kb_top_n"),
has_planner_tool=meta.get("has_planner_tool", False),
system_prompt_planner=planner_prompt,
)
self._agents[agent_id] = profile

View File

@@ -0,0 +1,63 @@
"""Grupos de tools utilizados por el orquestador.
`READ_TOOL_NAMES`: read-only tools. They are safe to expose in sub-loops
(e.g. the planner) because they do NOT modify project state.
`PLANNER_TOOLS`: alias of `READ_TOOL_NAMES` — the planner ONLY investigates.
`PLAN_INTERNAL_TOOL_NAMES`: synthetic tools implemented by the Python
orchestrator (they do not go through MCP). They are intercepted in
`BaseAgent._execute_tool`.
"""
from __future__ import annotations
# Whitelist of read-only tools. An MCP tool counts as read-only when its
# name, once the `<server>__` namespace prefix has been removed (see
# `strip_namespace`), matches one of these entries exactly — e.g.
# `acai_code__list_tables` is compared as `list_tables`.
READ_TOOL_NAMES: frozenset[str] = frozenset(
    (
        # Files (read / search)
        "acai-glob",
        "acai-grep",
        "acai-view",
        # Records (read)
        "list_table_records",
        "get_record",
        "list_page_modules",
        "get_module_config_vars",
        "list_record_uploads",
        # Schema / tables (read)
        "list_tables",
        "get_table_schema",
        # Layout / libraries (read)
        "get_layout_field",
        "list_global_libraries",
        # Hooks (read)
        "get_hook_middleware",
        # Project / web (read)
        "get_web_url",
        # Git (read)
        "list_git_log",
        # Docs (read)
        "list_docs",
        "read_doc",
    )
)

# The planner gets exactly the read-only set (same object, not a copy).
PLANNER_TOOLS: frozenset[str] = READ_TOOL_NAMES

# Names of the plan-internal (synthetic) tools.
PLAN_INTERNAL_TOOL_NAMES: frozenset[str] = frozenset(
    (
        "acai_plan",
        "acai_plan_advance",
    )
)
def strip_namespace(tool_name: str) -> str:
    """Return the tool name without its ``<server>__`` namespace prefix.

    Everything up to and including the first ``__`` is dropped, so the
    result can be compared against the bare names in READ_TOOL_NAMES. A name
    that contains no ``__`` is returned unchanged.
    """
    _server, separator, bare_name = tool_name.partition("__")
    return bare_name if separator else tool_name
def is_read_tool(tool_name: str) -> bool:
    """Tell whether ``tool_name`` (namespaced or not) is in READ_TOOL_NAMES."""
    bare_name = strip_namespace(tool_name)
    return bare_name in READ_TOOL_NAMES
def is_plan_internal_tool(tool_name: str) -> bool:
    """Tell whether ``tool_name`` (namespaced or not) is in PLAN_INTERNAL_TOOL_NAMES."""
    bare_name = strip_namespace(tool_name)
    return bare_name in PLAN_INTERNAL_TOOL_NAMES