compactor final

This commit is contained in:
Jordan Diaz
2026-04-09 21:41:11 +00:00
parent 237dc00379
commit 19efed84b7

View File

@@ -327,8 +327,17 @@ class ContextCompactor:
prefix: str, prefix: str,
max_chars: int, max_chars: int,
) -> str: ) -> str:
"""Resume el contenido de un mensaje segun su tipo detectado.
Dispatcher que detecta JSON / tabla / stack-trace / texto plano y
aplica la estrategia de sumario mas apropiada. Si un handler
especializado no puede procesar el contenido (devuelve None o lanza),
cae al handler de texto plano (first + last).
"""
stripped = content.strip() stripped = content.strip()
compacted = self._compact_text(content) compacted = self._compact_text(content)
# Caso trivial: el contenido ya cabe, solo normalizamos whitespace
if len(compacted) <= max_chars: if len(compacted) <= max_chars:
if compacted != stripped: if compacted != stripped:
summary = f"{prefix} {compacted}".strip() summary = f"{prefix} {compacted}".strip()
@@ -337,7 +346,218 @@ class ContextCompactor:
return summary return summary
return compacted return compacted
lines = [l.strip() for l in compacted.splitlines() if l.strip()] # Detectar tipo de contenido y despachar al handler apropiado
ctype = self._detect_content_type(compacted)
try:
if ctype == "json":
result = self._summarize_json(compacted, prefix, max_chars)
if result is not None:
return result
elif ctype == "table":
result = self._summarize_table(compacted, prefix, max_chars)
if result is not None:
return result
elif ctype == "trace":
result = self._summarize_trace(compacted, prefix, max_chars)
if result is not None:
return result
except Exception as e:
logger.debug("typed summarizer failed (%s): %s", ctype, e)
# Fallback: texto plano (first + last)
return self._summarize_plain(compacted, prefix, max_chars)
# ------------------------------------------------------------------
# Handlers especializados por tipo de contenido
# ------------------------------------------------------------------
def _detect_content_type(self, text: str) -> str:
"""Heuristica para detectar el tipo de contenido del mensaje.
Devuelve 'json' | 'table' | 'trace' | 'plain'.
Solo devuelve 'json' si el parse funciona realmente.
"""
stripped = text.strip()
if not stripped:
return "plain"
# JSON: empieza con { o [ y parsea correctamente
first_char = stripped[0]
if first_char in ("{", "["):
try:
json.loads(stripped)
return "json"
except (json.JSONDecodeError, ValueError):
pass
# Stack trace / error: contiene marcadores tipicos
lower = stripped.lower()
trace_markers = ("traceback", "error:", "exception", "\n at ")
if any(m in lower for m in trace_markers):
return "trace"
# Tabla markdown: al menos una linea con pipes y un separador ---
has_pipe_line = False
has_separator = False
for line in stripped.splitlines()[:20]:
l = line.strip()
if l.startswith("|") and l.endswith("|") and l.count("|") >= 3:
has_pipe_line = True
if re.match(r"^\|[\s\|:\-]+\|$", l) and "---" in l:
has_separator = True
break
if has_pipe_line and has_separator:
return "table"
return "plain"
def _summarize_json(self, raw: str, prefix: str, max_chars: int) -> str | None:
"""Resume JSON truncando listas largas y preservando shape.
Devuelve None si el parse falla (no deberia si _detect_content_type lo
identifico correctamente, pero por seguridad).
"""
try:
data = json.loads(raw)
except (json.JSONDecodeError, ValueError):
return None
truncated, stats = self._truncate_json_value(data, list_limit=5, depth_limit=4)
try:
body = json.dumps(truncated, ensure_ascii=False, separators=(",", ":"))
except (TypeError, ValueError):
return None
stats_parts = []
if stats.get("lists_truncated"):
stats_parts.append(f"{stats['lists_truncated']} listas truncadas")
if stats.get("items_dropped"):
stats_parts.append(f"{stats['items_dropped']} items omitidos")
stats_text = ", ".join(stats_parts) or "truncado"
summary = f"{prefix} JSON ({stats_text}): {body}"
if len(summary) > max_chars:
summary = summary[: max_chars - 1].rstrip() + ""
return summary
def _truncate_json_value(
self,
value: Any,
list_limit: int,
depth_limit: int,
_depth: int = 0,
) -> tuple[Any, dict[str, int]]:
"""Trunca recursivamente listas y limita profundidad en un JSON."""
stats = {"lists_truncated": 0, "items_dropped": 0}
if _depth >= depth_limit:
if isinstance(value, (dict, list)):
return ("<…>", stats)
return (value, stats)
if isinstance(value, list):
original_len = len(value)
if original_len > list_limit:
stats["lists_truncated"] += 1
stats["items_dropped"] += original_len - list_limit
value = value[:list_limit] + [f"<…+{original_len - list_limit} más>"]
truncated_list = []
for item in value:
sub, sub_stats = self._truncate_json_value(
item, list_limit, depth_limit, _depth + 1
)
truncated_list.append(sub)
for k in stats:
stats[k] += sub_stats.get(k, 0)
return (truncated_list, stats)
if isinstance(value, dict):
truncated_dict = {}
for k, v in value.items():
sub, sub_stats = self._truncate_json_value(
v, list_limit, depth_limit, _depth + 1
)
truncated_dict[k] = sub
for key in stats:
stats[key] += sub_stats.get(key, 0)
return (truncated_dict, stats)
return (value, stats)
def _summarize_table(self, raw: str, prefix: str, max_chars: int) -> str | None:
"""Resume una tabla markdown preservando header + primeras N filas."""
lines = [l for l in raw.splitlines() if l.strip()]
if len(lines) < 3:
return None # Muy pocas lineas para ser una tabla
# Localizar header (primera linea con pipes) y separador
header_idx = -1
separator_idx = -1
for i, line in enumerate(lines):
stripped = line.strip()
if stripped.startswith("|") and "|" in stripped[1:]:
if header_idx < 0:
header_idx = i
continue
if re.match(r"^\|[\s\|:\-]+\|$", stripped) and "---" in stripped:
separator_idx = i
break
if header_idx < 0 or separator_idx < 0:
return None
data_rows = lines[separator_idx + 1 :]
data_rows = [r for r in data_rows if r.strip().startswith("|")]
keep_rows = 5
total_rows = len(data_rows)
parts = [
f"{prefix} Tabla ({total_rows} filas, mostrando {min(keep_rows, total_rows)}):",
lines[header_idx],
lines[separator_idx],
]
parts.extend(data_rows[:keep_rows])
if total_rows > keep_rows:
parts.append(f"| … {total_rows - keep_rows} filas más … |")
summary = "\n".join(parts)
if len(summary) > max_chars:
summary = summary[: max_chars - 1].rstrip() + ""
return summary
def _summarize_trace(self, raw: str, prefix: str, max_chars: int) -> str | None:
"""Resume un stack trace: mensaje de error + ultimas N frames."""
lines = [l for l in raw.splitlines() if l.strip()]
if not lines:
return None
# Localizar la linea del mensaje de error (la mas informativa)
error_line = None
for line in lines:
low = line.lower()
if any(m in low for m in ("error:", "exception:", "traceback")):
error_line = line.strip()
break
if error_line is None:
error_line = lines[0].strip()
# Ultimas 5 lineas del stack (suelen ser las mas relevantes)
tail_count = 5
tail_lines = [l.strip() for l in lines[-tail_count:]]
hidden = max(0, len(lines) - tail_count - 1)
parts = [f"{prefix} Error: {error_line[:200]}"]
if hidden > 0:
parts.append(f"{hidden} frames ocultos …")
parts.extend(tail_lines)
summary = "\n".join(parts)
if len(summary) > max_chars:
summary = summary[: max_chars - 1].rstrip() + ""
return summary
def _summarize_plain(self, raw: str, prefix: str, max_chars: int) -> str:
"""Fallback para texto plano: primera linea + ultima linea."""
lines = [l.strip() for l in raw.splitlines() if l.strip()]
if not lines: if not lines:
return prefix return prefix
if len(lines) == 1: if len(lines) == 1: