# @mariozechner/pi-agent-core Stateful agent with tool execution and event streaming. Built on `@mariozechner/pi-ai`. ## Installation ```bash npm install @mariozechner/pi-agent-core ``` ## Quick Start ```typescript import { Agent } from "@mariozechner/pi-agent-core"; import { getModel } from "@mariozechner/pi-ai"; const agent = new Agent({ initialState: { systemPrompt: "You are a helpful assistant.", model: getModel("anthropic", "claude-sonnet-4-20250514"), }, }); agent.subscribe((event) => { if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") { // Stream just the new text chunk process.stdout.write(event.assistantMessageEvent.delta); } }); await agent.prompt("Hello!"); ``` ## Core Concepts ### AgentMessage vs LLM Message The agent works with `AgentMessage`, a flexible type that can include: - Standard LLM messages (`user`, `assistant`, `toolResult`) - Custom app-specific message types via declaration merging LLMs only understand `user`, `assistant`, and `toolResult`. The `convertToLlm` function bridges this gap by filtering and transforming messages before each LLM call. ### Message Flow ``` AgentMessage[] → transformContext() → AgentMessage[] → convertToLlm() → Message[] → LLM (optional) (required) ``` 1. **transformContext**: Prune old messages, inject external context 2. **convertToLlm**: Filter out UI-only messages, convert custom types to LLM format ## Event Flow The agent emits events for UI updates. Understanding the event sequence helps build responsive interfaces. ### prompt() Event Sequence When you call `prompt("Hello")`: ``` prompt("Hello") ├─ agent_start ├─ turn_start ├─ message_start { message: userMessage } // Your prompt ├─ message_end { message: userMessage } ├─ message_start { message: assistantMessage } // LLM starts responding ├─ message_update { message: partial... } // Streaming chunks ├─ message_update { message: partial... } ├─ message_end { message: assistantMessage } // Complete response ├─ turn_end { message, toolResults: [] } └─ agent_end { messages: [...] } ``` ### With Tool Calls If the assistant calls tools, the loop continues: ``` prompt("Read config.json") ├─ agent_start ├─ turn_start ├─ message_start/end { userMessage } ├─ message_start { assistantMessage with toolCall } ├─ message_update... ├─ message_end { assistantMessage } ├─ tool_execution_start { toolCallId, toolName, args } ├─ tool_execution_update { partialResult } // If tool streams ├─ tool_execution_end { toolCallId, result } ├─ message_start/end { toolResultMessage } ├─ turn_end { message, toolResults: [toolResult] } │ ├─ turn_start // Next turn ├─ message_start { assistantMessage } // LLM responds to tool result ├─ message_update... ├─ message_end ├─ turn_end └─ agent_end ``` Tool execution mode is configurable: - `parallel` (default): preflight tool calls sequentially, execute allowed tools concurrently, emit `tool_execution_end` as soon as each tool is finalized, then emit toolResult messages and `turn_end.toolResults` in assistant source order - `sequential`: execute tool calls one by one, matching the historical behavior In parallel mode, tool completion events follow tool completion order, but persisted toolResult messages still follow assistant source order. The mode can be set globally via `toolExecution` in the agent config, or per-tool via `executionMode` on `AgentTool`. If any tool call in a batch targets a tool with `executionMode: "sequential"`, the entire batch executes sequentially regardless of the global setting. The `beforeToolCall` hook runs after `tool_execution_start` and validated argument parsing. It can block execution. The `afterToolCall` hook runs after tool execution finishes and before `tool_execution_end` and final tool result message events are emitted. Tools can also return `terminate: true` to hint that the automatic follow-up LLM call should be skipped. The loop only stops early when every finalized tool result in that batch sets `terminate: true`. Mixed batches continue normally. Low-level loop callers can set `shouldStopAfterTurn` to stop gracefully after the current turn completes: ```typescript const stream = agentLoop(prompts, context, { model, convertToLlm, shouldStopAfterTurn: async ({ message, toolResults, context, newMessages }) => { return shouldCompactBeforeNextTurn(context.messages); }, }); ``` `shouldStopAfterTurn` runs after `turn_end` is emitted and after the assistant response and any tool executions have completed normally. If it returns `true`, the loop emits `agent_end` and exits before polling steering or follow-up queues, and before starting another LLM call. It does not abort the provider stream, does not cancel running tools, and does not alter the assistant message stop reason. When you use the `Agent` class, assistant `message_end` processing is treated as a barrier before tool preflight begins. That means `beforeToolCall` sees agent state that already includes the assistant message that requested the tool call. ### continue() Event Sequence `continue()` resumes from existing context without adding a new message. Use it for retries after errors. ```typescript // After an error, retry from current state await agent.continue(); ``` The last message in context must be `user` or `toolResult` (not `assistant`). ### Event Types | Event | Description | |-------|-------------| | `agent_start` | Agent begins processing | | `agent_end` | Final event for the run. Awaited subscribers for this event still count toward settlement | | `turn_start` | New turn begins (one LLM call + tool executions) | | `turn_end` | Turn completes with assistant message and tool results | | `message_start` | Any message begins (user, assistant, toolResult) | | `message_update` | **Assistant only.** Includes `assistantMessageEvent` with delta | | `message_end` | Message completes | | `tool_execution_start` | Tool begins | | `tool_execution_update` | Tool streams progress | | `tool_execution_end` | Tool completes | `Agent.subscribe()` listeners are awaited in registration order. `agent_end` means no more loop events will be emitted, but `await agent.waitForIdle()` and `await agent.prompt(...)` only settle after awaited `agent_end` listeners finish. ## Agent Options ```typescript const agent = new Agent({ // Initial state initialState: { systemPrompt: string, model: Model, thinkingLevel: "off" | "minimal" | "low" | "medium" | "high" | "xhigh", tools: AgentTool[], messages: AgentMessage[], }, // Convert AgentMessage[] to LLM Message[] (required for custom message types) convertToLlm: (messages) => messages.filter(...), // Transform context before convertToLlm (for pruning, compaction) transformContext: async (messages, signal) => pruneOldMessages(messages), // Steering mode: "one-at-a-time" (default) or "all" steeringMode: "one-at-a-time", // Follow-up mode: "one-at-a-time" (default) or "all" followUpMode: "one-at-a-time", // Custom stream function (for proxy backends) streamFn: streamProxy, // Session ID for provider caching sessionId: "session-123", // Dynamic API key resolution (for expiring OAuth tokens) getApiKey: async (provider) => refreshToken(), // Tool execution mode: "parallel" (default) or "sequential" toolExecution: "parallel", // Preflight each tool call after args are validated. Can block execution. beforeToolCall: async ({ toolCall, args, context }) => { if (toolCall.name === "bash") { return { block: true, reason: "bash is disabled" }; } }, // Postprocess each tool result before final tool events are emitted. afterToolCall: async ({ toolCall, result, isError, context }) => { if (toolCall.name === "notify_done" && !isError) { return { terminate: true }; } if (!isError) { return { details: { ...result.details, audited: true } }; } }, // Custom thinking budgets for token-based providers thinkingBudgets: { minimal: 128, low: 512, medium: 1024, high: 2048, }, }); ``` ## Agent State ```typescript interface AgentState { systemPrompt: string; model: Model; thinkingLevel: ThinkingLevel; tools: AgentTool[]; messages: AgentMessage[]; readonly isStreaming: boolean; readonly streamingMessage?: AgentMessage; readonly pendingToolCalls: ReadonlySet; readonly errorMessage?: string; } ``` Access state via `agent.state`. Assigning `agent.state.tools = [...]` or `agent.state.messages = [...]` copies the top-level array before storing it. Mutating the returned array mutates the current agent state. During streaming, `agent.state.streamingMessage` contains the current partial assistant message. `agent.state.isStreaming` remains `true` until the run fully settles, including awaited `agent_end` subscribers. ## Methods ### Prompting ```typescript // Text prompt await agent.prompt("Hello"); // With images await agent.prompt("What's in this image?", [ { type: "image", data: base64Data, mimeType: "image/jpeg" } ]); // AgentMessage directly await agent.prompt({ role: "user", content: "Hello", timestamp: Date.now() }); // Continue from current context (last message must be user or toolResult) await agent.continue(); ``` ### State Management ```typescript agent.state.systemPrompt = "New prompt"; agent.state.model = getModel("openai", "gpt-4o"); agent.state.thinkingLevel = "medium"; agent.state.tools = [myTool]; agent.toolExecution = "sequential"; agent.beforeToolCall = async ({ toolCall }) => undefined; agent.afterToolCall = async ({ toolCall, result }) => undefined; agent.state.messages = newMessages; // top-level array is copied agent.state.messages.push(message); agent.reset(); ``` ### Session and Thinking Budgets ```typescript agent.sessionId = "session-123"; agent.thinkingBudgets = { minimal: 128, low: 512, medium: 1024, high: 2048, }; ``` ### Control ```typescript agent.abort(); // Cancel current operation await agent.waitForIdle(); // Wait for completion ``` ### Events ```typescript const unsubscribe = agent.subscribe(async (event, signal) => { if (event.type === "agent_end") { // Final barrier work for the run await flushSessionState(signal); } }); unsubscribe(); ``` ## Steering and Follow-up Steering messages let you interrupt the agent while tools are running. Follow-up messages let you queue work after the agent would otherwise stop. ```typescript agent.steeringMode = "one-at-a-time"; agent.followUpMode = "one-at-a-time"; // While agent is running tools agent.steer({ role: "user", content: "Stop! Do this instead.", timestamp: Date.now(), }); // After the agent finishes its current work agent.followUp({ role: "user", content: "Also summarize the result.", timestamp: Date.now(), }); const steeringMode = agent.steeringMode; const followUpMode = agent.followUpMode; agent.clearSteeringQueue(); agent.clearFollowUpQueue(); agent.clearAllQueues(); ``` Use clearSteeringQueue, clearFollowUpQueue, or clearAllQueues to drop queued messages. When steering messages are detected after a turn completes: 1. All tool calls from the current assistant message have already finished 2. Steering messages are injected 3. The LLM responds on the next turn Follow-up messages are checked only when there are no more tool calls and no steering messages. If any are queued, they are injected and another turn runs. ## Custom Message Types Extend `AgentMessage` via declaration merging: ```typescript declare module "@mariozechner/pi-agent-core" { interface CustomAgentMessages { notification: { role: "notification"; text: string; timestamp: number }; } } // Now valid const msg: AgentMessage = { role: "notification", text: "Info", timestamp: Date.now() }; ``` Handle custom types in `convertToLlm`: ```typescript const agent = new Agent({ convertToLlm: (messages) => messages.flatMap(m => { if (m.role === "notification") return []; // Filter out return [m]; }), }); ``` ## Tools Define tools using `AgentTool`: ```typescript import { Type } from "typebox"; const readFileTool: AgentTool = { name: "read_file", label: "Read File", // For UI display description: "Read a file's contents", parameters: Type.Object({ path: Type.String({ description: "File path" }), }), // Override execution mode for this tool (optional). // "sequential" forces the entire batch to run one at a time. // "parallel" allows concurrent execution with other tool calls. // If omitted, the global toolExecution config applies. executionMode: "sequential", execute: async (toolCallId, params, signal, onUpdate) => { const content = await fs.readFile(params.path, "utf-8"); // Optional: stream progress onUpdate?.({ content: [{ type: "text", text: "Reading..." }], details: {} }); // Optional: add `terminate: true` here to skip the automatic follow-up LLM call // when every finalized tool result in the batch does the same. return { content: [{ type: "text", text: content }], details: { path: params.path, size: content.length }, }; }, }; agent.state.tools = [readFileTool]; ``` ### Error Handling **Throw an error** when a tool fails. Do not return error messages as content. ```typescript execute: async (toolCallId, params, signal, onUpdate) => { if (!fs.existsSync(params.path)) { throw new Error(`File not found: ${params.path}`); } // Return content only on success return { content: [{ type: "text", text: "..." }] }; } ``` Thrown errors are caught by the agent and reported to the LLM as tool errors with `isError: true`. Return `terminate: true` from `execute()` or `afterToolCall` to hint that the agent should stop after the current tool batch. This only takes effect when every finalized tool result in the batch is terminating. The hint is runtime-only; emitted `toolResult` transcript messages remain standard LLM tool results. ## Proxy Usage For browser apps that proxy through a backend: ```typescript import { Agent, streamProxy } from "@mariozechner/pi-agent-core"; const agent = new Agent({ streamFn: (model, context, options) => streamProxy(model, context, { ...options, authToken: "...", proxyUrl: "https://your-server.com", }), }); ``` ## Low-Level API For direct control without the Agent class: ```typescript import { agentLoop, agentLoopContinue } from "@mariozechner/pi-agent-core"; const context: AgentContext = { systemPrompt: "You are helpful.", messages: [], tools: [], }; const config: AgentLoopConfig = { model: getModel("openai", "gpt-4o"), convertToLlm: (msgs) => msgs.filter(m => ["user", "assistant", "toolResult"].includes(m.role)), toolExecution: "parallel", // overridden by per-tool executionMode if set beforeToolCall: async ({ toolCall, args, context }) => undefined, afterToolCall: async ({ toolCall, result, isError, context }) => undefined, }; const userMessage = { role: "user", content: "Hello", timestamp: Date.now() }; for await (const event of agentLoop([userMessage], context, config)) { console.log(event.type); } // Continue from existing context for await (const event of agentLoopContinue(context, config)) { console.log(event.type); } ``` These low-level streams are observational. They preserve event order, but they do not wait for your async event handling to settle before later producer phases continue. If you need message processing to act as a barrier before tool preflight, use the `Agent` class instead of raw `agentLoop()` or `agentLoopContinue()`. ## License MIT