From 967d5bf25d3a32a67a6106a11d8c2ba6a4ff4f26 Mon Sep 17 00:00:00 2001 From: Jordan Diaz Date: Fri, 3 Apr 2026 23:57:08 +0000 Subject: [PATCH] =?UTF-8?q?Simplificar=20a=20agente=20=C3=BAnico:=20elimin?= =?UTF-8?q?ar=20planner/reviewer/steps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit El sistema multi-agente (planner → coder → reviewer) añadía complejidad y causaba problemas (sobreplanificación, repetición de tareas, pérdida de contexto entre steps). Ahora: mensaje → coder → respuesta. Como Claude Code. - El coder decide si responder directamente o usar tools - Sin plan intermedio, sin reviewer, sin steps - Un solo execute() con conversación real completa - Historial compactado con key_data al finalizar - System prompt actualizado: asistente de desarrollo, no agente Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orchestrator/agents/coder.py | 10 +- src/orchestrator/engine.py | 519 +++++-------------------------- 2 files changed, 89 insertions(+), 440 deletions(-) diff --git a/src/orchestrator/agents/coder.py b/src/orchestrator/agents/coder.py index 821089c..64a2776 100644 --- a/src/orchestrator/agents/coder.py +++ b/src/orchestrator/agents/coder.py @@ -5,14 +5,12 @@ from __future__ import annotations from ...models.agent import AgentProfile, AgentRole from .base import BaseAgent -CODER_SYSTEM_PROMPT = """Eres un Agente Programador de Acai CMS. Tu rol es ejecutar tareas de implementación usando las herramientas MCP disponibles. +CODER_SYSTEM_PROMPT = """Eres el asistente de desarrollo de Acai CMS. Ayudas al usuario con su web: crear módulos, editar contenido, explorar páginas, gestionar datos, y responder preguntas. ## Instrucciones -- Concéntrate en la descripción del paso actual. -- Usa herramientas para lograr la tarea. -- Sé preciso y minucioso. -- Reporta lo que lograste, problemas encontrados y hechos relevantes. -- NO produzcas explicaciones innecesarias — produce resultados. 
+- Si el usuario saluda o hace una pregunta conversacional, responde directamente sin usar herramientas. +- Si el usuario pide una acción sobre su web, usa las herramientas MCP disponibles. +- Sé conciso y directo. NO expliques lo que vas a hacer — hazlo. - Responde SIEMPRE en español. - SIGUE ESTRICTAMENTE la Knowledge Base — contiene las convenciones obligatorias del proyecto. diff --git a/src/orchestrator/engine.py b/src/orchestrator/engine.py index eb1ffdb..7f11bb9 100644 --- a/src/orchestrator/engine.py +++ b/src/orchestrator/engine.py @@ -1,13 +1,13 @@ -"""Orchestrator Engine — the main execution loop. +"""Orchestrator Engine — single-agent execution. -Flow: message → plan → route → execute steps → summarize → update → stream +Flow: message → coder agent (with tools) → response +No planner, no reviewer. The coder decides what to do. """ from __future__ import annotations import asyncio import logging -from datetime import datetime, timezone from typing import Any from ..adapters.base import ModelAdapter @@ -16,19 +16,15 @@ from ..context.engine import ContextEngine from ..mcp.manager import MCPManager from ..memory.store import MemoryStore from ..models.agent import AgentRole -from ..models.session import SessionState, SessionStatus, TaskState, TaskStatus, TaskStep +from ..models.session import SessionState, SessionStatus, TaskStatus from ..streaming.sse import SSEEmitter, EventType from .agents.coder import CoderAgent, create_coder_profile -from .agents.collector import CollectorAgent, create_collector_profile -from .agents.planner import PlannerAgent, create_planner_profile -from .agents.reviewer import ReviewerAgent, create_reviewer_profile -from .router import route_step logger = logging.getLogger(__name__) class OrchestratorEngine: - """Drives the full execution lifecycle for a session message.""" + """Drives execution for a session message. 
Single agent, no planning.""" def __init__( self, @@ -43,14 +39,7 @@ class OrchestratorEngine: self.mcp = mcp_client self.memory = memory_store self.sse = sse_emitter - - # Pre-built agent profiles - self._profiles = { - AgentRole.PLANNER: create_planner_profile(), - AgentRole.CODER: create_coder_profile(), - AgentRole.COLLECTOR: create_collector_profile(), - AgentRole.REVIEWER: create_reviewer_profile(), - } + self._coder_profile = create_coder_profile() # ------------------------------------------------------------------ # Public @@ -61,17 +50,10 @@ class OrchestratorEngine: session: SessionState, message: str, ) -> dict[str, Any]: - """Process a user message through the full orchestration pipeline. - - Pipeline: plan → execute steps → review → complete - - Handles errors gracefully: failed steps are marked and skipped, - the session always returns to idle/error — never stuck in executing. - """ - task = None + """Process a user message. Single agent execution with timeout.""" try: return await asyncio.wait_for( - self._run_pipeline(session, message), + self._run(session, message), timeout=settings.max_execution_timeout_seconds, ) except asyncio.TimeoutError: @@ -86,7 +68,7 @@ class OrchestratorEngine: ) return self._error_result(session, "Execution timed out") except Exception as e: - logger.exception("Unhandled error in pipeline for session %s", session.session_id) + logger.exception("Unhandled error for session %s", session.session_id) if session.current_task: session.current_task.mark_failed(str(e)) session.status = SessionStatus.ERROR @@ -97,12 +79,12 @@ class OrchestratorEngine: ) return self._error_result(session, str(e)) - async def _run_pipeline( + async def _run( self, session: SessionState, message: str, ) -> dict[str, Any]: - """Inner pipeline — wrapped by process_message for error handling.""" + """Execute: message → coder → response.""" await self.sse.emit( EventType.EXECUTION_STARTED, @@ -110,233 +92,71 @@ class OrchestratorEngine: 
session_id=session.session_id, ) - # 1. Create task from message + # Create task task = session.begin_task(objective=message) + task.status = TaskStatus.EXECUTING + + # Execute with the coder agent directly + agent = CoderAgent( + profile=self._coder_profile, + model_adapter=self.model, + context_engine=self.context, + mcp_client=self.mcp, + memory_store=self.memory, + sse_emitter=self.sse, + ) - # 2. Plan - task.status = TaskStatus.PLANNING - planner_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0} try: - planner = self._create_agent(AgentRole.PLANNER) - plan_result, planner_usage = await planner.plan(session) - - # Direct response — no plan needed (saludo, pregunta simple) - if isinstance(plan_result, str): - logger.info("Planner returned direct response for task %s", task.task_id) - - # Save to task history so conversation context is preserved - session.task_history.append({ - "task_id": task.task_id, - "objective": message, - "status": "completed", - "steps": 0, - "facts": [], - "key_data": {}, - "tools_used": [], - "artifacts_count": 0, - "summary": f"User: {message[:200]} → Agent: {plan_result[:200]}", - "review": "", - }) - if len(session.task_history) > 20: - session.task_history = session.task_history[-20:] - - task.status = TaskStatus.COMPLETED - session.complete_task() - - # Emit as text streaming for the frontend - await self.sse.emit( - EventType.AGENT_DELTA, - {"agent": "coder", "delta": plan_result, "step": 0}, - session_id=session.session_id, - ) - - cost_usd = ( - (planner_usage.get("input_tokens", 0) / 1_000_000) * settings.cost_per_1m_input - + (planner_usage.get("output_tokens", 0) / 1_000_000) * settings.cost_per_1m_output - ) - - await self.sse.emit( - EventType.EXECUTION_COMPLETED, - { - "session_id": session.session_id, - "task_id": task.task_id, - "steps_completed": 0, - "steps_failed": [], - "status": "completed", - "usage": planner_usage, - "total_cost_usd": round(cost_usd, 6), - }, - session_id=session.session_id, - ) - - 
return { - "session_id": session.session_id, - "task_id": task.task_id, - "content": plan_result, - "steps_completed": 0, - "steps_failed": [], - "artifacts_count": 0, - "review": "", - "status": "completed", - "usage": planner_usage, - "total_cost_usd": round(cost_usd, 6), - } - - # Enforce max 2 steps: 1 coder + 1 optional reviewer - # The coder is capable enough to do everything in 1 step - if len(plan_result) > 2: - logger.warning("Plan had %d steps, trimming to 2", len(plan_result)) - # Keep first coder step + merge descriptions of remaining into it - merged_desc = "; ".join(s.description for s in plan_result) - plan_result[0].description = merged_desc - # Add reviewer if any step was reviewer - has_reviewer = any(s.agent_role == "reviewer" for s in plan_result) - if has_reviewer: - plan_result = [plan_result[0], TaskStep(description="Revisar el resultado", agent_role="reviewer")] - else: - plan_result = [plan_result[0]] - - task.plan = plan_result - task.status = TaskStatus.EXECUTING + result = await agent.execute( + session=session, + max_steps=settings.subagent_max_steps, + ) except Exception as e: - logger.error("Planning failed: %s", e) - task.mark_failed(f"Planning failed: {e}") + logger.error("Execution failed: %s", e) + task.mark_failed(str(e)) session.status = SessionStatus.ERROR await self.sse.emit( EventType.ERROR, - {"error": "planning_failed", "message": str(e)}, + {"error": "execution_failed", "message": str(e)}, session_id=session.session_id, ) - return self._error_result(session, f"Planning failed: {e}") + return self._error_result(session, str(e)) - logger.info( - "Plan created with %d steps for task %s", - len(plan_result), - task.task_id, - ) + # Compact to history + content = result.get("content", "") + usage = result.get("usage", {"input_tokens": 0, "output_tokens": 0}) + key_data = self._extract_key_data_from_results([result]) - # Emit plan to frontend as a visible block - plan_tool_id = f"plan_{task.task_id}" - plan_steps_list = [ - 
{"step": i + 1, "agent": s.agent_role, "description": s.description} - for i, s in enumerate(plan_result) - ] - await self.sse.emit( - EventType.TOOL_STARTED, - {"tool": "plan", "step": 0, "tool_call_id": plan_tool_id}, - session_id=session.session_id, - ) - plan_text = "\n".join( - f" {s['step']}. [{s['agent']}] {s['description']}" - for s in plan_steps_list - ) - await self.sse.emit( - EventType.TOOL_COMPLETED, - { - "tool": "plan", - "status": "completed", - "summary": f"Plan: {len(plan_result)} steps", - "raw_output": f"Plan de ejecución ({len(plan_result)} pasos):\n{plan_text}", - "tool_call_id": plan_tool_id, - }, - session_id=session.session_id, - ) + session.task_history.append({ + "task_id": task.task_id, + "objective": message, + "status": "completed", + "steps": 1, + "facts": task.facts_extracted[-10:], + "key_data": key_data, + "tools_used": [te.tool_name for te in result.get("tool_executions", [])], + "artifacts_count": len(result.get("artifacts", [])), + "summary": f"User: {message[:150]} → Agent: {content[:150]}", + "review": "", + }) + if len(session.task_history) > 20: + session.task_history = session.task_history[-20:] - # 3. 
Execute each step — failures are logged and skipped - results: list[dict[str, Any]] = [] - failed_steps: list[int] = [] + # Clean old artifacts + artifacts = await self.memory.list_artifacts(session.session_id) + recent_task_ids = {t["task_id"] for t in session.task_history[-2:]} + for artifact in artifacts: + if artifact.task_id not in recent_task_ids: + key = f"{self.memory._prefix}:session:{session.session_id}:artifacts" + await self.memory._r.hdel(key, artifact.artifact_id) - for i, step in enumerate(task.plan): - if i >= settings.max_execution_steps: - logger.warning("Max execution steps reached") - break - - task.current_step_index = i - step.status = TaskStatus.EXECUTING - step.started_at = datetime.now(timezone.utc) - - role = route_step(step) - agent = self._create_agent(role) - - await self.sse.emit( - EventType.SUBAGENT_ASSIGNED, - { - "step": i + 1, - "total_steps": len(task.plan), - "agent": role.value, - "description": step.description, - }, - session_id=session.session_id, - ) - - try: - step_result = await agent.execute( - session=session, - max_steps=settings.subagent_max_steps, - ) - results.append(step_result) - - step.status = TaskStatus.COMPLETED - step.completed_at = datetime.now(timezone.utc) - step.result_summary = (step_result.get("content", ""))[:500] - step.tools_used = [ - te.tool_name for te in step_result.get("tool_executions", []) - ] - - for artifact in step_result.get("artifacts", []): - task.facts_extracted.extend(artifact.facts[:5]) - - # Decide if previous steps should be compacted - if i > 0: - self._maybe_compact_previous_steps(task, current_index=i) - - except Exception as e: - logger.error("Step %d failed: %s", i + 1, e) - step.status = TaskStatus.FAILED - step.completed_at = datetime.now(timezone.utc) - step.result_summary = f"Error: {e}" - failed_steps.append(i + 1) - - await self.sse.emit( - EventType.ERROR, - {"error": "step_failed", "step": i + 1, "message": str(e)}, - session_id=session.session_id, - ) - # Continue with 
next step — don't block the pipeline - - # 4. Review (if plan had more than 1 step and at least one succeeded) - review_result: dict[str, Any] = {} - if len(task.plan) > 1 and results: - task.status = TaskStatus.REVIEWING - try: - reviewer = self._create_agent(AgentRole.REVIEWER) - review_result = await reviewer.execute( - session=session, - max_steps=2, - ) - except Exception as e: - logger.error("Review failed: %s", e) - review_result = {"content": f"Review skipped due to error: {e}"} - - # 5. Compact task into history before completing - await self._compact_task_to_history(session, task, results, review_result) - - # 6. Complete — session ALWAYS returns to idle + # Complete + task.status = TaskStatus.COMPLETED session.complete_task() - final_content = self._assemble_response(results, review_result) - status = "completed" if not failed_steps else "partial" - - # Accumulate token usage: planner + all steps + review - total_input = planner_usage.get("input_tokens", 0) - total_output = planner_usage.get("output_tokens", 0) - for r in results: - total_input += r.get("usage", {}).get("input_tokens", 0) - total_output += r.get("usage", {}).get("output_tokens", 0) - # Add review usage if any - total_input += review_result.get("usage", {}).get("input_tokens", 0) - total_output += review_result.get("usage", {}).get("output_tokens", 0) # Calculate cost + total_input = usage.get("input_tokens", 0) + total_output = usage.get("output_tokens", 0) cost_usd = ( (total_input / 1_000_000) * settings.cost_per_1m_input + (total_output / 1_000_000) * settings.cost_per_1m_output @@ -347,38 +167,37 @@ class OrchestratorEngine: { "session_id": session.session_id, "task_id": task.task_id, - "steps_completed": len(results), - "steps_failed": failed_steps, - "status": status, - "usage": { - "input_tokens": total_input, - "output_tokens": total_output, - }, + "steps_completed": 1, + "steps_failed": [], + "status": "completed", + "usage": usage, "total_cost_usd": round(cost_usd, 6), }, 
session_id=session.session_id, ) + logger.info( + "Task %s completed (%d tools, %d artifacts, %d input tokens)", + task.task_id, + len(result.get("tool_executions", [])), + len(result.get("artifacts", [])), + total_input, + ) + return { "session_id": session.session_id, "task_id": task.task_id, - "content": final_content, - "steps_completed": len(results), - "steps_failed": failed_steps, - "artifacts_count": sum( - len(r.get("artifacts", [])) for r in results - ), - "review": review_result.get("content", ""), - "status": status, - "usage": { - "input_tokens": total_input, - "output_tokens": total_output, - }, + "content": content or "Task completed.", + "steps_completed": 1, + "steps_failed": [], + "artifacts_count": len(result.get("artifacts", [])), + "review": "", + "status": "completed", + "usage": usage, "total_cost_usd": round(cost_usd, 6), } def _error_result(self, session: SessionState, error: str) -> dict[str, Any]: - """Build a standardized error response.""" task_id = session.current_task.task_id if session.current_task else "none" return { "session_id": session.session_id, @@ -391,204 +210,36 @@ class OrchestratorEngine: "status": "error", } - # ------------------------------------------------------------------ - # Internals - # ------------------------------------------------------------------ - - async def _compact_task_to_history( - self, - session: SessionState, - task: TaskState, - results: list[dict[str, Any]], - review_result: dict[str, Any], - ) -> None: - """Compact a completed task into a minimal history entry. 
- - This is critical for long sessions: instead of keeping all - artifacts and facts from every task, we compress each completed - task into a ~200 token summary that preserves: - - What was done (objective) - - What was produced (file changes, modules created) - - Key facts learned - - Issues found by reviewer - """ - # Collect all artifact summaries from this task - artifacts = await self.memory.list_artifacts(session.session_id) - task_artifacts = [a for a in artifacts if a.task_id == task.task_id] - - # Build compact summary - step_summaries = [] - for step in task.plan: - if step.result_summary: - step_summaries.append(f"{step.agent_role}: {step.result_summary[:100]}") - - tools_used = set() - for step in task.plan: - tools_used.update(step.tools_used) - - # Extract key structured data from tool executions - key_data = self._extract_key_data_from_results(results) - - history_entry = { - "task_id": task.task_id, - "objective": task.objective, - "status": task.status.value, - "steps": len(task.plan), - "facts": task.facts_extracted[-10:], - "key_data": key_data, - "tools_used": list(tools_used)[:10], - "artifacts_count": len(task_artifacts), - "summary": "; ".join(step_summaries)[:300], - "review": (review_result.get("content", ""))[:200], - } - - # Keep max 20 task histories (trim oldest) - session.task_history.append(history_entry) - if len(session.task_history) > 20: - session.task_history = session.task_history[-20:] - - # Clean up old artifacts from Redis to free memory - # Keep only artifacts from the last 2 tasks - recent_task_ids = {t["task_id"] for t in session.task_history[-2:]} - for artifact in artifacts: - if artifact.task_id not in recent_task_ids: - # Remove old artifact from Redis hash - key = f"{self.memory._prefix}:session:{session.session_id}:artifacts" - await self.memory._r.hdel(key, artifact.artifact_id) - - logger.info( - "Compacted task %s into history (%d facts, %d tools, %d artifacts → summary)", - task.task_id, 
len(task.facts_extracted), len(tools_used), len(task_artifacts), - ) - @staticmethod def _extract_key_data_from_results(results: list[dict[str, Any]]) -> dict[str, Any]: - """Extract structured data from tool executions for task history. - - Preserves key identifiers (recordNum, sectionId, tableName, moduleId) - so the model retains context across tasks without re-querying. - """ + """Extract structured data from tool executions for task history.""" key_data: dict[str, Any] = {} - seen_tables: dict[str, list[int]] = {} # tableName -> recordNums + seen_tables: dict[str, list[int]] = {} seen_sections: list[str] = [] seen_modules: list[str] = [] - seen_pages: dict[str, int] = {} # page name/url -> recordNum for result in results: for te in result.get("tool_executions", []): args = te.arguments - name = te.tool_name - - # Track table + record relationships table = args.get("tableName", "") record = args.get("recordNum") if table and record: record_int = int(record) if str(record).isdigit() else None - if record_int and table not in seen_tables: - seen_tables[table] = [] - if record_int and record_int not in seen_tables.get(table, []): - seen_tables[table].append(record_int) - - # Track section IDs + if record_int: + seen_tables.setdefault(table, []) + if record_int not in seen_tables[table]: + seen_tables[table].append(record_int) section = args.get("sectionId", "") if section and section not in seen_sections: seen_sections.append(section) - - # Track modules module = args.get("moduleId", "") or args.get("moduleName", "") if module and module not in seen_modules: seen_modules.append(module) - # Extract page info from raw output (enlace, name) - if te.raw_output and "enlace" in te.raw_output: - try: - import json as _json - # Try to parse structured data from output - for line in te.raw_output.splitlines(): - line = line.strip() - if line.startswith("{"): - try: - data = _json.loads(line) - if "enlace" in data and "num" in data: - page_key = data.get("name", 
data["enlace"]) - seen_pages[page_key] = int(data["num"]) - except _json.JSONDecodeError: - pass - except Exception: - pass - if seen_tables: key_data["tables"] = {t: nums[:10] for t, nums in seen_tables.items()} if seen_sections: key_data["sections"] = seen_sections[:20] if seen_modules: key_data["modules"] = seen_modules[:20] - if seen_pages: - key_data["pages"] = dict(list(seen_pages.items())[:20]) - return key_data - - def _maybe_compact_previous_steps( - self, task: TaskState, current_index: int - ) -> None: - """Decide if previous steps should be compacted. Deterministic rules.""" - current_step = task.plan[current_index] - - for i in range(current_index): - prev = task.plan[i] - if prev.compacted or prev.status != TaskStatus.COMPLETED: - continue - - # Rule 1: Change of agent role → previous steps are a different focus - if prev.agent_role != current_step.agent_role: - prev.compacted = True - logger.info( - "Compacted step %d (%s) — agent changed to %s", - i + 1, prev.agent_role, current_step.agent_role, - ) - continue - - # Rule 2: More than 3 completed non-compacted steps → compact oldest - non_compacted = [ - s for s in task.plan[:current_index] - if s.status == TaskStatus.COMPLETED and not s.compacted - ] - if len(non_compacted) > 3: - non_compacted[0].compacted = True - logger.info("Compacted oldest step to stay within budget") - - def _create_agent(self, role: AgentRole) -> PlannerAgent | CoderAgent | CollectorAgent | ReviewerAgent: - """Instantiate a subagent for the given role.""" - profile = self._profiles[role] - agent_cls = { - AgentRole.PLANNER: PlannerAgent, - AgentRole.CODER: CoderAgent, - AgentRole.COLLECTOR: CollectorAgent, - AgentRole.REVIEWER: ReviewerAgent, - }[role] - - return agent_cls( - profile=profile, - model_adapter=self.model, - context_engine=self.context, - mcp_client=self.mcp, - memory_store=self.memory, - sse_emitter=self.sse, - ) - - @staticmethod - def _assemble_response( - results: list[dict[str, Any]], - review_result: 
dict[str, Any], - ) -> str: - """Combine step results into a coherent final response.""" - parts: list[str] = [] - for i, r in enumerate(results): - content = r.get("content", "").strip() - if content: - parts.append(f"### Step {i + 1}\n{content}") - - if review_result.get("content"): - parts.append(f"### Review\n{review_result['content']}") - - return "\n\n".join(parts) if parts else "Task completed."