diff --git a/src/adapters/openai_adapter.py b/src/adapters/openai_adapter.py
index 355b408..bc0c894 100644
--- a/src/adapters/openai_adapter.py
+++ b/src/adapters/openai_adapter.py
@@ -44,6 +44,7 @@ class OpenAIAdapter(ModelAdapter):
             "temperature": config.temperature,
             "messages": messages,
             "stream": True,
+            "stream_options": {"include_usage": True},
         }
         if tools:
             kwargs["tools"] = self._format_tools(tools)
@@ -52,9 +53,25 @@ class OpenAIAdapter(ModelAdapter):
 
         tool_calls_acc: dict[int, dict[str, str]] = {}
 
+        final_usage: dict[str, int] = {}
+        # Terminal finish_reason, deferred until the stream is exhausted so
+        # the final chunk is yielded *after* the trailing usage-only chunk.
+        final_finish_reason: str | None = None
+
         async for chunk in stream:
+            # With include_usage, the last chunk has usage but no choices
+            if chunk.usage:
+                final_usage = {
+                    "input_tokens": chunk.usage.prompt_tokens or 0,
+                    "output_tokens": chunk.usage.completion_tokens or 0,
+                }
+
             choice = chunk.choices[0] if chunk.choices else None
             if not choice:
+                # Usage-only chunk (last one with include_usage) — emit it
+                if final_usage:
+                    yield StreamChunk(usage=final_usage)
+                    final_usage = {}  # Only emit once
                 continue
 
             delta = choice.delta
@@ -99,16 +116,19 @@ class OpenAIAdapter(ModelAdapter):
                     tool_arguments=acc["arguments"],
                     finish_reason="tool_use",
                 )
             else:
-                yield StreamChunk(
-                    finish_reason="end_turn"
-                    if choice.finish_reason == "stop"
-                    else choice.finish_reason,
-                    usage={
-                        "output_tokens": chunk.usage.completion_tokens
-                        if chunk.usage
-                        else 0
-                    },
-                )
+                # Defer the terminal yield: OpenAI sends the usage-only chunk
+                # *after* this finish chunk, and consumers stop reading at
+                # end_turn — yielding here would lose the usage data.
+                final_finish_reason = (
+                    "end_turn"
+                    if choice.finish_reason == "stop"
+                    else choice.finish_reason
+                )
+
+        # Stream exhausted: any usage chunk has already been emitted above,
+        # so consumers that break on end_turn have accumulated it first.
+        if final_finish_reason is not None:
+            yield StreamChunk(finish_reason=final_finish_reason, usage=final_usage)
 
     # ------------------------------------------------------------------
diff --git a/src/config.py b/src/config.py
index dd47241..1e7e9eb 100644
--- a/src/config.py
+++ b/src/config.py
@@ -48,6 +48,10 @@ class Settings(BaseSettings):
     mcp_timeout_seconds: float = 30.0
     mcp_startup_timeout_seconds: float = 10.0
 
+    # --- Pricing (per 1M tokens) ---
+    cost_per_1m_input: float = 2.50
+    cost_per_1m_output: float = 15.00
+
     # --- Orchestrator ---
     max_execution_steps: int = 25
     subagent_max_steps: int = 10
diff --git a/src/orchestrator/agents/base.py b/src/orchestrator/agents/base.py
index 8bbee5e..b65c339 100644
--- a/src/orchestrator/agents/base.py
+++ b/src/orchestrator/agents/base.py
@@ -59,6 +59,8 @@ class BaseAgent:
         )
         tool_executions: list[ToolExecution] = []
         accumulated_content = ""
+        total_input_tokens = 0
+        total_output_tokens = 0
         # Real conversation history: assistant messages + tool results
         conversation: list[dict[str, Any]] = []
         tool_fingerprints: dict[str, ToolExecution] = {}
@@ -139,6 +141,11 @@ class BaseAgent:
                     tool["parsed_arguments"] = args
                     tool_calls.append(tool)
 
+            # Accumulate token usage from any chunk that has it
+            if chunk.usage:
+                total_input_tokens += chunk.usage.get("input_tokens", 0)
+                total_output_tokens += chunk.usage.get("output_tokens", 0)
+
             if chunk.finish_reason == "end_turn":
                 break
 
@@ -234,6 +241,10 @@ class BaseAgent:
             "content": accumulated_content,
             "artifacts": artifacts,
             "tool_executions": tool_executions,
+            "usage": {
+                "input_tokens": total_input_tokens,
+                "output_tokens": total_output_tokens,
+            },
         }
 
     async def _execute_tool(
diff --git a/src/orchestrator/agents/planner.py b/src/orchestrator/agents/planner.py
index ca802f4..8ed46a9 100644
--- a/src/orchestrator/agents/planner.py
+++ b/src/orchestrator/agents/planner.py
@@ -55,9 +55,10 @@ def create_planner_profile() -> AgentProfile:
 class PlannerAgent(BaseAgent):
     """Generates execution plans from objectives."""
 
-    async def plan(self, session: SessionState) -> list[TaskStep]:
-        """Generate a plan and return TaskSteps."""
+    async def plan(self, session: SessionState) -> tuple[list[TaskStep], dict[str, int]]:
+        """Generate a plan and return (TaskSteps, usage)."""
         result = await self.execute(session, max_steps=1)
+        usage = result.get("usage", {"input_tokens": 0, "output_tokens": 0})
         content = result["content"].strip()
 
         # Parse the JSON plan from the model output
@@ -92,7 +93,7 @@ class PlannerAgent(BaseAgent):
                 parsed.get("facts", [])
             )
 
-            return steps
+            return steps, usage
         except (json.JSONDecodeError, KeyError) as e:
             logger.warning("Failed to parse planner output: %s", e)
 
@@ -104,4 +105,4 @@ class PlannerAgent(BaseAgent):
                 else "Execute task",
                 agent_role="coder",
             )
-        ]
+        ], usage
diff --git a/src/orchestrator/engine.py b/src/orchestrator/engine.py
index c5fb2ad..07d8b88 100644
--- a/src/orchestrator/engine.py
+++ b/src/orchestrator/engine.py
@@ -115,9 +115,10 @@ class OrchestratorEngine:
 
         # 2. Plan
         task.status = TaskStatus.PLANNING
+        planner_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
         try:
             planner = self._create_agent(AgentRole.PLANNER)
-            plan_steps = await planner.plan(session)
+            plan_steps, planner_usage = await planner.plan(session)
             task.plan = plan_steps
             task.status = TaskStatus.EXECUTING
         except Exception as e:
@@ -234,6 +235,21 @@ class OrchestratorEngine:
             session_id=session.session_id,
         )
 
+        # Accumulate token usage: planner + all steps + review
+        total_input = planner_usage.get("input_tokens", 0)
+        total_output = planner_usage.get("output_tokens", 0)
+        for r in results:
+            total_input += r.get("usage", {}).get("input_tokens", 0)
+            total_output += r.get("usage", {}).get("output_tokens", 0)
+        # Add review usage if any
+        total_input += review_result.get("usage", {}).get("input_tokens", 0)
+        total_output += review_result.get("usage", {}).get("output_tokens", 0)
+        # Calculate cost
+        cost_usd = (
+            (total_input / 1_000_000) * settings.cost_per_1m_input
+            + (total_output / 1_000_000) * settings.cost_per_1m_output
+        )
+
         return {
             "session_id": session.session_id,
             "task_id": task.task_id,
@@ -245,6 +261,11 @@ class OrchestratorEngine:
             ),
             "review": review_result.get("content", ""),
             "status": status,
+            "usage": {
+                "input_tokens": total_input,
+                "output_tokens": total_output,
+            },
+            "total_cost_usd": round(cost_usd, 6),
         }
 
     def _error_result(self, session: SessionState, error: str) -> dict[str, Any]: