"""Tests para la logica de _extract_key_data_from_results del OrchestratorEngine. Se replica la funcion como logica pura, sin importar src/ (evita dependencias). Los ToolExecution se representan como SimpleNamespace con .arguments y .tool_name. """ import json from types import SimpleNamespace from typing import Any import pytest def _make_tool_execution(tool_name: str, arguments: dict, raw_output: str = "") -> SimpleNamespace: """Crea un objeto similar a ToolExecution con los atributos necesarios.""" return SimpleNamespace( tool_name=tool_name, arguments=arguments, raw_output=raw_output, ) def _make_result(*tool_executions) -> dict: return {"tool_executions": list(tool_executions), "content": "ok"} def extract_key_data_from_results(results: list) -> dict: """Replica exacta de OrchestratorEngine._extract_key_data_from_results.""" key_data: dict[str, Any] = {} seen_tables: dict[str, list] = {} seen_sections: list = [] seen_modules: list = [] seen_pages: dict[str, int] = {} for result in results: for te in result.get("tool_executions", []): args = te.arguments name = te.tool_name table = args.get("tableName", "") record = args.get("recordNum") if table and record: record_int = int(record) if str(record).isdigit() else None if record_int and table not in seen_tables: seen_tables[table] = [] if record_int and record_int not in seen_tables.get(table, []): seen_tables[table].append(record_int) section = args.get("sectionId", "") if section and section not in seen_sections: seen_sections.append(section) module = args.get("moduleId", "") or args.get("moduleName", "") if module and module not in seen_modules: seen_modules.append(module) if te.raw_output and "enlace" in te.raw_output: try: for line in te.raw_output.splitlines(): line = line.strip() if line.startswith("{"): try: data = json.loads(line) if "enlace" in data and "num" in data: page_key = data.get("name", data["enlace"]) seen_pages[page_key] = int(data["num"]) except json.JSONDecodeError: pass except Exception: pass if seen_tables: key_data["tables"] = {t: nums[:10] for t, nums in seen_tables.items()} if seen_sections: key_data["sections"] = seen_sections[:20] if seen_modules: key_data["modules"] = seen_modules[:20] if seen_pages: key_data["pages"] = dict(list(seen_pages.items())[:20]) return key_data # ===================================================================== # Tests # ===================================================================== class TestExtractKeyDataFromResults: def test_extracts_table_and_record(self): te = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "42"}) key_data = extract_key_data_from_results([_make_result(te)]) assert "tables" in key_data assert "pages" in key_data["tables"] assert 42 in key_data["tables"]["pages"] def test_extracts_section_id(self): te = _make_tool_execution("get_section", {"sectionId": "hero-banner"}) key_data = extract_key_data_from_results([_make_result(te)]) assert "sections" in key_data assert "hero-banner" in key_data["sections"] def test_extracts_module_id(self): te = _make_tool_execution("compile_module", {"moduleId": "gallery-slider"}) key_data = extract_key_data_from_results([_make_result(te)]) assert "modules" in key_data assert "gallery-slider" in key_data["modules"] def test_extracts_module_name_fallback(self): te = _make_tool_execution("compile_module", {"moduleName": "contact-form"}) key_data = extract_key_data_from_results([_make_result(te)]) assert "modules" in key_data assert "contact-form" in key_data["modules"] def test_empty_results(self): key_data = extract_key_data_from_results([]) assert key_data == {} def test_no_tool_executions_in_result(self): key_data = extract_key_data_from_results([{"content": "x", "tool_executions": []}]) assert key_data == {} def test_result_without_tool_executions_key(self): key_data = extract_key_data_from_results([{"content": "just text"}]) assert key_data == {} def test_tool_execution_without_relevant_args(self): te = _make_tool_execution("read_file", {"path": "/var/www/index.html"}) key_data = extract_key_data_from_results([_make_result(te)]) assert key_data == {} def test_multiple_tables_and_records(self): te1 = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "1"}) te2 = _make_tool_execution("update_record", {"tableName": "pages", "recordNum": "5"}) te3 = _make_tool_execution("get_record", {"tableName": "blog", "recordNum": "10"}) key_data = extract_key_data_from_results([_make_result(te1, te2, te3)]) assert 1 in key_data["tables"]["pages"] assert 5 in key_data["tables"]["pages"] assert 10 in key_data["tables"]["blog"] def test_deduplicates_records(self): te1 = _make_tool_execution("a", {"tableName": "t", "recordNum": "7"}) te2 = _make_tool_execution("b", {"tableName": "t", "recordNum": "7"}) key_data = extract_key_data_from_results([_make_result(te1, te2)]) assert key_data["tables"]["t"].count(7) == 1 def test_extracts_pages_from_raw_output(self): raw = '{"enlace": "/contacto", "num": 15, "name": "Contacto"}\nother line' te = _make_tool_execution("list_pages", {"tableName": "web"}, raw_output=raw) key_data = extract_key_data_from_results([_make_result(te)]) assert "pages" in key_data assert key_data["pages"]["Contacto"] == 15