Initial commit
src/context/compactor.py | 229 (new file)
@@ -0,0 +1,229 @@
"""Context compaction: extract facts, remove redundancy, maintain constraints.

The compactor is responsible for keeping the context within token budget
while preserving the most important information.
"""

from __future__ import annotations

import hashlib
import logging
import re

from ..models.artifacts import ArtifactSummary
from ..models.context import ContextSection, ContextSectionType

logger = logging.getLogger(__name__)

# --- Token counting with tiktoken ---
try:
    import tiktoken

    # cl100k_base is the GPT-4-family encoding; counts for other models
    # (GPT-4o, Claude, …) are close approximations rather than exact.
    _encoding = tiktoken.get_encoding("cl100k_base")

    def estimate_tokens(text: str) -> int:
        """Accurate token count using tiktoken (cl100k_base encoding)."""
        if not text:
            return 0
        return len(_encoding.encode(text, disallowed_special=()))

    logger.info("Using tiktoken for accurate token counting")
except ImportError:

    def estimate_tokens(text: str) -> int:
        """Fallback: ~4 chars per token (0 for empty text)."""
        if not text:
            return 0
        return max(1, len(text) // 4)

    logger.warning("tiktoken not installed — using approximate token counting")
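
# Quick sanity check of the fallback heuristic (illustrative; assumes
# tiktoken is not installed, so the ~4-chars-per-token path is active):
#
#     >>> estimate_tokens("a" * 400)
#     100
#     >>> estimate_tokens("")
#     0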


class ContextCompactor:
    """Compacts context sections to fit within token budgets."""

    def __init__(self, max_tokens: int = 120_000) -> None:
        self.max_tokens = max_tokens

    # ------------------------------------------------------------------
    # Public
    # ------------------------------------------------------------------

    def compact_sections(
        self, sections: list[ContextSection]
    ) -> list[ContextSection]:
        """Remove redundancy and trim low-priority sections to fit budget."""
        # 1. Deduplicate identical content across sections
        sections = self._deduplicate(sections)

        # 2. Estimate tokens per section
        for s in sections:
            s.token_estimate = estimate_tokens(s.content)

        total = sum(s.token_estimate for s in sections)
        if total <= self.max_tokens:
            return sections

        # 3. Sort by priority (highest first) — immutable_rules never trimmed
        sections.sort(key=lambda s: s.priority, reverse=True)

        # 4. Progressively trim lowest-priority sections
        while total > self.max_tokens and sections:
            lowest = sections[-1]
            if lowest.section_type == ContextSectionType.IMMUTABLE_RULES:
                break  # Never trim rules
            # Try to compact the section first; if compaction saves nothing
            # (e.g. it was already compacted on a previous pass), remove it.
            compacted = self._compact_text(lowest.content)
            new_estimate = estimate_tokens(compacted)
            saved = lowest.token_estimate - new_estimate
            if saved > 0:
                lowest.content = compacted
                lowest.token_estimate = new_estimate
                total -= saved
            else:
                # Remove the section entirely
                total -= lowest.token_estimate
                sections.pop()

        return sections
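
    # A minimal usage sketch (assumes ContextSection carries the content,
    # priority, section_type and token_estimate attributes used above):
    #
    #     compactor = ContextCompactor(max_tokens=8_000)
    #     fitted = compactor.compact_sections(sections)
    #     total = sum(s.token_estimate for s in fitted)
    #     # total <= 8_000 unless only immutable-rule sections remain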

    def summarize_tool_output(
        self,
        tool_name: str,
        raw_output: str,
        session_id: str,
        task_id: str,
    ) -> ArtifactSummary:
        """Summarise raw tool output into an ArtifactSummary.

        The raw output is NEVER passed through to the model context.
        """
        facts = self._extract_facts(raw_output)
        summary = self._build_summary(tool_name, raw_output, facts)

        artifact_type = self._infer_artifact_type(tool_name)
        artifact_id = hashlib.sha256(
            f"{session_id}:{task_id}:{tool_name}:{raw_output[:200]}".encode()
        ).hexdigest()[:16]

        return ArtifactSummary(
            artifact_id=artifact_id,
            session_id=session_id,
            task_id=task_id,
            artifact_type=artifact_type,
            title=f"Output of {tool_name}",
            summary=summary,
            facts=facts,
            source_tool=tool_name,
            char_count=len(raw_output),
        )
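
    # Illustrative call (argument names follow the signature above):
    #
    #     art = ContextCompactor().summarize_tool_output(
    #         tool_name="run_tests",
    #         raw_output="PASS test_a\nPASS test_b\n2 passed",
    #         session_id="s1",
    #         task_id="t1",
    #     )
    #     # art.artifact_type == "test_result"; art.facts == the two PASS lines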

    def compact_artifact_summaries(
        self, summaries: list[ArtifactSummary], max_chars: int = 2000
    ) -> str:
        """Merge multiple artifact summaries into a single compact block."""
        if not summaries:
            return ""

        lines: list[str] = ["## Artifacts"]
        budget = max_chars - 20
        for art in summaries:
            entry = f"- [{art.artifact_type}] {art.title}: {art.summary}"
            if art.facts:
                entry += " | Facts: " + "; ".join(art.facts[:3])
            if len(entry) > budget:
                entry = entry[:budget] + "…"
            lines.append(entry)
            budget -= len(entry)
            if budget <= 0:
                remaining = len(summaries) - (len(lines) - 1)
                if remaining > 0:
                    lines.append(f"  … and {remaining} more artifacts")
                break
        return "\n".join(lines)
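
    # Shape of the merged block (illustrative):
    #
    #     ## Artifacts
    #     - [test_result] Output of run_tests: Tool 'run_tests' returned …
    #     - [code] Output of read_file: … | Facts: path: src/app.py; …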

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _deduplicate(
        self, sections: list[ContextSection]
    ) -> list[ContextSection]:
        seen: set[str] = set()
        unique: list[ContextSection] = []
        for s in sections:
            # MD5 is fine here: it serves only as a fast content fingerprint.
            h = hashlib.md5(s.content.encode()).hexdigest()
            if h not in seen:
                seen.add(h)
                unique.append(s)
        return unique
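
    # Illustrative: three sections whose .content strings are "A", "A", "B"
    # collapse to the first "A" section and the "B" section, in order.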

    def _compact_text(self, text: str) -> str:
        """Compact text: drop blank lines, collapse consecutive duplicates."""
        lines = text.splitlines()
        # Remove empty or whitespace-only lines
        lines = [line.rstrip() for line in lines if line.strip()]
        # Collapse consecutive duplicate lines
        compacted: list[str] = []
        for line in lines:
            if not compacted or line != compacted[-1]:
                compacted.append(line)
        return "\n".join(compacted)
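
    # Example (doctest-style, illustrative):
    #
    #     >>> ContextCompactor()._compact_text("x\n\nx\ny  \n\ny")
    #     'x\ny'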

    def _extract_facts(self, raw_output: str) -> list[str]:
        """Extract short factual claims from tool output."""
        facts: list[str] = []
        lines = raw_output.strip().splitlines()

        for line in lines[:100]:  # Limit scan depth
            line = line.strip()
            if not line or len(line) < 10:
                continue
            # Lines that look like key-value facts
            if re.match(r"^[\w\s]+:\s+.+", line) and len(line) < 200:
                facts.append(line)
            # Lines starting with status indicators
            elif re.match(r"^(✓|✗|PASS|FAIL|ERROR|OK|INFO|WARNING)", line):
                facts.append(line)
            # Lines that contain file paths with results
            elif re.match(r"^[\w/\\.]+\s*[:\-]\s*.+", line) and len(line) < 200:
                facts.append(line)

        # Deduplicate (order-preserving) and cap at 15 facts
        seen: set[str] = set()
        unique: list[str] = []
        for f in facts:
            if f not in seen:
                seen.add(f)
                unique.append(f)
        return unique[:15]
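
    # Illustrative extraction (matches the key-value and status patterns;
    # the short middle line is skipped by the length check):
    #
    #     >>> ContextCompactor()._extract_facts(
    #     ...     "ERROR: db timeout\nshort\nstatus: degraded")
    #     ['ERROR: db timeout', 'status: degraded']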

    def _build_summary(
        self, tool_name: str, raw_output: str, facts: list[str]
    ) -> str:
        """Build a concise summary from tool output."""
        lines = raw_output.strip().splitlines()
        total_lines = len(lines)
        char_count = len(raw_output)

        parts = [f"Tool '{tool_name}' returned {total_lines} lines ({char_count} chars)."]

        if facts:
            parts.append(f"Key findings: {'; '.join(facts[:5])}")

        # Include first and last meaningful lines as bookends
        meaningful = [line.strip() for line in lines if line.strip()]
        if meaningful:
            parts.append(f"First: {meaningful[0][:120]}")
            if len(meaningful) > 1:
                parts.append(f"Last: {meaningful[-1][:120]}")

        return " ".join(parts)
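
    # Shape of a generated summary (illustrative, not verbatim output):
    #
    #     Tool '<name>' returned <N> lines (<M> chars). Key findings: <f1>; <f2>
    #     First: <first meaningful line> Last: <last meaningful line>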

    def _infer_artifact_type(self, tool_name: str) -> str:
        tool_lower = tool_name.lower()
        if any(k in tool_lower for k in ("read", "file", "code", "write", "edit")):
            return "code"
        if any(k in tool_lower for k in ("test", "check", "lint", "validate")):
            return "test_result"
        if any(k in tool_lower for k in ("search", "find", "grep", "glob")):
            return "analysis"
        if any(k in tool_lower for k in ("plan", "design", "architect")):
            return "plan"
        return "general"
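
# Matching is first-hit-wins down the keyword lists above, e.g.:
#
#     >>> ContextCompactor()._infer_artifact_type("grep_search")
#     'analysis'
#     >>> ContextCompactor()._infer_artifact_type("edit_test_file")
#     'code'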