Data Streams SDK (TypeScript)
Data Streams SDKs
Choose the SDK version that matches your needs.
The Data Streams SDK for accessing Chainlink Data Streams with real-time streaming and historical data retrieval.
Requirements
- Node.js >= 20.0.0
- TypeScript >= 5.3.x
- Valid Chainlink Data Streams credentials
Features
- Real-time streaming via WebSocket connections
- High Availability mode with multiple connections and automatic failover
- Historical data access via REST API
- Automatic report decoding for all supported formats (V2, V3, V4, V5, V6, V7, V8, V9, V10)
- Metrics for monitoring and observability
- Type-safe with full TypeScript support
- Event-driven architecture for complete developer control
Installation
npm install @chainlink/data-streams-sdk
Configuration
Configuration Interface
interface Config {
// Required
apiKey: string // API key for authentication
userSecret: string // User secret for authentication
endpoint: string // REST API URL
wsEndpoint: string // WebSocket URL
// Optional - Request & Retry
timeout?: number // Request timeout (default: 30000ms)
retryAttempts?: number // Retry attempts (default: 3)
retryDelay?: number // Retry delay (default: 1000ms)
// Optional - High Availability
haMode?: boolean // Enable HA mode (default: false)
haConnectionTimeout?: number // HA connection timeout (default: 10000ms)
connectionStatusCallback?: (isConnected: boolean, host: string, origin: string) => void
// Optional - Logging
logging?: LoggingConfig // See Logging Configuration section
}
Basic Usage
const client = createClient({
apiKey: process.env.API_KEY,
userSecret: process.env.USER_SECRET,
endpoint: "https://api.dataengine.chain.link",
wsEndpoint: "wss://ws.dataengine.chain.link",
})
High Availability Example
const haClient = createClient({
apiKey: process.env.API_KEY,
userSecret: process.env.USER_SECRET,
endpoint: "https://api.dataengine.chain.link", // Mainnet only
wsEndpoint: "wss://ws.dataengine.chain.link", // Single endpoint with origin discovery
haMode: true,
})
Note: High Availability mode is only available on mainnet, not testnet.
Examples
Quick Commands:
# Real-time streaming
npx ts-node examples/stream-reports.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
# High Availability streaming
npx ts-node examples/stream-reports.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 --ha
# Get latest report
npx ts-node examples/get-latest-report.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
# List all available feeds
npx ts-node examples/list-feeds.ts
Complete examples See the SDK repo examples for detailed usage and setup. Available examples include:
- Streaming: Basic streaming, HA mode, metrics monitoring
- REST API: Latest reports, historical data, bulk operations, feed management
- Configuration: Logging setup, debugging, monitoring integration
API Reference
Streaming
// Create stream
const stream = client.createStream(feedIds, options?);
// Events
stream.on('report', (report) => { ... });
stream.on('error', (error) => { ... });
stream.on('disconnected', () => { ... });
stream.on('reconnecting', (info) => { ... });
// Control
await stream.connect();
await stream.close();
// Metrics
const metrics = stream.getMetrics();
Stream Options
interface StreamOptions {
maxReconnectAttempts?: number // Default: 5
// Base delay (in ms) for exponential backoff.
// Actual delay grows as: base * 2^(attempt-1) with jitter, capped at 10000ms.
// Default: 1000ms; user-provided values are clamped to the safe range [200ms, 10000ms].
reconnectInterval?: number
}
REST API
// Get feeds
const feeds = await client.listFeeds();
// Get latest report
const report = await client.getLatestReport(feedId);
// Get historical report
const report = await client.getReportByTimestamp(feedId, timestamp);
// Get report page
const reports = await client.getReportsPage(feedId, startTime, limit?);
// Get bulk reports
const reports = await client.getReportsBulk(feedIds, timestamp);
Report Format
Quick Decoder Usage
import { decodeReport } from "@chainlink/data-streams-sdk"
const decoded = decodeReport(report.fullReport, report.feedID)
Schema Auto-Detection
The SDK automatically detects and decodes all report versions based on Feed ID patterns:
- V2: Feed IDs starting with
0x0002
- V3: Feed IDs starting with
0x0003
(Crypto Streams) - V4: Feed IDs starting with
0x0004
(Real-World Assets) - V5: Feed IDs starting with
0x0005
- V6: Feed IDs starting with
0x0006
(Multiple Price Values) - V7: Feed IDs starting with
0x0007
- V8: Feed IDs starting with
0x0008
(Non-OTC RWA) - V9: Feed IDs starting with
0x0009
(NAV Fund Data) - V10: Feed IDs starting with
0x000a
(Tokenized Equity)
Common Fields
All reports include standard metadata:
interface BaseFields {
version: "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9" | "V10"
nativeFee: bigint
linkFee: bigint
expiresAt: number
feedID: string
validFromTimestamp: number
observationsTimestamp: number
}
Schema-Specific Fields
- V2/V3/V4:
price: bigint
- Standard price data - V3:
bid: bigint, ask: bigint
- Crypto bid/ask spreads - V4:
marketStatus: MarketStatus
- Real-world asset market status - V5:
rate: bigint, timestamp: number, duration: number
- Interest rate data with observation timestamp and duration - V6:
price: bigint, price2: bigint, price3: bigint, price4: bigint, price5: bigint
- Multiple price values in a single payload - V7:
exchangeRate: bigint
- Exchange rate data - V8:
midPrice: bigint, lastUpdateTimestamp: number, marketStatus: MarketStatus
- Non-OTC RWA data - V9:
navPerShare: bigint, navDate: number, aum: bigint, ripcord: number
- NAV fund data - V10:
price: bigint, lastUpdateTimestamp: number, marketStatus: MarketStatus, currentMultiplier: bigint, newMultiplier: bigint, activationDateTime: number, tokenizedPrice: bigint
- Tokenized equity data
For complete field definitions, see the complete list of available reports and their schemas.
High Availability Mode
HA mode establishes multiple simultaneous connections for zero-downtime operation:
- Automatic failover between connections
- Report deduplication across connections
- Automatic origin discovery to find available endpoints
- Per-connection monitoring and statistics
const client = createClient({
// ...config
haMode: true,
wsEndpoint: "wss://ws.dataengine.chain.link", // Single endpoint (mainnet only)
})
How it works: When haMode
is true
, the SDK automatically discovers multiple origin endpoints behind the single URL and establishes separate connections to each origin.
Connection monitoring: The optional connectionStatusCallback
can be used to integrate with external monitoring systems. The SDK already provides comprehensive connection logs, so this callback is primarily useful for custom alerting or metrics collection. See examples/metrics-monitoring.ts
for a complete implementation example.
Important: HA mode is only available on mainnet endpoints.
Error Handling
Error Types Overview
Error Type | When Thrown | Key Properties |
---|---|---|
ValidationError | Invalid feed IDs, timestamps, parameters | message |
AuthenticationError | Invalid credentials, HMAC failures | message |
APIError | HTTP 4xx/5xx, network timeouts, rate limits | statusCode , message |
ReportDecodingError | Corrupted report data, unsupported versions | message |
WebSocketError | Connection failures, protocol errors | message |
OriginDiscoveryError | HA discovery failures | cause , message |
MultiConnectionError | All HA connections failed | message |
PartialConnectionFailureError | Some HA connections failed | failedConnections , totalConnections |
InsufficientConnectionsError | HA degraded performance | availableConnections , requiredConnections |
Usage Examples
import {
ValidationError,
AuthenticationError,
APIError,
ReportDecodingError,
WebSocketError,
OriginDiscoveryError,
MultiConnectionError,
} from "./src"
// REST API error handling
try {
const report = await client.getLatestReport(feedId)
} catch (error) {
if (error instanceof ValidationError) {
// Invalid feed ID or parameters
} else if (error instanceof AuthenticationError) {
// Check API credentials
} else if (error instanceof APIError) {
// Server error - check error.statusCode (429, 500, etc.)
} else if (error instanceof ReportDecodingError) {
// Corrupted or unsupported report format
}
}
// Streaming error handling
stream.on("error", (error) => {
if (error instanceof WebSocketError) {
// Connection issues - retry or fallback
} else if (error instanceof OriginDiscoveryError) {
// HA discovery failed - falls back to static config
} else if (error instanceof MultiConnectionError) {
// All HA connections failed - critical
}
})
Catch-all error handling:
import { DataStreamsError } from "./src"
try {
// Any SDK operation
} catch (error) {
if (error instanceof DataStreamsError) {
// Handles ANY SDK error (base class for all error types above)
console.log("SDK error:", error.message)
} else {
// Non-SDK error (network, system, etc.)
console.log("System error:", error)
}
}
Observability (Logs & Metrics)
The SDK is designed to plug into your existing observability stack.
Logging (Pino/Winston/Console)
Pass your logger to the SDK and choose a verbosity level. For deep WS diagnostics, enable connection debug.
Quick Start
import { createClient, LogLevel } from "@chainlink/data-streams-sdk"
// Silent mode (default) - Zero overhead
const client = createClient({
/* ... config without logging */
})
// Basic console logging
const client = createClient({
// ... other config
logging: {
logger: {
info: console.log,
warn: console.warn,
error: console.error,
},
},
})
Using Pino (structured JSON):
import pino from "pino"
import { createClient, LogLevel } from "@chainlink/data-streams-sdk"
const root = pino({ level: process.env.PINO_LEVEL || "info" })
const sdk = root.child({ component: "sdk" })
const client = createClient({
// ...config
logging: {
logger: {
info: sdk.info.bind(sdk),
warn: sdk.warn.bind(sdk),
error: sdk.error.bind(sdk),
debug: sdk.debug.bind(sdk),
},
logLevel: LogLevel.INFO,
// For very verbose WS diagnostics, set DEBUG + enableConnectionDebug
// logLevel: LogLevel.DEBUG,
// enableConnectionDebug: true,
},
})
Command-line with pretty output:
PINO_LEVEL=info npx ts-node examples/metrics-monitoring.ts | npx pino-pretty
Log Levels
๐ด ERROR
Critical failures only
- Authentication failures
- Network connection errors
- Report decoding failures
- API request failures
- Unexpected crashes
Example Use: Production alerts & monitoring
๐ก WARN
Everything in ERROR +
- Partial reconnections
- Fallback to static origins
- Retry attempts
- Connection timeouts
- Invalid data warnings
Example Use: Production environments
๐ต INFO
Everything in WARN +
- Client initialization
- Successful API calls
- Stream connections
- Report retrievals
- Connection status changes
- Connection mode determination
Example Use: Development & staging
๐ DEBUG
Everything in INFO +
- Feed ID validation
- Report decoding steps
- Auth header generation
- Request/response details
- WebSocket ping/pong
- Origin discovery process
- Configuration validation
- Origin tracking (HA mode)
Example Use: Debugging & development only
Logging Configuration Options
interface LoggingConfig {
/** External logger functions (console, winston, pino, etc.) */
logger?: {
debug?: (message: string, ...args: any[]) => void
info?: (message: string, ...args: any[]) => void
warn?: (message: string, ...args: any[]) => void
error?: (message: string, ...args: any[]) => void
}
/** Minimum logging level - filters out lower priority logs */
logLevel?: LogLevel // DEBUG (0) | INFO (1) | WARN (2) | ERROR (3)
/** Enable WebSocket ping/pong and connection state debugging logs */
enableConnectionDebug?: boolean
}
Compatible with: console, winston, pino, and any logger with debug/info/warn/error
methods. See examples/logging-basic.ts
for complete integration examples.
For debugging: Use LogLevel.DEBUG
for full diagnostics and enableConnectionDebug: true
to see WebSocket ping/pong messages and connection state transitions.
Origin tracking in HA mode shows which specific endpoint received each report.
Metrics (stream.getMetrics()
)
The stream.getMetrics()
API provides a complete snapshot for dashboards and alerts:
const m = stream.getMetrics()
// m.accepted, m.deduplicated, m.totalReceived
// m.partialReconnects, m.fullReconnects
// m.activeConnections, m.configuredConnections
// m.originStatus: { [origin]: ConnectionStatus }
Simple periodic print (example):
setInterval(() => {
const m = stream.getMetrics()
console.log(`accepted=${m.accepted} dedup=${m.deduplicated} active=${m.activeConnections}/${m.configuredConnections}`)
}, 30000)
Refer to examples/metrics-monitoring.ts
for a full metrics dashboard example.
Testing
npm test # All tests
npm run test:unit # Unit tests only
npm run test:integration # Integration tests only
Feed IDs
For available feed IDs, select your desired report from the report schema overview.