"""Base subagent class with shared execution logic.""" from __future__ import annotations import hashlib import json import logging import time import uuid from typing import Any, AsyncIterator from ...adapters.base import ModelAdapter, ModelConfig, StreamChunk from ...context.engine import ContextEngine from ...mcp.manager import MCPManager from ...memory.store import MemoryStore from ...models.agent import AgentProfile from ...models.artifacts import ArtifactSummary from ...models.session import SessionState from ...models.tools import ToolExecution, ToolExecutionStatus from ...streaming.sse import SSEEmitter, EventType logger = logging.getLogger(__name__) class BaseAgent: """Base class for all subagents.""" def __init__( self, profile: AgentProfile, model_adapter: ModelAdapter, context_engine: ContextEngine, mcp_client: MCPManager, memory_store: MemoryStore, sse_emitter: SSEEmitter, ) -> None: self.profile = profile self.model = model_adapter self.context = context_engine self.mcp = mcp_client self.memory = memory_store self.sse = sse_emitter async def execute( self, session: SessionState, max_steps: int = 10, ) -> dict[str, Any]: """Run the agent's execution loop. Uses real conversation messages with complete tool results, like professional agentic tools (Claude Code, Cursor). Compaction happens at the step level, not per tool result. Returns a result dict with keys: content, artifacts, tool_executions. """ artifacts: list[ArtifactSummary] = await self.memory.list_artifacts( session.session_id ) tool_executions: list[ToolExecution] = [] accumulated_content = "" total_input_tokens = 0 total_output_tokens = 0 # Real conversation history: assistant messages + tool results conversation: list[dict[str, Any]] = [] tool_fingerprints: dict[str, ToolExecution] = {} all_duplicates_streak = 0 # consecutive steps where ALL calls are duplicates for step in range(max_steps): # Build context with real conversation ctx = await self.context.build_context( session=session, agent=self.profile, artifacts=artifacts, conversation=conversation, ) # Prepare tool definitions tool_defs = self._get_allowed_tools() # Stream model response config = ModelConfig( model_id=self.profile.model_id or "", max_tokens=self.profile.max_tokens or 4096, temperature=self.profile.temperature or 0.3, ) full_text = "" tool_calls: list[dict[str, Any]] = [] active_tools: dict[str, dict[str, Any]] = {} async for chunk in self.model.stream( messages=ctx.to_messages(), tools=tool_defs if tool_defs else None, config=config, ): if chunk.delta: full_text += chunk.delta await self.sse.emit( EventType.AGENT_DELTA, { "agent": self.profile.role, "delta": chunk.delta, "step": step, }, session_id=session.session_id, ) if chunk.tool_name and chunk.tool_call_id: if chunk.tool_call_id not in active_tools: active_tools[chunk.tool_call_id] = { "id": chunk.tool_call_id, "name": chunk.tool_name, "arguments": "", } await self.sse.emit( EventType.TOOL_STARTED, {"tool": chunk.tool_name, "step": step}, session_id=session.session_id, ) if chunk.tool_arguments and chunk.tool_call_id and not chunk.finish_reason: tool = active_tools.get(chunk.tool_call_id) if tool: tool["arguments"] += chunk.tool_arguments if chunk.finish_reason == "tool_use" and chunk.tool_call_id: tool = active_tools.pop(chunk.tool_call_id, None) if not tool: tool = { "id": chunk.tool_call_id, "name": chunk.tool_name or "", "arguments": "", } final_args = tool["arguments"] or chunk.tool_arguments or "" try: args = json.loads(final_args) if final_args else {} except json.JSONDecodeError: logger.warning("Failed to parse tool args: %s", final_args[:200]) args = {} tool["parsed_arguments"] = args tool_calls.append(tool) # Accumulate token usage from any chunk that has it if chunk.usage: total_input_tokens += chunk.usage.get("input_tokens", 0) total_output_tokens += chunk.usage.get("output_tokens", 0) if chunk.finish_reason == "end_turn": break accumulated_content += full_text # If no tool calls, we're done if not tool_calls: # Add final assistant message to conversation if full_text: conversation.append({"role": "assistant", "content": full_text}) break # Add assistant message with tool calls to conversation # (OpenAI format: assistant message carries tool_calls) assistant_msg: dict[str, Any] = {"role": "assistant"} if full_text: assistant_msg["content"] = full_text assistant_msg["tool_calls"] = [ { "id": tc["id"], "type": "function", "function": { "name": tc["name"], "arguments": json.dumps(tc.get("parsed_arguments", {})), }, } for tc in tool_calls ] conversation.append(assistant_msg) # Execute tool calls and add COMPLETE results to conversation duplicates_this_step = 0 for tc in tool_calls: fp_raw = f"{tc['name']}:{json.dumps(tc.get('parsed_arguments', {}), sort_keys=True)}" fp = hashlib.md5(fp_raw.encode()).hexdigest() if fp in tool_fingerprints: prev_exec = tool_fingerprints[fp] tool_executions.append(prev_exec) duplicates_this_step += 1 # Return cached result as tool message conversation.append({ "role": "tool", "tool_call_id": tc["id"], "content": f"[DUPLICADO] Ya ejecutada con mismos argumentos. Resultado: {prev_exec.raw_output[:2000]}", }) logger.warning("Duplicate tool call skipped: %s (fingerprint: %s)", tc["name"], fp[:8]) continue tool_exec = await self._execute_tool( session=session, tool_name=tc["name"], arguments=tc.get("parsed_arguments", {}), artifacts=artifacts, ) tool_fingerprints[fp] = tool_exec tool_executions.append(tool_exec) # COMPLETE result in conversation (truncated to safe limit) conversation.append({ "role": "tool", "tool_call_id": tc["id"], "content": tool_exec.raw_output[:8000] if tool_exec.raw_output else tool_exec.result_summary, }) # Loop detection: if ALL tool calls in this step were duplicates if duplicates_this_step == len(tool_calls): all_duplicates_streak += 1 if all_duplicates_streak >= 2: logger.warning("Loop detected: %d consecutive steps with all duplicate calls. Breaking.", all_duplicates_streak) conversation.append({ "role": "user", "content": "[SISTEMA] Se detectaron llamadas repetidas. Ya tienes toda la información necesaria. Genera tu respuesta final ahora.", }) # One more chance to generate a final response ctx = await self.context.build_context( session=session, agent=self.profile, artifacts=artifacts, conversation=conversation, ) async for chunk in self.model.stream( messages=ctx.to_messages(), config=config, ): if chunk.delta: accumulated_content += chunk.delta if chunk.finish_reason: break break else: all_duplicates_streak = 0 return { "content": accumulated_content, "artifacts": artifacts, "tool_executions": tool_executions, "usage": { "input_tokens": total_input_tokens, "output_tokens": total_output_tokens, }, } async def _execute_tool( self, session: SessionState, tool_name: str, arguments: dict[str, Any], artifacts: list[ArtifactSummary], ) -> ToolExecution: """Execute a tool and summarise the result.""" exec_id = uuid.uuid4().hex[:12] tool_exec = ToolExecution( execution_id=exec_id, tool_name=tool_name, arguments=arguments, status=ToolExecutionStatus.RUNNING, ) logger.info("Tool call: %s(%s)", tool_name, json.dumps(arguments)[:200]) start = time.monotonic() try: if self.mcp.is_running and tool_name in self.mcp.tools: result = await self.mcp.call_tool(tool_name, arguments) raw_output = self._extract_mcp_output(result) else: raw_output = f"Tool '{tool_name}' not available via MCP." duration = (time.monotonic() - start) * 1000 # Summarise — raw output NEVER enters context task_id = session.current_task.task_id if session.current_task else "none" artifact = self.context.summarize_tool_output( tool_name=tool_name, raw_output=raw_output, session_id=session.session_id, task_id=task_id, ) # Store artifact await self.memory.store_artifact(session.session_id, artifact) artifacts.append(artifact) tool_exec.status = ToolExecutionStatus.COMPLETED tool_exec.result_summary = artifact.summary tool_exec.raw_output = raw_output[:8000] tool_exec.duration_ms = duration await self.sse.emit( EventType.TOOL_COMPLETED, { "tool": tool_name, "status": "completed", "summary": artifact.summary[:200], }, session_id=session.session_id, ) except Exception as e: tool_exec.status = ToolExecutionStatus.FAILED tool_exec.error = str(e) tool_exec.duration_ms = (time.monotonic() - start) * 1000 logger.error("Tool execution failed: %s — %s", tool_name, e) await self.sse.emit( EventType.TOOL_COMPLETED, {"tool": tool_name, "status": "failed", "error": str(e)}, session_id=session.session_id, ) return tool_exec def _get_allowed_tools(self) -> list[dict[str, Any]]: """Return tool definitions filtered by this agent's allowed_tools.""" if not self.mcp.is_running: return [] all_tools = self.mcp.get_tool_definitions() if not self.profile.allowed_tools: return all_tools # No filter → all tools return [t for t in all_tools if t["name"] in self.profile.allowed_tools] @staticmethod def _extract_mcp_output(result: dict[str, Any]) -> str: """Extract text content from MCP tool result.""" content = result.get("content", []) if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": parts.append(item.get("text", "")) return "\n".join(parts) if parts else json.dumps(result) return str(content)