Optimize retrieval pipelines
RAG pipelines may perform poorly due to un-optimized retrieval which then handicaps an LLM's ability to generate relevant responses. If the LLM does not have the relevant contexts, it can't answer the question correctly. To help developers improve their retrieval pipelines, Not Diamond provides a framework that allows you to define your RAG workflow variables. Using test queries generated with the Not Diamond SDK, or test queries you've prepared yourself, Not Diamond will automatically run experiments to optimize the workflow parameters for the metrics you care about.
You can follow along the example below or try it in Colab.
Installation
Python: Requires Python 3.9+. Itβs recommended that you create and activate a virtualenv prior to installing the package. We'll install notdiamond
and its optional rag
dependencies, along with a few other libraries for this example.
pip install -q notdiamond[rag] datasets llama-index-llms-anthropic --upgrade
Setting up
Create a .env
file with the API keys of the models you want to use:
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
ANTHROPIC_API_KEY = "YOUR_ANTHROPIC_API_KEY"
We will also download some sample data to use for this example
mkdir data
curl https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt -o data/paul_graham_essay.txt
Optimizing retrieval pipelines
This example workflow defines 5 parameters:
chunk_size
is the chunk size of the documents in the index. Documents will be broken down intochunk_size
number of words before being stored in the index.chunk_overlap
is the number of words each chunk overlaps.temperature
is the temperature of the generator LLM used to form a response to a query.top_k
is the number of hits to return from the index based on the query.llm
is the LLM to use for generation.
Not Diamond will automatically search through a space of possible parameter values defined by the user and return the best parameters found.
from typing import Annotated, Any, List
import pandas as pd
from llama_index.core import VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.llms.openai import OpenAI
from llama_index.llms.anthropic import Anthropic
from notdiamond.toolkit.rag.evaluation import auto_optimize, evaluate
from notdiamond.toolkit.rag.llms import get_embedding, get_llm
from notdiamond.toolkit.rag.evaluation_dataset import RAGSample, RAGEvaluationDataset
from notdiamond.toolkit.rag.testset import TestDataGenerator
from notdiamond.toolkit.rag.metrics import (
FactualCorrectness,
Faithfulness,
ContextRecall,
SemanticSimilarity,
)
from notdiamond.toolkit.rag.workflow import (
BaseNDRagWorkflow,
CategoricalValueOptions,
FloatValueRange,
IntValueRange,
)
class ExampleNDRagWorkflow(BaseNDRagWorkflow):
"""
RAG workflow definition. Inherits from BaseNDRagWorkflow.
The attribute parameter_specs must be implemented.
parameter_specs defines the set of parameters in your rag_workflow that
you want to optimize.
Each parameter must be defined as a search space of integers, floats, or categorical.
Not Diamond provides the following dataclasses to help you define the search space:
* IntValueRange(low: int, high: int, step: int): An integer value range with a step size.
* FloatValueRange(low: float, high: float, step: float): A float value range with a step size.
* CategoricalValueOptions(values: List[str]): A set of categorical options.
The BaseNDRagWorkflow class constructor takes in documents and test_data.
Both arguments will be passed to rag_workflow.
"""
parameter_specs = {
"chunk_size": (Annotated[int, IntValueRange(1000, 2500, 500)], 1000),
"chunk_overlap": (Annotated[int, IntValueRange(50, 200, 25)], 100),
"top_k": (Annotated[int, IntValueRange(1, 5, 1)], 1),
"temperature": (Annotated[float, FloatValueRange(0.0, 1.0, 0.2)], 0.9),
"llm": (Annotated[str, CategoricalValueOptions(["openai/gpt-4o", "anthropic/claude-3-5-sonnet-20241022"])], "openai/gpt-4o")
}
def __init__(self, documents: Any, objective_maximize: bool = True):
test_data = self.get_test_queries(documents)
"""
The base class constructor takes in Langchain or Llama index documents
and test data, such as test queries, that will be used in the `def rag_workflow`
method to create the RAGEvaluationDataset.
"""
super().__init__(documents, test_data, objective_maximize)
def get_test_queries(self, documents: Any) -> List[Any]:
"""
Uses the test data generation workflow in the SDK to create test queries.
If you already have test queries, simply pass it into the base class constructor.
"""
test_data = self.generate_test_data(documents)
test_queries = []
for _, row in test_data.iterrows():
user_input = row["user_input"]
reference = row["reference"]
test_queries.append((user_input, reference))
return test_queries
@property
def job_name(self):
"""
Used to create the job name of the optimization run
"""
return "my-rag-workflow"
def generate_test_data(self, documents: Any) -> pd.DataFrame:
"""
Uses the test data generation workflow in the SDK to create test queries.
"""
print("Generating test data")
generator_llm = get_llm("openai/gpt-4o")
generator_embedding = get_embedding("openai/text-embedding-3-large")
generator = TestDataGenerator(
llm=generator_llm,
embedding_model=generator_embedding,
)
test_data = generator.generate_from_docs(
documents, testset_size=20
)
return test_data
def get_generation_prompt(self, retrieved_contexts: List[str], user_input: str) -> str:
"""
A helper method to return the generation prompt for the RAG workflow.
"""
context = "\n".join(retrieved_contexts)
prompt = f"""
Use the following context to answer the question.
Context: {context}
Question: {user_input}
"""
return prompt
def get_llama_index_llm(self, llm: str):
if llm == "openai/gpt-4o":
return OpenAI(model="gpt-4o", temperature=self.temperature)
elif llm == "anthropic/claude-3-5-sonnet-20241022":
return Anthropic(model="claude-3-5-sonnet-20241022", temperature=self.temperature)
def rag_workflow(self, documents: Any, test_queries: List[Any]) -> RAGEvaluationDataset:
"""
The main method that creates the RAGEvaluationDataset.
This method must be implemented by the subclass and .
The purpose is to define the RAG workflow such that the retrieval parameters
can be optimized.
"""
self.index = VectorStoreIndex.from_documents(
documents,
transformations=[
SentenceSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
)
],
)
self.retriever = VectorIndexRetriever(
index=self.index,
similarity_top_k=self.top_k,
)
response_synthesizer = get_response_synthesizer(llm=self.get_llama_index_llm(self.llm))
self.query_engine = RetrieverQueryEngine(
retriever=self.retriever, response_synthesizer=response_synthesizer
)
samples = []
print(f"Generating responses using {self.llm}")
for (user_input, reference) in test_queries:
response = self.get_response(user_input)
retrieved_contexts = self.get_retrieved_context(user_input)
generation_prompt = self.get_generation_prompt(retrieved_contexts, user_input)
sample = RAGSample(
user_input=user_input,
retrieved_contexts=retrieved_contexts,
response=response,
generation_prompt=generation_prompt,
reference=reference,
generator_llm=self.llm,
)
samples.append(sample)
return RAGEvaluationDataset(samples)
def get_retrieved_context(self, query: str) -> List[str]:
"""
A helper method to return the retrieved context for the RAG workflow.
"""
return [node.get_text() for node in self.retriever.retrieve(query)]
def get_response(self, query: str) -> str:
"""
A helper method to return the generated response for the RAG workflow.
"""
return self.query_engine.query(query).response
def objective(self):
"""
The objective function.
This method must be implemented by the subclass.
The purpose is to define the metric to optimize for.
The attribute self.evaluation_dataset is set by the base class constructor and is the return of the self.rag_workflow method.
Here we are using Not Diamond's evaluation framework to evaluate the RAG workflow and optimize it for the faithfulness metric.
"""
evaluator_llm = get_llm("openai/gpt-4o")
metrics = [
Faithfulness(llm=evaluator_llm),
]
results = evaluate(dataset=self.evaluation_dataset, metrics=metrics)
return results[self.llm]["faithfulness"].mean()
To run the example:
from llama_index.core import SimpleDirectoryReader
# Use llama-index's SimpleDirectoryReader to load sample document
documents = SimpleDirectoryReader("data").load_data()
# Create the workflow class
example_workflow = ExampleNDRagWorkflow(
documents, objective_maximize=True
)
# Optimize pipeline parameters defined in parameter_specs
results = auto_optimize(example_workflow, n_trials=10)
Breaking down this example
At the core of this framework is the BaseNDRagWorkflow
class. The class constructor takes in 3 arguments:
document
: this is the documents you want to use for your RAG workflow.test_queries
: this is the test user queries you want to use during the workflow evaluation.objective_maximize
: this is to indicate whether the objective is to maximize the score of the evaluation. Defaults totrue
.
def __init__(self, documents: Any, objective_maximize: bool = True):
test_data = self.get_test_queries(documents)
"""
The base class constructor takes in Langchain or Llama index documents
and test data, such as test queries, that will be used in the `def rag_workflow`
method to create the RAGEvaluationDataset.
"""
super().__init__(documents, test_data, objective_maximize)
The BaseNDRagWorkflow
requires the following implementations:
-
The
parameter_specs
attribute, which defines the parameters that you want to optimize. Each parameter to be optimized should be defined as aAnnotated[type, TypeRange]
. This tells Not Diamond the parameter type and the range to search through. To define the search space of each parameter, Not Diamond offers 3TypeRange
:IntValueRange(low: int, high: int, step: int)
: An integer value range with a step size.FloatValueRange(low: float, high: float, step: float)
: A float value range with a step size.CategoricalValueOptions(values: List[str])
: A set of categorical options.
class ExampleNDRagWorkflow(BaseNDRagWorkflow): """ RAG workflow definition. Inherits from BaseNDRagWorkflow. The attribute parameter_specs must be implemented. parameter_specs defines the set of parameters in your rag_workflow that you want to optimize. Each parameter must be defined as a search space of integers, floats, or categorical. Not Diamond provides the following dataclasses to help you define the search space: * IntValueRange(low: int, high: int, step: int): An integer value range with a step size. * FloatValueRange(low: float, high: float, step: float): A float value range with a step size. * CategoricalValueOptions(values: List[str]): A set of categorical options. The BaseNDRagWorkflow class constructor takes in documents and test_data. Both arguments will be passed to rag_workflow. """ parameter_specs = { "chunk_size": (Annotated[int, IntValueRange(1000, 2500, 500)], 1000), "chunk_overlap": (Annotated[int, IntValueRange(50, 200, 25)], 100), "top_k": (Annotated[int, IntValueRange(1, 5, 1)], 1), "temperature": (Annotated[float, FloatValueRange(0.0, 1.0, 0.2)], 0.9), "llm": (Annotated[str, CategoricalValueOptions(["openai/gpt-4o", "anthropic/claude-3-5-sonnet-20241022"])], "openai/gpt-4o") }
-
rag_workflow(self, documents: Any, test_queries: List[Any])
method, which defines your RAG workflow. The purpose of this method is to generate aRAGEvaluationDataset
containing the test queries, retrieved contexts, reference answers, and LLM generations, using a sample of theparameter_specs
. The returned object is accessible via the attributeself.evaluation_dataset
.def rag_workflow(self, documents: Any, test_queries: List[Any]) -> RAGEvaluationDataset: """ The main method that creates the RAGEvaluationDataset. This method must be implemented by the subclass and . The purpose is to define the RAG workflow such that the retrieval parameters can be optimized. """ self.index = VectorStoreIndex.from_documents( documents, transformations=[ SentenceSplitter( chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap, ) ], ) self.retriever = VectorIndexRetriever( index=self.index, similarity_top_k=self.top_k, ) response_synthesizer = get_response_synthesizer(llm=self.get_llama_index_llm(self.llm)) self.query_engine = RetrieverQueryEngine( retriever=self.retriever, response_synthesizer=response_synthesizer ) samples = [] print(f"Generating responses using {self.llm}") for (user_input, reference) in test_queries: response = self.get_response(user_input) retrieved_contexts = self.get_retrieved_context(user_input) generation_prompt = self.get_generation_prompt(retrieved_contexts, user_input) sample = RAGSample( user_input=user_input, retrieved_contexts=retrieved_contexts, response=response, generation_prompt=generation_prompt, reference=reference, generator_llm=self.llm, ) samples.append(sample) return RAGEvaluationDataset(samples)
-
objective(self)
method. This method defines the objective function, returning a scalar value that Not Diamond will try to optimize, depending on the objective direction defined by theobjective_maximize
parameter. The attributeself.evaluation_dataset
is the return object ofdef rag_workflow
, which is run beforeobjective
is called in each trial. Here, we use Not Diamond's built in evaluation tools and metrics to evaluate thefaithfulness
of the LLM response. In the next section, we will go into more details about running RAG evaluations using Not Diamond.def objective(self): """ The objective function. This method must be implemented by the subclass. The purpose is to define the metric to optimize for. The attribute self.evaluation_dataset is set by the base class constructor and is the return of the self.rag_workflow method. Here we are using Not Diamond's evaluation framework to evaluate the RAG workflow and optimize it for the faithfulness metric. """ evaluator_llm = get_llm("openai/gpt-4o") metrics = [ Faithfulness(llm=evaluator_llm), ] results = evaluate(dataset=self.evaluation_dataset, metrics=metrics) return results[self.llm]["faithfulness"].mean()
After the workflow is defined, use the auto_optimize
method to optimize the workflow parameters. Underneath the hood, we leverage derivative-free optimization techniques, such as Bayesian optimization, to efficiently search the problem space and return a set of retrieval parameters optimized for your data.
# Create the workflow class
example_workflow = ExampleNDRagWorkflow(
documents, objective_maximize=True
)
# Optimize pipeline parameters defined in parameter_specs
results = auto_optimize(example_workflow, n_trials=10)
# Best retrieval parameters found
print(results['best_params'])
Wrapping up
Now that we've identified the ideal retrieval parameters for our document store, we can auto-evaluate our candidate LLMs to see how they compare to each other and to train our custom model router to maximize performance.
Updated about 1 month ago