Tests unitarios: 51 tests para compactor, key_data, fingerprint y costes
- tests/test_compactor.py: 24 tests (estimate_tokens, extract_facts, build_summary, summarize_tool_output, compact_sections) - tests/test_key_data_extraction.py: 11 tests (extracción de tables, records, sections, modules, pages desde tool executions) - tests/test_fingerprint.py: 8 tests (deduplicación MD5, sort_keys, nested args) - tests/test_cost_calculation.py: 8 tests (pricing formula, custom pricing, rounding) - README.md: sección Tests con instrucciones de ejecución Todos offline, sin Docker/Redis/LLM. Ejecutar: python3 -m pytest tests/ -v Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
16
README.md
16
README.md
@@ -59,6 +59,22 @@ python3 -m uvicorn src.main:app --reload --port 8001
|
||||
# 5. Dashboard en http://localhost:8001/dashboard/
|
||||
```
|
||||
|
||||
### Tests
|
||||
|
||||
```bash
|
||||
# Ejecutar todos los tests unitarios (no necesita Docker, Redis ni LLM)
|
||||
pip install pytest
|
||||
python3 -m pytest tests/ -v
|
||||
|
||||
# Ejecutar un archivo específico
|
||||
python3 -m pytest tests/test_compactor.py -v
|
||||
|
||||
# Ejecutar un test específico
|
||||
python3 -m pytest tests/test_cost_calculation.py::TestCostCalculation::test_1m_input_tokens -v
|
||||
```
|
||||
|
||||
Los tests validan: compactación de contexto, extracción de key_data para historial, fingerprinting de tool calls, y cálculo de costes. Son 100% offline — no consumen tokens ni necesitan servicios externos.
|
||||
|
||||
### Cargar Knowledge Base
|
||||
|
||||
```bash
|
||||
|
||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
8
tests/conftest.py
Normal file
8
tests/conftest.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""Configuracion de pytest para agenticSystem tests.
|
||||
|
||||
Estos tests son 100% standalone — no importan desde src/ directamente
|
||||
porque el entorno de CI puede no tener las dependencias pesadas
|
||||
(anthropic, tiktoken, pydantic, etc.) ni Python 3.11+.
|
||||
|
||||
La logica bajo test se replica o se extrae como funciones puras.
|
||||
"""
|
||||
362
tests/test_compactor.py
Normal file
362
tests/test_compactor.py
Normal file
@@ -0,0 +1,362 @@
|
||||
"""Tests para la logica de context/compactor.py — estimacion de tokens,
|
||||
extraccion de facts, construccion de summaries y compactacion de secciones.
|
||||
|
||||
Se replica la logica pura sin importar src/ (evita dependencias pesadas).
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Replicas de la logica del compactor (funciones puras)
|
||||
# =====================================================================
|
||||
|
||||
|
||||
def estimate_tokens_fallback(text: str) -> int:
    """Replica of the estimate_tokens fallback (no tiktoken).

    Approximates ~4 characters per token: empty input costs nothing,
    any non-empty input costs at least one token.
    """
    return max(1, len(text) // 4) if text else 0
|
||||
|
||||
|
||||
def extract_facts(raw_output: str) -> list:
    """Exact replica of ContextCompactor._extract_facts.

    Scans up to the first 100 lines for "Key: value" pairs, status
    markers (PASS/FAIL/...), and path-like "name: detail" lines, then
    returns at most 15 unique facts in first-seen order.
    """
    kv_pattern = re.compile(r"^[\w\s]+:\s+.+")
    status_pattern = re.compile(r"^(✓|✗|PASS|FAIL|ERROR|OK|INFO|WARNING)")
    path_pattern = re.compile(r"^[\w/\\.]+\s*[:\-]\s*.+")

    collected = []
    for raw_line in raw_output.strip().splitlines()[:100]:
        candidate = raw_line.strip()
        # Lines shorter than 10 chars (including blanks) are never facts.
        if len(candidate) < 10:
            continue
        short_enough = len(candidate) < 200
        matches = (
            (short_enough and kv_pattern.match(candidate))
            or status_pattern.match(candidate)
            or (short_enough and path_pattern.match(candidate))
        )
        if matches:
            collected.append(candidate)

    # Order-preserving dedupe, capped at 15 facts.
    return list(dict.fromkeys(collected))[:15]
|
||||
|
||||
|
||||
def build_summary(tool_name: str, raw_output: str, facts: list) -> str:
    """Exact replica of ContextCompactor._build_summary.

    Produces a one-line digest: line/char counts, up to 5 key facts,
    and the first (and last, when distinct) non-blank output line.
    """
    all_lines = raw_output.strip().splitlines()

    pieces = [
        f"Tool '{tool_name}' returned {len(all_lines)} lines ({len(raw_output)} chars)."
    ]

    if facts:
        pieces.append(f"Key findings: {'; '.join(facts[:5])}")

    nonblank = [line.strip() for line in all_lines if line.strip()]
    if nonblank:
        pieces.append(f"First: {nonblank[0][:120]}")
        if len(nonblank) > 1:
            pieces.append(f"Last: {nonblank[-1][:120]}")

    return " ".join(pieces)
|
||||
|
||||
|
||||
def infer_artifact_type(tool_name: str) -> str:
    """Replica of ContextCompactor._infer_artifact_type.

    Maps a tool name to an artifact category by keyword matching.
    The first group containing a matching keyword wins, so order matters.
    """
    name = tool_name.lower()
    keyword_groups = (
        (("read", "file", "code", "write", "edit"), "code"),
        (("test", "check", "lint", "validate"), "test_result"),
        (("search", "find", "grep", "glob"), "analysis"),
        (("plan", "design", "architect"), "plan"),
    )
    for keywords, artifact_type in keyword_groups:
        if any(keyword in name for keyword in keywords):
            return artifact_type
    return "general"
|
||||
|
||||
|
||||
def summarize_tool_output(tool_name: str, raw_output: str, session_id: str, task_id: str) -> dict:
    """Simplified replica of ContextCompactor.summarize_tool_output.

    Returns a dict carrying the same fields as ArtifactSummary:
    a deterministic 16-hex-char id, the inferred artifact type, the
    extracted facts, and a compact textual summary of the raw output.
    """
    extracted = extract_facts(raw_output)
    # The id hashes session/task/tool plus an output prefix, so identical
    # calls are deterministic while distinct outputs diverge.
    fingerprint_source = f"{session_id}:{task_id}:{tool_name}:{raw_output[:200]}"
    return {
        "artifact_id": hashlib.sha256(fingerprint_source.encode()).hexdigest()[:16],
        "session_id": session_id,
        "task_id": task_id,
        "artifact_type": infer_artifact_type(tool_name),
        "title": f"Output of {tool_name}",
        "summary": build_summary(tool_name, raw_output, extracted),
        "facts": extracted,
        "source_tool": tool_name,
        "char_count": len(raw_output),
    }
|
||||
|
||||
|
||||
# --- Modelo simplificado de ContextSection para test de compactacion ---
|
||||
|
||||
|
||||
@dataclass
class Section:
    """Minimal stand-in for ContextSection, used by the compaction tests."""
    section_type: str  # "immutable_rules", "working_context", "task_state", etc.
    content: str  # raw text of the section; mutated in place by compaction
    priority: int = 0  # higher survives longer when trimming to budget
    token_estimate: int = 0  # filled in by compact_sections via the fallback estimator
|
||||
|
||||
|
||||
def compact_sections(sections: list, max_tokens: int) -> list:
    """Replica of ContextCompactor.compact_sections (pure logic).

    Deduplicates sections by content hash, estimates token cost, and —
    when over budget — trims from the lowest-priority end, first by
    stripping blank lines, then by dropping the section entirely.
    Sections typed "immutable_rules" stop the trimming loop.
    NOTE: Section objects are mutated in place (content, token_estimate).
    """
    # 1. Deduplicate by MD5 of content, keeping first occurrence
    seen = set()
    unique = []
    for s in sections:
        h = hashlib.md5(s.content.encode()).hexdigest()
        if h not in seen:
            seen.add(h)
            unique.append(s)
    sections = unique

    # 2. Estimate tokens for every surviving section
    for s in sections:
        s.token_estimate = estimate_tokens_fallback(s.content)

    total = sum(s.token_estimate for s in sections)
    if total <= max_tokens:
        return sections

    # 3. Sort by priority (highest first), so trimming targets the tail
    sections.sort(key=lambda s: s.priority, reverse=True)

    # 4. Trim from the lowest-priority end until within budget
    while total > max_tokens and sections:
        lowest = sections[-1]
        if lowest.section_type == "immutable_rules":
            break
        # Simple compaction: drop blank lines and trailing whitespace
        compacted_lines = [l.rstrip() for l in lowest.content.splitlines() if l.strip()]
        compacted = "\n".join(compacted_lines)
        new_est = estimate_tokens_fallback(compacted)
        saved = lowest.token_estimate - new_est
        if saved > 0:
            # Compaction helped: keep the section with its smaller body
            lowest.content = compacted
            lowest.token_estimate = new_est
            total -= saved
        else:
            # Nothing to save: drop the section entirely
            total -= lowest.token_estimate
            sections.pop()

    return sections
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Tests: estimate_tokens
|
||||
# =====================================================================
|
||||
|
||||
|
||||
class TestEstimateTokens:
    """Behavioral checks for the ~4-chars-per-token fallback estimator."""

    def test_positive_for_nonempty_text(self):
        """A non-empty string estimates to a strictly positive integer."""
        estimate = estimate_tokens_fallback("Hello world, this is a test string.")
        assert isinstance(estimate, int)
        assert estimate > 0

    def test_zero_for_empty_string(self):
        """The empty string costs zero tokens."""
        assert estimate_tokens_fallback("") == 0

    def test_longer_text_more_tokens(self):
        """More characters must yield a larger estimate."""
        small = estimate_tokens_fallback("hi")
        big = estimate_tokens_fallback("hi " * 500)
        assert big > small

    def test_returns_int_type(self):
        """The estimator always returns an int, never a float."""
        assert isinstance(estimate_tokens_fallback("cualquier texto"), int)

    def test_minimum_is_one_for_short_text(self):
        """len("ab") // 4 == 0, but max(1, 0) clamps the estimate to 1."""
        assert estimate_tokens_fallback("ab") == 1
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Tests: _extract_facts
|
||||
# =====================================================================
|
||||
|
||||
|
||||
class TestExtractFacts:
    """Unit tests for the fact-extraction replica (extract_facts)."""

    def test_extracts_key_value_lines(self):
        """Lines shaped like "Key: value" are captured as facts."""
        raw = "Status: running\nVersion: 3.2.1\nIgnored short\nName: my-module"
        facts = extract_facts(raw)
        assert any("Status: running" in f for f in facts)
        assert any("Version: 3.2.1" in f for f in facts)
        assert any("Name: my-module" in f for f in facts)

    def test_extracts_status_indicators(self):
        """Lines starting with PASS/FAIL/OK-style markers are captured."""
        raw = "PASS test_login completed\nFAIL test_logout broken\nOK everything fine"
        facts = extract_facts(raw)
        assert any("PASS" in f for f in facts)
        assert any("FAIL" in f for f in facts)

    def test_ignores_short_lines(self):
        """Lines shorter than 10 chars are never reported as facts."""
        raw = "ok\nhi\nyes\nStatus: this is long enough to be a fact"
        facts = extract_facts(raw)
        assert not any(f in ("ok", "hi", "yes") for f in facts)

    def test_deduplicates(self):
        """Identical fact lines appear only once in the result."""
        raw = "Status: running value\nStatus: running value\nStatus: running value"
        facts = extract_facts(raw)
        assert facts.count("Status: running value") == 1

    def test_limits_to_15(self):
        """At most 15 facts are returned, even with more candidates."""
        lines = [f"Key{i}: value number {i} with enough length" for i in range(30)]
        raw = "\n".join(lines)
        facts = extract_facts(raw)
        assert len(facts) <= 15

    def test_empty_input(self):
        """Empty input yields an empty fact list."""
        facts = extract_facts("")
        assert facts == []
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Tests: _build_summary
|
||||
# =====================================================================
|
||||
|
||||
|
||||
class TestBuildSummary:
    """Unit tests for the summary-line builder (build_summary replica)."""

    def test_includes_tool_name(self):
        """The summary mentions the originating tool."""
        summary = build_summary("read_file", "line1\nline2\nline3", [])
        assert "read_file" in summary

    def test_includes_line_count(self):
        """The summary reports the number of output lines."""
        raw = "line1\nline2\nline3"
        summary = build_summary("my_tool", raw, [])
        assert "3 lines" in summary

    def test_includes_char_count(self):
        """The summary reports the raw character count."""
        raw = "some content here"
        summary = build_summary("my_tool", raw, [])
        assert str(len(raw)) in summary

    def test_includes_facts_when_present(self):
        """Provided facts are echoed in the summary."""
        facts = ["Status: ok", "Count: 42"]
        summary = build_summary("my_tool", "data", facts)
        assert "Status: ok" in summary

    def test_includes_first_line(self):
        """The first meaningful output line is quoted in the summary."""
        raw = "primera linea importante\nsegunda\ntercera"
        summary = build_summary("tool", raw, [])
        assert "primera linea importante" in summary
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Tests: summarize_tool_output
|
||||
# =====================================================================
|
||||
|
||||
|
||||
class TestSummarizeToolOutput:
    """Unit tests for the end-to-end artifact-summary dict."""

    def test_returns_dict_with_correct_fields(self):
        """All ArtifactSummary fields are present and populated."""
        result = summarize_tool_output(
            tool_name="read_file",
            raw_output="Status: ok\nContent: hello world here",
            session_id="sess-001",
            task_id="task-001",
        )
        assert isinstance(result, dict)
        assert result["session_id"] == "sess-001"
        assert result["task_id"] == "task-001"
        assert result["source_tool"] == "read_file"
        assert result["title"] == "Output of read_file"
        assert result["artifact_id"]  # non-empty
        assert result["summary"]  # non-empty
        assert result["char_count"] > 0

    def test_artifact_type_inference(self):
        """Tool names map to artifact types via keyword matching."""
        assert summarize_tool_output("read_file", "x", "s", "t")["artifact_type"] == "code"
        assert summarize_tool_output("test_run", "x", "s", "t")["artifact_type"] == "test_result"
        assert summarize_tool_output("search_records", "x", "s", "t")["artifact_type"] == "analysis"
        assert summarize_tool_output("deploy_app", "x", "s", "t")["artifact_type"] == "general"

    def test_artifact_id_is_deterministic(self):
        """Identical inputs always produce the same artifact_id."""
        r1 = summarize_tool_output("tool", "output", "s", "t")
        r2 = summarize_tool_output("tool", "output", "s", "t")
        assert r1["artifact_id"] == r2["artifact_id"]

    def test_artifact_id_length(self):
        """artifact_id is the first 16 hex chars of a SHA-256 digest."""
        result = summarize_tool_output("tool", "output", "s", "t")
        assert len(result["artifact_id"]) == 16
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Tests: compact_sections
|
||||
# =====================================================================
|
||||
|
||||
|
||||
class TestCompactSections:
    """Unit tests for the token-budget compaction replica (compact_sections)."""

    def test_never_removes_immutable_rules(self):
        """immutable_rules sections survive even when over budget."""
        sections = [
            Section(
                section_type="immutable_rules",
                content="You must always follow these rules " * 20,
                priority=100,
            ),
            Section(
                section_type="working_context",
                content="Some working context data " * 50,
                priority=1,
            ),
        ]
        result = compact_sections(sections, max_tokens=50)
        types = [s.section_type for s in result]
        assert "immutable_rules" in types

    def test_respects_priority_order(self):
        """Higher-priority sections survive compaction.

        The budget fits the high-priority section but not both."""
        high = Section(
            section_type="task_state",
            content="Important task data here",  # ~6 tokens
            priority=90,
        )
        low = Section(
            section_type="working_context",
            content="Low priority stuff " * 50,  # ~250 tokens
            priority=1,
        )
        # Budget large enough for high (~6) but not for high+low (~256)
        result = compact_sections([high, low], max_tokens=20)
        types = [s.section_type for s in result]
        assert "task_state" in types
        # The low-priority section should have been dropped or compacted
        assert len(result) <= 2

    def test_no_compaction_when_within_budget(self):
        """Sections inside the budget are returned untouched."""
        sections = [
            Section(
                section_type="task_state",
                content="Short content",
                priority=50,
            ),
        ]
        result = compact_sections(sections, max_tokens=999_999)
        assert len(result) == 1
        assert result[0].content == "Short content"

    def test_deduplicates_identical_sections(self):
        """Byte-identical section contents are collapsed to one."""
        sections = [
            Section(section_type="working_context", content="duplicated content", priority=10),
            Section(section_type="working_context", content="duplicated content", priority=10),
        ]
        result = compact_sections(sections, max_tokens=999_999)
        assert len(result) == 1
|
||||
71
tests/test_cost_calculation.py
Normal file
71
tests/test_cost_calculation.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Tests para el calculo de costes del orquestador.
|
||||
|
||||
Replica la formula de coste de OrchestratorEngine._run_pipeline():
|
||||
cost_usd = (input_tokens / 1_000_000) * cost_per_1m_input
|
||||
+ (output_tokens / 1_000_000) * cost_per_1m_output
|
||||
|
||||
Defaults: cost_per_1m_input=2.50, cost_per_1m_output=15.00
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def calculate_cost(
    input_tokens: int,
    output_tokens: int,
    cost_per_1m_input: float = 2.50,
    cost_per_1m_output: float = 15.00,
) -> float:
    """Exact replica of the cost formula in engine.py.

    Tokens are priced per million: each side is scaled by its own
    per-1M rate and the two contributions are summed.
    """
    input_cost = (input_tokens / 1_000_000) * cost_per_1m_input
    output_cost = (output_tokens / 1_000_000) * cost_per_1m_output
    return input_cost + output_cost
|
||||
|
||||
|
||||
class TestCostCalculation:
    """Unit tests for the per-1M-token pricing formula replica."""

    def test_1m_input_tokens(self):
        """1M input tokens at the default rate costs $2.50."""
        cost = calculate_cost(1_000_000, 0)
        assert cost == pytest.approx(2.50)

    def test_1m_output_tokens(self):
        """1M output tokens at the default rate costs $15.00."""
        cost = calculate_cost(0, 1_000_000)
        assert cost == pytest.approx(15.00)

    def test_500k_input_100k_output(self):
        """Mixed usage sums both sides of the formula."""
        cost = calculate_cost(500_000, 100_000)
        # (500_000 / 1_000_000) * 2.50 + (100_000 / 1_000_000) * 15.00
        # = 1.25 + 1.50 = 2.75
        assert cost == pytest.approx(2.75)

    def test_zero_tokens(self):
        """No tokens means zero cost."""
        cost = calculate_cost(0, 0)
        assert cost == 0.0

    def test_custom_pricing(self):
        """Custom per-1M rates override the defaults."""
        cost = calculate_cost(
            1_000_000, 1_000_000,
            cost_per_1m_input=3.00,
            cost_per_1m_output=10.00,
        )
        assert cost == pytest.approx(13.00)

    def test_small_token_count(self):
        """Few tokens: very low but non-zero cost."""
        cost = calculate_cost(100, 50)
        assert cost > 0
        assert cost < 0.01

    def test_round_to_6_decimals(self):
        """The engine applies round(cost_usd, 6)."""
        cost = calculate_cost(1, 1)
        rounded = round(cost, 6)
        # (1/1M)*2.50 + (1/1M)*15.00 = 1.75e-05
        # round(1.75e-05, 6) = 1.7e-05 (banker's rounding: 5 rounds to even)
        assert rounded == pytest.approx(0.000017, abs=1e-7)

    def test_output_more_expensive_than_input(self):
        """With the defaults, output is 6x the price of input."""
        input_cost = calculate_cost(1_000_000, 0)
        output_cost = calculate_cost(0, 1_000_000)
        assert output_cost == pytest.approx(input_cost * 6.0)
|
||||
61
tests/test_fingerprint.py
Normal file
61
tests/test_fingerprint.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Tests para la logica de fingerprinting/deduplicacion de tool calls.
|
||||
|
||||
Replica la logica de BaseAgent.execute() (lineas con hashlib.md5) sin
|
||||
necesidad de instanciar BaseAgent ni sus dependencias.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def compute_fingerprint(tool_name: str, args: dict) -> str:
    """Exact replica of the fingerprint logic in BaseAgent.execute().

    Arguments are serialized with sorted keys so that logically equal
    calls hash to the same MD5 digest regardless of dict ordering.
    """
    canonical_args = json.dumps(args, sort_keys=True)
    return hashlib.md5(f"{tool_name}:{canonical_args}".encode()).hexdigest()
|
||||
|
||||
|
||||
class TestFingerprint:
    """Unit tests for the MD5 tool-call fingerprint replica."""

    def test_same_tool_same_args_same_fingerprint(self):
        """Identical calls hash identically."""
        fp1 = compute_fingerprint("read_file", {"path": "/index.html"})
        fp2 = compute_fingerprint("read_file", {"path": "/index.html"})
        assert fp1 == fp2

    def test_same_tool_different_args_different_fingerprint(self):
        """Changing the arguments changes the fingerprint."""
        fp1 = compute_fingerprint("read_file", {"path": "/index.html"})
        fp2 = compute_fingerprint("read_file", {"path": "/style.css"})
        assert fp1 != fp2

    def test_different_tool_same_args_different_fingerprint(self):
        """The tool name participates in the hash."""
        fp1 = compute_fingerprint("read_file", {"path": "/index.html"})
        fp2 = compute_fingerprint("write_file", {"path": "/index.html"})
        assert fp1 != fp2

    def test_fingerprint_is_md5_hex_32_chars(self):
        """The fingerprint is a full lowercase MD5 hex digest."""
        fp = compute_fingerprint("any_tool", {"key": "value"})
        assert len(fp) == 32
        assert all(c in "0123456789abcdef" for c in fp)

    def test_arg_order_does_not_matter(self):
        """json.dumps with sort_keys=True normalizes key order."""
        fp1 = compute_fingerprint("tool", {"b": 2, "a": 1})
        fp2 = compute_fingerprint("tool", {"a": 1, "b": 2})
        assert fp1 == fp2

    def test_empty_args(self):
        """Empty args still hash to a valid digest."""
        fp = compute_fingerprint("tool", {})
        assert len(fp) == 32
        # Must be deterministic
        assert fp == compute_fingerprint("tool", {})

    def test_nested_args(self):
        """Nested dict arguments serialize deterministically."""
        args = {"filter": {"table": "pages", "status": "active"}, "limit": 10}
        fp1 = compute_fingerprint("search", args)
        fp2 = compute_fingerprint("search", args)
        assert fp1 == fp2

    def test_different_nested_values(self):
        """A change deep inside nested args changes the fingerprint."""
        fp1 = compute_fingerprint("search", {"filter": {"status": "active"}})
        fp2 = compute_fingerprint("search", {"filter": {"status": "draft"}})
        assert fp1 != fp2
|
||||
152
tests/test_key_data_extraction.py
Normal file
152
tests/test_key_data_extraction.py
Normal file
@@ -0,0 +1,152 @@
|
||||
"""Tests para la logica de _extract_key_data_from_results del OrchestratorEngine.
|
||||
|
||||
Se replica la funcion como logica pura, sin importar src/ (evita dependencias).
|
||||
Los ToolExecution se representan como SimpleNamespace con .arguments y .tool_name.
|
||||
"""
|
||||
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _make_tool_execution(tool_name: str, arguments: dict, raw_output: str = "") -> SimpleNamespace:
|
||||
"""Crea un objeto similar a ToolExecution con los atributos necesarios."""
|
||||
return SimpleNamespace(
|
||||
tool_name=tool_name,
|
||||
arguments=arguments,
|
||||
raw_output=raw_output,
|
||||
)
|
||||
|
||||
|
||||
def _make_result(*tool_executions) -> dict:
|
||||
return {"tool_executions": list(tool_executions), "content": "ok"}
|
||||
|
||||
|
||||
def extract_key_data_from_results(results: list) -> dict:
    """Exact replica of OrchestratorEngine._extract_key_data_from_results.

    Walks every tool execution in every result and accumulates:
    - tables:   {tableName: [recordNum, ...]} from tableName/recordNum args
    - sections: unique sectionId args, insertion order
    - modules:  unique moduleId (or moduleName fallback) args
    - pages:    {name-or-enlace: num} parsed from JSON lines in raw_output
    Only non-empty collections are included in the returned dict.
    """
    key_data: dict[str, Any] = {}
    seen_tables: dict[str, list] = {}
    seen_sections: list = []
    seen_modules: list = []
    seen_pages: dict[str, int] = {}

    for result in results:
        for te in result.get("tool_executions", []):
            args = te.arguments
            name = te.tool_name  # mirrors the production code; unused here

            # Table/record pairs: only purely-numeric recordNums count.
            # NOTE(review): "0".isdigit() is true but record_int == 0 is
            # falsy, so record 0 is silently skipped — mirrors production.
            table = args.get("tableName", "")
            record = args.get("recordNum")
            if table and record:
                record_int = int(record) if str(record).isdigit() else None
                if record_int and table not in seen_tables:
                    seen_tables[table] = []
                if record_int and record_int not in seen_tables.get(table, []):
                    seen_tables[table].append(record_int)

            section = args.get("sectionId", "")
            if section and section not in seen_sections:
                seen_sections.append(section)

            module = args.get("moduleId", "") or args.get("moduleName", "")
            if module and module not in seen_modules:
                seen_modules.append(module)

            # Page links: parse JSON lines containing "enlace" + "num";
            # malformed lines and conversion errors are deliberately ignored.
            if te.raw_output and "enlace" in te.raw_output:
                try:
                    for line in te.raw_output.splitlines():
                        line = line.strip()
                        if line.startswith("{"):
                            try:
                                data = json.loads(line)
                                if "enlace" in data and "num" in data:
                                    page_key = data.get("name", data["enlace"])
                                    seen_pages[page_key] = int(data["num"])
                            except json.JSONDecodeError:
                                pass
                except Exception:
                    pass

    # Cap each collection to keep key_data bounded (10 records/table, 20 each).
    if seen_tables:
        key_data["tables"] = {t: nums[:10] for t, nums in seen_tables.items()}
    if seen_sections:
        key_data["sections"] = seen_sections[:20]
    if seen_modules:
        key_data["modules"] = seen_modules[:20]
    if seen_pages:
        key_data["pages"] = dict(list(seen_pages.items())[:20])

    return key_data
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Tests
|
||||
# =====================================================================
|
||||
|
||||
|
||||
class TestExtractKeyDataFromResults:
    """Unit tests for the key_data extraction replica."""

    def test_extracts_table_and_record(self):
        """tableName + recordNum args land in key_data["tables"]."""
        te = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "42"})
        key_data = extract_key_data_from_results([_make_result(te)])
        assert "tables" in key_data
        assert "pages" in key_data["tables"]
        assert 42 in key_data["tables"]["pages"]

    def test_extracts_section_id(self):
        """sectionId args are collected into key_data["sections"]."""
        te = _make_tool_execution("get_section", {"sectionId": "hero-banner"})
        key_data = extract_key_data_from_results([_make_result(te)])
        assert "sections" in key_data
        assert "hero-banner" in key_data["sections"]

    def test_extracts_module_id(self):
        """moduleId args are collected into key_data["modules"]."""
        te = _make_tool_execution("compile_module", {"moduleId": "gallery-slider"})
        key_data = extract_key_data_from_results([_make_result(te)])
        assert "modules" in key_data
        assert "gallery-slider" in key_data["modules"]

    def test_extracts_module_name_fallback(self):
        """moduleName is used when moduleId is absent."""
        te = _make_tool_execution("compile_module", {"moduleName": "contact-form"})
        key_data = extract_key_data_from_results([_make_result(te)])
        assert "modules" in key_data
        assert "contact-form" in key_data["modules"]

    def test_empty_results(self):
        """No results yields an empty key_data dict."""
        key_data = extract_key_data_from_results([])
        assert key_data == {}

    def test_no_tool_executions_in_result(self):
        """A result with an empty tool_executions list contributes nothing."""
        key_data = extract_key_data_from_results([{"content": "x", "tool_executions": []}])
        assert key_data == {}

    def test_result_without_tool_executions_key(self):
        """A result missing the tool_executions key is tolerated."""
        key_data = extract_key_data_from_results([{"content": "just text"}])
        assert key_data == {}

    def test_tool_execution_without_relevant_args(self):
        """Arguments without table/section/module info are ignored."""
        te = _make_tool_execution("read_file", {"path": "/var/www/index.html"})
        key_data = extract_key_data_from_results([_make_result(te)])
        assert key_data == {}

    def test_multiple_tables_and_records(self):
        """Records accumulate per table across executions."""
        te1 = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "1"})
        te2 = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "5"})
        te3 = _make_tool_execution("get_record", {"tableName": "blog", "recordNum": "10"})
        key_data = extract_key_data_from_results([_make_result(te1, te2, te3)])
        assert 1 in key_data["tables"]["pages"]
        assert 5 in key_data["tables"]["pages"]
        assert 10 in key_data["tables"]["blog"]

    def test_deduplicates_records(self):
        """The same record number is stored once per table."""
        te1 = _make_tool_execution("a", {"tableName": "t", "recordNum": "7"})
        te2 = _make_tool_execution("b", {"tableName": "t", "recordNum": "7"})
        key_data = extract_key_data_from_results([_make_result(te1, te2)])
        assert key_data["tables"]["t"].count(7) == 1

    def test_extracts_pages_from_raw_output(self):
        """JSON lines with enlace/num in raw_output populate key_data["pages"]."""
        raw = '{"enlace": "/contacto", "num": 15, "name": "Contacto"}\nother line'
        te = _make_tool_execution("list_pages", {"tableName": "web"}, raw_output=raw)
        key_data = extract_key_data_from_results([_make_result(te)])
        assert "pages" in key_data
        assert key_data["pages"]["Contacto"] == 15
|
||||
Reference in New Issue
Block a user