diff --git a/src/context/compactor.py b/src/context/compactor.py index 7994377..3a9389f 100644 --- a/src/context/compactor.py +++ b/src/context/compactor.py @@ -327,8 +327,17 @@ class ContextCompactor: prefix: str, max_chars: int, ) -> str: + """Resume el contenido de un mensaje segun su tipo detectado. + + Dispatcher que detecta JSON / tabla / stack-trace / texto plano y + aplica la estrategia de sumario mas apropiada. Si un handler + especializado no puede procesar el contenido (devuelve None o lanza), + cae al handler de texto plano (first + last). + """ stripped = content.strip() compacted = self._compact_text(content) + + # Caso trivial: el contenido ya cabe, solo normalizamos whitespace if len(compacted) <= max_chars: if compacted != stripped: summary = f"{prefix} {compacted}".strip() @@ -337,7 +346,218 @@ class ContextCompactor: return summary return compacted - lines = [l.strip() for l in compacted.splitlines() if l.strip()] + # Detectar tipo de contenido y despachar al handler apropiado + ctype = self._detect_content_type(compacted) + try: + if ctype == "json": + result = self._summarize_json(compacted, prefix, max_chars) + if result is not None: + return result + elif ctype == "table": + result = self._summarize_table(compacted, prefix, max_chars) + if result is not None: + return result + elif ctype == "trace": + result = self._summarize_trace(compacted, prefix, max_chars) + if result is not None: + return result + except Exception as e: + logger.debug("typed summarizer failed (%s): %s", ctype, e) + + # Fallback: texto plano (first + last) + return self._summarize_plain(compacted, prefix, max_chars) + + # ------------------------------------------------------------------ + # Handlers especializados por tipo de contenido + # ------------------------------------------------------------------ + + def _detect_content_type(self, text: str) -> str: + """Heuristica para detectar el tipo de contenido del mensaje. + + Devuelve 'json' | 'table' | 'trace' | 'plain'. + Solo devuelve 'json' si el parse funciona realmente. + """ + stripped = text.strip() + if not stripped: + return "plain" + + # JSON: empieza con { o [ y parsea correctamente + first_char = stripped[0] + if first_char in ("{", "["): + try: + json.loads(stripped) + return "json" + except (json.JSONDecodeError, ValueError): + pass + + # Stack trace / error: contiene marcadores tipicos + lower = stripped.lower() + trace_markers = ("traceback", "error:", "exception", "\n at ") + if any(m in lower for m in trace_markers): + return "trace" + + # Tabla markdown: al menos una linea con pipes y un separador --- + has_pipe_line = False + has_separator = False + for line in stripped.splitlines()[:20]: + l = line.strip() + if l.startswith("|") and l.endswith("|") and l.count("|") >= 3: + has_pipe_line = True + if re.match(r"^\|[\s\|:\-]+\|$", l) and "---" in l: + has_separator = True + break + if has_pipe_line and has_separator: + return "table" + + return "plain" + + def _summarize_json(self, raw: str, prefix: str, max_chars: int) -> str | None: + """Resume JSON truncando listas largas y preservando shape. + + Devuelve None si el parse falla (no deberia si _detect_content_type lo + identifico correctamente, pero por seguridad). + """ + try: + data = json.loads(raw) + except (json.JSONDecodeError, ValueError): + return None + + truncated, stats = self._truncate_json_value(data, list_limit=5, depth_limit=4) + try: + body = json.dumps(truncated, ensure_ascii=False, separators=(",", ":")) + except (TypeError, ValueError): + return None + + stats_parts = [] + if stats.get("lists_truncated"): + stats_parts.append(f"{stats['lists_truncated']} listas truncadas") + if stats.get("items_dropped"): + stats_parts.append(f"{stats['items_dropped']} items omitidos") + stats_text = ", ".join(stats_parts) or "truncado" + + summary = f"{prefix} JSON ({stats_text}): {body}" + if len(summary) > max_chars: + summary = summary[: max_chars - 1].rstrip() + "…" + return summary + + def _truncate_json_value( + self, + value: Any, + list_limit: int, + depth_limit: int, + _depth: int = 0, + ) -> tuple[Any, dict[str, int]]: + """Trunca recursivamente listas y limita profundidad en un JSON.""" + stats = {"lists_truncated": 0, "items_dropped": 0} + + if _depth >= depth_limit: + if isinstance(value, (dict, list)): + return ("<…>", stats) + return (value, stats) + + if isinstance(value, list): + original_len = len(value) + if original_len > list_limit: + stats["lists_truncated"] += 1 + stats["items_dropped"] += original_len - list_limit + value = value[:list_limit] + [f"<…+{original_len - list_limit} más>"] + truncated_list = [] + for item in value: + sub, sub_stats = self._truncate_json_value( + item, list_limit, depth_limit, _depth + 1 + ) + truncated_list.append(sub) + for k in stats: + stats[k] += sub_stats.get(k, 0) + return (truncated_list, stats) + + if isinstance(value, dict): + truncated_dict = {} + for k, v in value.items(): + sub, sub_stats = self._truncate_json_value( + v, list_limit, depth_limit, _depth + 1 + ) + truncated_dict[k] = sub + for key in stats: + stats[key] += sub_stats.get(key, 0) + return (truncated_dict, stats) + + return (value, stats) + + def _summarize_table(self, raw: str, prefix: str, max_chars: int) -> str | None: + """Resume una tabla markdown preservando header + primeras N filas.""" + lines = [l for l in raw.splitlines() if l.strip()] + if len(lines) < 3: + return None # Muy pocas lineas para ser una tabla + + # Localizar header (primera linea con pipes) y separador + header_idx = -1 + separator_idx = -1 + for i, line in enumerate(lines): + stripped = line.strip() + if stripped.startswith("|") and "|" in stripped[1:]: + if header_idx < 0: + header_idx = i + continue + if re.match(r"^\|[\s\|:\-]+\|$", stripped) and "---" in stripped: + separator_idx = i + break + if header_idx < 0 or separator_idx < 0: + return None + + data_rows = lines[separator_idx + 1 :] + data_rows = [r for r in data_rows if r.strip().startswith("|")] + keep_rows = 5 + total_rows = len(data_rows) + + parts = [ + f"{prefix} Tabla ({total_rows} filas, mostrando {min(keep_rows, total_rows)}):", + lines[header_idx], + lines[separator_idx], + ] + parts.extend(data_rows[:keep_rows]) + if total_rows > keep_rows: + parts.append(f"| … {total_rows - keep_rows} filas más … |") + + summary = "\n".join(parts) + if len(summary) > max_chars: + summary = summary[: max_chars - 1].rstrip() + "…" + return summary + + def _summarize_trace(self, raw: str, prefix: str, max_chars: int) -> str | None: + """Resume un stack trace: mensaje de error + ultimas N frames.""" + lines = [l for l in raw.splitlines() if l.strip()] + if not lines: + return None + + # Localizar la linea del mensaje de error (la mas informativa) + error_line = None + for line in lines: + low = line.lower() + if any(m in low for m in ("error:", "exception:", "traceback")): + error_line = line.strip() + break + if error_line is None: + error_line = lines[0].strip() + + # Ultimas 5 lineas del stack (suelen ser las mas relevantes) + tail_count = 5 + tail_lines = [l.strip() for l in lines[-tail_count:]] + hidden = max(0, len(lines) - tail_count - 1) + + parts = [f"{prefix} Error: {error_line[:200]}"] + if hidden > 0: + parts.append(f"… {hidden} frames ocultos …") + parts.extend(tail_lines) + + summary = "\n".join(parts) + if len(summary) > max_chars: + summary = summary[: max_chars - 1].rstrip() + "…" + return summary + + def _summarize_plain(self, raw: str, prefix: str, max_chars: int) -> str: + """Fallback para texto plano: primera linea + ultima linea.""" + lines = [l.strip() for l in raw.splitlines() if l.strip()] if not lines: return prefix if len(lines) == 1: