From 5e64bbdfc88cade0499075067da0565b09eb1735 Mon Sep 17 00:00:00 2001 From: Jordan Diaz Date: Sun, 10 May 2026 18:47:08 +0000 Subject: [PATCH] Ajustes de estructura --- agents/acai/agent.yaml | 1 + agents/acai/system.md | 2 ++ src/api/routes.py | 18 ++++++---- src/config.py | 7 ++++ src/context/engine.py | 15 +++++++- src/models/agent.py | 1 + src/orchestrator/agents/base.py | 31 +++++++++++++---- src/orchestrator/planner.py | 62 ++++++++++++++++++++++++++++++--- src/orchestrator/registry.py | 1 + src/streaming/claude_format.py | 49 ++++++++++++++++++++++++++ 10 files changed, 170 insertions(+), 17 deletions(-) diff --git a/agents/acai/agent.yaml b/agents/acai/agent.yaml index 8051807..5ea2cf0 100644 --- a/agents/acai/agent.yaml +++ b/agents/acai/agent.yaml @@ -12,6 +12,7 @@ context_sections: - task_state allowed_tools: [] model_id: null +# planner_model_id: null # null → usa AGENTIC_PLANNER_MODEL_ID del .env stream_deltas: true kb_load_strategy: top_n kb_max_tokens: 4000 diff --git a/agents/acai/system.md b/agents/acai/system.md index 15bb38c..2c9a80f 100644 --- a/agents/acai/system.md +++ b/agents/acai/system.md @@ -1,5 +1,6 @@ Eres el **asistente de desarrollo de Acai CMS**. Trabajas en un chat conversacional continuo: el usuario te hace peticiones de muy distinto alcance dentro de la misma sesión, y el contexto del proyecto se va acumulando turno a turno. Tu misión es resolver cada petición con el mínimo número de pasos posibles, **reutilizando** lo que ya sabes del proyecto y los turnos anteriores. + # Cuándo planificar y cuándo ejecutar directo Antes de actuar, juzga el alcance de la petición. Hay dos modos de operación: @@ -39,6 +40,7 @@ Tras recibirlo: El plan persiste en `Active Plan` (lo verás en el contexto) hasta que termines o el usuario cambie de tema. Si retomas el mismo objetivo en un turno futuro, continúa por el `→ Step N` actual. Si el usuario te corrige a media ejecución ("no, mejor no toques el header"): ajusta los steps afectados y continúa con los demás. Si la corrección invalida el plan, llama `acai_plan_advance({"abandon": true})` y empieza de nuevo. + # Estructura del proyecto Acai (referencia mínima) diff --git a/src/api/routes.py b/src/api/routes.py index cec5431..7fcf5e2 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -46,9 +46,10 @@ class SendMessageRequest(BaseModel): message: str stream: bool = False agent_id: str | None = None - # 'auto' = el agente decide (heuristica trivial-vs-complex). 'force' = forzar - # acai_plan antes de cualquier ejecucion. UI: toggle en ChatPanel. - plan_mode: str = "auto" + # 'off' (default): la tool acai_plan no se expone al modelo, ejecuta directo. + # 'force': system prompt obliga a llamar acai_plan antes de ejecutar. + # 'auto' (legacy): se trata como 'off'. UI: toggle en ChatPanel. + plan_mode: str = "off" class CompletionRequest(BaseModel): @@ -298,9 +299,14 @@ async def send_message( # Plan mode controlado por el usuario desde el toggle del ChatPanel. # 'auto' (default): heuristica del modelo trivial-vs-complex. # 'force': el agente DEBE llamar acai_plan como primera accion. - plan_mode = (body.plan_mode or "auto").lower() - if plan_mode not in ("auto", "force"): - plan_mode = "auto" + # 'off' (default): la tool acai_plan NO se expone al modelo, ejecuta directo. + # 'force': la tool se expone y system prompt obliga a llamarla primero. + # 'auto' (legacy): se trata como 'off'. + plan_mode = (body.plan_mode or "off").lower() + if plan_mode == "auto": + plan_mode = "off" + if plan_mode not in ("off", "force"): + plan_mode = "off" session.metadata["plan_mode"] = plan_mode from ..mcp.manager import MCPManager diff --git a/src/config.py b/src/config.py index fa3c6c4..5c46b69 100644 --- a/src/config.py +++ b/src/config.py @@ -34,6 +34,13 @@ class Settings(BaseSettings): openai_base_url: str = "" # Custom base URL (for MiniMax, DeepInfra, etc.) default_model_provider: str = "claude" default_model_id: str = "claude-sonnet-4-20250514" + # Modelo override SOLO para el sub-loop del planner (acai_plan). Si vacio, + # usa default_model_id. Pensado para usar un modelo mas potente al planificar + # (p.ej. deepseek-v4-pro) y otro mas rapido al ejecutar (p.ej. deepseek-v4-flash). + planner_model_id: str = "" + # Max tokens del planner. Mas alto que el agente principal porque Pro con + # thinking puede gastar 2-4k tokens razonando antes de emitir el JSON del plan. + planner_max_tokens: int = 16000 max_tokens: int = 4096 temperature: float = 0.3 diff --git a/src/context/engine.py b/src/context/engine.py index aba9429..3577bfa 100644 --- a/src/context/engine.py +++ b/src/context/engine.py @@ -349,7 +349,20 @@ class ContextEngine: # por el registry al cargar). Aqui solo se añaden reglas de sesion # cuando existen — el bloque hardcoded de "Contrato de Contexto" que # vivia aqui se ha movido a `agents/_shared/contract.md` (Fase 3). - parts = [agent.system_prompt or ""] + system_prompt = agent.system_prompt or "" + # Si el usuario tiene el toggle de plan desactivado (plan_mode != "force"), + # quitamos la seccion del system prompt entre + # y . Asi el modelo no ve instrucciones para + # llamar acai_plan y no se inventa el namespace `acai_code__acai_plan`. + if (session.metadata.get("plan_mode") or "off").lower() != "force": + import re + system_prompt = re.sub( + r".*?\n*", + "", + system_prompt, + flags=re.DOTALL, + ) + parts = [system_prompt] if session.immutable_rules: parts.append("\n\n## Session Rules\n") for rule in session.immutable_rules: diff --git a/src/models/agent.py b/src/models/agent.py index 5154613..1d14468 100644 --- a/src/models/agent.py +++ b/src/models/agent.py @@ -19,6 +19,7 @@ class AgentProfile(BaseModel): system_prompt: str = "" allowed_tools: list[str] = Field(default_factory=list) model_id: str | None = None + planner_model_id: str | None = None # override del modelo solo para el sub-loop del planner temperature: float | None = None max_tokens: int | None = None context_sections: list[str] = Field( diff --git a/src/orchestrator/agents/base.py b/src/orchestrator/agents/base.py index c7bc2e3..388181b 100644 --- a/src/orchestrator/agents/base.py +++ b/src/orchestrator/agents/base.py @@ -81,9 +81,11 @@ class BaseAgent: conversation=conversation, ) - # Prepare tool definitions + # Prepare tool definitions. plan_mode "off" oculta acai_plan al + # modelo (toggle del UI desactivado). "force" la expone normalmente. tool_defs = self._get_allowed_tools( followup_mode=str(session.metadata.get("followup_mode", "none")), + plan_mode=str(session.metadata.get("plan_mode", "off") or "off"), ) # Stream model response @@ -146,6 +148,17 @@ class BaseAgent: turn_blocks_by_index[chunk.block_index] = blk if blk.get("type") == "thinking": blk["thinking"] = blk.get("thinking", "") + chunk.thinking_delta + if self.profile.stream_deltas: + await self.sse.emit( + EventType.AGENT_DELTA, + { + "agent": self.profile.role, + "thinking_delta": chunk.thinking_delta, + "block_index": chunk.block_index, + "step": step, + }, + session_id=session.session_id, + ) if chunk.thinking_signature and chunk.block_index >= 0: blk = turn_blocks_by_index.get(chunk.block_index) @@ -941,12 +954,18 @@ class BaseAgent: # ---- Allowed tools -------------------------------------------------------- - def _get_allowed_tools(self, followup_mode: str = "none") -> list[dict[str, Any]]: + def _get_allowed_tools( + self, + followup_mode: str = "none", + plan_mode: str = "force", + ) -> list[dict[str, Any]]: """Return tool definitions filtered by this agent's allowed_tools. - Si el agente tiene `has_planner_tool=True`, anade definiciones sinteticas - de `acai_plan` y `acai_plan_advance` (Fase 5: la tool interna no - atraviesa MCP — se intercepta en `_execute_tool`). + Si el agente tiene `has_planner_tool=True` Y `plan_mode == "force"`, + anade definiciones sinteticas de `acai_plan` y `acai_plan_advance` + (la tool interna no atraviesa MCP — se intercepta en `_execute_tool`). + Cuando `plan_mode != "force"` (toggle del UI desactivado), las tools + del planner NO se exponen y el agente ejecuta directo. """ if followup_mode == "transform": return [] @@ -958,7 +977,7 @@ class BaseAgent: else: tool_defs = list(all_tools) - if self.profile.has_planner_tool: + if self.profile.has_planner_tool and plan_mode == "force": tool_defs.append({ "name": "acai_plan", "description": ( diff --git a/src/orchestrator/planner.py b/src/orchestrator/planner.py index 5586937..c85b5aa 100644 --- a/src/orchestrator/planner.py +++ b/src/orchestrator/planner.py @@ -22,6 +22,7 @@ from dataclasses import dataclass from typing import Any from ..adapters.base import ModelAdapter, ModelConfig +from ..config import settings from ..mcp.manager import MCPManager from ..models.agent import AgentProfile from .tool_groups import PLANNER_TOOLS, strip_namespace @@ -29,6 +30,27 @@ from .tool_groups import PLANNER_TOOLS, strip_namespace logger = logging.getLogger(__name__) +def _serialize_thinking_blocks( + turn_thinking_blocks: dict[int, dict[str, str]], +) -> list[dict[str, Any]]: + """Convierte los thinking blocks acumulados de un turno en bloques + Anthropic-style, ordenados por block_index. DeepSeek (y Anthropic) exigen + que los assistant messages reenvien los thinking blocks con su signature + en turnos siguientes; si no, devuelven 400. + """ + out: list[dict[str, Any]] = [] + for idx in sorted(turn_thinking_blocks.keys()): + blk = turn_thinking_blocks[idx] + if not blk.get("thinking"): + continue + out.append({ + "type": "thinking", + "thinking": blk["thinking"], + "signature": blk.get("signature", ""), + }) + return out + + @dataclass class PlannerResult: """Resultado del sub-loop del planner.""" @@ -179,8 +201,20 @@ async def run_planner_subloop( ] config = ModelConfig( - model_id=agent_profile.model_id or "", - max_tokens=agent_profile.max_tokens or 4096, + # Resolucion del modelo del planner (mas a menos prioritario): + # 1) planner_model_id del agent yaml (override per-agent) + # 2) AGENTIC_PLANNER_MODEL_ID en .env (override global) + # 3) model_id del agent (mismo que ejecuciones) + # 4) default_model_id global (fallback final) + model_id=( + agent_profile.planner_model_id + or settings.planner_model_id + or agent_profile.model_id + or settings.default_model_id + ), + # Mas tokens que el agente principal: Pro con thinking puede gastar + # 2-4k razonando antes del JSON del plan; con 4k se truncaba. + max_tokens=settings.planner_max_tokens or 16000, # Temperatura mas baja que el agente principal — queremos JSON limpio. temperature=0.1, ) @@ -196,6 +230,10 @@ async def run_planner_subloop( active_tools: dict[str, dict[str, Any]] = {} tool_calls_this_step: list[dict[str, Any]] = [] finish_reason = "" + # Bloques de thinking de ESTE turno indexados por block_index. DeepSeek + # (y cualquier API Anthropic con thinking on) exige reenviar los bloques + # thinking + signature en los assistant messages de turnos siguientes. + turn_thinking_blocks: dict[int, dict[str, str]] = {} async for chunk in model_adapter.stream( messages=messages, @@ -207,6 +245,17 @@ async def run_planner_subloop( if chunk.thinking_delta: accumulated_thinking += chunk.thinking_delta + if chunk.block_index >= 0: + blk = turn_thinking_blocks.setdefault( + chunk.block_index, {"thinking": "", "signature": ""} + ) + blk["thinking"] += chunk.thinking_delta + + if chunk.thinking_signature and chunk.block_index >= 0: + blk = turn_thinking_blocks.setdefault( + chunk.block_index, {"thinking": "", "signature": ""} + ) + blk["signature"] = chunk.thinking_signature if chunk.tool_name and chunk.tool_call_id: if chunk.tool_call_id not in active_tools: @@ -259,7 +308,11 @@ async def run_planner_subloop( tool_executions=tool_executions_log, ) # Reintenta con un mensaje de correccion explicito. - messages.append({"role": "assistant", "content": full_text or accumulated_text}) + # Reenviar thinking blocks (con signature) si los hubo — DeepSeek + # rechaza el siguiente turno si el assistant message los omite. + retry_blocks: list[dict[str, Any]] = _serialize_thinking_blocks(turn_thinking_blocks) + retry_blocks.append({"type": "text", "text": full_text or accumulated_text}) + messages.append({"role": "assistant", "content": retry_blocks}) messages.append({ "role": "user", "content": ( @@ -272,7 +325,8 @@ async def run_planner_subloop( # Si llamo tools, ejecutamos las tools y seguimos el sub-loop. # Adjuntamos el assistant message con tool_use blocks y los tool_results. - assistant_blocks: list[dict[str, Any]] = [] + # Reenviar thinking blocks (con signature) primero — requerido por DeepSeek. + assistant_blocks: list[dict[str, Any]] = _serialize_thinking_blocks(turn_thinking_blocks) if full_text: assistant_blocks.append({"type": "text", "text": full_text}) for tc in tool_calls_this_step: diff --git a/src/orchestrator/registry.py b/src/orchestrator/registry.py index db9a385..3d18c47 100644 --- a/src/orchestrator/registry.py +++ b/src/orchestrator/registry.py @@ -103,6 +103,7 @@ class AgentRegistry: system_prompt=system_prompt, allowed_tools=meta.get("allowed_tools", []), model_id=meta.get("model_id"), + planner_model_id=meta.get("planner_model_id"), temperature=meta.get("temperature"), max_tokens=meta.get("max_tokens"), context_sections=meta.get("context_sections", [ diff --git a/src/streaming/claude_format.py b/src/streaming/claude_format.py index 8126c89..fb9ee14 100644 --- a/src/streaming/claude_format.py +++ b/src/streaming/claude_format.py @@ -35,6 +35,8 @@ class ClaudeFormatEmitter: self._tool_block_index: dict[str, dict[str, int]] = {} # session -> {tool_call_id -> index} self._content_blocks: dict[str, list[dict[str, Any]]] = {} self._text_accumulator: dict[str, str] = {} + self._thinking_block_open: dict[str, bool] = {} + self._thinking_block_index: dict[str, int] = {} def _next_index(self, session_id: str) -> int: idx = self._block_counter.get(session_id, 0) @@ -48,6 +50,8 @@ class ClaudeFormatEmitter: self._tool_block_index[session_id] = {} self._content_blocks[session_id] = [] self._text_accumulator[session_id] = "" + self._thinking_block_open[session_id] = False + self._thinking_block_index[session_id] = -1 def _push(self, session_id: str, payload: dict[str, Any]) -> None: """Push a formatted line to all subscribers of a session.""" @@ -119,7 +123,43 @@ class ClaudeFormatEmitter: tool_args = data.get("tool_arguments", "") tool_call_id = data.get("tool_call_id", "") + thinking_delta = data.get("thinking_delta", "") + if thinking_delta: + # Cerrar text block abierto si lo hay + self._close_text_block(session_id) + # Abrir thinking block si no esta abierto + if not self._thinking_block_open.get(session_id): + idx = self._next_index(session_id) + self._thinking_block_index[session_id] = idx + self._thinking_block_open[session_id] = True + self._push(session_id, { + "type": "stream_event", + "event": { + "type": "content_block_start", + "index": idx, + "content_block": {"type": "thinking", "thinking": ""}, + }, + }) + idx = self._thinking_block_index[session_id] + self._push(session_id, { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": idx, + "delta": {"type": "thinking_delta", "thinking": thinking_delta}, + }, + }) + return + if delta_text: + # Cerrar thinking block abierto si lo hay antes de texto normal + if self._thinking_block_open.get(session_id): + idx = self._thinking_block_index[session_id] + self._push(session_id, { + "type": "stream_event", + "event": {"type": "content_block_stop", "index": idx}, + }) + self._thinking_block_open[session_id] = False # Text streaming if not self._text_block_open.get(session_id): self._open_text_block(session_id) @@ -152,6 +192,15 @@ class ClaudeFormatEmitter: tool_name = data.get("tool", "unknown") tool_call_id = data.get("tool_call_id", "") + # Cerrar thinking block abierto si lo hay + if self._thinking_block_open.get(session_id): + idx = self._thinking_block_index[session_id] + self._push(session_id, { + "type": "stream_event", + "event": {"type": "content_block_stop", "index": idx}, + }) + self._thinking_block_open[session_id] = False + # Close open text block self._close_text_block(session_id)