Skip to content

Python SDK Reference

The DuraGraph Python SDK provides a decorator-based interface for building AI agents and workflows. Build graphs with simple decorators, run them locally, or deploy to the control plane.

Terminal window
# Core SDK
pip install duragraph
# With LLM providers
pip install duragraph[openai] # OpenAI support
pip install duragraph[anthropic] # Anthropic support
# With DSPy support
pip install duragraph[dspy] # DSPy 2.x integration
# With vector stores
pip install duragraph[chroma] # Chroma vector store
pip install duragraph[pinecone] # Pinecone vector store
pip install duragraph[qdrant] # Qdrant vector store
# All features
pip install duragraph[all]
from duragraph import Graph, llm_node, entrypoint
@Graph(id="customer_support")
class CustomerSupportAgent:
"""A customer support agent that classifies and responds to queries."""
@entrypoint
@llm_node(model="gpt-4o-mini")
def classify(self, state):
"""Classify the customer intent."""
return {"intent": "billing"}
@llm_node(model="gpt-4o-mini")
def respond(self, state):
"""Generate a response based on intent."""
return {"response": f"I'll help you with {state['intent']}."}
# Define flow with edge operator
classify >> respond
# Run locally
agent = CustomerSupportAgent()
result = agent.run({"message": "I have a billing question"})
print(result)
# Or deploy to control plane
agent.serve("http://localhost:8081")

The @Graph decorator turns a Python class into a workflow graph.

from duragraph import Graph
@Graph(
id="my_graph", # Unique graph identifier
state_schema=MyStateClass, # Optional state schema
checkpointer=None, # Optional checkpointer for persistence
)
class MyGraph:
pass

Parameters:

  • id (str, required) - Unique identifier for the graph
  • state_schema (Type, optional) - Pydantic model or TypedDict for state validation
  • checkpointer (Checkpointer, optional) - State persistence implementation

Node decorators define executable steps in your graph.

`@entrypoint` marks the starting node of the graph.

@entrypoint
def start(self, state):
return state

`@llm_node` creates a node that calls an LLM.

@llm_node(
model="gpt-4o-mini", # Model name
temperature=0.7, # Optional: sampling temperature
max_tokens=1000, # Optional: max response tokens
system_prompt="You are...", # Optional: system message
)
def generate(self, state):
# Return messages or prompt
return {"messages": [{"role": "user", "content": "Hello"}]}

Supported Providers:

  • OpenAI (GPT-4, GPT-4o, GPT-3.5-turbo)
  • Anthropic (Claude 3.5 Sonnet, Claude 3 Opus, Claude 3 Haiku)

`@tool_node` defines a tool that can be called by LLMs or other nodes.

from duragraph import tool_node
@tool_node(
name="web_search",
description="Search the web for information",
)
def search(self, query: str, max_results: int = 5):
"""Execute a web search."""
results = perform_search(query, limit=max_results)
return {"results": results}

`@router_node` creates a conditional routing node.

from duragraph import router_node
@router_node
def route_by_intent(self, state):
"""Route based on classified intent."""
intent = state.get("intent")
if intent == "billing":
return "billing_handler"
elif intent == "support":
return "support_handler"
else:
return "general_handler"

`@human_node` creates a human-in-the-loop node that pauses for human input.

from duragraph import human_node
@human_node(
prompt="Please review and approve the following:",
timeout=3600, # 1 hour timeout
)
def review(self, state):
"""Wait for human approval."""
return state

The >> operator defines transitions between nodes.

@Graph(id="my_graph")
class MyGraph:
@entrypoint
def start(self, state):
return state
def process(self, state):
return state
def end(self, state):
return state
# Sequential flow
start >> process >> end

Conditional edges:

from duragraph import conditional_edge
@Graph(id="conditional_graph")
class ConditionalGraph:
@entrypoint
def start(self, state):
return state
@router_node
def decide(self, state):
return "path_a" if state["condition"] else "path_b"
def path_a(self, state):
return state
def path_b(self, state):
return state
# Conditional routing
start >> decide
decide >> {"path_a": path_a, "path_b": path_b}

`@dspy_node` creates a node powered by a DSPy module. Requires `pip install duragraph[dspy]`.

from duragraph import Graph, dspy_node, entrypoint
@Graph(id="dspy_agent")
class DspyAgent:
@entrypoint
@dspy_node(
signature="question -> answer",
module_type="ChainOfThought", # or "Predict", "ReAct"
model="openai/gpt-4o-mini", # per-node LM override (optional)
)
def reason(self, state):
"""Post-process DSPy output."""
return state
agent = DspyAgent()
result = agent.run({"question": "What is event sourcing?"})
print(result["answer"])

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| `signature` | `str` | DSPy signature (e.g., `"question -> answer"`) |
| `module_type` | `str` | `"Predict"`, `"ChainOfThought"`, or `"ReAct"` (default: `"Predict"`) |
| `model` | `str \| None` | LiteLLM model string for per-node LM override |
| `input_map` | `dict \| None` | Map state keys to DSPy input field names |
| `output_map` | `dict \| None` | Map DSPy output field names to state keys |
| `optimized_path` | `str \| None` | Path to pre-optimized module JSON (from DSPy optimizers) |
| `max_retries` | `int` | Max retries on DSPy module failures (default: 3) |
| `tools` | `list \| None` | Tool functions for ReAct modules |

State-to-signature mapping:

By default, state keys matching signature field names are passed as inputs, and DSPy output fields are written directly to state. Use input_map and output_map to rename:

@dspy_node(
signature="context, query -> response",
input_map={"user_message": "query", "documents": "context"},
output_map={"response": "assistant_reply"},
)
def answer(self, state):
return state

Loading optimized modules:

If you’ve optimized a DSPy module with BootstrapFewShot or MIPROv2, point `optimized_path` at the saved module JSON to load the optimized module:

@dspy_node(
signature="question -> answer",
module_type="ChainOfThought",
optimized_path="optimized/qa_cot.json",
)
def optimized_qa(self, state):
return state

DuraGraph integrates with DSPy to let you build nodes using DSPy’s declarative programming model. This enables structured outputs, chain-of-thought reasoning, and optimizable prompts within your workflow graphs.

Terminal window
pip install duragraph[dspy]

DuraGraph provides DuraGraphLM for bridging DuraGraph’s LLM provider configuration to DSPy:

from duragraph.dspy import DuraGraphLM, configure_from_provider
import dspy
# Option 1: Configure DSPy globally with a DuraGraph provider string
configure_from_provider("openai/gpt-4o-mini")
# Option 2: Use DuraGraphLM directly
lm = DuraGraphLM(model="openai/gpt-4o-mini", temperature=0.7)
dspy.configure(lm=lm)

DuraGraphLM wraps dspy.LM (which uses LiteLLM under the hood), so any model supported by LiteLLM works.

| Type | Description | When to use |
| --- | --- | --- |
| `Predict` | Direct input→output mapping | Simple structured extraction |
| `ChainOfThought` | Adds reasoning step before output | Complex reasoning tasks |
| `ReAct` | Reason + Act with tool use | Tasks requiring external tools |
from duragraph import Graph, entrypoint, dspy_node
from duragraph.dspy import configure_from_provider
configure_from_provider("openai/gpt-4o")
@Graph(id="dspy_rag")
class DspyRAG:
@entrypoint
def retrieve(self, state):
"""Retrieve relevant documents."""
docs = search_documents(state["question"])
return {"context": "\n".join(docs)}
@dspy_node(
signature="context, question -> answer",
module_type="ChainOfThought",
)
def generate(self, state):
"""DSPy generates the answer, then this runs post-processing."""
return state
retrieve >> generate

Full async/await support for parallel execution.

import asyncio
from duragraph import Graph, llm_node, entrypoint
@Graph(id="async_agent")
class AsyncAgent:
@entrypoint
@llm_node(model="gpt-4o-mini")
async def think(self, state):
"""Async LLM call."""
return state
async def custom_async(self, state):
"""Custom async node."""
result = await some_async_operation()
return {"result": result}
think >> custom_async
# Run async
async def main():
agent = AsyncAgent()
result = await agent.arun({"input": "Hello"})
print(result)
asyncio.run(main())

Deploy your graph as a worker that connects to the control plane.

from duragraph import Graph, Worker
@Graph(id="production_agent")
class ProductionAgent:
# ... define nodes ...
pass
# Create worker
worker = Worker(
graph=ProductionAgent,
control_plane_url="http://localhost:8081",
heartbeat_interval=30, # Send heartbeat every 30 seconds
max_concurrent_tasks=10,
)
# Start worker (blocking)
worker.start()
# Or async
async def run_worker():
await worker.astart()
asyncio.run(run_worker())

Graceful Shutdown:

import signal
def shutdown_handler(signum, frame):
print("Shutting down gracefully...")
worker.stop()
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
worker.start()
from duragraph.llm import OpenAIProvider
provider = OpenAIProvider(
api_key="sk-...", # Or set OPENAI_API_KEY env var
organization="org-...", # Optional
base_url="https://api.openai.com/v1", # Optional custom endpoint
)
@llm_node(
model="gpt-4o",
temperature=0.7,
provider=provider,
)
def generate(self, state):
return state

Supported Models:

  • gpt-4o
  • gpt-4o-mini
  • gpt-4-turbo
  • gpt-4
  • gpt-3.5-turbo
from duragraph.llm import AnthropicProvider
provider = AnthropicProvider(
api_key="sk-ant-...", # Or set ANTHROPIC_API_KEY env var
)
@llm_node(
model="claude-3-5-sonnet-20241022",
temperature=0.7,
provider=provider,
)
def generate(self, state):
return state

Supported Models:

  • claude-3-5-sonnet-20241022
  • claude-3-opus-20240229
  • claude-3-sonnet-20240229
  • claude-3-haiku-20240307
from duragraph.vectorstores import ChromaVectorStore
vector_store = ChromaVectorStore(
collection_name="my_docs",
persist_directory="./chroma_db",
embedding_function=None, # Uses default
)
# Add documents
vector_store.add_documents([
{"id": "1", "text": "Document 1", "metadata": {"source": "file1.txt"}},
{"id": "2", "text": "Document 2", "metadata": {"source": "file2.txt"}},
])
# Search
results = vector_store.similarity_search(
query="search term",
k=5,
filter={"source": "file1.txt"},
)
from duragraph.vectorstores import PineconeVectorStore
vector_store = PineconeVectorStore(
api_key="your-api-key",
environment="us-east-1-aws",
index_name="my-index",
)
# Use same interface as Chroma
from duragraph.vectorstores import QdrantVectorStore
vector_store = QdrantVectorStore(
url="http://localhost:6333",
collection_name="my_collection",
api_key=None, # Optional for cloud
)
from duragraph.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
api_key="sk-...",
)
# Generate embeddings
vectors = embeddings.embed_documents([
"Document 1",
"Document 2",
])
# Single query
query_vector = embeddings.embed_query("search term")
from duragraph.embeddings import CohereEmbeddings
embeddings = CohereEmbeddings(
model="embed-english-v3.0",
api_key="your-key",
)
from duragraph.embeddings import OllamaEmbeddings
embeddings = OllamaEmbeddings(
model="nomic-embed-text",
base_url="http://localhost:11434",
)
from duragraph.document_loaders import TextLoader
loader = TextLoader("path/to/file.txt")
documents = loader.load()
from duragraph.document_loaders import PDFLoader
loader = PDFLoader("path/to/file.pdf")
documents = loader.load()
from duragraph.document_loaders import DirectoryLoader
loader = DirectoryLoader(
path="./docs",
glob="**/*.md",
loader_cls=TextLoader,
)
documents = loader.load()
from duragraph.text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", " ", ""],
)
chunks = splitter.split_documents(documents)

Register and execute tools dynamically.

from duragraph import ToolRegistry, tool
# Create registry
registry = ToolRegistry()
# Register tools
@registry.register
@tool(
name="calculator",
description="Perform arithmetic operations",
)
def calculator(operation: str, a: float, b: float) -> float:
"""Execute a calculation."""
if operation == "add":
return a + b
elif operation == "multiply":
return a * b
# ... more operations
# Use in graph
@Graph(id="math_agent")
class MathAgent:
@entrypoint
@llm_node(model="gpt-4o-mini", tools=registry.get_tools())
def solve(self, state):
return state

Local development commands.

Terminal window
# Initialize new project
duragraph init my-agent
# Run graph locally
duragraph dev my_graph.py:MyGraph
# Deploy to control plane
duragraph deploy my_graph.py:MyGraph --url http://localhost:8081
# Visualize graph
duragraph visualize my_graph.py:MyGraph --output graph.png
from typing import TypedDict
from duragraph import Graph
class MyState(TypedDict):
messages: list[dict]
intent: str
result: str
@Graph(id="typed_graph", state_schema=MyState)
class TypedGraph:
@entrypoint
def start(self, state: MyState) -> MyState:
return state
from pydantic import BaseModel, Field
from duragraph import Graph
class MyState(BaseModel):
messages: list[dict] = Field(default_factory=list)
intent: str = ""
result: str = ""
@Graph(id="pydantic_graph", state_schema=MyState)
class PydanticGraph:
@entrypoint
def start(self, state: MyState) -> MyState:
return state
from duragraph.exceptions import (
DuraGraphError,
NodeExecutionError,
WorkerConnectionError,
)
try:
result = agent.run(input_state)
except NodeExecutionError as e:
print(f"Node {e.node_id} failed: {e.message}")
except WorkerConnectionError as e:
print(f"Worker connection failed: {e}")
except DuraGraphError as e:
print(f"Error: {e}")
from duragraph import Graph, llm_node, entrypoint
from duragraph.vectorstores import ChromaVectorStore
from duragraph.embeddings import OpenAIEmbeddings
@Graph(id="rag_agent")
class RAGAgent:
def __init__(self):
self.vector_store = ChromaVectorStore(
collection_name="docs",
embedding_function=OpenAIEmbeddings(),
)
@entrypoint
def retrieve(self, state):
"""Retrieve relevant documents."""
query = state["query"]
results = self.vector_store.similarity_search(query, k=5)
return {"documents": results}
@llm_node(model="gpt-4o")
def generate(self, state):
"""Generate answer from documents."""
context = "\n".join([doc["text"] for doc in state["documents"]])
return {
"messages": [
{"role": "system", "content": f"Context:\n{context}"},
{"role": "user", "content": state["query"]},
]
}
retrieve >> generate
from duragraph import Graph, llm_node, entrypoint
@Graph(id="researcher")
class Researcher:
@entrypoint
@llm_node(model="gpt-4o")
def research(self, state):
return {"findings": "research results"}
@Graph(id="writer")
class Writer:
@entrypoint
@llm_node(model="gpt-4o")
def write(self, state):
return {"article": "written article"}
@Graph(id="multi_agent")
class MultiAgent:
def __init__(self):
self.researcher = Researcher()
self.writer = Writer()
@entrypoint
def orchestrate(self, state):
# Run researcher
research_result = self.researcher.run(state)
# Pass to writer
article = self.writer.run({
"findings": research_result["findings"]
})
return article

For complete API documentation, see:

See Contributing Guide