Simplificar a agente único: eliminar planner/reviewer/steps

El sistema multi-agente (planner → coder → reviewer) añadía
complejidad y causaba problemas (sobreplanificación, repetición
de tareas, pérdida de contexto entre steps).

Ahora: mensaje → coder → respuesta. Como Claude Code.
- El coder decide si responder directamente o usar tools
- Sin plan intermedio, sin reviewer, sin steps
- Un solo execute() con conversación real completa
- Historial compactado con key_data al finalizar
- System prompt actualizado: asistente de desarrollo, no agente

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jordan Diaz
2026-04-03 23:57:08 +00:00
parent 1c3d67847a
commit 967d5bf25d
2 changed files with 89 additions and 440 deletions

View File

@@ -5,14 +5,12 @@ from __future__ import annotations
from ...models.agent import AgentProfile, AgentRole from ...models.agent import AgentProfile, AgentRole
from .base import BaseAgent from .base import BaseAgent
CODER_SYSTEM_PROMPT = """Eres un Agente Programador de Acai CMS. Tu rol es ejecutar tareas de implementación usando las herramientas MCP disponibles. CODER_SYSTEM_PROMPT = """Eres el asistente de desarrollo de Acai CMS. Ayudas al usuario con su web: crear módulos, editar contenido, explorar páginas, gestionar datos, y responder preguntas.
## Instrucciones ## Instrucciones
- Concéntrate en la descripción del paso actual. - Si el usuario saluda o hace una pregunta conversacional, responde directamente sin usar herramientas.
- Usa herramientas para lograr la tarea. - Si el usuario pide una acción sobre su web, usa las herramientas MCP disponibles.
- Sé preciso y minucioso. - Sé conciso y directo. NO expliques lo que vas a hacer — hazlo.
- Reporta lo que lograste, problemas encontrados y hechos relevantes.
- NO produzcas explicaciones innecesarias — produce resultados.
- Responde SIEMPRE en español. - Responde SIEMPRE en español.
- SIGUE ESTRICTAMENTE la Knowledge Base — contiene las convenciones obligatorias del proyecto. - SIGUE ESTRICTAMENTE la Knowledge Base — contiene las convenciones obligatorias del proyecto.

View File

@@ -1,13 +1,13 @@
"""Orchestrator Engine — the main execution loop. """Orchestrator Engine — single-agent execution.
Flow: message → plan → route → execute steps → summarize → update → stream Flow: message → coder agent (with tools) → response
No planner, no reviewer. The coder decides what to do.
""" """
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging import logging
from datetime import datetime, timezone
from typing import Any from typing import Any
from ..adapters.base import ModelAdapter from ..adapters.base import ModelAdapter
@@ -16,19 +16,15 @@ from ..context.engine import ContextEngine
from ..mcp.manager import MCPManager from ..mcp.manager import MCPManager
from ..memory.store import MemoryStore from ..memory.store import MemoryStore
from ..models.agent import AgentRole from ..models.agent import AgentRole
from ..models.session import SessionState, SessionStatus, TaskState, TaskStatus, TaskStep from ..models.session import SessionState, SessionStatus, TaskStatus
from ..streaming.sse import SSEEmitter, EventType from ..streaming.sse import SSEEmitter, EventType
from .agents.coder import CoderAgent, create_coder_profile from .agents.coder import CoderAgent, create_coder_profile
from .agents.collector import CollectorAgent, create_collector_profile
from .agents.planner import PlannerAgent, create_planner_profile
from .agents.reviewer import ReviewerAgent, create_reviewer_profile
from .router import route_step
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class OrchestratorEngine: class OrchestratorEngine:
"""Drives the full execution lifecycle for a session message.""" """Drives execution for a session message. Single agent, no planning."""
def __init__( def __init__(
self, self,
@@ -43,14 +39,7 @@ class OrchestratorEngine:
self.mcp = mcp_client self.mcp = mcp_client
self.memory = memory_store self.memory = memory_store
self.sse = sse_emitter self.sse = sse_emitter
self._coder_profile = create_coder_profile()
# Pre-built agent profiles
self._profiles = {
AgentRole.PLANNER: create_planner_profile(),
AgentRole.CODER: create_coder_profile(),
AgentRole.COLLECTOR: create_collector_profile(),
AgentRole.REVIEWER: create_reviewer_profile(),
}
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Public # Public
@@ -61,17 +50,10 @@ class OrchestratorEngine:
session: SessionState, session: SessionState,
message: str, message: str,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Process a user message through the full orchestration pipeline. """Process a user message. Single agent execution with timeout."""
Pipeline: plan → execute steps → review → complete
Handles errors gracefully: failed steps are marked and skipped,
the session always returns to idle/error — never stuck in executing.
"""
task = None
try: try:
return await asyncio.wait_for( return await asyncio.wait_for(
self._run_pipeline(session, message), self._run(session, message),
timeout=settings.max_execution_timeout_seconds, timeout=settings.max_execution_timeout_seconds,
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
@@ -86,7 +68,7 @@ class OrchestratorEngine:
) )
return self._error_result(session, "Execution timed out") return self._error_result(session, "Execution timed out")
except Exception as e: except Exception as e:
logger.exception("Unhandled error in pipeline for session %s", session.session_id) logger.exception("Unhandled error for session %s", session.session_id)
if session.current_task: if session.current_task:
session.current_task.mark_failed(str(e)) session.current_task.mark_failed(str(e))
session.status = SessionStatus.ERROR session.status = SessionStatus.ERROR
@@ -97,12 +79,12 @@ class OrchestratorEngine:
) )
return self._error_result(session, str(e)) return self._error_result(session, str(e))
async def _run_pipeline( async def _run(
self, self,
session: SessionState, session: SessionState,
message: str, message: str,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Inner pipeline — wrapped by process_message for error handling.""" """Execute: message → coder → response."""
await self.sse.emit( await self.sse.emit(
EventType.EXECUTION_STARTED, EventType.EXECUTION_STARTED,
@@ -110,233 +92,71 @@ class OrchestratorEngine:
session_id=session.session_id, session_id=session.session_id,
) )
# 1. Create task from message # Create task
task = session.begin_task(objective=message) task = session.begin_task(objective=message)
task.status = TaskStatus.EXECUTING
# Execute with the coder agent directly
agent = CoderAgent(
profile=self._coder_profile,
model_adapter=self.model,
context_engine=self.context,
mcp_client=self.mcp,
memory_store=self.memory,
sse_emitter=self.sse,
)
# 2. Plan
task.status = TaskStatus.PLANNING
planner_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
try: try:
planner = self._create_agent(AgentRole.PLANNER) result = await agent.execute(
plan_result, planner_usage = await planner.plan(session) session=session,
max_steps=settings.subagent_max_steps,
# Direct response — no plan needed (saludo, pregunta simple) )
if isinstance(plan_result, str):
logger.info("Planner returned direct response for task %s", task.task_id)
# Save to task history so conversation context is preserved
session.task_history.append({
"task_id": task.task_id,
"objective": message,
"status": "completed",
"steps": 0,
"facts": [],
"key_data": {},
"tools_used": [],
"artifacts_count": 0,
"summary": f"User: {message[:200]} → Agent: {plan_result[:200]}",
"review": "",
})
if len(session.task_history) > 20:
session.task_history = session.task_history[-20:]
task.status = TaskStatus.COMPLETED
session.complete_task()
# Emit as text streaming for the frontend
await self.sse.emit(
EventType.AGENT_DELTA,
{"agent": "coder", "delta": plan_result, "step": 0},
session_id=session.session_id,
)
cost_usd = (
(planner_usage.get("input_tokens", 0) / 1_000_000) * settings.cost_per_1m_input
+ (planner_usage.get("output_tokens", 0) / 1_000_000) * settings.cost_per_1m_output
)
await self.sse.emit(
EventType.EXECUTION_COMPLETED,
{
"session_id": session.session_id,
"task_id": task.task_id,
"steps_completed": 0,
"steps_failed": [],
"status": "completed",
"usage": planner_usage,
"total_cost_usd": round(cost_usd, 6),
},
session_id=session.session_id,
)
return {
"session_id": session.session_id,
"task_id": task.task_id,
"content": plan_result,
"steps_completed": 0,
"steps_failed": [],
"artifacts_count": 0,
"review": "",
"status": "completed",
"usage": planner_usage,
"total_cost_usd": round(cost_usd, 6),
}
# Enforce max 2 steps: 1 coder + 1 optional reviewer
# The coder is capable enough to do everything in 1 step
if len(plan_result) > 2:
logger.warning("Plan had %d steps, trimming to 2", len(plan_result))
# Keep first coder step + merge descriptions of remaining into it
merged_desc = "; ".join(s.description for s in plan_result)
plan_result[0].description = merged_desc
# Add reviewer if any step was reviewer
has_reviewer = any(s.agent_role == "reviewer" for s in plan_result)
if has_reviewer:
plan_result = [plan_result[0], TaskStep(description="Revisar el resultado", agent_role="reviewer")]
else:
plan_result = [plan_result[0]]
task.plan = plan_result
task.status = TaskStatus.EXECUTING
except Exception as e: except Exception as e:
logger.error("Planning failed: %s", e) logger.error("Execution failed: %s", e)
task.mark_failed(f"Planning failed: {e}") task.mark_failed(str(e))
session.status = SessionStatus.ERROR session.status = SessionStatus.ERROR
await self.sse.emit( await self.sse.emit(
EventType.ERROR, EventType.ERROR,
{"error": "planning_failed", "message": str(e)}, {"error": "execution_failed", "message": str(e)},
session_id=session.session_id, session_id=session.session_id,
) )
return self._error_result(session, f"Planning failed: {e}") return self._error_result(session, str(e))
logger.info( # Compact to history
"Plan created with %d steps for task %s", content = result.get("content", "")
len(plan_result), usage = result.get("usage", {"input_tokens": 0, "output_tokens": 0})
task.task_id, key_data = self._extract_key_data_from_results([result])
)
# Emit plan to frontend as a visible block session.task_history.append({
plan_tool_id = f"plan_{task.task_id}" "task_id": task.task_id,
plan_steps_list = [ "objective": message,
{"step": i + 1, "agent": s.agent_role, "description": s.description} "status": "completed",
for i, s in enumerate(plan_result) "steps": 1,
] "facts": task.facts_extracted[-10:],
await self.sse.emit( "key_data": key_data,
EventType.TOOL_STARTED, "tools_used": [te.tool_name for te in result.get("tool_executions", [])],
{"tool": "plan", "step": 0, "tool_call_id": plan_tool_id}, "artifacts_count": len(result.get("artifacts", [])),
session_id=session.session_id, "summary": f"User: {message[:150]} → Agent: {content[:150]}",
) "review": "",
plan_text = "\n".join( })
f" {s['step']}. [{s['agent']}] {s['description']}" if len(session.task_history) > 20:
for s in plan_steps_list session.task_history = session.task_history[-20:]
)
await self.sse.emit(
EventType.TOOL_COMPLETED,
{
"tool": "plan",
"status": "completed",
"summary": f"Plan: {len(plan_result)} steps",
"raw_output": f"Plan de ejecución ({len(plan_result)} pasos):\n{plan_text}",
"tool_call_id": plan_tool_id,
},
session_id=session.session_id,
)
# 3. Execute each step — failures are logged and skipped # Clean old artifacts
results: list[dict[str, Any]] = [] artifacts = await self.memory.list_artifacts(session.session_id)
failed_steps: list[int] = [] recent_task_ids = {t["task_id"] for t in session.task_history[-2:]}
for artifact in artifacts:
if artifact.task_id not in recent_task_ids:
key = f"{self.memory._prefix}:session:{session.session_id}:artifacts"
await self.memory._r.hdel(key, artifact.artifact_id)
for i, step in enumerate(task.plan): # Complete
if i >= settings.max_execution_steps: task.status = TaskStatus.COMPLETED
logger.warning("Max execution steps reached")
break
task.current_step_index = i
step.status = TaskStatus.EXECUTING
step.started_at = datetime.now(timezone.utc)
role = route_step(step)
agent = self._create_agent(role)
await self.sse.emit(
EventType.SUBAGENT_ASSIGNED,
{
"step": i + 1,
"total_steps": len(task.plan),
"agent": role.value,
"description": step.description,
},
session_id=session.session_id,
)
try:
step_result = await agent.execute(
session=session,
max_steps=settings.subagent_max_steps,
)
results.append(step_result)
step.status = TaskStatus.COMPLETED
step.completed_at = datetime.now(timezone.utc)
step.result_summary = (step_result.get("content", ""))[:500]
step.tools_used = [
te.tool_name for te in step_result.get("tool_executions", [])
]
for artifact in step_result.get("artifacts", []):
task.facts_extracted.extend(artifact.facts[:5])
# Decide if previous steps should be compacted
if i > 0:
self._maybe_compact_previous_steps(task, current_index=i)
except Exception as e:
logger.error("Step %d failed: %s", i + 1, e)
step.status = TaskStatus.FAILED
step.completed_at = datetime.now(timezone.utc)
step.result_summary = f"Error: {e}"
failed_steps.append(i + 1)
await self.sse.emit(
EventType.ERROR,
{"error": "step_failed", "step": i + 1, "message": str(e)},
session_id=session.session_id,
)
# Continue with next step — don't block the pipeline
# 4. Review (if plan had more than 1 step and at least one succeeded)
review_result: dict[str, Any] = {}
if len(task.plan) > 1 and results:
task.status = TaskStatus.REVIEWING
try:
reviewer = self._create_agent(AgentRole.REVIEWER)
review_result = await reviewer.execute(
session=session,
max_steps=2,
)
except Exception as e:
logger.error("Review failed: %s", e)
review_result = {"content": f"Review skipped due to error: {e}"}
# 5. Compact task into history before completing
await self._compact_task_to_history(session, task, results, review_result)
# 6. Complete — session ALWAYS returns to idle
session.complete_task() session.complete_task()
final_content = self._assemble_response(results, review_result)
status = "completed" if not failed_steps else "partial"
# Accumulate token usage: planner + all steps + review
total_input = planner_usage.get("input_tokens", 0)
total_output = planner_usage.get("output_tokens", 0)
for r in results:
total_input += r.get("usage", {}).get("input_tokens", 0)
total_output += r.get("usage", {}).get("output_tokens", 0)
# Add review usage if any
total_input += review_result.get("usage", {}).get("input_tokens", 0)
total_output += review_result.get("usage", {}).get("output_tokens", 0)
# Calculate cost # Calculate cost
total_input = usage.get("input_tokens", 0)
total_output = usage.get("output_tokens", 0)
cost_usd = ( cost_usd = (
(total_input / 1_000_000) * settings.cost_per_1m_input (total_input / 1_000_000) * settings.cost_per_1m_input
+ (total_output / 1_000_000) * settings.cost_per_1m_output + (total_output / 1_000_000) * settings.cost_per_1m_output
@@ -347,38 +167,37 @@ class OrchestratorEngine:
{ {
"session_id": session.session_id, "session_id": session.session_id,
"task_id": task.task_id, "task_id": task.task_id,
"steps_completed": len(results), "steps_completed": 1,
"steps_failed": failed_steps, "steps_failed": [],
"status": status, "status": "completed",
"usage": { "usage": usage,
"input_tokens": total_input,
"output_tokens": total_output,
},
"total_cost_usd": round(cost_usd, 6), "total_cost_usd": round(cost_usd, 6),
}, },
session_id=session.session_id, session_id=session.session_id,
) )
logger.info(
"Task %s completed (%d tools, %d artifacts, %d input tokens)",
task.task_id,
len(result.get("tool_executions", [])),
len(result.get("artifacts", [])),
total_input,
)
return { return {
"session_id": session.session_id, "session_id": session.session_id,
"task_id": task.task_id, "task_id": task.task_id,
"content": final_content, "content": content or "Task completed.",
"steps_completed": len(results), "steps_completed": 1,
"steps_failed": failed_steps, "steps_failed": [],
"artifacts_count": sum( "artifacts_count": len(result.get("artifacts", [])),
len(r.get("artifacts", [])) for r in results "review": "",
), "status": "completed",
"review": review_result.get("content", ""), "usage": usage,
"status": status,
"usage": {
"input_tokens": total_input,
"output_tokens": total_output,
},
"total_cost_usd": round(cost_usd, 6), "total_cost_usd": round(cost_usd, 6),
} }
def _error_result(self, session: SessionState, error: str) -> dict[str, Any]: def _error_result(self, session: SessionState, error: str) -> dict[str, Any]:
"""Build a standardized error response."""
task_id = session.current_task.task_id if session.current_task else "none" task_id = session.current_task.task_id if session.current_task else "none"
return { return {
"session_id": session.session_id, "session_id": session.session_id,
@@ -391,204 +210,36 @@ class OrchestratorEngine:
"status": "error", "status": "error",
} }
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
async def _compact_task_to_history(
self,
session: SessionState,
task: TaskState,
results: list[dict[str, Any]],
review_result: dict[str, Any],
) -> None:
"""Compact a completed task into a minimal history entry.
This is critical for long sessions: instead of keeping all
artifacts and facts from every task, we compress each completed
task into a ~200 token summary that preserves:
- What was done (objective)
- What was produced (file changes, modules created)
- Key facts learned
- Issues found by reviewer
"""
# Collect all artifact summaries from this task
artifacts = await self.memory.list_artifacts(session.session_id)
task_artifacts = [a for a in artifacts if a.task_id == task.task_id]
# Build compact summary
step_summaries = []
for step in task.plan:
if step.result_summary:
step_summaries.append(f"{step.agent_role}: {step.result_summary[:100]}")
tools_used = set()
for step in task.plan:
tools_used.update(step.tools_used)
# Extract key structured data from tool executions
key_data = self._extract_key_data_from_results(results)
history_entry = {
"task_id": task.task_id,
"objective": task.objective,
"status": task.status.value,
"steps": len(task.plan),
"facts": task.facts_extracted[-10:],
"key_data": key_data,
"tools_used": list(tools_used)[:10],
"artifacts_count": len(task_artifacts),
"summary": "; ".join(step_summaries)[:300],
"review": (review_result.get("content", ""))[:200],
}
# Keep max 20 task histories (trim oldest)
session.task_history.append(history_entry)
if len(session.task_history) > 20:
session.task_history = session.task_history[-20:]
# Clean up old artifacts from Redis to free memory
# Keep only artifacts from the last 2 tasks
recent_task_ids = {t["task_id"] for t in session.task_history[-2:]}
for artifact in artifacts:
if artifact.task_id not in recent_task_ids:
# Remove old artifact from Redis hash
key = f"{self.memory._prefix}:session:{session.session_id}:artifacts"
await self.memory._r.hdel(key, artifact.artifact_id)
logger.info(
"Compacted task %s into history (%d facts, %d tools, %d artifacts → summary)",
task.task_id, len(task.facts_extracted), len(tools_used), len(task_artifacts),
)
@staticmethod @staticmethod
def _extract_key_data_from_results(results: list[dict[str, Any]]) -> dict[str, Any]: def _extract_key_data_from_results(results: list[dict[str, Any]]) -> dict[str, Any]:
"""Extract structured data from tool executions for task history. """Extract structured data from tool executions for task history."""
Preserves key identifiers (recordNum, sectionId, tableName, moduleId)
so the model retains context across tasks without re-querying.
"""
key_data: dict[str, Any] = {} key_data: dict[str, Any] = {}
seen_tables: dict[str, list[int]] = {} # tableName -> recordNums seen_tables: dict[str, list[int]] = {}
seen_sections: list[str] = [] seen_sections: list[str] = []
seen_modules: list[str] = [] seen_modules: list[str] = []
seen_pages: dict[str, int] = {} # page name/url -> recordNum
for result in results: for result in results:
for te in result.get("tool_executions", []): for te in result.get("tool_executions", []):
args = te.arguments args = te.arguments
name = te.tool_name
# Track table + record relationships
table = args.get("tableName", "") table = args.get("tableName", "")
record = args.get("recordNum") record = args.get("recordNum")
if table and record: if table and record:
record_int = int(record) if str(record).isdigit() else None record_int = int(record) if str(record).isdigit() else None
if record_int and table not in seen_tables: if record_int:
seen_tables[table] = [] seen_tables.setdefault(table, [])
if record_int and record_int not in seen_tables.get(table, []): if record_int not in seen_tables[table]:
seen_tables[table].append(record_int) seen_tables[table].append(record_int)
# Track section IDs
section = args.get("sectionId", "") section = args.get("sectionId", "")
if section and section not in seen_sections: if section and section not in seen_sections:
seen_sections.append(section) seen_sections.append(section)
# Track modules
module = args.get("moduleId", "") or args.get("moduleName", "") module = args.get("moduleId", "") or args.get("moduleName", "")
if module and module not in seen_modules: if module and module not in seen_modules:
seen_modules.append(module) seen_modules.append(module)
# Extract page info from raw output (enlace, name)
if te.raw_output and "enlace" in te.raw_output:
try:
import json as _json
# Try to parse structured data from output
for line in te.raw_output.splitlines():
line = line.strip()
if line.startswith("{"):
try:
data = _json.loads(line)
if "enlace" in data and "num" in data:
page_key = data.get("name", data["enlace"])
seen_pages[page_key] = int(data["num"])
except _json.JSONDecodeError:
pass
except Exception:
pass
if seen_tables: if seen_tables:
key_data["tables"] = {t: nums[:10] for t, nums in seen_tables.items()} key_data["tables"] = {t: nums[:10] for t, nums in seen_tables.items()}
if seen_sections: if seen_sections:
key_data["sections"] = seen_sections[:20] key_data["sections"] = seen_sections[:20]
if seen_modules: if seen_modules:
key_data["modules"] = seen_modules[:20] key_data["modules"] = seen_modules[:20]
if seen_pages:
key_data["pages"] = dict(list(seen_pages.items())[:20])
return key_data return key_data
def _maybe_compact_previous_steps(
self, task: TaskState, current_index: int
) -> None:
"""Decide if previous steps should be compacted. Deterministic rules."""
current_step = task.plan[current_index]
for i in range(current_index):
prev = task.plan[i]
if prev.compacted or prev.status != TaskStatus.COMPLETED:
continue
# Rule 1: Change of agent role → previous steps are a different focus
if prev.agent_role != current_step.agent_role:
prev.compacted = True
logger.info(
"Compacted step %d (%s) — agent changed to %s",
i + 1, prev.agent_role, current_step.agent_role,
)
continue
# Rule 2: More than 3 completed non-compacted steps → compact oldest
non_compacted = [
s for s in task.plan[:current_index]
if s.status == TaskStatus.COMPLETED and not s.compacted
]
if len(non_compacted) > 3:
non_compacted[0].compacted = True
logger.info("Compacted oldest step to stay within budget")
def _create_agent(self, role: AgentRole) -> PlannerAgent | CoderAgent | CollectorAgent | ReviewerAgent:
"""Instantiate a subagent for the given role."""
profile = self._profiles[role]
agent_cls = {
AgentRole.PLANNER: PlannerAgent,
AgentRole.CODER: CoderAgent,
AgentRole.COLLECTOR: CollectorAgent,
AgentRole.REVIEWER: ReviewerAgent,
}[role]
return agent_cls(
profile=profile,
model_adapter=self.model,
context_engine=self.context,
mcp_client=self.mcp,
memory_store=self.memory,
sse_emitter=self.sse,
)
@staticmethod
def _assemble_response(
results: list[dict[str, Any]],
review_result: dict[str, Any],
) -> str:
"""Combine step results into a coherent final response."""
parts: list[str] = []
for i, r in enumerate(results):
content = r.get("content", "").strip()
if content:
parts.append(f"### Step {i + 1}\n{content}")
if review_result.get("content"):
parts.append(f"### Review\n{review_result['content']}")
return "\n\n".join(parts) if parts else "Task completed."