"""Tests para la logica de context/compactor.py — estimacion de tokens, extraccion de facts, construccion de summaries y compactacion de secciones. Se replica la logica pura sin importar src/ (evita dependencias pesadas). """ import hashlib import re from dataclasses import dataclass, field from typing import List import pytest # ===================================================================== # Replicas de la logica del compactor (funciones puras) # ===================================================================== def estimate_tokens_fallback(text: str) -> int: """Replica del fallback de estimate_tokens (sin tiktoken).""" if not text: return 0 return max(1, len(text) // 4) def extract_facts(raw_output: str) -> list: """Replica exacta de ContextCompactor._extract_facts.""" facts = [] lines = raw_output.strip().splitlines() for line in lines[:100]: line = line.strip() if not line or len(line) < 10: continue if re.match(r"^[\w\s]+:\s+.+", line) and len(line) < 200: facts.append(line) elif re.match(r"^(✓|✗|PASS|FAIL|ERROR|OK|INFO|WARNING)", line): facts.append(line) elif re.match(r"^[\w/\\.]+\s*[:\-]\s*.+", line) and len(line) < 200: facts.append(line) seen = set() unique = [] for f in facts: if f not in seen: seen.add(f) unique.append(f) return unique[:15] def build_summary(tool_name: str, raw_output: str, facts: list) -> str: """Replica exacta de ContextCompactor._build_summary.""" lines = raw_output.strip().splitlines() total_lines = len(lines) char_count = len(raw_output) parts = [f"Tool '{tool_name}' returned {total_lines} lines ({char_count} chars)."] if facts: parts.append(f"Key findings: {'; '.join(facts[:5])}") meaningful = [l.strip() for l in lines if l.strip()] if meaningful: parts.append(f"First: {meaningful[0][:120]}") if len(meaningful) > 1: parts.append(f"Last: {meaningful[-1][:120]}") return " ".join(parts) def infer_artifact_type(tool_name: str) -> str: """Replica de ContextCompactor._infer_artifact_type.""" tool_lower = tool_name.lower() if any(k in tool_lower for k in ("read", "file", "code", "write", "edit")): return "code" if any(k in tool_lower for k in ("test", "check", "lint", "validate")): return "test_result" if any(k in tool_lower for k in ("search", "find", "grep", "glob")): return "analysis" if any(k in tool_lower for k in ("plan", "design", "architect")): return "plan" return "general" def summarize_tool_output(tool_name: str, raw_output: str, session_id: str, task_id: str) -> dict: """Replica simplificada de ContextCompactor.summarize_tool_output. Devuelve un dict con los mismos campos que ArtifactSummary. """ facts = extract_facts(raw_output) summary = build_summary(tool_name, raw_output, facts) artifact_type = infer_artifact_type(tool_name) artifact_id = hashlib.sha256( f"{session_id}:{task_id}:{tool_name}:{raw_output[:200]}".encode() ).hexdigest()[:16] return { "artifact_id": artifact_id, "session_id": session_id, "task_id": task_id, "artifact_type": artifact_type, "title": f"Output of {tool_name}", "summary": summary, "facts": facts, "source_tool": tool_name, "char_count": len(raw_output), } # --- Modelo simplificado de ContextSection para test de compactacion --- @dataclass class Section: section_type: str # "immutable_rules", "working_context", "task_state", etc. content: str priority: int = 0 token_estimate: int = 0 def compact_sections(sections: list, max_tokens: int) -> list: """Replica de ContextCompactor.compact_sections (logica pura).""" # 1. Deduplicar seen = set() unique = [] for s in sections: h = hashlib.md5(s.content.encode()).hexdigest() if h not in seen: seen.add(h) unique.append(s) sections = unique # 2. Estimar tokens for s in sections: s.token_estimate = estimate_tokens_fallback(s.content) total = sum(s.token_estimate for s in sections) if total <= max_tokens: return sections # 3. Ordenar por prioridad (mayor primero) sections.sort(key=lambda s: s.priority, reverse=True) # 4. Trim de menor prioridad while total > max_tokens and sections: lowest = sections[-1] if lowest.section_type == "immutable_rules": break # Compactacion simple: eliminar lineas vacias compacted_lines = [l.rstrip() for l in lowest.content.splitlines() if l.strip()] compacted = "\n".join(compacted_lines) new_est = estimate_tokens_fallback(compacted) saved = lowest.token_estimate - new_est if saved > 0: lowest.content = compacted lowest.token_estimate = new_est total -= saved else: total -= lowest.token_estimate sections.pop() return sections # ===================================================================== # Tests: estimate_tokens # ===================================================================== class TestEstimateTokens: def test_positive_for_nonempty_text(self): result = estimate_tokens_fallback("Hello world, this is a test string.") assert isinstance(result, int) assert result > 0 def test_zero_for_empty_string(self): assert estimate_tokens_fallback("") == 0 def test_longer_text_more_tokens(self): short = estimate_tokens_fallback("hi") long = estimate_tokens_fallback("hi " * 500) assert long > short def test_returns_int_type(self): assert isinstance(estimate_tokens_fallback("cualquier texto"), int) def test_minimum_is_one_for_short_text(self): # "ab" -> len 2 // 4 = 0, pero max(1, 0) = 1 assert estimate_tokens_fallback("ab") == 1 # ===================================================================== # Tests: _extract_facts # ===================================================================== class TestExtractFacts: def test_extracts_key_value_lines(self): raw = "Status: running\nVersion: 3.2.1\nIgnored short\nName: my-module" facts = extract_facts(raw) assert any("Status: running" in f for f in facts) assert any("Version: 3.2.1" in f for f in facts) assert any("Name: my-module" in f for f in facts) def test_extracts_status_indicators(self): raw = "PASS test_login completed\nFAIL test_logout broken\nOK everything fine" facts = extract_facts(raw) assert any("PASS" in f for f in facts) assert any("FAIL" in f for f in facts) def test_ignores_short_lines(self): raw = "ok\nhi\nyes\nStatus: this is long enough to be a fact" facts = extract_facts(raw) assert not any(f in ("ok", "hi", "yes") for f in facts) def test_deduplicates(self): raw = "Status: running value\nStatus: running value\nStatus: running value" facts = extract_facts(raw) assert facts.count("Status: running value") == 1 def test_limits_to_15(self): lines = [f"Key{i}: value number {i} with enough length" for i in range(30)] raw = "\n".join(lines) facts = extract_facts(raw) assert len(facts) <= 15 def test_empty_input(self): facts = extract_facts("") assert facts == [] # ===================================================================== # Tests: _build_summary # ===================================================================== class TestBuildSummary: def test_includes_tool_name(self): summary = build_summary("read_file", "line1\nline2\nline3", []) assert "read_file" in summary def test_includes_line_count(self): raw = "line1\nline2\nline3" summary = build_summary("my_tool", raw, []) assert "3 lines" in summary def test_includes_char_count(self): raw = "some content here" summary = build_summary("my_tool", raw, []) assert str(len(raw)) in summary def test_includes_facts_when_present(self): facts = ["Status: ok", "Count: 42"] summary = build_summary("my_tool", "data", facts) assert "Status: ok" in summary def test_includes_first_line(self): raw = "primera linea importante\nsegunda\ntercera" summary = build_summary("tool", raw, []) assert "primera linea importante" in summary # ===================================================================== # Tests: summarize_tool_output # ===================================================================== class TestSummarizeToolOutput: def test_returns_dict_with_correct_fields(self): result = summarize_tool_output( tool_name="read_file", raw_output="Status: ok\nContent: hello world here", session_id="sess-001", task_id="task-001", ) assert isinstance(result, dict) assert result["session_id"] == "sess-001" assert result["task_id"] == "task-001" assert result["source_tool"] == "read_file" assert result["title"] == "Output of read_file" assert result["artifact_id"] # no vacio assert result["summary"] # no vacio assert result["char_count"] > 0 def test_artifact_type_inference(self): assert summarize_tool_output("read_file", "x", "s", "t")["artifact_type"] == "code" assert summarize_tool_output("test_run", "x", "s", "t")["artifact_type"] == "test_result" assert summarize_tool_output("search_records", "x", "s", "t")["artifact_type"] == "analysis" assert summarize_tool_output("deploy_app", "x", "s", "t")["artifact_type"] == "general" def test_artifact_id_is_deterministic(self): r1 = summarize_tool_output("tool", "output", "s", "t") r2 = summarize_tool_output("tool", "output", "s", "t") assert r1["artifact_id"] == r2["artifact_id"] def test_artifact_id_length(self): result = summarize_tool_output("tool", "output", "s", "t") assert len(result["artifact_id"]) == 16 # ===================================================================== # Tests: compact_sections # ===================================================================== class TestCompactSections: def test_never_removes_immutable_rules(self): sections = [ Section( section_type="immutable_rules", content="You must always follow these rules " * 20, priority=100, ), Section( section_type="working_context", content="Some working context data " * 50, priority=1, ), ] result = compact_sections(sections, max_tokens=50) types = [s.section_type for s in result] assert "immutable_rules" in types def test_respects_priority_order(self): """Secciones de mayor prioridad sobreviven a la compactacion. Usamos un budget que cabe la seccion alta pero no ambas.""" high = Section( section_type="task_state", content="Important task data here", # ~6 tokens priority=90, ) low = Section( section_type="working_context", content="Low priority stuff " * 50, # ~250 tokens priority=1, ) # Budget suficiente para high (~6) pero no para high+low (~256) result = compact_sections([high, low], max_tokens=20) types = [s.section_type for s in result] assert "task_state" in types # La de baja prioridad deberia haberse eliminado o compactado assert len(result) <= 2 def test_no_compaction_when_within_budget(self): sections = [ Section( section_type="task_state", content="Short content", priority=50, ), ] result = compact_sections(sections, max_tokens=999_999) assert len(result) == 1 assert result[0].content == "Short content" def test_deduplicates_identical_sections(self): sections = [ Section(section_type="working_context", content="duplicated content", priority=10), Section(section_type="working_context", content="duplicated content", priority=10), ] result = compact_sections(sections, max_tokens=999_999) assert len(result) == 1