- tests/test_compactor.py: 24 tests (estimate_tokens, extract_facts, build_summary, summarize_tool_output, compact_sections) - tests/test_key_data_extraction.py: 11 tests (extracción de tables, records, sections, modules, pages desde tool executions) - tests/test_fingerprint.py: 8 tests (deduplicación MD5, sort_keys, nested args) - tests/test_cost_calculation.py: 8 tests (pricing formula, custom pricing, rounding) - README.md: sección Tests con instrucciones de ejecución Todos offline, sin Docker/Redis/LLM. Ejecutar: python3 -m pytest tests/ -v Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
363 lines
12 KiB
Python
363 lines
12 KiB
Python
"""Tests para la logica de context/compactor.py — estimacion de tokens,
|
|
extraccion de facts, construccion de summaries y compactacion de secciones.
|
|
|
|
Se replica la logica pura sin importar src/ (evita dependencias pesadas).
|
|
"""
|
|
|
|
import hashlib
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import List
|
|
|
|
import pytest
|
|
|
|
|
|
# =====================================================================
|
|
# Replicas de la logica del compactor (funciones puras)
|
|
# =====================================================================
|
|
|
|
|
|
def estimate_tokens_fallback(text: str) -> int:
    """Estimate the token count of *text* without tiktoken.

    Mirrors the fallback branch of ``estimate_tokens``: roughly one token
    per four characters.

    Args:
        text: Text to measure.

    Returns:
        0 for falsy input, otherwise ``len(text) // 4`` floored at 1.
    """
    return max(1, len(text) // 4) if text else 0
|
|
|
|
|
|
def extract_facts(raw_output: str) -> list:
    """Pull short, fact-like lines out of a tool's raw output.

    Mirrors ``ContextCompactor._extract_facts``. Only the first 100 lines
    of the stripped output are inspected, and lines shorter than 10
    characters (after stripping) are skipped. A line is kept when it is:

    * a ``key: value`` line under 200 characters,
    * a line starting with a status marker (no length limit), or
    * a ``path - text`` / ``path: text`` line under 200 characters.

    Args:
        raw_output: Complete text produced by the tool.

    Returns:
        At most 15 distinct facts, in order of first appearance.
    """
    # (pattern, whether the under-200-characters limit applies), tried in order.
    fact_patterns = (
        (r"^[\w\s]+:\s+.+", True),
        (r"^(✓|✗|PASS|FAIL|ERROR|OK|INFO|WARNING)", False),
        (r"^[\w/\\.]+\s*[:\-]\s*.+", True),
    )

    # A dict keeps insertion order, so it de-duplicates while preserving
    # the order in which facts were first seen.
    ordered_facts = {}
    for raw_line in raw_output.strip().splitlines()[:100]:
        candidate = raw_line.strip()
        if len(candidate) < 10:
            continue
        for pattern, length_limited in fact_patterns:
            if length_limited and len(candidate) >= 200:
                continue
            if re.match(pattern, candidate):
                ordered_facts.setdefault(candidate, None)
                break

    return list(ordered_facts)[:15]
|
|
|
|
|
|
def build_summary(tool_name: str, raw_output: str, facts: list) -> str:
    """Compose a one-line summary of a tool's output.

    Mirrors ``ContextCompactor._build_summary``.

    Args:
        tool_name: Name of the tool that produced the output.
        raw_output: Complete text produced by the tool.
        facts: Facts already extracted from the output; at most the first
            five are quoted.

    Returns:
        Space-joined sentence fragments: a size header (line count of the
        stripped output, character count of the raw output), the key
        findings if any, the first non-blank line, and the last non-blank
        line when there is more than one. First and last lines are
        truncated to 120 characters.
    """
    output_lines = raw_output.strip().splitlines()
    non_blank = [text for text in (ln.strip() for ln in output_lines) if text]

    header = (
        f"Tool '{tool_name}' returned {len(output_lines)} lines "
        f"({len(raw_output)} chars)."
    )
    pieces = [header]

    if facts:
        findings = "; ".join(facts[:5])
        pieces.append(f"Key findings: {findings}")

    if non_blank:
        pieces.append(f"First: {non_blank[0][:120]}")
    if len(non_blank) > 1:
        pieces.append(f"Last: {non_blank[-1][:120]}")

    return " ".join(pieces)
|
|
|
|
|
|
def infer_artifact_type(tool_name: str) -> str:
    """Classify a tool by keywords found in its name.

    Mirrors ``ContextCompactor._infer_artifact_type``. Matching is a
    case-insensitive substring test; groups are checked in the order
    listed, so the first group with a hit wins.

    Args:
        tool_name: Name of the tool.

    Returns:
        One of ``"code"``, ``"test_result"``, ``"analysis"``, ``"plan"``,
        or ``"general"`` when no keyword matches.
    """
    keyword_groups = (
        ("code", ("read", "file", "code", "write", "edit")),
        ("test_result", ("test", "check", "lint", "validate")),
        ("analysis", ("search", "find", "grep", "glob")),
        ("plan", ("plan", "design", "architect")),
    )

    lowered = tool_name.lower()
    for artifact_type, keywords in keyword_groups:
        for keyword in keywords:
            if keyword in lowered:
                return artifact_type
    return "general"
|
|
|
|
|
|
def summarize_tool_output(tool_name: str, raw_output: str, session_id: str, task_id: str) -> dict:
    """Summarise one tool execution as a plain dict.

    Simplified replica of ``ContextCompactor.summarize_tool_output``; the
    returned dict carries the same fields as ``ArtifactSummary``.

    Args:
        tool_name: Name of the tool that ran.
        raw_output: Complete text produced by the tool.
        session_id: Identifier of the session.
        task_id: Identifier of the task.

    Returns:
        Dict with keys ``artifact_id``, ``session_id``, ``task_id``,
        ``artifact_type``, ``title``, ``summary``, ``facts``,
        ``source_tool`` and ``char_count``.
    """
    extracted = extract_facts(raw_output)
    text_summary = build_summary(tool_name, raw_output, extracted)
    inferred_type = infer_artifact_type(tool_name)

    # The id is the first 16 hex digits of a SHA-256 over the identifiers
    # and the first 200 characters of the output, so equal inputs always
    # produce the same id.
    id_seed = f"{session_id}:{task_id}:{tool_name}:{raw_output[:200]}"
    digest = hashlib.sha256(id_seed.encode()).hexdigest()

    return {
        "artifact_id": digest[:16],
        "session_id": session_id,
        "task_id": task_id,
        "artifact_type": inferred_type,
        "title": f"Output of {tool_name}",
        "summary": text_summary,
        "facts": extracted,
        "source_tool": tool_name,
        "char_count": len(raw_output),
    }
|
|
|
|
|
|
# --- Modelo simplificado de ContextSection para test de compactacion ---
|
|
|
|
|
|
@dataclass
class Section:
    """Simplified stand-in for ContextSection used by the compaction tests."""

    # Kind of section, e.g. "immutable_rules", "working_context", "task_state".
    section_type: str
    # Text of the section.
    content: str
    # Ordering weight; compact_sections sorts on it in descending order.
    priority: int = 0
    # Token estimate for the content; compact_sections overwrites it.
    token_estimate: int = 0
|
|
|
|
|
|
def compact_sections(sections: list, max_tokens: int) -> list:
    """Fit a list of sections into a token budget.

    Replica of ``ContextCompactor.compact_sections`` (pure logic only).

    Steps:
        1. Drop sections whose content duplicates an earlier one.
        2. Re-estimate the token count of every remaining section.
        3. If the total exceeds the budget, sort by priority (highest
           first) and work on the last section: first strip blank lines
           and trailing whitespace, and if that saves nothing, drop it.
        4. Stop as soon as the last section is of type
           ``"immutable_rules"``, even if the budget is still exceeded.

    Args:
        sections: Objects exposing ``content``, ``priority``,
            ``section_type`` and ``token_estimate``. Surviving objects
            are updated in place (``token_estimate``, possibly
            ``content``); the input list itself is not modified.
        max_tokens: Token budget for the combined sections.

    Returns:
        A new list with the surviving sections. It is ordered by priority
        only when trimming was needed.
    """
    # Step 1: de-duplicate on an MD5 of the content, keeping the first copy.
    known_digests = set()
    kept = []
    for section in sections:
        digest = hashlib.md5(section.content.encode()).hexdigest()
        if digest in known_digests:
            continue
        known_digests.add(digest)
        kept.append(section)

    # Step 2: refresh the estimates and total them.
    for section in kept:
        section.token_estimate = estimate_tokens_fallback(section.content)
    running_total = sum(section.token_estimate for section in kept)

    if running_total <= max_tokens:
        return kept

    # Step 3: highest priority first, so the trimming candidate is last.
    kept.sort(key=lambda section: section.priority, reverse=True)

    # Step 4: squeeze or drop the last section until the budget is met.
    while running_total > max_tokens and kept:
        candidate = kept[-1]
        if candidate.section_type == "immutable_rules":
            break

        # Cheap compaction: remove blank lines and trailing whitespace.
        squeezed = "\n".join(
            line.rstrip() for line in candidate.content.splitlines() if line.strip()
        )
        squeezed_estimate = estimate_tokens_fallback(squeezed)

        if squeezed_estimate < candidate.token_estimate:
            running_total -= candidate.token_estimate - squeezed_estimate
            candidate.content = squeezed
            candidate.token_estimate = squeezed_estimate
        else:
            # Squeezing gained nothing, so the whole section goes.
            running_total -= candidate.token_estimate
            kept.pop()

    return kept
|
|
|
|
|
|
# =====================================================================
|
|
# Tests: estimate_tokens
|
|
# =====================================================================
|
|
|
|
|
|
class TestEstimateTokens:
    """Behaviour of the tiktoken-free token estimator."""

    def test_positive_for_nonempty_text(self):
        estimate = estimate_tokens_fallback("Hello world, this is a test string.")
        assert isinstance(estimate, int)
        assert estimate > 0

    def test_zero_for_empty_string(self):
        assert estimate_tokens_fallback("") == 0

    def test_longer_text_more_tokens(self):
        brief = estimate_tokens_fallback("hi")
        lengthy = estimate_tokens_fallback("hi " * 500)
        assert lengthy > brief

    def test_returns_int_type(self):
        estimate = estimate_tokens_fallback("cualquier texto")
        assert isinstance(estimate, int)

    def test_minimum_is_one_for_short_text(self):
        # len("ab") // 4 is 0, but non-empty text is floored at one token.
        assert estimate_tokens_fallback("ab") == 1
|
|
|
|
|
|
# =====================================================================
|
|
# Tests: _extract_facts
|
|
# =====================================================================
|
|
|
|
|
|
class TestExtractFacts:
    """Behaviour of the fact extractor.

    Expectations are pinned to exact lists where possible, so a regression
    that makes the extractor return nothing cannot pass unnoticed.
    """

    def test_extracts_key_value_lines(self):
        raw = "Status: running\nVersion: 3.2.1\nIgnored short\nName: my-module"
        facts = extract_facts(raw)
        assert any("Status: running" in f for f in facts)
        assert any("Version: 3.2.1" in f for f in facts)
        assert any("Name: my-module" in f for f in facts)
        # "Ignored short" matches none of the fact patterns.
        assert "Ignored short" not in facts

    def test_extracts_status_indicators(self):
        raw = "PASS test_login completed\nFAIL test_logout broken\nOK everything fine"
        facts = extract_facts(raw)
        assert any("PASS" in f for f in facts)
        assert any("FAIL" in f for f in facts)
        # The third indicator in the input must be picked up as well.
        assert any(f.startswith("OK") for f in facts)

    def test_ignores_short_lines(self):
        raw = "ok\nhi\nyes\nStatus: this is long enough to be a fact"
        facts = extract_facts(raw)
        assert not any(f in ("ok", "hi", "yes") for f in facts)
        # Without this, an extractor that returns nothing would also pass.
        assert facts == ["Status: this is long enough to be a fact"]

    def test_deduplicates(self):
        raw = "Status: running value\nStatus: running value\nStatus: running value"
        facts = extract_facts(raw)
        assert facts.count("Status: running value") == 1
        assert facts == ["Status: running value"]

    def test_limits_to_15(self):
        lines = [f"Key{i}: value number {i} with enough length" for i in range(30)]
        raw = "\n".join(lines)
        facts = extract_facts(raw)
        # All 30 lines qualify, so the cap must yield exactly the first 15.
        assert facts == lines[:15]

    def test_empty_input(self):
        facts = extract_facts("")
        assert facts == []
|
|
|
|
|
|
# =====================================================================
|
|
# Tests: _build_summary
|
|
# =====================================================================
|
|
|
|
|
|
class TestBuildSummary:
    """Content checks for the one-line tool summary."""

    def test_includes_tool_name(self):
        text = build_summary("read_file", "line1\nline2\nline3", [])
        assert "read_file" in text

    def test_includes_line_count(self):
        three_lines = "line1\nline2\nline3"
        text = build_summary("my_tool", three_lines, [])
        assert "3 lines" in text

    def test_includes_char_count(self):
        payload = "some content here"
        text = build_summary("my_tool", payload, [])
        assert str(len(payload)) in text

    def test_includes_facts_when_present(self):
        known_facts = ["Status: ok", "Count: 42"]
        text = build_summary("my_tool", "data", known_facts)
        assert "Status: ok" in text

    def test_includes_first_line(self):
        payload = "primera linea importante\nsegunda\ntercera"
        text = build_summary("tool", payload, [])
        assert "primera linea importante" in text
|
|
|
|
|
|
# =====================================================================
|
|
# Tests: summarize_tool_output
|
|
# =====================================================================
|
|
|
|
|
|
class TestSummarizeToolOutput:
    """Shape and determinism of the summary dict."""

    def test_returns_dict_with_correct_fields(self):
        artifact = summarize_tool_output(
            tool_name="read_file",
            raw_output="Status: ok\nContent: hello world here",
            session_id="sess-001",
            task_id="task-001",
        )
        assert isinstance(artifact, dict)
        assert artifact["session_id"] == "sess-001"
        assert artifact["task_id"] == "task-001"
        assert artifact["source_tool"] == "read_file"
        assert artifact["title"] == "Output of read_file"
        assert artifact["artifact_id"]  # must not be empty
        assert artifact["summary"]  # must not be empty
        assert artifact["char_count"] > 0

    def test_artifact_type_inference(self):
        expected_types = {
            "read_file": "code",
            "test_run": "test_result",
            "search_records": "analysis",
            "deploy_app": "general",
        }
        for tool, expected in expected_types.items():
            assert summarize_tool_output(tool, "x", "s", "t")["artifact_type"] == expected

    def test_artifact_id_is_deterministic(self):
        first = summarize_tool_output("tool", "output", "s", "t")
        second = summarize_tool_output("tool", "output", "s", "t")
        assert first["artifact_id"] == second["artifact_id"]

    def test_artifact_id_length(self):
        artifact = summarize_tool_output("tool", "output", "s", "t")
        assert len(artifact["artifact_id"]) == 16
|
|
|
|
|
|
# =====================================================================
|
|
# Tests: compact_sections
|
|
# =====================================================================
|
|
|
|
|
|
class TestCompactSections:
    """Budget trimming, priority ordering and de-duplication of sections."""

    def test_never_removes_immutable_rules(self):
        rules_text = "You must always follow these rules " * 20
        sections = [
            Section(
                section_type="immutable_rules",
                content=rules_text,
                priority=100,
            ),
            Section(
                section_type="working_context",
                content="Some working context data " * 50,
                priority=1,
            ),
        ]
        # The budget is smaller than the rules alone (700 chars // 4 = 175).
        result = compact_sections(sections, max_tokens=50)
        types = [s.section_type for s in result]
        assert "immutable_rules" in types
        # The rules must survive with their content untouched.
        rules = next(s for s in result if s.section_type == "immutable_rules")
        assert rules.content == rules_text

    def test_respects_priority_order(self):
        """Higher-priority sections survive compaction.

        The budget fits the high-priority section but not both.
        """
        high = Section(
            section_type="task_state",
            content="Important task data here",  # 24 chars -> 6 tokens
            priority=90,
        )
        low = Section(
            section_type="working_context",
            content="Low priority stuff " * 50,  # 950 chars -> 237 tokens
            priority=1,
        )
        # Enough budget for high (6) but not for high + low (243).
        result = compact_sections([high, low], max_tokens=20)
        types = [s.section_type for s in result]
        assert "task_state" in types
        # Stripping trailing whitespace saves no tokens on the low-priority
        # section (949 // 4 is still 237), so it is dropped and only the
        # high-priority one remains. The previous check, len(result) <= 2,
        # was always true for a two-element input.
        assert types == ["task_state"]
        assert result[0].content == "Important task data here"

    def test_no_compaction_when_within_budget(self):
        sections = [
            Section(
                section_type="task_state",
                content="Short content",
                priority=50,
            ),
        ]
        result = compact_sections(sections, max_tokens=999_999)
        assert len(result) == 1
        assert result[0].content == "Short content"

    def test_deduplicates_identical_sections(self):
        sections = [
            Section(section_type="working_context", content="duplicated content", priority=10),
            Section(section_type="working_context", content="duplicated content", priority=10),
        ]
        result = compact_sections(sections, max_tokens=999_999)
        assert len(result) == 1
|