Server-Sent Events (SSE) Streaming
DuraGraph provides real-time updates via Server-Sent Events (SSE) for monitoring workflow execution, LLM token streaming, and system events.
Interactive API Reference: try streaming endpoints directly in your browser.
Overview
SSE lets clients receive events pushed by the server over a single long-lived HTTP connection. It is well suited to:
- Real-time run status updates
- LLM token-by-token streaming
- Live workflow progress monitoring
- Event-driven UIs
Event Types
Run Events
| Event | Description |
|---|---|
| `run.started` | Run execution began |
| `run.completed` | Run finished successfully |
| `run.failed` | Run failed with error |
| `run.requires_action` | Waiting for human input or tool outputs |
Node Events
| Event | Description |
|---|---|
| `node.started` | Node began execution |
| `node.completed` | Node finished execution |
LLM Events
| Event | Description |
|---|---|
| `llm.token` | Single token generated (streaming mode) |
| `llm.completion` | LLM completed generation |
Tool Events
| Event | Description |
|---|---|
| `tool.call` | Tool was called |
| `tool.result` | Tool execution completed |
System Events
| Event | Description |
|---|---|
| `checkpoint.saved` | Workflow state checkpointed |
| `heartbeat` | Periodic keepalive (every 30s) |
| `error` | Non-fatal error occurred |
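The event names in the tables above fall into a handful of groups, which makes it easy for a client to route or filter them. The following is a minimal sketch rather than part of DuraGraph's API: `event_group` and `is_terminal` are hypothetical helper names, and treating `run.completed` and `run.failed` as the only events that end a run is an assumption drawn from the descriptions above.

```python
# Hypothetical helpers; the groupings mirror the event tables above.
EVENT_GROUPS = {
    "run": {"run.started", "run.completed", "run.failed", "run.requires_action"},
    "node": {"node.started", "node.completed"},
    "llm": {"llm.token", "llm.completion"},
    "tool": {"tool.call", "tool.result"},
    "system": {"checkpoint.saved", "heartbeat", "error"},
}

# Assumption: only these two events mean the run is over.
TERMINAL_EVENTS = {"run.completed", "run.failed"}


def event_group(event_type: str) -> str:
    """Return the table group an event type belongs to, or "unknown"."""
    for group, names in EVENT_GROUPS.items():
        if event_type in names:
            return group
    return "unknown"


def is_terminal(event_type: str) -> bool:
    """True when no further events are expected for the run (assumed)."""
    return event_type in TERMINAL_EVENTS
```

A client loop could use `is_terminal` to decide when to stop reading, and `event_group` to skip groups it does not render, such as `system`.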
Event Flow Example
Typical event sequence for a successful run:
```
data: {"type":"run.started","run_id":"..."}
data: {"type":"node.started","node_id":"input"}
data: {"type":"node.completed","node_id":"input"}
data: {"type":"checkpoint.saved","checkpoint_id":"chk_1"}
data: {"type":"node.started","node_id":"llm"}
data: {"type":"llm.token","token":"Hello"}
data: {"type":"llm.token","token":" there"}
data: {"type":"llm.token","token":"!"}
data: {"type":"llm.completion","content":"Hello there!"}
data: {"type":"node.completed","node_id":"llm"}
data: {"type":"run.completed","output":{"result":"..."}}
```
Client Examples
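As a transport-independent starting point, the sketch below decodes lines like those in the event flow example and rebuilds the LLM output from `llm.token` events. It assumes each event arrives as a single `data: ` line carrying a JSON object with a `type` field, as in that example. `parse_events` and `collect_tokens` are illustrative names, not DuraGraph functions.

```python
import json
from typing import Iterable, Iterator


def parse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the JSON payload of every `data: ` line, skipping all other lines."""
    for line in lines:
        if line.startswith("data: "):
            yield json.loads(line[len("data: "):])


def collect_tokens(events: Iterable[dict]) -> str:
    """Concatenate `llm.token` payloads in arrival order."""
    return "".join(e["token"] for e in events if e.get("type") == "llm.token")


# A fragment of the sample sequence, one event per line.
sample = [
    'data: {"type":"node.started","node_id":"llm"}',
    'data: {"type":"llm.token","token":"Hello"}',
    'data: {"type":"llm.token","token":" there"}',
    'data: {"type":"llm.token","token":"!"}',
    'data: {"type":"llm.completion","content":"Hello there!"}',
]

events = list(parse_events(sample))
text = collect_tokens(events)
print(text)  # Hello there!
```

In this sample the concatenated tokens equal the `content` field of the `llm.completion` event, so a client that only needs the final text could wait for that event instead of accumulating tokens.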
JavaScript (EventSource)
```javascript
const runId = '323e4567-e89b-12d3-a456-426614174000';
const eventSource = new EventSource(`/api/v1/stream?run_id=${runId}`);

// onmessage only receives events that arrive without an `event:` field.
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Event:', data);
};

// Named listeners fire only when the server sends `event: run.completed`.
eventSource.addEventListener('run.completed', (event) => {
  console.log('Run completed:', JSON.parse(event.data));
  eventSource.close();
});

// Closing here also turns off EventSource's built-in automatic reconnection.
eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  eventSource.close();
};
```
Python
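```python
import json

import requests

run_id = "323e4567-e89b-12d3-a456-426614174000"
url = f"http://localhost:8081/api/v1/stream?run_id={run_id}"

with requests.get(url, stream=True) as response:
    response.raise_for_status()  # fail early on a non-2xx response
    for line in response.iter_lines():
        if line:
            decoded = line.decode('utf-8')
            if decoded.startswith('data: '):
                data = json.loads(decoded[6:])
                print(f"Event: {data['type']}")

                # Stop on either terminal event so a failed run does not leave the loop waiting.
                if data['type'] in ('run.completed', 'run.failed'):
                    break
```
Go
```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"strings"
)

// streamEvents prints the payload of every data line until the server closes the stream.
func streamEvents(runID string) error {
	url := fmt.Sprintf("http://localhost:8081/api/v1/stream?run_id=%s", runID)

	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data: ") {
			data := strings.TrimPrefix(line, "data: ")
			fmt.Println("Event:", data)
		}
	}
	return scanner.Err()
}

func main() {
	if err := streamEvents("323e4567-e89b-12d3-a456-426614174000"); err != nil {
		log.Fatal(err)
	}
}
```
Best Practices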
Reconnection with Exponential Backoff
```javascript
class SSEClient {
  constructor(url) {
    this.url = url;
    this.retryDelay = 1000;
    this.maxRetryDelay = 30000;
  }

  connect() {
    this.eventSource = new EventSource(this.url);

    // Close on error to take over from the browser's own retry logic,
    // then reconnect after a delay that doubles up to maxRetryDelay.
    this.eventSource.onerror = () => {
      this.eventSource.close();
      setTimeout(() => {
        this.retryDelay = Math.min(this.retryDelay * 2, this.maxRetryDelay);
        this.connect();
      }, this.retryDelay);
    };

    this.eventSource.onopen = () => {
      this.retryDelay = 1000; // Reset on success
    };
  }
}
```
Token Buffering for Smooth UI
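```javascript
let buffer = '';
let bufferTimeout = null;

eventSource.addEventListener('llm.token', (event) => {
  const data = JSON.parse(event.data);
  buffer += data.token;

  // Schedule at most one flush per 50ms window. Resetting the timer on every
  // token would delay the flush until the stream pauses.
  if (bufferTimeout === null) {
    bufferTimeout = setTimeout(() => {
      updateUI(buffer); // updateUI is the application's own render function; it should append this chunk
      buffer = '';
      bufferTimeout = null;
    }, 50);
  }
});
```
Resource Cleanup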
```javascript
// Close on completion
eventSource.addEventListener('run.completed', () => {
  eventSource.close();
});

// Close on page unload
window.addEventListener('beforeunload', () => {
  eventSource.close();
});
```
Memory Management
```javascript
const MAX_EVENTS = 1000;
const events = [];

// Keep only the most recent MAX_EVENTS events in memory.
eventSource.onmessage = (event) => {
  events.push(JSON.parse(event.data));
  if (events.length > MAX_EVENTS) {
    events.shift(); // Remove oldest
  }
};
```
Real-time Chat UI Example
```javascript
class ChatStream {
  constructor(runId) {
    this.runId = runId;
    this.container = document.getElementById('messages');
    this.currentMessage = '';
  }

  start() {
    const url = `/api/v1/stream?run_id=${this.runId}`;
    this.eventSource = new EventSource(url);

    // Append each token and re-render the message as it grows.
    this.eventSource.addEventListener('llm.token', (event) => {
      const data = JSON.parse(event.data);
      this.currentMessage += data.token;
      this.container.textContent = this.currentMessage;
    });

    this.eventSource.addEventListener('run.completed', () => {
      this.container.classList.add('complete');
      this.eventSource.close();
    });
  }
}
```
Next Steps
- Interactive API Reference - Try streaming endpoints
- REST API Reference - Code examples
- Webhooks - Alternative event delivery