Skip to content

Server-Sent Events (SSE) Streaming

DuraGraph provides real-time updates via Server-Sent Events (SSE) for monitoring workflow execution, LLM token streaming, and system events.

SSE lets clients receive push notifications from the server over a single long-lived HTTP connection. It is well suited to:

  • Real-time run status updates
  • LLM token-by-token streaming
  • Live workflow progress monitoring
  • Event-driven UIs
| Event | Description |
| --- | --- |
| `run.started` | Run execution began |
| `run.completed` | Run finished successfully |
| `run.failed` | Run failed with error |
| `run.requires_action` | Waiting for human input or tool outputs |
| Event | Description |
| --- | --- |
| `node.started` | Node began execution |
| `node.completed` | Node finished execution |
| Event | Description |
| --- | --- |
| `llm.token` | Single token generated (streaming mode) |
| `llm.completion` | LLM completed generation |
| Event | Description |
| --- | --- |
| `tool.call` | Tool was called |
| `tool.result` | Tool execution completed |
| Event | Description |
| --- | --- |
| `checkpoint.saved` | Workflow state checkpointed |
| `heartbeat` | Periodic keepalive (every 30s) |
| `error` | Non-fatal error occurred |

Typical event sequence for a successful run:

data: {"type":"run.started","run_id":"..."}
data: {"type":"node.started","node_id":"input"}
data: {"type":"node.completed","node_id":"input"}
data: {"type":"checkpoint.saved","checkpoint_id":"chk_1"}
data: {"type":"node.started","node_id":"llm"}
data: {"type":"llm.token","token":"Hello"}
data: {"type":"llm.token","token":" there"}
data: {"type":"llm.token","token":"!"}
data: {"type":"llm.completion","content":"Hello there!"}
data: {"type":"node.completed","node_id":"llm"}
data: {"type":"run.completed","output":{"result":"..."}}
// Minimal browser client: log every event of one run and disconnect once the
// run has completed.
const runId = '323e4567-e89b-12d3-a456-426614174000';
const eventSource = new EventSource(`/api/v1/stream?run_id=${runId}`);

// Shared completion path so the stream is closed no matter how the
// completion event is delivered (see the two handlers below).
const handleRunCompleted = (data) => {
  console.log('Run completed:', data);
  eventSource.close();
};

// `onmessage` only receives events that have no `event:` field. The sample
// stream consists of bare `data:` lines that carry the event name in the JSON
// `type` property, so completion has to be detected here as well.
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Event:', data);
  if (data.type === 'run.completed') {
    handleRunCompleted(data);
  }
};

// Covers servers that additionally tag the frame with `event: run.completed`;
// such frames are dispatched to this listener and never reach `onmessage`.
eventSource.addEventListener('run.completed', (event) => {
  handleRunCompleted(JSON.parse(event.data));
});

// Closing here disables EventSource's built-in automatic reconnection, so
// this simple client gives up on the first connection error.
eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  eventSource.close();
};
import requests
import json
run_id = "323e4567-e89b-12d3-a456-426614174000"
url = f"http://localhost:8081/api/v1/stream?run_id={run_id}"

# Either of these events means the run is over, so stop reading on both.
TERMINAL_EVENTS = {"run.completed", "run.failed"}

# timeout=(connect, read) in seconds. The server is documented to send a
# heartbeat every 30s, so 60s without any bytes is treated as a dead connection
# instead of blocking forever.
with requests.get(url, stream=True, timeout=(5, 60)) as response:
    # Fail loudly on 4xx/5xx instead of trying to parse an error body as SSE.
    response.raise_for_status()
    for line in response.iter_lines():
        if line:
            decoded = line.decode('utf-8')
            # Only `data:` lines carry a JSON payload; skip everything else.
            if decoded.startswith('data: '):
                data = json.loads(decoded[6:])
                print(f"Event: {data['type']}")
                if data['type'] in TERMINAL_EVENTS:
                    break
// streamEvents connects to the SSE endpoint for runID and prints the payload
// of every "data: " line until the server ends the stream.
//
// It returns an error if the request cannot be made, if the server answers
// with a non-200 status, or if reading the stream fails.
func streamEvents(runID string) error {
	url := fmt.Sprintf("http://localhost:8081/api/v1/stream?run_id=%s", runID)
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// An error response is not an event stream; report it instead of scanning
	// its body for "data: " lines.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("stream request failed: %s", resp.Status)
	}

	scanner := bufio.NewScanner(resp.Body)
	// The default Scanner limit is 64 KiB per line; a single event with a
	// large payload would otherwise abort the stream with bufio.ErrTooLong.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data: ") {
			data := strings.TrimPrefix(line, "data: ")
			fmt.Println("Event:", data)
		}
	}
	return scanner.Err()
}
/**
 * EventSource wrapper that reconnects with exponential backoff.
 *
 * Every (re)connection creates a new EventSource, so handlers must be
 * registered through `on()` to be carried over; handlers attached directly to
 * `client.eventSource` are lost on the next reconnect.
 */
class SSEClient {
  /**
   * @param {string} url - Stream URL passed to `EventSource`.
   */
  constructor(url) {
    this.url = url;
    this.retryDelay = 1000;
    this.maxRetryDelay = 30000;
    this.listeners = [];
    this.retryTimer = null;
    this.closed = false;
  }

  /**
   * Registers a handler that is re-attached after every reconnect.
   *
   * @param {string} type - Event name, e.g. `'llm.token'` or `'message'`.
   * @param {(event: MessageEvent) => void} handler - Event callback.
   * @returns {SSEClient} This client, to allow chaining.
   */
  on(type, handler) {
    this.listeners.push({ type, handler });
    if (this.eventSource) {
      this.eventSource.addEventListener(type, handler);
    }
    return this;
  }

  /**
   * Opens the connection. On error the source is closed and a reconnect is
   * scheduled after `retryDelay` ms; the delay doubles per attempt up to
   * `maxRetryDelay` and is reset once a connection opens.
   */
  connect() {
    this.closed = false;
    clearTimeout(this.retryTimer);
    // Do not leak a previous connection when connect() is called again.
    if (this.eventSource) {
      this.eventSource.close();
    }

    const source = new EventSource(this.url);
    this.eventSource = source;
    for (const { type, handler } of this.listeners) {
      source.addEventListener(type, handler);
    }

    source.onerror = () => {
      // Closing turns off the browser's own retry so the backoff below is
      // the only reconnect mechanism.
      source.close();
      if (this.closed) {
        return;
      }
      this.retryTimer = setTimeout(() => {
        this.retryDelay = Math.min(this.retryDelay * 2, this.maxRetryDelay);
        this.connect();
      }, this.retryDelay);
    };
    source.onopen = () => {
      this.retryDelay = 1000; // Reset on success
    };
  }

  /**
   * Closes the connection and cancels any pending reconnect.
   */
  close() {
    this.closed = true;
    clearTimeout(this.retryTimer);
    this.retryTimer = null;
    if (this.eventSource) {
      this.eventSource.close();
    }
  }
}
// Batch incoming tokens and repaint at most once every 50ms instead of once
// per token.
let buffer = '';
let bufferTimeout = null;
eventSource.addEventListener('llm.token', (event) => {
  const data = JSON.parse(event.data);
  buffer += data.token;
  // A flush is already scheduled; it will pick this token up. Re-arming the
  // timer on every token would postpone the flush for as long as tokens keep
  // arriving less than 50ms apart, so the UI would only update after a pause.
  if (bufferTimeout !== null) {
    return;
  }
  bufferTimeout = setTimeout(() => {
    bufferTimeout = null;
    const chunk = buffer;
    buffer = '';
    updateUI(chunk);
  }, 50); // Flush every 50ms
});
// Release the connection as soon as it is no longer needed: when the run
// reports completion, or when the page is about to unload.
const closeStream = () => {
  eventSource.close();
};
eventSource.addEventListener('run.completed', closeStream);
window.addEventListener('beforeunload', closeStream);
// Keep only the most recent MAX_EVENTS parsed events so a long-running stream
// cannot grow the log without bound.
const MAX_EVENTS = 1000;
const events = [];
eventSource.onmessage = ({ data }) => {
  // push() returns the new length, which tells us whether the cap was exceeded.
  const total = events.push(JSON.parse(data));
  if (total > MAX_EVENTS) {
    events.shift(); // Drop the oldest entry
  }
};
/**
 * Renders the streamed LLM output of one run into the element with id
 * `messages`.
 */
class ChatStream {
  /**
   * @param {string} runId - Identifier of the run to stream.
   */
  constructor(runId) {
    this.runId = runId;
    this.container = document.getElementById('messages');
    this.currentMessage = '';
  }

  /**
   * Opens the stream, appends each `llm.token` to the displayed message, and
   * on `run.completed` marks the container with the `complete` class and
   * closes the connection.
   */
  start() {
    // A repeated start() must not leave the earlier connection open.
    this.stop();

    // Encode the id so characters such as `&`, `#` or spaces cannot corrupt
    // the query string.
    const url = `/api/v1/stream?run_id=${encodeURIComponent(this.runId)}`;
    const source = new EventSource(url);
    this.eventSource = source;

    source.addEventListener('llm.token', (event) => {
      const data = JSON.parse(event.data);
      this.currentMessage += data.token;
      // textContent (not innerHTML) so model output is never parsed as markup.
      this.container.textContent = this.currentMessage;
    });
    source.addEventListener('run.completed', () => {
      this.container.classList.add('complete');
      source.close();
    });
  }

  /**
   * Closes the current connection, if any. Safe to call before start().
   */
  stop() {
    if (this.eventSource) {
      this.eventSource.close();
    }
  }
}