Files
agenticSystem/src/models/context.py
2026-04-09 18:27:36 +00:00

69 lines
2.0 KiB
Python

"""Context package and memory document models."""
from __future__ import annotations
from datetime import datetime, timezone
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, Field
class ContextSectionType(StrEnum):
IMMUTABLE_RULES = "immutable_rules"
PROJECT_PROFILE = "project_profile"
KNOWLEDGE_BASE = "knowledge_base"
TASK_HISTORY = "task_history"
TASK_STATE = "task_state"
ARTIFACT_MEMORY = "artifact_memory"
WORKING_CONTEXT = "working_context"
class ContextSection(BaseModel):
"""A discrete section of the assembled context."""
section_type: ContextSectionType
content: str
priority: int = 0 # Higher = more important, kept during compaction
token_estimate: int = 0
class ContextPackage(BaseModel):
"""The fully assembled context sent to the model. Never includes raw tool output."""
sections: list[ContextSection] = Field(default_factory=list)
system_prompt: str = ""
messages: list[dict[str, Any]] = Field(default_factory=list)
total_token_estimate: int = 0
def to_messages(self) -> list[dict[str, Any]]:
"""Produce the final messages list for the model adapter."""
result: list[dict[str, Any]] = []
if self.system_prompt:
result.append({"role": "system", "content": self.system_prompt})
result.extend(self.messages)
return result
class MemoryType(StrEnum):
RULE = "rule"
DOCUMENT = "document"
ARTIFACT = "artifact"
EMBEDDING = "embedding"
class MemoryDocument(BaseModel):
"""A single piece of persistent memory."""
memory_id: str
memory_type: MemoryType
namespace: str = "global"
title: str
content: str
summary: str = ""
tags: list[str] = Field(default_factory=list)
embedding: list[float] | None = None
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
metadata: dict[str, Any] = Field(default_factory=dict)