Streaming

The SDK uses Node.js native http/https modules for true streaming, yielding data chunks as they arrive over the wire. This provides lower latency than fetch-based approaches, which buffer the full response body.

Streaming is available at two levels:

  1. High-level: The onStreamEvent callback on chat.responses.create()
  2. Low-level: The StreamingHttpClient class and httpStreamRequest function
import.ts
import {
  // High-level
  Chaos,
  type ChatCreateRequestParams,
  // Low-level
  StreamingHttpClient,
  httpStreamRequest,
  type StreamingHttpClientOptions,
  type StreamRequestOptions,
  type HttpStreamOptions,
} from '@chaoslabs/ai-sdk';

onStreamEvent Callback

The simplest way to receive real-time streaming events. Pass an onStreamEvent callback when creating a response. The callback fires for each ChaosSDKMessage as it arrives, before the full response is assembled.

on-stream-event-type.ts
export type ChatCreateRequestParams = ChatCreateRequest & {
  onStreamEvent?: (message: ChaosSDKMessage) => void;
};
on-stream-event-example.ts
import { Chaos, WALLET_MODEL, extractText } from '@chaoslabs/ai-sdk';
 
const chaos = new Chaos({ apiKey: process.env.CHAOS_API_KEY! });
 
const response = await chaos.chat.responses.create({
  model: WALLET_MODEL,
  input: [{ type: 'message', role: 'user', content: 'Analyze my portfolio' }],
  metadata: {
    user_id: 'user-123',
    session_id: 'session-456',
    wallets: [{ address: '0x...', chain: 'ethereum' }],
  },
  onStreamEvent: (event) => {
    // Fires for each message as it arrives
    console.log(`[${event.type}]`, event);
  },
});
 
// response contains the fully assembled result
console.log(extractText(response));
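Because onStreamEvent fires once per message, a common pattern is to bucket events by type for later inspection. A minimal, runnable sketch (the StreamEvent shape below is a stand-in for ChaosSDKMessage; only the type field is assumed):

```typescript
// Stand-in for ChaosSDKMessage: only `type` is assumed here;
// real messages carry additional fields.
interface StreamEvent {
  type: string;
  [key: string]: unknown;
}

// Collects events into buckets keyed by event type, so you can inspect
// e.g. all deltas or tool events after the response resolves.
function makeEventCollector() {
  const byType = new Map<string, StreamEvent[]>();
  const onStreamEvent = (event: StreamEvent): void => {
    const bucket = byType.get(event.type) ?? [];
    bucket.push(event);
    byType.set(event.type, bucket);
  };
  return { byType, onStreamEvent };
}
```

Pass the collector's onStreamEvent into chat.responses.create() and read byType once the promise resolves.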

StreamingHttpClient

A mid-level streaming client that wraps httpStreamRequest, adding abort support and NDJSON line parsing.

streaming-http-client-types.ts
export interface StreamingHttpClientOptions {
  baseUrl: string;
  timeout: number;
}
 
export interface StreamRequestOptions {
  method: string;
  headers?: Record<string, string>;
  body: string;
}

Constructor

streaming-client-constructor.ts
const client = new StreamingHttpClient({
  baseUrl: 'https://ai.chaoslabs.co',
  timeout: 120000,
});

stream(path, options)

Returns an AsyncIterable<string> of raw chunks as they arrive from the server.

stream-method.ts
stream(path: string, options: StreamRequestOptions): AsyncIterable<string>;
 
// Usage
const chunks = client.stream('/v1/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json', Authorization: 'Bearer ck-...' },
  body: JSON.stringify(requestBody),
});
 
for await (const chunk of chunks) {
  process.stdout.write(chunk);
}

streamLines(path, options)

Returns an AsyncIterable<string> of complete NDJSON lines. Internally buffers raw chunks and splits on newline boundaries. Empty lines are skipped.

stream-lines-method.ts
streamLines(path: string, options: StreamRequestOptions): AsyncIterable<string>;
 
// Usage
for await (const line of client.streamLines('/v1/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json', Authorization: 'Bearer ck-...' },
  body: JSON.stringify(requestBody),
})) {
  const message = JSON.parse(line);
  console.log('Message type:', message.type);
}
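Internally, streamLines buffers partial chunks until a newline arrives. The splitting logic can be sketched roughly like this (illustrative only, not the SDK source):

```typescript
// Sketch of NDJSON line splitting over raw chunks: append each chunk to a
// buffer, yield every complete line, skip empty lines, and flush whatever
// remains when the stream ends.
async function* splitLines(
  chunks: AsyncIterable<string>,
): AsyncIterable<string> {
  let buffer = '';
  for await (const chunk of chunks) {
    buffer += chunk;
    let newline: number;
    while ((newline = buffer.indexOf('\n')) !== -1) {
      const line = buffer.slice(0, newline).trimEnd(); // strip trailing \r etc.
      buffer = buffer.slice(newline + 1);
      if (line.length > 0) yield line; // empty lines are skipped
    }
  }
  // Flush a final line that lacked a trailing newline
  if (buffer.trim().length > 0) yield buffer.trim();
}
```

Note that a JSON object split across two chunks is only yielded once its closing newline arrives, which is why streamLines is safe to JSON.parse line by line.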

abort()

Aborts the current in-progress request. The active iterator then throws a ChaosError('Request aborted').

abort-method.ts
abort(): void;
 
// Usage: abort after a condition
setTimeout(() => {
  client.abort();
}, 5000);
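abort() is typically triggered from outside the consuming loop (a timer or a UI handler), and the consumer catches the resulting error. The shape of that pattern, sketched with a stand-in stream so it runs without a live server (makeAbortableStream and its plain Error are illustrative analogues of the SDK's ChaosError('Request aborted') behavior):

```typescript
// Stand-in for a client whose iterator throws 'Request aborted' once
// abort() has been called, mirroring StreamingHttpClient's behavior.
function makeAbortableStream(chunks: string[]) {
  let aborted = false;
  async function* stream(): AsyncIterable<string> {
    for (const chunk of chunks) {
      await new Promise((r) => setTimeout(r, 5)); // simulate network delay
      if (aborted) throw new Error('Request aborted');
      yield chunk;
    }
  }
  return { stream, abort: () => { aborted = true; } };
}

// Consumer: collect chunks, abort mid-stream, and swallow only the
// expected abort error.
async function consume(client: ReturnType<typeof makeAbortableStream>) {
  const received: string[] = [];
  try {
    for await (const chunk of client.stream()) {
      received.push(chunk);
      if (received.length === 2) client.abort(); // abort after two chunks
    }
  } catch (err) {
    if ((err as Error).message !== 'Request aborted') throw err;
  }
  return received;
}
```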

httpStreamRequest

The lowest-level streaming function. Makes an HTTP request using Node's native http/https modules and returns an AsyncIterable<string> that yields raw chunks.

http-stream-request-types.ts
export interface HttpStreamOptions {
  url: string;
  method: string;
  headers: Record<string, string>;
  body: string;
  timeout: number;
}
 
export function httpStreamRequest(options: HttpStreamOptions): AsyncIterable<string>;
http-stream-request-example.ts
import { httpStreamRequest } from '@chaoslabs/ai-sdk';
 
const stream = httpStreamRequest({
  url: 'https://ai.chaoslabs.co/v1/chat/stream',
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: 'Bearer ck-...',
  },
  body: JSON.stringify({
    model: 'WALLET_MODEL',
    query: 'Swap 1 ETH to USDC',
    user_id: 'user-123',
    session_id: 'session-456',
    wallets: [{ address: '0x...', chain: 'ethereum' }],
  }),
  timeout: 120000,
});
 
for await (const chunk of stream) {
  process.stdout.write(chunk);
}

Timeout Behavior

The timeout applies as an inactivity timer, not a total duration limit.

  1. The timer starts when the request is initiated
  2. Each received data chunk resets the timer
  3. If no data arrives within the timeout window, a ChaosTimeoutError is thrown
  4. A continuously streaming response that sends data within each timeout window will never time out
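The inactivity-timer behavior described above can be sketched as a wrapper around any async iterable (illustrative only, not the SDK source; the SDK throws ChaosTimeoutError where this sketch throws a plain Error):

```typescript
// Races each pull from the source against a timer that is re-armed on
// every chunk, so only a silent gap longer than `timeoutMs` fails.
async function* withInactivityTimeout<T>(
  source: AsyncIterable<T>,
  timeoutMs: number,
): AsyncIterable<T> {
  const iterator = source[Symbol.asyncIterator]();
  while (true) {
    let timer!: ReturnType<typeof setTimeout>;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(
        () => reject(new Error('inactivity timeout')),
        timeoutMs,
      );
    });
    try {
      const result = await Promise.race([iterator.next(), timeout]);
      if (result.done) return;
      yield result.value; // each chunk resets the timer on the next loop
    } finally {
      clearTimeout(timer);
    }
  }
}
```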

Error Handling During Streaming

All streaming methods surface errors as typed exceptions.

streaming-error-handling.ts
import {
  Chaos,
  ChaosError,
  ChaosTimeoutError,
  WALLET_MODEL,
} from '@chaoslabs/ai-sdk';
 
const chaos = new Chaos({ apiKey: process.env.CHAOS_API_KEY! });
 
try {
  const response = await chaos.chat.responses.create({
    model: WALLET_MODEL,
    input: [{ type: 'message', role: 'user', content: 'Analyze my positions' }],
    metadata: {
      user_id: 'user-123',
      session_id: 'session-456',
      wallets: [{ address: '0x...', chain: 'ethereum' }],
    },
    onStreamEvent: (event) => {
      console.log('Streaming:', event.type);
    },
  });
} catch (error) {
  if (error instanceof ChaosTimeoutError) {
    console.error('Stream timed out — no data received within timeout window');
  } else if (error instanceof ChaosError) {
    if (error.message === 'Request aborted') {
      console.log('Stream was cancelled');
    } else if (error.message === 'Connection closed unexpectedly') {
      console.error('Server closed the connection');
    } else {
      console.error('Stream error:', error.status, error.message);
    }
  }
}

Cancelling Requests

Use cancel() on the responses object to abort an in-flight streaming request.

cancel-request.ts
import { Chaos, WALLET_MODEL, ChaosError } from '@chaoslabs/ai-sdk';
 
const chaos = new Chaos({ apiKey: process.env.CHAOS_API_KEY! });
 
// Start the request
const promise = chaos.chat.responses.create({
  model: WALLET_MODEL,
  input: [{ type: 'message', role: 'user', content: 'Full portfolio analysis' }],
  metadata: {
    user_id: 'user-123',
    session_id: 'session-456',
    wallets: [{ address: '0x...', chain: 'ethereum' }],
  },
  onStreamEvent: (event) => {
    // Cancel after receiving first event
    chaos.chat.responses.cancel();
  },
});
 
try {
  await promise;
} catch (error) {
  if (error instanceof ChaosError && error.message === 'Request aborted') {
    console.log('Successfully cancelled');
  }
}

Architecture

The streaming stack is layered:

Layer        Component                                        Returns
High-level   chaos.chat.responses.create({ onStreamEvent })   Promise<ChatCreateResponse> with real-time callbacks
Mid-level    StreamingHttpClient.streamLines()                AsyncIterable<string> of complete NDJSON lines
Mid-level    StreamingHttpClient.stream()                     AsyncIterable<string> of raw chunks
Low-level    httpStreamRequest()                              AsyncIterable<string> via Node native HTTP

For most use cases, the onStreamEvent callback is sufficient. Use the lower layers when you need custom streaming behavior or direct control over the HTTP connection.
