Compare commits

...

7 Commits

Author SHA1 Message Date
Jordan Diaz
6978764540 Unit tests: 51 tests for compactor, key_data, fingerprint and costs
- tests/test_compactor.py: 24 tests (estimate_tokens, extract_facts,
  build_summary, summarize_tool_output, compact_sections)
- tests/test_key_data_extraction.py: 11 tests (extraction of tables,
  records, sections, modules, pages from tool executions)
- tests/test_fingerprint.py: 8 tests (MD5 deduplication, sort_keys,
  nested args)
- tests/test_cost_calculation.py: 8 tests (pricing formula, custom
  pricing, rounding)
- README.md: Tests section with instructions for running them

All offline, no Docker/Redis/LLM needed. Run: python3 -m pytest tests/ -v
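The MD5 fingerprinting these tests cover can be sketched as follows — a minimal reconstruction of the dedup key built in base.py; the `fingerprint` helper name is ours, not the real API:

```python
import hashlib
import json

def fingerprint(tool_name: str, args: dict) -> str:
    # sort_keys=True makes the key order-independent, so the same call
    # with reordered arguments hashes to the same fingerprint
    raw = f"{tool_name}:{json.dumps(args, sort_keys=True)}"
    return hashlib.md5(raw.encode()).hexdigest()

a = fingerprint("get_record", {"table": "apartados", "num": 2})
b = fingerprint("get_record", {"num": 2, "table": "apartados"})
assert a == b  # duplicate call detected regardless of argument order
```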

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 14:28:32 +00:00
Jordan Diaz
7c891cf023 Token tracking and per-message cost calculation
- Config: COST_PER_1M_INPUT and COST_PER_1M_OUTPUT configurable via .env
- OpenAI adapter: stream_options include_usage to capture real token counts
- base.py: accumulates input/output tokens from each agent iteration
- planner.py: returns usage along with the plan
- engine.py: sums tokens from planner + steps + review, computes cost in USD
- Response includes usage{input_tokens, output_tokens} and total_cost_usd

Format compatible with the Claude Code CLI bridge, for frontend
integration and reporting to the Acai webservice.
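The cost formula this commit describes can be sketched like this — the per-1M defaults match those added to config.py; the rounding precision is an assumption on our part:

```python
COST_PER_1M_INPUT = 2.50    # USD per 1M input tokens (config.py default)
COST_PER_1M_OUTPUT = 15.00  # USD per 1M output tokens (config.py default)

def total_cost_usd(input_tokens: int, output_tokens: int) -> float:
    # Linear pricing: tokens scaled to millions, times the per-1M rate
    cost = (input_tokens / 1_000_000) * COST_PER_1M_INPUT
    cost += (output_tokens / 1_000_000) * COST_PER_1M_OUTPUT
    return round(cost, 6)

print(total_cost_usd(1_000_000, 100_000))  # → 4.0
```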

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 14:18:23 +00:00
Jordan Diaz
2712c2fd49 Docs: create_module is legacy, acai_write is the standard flow
The server compiles automatically when index-base.tpl is saved via
acai_write — no manual create_module or compile_module needed.

- mcp-tools-reference.md: flow updated, create_module marked legacy
- module-creation-guide.md: step 2 uses acai_write
- ACAI-CLAUDE.md: key workflows updated
- coder.py: system prompt aligned

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 13:40:47 +00:00
Jordan Diaz
7bdb943e7f Fix issues found during evaluation: history, prompting, artifacts
1. Task history preserves structured key_data (recordNums, sectionIds,
   moduleIds, pages) extracted from the real tool executions — the model
   retains context across tasks without re-querying.

2. Improved coder system prompt: explicit instructions on which tool to
   use for each operation (create_module vs create_or_update_record),
   consulting the knowledge base before acting, and reusing key_data
   from the history.

3. Removed artifact_memory and working_context from the coder's
   context_sections — no longer needed with real conversation. Reduces
   artifact accumulation in the context.
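The structured key_data this commit preserves looks roughly like the following — the payload values are hypothetical, but the formatting loop mirrors the one added to engine.py:

```python
# Hypothetical key_data extracted from one task's tool executions
key_data = {
    "tables": {"apartados": [2, 7]},  # table name -> record nums touched
    "pages": {"Inicio": 2},           # page title -> record num
    "sections": [14],
    "modules": ["hero_banner"],
}

# Render it into the compact history line, as engine.py does
kd_parts = []
for table, nums in key_data.get("tables", {}).items():
    kd_parts.append(f"{table}: records {nums}")
for page, num in key_data.get("pages", {}).items():
    kd_parts.append(f"page '{page}' = record {num}")
if key_data.get("sections"):
    kd_parts.append(f"sections: {key_data['sections']}")
if key_data.get("modules"):
    kd_parts.append(f"modules: {key_data['modules']}")
line = f"  Key data: {'; '.join(kd_parts)}"
```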

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 13:29:09 +00:00
Jordan Diaz
3aa7a463d0 Fix LOCAL_SERVER_URL for file writes under Docker
The acai-code MCP talks HTTP to the Python server for file
operations (write, view, delete). Under Docker, the Python server
runs in the app container at app:9091, not on localhost:29871
(the legacy local setup).

- mcp.json: env LOCAL_SERVER_URL=http://app:9091 for acai-code

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 13:19:20 +00:00
Jordan Diaz
414210bceb Playwright with Chromium + fetch via uvx + executable-path fix
- Dockerfile: install Playwright system deps (as root) + Chromium (as appuser)
- Dockerfile: install uv/uvx for mcp-server-fetch
- mcp.json: --executable-path points at the Chromium installed by appuser
- Remove entrypoint (dnsmasq resolves DNS dynamically)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 12:39:16 +00:00
Jordan Diaz
b88917c18d Tool-results redesign + per-step compaction + Docker integration
- Full tool results in the conversation (as Claude Code/Cursor do)
  instead of summaries in the system prompt
- Multi-tool parser: tracks tool calls by tool_call_id to handle
  OpenAI interleaved streaming
- Fingerprint-based deduplication + loop detection when every call
  in a step is a duplicate
- Smart per-step compaction: the orchestrator decides when to
  compress earlier steps (agent change or >3 steps)
- stdio.js reads URLs from .acai as a fallback (local_web_url, local_forge_host)
- MCP buffer raised to 1MB for large responses
- Dockerfile adapted for a build context at the project root
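The multi-tool parsing described above can be sketched as follows — a simplified reconstruction of the active_tools bookkeeping in base.py, with fake tuples standing in for the OpenAI stream chunks:

```python
import json

# Fake interleaved stream chunks: (tool_call_id, tool_name, partial_args).
# OpenAI may interleave fragments of several calls within one response.
chunks = [
    ("call_1", "acai_write", '{"path":'),
    ("call_2", "check_module", '{"module":'),
    ("call_1", None, ' "index-base.tpl"}'),
    ("call_2", None, ' "hero"}'),
]

# Accumulate argument fragments per tool_call_id, as base.py does
active_tools: dict[str, dict] = {}
for call_id, name, fragment in chunks:
    if call_id not in active_tools:
        active_tools[call_id] = {"id": call_id, "name": name, "arguments": ""}
    active_tools[call_id]["arguments"] += fragment

# Each call's arguments reassemble into valid JSON despite interleaving
args_1 = json.loads(active_tools["call_1"]["arguments"])
```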

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 12:09:08 +00:00
23 changed files with 1057 additions and 119 deletions

View File

@@ -26,6 +26,12 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY agenticSystem/mcp-server/package.json agenticSystem/mcp-server/package-lock.json* ./mcp-server/
RUN cd mcp-server && npm install --production
# Install Playwright system dependencies (as root)
RUN cd mcp-server && npx playwright install-deps chromium
# Install uv (includes uvx) for mcp-server-fetch
RUN pip install --no-cache-dir uv
# Copy the MCP server source code
COPY agenticSystem/mcp-server/ ./mcp-server/
@@ -45,6 +51,9 @@ RUN useradd -m appuser \
&& chown -R appuser:appuser /opt/acai/webs
USER appuser
# Download Chromium as appuser (ends up in ~/.cache/ms-playwright/)
RUN cd mcp-server && npx playwright install chromium
EXPOSE 8000
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -59,6 +59,22 @@ python3 -m uvicorn src.main:app --reload --port 8001
# 5. Dashboard at http://localhost:8001/dashboard/
```
### Tests
```bash
# Run all unit tests (no Docker, Redis or LLM required)
pip install pytest
python3 -m pytest tests/ -v
# Run a single test file
python3 -m pytest tests/test_compactor.py -v
# Run a single test
python3 -m pytest tests/test_cost_calculation.py::TestCostCalculation::test_1m_input_tokens -v
```
The tests validate: context compaction, key_data extraction for the history, tool-call fingerprinting, and cost calculation. They are 100% offline — they consume no tokens and need no external services.
### Load the Knowledge Base
```bash

View File

@@ -105,7 +105,7 @@ Do NOT modify web-base files — they are shared across all projects.
1. **Before working with any area (hooks, modules, templates, CSS/JS, etc.), read the corresponding documentation in `docs/` first.** Do not guess or assume — always consult the docs before taking action.
2. **NEVER use `mkdir` to create directories.** Instead, use the `Write` tool to create the first file inside the directory — this creates parent directories automatically. For example, to create a new module, directly write the `index-base.tpl` file.
3. Only edit `index-base.tpl` in modules — `index.tpl`, `index-twig.tpl`, and `builder.json` are auto-generated
3. **After editing any `index-base.tpl`, ALWAYS call the `compile_module` MCP tool** to compile the module/section. This is mandatory — without compilation, changes won't take effect in the CMS.
3. **Edit `index-base.tpl` using `acai_write` or `acai_line_replace`** — the server compiles automatically when the file is saved. No need to call `compile_module` manually.
4. Use Twig **filters** (with `|`), never Twig functions
5. Table names without `cms_` prefix everywhere
6. Primary key is `num`, never `id`
@@ -122,8 +122,8 @@ This project has MCP tools for managing modules, records, media, and more. **Bef
See [docs/mcp-tools-reference.md](docs/mcp-tools-reference.md) for the complete list of available tools and step-by-step workflows.
Key workflows:
- **Create module**: Read [docs/module-creation-guide.md](docs/module-creation-guide.md) first → `create_module` → `add_module_to_record` (returns sectionId) → `set_module_config_vars` (returns uploadFields) → images via uploadFields
- **Edit module**: read vars → edit `index-base.tpl` → `compile_module`
- **Create module**: Read [docs/module-creation-guide.md](docs/module-creation-guide.md) first → Write `index-base.tpl` via `acai_write` → `add_module_to_record` (returns sectionId) → `set_module_config_vars` (returns uploadFields) → images via uploadFields
- **Edit module**: read vars → edit `index-base.tpl` with `acai_write` or `acai_line_replace` (server compiles automatically)
- **Add images**: use `uploadFields` from `set_module_config_vars` response → `upload_record_image`
- **Generate images**: `generate_image` → `upload_record_image` with returned URL

View File

@@ -4,7 +4,7 @@
| Tool | Category | Action |
|------|-----------|--------|
| `create_module` | Modules | Creates a new module (directory + files + compile) |
| `create_module` | Modules | (Legacy) Alternative for creating a module — prefer acai_write |
| `compile_module` | Modules | Compiles a module after editing index-base.tpl |
| `check_module` | Modules | Preview of how a module renders |
| `check_module_usage` | Modules | Which pages use a module |
@@ -37,7 +37,7 @@
### Create a new module from scratch
1. `create_module` — Creates the directory with index-base.tpl, style.css, script.js and compiles
1. `acai_write` — Writes `index-base.tpl` to `template/estandar/modulos/NOMBRE/`. The server creates the folder if it does not exist, compiles, and generates all derived files (index-twig.tpl, index.tpl, builder.json, screenshots)
2. `add_module_to_record` — Adds the module to a page (parent table, e.g. `apartados`)
3. `set_module_config_vars` — Fills the variables with content (texts, colors, options). **REQUIRED** — without it the module displays nothing. Returns:
- `configVars`: map of variables → recordNums
@@ -46,12 +46,13 @@
4. For images: `generate_image` or `upload_record_image` using the `recordNum` and `fieldName` from the `uploadFields` returned in step 3
5. Verify with `check_module` or by reloading the page
> **Note:** `create_module` is a legacy alternative that does the same with less control over the template content.
### Edit an existing module
1. `get_module_config_vars` — Read the module's current state (variables, recordNums)
2. Edit `index-base.tpl` with the `Write` or `Edit` tool
3. `compile_module` — **REQUIRED** after every index-base.tpl edit
4. If you change variables: `set_module_config_vars` to update values
2. Edit `index-base.tpl` with `acai_write` or `acai_line_replace` — the server compiles automatically on save
3. If you change variables: `set_module_config_vars` to update values
### Add or modify a module's images

View File

@@ -38,7 +38,7 @@ Each module lives in `template/estandar/modulos/<moduleId>/` with:
## Creating a Module — Full Workflow
1. **Read style reference** (steps above)
2. **`create_module`** — Creates the directory with index-base.tpl, style.css, script.js and compiles. Use descriptive `moduleId` and clear `label`.
2. **`acai_write`** — Write `index-base.tpl` to `template/estandar/modulos/MODULE_ID/index-base.tpl`. The server automatically creates the directory, compiles and generates all derived files. `create_module` is a legacy alternative.
3. **`add_module_to_record`** — Adds the module to a page. Response includes `sectionId` — use it directly in the next step.
4. **`set_module_config_vars`** — Fill variables with content. Response includes `uploadFields` with `{ fieldName, recordNum }` for each upload variable.
5. **Upload images** — Use `generate_image` then `upload_record_image` with the `recordNum` and `fieldName` from step 4's `uploadFields`. No need to read builder.json or call get_module_config_vars.
@@ -72,6 +72,6 @@ Modules with `MJMLModule: true` in their schema are email modules:
- Use `section_id` variable for unique anchors/scoping
- Use `interno` variable to detect CMS editor vs public view
- Include other modules with: `<module_id :param1="value1"></module_id>`
- After editing `index-base.tpl`, ALWAYS call `compile_module`
- After editing `index-base.tpl` with `acai_write` or `acai_line_replace`, the server compiles automatically — no need to call `compile_module`
- Twig uses filters (with `|`), never functions
- Twig concatenation uses `~`: `'value=' ~ variable`

View File

@@ -22,9 +22,21 @@ registerResources(server);
// Static env vars (web_url and website don't change, token does)
const projectDir = process.env.ACAI_PROJECT_DIR || "";
const website = process.env.ACAI_WEBSITE || "";
const webUrl = process.env.ACAI_WEB_URL || "";
const acaiFilePath = projectDir ? path.join(projectDir, ".acai") : "";
// Read .acai once at startup for URL fallbacks
let acaiFileData = {};
if (acaiFilePath) {
try {
acaiFileData = JSON.parse(fs.readFileSync(acaiFilePath, "utf-8"));
} catch { /* ignore - fall back to env vars */ }
}
const website = process.env.ACAI_WEBSITE || acaiFileData.domain || "";
const webUrl = process.env.ACAI_WEB_URL || acaiFileData.local_web_url || "";
const derivedForgeHost = (() => {
// First check .acai for explicit forge host
if (acaiFileData.local_forge_host) return acaiFileData.local_forge_host;
if (!webUrl) return "";
try {
const parsed = new URL(webUrl);
@@ -35,7 +47,6 @@ const derivedForgeHost = (() => {
})();
const apiWebUrl = process.env.ACAI_API_WEB_URL || (derivedForgeHost ? "http://web:80/" : webUrl);
const forgeHost = process.env.ACAI_FORGE_HOST || derivedForgeHost;
const acaiFilePath = projectDir ? path.join(projectDir, ".acai") : "";
// Read fresh credentials from .acai file
function readFreshCredentials() {

View File

@@ -3,13 +3,15 @@
"acai-code": {
"command": "node",
"args": ["mcp-server/stdio.js"],
"env": {},
"env": {
"LOCAL_SERVER_URL": "http://app:9091"
},
"timeout": 30,
"startup_timeout": 10
},
"playwright": {
"command": "npx",
"args": ["@playwright/mcp", "--headless"],
"args": ["@playwright/mcp", "--headless", "--executable-path", "/home/appuser/.cache/ms-playwright/chromium-1212/chrome-linux64/chrome"],
"timeout": 30,
"startup_timeout": 15
},

View File

@@ -44,6 +44,7 @@ class OpenAIAdapter(ModelAdapter):
"temperature": config.temperature,
"messages": messages,
"stream": True,
"stream_options": {"include_usage": True},
}
if tools:
kwargs["tools"] = self._format_tools(tools)
@@ -52,9 +53,22 @@ class OpenAIAdapter(ModelAdapter):
tool_calls_acc: dict[int, dict[str, str]] = {}
final_usage: dict[str, int] = {}
async for chunk in stream:
# With include_usage, the last chunk has usage but no choices
if chunk.usage:
final_usage = {
"input_tokens": chunk.usage.prompt_tokens or 0,
"output_tokens": chunk.usage.completion_tokens or 0,
}
choice = chunk.choices[0] if chunk.choices else None
if not choice:
# Usage-only chunk (last one with include_usage) — emit it
if final_usage:
yield StreamChunk(usage=final_usage)
final_usage = {} # Only emit once
continue
delta = choice.delta
@@ -99,16 +113,15 @@ class OpenAIAdapter(ModelAdapter):
tool_arguments=acc["arguments"],
finish_reason="tool_use",
)
# Emit usage after tool_use chunks
if final_usage:
yield StreamChunk(usage=final_usage)
else:
yield StreamChunk(
finish_reason="end_turn"
if choice.finish_reason == "stop"
else choice.finish_reason,
usage={
"output_tokens": chunk.usage.completion_tokens
if chunk.usage
else 0
},
usage=final_usage,
)
# ------------------------------------------------------------------

View File

@@ -48,6 +48,10 @@ class Settings(BaseSettings):
mcp_timeout_seconds: float = 30.0
mcp_startup_timeout_seconds: float = 10.0
# --- Pricing (per 1M tokens) ---
cost_per_1m_input: float = 2.50
cost_per_1m_output: float = 15.00
# --- Orchestrator ---
max_execution_steps: int = 25
subagent_max_steps: int = 10

View File

@@ -62,10 +62,15 @@ class ContextEngine:
session: SessionState,
agent: AgentProfile,
artifacts: list[ArtifactSummary] | None = None,
working_items: list[dict[str, Any]] | None = None,
conversation: list[dict[str, Any]] | None = None,
extra_instructions: str = "",
) -> ContextPackage:
"""Build a full ContextPackage for the given agent and session."""
"""Build a full ContextPackage for the given agent and session.
The conversation parameter contains real assistant/tool messages
with complete tool results. These go into the messages array,
not the system prompt — like professional agentic tools.
"""
sections: list[ContextSection] = []
allowed = set(agent.context_sections)
@@ -88,28 +93,18 @@ class ContextEngine:
if "task_state" in allowed and session.task_history:
sections.append(self._build_task_history(session))
# 5. Task state — current task
# 5. Task state — current task (includes compacted previous steps)
if "task_state" in allowed and session.current_task:
sections.append(self._build_task_state(session.current_task))
# 6. Artifact memory — summarised, never raw (only current task's)
if "artifact_memory" in allowed and artifacts:
sections.append(self._build_artifact_memory(artifacts))
# 6. Working context — recent relevant items
if "working_context" in allowed:
sections.append(
self._build_working_context(working_items or [], extra_instructions)
)
# Compact to fit budget
sections = self.compactor.compact_sections(sections)
# Assemble system prompt from sections
system_prompt = self._assemble_system_prompt(sections)
# Build messages (just user message — no chat history)
messages = self._build_messages(session)
# Build messages with real conversation history
messages = self._build_messages(session, conversation)
total_tokens = estimate_tokens(system_prompt) + sum(
estimate_tokens(m.get("content", "")) for m in messages
@@ -133,6 +128,7 @@ class ContextEngine:
"preview": s.content[:150].replace("\n", " "),
})
conv_len = len(conversation) if conversation else 0
debug_entry = {
"timestamp": time.time(),
"agent": agent.role.value,
@@ -144,7 +140,7 @@ class ContextEngine:
"system_prompt_tokens": estimate_tokens(system_prompt),
"user_message_preview": messages[0]["content"][:200] if messages else "",
"artifacts_count": len(artifacts) if artifacts else 0,
"working_items_count": len(working_items) if working_items else 0,
"conversation_messages": conv_len,
}
history = self._history[session.session_id]
@@ -153,19 +149,14 @@ class ContextEngine:
self._history[session.session_id] = history[-self._max_history:]
logger.info(
"Context built for [%s/%s] — %d sections, ~%d tokens, artifacts=%d, working_items=%d",
"Context built for [%s/%s] — %d sections, ~%d tokens, artifacts=%d, conversation=%d msgs",
session.session_id[:8],
agent.role.value,
len(sections),
total_tokens,
len(artifacts) if artifacts else 0,
len(working_items) if working_items else 0,
conv_len,
)
for s in section_summary:
logger.debug(
" Section [%s] prio=%d tokens=%d chars=%d",
s["type"], s["priority"], s["tokens"], s["chars"],
)
return package
@@ -236,10 +227,11 @@ class ContextEngine:
[
"",
"## Contrato de Contexto",
"- NUNCA recibirás salidas crudas de herramientas en tu contexto.",
"- Los resultados de herramientas se resumen como artefactos.",
"- Solicita rehidratación si necesitas el contenido completo.",
"- Los resultados de herramientas se incluyen completos en la conversación.",
"- Los steps anteriores pueden estar compactados como resúmenes.",
"- Mantén las respuestas enfocadas en el paso actual.",
"- Si ya tienes la información necesaria, genera tu respuesta final.",
"- NO repitas llamadas a herramientas con los mismos argumentos.",
"- Responde SIEMPRE en español.",
]
)
@@ -406,6 +398,20 @@ class ContextEngine:
lines.append(f" Result: {summary}")
if facts:
lines.append(f" Facts: {'; '.join(facts[:5])}")
# Key structured data (recordNums, sectionIds, etc.)
key_data = entry.get("key_data", {})
if key_data:
kd_parts = []
for table, nums in key_data.get("tables", {}).items():
kd_parts.append(f"{table}: records {nums}")
for page, num in key_data.get("pages", {}).items():
kd_parts.append(f"page '{page}' = record {num}")
if key_data.get("sections"):
kd_parts.append(f"sections: {key_data['sections']}")
if key_data.get("modules"):
kd_parts.append(f"modules: {key_data['modules']}")
if kd_parts:
lines.append(f" Key data: {'; '.join(kd_parts)}")
review = entry.get("review", "")
if review:
lines.append(f" Review: {review[:100]}")
@@ -451,6 +457,14 @@ class ContextEngine:
for c in task.constraints:
lines.append(f"- {c}")
# Show compacted previous steps results
compacted_steps = [s for s in task.plan if s.compacted and s.result_summary]
if compacted_steps:
lines.append("")
lines.append("## Previous Steps (compacted)")
for step in compacted_steps:
lines.append(f"- [{step.agent_role}] {step.description}: {step.result_summary[:300]}")
# Show plan overview (compact)
if task.plan:
lines.append("")
@@ -458,8 +472,9 @@ class ContextEngine:
for i, step in enumerate(task.plan):
marker = "" if i == task.current_step_index else "·"
status_label = step.status.value
compacted_label = " (compacted)" if step.compacted else ""
lines.append(
f" {marker} Step {i + 1} [{status_label}]: {step.description}"
f" {marker} Step {i + 1} [{status_label}{compacted_label}]: {step.description}"
)
content = "\n".join(lines)
@@ -483,26 +498,6 @@ class ContextEngine:
token_estimate=estimate_tokens(content),
)
def _build_working_context(
self,
items: list[dict[str, Any]],
extra_instructions: str,
) -> ContextSection:
lines = ["# Working Context"]
if extra_instructions:
lines.append(f"\n{extra_instructions}")
for item in items[: settings.working_context_max_items]:
role = item.get("role", "info")
content_val = item.get("content", "")
lines.append(f"[{role}] {content_val}")
content = "\n".join(lines)
return ContextSection(
section_type=ContextSectionType.WORKING_CONTEXT,
content=content,
priority=30,
token_estimate=estimate_tokens(content),
)
# ------------------------------------------------------------------
# Assembly
# ------------------------------------------------------------------
@@ -510,14 +505,11 @@ class ContextEngine:
def _assemble_system_prompt(self, sections: list[ContextSection]) -> str:
"""Combine sections into a single system prompt string."""
parts: list[str] = []
# Order: rules → profile → task → artifacts → working
order = [
ContextSectionType.IMMUTABLE_RULES,
ContextSectionType.PROJECT_PROFILE,
ContextSectionType.KNOWLEDGE_BASE,
ContextSectionType.TASK_STATE,
ContextSectionType.ARTIFACT_MEMORY,
ContextSectionType.WORKING_CONTEXT,
]
section_map: dict[ContextSectionType, ContextSection] = {
s.section_type: s for s in sections
@@ -527,11 +519,15 @@ class ContextEngine:
parts.append(section_map[st].content)
return "\n\n---\n\n".join(parts)
def _build_messages(self, session: SessionState) -> list[dict[str, Any]]:
"""Build the messages array. We do NOT include chat history.
def _build_messages(
self,
session: SessionState,
conversation: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
"""Build the messages array with real conversation history.
The user message is the current task objective (or a sentinel
if no task is active).
Includes the user objective message followed by the full
assistant/tool conversation — like professional agentic tools.
"""
if session.current_task:
step = session.current_task.current_step()
@@ -545,4 +541,10 @@ class ContextEngine:
else:
user_content = "Awaiting task assignment."
return [{"role": "user", "content": user_content}]
messages: list[dict[str, Any]] = [{"role": "user", "content": user_content}]
# Append real conversation (assistant messages + tool results)
if conversation:
messages.extend(conversation)
return messages

View File

@@ -74,6 +74,7 @@ class MCPClient:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
limit=1024 * 1024, # 1MB buffer for large MCP responses
)
self._running = True
self._reader_task = asyncio.create_task(self._read_loop())

View File

@@ -36,6 +36,7 @@ class TaskStep(BaseModel):
status: TaskStatus = TaskStatus.PENDING
result_summary: str = ""
tools_used: list[str] = Field(default_factory=list)
compacted: bool = False # True when step results have been compacted
started_at: datetime | None = None
completed_at: datetime | None = None

View File

@@ -33,7 +33,8 @@ class ToolExecution(BaseModel):
tool_name: str
arguments: dict[str, Any] = Field(default_factory=dict)
status: ToolExecutionStatus = ToolExecutionStatus.PENDING
result_summary: str = "" # Summarised result — raw output is NEVER stored here
result_summary: str = "" # Summarised result for artifacts and compacted history
raw_output: str = "" # Truncated raw output for conversation messages
error: str = ""
duration_ms: float = 0.0
started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import hashlib
import json
import logging
import time
@@ -47,6 +48,10 @@ class BaseAgent:
) -> dict[str, Any]:
"""Run the agent's execution loop.
Uses real conversation messages with complete tool results,
like professional agentic tools (Claude Code, Cursor).
Compaction happens at the step level, not per tool result.
Returns a result dict with keys: content, artifacts, tool_executions.
"""
artifacts: list[ArtifactSummary] = await self.memory.list_artifacts(
@@ -54,15 +59,20 @@ class BaseAgent:
)
tool_executions: list[ToolExecution] = []
accumulated_content = ""
working_items: list[dict[str, Any]] = []
total_input_tokens = 0
total_output_tokens = 0
# Real conversation history: assistant messages + tool results
conversation: list[dict[str, Any]] = []
tool_fingerprints: dict[str, ToolExecution] = {}
all_duplicates_streak = 0 # consecutive steps where ALL calls are duplicates
for step in range(max_steps):
# Build context — NEVER includes raw tool output
# Build context with real conversation
ctx = await self.context.build_context(
session=session,
agent=self.profile,
artifacts=artifacts,
working_items=working_items,
conversation=conversation,
)
# Prepare tool definitions
@@ -77,7 +87,7 @@ class BaseAgent:
full_text = ""
tool_calls: list[dict[str, Any]] = []
current_tool: dict[str, Any] | None = None
active_tools: dict[str, dict[str, Any]] = {}
async for chunk in self.model.stream(
messages=ctx.to_messages(),
@@ -96,35 +106,45 @@ class BaseAgent:
session_id=session.session_id,
)
if chunk.tool_name and (current_tool is None or not current_tool.get("name")):
current_tool = {
"id": chunk.tool_call_id,
"name": chunk.tool_name,
"arguments": "",
}
await self.sse.emit(
EventType.TOOL_STARTED,
{"tool": chunk.tool_name, "step": step},
session_id=session.session_id,
)
if chunk.tool_name and chunk.tool_call_id:
if chunk.tool_call_id not in active_tools:
active_tools[chunk.tool_call_id] = {
"id": chunk.tool_call_id,
"name": chunk.tool_name,
"arguments": "",
}
await self.sse.emit(
EventType.TOOL_STARTED,
{"tool": chunk.tool_name, "step": step},
session_id=session.session_id,
)
if chunk.tool_arguments and current_tool is not None and not chunk.finish_reason:
# Accumulate partial argument chunks (NOT the final one)
current_tool["arguments"] += chunk.tool_arguments
if chunk.tool_arguments and chunk.tool_call_id and not chunk.finish_reason:
tool = active_tools.get(chunk.tool_call_id)
if tool:
tool["arguments"] += chunk.tool_arguments
if chunk.finish_reason == "tool_use" and current_tool is not None and current_tool.get("name"):
# Final chunk carries complete arguments — use those if
# partial accumulation is empty, otherwise use accumulated
final_args = current_tool["arguments"] or chunk.tool_arguments or ""
if chunk.finish_reason == "tool_use" and chunk.tool_call_id:
tool = active_tools.pop(chunk.tool_call_id, None)
if not tool:
tool = {
"id": chunk.tool_call_id,
"name": chunk.tool_name or "",
"arguments": "",
}
final_args = tool["arguments"] or chunk.tool_arguments or ""
try:
args = json.loads(final_args) if final_args else {}
except json.JSONDecodeError:
logger.warning("Failed to parse tool args: %s", final_args[:200])
args = {}
current_tool["parsed_arguments"] = args
logger.debug("Tool call finalized: %s args=%s", current_tool["name"], json.dumps(args)[:200])
tool_calls.append(current_tool)
current_tool = None
tool["parsed_arguments"] = args
tool_calls.append(tool)
# Accumulate token usage from any chunk that has it
if chunk.usage:
total_input_tokens += chunk.usage.get("input_tokens", 0)
total_output_tokens += chunk.usage.get("output_tokens", 0)
if chunk.finish_reason == "end_turn":
break
@@ -133,28 +153,98 @@ class BaseAgent:
# If no tool calls, we're done
if not tool_calls:
# Add final assistant message to conversation
if full_text:
conversation.append({"role": "assistant", "content": full_text})
break
# Execute tool calls
# Add assistant message with tool calls to conversation
# (OpenAI format: assistant message carries tool_calls)
assistant_msg: dict[str, Any] = {"role": "assistant"}
if full_text:
assistant_msg["content"] = full_text
assistant_msg["tool_calls"] = [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc.get("parsed_arguments", {})),
},
}
for tc in tool_calls
]
conversation.append(assistant_msg)
# Execute tool calls and add COMPLETE results to conversation
duplicates_this_step = 0
for tc in tool_calls:
fp_raw = f"{tc['name']}:{json.dumps(tc.get('parsed_arguments', {}), sort_keys=True)}"
fp = hashlib.md5(fp_raw.encode()).hexdigest()
if fp in tool_fingerprints:
prev_exec = tool_fingerprints[fp]
tool_executions.append(prev_exec)
duplicates_this_step += 1
# Return cached result as tool message
conversation.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": f"[DUPLICADO] Ya ejecutada con mismos argumentos. Resultado: {prev_exec.raw_output[:2000]}",
})
logger.warning("Duplicate tool call skipped: %s (fingerprint: %s)", tc["name"], fp[:8])
continue
tool_exec = await self._execute_tool(
session=session,
tool_name=tc["name"],
arguments=tc.get("parsed_arguments", {}),
artifacts=artifacts,
)
tool_fingerprints[fp] = tool_exec
tool_executions.append(tool_exec)
# Add summarised result to working context (NEVER raw)
working_items.append({
"role": "tool_result",
"content": f"[{tc['name']}] {tool_exec.result_summary}",
# COMPLETE result in conversation (truncated to safe limit)
conversation.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": tool_exec.raw_output[:8000] if tool_exec.raw_output else tool_exec.result_summary,
})
# Loop detection: if ALL tool calls in this step were duplicates
if duplicates_this_step == len(tool_calls):
all_duplicates_streak += 1
if all_duplicates_streak >= 2:
logger.warning("Loop detected: %d consecutive steps with all duplicate calls. Breaking.", all_duplicates_streak)
conversation.append({
"role": "user",
"content": "[SISTEMA] Se detectaron llamadas repetidas. Ya tienes toda la información necesaria. Genera tu respuesta final ahora.",
})
# One more chance to generate a final response
ctx = await self.context.build_context(
session=session, agent=self.profile,
artifacts=artifacts, conversation=conversation,
)
async for chunk in self.model.stream(
messages=ctx.to_messages(),
config=config,
):
if chunk.delta:
accumulated_content += chunk.delta
if chunk.finish_reason:
break
break
else:
all_duplicates_streak = 0
return {
"content": accumulated_content,
"artifacts": artifacts,
"tool_executions": tool_executions,
"usage": {
"input_tokens": total_input_tokens,
"output_tokens": total_output_tokens,
},
}
async def _execute_tool(
@@ -200,6 +290,7 @@ class BaseAgent:
tool_exec.status = ToolExecutionStatus.COMPLETED
tool_exec.result_summary = artifact.summary
tool_exec.raw_output = raw_output[:8000]
tool_exec.duration_ms = duration
await self.sse.emit(

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from ...models.agent import AgentProfile, AgentRole
from .base import BaseAgent
CODER_SYSTEM_PROMPT = """Eres un Agente Programador. Tu rol es ejecutar tareas de implementación usando las herramientas disponibles.
CODER_SYSTEM_PROMPT = """Eres un Agente Programador de Acai CMS. Tu rol es ejecutar tareas de implementación usando las herramientas MCP disponibles.
## Instrucciones
- Concéntrate en la descripción del paso actual.
@@ -16,9 +16,17 @@ CODER_SYSTEM_PROMPT = """Eres un Agente Programador. Tu rol es ejecutar tareas d
- Responde SIEMPRE en español.
## Uso de herramientas
- Usa herramientas cuando necesites leer archivos, escribir código o ejecutar comandos.
- Los resultados de herramientas se te presentarán resumidos — no verás la salida cruda.
- Si necesitas más detalle de un resultado, solicita rehidratación.
- CONSULTA la Knowledge Base ANTES de actuar — tiene la referencia completa de tools y flujos de trabajo.
- Para CREAR/EDITAR MÓDULOS usa `acai_write` sobre `template/estandar/modulos/NOMBRE/index-base.tpl`. El server crea la carpeta si no existe, compila y genera todos los archivos derivados automáticamente. NO necesitas compile_module.
- `create_module` es legacy — funciona pero `acai_write` es el flujo estándar.
- Para GESTIONAR REGISTROS de tablas (apartados, travesias, etc.) usa `create_or_update_record`.
- Flujo de módulo nuevo: acai_write index-base.tpl → add_module_to_record → set_module_config_vars.
- tableName siempre SIN prefijo cms_ (ej: apartados, NO cms_apartados).
- La primary key es siempre `num`, nunca `id`.
## Datos del historial
- Si el historial de sesión incluye Key Data con recordNums o sectionIds, ÚSALOS directamente sin re-consultar.
- Ejemplo: si el historial dice "pages: Inicio = record 2", usa recordNum=2 para la portada.
"""
@@ -35,8 +43,6 @@ def create_coder_profile() -> AgentProfile:
"project_profile",
"knowledge_base",
"task_state",
"artifact_memory",
"working_context",
],
)


@@ -55,9 +55,10 @@ def create_planner_profile() -> AgentProfile:
class PlannerAgent(BaseAgent):
"""Generates execution plans from objectives."""
async def plan(self, session: SessionState) -> list[TaskStep]:
"""Generate a plan and return TaskSteps."""
async def plan(self, session: SessionState) -> tuple[list[TaskStep], dict[str, int]]:
"""Generate a plan and return (TaskSteps, usage)."""
result = await self.execute(session, max_steps=1)
usage = result.get("usage", {"input_tokens": 0, "output_tokens": 0})
content = result["content"].strip()
# Parse the JSON plan from the model output
@@ -92,7 +93,7 @@ class PlannerAgent(BaseAgent):
parsed.get("facts", [])
)
return steps
return steps, usage
except (json.JSONDecodeError, KeyError) as e:
logger.warning("Failed to parse planner output: %s", e)
@@ -104,4 +105,4 @@ class PlannerAgent(BaseAgent):
else "Execute task",
agent_role="coder",
)
]
], usage


@@ -16,7 +16,7 @@ from ..context.engine import ContextEngine
from ..mcp.manager import MCPManager
from ..memory.store import MemoryStore
from ..models.agent import AgentRole
from ..models.session import SessionState, SessionStatus, TaskStatus
from ..models.session import SessionState, SessionStatus, TaskState, TaskStatus
from ..streaming.sse import SSEEmitter, EventType
from .agents.coder import CoderAgent, create_coder_profile
from .agents.collector import CollectorAgent, create_collector_profile
@@ -115,9 +115,10 @@ class OrchestratorEngine:
# 2. Plan
task.status = TaskStatus.PLANNING
planner_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
try:
planner = self._create_agent(AgentRole.PLANNER)
plan_steps = await planner.plan(session)
plan_steps, planner_usage = await planner.plan(session)
task.plan = plan_steps
task.status = TaskStatus.EXECUTING
except Exception as e:
@@ -181,6 +182,10 @@ class OrchestratorEngine:
for artifact in step_result.get("artifacts", []):
task.facts_extracted.extend(artifact.facts[:5])
# Decide if previous steps should be compacted
if i > 0:
self._maybe_compact_previous_steps(task, current_index=i)
except Exception as e:
logger.error("Step %d failed: %s", i + 1, e)
step.status = TaskStatus.FAILED
@@ -230,6 +235,21 @@ class OrchestratorEngine:
session_id=session.session_id,
)
# Accumulate token usage: planner + all steps + review
total_input = planner_usage.get("input_tokens", 0)
total_output = planner_usage.get("output_tokens", 0)
for r in results:
total_input += r.get("usage", {}).get("input_tokens", 0)
total_output += r.get("usage", {}).get("output_tokens", 0)
# Add review usage if any
total_input += review_result.get("usage", {}).get("input_tokens", 0)
total_output += review_result.get("usage", {}).get("output_tokens", 0)
# Calculate cost
cost_usd = (
(total_input / 1_000_000) * settings.cost_per_1m_input
+ (total_output / 1_000_000) * settings.cost_per_1m_output
)
return {
"session_id": session.session_id,
"task_id": task.task_id,
@@ -241,6 +261,11 @@ class OrchestratorEngine:
),
"review": review_result.get("content", ""),
"status": status,
"usage": {
"input_tokens": total_input,
"output_tokens": total_output,
},
"total_cost_usd": round(cost_usd, 6),
}
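The usage roll-up above can be exercised in isolation. A sketch of the accumulation and pricing under the default rates used by the test suite (2.50 / 15.00 USD per million tokens); `accumulate_usage` is an illustrative helper name, not an engine method:

```python
def accumulate_usage(planner_usage: dict, step_results: list[dict],
                     review_result: dict) -> tuple[int, int]:
    """Sum input/output tokens across planner, every step, and review."""
    total_in = planner_usage.get("input_tokens", 0)
    total_out = planner_usage.get("output_tokens", 0)
    for r in step_results + [review_result]:
        usage = r.get("usage", {})
        total_in += usage.get("input_tokens", 0)
        total_out += usage.get("output_tokens", 0)
    return total_in, total_out

def cost_usd(input_tokens: int, output_tokens: int,
             per_1m_input: float = 2.50, per_1m_output: float = 15.00) -> float:
    # Same formula as the engine, rounded to 6 decimals for the response payload.
    return round(
        (input_tokens / 1_000_000) * per_1m_input
        + (output_tokens / 1_000_000) * per_1m_output,
        6,
    )
```

Missing `usage` keys default to zero, so a failed step that returned no usage dict does not break the total.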
def _error_result(self, session: SessionState, error: str) -> dict[str, Any]:
@@ -292,12 +317,16 @@ class OrchestratorEngine:
for step in task.plan:
tools_used.update(step.tools_used)
# Extract key structured data from tool executions
key_data = self._extract_key_data_from_results(results)
history_entry = {
"task_id": task.task_id,
"objective": task.objective,
"status": task.status.value,
"steps": len(task.plan),
"facts": task.facts_extracted[-10:],
"key_data": key_data,
"tools_used": list(tools_used)[:10],
"artifacts_count": len(task_artifacts),
"summary": "; ".join(step_summaries)[:300],
@@ -323,6 +352,102 @@ class OrchestratorEngine:
task.task_id, len(task.facts_extracted), len(tools_used), len(task_artifacts),
)
@staticmethod
def _extract_key_data_from_results(results: list[dict[str, Any]]) -> dict[str, Any]:
"""Extract structured data from tool executions for task history.
Preserves key identifiers (recordNum, sectionId, tableName, moduleId)
so the model retains context across tasks without re-querying.
"""
key_data: dict[str, Any] = {}
seen_tables: dict[str, list[int]] = {} # tableName -> recordNums
seen_sections: list[str] = []
seen_modules: list[str] = []
seen_pages: dict[str, int] = {} # page name/url -> recordNum
for result in results:
for te in result.get("tool_executions", []):
args = te.arguments
name = te.tool_name
# Track table + record relationships
table = args.get("tableName", "")
record = args.get("recordNum")
if table and record:
record_int = int(record) if str(record).isdigit() else None
if record_int and table not in seen_tables:
seen_tables[table] = []
if record_int and record_int not in seen_tables.get(table, []):
seen_tables[table].append(record_int)
# Track section IDs
section = args.get("sectionId", "")
if section and section not in seen_sections:
seen_sections.append(section)
# Track modules
module = args.get("moduleId", "") or args.get("moduleName", "")
if module and module not in seen_modules:
seen_modules.append(module)
# Extract page info from raw output (enlace, name)
if te.raw_output and "enlace" in te.raw_output:
try:
import json as _json
# Try to parse structured data from output
for line in te.raw_output.splitlines():
line = line.strip()
if line.startswith("{"):
try:
data = _json.loads(line)
if "enlace" in data and "num" in data:
page_key = data.get("name", data["enlace"])
seen_pages[page_key] = int(data["num"])
except _json.JSONDecodeError:
pass
except Exception:
pass
if seen_tables:
key_data["tables"] = {t: nums[:10] for t, nums in seen_tables.items()}
if seen_sections:
key_data["sections"] = seen_sections[:20]
if seen_modules:
key_data["modules"] = seen_modules[:20]
if seen_pages:
key_data["pages"] = dict(list(seen_pages.items())[:20])
return key_data
def _maybe_compact_previous_steps(
self, task: TaskState, current_index: int
) -> None:
"""Decide if previous steps should be compacted. Deterministic rules."""
current_step = task.plan[current_index]
for i in range(current_index):
prev = task.plan[i]
if prev.compacted or prev.status != TaskStatus.COMPLETED:
continue
# Rule 1: Change of agent role → previous steps are a different focus
if prev.agent_role != current_step.agent_role:
prev.compacted = True
logger.info(
"Compacted step %d (%s) — agent changed to %s",
i + 1, prev.agent_role, current_step.agent_role,
)
continue
# Rule 2: More than 3 completed non-compacted steps → compact oldest
non_compacted = [
s for s in task.plan[:current_index]
if s.status == TaskStatus.COMPLETED and not s.compacted
]
if len(non_compacted) > 3:
non_compacted[0].compacted = True
logger.info("Compacted oldest step to stay within budget")
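The two compaction rules can be checked without the full TaskState model. A minimal sketch with a stand-in `Step` dataclass (an assumption for illustration; the real model lives in models/session.py):

```python
from dataclasses import dataclass

@dataclass
class Step:
    agent_role: str
    status: str = "completed"
    compacted: bool = False

def maybe_compact_previous(plan: list[Step], current_index: int, keep: int = 3) -> None:
    """Mirror of the two deterministic rules above."""
    current = plan[current_index]
    for prev in plan[:current_index]:
        if prev.compacted or prev.status != "completed":
            continue
        # Rule 1: a different agent role means the step served another focus.
        if prev.agent_role != current.agent_role:
            prev.compacted = True
            continue
        # Rule 2: more than `keep` live completed steps -> compact the oldest.
        live = [s for s in plan[:current_index]
                if s.status == "completed" and not s.compacted]
        if len(live) > keep:
            live[0].compacted = True
```

With a collector step followed by five coder steps, Rule 1 compacts the collector and Rule 2 compacts the oldest coder step once more than three are live.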
def _create_agent(self, role: AgentRole) -> PlannerAgent | CoderAgent | CollectorAgent | ReviewerAgent:
"""Instantiate a subagent for the given role."""
profile = self._profiles[role]

tests/__init__.py Normal file

tests/conftest.py Normal file

@@ -0,0 +1,8 @@
"""pytest configuration for the agenticSystem tests.
These tests are 100% standalone: they do not import from src/ directly,
because the CI environment may lack the heavy dependencies
(anthropic, tiktoken, pydantic, etc.) and may not have Python 3.11+.
The logic under test is replicated or extracted as pure functions.
"""

tests/test_compactor.py Normal file

@@ -0,0 +1,362 @@
"""Tests for the context/compactor.py logic: token estimation, fact
extraction, summary construction, and section compaction.
The pure logic is replicated without importing src/ (avoids heavy dependencies).
"""
import hashlib
import re
from dataclasses import dataclass
# =====================================================================
# Replicas of the compactor logic (pure functions)
# =====================================================================
def estimate_tokens_fallback(text: str) -> int:
"""Replica of the estimate_tokens fallback (without tiktoken)."""
if not text:
return 0
return max(1, len(text) // 4)
def extract_facts(raw_output: str) -> list:
"""Exact replica of ContextCompactor._extract_facts."""
facts = []
lines = raw_output.strip().splitlines()
for line in lines[:100]:
line = line.strip()
if not line or len(line) < 10:
continue
if re.match(r"^[\w\s]+:\s+.+", line) and len(line) < 200:
facts.append(line)
elif re.match(r"^(✓|✗|PASS|FAIL|ERROR|OK|INFO|WARNING)", line):
facts.append(line)
elif re.match(r"^[\w/\\.]+\s*[:\-]\s*.+", line) and len(line) < 200:
facts.append(line)
seen = set()
unique = []
for f in facts:
if f not in seen:
seen.add(f)
unique.append(f)
return unique[:15]
def build_summary(tool_name: str, raw_output: str, facts: list) -> str:
"""Exact replica of ContextCompactor._build_summary."""
lines = raw_output.strip().splitlines()
total_lines = len(lines)
char_count = len(raw_output)
parts = [f"Tool '{tool_name}' returned {total_lines} lines ({char_count} chars)."]
if facts:
parts.append(f"Key findings: {'; '.join(facts[:5])}")
meaningful = [l.strip() for l in lines if l.strip()]
if meaningful:
parts.append(f"First: {meaningful[0][:120]}")
if len(meaningful) > 1:
parts.append(f"Last: {meaningful[-1][:120]}")
return " ".join(parts)
def infer_artifact_type(tool_name: str) -> str:
"""Replica of ContextCompactor._infer_artifact_type."""
tool_lower = tool_name.lower()
if any(k in tool_lower for k in ("read", "file", "code", "write", "edit")):
return "code"
if any(k in tool_lower for k in ("test", "check", "lint", "validate")):
return "test_result"
if any(k in tool_lower for k in ("search", "find", "grep", "glob")):
return "analysis"
if any(k in tool_lower for k in ("plan", "design", "architect")):
return "plan"
return "general"
def summarize_tool_output(tool_name: str, raw_output: str, session_id: str, task_id: str) -> dict:
"""Simplified replica of ContextCompactor.summarize_tool_output.
Returns a dict with the same fields as ArtifactSummary.
"""
facts = extract_facts(raw_output)
summary = build_summary(tool_name, raw_output, facts)
artifact_type = infer_artifact_type(tool_name)
artifact_id = hashlib.sha256(
f"{session_id}:{task_id}:{tool_name}:{raw_output[:200]}".encode()
).hexdigest()[:16]
return {
"artifact_id": artifact_id,
"session_id": session_id,
"task_id": task_id,
"artifact_type": artifact_type,
"title": f"Output of {tool_name}",
"summary": summary,
"facts": facts,
"source_tool": tool_name,
"char_count": len(raw_output),
}
# --- Simplified ContextSection model for the compaction tests ---
@dataclass
class Section:
section_type: str # "immutable_rules", "working_context", "task_state", etc.
content: str
priority: int = 0
token_estimate: int = 0
def compact_sections(sections: list, max_tokens: int) -> list:
"""Replica of ContextCompactor.compact_sections (pure logic)."""
# 1. Deduplicate
seen = set()
unique = []
for s in sections:
h = hashlib.md5(s.content.encode()).hexdigest()
if h not in seen:
seen.add(h)
unique.append(s)
sections = unique
# 2. Estimate tokens
for s in sections:
s.token_estimate = estimate_tokens_fallback(s.content)
total = sum(s.token_estimate for s in sections)
if total <= max_tokens:
return sections
# 3. Sort by priority (highest first)
sections.sort(key=lambda s: s.priority, reverse=True)
# 4. Trim lowest-priority sections first
while total > max_tokens and sections:
lowest = sections[-1]
if lowest.section_type == "immutable_rules":
break
# Simple compaction: drop empty lines
compacted_lines = [l.rstrip() for l in lowest.content.splitlines() if l.strip()]
compacted = "\n".join(compacted_lines)
new_est = estimate_tokens_fallback(compacted)
saved = lowest.token_estimate - new_est
if saved > 0:
lowest.content = compacted
lowest.token_estimate = new_est
total -= saved
else:
total -= lowest.token_estimate
sections.pop()
return sections
# =====================================================================
# Tests: estimate_tokens
# =====================================================================
class TestEstimateTokens:
def test_positive_for_nonempty_text(self):
result = estimate_tokens_fallback("Hello world, this is a test string.")
assert isinstance(result, int)
assert result > 0
def test_zero_for_empty_string(self):
assert estimate_tokens_fallback("") == 0
def test_longer_text_more_tokens(self):
short = estimate_tokens_fallback("hi")
long = estimate_tokens_fallback("hi " * 500)
assert long > short
def test_returns_int_type(self):
assert isinstance(estimate_tokens_fallback("cualquier texto"), int)
def test_minimum_is_one_for_short_text(self):
# "ab" -> len 2 // 4 = 0, but max(1, 0) = 1
assert estimate_tokens_fallback("ab") == 1
# =====================================================================
# Tests: _extract_facts
# =====================================================================
class TestExtractFacts:
def test_extracts_key_value_lines(self):
raw = "Status: running\nVersion: 3.2.1\nIgnored short\nName: my-module"
facts = extract_facts(raw)
assert any("Status: running" in f for f in facts)
assert any("Version: 3.2.1" in f for f in facts)
assert any("Name: my-module" in f for f in facts)
def test_extracts_status_indicators(self):
raw = "PASS test_login completed\nFAIL test_logout broken\nOK everything fine"
facts = extract_facts(raw)
assert any("PASS" in f for f in facts)
assert any("FAIL" in f for f in facts)
def test_ignores_short_lines(self):
raw = "ok\nhi\nyes\nStatus: this is long enough to be a fact"
facts = extract_facts(raw)
assert not any(f in ("ok", "hi", "yes") for f in facts)
def test_deduplicates(self):
raw = "Status: running value\nStatus: running value\nStatus: running value"
facts = extract_facts(raw)
assert facts.count("Status: running value") == 1
def test_limits_to_15(self):
lines = [f"Key{i}: value number {i} with enough length" for i in range(30)]
raw = "\n".join(lines)
facts = extract_facts(raw)
assert len(facts) <= 15
def test_empty_input(self):
facts = extract_facts("")
assert facts == []
# =====================================================================
# Tests: _build_summary
# =====================================================================
class TestBuildSummary:
def test_includes_tool_name(self):
summary = build_summary("read_file", "line1\nline2\nline3", [])
assert "read_file" in summary
def test_includes_line_count(self):
raw = "line1\nline2\nline3"
summary = build_summary("my_tool", raw, [])
assert "3 lines" in summary
def test_includes_char_count(self):
raw = "some content here"
summary = build_summary("my_tool", raw, [])
assert str(len(raw)) in summary
def test_includes_facts_when_present(self):
facts = ["Status: ok", "Count: 42"]
summary = build_summary("my_tool", "data", facts)
assert "Status: ok" in summary
def test_includes_first_line(self):
raw = "primera linea importante\nsegunda\ntercera"
summary = build_summary("tool", raw, [])
assert "primera linea importante" in summary
# =====================================================================
# Tests: summarize_tool_output
# =====================================================================
class TestSummarizeToolOutput:
def test_returns_dict_with_correct_fields(self):
result = summarize_tool_output(
tool_name="read_file",
raw_output="Status: ok\nContent: hello world here",
session_id="sess-001",
task_id="task-001",
)
assert isinstance(result, dict)
assert result["session_id"] == "sess-001"
assert result["task_id"] == "task-001"
assert result["source_tool"] == "read_file"
assert result["title"] == "Output of read_file"
assert result["artifact_id"]  # non-empty
assert result["summary"]  # non-empty
assert result["char_count"] > 0
def test_artifact_type_inference(self):
assert summarize_tool_output("read_file", "x", "s", "t")["artifact_type"] == "code"
assert summarize_tool_output("test_run", "x", "s", "t")["artifact_type"] == "test_result"
assert summarize_tool_output("search_records", "x", "s", "t")["artifact_type"] == "analysis"
assert summarize_tool_output("deploy_app", "x", "s", "t")["artifact_type"] == "general"
def test_artifact_id_is_deterministic(self):
r1 = summarize_tool_output("tool", "output", "s", "t")
r2 = summarize_tool_output("tool", "output", "s", "t")
assert r1["artifact_id"] == r2["artifact_id"]
def test_artifact_id_length(self):
result = summarize_tool_output("tool", "output", "s", "t")
assert len(result["artifact_id"]) == 16
# =====================================================================
# Tests: compact_sections
# =====================================================================
class TestCompactSections:
def test_never_removes_immutable_rules(self):
sections = [
Section(
section_type="immutable_rules",
content="You must always follow these rules " * 20,
priority=100,
),
Section(
section_type="working_context",
content="Some working context data " * 50,
priority=1,
),
]
result = compact_sections(sections, max_tokens=50)
types = [s.section_type for s in result]
assert "immutable_rules" in types
def test_respects_priority_order(self):
"""Higher-priority sections survive compaction.
Use a budget that fits the high-priority section but not both."""
high = Section(
section_type="task_state",
content="Important task data here", # ~6 tokens
priority=90,
)
low = Section(
section_type="working_context",
content="Low priority stuff " * 50, # ~250 tokens
priority=1,
)
# Budget fits high (~6 tokens) but not high+low (~256)
result = compact_sections([high, low], max_tokens=20)
types = [s.section_type for s in result]
assert "task_state" in types
# The low-priority section should have been dropped or compacted
assert len(result) <= 2
def test_no_compaction_when_within_budget(self):
sections = [
Section(
section_type="task_state",
content="Short content",
priority=50,
),
]
result = compact_sections(sections, max_tokens=999_999)
assert len(result) == 1
assert result[0].content == "Short content"
def test_deduplicates_identical_sections(self):
sections = [
Section(section_type="working_context", content="duplicated content", priority=10),
Section(section_type="working_context", content="duplicated content", priority=10),
]
result = compact_sections(sections, max_tokens=999_999)
assert len(result) == 1

tests/test_cost_calculation.py Normal file

@@ -0,0 +1,71 @@
"""Tests for the orchestrator cost calculation.
Replicates the cost formula from OrchestratorEngine._run_pipeline():
cost_usd = (input_tokens / 1_000_000) * cost_per_1m_input
+ (output_tokens / 1_000_000) * cost_per_1m_output
Defaults: cost_per_1m_input=2.50, cost_per_1m_output=15.00
"""
import pytest
def calculate_cost(
input_tokens: int,
output_tokens: int,
cost_per_1m_input: float = 2.50,
cost_per_1m_output: float = 15.00,
) -> float:
"""Exact replica of the cost formula in engine.py."""
return (
(input_tokens / 1_000_000) * cost_per_1m_input
+ (output_tokens / 1_000_000) * cost_per_1m_output
)
class TestCostCalculation:
def test_1m_input_tokens(self):
cost = calculate_cost(1_000_000, 0)
assert cost == pytest.approx(2.50)
def test_1m_output_tokens(self):
cost = calculate_cost(0, 1_000_000)
assert cost == pytest.approx(15.00)
def test_500k_input_100k_output(self):
cost = calculate_cost(500_000, 100_000)
# (500_000 / 1_000_000) * 2.50 + (100_000 / 1_000_000) * 15.00
# = 1.25 + 1.50 = 2.75
assert cost == pytest.approx(2.75)
def test_zero_tokens(self):
cost = calculate_cost(0, 0)
assert cost == 0.0
def test_custom_pricing(self):
cost = calculate_cost(
1_000_000, 1_000_000,
cost_per_1m_input=3.00,
cost_per_1m_output=10.00,
)
assert cost == pytest.approx(13.00)
def test_small_token_count(self):
"""Few tokens: a very small but nonzero cost."""
cost = calculate_cost(100, 50)
assert cost > 0
assert cost < 0.01
def test_round_to_6_decimals(self):
"""The engine applies round(cost_usd, 6)."""
cost = calculate_cost(1, 1)
rounded = round(cost, 6)
# (1/1M)*2.50 + (1/1M)*15.00 = 1.75e-05
# round(1.75e-05, 6) = 1.7e-05: the nearest binary double to 1.75e-05
# sits just below the decimal midpoint, so round() rounds down
# (float representation, not banker's rounding)
assert rounded == pytest.approx(0.000017, abs=1e-7)
def test_output_more_expensive_than_input(self):
"""With the default pricing, output is 6x more expensive than input."""
input_cost = calculate_cost(1_000_000, 0)
output_cost = calculate_cost(0, 1_000_000)
assert output_cost == pytest.approx(input_cost * 6.0)

tests/test_fingerprint.py Normal file

@@ -0,0 +1,61 @@
"""Tests for the tool-call fingerprinting/deduplication logic.
Replicates the logic in BaseAgent.execute() (the hashlib.md5 lines) without
instantiating BaseAgent or its dependencies.
"""
import hashlib
import json
def compute_fingerprint(tool_name: str, args: dict) -> str:
"""Exact replica of the fingerprint logic in BaseAgent.execute()."""
fp_raw = f"{tool_name}:{json.dumps(args, sort_keys=True)}"
return hashlib.md5(fp_raw.encode()).hexdigest()
class TestFingerprint:
def test_same_tool_same_args_same_fingerprint(self):
fp1 = compute_fingerprint("read_file", {"path": "/index.html"})
fp2 = compute_fingerprint("read_file", {"path": "/index.html"})
assert fp1 == fp2
def test_same_tool_different_args_different_fingerprint(self):
fp1 = compute_fingerprint("read_file", {"path": "/index.html"})
fp2 = compute_fingerprint("read_file", {"path": "/style.css"})
assert fp1 != fp2
def test_different_tool_same_args_different_fingerprint(self):
fp1 = compute_fingerprint("read_file", {"path": "/index.html"})
fp2 = compute_fingerprint("write_file", {"path": "/index.html"})
assert fp1 != fp2
def test_fingerprint_is_md5_hex_32_chars(self):
fp = compute_fingerprint("any_tool", {"key": "value"})
assert len(fp) == 32
assert all(c in "0123456789abcdef" for c in fp)
def test_arg_order_does_not_matter(self):
"""json.dumps with sort_keys=True normalizes key order."""
fp1 = compute_fingerprint("tool", {"b": 2, "a": 1})
fp2 = compute_fingerprint("tool", {"a": 1, "b": 2})
assert fp1 == fp2
def test_empty_args(self):
fp = compute_fingerprint("tool", {})
assert len(fp) == 32
# Must be deterministic
assert fp == compute_fingerprint("tool", {})
def test_nested_args(self):
args = {"filter": {"table": "pages", "status": "active"}, "limit": 10}
fp1 = compute_fingerprint("search", args)
fp2 = compute_fingerprint("search", args)
assert fp1 == fp2
def test_different_nested_values(self):
fp1 = compute_fingerprint("search", {"filter": {"status": "active"}})
fp2 = compute_fingerprint("search", {"filter": {"status": "draft"}})
assert fp1 != fp2

tests/test_key_data_extraction.py Normal file

@@ -0,0 +1,152 @@
"""Tests for the OrchestratorEngine._extract_key_data_from_results logic.
The function is replicated as pure logic, without importing src/ (avoids dependencies).
ToolExecution objects are represented as SimpleNamespace with .arguments and .tool_name.
"""
import json
from types import SimpleNamespace
from typing import Any
def _make_tool_execution(tool_name: str, arguments: dict, raw_output: str = "") -> SimpleNamespace:
"""Create a ToolExecution-like object with the attributes we need."""
return SimpleNamespace(
tool_name=tool_name,
arguments=arguments,
raw_output=raw_output,
)
def _make_result(*tool_executions) -> dict:
return {"tool_executions": list(tool_executions), "content": "ok"}
def extract_key_data_from_results(results: list) -> dict:
"""Exact replica of OrchestratorEngine._extract_key_data_from_results."""
key_data: dict[str, Any] = {}
seen_tables: dict[str, list] = {}
seen_sections: list = []
seen_modules: list = []
seen_pages: dict[str, int] = {}
for result in results:
for te in result.get("tool_executions", []):
args = te.arguments
name = te.tool_name
table = args.get("tableName", "")
record = args.get("recordNum")
if table and record:
record_int = int(record) if str(record).isdigit() else None
if record_int and table not in seen_tables:
seen_tables[table] = []
if record_int and record_int not in seen_tables.get(table, []):
seen_tables[table].append(record_int)
section = args.get("sectionId", "")
if section and section not in seen_sections:
seen_sections.append(section)
module = args.get("moduleId", "") or args.get("moduleName", "")
if module and module not in seen_modules:
seen_modules.append(module)
if te.raw_output and "enlace" in te.raw_output:
try:
for line in te.raw_output.splitlines():
line = line.strip()
if line.startswith("{"):
try:
data = json.loads(line)
if "enlace" in data and "num" in data:
page_key = data.get("name", data["enlace"])
seen_pages[page_key] = int(data["num"])
except json.JSONDecodeError:
pass
except Exception:
pass
if seen_tables:
key_data["tables"] = {t: nums[:10] for t, nums in seen_tables.items()}
if seen_sections:
key_data["sections"] = seen_sections[:20]
if seen_modules:
key_data["modules"] = seen_modules[:20]
if seen_pages:
key_data["pages"] = dict(list(seen_pages.items())[:20])
return key_data
# =====================================================================
# Tests
# =====================================================================
class TestExtractKeyDataFromResults:
def test_extracts_table_and_record(self):
te = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "42"})
key_data = extract_key_data_from_results([_make_result(te)])
assert "tables" in key_data
assert "pages" in key_data["tables"]
assert 42 in key_data["tables"]["pages"]
def test_extracts_section_id(self):
te = _make_tool_execution("get_section", {"sectionId": "hero-banner"})
key_data = extract_key_data_from_results([_make_result(te)])
assert "sections" in key_data
assert "hero-banner" in key_data["sections"]
def test_extracts_module_id(self):
te = _make_tool_execution("compile_module", {"moduleId": "gallery-slider"})
key_data = extract_key_data_from_results([_make_result(te)])
assert "modules" in key_data
assert "gallery-slider" in key_data["modules"]
def test_extracts_module_name_fallback(self):
te = _make_tool_execution("compile_module", {"moduleName": "contact-form"})
key_data = extract_key_data_from_results([_make_result(te)])
assert "modules" in key_data
assert "contact-form" in key_data["modules"]
def test_empty_results(self):
key_data = extract_key_data_from_results([])
assert key_data == {}
def test_no_tool_executions_in_result(self):
key_data = extract_key_data_from_results([{"content": "x", "tool_executions": []}])
assert key_data == {}
def test_result_without_tool_executions_key(self):
key_data = extract_key_data_from_results([{"content": "just text"}])
assert key_data == {}
def test_tool_execution_without_relevant_args(self):
te = _make_tool_execution("read_file", {"path": "/var/www/index.html"})
key_data = extract_key_data_from_results([_make_result(te)])
assert key_data == {}
def test_multiple_tables_and_records(self):
te1 = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "1"})
te2 = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "5"})
te3 = _make_tool_execution("get_record", {"tableName": "blog", "recordNum": "10"})
key_data = extract_key_data_from_results([_make_result(te1, te2, te3)])
assert 1 in key_data["tables"]["pages"]
assert 5 in key_data["tables"]["pages"]
assert 10 in key_data["tables"]["blog"]
def test_deduplicates_records(self):
te1 = _make_tool_execution("a", {"tableName": "t", "recordNum": "7"})
te2 = _make_tool_execution("b", {"tableName": "t", "recordNum": "7"})
key_data = extract_key_data_from_results([_make_result(te1, te2)])
assert key_data["tables"]["t"].count(7) == 1
def test_extracts_pages_from_raw_output(self):
raw = '{"enlace": "/contacto", "num": 15, "name": "Contacto"}\nother line'
te = _make_tool_execution("list_pages", {"tableName": "web"}, raw_output=raw)
key_data = extract_key_data_from_results([_make_result(te)])
assert "pages" in key_data
assert key_data["pages"]["Contacto"] == 15