Files
agenticSystem/src/streaming/claude_format.py
Jordan Diaz 6a03fdf284 Harden DeepSeek agent: LiteLLM adapter, DSML/reasoning/embeddings/error fixes
- LiteLLMAdapter (subclasses OpenAIAdapter via _acreate hook): routes DeepSeek
  through LiteLLM. Opt-in AGENTIC_DEFAULT_MODEL_PROVIDER=litellm. A/B beat the
  hand-rolled adapter (0 DSML, 0 parse-fails). Defensive chunk.usage getattr,
  token-estimate usage fallback for billing, quiet litellm logs.
- DSML parser: tolerate single/multi fullwidth pipes, honor string="true/false"
  typed args (openai_adapter fallback when DeepSeek leaks tool calls as text).
- Thinking mode: capture and round-trip reasoning_content across turns.
- Embeddings: dedicated AGENTIC_EMBEDDINGS_API_KEY (DeepSeek has no embeddings);
  disable cleanly when unset to avoid per-turn 401.
- claude_format: friendly generic error messages to the chat, raw only in logs.
- acai agent max_tokens 4096->16384 (whole-file writes no longer truncate);
  system.md size-based edit policy; strict tools opt-in (off).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 14:49:48 +00:00

462 lines
18 KiB
Python

"""Claude Code CLI compatible SSE format emitter.
Translates agenticSystem native events into the exact format that
Claude Code CLI produces, so the frontend can consume them without
any changes. Used via ?format=claude on the stream endpoint.
Wire format: data: {json}\n\n (no event: or id: fields)
"""
from __future__ import annotations

import asyncio
import json
import logging
import re
from typing import Any, AsyncIterator

from .sse import EventType, SSEEmitter
logger = logging.getLogger(__name__)

# Generic, localized message shown to the end user when no more specific
# category applies. Provider/exception details never reach the client.
_GENERIC_ERROR = (
    "Ha ocurrido un error procesando tu mensaje. Vuelve a intentarlo en unos momentos."
)

# Patterns the frontend detects by itself (login / expired session).
# They are passed through untouched so those detections keep working.
_PASSTHROUGH_PATTERNS = (
    "not logged in",
    "login required",
    "authentication required",
    "no conversation found",
)

# HTTP status codes are matched as whole tokens only: a plain substring test
# misfires on larger numbers (e.g. "142901 tokens" contains "429").
_STATUS_402 = re.compile(r"\b402\b")
_STATUS_401 = re.compile(r"\b401\b")
_STATUS_429 = re.compile(r"\b429\b")


def _mentions_any(text: str, patterns: tuple[str, ...]) -> bool:
    """Return True if the lower-cased *text* contains any of *patterns*."""
    lowered = text.lower()
    return any(p in lowered for p in patterns)


def friendly_error_message(raw: str, code: str = "") -> str:
    """Translate a raw provider/exception error into a user-facing message.

    The returned message is generic and localized (Spanish) and does not
    expose internal details. Auth/session errors matching
    ``_PASSTHROUGH_PATTERNS`` are returned verbatim instead, because the
    frontend recognizes them by their content.

    Args:
        raw: Raw error text; ``None``/empty is treated as ``""``.
        code: Optional error code or type, classified together with *raw*.

    Returns:
        The message to show to the end user.
    """
    raw = str(raw) if raw else ""
    code = str(code) if code else ""

    # Auth / session: return the text that actually carries the pattern so
    # the frontend can detect it. Returning *raw* when only *code* matched
    # would both break that detection and leak raw provider details.
    if _mentions_any(raw, _PASSTHROUGH_PATTERNS):
        return raw
    if _mentions_any(code, _PASSTHROUGH_PATTERNS):
        return code

    text = f"{code} {raw}".lower()

    # Execution timeout
    if "timeout" in text or "timed out" in text:
        return (
            "La tarea tardó demasiado en completarse. Prueba a dividirla en "
            "pasos más pequeños o vuelve a intentarlo."
        )
    # Insufficient balance / provider billing (402)
    if (
        _STATUS_402.search(text)
        or "insufficient balance" in text
        or "insufficient_quota" in text
        or "billing" in text
    ):
        return (
            "El asistente no está disponible en este momento. Inténtalo de "
            "nuevo en unos minutos."
        )
    # Invalid provider credentials (401)
    if (
        _STATUS_401.search(text)
        or "invalid_api_key" in text
        or "incorrect api key" in text
        or "invalid api key" in text
    ):
        return (
            "El asistente no está disponible temporalmente por un problema de "
            "configuración. Estamos trabajando en ello."
        )
    # Rate limiting (429)
    if _STATUS_429.search(text) or "rate limit" in text or "rate_limit" in text:
        return (
            "Hay mucha demanda en este momento. Espera unos segundos y vuelve "
            "a intentarlo."
        )
    return _GENERIC_ERROR
class ClaudeFormatEmitter:
    """Emit events in the Claude Code CLI SSE format.

    Keeps per-session state to track content-block indices and to accumulate
    finished text/tool_use blocks for the ``assistant`` snapshots the
    frontend uses for reconciliation. Every formatted line is pushed to all
    subscriber queues registered for the session.
    """

    def __init__(self) -> None:
        # session_id -> subscriber queues; ``None`` is the end-of-stream sentinel
        self._queues: dict[str, list[asyncio.Queue[str | None]]] = {}
        # Per-session state, all keyed by session_id
        self._block_counter: dict[str, int] = {}
        self._text_block_open: dict[str, bool] = {}
        self._text_block_index: dict[str, int] = {}
        # session -> {tool_call_id -> index} for tool_use blocks not yet stopped
        self._tool_block_index: dict[str, dict[str, int]] = {}
        # Finished blocks included in assistant snapshots
        self._content_blocks: dict[str, list[dict[str, Any]]] = {}
        # Text streamed so far into the currently open text block
        self._text_accumulator: dict[str, str] = {}
        self._thinking_block_open: dict[str, bool] = {}
        self._thinking_block_index: dict[str, int] = {}

    # ------------------------------------------------------------------
    # Block bookkeeping
    # ------------------------------------------------------------------

    def _next_index(self, session_id: str) -> int:
        """Return the next content-block index for the session and advance it."""
        idx = self._block_counter.get(session_id, 0)
        self._block_counter[session_id] = idx + 1
        return idx

    def _reset_session(self, session_id: str) -> None:
        """Reset all per-session block state at the start of an execution."""
        self._block_counter[session_id] = 0
        self._text_block_open[session_id] = False
        self._text_block_index[session_id] = -1
        self._tool_block_index[session_id] = {}
        self._content_blocks[session_id] = []
        self._text_accumulator[session_id] = ""
        self._thinking_block_open[session_id] = False
        self._thinking_block_index[session_id] = -1

    def _push(self, session_id: str, payload: dict[str, Any]) -> None:
        """Serialize *payload* as one ``data:`` SSE line and queue it for every subscriber.

        When a subscriber's queue is full, the line is dropped for that
        subscriber and a warning is logged.
        """
        line = f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
        for q in self._queues.get(session_id, []):
            try:
                q.put_nowait(line)
            except asyncio.QueueFull:
                logger.warning("Claude SSE queue full for session %s", session_id[:8])

    def _push_block_stop(self, session_id: str, index: int) -> None:
        """Emit a ``content_block_stop`` stream event for block *index*."""
        self._push(session_id, {
            "type": "stream_event",
            "event": {"type": "content_block_stop", "index": index},
        })

    def _open_text_block(self, session_id: str) -> None:
        """Open a new text block and emit its ``content_block_start``."""
        idx = self._next_index(session_id)
        self._text_block_index[session_id] = idx
        self._text_block_open[session_id] = True
        self._text_accumulator[session_id] = ""
        self._push(session_id, {
            "type": "stream_event",
            "event": {
                "type": "content_block_start",
                "index": idx,
                "content_block": {"type": "text", "text": ""},
            },
        })

    def _close_text_block(self, session_id: str) -> None:
        """Close the open text block, if any, saving its text for snapshots."""
        if not self._text_block_open.get(session_id):
            return
        self._push_block_stop(session_id, self._text_block_index[session_id])
        text = self._text_accumulator.get(session_id, "")
        if text:
            self._content_blocks.setdefault(session_id, []).append({
                "type": "text", "text": text,
            })
        self._text_block_open[session_id] = False
        self._text_accumulator[session_id] = ""

    def _open_thinking_block(self, session_id: str) -> None:
        """Open a new thinking block and emit its ``content_block_start``."""
        idx = self._next_index(session_id)
        self._thinking_block_index[session_id] = idx
        self._thinking_block_open[session_id] = True
        self._push(session_id, {
            "type": "stream_event",
            "event": {
                "type": "content_block_start",
                "index": idx,
                "content_block": {"type": "thinking", "thinking": ""},
            },
        })

    def _close_thinking_block(self, session_id: str) -> None:
        """Close the open thinking block, if any."""
        if self._thinking_block_open.get(session_id):
            self._push_block_stop(session_id, self._thinking_block_index[session_id])
            self._thinking_block_open[session_id] = False

    def _close_open_tool_blocks(self, session_id: str) -> None:
        """Stop every tool_use block that was started but never completed."""
        pending = self._tool_block_index.get(session_id, {})
        for idx in pending.values():
            self._push_block_stop(session_id, idx)
        pending.clear()

    def _close_all_blocks(self, session_id: str) -> None:
        """Stop every still-open block so each start has a matching stop."""
        self._close_thinking_block(session_id)
        self._close_text_block(session_id)
        self._close_open_tool_blocks(session_id)

    def _build_assistant_snapshot(self, session_id: str) -> dict[str, Any]:
        """Build an ``assistant`` message snapshot of the finished blocks."""
        blocks = list(self._content_blocks.get(session_id, []))
        return {
            "type": "assistant",
            "message": {"content": blocks},
            "error": False,
        }

    # ------------------------------------------------------------------
    # Event translation
    # ------------------------------------------------------------------

    async def emit(
        self,
        event_type: EventType,
        data: dict[str, Any],
        session_id: str,
    ) -> None:
        """Translate one native event into Claude Code CLI format.

        Event types without a branch here (e.g. KEEPALIVE, SESSION_CREATED,
        SUBAGENT_ASSIGNED) are ignored.
        """
        if event_type == EventType.EXECUTION_STARTED:
            self._reset_session(session_id)
            self._push(session_id, {
                "type": "stream_event",
                "event": {"type": "message_start"},
            })
        elif event_type == EventType.AGENT_DELTA:
            self._on_agent_delta(data, session_id)
        elif event_type == EventType.TOOL_STARTED:
            self._on_tool_started(data, session_id)
        elif event_type == EventType.TOOL_COMPLETED:
            self._on_tool_completed(data, session_id)
        elif event_type == EventType.PLAN_CREATED:
            # PlanStepper UI: forward the plan data as a custom "plan.created" event.
            self._push(session_id, {
                "type": "plan.created",
                "plan": data,
            })
        elif event_type == EventType.PLAN_ADVANCED:
            self._push(session_id, {
                "type": "plan.advanced",
                "cursor": data.get("cursor", 0),
                "completed_step_ids": data.get("completed_step_ids", []),
                "status": data.get("status", "active"),
            })
        elif event_type == EventType.PLAN_ENDED:
            self._push(session_id, {
                "type": "plan.ended",
                "status": data.get("status", "done"),
                "objective": data.get("objective", ""),
            })
        elif event_type == EventType.EXECUTION_COMPLETED:
            self._on_execution_completed(data, session_id)
        elif event_type == EventType.ERROR:
            self._on_error(data, session_id)

    def _on_agent_delta(self, data: dict[str, Any], session_id: str) -> None:
        """Stream one AGENT_DELTA: thinking, else text, else tool-input JSON.

        Tool-input deltas are forwarded only for tool calls whose tool_use
        block is currently open (opened by TOOL_STARTED, not yet completed).
        """
        thinking_delta = data.get("thinking_delta", "")
        if thinking_delta:
            # Thinking and text blocks never overlap: close the text block first.
            self._close_text_block(session_id)
            if not self._thinking_block_open.get(session_id):
                self._open_thinking_block(session_id)
            self._push(session_id, {
                "type": "stream_event",
                "event": {
                    "type": "content_block_delta",
                    "index": self._thinking_block_index[session_id],
                    "delta": {"type": "thinking_delta", "thinking": thinking_delta},
                },
            })
            return

        delta_text = data.get("delta", "")
        if delta_text:
            self._close_thinking_block(session_id)
            if not self._text_block_open.get(session_id):
                self._open_text_block(session_id)
            self._text_accumulator[session_id] = (
                self._text_accumulator.get(session_id, "") + delta_text
            )
            self._push(session_id, {
                "type": "stream_event",
                "event": {
                    "type": "content_block_delta",
                    "index": self._text_block_index[session_id],
                    "delta": {"type": "text_delta", "text": delta_text},
                },
            })
            return

        tool_args = data.get("tool_arguments", "")
        tool_call_id = data.get("tool_call_id", "")
        if tool_args and tool_call_id:
            idx = self._tool_block_index.get(session_id, {}).get(tool_call_id)
            if idx is not None:
                self._push(session_id, {
                    "type": "stream_event",
                    "event": {
                        "type": "content_block_delta",
                        "index": idx,
                        "delta": {"type": "input_json_delta", "partial_json": tool_args},
                    },
                })

    def _on_tool_started(self, data: dict[str, Any], session_id: str) -> None:
        """Close thinking/text blocks and open a tool_use block for the call."""
        tool_name = data.get("tool", "unknown")
        tool_call_id = data.get("tool_call_id", "")
        self._close_thinking_block(session_id)
        self._close_text_block(session_id)
        idx = self._next_index(session_id)
        self._tool_block_index.setdefault(session_id, {})[tool_call_id] = idx
        self._push(session_id, {
            "type": "stream_event",
            "event": {
                "type": "content_block_start",
                "index": idx,
                "content_block": {
                    "type": "tool_use",
                    "name": tool_name,
                    "id": tool_call_id,
                },
            },
        })

    def _on_tool_completed(self, data: dict[str, Any], session_id: str) -> None:
        """Stop the tool_use block, emit its tool_result, then a snapshot.

        The tool_use is recorded in the snapshot with an empty ``input``; the
        result content is truncated to 4000 characters.
        """
        tool_name = data.get("tool", "unknown")
        tool_call_id = data.get("tool_call_id", "")
        is_error = data.get("status", "completed") == "failed"
        raw_output = data.get("raw_output", data.get("summary", ""))

        # Pop so the block is no longer "open": later deltas for this id are
        # dropped and the end-of-execution cleanup will not stop it twice.
        idx = self._tool_block_index.get(session_id, {}).pop(tool_call_id, None)
        if idx is not None:
            self._push_block_stop(session_id, idx)

        self._content_blocks.setdefault(session_id, []).append({
            "type": "tool_use",
            "id": tool_call_id,
            "name": tool_name,
            "input": {},
        })

        content = data.get("error", raw_output) if is_error else raw_output
        self._push(session_id, {
            "type": "tool_result",
            "tool_use_id": tool_call_id,
            "content": str(content)[:4000],
            "is_error": is_error,
        })
        self._push(session_id, self._build_assistant_snapshot(session_id))

    def _on_execution_completed(self, data: dict[str, Any], session_id: str) -> None:
        """Close all blocks, then emit the final snapshot, result/usage and done."""
        self._close_all_blocks(session_id)
        self._push(session_id, self._build_assistant_snapshot(session_id))
        # ``usage`` may be present but None; treat that like a missing value.
        usage = data.get("usage") or {}
        self._push(session_id, {
            "type": "result",
            "is_error": False,
            "usage": {
                "input_tokens": usage.get("input_tokens", 0),
                "output_tokens": usage.get("output_tokens", 0),
                "cache_read_input_tokens": 0,
                "cache_creation_input_tokens": 0,
            },
            "total_cost_usd": data.get("total_cost_usd", 0),
        })
        self._push(session_id, {"type": "done"})

    def _on_error(self, data: dict[str, Any], session_id: str) -> None:
        """Close all blocks and emit a user-friendly error result, then done."""
        raw_msg = data.get("message", str(data.get("error", "Unknown error")))
        user_msg = friendly_error_message(raw_msg, str(data.get("error", "")))
        # The raw provider error goes only to the log, never to the client.
        logger.warning("Session %s error (raw): %s", session_id, raw_msg)
        self._close_all_blocks(session_id)
        self._push(session_id, {
            "type": "result",
            "is_error": True,
            "result": user_msg,
            "usage": {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0},
            "total_cost_usd": 0,
        })
        self._push(session_id, {"type": "done"})

    # ------------------------------------------------------------------
    # Subscription lifecycle
    # ------------------------------------------------------------------

    async def subscribe(self, session_id: str) -> AsyncIterator[str]:
        """Yield Claude-format SSE lines for a session until the sentinel arrives.

        A keepalive line is yielded after 15 s without events.
        """
        queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=512)
        self._queues.setdefault(session_id, []).append(queue)
        try:
            while True:
                try:
                    line = await asyncio.wait_for(queue.get(), timeout=15.0)
                    if line is None:
                        break
                    yield line
                except asyncio.TimeoutError:
                    yield 'data: {"type":"keepalive"}\n\n'
        finally:
            if queue in self._queues.get(session_id, []):
                self._queues[session_id].remove(queue)

    def cleanup_session(self, session_id: str) -> None:
        """Signal end-of-stream to all subscribers and drop all session state."""
        for q in self._queues.get(session_id, []):
            try:
                q.put_nowait(None)
            except asyncio.QueueFull:
                # Drop one pending line so the sentinel always gets through;
                # otherwise the subscriber would never terminate.
                try:
                    q.get_nowait()
                except asyncio.QueueEmpty:
                    pass
                q.put_nowait(None)
        self._queues.pop(session_id, None)
        self._block_counter.pop(session_id, None)
        self._text_block_open.pop(session_id, None)
        self._text_block_index.pop(session_id, None)
        self._tool_block_index.pop(session_id, None)
        self._content_blocks.pop(session_id, None)
        self._text_accumulator.pop(session_id, None)
        self._thinking_block_open.pop(session_id, None)
        self._thinking_block_index.pop(session_id, None)
class DualEmitter:
    """Fan every event out to a native SSEEmitter and a ClaudeFormatEmitter.

    Agents call :meth:`emit` once and both wire formats are produced. The
    remaining methods forward to the native emitter (``cleanup_session``
    goes to both), so this object can stand in wherever an SSEEmitter is
    expected (duck typing).
    """

    def __init__(self, native: SSEEmitter, claude: ClaudeFormatEmitter) -> None:
        self.native = native
        self.claude = claude

    async def emit(
        self,
        event_type: EventType,
        data: dict[str, Any],
        session_id: str,
    ) -> None:
        """Forward the event to the native emitter first, then the Claude one."""
        for sink in (self.native, self.claude):
            await sink.emit(event_type, data, session_id)

    # --- SSEEmitter-compatible delegation (backward compatibility) ---

    async def subscribe(self, session_id: str) -> AsyncIterator[str]:
        """Yield the native-format SSE lines for *session_id*."""
        stream = self.native.subscribe(session_id)
        async for chunk in stream:
            yield chunk

    async def get_history(self, session_id: str) -> list[dict[str, Any]]:
        """Return whatever the native emitter's ``get_history`` returns."""
        history = await self.native.get_history(session_id)
        return history

    def cleanup_session(self, session_id: str) -> None:
        """Release per-session state in both emitters, native first."""
        for sink in (self.native, self.claude):
            sink.cleanup_session(session_id)

    def set_storage(self, redis_storage: Any) -> None:
        """Hand *redis_storage* to the native emitter's ``set_storage``."""
        self.native.set_storage(redis_storage)