From 4c73d848bb455c0d348528f54f1d7bc1d0c575a3 Mon Sep 17 00:00:00 2001
From: Jordan Diaz
Date: Thu, 9 Apr 2026 18:27:36 +0000
Subject: [PATCH] Context budgeting, phase one

Derive the context budget from the model window instead of a fixed
120k: effective_context_budget = window - max_output - reserve, with
the compaction threshold expressed as a ratio of that budget. Cap the
knowledge-base, tool-output and task-history contributions, expose the
effective budgets in the context debug endpoint, and add budget tests.
---
 src/api/routes.py               |  10 ++
 src/config.py                   |  40 +++++++-
 src/context/compactor.py        |  32 +++++--
 src/context/engine.py           | 109 +++++++++++++++++++---
 src/models/context.py           |   1 +
 src/orchestrator/agents/base.py |  16 +++-
 src/orchestrator/engine.py      |  96 ++++++++++++++++---
 tests/test_context_budget.py    | 160 ++++++++++++++++++++++++++++++++
 8 files changed, 424 insertions(+), 40 deletions(-)
 create mode 100644 tests/test_context_budget.py

diff --git a/src/api/routes.py b/src/api/routes.py
index 14bd02e..3ca269c 100644
--- a/src/api/routes.py
+++ b/src/api/routes.py
@@ -11,6 +11,7 @@ from fastapi import APIRouter, HTTPException, Request
 from fastapi.responses import StreamingResponse
 from pydantic import BaseModel, Field
 
+from ..config import settings
 from ..models.context import MemoryDocument, MemoryType
 from ..models.session import SessionState, SessionStatus
 from ..orchestrator.engine import OrchestratorEngine
@@ -348,6 +349,15 @@ async def get_context_debug(session_id: str) -> dict[str, Any]:
         "last_build": last,
         "full_context": full_context,
         "history": history,
+        "budgets": {
+            "effective_context_budget": settings.effective_context_budget,
+            "compaction_threshold": settings.effective_compaction_threshold,
+            "reserve_tokens": settings.reserve_tokens,
+            "knowledge_base_max_tokens": settings.knowledge_base_max_tokens,
+            "tool_raw_output_max_chars": settings.tool_raw_output_max_chars,
+            "task_history_max_entries": settings.task_history_max_entries,
+            "task_history_max_tokens": settings.task_history_max_tokens,
+        },
     }
 
 
diff --git a/src/config.py b/src/config.py
index ba58d0b..7dc2f73 100644
--- a/src/config.py
+++ b/src/config.py
@@ -38,10 +38,19 @@ class Settings(BaseSettings):
     temperature: float = 0.3
 
     # --- Context engine ---
-    context_max_tokens: int = 120_000
-    compaction_threshold_tokens: int = 80_000
+    model_context_window: int = 0  # 0 = use legacy fixed budget / explicit override
+    model_max_output_tokens: int = 4096
+    context_max_tokens: int = 0  # 0 = auto-budget from model window, fallback legacy 120k
+    compaction_threshold_tokens: int = 0  # 0 = derive from ratio
+    compaction_threshold_ratio: float = 0.80
+    context_reserve_ratio: float = 0.10
     artifact_summary_max_chars: int = 2000
+    knowledge_base_max_tokens: int = 30_000
     working_context_max_items: int = 20
+    tool_raw_output_max_chars: int = 2000
+    conversation_recent_raw_limit: int = 2
+    task_history_max_entries: int = 20
+    task_history_max_tokens: int = 1500
 
     # --- MCP ---
     mcp_config_path: str = ""  # Path to mcp.json; empty = legacy single-server mode
@@ -64,5 +73,32 @@
 
     model_config = {"env_prefix": "AGENTIC_", "env_file": ".env", "extra": "ignore"}
 
+    @property
+    def reserve_tokens(self) -> int:
+        if self.model_context_window <= 0:
+            return 0
+        return max(0, int(self.model_context_window * self.context_reserve_ratio))
+
+    @property
+    def effective_context_budget(self) -> int:
+        if self.context_max_tokens > 0:
+            return self.context_max_tokens
+
+        if self.model_context_window > 0:
+            budget = (
+                self.model_context_window
+                - max(0, self.model_max_output_tokens)
+                - self.reserve_tokens
+            )
+            return max(1, budget)
+
+        return 120_000
+
+    @property
+    def effective_compaction_threshold(self) -> int:
+        if self.compaction_threshold_tokens > 0:
+            return min(self.compaction_threshold_tokens, self.effective_context_budget)
+        return max(1, 
int(self.effective_context_budget * self.compaction_threshold_ratio)) + settings = Settings() diff --git a/src/context/compactor.py b/src/context/compactor.py index 32c1716..306490b 100644 --- a/src/context/compactor.py +++ b/src/context/compactor.py @@ -47,25 +47,41 @@ class ContextCompactor: # ------------------------------------------------------------------ def compact_sections( - self, sections: list[ContextSection] - ) -> list[ContextSection]: + self, + sections: list[ContextSection], + max_tokens: int | None = None, + ) -> tuple[list[ContextSection], dict[str, Any]]: """Remove redundancy and trim low-priority sections to fit budget.""" + budget = max_tokens if max_tokens is not None else self.max_tokens + original_count = len(sections) + # 1. Deduplicate identical content across sections sections = self._deduplicate(sections) + duplicates_removed = original_count - len(sections) # 2. Estimate tokens per section for s in sections: s.token_estimate = estimate_tokens(s.content) total = sum(s.token_estimate for s in sections) - if total <= self.max_tokens: - return sections + meta = { + "budget_tokens": budget, + "input_tokens": total, + "output_tokens": total, + "sections_input": original_count, + "sections_output": len(sections), + "duplicates_removed": duplicates_removed, + "sections_compacted": 0, + "sections_removed": 0, + } + if total <= budget: + return sections, meta # 3. Sort by priority (highest first) — immutable_rules never trimmed sections.sort(key=lambda s: s.priority, reverse=True) # 4. Progressively trim lowest-priority sections - while total > self.max_tokens and sections: + while total > budget and sections: lowest = sections[-1] if lowest.section_type == ContextSectionType.IMMUTABLE_RULES: break # Never trim rules @@ -77,12 +93,16 @@ class ContextCompactor: lowest.content = compacted lowest.token_estimate = new_estimate total -= saved + meta["sections_compacted"] += 1 else: # Remove the section entirely total -= lowest.token_estimate sections.pop() + meta["sections_removed"] += 1 - return sections + meta["output_tokens"] = total + meta["sections_output"] = len(sections) + return sections, meta def summarize_tool_output( self, diff --git a/src/context/engine.py b/src/context/engine.py index 05f859a..336e5c2 100644 --- a/src/context/engine.py +++ b/src/context/engine.py @@ -45,7 +45,7 @@ class ContextEngine: memory_store: MemoryStore | None = None, ) -> None: self.compactor = compactor or ContextCompactor( - max_tokens=settings.context_max_tokens + max_tokens=settings.effective_context_budget ) self.memory = memory_store self._embed_service: EmbeddingService | None = None @@ -85,9 +85,17 @@ class ContextEngine: if "project_profile" in allowed: sections.append(self._build_project_profile(session)) + include_artifact_memory = ( + artifacts + and ("artifact_memory" in allowed or "task_state" in allowed) + ) + # 3. Knowledge base — loaded from memory store if "knowledge_base" in allowed and self.memory: - kb_section = await self._build_knowledge_base(session) + kb_section = await self._build_knowledge_base( + session, + max_tokens=settings.knowledge_base_max_tokens, + ) if kb_section: sections.append(kb_section) @@ -99,17 +107,45 @@ class ContextEngine: if "task_state" in allowed and session.current_task: sections.append(self._build_task_state(session.current_task)) - # Compact to fit budget - sections = self.compactor.compact_sections(sections) + # 6. 
Artifact memory — summaries for recent/current artifacts + if include_artifact_memory: + context_artifacts = self._select_context_artifacts(session, artifacts or []) + if context_artifacts: + sections.append(self._build_artifact_memory(context_artifacts)) + + # Build messages with real conversation history first so sections can + # compact against the remaining budget. + messages = self._build_messages(session, conversation) + message_tokens = sum(self._estimate_message_tokens(m) for m in messages) + pre_compaction_section_tokens = sum(estimate_tokens(s.content) for s in sections) + pre_compaction_total = pre_compaction_section_tokens + message_tokens + section_budget = max( + 1, + settings.effective_context_budget - message_tokens, + ) + + # Compact sections only when the full prompt is approaching the target. + section_compaction = { + "budget_tokens": section_budget, + "input_tokens": pre_compaction_section_tokens, + "output_tokens": pre_compaction_section_tokens, + "sections_input": len(sections), + "sections_output": len(sections), + "duplicates_removed": 0, + "sections_compacted": 0, + "sections_removed": 0, + } + if pre_compaction_total > settings.effective_compaction_threshold: + sections, section_compaction = self.compactor.compact_sections( + sections, + max_tokens=section_budget, + ) # Assemble system prompt from sections system_prompt = self._assemble_system_prompt(sections) - # Build messages with real conversation history - messages = self._build_messages(session, conversation) - total_tokens = estimate_tokens(system_prompt) + sum( - estimate_tokens(m.get("content", "")) for m in messages + self._estimate_message_tokens(m) for m in messages ) package = ContextPackage( @@ -124,6 +160,8 @@ class ContextEngine: "system_prompt": system_prompt, "messages": messages, "total_tokens": total_tokens, + "budget_tokens": settings.effective_context_budget, + "threshold_tokens": settings.effective_compaction_threshold, "timestamp": time.time(), } @@ -146,11 +184,23 @@ class ContextEngine: "total_tokens": total_tokens, "sections": section_summary, "sections_count": len(sections), - "compacted": len(sections) < len(allowed), + "compacted": bool( + section_compaction.get("sections_compacted") + or section_compaction.get("sections_removed") + or section_compaction.get("duplicates_removed") + ), "system_prompt_tokens": estimate_tokens(system_prompt), "user_message_preview": messages[0]["content"][:200] if messages else "", "artifacts_count": len(artifacts) if artifacts else 0, "conversation_messages": conv_len, + "budget_tokens": settings.effective_context_budget, + "threshold_tokens": settings.effective_compaction_threshold, + "message_tokens": message_tokens, + "pre_compaction_tokens": pre_compaction_total, + "post_compaction_tokens": total_tokens, + "section_budget_tokens": section_budget, + "section_compaction": section_compaction, + "over_budget": total_tokens > settings.effective_context_budget, } history = self._history[session.session_id] @@ -273,7 +323,9 @@ class ContextEngine: ) async def _build_knowledge_base( - self, session: SessionState + self, + session: SessionState, + max_tokens: int, ) -> ContextSection | None: """Load relevant knowledge documents via semantic search. 
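The reordering above means the conversation is measured before sections: sections
only receive what is left of the effective budget, and compaction fires only once
the whole prompt crosses the threshold. A minimal sketch of that arithmetic, using
the illustrative numbers from the new tests rather than real defaults:

    # Sketch only; mirrors the section-budget logic in build_context above.
    budget = 172_000           # settings.effective_context_budget
    threshold = 137_600        # settings.effective_compaction_threshold (80%)
    message_tokens = 150_000   # estimated from the conversation messages
    section_tokens = 5_000     # pre-compaction sum of section estimates

    section_budget = max(1, budget - message_tokens)      # 22_000 for sections
    needs_compaction = section_tokens + message_tokens > threshold
    assert section_budget == 22_000 and needs_compaction  # 155_000 > 137_600
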
@@ -314,8 +366,7 @@ class ContextEngine:
         ]
 
-        # Include ALL docs — 42K tokens fits well within model context (128K)
-        max_kb_tokens = 50_000
-        token_budget = max_kb_tokens
+        # Budget is supplied by the caller (settings.knowledge_base_max_tokens).
+        token_budget = max_tokens
 
         full_docs: list[MemoryDocument] = []
         for doc_id in ranked_ids:
@@ -433,7 +484,7 @@ class ContextEngine:
         content = "\n".join(lines)
 
         return ContextSection(
-            section_type=ContextSectionType.TASK_STATE,
+            section_type=ContextSectionType.TASK_HISTORY,
             content=content,
             priority=55,  # Below knowledge (60), above artifacts (50)
             token_estimate=estimate_tokens(content),
@@ -523,7 +574,9 @@ class ContextEngine:
             ContextSectionType.IMMUTABLE_RULES,
             ContextSectionType.PROJECT_PROFILE,
             ContextSectionType.KNOWLEDGE_BASE,
+            ContextSectionType.TASK_HISTORY,
             ContextSectionType.TASK_STATE,
+            ContextSectionType.ARTIFACT_MEMORY,
         ]
         section_map: dict[ContextSectionType, ContextSection] = {
             s.section_type: s for s in sections
@@ -596,3 +649,33 @@ class ContextEngine:
         messages.extend(conversation)
 
         return messages
+
+    @staticmethod
+    def _estimate_message_tokens(message: dict[str, Any]) -> int:
+        content = message.get("content", "")
+        if isinstance(content, str):
+            return estimate_tokens(content)
+        return estimate_tokens(str(content))
+
+    @staticmethod
+    def _select_context_artifacts(
+        session: SessionState,
+        artifacts: list[ArtifactSummary],
+    ) -> list[ArtifactSummary]:
+        if not artifacts:
+            return []
+
+        current_task_id = session.current_task.task_id if session.current_task else ""
+        recent_task_ids = {
+            entry.get("task_id", "")
+            for entry in session.task_history[-2:]
+            if entry.get("task_id")
+        }
+
+        selected: list[ArtifactSummary] = []
+        for artifact in sorted(artifacts, key=lambda a: a.created_at):
+            if current_task_id and artifact.task_id == current_task_id:
+                selected.append(artifact)
+            elif artifact.task_id in recent_task_ids:
+                selected.append(artifact)
+        return selected[-settings.working_context_max_items:]
diff --git a/src/models/context.py b/src/models/context.py
index 3b0c681..ff2562c 100644
--- a/src/models/context.py
+++ b/src/models/context.py
@@ -13,6 +13,7 @@ class ContextSectionType(StrEnum):
     IMMUTABLE_RULES = "immutable_rules"
     PROJECT_PROFILE = "project_profile"
     KNOWLEDGE_BASE = "knowledge_base"
+    TASK_HISTORY = "task_history"
     TASK_STATE = "task_state"
     ARTIFACT_MEMORY = "artifact_memory"
     WORKING_CONTEXT = "working_context"
diff --git a/src/orchestrator/agents/base.py b/src/orchestrator/agents/base.py
index e9cd196..0ccf5db 100644
--- a/src/orchestrator/agents/base.py
+++ b/src/orchestrator/agents/base.py
@@ -10,6 +10,7 @@ import uuid
 from typing import Any, AsyncIterator
 
 from ...adapters.base import ModelAdapter, ModelConfig, StreamChunk
+from ...config import settings
 from ...context.engine import ContextEngine
 from ...mcp.manager import MCPManager
 from ...memory.store import MemoryStore
@@ -202,7 +203,10 @@ class BaseAgent:
                 conversation.append({
                     "role": "tool",
                     "tool_call_id": tc["id"],
-                    "content": f"[DUPLICADO] Ya ejecutada con mismos argumentos. Resultado: {prev_exec.raw_output[:2000]}",
+                    "content": (
+                        "[DUPLICADO] Ya ejecutada con mismos argumentos. 
Resultado: " + f"{prev_exec.raw_output[:settings.tool_raw_output_max_chars]}" + ), }) logger.warning("Duplicate tool call skipped: %s (fingerprint: %s)", tc["name"], fp[:8]) continue @@ -221,7 +225,11 @@ class BaseAgent: conversation.append({ "role": "tool", "tool_call_id": tc["id"], - "content": tool_exec.raw_output[:8000] if tool_exec.raw_output else tool_exec.result_summary, + "content": ( + tool_exec.raw_output[:settings.tool_raw_output_max_chars] + if tool_exec.raw_output + else tool_exec.result_summary + ), }) # Loop detection: if ALL tool calls in this step were duplicates @@ -304,7 +312,7 @@ class BaseAgent: tool_exec.status = ToolExecutionStatus.COMPLETED tool_exec.result_summary = artifact.summary - tool_exec.raw_output = raw_output[:8000] + tool_exec.raw_output = raw_output[:settings.tool_raw_output_max_chars] tool_exec.duration_ms = duration await self.sse.emit( @@ -313,7 +321,7 @@ class BaseAgent: "tool": tool_name, "status": "completed", "summary": artifact.summary[:200], - "raw_output": raw_output[:4000], + "raw_output": raw_output[:min(4000, settings.tool_raw_output_max_chars)], "tool_call_id": tool_call_id, }, session_id=session.session_id, diff --git a/src/orchestrator/engine.py b/src/orchestrator/engine.py index 7eb3830..8697f45 100644 --- a/src/orchestrator/engine.py +++ b/src/orchestrator/engine.py @@ -13,6 +13,7 @@ from typing import Any from ..adapters.base import ModelAdapter from ..config import settings from ..context.engine import ContextEngine +from ..context.compactor import estimate_tokens from ..mcp.manager import MCPManager from ..memory.store import MemoryStore from ..models.agent import AgentProfile @@ -132,21 +133,19 @@ class OrchestratorEngine: usage = result.get("usage", {"input_tokens": 0, "output_tokens": 0}) key_data = self._extract_key_data_from_results([result]) - session.task_history.append({ - "task_id": task.task_id, - "objective": message, - "agent_id": session.agent_id, - "status": "completed", - "steps": 1, - "facts": task.facts_extracted[-10:], - "key_data": key_data, - "tools_used": [te.tool_name for te in result.get("tool_executions", [])], - "artifacts_count": len(result.get("artifacts", [])), - "summary": f"User: {message[:150]} → Agent: {content[:150]}", - "review": "", - }) - if len(session.task_history) > 20: - session.task_history = session.task_history[-20:] + session.task_history.append( + self._build_task_history_entry( + task_id=task.task_id, + message=message, + content=content, + agent_id=session.agent_id, + facts=task.facts_extracted, + key_data=key_data, + tool_executions=result.get("tool_executions", []), + artifacts_count=len(result.get("artifacts", [])), + ) + ) + session.task_history = self._trim_task_history(session.task_history) # Clean old artifacts artifacts = await self.memory.list_artifacts(session.session_id) @@ -252,3 +251,70 @@ class OrchestratorEngine: if seen_modules: key_data["modules"] = seen_modules[:20] return key_data + + @staticmethod + def _build_task_history_entry( + task_id: str, + message: str, + content: str, + agent_id: str, + facts: list[str], + key_data: dict[str, Any], + tool_executions: list[Any], + artifacts_count: int, + ) -> dict[str, Any]: + message_summary = " ".join(message.strip().split())[:120] + content_summary = " ".join(content.strip().split())[:160] + if content_summary: + summary = f"User: {message_summary} → Agent: {content_summary}" + else: + summary = f"User: {message_summary}" + + tools_used: list[str] = [] + for tool_exec in tool_executions: + tool_name = getattr(tool_exec, 
"tool_name", "") + if tool_name and tool_name not in tools_used: + tools_used.append(tool_name) + + return { + "task_id": task_id, + "objective": message[:200], + "agent_id": agent_id, + "status": "completed", + "steps": 1, + "facts": facts[-5:], + "key_data": key_data, + "tools_used": tools_used[:8], + "artifacts_count": artifacts_count, + "summary": summary, + "review": "", + } + + @staticmethod + def _trim_task_history(history: list[dict[str, Any]]) -> list[dict[str, Any]]: + if not history: + return [] + + trimmed = history[-settings.task_history_max_entries:] + kept: list[dict[str, Any]] = [] + total_tokens = 0 + + for entry in reversed(trimmed): + entry_tokens = OrchestratorEngine._estimate_task_history_entry_tokens(entry) + if kept and total_tokens + entry_tokens > settings.task_history_max_tokens: + break + kept.append(entry) + total_tokens += entry_tokens + + return list(reversed(kept)) + + @staticmethod + def _estimate_task_history_entry_tokens(entry: dict[str, Any]) -> int: + parts = [ + entry.get("objective", ""), + entry.get("summary", ""), + " ".join(entry.get("facts", [])[:5]), + " ".join(entry.get("tools_used", [])[:5]), + str(entry.get("key_data", {})), + ] + return estimate_tokens("\n".join(p for p in parts if p)) diff --git a/tests/test_context_budget.py b/tests/test_context_budget.py new file mode 100644 index 0000000..f13444e --- /dev/null +++ b/tests/test_context_budget.py @@ -0,0 +1,160 @@ +"""Tests para budget efectivo de contexto e integracion del ContextEngine.""" + +from __future__ import annotations + +import asyncio +import enum +import sys +import types + +import pytest + +if not hasattr(enum, "StrEnum"): + class _CompatStrEnum(str, enum.Enum): + pass + + enum.StrEnum = _CompatStrEnum + +if "anthropic" not in sys.modules: + anthropic_stub = types.ModuleType("anthropic") + + class _AsyncAnthropic: + pass + + anthropic_stub.AsyncAnthropic = _AsyncAnthropic + sys.modules["anthropic"] = anthropic_stub + +if "openai" not in sys.modules: + openai_stub = types.ModuleType("openai") + + class _AsyncOpenAI: + pass + + openai_stub.AsyncOpenAI = _AsyncOpenAI + sys.modules["openai"] = openai_stub + +from src.config import Settings, settings +from src.context.engine import ContextEngine +from src.models.agent import AgentProfile +from src.models.artifacts import ArtifactSummary +from src.models.session import SessionState +from src.orchestrator.engine import OrchestratorEngine + + +class TestSettingsBudget: + def test_effective_budget_uses_explicit_override(self): + cfg = Settings( + context_max_tokens=120_000, + model_context_window=200_000, + model_max_output_tokens=8_192, + _env_file=None, + ) + assert cfg.effective_context_budget == 120_000 + + def test_effective_budget_uses_model_window_when_no_override(self): + cfg = Settings( + context_max_tokens=0, + model_context_window=200_000, + model_max_output_tokens=8_000, + context_reserve_ratio=0.10, + _env_file=None, + ) + assert cfg.reserve_tokens == 20_000 + assert cfg.effective_context_budget == 172_000 + assert cfg.effective_compaction_threshold == 137_600 + + +class TestContextEngine: + def test_build_context_keeps_task_history_and_current_task(self): + session = SessionState( + immutable_rules=["No romper el proyecto"], + project_profile={"name": "demo"}, + task_history=[ + { + "task_id": "prev1", + "objective": "Crear banner", + "status": "completed", + "summary": "User: Crear banner → Agent: Banner creado", + "facts": ["Section: home"], + "key_data": {"sections": ["home"]}, + "tools_used": ["create_module"], + } + 
], + ) + session.begin_task("Actualizar hero") + agent = AgentProfile( + role="acai", + name="Acai", + system_prompt="Haz el trabajo.", + ) + + package = asyncio.run(ContextEngine().build_context(session=session, agent=agent)) + + assert "# Session History" in package.system_prompt + assert "# Current Task" in package.system_prompt + + def test_build_context_includes_artifact_memory_with_task_state_agents(self): + session = SessionState( + immutable_rules=["No romper el proyecto"], + project_profile={"name": "demo"}, + ) + task = session.begin_task("Revisar modulo") + agent = AgentProfile( + role="acai", + name="Acai", + system_prompt="Haz el trabajo.", + context_sections=[ + "immutable_rules", + "project_profile", + "task_state", + ], + ) + artifacts = [ + ArtifactSummary( + artifact_id="art-1", + session_id=session.session_id, + task_id=task.task_id, + artifact_type="code", + title="Output of read_file", + summary="Resumen del archivo", + facts=["Status: ok"], + source_tool="read_file", + char_count=120, + ) + ] + + package = asyncio.run( + ContextEngine().build_context( + session=session, + agent=agent, + artifacts=artifacts, + ) + ) + + assert "## Artifacts" in package.system_prompt + assert "Resumen del archivo" in package.system_prompt + + +class TestTaskHistoryTrim: + def test_trim_respects_entry_limit_and_token_budget(self, monkeypatch): + monkeypatch.setattr(settings, "task_history_max_entries", 3) + monkeypatch.setattr(settings, "task_history_max_tokens", 60) + + history = [ + {"objective": "old", "summary": "muy antiguo", "facts": [], "tools_used": [], "key_data": {}}, + { + "objective": "medio", + "summary": "contenido " * 20, + "facts": [], + "tools_used": [], + "key_data": {}, + }, + {"objective": "nuevo", "summary": "corto", "facts": [], "tools_used": [], "key_data": {}}, + {"objective": "final", "summary": "ultimo", "facts": [], "tools_used": [], "key_data": {}}, + ] + + trimmed = OrchestratorEngine._trim_task_history(history) + + assert len(trimmed) <= 3 + assert trimmed[-1]["objective"] == "final" + assert all(entry["objective"] != "old" for entry in trimmed)
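
As a quick way to exercise the new auto-budget end to end, here is a minimal
sketch of driving it from the environment (the "AGENTIC_" prefix comes from
Settings.model_config in src/config.py; the values are illustrative, matching
TestSettingsBudget above, not project defaults):

    # Sketch only; assumes the package is importable as src.config.
    import os

    os.environ["AGENTIC_MODEL_CONTEXT_WINDOW"] = "200000"
    os.environ["AGENTIC_MODEL_MAX_OUTPUT_TOKENS"] = "8000"
    os.environ["AGENTIC_CONTEXT_RESERVE_RATIO"] = "0.10"

    from src.config import Settings

    cfg = Settings(_env_file=None)
    assert cfg.reserve_tokens == 20_000                   # 10% of the window
    assert cfg.effective_context_budget == 172_000        # 200k - 8k - 20k
    assert cfg.effective_compaction_threshold == 137_600  # 80% of the budget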