From 79ec267aa63b01a90c1e70a541441b77860e11be Mon Sep 17 00:00:00 2001 From: Jordan Diaz Date: Wed, 10 Jun 2026 19:08:53 +0000 Subject: [PATCH] Compactor: garantizar emparejamiento tool_use/tool_result (sesiones largas bloqueadas) Las sesiones largas con DeepSeek quedaban bloqueadas permanentemente con 400 "Messages with role 'tool' must be a response to a preceding message with 'tool_calls'": el paso de ultimo recurso del compactor colapsaba assistants con tool_use a un string placeholder dejando huerfanos los tool_result del user siguiente. - compactor: paso de ultimo recurso pair-aware + _enforce_tool_pairing como invariante final (matching por IDs, ambas direcciones, repara tambien historiales ya corruptos persistidos). - openai_adapter: _repair_tool_sequence como guard defensivo del contrato del proveedor (tool huerfano -> user; tool_call sin respuesta -> fuera), con warning para detectar regresiones. - recent_messages: trim por presupuesto de tokens al persistir (AGENTIC_RECENT_MESSAGES_MAX_TOKENS, default 60k) sin cortar pares; cierra el crecimiento sin limite que empujaba al paso destructivo. - tests/test_tool_pairing_real.py: 23 tests que importan el codigo REAL (a diferencia de los tests standalone existentes). Suite completa: 92 ok. Verificado offline contra los recent_messages reales de la sesion bloqueada en prod: 0 violaciones con presupuesto normal y agresivo. Co-Authored-By: Claude Fable 5 --- src/adapters/openai_adapter.py | 104 +++++- src/config.py | 4 + src/context/compactor.py | 231 ++++++++++++- src/orchestrator/engine.py | 73 +++- tests/test_context_budget.py | 31 +- tests/test_tool_pairing_real.py | 585 ++++++++++++++++++++++++++++++++ 6 files changed, 1020 insertions(+), 8 deletions(-) create mode 100644 tests/test_tool_pairing_real.py diff --git a/src/adapters/openai_adapter.py b/src/adapters/openai_adapter.py index 9563cd0..bac7896 100644 --- a/src/adapters/openai_adapter.py +++ b/src/adapters/openai_adapter.py @@ -444,4 +444,106 @@ class OpenAIAdapter(ModelAdapter): text_parts.append(b.get("text", "")) if text_parts: out.append({"role": "user", "content": "\n".join(text_parts)}) - return out + # Guard defensivo: el compactor ya garantiza el invariante tool_use ↔ + # tool_result (`_enforce_tool_pairing`), pero si algo se escapa el + # proveedor devuelve 400 y la sesion queda bloqueada. Cinturon y tirantes. + return self._repair_tool_sequence(out) + + @staticmethod + def _repair_tool_sequence(out: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Garantiza el contrato OpenAI sobre la secuencia ya convertida: + + - Todo `role: tool` debe responder a un tool_call_id del assistant + inmediatamente anterior (o de su bloque contiguo de tool messages). + Si no → se convierte a user con placeholder. + - Todo assistant con `tool_calls` debe tener respuesta para CADA id. + Los tool_calls sin respuesta se eliminan; si la lista queda vacia se + elimina la key (y se asegura `content` no-None — "content or + tool_calls must be set"). + + No deberia activarse nunca (el compactor repara antes); si se activa, + loguea warning para detectar regresiones del compactor. + """ + repaired: list[dict[str, Any]] = [] + i = 0 + n = len(out) + while i < n: + msg = out[i] + role = msg.get("role") + + if role == "assistant" and msg.get("tool_calls"): + # Bloque contiguo de tool messages que responden a este assistant. + j = i + 1 + block: list[dict[str, Any]] = [] + while j < n and out[j].get("role") == "tool": + block.append(out[j]) + j += 1 + answered = {t.get("tool_call_id", "") for t in block} + kept_calls = [ + tc for tc in msg["tool_calls"] if tc.get("id", "") in answered + ] + dropped = [ + tc for tc in msg["tool_calls"] if tc.get("id", "") not in answered + ] + new_msg = dict(msg) + if dropped: + for tc in dropped: + logger.warning( + "repaired unanswered tool_call at index %d (tool_call_id=%s)", + i, + tc.get("id", ""), + ) + if kept_calls: + new_msg["tool_calls"] = kept_calls + else: + new_msg.pop("tool_calls", None) + if new_msg.get("content") is None: + # Promover reasoning a content si existe (mismo + # criterio que el quirk DeepSeek de arriba); si no, + # placeholder para no enviar content=None sin tools. + rc = new_msg.pop("reasoning_content", None) + new_msg["content"] = rc or "[ASSISTANT COMPACTADO]" + repaired.append(new_msg) + valid_ids = {tc.get("id", "") for tc in kept_calls} + converted: list[dict[str, Any]] = [] + for t in block: + if t.get("tool_call_id", "") in valid_ids: + repaired.append(t) + else: + logger.warning( + "repaired orphan tool message (tool_call_id=%s)", + t.get("tool_call_id", ""), + ) + converted.append( + { + "role": "user", + "content": "[Resultado de herramienta (contexto compactado)]: " + + str(t.get("content", ""))[:500], + } + ) + # Los huerfanos convertidos van DESPUES del bloque de tools + # validos para no romper la contiguidad assistant → tools. + repaired.extend(converted) + i = j + continue + + if role == "tool": + # Tool message sin assistant con tool_calls delante → huerfano. + logger.warning( + "repaired orphan tool message at index %d (tool_call_id=%s)", + i, + msg.get("tool_call_id", ""), + ) + repaired.append( + { + "role": "user", + "content": "[Resultado de herramienta (contexto compactado)]: " + + str(msg.get("content", ""))[:500], + } + ) + i += 1 + continue + + repaired.append(msg) + i += 1 + return repaired diff --git a/src/config.py b/src/config.py index 79a405f..4c546ae 100644 --- a/src/config.py +++ b/src/config.py @@ -102,6 +102,10 @@ class Settings(BaseSettings): conversation_recent_raw_limit: int = 2 task_history_max_entries: int = 20 task_history_max_tokens: int = 1500 + # Presupuesto de tokens para la ventana de recent_messages persistida en + # sesion. Sin esto crece sin limite y empuja al compactor a su paso + # destructivo (colapsar bloques perdiendo tool_use ids). 0 = sin limite. + recent_messages_max_tokens: int = 60_000 # --- MCP --- mcp_config_path: str = "" # Path to mcp.json; empty = legacy single-server mode diff --git a/src/context/compactor.py b/src/context/compactor.py index eab8922..fc5e0ee 100644 --- a/src/context/compactor.py +++ b/src/context/compactor.py @@ -180,7 +180,13 @@ class ContextCompactor: "raw_tool_results_kept": 0, } if total <= max_tokens: - return messages, meta + # Aunque no haga falta compactar, garantizamos el invariante + # tool_use/tool_result (repara historiales ya rotos persistidos). + repaired = self._enforce_tool_pairing([dict(m) for m in messages]) + meta["output_tokens"] = sum( + self._estimate_message_tokens(m) for m in repaired + ) + return repaired, meta compacted = [dict(m) for m in messages] last_user_idx = max( @@ -343,20 +349,241 @@ class ContextCompactor: message["content"] = "[USER CONTEXT COMPACTADO]" elif isinstance(content, list) and content: # Anthropic-style: reemplazar lista entera por placeholder string. - # Nota: pierde tool_use ids — solo aplicar al final como ultimo recurso. + # Nota: colapsar pierde los tool_use/tool_result ids, asi que + # lo hacemos PAIR-AWARE (colapsar un lado del par colapsa el + # otro en la misma iteracion) y ademas `_enforce_tool_pairing` + # al final garantiza el invariante aunque algo se escape. if role == "assistant": message["content"] = "[ASSISTANT COMPACTADO]" + # Si este assistant tenia tool_use, colapsar tambien el + # user de tool_results que lo sigue (mismo par). + if self._blocks_have_type(content, "tool_use"): + nxt = idx + 1 + if ( + nxt < len(compacted) + and nxt != last_user_idx + and compacted[nxt].get("role") == "user" + and self._blocks_have_type( + compacted[nxt].get("content"), "tool_result" + ) + ): + compacted[nxt]["content"] = "[USER CONTEXT COMPACTADO]" elif role == "user": message["content"] = "[USER CONTEXT COMPACTADO]" + # Si este user llevaba tool_results, colapsar tambien el + # assistant anterior con sus tool_use (mismo par). + if self._blocks_have_type(content, "tool_result"): + prv = idx - 1 + if ( + prv >= 0 + and compacted[prv].get("role") == "assistant" + and self._blocks_have_type( + compacted[prv].get("content"), "tool_use" + ) + ): + compacted[prv]["content"] = "[ASSISTANT COMPACTADO]" else: continue total = sum(self._estimate_message_tokens(m) for m in compacted) if total <= max_tokens: break + # Invariante final: tras toda la compactacion, reparar cualquier par + # tool_use/tool_result roto. Sin esto, un tool_result huerfano se emite + # como `role: tool` sin `tool_calls` previo y el proveedor devuelve 400 + # ("Messages with role 'tool' must be a response to a preceding message + # with 'tool_calls'"). + compacted = self._enforce_tool_pairing(compacted) + total = sum(self._estimate_message_tokens(m) for m in compacted) + meta["output_tokens"] = total return compacted, meta + # ------------------------------------------------------------------ + # Invariante tool_use ↔ tool_result + # ------------------------------------------------------------------ + + @staticmethod + def _blocks_have_type(content: Any, block_type: str) -> bool: + """True si `content` es una lista de bloques con alguno del tipo dado.""" + if not isinstance(content, list): + return False + return any( + isinstance(b, dict) and b.get("type") == block_type for b in content + ) + + @staticmethod + def _tool_use_ids(message: dict[str, Any]) -> set[str]: + """IDs de tool calls emitidos por un assistant (bloques `tool_use` + estilo Anthropic y/o `tool_calls` estilo OpenAI legacy).""" + ids: set[str] = set() + content = message.get("content") + if isinstance(content, list): + for b in content: + if isinstance(b, dict) and b.get("type") == "tool_use": + ids.add(str(b.get("id", ""))) + for tc in message.get("tool_calls") or []: + if isinstance(tc, dict): + ids.add(str(tc.get("id", ""))) + ids.discard("") + return ids + + def _enforce_tool_pairing( + self, messages: list[dict[str, Any]] + ) -> list[dict[str, Any]]: + """Repara el invariante tool_use ↔ tool_result en ambas direcciones. + + La compactacion puede colapsar el content de un assistant (perdiendo sus + bloques `tool_use`) mientras el user siguiente conserva sus `tool_result`, + o al reves. El matching es por IDs (`tool_use.id` vs `tool_result.tool_use_id` + y `tool_calls[].id` vs `tool_call_id`), no solo por adyacencia, asi que + tambien repara desajustes parciales (p.ej. 3 tool_use vs 2 tool_result). + + - tool_result sin tool_use previo → bloque text placeholder. + - tool_use sin tool_result siguiente → se elimina el bloque (thinking/text + se conservan; si el content queda vacio, placeholder string). + - `role: tool` legacy sin assistant con `tool_calls` → user placeholder. + """ + repaired: list[dict[str, Any]] = [] + for idx, msg in enumerate(messages): + role = msg.get("role", "") + content = msg.get("content") + + if role == "assistant": + tool_ids = self._tool_use_ids(msg) + if not tool_ids: + repaired.append(msg) + continue + # IDs respondidos: user con tool_results inmediato y/o run + # contiguo de mensajes legacy `role: tool`. + answered: set[str] = set() + j = idx + 1 + if ( + j < len(messages) + and messages[j].get("role") == "user" + and isinstance(messages[j].get("content"), list) + ): + for b in messages[j]["content"]: + if isinstance(b, dict) and b.get("type") == "tool_result": + answered.add(str(b.get("tool_use_id", ""))) + j += 1 + while j < len(messages) and messages[j].get("role") == "tool": + answered.add(str(messages[j].get("tool_call_id", ""))) + j += 1 + unanswered = tool_ids - answered + if not unanswered: + repaired.append(msg) + continue + # Eliminar los tool_use/tool_calls sin respuesta. + new_msg = dict(msg) + if isinstance(content, list): + new_content = [ + b + for b in content + if not ( + isinstance(b, dict) + and b.get("type") == "tool_use" + and str(b.get("id", "")) in unanswered + ) + ] + if not new_content: + new_msg["content"] = "[ASSISTANT COMPACTADO]" + else: + new_msg["content"] = new_content + if isinstance(new_msg.get("tool_calls"), list): + kept_calls = [ + tc + for tc in new_msg["tool_calls"] + if isinstance(tc, dict) + and str(tc.get("id", "")) not in unanswered + ] + if kept_calls: + new_msg["tool_calls"] = kept_calls + else: + new_msg.pop("tool_calls", None) + if not new_msg.get("content"): + new_msg["content"] = "[ASSISTANT COMPACTADO]" + repaired.append(new_msg) + continue + + if role == "user" and self._blocks_have_type(content, "tool_result"): + # IDs disponibles en el assistant inmediatamente anterior + # (YA reparado — usar `repaired[-1]` refleja los tool_use que + # sobrevivieron, no los del mensaje original). + available: set[str] = set() + if repaired and repaired[-1].get("role") == "assistant": + available = self._tool_use_ids(repaired[-1]) + new_content: list[Any] = [] + orphaned = False + for b in content: + if ( + isinstance(b, dict) + and b.get("type") == "tool_result" + and str(b.get("tool_use_id", "")) not in available + ): + orphaned = True + # Fusionar placeholders consecutivos en un unico bloque text. + if not ( + new_content + and isinstance(new_content[-1], dict) + and new_content[-1].get("type") == "text" + and new_content[-1].get("text") + == "[Resultado de herramienta compactado]" + ): + new_content.append( + { + "type": "text", + "text": "[Resultado de herramienta compactado]", + } + ) + continue + new_content.append(b) + if not orphaned: + repaired.append(msg) + continue + new_msg = dict(msg) + only_placeholders = all( + isinstance(b, dict) + and b.get("type") == "text" + and b.get("text") == "[Resultado de herramienta compactado]" + for b in new_content + ) + if not new_content or only_placeholders: + new_msg["content"] = "[Resultado de herramienta compactado]" + else: + new_msg["content"] = new_content + repaired.append(new_msg) + continue + + if role == "tool": + # Legacy: el assistant anterior (saltando otros `role: tool` + # contiguos) debe tener este tool_call_id en sus tool_calls. + prev_assistant: dict[str, Any] | None = None + for prev in reversed(repaired): + if prev.get("role") == "tool": + continue + if prev.get("role") == "assistant": + prev_assistant = prev + break + call_id = str(msg.get("tool_call_id", "")) + valid = ( + prev_assistant is not None + and call_id in self._tool_use_ids(prev_assistant) + ) + if valid: + repaired.append(msg) + else: + repaired.append( + { + "role": "user", + "content": "[Resultado de herramienta compactado]", + } + ) + continue + + repaired.append(msg) + return repaired + # ------------------------------------------------------------------ # Internals # ------------------------------------------------------------------ diff --git a/src/orchestrator/engine.py b/src/orchestrator/engine.py index 7b01232..e073f25 100644 --- a/src/orchestrator/engine.py +++ b/src/orchestrator/engine.py @@ -14,7 +14,7 @@ from typing import Any from ..adapters.base import ModelAdapter from ..config import settings from ..context.engine import ContextEngine -from ..context.compactor import estimate_tokens +from ..context.compactor import ContextCompactor, estimate_tokens from ..mcp.manager import MCPManager from ..memory.store import MemoryStore from ..models.agent import AgentProfile @@ -260,7 +260,76 @@ class OrchestratorEngine: current_turn.append(sanitized) merged.extend(current_turn) - return merged + return OrchestratorEngine._trim_recent_messages(merged) + + @staticmethod + def _trim_recent_messages( + messages: list[dict[str, Any]], + ) -> list[dict[str, Any]]: + """Recorta recent_messages a un presupuesto de tokens eliminando + mensajes ENTEROS desde el principio (los mas antiguos). + + Dos reglas para no romper el invariante tool_use ↔ tool_result: + - Nunca cortar dentro de un par: si se elimina un assistant con + tool_use, se eliminan tambien sus tool_results (user carrier o run + de mensajes legacy `role: tool`). + - El primer mensaje resultante nunca puede ser un carrier de + tool_result ni un `role: tool`. + + Mantiene siempre al menos los ultimos 4 mensajes aunque excedan el + presupuesto. + """ + budget = settings.recent_messages_max_tokens + if budget <= 0 or not messages: + return messages + + estimate = ContextCompactor._estimate_message_tokens + total = sum(estimate(m) for m in messages) + if total <= budget: + return messages + + def _is_tool_result_carrier(msg: dict[str, Any]) -> bool: + if msg.get("role") == "tool": + return True + if msg.get("role") != "user": + return False + content = msg.get("content") + return isinstance(content, list) and any( + isinstance(b, dict) and b.get("type") == "tool_result" + for b in content + ) + + def _has_tool_use(msg: dict[str, Any]) -> bool: + if msg.get("role") != "assistant": + return False + if msg.get("tool_calls"): + return True + content = msg.get("content") + return isinstance(content, list) and any( + isinstance(b, dict) and b.get("type") == "tool_use" + for b in content + ) + + min_keep = 4 + n = len(messages) + start = 0 + while total > budget and start < n - min_keep: + end = start + 1 + if _has_tool_use(messages[start]): + # Arrastrar los tool_results del par (no cortar dentro de el). + while end < n and _is_tool_result_carrier(messages[end]): + end += 1 + if n - end < min_keep: + break # Eliminar el par completo invadiria los ultimos min_keep + for k in range(start, end): + total -= estimate(messages[k]) + start = end + + trimmed = messages[start:] + # El primer mensaje nunca puede ser un tool_result sin su tool_use. + while trimmed and _is_tool_result_carrier(trimmed[0]): + trimmed.pop(0) + return trimmed @staticmethod def _sanitize_recent_message(message: dict[str, Any]) -> dict[str, Any]: diff --git a/tests/test_context_budget.py b/tests/test_context_budget.py index 69c8726..415a751 100644 --- a/tests/test_context_budget.py +++ b/tests/test_context_budget.py @@ -294,11 +294,27 @@ class TestTaskHistoryTrim: class TestConversationCompaction: def test_compactor_preserves_last_user_and_compacts_old_tool_results(self): compactor = ContextCompactor(max_tokens=999999) + # Los assistants llevan sus tool_calls: sin ellos los `role: tool` + # serian huerfanos y `_enforce_tool_pairing` los convertiria a user. messages = [ {"role": "user", "content": "Contexto anterior " * 10}, - {"role": "assistant", "content": "Voy a revisar el modulo ahora mismo. " * 6}, + { + "role": "assistant", + "content": "Voy a revisar el modulo ahora mismo. " * 6, + "tool_calls": [ + {"id": "tool-1", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, {"role": "tool", "tool_call_id": "tool-1", "content": "resultado antiguo\n" * 80}, - {"role": "assistant", "content": "He visto el resultado anterior. " * 6}, + { + "role": "assistant", + "content": "He visto el resultado anterior. " * 6, + "tool_calls": [ + {"id": "tool-2", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, {"role": "tool", "tool_call_id": "tool-2", "content": "resultado reciente\n" * 80}, {"role": "user", "content": "Este es el ultimo mensaje del usuario y debe quedar intacto."}, ] @@ -358,9 +374,18 @@ class TestConversationCompaction: def test_compactor_only_touches_user_messages_as_last_resort(self): compactor = ContextCompactor(max_tokens=999999) + # tool_calls en el assistant para que el `role: tool` no sea huerfano + # (el invariante `_enforce_tool_pairing` convertiria un huerfano a user). messages = [ {"role": "user", "content": "Contexto previo del usuario " * 8}, - {"role": "assistant", "content": "Respuesta previa del asistente " * 6}, + { + "role": "assistant", + "content": "Respuesta previa del asistente " * 6, + "tool_calls": [ + {"id": "tool-1", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, {"role": "tool", "tool_call_id": "tool-1", "content": "resultado viejo\n" * 80}, {"role": "user", "content": "Ultimo mensaje del usuario"}, ] diff --git a/tests/test_tool_pairing_real.py b/tests/test_tool_pairing_real.py new file mode 100644 index 0000000..77150cd --- /dev/null +++ b/tests/test_tool_pairing_real.py @@ -0,0 +1,585 @@ +"""Tests de REGRESION REAL del invariante tool_use ↔ tool_result. + +A diferencia del resto de tests (que replican logica), este archivo importa el +codigo REAL de src/. Cubre el bug de produccion: sesiones largas (~130k tokens) +donde `compact_conversation` colapsaba assistants a "[ASSISTANT COMPACTADO]" +perdiendo los bloques `tool_use`, dejando tool_results huerfanos que el adapter +emitia como `role: tool` sin `tool_calls` → 400 del proveedor en cada reintento. + +Requiere las dependencias de src/ (pydantic, Python 3.11+). Si no estan +disponibles (p.ej. host con Python 3.10), el modulo entero se salta — ejecutar +dentro del container: `docker exec acai-agentic python3 -m pytest ...`. +""" + +import pytest + +try: + from src.context.compactor import ContextCompactor +except Exception as e: # pragma: no cover - entorno sin deps de src/ + pytest.skip(f"src/ no importable en este entorno: {e}", allow_module_level=True) + + +# ===================================================================== +# Helper de validacion reutilizable +# ===================================================================== + + +def collect_tool_use_ids(message: dict) -> set: + """IDs de tool calls de un assistant (Anthropic blocks + OpenAI legacy).""" + ids = set() + content = message.get("content") + if isinstance(content, list): + for b in content: + if isinstance(b, dict) and b.get("type") == "tool_use": + ids.add(str(b.get("id", ""))) + for tc in message.get("tool_calls") or []: + if isinstance(tc, dict): + ids.add(str(tc.get("id", ""))) + ids.discard("") + return ids + + +def assert_tool_pairing_ok(messages: list) -> None: + """Valida el invariante completo sobre una lista de mensajes internos: + + - Todo tool_result (block) referencia un tool_use del assistant anterior. + - Todo tool_use (block) tiene su tool_result en el mensaje siguiente. + - Todo `role: tool` legacy responde a un tool_call del assistant previo. + """ + for i, msg in enumerate(messages): + role = msg.get("role") + content = msg.get("content") + + if role == "user" and isinstance(content, list): + result_ids = { + str(b.get("tool_use_id", "")) + for b in content + if isinstance(b, dict) and b.get("type") == "tool_result" + } + if result_ids: + assert i > 0, f"msg[{i}]: tool_result al inicio de la conversacion" + prev = messages[i - 1] + assert prev.get("role") == "assistant", ( + f"msg[{i}]: tool_result sin assistant inmediatamente anterior" + ) + available = collect_tool_use_ids(prev) + orphans = result_ids - available + assert not orphans, ( + f"msg[{i}]: tool_result huerfanos {orphans} " + f"(assistant previo solo tiene {available})" + ) + + if role == "assistant": + tool_ids = collect_tool_use_ids(msg) + if tool_ids: + answered = set() + j = i + 1 + if ( + j < len(messages) + and messages[j].get("role") == "user" + and isinstance(messages[j].get("content"), list) + ): + for b in messages[j]["content"]: + if isinstance(b, dict) and b.get("type") == "tool_result": + answered.add(str(b.get("tool_use_id", ""))) + j += 1 + while j < len(messages) and messages[j].get("role") == "tool": + answered.add(str(messages[j].get("tool_call_id", ""))) + j += 1 + unanswered = tool_ids - answered + assert not unanswered, ( + f"msg[{i}]: tool_use sin respuesta {unanswered}" + ) + + if role == "tool": + prev_assistant = None + for k in range(i - 1, -1, -1): + if messages[k].get("role") == "tool": + continue + if messages[k].get("role") == "assistant": + prev_assistant = messages[k] + break + assert prev_assistant is not None, ( + f"msg[{i}]: role tool sin assistant previo" + ) + call_id = str(msg.get("tool_call_id", "")) + assert call_id in collect_tool_use_ids(prev_assistant), ( + f"msg[{i}]: role tool con tool_call_id={call_id} no presente " + f"en el assistant previo" + ) + + +def make_turn(n: int, payload_chars: int = 4000) -> list: + """Genera un turno completo: user → assistant(thinking+text+tool_use) → + user(tool_result). Payloads grandes para forzar la compactacion.""" + tid = f"call_{n}" + return [ + {"role": "user", "content": f"Peticion {n}: " + ("x" * payload_chars)}, + { + "role": "assistant", + "content": [ + {"type": "thinking", "thinking": "razonando " * (payload_chars // 10)}, + {"type": "text", "text": f"Voy a ejecutar la tool del turno {n}."}, + { + "type": "tool_use", + "id": tid, + "name": "acai_get_records", + "input": {"tableName": f"tabla_{n}"}, + }, + ], + }, + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": tid, + "content": "resultado " * (payload_chars // 10), + } + ], + }, + ] + + +# ===================================================================== +# (a) compact_conversation end-to-end: el paso de ultimo recurso ya no +# deja tool_results huerfanos ni tool_use sin respuesta +# ===================================================================== + + +class TestCompactConversationPairing: + def test_last_resort_does_not_orphan_tool_results(self): + compactor = ContextCompactor() + messages = [] + for n in range(12): + messages.extend(make_turn(n, payload_chars=6000)) + messages.append({"role": "user", "content": "ultima peticion del usuario"}) + + # Presupuesto minusculo: fuerza TODOS los pasos incluida la colapsa + # de listas a placeholder string (el paso que causaba el bug). + compacted, meta = compactor.compact_conversation(messages, max_tokens=300) + + assert meta["output_tokens"] < meta["input_tokens"] + assert_tool_pairing_ok(compacted) + + def test_moderate_budget_keeps_pairing(self): + compactor = ContextCompactor() + messages = [] + for n in range(8): + messages.extend(make_turn(n, payload_chars=3000)) + messages.append({"role": "user", "content": "peticion final"}) + + compacted, _ = compactor.compact_conversation(messages, max_tokens=2000) + assert_tool_pairing_ok(compacted) + + def test_under_budget_passthrough_keeps_pairing(self): + compactor = ContextCompactor() + messages = make_turn(1, payload_chars=50) + compacted, meta = compactor.compact_conversation(messages, max_tokens=100_000) + assert meta["messages_compacted"] == 0 + assert_tool_pairing_ok(compacted) + # Los tool_use/tool_result originales se conservan intactos + assert collect_tool_use_ids(compacted[1]) == {"call_1"} + + def test_last_user_message_preserved(self): + compactor = ContextCompactor() + messages = [] + for n in range(10): + messages.extend(make_turn(n, payload_chars=5000)) + final = "esta es la peticion actual que NO debe perderse" + messages.append({"role": "user", "content": final}) + + compacted, _ = compactor.compact_conversation(messages, max_tokens=300) + assert compacted[-1]["content"] == final + + +# ===================================================================== +# (b) _enforce_tool_pairing directo +# ===================================================================== + + +class TestEnforceToolPairing: + def setup_method(self): + self.compactor = ContextCompactor() + + def test_collapsed_assistant_with_orphan_tool_results(self): + """Assistant colapsado a string + user con tool_results → los + tool_result se convierten en placeholder.""" + messages = [ + {"role": "assistant", "content": "[ASSISTANT COMPACTADO]"}, + { + "role": "user", + "content": [ + {"type": "tool_result", "tool_use_id": "call_a", "content": "datos"}, + {"type": "tool_result", "tool_use_id": "call_b", "content": "mas datos"}, + ], + }, + ] + repaired = self.compactor._enforce_tool_pairing(messages) + assert_tool_pairing_ok(repaired) + # Solo placeholders → content string (fusionados en uno) + assert repaired[1]["role"] == "user" + assert repaired[1]["content"] == "[Resultado de herramienta compactado]" + + def test_orphan_tool_results_mixed_with_text(self): + """tool_result huerfano junto a un bloque text → placeholder en lista, + el text se conserva.""" + messages = [ + {"role": "assistant", "content": "[ASSISTANT COMPACTADO]"}, + { + "role": "user", + "content": [ + {"type": "tool_result", "tool_use_id": "call_a", "content": "datos"}, + {"type": "text", "text": "y ademas haz esto"}, + ], + }, + ] + repaired = self.compactor._enforce_tool_pairing(messages) + assert_tool_pairing_ok(repaired) + content = repaired[1]["content"] + assert isinstance(content, list) + types = [b.get("type") for b in content] + assert types == ["text", "text"] + assert content[0]["text"] == "[Resultado de herramienta compactado]" + assert content[1]["text"] == "y ademas haz esto" + + def test_partial_id_mismatch_drops_unanswered_tool_use(self): + """Assistant con 3 tool_use, user con solo 2 tool_result → se elimina + el tool_use sin respuesta, thinking/text intactos.""" + messages = [ + { + "role": "assistant", + "content": [ + {"type": "thinking", "thinking": "pensando"}, + {"type": "text", "text": "ejecuto tres tools"}, + {"type": "tool_use", "id": "c1", "name": "t1", "input": {}}, + {"type": "tool_use", "id": "c2", "name": "t2", "input": {}}, + {"type": "tool_use", "id": "c3", "name": "t3", "input": {}}, + ], + }, + { + "role": "user", + "content": [ + {"type": "tool_result", "tool_use_id": "c1", "content": "r1"}, + {"type": "tool_result", "tool_use_id": "c3", "content": "r3"}, + ], + }, + ] + repaired = self.compactor._enforce_tool_pairing(messages) + assert_tool_pairing_ok(repaired) + assert collect_tool_use_ids(repaired[0]) == {"c1", "c3"} + types = [b.get("type") for b in repaired[0]["content"]] + assert "thinking" in types and "text" in types + + def test_assistant_tool_use_with_no_results_at_all(self): + """Assistant con tool_use y SIN user de resultados detras → se + eliminan los tool_use; si el content queda vacio, placeholder.""" + messages = [ + { + "role": "assistant", + "content": [ + {"type": "tool_use", "id": "c9", "name": "t", "input": {}}, + ], + }, + {"role": "user", "content": "otra cosa"}, + ] + repaired = self.compactor._enforce_tool_pairing(messages) + assert_tool_pairing_ok(repaired) + assert repaired[0]["content"] == "[ASSISTANT COMPACTADO]" + + def test_legacy_orphan_role_tool_converted_to_user(self): + """role:tool legacy cuyo assistant anterior no tiene tool_calls → + se convierte a user placeholder.""" + messages = [ + {"role": "assistant", "content": "[ASSISTANT COMPACTADO]"}, + {"role": "tool", "tool_call_id": "call_x", "content": "salida tool"}, + ] + repaired = self.compactor._enforce_tool_pairing(messages) + assert_tool_pairing_ok(repaired) + assert repaired[1]["role"] == "user" + assert repaired[1]["content"] == "[Resultado de herramienta compactado]" + + def test_legacy_valid_role_tool_untouched(self): + messages = [ + { + "role": "assistant", + "content": "lanzo tool", + "tool_calls": [ + {"id": "call_x", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, + {"role": "tool", "tool_call_id": "call_x", "content": "salida"}, + ] + repaired = self.compactor._enforce_tool_pairing(messages) + assert_tool_pairing_ok(repaired) + assert repaired[1]["role"] == "tool" + + def test_well_paired_history_is_noop(self): + messages = make_turn(7, payload_chars=50) + repaired = self.compactor._enforce_tool_pairing(messages) + assert repaired == messages + + +# ===================================================================== +# (c) Trim de recent_messages (OrchestratorEngine._trim_recent_messages) +# ===================================================================== + + +orchestrator_engine = pytest.importorskip( + "src.orchestrator.engine", + reason="deps del orquestador (mcp, sse, redis) no disponibles", +) +OrchestratorEngine = orchestrator_engine.OrchestratorEngine + + +class TestTrimRecentMessages: + def _set_budget(self, monkeypatch, tokens: int): + from src.config import settings + monkeypatch.setattr(settings, "recent_messages_max_tokens", tokens) + + def test_under_budget_untouched(self, monkeypatch): + self._set_budget(monkeypatch, 100_000) + messages = make_turn(0, payload_chars=100) + assert OrchestratorEngine._trim_recent_messages(list(messages)) == messages + + def test_trims_oldest_whole_pairs(self, monkeypatch): + self._set_budget(monkeypatch, 500) + messages = [] + for n in range(10): + messages.extend(make_turn(n, payload_chars=1000)) + trimmed = OrchestratorEngine._trim_recent_messages(messages) + + assert len(trimmed) < len(messages) + # Nunca se corta dentro de un par + assert_tool_pairing_ok(trimmed) + # El primer mensaje nunca es un carrier de tool_result ni role tool + first = trimmed[0] + assert first.get("role") != "tool" + if isinstance(first.get("content"), list): + assert not any( + isinstance(b, dict) and b.get("type") == "tool_result" + for b in first["content"] + ) + # Se eliminan los mas antiguos: el final se conserva + assert trimmed[-1] == messages[-1] + + def test_keeps_last_four_even_over_budget(self, monkeypatch): + self._set_budget(monkeypatch, 10) # presupuesto imposible + messages = [] + for n in range(5): + messages.extend(make_turn(n, payload_chars=2000)) + trimmed = OrchestratorEngine._trim_recent_messages(messages) + assert len(trimmed) >= 4 + + def test_pair_dragging_includes_legacy_tool_run(self, monkeypatch): + """Un assistant legacy con tool_calls arrastra su run de role:tool.""" + self._set_budget(monkeypatch, 300) + big = "y" * 3000 + messages = [ + { + "role": "assistant", + "content": big, + "tool_calls": [ + {"id": "c1", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + {"id": "c2", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, + {"role": "tool", "tool_call_id": "c1", "content": big}, + {"role": "tool", "tool_call_id": "c2", "content": big}, + {"role": "user", "content": "pregunta"}, + {"role": "assistant", "content": "respuesta"}, + {"role": "user", "content": "otra pregunta"}, + {"role": "assistant", "content": "otra respuesta"}, + ] + trimmed = OrchestratorEngine._trim_recent_messages(messages) + # El par legacy entero (assistant + 2 tools) se elimino junto + assert trimmed[0] == {"role": "user", "content": "pregunta"} + assert_tool_pairing_ok(trimmed) + + def test_append_recent_messages_applies_trim(self, monkeypatch): + self._set_budget(monkeypatch, 500) + existing = [] + for n in range(10): + existing.extend(make_turn(n, payload_chars=1000)) + merged = OrchestratorEngine._append_recent_messages( + existing, message="nueva peticion", conversation=[ + {"role": "assistant", "content": "ok hecho"}, + ], + ) + assert len(merged) < len(existing) + 2 + assert merged[-1] == {"role": "assistant", "content": "ok hecho"} + assert_tool_pairing_ok(merged) + + +# ===================================================================== +# (d) Guard defensivo del adapter (_repair_tool_sequence) +# ===================================================================== + + +openai_mod = pytest.importorskip("openai", reason="SDK openai no instalado") + + +class TestRepairToolSequence: + @property + def repair(self): + from src.adapters.openai_adapter import OpenAIAdapter + return OpenAIAdapter._repair_tool_sequence + + def test_valid_sequence_untouched(self): + msgs = [ + {"role": "system", "content": "sys"}, + {"role": "user", "content": "hola"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "c1", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, + {"role": "tool", "tool_call_id": "c1", "content": "resultado"}, + {"role": "assistant", "content": "listo"}, + ] + assert self.repair(list(msgs)) == msgs + + def test_orphan_tool_message_converted_to_user(self): + msgs = [ + {"role": "assistant", "content": "[ASSISTANT COMPACTADO]"}, + {"role": "tool", "tool_call_id": "c_orphan", "content": "datos " * 200}, + ] + out = self.repair(msgs) + assert out[1]["role"] == "user" + assert out[1]["content"].startswith( + "[Resultado de herramienta (contexto compactado)]: " + ) + # Content truncado a 500 chars (+ prefijo) + assert len(out[1]["content"]) <= 500 + len( + "[Resultado de herramienta (contexto compactado)]: " + ) + assert not any(m.get("role") == "tool" for m in out) + + def test_unanswered_tool_calls_removed(self): + msgs = [ + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "c1", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + {"id": "c2", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, + {"role": "tool", "tool_call_id": "c1", "content": "r1"}, + {"role": "user", "content": "sigue"}, + ] + out = self.repair(msgs) + assert [tc["id"] for tc in out[0]["tool_calls"]] == ["c1"] + assert out[1] == {"role": "tool", "tool_call_id": "c1", "content": "r1"} + + def test_all_tool_calls_unanswered_drops_key_and_sets_content(self): + msgs = [ + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "c1", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, + {"role": "user", "content": "sigue"}, + ] + out = self.repair(msgs) + assert "tool_calls" not in out[0] + assert out[0]["content"] # nunca None sin tool_calls + + def test_reasoning_promoted_when_tool_calls_dropped(self): + """No romper la promocion de reasoning a content del fix anterior.""" + msgs = [ + { + "role": "assistant", + "content": None, + "reasoning_content": "razonamiento del modelo", + "tool_calls": [ + {"id": "c1", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, + {"role": "user", "content": "sigue"}, + ] + out = self.repair(msgs) + assert "tool_calls" not in out[0] + assert out[0]["content"] == "razonamiento del modelo" + assert "reasoning_content" not in out[0] + + def test_mixed_orphan_in_tool_block(self): + """Un huerfano en medio de un bloque de tools validos se convierte a + user DESPUES del bloque (no rompe la contiguidad assistant→tools).""" + msgs = [ + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "c1", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + {"id": "c2", "type": "function", + "function": {"name": "t", "arguments": "{}"}}, + ], + }, + {"role": "tool", "tool_call_id": "c1", "content": "r1"}, + {"role": "tool", "tool_call_id": "huerfano", "content": "rx"}, + {"role": "tool", "tool_call_id": "c2", "content": "r2"}, + {"role": "user", "content": "sigue"}, + ] + out = self.repair(msgs) + roles = [m["role"] for m in out] + assert roles == ["assistant", "tool", "tool", "user", "user"] + assert out[1]["tool_call_id"] == "c1" + assert out[2]["tool_call_id"] == "c2" + assert out[3]["content"].startswith("[Resultado de herramienta") + + +class TestAdapterEndToEnd: + """_to_openai_messages + guard sobre un historial roto realista.""" + + def test_collapsed_assistant_history_produces_valid_openai_sequence(self): + from src.adapters.openai_adapter import OpenAIAdapter + adapter = OpenAIAdapter.__new__(OpenAIAdapter) # sin cliente real + + internal = [ + {"role": "system", "content": "eres un agente"}, + {"role": "user", "content": "haz algo"}, + # Assistant colapsado por el compactor (perdio sus tool_use) + {"role": "assistant", "content": "[ASSISTANT COMPACTADO]"}, + # …pero el user conserva sus tool_results (el bug de produccion) + { + "role": "user", + "content": [ + {"type": "tool_result", "tool_use_id": "call_1", "content": "datos"}, + ], + }, + {"role": "assistant", "content": "termine"}, + {"role": "user", "content": "siguiente peticion"}, + ] + out = adapter._to_openai_messages(internal) + # Contrato OpenAI: ningun role:tool sin tool_calls previo + for i, m in enumerate(out): + if m.get("role") == "tool": + assert i > 0 + prev = out[i - 1] + prev_ids = set() + k = i - 1 + while k >= 0 and out[k].get("role") == "tool": + k -= 1 + if k >= 0 and out[k].get("role") == "assistant": + prev_ids = { + tc.get("id") for tc in out[k].get("tool_calls") or [] + } + assert m.get("tool_call_id") in prev_ids, ( + f"role tool huerfano en out[{i}]" + ) + # El tool_result huerfano acabo como user, no como role tool + assert not any(m.get("role") == "tool" for m in out)