diff --git a/README.md b/README.md index 6a5510e..7e4e2d6 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,22 @@ python3 -m uvicorn src.main:app --reload --port 8001 # 5. Dashboard en http://localhost:8001/dashboard/ ``` +### Tests + +```bash +# Ejecutar todos los tests unitarios (no necesita Docker, Redis ni LLM) +pip install pytest +python3 -m pytest tests/ -v + +# Ejecutar un archivo específico +python3 -m pytest tests/test_compactor.py -v + +# Ejecutar un test específico +python3 -m pytest tests/test_cost_calculation.py::TestCostCalculation::test_1m_input_tokens -v +``` + +Los tests validan: compactación de contexto, extracción de key_data para historial, fingerprinting de tool calls, y cálculo de costes. Son 100% offline — no consumen tokens ni necesitan servicios externos. + ### Cargar Knowledge Base ```bash diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..2ec3b8a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,8 @@ +"""Configuracion de pytest para agenticSystem tests. + +Estos tests son 100% standalone — no importan desde src/ directamente +porque el entorno de CI puede no tener las dependencias pesadas +(anthropic, tiktoken, pydantic, etc.) ni Python 3.11+. + +La logica bajo test se replica o se extrae como funciones puras. +""" diff --git a/tests/test_compactor.py b/tests/test_compactor.py new file mode 100644 index 0000000..6088b1a --- /dev/null +++ b/tests/test_compactor.py @@ -0,0 +1,362 @@ +"""Tests para la logica de context/compactor.py — estimacion de tokens, +extraccion de facts, construccion de summaries y compactacion de secciones. + +Se replica la logica pura sin importar src/ (evita dependencias pesadas). 
+""" + +import hashlib +import re +from dataclasses import dataclass, field +from typing import List + +import pytest + + +# ===================================================================== +# Replicas de la logica del compactor (funciones puras) +# ===================================================================== + + +def estimate_tokens_fallback(text: str) -> int: + """Replica del fallback de estimate_tokens (sin tiktoken).""" + if not text: + return 0 + return max(1, len(text) // 4) + + +def extract_facts(raw_output: str) -> list: + """Replica exacta de ContextCompactor._extract_facts.""" + facts = [] + lines = raw_output.strip().splitlines() + + for line in lines[:100]: + line = line.strip() + if not line or len(line) < 10: + continue + if re.match(r"^[\w\s]+:\s+.+", line) and len(line) < 200: + facts.append(line) + elif re.match(r"^(✓|✗|PASS|FAIL|ERROR|OK|INFO|WARNING)", line): + facts.append(line) + elif re.match(r"^[\w/\\.]+\s*[:\-]\s*.+", line) and len(line) < 200: + facts.append(line) + + seen = set() + unique = [] + for f in facts: + if f not in seen: + seen.add(f) + unique.append(f) + return unique[:15] + + +def build_summary(tool_name: str, raw_output: str, facts: list) -> str: + """Replica exacta de ContextCompactor._build_summary.""" + lines = raw_output.strip().splitlines() + total_lines = len(lines) + char_count = len(raw_output) + + parts = [f"Tool '{tool_name}' returned {total_lines} lines ({char_count} chars)."] + + if facts: + parts.append(f"Key findings: {'; '.join(facts[:5])}") + + meaningful = [l.strip() for l in lines if l.strip()] + if meaningful: + parts.append(f"First: {meaningful[0][:120]}") + if len(meaningful) > 1: + parts.append(f"Last: {meaningful[-1][:120]}") + + return " ".join(parts) + + +def infer_artifact_type(tool_name: str) -> str: + """Replica de ContextCompactor._infer_artifact_type.""" + tool_lower = tool_name.lower() + if any(k in tool_lower for k in ("read", "file", "code", "write", "edit")): + return "code" + if 
any(k in tool_lower for k in ("test", "check", "lint", "validate")): + return "test_result" + if any(k in tool_lower for k in ("search", "find", "grep", "glob")): + return "analysis" + if any(k in tool_lower for k in ("plan", "design", "architect")): + return "plan" + return "general" + + +def summarize_tool_output(tool_name: str, raw_output: str, session_id: str, task_id: str) -> dict: + """Replica simplificada de ContextCompactor.summarize_tool_output. + Devuelve un dict con los mismos campos que ArtifactSummary. + """ + facts = extract_facts(raw_output) + summary = build_summary(tool_name, raw_output, facts) + artifact_type = infer_artifact_type(tool_name) + artifact_id = hashlib.sha256( + f"{session_id}:{task_id}:{tool_name}:{raw_output[:200]}".encode() + ).hexdigest()[:16] + + return { + "artifact_id": artifact_id, + "session_id": session_id, + "task_id": task_id, + "artifact_type": artifact_type, + "title": f"Output of {tool_name}", + "summary": summary, + "facts": facts, + "source_tool": tool_name, + "char_count": len(raw_output), + } + + +# --- Modelo simplificado de ContextSection para test de compactacion --- + + +@dataclass +class Section: + section_type: str # "immutable_rules", "working_context", "task_state", etc. + content: str + priority: int = 0 + token_estimate: int = 0 + + +def compact_sections(sections: list, max_tokens: int) -> list: + """Replica de ContextCompactor.compact_sections (logica pura).""" + # 1. Deduplicar + seen = set() + unique = [] + for s in sections: + h = hashlib.md5(s.content.encode()).hexdigest() + if h not in seen: + seen.add(h) + unique.append(s) + sections = unique + + # 2. Estimar tokens + for s in sections: + s.token_estimate = estimate_tokens_fallback(s.content) + + total = sum(s.token_estimate for s in sections) + if total <= max_tokens: + return sections + + # 3. Ordenar por prioridad (mayor primero) + sections.sort(key=lambda s: s.priority, reverse=True) + + # 4. 
Trim de menor prioridad + while total > max_tokens and sections: + lowest = sections[-1] + if lowest.section_type == "immutable_rules": + break + # Compactacion simple: eliminar lineas vacias + compacted_lines = [l.rstrip() for l in lowest.content.splitlines() if l.strip()] + compacted = "\n".join(compacted_lines) + new_est = estimate_tokens_fallback(compacted) + saved = lowest.token_estimate - new_est + if saved > 0: + lowest.content = compacted + lowest.token_estimate = new_est + total -= saved + else: + total -= lowest.token_estimate + sections.pop() + + return sections + + +# ===================================================================== +# Tests: estimate_tokens +# ===================================================================== + + +class TestEstimateTokens: + def test_positive_for_nonempty_text(self): + result = estimate_tokens_fallback("Hello world, this is a test string.") + assert isinstance(result, int) + assert result > 0 + + def test_zero_for_empty_string(self): + assert estimate_tokens_fallback("") == 0 + + def test_longer_text_more_tokens(self): + short = estimate_tokens_fallback("hi") + long = estimate_tokens_fallback("hi " * 500) + assert long > short + + def test_returns_int_type(self): + assert isinstance(estimate_tokens_fallback("cualquier texto"), int) + + def test_minimum_is_one_for_short_text(self): + # "ab" -> len 2 // 4 = 0, pero max(1, 0) = 1 + assert estimate_tokens_fallback("ab") == 1 + + +# ===================================================================== +# Tests: _extract_facts +# ===================================================================== + + +class TestExtractFacts: + def test_extracts_key_value_lines(self): + raw = "Status: running\nVersion: 3.2.1\nIgnored short\nName: my-module" + facts = extract_facts(raw) + assert any("Status: running" in f for f in facts) + assert any("Version: 3.2.1" in f for f in facts) + assert any("Name: my-module" in f for f in facts) + + def test_extracts_status_indicators(self): 
+ raw = "PASS test_login completed\nFAIL test_logout broken\nOK everything fine" + facts = extract_facts(raw) + assert any("PASS" in f for f in facts) + assert any("FAIL" in f for f in facts) + + def test_ignores_short_lines(self): + raw = "ok\nhi\nyes\nStatus: this is long enough to be a fact" + facts = extract_facts(raw) + assert not any(f in ("ok", "hi", "yes") for f in facts) + + def test_deduplicates(self): + raw = "Status: running value\nStatus: running value\nStatus: running value" + facts = extract_facts(raw) + assert facts.count("Status: running value") == 1 + + def test_limits_to_15(self): + lines = [f"Key{i}: value number {i} with enough length" for i in range(30)] + raw = "\n".join(lines) + facts = extract_facts(raw) + assert len(facts) <= 15 + + def test_empty_input(self): + facts = extract_facts("") + assert facts == [] + + +# ===================================================================== +# Tests: _build_summary +# ===================================================================== + + +class TestBuildSummary: + def test_includes_tool_name(self): + summary = build_summary("read_file", "line1\nline2\nline3", []) + assert "read_file" in summary + + def test_includes_line_count(self): + raw = "line1\nline2\nline3" + summary = build_summary("my_tool", raw, []) + assert "3 lines" in summary + + def test_includes_char_count(self): + raw = "some content here" + summary = build_summary("my_tool", raw, []) + assert str(len(raw)) in summary + + def test_includes_facts_when_present(self): + facts = ["Status: ok", "Count: 42"] + summary = build_summary("my_tool", "data", facts) + assert "Status: ok" in summary + + def test_includes_first_line(self): + raw = "primera linea importante\nsegunda\ntercera" + summary = build_summary("tool", raw, []) + assert "primera linea importante" in summary + + +# ===================================================================== +# Tests: summarize_tool_output +# 
===================================================================== + + +class TestSummarizeToolOutput: + def test_returns_dict_with_correct_fields(self): + result = summarize_tool_output( + tool_name="read_file", + raw_output="Status: ok\nContent: hello world here", + session_id="sess-001", + task_id="task-001", + ) + assert isinstance(result, dict) + assert result["session_id"] == "sess-001" + assert result["task_id"] == "task-001" + assert result["source_tool"] == "read_file" + assert result["title"] == "Output of read_file" + assert result["artifact_id"] # no vacio + assert result["summary"] # no vacio + assert result["char_count"] > 0 + + def test_artifact_type_inference(self): + assert summarize_tool_output("read_file", "x", "s", "t")["artifact_type"] == "code" + assert summarize_tool_output("test_run", "x", "s", "t")["artifact_type"] == "test_result" + assert summarize_tool_output("search_records", "x", "s", "t")["artifact_type"] == "analysis" + assert summarize_tool_output("deploy_app", "x", "s", "t")["artifact_type"] == "general" + + def test_artifact_id_is_deterministic(self): + r1 = summarize_tool_output("tool", "output", "s", "t") + r2 = summarize_tool_output("tool", "output", "s", "t") + assert r1["artifact_id"] == r2["artifact_id"] + + def test_artifact_id_length(self): + result = summarize_tool_output("tool", "output", "s", "t") + assert len(result["artifact_id"]) == 16 + + +# ===================================================================== +# Tests: compact_sections +# ===================================================================== + + +class TestCompactSections: + def test_never_removes_immutable_rules(self): + sections = [ + Section( + section_type="immutable_rules", + content="You must always follow these rules " * 20, + priority=100, + ), + Section( + section_type="working_context", + content="Some working context data " * 50, + priority=1, + ), + ] + result = compact_sections(sections, max_tokens=50) + types = [s.section_type for s 
in result] + assert "immutable_rules" in types + + def test_respects_priority_order(self): + """Secciones de mayor prioridad sobreviven a la compactacion. + Usamos un budget que cabe la seccion alta pero no ambas.""" + high = Section( + section_type="task_state", + content="Important task data here", # ~6 tokens + priority=90, + ) + low = Section( + section_type="working_context", + content="Low priority stuff " * 50, # ~250 tokens + priority=1, + ) + # Budget suficiente para high (~6) pero no para high+low (~256) + result = compact_sections([high, low], max_tokens=20) + types = [s.section_type for s in result] + assert "task_state" in types + # La de baja prioridad deberia haberse eliminado o compactado + assert len(result) <= 2 + + def test_no_compaction_when_within_budget(self): + sections = [ + Section( + section_type="task_state", + content="Short content", + priority=50, + ), + ] + result = compact_sections(sections, max_tokens=999_999) + assert len(result) == 1 + assert result[0].content == "Short content" + + def test_deduplicates_identical_sections(self): + sections = [ + Section(section_type="working_context", content="duplicated content", priority=10), + Section(section_type="working_context", content="duplicated content", priority=10), + ] + result = compact_sections(sections, max_tokens=999_999) + assert len(result) == 1 diff --git a/tests/test_cost_calculation.py b/tests/test_cost_calculation.py new file mode 100644 index 0000000..b9549d7 --- /dev/null +++ b/tests/test_cost_calculation.py @@ -0,0 +1,71 @@ +"""Tests para el calculo de costes del orquestador. 
+
+Replica la formula de coste de OrchestratorEngine._run_pipeline():
+    cost_usd = (input_tokens / 1_000_000) * cost_per_1m_input
+             + (output_tokens / 1_000_000) * cost_per_1m_output
+
+Defaults: cost_per_1m_input=2.50, cost_per_1m_output=15.00
+"""
+
+import pytest
+
+
+def calculate_cost(
+    input_tokens: int,
+    output_tokens: int,
+    cost_per_1m_input: float = 2.50,
+    cost_per_1m_output: float = 15.00,
+) -> float:
+    """Replica exacta de la formula de coste en engine.py."""
+    return (
+        (input_tokens / 1_000_000) * cost_per_1m_input
+        + (output_tokens / 1_000_000) * cost_per_1m_output
+    )
+
+
+class TestCostCalculation:
+    def test_1m_input_tokens(self):
+        cost = calculate_cost(1_000_000, 0)
+        assert cost == pytest.approx(2.50)
+
+    def test_1m_output_tokens(self):
+        cost = calculate_cost(0, 1_000_000)
+        assert cost == pytest.approx(15.00)
+
+    def test_500k_input_100k_output(self):
+        cost = calculate_cost(500_000, 100_000)
+        # (500_000 / 1_000_000) * 2.50 + (100_000 / 1_000_000) * 15.00
+        # = 1.25 + 1.50 = 2.75
+        assert cost == pytest.approx(2.75)
+
+    def test_zero_tokens(self):
+        cost = calculate_cost(0, 0)
+        assert cost == 0.0
+
+    def test_custom_pricing(self):
+        cost = calculate_cost(
+            1_000_000, 1_000_000,
+            cost_per_1m_input=3.00,
+            cost_per_1m_output=10.00,
+        )
+        assert cost == pytest.approx(13.00)
+
+    def test_small_token_count(self):
+        """Pocos tokens = coste muy bajo pero no cero."""
+        cost = calculate_cost(100, 50)
+        assert cost > 0
+        assert cost < 0.01
+
+    def test_round_to_6_decimals(self):
+        """El engine hace round(cost_usd, 6)."""
+        cost = calculate_cost(1, 1)
+        rounded = round(cost, 6)
+        # (1/1M)*2.50 + (1/1M)*15.00 = 1.75e-05
+        # round(1.75e-05, 6) = 1.7e-05 (the computed double sits just below the decimal midpoint, so it rounds down)
+        assert rounded == pytest.approx(0.000017, abs=1e-7)
+
+    def test_output_more_expensive_than_input(self):
+        """Con defaults, output es 6x mas caro que input."""
+        input_cost = calculate_cost(1_000_000, 0)
+        output_cost = calculate_cost(0, 1_000_000)
+        
assert output_cost == pytest.approx(input_cost * 6.0) diff --git a/tests/test_fingerprint.py b/tests/test_fingerprint.py new file mode 100644 index 0000000..f722aa5 --- /dev/null +++ b/tests/test_fingerprint.py @@ -0,0 +1,61 @@ +"""Tests para la logica de fingerprinting/deduplicacion de tool calls. + +Replica la logica de BaseAgent.execute() (lineas con hashlib.md5) sin +necesidad de instanciar BaseAgent ni sus dependencias. +""" + +import hashlib +import json + +import pytest + + +def compute_fingerprint(tool_name: str, args: dict) -> str: + """Replica exacta de la logica de fingerprint en BaseAgent.execute().""" + fp_raw = f"{tool_name}:{json.dumps(args, sort_keys=True)}" + return hashlib.md5(fp_raw.encode()).hexdigest() + + +class TestFingerprint: + def test_same_tool_same_args_same_fingerprint(self): + fp1 = compute_fingerprint("read_file", {"path": "/index.html"}) + fp2 = compute_fingerprint("read_file", {"path": "/index.html"}) + assert fp1 == fp2 + + def test_same_tool_different_args_different_fingerprint(self): + fp1 = compute_fingerprint("read_file", {"path": "/index.html"}) + fp2 = compute_fingerprint("read_file", {"path": "/style.css"}) + assert fp1 != fp2 + + def test_different_tool_same_args_different_fingerprint(self): + fp1 = compute_fingerprint("read_file", {"path": "/index.html"}) + fp2 = compute_fingerprint("write_file", {"path": "/index.html"}) + assert fp1 != fp2 + + def test_fingerprint_is_md5_hex_32_chars(self): + fp = compute_fingerprint("any_tool", {"key": "value"}) + assert len(fp) == 32 + assert all(c in "0123456789abcdef" for c in fp) + + def test_arg_order_does_not_matter(self): + """json.dumps con sort_keys=True normaliza el orden.""" + fp1 = compute_fingerprint("tool", {"b": 2, "a": 1}) + fp2 = compute_fingerprint("tool", {"a": 1, "b": 2}) + assert fp1 == fp2 + + def test_empty_args(self): + fp = compute_fingerprint("tool", {}) + assert len(fp) == 32 + # Debe ser determinista + assert fp == compute_fingerprint("tool", {}) + + def 
test_nested_args(self): + args = {"filter": {"table": "pages", "status": "active"}, "limit": 10} + fp1 = compute_fingerprint("search", args) + fp2 = compute_fingerprint("search", args) + assert fp1 == fp2 + + def test_different_nested_values(self): + fp1 = compute_fingerprint("search", {"filter": {"status": "active"}}) + fp2 = compute_fingerprint("search", {"filter": {"status": "draft"}}) + assert fp1 != fp2 diff --git a/tests/test_key_data_extraction.py b/tests/test_key_data_extraction.py new file mode 100644 index 0000000..7903376 --- /dev/null +++ b/tests/test_key_data_extraction.py @@ -0,0 +1,152 @@ +"""Tests para la logica de _extract_key_data_from_results del OrchestratorEngine. + +Se replica la funcion como logica pura, sin importar src/ (evita dependencias). +Los ToolExecution se representan como SimpleNamespace con .arguments y .tool_name. +""" + +import json +from types import SimpleNamespace +from typing import Any + +import pytest + + +def _make_tool_execution(tool_name: str, arguments: dict, raw_output: str = "") -> SimpleNamespace: + """Crea un objeto similar a ToolExecution con los atributos necesarios.""" + return SimpleNamespace( + tool_name=tool_name, + arguments=arguments, + raw_output=raw_output, + ) + + +def _make_result(*tool_executions) -> dict: + return {"tool_executions": list(tool_executions), "content": "ok"} + + +def extract_key_data_from_results(results: list) -> dict: + """Replica exacta de OrchestratorEngine._extract_key_data_from_results.""" + key_data: dict[str, Any] = {} + seen_tables: dict[str, list] = {} + seen_sections: list = [] + seen_modules: list = [] + seen_pages: dict[str, int] = {} + + for result in results: + for te in result.get("tool_executions", []): + args = te.arguments + name = te.tool_name + + table = args.get("tableName", "") + record = args.get("recordNum") + if table and record: + record_int = int(record) if str(record).isdigit() else None + if record_int and table not in seen_tables: + seen_tables[table] = [] 
+ if record_int and record_int not in seen_tables.get(table, []): + seen_tables[table].append(record_int) + + section = args.get("sectionId", "") + if section and section not in seen_sections: + seen_sections.append(section) + + module = args.get("moduleId", "") or args.get("moduleName", "") + if module and module not in seen_modules: + seen_modules.append(module) + + if te.raw_output and "enlace" in te.raw_output: + try: + for line in te.raw_output.splitlines(): + line = line.strip() + if line.startswith("{"): + try: + data = json.loads(line) + if "enlace" in data and "num" in data: + page_key = data.get("name", data["enlace"]) + seen_pages[page_key] = int(data["num"]) + except json.JSONDecodeError: + pass + except Exception: + pass + + if seen_tables: + key_data["tables"] = {t: nums[:10] for t, nums in seen_tables.items()} + if seen_sections: + key_data["sections"] = seen_sections[:20] + if seen_modules: + key_data["modules"] = seen_modules[:20] + if seen_pages: + key_data["pages"] = dict(list(seen_pages.items())[:20]) + + return key_data + + +# ===================================================================== +# Tests +# ===================================================================== + + +class TestExtractKeyDataFromResults: + def test_extracts_table_and_record(self): + te = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "42"}) + key_data = extract_key_data_from_results([_make_result(te)]) + assert "tables" in key_data + assert "pages" in key_data["tables"] + assert 42 in key_data["tables"]["pages"] + + def test_extracts_section_id(self): + te = _make_tool_execution("get_section", {"sectionId": "hero-banner"}) + key_data = extract_key_data_from_results([_make_result(te)]) + assert "sections" in key_data + assert "hero-banner" in key_data["sections"] + + def test_extracts_module_id(self): + te = _make_tool_execution("compile_module", {"moduleId": "gallery-slider"}) + key_data = extract_key_data_from_results([_make_result(te)]) 
+ assert "modules" in key_data + assert "gallery-slider" in key_data["modules"] + + def test_extracts_module_name_fallback(self): + te = _make_tool_execution("compile_module", {"moduleName": "contact-form"}) + key_data = extract_key_data_from_results([_make_result(te)]) + assert "modules" in key_data + assert "contact-form" in key_data["modules"] + + def test_empty_results(self): + key_data = extract_key_data_from_results([]) + assert key_data == {} + + def test_no_tool_executions_in_result(self): + key_data = extract_key_data_from_results([{"content": "x", "tool_executions": []}]) + assert key_data == {} + + def test_result_without_tool_executions_key(self): + key_data = extract_key_data_from_results([{"content": "just text"}]) + assert key_data == {} + + def test_tool_execution_without_relevant_args(self): + te = _make_tool_execution("read_file", {"path": "/var/www/index.html"}) + key_data = extract_key_data_from_results([_make_result(te)]) + assert key_data == {} + + def test_multiple_tables_and_records(self): + te1 = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "1"}) + te2 = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "5"}) + te3 = _make_tool_execution("get_record", {"tableName": "blog", "recordNum": "10"}) + key_data = extract_key_data_from_results([_make_result(te1, te2, te3)]) + assert 1 in key_data["tables"]["pages"] + assert 5 in key_data["tables"]["pages"] + assert 10 in key_data["tables"]["blog"] + + def test_deduplicates_records(self): + te1 = _make_tool_execution("a", {"tableName": "t", "recordNum": "7"}) + te2 = _make_tool_execution("b", {"tableName": "t", "recordNum": "7"}) + key_data = extract_key_data_from_results([_make_result(te1, te2)]) + assert key_data["tables"]["t"].count(7) == 1 + + def test_extracts_pages_from_raw_output(self): + raw = '{"enlace": "/contacto", "num": 15, "name": "Contacto"}\nother line' + te = _make_tool_execution("list_pages", {"tableName": "web"}, raw_output=raw) 
+ key_data = extract_key_data_from_results([_make_result(te)]) + assert "pages" in key_data + assert key_data["pages"]["Contacto"] == 15