Compactor: garantizar emparejamiento tool_use/tool_result (sesiones largas bloqueadas)
Las sesiones largas con DeepSeek quedaban bloqueadas permanentemente con 400 "Messages with role 'tool' must be a response to a preceding message with 'tool_calls'": el paso de ultimo recurso del compactor colapsaba assistants con tool_use a un string placeholder dejando huerfanos los tool_result del user siguiente. - compactor: paso de ultimo recurso pair-aware + _enforce_tool_pairing como invariante final (matching por IDs, ambas direcciones, repara tambien historiales ya corruptos persistidos). - openai_adapter: _repair_tool_sequence como guard defensivo del contrato del proveedor (tool huerfano -> user; tool_call sin respuesta -> fuera), con warning para detectar regresiones. - recent_messages: trim por presupuesto de tokens al persistir (AGENTIC_RECENT_MESSAGES_MAX_TOKENS, default 60k) sin cortar pares; cierra el crecimiento sin limite que empujaba al paso destructivo. - tests/test_tool_pairing_real.py: 23 tests que importan el codigo REAL (a diferencia de los tests standalone existentes). Suite completa: 92 ok. Verificado offline contra los recent_messages reales de la sesion bloqueada en prod: 0 violaciones con presupuesto normal y agresivo. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -444,4 +444,106 @@ class OpenAIAdapter(ModelAdapter):
|
||||
text_parts.append(b.get("text", ""))
|
||||
if text_parts:
|
||||
out.append({"role": "user", "content": "\n".join(text_parts)})
|
||||
return out
|
||||
# Guard defensivo: el compactor ya garantiza el invariante tool_use ↔
|
||||
# tool_result (`_enforce_tool_pairing`), pero si algo se escapa el
|
||||
# proveedor devuelve 400 y la sesion queda bloqueada. Cinturon y tirantes.
|
||||
return self._repair_tool_sequence(out)
|
||||
|
||||
@staticmethod
|
||||
def _repair_tool_sequence(out: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
"""Garantiza el contrato OpenAI sobre la secuencia ya convertida:
|
||||
|
||||
- Todo `role: tool` debe responder a un tool_call_id del assistant
|
||||
inmediatamente anterior (o de su bloque contiguo de tool messages).
|
||||
Si no → se convierte a user con placeholder.
|
||||
- Todo assistant con `tool_calls` debe tener respuesta para CADA id.
|
||||
Los tool_calls sin respuesta se eliminan; si la lista queda vacia se
|
||||
elimina la key (y se asegura `content` no-None — "content or
|
||||
tool_calls must be set").
|
||||
|
||||
No deberia activarse nunca (el compactor repara antes); si se activa,
|
||||
loguea warning para detectar regresiones del compactor.
|
||||
"""
|
||||
repaired: list[dict[str, Any]] = []
|
||||
i = 0
|
||||
n = len(out)
|
||||
while i < n:
|
||||
msg = out[i]
|
||||
role = msg.get("role")
|
||||
|
||||
if role == "assistant" and msg.get("tool_calls"):
|
||||
# Bloque contiguo de tool messages que responden a este assistant.
|
||||
j = i + 1
|
||||
block: list[dict[str, Any]] = []
|
||||
while j < n and out[j].get("role") == "tool":
|
||||
block.append(out[j])
|
||||
j += 1
|
||||
answered = {t.get("tool_call_id", "") for t in block}
|
||||
kept_calls = [
|
||||
tc for tc in msg["tool_calls"] if tc.get("id", "") in answered
|
||||
]
|
||||
dropped = [
|
||||
tc for tc in msg["tool_calls"] if tc.get("id", "") not in answered
|
||||
]
|
||||
new_msg = dict(msg)
|
||||
if dropped:
|
||||
for tc in dropped:
|
||||
logger.warning(
|
||||
"repaired unanswered tool_call at index %d (tool_call_id=%s)",
|
||||
i,
|
||||
tc.get("id", ""),
|
||||
)
|
||||
if kept_calls:
|
||||
new_msg["tool_calls"] = kept_calls
|
||||
else:
|
||||
new_msg.pop("tool_calls", None)
|
||||
if new_msg.get("content") is None:
|
||||
# Promover reasoning a content si existe (mismo
|
||||
# criterio que el quirk DeepSeek de arriba); si no,
|
||||
# placeholder para no enviar content=None sin tools.
|
||||
rc = new_msg.pop("reasoning_content", None)
|
||||
new_msg["content"] = rc or "[ASSISTANT COMPACTADO]"
|
||||
repaired.append(new_msg)
|
||||
valid_ids = {tc.get("id", "") for tc in kept_calls}
|
||||
converted: list[dict[str, Any]] = []
|
||||
for t in block:
|
||||
if t.get("tool_call_id", "") in valid_ids:
|
||||
repaired.append(t)
|
||||
else:
|
||||
logger.warning(
|
||||
"repaired orphan tool message (tool_call_id=%s)",
|
||||
t.get("tool_call_id", ""),
|
||||
)
|
||||
converted.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": "[Resultado de herramienta (contexto compactado)]: "
|
||||
+ str(t.get("content", ""))[:500],
|
||||
}
|
||||
)
|
||||
# Los huerfanos convertidos van DESPUES del bloque de tools
|
||||
# validos para no romper la contiguidad assistant → tools.
|
||||
repaired.extend(converted)
|
||||
i = j
|
||||
continue
|
||||
|
||||
if role == "tool":
|
||||
# Tool message sin assistant con tool_calls delante → huerfano.
|
||||
logger.warning(
|
||||
"repaired orphan tool message at index %d (tool_call_id=%s)",
|
||||
i,
|
||||
msg.get("tool_call_id", ""),
|
||||
)
|
||||
repaired.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": "[Resultado de herramienta (contexto compactado)]: "
|
||||
+ str(msg.get("content", ""))[:500],
|
||||
}
|
||||
)
|
||||
i += 1
|
||||
continue
|
||||
|
||||
repaired.append(msg)
|
||||
i += 1
|
||||
return repaired
|
||||
|
||||
@@ -102,6 +102,10 @@ class Settings(BaseSettings):
|
||||
conversation_recent_raw_limit: int = 2
|
||||
task_history_max_entries: int = 20
|
||||
task_history_max_tokens: int = 1500
|
||||
# Presupuesto de tokens para la ventana de recent_messages persistida en
|
||||
# sesion. Sin esto crece sin limite y empuja al compactor a su paso
|
||||
# destructivo (colapsar bloques perdiendo tool_use ids). 0 = sin limite.
|
||||
recent_messages_max_tokens: int = 60_000
|
||||
|
||||
# --- MCP ---
|
||||
mcp_config_path: str = "" # Path to mcp.json; empty = legacy single-server mode
|
||||
|
||||
@@ -180,7 +180,13 @@ class ContextCompactor:
|
||||
"raw_tool_results_kept": 0,
|
||||
}
|
||||
if total <= max_tokens:
|
||||
return messages, meta
|
||||
# Aunque no haga falta compactar, garantizamos el invariante
|
||||
# tool_use/tool_result (repara historiales ya rotos persistidos).
|
||||
repaired = self._enforce_tool_pairing([dict(m) for m in messages])
|
||||
meta["output_tokens"] = sum(
|
||||
self._estimate_message_tokens(m) for m in repaired
|
||||
)
|
||||
return repaired, meta
|
||||
|
||||
compacted = [dict(m) for m in messages]
|
||||
last_user_idx = max(
|
||||
@@ -343,20 +349,241 @@ class ContextCompactor:
|
||||
message["content"] = "[USER CONTEXT COMPACTADO]"
|
||||
elif isinstance(content, list) and content:
|
||||
# Anthropic-style: reemplazar lista entera por placeholder string.
|
||||
# Nota: pierde tool_use ids — solo aplicar al final como ultimo recurso.
|
||||
# Nota: colapsar pierde los tool_use/tool_result ids, asi que
|
||||
# lo hacemos PAIR-AWARE (colapsar un lado del par colapsa el
|
||||
# otro en la misma iteracion) y ademas `_enforce_tool_pairing`
|
||||
# al final garantiza el invariante aunque algo se escape.
|
||||
if role == "assistant":
|
||||
message["content"] = "[ASSISTANT COMPACTADO]"
|
||||
# Si este assistant tenia tool_use, colapsar tambien el
|
||||
# user de tool_results que lo sigue (mismo par).
|
||||
if self._blocks_have_type(content, "tool_use"):
|
||||
nxt = idx + 1
|
||||
if (
|
||||
nxt < len(compacted)
|
||||
and nxt != last_user_idx
|
||||
and compacted[nxt].get("role") == "user"
|
||||
and self._blocks_have_type(
|
||||
compacted[nxt].get("content"), "tool_result"
|
||||
)
|
||||
):
|
||||
compacted[nxt]["content"] = "[USER CONTEXT COMPACTADO]"
|
||||
elif role == "user":
|
||||
message["content"] = "[USER CONTEXT COMPACTADO]"
|
||||
# Si este user llevaba tool_results, colapsar tambien el
|
||||
# assistant anterior con sus tool_use (mismo par).
|
||||
if self._blocks_have_type(content, "tool_result"):
|
||||
prv = idx - 1
|
||||
if (
|
||||
prv >= 0
|
||||
and compacted[prv].get("role") == "assistant"
|
||||
and self._blocks_have_type(
|
||||
compacted[prv].get("content"), "tool_use"
|
||||
)
|
||||
):
|
||||
compacted[prv]["content"] = "[ASSISTANT COMPACTADO]"
|
||||
else:
|
||||
continue
|
||||
total = sum(self._estimate_message_tokens(m) for m in compacted)
|
||||
if total <= max_tokens:
|
||||
break
|
||||
|
||||
# Invariante final: tras toda la compactacion, reparar cualquier par
|
||||
# tool_use/tool_result roto. Sin esto, un tool_result huerfano se emite
|
||||
# como `role: tool` sin `tool_calls` previo y el proveedor devuelve 400
|
||||
# ("Messages with role 'tool' must be a response to a preceding message
|
||||
# with 'tool_calls'").
|
||||
compacted = self._enforce_tool_pairing(compacted)
|
||||
total = sum(self._estimate_message_tokens(m) for m in compacted)
|
||||
|
||||
meta["output_tokens"] = total
|
||||
return compacted, meta
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Invariante tool_use ↔ tool_result
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _blocks_have_type(content: Any, block_type: str) -> bool:
|
||||
"""True si `content` es una lista de bloques con alguno del tipo dado."""
|
||||
if not isinstance(content, list):
|
||||
return False
|
||||
return any(
|
||||
isinstance(b, dict) and b.get("type") == block_type for b in content
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _tool_use_ids(message: dict[str, Any]) -> set[str]:
|
||||
"""IDs de tool calls emitidos por un assistant (bloques `tool_use`
|
||||
estilo Anthropic y/o `tool_calls` estilo OpenAI legacy)."""
|
||||
ids: set[str] = set()
|
||||
content = message.get("content")
|
||||
if isinstance(content, list):
|
||||
for b in content:
|
||||
if isinstance(b, dict) and b.get("type") == "tool_use":
|
||||
ids.add(str(b.get("id", "")))
|
||||
for tc in message.get("tool_calls") or []:
|
||||
if isinstance(tc, dict):
|
||||
ids.add(str(tc.get("id", "")))
|
||||
ids.discard("")
|
||||
return ids
|
||||
|
||||
def _enforce_tool_pairing(
|
||||
self, messages: list[dict[str, Any]]
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Repara el invariante tool_use ↔ tool_result en ambas direcciones.
|
||||
|
||||
La compactacion puede colapsar el content de un assistant (perdiendo sus
|
||||
bloques `tool_use`) mientras el user siguiente conserva sus `tool_result`,
|
||||
o al reves. El matching es por IDs (`tool_use.id` vs `tool_result.tool_use_id`
|
||||
y `tool_calls[].id` vs `tool_call_id`), no solo por adyacencia, asi que
|
||||
tambien repara desajustes parciales (p.ej. 3 tool_use vs 2 tool_result).
|
||||
|
||||
- tool_result sin tool_use previo → bloque text placeholder.
|
||||
- tool_use sin tool_result siguiente → se elimina el bloque (thinking/text
|
||||
se conservan; si el content queda vacio, placeholder string).
|
||||
- `role: tool` legacy sin assistant con `tool_calls` → user placeholder.
|
||||
"""
|
||||
repaired: list[dict[str, Any]] = []
|
||||
for idx, msg in enumerate(messages):
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content")
|
||||
|
||||
if role == "assistant":
|
||||
tool_ids = self._tool_use_ids(msg)
|
||||
if not tool_ids:
|
||||
repaired.append(msg)
|
||||
continue
|
||||
# IDs respondidos: user con tool_results inmediato y/o run
|
||||
# contiguo de mensajes legacy `role: tool`.
|
||||
answered: set[str] = set()
|
||||
j = idx + 1
|
||||
if (
|
||||
j < len(messages)
|
||||
and messages[j].get("role") == "user"
|
||||
and isinstance(messages[j].get("content"), list)
|
||||
):
|
||||
for b in messages[j]["content"]:
|
||||
if isinstance(b, dict) and b.get("type") == "tool_result":
|
||||
answered.add(str(b.get("tool_use_id", "")))
|
||||
j += 1
|
||||
while j < len(messages) and messages[j].get("role") == "tool":
|
||||
answered.add(str(messages[j].get("tool_call_id", "")))
|
||||
j += 1
|
||||
unanswered = tool_ids - answered
|
||||
if not unanswered:
|
||||
repaired.append(msg)
|
||||
continue
|
||||
# Eliminar los tool_use/tool_calls sin respuesta.
|
||||
new_msg = dict(msg)
|
||||
if isinstance(content, list):
|
||||
new_content = [
|
||||
b
|
||||
for b in content
|
||||
if not (
|
||||
isinstance(b, dict)
|
||||
and b.get("type") == "tool_use"
|
||||
and str(b.get("id", "")) in unanswered
|
||||
)
|
||||
]
|
||||
if not new_content:
|
||||
new_msg["content"] = "[ASSISTANT COMPACTADO]"
|
||||
else:
|
||||
new_msg["content"] = new_content
|
||||
if isinstance(new_msg.get("tool_calls"), list):
|
||||
kept_calls = [
|
||||
tc
|
||||
for tc in new_msg["tool_calls"]
|
||||
if isinstance(tc, dict)
|
||||
and str(tc.get("id", "")) not in unanswered
|
||||
]
|
||||
if kept_calls:
|
||||
new_msg["tool_calls"] = kept_calls
|
||||
else:
|
||||
new_msg.pop("tool_calls", None)
|
||||
if not new_msg.get("content"):
|
||||
new_msg["content"] = "[ASSISTANT COMPACTADO]"
|
||||
repaired.append(new_msg)
|
||||
continue
|
||||
|
||||
if role == "user" and self._blocks_have_type(content, "tool_result"):
|
||||
# IDs disponibles en el assistant inmediatamente anterior
|
||||
# (YA reparado — usar `repaired[-1]` refleja los tool_use que
|
||||
# sobrevivieron, no los del mensaje original).
|
||||
available: set[str] = set()
|
||||
if repaired and repaired[-1].get("role") == "assistant":
|
||||
available = self._tool_use_ids(repaired[-1])
|
||||
new_content: list[Any] = []
|
||||
orphaned = False
|
||||
for b in content:
|
||||
if (
|
||||
isinstance(b, dict)
|
||||
and b.get("type") == "tool_result"
|
||||
and str(b.get("tool_use_id", "")) not in available
|
||||
):
|
||||
orphaned = True
|
||||
# Fusionar placeholders consecutivos en un unico bloque text.
|
||||
if not (
|
||||
new_content
|
||||
and isinstance(new_content[-1], dict)
|
||||
and new_content[-1].get("type") == "text"
|
||||
and new_content[-1].get("text")
|
||||
== "[Resultado de herramienta compactado]"
|
||||
):
|
||||
new_content.append(
|
||||
{
|
||||
"type": "text",
|
||||
"text": "[Resultado de herramienta compactado]",
|
||||
}
|
||||
)
|
||||
continue
|
||||
new_content.append(b)
|
||||
if not orphaned:
|
||||
repaired.append(msg)
|
||||
continue
|
||||
new_msg = dict(msg)
|
||||
only_placeholders = all(
|
||||
isinstance(b, dict)
|
||||
and b.get("type") == "text"
|
||||
and b.get("text") == "[Resultado de herramienta compactado]"
|
||||
for b in new_content
|
||||
)
|
||||
if not new_content or only_placeholders:
|
||||
new_msg["content"] = "[Resultado de herramienta compactado]"
|
||||
else:
|
||||
new_msg["content"] = new_content
|
||||
repaired.append(new_msg)
|
||||
continue
|
||||
|
||||
if role == "tool":
|
||||
# Legacy: el assistant anterior (saltando otros `role: tool`
|
||||
# contiguos) debe tener este tool_call_id en sus tool_calls.
|
||||
prev_assistant: dict[str, Any] | None = None
|
||||
for prev in reversed(repaired):
|
||||
if prev.get("role") == "tool":
|
||||
continue
|
||||
if prev.get("role") == "assistant":
|
||||
prev_assistant = prev
|
||||
break
|
||||
call_id = str(msg.get("tool_call_id", ""))
|
||||
valid = (
|
||||
prev_assistant is not None
|
||||
and call_id in self._tool_use_ids(prev_assistant)
|
||||
)
|
||||
if valid:
|
||||
repaired.append(msg)
|
||||
else:
|
||||
repaired.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": "[Resultado de herramienta compactado]",
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
repaired.append(msg)
|
||||
return repaired
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internals
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@@ -14,7 +14,7 @@ from typing import Any
|
||||
from ..adapters.base import ModelAdapter
|
||||
from ..config import settings
|
||||
from ..context.engine import ContextEngine
|
||||
from ..context.compactor import estimate_tokens
|
||||
from ..context.compactor import ContextCompactor, estimate_tokens
|
||||
from ..mcp.manager import MCPManager
|
||||
from ..memory.store import MemoryStore
|
||||
from ..models.agent import AgentProfile
|
||||
@@ -260,7 +260,76 @@ class OrchestratorEngine:
|
||||
current_turn.append(sanitized)
|
||||
|
||||
merged.extend(current_turn)
|
||||
return merged
|
||||
return OrchestratorEngine._trim_recent_messages(merged)
|
||||
|
||||
@staticmethod
|
||||
def _trim_recent_messages(
|
||||
messages: list[dict[str, Any]],
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Recorta recent_messages a un presupuesto de tokens eliminando
|
||||
mensajes ENTEROS desde el principio (los mas antiguos).
|
||||
|
||||
Dos reglas para no romper el invariante tool_use ↔ tool_result:
|
||||
- Nunca cortar dentro de un par: si se elimina un assistant con
|
||||
tool_use, se eliminan tambien sus tool_results (user carrier o run
|
||||
de mensajes legacy `role: tool`).
|
||||
- El primer mensaje resultante nunca puede ser un carrier de
|
||||
tool_result ni un `role: tool`.
|
||||
|
||||
Mantiene siempre al menos los ultimos 4 mensajes aunque excedan el
|
||||
presupuesto.
|
||||
"""
|
||||
budget = settings.recent_messages_max_tokens
|
||||
if budget <= 0 or not messages:
|
||||
return messages
|
||||
|
||||
estimate = ContextCompactor._estimate_message_tokens
|
||||
total = sum(estimate(m) for m in messages)
|
||||
if total <= budget:
|
||||
return messages
|
||||
|
||||
def _is_tool_result_carrier(msg: dict[str, Any]) -> bool:
|
||||
if msg.get("role") == "tool":
|
||||
return True
|
||||
if msg.get("role") != "user":
|
||||
return False
|
||||
content = msg.get("content")
|
||||
return isinstance(content, list) and any(
|
||||
isinstance(b, dict) and b.get("type") == "tool_result"
|
||||
for b in content
|
||||
)
|
||||
|
||||
def _has_tool_use(msg: dict[str, Any]) -> bool:
|
||||
if msg.get("role") != "assistant":
|
||||
return False
|
||||
if msg.get("tool_calls"):
|
||||
return True
|
||||
content = msg.get("content")
|
||||
return isinstance(content, list) and any(
|
||||
isinstance(b, dict) and b.get("type") == "tool_use"
|
||||
for b in content
|
||||
)
|
||||
|
||||
min_keep = 4
|
||||
n = len(messages)
|
||||
start = 0
|
||||
while total > budget and start < n - min_keep:
|
||||
end = start + 1
|
||||
if _has_tool_use(messages[start]):
|
||||
# Arrastrar los tool_results del par (no cortar dentro de el).
|
||||
while end < n and _is_tool_result_carrier(messages[end]):
|
||||
end += 1
|
||||
if n - end < min_keep:
|
||||
break # Eliminar el par completo invadiria los ultimos min_keep
|
||||
for k in range(start, end):
|
||||
total -= estimate(messages[k])
|
||||
start = end
|
||||
|
||||
trimmed = messages[start:]
|
||||
# El primer mensaje nunca puede ser un tool_result sin su tool_use.
|
||||
while trimmed and _is_tool_result_carrier(trimmed[0]):
|
||||
trimmed.pop(0)
|
||||
return trimmed
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_recent_message(message: dict[str, Any]) -> dict[str, Any]:
|
||||
|
||||
Reference in New Issue
Block a user