How to implement an integration package
This guide walks through the process of implementing a LangChain integration package.
Integration packages are just Python packages that can be installed with pip install <your-package>, which contain classes that are compatible with LangChain's core interfaces.
We will cover:
- How to implement components, such as chat models and vector stores, that adhere to the LangChain interface;
- (Optional) How to bootstrap a new integration package.
Implementing LangChain components
LangChain components are subclasses of base classes in langchain-core. Examples include chat models, vector stores, tools, embedding models and retrievers.
Your integration package will typically implement a subclass of at least one of these components. The sections below give details on each:
- Chat models
- Vector stores
- Embeddings
- Tools
- Retrievers
Refer to the Custom Chat Model guide for details on a starter chat model implementation.
The langchain-cli package contains template integrations for major LangChain components that are tested against the standard unit and integration tests in the LangChain GitHub repository. You can access the starter chat model implementation here.
For convenience, we also include the code below.
Example chat model code
"""ParrotLink chat models."""
from typing import Any, Dict, Iterator, List, Optional
from langchain_core.callbacks import (
CallbackManagerForLLMRun,
)
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import (
AIMessage,
AIMessageChunk,
BaseMessage,
)
from langchain_core.messages.ai import UsageMetadata
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from pydantic import Field
class ChatParrotLink(BaseChatModel):
# TODO: Replace all TODOs in docstring. See example docstring:
# https://github.com/langchain-ai/langchain/blob/7ff05357bac6eaedf5058a2af88f23a1817d40fe/libs/partners/openai/langchain_openai/chat_models/base.py#L1120
"""ParrotLink chat model integration.
The default implementation echoes the first `parrot_buffer_length` characters of the input.
# TODO: Replace with relevant packages, env vars.
Setup:
Install ``langchain-parrot-link`` and set environment variable ``PARROT_LINK_API_KEY``.
.. code-block:: bash
pip install -U langchain-parrot-link
export PARROT_LINK_API_KEY="your-api-key"
# TODO: Populate with relevant params.
Key init args — completion params:
model: str
Name of ParrotLink model to use.
temperature: float
Sampling temperature.
max_tokens: Optional[int]
Max number of tokens to generate.
# TODO: Populate with relevant params.
Key init args — client params:
timeout: Optional[float]
Timeout for requests.
max_retries: int
Max number of retries.
api_key: Optional[str]
ParrotLink API key. If not passed in will be read from env var PARROT_LINK_API_KEY.
See full list of supported init args and their descriptions in the params section.
# TODO: Replace with relevant init params.
Instantiate:
.. code-block:: python
from langchain_parrot_link import ChatParrotLink
llm = ChatParrotLink(
model="...",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
# api_key="...",
# other params...
)
Invoke:
.. code-block:: python
messages = [
("system", "You are a helpful translator. Translate the user sentence to French."),
("human", "I love programming."),
]
llm.invoke(messages)
.. code-block:: python
# TODO: Example output.
# TODO: Delete if token-level streaming isn't supported.
Stream:
.. code-block:: python
for chunk in llm.stream(messages):
print(chunk)
.. code-block:: python
# TODO: Example output.
.. code-block:: python
stream = llm.stream(messages)
full = next(stream)
for chunk in stream:
full += chunk
full
.. code-block:: python
# TODO: Example output.
# TODO: Delete if native async isn't supported.
Async:
.. code-block:: python
await llm.ainvoke(messages)
# stream:
# async for chunk in llm.astream(messages):
# batch:
# await llm.abatch([messages])
.. code-block:: python
# TODO: Example output.
# TODO: Delete if .bind_tools() isn't supported.
Tool calling:
.. code-block:: python
from pydantic import BaseModel, Field
class GetWeather(BaseModel):
'''Get the current weather in a given location'''
location: str = Field(..., description="The city and state, e.g. San Francisco, CA")
class GetPopulation(BaseModel):
'''Get the current population in a given location'''
location: str = Field(..., description="The city and state, e.g. San Francisco, CA")
llm_with_tools = llm.bind_tools([GetWeather, GetPopulation])
ai_msg = llm_with_tools.invoke("Which city is hotter today and which is bigger: LA or NY?")
ai_msg.tool_calls
.. code-block:: python
# TODO: Example output.
See ``ChatParrotLink.bind_tools()`` method for more.
# TODO: Delete if .with_structured_output() isn't supported.
Structured output:
.. code-block:: python
from typing import Optional
from pydantic import BaseModel, Field
class Joke(BaseModel):
'''Joke to tell user.'''
setup: str = Field(description="The setup of the joke")
punchline: str = Field(description="The punchline to the joke")
rating: Optional[int] = Field(description="How funny the joke is, from 1 to 10")
structured_llm = llm.with_structured_output(Joke)
structured_llm.invoke("Tell me a joke about cats")
.. code-block:: python
# TODO: Example output.
See ``ChatParrotLink.with_structured_output()`` for more.
# TODO: Delete if JSON mode response format isn't supported.
JSON mode:
.. code-block:: python
# TODO: Replace with appropriate bind arg.
json_llm = llm.bind(response_format={"type": "json_object"})
ai_msg = json_llm.invoke("Return a JSON object with key 'random_ints' and a value of 10 random ints in [0-99]")
ai_msg.content
.. code-block:: python
# TODO: Example output.
# TODO: Delete if image inputs aren't supported.
Image input:
.. code-block:: python
import base64
import httpx
from langchain_core.messages import HumanMessage
image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
image_data = base64.b64encode(httpx.get(image_url).content).decode("utf-8")
# TODO: Replace with appropriate message content format.
message = HumanMessage(
content=[
{"type": "text", "text": "describe the weather in this image"},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_data}"},
},
],
)
ai_msg = llm.invoke([message])
ai_msg.content
.. code-block:: python
# TODO: Example output.
# TODO: Delete if audio inputs aren't supported.
Audio input:
.. code-block:: python
# TODO: Example input
.. code-block:: python
# TODO: Example output
# TODO: Delete if video inputs aren't supported.
Video input:
.. code-block:: python
# TODO: Example input
.. code-block:: python
# TODO: Example output
# TODO: Delete if token usage metadata isn't supported.
Token usage:
.. code-block:: python
ai_msg = llm.invoke(messages)
ai_msg.usage_metadata
.. code-block:: python
{'input_tokens': 28, 'output_tokens': 5, 'total_tokens': 33}
# TODO: Delete if logprobs aren't supported.
Logprobs:
.. code-block:: python
# TODO: Replace with appropriate bind arg.
logprobs_llm = llm.bind(logprobs=True)
ai_msg = logprobs_llm.invoke(messages)
ai_msg.response_metadata["logprobs"]
.. code-block:: python
# TODO: Example output.
Response metadata:
.. code-block:: python
ai_msg = llm.invoke(messages)
ai_msg.response_metadata
.. code-block:: python
# TODO: Example output.
""" # noqa: E501
model_name: str = Field(alias="model")
"""The name of the model"""
parrot_buffer_length: int
"""The number of characters from the last message of the prompt to be echoed."""
temperature: Optional[float] = None
max_tokens: Optional[int] = None
timeout: Optional[float] = None
stop: Optional[List[str]] = None
max_retries: int = 2
@property
def _llm_type(self) -> str:
"""Return type of chat model."""
return "chat-parrot-link"
@property
def _identifying_params(self) -> Dict[str, Any]:
"""Return a dictionary of identifying parameters.
This information is used by the LangChain callback system for tracing,
which makes it possible to monitor LLMs.
"""
return {
# The model name allows users to specify custom token counting
# rules in LLM monitoring applications (e.g., in LangSmith users
# can provide per token pricing for their model and monitor
# costs for the given LLM.)
"model_name": self.model_name,
}
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""Override the _generate method to implement the chat model logic.
This can be a call to an API, a call to a local model, or any other
implementation that generates a response to the input prompt.
Args:
messages: the prompt composed of a list of messages.
stop: a list of strings on which the model should stop generating.
If generation stops due to a stop token, the stop token itself
SHOULD BE INCLUDED as part of the output. This is not enforced
across models right now, but it's a good practice to follow since
it makes it much easier to parse the output of the model
downstream and understand why generation stopped.
run_manager: A run manager with callbacks for the LLM.
"""
# Replace this with actual logic to generate a response from a list
# of messages.
last_message = messages[-1]
tokens = last_message.content[: self.parrot_buffer_length]
ct_input_tokens = sum(len(message.content) for message in messages)
ct_output_tokens = len(tokens)
message = AIMessage(
content=tokens,
additional_kwargs={}, # Used to add additional payload to the message
response_metadata={ # Use for response metadata
"time_in_seconds": 3,
},
usage_metadata={
"input_tokens": ct_input_tokens,
"output_tokens": ct_output_tokens,
"total_tokens": ct_input_tokens + ct_output_tokens,
},
)
##
generation = ChatGeneration(message=message)
return ChatResult(generations=[generation])
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
"""Stream the output of the model.
This method should be implemented if the model can generate output
in a streaming fashion. If the model does not support streaming,
do not implement it. In that case streaming requests will be automatically
handled by the _generate method.
Args:
messages: the prompt composed of a list of messages.
stop: a list of strings on which the model should stop generating.
If generation stops due to a stop token, the stop token itself
SHOULD BE INCLUDED as part of the output. This is not enforced
across models right now, but it's a good practice to follow since
it makes it much easier to parse the output of the model
downstream and understand why generation stopped.
run_manager: A run manager with callbacks for the LLM.
"""
last_message = messages[-1]
tokens = str(last_message.content[: self.parrot_buffer_length])
ct_input_tokens = sum(len(message.content) for message in messages)
for token in tokens:
usage_metadata = UsageMetadata(
{
"input_tokens": ct_input_tokens,
"output_tokens": 1,
"total_tokens": ct_input_tokens + 1,
}
)
ct_input_tokens = 0
chunk = ChatGenerationChunk(
message=AIMessageChunk(content=token, usage_metadata=usage_metadata)
)
if run_manager:
# This is optional in newer versions of LangChain
# The on_llm_new_token will be called automatically
run_manager.on_llm_new_token(token, chunk=chunk)
yield chunk
# Let's add some other information (e.g., response metadata)
chunk = ChatGenerationChunk(
message=AIMessageChunk(content="", response_metadata={"time_in_seconds": 3})
)
if run_manager:
# This is optional in newer versions of LangChain
# The on_llm_new_token will be called automatically
run_manager.on_llm_new_token("", chunk=chunk)
yield chunk
# TODO: Implement if ChatParrotLink supports async streaming. Otherwise delete.
# async def _astream(
# self,
# messages: List[BaseMessage],
# stop: Optional[List[str]] = None,
# run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
# **kwargs: Any,
# ) -> AsyncIterator[ChatGenerationChunk]:
# TODO: Implement if ChatParrotLink supports async generation. Otherwise delete.
# async def _agenerate(
# self,
# messages: List[BaseMessage],
# stop: Optional[List[str]] = None,
# run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
# **kwargs: Any,
# ) -> ChatResult:
Your vector store implementation will depend on your chosen database technology. langchain-core includes a minimal in-memory vector store that we can use as a guide. You can access the code here.
All vector stores must inherit from the VectorStore base class. This interface consists of methods for writing, deleting and searching for documents in the vector store.
VectorStore supports a variety of synchronous and asynchronous search types (e.g., nearest-neighbor or maximum marginal relevance), as well as interfaces for adding documents to the store. See the API Reference for all supported methods. The required methods are tabulated below:
| Method/Property | Description |
|---|---|
| add_documents | Add documents to the vector store. |
| delete | Delete selected documents from the vector store (by IDs). |
| get_by_ids | Get selected documents from the vector store (by IDs). |
| similarity_search | Get documents most similar to a query. |
| embeddings (property) | Embeddings object for the vector store. |
| from_texts | Instantiate the vector store by adding texts. |
Note that InMemoryVectorStore implements some optional search types, as well as convenience methods for loading and dumping the object to a file, but these are not required for all implementations.
The in-memory vector store is tested against the standard tests in the LangChain GitHub repository.
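To make the search methods concrete, here is a minimal sketch of what a similarity_search implementation does conceptually: score every stored vector against the query vector and keep the k best matches. It uses only the Python standard library. The names cosine_similarity, top_k and toy_store are hypothetical and are not part of langchain-core; a real vector store would normally delegate this work to the underlying database.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Return the cosine similarity of two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(
    query: List[float], store: Dict[str, List[float]], k: int = 4
) -> List[Tuple[str, float]]:
    """Return up to k (doc_id, score) pairs, highest similarity first."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in store.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Hypothetical toy data: document IDs mapped to 2-dimensional vectors.
toy_store = {
    "1": [1.0, 0.0],
    "2": [0.0, 1.0],
    "3": [1.0, 1.0],
}
results = top_k([1.0, 0.1], toy_store, k=2)
for doc_id, score in results:
    print(f"{doc_id}: {score:.3f}")
```

In this toy data, document "1" points in almost the same direction as the query, so it ranks first, followed by document "3".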
Example vector store code
"""ParrotLink vector stores."""
from __future__ import annotations
import uuid
from typing import (
Any,
Callable,
Iterator,
List,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
)
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_core.vectorstores.utils import _cosine_similarity as cosine_similarity
VST = TypeVar("VST", bound=VectorStore)
class ParrotLinkVectorStore(VectorStore):
# TODO: Replace all TODOs in docstring.
"""ParrotLink vector store integration.
# TODO: Replace with relevant packages, env vars.
Setup:
Install ``langchain-parrot-link`` and set environment variable ``PARROT_LINK_API_KEY``.
.. code-block:: bash
pip install -U langchain-parrot-link
export PARROT_LINK_API_KEY="your-api-key"
# TODO: Populate with relevant params.
Key init args — indexing params:
collection_name: str
Name of the collection.
embedding_function: Embeddings
Embedding function to use.
# TODO: Populate with relevant params.
Key init args — client params:
client: Optional[Client]
Client to use.
connection_args: Optional[dict]
Connection arguments.
# TODO: Replace with relevant init params.
Instantiate:
.. code-block:: python
from langchain_parrot_link.vectorstores import ParrotLinkVectorStore
from langchain_openai import OpenAIEmbeddings
vector_store = ParrotLinkVectorStore(
collection_name="foo",
embedding_function=OpenAIEmbeddings(),
connection_args={"uri": "./foo.db"},
# other params...
)
# TODO: Populate with relevant variables.
Add Documents:
.. code-block:: python
from langchain_core.documents import Document
document_1 = Document(page_content="foo", metadata={"baz": "bar"})
document_2 = Document(page_content="thud", metadata={"bar": "baz"})
document_3 = Document(page_content="i will be deleted :(")
documents = [document_1, document_2, document_3]
ids = ["1", "2", "3"]
vector_store.add_documents(documents=documents, ids=ids)
# TODO: Populate with relevant variables.
Delete Documents:
.. code-block:: python
vector_store.delete(ids=["3"])
# TODO: Fill out with relevant variables and example output.
Search:
.. code-block:: python
results = vector_store.similarity_search(query="thud", k=1)
for doc in results:
print(f"* {doc.page_content} [{doc.metadata}]")
.. code-block:: python
# TODO: Example output
# TODO: Fill out with relevant variables and example output.
Search with filter:
.. code-block:: python
results = vector_store.similarity_search(query="thud", k=1, filter=lambda doc: doc.metadata.get("bar") == "baz")
for doc in results:
print(f"* {doc.page_content} [{doc.metadata}]")
.. code-block:: python
# TODO: Example output
# TODO: Fill out with relevant variables and example output.
Search with score:
.. code-block:: python
results = vector_store.similarity_search_with_score(query="qux", k=1)
for doc, score in results:
print(f"* [SIM={score:.3f}] {doc.page_content} [{doc.metadata}]")
.. code-block:: python
# TODO: Example output
# TODO: Fill out with relevant variables and example output.
Async:
.. code-block:: python
# add documents
# await vector_store.aadd_documents(documents=documents, ids=ids)
# delete documents
# await vector_store.adelete(ids=["3"])
# search
# results = await vector_store.asimilarity_search(query="thud", k=1)
# search with score
results = await vector_store.asimilarity_search_with_score(query="qux", k=1)
for doc, score in results:
print(f"* [SIM={score:.3f}] {doc.page_content} [{doc.metadata}]")
.. code-block:: python
# TODO: Example output
# TODO: Fill out with relevant variables and example output.
Use as Retriever:
.. code-block:: python
retriever = vector_store.as_retriever(
search_type="mmr",
search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
)
retriever.invoke("thud")
.. code-block:: python
# TODO: Example output
""" # noqa: E501
def __init__(self, embedding: Embeddings) -> None:
"""Initialize with the given embedding function.
Args:
embedding: embedding function to use.
"""
self._database: dict[str, dict[str, Any]] = {}
self.embedding = embedding
@classmethod
def from_texts(
cls: Type[ParrotLinkVectorStore],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> ParrotLinkVectorStore:
store = cls(
embedding=embedding,
)
store.add_texts(texts=texts, metadatas=metadatas, **kwargs)
return store
# optional: add custom async implementations
# @classmethod
# async def afrom_texts(
# cls: Type[VST],
# texts: List[str],
# embedding: Embeddings,
# metadatas: Optional[List[dict]] = None,
# **kwargs: Any,
# ) -> VST:
# return await asyncio.get_running_loop().run_in_executor(
# None, partial(cls.from_texts, **kwargs), texts, embedding, metadatas
# )
@property
def embeddings(self) -> Embeddings:
return self.embedding
def add_documents(
self,
documents: List[Document],
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Add documents to the store."""
texts = [doc.page_content for doc in documents]
vectors = self.embedding.embed_documents(texts)
if ids and len(ids) != len(texts):
msg = (
f"ids must be the same length as texts. "
f"Got {len(ids)} ids and {len(texts)} texts."
)
raise ValueError(msg)
id_iterator: Iterator[Optional[str]] = (
iter(ids) if ids else iter(doc.id for doc in documents)
)
ids_ = []
for doc, vector in zip(documents, vectors):
doc_id = next(id_iterator)
doc_id_ = doc_id if doc_id else str(uuid.uuid4())
ids_.append(doc_id_)
self._database[doc_id_] = {
"id": doc_id_,
"vector": vector,
"text": doc.page_content,
"metadata": doc.metadata,
}
return ids_
# optional: add custom async implementations
# async def aadd_documents(
# self,
# documents: List[Document],
# ids: Optional[List[str]] = None,
# **kwargs: Any,
# ) -> List[str]:
# raise NotImplementedError
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None:
if ids:
for _id in ids:
self._database.pop(_id, None)
# optional: add custom async implementations
# async def adelete(
# self, ids: Optional[List[str]] = None, **kwargs: Any
# ) -> None:
# raise NotImplementedError
def get_by_ids(self, ids: Sequence[str], /) -> list[Document]:
"""Get documents by their ids.
Args:
ids: The ids of the documents to get.
Returns:
A list of Document objects.
"""
documents = []
for doc_id in ids:
doc = self._database.get(doc_id)
if doc:
documents.append(
Document(
id=doc["id"],
page_content=doc["text"],
metadata=doc["metadata"],
)
)
return documents
# optional: add custom async implementations
# async def aget_by_ids(self, ids: Sequence[str], /) -> list[Document]:
# raise NotImplementedError
# NOTE: the below helper method implements similarity search for in-memory
# storage. It is optional and not a part of the vector store interface.
def _similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Callable[[Document], bool]] = None,
**kwargs: Any,
) -> List[tuple[Document, float, List[float]]]:
# get all docs with fixed order in list
docs = list(self._database.values())
if filter is not None:
docs = [
doc
for doc in docs
if filter(Document(page_content=doc["text"], metadata=doc["metadata"]))
]
if not docs:
return []
similarity = cosine_similarity([embedding], [doc["vector"] for doc in docs])[0]
# get the indices ordered by similarity score
top_k_idx = similarity.argsort()[::-1][:k]
return [
(
# Document
Document(
id=doc_dict["id"],
page_content=doc_dict["text"],
metadata=doc_dict["metadata"],
),
# Score
float(similarity[idx].item()),
# Embedding vector
doc_dict["vector"],
)
for idx in top_k_idx
# Assign using walrus operator to avoid multiple lookups
if (doc_dict := docs[idx])
]
def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
embedding = self.embedding.embed_query(query)
return [
doc
for doc, _, _ in self._similarity_search_with_score_by_vector(
embedding=embedding, k=k, **kwargs
)
]
# optional: add custom async implementations
# async def asimilarity_search(
# self, query: str, k: int = 4, **kwargs: Any
# ) -> List[Document]:
# # This is a temporary workaround to make the similarity search
# # asynchronous. The proper solution is to make the similarity search
# # asynchronous in the vector store implementations.
# func = partial(self.similarity_search, query, k=k, **kwargs)
# return await asyncio.get_event_loop().run_in_executor(None, func)
def similarity_search_with_score(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
embedding = self.embedding.embed_query(query)
return [
(doc, similarity)
for doc, similarity, _ in self._similarity_search_with_score_by_vector(
embedding=embedding, k=k, **kwargs
)
]
# optional: add custom async implementations
# async def asimilarity_search_with_score(
# self, *args: Any, **kwargs: Any
# ) -> List[Tuple[Document, float]]:
# # This is a temporary workaround to make the similarity search
# # asynchronous. The proper solution is to make the similarity search
# # asynchronous in the vector store implementations.
# func = partial(self.similarity_search_with_score, *args, **kwargs)
# return await asyncio.get_event_loop().run_in_executor(None, func)
### ADDITIONAL OPTIONAL SEARCH METHODS BELOW ###
# def similarity_search_by_vector(
# self, embedding: List[float], k: int = 4, **kwargs: Any
# ) -> List[Document]:
# raise NotImplementedError
# optional: add custom async implementations
# async def asimilarity_search_by_vector(
# self, embedding: List[float], k: int = 4, **kwargs: Any
# ) -> List[Document]:
# # This is a temporary workaround to make the similarity search
# # asynchronous. The proper solution is to make the similarity search
# # asynchronous in the vector store implementations.
# func = partial(self.similarity_search_by_vector, embedding, k=k, **kwargs)
# return await asyncio.get_event_loop().run_in_executor(None, func)
# def max_marginal_relevance_search(
# self,
# query: str,
# k: int = 4,
# fetch_k: int = 20,
# lambda_mult: float = 0.5,
# **kwargs: Any,
# ) -> List[Document]:
# raise NotImplementedError
# optional: add custom async implementations
# async def amax_marginal_relevance_search(
# self,
# query: str,
# k: int = 4,
# fetch_k: int = 20,
# lambda_mult: float = 0.5,
# **kwargs: Any,
# ) -> List[Document]:
# # This is a temporary workaround to make the similarity search
# # asynchronous. The proper solution is to make the similarity search
# # asynchronous in the vector store implementations.
# func = partial(
# self.max_marginal_relevance_search,
# query,
# k=k,
# fetch_k=fetch_k,
# lambda_mult=lambda_mult,
# **kwargs,
# )
# return await asyncio.get_event_loop().run_in_executor(None, func)
# def max_marginal_relevance_search_by_vector(
# self,
# embedding: List[float],
# k: int = 4,
# fetch_k: int = 20,
# lambda_mult: float = 0.5,
# **kwargs: Any,
# ) -> List[Document]:
# raise NotImplementedError
# optional: add custom async implementations
# async def amax_marginal_relevance_search_by_vector(
# self,
# embedding: List[float],
# k: int = 4,
# fetch_k: int = 20,
# lambda_mult: float = 0.5,
# **kwargs: Any,
# ) -> List[Document]:
# raise NotImplementedError
Embeddings are used to convert str objects from Document.page_content fields into a vector representation (represented as a list of floats).
Your embeddings class must inherit from the Embeddings base class. This interface has 5 methods that can be implemented.
| Method/Property | Description |
|---|---|
| __init__ | Initialize the embeddings object. (optional) |
| embed_query | Embed a single query text. (required) |
| embed_documents | Embed a list of documents. (required) |
| aembed_query | Asynchronously embed a single query text. (optional) |
| aembed_documents | Asynchronously embed a list of documents. (optional) |
Constructor
The __init__ constructor is optional but commonly used to set up any attributes that a user can pass in when initializing the embeddings object. Common attributes include:
- model: the id of the model to use for embeddings
Embedding queries vs documents
The embed_query and embed_documents methods are required. Both methods operate on string inputs. For legacy reasons, accessing Document.page_content attributes is handled by the VectorStore that uses the embedding model.
embed_query takes in a single string and returns a single embedding as a list of floats. If your model has different modes for embedding queries vs the underlying documents, you can implement this method to handle that.
embed_documents takes in a list of strings and returns a list of embeddings as a list of lists of floats.
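As an illustration of the input and output shapes described above, the following is a minimal sketch of a class with separate query and document modes. ToyEmbeddings, its dimensions parameter and the _embed helper are hypothetical names chosen for this example. The class does not call any real model and derives deterministic numbers from a hash. To keep the sketch dependency-free it does not subclass anything, whereas a real integration would inherit from the Embeddings base class in langchain-core.

```python
import hashlib
from typing import List


class ToyEmbeddings:
    """Hypothetical stand-in for an embeddings integration (no real model)."""

    def __init__(self, model: str, dimensions: int = 4) -> None:
        self.model = model
        # A SHA-256 digest has 32 bytes, so this sketch supports up to 32 dimensions.
        self.dimensions = dimensions

    def _embed(self, text: str, mode: str) -> List[float]:
        # Derive a deterministic pseudo-vector from the model name, mode and text.
        digest = hashlib.sha256(f"{self.model}:{mode}:{text}".encode("utf-8")).digest()
        return [byte / 255.0 for byte in digest[: self.dimensions]]

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Take a list of strings and return one vector per string."""
        return [self._embed(text, mode="document") for text in texts]

    def embed_query(self, text: str) -> List[float]:
        """Take a single string and return a single vector."""
        return self._embed(text, mode="query")


embed = ToyEmbeddings(model="toy-model")
query_vector = embed.embed_query("hello")
doc_vectors = embed.embed_documents(["hello", "world"])
print(len(query_vector), len(doc_vectors))
```

Passing a different mode into the shared helper is one way to let embed_query and embed_documents diverge when the underlying model distinguishes between the two.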
Implementation
The langchain-cli package contains template integrations for major LangChain components that are tested against the standard unit and integration tests in the LangChain GitHub repository. You can access the starter embedding model implementation here.
For convenience, we also include the code below.
Example embeddings code
from typing import List
from langchain_core.embeddings import Embeddings
class ParrotLinkEmbeddings(Embeddings):
"""ParrotLink embedding model integration.
# TODO: Replace with relevant packages, env vars.
Setup:
Install ``langchain-parrot-link`` and set environment variable
``PARROT_LINK_API_KEY``.
.. code-block:: bash
pip install -U langchain-parrot-link
export PARROT_LINK_API_KEY="your-api-key"
# TODO: Populate with relevant params.
Key init args — completion params:
model: str
Name of ParrotLink model to use.
See full list of supported init args and their descriptions in the params section.
# TODO: Replace with relevant init params.
Instantiate:
.. code-block:: python
from langchain_parrot_link import ParrotLinkEmbeddings
embed = ParrotLinkEmbeddings(
model="...",
# api_key="...",
# other params...
)
Embed single text:
.. code-block:: python
input_text = "The meaning of life is 42"
embed.embed_query(input_text)
.. code-block:: python
# TODO: Example output.
Embed multiple texts:
.. code-block:: python
input_texts = ["Document 1...", "Document 2..."]
embed.embed_documents(input_texts)
.. code-block:: python
# TODO: Example output.
# TODO: Delete if native async isn't supported.
Async:
.. code-block:: python
await embed.aembed_query(input_text)
# multiple:
# await embed.aembed_documents(input_texts)
.. code-block:: python
# TODO: Example output.
"""
def __init__(self, model: str):
self.model = model
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed search docs."""
return [[0.5, 0.6, 0.7] for _ in texts]
def embed_query(self, text: str) -> List[float]:
"""Embed query text."""
return self.embed_documents([text])[0]
# optional: add custom async implementations here
# you can also delete these, and the base class will
# use the default implementation, which calls the sync
# version in an async executor:
# async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
# """Asynchronous Embed search docs."""
# ...
# async def aembed_query(self, text: str) -> List[float]:
# """Asynchronous Embed query text."""
# ...
Example tool code
"""ParrotLink tools."""
from typing import Optional, Type
from langchain_core.callbacks import (
CallbackManagerForToolRun,
)
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
class ParrotLinkToolInput(BaseModel):
"""Input schema for ParrotLink tool.
This docstring is **not** part of what is sent to the model when performing tool
calling. The Field default values and descriptions **are** part of what is sent to
the model when performing tool calling.
"""
# TODO: Add input args and descriptions.
a: int = Field(..., description="first number to add")
b: int = Field(..., description="second number to add")
class ParrotLinkTool(BaseTool): # type: ignore[override]
"""ParrotLink tool.
Setup:
# TODO: Replace with relevant packages, env vars.
Install ``langchain-parrot-link`` and set environment variable ``PARROT_LINK_API_KEY``.
.. code-block:: bash
pip install -U langchain-parrot-link
export PARROT_LINK_API_KEY="your-api-key"
Instantiation:
.. code-block:: python
tool = ParrotLinkTool(
# TODO: init params
)
Invocation with args:
.. code-block:: python
# TODO: invoke args
tool.invoke({...})
.. code-block:: python
# TODO: output of invocation
Invocation with ToolCall:
.. code-block:: python
# TODO: invoke args
tool.invoke({"args": {...}, "id": "1", "name": tool.name, "type": "tool_call"})
.. code-block:: python
# TODO: output of invocation
""" # noqa: E501
# TODO: Set tool name and description
name: str = "TODO: Tool name"
"""The name that is passed to the model when performing tool calling."""
description: str = "TODO: Tool description."
"""The description that is passed to the model when performing tool calling."""
args_schema: Type[BaseModel] = ParrotLinkToolInput
"""The schema that is passed to the model when performing tool calling."""
# TODO: Add any other init params for the tool.
# param1: Optional[str]
# """param1 determines foobar"""
# TODO: Replace (a, b) with real tool arguments.
def _run(
self, a: int, b: int, *, run_manager: Optional[CallbackManagerForToolRun] = None
) -> str:
return str(a + b + 80)
# TODO: Implement if tool has native async functionality, otherwise delete.
# async def _arun(
# self,
# a: int,
# b: int,
# *,
# run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
# ) -> str:
# ...
Example retriever code
"""ParrotLink retrievers."""
from typing import Any, List
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
class ParrotLinkRetriever(BaseRetriever):
# TODO: Replace all TODOs in docstring. See example docstring:
# https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/retrievers/tavily_search_api.py#L17
"""ParrotLink retriever.
# TODO: Replace with relevant packages, env vars, etc.
Setup:
Install ``langchain-parrot-link`` and set environment variable
``PARROT_LINK_API_KEY``.
.. code-block:: bash
pip install -U langchain-parrot-link
export PARROT_LINK_API_KEY="your-api-key"
# TODO: Populate with relevant params.
Key init args:
arg 1: type
description
arg 2: type
description
# TODO: Replace with relevant init params.
Instantiate:
.. code-block:: python
from langchain_parrot_link import ParrotLinkRetriever
retriever = ParrotLinkRetriever(
# ...
)
Usage:
.. code-block:: python
query = "..."
retriever.invoke(query)
.. code-block:: none
# TODO: Example output.
Use within a chain:
.. code-block:: python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_template(
\"\"\"Answer the question based only on the context provided.
Context: {context}
Question: {question}\"\"\"
)
llm = ChatOpenAI(model="gpt-3.5-turbo-0125")
def format_docs(docs):
return "\\n\\n".join(doc.page_content for doc in docs)
chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
chain.invoke("...")
.. code-block:: none
# TODO: Example output.
"""
k: int = 3
# TODO: This method must be implemented to retrieve documents.
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun, **kwargs: Any
) -> List[Document]:
k = kwargs.get("k", self.k)
return [
Document(page_content=f"Result {i} for query: {query}") for i in range(k)
]
# optional: add custom async implementations here
# async def _aget_relevant_documents(
# self,
# query: str,
# *,
# run_manager: AsyncCallbackManagerForRetrieverRun,
# **kwargs: Any,
# ) -> List[Document]: ...
(Optional) bootstrapping a new integration package
In this guide, we will use Poetry for dependency management and packaging, but you're welcome to use any other tools you prefer.
Prerequisites
Bootstrapping a new Python package with Poetry
First, install Poetry:
pip install poetry
Next, come up with a name for your package. For this guide, we'll use langchain-parrot-link.
You can confirm that the name is available on PyPI by searching for it on the PyPI website.
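As an alternative to searching by hand, the availability check can be scripted. The sketch below is not part of the LangChain tooling. It assumes the public PyPI JSON endpoint (https://pypi.org/pypi/<name>/json) returns HTTP 404 when no project is registered under that name. The helper names pypi_json_url and is_registered_on_pypi are hypothetical, and the opener parameter exists only so the function can be exercised without network access.

```python
import urllib.error
import urllib.request


def pypi_json_url(package_name: str) -> str:
    """Build the PyPI JSON API URL for a project name."""
    return f"https://pypi.org/pypi/{package_name}/json"


def is_registered_on_pypi(package_name: str, opener=urllib.request.urlopen) -> bool:
    """Return True if PyPI already hosts a project with this name.

    `opener` defaults to urllib's urlopen; pass a stand-in to test offline.
    """
    try:
        with opener(pypi_json_url(package_name), timeout=10) as response:
            return response.status == 200
    except urllib.error.HTTPError as err:
        if err.code == 404:  # assumed to mean: no project with this name
            return False
        raise  # any other HTTP error is inconclusive, so surface it
```

Calling is_registered_on_pypi("langchain-parrot-link") performs a live request. A False result only means that no project currently uses the name; PyPI may still reject names that are too similar to existing ones, so treat the result as a first check rather than a guarantee.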
Next, create your new Python package with Poetry, and navigate into the new directory with cd:
poetry new langchain-parrot-link
cd langchain-parrot-link
Add main dependencies using Poetry, which will add them to your pyproject.toml file:
poetry add langchain-core
We will also add some test dependencies in a separate Poetry dependency group. If
you are not using Poetry, we recommend adding these in a way that won't package them
with your published package, or just installing them separately when you run tests.
langchain-tests will provide the standard tests we will use later.
We recommend pinning it to the latest version.
Note: Replace <latest_version> with the latest version of langchain-tests below.
poetry add --group test pytest pytest-socket pytest-asyncio langchain-tests==<latest_version>
Finally, have Poetry set up a virtual environment with your dependencies, as well as your integration package:
poetry install --with test
You're now ready to start writing your integration package!
Writing your integration
Let's say you're building a simple integration package that provides a ChatParrotLink
chat model integration for LangChain. Here's a simple example of what your project
structure might look like:
langchain-parrot-link/
├── langchain_parrot_link/
│ ├── __init__.py
│ └── chat_models.py
├── tests/
│ ├── __init__.py
│ └── test_chat_models.py
├── pyproject.toml
└── README.md
All of these files should already exist from the bootstrapping step above, except for
chat_models.py and test_chat_models.py. We will implement test_chat_models.py
later, following the standard tests guide.
For chat_models.py, simply paste the contents of the chat model implementation
above.
Push your package to a public GitHub repository
This is only required if you want to publish your integration in the LangChain documentation.
- Create a new repository on GitHub.
- Push your code to the repository.
- Confirm that your repository is viewable by the public (e.g. in a private browsing window, where you're not logged into GitHub).
Next Steps
Now that you've implemented your package, you can move on to testing your integration: implement the standard tests for your integration and run them successfully.