"""Claude Code CLI compatible SSE format emitter. Translates agenticSystem native events into the exact format that Claude Code CLI produces, so the frontend can consume them without any changes. Used via ?format=claude on the stream endpoint. Wire format: data: {json}\n\n (no event: or id: fields) """ from __future__ import annotations import asyncio import json import logging from typing import Any, AsyncIterator from .sse import EventType, SSEEmitter logger = logging.getLogger(__name__) class ClaudeFormatEmitter: """Emits events in Claude Code CLI SSE format. Maintains per-session state to track block indices and accumulate content for assistant snapshots. """ def __init__(self) -> None: self._queues: dict[str, list[asyncio.Queue[str | None]]] = {} # Per-session state self._block_counter: dict[str, int] = {} self._text_block_open: dict[str, bool] = {} self._text_block_index: dict[str, int] = {} self._tool_block_index: dict[str, dict[str, int]] = {} # session -> {tool_call_id -> index} self._content_blocks: dict[str, list[dict[str, Any]]] = {} self._text_accumulator: dict[str, str] = {} def _next_index(self, session_id: str) -> int: idx = self._block_counter.get(session_id, 0) self._block_counter[session_id] = idx + 1 return idx def _reset_session(self, session_id: str) -> None: self._block_counter[session_id] = 0 self._text_block_open[session_id] = False self._text_block_index[session_id] = -1 self._tool_block_index[session_id] = {} self._content_blocks[session_id] = [] self._text_accumulator[session_id] = "" def _push(self, session_id: str, payload: dict[str, Any]) -> None: """Push a formatted line to all subscribers of a session.""" line = f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" for q in self._queues.get(session_id, []): try: q.put_nowait(line) except asyncio.QueueFull: logger.warning("Claude SSE queue full for session %s", session_id[:8]) def _close_text_block(self, session_id: str) -> None: """Close the current open text block if any.""" if self._text_block_open.get(session_id): idx = self._text_block_index[session_id] self._push(session_id, { "type": "stream_event", "event": {"type": "content_block_stop", "index": idx}, }) # Save accumulated text to content blocks text = self._text_accumulator.get(session_id, "") if text: self._content_blocks.setdefault(session_id, []).append({ "type": "text", "text": text, }) self._text_block_open[session_id] = False self._text_accumulator[session_id] = "" def _open_text_block(self, session_id: str) -> None: """Open a new text block.""" idx = self._next_index(session_id) self._text_block_index[session_id] = idx self._text_block_open[session_id] = True self._text_accumulator[session_id] = "" self._push(session_id, { "type": "stream_event", "event": { "type": "content_block_start", "index": idx, "content_block": {"type": "text", "text": ""}, }, }) def _build_assistant_snapshot(self, session_id: str) -> dict[str, Any]: """Build assistant message snapshot for reconciliation.""" blocks = list(self._content_blocks.get(session_id, [])) return { "type": "assistant", "message": {"content": blocks}, "error": False, } async def emit( self, event_type: EventType, data: dict[str, Any], session_id: str, ) -> None: """Translate a native event into Claude Code CLI format.""" if event_type == EventType.EXECUTION_STARTED: self._reset_session(session_id) self._push(session_id, { "type": "stream_event", "event": {"type": "message_start"}, }) elif event_type == EventType.AGENT_DELTA: delta_text = data.get("delta", "") tool_args = data.get("tool_arguments", "") tool_call_id = data.get("tool_call_id", "") if delta_text: # Text streaming if not self._text_block_open.get(session_id): self._open_text_block(session_id) idx = self._text_block_index[session_id] self._text_accumulator[session_id] = self._text_accumulator.get(session_id, "") + delta_text self._push(session_id, { "type": "stream_event", "event": { "type": "content_block_delta", "index": idx, "delta": {"type": "text_delta", "text": delta_text}, }, }) elif tool_args and tool_call_id: # Tool input JSON streaming tool_indices = self._tool_block_index.get(session_id, {}) idx = tool_indices.get(tool_call_id) if idx is not None: self._push(session_id, { "type": "stream_event", "event": { "type": "content_block_delta", "index": idx, "delta": {"type": "input_json_delta", "partial_json": tool_args}, }, }) elif event_type == EventType.TOOL_STARTED: tool_name = data.get("tool", "unknown") tool_call_id = data.get("tool_call_id", "") # Close open text block self._close_text_block(session_id) # Open tool_use block idx = self._next_index(session_id) self._tool_block_index.setdefault(session_id, {})[tool_call_id] = idx self._push(session_id, { "type": "stream_event", "event": { "type": "content_block_start", "index": idx, "content_block": { "type": "tool_use", "name": tool_name, "id": tool_call_id, }, }, }) elif event_type == EventType.TOOL_COMPLETED: tool_name = data.get("tool", "unknown") tool_call_id = data.get("tool_call_id", "") status = data.get("status", "completed") raw_output = data.get("raw_output", data.get("summary", "")) is_error = status == "failed" # Close tool_use block tool_indices = self._tool_block_index.get(session_id, {}) idx = tool_indices.get(tool_call_id) if idx is not None: self._push(session_id, { "type": "stream_event", "event": {"type": "content_block_stop", "index": idx}, }) # Save tool_use to content blocks for snapshot self._content_blocks.setdefault(session_id, []).append({ "type": "tool_use", "id": tool_call_id, "name": tool_name, "input": {}, }) # Emit tool_result content = data.get("error", raw_output) if is_error else raw_output self._push(session_id, { "type": "tool_result", "tool_use_id": tool_call_id, "content": content[:4000] if isinstance(content, str) else str(content)[:4000], "is_error": is_error, }) # Emit assistant snapshot for reconciliation self._push(session_id, self._build_assistant_snapshot(session_id)) elif event_type == EventType.EXECUTION_COMPLETED: # Close any open text block self._close_text_block(session_id) # Final assistant snapshot self._push(session_id, self._build_assistant_snapshot(session_id)) # Result with usage usage = data.get("usage", {}) self._push(session_id, { "type": "result", "is_error": False, "usage": { "input_tokens": usage.get("input_tokens", 0), "output_tokens": usage.get("output_tokens", 0), "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, }, "total_cost_usd": data.get("total_cost_usd", 0), }) # Done self._push(session_id, {"type": "done"}) elif event_type == EventType.ERROR: error_msg = data.get("message", str(data.get("error", "Unknown error"))) # Close any open block self._close_text_block(session_id) self._push(session_id, { "type": "result", "is_error": True, "result": error_msg, "usage": {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0}, "total_cost_usd": 0, }) self._push(session_id, {"type": "done"}) # Ignore other event types (KEEPALIVE, SESSION_CREATED, SUBAGENT_ASSIGNED) async def subscribe(self, session_id: str) -> AsyncIterator[str]: """Subscribe to Claude-format SSE events for a session.""" queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=512) if session_id not in self._queues: self._queues[session_id] = [] self._queues[session_id].append(queue) try: while True: try: line = await asyncio.wait_for(queue.get(), timeout=15.0) if line is None: break yield line except asyncio.TimeoutError: yield 'data: {"type":"keepalive"}\n\n' finally: if queue in self._queues.get(session_id, []): self._queues[session_id].remove(queue) def cleanup_session(self, session_id: str) -> None: """Clean up session state and close subscribers.""" for q in self._queues.get(session_id, []): try: q.put_nowait(None) except asyncio.QueueFull: pass self._queues.pop(session_id, None) self._block_counter.pop(session_id, None) self._text_block_open.pop(session_id, None) self._text_block_index.pop(session_id, None) self._tool_block_index.pop(session_id, None) self._content_blocks.pop(session_id, None) self._text_accumulator.pop(session_id, None) class DualEmitter: """Wraps SSEEmitter (native) + ClaudeFormatEmitter. Agents call emit() and both formats are produced. Duck-type compatible with SSEEmitter. """ def __init__(self, native: SSEEmitter, claude: ClaudeFormatEmitter) -> None: self.native = native self.claude = claude async def emit( self, event_type: EventType, data: dict[str, Any], session_id: str, ) -> None: await self.native.emit(event_type, data, session_id) await self.claude.emit(event_type, data, session_id) # Delegate native SSE methods for backward compatibility async def subscribe(self, session_id: str) -> AsyncIterator[str]: async for line in self.native.subscribe(session_id): yield line async def get_history(self, session_id: str) -> list[dict[str, Any]]: return await self.native.get_history(session_id) def cleanup_session(self, session_id: str) -> None: self.native.cleanup_session(session_id) self.claude.cleanup_session(session_id) def set_storage(self, redis_storage: Any) -> None: self.native.set_storage(redis_storage)