Files
agenticSystem/tests/test_tool_pairing_real.py
Jordan Diaz 79ec267aa6 Compactor: garantizar emparejamiento tool_use/tool_result (sesiones largas bloqueadas)
Las sesiones largas con DeepSeek quedaban bloqueadas permanentemente con
400 "Messages with role 'tool' must be a response to a preceding message
with 'tool_calls'": el paso de ultimo recurso del compactor colapsaba
assistants con tool_use a un string placeholder dejando huerfanos los
tool_result del user siguiente.

- compactor: paso de ultimo recurso pair-aware + _enforce_tool_pairing
  como invariante final (matching por IDs, ambas direcciones, repara
  tambien historiales ya corruptos persistidos).
- openai_adapter: _repair_tool_sequence como guard defensivo del contrato
  del proveedor (tool huerfano -> user; tool_call sin respuesta -> fuera),
  con warning para detectar regresiones.
- recent_messages: trim por presupuesto de tokens al persistir
  (AGENTIC_RECENT_MESSAGES_MAX_TOKENS, default 60k) sin cortar pares;
  cierra el crecimiento sin limite que empujaba al paso destructivo.
- tests/test_tool_pairing_real.py: 23 tests que importan el codigo REAL
  (a diferencia de los tests standalone existentes). Suite completa: 92 ok.

Verificado offline contra los recent_messages reales de la sesion
bloqueada en prod: 0 violaciones con presupuesto normal y agresivo.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 19:08:53 +00:00

586 lines
24 KiB
Python

"""Tests de REGRESION REAL del invariante tool_use ↔ tool_result.
A diferencia del resto de tests (que replican logica), este archivo importa el
codigo REAL de src/. Cubre el bug de produccion: sesiones largas (~130k tokens)
donde `compact_conversation` colapsaba assistants a "[ASSISTANT COMPACTADO]"
perdiendo los bloques `tool_use`, dejando tool_results huerfanos que el adapter
emitia como `role: tool` sin `tool_calls` → 400 del proveedor en cada reintento.
Requiere las dependencias de src/ (pydantic, Python 3.11+). Si no estan
disponibles (p.ej. host con Python 3.10), el modulo entero se salta — ejecutar
dentro del container: `docker exec acai-agentic python3 -m pytest ...`.
"""
import pytest
try:
from src.context.compactor import ContextCompactor
except Exception as e: # pragma: no cover - entorno sin deps de src/
pytest.skip(f"src/ no importable en este entorno: {e}", allow_module_level=True)
# =====================================================================
# Helper de validacion reutilizable
# =====================================================================
def collect_tool_use_ids(message: dict) -> set:
"""IDs de tool calls de un assistant (Anthropic blocks + OpenAI legacy)."""
ids = set()
content = message.get("content")
if isinstance(content, list):
for b in content:
if isinstance(b, dict) and b.get("type") == "tool_use":
ids.add(str(b.get("id", "")))
for tc in message.get("tool_calls") or []:
if isinstance(tc, dict):
ids.add(str(tc.get("id", "")))
ids.discard("")
return ids
def assert_tool_pairing_ok(messages: list) -> None:
"""Valida el invariante completo sobre una lista de mensajes internos:
- Todo tool_result (block) referencia un tool_use del assistant anterior.
- Todo tool_use (block) tiene su tool_result en el mensaje siguiente.
- Todo `role: tool` legacy responde a un tool_call del assistant previo.
"""
for i, msg in enumerate(messages):
role = msg.get("role")
content = msg.get("content")
if role == "user" and isinstance(content, list):
result_ids = {
str(b.get("tool_use_id", ""))
for b in content
if isinstance(b, dict) and b.get("type") == "tool_result"
}
if result_ids:
assert i > 0, f"msg[{i}]: tool_result al inicio de la conversacion"
prev = messages[i - 1]
assert prev.get("role") == "assistant", (
f"msg[{i}]: tool_result sin assistant inmediatamente anterior"
)
available = collect_tool_use_ids(prev)
orphans = result_ids - available
assert not orphans, (
f"msg[{i}]: tool_result huerfanos {orphans} "
f"(assistant previo solo tiene {available})"
)
if role == "assistant":
tool_ids = collect_tool_use_ids(msg)
if tool_ids:
answered = set()
j = i + 1
if (
j < len(messages)
and messages[j].get("role") == "user"
and isinstance(messages[j].get("content"), list)
):
for b in messages[j]["content"]:
if isinstance(b, dict) and b.get("type") == "tool_result":
answered.add(str(b.get("tool_use_id", "")))
j += 1
while j < len(messages) and messages[j].get("role") == "tool":
answered.add(str(messages[j].get("tool_call_id", "")))
j += 1
unanswered = tool_ids - answered
assert not unanswered, (
f"msg[{i}]: tool_use sin respuesta {unanswered}"
)
if role == "tool":
prev_assistant = None
for k in range(i - 1, -1, -1):
if messages[k].get("role") == "tool":
continue
if messages[k].get("role") == "assistant":
prev_assistant = messages[k]
break
assert prev_assistant is not None, (
f"msg[{i}]: role tool sin assistant previo"
)
call_id = str(msg.get("tool_call_id", ""))
assert call_id in collect_tool_use_ids(prev_assistant), (
f"msg[{i}]: role tool con tool_call_id={call_id} no presente "
f"en el assistant previo"
)
def make_turn(n: int, payload_chars: int = 4000) -> list:
"""Genera un turno completo: user → assistant(thinking+text+tool_use) →
user(tool_result). Payloads grandes para forzar la compactacion."""
tid = f"call_{n}"
return [
{"role": "user", "content": f"Peticion {n}: " + ("x" * payload_chars)},
{
"role": "assistant",
"content": [
{"type": "thinking", "thinking": "razonando " * (payload_chars // 10)},
{"type": "text", "text": f"Voy a ejecutar la tool del turno {n}."},
{
"type": "tool_use",
"id": tid,
"name": "acai_get_records",
"input": {"tableName": f"tabla_{n}"},
},
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tid,
"content": "resultado " * (payload_chars // 10),
}
],
},
]
# =====================================================================
# (a) compact_conversation end-to-end: el paso de ultimo recurso ya no
# deja tool_results huerfanos ni tool_use sin respuesta
# =====================================================================
class TestCompactConversationPairing:
def test_last_resort_does_not_orphan_tool_results(self):
compactor = ContextCompactor()
messages = []
for n in range(12):
messages.extend(make_turn(n, payload_chars=6000))
messages.append({"role": "user", "content": "ultima peticion del usuario"})
# Presupuesto minusculo: fuerza TODOS los pasos incluida la colapsa
# de listas a placeholder string (el paso que causaba el bug).
compacted, meta = compactor.compact_conversation(messages, max_tokens=300)
assert meta["output_tokens"] < meta["input_tokens"]
assert_tool_pairing_ok(compacted)
def test_moderate_budget_keeps_pairing(self):
compactor = ContextCompactor()
messages = []
for n in range(8):
messages.extend(make_turn(n, payload_chars=3000))
messages.append({"role": "user", "content": "peticion final"})
compacted, _ = compactor.compact_conversation(messages, max_tokens=2000)
assert_tool_pairing_ok(compacted)
def test_under_budget_passthrough_keeps_pairing(self):
compactor = ContextCompactor()
messages = make_turn(1, payload_chars=50)
compacted, meta = compactor.compact_conversation(messages, max_tokens=100_000)
assert meta["messages_compacted"] == 0
assert_tool_pairing_ok(compacted)
# Los tool_use/tool_result originales se conservan intactos
assert collect_tool_use_ids(compacted[1]) == {"call_1"}
def test_last_user_message_preserved(self):
compactor = ContextCompactor()
messages = []
for n in range(10):
messages.extend(make_turn(n, payload_chars=5000))
final = "esta es la peticion actual que NO debe perderse"
messages.append({"role": "user", "content": final})
compacted, _ = compactor.compact_conversation(messages, max_tokens=300)
assert compacted[-1]["content"] == final
# =====================================================================
# (b) _enforce_tool_pairing directo
# =====================================================================
class TestEnforceToolPairing:
def setup_method(self):
self.compactor = ContextCompactor()
def test_collapsed_assistant_with_orphan_tool_results(self):
"""Assistant colapsado a string + user con tool_results → los
tool_result se convierten en placeholder."""
messages = [
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "call_a", "content": "datos"},
{"type": "tool_result", "tool_use_id": "call_b", "content": "mas datos"},
],
},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
# Solo placeholders → content string (fusionados en uno)
assert repaired[1]["role"] == "user"
assert repaired[1]["content"] == "[Resultado de herramienta compactado]"
def test_orphan_tool_results_mixed_with_text(self):
"""tool_result huerfano junto a un bloque text → placeholder en lista,
el text se conserva."""
messages = [
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "call_a", "content": "datos"},
{"type": "text", "text": "y ademas haz esto"},
],
},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
content = repaired[1]["content"]
assert isinstance(content, list)
types = [b.get("type") for b in content]
assert types == ["text", "text"]
assert content[0]["text"] == "[Resultado de herramienta compactado]"
assert content[1]["text"] == "y ademas haz esto"
def test_partial_id_mismatch_drops_unanswered_tool_use(self):
"""Assistant con 3 tool_use, user con solo 2 tool_result → se elimina
el tool_use sin respuesta, thinking/text intactos."""
messages = [
{
"role": "assistant",
"content": [
{"type": "thinking", "thinking": "pensando"},
{"type": "text", "text": "ejecuto tres tools"},
{"type": "tool_use", "id": "c1", "name": "t1", "input": {}},
{"type": "tool_use", "id": "c2", "name": "t2", "input": {}},
{"type": "tool_use", "id": "c3", "name": "t3", "input": {}},
],
},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "c1", "content": "r1"},
{"type": "tool_result", "tool_use_id": "c3", "content": "r3"},
],
},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
assert collect_tool_use_ids(repaired[0]) == {"c1", "c3"}
types = [b.get("type") for b in repaired[0]["content"]]
assert "thinking" in types and "text" in types
def test_assistant_tool_use_with_no_results_at_all(self):
"""Assistant con tool_use y SIN user de resultados detras → se
eliminan los tool_use; si el content queda vacio, placeholder."""
messages = [
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "c9", "name": "t", "input": {}},
],
},
{"role": "user", "content": "otra cosa"},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
assert repaired[0]["content"] == "[ASSISTANT COMPACTADO]"
def test_legacy_orphan_role_tool_converted_to_user(self):
"""role:tool legacy cuyo assistant anterior no tiene tool_calls →
se convierte a user placeholder."""
messages = [
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
{"role": "tool", "tool_call_id": "call_x", "content": "salida tool"},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
assert repaired[1]["role"] == "user"
assert repaired[1]["content"] == "[Resultado de herramienta compactado]"
def test_legacy_valid_role_tool_untouched(self):
messages = [
{
"role": "assistant",
"content": "lanzo tool",
"tool_calls": [
{"id": "call_x", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "call_x", "content": "salida"},
]
repaired = self.compactor._enforce_tool_pairing(messages)
assert_tool_pairing_ok(repaired)
assert repaired[1]["role"] == "tool"
def test_well_paired_history_is_noop(self):
messages = make_turn(7, payload_chars=50)
repaired = self.compactor._enforce_tool_pairing(messages)
assert repaired == messages
# =====================================================================
# (c) Trim de recent_messages (OrchestratorEngine._trim_recent_messages)
# =====================================================================
orchestrator_engine = pytest.importorskip(
"src.orchestrator.engine",
reason="deps del orquestador (mcp, sse, redis) no disponibles",
)
OrchestratorEngine = orchestrator_engine.OrchestratorEngine
class TestTrimRecentMessages:
def _set_budget(self, monkeypatch, tokens: int):
from src.config import settings
monkeypatch.setattr(settings, "recent_messages_max_tokens", tokens)
def test_under_budget_untouched(self, monkeypatch):
self._set_budget(monkeypatch, 100_000)
messages = make_turn(0, payload_chars=100)
assert OrchestratorEngine._trim_recent_messages(list(messages)) == messages
def test_trims_oldest_whole_pairs(self, monkeypatch):
self._set_budget(monkeypatch, 500)
messages = []
for n in range(10):
messages.extend(make_turn(n, payload_chars=1000))
trimmed = OrchestratorEngine._trim_recent_messages(messages)
assert len(trimmed) < len(messages)
# Nunca se corta dentro de un par
assert_tool_pairing_ok(trimmed)
# El primer mensaje nunca es un carrier de tool_result ni role tool
first = trimmed[0]
assert first.get("role") != "tool"
if isinstance(first.get("content"), list):
assert not any(
isinstance(b, dict) and b.get("type") == "tool_result"
for b in first["content"]
)
# Se eliminan los mas antiguos: el final se conserva
assert trimmed[-1] == messages[-1]
def test_keeps_last_four_even_over_budget(self, monkeypatch):
self._set_budget(monkeypatch, 10) # presupuesto imposible
messages = []
for n in range(5):
messages.extend(make_turn(n, payload_chars=2000))
trimmed = OrchestratorEngine._trim_recent_messages(messages)
assert len(trimmed) >= 4
def test_pair_dragging_includes_legacy_tool_run(self, monkeypatch):
"""Un assistant legacy con tool_calls arrastra su run de role:tool."""
self._set_budget(monkeypatch, 300)
big = "y" * 3000
messages = [
{
"role": "assistant",
"content": big,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
{"id": "c2", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "c1", "content": big},
{"role": "tool", "tool_call_id": "c2", "content": big},
{"role": "user", "content": "pregunta"},
{"role": "assistant", "content": "respuesta"},
{"role": "user", "content": "otra pregunta"},
{"role": "assistant", "content": "otra respuesta"},
]
trimmed = OrchestratorEngine._trim_recent_messages(messages)
# El par legacy entero (assistant + 2 tools) se elimino junto
assert trimmed[0] == {"role": "user", "content": "pregunta"}
assert_tool_pairing_ok(trimmed)
def test_append_recent_messages_applies_trim(self, monkeypatch):
self._set_budget(monkeypatch, 500)
existing = []
for n in range(10):
existing.extend(make_turn(n, payload_chars=1000))
merged = OrchestratorEngine._append_recent_messages(
existing, message="nueva peticion", conversation=[
{"role": "assistant", "content": "ok hecho"},
],
)
assert len(merged) < len(existing) + 2
assert merged[-1] == {"role": "assistant", "content": "ok hecho"}
assert_tool_pairing_ok(merged)
# =====================================================================
# (d) Guard defensivo del adapter (_repair_tool_sequence)
# =====================================================================
openai_mod = pytest.importorskip("openai", reason="SDK openai no instalado")
class TestRepairToolSequence:
@property
def repair(self):
from src.adapters.openai_adapter import OpenAIAdapter
return OpenAIAdapter._repair_tool_sequence
def test_valid_sequence_untouched(self):
msgs = [
{"role": "system", "content": "sys"},
{"role": "user", "content": "hola"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "c1", "content": "resultado"},
{"role": "assistant", "content": "listo"},
]
assert self.repair(list(msgs)) == msgs
def test_orphan_tool_message_converted_to_user(self):
msgs = [
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
{"role": "tool", "tool_call_id": "c_orphan", "content": "datos " * 200},
]
out = self.repair(msgs)
assert out[1]["role"] == "user"
assert out[1]["content"].startswith(
"[Resultado de herramienta (contexto compactado)]: "
)
# Content truncado a 500 chars (+ prefijo)
assert len(out[1]["content"]) <= 500 + len(
"[Resultado de herramienta (contexto compactado)]: "
)
assert not any(m.get("role") == "tool" for m in out)
def test_unanswered_tool_calls_removed(self):
msgs = [
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
{"id": "c2", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
{"role": "user", "content": "sigue"},
]
out = self.repair(msgs)
assert [tc["id"] for tc in out[0]["tool_calls"]] == ["c1"]
assert out[1] == {"role": "tool", "tool_call_id": "c1", "content": "r1"}
def test_all_tool_calls_unanswered_drops_key_and_sets_content(self):
msgs = [
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "user", "content": "sigue"},
]
out = self.repair(msgs)
assert "tool_calls" not in out[0]
assert out[0]["content"] # nunca None sin tool_calls
def test_reasoning_promoted_when_tool_calls_dropped(self):
"""No romper la promocion de reasoning a content del fix anterior."""
msgs = [
{
"role": "assistant",
"content": None,
"reasoning_content": "razonamiento del modelo",
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "user", "content": "sigue"},
]
out = self.repair(msgs)
assert "tool_calls" not in out[0]
assert out[0]["content"] == "razonamiento del modelo"
assert "reasoning_content" not in out[0]
def test_mixed_orphan_in_tool_block(self):
"""Un huerfano en medio de un bloque de tools validos se convierte a
user DESPUES del bloque (no rompe la contiguidad assistant→tools)."""
msgs = [
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
{"id": "c2", "type": "function",
"function": {"name": "t", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
{"role": "tool", "tool_call_id": "huerfano", "content": "rx"},
{"role": "tool", "tool_call_id": "c2", "content": "r2"},
{"role": "user", "content": "sigue"},
]
out = self.repair(msgs)
roles = [m["role"] for m in out]
assert roles == ["assistant", "tool", "tool", "user", "user"]
assert out[1]["tool_call_id"] == "c1"
assert out[2]["tool_call_id"] == "c2"
assert out[3]["content"].startswith("[Resultado de herramienta")
class TestAdapterEndToEnd:
"""_to_openai_messages + guard sobre un historial roto realista."""
def test_collapsed_assistant_history_produces_valid_openai_sequence(self):
from src.adapters.openai_adapter import OpenAIAdapter
adapter = OpenAIAdapter.__new__(OpenAIAdapter) # sin cliente real
internal = [
{"role": "system", "content": "eres un agente"},
{"role": "user", "content": "haz algo"},
# Assistant colapsado por el compactor (perdio sus tool_use)
{"role": "assistant", "content": "[ASSISTANT COMPACTADO]"},
# …pero el user conserva sus tool_results (el bug de produccion)
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "call_1", "content": "datos"},
],
},
{"role": "assistant", "content": "termine"},
{"role": "user", "content": "siguiente peticion"},
]
out = adapter._to_openai_messages(internal)
# Contrato OpenAI: ningun role:tool sin tool_calls previo
for i, m in enumerate(out):
if m.get("role") == "tool":
assert i > 0
prev = out[i - 1]
prev_ids = set()
k = i - 1
while k >= 0 and out[k].get("role") == "tool":
k -= 1
if k >= 0 and out[k].get("role") == "assistant":
prev_ids = {
tc.get("id") for tc in out[k].get("tool_calls") or []
}
assert m.get("tool_call_id") in prev_ids, (
f"role tool huerfano en out[{i}]"
)
# El tool_result huerfano acabo como user, no como role tool
assert not any(m.get("role") == "tool" for m in out)