Agent Streaming

Stream agent responses token-by-token

Overview

The chatStream method returns an async iterable that yields StreamChunk objects as the agent generates its response. This enables real-time display of agent output in CLIs, web UIs, and other interfaces.

Basic usage (Node.js)

1import { Recursiv } from '@recursiv/sdk';
2
3const r = new Recursiv();
4
5const stream = r.agents.chatStream('agent_123', {
6 message: 'Explain how transformers work in machine learning.',
7});
8
9for await (const chunk of stream) {
10 if (chunk.type === 'text_delta') {
11 process.stdout.write(chunk.delta ?? '');
12 }
13}
14
15console.log('\n--- Done ---');

Stream chunk types

Each chunk has a type field indicating what kind of data it contains:

TypeFieldsDescription
text_deltadeltaA token of the agent’s response text.
tool_usetool_nameThe agent is calling a tool.
tool_resultcontentThe result of a tool call.
doneThe stream is complete.
errorerrorAn error occurred during streaming.
1interface StreamChunk {
2 type: 'text_delta' | 'tool_use' | 'tool_result' | 'done' | 'error';
3 delta?: string;
4 tool_name?: string;
5 content?: string;
6 error?: string;
7}

Continue a conversation

Pass conversation_id to maintain context across streamed messages:

1// First message
2let conversationId: string | undefined;
3let fullResponse = '';
4
5for await (const chunk of r.agents.chatStream('agent_123', {
6 message: 'What is Rust?',
7})) {
8 if (chunk.type === 'text_delta') {
9 fullResponse += chunk.delta ?? '';
10 }
11}
12
13// Use the conversation_id from the non-streaming chat to continue
14const { data: initial } = await r.agents.chat('agent_123', {
15 message: 'What is Rust?',
16});
17
18// Follow-up with streaming
19for await (const chunk of r.agents.chatStream('agent_123', {
20 message: 'How does its ownership model work?',
21 conversation_id: initial.conversation_id,
22})) {
23 if (chunk.type === 'text_delta') {
24 process.stdout.write(chunk.delta ?? '');
25 }
26}

Collecting the full response

If you want to stream to the UI but also capture the full text:

1let fullText = '';
2
3for await (const chunk of r.agents.chatStream('agent_123', {
4 message: 'Write a haiku about TypeScript.',
5})) {
6 switch (chunk.type) {
7 case 'text_delta':
8 fullText += chunk.delta ?? '';
9 process.stdout.write(chunk.delta ?? '');
10 break;
11 case 'tool_use':
12 console.log(`\n[Using tool: ${chunk.tool_name}]`);
13 break;
14 case 'tool_result':
15 console.log(`[Tool result: ${chunk.content}]`);
16 break;
17 case 'error':
18 console.error(`\nStream error: ${chunk.error}`);
19 break;
20 case 'done':
21 break;
22 }
23}
24
25console.log('\n\nFull response:', fullText);

Error handling

Errors during streaming can come from either the initial connection or within the stream itself.

Connection errors

If the initial request fails (auth error, agent not found, etc.), chatStream throws before yielding any chunks:

1import { AuthenticationError, NotFoundError } from '@recursiv/sdk';
2
3try {
4 for await (const chunk of r.agents.chatStream('bad_id', {
5 message: 'Hello',
6 })) {
7 // ...
8 }
9} catch (err) {
10 if (err instanceof NotFoundError) {
11 console.error('Agent not found');
12 } else if (err instanceof AuthenticationError) {
13 console.error('Invalid API key');
14 }
15}

In-stream errors

If something goes wrong during generation, you will receive a chunk with type: 'error':

1for await (const chunk of r.agents.chatStream('agent_123', {
2 message: 'Hello',
3})) {
4 if (chunk.type === 'error') {
5 console.error('Stream error:', chunk.error);
6 break;
7 }
8 if (chunk.type === 'text_delta') {
9 process.stdout.write(chunk.delta ?? '');
10 }
11}

Web / React example

1import { Recursiv } from '@recursiv/sdk';
2import { useState } from 'react';
3
4function AgentChat({ agentId }: { agentId: string }) {
5 const [response, setResponse] = useState('');
6 const [isStreaming, setIsStreaming] = useState(false);
7
8 async function sendMessage(message: string) {
9 setResponse('');
10 setIsStreaming(true);
11
12 const r = new Recursiv({ apiKey: 'sk_live_...' });
13
14 for await (const chunk of r.agents.chatStream(agentId, { message })) {
15 if (chunk.type === 'text_delta') {
16 setResponse((prev) => prev + (chunk.delta ?? ''));
17 }
18 }
19
20 setIsStreaming(false);
21 }
22
23 return (
24 <div>
25 <button onClick={() => sendMessage('Hello!')}>
26 {isStreaming ? 'Streaming...' : 'Send'}
27 </button>
28 <pre>{response}</pre>
29 </div>
30 );
31}

React Native warning

chatStream() does not work in React Native or Expo. React Native’s fetch implementation does not support ReadableStream, which is required for SSE parsing.

See the React Native guide for a complete workaround using raw fetch and manual SSE parsing.

React Native workaround (summary)

In React Native, you must use raw fetch with react-native-sse or manual line parsing. Here is the essential pattern:

1import EventSource from 'react-native-sse';
2
3function streamAgentChat(
4 agentId: string,
5 message: string,
6 apiKey: string,
7 onDelta: (text: string) => void,
8 onDone: () => void,
9 onError: (err: string) => void,
10) {
11 const baseUrl = 'https://api.recursiv.io/api/v1';
12 const url = `${baseUrl}/agents/${agentId}/chat/stream`;
13
14 const es = new EventSource(url, {
15 method: 'POST',
16 headers: {
17 Authorization: `Bearer ${apiKey}`,
18 'Content-Type': 'application/json',
19 },
20 body: JSON.stringify({ message }),
21 });
22
23 es.addEventListener('message', (event: any) => {
24 if (event.data === '[DONE]') {
25 es.close();
26 onDone();
27 return;
28 }
29
30 try {
31 const chunk = JSON.parse(event.data);
32 if (chunk.type === 'text_delta' && chunk.delta) {
33 onDelta(chunk.delta);
34 } else if (chunk.type === 'error') {
35 onError(chunk.error ?? 'Unknown error');
36 es.close();
37 }
38 } catch {
39 // Skip malformed chunks
40 }
41 });
42
43 es.addEventListener('error', (event: any) => {
44 onError(event.message ?? 'Connection error');
45 es.close();
46 });
47
48 return () => es.close();
49}

See the full React Native guide at React Native for a complete React component example.

How it works

Under the hood, chatStream makes a POST request to /agents/{id}/chat/stream and parses the Server-Sent Events (SSE) response. Each SSE data: line is parsed as JSON into a StreamChunk.

The SSE format looks like:

data: {"type":"text_delta","delta":"Hello"}
data: {"type":"text_delta","delta":" world"}
data: {"type":"done"}
data: [DONE]

The SDK’s parseSSE utility handles buffering, line splitting, and JSON parsing. You can import it directly if you need it for custom streaming:

1import { parseSSE } from '@recursiv/sdk';