### Problem Description
The `LLMAdapter` for LangGraph streams all LLM outputs from every node in a multi-node graph, including intermediate classification and routing nodes. Users receive unwanted intermediate outputs mixed with the final response.
### Current Behavior ❌
**User:** "What is machine learning?"

**Streamed output:** `What is machine learning?{"category":"technical"}Machine learning is a subset of artificial intelligence...`

- **Expected:** only the final response streams
- **Actual:** the input echo, intermediate LLM outputs, and the final response all stream together
### Root Cause
`LangGraphStream._run()` uses `stream_mode="messages"` without filtering, capturing ALL LLM invocations:
```python
async def _run(self) -> None:
    async for message_chunk, _ in self._graph.astream(
        state, self._config, stream_mode="messages"  # ⚠️ No filtering
    ):
        chat_chunk = _to_chat_chunk(message_chunk)
        if chat_chunk:
            self._event_ch.send_nowait(chat_chunk)  # ⚠️ Streams everything
```
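Note that the metadata half of each yielded tuple, which is exactly what identifies the producing node, is discarded as `_`. A minimal sketch of what the adapter throws away (the `inspect_messages_stream` helper is mine, for illustration only):

```python
async def inspect_messages_stream(graph, state: dict) -> None:
    """Print which graph node each streamed chunk came from.

    Works with any compiled LangGraph graph, e.g. the one built in the
    reproduction below.
    """
    async for message_chunk, metadata in graph.astream(state, stream_mode="messages"):
        # metadata["langgraph_node"] names the node that produced the chunk;
        # this is exactly the information _run() above drops.
        print(metadata.get("langgraph_node"), repr(message_chunk.content))
```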
### Reproduction Example
```python
from typing import TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel


class Classification(BaseModel):
    category: str


class State(TypedDict, total=False):
    input: str
    classification: Classification
    output: str


def classifier_node(state: State) -> dict:
    """This output should NOT stream to users."""
    llm = ChatOpenAI(model="gpt-4o-mini").with_structured_output(Classification)
    result = llm.invoke(state["input"])  # Still streams with stream_mode="messages"
    return {"classification": result}


def response_node(state: State) -> dict:
    """Only this should stream to users."""
    llm = ChatOpenAI(model="gpt-4o-mini")
    response = llm.invoke(state["input"])
    return {"output": response.content}


workflow = StateGraph(State)
workflow.add_node("classify", classifier_node)
workflow.add_node("respond", response_node)
workflow.add_edge(START, "classify")
workflow.add_edge("classify", "respond")
workflow.add_edge("respond", END)
graph = workflow.compile()

# When used with LLMAdapter, BOTH nodes stream to users
```
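Running the sketch from the Root Cause section against this graph makes the leak visible; per the behavior described above, chunks from both nodes appear (output abridged and illustrative):

```python
import asyncio

asyncio.run(inspect_messages_stream(graph, {"input": "What is machine learning?"}))
# Illustrative output, reflecting the behavior described above:
#   classify '{"category":"technical"}'
#   respond 'Machine learning is a subset of artificial intelligence...'
```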
### Proposed Solution
Add node filtering to `LangGraphStream`:
```python
class LangGraphStream(llm.LLMStream):
    def __init__(self, ..., streaming_nodes: set[str] | None = None):
        self._streaming_nodes = streaming_nodes

    async def _run(self) -> None:
        async for message_chunk, metadata in self._graph.astream(..., stream_mode="messages"):
            # Filter by node name using metadata
            if self._streaming_nodes is None or metadata.get("langgraph_node") in self._streaming_nodes:
                chat_chunk = _to_chat_chunk(message_chunk)
                if chat_chunk:
                    self._event_ch.send_nowait(chat_chunk)


class LLMAdapter(llm.LLM):
    def __init__(self, graph: PregelProtocol, streaming_nodes: set[str] | None = None):
        self._streaming_nodes = streaming_nodes

    def chat(self, ...) -> LangGraphStream:
        return LangGraphStream(..., streaming_nodes=self._streaming_nodes)
```
**Usage:**

```python
# Only stream from final response nodes
adapter = LLMAdapter(graph, streaming_nodes={"respond", "final_answer"})
```
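Leaving `streaming_nodes` as `None` (the default) would preserve today's unfiltered behavior, so existing callers are unaffected:

```python
# Default: no filter, identical to current behavior (every node streams).
adapter_all = LLMAdapter(graph)

# Opt-in filter: only chunks whose metadata["langgraph_node"] is in the set stream.
adapter_final = LLMAdapter(graph, streaming_nodes={"respond"})
```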
### Impact
- **User experience:** confusing intermediate outputs in production chatbots
- **Costs:** unnecessary token streaming
- **Adoption:** blocks production use of multi-step LangGraph workflows with LiveKit
### Metadata Available for Filtering
LangGraph provides these metadata fields for filtering:
- `metadata["langgraph_node"]` - node name
- `metadata["tags"]` - LLM tags
- `metadata["langgraph_step"]` - execution step
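Tags would allow an opt-in marking scheme instead of hard-coding node names. A hypothetical sketch (the `stream_to_user` tag and `should_stream` helper are illustrative, not part of any existing API); LangChain runnables accept tags via `with_config`, and those tags surface in the streamed metadata:

```python
from langchain_openai import ChatOpenAI

# Tag the user-facing LLM; runnable tags propagate into the metadata
# emitted by stream_mode="messages".
llm = ChatOpenAI(model="gpt-4o-mini").with_config(tags=["stream_to_user"])


def should_stream(metadata: dict) -> bool:
    # Hypothetical tag-based predicate, analogous to the node-name filter above.
    return "stream_to_user" in (metadata.get("tags") or [])
```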
### Environment
- LangGraph: Latest
- LiveKit Agents: Latest
- Python: 3.11+