From bfccb02373cf9e4750a20a0ad725111760ce0414 Mon Sep 17 00:00:00 2001 From: Jordan Date: Thu, 2 Apr 2026 00:28:57 +0100 Subject: [PATCH] ultimos ajustes --- src/api/routes.py | 42 ++++++++-- src/context/engine.py | 175 +++++++++++++++------------------------ src/memory/embeddings.py | 55 ++++++++++++ 3 files changed, 155 insertions(+), 117 deletions(-) create mode 100644 src/memory/embeddings.py diff --git a/src/api/routes.py b/src/api/routes.py index 1012e96..0873870 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -315,27 +315,29 @@ class LoadKnowledgeRequest(BaseModel): @router.post("/knowledge/load") async def load_knowledge(body: LoadKnowledgeRequest) -> dict[str, Any]: - """Load markdown docs from a directory into the knowledge base.""" + """Load markdown docs from a directory into the knowledge base. + + Generates embeddings for semantic search via OpenAI text-embedding-3-small. + """ memory = _deps.get("memory_store") if not memory: raise HTTPException(status_code=501, detail="Memory store not available") docs_dir = pathlib.Path(body.docs_path) if not docs_dir.is_absolute(): - # Resolve relative to project root docs_dir = pathlib.Path(__file__).resolve().parent.parent.parent / body.docs_path if not docs_dir.is_dir(): raise HTTPException(status_code=400, detail=f"Directory not found: {docs_dir}") - loaded = [] + # Read all docs + docs_data: list[tuple[str, str, str, str, list[str]]] = [] # (id, title, content, summary, tags) for md_file in sorted(docs_dir.glob("*.md")): content = md_file.read_text(encoding="utf-8") doc_id = md_file.stem - - # Build a summary from the first ~500 chars lines = content.strip().splitlines() title = lines[0].lstrip("#").strip() if lines else doc_id + summary_lines = [] for line in lines[:30]: line = line.strip() @@ -345,12 +347,30 @@ async def load_knowledge(body: LoadKnowledgeRequest) -> dict[str, Any]: break summary = " ".join(summary_lines)[:500] - # Extract tags from headings tags = [] for line in lines: if line.startswith("## "): tags.append(line.lstrip("#").strip().lower()[:30]) + docs_data.append((doc_id, title, content, summary, tags[:10])) + + # Generate embeddings in batch + from ..memory.embeddings import EmbeddingService + embed_service = EmbeddingService() + embed_texts = [f"{title}\n{summary}\n{content[:2000]}" for _, title, content, summary, _ in docs_data] + + try: + embeddings = await embed_service.embed_batch(embed_texts) + has_embeddings = True + logger.info("Generated %d embeddings for knowledge base", len(embeddings)) + except Exception as e: + logger.warning("Failed to generate embeddings: %s — loading without semantic search", e) + embeddings = [None] * len(docs_data) + has_embeddings = False + + # Store docs + embeddings + loaded = [] + for i, (doc_id, title, content, summary, tags) in enumerate(docs_data): doc = MemoryDocument( memory_id=doc_id, memory_type=MemoryType.DOCUMENT, @@ -358,20 +378,26 @@ async def load_knowledge(body: LoadKnowledgeRequest) -> dict[str, Any]: title=title, content=content, summary=summary, - tags=tags[:10], + tags=tags, ) await memory.store_document(doc) + + if embeddings[i] is not None: + await memory.store_embedding(doc_id, embeddings[i], namespace="knowledge") + loaded.append({ "id": doc_id, "title": title, "chars": len(content), "tags": tags[:5], + "embedded": embeddings[i] is not None, }) - logger.info("Loaded %d knowledge documents from %s", len(loaded), docs_dir) + logger.info("Loaded %d knowledge documents from %s (embeddings: %s)", len(loaded), docs_dir, has_embeddings) return { "status": "loaded", "count": len(loaded), + "embeddings": has_embeddings, "documents": loaded, } diff --git a/src/context/engine.py b/src/context/engine.py index 778e9df..49de0a6 100644 --- a/src/context/engine.py +++ b/src/context/engine.py @@ -17,6 +17,7 @@ from ..config import settings logger = logging.getLogger(__name__) from ..models.agent import AgentProfile from ..models.artifacts import ArtifactSummary +from ..memory.embeddings import EmbeddingService from ..memory.store import MemoryStore from ..models.context import ( ContextPackage, @@ -47,6 +48,7 @@ class ContextEngine: max_tokens=settings.context_max_tokens ) self.memory = memory_store + self._embed_service: EmbeddingService | None = None # Debug history: last N context builds per session self._history: dict[str, list[dict[str, Any]]] = defaultdict(list) self._max_history = 20 @@ -263,21 +265,15 @@ class ContextEngine: async def _build_knowledge_base( self, session: SessionState ) -> ContextSection | None: - """Load relevant knowledge documents from the memory store. + """Load relevant knowledge documents via semantic search. - Uses keyword matching against the task objective and step - description to select only the most relevant docs. - Max budget: ~15k tokens for knowledge. + Uses embeddings to find the most relevant docs for the current + task. Always includes a title index of ALL docs so the agent + knows what exists and can request more. """ if not self.memory: return None - # Build search terms from current context - search_terms = self._extract_search_terms(session) - if not search_terms: - # No task → load summaries of all docs (lightweight) - return await self._build_knowledge_summaries_only() - all_docs: list[MemoryDocument] = [] all_docs.extend(await self.memory.list_documents( namespace="knowledge", @@ -291,52 +287,57 @@ class ContextEngine: if not all_docs: return None - # Score each doc by relevance - scored = self._score_docs(all_docs, search_terms) + doc_map = {d.memory_id: d for d in all_docs} - # Select top docs within token budget + # Rank docs by semantic similarity + query = self._build_search_query(session) + ranked_ids: list[str] = [] + + if query: + ranked_ids = await self._semantic_rank(query) + + if not ranked_ids: + # No embeddings or no task — sort by size (smallest first) + ranked_ids = [ + d.memory_id + for d in sorted(all_docs, key=lambda d: len(d.content)) + ] + + # Fill token budget with top-ranked docs max_kb_tokens = 15_000 - selected: list[tuple[MemoryDocument, int]] = [] token_budget = max_kb_tokens + full_docs: list[MemoryDocument] = [] - for doc, score in scored: - if score == 0: + for doc_id in ranked_ids: + doc = doc_map.get(doc_id) + if not doc: continue doc_tokens = estimate_tokens(doc.content) - if doc_tokens > token_budget: - # Include summary instead of full content - summary_tokens = estimate_tokens(doc.summary or doc.title) - if summary_tokens < token_budget: - selected.append((doc, -1)) # -1 = summary only - token_budget -= summary_tokens - continue - selected.append((doc, score)) - token_budget -= doc_tokens + if doc_tokens <= token_budget: + full_docs.append(doc) + token_budget -= doc_tokens - if not selected: - return await self._build_knowledge_summaries_only() - - # Build section - full_docs = [(d, s) for d, s in selected if s > 0] - summary_docs = [(d, s) for d, s in selected if s == -1] + # Build section — ALWAYS include title index of ALL docs + included_ids = {d.memory_id for d in full_docs} + not_included = [d for d in all_docs if d.memory_id not in included_ids] lines = [ "# Knowledge Base", - f"_{len(full_docs)} relevant doc(s) loaded, " - f"{len(summary_docs)} summarized, " - f"{len(all_docs) - len(selected)} filtered out_", + f"_{len(full_docs)} doc(s) loaded in full, " + f"{len(not_included)} available on request_", "", ] - for doc, _ in full_docs: + for doc in full_docs: lines.append(f"## {doc.title}") lines.append(doc.content) lines.append("") - if summary_docs: - lines.append("## Other Available Docs (summaries)") - for doc, _ in summary_docs: - lines.append(f"- **{doc.title}**: {doc.summary[:200]}") + if not_included: + lines.append("## Other Available Docs") + lines.append("_Ask for any of these if you need the full content:_") + for doc in not_included: + lines.append(f"- **{doc.title}** ({doc.memory_id}): {doc.summary[:150]}") lines.append("") content = "\n".join(lines) @@ -347,80 +348,36 @@ class ContextEngine: token_estimate=estimate_tokens(content), ) - async def _build_knowledge_summaries_only(self) -> ContextSection | None: - """Lightweight: only doc titles and summaries (no full content).""" - if not self.memory: - return None - docs = await self.memory.list_documents( - namespace="knowledge", memory_type=MemoryType.DOCUMENT - ) - if not docs: - return None - lines = ["# Knowledge Base (summaries)", ""] - for doc in docs: - lines.append(f"- **{doc.title}**: {doc.summary[:150]}") - content = "\n".join(lines) - return ContextSection( - section_type=ContextSectionType.KNOWLEDGE_BASE, - content=content, - priority=60, - token_estimate=estimate_tokens(content), - ) + async def _semantic_rank(self, query: str) -> list[str]: + """Rank knowledge docs by cosine similarity to the query.""" + try: + if not self._embed_service: + self._embed_service = EmbeddingService() - def _extract_search_terms(self, session: SessionState) -> set[str]: - """Extract keywords from the current task for doc matching.""" - terms: set[str] = set() - if not session.current_task: - return terms + query_embedding = await self._embed_service.embed(query) + results = await self.memory.search_by_similarity( + query_embedding=query_embedding, + namespace="knowledge", + top_k=50, + ) + return [doc_id for doc_id, _score in results] - text = session.current_task.objective.lower() - step = session.current_task.current_step() - if step: - text += " " + step.description.lower() - - # Split into words, filter short/common ones - stopwords = { - "de", "la", "el", "en", "un", "una", "los", "las", "del", "al", - "por", "para", "con", "que", "como", "cómo", "qué", "es", "son", - "se", "su", "más", "ya", "si", "no", "este", "esta", "esto", - "the", "a", "an", "is", "are", "and", "or", "to", "in", "of", - "for", "on", "with", "how", "what", "do", "does", "can", - } - for word in text.split(): - word = word.strip(".,;:!?¿¡()[]{}\"'`") - if len(word) >= 3 and word not in stopwords: - terms.add(word) - - return terms + except Exception as e: + logger.warning("Semantic search failed: %s — loading all docs", e) + return [] @staticmethod - def _score_docs( - docs: list[MemoryDocument], terms: set[str] - ) -> list[tuple[MemoryDocument, int]]: - """Score docs by keyword match against title, tags, and content.""" - scored: list[tuple[MemoryDocument, int]] = [] - - for doc in docs: - score = 0 - title_lower = doc.title.lower() - tags_lower = " ".join(doc.tags).lower() - content_lower = doc.content[:2000].lower() - - for term in terms: - # Title match = high weight - if term in title_lower: - score += 10 - # Tag match = medium weight - if term in tags_lower: - score += 5 - # Content match = low weight - if term in content_lower: - score += 1 - - scored.append((doc, score)) - - scored.sort(key=lambda x: x[1], reverse=True) - return scored + def _build_search_query(session: SessionState) -> str: + """Build a natural language query from the current task.""" + if not session.current_task: + return "" + parts = [session.current_task.objective] + step = session.current_task.current_step() + if step: + parts.append(step.description) + if session.current_task.facts_extracted: + parts.extend(session.current_task.facts_extracted[-5:]) + return " ".join(parts) def _build_task_state(self, task: TaskState) -> ContextSection: lines = [ diff --git a/src/memory/embeddings.py b/src/memory/embeddings.py new file mode 100644 index 0000000..bb5842f --- /dev/null +++ b/src/memory/embeddings.py @@ -0,0 +1,55 @@ +"""Embedding service — generates vectors via OpenAI API. + +Used for semantic search over the knowledge base. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from openai import AsyncOpenAI + +from ..config import settings + +logger = logging.getLogger(__name__) + +# Default model: cheap, fast, good enough for doc retrieval +DEFAULT_MODEL = "text-embedding-3-small" +DEFAULT_DIMENSIONS = 1536 + + +class EmbeddingService: + """Generates embeddings via OpenAI's embedding API.""" + + def __init__( + self, + api_key: str | None = None, + model: str = DEFAULT_MODEL, + ) -> None: + self._client = AsyncOpenAI( + api_key=api_key or settings.openai_api_key, + ) + self._model = model + + async def embed(self, text: str) -> list[float]: + """Generate embedding for a single text.""" + # Truncate to ~8k tokens worth of text to stay within limits + text = text[:32_000] + response = await self._client.embeddings.create( + model=self._model, + input=text, + ) + return response.data[0].embedding + + async def embed_batch(self, texts: list[str]) -> list[list[float]]: + """Generate embeddings for multiple texts in one call.""" + # Truncate each + texts = [t[:32_000] for t in texts] + response = await self._client.embeddings.create( + model=self._model, + input=texts, + ) + # Sort by index to maintain order + sorted_data = sorted(response.data, key=lambda d: d.index) + return [d.embedding for d in sorted_data]