From b88917c18d46746061295dde6654e677475b69c0 Mon Sep 17 00:00:00 2001 From: Jordan Diaz Date: Fri, 3 Apr 2026 12:09:08 +0000 Subject: [PATCH] =?UTF-8?q?Redise=C3=B1o=20tool=20results=20+=20compactaci?= =?UTF-8?q?=C3=B3n=20por=20step=20+=20integraci=C3=B3n=20Docker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Tool results completos en conversación (como Claude Code/Cursor) en vez de resúmenes en system prompt - Parser multi-tool: trackea tool calls por tool_call_id para OpenAI streaming interleaved - Deduplicación por fingerprint + detección de loop cuando todos los calls de un step son duplicados - Compactación inteligente por step: el orquestador decide cuándo comprimir steps anteriores (cambio de agente o >3 steps) - stdio.js lee URLs del .acai como fallback (local_web_url, local_forge_host) - Buffer MCP aumentado a 1MB para respuestas grandes - Dockerfile adaptado para build context desde raíz del proyecto Co-Authored-By: Claude Opus 4.6 (1M context) --- mcp-server/stdio.js | 17 +++- src/context/engine.py | 98 ++++++++++------------ src/mcp/client.py | 1 + src/models/session.py | 1 + src/models/tools.py | 3 +- src/orchestrator/agents/base.py | 142 +++++++++++++++++++++++++------- src/orchestrator/engine.py | 35 +++++++- 7 files changed, 206 insertions(+), 91 deletions(-) diff --git a/mcp-server/stdio.js b/mcp-server/stdio.js index 5381a97..002b40e 100644 --- a/mcp-server/stdio.js +++ b/mcp-server/stdio.js @@ -22,9 +22,21 @@ registerResources(server); // Static env vars (web_url and website don't change, token does) const projectDir = process.env.ACAI_PROJECT_DIR || ""; -const website = process.env.ACAI_WEBSITE || ""; -const webUrl = process.env.ACAI_WEB_URL || ""; +const acaiFilePath = projectDir ? path.join(projectDir, ".acai") : ""; + +// Read .acai once at startup for URL fallbacks +let acaiFileData = {}; +if (acaiFilePath) { + try { + acaiFileData = JSON.parse(fs.readFileSync(acaiFilePath, "utf-8")); + } catch { /* ignore - fall back to env vars */ } +} + +const website = process.env.ACAI_WEBSITE || acaiFileData.domain || ""; +const webUrl = process.env.ACAI_WEB_URL || acaiFileData.local_web_url || ""; const derivedForgeHost = (() => { + // First check .acai for explicit forge host + if (acaiFileData.local_forge_host) return acaiFileData.local_forge_host; if (!webUrl) return ""; try { const parsed = new URL(webUrl); @@ -35,7 +47,6 @@ const derivedForgeHost = (() => { })(); const apiWebUrl = process.env.ACAI_API_WEB_URL || (derivedForgeHost ? "http://web:80/" : webUrl); const forgeHost = process.env.ACAI_FORGE_HOST || derivedForgeHost; -const acaiFilePath = projectDir ? path.join(projectDir, ".acai") : ""; // Read fresh credentials from .acai file function readFreshCredentials() { diff --git a/src/context/engine.py b/src/context/engine.py index 9f7740f..d4d413b 100644 --- a/src/context/engine.py +++ b/src/context/engine.py @@ -62,10 +62,15 @@ class ContextEngine: session: SessionState, agent: AgentProfile, artifacts: list[ArtifactSummary] | None = None, - working_items: list[dict[str, Any]] | None = None, + conversation: list[dict[str, Any]] | None = None, extra_instructions: str = "", ) -> ContextPackage: - """Build a full ContextPackage for the given agent and session.""" + """Build a full ContextPackage for the given agent and session. + + The conversation parameter contains real assistant/tool messages + with complete tool results. These go into the messages array, + not the system prompt — like professional agentic tools. + """ sections: list[ContextSection] = [] allowed = set(agent.context_sections) @@ -88,28 +93,18 @@ class ContextEngine: if "task_state" in allowed and session.task_history: sections.append(self._build_task_history(session)) - # 5. Task state — current task + # 5. Task state — current task (includes compacted previous steps) if "task_state" in allowed and session.current_task: sections.append(self._build_task_state(session.current_task)) - # 6. Artifact memory — summarised, never raw (only current task's) - if "artifact_memory" in allowed and artifacts: - sections.append(self._build_artifact_memory(artifacts)) - - # 6. Working context — recent relevant items - if "working_context" in allowed: - sections.append( - self._build_working_context(working_items or [], extra_instructions) - ) - # Compact to fit budget sections = self.compactor.compact_sections(sections) # Assemble system prompt from sections system_prompt = self._assemble_system_prompt(sections) - # Build messages (just user message — no chat history) - messages = self._build_messages(session) + # Build messages with real conversation history + messages = self._build_messages(session, conversation) total_tokens = estimate_tokens(system_prompt) + sum( estimate_tokens(m.get("content", "")) for m in messages @@ -133,6 +128,7 @@ class ContextEngine: "preview": s.content[:150].replace("\n", " "), }) + conv_len = len(conversation) if conversation else 0 debug_entry = { "timestamp": time.time(), "agent": agent.role.value, @@ -144,7 +140,7 @@ class ContextEngine: "system_prompt_tokens": estimate_tokens(system_prompt), "user_message_preview": messages[0]["content"][:200] if messages else "", "artifacts_count": len(artifacts) if artifacts else 0, - "working_items_count": len(working_items) if working_items else 0, + "conversation_messages": conv_len, } history = self._history[session.session_id] @@ -153,19 +149,14 @@ class ContextEngine: self._history[session.session_id] = history[-self._max_history:] logger.info( - "Context built for [%s/%s] — %d sections, ~%d tokens, artifacts=%d, working_items=%d", + "Context built for [%s/%s] — %d sections, ~%d tokens, artifacts=%d, conversation=%d msgs", session.session_id[:8], agent.role.value, len(sections), total_tokens, len(artifacts) if artifacts else 0, - len(working_items) if working_items else 0, + conv_len, ) - for s in section_summary: - logger.debug( - " Section [%s] prio=%d tokens=%d chars=%d", - s["type"], s["priority"], s["tokens"], s["chars"], - ) return package @@ -236,10 +227,11 @@ class ContextEngine: [ "", "## Contrato de Contexto", - "- NUNCA recibirás salidas crudas de herramientas en tu contexto.", - "- Los resultados de herramientas se resumen como artefactos.", - "- Solicita rehidratación si necesitas el contenido completo.", + "- Los resultados de herramientas se incluyen completos en la conversación.", + "- Los steps anteriores pueden estar compactados como resúmenes.", "- Mantén las respuestas enfocadas en el paso actual.", + "- Si ya tienes la información necesaria, genera tu respuesta final.", + "- NO repitas llamadas a herramientas con los mismos argumentos.", "- Responde SIEMPRE en español.", ] ) @@ -451,6 +443,14 @@ class ContextEngine: for c in task.constraints: lines.append(f"- {c}") + # Show compacted previous steps results + compacted_steps = [s for s in task.plan if s.compacted and s.result_summary] + if compacted_steps: + lines.append("") + lines.append("## Previous Steps (compacted)") + for step in compacted_steps: + lines.append(f"- [{step.agent_role}] {step.description}: {step.result_summary[:300]}") + # Show plan overview (compact) if task.plan: lines.append("") @@ -458,8 +458,9 @@ class ContextEngine: for i, step in enumerate(task.plan): marker = "→" if i == task.current_step_index else "·" status_label = step.status.value + compacted_label = " (compacted)" if step.compacted else "" lines.append( - f" {marker} Step {i + 1} [{status_label}]: {step.description}" + f" {marker} Step {i + 1} [{status_label}{compacted_label}]: {step.description}" ) content = "\n".join(lines) @@ -483,26 +484,6 @@ class ContextEngine: token_estimate=estimate_tokens(content), ) - def _build_working_context( - self, - items: list[dict[str, Any]], - extra_instructions: str, - ) -> ContextSection: - lines = ["# Working Context"] - if extra_instructions: - lines.append(f"\n{extra_instructions}") - for item in items[: settings.working_context_max_items]: - role = item.get("role", "info") - content_val = item.get("content", "") - lines.append(f"[{role}] {content_val}") - content = "\n".join(lines) - return ContextSection( - section_type=ContextSectionType.WORKING_CONTEXT, - content=content, - priority=30, - token_estimate=estimate_tokens(content), - ) - # ------------------------------------------------------------------ # Assembly # ------------------------------------------------------------------ @@ -510,14 +491,11 @@ class ContextEngine: def _assemble_system_prompt(self, sections: list[ContextSection]) -> str: """Combine sections into a single system prompt string.""" parts: list[str] = [] - # Order: rules → profile → task → artifacts → working order = [ ContextSectionType.IMMUTABLE_RULES, ContextSectionType.PROJECT_PROFILE, ContextSectionType.KNOWLEDGE_BASE, ContextSectionType.TASK_STATE, - ContextSectionType.ARTIFACT_MEMORY, - ContextSectionType.WORKING_CONTEXT, ] section_map: dict[ContextSectionType, ContextSection] = { s.section_type: s for s in sections @@ -527,11 +505,15 @@ class ContextEngine: parts.append(section_map[st].content) return "\n\n---\n\n".join(parts) - def _build_messages(self, session: SessionState) -> list[dict[str, Any]]: - """Build the messages array. We do NOT include chat history. + def _build_messages( + self, + session: SessionState, + conversation: list[dict[str, Any]] | None = None, + ) -> list[dict[str, Any]]: + """Build the messages array with real conversation history. - The user message is the current task objective (or a sentinel - if no task is active). + Includes the user objective message followed by the full + assistant/tool conversation — like professional agentic tools. """ if session.current_task: step = session.current_task.current_step() @@ -545,4 +527,10 @@ class ContextEngine: else: user_content = "Awaiting task assignment." - return [{"role": "user", "content": user_content}] + messages: list[dict[str, Any]] = [{"role": "user", "content": user_content}] + + # Append real conversation (assistant messages + tool results) + if conversation: + messages.extend(conversation) + + return messages diff --git a/src/mcp/client.py b/src/mcp/client.py index 97c4ed6..319d8ca 100644 --- a/src/mcp/client.py +++ b/src/mcp/client.py @@ -74,6 +74,7 @@ class MCPClient: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self._env, + limit=1024 * 1024, # 1MB buffer for large MCP responses ) self._running = True self._reader_task = asyncio.create_task(self._read_loop()) diff --git a/src/models/session.py b/src/models/session.py index 1a9a406..1194065 100644 --- a/src/models/session.py +++ b/src/models/session.py @@ -36,6 +36,7 @@ class TaskStep(BaseModel): status: TaskStatus = TaskStatus.PENDING result_summary: str = "" tools_used: list[str] = Field(default_factory=list) + compacted: bool = False # True when step results have been compacted started_at: datetime | None = None completed_at: datetime | None = None diff --git a/src/models/tools.py b/src/models/tools.py index 0f2f57b..069e9fd 100644 --- a/src/models/tools.py +++ b/src/models/tools.py @@ -33,7 +33,8 @@ class ToolExecution(BaseModel): tool_name: str arguments: dict[str, Any] = Field(default_factory=dict) status: ToolExecutionStatus = ToolExecutionStatus.PENDING - result_summary: str = "" # Summarised result — raw output is NEVER stored here + result_summary: str = "" # Summarised result for artifacts and compacted history + raw_output: str = "" # Truncated raw output for conversation messages error: str = "" duration_ms: float = 0.0 started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) diff --git a/src/orchestrator/agents/base.py b/src/orchestrator/agents/base.py index c82d5f2..8bbee5e 100644 --- a/src/orchestrator/agents/base.py +++ b/src/orchestrator/agents/base.py @@ -2,6 +2,7 @@ from __future__ import annotations +import hashlib import json import logging import time @@ -47,6 +48,10 @@ class BaseAgent: ) -> dict[str, Any]: """Run the agent's execution loop. + Uses real conversation messages with complete tool results, + like professional agentic tools (Claude Code, Cursor). + Compaction happens at the step level, not per tool result. + Returns a result dict with keys: content, artifacts, tool_executions. """ artifacts: list[ArtifactSummary] = await self.memory.list_artifacts( @@ -54,15 +59,18 @@ class BaseAgent: ) tool_executions: list[ToolExecution] = [] accumulated_content = "" - working_items: list[dict[str, Any]] = [] + # Real conversation history: assistant messages + tool results + conversation: list[dict[str, Any]] = [] + tool_fingerprints: dict[str, ToolExecution] = {} + all_duplicates_streak = 0 # consecutive steps where ALL calls are duplicates for step in range(max_steps): - # Build context — NEVER includes raw tool output + # Build context with real conversation ctx = await self.context.build_context( session=session, agent=self.profile, artifacts=artifacts, - working_items=working_items, + conversation=conversation, ) # Prepare tool definitions @@ -77,7 +85,7 @@ class BaseAgent: full_text = "" tool_calls: list[dict[str, Any]] = [] - current_tool: dict[str, Any] | None = None + active_tools: dict[str, dict[str, Any]] = {} async for chunk in self.model.stream( messages=ctx.to_messages(), @@ -96,35 +104,40 @@ class BaseAgent: session_id=session.session_id, ) - if chunk.tool_name and (current_tool is None or not current_tool.get("name")): - current_tool = { - "id": chunk.tool_call_id, - "name": chunk.tool_name, - "arguments": "", - } - await self.sse.emit( - EventType.TOOL_STARTED, - {"tool": chunk.tool_name, "step": step}, - session_id=session.session_id, - ) + if chunk.tool_name and chunk.tool_call_id: + if chunk.tool_call_id not in active_tools: + active_tools[chunk.tool_call_id] = { + "id": chunk.tool_call_id, + "name": chunk.tool_name, + "arguments": "", + } + await self.sse.emit( + EventType.TOOL_STARTED, + {"tool": chunk.tool_name, "step": step}, + session_id=session.session_id, + ) - if chunk.tool_arguments and current_tool is not None and not chunk.finish_reason: - # Accumulate partial argument chunks (NOT the final one) - current_tool["arguments"] += chunk.tool_arguments + if chunk.tool_arguments and chunk.tool_call_id and not chunk.finish_reason: + tool = active_tools.get(chunk.tool_call_id) + if tool: + tool["arguments"] += chunk.tool_arguments - if chunk.finish_reason == "tool_use" and current_tool is not None and current_tool.get("name"): - # Final chunk carries complete arguments — use those if - # partial accumulation is empty, otherwise use accumulated - final_args = current_tool["arguments"] or chunk.tool_arguments or "" + if chunk.finish_reason == "tool_use" and chunk.tool_call_id: + tool = active_tools.pop(chunk.tool_call_id, None) + if not tool: + tool = { + "id": chunk.tool_call_id, + "name": chunk.tool_name or "", + "arguments": "", + } + final_args = tool["arguments"] or chunk.tool_arguments or "" try: args = json.loads(final_args) if final_args else {} except json.JSONDecodeError: logger.warning("Failed to parse tool args: %s", final_args[:200]) args = {} - current_tool["parsed_arguments"] = args - logger.debug("Tool call finalized: %s args=%s", current_tool["name"], json.dumps(args)[:200]) - tool_calls.append(current_tool) - current_tool = None + tool["parsed_arguments"] = args + tool_calls.append(tool) if chunk.finish_reason == "end_turn": break @@ -133,24 +146,90 @@ class BaseAgent: # If no tool calls, we're done if not tool_calls: + # Add final assistant message to conversation + if full_text: + conversation.append({"role": "assistant", "content": full_text}) break - # Execute tool calls + # Add assistant message with tool calls to conversation + # (OpenAI format: assistant message carries tool_calls) + assistant_msg: dict[str, Any] = {"role": "assistant"} + if full_text: + assistant_msg["content"] = full_text + assistant_msg["tool_calls"] = [ + { + "id": tc["id"], + "type": "function", + "function": { + "name": tc["name"], + "arguments": json.dumps(tc.get("parsed_arguments", {})), + }, + } + for tc in tool_calls + ] + conversation.append(assistant_msg) + + # Execute tool calls and add COMPLETE results to conversation + duplicates_this_step = 0 for tc in tool_calls: + fp_raw = f"{tc['name']}:{json.dumps(tc.get('parsed_arguments', {}), sort_keys=True)}" + fp = hashlib.md5(fp_raw.encode()).hexdigest() + + if fp in tool_fingerprints: + prev_exec = tool_fingerprints[fp] + tool_executions.append(prev_exec) + duplicates_this_step += 1 + # Return cached result as tool message + conversation.append({ + "role": "tool", + "tool_call_id": tc["id"], + "content": f"[DUPLICADO] Ya ejecutada con mismos argumentos. Resultado: {prev_exec.raw_output[:2000]}", + }) + logger.warning("Duplicate tool call skipped: %s (fingerprint: %s)", tc["name"], fp[:8]) + continue + tool_exec = await self._execute_tool( session=session, tool_name=tc["name"], arguments=tc.get("parsed_arguments", {}), artifacts=artifacts, ) + tool_fingerprints[fp] = tool_exec tool_executions.append(tool_exec) - # Add summarised result to working context (NEVER raw) - working_items.append({ - "role": "tool_result", - "content": f"[{tc['name']}] {tool_exec.result_summary}", + # COMPLETE result in conversation (truncated to safe limit) + conversation.append({ + "role": "tool", + "tool_call_id": tc["id"], + "content": tool_exec.raw_output[:8000] if tool_exec.raw_output else tool_exec.result_summary, }) + # Loop detection: if ALL tool calls in this step were duplicates + if duplicates_this_step == len(tool_calls): + all_duplicates_streak += 1 + if all_duplicates_streak >= 2: + logger.warning("Loop detected: %d consecutive steps with all duplicate calls. Breaking.", all_duplicates_streak) + conversation.append({ + "role": "user", + "content": "[SISTEMA] Se detectaron llamadas repetidas. Ya tienes toda la información necesaria. Genera tu respuesta final ahora.", + }) + # One more chance to generate a final response + ctx = await self.context.build_context( + session=session, agent=self.profile, + artifacts=artifacts, conversation=conversation, + ) + async for chunk in self.model.stream( + messages=ctx.to_messages(), + config=config, + ): + if chunk.delta: + accumulated_content += chunk.delta + if chunk.finish_reason: + break + break + else: + all_duplicates_streak = 0 + return { "content": accumulated_content, "artifacts": artifacts, @@ -200,6 +279,7 @@ class BaseAgent: tool_exec.status = ToolExecutionStatus.COMPLETED tool_exec.result_summary = artifact.summary + tool_exec.raw_output = raw_output[:8000] tool_exec.duration_ms = duration await self.sse.emit( diff --git a/src/orchestrator/engine.py b/src/orchestrator/engine.py index a14e83e..508ed93 100644 --- a/src/orchestrator/engine.py +++ b/src/orchestrator/engine.py @@ -16,7 +16,7 @@ from ..context.engine import ContextEngine from ..mcp.manager import MCPManager from ..memory.store import MemoryStore from ..models.agent import AgentRole -from ..models.session import SessionState, SessionStatus, TaskStatus +from ..models.session import SessionState, SessionStatus, TaskState, TaskStatus from ..streaming.sse import SSEEmitter, EventType from .agents.coder import CoderAgent, create_coder_profile from .agents.collector import CollectorAgent, create_collector_profile @@ -181,6 +181,10 @@ class OrchestratorEngine: for artifact in step_result.get("artifacts", []): task.facts_extracted.extend(artifact.facts[:5]) + # Decide if previous steps should be compacted + if i > 0: + self._maybe_compact_previous_steps(task, current_index=i) + except Exception as e: logger.error("Step %d failed: %s", i + 1, e) step.status = TaskStatus.FAILED @@ -323,6 +327,35 @@ class OrchestratorEngine: task.task_id, len(task.facts_extracted), len(tools_used), len(task_artifacts), ) + def _maybe_compact_previous_steps( + self, task: TaskState, current_index: int + ) -> None: + """Decide if previous steps should be compacted. Deterministic rules.""" + current_step = task.plan[current_index] + + for i in range(current_index): + prev = task.plan[i] + if prev.compacted or prev.status != TaskStatus.COMPLETED: + continue + + # Rule 1: Change of agent role → previous steps are a different focus + if prev.agent_role != current_step.agent_role: + prev.compacted = True + logger.info( + "Compacted step %d (%s) — agent changed to %s", + i + 1, prev.agent_role, current_step.agent_role, + ) + continue + + # Rule 2: More than 3 completed non-compacted steps → compact oldest + non_compacted = [ + s for s in task.plan[:current_index] + if s.status == TaskStatus.COMPLETED and not s.compacted + ] + if len(non_compacted) > 3: + non_compacted[0].compacted = True + logger.info("Compacted oldest step to stay within budget") + def _create_agent(self, role: AgentRole) -> PlannerAgent | CoderAgent | CollectorAgent | ReviewerAgent: """Instantiate a subagent for the given role.""" profile = self._profiles[role]