From 469ff65052b8b11c2a3243dd4dc4e577d80425f2 Mon Sep 17 00:00:00 2001
From: Jordan Diaz
Date: Tue, 14 Apr 2026 07:12:50 +0000
Subject: [PATCH] Add completion + chat adjustments
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/adapters/claude_adapter.py | 179 ++++++++++++++++++++++-----------
 src/adapters/openai_adapter.py |   8 ++
 src/api/routes.py              |  98 ++++++++++++++++++
 3 files changed, 229 insertions(+), 56 deletions(-)

diff --git a/src/adapters/claude_adapter.py b/src/adapters/claude_adapter.py
index 4f0734d..c8842e6 100644
--- a/src/adapters/claude_adapter.py
+++ b/src/adapters/claude_adapter.py
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import asyncio
 import json
 import logging
 from typing import Any, AsyncIterator
@@ -14,6 +15,27 @@
 from .base import ModelAdapter, ModelConfig, ModelResponse, StreamChunk
 
 logger = logging.getLogger(__name__)
 
+# Transient errors from the model proxy (MiniMax/Anthropic). We retry with
+# exponential backoff: 1s, 3s, 9s. 529 is Anthropic's overloaded_error;
+# 429 is rate limiting; 503 is service unavailable.
+_TRANSIENT_STATUSES = {429, 503, 529}
+_RETRY_DELAYS = (1.0, 3.0, 9.0)
+
+
+def _is_transient(exc: Exception) -> bool:
+    """True if the error is safe to retry (overload / transient network failure)."""
+    if isinstance(exc, (anthropic.APIConnectionError, anthropic.APITimeoutError)):
+        return True
+    if isinstance(exc, anthropic.APIStatusError):
+        status = getattr(exc, "status_code", None)
+        if status in _TRANSIENT_STATUSES:
+            return True
+    msg = str(exc).lower()
+    if "overloaded" in msg or "high load" in msg:
+        return True
+    return False
+
+
 class ClaudeAdapter(ModelAdapter):
     """Adapter for the Anthropic Claude API."""
 
@@ -63,65 +85,87 @@ class ClaudeAdapter(ModelAdapter):
         if tools:
             kwargs["tools"] = self._format_tools(tools)
 
-        async with self._client.messages.stream(**kwargs) as stream:
-            current_tool_id = ""
-            current_tool_name = ""
-            accumulated_args = ""
-            input_tokens = 0
-
-            async for event in stream:
-                if event.type == "message_start" and hasattr(event, "message"):
-                    usage = getattr(event.message, "usage", None)
-                    if usage:
-                        input_tokens = getattr(usage, "input_tokens", 0)
-
-                if event.type == "content_block_start":
-                    block = event.content_block
-                    if block.type == "tool_use":
-                        current_tool_id = block.id
-                        current_tool_name = block.name
-                        accumulated_args = ""
-                        yield StreamChunk(
-                            tool_call_id=current_tool_id,
-                            tool_name=current_tool_name,
-                        )
-                    continue
-
-                if event.type == "content_block_delta":
-                    delta = event.delta
-                    if delta.type == "text_delta":
-                        yield StreamChunk(delta=delta.text)
-                    elif delta.type == "input_json_delta":
-                        accumulated_args += delta.partial_json
-                        yield StreamChunk(
-                            tool_call_id=current_tool_id,
-                            tool_name=current_tool_name,
-                            tool_arguments=delta.partial_json,
-                        )
-                    continue
-
-                if event.type == "content_block_stop":
-                    if current_tool_id and accumulated_args:
-                        yield StreamChunk(
-                            tool_call_id=current_tool_id,
-                            tool_name=current_tool_name,
-                            tool_arguments=accumulated_args,
-                            finish_reason="tool_use",
-                        )
-                    current_tool_id = ""
-                    current_tool_name = ""
-                    accumulated_args = ""
-                    continue
-
-                if event.type == "message_delta":
-                    output_tokens = getattr(event.usage, "output_tokens", 0) if event.usage else 0
-                    yield StreamChunk(
-                        finish_reason=event.delta.stop_reason or "",
-                        usage={
-                            "input_tokens": input_tokens,
-                            "output_tokens": output_tokens,
-                        },
-                    )
+        # Retry with backoff on transient errors while OPENING the stream.
+        # Once we have started emitting chunks to the consumer we must NOT
+        # retry (the orchestrator has already received partial content).
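+        # Timing sketch (illustrative): with _RETRY_DELAYS = (1.0, 3.0, 9.0)
+        # a stream that keeps failing before its first event is attempted
+        # 4 times, with ~13s of total sleep, before the last error propagates.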
+        attempt = 0
+        max_attempts = len(_RETRY_DELAYS) + 1
+        while True:
+            yielded_any = False
+            try:
+                async with self._client.messages.stream(**kwargs) as stream:
+                    current_tool_id = ""
+                    current_tool_name = ""
+                    accumulated_args = ""
+                    input_tokens = 0
+
+                    async for event in stream:
+                        yielded_any = True
+                        if event.type == "message_start" and hasattr(event, "message"):
+                            usage = getattr(event.message, "usage", None)
+                            if usage:
+                                input_tokens = getattr(usage, "input_tokens", 0)
+
+                        if event.type == "content_block_start":
+                            block = event.content_block
+                            if block.type == "tool_use":
+                                current_tool_id = block.id
+                                current_tool_name = block.name
+                                accumulated_args = ""
+                                yield StreamChunk(
+                                    tool_call_id=current_tool_id,
+                                    tool_name=current_tool_name,
+                                )
+                            continue
+
+                        if event.type == "content_block_delta":
+                            delta = event.delta
+                            if delta.type == "text_delta":
+                                yield StreamChunk(delta=delta.text)
+                            elif delta.type == "input_json_delta":
+                                accumulated_args += delta.partial_json
+                                yield StreamChunk(
+                                    tool_call_id=current_tool_id,
+                                    tool_name=current_tool_name,
+                                    tool_arguments=delta.partial_json,
+                                )
+                            continue
+
+                        if event.type == "content_block_stop":
+                            if current_tool_id and accumulated_args:
+                                yield StreamChunk(
+                                    tool_call_id=current_tool_id,
+                                    tool_name=current_tool_name,
+                                    tool_arguments=accumulated_args,
+                                    finish_reason="tool_use",
+                                )
+                            current_tool_id = ""
+                            current_tool_name = ""
+                            accumulated_args = ""
+                            continue
+
+                        if event.type == "message_delta":
+                            output_tokens = getattr(event.usage, "output_tokens", 0) if event.usage else 0
+                            yield StreamChunk(
+                                finish_reason=event.delta.stop_reason or "",
+                                usage={
+                                    "input_tokens": input_tokens,
+                                    "output_tokens": output_tokens,
+                                },
+                            )
+                return  # stream consumed OK; exit the retry loop
+            except Exception as e:
+                # If anything was already emitted to the consumer we cannot
+                # retry safely: the partial content has already gone out.
+                if yielded_any or not _is_transient(e) or attempt >= max_attempts - 1:
+                    raise
+                wait = _RETRY_DELAYS[attempt]
+                logger.warning(
+                    "Claude stream() transient error (attempt %d/%d), retrying in %.1fs: %s",
+                    attempt + 1, max_attempts, wait, str(e)[:200],
+                )
+                await asyncio.sleep(wait)
+                attempt += 1
 
     # ------------------------------------------------------------------
     # Non-streaming
@@ -158,8 +202,31 @@ class ClaudeAdapter(ModelAdapter):
             kwargs["system"] = system_content
         if tools:
             kwargs["tools"] = self._format_tools(tools)
+        # Force the model to call a specific tool to guarantee schema-shaped JSON
+        # (used by /completions with json_schema). See OpenAIAdapter for the variant.
+        force_tool = (config.extra or {}).get("force_tool")
+        if force_tool:
+            kwargs["tool_choice"] = {"type": "tool", "name": force_tool}
 
-        response = await self._client.messages.create(**kwargs)
+        # Retry with backoff on transient errors (429/503/529). The MiniMax
+        # proxy returns 529 overloaded_error fairly often under load.
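+        # Unlike stream(), nothing is returned to the caller until the full
+        # response has arrived, so every attempt below is safe to retry.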
+        last_exc: Exception | None = None
+        for attempt in range(len(_RETRY_DELAYS) + 1):
+            try:
+                response = await self._client.messages.create(**kwargs)
+                break
+            except Exception as e:
+                if not _is_transient(e) or attempt == len(_RETRY_DELAYS):
+                    raise
+                wait = _RETRY_DELAYS[attempt]
+                logger.warning(
+                    "Claude complete() transient error (attempt %d/%d), retrying in %.1fs: %s",
+                    attempt + 1, len(_RETRY_DELAYS) + 1, wait, str(e)[:200],
+                )
+                last_exc = e
+                await asyncio.sleep(wait)
+        else:
+            raise last_exc or RuntimeError("Claude complete() retry exhausted")
 
         content = ""
         tool_calls: list[dict[str, Any]] = []
diff --git a/src/adapters/openai_adapter.py b/src/adapters/openai_adapter.py
index 526641b..8726a3d 100644
--- a/src/adapters/openai_adapter.py
+++ b/src/adapters/openai_adapter.py
@@ -152,6 +152,14 @@ class OpenAIAdapter(ModelAdapter):
         }
         if tools:
             kwargs["tools"] = self._format_tools(tools)
+        # Force the model to call a specific tool to guarantee schema-shaped JSON
+        # (used by /completions with json_schema). See ClaudeAdapter for the variant.
+        force_tool = (config.extra or {}).get("force_tool")
+        if force_tool:
+            kwargs["tool_choice"] = {
+                "type": "function",
+                "function": {"name": force_tool},
+            }
 
         response = await self._client.chat.completions.create(**kwargs)
         choice = response.choices[0]
diff --git a/src/api/routes.py b/src/api/routes.py
index 3ca269c..dd54370 100644
--- a/src/api/routes.py
+++ b/src/api/routes.py
@@ -48,6 +48,28 @@ class SendMessageRequest(BaseModel):
     agent_id: str | None = None
 
 
+class CompletionRequest(BaseModel):
+    """One-shot structured completion: no session, no MCP, no agent.
+
+    For callers that need a direct LLM response (optionally conforming to a
+    JSON schema). Example: `content_export.py` when it extracts brand info
+    or generates text chunks for local websites.
+    """
+    message: str
+    system: str | None = None
+    model_id: str | None = None
+    max_tokens: int = 4096
+    temperature: float = 0.3
+    json_schema: dict[str, Any] | None = None
+
+
+class CompletionResponse(BaseModel):
+    content: str = ""
+    parsed: dict[str, Any] | None = None
+    usage: dict[str, int] = Field(default_factory=dict)
+    model_id: str = ""
+
+
 class SessionResponse(BaseModel):
     session_id: str
     status: str
@@ -157,6 +179,70 @@ async def create_session(body: CreateSessionRequest) -> CreateSessionResponse:
     )
 
 
+# ------------------------------------------------------------------
+# POST /completions — one-shot structured completion
+# ------------------------------------------------------------------
+
+@router.post("/completions", response_model=CompletionResponse)
+async def completion(body: CompletionRequest) -> CompletionResponse:
+    """Direct LLM call with no session, no MCP, no agent.
+
+    If `json_schema` is provided, the model is forced to fill a tool with
+    that schema (forced tool_use for Claude/Anthropic-compatible backends,
+    forced function calling for OpenAI). The result is returned parsed in
+    `parsed` as well as JSON-stringified in `content`.
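+
+    Illustrative request body (the field names come from CompletionRequest;
+    the schema itself is only an example):
+
+        {"message": "Name one primary color.",
+         "json_schema": {"type": "object",
+                         "properties": {"color": {"type": "string"}},
+                         "required": ["color"]}}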
+    """
+    from ..adapters.base import ModelConfig
+
+    adapter = _deps.get("model_adapter")
+    if adapter is None:
+        raise HTTPException(status_code=503, detail="Model adapter not initialized")
+
+    messages: list[dict[str, Any]] = []
+    if body.system:
+        messages.append({"role": "system", "content": body.system})
+    messages.append({"role": "user", "content": body.message})
+
+    config = ModelConfig(
+        model_id=body.model_id or settings.default_model_id,
+        max_tokens=body.max_tokens,
+        temperature=body.temperature,
+    )
+
+    tools_param: list[dict[str, Any]] | None = None
+    if body.json_schema:
+        tools_param = [
+            {
+                "name": "emit_response",
+                "description": "Emit the structured response conforming to the schema.",
+                "input_schema": body.json_schema,
+            }
+        ]
+        config.extra = {"force_tool": "emit_response"}
+
+    try:
+        response = await adapter.complete(messages, tools=tools_param, config=config)
+    except Exception as e:
+        logger.exception("completion failed: %s", e)
+        raise HTTPException(status_code=502, detail=f"Model call failed: {e}") from e
+
+    parsed: dict[str, Any] | None = None
+    content_out = response.content or ""
+    if body.json_schema and response.tool_calls:
+        import json as _json
+
+        tool_args = response.tool_calls[0].get("arguments")
+        if isinstance(tool_args, str):
+            # OpenAI-style adapters may hand back arguments as a JSON string.
+            try:
+                tool_args = _json.loads(tool_args)
+            except ValueError:
+                tool_args = None
+        if isinstance(tool_args, dict):
+            parsed = tool_args
+            content_out = _json.dumps(parsed, ensure_ascii=False)
+
+    return CompletionResponse(
+        content=content_out,
+        parsed=parsed,
+        usage=response.usage or {},
+        model_id=config.model_id,
+    )
+
+
 # ------------------------------------------------------------------
 # POST /sessions/{id}/messages
 # ------------------------------------------------------------------
@@ -170,6 +256,18 @@ async def send_message(
     if not session:
         raise HTTPException(status_code=404, detail="Session not found")
 
+    # Reset a session stuck in ERROR when a new message arrives. Without
+    # this, a transient failure (e.g. a 529 overloaded from the model proxy)
+    # leaves the session locked forever and later user messages are never
+    # processed. Clear the failed current_task as well.
+    if session.status == SessionStatus.ERROR:
+        logger.info(
+            "Session %s was in ERROR state, resetting to ACTIVE for new message",
+            session_id,
+        )
+        session.status = SessionStatus.ACTIVE
+        session.current_task = None
+
     # Get or create session's MCP manager
     registry = _get_mcp_registry()
     mcp_manager = registry.get_for_session(session_id)
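
Usage sketch for the new endpoint (illustrative only: the base URL, port,
and any router prefix are assumptions; the request and response fields come
from the patch above):

    import asyncio

    import httpx

    async def main() -> None:
        schema = {
            "type": "object",
            "properties": {"summary": {"type": "string"}},
            "required": ["summary"],
        }
        async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
            resp = await client.post(
                "/completions",
                json={
                    "message": "Summarize: FastAPI is a Python web framework.",
                    "json_schema": schema,
                },
                timeout=60.0,
            )
            resp.raise_for_status()
            data = resp.json()
            # With json_schema set, `parsed` holds the schema-shaped dict and
            # `content` holds the same JSON as a string.
            print(data["parsed"], data["usage"])

    asyncio.run(main())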