
LangGraph LLMAdapter streams intermediate LLM outputs instead of only final responses #2836

@zkewal

Description

Problem Description

The LLMAdapter for LangGraph streams all LLM outputs from every node in a multi-node graph, including intermediate classification and routing nodes. Users receive unwanted intermediate outputs mixed with the final response.

Current Behavior

User: "What is machine learning?"
Streamed output: "What is machine learning?{"category":"technical"}Machine learning is a subset of artificial intelligence..."

Expected: Only the final response should stream
Actual: Input echo + intermediate LLM outputs + final response all stream together

Root Cause

LangGraphStream._run() uses stream_mode="messages" without filtering, capturing ALL LLM invocations:

async def _run(self) -> None:
    async for message_chunk, _ in self._graph.astream(
        state, self._config, stream_mode="messages"  # ⚠️ No filtering
    ):
        chat_chunk = _to_chat_chunk(message_chunk)
        if chat_chunk:
            self._event_ch.send_nowait(chat_chunk)  # ⚠️ Streams everything
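
Note that with stream_mode="messages", astream yields (message_chunk, metadata) tuples; the metadata, discarded as _ above, already identifies the node that produced each chunk (see the fields listed under "Metadata Available for Filtering" below).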

Reproduction Example

from typing import TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
from pydantic import BaseModel

class Classification(BaseModel):
    category: str

class State(TypedDict, total=False):
    input: str
    classification: Classification
    output: str

def classifier_node(state: State):
    """This output should NOT stream to users."""
    llm = ChatOpenAI(model="gpt-4o-mini").with_structured_output(Classification)
    result = llm.invoke(state["input"])  # Still streams with stream_mode="messages"
    return {"classification": result}

def response_node(state: State):
    """Only this should stream to users."""
    llm = ChatOpenAI(model="gpt-4o-mini")
    response = llm.invoke(state["input"])
    return {"output": response.content}

workflow = StateGraph(State)
workflow.add_node("classify", classifier_node)
workflow.add_node("respond", response_node)
workflow.add_edge(START, "classify")
workflow.add_edge("classify", "respond")
workflow.add_edge("respond", END)

graph = workflow.compile()
# When used with LLMAdapter, BOTH nodes stream to users
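
Streaming the graph directly shows chunks from both nodes. A minimal sketch (reusing the compiled graph above) that labels each chunk with its source node:

import asyncio

async def main() -> None:
    # Both "classify" and "respond" emit chunks under stream_mode="messages".
    async for chunk, metadata in graph.astream(
        {"input": "What is machine learning?"}, stream_mode="messages"
    ):
        print(metadata["langgraph_node"], repr(chunk.content))

asyncio.run(main())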

Proposed Solution

Add node filtering to LangGraphStream:

class LangGraphStream(llm.LLMStream):
    def __init__(self, ..., streaming_nodes: set[str] | None = None):
        self._streaming_nodes = streaming_nodes
    
    async def _run(self) -> None:
        async for message_chunk, metadata in self._graph.astream(..., stream_mode="messages"):
            # Filter by node name using metadata
            if self._streaming_nodes is None or metadata.get("langgraph_node") in self._streaming_nodes:
                chat_chunk = _to_chat_chunk(message_chunk)
                if chat_chunk:
                    self._event_ch.send_nowait(chat_chunk)

class LLMAdapter(llm.LLM):
    def __init__(self, graph: PregelProtocol, streaming_nodes: set[str] | None = None):
        self._streaming_nodes = streaming_nodes
    
    def chat(self, ...) -> LangGraphStream:
        return LangGraphStream(..., streaming_nodes=self._streaming_nodes)

Usage:

# Only stream from final response nodes
adapter = LLMAdapter(graph, streaming_nodes={"respond", "final_answer"})
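
Because the filter treats streaming_nodes=None as "stream everything", existing integrations would keep today's behavior by default, so the change should be backward compatible.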

Impact

  • User Experience: confusing intermediate output (e.g. classification JSON) reaches end users in production chatbots
  • Costs: tokens from intermediate nodes are streamed unnecessarily
  • Adoption: blocks production use of multi-step LangGraph workflows with LiveKit

Metadata Available for Filtering

LangGraph provides these metadata fields for filtering:

  • metadata["langgraph_node"] - Node name
  • metadata["tags"] - LLM tags
  • metadata["langgraph_step"] - Execution step
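
Until such a filter exists in the adapter, a consumer-side workaround over the same metadata is possible. A minimal sketch using the "tags" field, assuming a hypothetical user_facing tag attached via .with_config() (the tag name and the stream_final_only helper are illustrative, not part of either library):

from langchain_openai import ChatOpenAI

STREAM_TAG = "user_facing"  # hypothetical tag name

def response_node(state):
    # Tag the user-facing LLM so its chunks are identifiable downstream.
    llm = ChatOpenAI(model="gpt-4o-mini").with_config(tags=[STREAM_TAG])
    return {"output": llm.invoke(state["input"]).content}

async def stream_final_only(graph, state):
    # Consumer-side filter: forward only chunks tagged as user-facing.
    # Config tags propagate into the per-chunk stream metadata.
    async for chunk, metadata in graph.astream(state, stream_mode="messages"):
        if STREAM_TAG in metadata.get("tags", []):
            yield chunk.content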

Environment

  • LangGraph: Latest
  • LiveKit Agents: Latest
  • Python: 3.11+
