Runtime IA: modelo dinámico, razonamiento, coste por modelo y visión nativa
- Resolución dinámica del modelo por sesión (model_resolver): override de usuario (metadata) → default global (Redis db 0 acai:config:ai:*) → fallback. Mapea a string litellm; LiteLLMAdapter respeta el modelo por request y enruta openrouter/* con OPENROUTER_API_KEY del entorno. - Razonamiento: reasoning_effort por sesión en ModelConfig/AgentProfile, aplicado al agente y al planner. - Coste: cost.py calcula por modelo (catálogo OpenRouter/DeepSeek en Redis → litellm → fijo) y emite tarifas + modelo usado en EXECUTION_COMPLETED. - Visión nativa: imágenes como bloques image_url en el turno del usuario (TaskState.image_attachments → Context Engine → adapter), con persistencia en recent_messages y conteo de tokens de imagen (~1500). - El turno no se pierde al cancelar: se persiste el mensaje del usuario + marca de interrupción para que un "vuelve a intentarlo" tenga contexto. - Fix analyze_image: preservar el subdirectorio de usuario del chat-upload (basename descartaba "<user>/" → not found). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -57,11 +57,16 @@ function resolveChatPreviewPath(imageUrl) {
|
|||||||
const fileParam = qs.get("file");
|
const fileParam = qs.get("file");
|
||||||
if (!fileParam) return null;
|
if (!fileParam) return null;
|
||||||
|
|
||||||
// Sanitizar: evitar traversal — solo nombre base permitido
|
// Sanitizar contra traversal PRESERVANDO el subdirectorio de usuario
|
||||||
const safeName = path.basename(fileParam);
|
// (el file= es "<username>/<archivo>"; basename lo perdía → not found).
|
||||||
if (!safeName || safeName === "." || safeName === "..") return null;
|
if (fileParam.includes("..") || fileParam.startsWith("/") || fileParam.includes("\\")) return null;
|
||||||
|
|
||||||
return path.join(CHAT_UPLOADS_DIR, safeName);
|
const fullPath = path.join(CHAT_UPLOADS_DIR, fileParam);
|
||||||
|
// Asegurar que queda dentro de CHAT_UPLOADS_DIR.
|
||||||
|
const base = path.resolve(CHAT_UPLOADS_DIR);
|
||||||
|
if (!path.resolve(fullPath).startsWith(base + path.sep)) return null;
|
||||||
|
|
||||||
|
return fullPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -57,6 +57,10 @@ class ModelConfig:
|
|||||||
max_tokens: int = 4096
|
max_tokens: int = 4096
|
||||||
temperature: float = 0.3
|
temperature: float = 0.3
|
||||||
stop_sequences: list[str] = field(default_factory=list)
|
stop_sequences: list[str] = field(default_factory=list)
|
||||||
|
# Nivel de razonamiento (minimal|low|medium|high). Vacío = sin razonamiento
|
||||||
|
# explícito. LiteLLM lo traduce por proveedor; modelos que no lo soportan lo
|
||||||
|
# ignoran (litellm.drop_params=True).
|
||||||
|
reasoning_effort: str = ""
|
||||||
extra: dict[str, Any] = field(default_factory=dict)
|
extra: dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -59,7 +59,23 @@ class LiteLLMAdapter(OpenAIAdapter):
|
|||||||
|
|
||||||
async def _acreate(self, kwargs: dict[str, Any]):
|
async def _acreate(self, kwargs: dict[str, Any]):
|
||||||
kwargs = dict(kwargs)
|
kwargs = dict(kwargs)
|
||||||
kwargs["model"] = self._litellm_model
|
# Respetar el model_id por request (resuelto dinámicamente en
|
||||||
|
# send_message). Solo se honra si trae prefijo de proveedor
|
||||||
|
# ("deepseek/...", "openrouter/..."); cualquier otro valor (default
|
||||||
|
# no-litellm, vacío) cae al modelo por defecto del adapter — preserva el
|
||||||
|
# comportamiento previo para llamadas internas (planner, completions).
|
||||||
|
model = kwargs.get("model") or ""
|
||||||
|
if "/" not in model:
|
||||||
|
model = self._litellm_model
|
||||||
|
kwargs["model"] = model
|
||||||
|
|
||||||
|
if model.startswith("openrouter/"):
|
||||||
|
# OpenRouter: LiteLLM enruta con OPENROUTER_API_KEY del entorno y su
|
||||||
|
# base propia. NO forzar api_key/api_base del proxy DeepSeek — lo
|
||||||
|
# sobreescribirían y romperían el routing.
|
||||||
|
kwargs.pop("api_key", None)
|
||||||
|
kwargs.pop("api_base", None)
|
||||||
|
else:
|
||||||
if self._api_key:
|
if self._api_key:
|
||||||
kwargs["api_key"] = self._api_key
|
kwargs["api_key"] = self._api_key
|
||||||
if self._api_base:
|
if self._api_base:
|
||||||
|
|||||||
@@ -77,6 +77,8 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
"stream": True,
|
"stream": True,
|
||||||
"stream_options": {"include_usage": True},
|
"stream_options": {"include_usage": True},
|
||||||
}
|
}
|
||||||
|
if getattr(config, "reasoning_effort", ""):
|
||||||
|
kwargs["reasoning_effort"] = config.reasoning_effort
|
||||||
if tools:
|
if tools:
|
||||||
kwargs["tools"] = self._format_tools(tools)
|
kwargs["tools"] = self._format_tools(tools)
|
||||||
|
|
||||||
@@ -266,6 +268,8 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
"temperature": config.temperature,
|
"temperature": config.temperature,
|
||||||
"messages": self._to_openai_messages(messages),
|
"messages": self._to_openai_messages(messages),
|
||||||
}
|
}
|
||||||
|
if getattr(config, "reasoning_effort", ""):
|
||||||
|
kwargs["reasoning_effort"] = config.reasoning_effort
|
||||||
if tools:
|
if tools:
|
||||||
kwargs["tools"] = self._format_tools(tools)
|
kwargs["tools"] = self._format_tools(tools)
|
||||||
# Fuerza al modelo a usar un tool concreto para garantizar JSON por schema
|
# Fuerza al modelo a usar un tool concreto para garantizar JSON por schema
|
||||||
@@ -428,8 +432,9 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
if tool_calls:
|
if tool_calls:
|
||||||
m["tool_calls"] = tool_calls
|
m["tool_calls"] = tool_calls
|
||||||
out.append(m)
|
out.append(m)
|
||||||
else: # user (puede traer tool_result blocks)
|
else: # user (puede traer tool_result blocks, texto e imágenes)
|
||||||
text_parts = []
|
text_parts = []
|
||||||
|
image_blocks: list[dict[str, Any]] = []
|
||||||
for b in content:
|
for b in content:
|
||||||
if not isinstance(b, dict):
|
if not isinstance(b, dict):
|
||||||
continue
|
continue
|
||||||
@@ -442,7 +447,18 @@ class OpenAIAdapter(ModelAdapter):
|
|||||||
})
|
})
|
||||||
elif t == "text":
|
elif t == "text":
|
||||||
text_parts.append(b.get("text", ""))
|
text_parts.append(b.get("text", ""))
|
||||||
if text_parts:
|
elif t == "image_url":
|
||||||
|
# Visión nativa: preservar el bloque en formato multimodal OpenAI.
|
||||||
|
image_blocks.append({"type": "image_url", "image_url": b.get("image_url") or {}})
|
||||||
|
if image_blocks:
|
||||||
|
# Content como lista de bloques (texto + imágenes).
|
||||||
|
parts: list[dict[str, Any]] = []
|
||||||
|
joined = "\n".join(p for p in text_parts if p)
|
||||||
|
if joined:
|
||||||
|
parts.append({"type": "text", "text": joined})
|
||||||
|
parts.extend(image_blocks)
|
||||||
|
out.append({"role": "user", "content": parts})
|
||||||
|
elif text_parts:
|
||||||
out.append({"role": "user", "content": "\n".join(text_parts)})
|
out.append({"role": "user", "content": "\n".join(text_parts)})
|
||||||
# Guard defensivo: el compactor ya garantiza el invariante tool_use ↔
|
# Guard defensivo: el compactor ya garantiza el invariante tool_use ↔
|
||||||
# tool_result (`_enforce_tool_pairing`), pero si algo se escapa el
|
# tool_result (`_enforce_tool_pairing`), pero si algo se escapa el
|
||||||
|
|||||||
@@ -46,6 +46,10 @@ class SendMessageRequest(BaseModel):
|
|||||||
message: str
|
message: str
|
||||||
stream: bool = False
|
stream: bool = False
|
||||||
agent_id: str | None = None
|
agent_id: str | None = None
|
||||||
|
# Imágenes para visión nativa: bloques listos para el modelo
|
||||||
|
# {"type":"image_url","image_url":{"url":"data:<mime>;base64,..."}}. Solo se
|
||||||
|
# envían cuando el modelo activo es multimodal (lo decide acai-app).
|
||||||
|
attachments: list[dict[str, Any]] | None = None
|
||||||
# 'off' (default): la tool acai_plan no se expone al modelo, ejecuta directo.
|
# 'off' (default): la tool acai_plan no se expone al modelo, ejecuta directo.
|
||||||
# 'force': system prompt obliga a llamar acai_plan antes de ejecutar.
|
# 'force': system prompt obliga a llamar acai_plan antes de ejecutar.
|
||||||
# 'auto' (legacy): se trata como 'off'. UI: toggle en ChatPanel.
|
# 'auto' (legacy): se trata como 'off'. UI: toggle en ChatPanel.
|
||||||
@@ -335,6 +339,25 @@ async def send_message(
|
|||||||
if not agent_profile:
|
if not agent_profile:
|
||||||
agent_profile = agent_reg.get(agent_reg.default_agent_id)
|
agent_profile = agent_reg.get(agent_reg.default_agent_id)
|
||||||
|
|
||||||
|
# Resolución dinámica del modelo (Fase 2): override por-usuario (metadata de
|
||||||
|
# la sesión) → default global (Redis acai:config:ai:*). Si resuelve, se
|
||||||
|
# inyecta en una COPIA del profile para no mutar el del registry (singleton).
|
||||||
|
if agent_profile is not None:
|
||||||
|
from ..orchestrator.model_resolver import resolve_session_model
|
||||||
|
resolved = await resolve_session_model(session)
|
||||||
|
update = {}
|
||||||
|
if resolved.get("model_id"):
|
||||||
|
update["model_id"] = resolved["model_id"]
|
||||||
|
if resolved.get("reasoning_effort"):
|
||||||
|
update["reasoning_effort"] = resolved["reasoning_effort"]
|
||||||
|
if update:
|
||||||
|
agent_profile = agent_profile.model_copy(update=update)
|
||||||
|
logger.info(
|
||||||
|
"Session %s: modelo resuelto -> %s (reasoning=%s)",
|
||||||
|
session_id, update.get("model_id", "(default)"),
|
||||||
|
update.get("reasoning_effort", "off"),
|
||||||
|
)
|
||||||
|
|
||||||
# Plan mode controlado por el usuario desde el toggle del ChatPanel.
|
# Plan mode controlado por el usuario desde el toggle del ChatPanel.
|
||||||
# 'auto' (default): heuristica del modelo trivial-vs-complex.
|
# 'auto' (default): heuristica del modelo trivial-vs-complex.
|
||||||
# 'force': el agente DEBE llamar acai_plan como primera accion.
|
# 'force': el agente DEBE llamar acai_plan como primera accion.
|
||||||
@@ -359,7 +382,7 @@ async def send_message(
|
|||||||
|
|
||||||
if body.stream:
|
if body.stream:
|
||||||
task = asyncio.create_task(
|
task = asyncio.create_task(
|
||||||
_execute_and_persist(orchestrator, storage, session, body.message)
|
_execute_and_persist(orchestrator, storage, session, body.message, body.attachments)
|
||||||
)
|
)
|
||||||
_running_executions[session_id] = task
|
_running_executions[session_id] = task
|
||||||
# Auto-limpieza del registro al terminar (solo si seguimos siendo la
|
# Auto-limpieza del registro al terminar (solo si seguimos siendo la
|
||||||
@@ -377,11 +400,11 @@ async def send_message(
|
|||||||
"stream_url": f"/sessions/{session_id}/stream",
|
"stream_url": f"/sessions/{session_id}/stream",
|
||||||
}
|
}
|
||||||
|
|
||||||
result = await _execute_and_persist(orchestrator, storage, session, body.message)
|
result = await _execute_and_persist(orchestrator, storage, session, body.message, body.attachments)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
async def _execute_and_persist(orchestrator, storage, session, message) -> dict[str, Any]:
|
async def _execute_and_persist(orchestrator, storage, session, message, attachments=None) -> dict[str, Any]:
|
||||||
# Acquire exclusive lock — prevents concurrent execution on same session
|
# Acquire exclusive lock — prevents concurrent execution on same session
|
||||||
async with storage.session_lock(session.session_id) as acquired:
|
async with storage.session_lock(session.session_id) as acquired:
|
||||||
if not acquired:
|
if not acquired:
|
||||||
@@ -392,7 +415,7 @@ async def _execute_and_persist(orchestrator, storage, session, message) -> dict[
|
|||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = await orchestrator.process_message(session, message)
|
result = await orchestrator.process_message(session, message, attachments)
|
||||||
return result
|
return result
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
# Ejecución abortada por el usuario (stop) o preemptada por un
|
# Ejecución abortada por el usuario (stop) o preemptada por un
|
||||||
@@ -401,6 +424,24 @@ async def _execute_and_persist(orchestrator, storage, session, message) -> dict[
|
|||||||
# que el `await task` de la cancelación complete. El `finally`
|
# que el `await task` de la cancelación complete. El `finally`
|
||||||
# persiste el estado y el `session_lock` se libera al salir.
|
# persiste el estado y el `session_lock` se libera al salir.
|
||||||
logger.info("Execution cancelled for session %s", session.session_id)
|
logger.info("Execution cancelled for session %s", session.session_id)
|
||||||
|
# Persistir el turno del usuario aunque se cancele: si no, un
|
||||||
|
# "vuelve a intentarlo" posterior se queda sin contexto de lo pedido.
|
||||||
|
# Guardamos su mensaje (+ imagen) y un marcador de interrupción para
|
||||||
|
# mantener la alternancia user/assistant.
|
||||||
|
try:
|
||||||
|
task = session.current_task
|
||||||
|
if task and (task.objective or "").strip():
|
||||||
|
session.recent_messages = orchestrator._append_recent_messages(
|
||||||
|
session.recent_messages,
|
||||||
|
message=task.objective,
|
||||||
|
conversation=[{
|
||||||
|
"role": "assistant",
|
||||||
|
"content": "[Respuesta interrumpida por el usuario antes de completarse]",
|
||||||
|
}],
|
||||||
|
image_attachments=task.image_attachments,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("No se pudo persistir el turno cancelado")
|
||||||
session.status = SessionStatus.ACTIVE
|
session.status = SessionStatus.ACTIVE
|
||||||
session.current_task = None
|
session.current_task = None
|
||||||
raise
|
raise
|
||||||
|
|||||||
@@ -890,6 +890,10 @@ class ContextCompactor:
|
|||||||
elif btype == "tool_result":
|
elif btype == "tool_result":
|
||||||
tc = block.get("content", "")
|
tc = block.get("content", "")
|
||||||
tokens += estimate_tokens(tc if isinstance(tc, str) else str(tc))
|
tokens += estimate_tokens(tc if isinstance(tc, str) else str(tc))
|
||||||
|
elif btype == "image_url":
|
||||||
|
# Una imagen ~1500 tokens. NO medir el base64 como texto, que
|
||||||
|
# lo contaría como ~30k y reventaría presupuestos/trim.
|
||||||
|
tokens += 1500
|
||||||
else:
|
else:
|
||||||
tokens += estimate_tokens(str(block))
|
tokens += estimate_tokens(str(block))
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -924,7 +924,18 @@ class ContextEngine:
|
|||||||
messages.append({"role": "user", "content": "\n".join(history_lines)})
|
messages.append({"role": "user", "content": "\n".join(history_lines)})
|
||||||
messages.append({"role": "assistant", "content": "Entendido, tengo el contexto del historial. ¿En qué puedo ayudarte ahora?"})
|
messages.append({"role": "assistant", "content": "Entendido, tengo el contexto del historial. ¿En qué puedo ayudarte ahora?"})
|
||||||
|
|
||||||
# Current user message
|
# Current user message — con imágenes adjuntas (visión nativa) si las hay.
|
||||||
|
# En ese caso el content pasa a ser lista de bloques [texto, image_url...].
|
||||||
|
image_attachments = []
|
||||||
|
if session.current_task and getattr(session.current_task, "image_attachments", None):
|
||||||
|
image_attachments = [
|
||||||
|
b for b in session.current_task.image_attachments if isinstance(b, dict)
|
||||||
|
]
|
||||||
|
if image_attachments:
|
||||||
|
content_blocks = [{"type": "text", "text": user_content}]
|
||||||
|
content_blocks.extend(image_attachments)
|
||||||
|
messages.append({"role": "user", "content": content_blocks})
|
||||||
|
else:
|
||||||
messages.append({"role": "user", "content": user_content})
|
messages.append({"role": "user", "content": user_content})
|
||||||
|
|
||||||
# Append real conversation (assistant messages + tool results from current step)
|
# Append real conversation (assistant messages + tool results from current step)
|
||||||
@@ -1037,6 +1048,10 @@ class ContextEngine:
|
|||||||
elif btype == "tool_result":
|
elif btype == "tool_result":
|
||||||
tc = block.get("content", "")
|
tc = block.get("content", "")
|
||||||
total += estimate_tokens(tc if isinstance(tc, str) else str(tc))
|
total += estimate_tokens(tc if isinstance(tc, str) else str(tc))
|
||||||
|
elif btype == "image_url":
|
||||||
|
# Heurística conservadora: una imagen ~1500 tokens (no se
|
||||||
|
# cuenta el base64 como texto, que infla muchísimo).
|
||||||
|
total += 1500
|
||||||
else:
|
else:
|
||||||
total += estimate_tokens(str(block))
|
total += estimate_tokens(str(block))
|
||||||
return total
|
return total
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ class AgentProfile(BaseModel):
|
|||||||
allowed_tools: list[str] = Field(default_factory=list)
|
allowed_tools: list[str] = Field(default_factory=list)
|
||||||
model_id: str | None = None
|
model_id: str | None = None
|
||||||
planner_model_id: str | None = None # override del modelo solo para el sub-loop del planner
|
planner_model_id: str | None = None # override del modelo solo para el sub-loop del planner
|
||||||
|
reasoning_effort: str | None = None # nivel de razonamiento (minimal|low|medium|high) resuelto por sesión
|
||||||
temperature: float | None = None
|
temperature: float | None = None
|
||||||
max_tokens: int | None = None
|
max_tokens: int | None = None
|
||||||
context_sections: list[str] = Field(
|
context_sections: list[str] = Field(
|
||||||
|
|||||||
@@ -46,6 +46,9 @@ class TaskState(BaseModel):
|
|||||||
|
|
||||||
task_id: str = Field(default_factory=lambda: uuid.uuid4().hex[:12])
|
task_id: str = Field(default_factory=lambda: uuid.uuid4().hex[:12])
|
||||||
objective: str
|
objective: str
|
||||||
|
# Imágenes adjuntas a la petición actual (visión nativa). Cada item es un
|
||||||
|
# bloque listo para el modelo: {"type":"image_url","image_url":{"url":"data:..."}}.
|
||||||
|
image_attachments: list[dict[str, Any]] = Field(default_factory=list)
|
||||||
status: TaskStatus = TaskStatus.PENDING
|
status: TaskStatus = TaskStatus.PENDING
|
||||||
plan: list[TaskStep] = Field(default_factory=list)
|
plan: list[TaskStep] = Field(default_factory=list)
|
||||||
current_step_index: int = 0
|
current_step_index: int = 0
|
||||||
@@ -94,8 +97,8 @@ class SessionState(BaseModel):
|
|||||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||||
|
|
||||||
def begin_task(self, objective: str) -> TaskState:
|
def begin_task(self, objective: str, image_attachments: list[dict[str, Any]] | None = None) -> TaskState:
|
||||||
task = TaskState(objective=objective)
|
task = TaskState(objective=objective, image_attachments=image_attachments or [])
|
||||||
self.current_task = task
|
self.current_task = task
|
||||||
self.status = SessionStatus.EXECUTING
|
self.status = SessionStatus.EXECUTING
|
||||||
self.turn_count += 1
|
self.turn_count += 1
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ class BaseAgent:
|
|||||||
model_id=self.profile.model_id or "",
|
model_id=self.profile.model_id or "",
|
||||||
max_tokens=self.profile.max_tokens or 4096,
|
max_tokens=self.profile.max_tokens or 4096,
|
||||||
temperature=self.profile.temperature or 0.3,
|
temperature=self.profile.temperature or 0.3,
|
||||||
|
reasoning_effort=self.profile.reasoning_effort or "",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Snapshot del numero de tool_executions ya acumulados ANTES del
|
# Snapshot del numero de tool_executions ya acumulados ANTES del
|
||||||
|
|||||||
121
src/orchestrator/cost.py
Normal file
121
src/orchestrator/cost.py
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
"""Cálculo de coste por modelo (Fase 2).
|
||||||
|
|
||||||
|
Prioridad de fuentes de precio (para que el coste registrado en
|
||||||
|
`consumo_acaicode` coincida con lo que muestra el Forge Admin Panel):
|
||||||
|
1. Catálogo OpenRouter cacheado por el panel en Redis db 0
|
||||||
|
(`acai:config:ai:models_cache:openrouter` → price_in_1m / price_out_1m).
|
||||||
|
2. Price map de LiteLLM (conoce muchos modelos deepseek/, anthropic/, etc.).
|
||||||
|
3. Coste fijo de `settings` (comportamiento previo).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import redis.asyncio as redis
|
||||||
|
|
||||||
|
from ..config import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Caches de catálogo que publica el Forge Admin Panel en Redis db 0, por proveedor.
|
||||||
|
# El id se guarda SIN el prefijo de proveedor de litellm (p.ej.
|
||||||
|
# "moonshotai/kimi-k2.7-code", "deepseek-v4-pro").
|
||||||
|
_CACHE_KEYS = {
|
||||||
|
"openrouter": "acai:config:ai:models_cache:openrouter",
|
||||||
|
"deepseek": "acai:config:ai:models_cache:deepseek",
|
||||||
|
}
|
||||||
|
_CONFIG_DB = 0
|
||||||
|
_cfg_redis: "redis.Redis | None" = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_cfg_redis() -> "redis.Redis":
|
||||||
|
global _cfg_redis
|
||||||
|
if _cfg_redis is None:
|
||||||
|
_cfg_redis = redis.Redis(
|
||||||
|
host=settings.redis_host,
|
||||||
|
port=settings.redis_port,
|
||||||
|
db=_CONFIG_DB,
|
||||||
|
password=settings.redis_password or None,
|
||||||
|
decode_responses=True,
|
||||||
|
)
|
||||||
|
return _cfg_redis
|
||||||
|
|
||||||
|
|
||||||
|
async def _catalog_price_per_1m(model_id: str | None):
|
||||||
|
"""(price_in_1m, price_out_1m) del catálogo del panel, o None.
|
||||||
|
|
||||||
|
model_id viene en formato litellm ("<provider>/<id>"). Separamos el prefijo
|
||||||
|
de proveedor para elegir el cache y buscar por el id catalogado.
|
||||||
|
"""
|
||||||
|
if not model_id or "/" not in model_id:
|
||||||
|
return None
|
||||||
|
provider, _, raw_id = model_id.partition("/")
|
||||||
|
cache_key = _CACHE_KEYS.get(provider)
|
||||||
|
if not cache_key:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
cached = await _get_cfg_redis().get(cache_key)
|
||||||
|
if not cached:
|
||||||
|
return None
|
||||||
|
models = json.loads(cached)
|
||||||
|
except Exception as e: # pragma: no cover - defensivo
|
||||||
|
logger.warning("catálogo %s no disponible para coste: %s", provider, e)
|
||||||
|
return None
|
||||||
|
for m in models:
|
||||||
|
if m.get("id") == raw_id:
|
||||||
|
pin = m.get("price_in_1m")
|
||||||
|
pout = m.get("price_out_1m")
|
||||||
|
if pin is not None and pout is not None:
|
||||||
|
return (float(pin), float(pout))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def compute_cost(model_id: str | None, input_tokens: int, output_tokens: int) -> dict:
|
||||||
|
"""Coste de una ejecución para `model_id` y los tokens dados.
|
||||||
|
|
||||||
|
Devuelve {"cost_usd", "input_cost_1m", "output_cost_1m"} — el coste total y
|
||||||
|
las tarifas por 1M tokens REALMENTE aplicadas (se almacenan en
|
||||||
|
`consumo_acaicode.input_cost_1M` / `output_cost_1M`).
|
||||||
|
"""
|
||||||
|
input_tokens = int(input_tokens or 0)
|
||||||
|
output_tokens = int(output_tokens or 0)
|
||||||
|
|
||||||
|
def _result(in_1m: float, out_1m: float) -> dict:
|
||||||
|
return {
|
||||||
|
"cost_usd": (input_tokens / 1_000_000) * in_1m + (output_tokens / 1_000_000) * out_1m,
|
||||||
|
"input_cost_1m": round(in_1m, 6),
|
||||||
|
"output_cost_1m": round(out_1m, 6),
|
||||||
|
}
|
||||||
|
|
||||||
|
# 1. Precio del catálogo OpenRouter (fuente que muestra el admin).
|
||||||
|
prices = await _catalog_price_per_1m(model_id)
|
||||||
|
if prices:
|
||||||
|
return _result(prices[0], prices[1])
|
||||||
|
|
||||||
|
# 2. Price map de LiteLLM (deepseek/, anthropic/, etc.).
|
||||||
|
if model_id and "/" in model_id:
|
||||||
|
try:
|
||||||
|
import litellm
|
||||||
|
|
||||||
|
prompt_cost, completion_cost = litellm.cost_per_token(
|
||||||
|
model=model_id,
|
||||||
|
prompt_tokens=input_tokens,
|
||||||
|
completion_tokens=output_tokens,
|
||||||
|
)
|
||||||
|
total = (prompt_cost or 0.0) + (completion_cost or 0.0)
|
||||||
|
if total > 0:
|
||||||
|
# Derivar tarifa por 1M a partir del coste por-token de litellm.
|
||||||
|
in_1m = (prompt_cost / input_tokens) * 1_000_000 if input_tokens else 0.0
|
||||||
|
out_1m = (completion_cost / output_tokens) * 1_000_000 if output_tokens else 0.0
|
||||||
|
return {
|
||||||
|
"cost_usd": total,
|
||||||
|
"input_cost_1m": round(in_1m, 6),
|
||||||
|
"output_cost_1m": round(out_1m, 6),
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("cost_per_token(%s) falló, uso coste fijo: %s", model_id, e)
|
||||||
|
|
||||||
|
# 3. Coste fijo configurado.
|
||||||
|
return _result(settings.cost_per_1m_input, settings.cost_per_1m_output)
|
||||||
@@ -52,11 +52,16 @@ class OrchestratorEngine:
|
|||||||
self,
|
self,
|
||||||
session: SessionState,
|
session: SessionState,
|
||||||
message: str,
|
message: str,
|
||||||
|
image_attachments: list[dict[str, Any]] | None = None,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Process a user message. Single agent execution with timeout."""
|
"""Process a user message. Single agent execution with timeout.
|
||||||
|
|
||||||
|
`image_attachments`: bloques image_url (visión nativa) para el turno del
|
||||||
|
usuario, cuando el modelo activo es multimodal.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
return await asyncio.wait_for(
|
return await asyncio.wait_for(
|
||||||
self._run(session, message),
|
self._run(session, message, image_attachments),
|
||||||
timeout=settings.max_execution_timeout_seconds,
|
timeout=settings.max_execution_timeout_seconds,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
@@ -86,6 +91,7 @@ class OrchestratorEngine:
|
|||||||
self,
|
self,
|
||||||
session: SessionState,
|
session: SessionState,
|
||||||
message: str,
|
message: str,
|
||||||
|
image_attachments: list[dict[str, Any]] | None = None,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Execute: message → agent → response."""
|
"""Execute: message → agent → response."""
|
||||||
|
|
||||||
@@ -113,8 +119,8 @@ class OrchestratorEngine:
|
|||||||
f"Peticion del usuario:\n{message}"
|
f"Peticion del usuario:\n{message}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create task
|
# Create task (con imágenes adjuntas si las hay — visión nativa)
|
||||||
task = session.begin_task(objective=message)
|
task = session.begin_task(objective=message, image_attachments=image_attachments)
|
||||||
task.status = TaskStatus.EXECUTING
|
task.status = TaskStatus.EXECUTING
|
||||||
|
|
||||||
# Reset del contador de invocaciones de `acai_plan` por turno (Fase 5).
|
# Reset del contador de invocaciones de `acai_plan` por turno (Fase 5).
|
||||||
@@ -154,6 +160,9 @@ class OrchestratorEngine:
|
|||||||
session.recent_messages,
|
session.recent_messages,
|
||||||
message=message,
|
message=message,
|
||||||
conversation=result.get("conversation", []),
|
conversation=result.get("conversation", []),
|
||||||
|
image_attachments=(
|
||||||
|
session.current_task.image_attachments if session.current_task else None
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
session.task_history.append(
|
session.task_history.append(
|
||||||
@@ -182,13 +191,18 @@ class OrchestratorEngine:
|
|||||||
task.status = TaskStatus.COMPLETED
|
task.status = TaskStatus.COMPLETED
|
||||||
session.complete_task()
|
session.complete_task()
|
||||||
|
|
||||||
# Calculate cost
|
# Calculate cost — por modelo realmente usado (Fase 2). El model_id
|
||||||
|
# efectivo vive en el agent_profile (resuelto en send_message).
|
||||||
total_input = usage.get("input_tokens", 0)
|
total_input = usage.get("input_tokens", 0)
|
||||||
total_output = usage.get("output_tokens", 0)
|
total_output = usage.get("output_tokens", 0)
|
||||||
cost_usd = (
|
model_used = (
|
||||||
(total_input / 1_000_000) * settings.cost_per_1m_input
|
self.agent_profile.model_id
|
||||||
+ (total_output / 1_000_000) * settings.cost_per_1m_output
|
or settings.litellm_model
|
||||||
|
or settings.default_model_id
|
||||||
)
|
)
|
||||||
|
from .cost import compute_cost
|
||||||
|
cost_info = await compute_cost(model_used, total_input, total_output)
|
||||||
|
cost_usd = cost_info["cost_usd"]
|
||||||
|
|
||||||
await self.sse.emit(
|
await self.sse.emit(
|
||||||
EventType.EXECUTION_COMPLETED,
|
EventType.EXECUTION_COMPLETED,
|
||||||
@@ -201,6 +215,19 @@ class OrchestratorEngine:
|
|||||||
"status": "completed",
|
"status": "completed",
|
||||||
"usage": usage,
|
"usage": usage,
|
||||||
"total_cost_usd": round(cost_usd, 6),
|
"total_cost_usd": round(cost_usd, 6),
|
||||||
|
# Modelo + tarifas usadas → se propagan a consumo_acaicode via
|
||||||
|
# _report_usage (columnas input_cost_1M / output_cost_1M).
|
||||||
|
"model": model_used,
|
||||||
|
"modelUsage": {
|
||||||
|
model_used: {
|
||||||
|
"inputTokens": total_input,
|
||||||
|
"outputTokens": total_output,
|
||||||
|
"costUSD": round(cost_usd, 6),
|
||||||
|
"inputCost1M": cost_info["input_cost_1m"],
|
||||||
|
"outputCost1M": cost_info["output_cost_1m"],
|
||||||
|
"reasoningEffort": self.agent_profile.reasoning_effort or "",
|
||||||
|
}
|
||||||
|
},
|
||||||
},
|
},
|
||||||
session_id=session.session_id,
|
session_id=session.session_id,
|
||||||
)
|
)
|
||||||
@@ -246,12 +273,20 @@ class OrchestratorEngine:
|
|||||||
existing: list[dict[str, Any]],
|
existing: list[dict[str, Any]],
|
||||||
message: str,
|
message: str,
|
||||||
conversation: list[dict[str, Any]],
|
conversation: list[dict[str, Any]],
|
||||||
|
image_attachments: list[dict[str, Any]] | None = None,
|
||||||
) -> list[dict[str, Any]]:
|
) -> list[dict[str, Any]]:
|
||||||
merged = [OrchestratorEngine._sanitize_recent_message(m) for m in existing]
|
merged = [OrchestratorEngine._sanitize_recent_message(m) for m in existing]
|
||||||
merged = [m for m in merged if m]
|
merged = [m for m in merged if m]
|
||||||
|
|
||||||
current_turn: list[dict[str, Any]] = []
|
current_turn: list[dict[str, Any]] = []
|
||||||
if message.strip():
|
if message.strip() or image_attachments:
|
||||||
|
if image_attachments:
|
||||||
|
# Guardar el turno con la imagen como bloques para que PERSISTA
|
||||||
|
# en el contexto de turnos siguientes (visión nativa multimodal).
|
||||||
|
content_blocks = [{"type": "text", "text": message}]
|
||||||
|
content_blocks.extend(image_attachments)
|
||||||
|
current_turn.append({"role": "user", "content": content_blocks})
|
||||||
|
else:
|
||||||
current_turn.append({"role": "user", "content": message})
|
current_turn.append({"role": "user", "content": message})
|
||||||
|
|
||||||
for message_obj in conversation:
|
for message_obj in conversation:
|
||||||
|
|||||||
113
src/orchestrator/model_resolver.py
Normal file
113
src/orchestrator/model_resolver.py
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
"""Resolución dinámica del modelo IA por sesión (Fase 2).
|
||||||
|
|
||||||
|
Prioridad:
|
||||||
|
1. Override por-usuario: `session.metadata["ai_provider"|"ai_model"]`. Lo
|
||||||
|
inyecta acai-app via self-read del WS (`getAcaiCodeUserAiModel`) al crear
|
||||||
|
la sesión.
|
||||||
|
2. Default global: Redis `acai:config:ai:provider` / `acai:config:ai:model`,
|
||||||
|
que escribe el Forge Admin Panel. OJO: esas keys NO llevan el prefijo
|
||||||
|
`agentic` (son globales del stack Acai).
|
||||||
|
3. Sin configuración → None: no se toca el modelo y el adapter usa su default
|
||||||
|
(comportamiento previo).
|
||||||
|
|
||||||
|
Solo aplica cuando el provider activo es `litellm` — los providers del catálogo
|
||||||
|
(openrouter, deepseek) se enrutan por LiteLLM. Para claude/openai no se toca.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import redis.asyncio as redis
|
||||||
|
|
||||||
|
from ..config import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Keys del Forge Admin Panel (globales, SIN prefijo agentic).
|
||||||
|
_GLOBAL_PROVIDER_KEY = "acai:config:ai:provider"
|
||||||
|
_GLOBAL_MODEL_KEY = "acai:config:ai:model"
|
||||||
|
_GLOBAL_REASONING_KEY = "acai:config:ai:reasoning_effort"
|
||||||
|
|
||||||
|
# Niveles de razonamiento válidos (lo demás se ignora → sin razonamiento).
|
||||||
|
_VALID_EFFORTS = {"minimal", "low", "medium", "high"}
|
||||||
|
|
||||||
|
# El Forge Admin Panel escribe la config global en Redis db 0 (REDIS_DB=0 del
|
||||||
|
# admin). El agentic usa db 1 para sus sesiones, así que para leer la config
|
||||||
|
# global necesitamos una conexión dedicada a db 0 (misma instancia Redis).
|
||||||
|
_GLOBAL_CONFIG_DB = 0
|
||||||
|
_global_redis: "redis.Redis | None" = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_global_redis() -> "redis.Redis":
|
||||||
|
global _global_redis
|
||||||
|
if _global_redis is None:
|
||||||
|
_global_redis = redis.Redis(
|
||||||
|
host=settings.redis_host,
|
||||||
|
port=settings.redis_port,
|
||||||
|
db=_GLOBAL_CONFIG_DB,
|
||||||
|
password=settings.redis_password or None,
|
||||||
|
decode_responses=True,
|
||||||
|
)
|
||||||
|
return _global_redis
|
||||||
|
|
||||||
|
|
||||||
|
def to_litellm_model(provider: str | None, model: str | None) -> str:
|
||||||
|
"""Mapea {provider, model} del catálogo a un model string de LiteLLM."""
|
||||||
|
provider = (provider or "").strip().lower()
|
||||||
|
model = (model or "").strip()
|
||||||
|
if not model:
|
||||||
|
return ""
|
||||||
|
if provider == "openrouter":
|
||||||
|
# Los ids de OpenRouter ya vienen como "vendor/name" → prefijo openrouter/.
|
||||||
|
return model if model.startswith("openrouter/") else f"openrouter/{model}"
|
||||||
|
if provider == "deepseek":
|
||||||
|
return model if model.startswith("deepseek/") else f"deepseek/{model}"
|
||||||
|
# Provider desconocido: respetar el id tal cual (puede traer ya su prefijo).
|
||||||
|
return model
|
||||||
|
|
||||||
|
|
||||||
|
def _norm_effort(value) -> str | None:
|
||||||
|
v = (value or "").strip().lower()
|
||||||
|
return v if v in _VALID_EFFORTS else None
|
||||||
|
|
||||||
|
|
||||||
|
async def resolve_session_model(session) -> dict:
|
||||||
|
"""Resuelve modelo + razonamiento efectivos para la sesión.
|
||||||
|
|
||||||
|
Devuelve {"model_id": str|None, "reasoning_effort": str|None}. El effort se
|
||||||
|
toma de la MISMA fuente que el modelo (override de usuario o default global),
|
||||||
|
para que sean coherentes. model_id None = sin override (adapter usa default).
|
||||||
|
"""
|
||||||
|
none = {"model_id": None, "reasoning_effort": None}
|
||||||
|
if settings.default_model_provider != "litellm":
|
||||||
|
return none
|
||||||
|
|
||||||
|
# 1. Override por-usuario (metadata de la sesión).
|
||||||
|
meta = getattr(session, "metadata", None) or {}
|
||||||
|
provider = meta.get("ai_provider")
|
||||||
|
model = meta.get("ai_model")
|
||||||
|
if provider and model:
|
||||||
|
return {
|
||||||
|
"model_id": to_litellm_model(provider, model) or None,
|
||||||
|
"reasoning_effort": _norm_effort(meta.get("ai_reasoning_effort")),
|
||||||
|
}
|
||||||
|
|
||||||
|
# 2. Default global (Redis db 0, keys sin prefijo agentic).
|
||||||
|
try:
|
||||||
|
gr = _get_global_redis()
|
||||||
|
provider = await gr.get(_GLOBAL_PROVIDER_KEY)
|
||||||
|
model = await gr.get(_GLOBAL_MODEL_KEY)
|
||||||
|
effort = await gr.get(_GLOBAL_REASONING_KEY)
|
||||||
|
except Exception as e: # pragma: no cover - defensivo
|
||||||
|
logger.warning("resolve_session_model: lectura Redis falló: %s", e)
|
||||||
|
return none
|
||||||
|
|
||||||
|
if provider and model:
|
||||||
|
return {
|
||||||
|
"model_id": to_litellm_model(provider, model) or None,
|
||||||
|
"reasoning_effort": _norm_effort(effort),
|
||||||
|
}
|
||||||
|
|
||||||
|
# 3. Sin configuración → sin override.
|
||||||
|
return none
|
||||||
@@ -217,6 +217,8 @@ async def run_planner_subloop(
|
|||||||
max_tokens=settings.planner_max_tokens or 16000,
|
max_tokens=settings.planner_max_tokens or 16000,
|
||||||
# Temperatura mas baja que el agente principal — queremos JSON limpio.
|
# Temperatura mas baja que el agente principal — queremos JSON limpio.
|
||||||
temperature=0.1,
|
temperature=0.1,
|
||||||
|
# Mismo nivel de razonamiento resuelto por sesión que el agente principal.
|
||||||
|
reasoning_effort=agent_profile.reasoning_effort or "",
|
||||||
)
|
)
|
||||||
|
|
||||||
tool_defs = _build_planner_tools(mcp)
|
tool_defs = _build_planner_tools(mcp)
|
||||||
|
|||||||
@@ -363,6 +363,8 @@ class ClaudeFormatEmitter:
|
|||||||
"cache_creation_input_tokens": 0,
|
"cache_creation_input_tokens": 0,
|
||||||
},
|
},
|
||||||
"total_cost_usd": data.get("total_cost_usd", 0),
|
"total_cost_usd": data.get("total_cost_usd", 0),
|
||||||
|
# Modelo usado → acai-app lo registra en consumo_acaicode.
|
||||||
|
"modelUsage": data.get("modelUsage", {}),
|
||||||
})
|
})
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
|
|||||||
Reference in New Issue
Block a user