
StreamProcessor

Class: StreamProcessor

Defined in: activities/chat/stream/processor.ts:127

StreamProcessor - State machine for processing AI response streams

Manages the full UIMessage[] conversation and emits events on changes. Trusts the adapter contract: adapters emit clean AG-UI events in the correct order.

State tracking:

  • Full message array
  • Current assistant message being streamed
  • Text content accumulation (reset on TEXT_MESSAGE_START)
  • Multiple parallel tool calls
  • Tool call completion via TOOL_CALL_END events
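The state tracked above can be pictured with a small stand-alone model. This is a minimal sketch, not the class's actual implementation: `SketchEvent`, `SketchToolCall`, and `SketchState` are hypothetical names, and the event shapes are simplified assumptions (real AG-UI events carry more fields).

```typescript
// Hypothetical, simplified event shapes (assumption: real AG-UI events have more fields).
type SketchEvent =
  | { type: 'TEXT_MESSAGE_START' }
  | { type: 'TEXT_MESSAGE_CONTENT'; delta: string }
  | { type: 'TOOL_CALL_START'; toolCallId: string; toolCallName: string }
  | { type: 'TOOL_CALL_ARGS'; toolCallId: string; delta: string }
  | { type: 'TOOL_CALL_END'; toolCallId: string }

interface SketchToolCall {
  name: string
  args: string
  complete: boolean
}

class SketchState {
  text = ''
  // Keyed by tool call ID so several calls can be in flight at once.
  toolCalls = new Map<string, SketchToolCall>()

  apply(event: SketchEvent): void {
    switch (event.type) {
      case 'TEXT_MESSAGE_START':
        // Text accumulation is reset whenever a new text message starts.
        this.text = ''
        break
      case 'TEXT_MESSAGE_CONTENT':
        this.text += event.delta
        break
      case 'TOOL_CALL_START':
        this.toolCalls.set(event.toolCallId, {
          name: event.toolCallName,
          args: '',
          complete: false,
        })
        break
      case 'TOOL_CALL_ARGS': {
        const call = this.toolCalls.get(event.toolCallId)
        if (call) call.args += event.delta
        break
      }
      case 'TOOL_CALL_END': {
        // Completion is signalled explicitly per tool call.
        const call = this.toolCalls.get(event.toolCallId)
        if (call) call.complete = true
        break
      }
    }
  }
}

const sketchState = new SketchState()
const sketchEvents: SketchEvent[] = [
  { type: 'TEXT_MESSAGE_START' },
  { type: 'TEXT_MESSAGE_CONTENT', delta: 'stale' },
  { type: 'TEXT_MESSAGE_START' },
  { type: 'TEXT_MESSAGE_CONTENT', delta: 'Hel' },
  { type: 'TEXT_MESSAGE_CONTENT', delta: 'lo' },
  { type: 'TOOL_CALL_START', toolCallId: 'a', toolCallName: 'search' },
  { type: 'TOOL_CALL_START', toolCallId: 'b', toolCallName: 'weather' },
  { type: 'TOOL_CALL_ARGS', toolCallId: 'a', delta: '{"q":' },
  { type: 'TOOL_CALL_ARGS', toolCallId: 'b', delta: '{"city":"Oslo"}' },
  { type: 'TOOL_CALL_ARGS', toolCallId: 'a', delta: '"cats"}' },
  { type: 'TOOL_CALL_END', toolCallId: 'a' },
]
for (const event of sketchEvents) sketchState.apply(event)
```

In this model the two tool calls interleave their argument deltas without interfering, and only call `a` is marked complete because only it received TOOL_CALL_END.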

See

  • docs/chat-architecture.md#streamprocessor-internal-state — State field reference
  • docs/chat-architecture.md#adapter-contract — What this class expects from adapters

Constructors

Constructor

ts
new StreamProcessor(options): StreamProcessor;

Defined in: activities/chat/stream/processor.ts:154

Parameters

options

StreamProcessorOptions = {}

Returns

StreamProcessor

Methods

addToolApprovalResponse()

ts
addToolApprovalResponse(approvalId, approved): void;

Defined in: activities/chat/stream/processor.ts:336

Add an approval response (called by client after handling onApprovalRequest)

Parameters

approvalId

string

approved

boolean

Returns

void


addToolResult()

ts
addToolResult(
   toolCallId, 
   output, 
   error?): void;

Defined in: activities/chat/stream/processor.ts:292

Add a tool result (called by client after handling onToolCall)

Parameters

toolCallId

string

output

any

error?

string

Returns

void
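A client that handles onToolCall might report the outcome along these lines. This is a sketch under stated assumptions: `ToolResultSink` is a hypothetical interface that only mirrors the `addToolResult(toolCallId, output, error?)` signature above (with `unknown` in place of `any`), `reportToolOutcome` and `recordingSink` are illustrative helpers, and passing `null` as the output on failure is an assumption rather than documented behavior.

```typescript
// Hypothetical interface mirroring the documented addToolResult signature.
interface ToolResultSink {
  addToolResult(toolCallId: string, output: unknown, error?: string): void
}

interface RecordedResult {
  toolCallId: string
  output: unknown
  error: string | undefined
}

// Stand-in sink that records calls instead of updating a conversation.
const recordedResults: RecordedResult[] = []
const recordingSink: ToolResultSink = {
  addToolResult(toolCallId, output, error) {
    recordedResults.push({ toolCallId, output, error })
  },
}

// Runs a tool and reports either its output or an error message.
function reportToolOutcome(
  sink: ToolResultSink,
  toolCallId: string,
  run: () => unknown,
): void {
  let output: unknown
  try {
    output = run()
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err)
    // Assumption: no meaningful output exists on failure, so null is passed.
    sink.addToolResult(toolCallId, null, message)
    return
  }
  sink.addToolResult(toolCallId, output)
}

reportToolOutcome(recordingSink, 'call-1', () => 2 + 2)
reportToolOutcome(recordingSink, 'call-2', () => {
  throw new Error('disk full')
})
```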


addUserMessage()

ts
addUserMessage(content, id?): UIMessage;

Defined in: activities/chat/stream/processor.ts:201

Add a user message to the conversation. Supports both simple string content and multimodal content arrays.

Parameters

content

string | ContentPart[]

The message content (string or array of content parts)

id?

string

Optional custom message ID (generated if not provided)

Returns

UIMessage

The created UIMessage

Example

ts
// Simple text message
processor.addUserMessage('Hello!')

// Multimodal message with image
processor.addUserMessage([
  { type: 'text', content: 'What is in this image?' },
  { type: 'image', source: { type: 'url', value: 'https://example.com/photo.jpg' } }
])

// With custom ID
processor.addUserMessage('Hello!', 'custom-id-123')

areAllToolsComplete()

ts
areAllToolsComplete(): boolean;

Defined in: activities/chat/stream/processor.ts:367

Check if all tool calls in the last assistant message are complete. Useful for auto-continue logic.

Returns

boolean
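The kind of check an auto-continue loop relies on can be sketched as follows. The message shape here (`LoopMessage` with a flat `toolCalls` array and a `done` flag) is a hypothetical simplification, not the real UIMessage type, and returning `true` when there is no assistant message is an assumption.

```typescript
// Hypothetical, simplified message shape (not the real UIMessage).
interface LoopToolCall {
  id: string
  done: boolean
}

interface LoopMessage {
  role: 'user' | 'assistant'
  toolCalls: LoopToolCall[]
}

// True when every tool call in the last assistant message has finished.
function allToolsCompleteSketch(messages: LoopMessage[]): boolean {
  for (let i = messages.length - 1; i >= 0; i--) {
    const message = messages[i]
    if (message && message.role === 'assistant') {
      return message.toolCalls.every((call) => call.done)
    }
  }
  // Assumption: with no assistant message there is nothing pending.
  return true
}

// Continue only when the last message is an assistant turn that
// requested tools and all of those tools now have results.
function shouldAutoContinueSketch(messages: LoopMessage[]): boolean {
  const last = messages[messages.length - 1]
  return (
    last !== undefined &&
    last.role === 'assistant' &&
    last.toolCalls.length > 0 &&
    allToolsCompleteSketch(messages)
  )
}

const pendingConversation: LoopMessage[] = [
  { role: 'user', toolCalls: [] },
  {
    role: 'assistant',
    toolCalls: [
      { id: 'a', done: true },
      { id: 'b', done: false },
    ],
  },
]

const finishedConversation: LoopMessage[] = [
  { role: 'user', toolCalls: [] },
  {
    role: 'assistant',
    toolCalls: [
      { id: 'a', done: true },
      { id: 'b', done: true },
    ],
  },
]

const continuePending = shouldAutoContinueSketch(pendingConversation)
const continueFinished = shouldAutoContinueSketch(finishedConversation)
```

Here `continuePending` is `false` because call `b` is still outstanding, while `continueFinished` is `true`.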


clearMessages()

ts
clearMessages(): void;

Defined in: activities/chat/stream/processor.ts:411

Clear all messages

Returns

void


finalizeStream()

ts
finalizeStream(): void;

Defined in: activities/chat/stream/processor.ts:972

Finalize the stream — complete all pending operations.

Called when the async iterable ends (stream closed). Acts as the final safety net: completes any remaining tool calls, flushes un-emitted text, and fires onStreamEnd.

Returns

void

See

docs/chat-architecture.md#single-shot-text-response — Finalization step
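The "safety net" role described above might look roughly like this in a stripped-down model. Only onStreamEnd is named in this reference; `FinalizeSketch`, `onTextUpdate`, and `onToolCallComplete` are hypothetical, and the ordering of the three steps simply follows the description and is an assumption.

```typescript
// Hypothetical handler set; only onStreamEnd is named in the reference.
interface FinalizeHandlers {
  onTextUpdate(text: string): void
  onToolCallComplete(toolCallId: string): void
  onStreamEnd(): void
}

class FinalizeSketch {
  private bufferedText = ''
  private openToolCalls: string[] = []
  private readonly handlers: FinalizeHandlers

  constructor(handlers: FinalizeHandlers) {
    this.handlers = handlers
  }

  // Text that has arrived but has not yet been emitted to the UI.
  bufferText(delta: string): void {
    this.bufferedText += delta
  }

  // A tool call that started but never received its end event.
  openToolCall(toolCallId: string): void {
    this.openToolCalls.push(toolCallId)
  }

  finalize(): void {
    // 1. Complete any tool calls that are still open.
    for (const toolCallId of this.openToolCalls) {
      this.handlers.onToolCallComplete(toolCallId)
    }
    this.openToolCalls = []
    // 2. Flush text that was buffered but never emitted.
    if (this.bufferedText !== '') {
      this.handlers.onTextUpdate(this.bufferedText)
      this.bufferedText = ''
    }
    // 3. Signal that the stream has ended.
    this.handlers.onStreamEnd()
  }
}

const finalizeLog: string[] = []
const finalizeSketch = new FinalizeSketch({
  onTextUpdate: (text) => finalizeLog.push(`text:${text}`),
  onToolCallComplete: (id) => finalizeLog.push(`tool:${id}`),
  onStreamEnd: () => finalizeLog.push('end'),
})

finalizeSketch.bufferText('Hello')
finalizeSketch.openToolCall('t1')
finalizeSketch.finalize()
```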


getCurrentAssistantMessageId()

ts
getCurrentAssistantMessageId(): string | null;

Defined in: activities/chat/stream/processor.ts:253

Get the current assistant message ID (if one has been created). Returns null if prepareAssistantMessage() was called but no content has arrived yet.

Returns

string | null


getMessages()

ts
getMessages(): UIMessage[];

Defined in: activities/chat/stream/processor.ts:359

Get current messages

Returns

UIMessage[]


getRecording()

ts
getRecording(): ChunkRecording | null;

Defined in: activities/chat/stream/processor.ts:1069

Get the current recording

Returns

ChunkRecording | null


getState()

ts
getState(): ProcessorState;

Defined in: activities/chat/stream/processor.ts:1042

Get current processor state

Returns

ProcessorState


prepareAssistantMessage()

ts
prepareAssistantMessage(): void;

Defined in: activities/chat/stream/processor.ts:231

Prepare for a new assistant message stream. Does NOT create the message immediately; the message is created lazily by ensureAssistantMessage() when the first content-bearing chunk arrives. This prevents empty assistant messages from flickering in the UI when auto-continuation produces no content.

Returns

void
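The lazy-creation behavior can be illustrated with a small stand-alone model. This is a sketch only: `LazyAssistantSketch`, `LazyMessage`, the `onText` method, and the ID format are hypothetical and do not reflect the class's internals.

```typescript
// Hypothetical message shape for illustration.
interface LazyMessage {
  id: string
  role: 'assistant'
  text: string
}

class LazyAssistantSketch {
  readonly messages: LazyMessage[] = []
  private currentId: string | null = null
  private nextId = 1

  // Only resets the "current message" pointer; no message is created yet.
  prepare(): void {
    this.currentId = null
  }

  getCurrentId(): string | null {
    return this.currentId
  }

  // First content-bearing chunk triggers creation of the message.
  onText(delta: string): void {
    const message = this.ensureMessage()
    message.text += delta
  }

  private ensureMessage(): LazyMessage {
    const existing = this.messages.find((m) => m.id === this.currentId)
    if (existing) return existing
    const created: LazyMessage = {
      id: `assistant-${this.nextId++}`,
      role: 'assistant',
      text: '',
    }
    this.messages.push(created)
    this.currentId = created.id
    return created
  }
}

const lazySketch = new LazyAssistantSketch()
lazySketch.prepare()
const lazyIdBefore = lazySketch.getCurrentId()
const lazyCountBefore = lazySketch.messages.length

lazySketch.onText('Hi')
lazySketch.onText(' there')
const lazyIdAfter = lazySketch.getCurrentId()
const lazyCountAfter = lazySketch.messages.length
```

Before any content arrives the model holds no assistant message and reports a `null` ID, so a UI rendering `messages` has nothing empty to show.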


process()

ts
process(stream): Promise<ProcessorResult>;

Defined in: activities/chat/stream/processor.ts:424

Process a stream and emit events through handlers

Parameters

stream

AsyncIterable<any>

Returns

Promise<ProcessorResult>


processChunk()

ts
processChunk(chunk): void;

Defined in: activities/chat/stream/processor.ts:458

Process a single chunk from the stream.

Central dispatch for all AG-UI events. Each event type maps to a specific handler. Events not listed in the switch are intentionally ignored (RUN_STARTED, TEXT_MESSAGE_END, STEP_STARTED, STATE_SNAPSHOT, STATE_DELTA).

Parameters

chunk

AGUIEvent

Returns

void

See

docs/chat-architecture.md#adapter-contract — Expected event types and ordering
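A switch-based dispatch that deliberately ignores some event types could be sketched like this. The handled cases and the `DispatchEvent` shape are assumptions chosen for illustration; only the list of ignored event types comes from the description above.

```typescript
// Hypothetical, loosely typed event for illustration.
interface DispatchEvent {
  type: string
  delta?: string
}

const dispatchLog: string[] = []

// Returns true when the event was routed to a handler, false when ignored.
function dispatchSketch(event: DispatchEvent): boolean {
  switch (event.type) {
    case 'TEXT_MESSAGE_START':
      dispatchLog.push('text:start')
      return true
    case 'TEXT_MESSAGE_CONTENT':
      dispatchLog.push(`text:${event.delta ?? ''}`)
      return true
    case 'TOOL_CALL_END':
      dispatchLog.push('tool:end')
      return true
    default:
      // RUN_STARTED, TEXT_MESSAGE_END, STEP_STARTED, STATE_SNAPSHOT and
      // STATE_DELTA have no case here, so they are intentionally dropped.
      return false
  }
}

const dispatchInput: DispatchEvent[] = [
  { type: 'RUN_STARTED' },
  { type: 'TEXT_MESSAGE_START' },
  { type: 'TEXT_MESSAGE_CONTENT', delta: 'Hi' },
  { type: 'TEXT_MESSAGE_END' },
  { type: 'STATE_DELTA' },
]
const dispatchHandled = dispatchInput.map((event) => dispatchSketch(event))
```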


removeMessagesAfter()

ts
removeMessagesAfter(index): void;

Defined in: activities/chat/stream/processor.ts:403

Remove messages after a certain index (for reload/retry)

Parameters

index

number

Returns

void
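For a retry flow, truncation might be used as in the sketch below. It assumes the message at `index` itself is kept and everything after it is dropped; the reference does not state whether the index is inclusive, so treat that as an assumption. `removeAfterSketch` and `lastUserIndexSketch` are hypothetical helpers operating on a plain array.

```typescript
interface RetryMessage {
  id: string
  role: 'user' | 'assistant'
}

// Assumption: the message at `index` is kept; later messages are removed.
function removeAfterSketch<T>(messages: T[], index: number): T[] {
  return messages.slice(0, index + 1)
}

// Finds the position of the most recent user message, or -1 if none.
function lastUserIndexSketch(messages: RetryMessage[]): number {
  for (let i = messages.length - 1; i >= 0; i--) {
    if (messages[i]?.role === 'user') return i
  }
  return -1
}

const retryHistory: RetryMessage[] = [
  { id: 'u1', role: 'user' },
  { id: 'a1', role: 'assistant' },
  { id: 'u2', role: 'user' },
  { id: 'a2', role: 'assistant' },
]

// Drop the last assistant reply so the last user turn can be re-sent.
const retryTrimmed = removeAfterSketch(retryHistory, lastUserIndexSketch(retryHistory))
const retryTrimmedIds = retryTrimmed.map((m) => m.id)
```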


reset()

ts
reset(): void;

Defined in: activities/chat/stream/processor.ts:1092

Full reset (including messages)

Returns

void


setMessages()

ts
setMessages(messages): void;

Defined in: activities/chat/stream/processor.ts:173

Set the messages array (e.g., from persisted state)

Parameters

messages

UIMessage[]

Returns

void


startAssistantMessage()

ts
startAssistantMessage(): string;

Defined in: activities/chat/stream/processor.ts:243

Returns

string

Deprecated

Use prepareAssistantMessage() instead. This eagerly creates an assistant message which can cause empty message flicker.


startRecording()

ts
startRecording(): void;

Defined in: activities/chat/stream/processor.ts:1056

Start recording chunks

Returns

void


toModelMessages()

ts
toModelMessages(): ModelMessage<
  | string
  | ContentPart<unknown, unknown, unknown, unknown, unknown>[]
  | null>[];

Defined in: activities/chat/stream/processor.ts:348

Get the conversation as ModelMessages (for sending to LLM)

Returns

ModelMessage<string | ContentPart<unknown, unknown, unknown, unknown, unknown>[] | null>[]


replay()

ts
static replay(recording, options?): Promise<ProcessorResult>;

Defined in: activities/chat/stream/processor.ts:1112

Replay a recording through the processor

Parameters

recording

ChunkRecording

options?

StreamProcessorOptions

Returns

Promise<ProcessorResult>
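The record-and-replay idea can be sketched with a small synchronous model. The real `replay()` returns a Promise and the real ChunkRecording shape is not shown in this reference; `ReplaySketch`, `ReplayRecording`, and `ReplayChunk` are hypothetical stand-ins, and the recording is assumed to be just an ordered list of chunks.

```typescript
// Hypothetical shapes; the real ChunkRecording may carry more metadata.
interface ReplayChunk {
  type: string
  delta?: string
}

interface ReplayRecording {
  chunks: ReplayChunk[]
}

class ReplaySketch {
  text = ''
  private recording: ReplayRecording | null = null

  startRecording(): void {
    this.recording = { chunks: [] }
  }

  getRecording(): ReplayRecording | null {
    return this.recording
  }

  processChunk(chunk: ReplayChunk): void {
    // Capture every chunk in arrival order while recording is active.
    if (this.recording) this.recording.chunks.push(chunk)
    if (chunk.type === 'TEXT_MESSAGE_START') this.text = ''
    if (chunk.type === 'TEXT_MESSAGE_CONTENT') this.text += chunk.delta ?? ''
  }

  // Feeds a recording through a fresh instance to reproduce its state.
  static replay(recording: ReplayRecording): ReplaySketch {
    const fresh = new ReplaySketch()
    for (const chunk of recording.chunks) fresh.processChunk(chunk)
    return fresh
  }
}

const liveSketch = new ReplaySketch()
liveSketch.startRecording()
liveSketch.processChunk({ type: 'TEXT_MESSAGE_START' })
liveSketch.processChunk({ type: 'TEXT_MESSAGE_CONTENT', delta: 'Hel' })
liveSketch.processChunk({ type: 'TEXT_MESSAGE_CONTENT', delta: 'lo' })

const capturedRecording = liveSketch.getRecording()
const replayedSketch = capturedRecording ? ReplaySketch.replay(capturedRecording) : null
```

Because replay runs the same chunks through the same logic, the replayed instance ends in the same state as the live one, which is what makes recordings useful for debugging and tests.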