"""Cálculo de coste por modelo (Fase 2). Prioridad de fuentes de precio (para que el coste registrado en `consumo_acaicode` coincida con lo que muestra el Forge Admin Panel): 1. Catálogo OpenRouter cacheado por el panel en Redis db 0 (`acai:config:ai:models_cache:openrouter` → price_in_1m / price_out_1m). 2. Price map de LiteLLM (conoce muchos modelos deepseek/, anthropic/, etc.). 3. Coste fijo de `settings` (comportamiento previo). """ from __future__ import annotations import asyncio import json import logging import time import urllib.request import redis.asyncio as redis from ..config import settings logger = logging.getLogger(__name__) # Caches de catálogo que publica el Forge Admin Panel en Redis db 0, por proveedor. # El id se guarda SIN el prefijo de proveedor de litellm (p.ej. # "moonshotai/kimi-k2.7-code", "deepseek-v4-pro"). _CACHE_KEYS = { "openrouter": "acai:config:ai:models_cache:openrouter", "deepseek": "acai:config:ai:models_cache:deepseek", } _CONFIG_DB = 0 _cfg_redis: "redis.Redis | None" = None def _get_cfg_redis() -> "redis.Redis": global _cfg_redis if _cfg_redis is None: _cfg_redis = redis.Redis( host=settings.redis_host, port=settings.redis_port, db=_CONFIG_DB, password=settings.redis_password or None, decode_responses=True, ) return _cfg_redis # --- Catálogo con self-heal ------------------------------------------------- # El catálogo OpenRouter lo publica el Forge Admin Panel con TTL de 1h y solo se # repuebla al abrir su ventana de IA. En runtime (coste y ventana de contexto) # eso es frágil: si caduca, perdemos precio Y context_length del modelo activo. # Aquí lo repoblamos nosotros (fetch público a OpenRouter, mismo shape que el # admin) cuando falta, con un cooldown para no martillear la API. DeepSeek es # persistente (lo escribe el admin en el arranque) y no necesita self-heal. 
_OPENROUTER_URL = "https://openrouter.ai/api/v1/models"
_OPENROUTER_TIMEOUT = 15
_OR_SELFHEAL_TTL = 86_400  # 24h: lasts long enough; the admin refreshes it separately
_OR_REFRESH_COOLDOWN = 300  # at most one fetch every 5 minutes
_or_last_refresh = [0.0]


def _normalize_openrouter_item(it: object) -> dict | None:
    """Convert one raw OpenRouter model entry to the admin-panel shape.

    Returns None when the entry must be skipped: it is not a dict, has no
    `id`, or does not list `tools` in `supported_parameters`.
    """
    if not isinstance(it, dict) or not it.get("id"):
        return None
    supported = it.get("supported_parameters") or []
    if not isinstance(supported, list) or "tools" not in supported:
        return None

    # A malformed (non-dict) `pricing` must not abort the whole catalog fetch.
    pricing = it.get("pricing")
    if not isinstance(pricing, dict):
        pricing = {}
    try:
        # The values are scaled by 1e6 to obtain a per-1M-token rate.
        pin = float(pricing.get("prompt", 0) or 0) * 1_000_000
        pout = float(pricing.get("completion", 0) or 0) * 1_000_000
    except (TypeError, ValueError):
        pin = pout = 0.0

    try:
        ctx = int(it.get("context_length") or 0)
    except (TypeError, ValueError, OverflowError):
        ctx = 0

    # Same guard as for `pricing`: tolerate a non-dict `architecture`.
    architecture = it.get("architecture")
    mods = architecture.get("input_modalities") if isinstance(architecture, dict) else None

    return {
        "id": it.get("id"),
        "name": it.get("name") or it.get("id"),
        "context_length": ctx,
        "price_in_1m": pin,
        "price_out_1m": pout,
        "supports_reasoning": "reasoning" in supported or "include_reasoning" in supported,
        "supports_images": isinstance(mods, list) and "image" in mods,
    }


def _fetch_openrouter_catalog_sync() -> list[dict]:
    """Fetch the public OpenRouter catalog (blocking GET) and normalize it.

    Each entry has the SAME shape the admin panel stores (id, name,
    context_length, price_*, supports_reasoning, supports_images). Only models
    that support `tools` are kept (same filter as the admin).

    Returns:
        The normalized model list; empty when the payload has no `data` list.

    Raises:
        Whatever `urllib.request.urlopen` / `json.loads` raise on network or
        decoding errors; the caller is expected to handle them.
    """
    req = urllib.request.Request(_OPENROUTER_URL, method="GET")
    req.add_header("Accept", "application/json")
    with urllib.request.urlopen(req, timeout=_OPENROUTER_TIMEOUT) as resp:
        payload = json.loads(resp.read().decode("utf-8"))

    items = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(items, list):
        return []

    normalized = (_normalize_openrouter_item(it) for it in items)
    return [model for model in normalized if model is not None]


async def _get_catalog(provider: str | None) -> list[dict] | None:
    """Return the provider's model catalog from Redis, or None if unavailable.

    For OpenRouter, when the cached catalog is missing (TTL expired) it is
    repopulated at runtime from the public API (self-heal), at most once per
    `_OR_REFRESH_COOLDOWN` seconds.

    Args:
        provider: Provider name used as key into `_CACHE_KEYS`.

    Returns:
        The list of catalog entries, or None when the provider is unknown, the
        cache is empty/unreadable, or the self-heal is cooling down or failed.
    """
    cache_key = _CACHE_KEYS.get(provider or "")
    if not cache_key:
        return None

    try:
        cached = await _get_cfg_redis().get(cache_key)
        if cached:
            data = json.loads(cached)
            if isinstance(data, list):
                return data
    except Exception as e:  # pragma: no cover - defensive
        logger.warning("catálogo %s no disponible: %s", provider, e)

    if provider != "openrouter":
        return None

    # Self-heal only for OpenRouter, with a cooldown so we do not hammer the API.
    now = time.time()
    if now - _or_last_refresh[0] < _OR_REFRESH_COOLDOWN:
        return None
    # Stamp before fetching so concurrent callers and failures also respect the cooldown.
    _or_last_refresh[0] = now

    try:
        # The fetch is blocking; run it in a worker thread to keep the loop free.
        models = await asyncio.to_thread(_fetch_openrouter_catalog_sync)
    except Exception as e:
        logger.warning("self-heal catálogo openrouter falló: %s", e)
        return None

    if not models:
        return None

    try:
        await _get_cfg_redis().set(cache_key, json.dumps(models), ex=_OR_SELFHEAL_TTL)
    except Exception as e:
        # Best effort: the freshly fetched catalog is still returned to the caller.
        logger.warning("no se pudo guardar el catálogo openrouter en Redis: %s", e)
    else:
        logger.info("catálogo openrouter repoblado en runtime: %d modelos", len(models))
    return models


async def _catalog_price_per_1m(model_id: str | None) -> tuple[float, float] | None:
    """Return (price_in_1m, price_out_1m) from the catalog, or None.

    `model_id` is in litellm format ("provider/model-id"); the part before the
    first "/" selects the catalog and the rest is matched against entry ids.
    None is also returned when the stored prices are missing or not numeric,
    so the caller can fall back to another price source.
    """
    if not model_id or "/" not in model_id:
        return None
    provider, _, raw_id = model_id.partition("/")
    models = await _get_catalog(provider)
    if not models:
        return None

    for m in models:
        if not isinstance(m, dict) or m.get("id") != raw_id:
            continue
        pin = m.get("price_in_1m")
        pout = m.get("price_out_1m")
        if pin is None or pout is None:
            continue
        try:
            return (float(pin), float(pout))
        except (TypeError, ValueError) as e:
            # Malformed cached data must not break cost computation.
            logger.warning("precio inválido en catálogo para %s: %s", model_id, e)
            return None
    return None


# --- Context window per model -----------------------------------------------
# In-process cache with a short TTL: build_context resolves the window on every
# step of the loop, and the catalog rarely changes. Avoids hitting Redis
# 25x/turn.
_window_cache: dict[str, tuple[float, int | None]] = {}
_WINDOW_TTL = 60.0


async def resolve_context_window(model_id: str | None) -> int | None:
    """Return the context window (tokens) of the active model.

    Sources, in order: Forge Admin Panel catalog in Redis (`context_length`)
    → LiteLLM price/info map (`max_input_tokens` / `max_tokens`) → None.
    Results (including None) are cached in-process for `_WINDOW_TTL` seconds.

    Args:
        model_id: Model id in litellm format ("provider/model-id").

    Returns:
        The window size in tokens, or None when it cannot be resolved.
    """
    if not model_id or "/" not in model_id:
        return None

    now = time.time()
    cached = _window_cache.get(model_id)
    if cached and (now - cached[0]) < _WINDOW_TTL:
        return cached[1]

    window: int | None = None

    # 1. Panel catalog (with self-heal for OpenRouter if it expired).
    provider, _, raw_id = model_id.partition("/")
    models = await _get_catalog(provider)
    if models:
        for m in models:
            if not isinstance(m, dict) or m.get("id") != raw_id:
                continue
            cl = m.get("context_length")
            if isinstance(cl, int) and cl > 0:
                window = cl
            break

    # 2. Fallback: LiteLLM knows many models (deepseek/, anthropic/, ...).
    if window is None:
        try:
            import litellm

            info = litellm.get_model_info(model_id) or {}
            for key in ("max_input_tokens", "max_tokens"):
                v = info.get(key)
                if isinstance(v, int) and v > 0:
                    window = v
                    break
        except Exception as e:
            # A failed lookup is not fatal; keep it visible at debug level only.
            logger.debug("get_model_info(%s) falló: %s", model_id, e)

    _window_cache[model_id] = (now, window)
    return window


async def compute_cost(model_id: str | None, input_tokens: int, output_tokens: int) -> dict:
    """Compute the cost of one execution for `model_id` and the given tokens.

    Price sources, in order: panel catalog in Redis → LiteLLM price map →
    fixed cost from `settings`.

    Args:
        model_id: Model id in litellm format ("provider/model-id"), or None.
        input_tokens: Prompt tokens consumed (falsy values count as 0).
        output_tokens: Completion tokens produced (falsy values count as 0).

    Returns:
        {"cost_usd", "input_cost_1m", "output_cost_1m"} — the total cost and
        the per-1M-token rates ACTUALLY applied (stored in
        `consumo_acaicode.input_cost_1M` / `output_cost_1M`).
    """
    input_tokens = int(input_tokens or 0)
    output_tokens = int(output_tokens or 0)

    def _result(in_1m: float, out_1m: float) -> dict:
        return {
            "cost_usd": (input_tokens / 1_000_000) * in_1m + (output_tokens / 1_000_000) * out_1m,
            "input_cost_1m": round(in_1m, 6),
            "output_cost_1m": round(out_1m, 6),
        }

    # 1. Catalog price (the source the admin panel displays).
    prices = await _catalog_price_per_1m(model_id)
    if prices:
        return _result(prices[0], prices[1])

    # 2. LiteLLM price map (deepseek/, anthropic/, etc.).
    if model_id and "/" in model_id:
        try:
            import litellm

            prompt_cost, completion_cost = litellm.cost_per_token(
                model=model_id,
                prompt_tokens=input_tokens,
                completion_tokens=output_tokens,
            )
            # Normalize first so a missing component cannot break the division below.
            prompt_cost = prompt_cost or 0.0
            completion_cost = completion_cost or 0.0
            total = prompt_cost + completion_cost
            if total > 0:
                # Derive the per-1M rate from the cost LiteLLM reported for these tokens.
                in_1m = (prompt_cost / input_tokens) * 1_000_000 if input_tokens else 0.0
                out_1m = (completion_cost / output_tokens) * 1_000_000 if output_tokens else 0.0
                return {
                    "cost_usd": total,
                    "input_cost_1m": round(in_1m, 6),
                    "output_cost_1m": round(out_1m, 6),
                }
        except Exception as e:
            logger.warning("cost_per_token(%s) falló, uso coste fijo: %s", model_id, e)

    # 3. Configured fixed cost.
    return _result(settings.cost_per_1m_input, settings.cost_per_1m_output)