Rediseño tool results + compactación por step + integración Docker

- Tool results completos en conversación (como Claude Code/Cursor)
  en vez de resúmenes en system prompt
- Parser multi-tool: trackea tool calls por tool_call_id para
  OpenAI streaming interleaved
- Deduplicación por fingerprint + detección de loop cuando todos
  los calls de un step son duplicados
- Compactación inteligente por step: el orquestador decide cuándo
  comprimir steps anteriores (cambio de agente o >3 steps)
- stdio.js lee URLs del .acai como fallback (local_web_url, local_forge_host)
- Buffer MCP aumentado a 1MB para respuestas grandes
- Dockerfile adaptado para build context desde raíz del proyecto

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jordan Diaz
2026-04-03 12:09:08 +00:00
parent 0dd3adbebd
commit b88917c18d
7 changed files with 206 additions and 91 deletions

View File

@@ -22,9 +22,21 @@ registerResources(server);
// Static env vars (web_url and website don't change, token does)
const projectDir = process.env.ACAI_PROJECT_DIR || "";
const website = process.env.ACAI_WEBSITE || "";
const webUrl = process.env.ACAI_WEB_URL || "";
const acaiFilePath = projectDir ? path.join(projectDir, ".acai") : "";
// Read .acai once at startup for URL fallbacks
let acaiFileData = {};
if (acaiFilePath) {
try {
acaiFileData = JSON.parse(fs.readFileSync(acaiFilePath, "utf-8"));
} catch { /* ignore - fall back to env vars */ }
}
const website = process.env.ACAI_WEBSITE || acaiFileData.domain || "";
const webUrl = process.env.ACAI_WEB_URL || acaiFileData.local_web_url || "";
const derivedForgeHost = (() => {
// First check .acai for explicit forge host
if (acaiFileData.local_forge_host) return acaiFileData.local_forge_host;
if (!webUrl) return "";
try {
const parsed = new URL(webUrl);
@@ -35,7 +47,6 @@ const derivedForgeHost = (() => {
})();
const apiWebUrl = process.env.ACAI_API_WEB_URL || (derivedForgeHost ? "http://web:80/" : webUrl);
const forgeHost = process.env.ACAI_FORGE_HOST || derivedForgeHost;
const acaiFilePath = projectDir ? path.join(projectDir, ".acai") : "";
// Read fresh credentials from .acai file
function readFreshCredentials() {

View File

@@ -62,10 +62,15 @@ class ContextEngine:
session: SessionState,
agent: AgentProfile,
artifacts: list[ArtifactSummary] | None = None,
working_items: list[dict[str, Any]] | None = None,
conversation: list[dict[str, Any]] | None = None,
extra_instructions: str = "",
) -> ContextPackage:
"""Build a full ContextPackage for the given agent and session."""
"""Build a full ContextPackage for the given agent and session.
The conversation parameter contains real assistant/tool messages
with complete tool results. These go into the messages array,
not the system prompt — like professional agentic tools.
"""
sections: list[ContextSection] = []
allowed = set(agent.context_sections)
@@ -88,28 +93,18 @@ class ContextEngine:
if "task_state" in allowed and session.task_history:
sections.append(self._build_task_history(session))
# 5. Task state — current task
# 5. Task state — current task (includes compacted previous steps)
if "task_state" in allowed and session.current_task:
sections.append(self._build_task_state(session.current_task))
# 6. Artifact memory — summarised, never raw (only current task's)
if "artifact_memory" in allowed and artifacts:
sections.append(self._build_artifact_memory(artifacts))
# 6. Working context — recent relevant items
if "working_context" in allowed:
sections.append(
self._build_working_context(working_items or [], extra_instructions)
)
# Compact to fit budget
sections = self.compactor.compact_sections(sections)
# Assemble system prompt from sections
system_prompt = self._assemble_system_prompt(sections)
# Build messages (just user message — no chat history)
messages = self._build_messages(session)
# Build messages with real conversation history
messages = self._build_messages(session, conversation)
total_tokens = estimate_tokens(system_prompt) + sum(
estimate_tokens(m.get("content", "")) for m in messages
@@ -133,6 +128,7 @@ class ContextEngine:
"preview": s.content[:150].replace("\n", " "),
})
conv_len = len(conversation) if conversation else 0
debug_entry = {
"timestamp": time.time(),
"agent": agent.role.value,
@@ -144,7 +140,7 @@ class ContextEngine:
"system_prompt_tokens": estimate_tokens(system_prompt),
"user_message_preview": messages[0]["content"][:200] if messages else "",
"artifacts_count": len(artifacts) if artifacts else 0,
"working_items_count": len(working_items) if working_items else 0,
"conversation_messages": conv_len,
}
history = self._history[session.session_id]
@@ -153,18 +149,13 @@ class ContextEngine:
self._history[session.session_id] = history[-self._max_history:]
logger.info(
"Context built for [%s/%s] — %d sections, ~%d tokens, artifacts=%d, working_items=%d",
"Context built for [%s/%s] — %d sections, ~%d tokens, artifacts=%d, conversation=%d msgs",
session.session_id[:8],
agent.role.value,
len(sections),
total_tokens,
len(artifacts) if artifacts else 0,
len(working_items) if working_items else 0,
)
for s in section_summary:
logger.debug(
" Section [%s] prio=%d tokens=%d chars=%d",
s["type"], s["priority"], s["tokens"], s["chars"],
conv_len,
)
return package
@@ -236,10 +227,11 @@ class ContextEngine:
[
"",
"## Contrato de Contexto",
"- NUNCA recibirás salidas crudas de herramientas en tu contexto.",
"- Los resultados de herramientas se resumen como artefactos.",
"- Solicita rehidratación si necesitas el contenido completo.",
"- Los resultados de herramientas se incluyen completos en la conversación.",
"- Los steps anteriores pueden estar compactados como resúmenes.",
"- Mantén las respuestas enfocadas en el paso actual.",
"- Si ya tienes la información necesaria, genera tu respuesta final.",
"- NO repitas llamadas a herramientas con los mismos argumentos.",
"- Responde SIEMPRE en español.",
]
)
@@ -451,6 +443,14 @@ class ContextEngine:
for c in task.constraints:
lines.append(f"- {c}")
# Show compacted previous steps results
compacted_steps = [s for s in task.plan if s.compacted and s.result_summary]
if compacted_steps:
lines.append("")
lines.append("## Previous Steps (compacted)")
for step in compacted_steps:
lines.append(f"- [{step.agent_role}] {step.description}: {step.result_summary[:300]}")
# Show plan overview (compact)
if task.plan:
lines.append("")
@@ -458,8 +458,9 @@ class ContextEngine:
for i, step in enumerate(task.plan):
marker = "" if i == task.current_step_index else "·"
status_label = step.status.value
compacted_label = " (compacted)" if step.compacted else ""
lines.append(
f" {marker} Step {i + 1} [{status_label}]: {step.description}"
f" {marker} Step {i + 1} [{status_label}{compacted_label}]: {step.description}"
)
content = "\n".join(lines)
@@ -483,26 +484,6 @@ class ContextEngine:
token_estimate=estimate_tokens(content),
)
def _build_working_context(
    self,
    items: list[dict[str, Any]],
    extra_instructions: str,
) -> ContextSection:
    """Render recent working items as a low-priority context section.

    Args:
        items: Working-context entries; each dict may carry ``role`` and
            ``content`` keys. Only the first
            ``settings.working_context_max_items`` entries are rendered.
        extra_instructions: Optional free-form text appended right after
            the section header.

    Returns:
        A ``ContextSection`` of type ``WORKING_CONTEXT`` with priority 30.
    """
    body_lines = ["# Working Context"]
    if extra_instructions:
        body_lines.append(f"\n{extra_instructions}")
    capped = items[: settings.working_context_max_items]
    for entry in capped:
        entry_role = entry.get("role", "info")
        entry_content = entry.get("content", "")
        body_lines.append(f"[{entry_role}] {entry_content}")
    text = "\n".join(body_lines)
    return ContextSection(
        section_type=ContextSectionType.WORKING_CONTEXT,
        content=text,
        priority=30,
        token_estimate=estimate_tokens(text),
    )
# ------------------------------------------------------------------
# Assembly
# ------------------------------------------------------------------
@@ -510,14 +491,11 @@ class ContextEngine:
def _assemble_system_prompt(self, sections: list[ContextSection]) -> str:
"""Combine sections into a single system prompt string."""
parts: list[str] = []
# Order: rules → profile → task → artifacts → working
order = [
ContextSectionType.IMMUTABLE_RULES,
ContextSectionType.PROJECT_PROFILE,
ContextSectionType.KNOWLEDGE_BASE,
ContextSectionType.TASK_STATE,
ContextSectionType.ARTIFACT_MEMORY,
ContextSectionType.WORKING_CONTEXT,
]
section_map: dict[ContextSectionType, ContextSection] = {
s.section_type: s for s in sections
@@ -527,11 +505,15 @@ class ContextEngine:
parts.append(section_map[st].content)
return "\n\n---\n\n".join(parts)
def _build_messages(self, session: SessionState) -> list[dict[str, Any]]:
"""Build the messages array. We do NOT include chat history.
def _build_messages(
self,
session: SessionState,
conversation: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
"""Build the messages array with real conversation history.
The user message is the current task objective (or a sentinel
if no task is active).
Includes the user objective message followed by the full
assistant/tool conversation — like professional agentic tools.
"""
if session.current_task:
step = session.current_task.current_step()
@@ -545,4 +527,10 @@ class ContextEngine:
else:
user_content = "Awaiting task assignment."
return [{"role": "user", "content": user_content}]
messages: list[dict[str, Any]] = [{"role": "user", "content": user_content}]
# Append real conversation (assistant messages + tool results)
if conversation:
messages.extend(conversation)
return messages

View File

@@ -74,6 +74,7 @@ class MCPClient:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
limit=1024 * 1024, # 1MB buffer for large MCP responses
)
self._running = True
self._reader_task = asyncio.create_task(self._read_loop())

View File

@@ -36,6 +36,7 @@ class TaskStep(BaseModel):
status: TaskStatus = TaskStatus.PENDING
result_summary: str = ""
tools_used: list[str] = Field(default_factory=list)
compacted: bool = False # True when step results have been compacted
started_at: datetime | None = None
completed_at: datetime | None = None

View File

@@ -33,7 +33,8 @@ class ToolExecution(BaseModel):
tool_name: str
arguments: dict[str, Any] = Field(default_factory=dict)
status: ToolExecutionStatus = ToolExecutionStatus.PENDING
result_summary: str = "" # Summarised result — raw output is NEVER stored here
result_summary: str = "" # Summarised result for artifacts and compacted history
raw_output: str = "" # Truncated raw output for conversation messages
error: str = ""
duration_ms: float = 0.0
started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import hashlib
import json
import logging
import time
@@ -47,6 +48,10 @@ class BaseAgent:
) -> dict[str, Any]:
"""Run the agent's execution loop.
Uses real conversation messages with complete tool results,
like professional agentic tools (Claude Code, Cursor).
Compaction happens at the step level, not per tool result.
Returns a result dict with keys: content, artifacts, tool_executions.
"""
artifacts: list[ArtifactSummary] = await self.memory.list_artifacts(
@@ -54,15 +59,18 @@ class BaseAgent:
)
tool_executions: list[ToolExecution] = []
accumulated_content = ""
working_items: list[dict[str, Any]] = []
# Real conversation history: assistant messages + tool results
conversation: list[dict[str, Any]] = []
tool_fingerprints: dict[str, ToolExecution] = {}
all_duplicates_streak = 0 # consecutive steps where ALL calls are duplicates
for step in range(max_steps):
# Build context — NEVER includes raw tool output
# Build context with real conversation
ctx = await self.context.build_context(
session=session,
agent=self.profile,
artifacts=artifacts,
working_items=working_items,
conversation=conversation,
)
# Prepare tool definitions
@@ -77,7 +85,7 @@ class BaseAgent:
full_text = ""
tool_calls: list[dict[str, Any]] = []
current_tool: dict[str, Any] | None = None
active_tools: dict[str, dict[str, Any]] = {}
async for chunk in self.model.stream(
messages=ctx.to_messages(),
@@ -96,8 +104,9 @@ class BaseAgent:
session_id=session.session_id,
)
if chunk.tool_name and (current_tool is None or not current_tool.get("name")):
current_tool = {
if chunk.tool_name and chunk.tool_call_id:
if chunk.tool_call_id not in active_tools:
active_tools[chunk.tool_call_id] = {
"id": chunk.tool_call_id,
"name": chunk.tool_name,
"arguments": "",
@@ -108,23 +117,27 @@ class BaseAgent:
session_id=session.session_id,
)
if chunk.tool_arguments and current_tool is not None and not chunk.finish_reason:
# Accumulate partial argument chunks (NOT the final one)
current_tool["arguments"] += chunk.tool_arguments
if chunk.tool_arguments and chunk.tool_call_id and not chunk.finish_reason:
tool = active_tools.get(chunk.tool_call_id)
if tool:
tool["arguments"] += chunk.tool_arguments
if chunk.finish_reason == "tool_use" and current_tool is not None and current_tool.get("name"):
# Final chunk carries complete arguments — use those if
# partial accumulation is empty, otherwise use accumulated
final_args = current_tool["arguments"] or chunk.tool_arguments or ""
if chunk.finish_reason == "tool_use" and chunk.tool_call_id:
tool = active_tools.pop(chunk.tool_call_id, None)
if not tool:
tool = {
"id": chunk.tool_call_id,
"name": chunk.tool_name or "",
"arguments": "",
}
final_args = tool["arguments"] or chunk.tool_arguments or ""
try:
args = json.loads(final_args) if final_args else {}
except json.JSONDecodeError:
logger.warning("Failed to parse tool args: %s", final_args[:200])
args = {}
current_tool["parsed_arguments"] = args
logger.debug("Tool call finalized: %s args=%s", current_tool["name"], json.dumps(args)[:200])
tool_calls.append(current_tool)
current_tool = None
tool["parsed_arguments"] = args
tool_calls.append(tool)
if chunk.finish_reason == "end_turn":
break
@@ -133,24 +146,90 @@ class BaseAgent:
# If no tool calls, we're done
if not tool_calls:
# Add final assistant message to conversation
if full_text:
conversation.append({"role": "assistant", "content": full_text})
break
# Execute tool calls
# Add assistant message with tool calls to conversation
# (OpenAI format: assistant message carries tool_calls)
assistant_msg: dict[str, Any] = {"role": "assistant"}
if full_text:
assistant_msg["content"] = full_text
assistant_msg["tool_calls"] = [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc.get("parsed_arguments", {})),
},
}
for tc in tool_calls
]
conversation.append(assistant_msg)
# Execute tool calls and add COMPLETE results to conversation
duplicates_this_step = 0
for tc in tool_calls:
fp_raw = f"{tc['name']}:{json.dumps(tc.get('parsed_arguments', {}), sort_keys=True)}"
fp = hashlib.md5(fp_raw.encode()).hexdigest()
if fp in tool_fingerprints:
prev_exec = tool_fingerprints[fp]
tool_executions.append(prev_exec)
duplicates_this_step += 1
# Return cached result as tool message
conversation.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": f"[DUPLICADO] Ya ejecutada con mismos argumentos. Resultado: {prev_exec.raw_output[:2000]}",
})
logger.warning("Duplicate tool call skipped: %s (fingerprint: %s)", tc["name"], fp[:8])
continue
tool_exec = await self._execute_tool(
session=session,
tool_name=tc["name"],
arguments=tc.get("parsed_arguments", {}),
artifacts=artifacts,
)
tool_fingerprints[fp] = tool_exec
tool_executions.append(tool_exec)
# Add summarised result to working context (NEVER raw)
working_items.append({
"role": "tool_result",
"content": f"[{tc['name']}] {tool_exec.result_summary}",
# COMPLETE result in conversation (truncated to safe limit)
conversation.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": tool_exec.raw_output[:8000] if tool_exec.raw_output else tool_exec.result_summary,
})
# Loop detection: if ALL tool calls in this step were duplicates
if duplicates_this_step == len(tool_calls):
all_duplicates_streak += 1
if all_duplicates_streak >= 2:
logger.warning("Loop detected: %d consecutive steps with all duplicate calls. Breaking.", all_duplicates_streak)
conversation.append({
"role": "user",
"content": "[SISTEMA] Se detectaron llamadas repetidas. Ya tienes toda la información necesaria. Genera tu respuesta final ahora.",
})
# One more chance to generate a final response
ctx = await self.context.build_context(
session=session, agent=self.profile,
artifacts=artifacts, conversation=conversation,
)
async for chunk in self.model.stream(
messages=ctx.to_messages(),
config=config,
):
if chunk.delta:
accumulated_content += chunk.delta
if chunk.finish_reason:
break
break
else:
all_duplicates_streak = 0
return {
"content": accumulated_content,
"artifacts": artifacts,
@@ -200,6 +279,7 @@ class BaseAgent:
tool_exec.status = ToolExecutionStatus.COMPLETED
tool_exec.result_summary = artifact.summary
tool_exec.raw_output = raw_output[:8000]
tool_exec.duration_ms = duration
await self.sse.emit(

View File

@@ -16,7 +16,7 @@ from ..context.engine import ContextEngine
from ..mcp.manager import MCPManager
from ..memory.store import MemoryStore
from ..models.agent import AgentRole
from ..models.session import SessionState, SessionStatus, TaskStatus
from ..models.session import SessionState, SessionStatus, TaskState, TaskStatus
from ..streaming.sse import SSEEmitter, EventType
from .agents.coder import CoderAgent, create_coder_profile
from .agents.collector import CollectorAgent, create_collector_profile
@@ -181,6 +181,10 @@ class OrchestratorEngine:
for artifact in step_result.get("artifacts", []):
task.facts_extracted.extend(artifact.facts[:5])
# Decide if previous steps should be compacted
if i > 0:
self._maybe_compact_previous_steps(task, current_index=i)
except Exception as e:
logger.error("Step %d failed: %s", i + 1, e)
step.status = TaskStatus.FAILED
@@ -323,6 +327,35 @@ class OrchestratorEngine:
task.task_id, len(task.facts_extracted), len(tools_used), len(task_artifacts),
)
def _maybe_compact_previous_steps(
    self, task: TaskState, current_index: int
) -> None:
    """Mark earlier completed steps as compacted, using deterministic rules.

    Rule 1: a completed step executed by a different agent role than the
    current step is compacted — its focus no longer matches.
    Rule 2: after Rule 1, at most 3 completed steps may stay uncompacted;
    the oldest are compacted until that budget holds.

    Mutates ``task.plan`` in place; steps already compacted or not yet
    completed are never touched.

    Args:
        task: Task whose plan is inspected and mutated.
        current_index: Index of the step about to run; only steps strictly
            before it are considered.
    """
    current_step = task.plan[current_index]
    # Rule 1: agent role changed → previous step belongs to a different focus.
    for i in range(current_index):
        prev = task.plan[i]
        if prev.compacted or prev.status != TaskStatus.COMPLETED:
            continue
        if prev.agent_role != current_step.agent_role:
            prev.compacted = True
            logger.info(
                "Compacted step %d (%s) — agent changed to %s",
                i + 1, prev.agent_role, current_step.agent_role,
            )
    # Rule 2: keep at most 3 uncompacted completed steps, oldest first.
    # Evaluated once, after Rule 1 — the previous version re-checked this
    # inside the loop above, interleaved with Rule 1, which could compact
    # more steps than the >3 budget actually requires.
    while True:
        non_compacted = [
            s for s in task.plan[:current_index]
            if s.status == TaskStatus.COMPLETED and not s.compacted
        ]
        if len(non_compacted) <= 3:
            break
        non_compacted[0].compacted = True
        logger.info("Compacted oldest step to stay within budget")
def _create_agent(self, role: AgentRole) -> PlannerAgent | CoderAgent | CollectorAgent | ReviewerAgent:
"""Instantiate a subagent for the given role."""
profile = self._profiles[role]