langchain-milvus
Reference docs
This page contains reference documentation for Milvus. See the docs for conceptual guides, tutorials, and examples on using Milvus modules.
langchain_milvus
BaseMilvusBuiltInFunction
Bases: ABC
Base class for Milvus built-in functions.
See: https://milvus.io/docs/manage-collections.md#Function
BM25BuiltInFunction
Bases: BaseMilvusBuiltInFunction
Milvus BM25 built-in function.
See: https://milvus.io/docs/full-text-search.md
| METHOD | DESCRIPTION |
|---|---|
| __init__ | Create the function; see the parameters below. |
__init__
__init__(
    *,
    input_field_names: str = TEXT_FIELD,
    output_field_names: str = SPARSE_VECTOR_FIELD,
    analyzer_params: dict[Any, Any] | None = None,
    enable_match: bool = False,
    function_name: str | None = None,
)
| PARAMETER | DESCRIPTION |
|---|---|
| input_field_names | The name of the input field. Default is 'text'. TYPE: str |
| output_field_names | The name of the output field. Default is 'sparse'. TYPE: str |
| analyzer_params | The parameters for the analyzer. Default is None. See: https://milvus.io/docs/analyzer-overview.md#Analyzer-Overview |
| enable_match | Whether to enable match. TYPE: bool |
| function_name | The name of the function. Default is None, which means a random name will be generated. TYPE: str \| None |
MilvusCollectionHybridSearchRetriever
Bases: BaseRetriever
Hybrid search retriever that uses Milvus Collection to retrieve documents based on multiple fields.
For more information, please refer to: https://milvus.io/docs/release_notes.md#Multi-Embedding---Hybrid-Search
| METHOD | DESCRIPTION |
|---|---|
| get_name | Get the name of the Runnable. |
| get_input_schema | Get a Pydantic model that can be used to validate input to the Runnable. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_schema | Get a Pydantic model that can be used to validate output to the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_graph | Return a graph representation of this Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| pipe | Pipe Runnable objects. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| invoke | Invoke the retriever to get relevant documents. |
| ainvoke | Asynchronously invoke the retriever to get relevant documents. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| stream | Default implementation of stream, which calls invoke. |
| astream | Default implementation of astream, which calls ainvoke. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| transform | Transform inputs to outputs. |
| atransform | Transform inputs to outputs. |
| bind | Bind arguments to a Runnable, returning a new Runnable. |
| with_config | Bind config to a Runnable, returning a new Runnable. |
| with_listeners | Bind lifecycle listeners to a Runnable, returning a new Runnable. |
| with_alisteners | Bind async lifecycle listeners to a Runnable. |
| with_types | Bind input and output types to a Runnable, returning a new Runnable. |
| with_retry | Create a new Runnable that retries the original Runnable on exceptions. |
| map | Return a new Runnable that maps a list of inputs to a list of outputs. |
| with_fallbacks | Add fallbacks to a Runnable, returning a new Runnable. |
| as_tool | Create a BaseTool from a Runnable. |
| is_lc_serializable | Is this class serializable? |
| get_lc_namespace | Get the namespace of the LangChain object. |
| lc_id | Return a unique identifier for this class for serialization purposes. |
| to_json | Serialize the Runnable to JSON. |
| to_json_not_implemented | Serialize a "not implemented" object. |
| configurable_fields | Configure particular Runnable fields at runtime. |
| configurable_alternatives | Configure alternatives for Runnable objects that can be set at runtime. |
| __init__ | |
name
class-attribute
instance-attribute
name: str | None = None
The name of the Runnable. Used for debugging and tracing.
InputType
property
InputType: type[Input]
Input type.
The type of input this Runnable accepts specified as a type annotation.
| RAISES | DESCRIPTION |
|---|---|
| TypeError | If the input type cannot be inferred. |
OutputType
property
OutputType: type[Output]
Output Type.
The type of output this Runnable produces specified as a type annotation.
| RAISES | DESCRIPTION |
|---|---|
| TypeError | If the output type cannot be inferred. |
input_schema
property
The type of input this Runnable accepts specified as a Pydantic model.
output_schema
property
Output schema.
The type of output this Runnable produces specified as a Pydantic model.
config_specs
property
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
lc_secrets
property
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
tags
class-attribute
instance-attribute
Optional list of tags associated with the retriever.
These tags will be associated with each call to this retriever,
and passed as arguments to the handlers defined in callbacks.
You can use these to, e.g., identify a specific instance of a retriever with its use case.
metadata
class-attribute
instance-attribute
Optional metadata associated with the retriever.
This metadata will be associated with each call to this retriever,
and passed as arguments to the handlers defined in callbacks.
You can use this to, e.g., identify a specific instance of a retriever with its use case.
rerank
instance-attribute
Milvus ranker object, such as WeightedRanker or RRFRanker.
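To give a feel for what a rank-fusion reranker does with the per-field result lists, here is a minimal pure-Python sketch of reciprocal rank fusion. It assumes the commonly cited formula, score = sum of 1 / (k + rank) with k = 60 and ranks starting at 1; the function name rrf_fuse and the sample ids are hypothetical, and Milvus's RRFRanker may differ in detail.

```python
from collections import defaultdict


def rrf_fuse(ranked_lists: list[list[str]], k: int = 60, top_k: int = 4) -> list[str]:
    """Fuse several ranked id lists with reciprocal rank fusion (illustrative only)."""
    scores: dict[str, float] = defaultdict(float)
    for hits in ranked_lists:
        for rank, doc_id in enumerate(hits, start=1):
            # Each list contributes more for ids it ranks near the top.
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by id so the order is deterministic.
    ordered = sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))
    return ordered[:top_k]


dense_hits = ["a", "b", "c"]   # hypothetical results from a dense vector field
sparse_hits = ["b", "d", "a"]  # hypothetical results from a sparse vector field
fused = rrf_fuse([dense_hits, sparse_hits], top_k=3)
print(fused)  # -> ['b', 'a', 'd']
```

Ids that appear near the top of several lists ("a" and "b" here) outrank ids that appear in only one list.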
anns_fields
instance-attribute
The names of vector fields that are used for ANNS search.
field_embeddings
instance-attribute
field_embeddings: list[Embeddings | BaseSparseEmbedding]
The embedding function for each vector field, which can be either Embeddings or BaseSparseEmbedding.
field_search_params
class-attribute
instance-attribute
The search parameters for each vector field. If not specified, the default search parameters will be used.
field_limits
class-attribute
instance-attribute
Maximum number of results for each ANNS field. If not specified, the default top_k will be used.
field_exprs
class-attribute
instance-attribute
The boolean expression for filtering the search results.
top_k
class-attribute
instance-attribute
top_k: int = 4
Final top-K number of documents to retrieve.
text_field
class-attribute
instance-attribute
text_field: str = 'text'
The text field name,
which will be used as the page_content of a Document object.
output_fields
class-attribute
instance-attribute
Final output fields of the documents.
If not specified, all fields except the vector fields will be used as output fields,
which will be the metadata of a Document object.
get_name
get_input_schema
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
This method lets you get an input schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate input. |
get_input_jsonschema
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Example
Added in langchain-core 0.3.0
get_output_schema
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method lets you get an output schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate output. |
get_output_jsonschema
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Example
Added in langchain-core 0.3.0
config_schema
The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION |
|---|---|
| include | A list of fields to include in the config schema. |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate config. |
get_config_jsonschema
get_graph
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable.
get_prompts
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
__or__
__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |
__ror__
__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
| other | Another Runnable or a Runnable-like object. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Other, Output] | A new Runnable. |
pipe
pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION |
|---|---|
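| *others | Other Runnable or Runnable-like objects to compose. TYPE: Runnable[Any, Other] \| Callable[[Any], Other] |
| name | An optional name for the resulting RunnableSequence. TYPE: str \| None |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Other] | A new Runnable. |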
pick
Pick keys from the output dict of this Runnable.
Pick a single key:
import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)

def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")

chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION |
|---|---|
| keys | A key or list of keys to pick from the output dict. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |
assign
assign(
    **kwargs: Runnable[dict[str, Any], Any]
    | Callable[[dict[str, Any]], Any]
    | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_core.language_models.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | model | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | model)

print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| PARAMETER | DESCRIPTION |
|---|---|
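| **kwargs | A mapping of keys to Runnable or Runnable-like objects that will be invoked with the entire output dict of this Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Any, Any] | A new Runnable. |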
invoke
invoke(
    input: str, config: RunnableConfig | None = None, **kwargs: Any
) -> list[Document]
Invoke the retriever to get relevant documents.
Main entry point for synchronous retriever invocations.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The query string. TYPE: str |
| config | Configuration for the retriever. TYPE: RunnableConfig \| None |
| **kwargs | Additional arguments to pass to the retriever. TYPE: Any |

| RETURNS | DESCRIPTION |
|---|---|
| list[Document] | List of relevant documents. |
Examples:
ainvoke
async
ainvoke(
    input: str, config: RunnableConfig | None = None, **kwargs: Any
) -> list[Document]
Asynchronously invoke the retriever to get relevant documents.
Main entry point for asynchronous retriever invocations.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The query string. TYPE: str |
| config | Configuration for the retriever. TYPE: RunnableConfig \| None |
| **kwargs | Additional arguments to pass to the retriever. TYPE: Any |

| RETURNS | DESCRIPTION |
|---|---|
| list[Document] | List of relevant documents. |
Examples:
batch
batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
| inputs | A list of inputs to the Runnable. TYPE: list[Input] |
| config | A config to use when invoking the Runnable. Please refer to RunnableConfig for more details. TYPE: RunnableConfig \| list[RunnableConfig] \| None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |

| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |
batch_as_completed
batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
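| inputs | A list of inputs to the Runnable. TYPE: Sequence[Input] |
| config | A config to use when invoking the Runnable. The config supports standard keys like 'tags' and 'metadata' for tracing purposes and 'max_concurrency' for controlling how much work to do in parallel. Please refer to RunnableConfig for more details. TYPE: RunnableConfig \| Sequence[RunnableConfig] \| None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |

| YIELDS | DESCRIPTION |
|---|---|
| tuple[int, Output \| Exception] | Tuples of the index of the input and the output from the Runnable. |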
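Because results arrive in completion order, each one is paired with the index of its input. A minimal standard-library sketch of that idea follows; batch_as_completed_sketch and slow_echo are hypothetical names, and this is not the library's code.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any], inputs: Sequence[Any]
) -> Iterator[tuple[int, Any]]:
    """Yield (input index, output) pairs as soon as each call finishes."""
    with ThreadPoolExecutor() as pool:
        # Remember which input position each future belongs to.
        futures = {pool.submit(invoke, item): i for i, item in enumerate(inputs)}
        for future in as_completed(futures):
            yield futures[future], future.result()


def slow_echo(delay: float) -> float:
    """Hypothetical call whose duration depends on its input."""
    time.sleep(delay)
    return delay


finished = list(batch_as_completed_sketch(slow_echo, [0.2, 0.0, 0.1]))
print(finished)  # shorter delays usually come first; exact order depends on timing
```

Sorting the pairs by index restores the input order when it is needed.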
abatch
async
abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
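| inputs | A list of inputs to the Runnable. TYPE: list[Input] |
| config | A config to use when invoking the Runnable. The config supports standard keys like 'tags' and 'metadata' for tracing purposes and 'max_concurrency' for controlling how much work to do in parallel. Please refer to RunnableConfig for more details. TYPE: RunnableConfig \| list[RunnableConfig] \| None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |

| RETURNS | DESCRIPTION |
|---|---|
| list[Output] | A list of outputs from the Runnable. |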
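The asyncio.gather pattern named above looks roughly like this in isolation. It is a sketch with hypothetical names (abatch_sketch, alookup) and no config or concurrency limit, not the library's implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: list[Any],
    *,
    return_exceptions: bool = False,
) -> list[Any]:
    """Await ainvoke for every input concurrently, keeping the input order."""
    if not inputs:
        return []
    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(
        *(ainvoke(item) for item in inputs),
        return_exceptions=return_exceptions,
    )


async def alookup(query: str) -> str:
    """Hypothetical async call standing in for a retriever's ainvoke."""
    await asyncio.sleep(0)  # yield control, as real IO would
    return query[::-1]


aresults = asyncio.run(abatch_sketch(alookup, ["abc", "de"]))
print(aresults)  # -> ['cba', 'ed']
```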
abatch_as_completed
async
abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
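| inputs | A list of inputs to the Runnable. TYPE: Sequence[Input] |
| config | A config to use when invoking the Runnable. The config supports standard keys like 'tags' and 'metadata' for tracing purposes and 'max_concurrency' for controlling how much work to do in parallel. Please refer to RunnableConfig for more details. TYPE: RunnableConfig \| Sequence[RunnableConfig] \| None |
| return_exceptions | Whether to return exceptions instead of raising them. TYPE: bool |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[tuple[int, Output \| Exception]] | A tuple of the index of the input and the output from the Runnable. |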
stream
stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |

| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
astream
async
astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Input |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
astream_log
async
astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
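To illustrate what "applying ops in order to construct state" means, here is a small sketch that handles only the "add" and "replace" operations on dict keys and list positions. The function apply_ops and the sample patches are hypothetical; a real consumer would more likely rely on a JSON Patch library, and JSON Pointer escaping is ignored here.

```python
import copy
from typing import Any


def apply_ops(state: Any, ops: list[dict[str, Any]]) -> Any:
    """Apply a list of simple JSON Patch ops and return the new state."""
    state = copy.deepcopy(state)
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {op['op']}")
        if op["path"] == "":
            # An empty path addresses the whole document.
            state = copy.deepcopy(op["value"])
            continue
        keys = op["path"].lstrip("/").split("/")
        target = state
        for key in keys[:-1]:
            target = target[int(key)] if isinstance(target, list) else target[key]
        last = keys[-1]
        if isinstance(target, list):
            if last == "-":
                target.append(op["value"])  # "-" means end of the list
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


# Hypothetical patches, shaped like a run that streams one chunk and then finishes.
patches = [
    [{"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}}],
    [{"op": "add", "path": "/streamed_output/-", "value": "olleh"}],
    [{"op": "replace", "path": "/final_output", "value": "olleh"}],
]
state = None
for ops in patches:
    state = apply_ops(state, ops)
print(state)  # -> {'streamed_output': ['olleh'], 'final_output': 'olleh'}
```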
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input to the Runnable. TYPE: Any |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None |
| diff | Whether to yield diffs between each step or the current state. TYPE: bool |
| with_streamed_output_list | Whether to yield the streamed_output list. TYPE: bool |
| include_names | Only include logs with these names. |
| include_types | Only include logs with these types. |
| include_tags | Only include logs with these tags. |
| exclude_names | Exclude logs with these names. |
| exclude_types | Exclude logs with these types. |
| exclude_tags | Exclude logs with these tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[RunLogPatch] \| AsyncIterator[RunLog] | A RunLogPatch or RunLog object. |
astream_events
async
astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use this to create an iterator over StreamEvent objects that provide real-time information
about the progress of the Runnable, including StreamEvent objects from intermediate
results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for the v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | | 'Hello human!' |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:
| Attribute | Type | Description |
|---|---|---|
name |
str |
A user defined name for the event. |
data |
Any |
The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [event async for event in chain.astream_events("hello", version="v2")]

# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio

async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
| PARAMETER | DESCRIPTION |
|---|---|
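| input | The input to the Runnable. TYPE: Any |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None |
| version | The version of the schema to use, either 'v1' or 'v2'. TYPE: Literal["v1", "v2"] |
| include_names | Only include events from Runnable objects with matching names. |
| include_types | Only include events from Runnable objects with matching types. |
| include_tags | Only include events from Runnable objects with matching tags. |
| exclude_names | Exclude events from Runnable objects with matching names. |
| exclude_types | Exclude events from Runnable objects with matching types. |
| exclude_tags | Exclude events from Runnable objects with matching tags. |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[StreamEvent] | An async stream of StreamEvent objects. |

| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | If the version is not 'v1' or 'v2'. |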
transform
transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An iterator of inputs to the Runnable. TYPE: Iterator[Input] |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |

| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
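The buffering behaviour described for transform can be pictured with a short standard-library sketch: input chunks are combined into one value, and only then is the streaming function called. The names transform_sketch and shout_stream are hypothetical, and the "combine with +, else keep the latest chunk" rule is an assumption made for this illustration.

```python
from typing import Any, Callable, Iterator


def transform_sketch(
    chunks: Iterator[Any], stream_fn: Callable[[Any], Iterator[Any]]
) -> Iterator[Any]:
    """Buffer all input chunks into one value, then stream the result."""
    final: Any = None
    got_first = False
    for chunk in chunks:
        if not got_first:
            final, got_first = chunk, True
        else:
            try:
                final = final + chunk  # combine chunks when they support "+"
            except TypeError:
                final = chunk  # otherwise keep only the most recent chunk
    if got_first:
        # Nothing is produced until the input iterator is exhausted.
        yield from stream_fn(final)


def shout_stream(text: str) -> Iterator[str]:
    """Hypothetical stream function that yields a single output."""
    yield text.upper()


out = list(transform_sketch(iter(["hel", "lo"]), shout_stream))
print(out)  # -> ['HELLO']
```

This is why a subclass that can emit output while input is still arriving needs its own transform.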
atransform
async
atransform(
    input: AsyncIterator[Input],
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An async iterator of inputs to the Runnable. TYPE: AsyncIterator[Input] |
| config | The config to use for the Runnable. TYPE: RunnableConfig \| None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any \| None |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
bind
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | The arguments to bind to the Runnable. TYPE: Any |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the arguments bound. |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser

model = ChatOllama(model="llama3.1")

# Without bind
chain = model | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to bind to the Runnable. TYPE: RunnableConfig \| None |
| **kwargs | Additional keyword arguments to pass to the Runnable. TYPE: Any |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |
with_listeners
with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
| PARAMETER | DESCRIPTION |
|---|---|
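| on_start | Called before the Runnable starts running, with the Run object. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None |
| on_end | Called after the Runnable finishes running, with the Run object. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None |
| on_error | Called if the Runnable throws an error, with the Run object. TYPE: Callable[[Run], None] \| Callable[[Run, RunnableConfig], None] \| None |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |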
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
| PARAMETER | DESCRIPTION |
|---|---|
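| on_start | Called asynchronously before the Runnable starts running, with the Run object. TYPE: AsyncListener \| None |
| on_end | Called asynchronously after the Runnable finishes running, with the Run object. TYPE: AsyncListener \| None |
| on_error | Called asynchronously if the Runnable throws an error, with the Run object. TYPE: AsyncListener \| None |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |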
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio

def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

# Listeners receive the Run object for the invocation.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start, on_end=fn_end
)

async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
# Result:
# on start callback starts at 2025-03-01T07:05:22.875378+00:00
# on start callback starts at 2025-03-01T07:05:22.875495+00:00
# on start callback ends at 2025-03-01T07:05:25.878862+00:00
# on start callback ends at 2025-03-01T07:05:25.878947+00:00
# Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
# Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
# Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
# on end callback starts at 2025-03-01T07:05:27.882360+00:00
# Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
# on end callback starts at 2025-03-01T07:05:28.882428+00:00
# on end callback ends at 2025-03-01T07:05:29.883893+00:00
# on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
input_type
|
The input type to bind to the
TYPE:
|
output_type
|
The output type to bind to the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Runnable[Input, Output]
|
A new |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION |
|---|---|
retry_if_exception_type
|
A tuple of exception types to retry on.
TYPE:
|
wait_exponential_jitter
|
Whether to add jitter to the wait time between retries.
TYPE:
|
stop_after_attempt
|
The maximum number of attempts to make before giving up.
TYPE:
|
exponential_jitter_params
|
Parameters for
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda

count = 0

def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass

runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
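To make the retry parameters more concrete, here is a plain-Python sketch of retry-with-backoff semantics. It is not LangChain's implementation: `retry_call`, `initial`, `max_wait`, and `jitter` are hypothetical names chosen for illustration, and the wait schedule actually used by `with_retry` may differ.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_call(
    fn: Callable[[], T],
    *,
    retry_if_exception_type: tuple = (Exception,),
    stop_after_attempt: int = 3,
    wait_exponential_jitter: bool = True,
    initial: float = 0.01,  # hypothetical: base wait in seconds
    max_wait: float = 0.1,  # hypothetical: upper bound on the wait
    jitter: float = 0.01,  # hypothetical: maximum random extra wait
) -> T:
    """Call fn, retrying only on the listed exception types."""
    for attempt in range(1, stop_after_attempt + 1):
        try:
            return fn()
        except retry_if_exception_type:
            if attempt == stop_after_attempt:
                raise  # attempts exhausted: surface the last error
            if wait_exponential_jitter:
                # Double the wait on each attempt, cap it, add random jitter.
                wait = min(initial * 2 ** (attempt - 1), max_wait)
                time.sleep(wait + random.uniform(0, jitter))
    raise ValueError("stop_after_attempt must be at least 1")


calls = 0


def flaky() -> str:
    """Fail twice, then succeed."""
    global calls
    calls += 1
    if calls < 3:
        raise ValueError("transient failure")
    return "ok"


result = retry_call(flaky, retry_if_exception_type=(ValueError,))
```

With the default of three attempts, the third call succeeds; with `stop_after_attempt=2` the second `ValueError` would propagate to the caller instead.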
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
| PARAMETER | DESCRIPTION |
|---|---|
fallbacks
|
A sequence of runnables to try if the original |
exceptions_to_handle
|
A tuple of exception types to handle.
TYPE:
|
exception_key
|
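If a string is specified, handled exceptions are passed to fallbacks as part of the input under that key. If None, exceptions are not passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input.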
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
Example
from typing import Iterator

from langchain_core.runnables import RunnableGenerator

def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""

def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"

runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
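The interaction between `exceptions_to_handle` and `exception_key` can be sketched in plain Python. This is an illustration only, not the library's code: `run_with_fallbacks` is a hypothetical helper, and it assumes that when `exception_key` is set, the most recent handled exception is added to the dict input under that key before the next fallback runs.

```python
from typing import Any, Callable, Optional, Sequence

Handler = Callable[[Any], Any]


def run_with_fallbacks(
    primary: Handler,
    fallbacks: Sequence[Handler],
    value: Any,
    *,
    exceptions_to_handle: tuple = (Exception,),
    exception_key: Optional[str] = None,
) -> Any:
    """Try primary, then each fallback in order, until one succeeds."""
    last_error: Optional[BaseException] = None
    for handler in [primary, *fallbacks]:
        current = value
        if exception_key is not None and last_error is not None:
            # Assumed behavior: expose the previous failure to the fallback.
            current = {**value, exception_key: last_error}
        try:
            return handler(current)
        except exceptions_to_handle as err:
            last_error = err  # remember the failure and try the next handler
    assert last_error is not None
    raise last_error  # every handler failed


def primary(x: dict) -> str:
    raise ValueError("primary failed")


def backup(x: dict) -> str:
    return f"recovered from: {x['error']}"


out = run_with_fallbacks(primary, [backup], {"q": "hi"}, exception_key="error")
```

Exceptions outside `exceptions_to_handle` are not caught in this sketch, so they propagate immediately without trying any fallback.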
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema.
Alternatively (e.g., if the Runnable takes a dict as input and the specific
dict keys are not typed), the schema can be specified directly with
args_schema.
You can also pass arg_types to just specify the required arguments and their
types.
| PARAMETER | DESCRIPTION |
|---|---|
args_schema
|
The schema for the tool. |
name
|
The name of the tool.
TYPE:
|
description
|
The description of the tool.
TYPE:
|
arg_types
|
A dictionary of argument names to types. |
| RETURNS | DESCRIPTION |
|---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: list[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda

def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
str input:
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
| RETURNS | DESCRIPTION |
|---|---|
bool
|
Whether the class is serializable. Default is False. |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
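As a rough illustration of the shape of such an identifier, the sketch below builds a path list from a class's module path and name. `class_path_id` is a hypothetical helper, and deriving the path from `__module__` is an assumption; the library may compute it differently (for example from `get_lc_namespace`).

```python
def class_path_id(cls: type) -> list:
    """Split the class's module path and append the class name."""
    return [*cls.__module__.split("."), cls.__name__]


class OpenAI:
    """Stand-in class used only for this demonstration."""


# Pretend the class lives in langchain.llms.openai (illustrative only).
OpenAI.__module__ = "langchain.llms.openai"

example_id = class_path_id(OpenAI)
```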
to_json
¶
Serialize the Runnable to JSON.
| RETURNS | DESCRIPTION |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
| RETURNS | DESCRIPTION |
|---|---|
SerializedNotImplemented
|
|
configurable_fields
¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable fields at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
A dictionary of
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If a configuration key is not found in the |
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)

# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable objects that can be set at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
which
|
The
TYPE:
|
default_key
|
The default key to use if no alternative is selected.
TYPE:
|
prefix_keys
|
Whether to prefix the keys with the
TYPE:
|
**kwargs
|
A dictionary of keys to
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
ZillizCloudPipelineRetriever
¶
Bases: BaseRetriever
Zilliz Cloud Pipeline retriever.
| PARAMETER | DESCRIPTION |
|---|---|
pipeline_ids
|
A dictionary of pipeline ids. Valid keys: "ingestion", "search", "deletion".
|
token
|
Zilliz Cloud's token. Defaults to "".
|
cloud_region
|
The region of Zilliz Cloud's cluster. Defaults to 'gcp-us-west1'.
|
| METHOD | DESCRIPTION |
|---|---|
get_name |
Get the name of the |
get_input_schema |
Get a Pydantic model that can be used to validate input to the |
get_input_jsonschema |
Get a JSON schema that represents the input to the |
get_output_schema |
Get a Pydantic model that can be used to validate output to the |
get_output_jsonschema |
Get a JSON schema that represents the output of the |
config_schema |
The type of config this |
get_config_jsonschema |
Get a JSON schema that represents the config of the |
get_graph |
Return a graph representation of this |
get_prompts |
Return a list of prompts used by this |
__or__ |
Runnable "or" operator. |
__ror__ |
Runnable "reverse-or" operator. |
pipe |
Pipe |
pick |
Pick keys from the output |
assign |
Assigns new fields to the |
invoke |
Invoke the retriever to get relevant documents. |
ainvoke |
Asynchronously invoke the retriever to get relevant documents. |
batch |
Default implementation runs invoke in parallel using a thread pool executor. |
batch_as_completed |
Run |
abatch |
Default implementation runs |
abatch_as_completed |
Run |
stream |
Default implementation of |
astream |
Default implementation of |
astream_log |
Stream all output from a |
astream_events |
Generate a stream of events. |
transform |
Transform inputs to outputs. |
atransform |
Transform inputs to outputs. |
bind |
Bind arguments to a |
with_config |
Bind config to a |
with_listeners |
Bind lifecycle listeners to a |
with_alisteners |
Bind async lifecycle listeners to a |
with_types |
Bind input and output types to a |
with_retry |
Create a new |
map |
Return a new |
with_fallbacks |
Add fallbacks to a |
as_tool |
Create a |
__init__ |
|
is_lc_serializable |
Is this class serializable? |
get_lc_namespace |
Get the namespace of the LangChain object. |
lc_id |
Return a unique identifier for this class for serialization purposes. |
to_json |
Serialize the |
to_json_not_implemented |
Serialize a "not implemented" object. |
configurable_fields |
Configure particular |
configurable_alternatives |
Configure alternatives for |
add_texts |
Add documents to store. |
add_doc_url |
Add a document from url. |
delete |
Delete documents. Only supported by a deletion pipeline in Zilliz Cloud. |
name
class-attribute
instance-attribute
¶
name: str | None = None
The name of the Runnable. Used for debugging and tracing.
InputType
property
¶
InputType: type[Input]
Input type.
The type of input this Runnable accepts specified as a type annotation.
| RAISES | DESCRIPTION |
|---|---|
TypeError
|
If the input type cannot be inferred. |
OutputType
property
¶
OutputType: type[Output]
Output Type.
The type of output this Runnable produces specified as a type annotation.
| RAISES | DESCRIPTION |
|---|---|
TypeError
|
If the output type cannot be inferred. |
input_schema
property
¶
The type of input this Runnable accepts specified as a Pydantic model.
output_schema
property
¶
Output schema.
The type of output this Runnable produces specified as a Pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
tags
class-attribute
instance-attribute
¶
Optional list of tags associated with the retriever.
These tags will be associated with each call to this retriever,
and passed as arguments to the handlers defined in callbacks.
You can use these to, e.g., identify a specific instance of a retriever with its use case.
metadata
class-attribute
instance-attribute
¶
Optional metadata associated with the retriever.
This metadata will be associated with each call to this retriever,
and passed as arguments to the handlers defined in callbacks.
You can use this to, e.g., identify a specific instance of a retriever with its use case.
get_name
¶
get_input_schema
¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
config
|
A config to use when generating the schema.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate input. |
get_input_jsonschema
¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
config
|
A config to use when generating the schema.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
A JSON schema that represents the input to the |
Example
Added in langchain-core 0.3.0
get_output_schema
¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
config
|
A config to use when generating the schema.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate output. |
get_output_jsonschema
¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
config
|
A config to use when generating the schema.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
A JSON schema that represents the output of the |
Example
Added in langchain-core 0.3.0
config_schema
¶
The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION |
|---|---|
include
|
A list of fields to include in the config schema. |
| RETURNS | DESCRIPTION |
|---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate config. |
get_config_jsonschema
¶
get_graph
¶
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable.
get_prompts
¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
__or__
¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
other
|
Another
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
other
|
Another
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION |
|---|---|
*others
|
Other
TYPE:
|
name
|
An optional name for the resulting
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict of this Runnable.
Pick a single key:
import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys:
from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)

def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")

chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION |
|---|---|
keys
|
A key or list of keys to pick from the output dict. |
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_core.language_models.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | model | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | model)

print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
A mapping of keys to
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Any, Any]
|
A new |
invoke
¶
invoke(
input: str, config: RunnableConfig | None = None, **kwargs: Any
) -> list[Document]
Invoke the retriever to get relevant documents.
Main entry point for synchronous retriever invocations.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The query string.
TYPE:
|
config
|
Configuration for the retriever.
TYPE:
|
**kwargs
|
Additional arguments to pass to the retriever.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of relevant documents. |
Examples:
ainvoke
async
¶
ainvoke(
input: str, config: RunnableConfig | None = None, **kwargs: Any
) -> list[Document]
Asynchronously invoke the retriever to get relevant documents.
Main entry point for asynchronous retriever invocations.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The query string.
TYPE:
|
config
|
Configuration for the retriever.
TYPE:
|
**kwargs
|
Additional arguments to pass to the retriever.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of relevant documents. |
Examples:
batch
¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the Please refer to
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Output]
|
A list of outputs from the |
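The thread-pool behavior described above can be sketched with the standard library alone. This is not the library's implementation: `batch_sketch` and `max_workers` are hypothetical names, and the sketch only illustrates running a callable in parallel while preserving input order, with `return_exceptions` deciding whether failures are returned or raised.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional


def batch_sketch(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
    max_workers: Optional[int] = None,  # hypothetical tuning knob
) -> List[Any]:
    """Run invoke on every input in a thread pool, keeping input order."""

    def call(item: Any) -> Any:
        if not return_exceptions:
            return invoke(item)  # errors propagate to the caller
        try:
            return invoke(item)
        except Exception as err:
            return err  # hand the exception back as a result

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the order of the inputs.
        return list(pool.map(call, inputs))


def reciprocal(x: float) -> float:
    return 1 / x


results = batch_sketch(reciprocal, [1, 2, 0], return_exceptions=True)
```

Here the third result is a `ZeroDivisionError` instance rather than a raised error, which is what `return_exceptions=True` is meant to convey.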
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the The config supports standard keys like Please refer to
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
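A minimal standard-library sketch of the "yield results as they complete" pattern follows. `batch_as_completed_sketch` is a hypothetical helper, not the library's code; it shows why each result is paired with the index of its input, since completion order can differ from input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any], inputs: Sequence[Any]
) -> Iterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs in completion order."""
    with ThreadPoolExecutor() as pool:
        # Map each future back to the position of its input.
        futures = {pool.submit(invoke, item): i for i, item in enumerate(inputs)}
        for future in as_completed(futures):
            yield futures[future], future.result()


def slow_echo(delay: float) -> float:
    time.sleep(delay)
    return delay


# The second input sleeps for less time, so it is expected to finish first.
completed_pairs = list(batch_as_completed_sketch(slow_echo, [0.3, 0.05]))
```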
abatch
async
¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the The config supports standard keys like Please refer to
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Output]
|
A list of outputs from the |
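The `asyncio.gather` pattern mentioned above can be illustrated without the library. `abatch_sketch` is a hypothetical helper written for this example; it simply forwards `return_exceptions` to `asyncio.gather`, which is an assumption about how the flag is meant to behave.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    *,
    return_exceptions: bool = False,
) -> List[Any]:
    """Await ainvoke for every input concurrently, keeping input order."""
    coros = [ainvoke(item) for item in inputs]
    return list(await asyncio.gather(*coros, return_exceptions=return_exceptions))


async def double(x: int) -> int:
    await asyncio.sleep(0.01)  # placeholder for IO-bound work
    if x < 0:
        raise ValueError("negative input")
    return x * 2


outputs = asyncio.run(abatch_sketch(double, [1, -1, 3], return_exceptions=True))
```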
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the The config supports standard keys like Please refer to
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
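The asynchronous variant of the same idea can be sketched with `asyncio.as_completed`. As before, `abatch_as_completed_sketch` is a hypothetical helper for illustration and not the library's implementation.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Sequence, Tuple


async def abatch_as_completed_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]], inputs: Sequence[Any]
) -> AsyncIterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs as each call finishes."""

    async def indexed(i: int, item: Any) -> Tuple[int, Any]:
        return i, await ainvoke(item)

    tasks = [asyncio.ensure_future(indexed(i, item)) for i, item in enumerate(inputs)]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


async def slow_echo_async(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay


async def collect() -> list:
    return [pair async for pair in abatch_as_completed_sketch(slow_echo_async, [0.2, 0.02])]


# The shorter sleep is expected to complete first.
async_pairs = asyncio.run(collect())
```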
stream
¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
Output
|
The output of the |
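In other words, a default stream that falls back to invoke produces exactly one chunk: the complete result. A plain-Python sketch of that fallback, using the hypothetical name `stream_sketch`:

```python
from typing import Any, Callable, Iterator


def stream_sketch(invoke: Callable[[Any], Any], value: Any) -> Iterator[Any]:
    """Yield the full invoke() result as a single chunk."""
    yield invoke(value)


chunks = list(stream_sketch(str.upper, "hello"))
```

A component with real streaming support would instead yield several partial chunks as they become available.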
astream
async
¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[Output]
|
The output of the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
diff
|
Whether to yield diffs between each step or the current state.
TYPE:
|
with_streamed_output_list
|
Whether to yield the
TYPE:
|
include_names
|
Only include logs with these names. |
include_types
|
Only include logs with these types. |
include_tags
|
Only include logs with these tags. |
exclude_names
|
Exclude logs with these names. |
exclude_types
|
Exclude logs with these types. |
exclude_tags
|
Exclude logs with these tags. |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
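To illustrate how patch operations can be applied in order to rebuild state, here is a small sketch that handles only the `add` and `replace` operations on dicts and lists. `apply_ops` is a hypothetical helper, the operations shown are made-up examples rather than captured library output, and JSON Pointer escaping (`~0`, `~1`) is deliberately ignored.

```python
from typing import Any, Dict, List


def apply_ops(state: Any, ops: List[Dict[str, Any]]) -> Any:
    """Apply a minimal subset of JSON Patch ('add' and 'replace') in order."""
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {op['op']}")
        if op["path"] == "":
            state = op["value"]  # the empty path targets the whole document
            continue
        *parents, last = op["path"].lstrip("/").split("/")
        target = state
        for key in parents:
            # Walk down to the container that holds the final key.
            target = target[int(key)] if isinstance(target, list) else target[key]
        if isinstance(target, list):
            if last == "-":
                target.append(op["value"])  # "-" means append to the list
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


# Illustrative operations only; real log patches carry more fields.
ops = [
    {"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}},
    {"op": "add", "path": "/streamed_output/-", "value": "ol"},
    {"op": "add", "path": "/streamed_output/-", "value": "leh"},
    {"op": "replace", "path": "/final_output", "value": "olleh"},
]
state = apply_ops(None, ops)
```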
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use this to create an iterator over StreamEvent objects that provide real-time information
about the progress of the Runnable, including events from intermediate
results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | | 'Hello human!' |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events are only surfaced in the v2 version of the API.
A custom event has the following format:
| Attribute | Type | Description |
|---|---|---|
name |
str |
A user defined name for the event. |
data |
Any |
The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
For instance:
from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [event async for event in chain.astream_events("hello", version="v2")]

# Will produce the following events
# (run_id and parent_ids have been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio

async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config  # Must be included for Python <= 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config  # Must be included for Python <= 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
version
|
The version of the schema to use either
TYPE:
|
include_names
|
Only include events from |
include_types
|
Only include events from |
include_tags
|
Only include events from |
exclude_names
|
Exclude events from |
exclude_types
|
Exclude events from |
exclude_tags
|
Exclude events from |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
| RAISES | DESCRIPTION |
|---|---|
NotImplementedError
|
If the version is not |
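The include and exclude parameters above narrow the stream to matching events. As a rough illustration only, the sketch below applies one plausible rule in plain Python: an event is kept if it matches any include filter (or if no include filter is given) and matches no exclude filter. The `event_passes` helper and the event dicts are hypothetical and are not the library's implementation; type-based filters are omitted for brevity.

```python
from typing import Any, Dict, Optional, Sequence


def event_passes(
    event: Dict[str, Any],
    include_names: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
) -> bool:
    """Return True if a (hypothetical) event dict survives the filters."""
    tags = event.get("tags") or []
    # With no include filters, every event is included by default.
    include = include_names is None and include_tags is None
    if include_names is not None:
        include = include or event["name"] in include_names
    if include_tags is not None:
        include = include or any(tag in include_tags for tag in tags)
    # Exclude filters take precedence over include filters.
    if exclude_names is not None:
        include = include and event["name"] not in exclude_names
    if exclude_tags is not None:
        include = include and not any(tag in exclude_tags for tag in tags)
    return include


events = [
    {"event": "on_chain_start", "name": "reverse", "tags": []},
    {"event": "on_prompt_start", "name": "my_template", "tags": ["my_template"]},
]
kept = [e["name"] for e in events if event_passes(e, include_tags=["my_template"])]
print(kept)
```

Here only the event tagged `my_template` is kept; passing no filters at all keeps both events.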
transform
¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
An iterator of inputs to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
Output
|
The output of the |
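To make the difference between a buffering default and a streaming override concrete, here is a minimal sketch with plain Python generators. Both functions (`buffered_transform`, `streaming_upper`) are hypothetical stand-ins and do not use the Runnable API; they only illustrate when output becomes available relative to input.

```python
from typing import Callable, Iterator


def buffered_transform(
    chunks: Iterator[str], fn: Callable[[str], str]
) -> Iterator[str]:
    """Consume every input chunk first, then emit a single output."""
    buffered = ""
    for chunk in chunks:
        buffered += chunk  # nothing is yielded until the input is exhausted
    yield fn(buffered)


def streaming_upper(chunks: Iterator[str]) -> Iterator[str]:
    """Emit one output chunk per input chunk, without waiting for the rest."""
    for chunk in chunks:
        yield chunk.upper()


buffered_result = list(buffered_transform(iter(["he", "llo"]), str.upper))
streamed_result = list(streaming_upper(iter(["he", "llo"])))
print(buffered_result)
print(streamed_result)
```

The buffered version yields one combined chunk, while the streaming version yields a chunk as soon as each input chunk arrives, which is the behavior an override would aim for.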
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
An async iterator of inputs to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[Output]
|
The output of the |
bind
¶
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
The arguments to bind to the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
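Conceptually, binding arguments resembles partial application. The following sketch uses `functools.partial` with a hypothetical `fake_model` function in place of a real chat model, so it runs without any model backend; it is an analogy, not how `bind` is implemented.

```python
from functools import partial
from typing import Optional, Sequence


def fake_model(prompt: str, stop: Optional[Sequence[str]] = None) -> str:
    """Hypothetical stand-in for a model: echoes the prompt, cut at a stop word."""
    text = prompt
    for word in stop or []:
        index = text.find(word)
        if index != -1:
            text = text[:index]
    return text.strip()


# Pre-fill the `stop` argument; the original callable is left untouched.
bound_model = partial(fake_model, stop=["three"])

unbound_output = fake_model("One two three four five.")
bound_output = bound_model("One two three four five.")
print(unbound_output)
print(bound_output)
```

As with `bind`, the bound callable is a new object, and callers no longer need to supply the pre-filled argument themselves.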
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
config
|
The config to bind to the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Runnable[Input, Output]
|
A new |
with_listeners
¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
| PARAMETER | DESCRIPTION |
|---|---|
on_start
|
Called before the
TYPE:
|
on_end
|
Called after the
TYPE:
|
on_error
|
Called if the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time

def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
| PARAMETER | DESCRIPTION |
|---|---|
on_start
|
Called asynchronously before the
TYPE:
|
on_end
|
Called asynchronously after the
TYPE:
|
on_error
|
Called asynchronously if the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio

def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

# Listeners receive the Run object described above, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start, on_end=fn_end
)

async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
# Result:
# on start callback starts at 2025-03-01T07:05:22.875378+00:00
# on start callback starts at 2025-03-01T07:05:22.875495+00:00
# on start callback ends at 2025-03-01T07:05:25.878862+00:00
# on start callback ends at 2025-03-01T07:05:25.878947+00:00
# Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
# Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
# Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
# on end callback starts at 2025-03-01T07:05:27.882360+00:00
# Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
# on end callback starts at 2025-03-01T07:05:28.882428+00:00
# on end callback ends at 2025-03-01T07:05:29.883893+00:00
# on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
input_type
|
The input type to bind to the
TYPE:
|
output_type
|
The output type to bind to the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Runnable[Input, Output]
|
A new |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION |
|---|---|
retry_if_exception_type
|
A tuple of exception types to retry on.
TYPE:
|
wait_exponential_jitter
|
Whether to add jitter to the wait time between retries.
TYPE:
|
stop_after_attempt
|
The maximum number of attempts to make before giving up.
TYPE:
|
exponential_jitter_params
|
Parameters for
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Runnable[Input, Output]
|
A new |
Example
from langchain_core.runnables import RunnableLambda

count = 0

def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")

runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
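For intuition about what retrying with exponential backoff and jitter involves, here is a self-contained sketch in plain Python. The `retry_call` helper and its `initial`, `max_wait`, `jitter`, and `sleep` parameters are assumptions made for illustration; they are not the library's API or its implementation.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_call(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    max_attempts: int = 3,
    initial: float = 1.0,
    max_wait: float = 60.0,
    jitter: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on the given exception types with backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff plus a random jitter, capped at max_wait.
            backoff = initial * 2 ** (attempt - 1)
            sleep(min(max_wait, backoff + random.uniform(0, jitter)))
    raise RuntimeError("max_attempts must be at least 1")


calls = {"count": 0}


def flaky() -> str:
    """Fail twice, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ValueError("transient failure")
    return "ok"


waits: List[float] = []  # record waits instead of actually sleeping
result = retry_call(flaky, retry_on=(ValueError,), sleep=waits.append)
print(result, calls["count"], len(waits))
```

Injecting `sleep` keeps the demonstration instant; the jitter spreads retries from concurrent callers so they do not all hit the failing service at the same moment.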
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
| PARAMETER | DESCRIPTION |
|---|---|
fallbacks
|
A sequence of runnables to try if the original |
exceptions_to_handle
|
A tuple of exception types to handle.
TYPE:
|
exception_key
|
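If a string is specified, handled exceptions are passed to fallbacks as part of the input under the specified key. If None, exceptions are not passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input.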
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableWithFallbacks[Input, Output]
|
A new |
Example
from typing import Iterator
from langchain_core.runnables import RunnableGenerator

def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""  # never reached, but makes this function a generator

def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"

runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
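The try-in-order behavior described for fallbacks can be sketched without any library code. The `call_with_fallbacks` helper below is hypothetical; in particular, re-raising the first error when every candidate fails is an assumption of this sketch rather than documented behavior.

```python
from typing import Callable, Optional, Sequence, Tuple, Type


def call_with_fallbacks(
    primary: Callable[[str], str],
    fallbacks: Sequence[Callable[[str], str]],
    value: str,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
) -> str:
    """Try primary, then each fallback in order; raise if all of them fail."""
    first_error: Optional[BaseException] = None
    for candidate in (primary, *fallbacks):
        try:
            return candidate(value)
        except exceptions_to_handle as error:
            if first_error is None:
                first_error = error  # remember the original failure
    assert first_error is not None
    raise first_error


def always_fails(text: str) -> str:
    raise ValueError("primary is down")


def shout(text: str) -> str:
    return text.upper()


fallback_output = call_with_fallbacks(always_fails, [shout], "foo bar")
print(fallback_output)
```

Exceptions outside `exceptions_to_handle` are not caught, so they propagate immediately instead of triggering the next fallback.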
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema.
Alternatively (e.g., if the Runnable takes a dict as input and the specific
dict keys are not typed), the schema can be specified directly with
args_schema.
You can also pass arg_types to just specify the required arguments and their
types.
| PARAMETER | DESCRIPTION |
|---|---|
args_schema
|
The schema for the tool. |
name
|
The name of the tool.
TYPE:
|
description
|
The description of the tool.
TYPE:
|
arg_types
|
A dictionary of argument names to types. |
| RETURNS | DESCRIPTION |
|---|---|
BaseTool
|
A |
Typed dict input:
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: list[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""
    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
from typing import Any
from langchain_core.runnables import RunnableLambda

def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
str input:
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
| RETURNS | DESCRIPTION |
|---|---|
bool
|
Whether the class is serializable. Default is |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
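As a small illustration of the id path described above, the sketch below builds the list from a dotted module path and a class name. The `OpenAI` class here is an empty placeholder and `class_id_path` is a hypothetical helper, not the library's `lc_id` implementation.

```python
from typing import List


class OpenAI:
    """Empty placeholder class used only to illustrate the id path."""


def class_id_path(cls: type, module_path: str) -> List[str]:
    """Split a dotted module path and append the class name."""
    return [*module_path.split("."), cls.__name__]


id_path = class_id_path(OpenAI, "langchain.llms.openai")
print(id_path)
```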
to_json
¶
Serialize the Runnable to JSON.
| RETURNS | DESCRIPTION |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
A JSON-serializable representation of the |
to_json_not_implemented
¶
Serialize a "not implemented" object.
| RETURNS | DESCRIPTION |
|---|---|
SerializedNotImplemented
|
|
configurable_fields
¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable fields at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
A dictionary of
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If a configuration key is not found in the |
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable objects that can be set at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
which
|
The
TYPE:
|
default_key
|
The default key to use if no alternative is selected.
TYPE:
|
prefix_keys
|
Whether to prefix the keys with the
TYPE:
|
**kwargs
|
A dictionary of keys to
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Input, Output]
|
A new |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
add_texts
¶
Add documents to store. Only supported by a text ingestion pipeline in Zilliz Cloud.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
A list of text strings. |
metadata
|
A key-value dictionary of metadata that will be inserted as the preserved fields required by the ingestion pipeline. Defaults to None. |
Milvus
¶
Bases: VectorStore
Milvus vector store integration.
Key init args (indexing params):
collection_name (str): Name of the collection.
collection_description (str): Description of the collection.
embedding_function (Union[Embeddings, BaseSparseEmbedding]): Embedding function to use.
Key init args (client params):
connection_args (Optional[dict]): Connection arguments.
Instantiate
Add Documents
from langchain_core.documents import Document
document_1 = Document(page_content="foo", metadata={"baz": "bar"})
document_2 = Document(page_content="thud", metadata={"baz": "baz"})
document_3 = Document(page_content="i will be deleted :(", metadata={"baz": "qux"})
documents = [document_1, document_2, document_3]
ids = ["1", "2", "3"]
vector_store.add_documents(documents=documents, ids=ids)
Search
Search with score
Async
# add documents
# await vector_store.aadd_documents(documents=documents, ids=ids)
# delete documents
# await vector_store.adelete(ids=["3"])
# search
# results = await vector_store.asimilarity_search(query="thud", k=1)
# search with score
results = await vector_store.asimilarity_search_with_score(query="qux", k=1)
for doc, score in results:
    print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
Use as Retriever
| METHOD | DESCRIPTION |
|---|---|
| get_by_ids | Get documents by their IDs. |
| aget_by_ids | Async get documents by their IDs. |
| search | Return docs most similar to query using a specified search type. |
| asearch | Async return docs most similar to query using a specified search type. |
| similarity_search_with_relevance_scores | Return docs and relevance scores in the range [0, 1]. |
| asimilarity_search_with_relevance_scores | Async return docs and relevance scores in the range [0, 1]. |
| from_documents | Return VectorStore initialized from documents and embeddings. |
| afrom_documents | Async return VectorStore initialized from documents and embeddings. |
| as_retriever | Return VectorStoreRetriever initialized from this VectorStore. |
| __init__ | Initialize the Milvus vector store. |
| add_texts | Insert text data into Milvus. |
| add_embeddings | Insert text data with embedding vectors into Milvus. |
| similarity_search | Perform a similarity search against the query string. |
| similarity_search_by_vector | Perform a similarity search against the query vector. |
| similarity_search_with_score | Perform a search on a query string and return results with score. |
| similarity_search_with_score_by_vector | Perform a search on an embedding and return results with score. |
| max_marginal_relevance_search | Perform a search and return results that are reordered by MMR. |
| max_marginal_relevance_search_by_vector | Perform a search and return results that are reordered by MMR. |
| delete | Delete by vector ID or boolean expression. |
| drop | Delete all the content in the index by dropping the collection. |
| from_texts | Create a Milvus collection, index it with HNSW, and insert data. |
| add_documents | Run more documents through the embeddings and add to the vectorstore. |
| get_pks | Get primary keys with expression. |
| upsert | Update/Insert documents to the vectorstore. |
| search_by_metadata | Search the Milvus vector store based on metadata conditions. |
| aadd_texts | Insert text data into Milvus asynchronously. |
| aadd_embeddings | Insert text data with embedding vectors into Milvus asynchronously. |
| asimilarity_search | Perform an async similarity search against the query string. |
| asimilarity_search_by_vector | Perform an async similarity search against the query vector. |
| asimilarity_search_with_score | Perform an async search on a query string and return results with score. |
| asimilarity_search_with_score_by_vector | Perform an async search on an embedding and return results with score. |
| amax_marginal_relevance_search | Perform an async search and return results that are reordered by MMR. |
| amax_marginal_relevance_search_by_vector | Perform an async search and return results that are reordered by MMR. |
| adelete | Async delete by vector ID or boolean expression. |
| afrom_texts | Create a Milvus collection, index it with HNSW, and insert data asynchronously. |
| aadd_documents | Async run more documents through the embeddings and add to the vectorstore. |
| aget_pks | Async get primary keys with expression. |
| aupsert | Update/Insert documents to the vectorstore asynchronously. |
| asearch_by_metadata | Async search the Milvus vector store based on metadata conditions. |
col
property
writable
¶
Lazy-loaded Collection object property with caching.
Returns the ORM Collection object if the collection exists. Uses cache to avoid repeated network calls and Collection() construction.
embeddings
property
¶
embeddings: EmbeddingType | list[EmbeddingType] | None
Get embedding function(s).
get_by_ids
¶
Get documents by their IDs.
The returned documents are expected to have the ID field set to the ID of the document in the vector store.
Fewer documents may be returned than requested if some IDs are not found or if there are duplicated IDs.
Users should not assume that the order of the returned documents matches the order of the input IDs. Instead, users should rely on the ID field of the returned documents.
This method should NOT raise exceptions if no documents are found for some IDs.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of IDs to retrieve. |
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
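Because the order of returned documents is not guaranteed and missing IDs are silently skipped, callers may want to realign results with the IDs they requested. The sketch below shows one way to do that; `Doc` is a hypothetical stand-in for a document with an `id` field, and `align_to_requested` is an illustrative helper rather than part of the API.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class Doc:
    """Hypothetical stand-in for a document that carries its own ID."""

    id: str
    page_content: str


def align_to_requested(
    ids: Sequence[str], docs: Sequence[Doc]
) -> List[Optional[Doc]]:
    """Return docs in the order of ids, with None for IDs that were not found."""
    by_id = {doc.id: doc for doc in docs}
    return [by_id.get(doc_id) for doc_id in ids]


returned = [Doc("3", "qux"), Doc("1", "foo")]  # arbitrary order, "2" is missing
aligned = align_to_requested(["1", "2", "3"], returned)
contents = [doc.page_content if doc else None for doc in aligned]
print(contents)
```

Keying on the ID field of each returned document, rather than on position, is what makes the result robust to reordering and to missing entries.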
aget_by_ids
async
¶
Async get documents by their IDs.
The returned documents are expected to have the ID field set to the ID of the document in the vector store.
Fewer documents may be returned than requested if some IDs are not found or if there are duplicated IDs.
Users should not assume that the order of the returned documents matches the order of the input IDs. Instead, users should rely on the ID field of the returned documents.
This method should NOT raise exceptions if no documents are found for some IDs.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of IDs to retrieve. |
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
search
¶
Return docs most similar to query using a specified search type.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
search_type
|
Type of search to perform. Can be
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If |
asearch
async
¶
Async return docs most similar to query using a specified search type.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
search_type
|
Type of search to perform. Can be
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If |
similarity_search_with_relevance_scores
¶
similarity_search_with_relevance_scores(
query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]
Return docs and relevance scores in the range [0, 1].
0 is dissimilar, 1 is most similar.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
k
|
Number of
TYPE:
|
**kwargs
|
kwargs to be passed to similarity search. Should include
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List of tuples of |
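How a raw similarity or distance maps onto the [0, 1] range depends on the metric in use. As one possible normalization, and only as an assumption for illustration, the sketch below rescales cosine similarity from [-1, 1] to [0, 1]; the function names are hypothetical and this is not necessarily the mapping this integration applies.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm


def relevance_from_cosine(similarity: float) -> float:
    """Rescale a cosine similarity from [-1, 1] to [0, 1]."""
    return (similarity + 1.0) / 2.0


query = [1.0, 0.0]
scores = {
    name: relevance_from_cosine(cosine_similarity(query, vector))
    for name, vector in [
        ("same", [1.0, 0.0]),
        ("orthogonal", [0.0, 1.0]),
        ("opposite", [-1.0, 0.0]),
    ]
}
print(scores)
```

Under this mapping an identical vector scores 1, an orthogonal one 0.5, and an opposite one 0, which matches the "0 is dissimilar, 1 is most similar" convention.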
asimilarity_search_with_relevance_scores
async
¶
asimilarity_search_with_relevance_scores(
query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]
Async return docs and relevance scores in the range [0, 1].
0 is dissimilar, 1 is most similar.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
k
|
Number of
TYPE:
|
**kwargs
|
kwargs to be passed to similarity search. Should include
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List of tuples of |
from_documents
classmethod
¶
from_documents(documents: list[Document], embedding: Embeddings, **kwargs: Any) -> Self
Return VectorStore initialized from documents and embeddings.
| PARAMETER | DESCRIPTION |
|---|---|
documents
|
List of |
embedding
|
Embedding function to use.
TYPE:
|
**kwargs
|
Additional keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Self
|
|
afrom_documents
async
classmethod
¶
afrom_documents(
documents: list[Document], embedding: Embeddings, **kwargs: Any
) -> Self
Async return VectorStore initialized from documents and embeddings.
| PARAMETER | DESCRIPTION |
|---|---|
documents
|
List of |
embedding
|
Embedding function to use.
TYPE:
|
**kwargs
|
Additional keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Self
|
|
as_retriever
¶
as_retriever(**kwargs: Any) -> VectorStoreRetriever
Return VectorStoreRetriever initialized from this VectorStore.
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
Keyword arguments to pass to the search function. Can include:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
VectorStoreRetriever
|
Retriever class for |
Examples:
# Retrieve more documents with higher diversity
# Useful if your dataset has many similar documents
docsearch.as_retriever(
search_type="mmr", search_kwargs={"k": 6, "lambda_mult": 0.25}
)
# Fetch more documents for the MMR algorithm to consider
# But only return the top 5
docsearch.as_retriever(search_type="mmr", search_kwargs={"k": 5, "fetch_k": 50})
# Only retrieve documents that have a relevance score
# Above a certain threshold
docsearch.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.8},
)
# Only get the single most similar document from the dataset
docsearch.as_retriever(search_kwargs={"k": 1})
# Use a filter to only retrieve documents from a specific paper
docsearch.as_retriever(
search_kwargs={"filter": {"paper_title": "GPT-4 Technical Report"}}
)
__init__
¶
__init__(
embedding_function: EmbeddingType | list[EmbeddingType] | None,
collection_name: str = "LangChainCollection",
collection_description: str = "",
collection_properties: dict[str, Any] | None = None,
connection_args: dict[str, Any] | None = None,
consistency_level: str = "Session",
index_params: dict | list[dict] | None = None,
search_params: dict | list[dict] | None = None,
drop_old: bool | None = False,
auto_id: bool = False,
*,
primary_field: str = PRIMARY_FIELD,
text_field: str = TEXT_FIELD,
vector_field: str | list[str] = VECTOR_FIELD,
enable_dynamic_field: bool = False,
metadata_field: str | None = None,
partition_key_field: str | None = None,
num_partitions: int | None = None,
partition_names: list | None = None,
replica_number: int = 1,
timeout: float | None = None,
num_shards: int | None = None,
vector_schema: dict[str, Any] | list[dict[str, Any]] | None = None,
metadata_schema: dict[str, Any] | None = None,
builtin_function: BaseMilvusBuiltInFunction
| list[BaseMilvusBuiltInFunction]
| None = None,
)
Initialize the Milvus vector store.
add_texts
¶
add_texts(
texts: Iterable[str],
metadatas: list[dict] | None = None,
timeout: float | None = None,
batch_size: int = 1000,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
Insert text data into Milvus.
If the collection has not been created yet, inserting data creates a new Collection. The first entity determines the schema of the new collection: the dimension is taken from the first embedding, and the columns are taken from the first metadata dict. Metadata keys must be present for all inserted values. At the moment there is no None equivalent in Milvus.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
The texts to embed. They are assumed to all fit in memory. |
metadatas
|
Metadata dicts attached to each of the texts. Defaults to None. |
timeout
|
Timeout for each batch insert. Defaults to None.
TYPE:
|
batch_size
|
Batch size to use for insertion. Defaults to 1000.
TYPE:
|
ids
|
List of text ids. The length of each item |
| RAISES | DESCRIPTION |
|---|---|
MilvusException
|
Failure to add texts |
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List[str]: The resulting keys for each inserted element. |
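The `batch_size` parameter implies that texts are inserted in chunks rather than all at once. A minimal sketch of that chunking step, using only the standard library, might look like the following; `batched` is a hypothetical helper and the actual insertion call is left out.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batched(items: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


texts = [f"text-{i}" for i in range(2500)]
batch_sizes = [len(batch) for batch in batched(texts, 1000)]
print(batch_sizes)
```

With 2500 texts and the default batch size of 1000, this produces two full batches and one partial batch of 500.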
add_embeddings
¶
add_embeddings(
texts: list[str],
embeddings: List[List[float]] | List[List[List[float]]],
metadatas: list[dict] | None = None,
timeout: float | None = None,
batch_size: int = 1000,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
Insert text data with embedding vectors into Milvus.
This method inserts a batch of text embeddings into a Milvus collection. If the collection is not initialized, it is initialized automatically based on the embeddings, metadatas, and other parameters. The embeddings are expected to be pre-generated using compatible embedding functions. The metadata associated with each text is optional, but if provided it must match the number of texts.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
The texts to insert. |
embeddings
|
A vector embedding for each text (single-vector case) or a list of vectors for each text (multi-vector case). |
metadatas
|
Metadata dicts attached to each of the texts. Defaults to None. |
timeout
|
Timeout for each batch insert. Defaults to None.
TYPE:
|
batch_size
|
Batch size to use for insertion. Defaults to 1000.
TYPE:
|
ids
|
List of text ids. The length of each item |
| RAISES | DESCRIPTION |
|---|---|
MilvusException
|
Failure to add texts and embeddings |
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List[str]: The resulting keys for each inserted element. |
similarity_search
¶
similarity_search(
query: str,
k: int = 4,
param: dict | list[dict] | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform a similarity search against the query string.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text to search.
TYPE:
|
k
|
How many results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the index type. Defaults to None. |
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
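The `expr` parameter takes a filtering expression as a string. The helpers below are a hedged sketch of building simple equality and membership expressions for a string-valued metadata field; `equals_expr` and `in_expr` are hypothetical, the `baz` field comes from the earlier document example, and JSON-style quoting is an assumption that may not cover every escaping rule Milvus applies.

```python
import json
from typing import Sequence


def equals_expr(field: str, value: str) -> str:
    """Build an equality filter such as: baz == "bar"."""
    return f"{field} == {json.dumps(value, ensure_ascii=False)}"


def in_expr(field: str, values: Sequence[str]) -> str:
    """Build a membership filter such as: baz in ["bar", "qux"]."""
    return f"{field} in {json.dumps(list(values), ensure_ascii=False)}"


single = equals_expr("baz", "bar")
multiple = in_expr("baz", ["bar", "qux"])
print(single)
print(multiple)
```

The resulting strings would be passed as the `expr` argument of a search call, for example `expr=single`.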
similarity_search_by_vector
¶
similarity_search_by_vector(
embedding: list[float],
k: int = 4,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform a similarity search against the query vector.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector to search. |
k
|
How many results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the index type. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
similarity_search_with_score
¶
similarity_search_with_score(
query: str,
k: int = 4,
param: dict | list[dict] | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[tuple[Document, float]]
Perform a search on a query string and return results with score.
For more information about the search parameters, take a look at the pymilvus documentation found here: https://milvus.io/api-reference/pymilvus/v2.5.x/ORM/Collection/search.md
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text being searched.
TYPE:
|
k
|
The number of results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the specified |
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() or hybrid_search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List[Tuple[Document, float]]: List of result doc and score. |
similarity_search_with_score_by_vector
¶
similarity_search_with_score_by_vector(
embedding: List[float] | Dict[int, float],
k: int = 4,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[tuple[Document, float]]
Perform a search on an embedding and return results with score.
For more information about the search parameters, take a look at the pymilvus documentation found here: https://milvus.io/api-reference/pymilvus/v2.5.x/ORM/Collection/search.md
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector being searched. |
k
|
The number of results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List[Tuple[Document, float]]: Result doc and score. |
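The `embedding` parameter accepts either a list of floats or a dict keyed by integers. A minimal sketch of the two shapes follows, assuming the dict form represents a sparse vector that maps a dimension index to its weight; `to_sparse` and `kind` are hypothetical helpers for illustration and are not part of langchain-milvus.

```python
from typing import Dict, List, Union

# The two shapes accepted by the documented signature.
Embedding = Union[List[float], Dict[int, float]]


def to_sparse(dense: List[float], eps: float = 0.0) -> Dict[int, float]:
    """Keep only entries whose magnitude exceeds eps, keyed by position."""
    return {i: v for i, v in enumerate(dense) if abs(v) > eps}


def kind(embedding: Embedding) -> str:
    """Tell the two accepted shapes apart."""
    return "sparse" if isinstance(embedding, dict) else "dense"


dense_query = [0.0, 0.5, 0.0, 0.25]
sparse_query = to_sparse(dense_query)
```

Either value could then be passed as `embedding`, provided the target vector field in the collection uses the matching vector type.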
max_marginal_relevance_search
¶
max_marginal_relevance_search(
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform a search and return results that are reordered by MMR.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text being searched.
TYPE:
|
k
|
How many results to give. Defaults to 4.
TYPE:
|
fetch_k
|
Total results to select k from. Defaults to 20.
TYPE:
|
lambda_mult
|
Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
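To illustrate how `k`, `fetch_k`, and `lambda_mult` interact, here is a rough sketch of the textbook maximal marginal relevance selection over already-fetched candidate vectors. It assumes cosine similarity and is not the library's own implementation; `cosine` and `mmr_select` are illustrative names, and the length of `candidates` plays the role of `fetch_k`.

```python
import math
from typing import List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 for zero-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def mmr_select(
    query: Sequence[float],
    candidates: Sequence[Sequence[float]],
    k: int = 4,
    lambda_mult: float = 0.5,
) -> List[int]:
    """Greedily pick k candidate indices, trading relevance for diversity."""
    selected: List[int] = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < k:

        def score(i: int) -> float:
            relevance = cosine(query, candidates[i])
            # Similarity to the closest already-selected candidate.
            redundancy = max(
                (cosine(candidates[i], candidates[j]) for j in selected),
                default=0.0,
            )
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy

        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


query = [1.0, 0.0]
candidates = [[0.9, 0.1], [0.9, 0.12], [0.7, -0.7]]
diverse = mmr_select(query, candidates, k=2, lambda_mult=0.5)
relevant_only = mmr_select(query, candidates, k=2, lambda_mult=1.0)
```

With `lambda_mult=1.0` the near-duplicate second candidate is kept because only relevance counts; with `lambda_mult=0.5` the redundancy penalty makes the third, less similar candidate win the second slot.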
max_marginal_relevance_search_by_vector
¶
max_marginal_relevance_search_by_vector(
embedding: list[float] | dict[int, float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform a search and return results that are reordered by MMR.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector being searched. |
k
|
How many results to give. Defaults to 4.
TYPE:
|
fetch_k
|
Total results to select k from. Defaults to 20.
TYPE:
|
lambda_mult
|
Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
delete
¶
Delete by vector ID or boolean expression. Refer to Milvus documentation for notes and examples of expressions.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of ids to delete. |
expr
|
Boolean expression that specifies the entities to delete.
TYPE:
|
kwargs
|
Other parameters in Milvus delete api.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool | None
|
Optional[bool]: True if deletion is successful, False otherwise. |
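When deleting by expression rather than by `ids`, the expression string has to be assembled by the caller. The sketch below builds a Milvus `in` expression from a list of IDs. It assumes the primary key field is named `pk`, which should be checked against the actual collection schema; `ids_to_expr` is a hypothetical helper.

```python
import json
from typing import Sequence, Union


def ids_to_expr(ids: Sequence[Union[str, int]], primary_field: str = "pk") -> str:
    """Build a boolean expression matching any of the given primary keys."""
    if not ids:
        raise ValueError("at least one id is required")
    # json.dumps double-quotes strings and leaves integers bare.
    values = ", ".join(json.dumps(i) for i in ids)
    return f"{primary_field} in [{values}]"


expr = ids_to_expr(["doc-1", "doc-2"])
```

The resulting string could then be passed as `expr`, for example `store.delete(expr=expr)` on an existing store.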
from_texts
classmethod
¶
from_texts(
texts: list[str],
embedding: EmbeddingType | list[EmbeddingType] | None,
metadatas: list[dict] | None = None,
collection_name: str = "LangChainCollection",
connection_args: dict[str, Any] | None = None,
consistency_level: str = "Session",
index_params: dict | list[dict] | None = None,
search_params: dict | list[dict] | None = None,
drop_old: bool = False,
*,
ids: list[str] | None = None,
auto_id: bool = False,
builtin_function: BaseMilvusBuiltInFunction
| list[BaseMilvusBuiltInFunction]
| None = None,
**kwargs: Any,
) -> Milvus
Create a Milvus collection, index it with HNSW, and insert data.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
Text data. |
embedding
|
Embedding function.
TYPE:
|
metadatas
|
Metadata for each text if it exists. Defaults to None. |
collection_name
|
Collection name to use. Defaults to "LangChainCollection".
TYPE:
|
connection_args
|
Connection args to use. Defaults to DEFAULT_MILVUS_CONNECTION. |
consistency_level
|
Which consistency level to use. Defaults to "Session".
TYPE:
|
index_params
|
Which index_params to use. Defaults to None.
TYPE:
|
search_params
|
Which search params to use. Defaults to None.
TYPE:
|
drop_old
|
Whether to drop the collection with that name if it exists. Defaults to False.
TYPE:
|
ids
|
List of text ids. Defaults to None. |
auto_id
|
Whether to enable auto id for the primary key. Defaults to False. If False, you need to provide text ids (strings shorter than 65535 bytes). If True, Milvus will generate unique integers as primary keys.
TYPE:
|
**kwargs
|
Other parameters in Milvus Collection.
TYPE:
|
Returns: Milvus: Milvus Vector Store
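The `ids` and `auto_id` parameters constrain each other. The following sketch mirrors the documented rule as a pre-flight check that could run before calling `from_texts`. It is an illustration of the stated constraint, not the library's validation code; `check_ids` is a hypothetical helper, and the one-ID-per-text requirement is an added assumption.

```python
from typing import List, Optional

MAX_ID_BYTES = 65535  # documented limit: ids must be shorter than this


def check_ids(texts: List[str], ids: Optional[List[str]], auto_id: bool = False) -> None:
    """Raise ValueError if ids do not satisfy the documented constraints."""
    if auto_id:
        return  # Milvus generates integer primary keys itself
    if ids is None:
        raise ValueError("ids are required when auto_id is False")
    if len(ids) != len(texts):
        raise ValueError("expected one id per text")
    for i in ids:
        if len(i.encode("utf-8")) >= MAX_ID_BYTES:
            raise ValueError(f"id starting with {i[:20]!r} is too long")


check_ids(["hello", "world"], ["id-1", "id-2"])  # passes silently
```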
add_documents
¶
get_pks
¶
upsert
¶
search_by_metadata
¶
Searches the Milvus vector store based on metadata conditions.
This function performs a metadata-based query using an expression that filters stored documents without vector similarity.
| PARAMETER | DESCRIPTION |
|---|---|
expr
|
A filtering expression (e.g.,
TYPE:
|
fields
|
List of fields to retrieve. If None, retrieves all available fields. |
limit
|
Maximum number of results to return.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: List of documents matching the metadata filter. |
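A metadata filter is an ordinary Milvus boolean expression string. As a small sketch, the hypothetical helper below joins equality conditions with `and`; the field names `source` and `year` are made up for illustration and would need to exist as fields in the collection.

```python
import json
from typing import Dict, Union


def metadata_expr(conditions: Dict[str, Union[str, int, float]]) -> str:
    """Combine field == value conditions into one filter expression."""
    parts = [f"{field} == {json.dumps(value)}" for field, value in conditions.items()]
    return " and ".join(parts)


expr = metadata_expr({"source": "wiki", "year": 2024})
```

The string could then be used as the `expr` argument, for example `store.search_by_metadata(expr=expr, limit=10)`.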
aadd_texts
async
¶
aadd_texts(
texts: Iterable[str],
metadatas: list[dict] | None = None,
timeout: float | None = None,
batch_size: int = 1000,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
Insert text data into Milvus asynchronously.
Inserting data when the collection has not been created yet creates a new collection. The first entity determines the schema of the new collection: the dimension is taken from the first embedding, and the columns are taken from the first metadata dict. Metadata keys must be present for all inserted values. At the moment there is no None equivalent in Milvus.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
The texts to embed; they are assumed to all fit in memory. |
metadatas
|
Metadata dicts attached to each of the texts. Defaults to None. |
timeout
|
Timeout for each batch insert. Defaults to None.
TYPE:
|
batch_size
|
Batch size to use for insertion. Defaults to 1000.
TYPE:
|
ids
|
List of text ids. The length of each item |
| RAISES | DESCRIPTION |
|---|---|
MilvusException
|
Failure to add texts |
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List[str]: The resulting keys for each inserted element. |
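Because the first metadata dict decides the columns and every later entry must provide the same keys, it can help to check a batch before inserting it. A minimal sketch of such a check follows; `missing_metadata_keys` is a hypothetical helper and not part of the library.

```python
from typing import Dict, List


def missing_metadata_keys(metadatas: List[dict]) -> Dict[int, List[str]]:
    """Map each offending index to the keys it lacks relative to the first dict."""
    if not metadatas:
        return {}
    expected = set(metadatas[0])
    problems: Dict[int, List[str]] = {}
    for index, metadata in enumerate(metadatas):
        missing = sorted(expected - set(metadata))
        if missing:
            problems[index] = missing
    return problems


batch = [{"source": "a", "page": 1}, {"source": "b"}, {"source": "c", "page": 3}]
problems = missing_metadata_keys(batch)
```

An empty result means every metadata dict carries at least the keys of the first one.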
aadd_embeddings
async
¶
aadd_embeddings(
texts: list[str],
embeddings: List[List[float]] | List[List[List[float]]],
metadatas: list[dict] | None = None,
timeout: float | None = None,
batch_size: int = 1000,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
Insert text data with embeddings vectors into Milvus asynchronously.
This method inserts a batch of text embeddings into a Milvus collection. If the collection is not initialized, it is initialized automatically based on the embeddings, metadatas, and other parameters. The embeddings are expected to be pre-generated using compatible embedding functions. The metadata associated with each text is optional, but if provided, its length must match the number of texts.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
The texts to insert. |
embeddings
|
A vector embedding for each text (single-vector case) or a list of vectors for each text (multi-vector case). |
metadatas
|
Metadata dicts attached to each of the texts. Defaults to None. |
timeout
|
Timeout for each batch insert. Defaults to None.
TYPE:
|
batch_size
|
Batch size to use for insertion. Defaults to 1000.
TYPE:
|
ids
|
List of text ids. The length of each item |
| RAISES | DESCRIPTION |
|---|---|
MilvusException
|
Failure to add texts and embeddings |
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List[str]: The resulting keys for each inserted element. |
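The `embeddings` argument can be nested one level (one vector per text) or two levels (several vectors per text). The sketch below shows one way to tell the layouts apart and to check the length requirements described above; both helpers are hypothetical and only illustrate the documented shapes.

```python
from typing import List, Optional, Sequence


def embedding_layout(embeddings: Sequence) -> str:
    """Return 'single' for List[List[float]] and 'multi' for List[List[List[float]]]."""
    first = embeddings[0]
    return "single" if isinstance(first[0], (int, float)) else "multi"


def check_batch(
    texts: List[str],
    embeddings: Sequence,
    metadatas: Optional[List[dict]] = None,
) -> None:
    """Raise ValueError if the per-text inputs do not line up."""
    if len(embeddings) != len(texts):
        raise ValueError("expected one embedding entry per text")
    if metadatas is not None and len(metadatas) != len(texts):
        raise ValueError("expected one metadata dict per text")


texts = ["first", "second"]
single = [[0.1, 0.2], [0.3, 0.4]]
multi = [[[0.1, 0.2], [0.5]], [[0.3, 0.4], [0.6]]]
check_batch(texts, single, metadatas=[{"n": 1}, {"n": 2}])
```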
asimilarity_search
async
¶
asimilarity_search(
query: str,
k: int = 4,
param: dict | list[dict] | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform an async similarity search against the query string.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text to search.
TYPE:
|
k
|
How many results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the index type. Defaults to None. |
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
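One reason to use the async variant is to run several queries concurrently. The sketch below shows the pattern with `asyncio.gather`. `FakeStore` is a stand-in that only imitates the documented `asimilarity_search` signature so the example runs without a Milvus server; a real store instance would take its place.

```python
import asyncio
from typing import Dict, List


class FakeStore:
    """Stand-in exposing the same coroutine name as the documented method."""

    async def asimilarity_search(self, query: str, k: int = 4, **kwargs) -> List[str]:
        await asyncio.sleep(0)  # yield control, as a network call would
        return [f"{query}-hit-{i}" for i in range(k)]


async def search_many(store, queries: List[str], k: int = 4) -> Dict[str, List[str]]:
    """Issue all queries concurrently and map each query to its results."""
    results = await asyncio.gather(
        *(store.asimilarity_search(q, k=k) for q in queries)
    )
    return dict(zip(queries, results))


results = asyncio.run(search_many(FakeStore(), ["milvus", "zilliz"], k=2))
```

`asyncio.gather` preserves the order of its inputs, so zipping the results back onto `queries` is safe.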
asimilarity_search_by_vector
async
¶
asimilarity_search_by_vector(
embedding: list[float],
k: int = 4,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform an async similarity search against the query vector.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector to search. |
k
|
How many results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the index type. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
asimilarity_search_with_score
async
¶
asimilarity_search_with_score(
query: str,
k: int = 4,
param: dict | list[dict] | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[tuple[Document, float]]
Perform an async search on a query string and return results with score.
For more information about the search parameters, take a look at the pymilvus documentation found here: https://milvus.io/api-reference/pymilvus/v2.5.x/ORM/Collection/search.md
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text being searched.
TYPE:
|
k
|
The number of results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the specified |
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() or hybrid_search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List[Tuple[Document, float]]: List of result doc and score. |
asimilarity_search_with_score_by_vector
async
¶
asimilarity_search_with_score_by_vector(
embedding: List[float] | Dict[int, float],
k: int = 4,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[tuple[Document, float]]
Perform an async search on an embedding and return results with score.
For more information about the search parameters, take a look at the pymilvus documentation found here: https://milvus.io/api-reference/pymilvus/v2.5.x/ORM/Collection/search.md
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector being searched. |
k
|
The number of results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List[Tuple[Document, float]]: Result doc and score. |
amax_marginal_relevance_search
async
¶
amax_marginal_relevance_search(
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform an async search and return results that are reordered by MMR.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text being searched.
TYPE:
|
k
|
How many results to give. Defaults to 4.
TYPE:
|
fetch_k
|
Total results to select k from. Defaults to 20.
TYPE:
|
lambda_mult
|
Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
amax_marginal_relevance_search_by_vector
async
¶
amax_marginal_relevance_search_by_vector(
embedding: list[float] | dict[int, float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform an async search and return results that are reordered by MMR.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector being searched. |
k
|
How many results to give. Defaults to 4.
TYPE:
|
fetch_k
|
Total results to select k from. Defaults to 20.
TYPE:
|
lambda_mult
|
Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
adelete
async
¶
Async delete by vector ID or boolean expression. Refer to Milvus documentation for notes and examples of expressions.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of ids to delete. |
expr
|
Boolean expression that specifies the entities to delete.
TYPE:
|
kwargs
|
Other parameters in Milvus delete api.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool | None
|
Optional[bool]: True if deletion is successful, False otherwise. |
afrom_texts
async
classmethod
¶
afrom_texts(
texts: list[str],
embedding: EmbeddingType | list[EmbeddingType] | None,
metadatas: list[dict] | None = None,
collection_name: str = "LangChainCollection",
connection_args: dict[str, Any] | None = None,
consistency_level: str = "Session",
index_params: dict | list[dict] | None = None,
search_params: dict | list[dict] | None = None,
drop_old: bool = False,
*,
ids: list[str] | None = None,
auto_id: bool = False,
builtin_function: BaseMilvusBuiltInFunction
| list[BaseMilvusBuiltInFunction]
| None = None,
**kwargs: Any,
) -> Milvus
Create a Milvus collection, index it with HNSW, and insert data asynchronously.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
Text data. |
embedding
|
Embedding function.
TYPE:
|
metadatas
|
Metadata for each text if it exists. Defaults to None. |
collection_name
|
Collection name to use. Defaults to "LangChainCollection".
TYPE:
|
connection_args
|
Connection args to use. Defaults to DEFAULT_MILVUS_CONNECTION. |
consistency_level
|
Which consistency level to use. Defaults to "Session".
TYPE:
|
index_params
|
Which index_params to use. Defaults to None.
TYPE:
|
search_params
|
Which search params to use. Defaults to None.
TYPE:
|
drop_old
|
Whether to drop the collection with that name if it exists. Defaults to False.
TYPE:
|
ids
|
List of text ids. Defaults to None. |
auto_id
|
Whether to enable auto id for the primary key. Defaults to False. If False, you need to provide text ids (strings shorter than 65535 bytes). If True, Milvus will generate unique integers as primary keys.
TYPE:
|
**kwargs
|
Other parameters in Milvus Collection.
TYPE:
|
Returns: Milvus: Milvus Vector Store
aadd_documents
async
¶
aget_pks
async
¶
aupsert
async
¶
aupsert(
ids: list[str] | None = None,
documents: List[Document] | None = None,
batch_size: int = 1000,
timeout: float | None = None,
**kwargs: Any,
) -> None
Update/Insert documents to the vectorstore asynchronously.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
IDs to update. Call aget_pks to get the IDs matching an expression. |
documents
|
Documents to add to the vectorstore. |
batch_size
|
Batch size to use for upsert. Defaults to 1000.
TYPE:
|
timeout
|
Timeout for each batch upsert. Defaults to None.
TYPE:
|
**kwargs
|
Other parameters in Milvus upsert api.
TYPE:
|
asearch_by_metadata
async
¶
asearch_by_metadata(
expr: str, fields: list[str] | None = None, limit: int = 10
) -> list[Document]
Async searches the Milvus vector store based on metadata conditions.
This function performs a metadata-based query using an expression that filters stored documents without vector similarity.
| PARAMETER | DESCRIPTION |
|---|---|
expr
|
A filtering expression (e.g.,
TYPE:
|
fields
|
List of fields to retrieve. If None, retrieves all available fields. |
limit
|
Maximum number of results to return.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: List of documents matching the metadata filter. |
Zilliz
¶
Bases: Milvus
Zilliz vector store.
You need to have pymilvus installed and a running Zilliz database.
See the following documentation for how to run a Zilliz instance: https://docs.zilliz.com/docs/create-cluster
If using the L2/IP metric, it is highly suggested to normalize your data.
| PARAMETER | DESCRIPTION |
|---|---|
embedding_function
|
Function used to embed the text.
TYPE:
|
collection_name
|
Which Zilliz collection to use. Defaults to "LangChainCollection".
TYPE:
|
connection_args
|
The connection args used for this class come in the form of a dict. |
consistency_level
|
The consistency level to use for a collection. Defaults to "Session".
TYPE:
|
index_params
|
Which index params to use. Defaults to HNSW/AUTOINDEX depending on service.
TYPE:
|
search_params
|
Which search params to use. Defaults to default of index.
TYPE:
|
drop_old
|
Whether to drop the current collection. Defaults to False.
TYPE:
|
auto_id
|
Whether to enable auto id for the primary key. Defaults to False. If False, you need to provide text ids (strings shorter than 65535 bytes). If True, Milvus will generate unique integers as primary keys.
TYPE:
|
The connection args used for this class come in the form of a dict. The two major arguments are:
uri (str): The Public Endpoint of the Zilliz instance. Example uri: "https://in03-ba4234asae.api.gcp-us-west1.zillizcloud.com".
token (str): API key for serverless clusters, which can be used as a replacement for user and password.
For more information, please refer to: https://docs.zilliz.com/docs/on-zilliz-cloud-console#cluster-details and https://docs.zilliz.com/reference/python/python/Connections-connect
Example
from langchain_milvus import Zilliz
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings()
# Connect to a Zilliz instance
milvus_store = Zilliz(
    embedding_function=embedding,
    collection_name="LangChainCollection",
    connection_args={
        "uri": "https://in03-ba4234asae.api.gcp-us-west1.zillizcloud.com",
        "token": "temp",  # API key
    },
    drop_old=True,
)
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If the pymilvus python package is not installed. |
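Since `connection_args` carries an API key, it is common to keep `uri` and `token` out of source code. The sketch below reads them from environment variables. The variable names `ZILLIZ_URI` and `ZILLIZ_TOKEN` and the helper `zilliz_connection_args` are assumptions made for illustration, not conventions defined by the library.

```python
import os
from typing import Mapping, Optional


def zilliz_connection_args(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build the connection_args dict from environment-style settings."""
    env = os.environ if env is None else env
    args = {"uri": env.get("ZILLIZ_URI"), "token": env.get("ZILLIZ_TOKEN")}
    missing = [name for name, value in args.items() if not value]
    if missing:
        raise ValueError(f"missing connection settings: {', '.join(missing)}")
    return args


# A plain dict stands in for the process environment here.
args = zilliz_connection_args(
    {"ZILLIZ_URI": "https://example.invalid", "ZILLIZ_TOKEN": "secret"}
)
```

The returned dict has the two keys described above and could be passed as `connection_args` when constructing `Zilliz`.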
| METHOD | DESCRIPTION |
|---|---|
add_texts |
Insert text data into Milvus. |
delete |
Delete by vector ID or boolean expression. |
get_by_ids |
Get documents by their IDs. |
aget_by_ids |
Async get documents by their IDs. |
adelete |
Async delete by vector ID or boolean expression. |
aadd_texts |
Insert text data into Milvus asynchronously. |
add_documents |
Run more documents through the embeddings and add to the vectorstore. |
aadd_documents |
Run more documents through the embeddings and add to the vectorstore |
search |
Return docs most similar to query using a specified search type. |
asearch |
Async return docs most similar to query using a specified search type. |
similarity_search |
Perform a similarity search against the query string. |
similarity_search_with_score |
Perform a search on a query string and return results with score. |
asimilarity_search_with_score |
Perform an async search on a query string and return results with score. |
similarity_search_with_relevance_scores |
Return docs and relevance scores in the range |
asimilarity_search_with_relevance_scores |
Async return docs and relevance scores in the range |
asimilarity_search |
Perform an async similarity search against the query string. |
similarity_search_by_vector |
Perform a similarity search against the query vector. |
asimilarity_search_by_vector |
Perform an async similarity search against the query vector. |
max_marginal_relevance_search |
Perform a search and return results that are reordered by MMR. |
amax_marginal_relevance_search |
Perform an async search and return results that are reordered by MMR. |
max_marginal_relevance_search_by_vector |
Perform a search and return results that are reordered by MMR. |
amax_marginal_relevance_search_by_vector |
Perform an async search and return results that are reordered by MMR. |
from_documents |
Return |
afrom_documents |
Async return |
from_texts |
Create a Milvus collection, index it with HNSW, and insert data. |
afrom_texts |
Create a Milvus collection, index it with HNSW, and insert data |
as_retriever |
Return |
add_embeddings |
Insert text data with embeddings vectors into Milvus. |
similarity_search_with_score_by_vector |
Perform a search on an embedding and return results with score. |
drop |
Delete all the content in the index, by dropping the collection. |
get_pks |
Get primary keys with expression |
upsert |
Update/Insert documents to the vectorstore. |
search_by_metadata |
Searches the Milvus vector store based on metadata conditions. |
aadd_embeddings |
Insert text data with embeddings vectors into Milvus asynchronously. |
asimilarity_search_with_score_by_vector |
Perform an async search on an embedding and return results with score. |
aget_pks |
Async get primary keys with expression |
aupsert |
Update/Insert documents to the vectorstore asynchronously. |
asearch_by_metadata |
Async searches the Milvus vector store based on metadata conditions. |
__init__ |
Initialize the Milvus vector store. |
embeddings
property
¶
embeddings: EmbeddingType | list[EmbeddingType] | None
Get embedding function(s).
col
property
writable
¶
Lazy-loaded Collection object property with caching.
Returns the ORM Collection object if the collection exists. Uses cache to avoid repeated network calls and Collection() construction.
add_texts
¶
add_texts(
texts: Iterable[str],
metadatas: list[dict] | None = None,
timeout: float | None = None,
batch_size: int = 1000,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
Insert text data into Milvus.
Inserting data when the collection has not been created yet creates a new collection. The first entity determines the schema of the new collection: the dimension is taken from the first embedding, and the columns are taken from the first metadata dict. Metadata keys must be present for all inserted values. At the moment there is no None equivalent in Milvus.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
The texts to embed; they are assumed to all fit in memory. |
metadatas
|
Metadata dicts attached to each of the texts. Defaults to None. |
timeout
|
Timeout for each batch insert. Defaults to None.
TYPE:
|
batch_size
|
Batch size to use for insertion. Defaults to 1000.
TYPE:
|
ids
|
List of text ids. The length of each item |
| RAISES | DESCRIPTION |
|---|---|
MilvusException
|
Failure to add texts |
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List[str]: The resulting keys for each inserted element. |
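The `batch_size` parameter implies that the input is inserted in consecutive chunks. A minimal sketch of that chunking over any iterable follows, assuming simple fixed-size batches with a shorter final batch; `batched` is an illustrative helper and the library's internal batching may differ.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int = 1000) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


sizes = [len(batch) for batch in batched(range(2500), batch_size=1000)]
```

Under this scheme, 2500 texts with the default `batch_size` would result in three insert calls, each subject to its own `timeout`.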
delete
¶
Delete by vector ID or boolean expression. Refer to Milvus documentation for notes and examples of expressions.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of ids to delete. |
expr
|
Boolean expression that specifies the entities to delete.
TYPE:
|
kwargs
|
Other parameters in Milvus delete api.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool | None
|
Optional[bool]: True if deletion is successful, False otherwise. |
get_by_ids
¶
Get documents by their IDs.
The returned documents are expected to have the ID field set to the ID of the document in the vector store.
Fewer documents may be returned than requested if some IDs are not found or if there are duplicated IDs.
Users should not assume that the order of the returned documents matches the order of the input IDs. Instead, users should rely on the ID field of the returned documents.
This method should NOT raise exceptions if no documents are found for some IDs.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of IDs to retrieve. |
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
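Because the result order is not guaranteed and some IDs may be missing, callers that need positional alignment can rebuild it from the ID field of each returned document. A small sketch follows; `Doc` is a simplified stand-in for the real `Document` class, and `align_to_ids` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class Doc:
    """Simplified stand-in carrying only the fields used here."""

    id: str
    page_content: str


def align_to_ids(ids: Sequence[str], docs: Sequence[Doc]) -> List[Optional[Doc]]:
    """Return docs in the order of ids, with None where an ID was not found."""
    by_id = {doc.id: doc for doc in docs}
    return [by_id.get(i) for i in ids]


returned = [Doc("c", "third"), Doc("a", "first")]  # out of order, "b" missing
aligned = align_to_ids(["a", "b", "c"], returned)
```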
aget_by_ids
async
¶
Async get documents by their IDs.
The returned documents are expected to have the ID field set to the ID of the document in the vector store.
Fewer documents may be returned than requested if some IDs are not found or if there are duplicated IDs.
Users should not assume that the order of the returned documents matches the order of the input IDs. Instead, users should rely on the ID field of the returned documents.
This method should NOT raise exceptions if no documents are found for some IDs.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of IDs to retrieve. |
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
adelete
async
¶
Async delete by vector ID or boolean expression. Refer to Milvus documentation for notes and examples of expressions.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of ids to delete. |
expr
|
Boolean expression that specifies the entities to delete.
TYPE:
|
kwargs
|
Other parameters in Milvus delete api.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool | None
|
Optional[bool]: True if deletion is successful, False otherwise. |
aadd_texts
async
¶
aadd_texts(
texts: Iterable[str],
metadatas: list[dict] | None = None,
timeout: float | None = None,
batch_size: int = 1000,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
Insert text data into Milvus asynchronously.
Inserting data when the collection has not been created yet creates a new collection. The first entity determines the schema of the new collection: the dimension is taken from the first embedding, and the columns are taken from the first metadata dict. Metadata keys must be present for all inserted values. At the moment there is no None equivalent in Milvus.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
The texts to embed; they are assumed to all fit in memory. |
metadatas
|
Metadata dicts attached to each of the texts. Defaults to None. |
timeout
|
Timeout for each batch insert. Defaults to None.
TYPE:
|
batch_size
|
Batch size to use for insertion. Defaults to 1000.
TYPE:
|
ids
|
List of text ids. The length of each item |
| RAISES | DESCRIPTION |
|---|---|
MilvusException
|
Failure to add texts |
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List[str]: The resulting keys for each inserted element. |
add_documents
¶
aadd_documents
async
¶
search
¶
Return docs most similar to query using a specified search type.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
search_type
|
Type of search to perform. Can be
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If |
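`search` routes a query to one of the more specific search methods based on `search_type`. The sketch below shows that dispatch pattern in isolation. The three type names reflect my understanding of the LangChain core `VectorStore.search` contract and should be verified against the installed version; `resolve_search_method` is a hypothetical helper that only returns method names.

```python
# Assumed mapping from search_type to the method that handles it.
SEARCH_METHODS = {
    "similarity": "similarity_search",
    "similarity_score_threshold": "similarity_search_with_relevance_scores",
    "mmr": "max_marginal_relevance_search",
}


def resolve_search_method(search_type: str) -> str:
    """Return the method name for a search type, or raise ValueError."""
    try:
        return SEARCH_METHODS[search_type]
    except KeyError:
        expected = ", ".join(sorted(SEARCH_METHODS))
        raise ValueError(
            f"unsupported search_type {search_type!r}; expected one of: {expected}"
        ) from None


method_name = resolve_search_method("mmr")
```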
asearch
async
¶
Async return docs most similar to query using a specified search type.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
search_type
|
Type of search to perform. Can be
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If |
similarity_search
¶
similarity_search(
query: str,
k: int = 4,
param: dict | list[dict] | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform a similarity search against the query string.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text to search.
TYPE:
|
k
|
How many results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the index type. Defaults to None. |
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
similarity_search_with_score
¶
similarity_search_with_score(
query: str,
k: int = 4,
param: dict | list[dict] | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[tuple[Document, float]]
Perform a search on a query string and return results with score.
For more information about the search parameters, take a look at the pymilvus documentation found here: https://milvus.io/api-reference/pymilvus/v2.5.x/ORM/Collection/search.md
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text being searched.
TYPE:
|
k
|
The number of results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the specified |
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() or hybrid_search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List[Tuple[Document, float]]: List of result doc and score. |
asimilarity_search_with_score
async
¶
asimilarity_search_with_score(
query: str,
k: int = 4,
param: dict | list[dict] | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[tuple[Document, float]]
Perform an async search on a query string and return results with score.
For more information about the search parameters, take a look at the pymilvus documentation found here: https://milvus.io/api-reference/pymilvus/v2.5.x/ORM/Collection/search.md
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text being searched.
TYPE:
|
k
|
The number of results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the specified |
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() or hybrid_search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List[Tuple[Document, float]]: List of result doc and score. |
similarity_search_with_relevance_scores
¶
similarity_search_with_relevance_scores(
query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]
Return docs and relevance scores in the range [0, 1].
0 is dissimilar, 1 is most similar.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
k
|
Number of
TYPE:
|
**kwargs
|
kwargs to be passed to similarity search. Should include
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List of tuples of |
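Since relevance scores are normalized to [0, 1], a cutoff can be applied to the returned tuples. The sketch below filters a result list by a minimum score. The documents are represented by plain strings, and `filter_by_threshold` with its `score_threshold` argument is an illustrative helper rather than the library's own filtering.

```python
from typing import List, Tuple, TypeVar

D = TypeVar("D")


def filter_by_threshold(
    scored: List[Tuple[D, float]], score_threshold: float
) -> List[Tuple[D, float]]:
    """Keep only (document, score) pairs at or above the threshold."""
    if not 0.0 <= score_threshold <= 1.0:
        raise ValueError("score_threshold must lie in [0, 1]")
    return [(doc, score) for doc, score in scored if score >= score_threshold]


scored = [("doc-a", 0.92), ("doc-b", 0.55), ("doc-c", 0.31)]
kept = filter_by_threshold(scored, score_threshold=0.5)
```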
asimilarity_search_with_relevance_scores
async
¶
asimilarity_search_with_relevance_scores(
query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]
Async return docs and relevance scores in the range [0, 1].
0 is dissimilar, 1 is most similar.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
k
|
Number of
TYPE:
|
**kwargs
|
kwargs to be passed to similarity search. Should include
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List of tuples of |
asimilarity_search
async
¶
asimilarity_search(
query: str,
k: int = 4,
param: dict | list[dict] | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform an async similarity search against the query string.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text to search.
TYPE:
|
k
|
How many results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the index type. Defaults to None. |
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
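Because the async variants are coroutines, several queries can be awaited concurrently. The sketch below is only an illustration of that pattern: `FakeStore` is a hypothetical stand-in that mimics the shape of `asimilarity_search` (it returns plain strings rather than `Document` objects and ranks by naive word overlap), so it runs without a Milvus server.

```python
import asyncio


class FakeStore:
    """Hypothetical stand-in for a vector store exposing asimilarity_search."""

    def __init__(self, corpus: list[str]) -> None:
        self.corpus = corpus

    async def asimilarity_search(self, query: str, k: int = 4) -> list[str]:
        # Simulate I/O latency, then rank texts by naive word overlap.
        await asyncio.sleep(0.01)
        words = set(query.lower().split())
        ranked = sorted(
            self.corpus,
            key=lambda text: len(words & set(text.lower().split())),
            reverse=True,
        )
        return ranked[:k]


async def search_many(store: FakeStore, queries: list[str], k: int = 4) -> list[list[str]]:
    # Issue all searches concurrently; results keep the order of `queries`.
    return await asyncio.gather(*(store.asimilarity_search(q, k=k) for q in queries))


store = FakeStore(
    ["milvus vector database", "hybrid search with bm25", "async python io"]
)
results = asyncio.run(search_many(store, ["vector database", "async io"], k=1))
print(results)
```

With a real store, the same `asyncio.gather` pattern should apply to `asimilarity_search`, assuming the underlying connection tolerates concurrent requests.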
similarity_search_by_vector
¶
similarity_search_by_vector(
embedding: list[float],
k: int = 4,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform a similarity search against the query vector.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector to search. |
k
|
How many results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the index type. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
asimilarity_search_by_vector
async
¶
asimilarity_search_by_vector(
embedding: list[float],
k: int = 4,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform an async similarity search against the query vector.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector to search. |
k
|
How many results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the index type. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
max_marginal_relevance_search
¶
max_marginal_relevance_search(
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform a search and return results that are reordered by MMR.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text being searched.
TYPE:
|
k
|
How many results to give. Defaults to 4.
TYPE:
|
fetch_k
|
Total results to select k from. Defaults to 20.
TYPE:
|
lambda_mult
|
Number between 0 and 1 that determines the degree of diversity among the results, with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
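The interaction between `k`, `fetch_k`, and `lambda_mult` can be illustrated with a small greedy MMR routine. This is a minimal sketch of the general technique, not the library's implementation: `mmr_select` and `cosine` are hypothetical helpers, cosine similarity is an assumed metric, and `candidates` stands in for the `fetch_k` vectors returned by the initial search.

```python
from __future__ import annotations

import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr_select(
    query: list[float],
    candidates: list[list[float]],
    k: int = 4,
    lambda_mult: float = 0.5,
) -> list[int]:
    """Return indices of up to k candidates chosen by greedy MMR."""
    selected: list[int] = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < k:

        def score(i: int) -> float:
            relevance = cosine(query, candidates[i])
            # Highest similarity to anything already picked (0.0 if none yet).
            redundancy = max(
                (cosine(candidates[i], candidates[j]) for j in selected),
                default=0.0,
            )
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy

        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


query = [1.0, 0.0]
candidates = [[1.0, 0.0], [0.99, 0.1], [0.0, 1.0]]  # stand-in for fetch_k hits
relevance_only = mmr_select(query, candidates, k=2, lambda_mult=1.0)
more_diverse = mmr_select(query, candidates, k=2, lambda_mult=0.25)
print(relevance_only, more_diverse)
```

With `lambda_mult=1.0` the two most query-like vectors are chosen; lowering it to 0.25 swaps the near-duplicate for the dissimilar vector, which matches the parameter description above.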
amax_marginal_relevance_search
async
¶
amax_marginal_relevance_search(
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform an async search and return results that are reordered by MMR.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The text being searched.
TYPE:
|
k
|
How many results to give. Defaults to 4.
TYPE:
|
fetch_k
|
Total results to select k from. Defaults to 20.
TYPE:
|
lambda_mult
|
Number between 0 and 1 that determines the degree of diversity among the results, with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
max_marginal_relevance_search_by_vector
¶
max_marginal_relevance_search_by_vector(
embedding: list[float] | dict[int, float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform a search and return results that are reordered by MMR.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector being searched. |
k
|
How many results to give. Defaults to 4.
TYPE:
|
fetch_k
|
Total results to select k from. Defaults to 20.
TYPE:
|
lambda_mult
|
Number between 0 and 1 that determines the degree of diversity among the results, with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
amax_marginal_relevance_search_by_vector
async
¶
amax_marginal_relevance_search_by_vector(
embedding: list[float] | dict[int, float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[Document]
Perform an async search and return results that are reordered by MMR.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector being searched. |
k
|
How many results to give. Defaults to 4.
TYPE:
|
fetch_k
|
Total results to select k from. Defaults to 20.
TYPE:
|
lambda_mult
|
Number between 0 and 1 that determines the degree of diversity among the results, with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: Document results for search. |
from_documents
classmethod
¶
from_documents(documents: list[Document], embedding: Embeddings, **kwargs: Any) -> Self
Return VectorStore initialized from documents and embeddings.
| PARAMETER | DESCRIPTION |
|---|---|
documents
|
List of |
embedding
|
Embedding function to use.
TYPE:
|
**kwargs
|
Additional keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Self
|
|
afrom_documents
async
classmethod
¶
afrom_documents(
documents: list[Document], embedding: Embeddings, **kwargs: Any
) -> Self
Async return VectorStore initialized from documents and embeddings.
| PARAMETER | DESCRIPTION |
|---|---|
documents
|
List of |
embedding
|
Embedding function to use.
TYPE:
|
**kwargs
|
Additional keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Self
|
|
from_texts
classmethod
¶
from_texts(
texts: list[str],
embedding: EmbeddingType | list[EmbeddingType] | None,
metadatas: list[dict] | None = None,
collection_name: str = "LangChainCollection",
connection_args: dict[str, Any] | None = None,
consistency_level: str = "Session",
index_params: dict | list[dict] | None = None,
search_params: dict | list[dict] | None = None,
drop_old: bool = False,
*,
ids: list[str] | None = None,
auto_id: bool = False,
builtin_function: BaseMilvusBuiltInFunction
| list[BaseMilvusBuiltInFunction]
| None = None,
**kwargs: Any,
) -> Milvus
Create a Milvus collection, index it with HNSW, and insert data.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
Text data. |
embedding
|
Embedding function.
TYPE:
|
metadatas
|
Metadata for each text if it exists. Defaults to None. |
collection_name
|
Collection name to use. Defaults to "LangChainCollection".
TYPE:
|
connection_args
|
Connection args to use. Defaults to DEFAULT_MILVUS_CONNECTION. |
consistency_level
|
Which consistency level to use. Defaults to "Session".
TYPE:
|
index_params
|
Which index_params to use. Defaults to None.
TYPE:
|
search_params
|
Which search params to use. Defaults to None.
TYPE:
|
drop_old
|
Whether to drop the collection with that name if it exists. Defaults to False.
TYPE:
|
ids
|
List of text ids. Defaults to None. |
auto_id
|
Whether to enable auto ID for the primary key. Defaults to False. If False, you must provide text IDs (strings shorter than 65535 bytes). If True, Milvus generates unique integers as primary keys.
TYPE:
|
**kwargs
|
Other parameters in Milvus Collection.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Milvus
|
The Milvus vector store. |
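The `ids` and `auto_id` rules described above can be checked before calling `from_texts`. The helper below is a hypothetical pre-flight check, not part of langchain-milvus; it only mirrors the stated constraints (IDs required when `auto_id` is False, each ID shorter than 65535 bytes) plus an assumed one-ID-per-text requirement.

```python
from __future__ import annotations


def check_ids(
    texts: list[str],
    ids: list[str] | None,
    auto_id: bool = False,
    max_bytes: int = 65535,
) -> None:
    """Hypothetical validation helper; raises ValueError on bad input."""
    if auto_id:
        return  # Milvus generates integer primary keys, so no IDs are needed.
    if ids is None:
        raise ValueError("ids must be provided when auto_id is False")
    if len(ids) != len(texts):
        # Assumption: exactly one ID per text.
        raise ValueError("ids and texts must have the same length")
    for text_id in ids:
        if len(text_id.encode("utf-8")) >= max_bytes:
            raise ValueError(f"id starting with {text_id[:20]!r} is too long")


check_ids(["first text", "second text"], ["doc-1", "doc-2"])  # passes silently
check_ids(["first text"], None, auto_id=True)  # passes: IDs are generated
```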
afrom_texts
async
classmethod
¶
afrom_texts(
texts: list[str],
embedding: EmbeddingType | list[EmbeddingType] | None,
metadatas: list[dict] | None = None,
collection_name: str = "LangChainCollection",
connection_args: dict[str, Any] | None = None,
consistency_level: str = "Session",
index_params: dict | list[dict] | None = None,
search_params: dict | list[dict] | None = None,
drop_old: bool = False,
*,
ids: list[str] | None = None,
auto_id: bool = False,
builtin_function: BaseMilvusBuiltInFunction
| list[BaseMilvusBuiltInFunction]
| None = None,
**kwargs: Any,
) -> Milvus
Create a Milvus collection, index it with HNSW, and insert data asynchronously.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
Text data. |
embedding
|
Embedding function.
TYPE:
|
metadatas
|
Metadata for each text if it exists. Defaults to None. |
collection_name
|
Collection name to use. Defaults to "LangChainCollection".
TYPE:
|
connection_args
|
Connection args to use. Defaults to DEFAULT_MILVUS_CONNECTION. |
consistency_level
|
Which consistency level to use. Defaults to "Session".
TYPE:
|
index_params
|
Which index_params to use. Defaults to None.
TYPE:
|
search_params
|
Which search params to use. Defaults to None.
TYPE:
|
drop_old
|
Whether to drop the collection with that name if it exists. Defaults to False.
TYPE:
|
ids
|
List of text ids. Defaults to None. |
auto_id
|
Whether to enable auto ID for the primary key. Defaults to False. If False, you must provide text IDs (strings shorter than 65535 bytes). If True, Milvus generates unique integers as primary keys.
TYPE:
|
**kwargs
|
Other parameters in Milvus Collection.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Milvus
|
The Milvus vector store. |
as_retriever
¶
as_retriever(**kwargs: Any) -> VectorStoreRetriever
Return VectorStoreRetriever initialized from this VectorStore.
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
Keyword arguments to pass to the search function. Can include:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
VectorStoreRetriever
|
Retriever class for |
Examples:
# Retrieve more documents with higher diversity
# Useful if your dataset has many similar documents
docsearch.as_retriever(
search_type="mmr", search_kwargs={"k": 6, "lambda_mult": 0.25}
)
# Fetch more documents for the MMR algorithm to consider
# But only return the top 5
docsearch.as_retriever(search_type="mmr", search_kwargs={"k": 5, "fetch_k": 50})
# Only retrieve documents that have a relevance score
# Above a certain threshold
docsearch.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.8},
)
# Only get the single most similar document from the dataset
docsearch.as_retriever(search_kwargs={"k": 1})
# Use a filter to only retrieve documents from a specific paper
docsearch.as_retriever(
search_kwargs={"filter": {"paper_title": "GPT-4 Technical Report"}}
)
add_embeddings
¶
add_embeddings(
texts: list[str],
embeddings: List[List[float]] | List[List[List[float]]],
metadatas: list[dict] | None = None,
timeout: float | None = None,
batch_size: int = 1000,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
Insert text data with embedding vectors into Milvus.
This method inserts a batch of text embeddings into a Milvus collection. If the collection is not initialized, it is initialized automatically based on the embeddings, metadatas, and other parameters. The embeddings are expected to be pre-generated with a compatible embedding function. Metadata is optional, but when provided, the number of metadata dicts must match the number of texts.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
The texts to insert. |
embeddings
|
One embedding vector for each text (single-vector case) or a list of vectors for each text (multi-vector case). |
metadatas
|
Metadata dicts attached to each of the texts. Defaults to None. |
timeout
|
Timeout for each batch insert. Defaults to None.
TYPE:
|
batch_size
|
Batch size to use for insertion. Defaults to 1000.
TYPE:
|
ids
|
List of text ids. The length of each item |
| RAISES | DESCRIPTION |
|---|---|
MilvusException
|
Failure to add texts and embeddings |
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List[str]: The resulting keys for each inserted element. |
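The role of `batch_size` and the length requirement on `metadatas` can be sketched in plain Python. `iter_batches` is a hypothetical helper written for illustration; it is not the library's internal batching code and only shows how inputs of equal length might be split into chunks before insertion.

```python
from __future__ import annotations

from typing import Iterator


def iter_batches(
    texts: list[str],
    embeddings: list[list[float]],
    metadatas: list[dict] | None = None,
    batch_size: int = 1000,
) -> Iterator[tuple[list[str], list[list[float]], list[dict] | None]]:
    """Yield aligned slices of texts, embeddings, and optional metadatas."""
    if len(texts) != len(embeddings):
        raise ValueError("texts and embeddings must have the same length")
    if metadatas is not None and len(metadatas) != len(texts):
        raise ValueError("metadatas must match the number of texts")
    for start in range(0, len(texts), batch_size):
        end = start + batch_size
        yield (
            texts[start:end],
            embeddings[start:end],
            metadatas[start:end] if metadatas is not None else None,
        )


batches = list(
    iter_batches(["a", "b", "c"], [[0.1], [0.2], [0.3]], batch_size=2)
)
print(len(batches))  # three items in batches of two give two batches
```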
similarity_search_with_score_by_vector
¶
similarity_search_with_score_by_vector(
embedding: List[float] | Dict[int, float],
k: int = 4,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[tuple[Document, float]]
Perform a search on an embedding and return results with score.
For more information about the search parameters, see the pymilvus documentation: https://milvus.io/api-reference/pymilvus/v2.5.x/ORM/Collection/search.md
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector being searched. |
k
|
The number of results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List[Tuple[Document, float]]: Result doc and score. |
get_pks
¶
upsert
¶
search_by_metadata
¶
Searches the Milvus vector store based on metadata conditions.
This function performs a metadata-based query using an expression that filters stored documents without vector similarity.
| PARAMETER | DESCRIPTION |
|---|---|
expr
|
A filtering expression (e.g.,
TYPE:
|
fields
|
List of fields to retrieve. If None, retrieves all available fields. |
limit
|
Maximum number of results to return.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: List of documents matching the metadata filter. |
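A filtering expression for a metadata query is a string in Milvus's boolean expression syntax. The sketch below shows one way to assemble simple equality filters; `build_expr` is a hypothetical helper, the field names are made up for illustration, and only string and numeric values joined with `and` are handled.

```python
from __future__ import annotations

import json


def build_expr(conditions: dict[str, object]) -> str:
    """Join field == value pairs into a Milvus-style boolean expression."""
    parts = []
    for field, value in conditions.items():
        if isinstance(value, str):
            # json.dumps yields a double-quoted, escaped string literal.
            literal = json.dumps(value, ensure_ascii=False)
        elif isinstance(value, (int, float)) and not isinstance(value, bool):
            literal = repr(value)
        else:
            raise TypeError(f"unsupported value type for field {field!r}")
        parts.append(f"{field} == {literal}")
    return " and ".join(parts)


# Hypothetical metadata fields, for illustration only.
expr = build_expr({"source": "paper.pdf", "year": 2024})
print(expr)
```

The resulting string could then be passed as `expr`, assuming the collection actually stores fields with those names.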
aadd_embeddings
async
¶
aadd_embeddings(
texts: list[str],
embeddings: List[List[float]] | List[List[List[float]]],
metadatas: list[dict] | None = None,
timeout: float | None = None,
batch_size: int = 1000,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
Insert text data with embedding vectors into Milvus asynchronously.
This method inserts a batch of text embeddings into a Milvus collection. If the collection is not initialized, it is initialized automatically based on the embeddings, metadatas, and other parameters. The embeddings are expected to be pre-generated with a compatible embedding function. Metadata is optional, but when provided, the number of metadata dicts must match the number of texts.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
The texts to insert. |
embeddings
|
One embedding vector for each text (single-vector case) or a list of vectors for each text (multi-vector case). |
metadatas
|
Metadata dicts attached to each of the texts. Defaults to None. |
timeout
|
Timeout for each batch insert. Defaults to None.
TYPE:
|
batch_size
|
Batch size to use for insertion. Defaults to 1000.
TYPE:
|
ids
|
List of text ids. The length of each item |
| RAISES | DESCRIPTION |
|---|---|
MilvusException
|
Failure to add texts and embeddings |
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List[str]: The resulting keys for each inserted element. |
asimilarity_search_with_score_by_vector
async
¶
asimilarity_search_with_score_by_vector(
embedding: List[float] | Dict[int, float],
k: int = 4,
param: dict | None = None,
expr: str | None = None,
timeout: float | None = None,
**kwargs: Any,
) -> list[tuple[Document, float]]
Perform an async search on an embedding and return results with score.
For more information about the search parameters, see the pymilvus documentation: https://milvus.io/api-reference/pymilvus/v2.5.x/ORM/Collection/search.md
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
The embedding vector being searched. |
k
|
The number of results to return. Defaults to 4.
TYPE:
|
param
|
The search params for the specified index. Defaults to None.
TYPE:
|
expr
|
Filtering expression. Defaults to None.
TYPE:
|
timeout
|
How long to wait before timeout error. Defaults to None.
TYPE:
|
kwargs
|
Collection.search() keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List[Tuple[Document, float]]: Result doc and score. |
aget_pks
async
¶
aupsert
async
¶
aupsert(
ids: list[str] | None = None,
documents: List[Document] | None = None,
batch_size: int = 1000,
timeout: float | None = None,
**kwargs: Any,
) -> None
Update or insert documents in the vector store asynchronously.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
IDs to update. Call aget_pks to get the IDs that match an expression. |
documents
|
Documents to add to the vectorstore. |
batch_size
|
Batch size to use for upsert. Defaults to 1000.
TYPE:
|
timeout
|
Timeout for each batch upsert. Defaults to None.
TYPE:
|
**kwargs
|
Other parameters in Milvus upsert api.
TYPE:
|
asearch_by_metadata
async
¶
asearch_by_metadata(
expr: str, fields: list[str] | None = None, limit: int = 10
) -> list[Document]
Asynchronously search the Milvus vector store based on metadata conditions.
This function performs a metadata-based query using an expression that filters stored documents without vector similarity.
| PARAMETER | DESCRIPTION |
|---|---|
expr
|
A filtering expression (e.g.,
TYPE:
|
fields
|
List of fields to retrieve. If None, retrieves all available fields. |
limit
|
Maximum number of results to return.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List[Document]: List of documents matching the metadata filter. |