Optimize retrieval pipelines

RAG pipelines can perform poorly when retrieval is not optimized, which limits an LLM's ability to generate relevant responses: if the LLM does not receive the relevant context, it can't answer the question correctly. To help developers improve their retrieval pipelines, Not Diamond provides a framework for defining the variables of your RAG workflow. Using test queries generated with the Not Diamond SDK, or test queries you've prepared yourself, Not Diamond automatically runs experiments to optimize the workflow parameters for the metrics you care about.


Try it in Colab

You can follow along with the example below or try it in Colab.

Installation

Python: Requires Python 3.9+. It’s recommended that you create and activate a virtualenv prior to installing the package. We'll install notdiamond and its optional rag dependencies, along with a few other libraries for this example.

pip install -q "notdiamond[rag]" datasets llama-index-llms-anthropic --upgrade

Setting up

Create a .env file with the API keys of the models you want to use:

OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
ANTHROPIC_API_KEY = "YOUR_ANTHROPIC_API_KEY"
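If you want to confirm the keys are visible to Python before running anything, a quick check like the one below can help. This is a minimal sketch, assuming the .env file uses the simple KEY = "value" lines shown above; load_env_file and missing_keys are hypothetical helpers, not part of the Not Diamond SDK, and a library such as python-dotenv may already handle loading for you.

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env") -> dict[str, str]:
    """Parse simple KEY = "value" lines into os.environ (hypothetical helper)."""
    loaded = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and anything that is not KEY=VALUE.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        # Drop surrounding whitespace and matching quote characters.
        value = value.strip().strip('"').strip("'")
        # Do not overwrite variables that are already set in the shell.
        os.environ.setdefault(key, value)
        loaded[key] = value
    return loaded


def missing_keys(required=("OPENAI_API_KEY", "ANTHROPIC_API_KEY")) -> list[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not os.environ.get(name)]
```

Calling load_env_file(".env") followed by missing_keys() should return an empty list once both keys are in place.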

We will also download some sample data to use for this example:

mkdir data
curl https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt -o data/paul_graham_essay.txt
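If you would rather stay in Python (for example, inside a notebook), the same download can be done with the standard library. A minimal sketch; download_sample is a hypothetical helper name, and the URL is the one from the curl command above.

```python
import urllib.request
from pathlib import Path

# Same file as the curl command above.
ESSAY_URL = (
    "https://raw.githubusercontent.com/run-llama/llama_index/main/"
    "docs/docs/examples/data/paul_graham/paul_graham_essay.txt"
)


def download_sample(url: str = ESSAY_URL, dest: str = "data/paul_graham_essay.txt") -> Path:
    """Download `url` to `dest`, creating the parent directory if needed."""
    target = Path(dest)
    # Equivalent of `mkdir data`, but it does not fail if the folder exists.
    target.parent.mkdir(parents=True, exist_ok=True)
    urllib.request.urlretrieve(url, target)
    return target
```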

Optimizing retrieval pipelines

This example workflow defines 5 parameters:

  • chunk_size is the size of each document chunk stored in the index. Documents are split into chunks of at most chunk_size tokens before being indexed.
  • chunk_overlap is the number of tokens shared between consecutive chunks.
  • temperature is the temperature of the generator LLM used to form a response to a query.
  • top_k is the number of chunks retrieved from the index for each query.
  • llm is the LLM to use for generation.
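To build intuition for how chunk_size, chunk_overlap, and top_k interact, here is a toy sketch. It is not how LlamaIndex works internally: SentenceSplitter counts tokens and prefers sentence boundaries, and the vector index ranks chunks by embedding similarity. As simplifying assumptions, whitespace-separated words stand in for tokens and word-count cosine similarity stands in for embeddings; chunk_text and top_k_chunks are hypothetical helpers.

```python
import math
from collections import Counter


def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into windows of `chunk_size` words sharing `chunk_overlap` words."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    words = text.split()
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):  # this window reached the end
            break
    return chunks


def _cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two word-count vectors."""
    dot = sum(a[w] * b[w] for w in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_k_chunks(query: str, chunks: list[str], top_k: int) -> list[str]:
    """Return the `top_k` chunks most similar to the query."""
    q = Counter(query.lower().split())
    scored = [(_cosine(q, Counter(c.lower().split())), i) for i, c in enumerate(chunks)]
    # Highest score first; ties keep document order.
    scored.sort(key=lambda pair: (-pair[0], pair[1]))
    return [chunks[i] for _, i in scored[:top_k]]
```

With chunk_size=3 and chunk_overlap=1, the words "a b c d e f g" become "a b c", "c d e", and "e f g": larger overlaps repeat more text across chunks, and a larger top_k passes more of those chunks to the generator LLM.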

Not Diamond will automatically search through a space of possible parameter values defined by the user and return the best parameters found.

from typing import Annotated, Any, List

import pandas as pd

from llama_index.core import VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.llms.openai import OpenAI
from llama_index.llms.anthropic import Anthropic

from notdiamond.toolkit.rag.evaluation import auto_optimize, evaluate
from notdiamond.toolkit.rag.llms import get_embedding, get_llm
from notdiamond.toolkit.rag.evaluation_dataset import RAGSample, RAGEvaluationDataset
from notdiamond.toolkit.rag.testset import TestDataGenerator
from notdiamond.toolkit.rag.metrics import (
    FactualCorrectness,
    Faithfulness,
    ContextRecall,
    SemanticSimilarity,
)
from notdiamond.toolkit.rag.workflow import (
    BaseNDRagWorkflow,
    CategoricalValueOptions,
    FloatValueRange,
    IntValueRange,
)


class ExampleNDRagWorkflow(BaseNDRagWorkflow):
    """
    RAG workflow definition. Inherits from BaseNDRagWorkflow.
    The attribute parameter_specs must be implemented.
    parameter_specs defines the set of parameters in your rag_workflow that
    you want to optimize.
    Each parameter must be defined as a search space of integers, floats, or categorical.
    Not Diamond provides the following dataclasses to help you define the search space:
    * IntValueRange(low: int, high: int, step: int): An integer value range with a step size.
    * FloatValueRange(low: float, high: float, step: float): A float value range with a step size.
    * CategoricalValueOptions(values: List[str]): A set of categorical options.

    The BaseNDRagWorkflow class constructor takes in documents and test_data.
    Both arguments will be passed to rag_workflow.
    """
    parameter_specs = {
        "chunk_size": (Annotated[int, IntValueRange(1000, 2500, 500)], 1000),
        "chunk_overlap": (Annotated[int, IntValueRange(50, 200, 25)], 100),
        "top_k": (Annotated[int, IntValueRange(1, 5, 1)], 1),
        "temperature": (Annotated[float, FloatValueRange(0.0, 1.0, 0.2)], 0.9),
        "llm": (Annotated[str, CategoricalValueOptions(["openai/gpt-4o", "anthropic/claude-3-5-sonnet-20241022"])], "openai/gpt-4o")
    }

    def __init__(self, documents: Any, objective_maximize: bool = True):
        """
        The base class constructor takes in LangChain or LlamaIndex documents
        and test data, such as test queries, that will be used in the
        rag_workflow method to create the RAGEvaluationDataset.
        """
        test_data = self.get_test_queries(documents)
        super().__init__(documents, test_data, objective_maximize)

    def get_test_queries(self, documents: Any) -> List[Any]:
        """
        Converts the generated test data into (user_input, reference) pairs.
        If you already have test queries, pass them directly into the base class constructor.
        """
        test_data = self.generate_test_data(documents)
        test_queries = []
        for _, row in test_data.iterrows():
            user_input = row["user_input"]
            reference = row["reference"]
            test_queries.append((user_input, reference))
        return test_queries

    @property
    def job_name(self):
        """
        Used to create the job name of the optimization run
        """
        return "my-rag-workflow"

    def generate_test_data(self, documents: Any) -> pd.DataFrame:
        """
        Uses the test data generation workflow in the SDK to create test queries.
        """
        print("Generating test data")
        generator_llm = get_llm("openai/gpt-4o")
        generator_embedding = get_embedding("openai/text-embedding-3-large")

        generator = TestDataGenerator(
            llm=generator_llm,
            embedding_model=generator_embedding,
        )
        test_data = generator.generate_from_docs(
            documents, testset_size=20
        )
        return test_data

    def get_generation_prompt(self, retrieved_contexts: List[str], user_input: str) -> str:
        """
        A helper method to return the generation prompt for the RAG workflow.
        """
        context = "\n".join(retrieved_contexts)
        prompt = f"""
        Use the following context to answer the question.

        Context: {context}

        Question: {user_input}
        """
        return prompt

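    def get_llama_index_llm(self, llm: str):
        """
        Maps an llm option from parameter_specs to a LlamaIndex LLM,
        using the temperature sampled for the current trial.
        """
        if llm == "openai/gpt-4o":
            return OpenAI(model="gpt-4o", temperature=self.temperature)
        elif llm == "anthropic/claude-3-5-sonnet-20241022":
            return Anthropic(model="claude-3-5-sonnet-20241022", temperature=self.temperature)
        raise ValueError(f"Unsupported llm option: {llm}")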

    def rag_workflow(self, documents: Any, test_queries: List[Any]) -> RAGEvaluationDataset:
        """
        The main method that creates the RAGEvaluationDataset.
        This method must be implemented by the subclass.
        The purpose is to define the RAG workflow such that the retrieval parameters
        can be optimized.
        """
        self.index = VectorStoreIndex.from_documents(
            documents,
            transformations=[
                SentenceSplitter(
                    chunk_size=self.chunk_size,
                    chunk_overlap=self.chunk_overlap,
                )
            ],
        )

        self.retriever = VectorIndexRetriever(
            index=self.index,
            similarity_top_k=self.top_k,
        )

        response_synthesizer = get_response_synthesizer(llm=self.get_llama_index_llm(self.llm))
        self.query_engine = RetrieverQueryEngine(
            retriever=self.retriever, response_synthesizer=response_synthesizer
        )

        samples = []
        print(f"Generating responses using {self.llm}")
        for (user_input, reference) in test_queries:
            response = self.get_response(user_input)
            retrieved_contexts = self.get_retrieved_context(user_input)
            generation_prompt = self.get_generation_prompt(retrieved_contexts, user_input)
            sample = RAGSample(
                user_input=user_input,
                retrieved_contexts=retrieved_contexts,
                response=response,
                generation_prompt=generation_prompt,
                reference=reference,
                generator_llm=self.llm,
            )
            samples.append(sample)
        return RAGEvaluationDataset(samples)

    def get_retrieved_context(self, query: str) -> List[str]:
        """
        A helper method to return the retrieved context for the RAG workflow.
        """
        return [node.get_text() for node in self.retriever.retrieve(query)]

    def get_response(self, query: str) -> str:
        """
        A helper method to return the generated response for the RAG workflow.
        """
        return self.query_engine.query(query).response

    def objective(self):
        """
        The objective function.
        This method must be implemented by the subclass.
        The purpose is to define the metric to optimize for.
        The attribute self.evaluation_dataset is set by the base class to the return value of self.rag_workflow, which runs before objective in each trial.
        Here we are using Not Diamond's evaluation framework to evaluate the RAG workflow and optimize it for the faithfulness metric.
        """
        evaluator_llm = get_llm("openai/gpt-4o")

        metrics = [
            Faithfulness(llm=evaluator_llm),
        ]
        results = evaluate(dataset=self.evaluation_dataset, metrics=metrics)
        return results[self.llm]["faithfulness"].mean()

To run the example:

from llama_index.core import SimpleDirectoryReader

# Use LlamaIndex's SimpleDirectoryReader to load the sample document
documents = SimpleDirectoryReader("data").load_data()

# Create the workflow instance (this also generates the test data)
example_workflow = ExampleNDRagWorkflow(
    documents, objective_maximize=True
)

# Optimize pipeline parameters defined in parameter_specs
results = auto_optimize(example_workflow, n_trials=10)

Breaking down this example

At the core of this framework is the BaseNDRagWorkflow class. Its constructor takes 3 arguments:

  • documents: the documents to use in your RAG workflow.
  • test_queries: the test user queries to use when evaluating the workflow.
  • objective_maximize: whether the objective score should be maximized. Defaults to True.
def __init__(self, documents: Any, objective_maximize: bool = True):
    """
    The base class constructor takes in LangChain or LlamaIndex documents
    and test data, such as test queries, that will be used in the
    rag_workflow method to create the RAGEvaluationDataset.
    """
    test_data = self.get_test_queries(documents)
    super().__init__(documents, test_data, objective_maximize)
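If you already have test queries, you can skip generate_test_data and build the (user_input, reference) pairs yourself, in the same shape get_test_queries returns. A minimal sketch, assuming your queries live in a CSV file with user_input and reference columns; load_test_queries and the column layout are hypothetical, not part of the SDK.

```python
import csv
from pathlib import Path


def load_test_queries(path: str) -> list[tuple[str, str]]:
    """Read (user_input, reference) pairs from a CSV with those column headers."""
    with Path(path).open(newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        missing = {"user_input", "reference"} - set(reader.fieldnames or [])
        if missing:
            raise ValueError(f"CSV is missing columns: {sorted(missing)}")
        pairs = []
        for row in reader:
            question = (row.get("user_input") or "").strip()
            answer = (row.get("reference") or "").strip()
            # Skip incomplete rows: they cannot be scored against a reference.
            if question and answer:
                pairs.append((question, answer))
        return pairs
```

In the example's __init__, you would then pass load_test_queries("my_queries.csv") to super().__init__ in place of self.get_test_queries(documents).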

The BaseNDRagWorkflow requires the following implementations:

  • The parameter_specs attribute, which defines the parameters you want to optimize. Each entry maps a parameter name to a tuple of an Annotated[type, TypeRange] and a default value. The Annotated type tells Not Diamond the parameter type and the range to search through. To define the search space of each parameter, Not Diamond offers 3 TypeRange classes:

    • IntValueRange(low: int, high: int, step: int): An integer value range with a step size.
    • FloatValueRange(low: float, high: float, step: float): A float value range with a step size.
    • CategoricalValueOptions(values: List[str]): A set of categorical options.
    class ExampleNDRagWorkflow(BaseNDRagWorkflow):
        """
        RAG workflow definition. Inherits from BaseNDRagWorkflow.
        The attribute parameter_specs must be implemented.
        parameter_specs defines the set of parameters in your rag_workflow that 
        you want to optimize.
        Each parameter must be defined as a search space of integers, floats, or categorical.
        Not Diamond provides the following dataclasses to help you define the search space:
        * IntValueRange(low: int, high: int, step: int): An integer value range with a step size.
        * FloatValueRange(low: float, high: float, step: float): A float value range with a step size.
        * CategoricalValueOptions(values: List[str]): A set of categorical options.
    
        The BaseNDRagWorkflow class constructor takes in documents and test_data.
        Both arguments will be passed to rag_workflow.
        """
        parameter_specs = {
            "chunk_size": (Annotated[int, IntValueRange(1000, 2500, 500)], 1000),
            "chunk_overlap": (Annotated[int, IntValueRange(50, 200, 25)], 100),
            "top_k": (Annotated[int, IntValueRange(1, 5, 1)], 1),
            "temperature": (Annotated[float, FloatValueRange(0.0, 1.0, 0.2)], 0.9),
            "llm": (Annotated[str, CategoricalValueOptions(["openai/gpt-4o", "anthropic/claude-3-5-sonnet-20241022"])], "openai/gpt-4o")
        }
    
  • The rag_workflow(self, documents: Any, test_queries: List[Any]) method, which defines your RAG workflow. Its purpose is to generate a RAGEvaluationDataset containing the test queries, retrieved contexts, reference answers, and LLM generations for one sampled set of values from parameter_specs. The returned object is accessible via the attribute self.evaluation_dataset.

    def rag_workflow(self, documents: Any, test_queries: List[Any]) -> RAGEvaluationDataset:
            """
            The main method that creates the RAGEvaluationDataset.
            This method must be implemented by the subclass.
            The purpose is to define the RAG workflow such that the retrieval parameters
            can be optimized.
            """
            self.index = VectorStoreIndex.from_documents(
                documents,
                transformations=[
                    SentenceSplitter(
                        chunk_size=self.chunk_size,
                        chunk_overlap=self.chunk_overlap,
                    )
                ],
            )
    
            self.retriever = VectorIndexRetriever(
                index=self.index,
                similarity_top_k=self.top_k,
            )
    
            response_synthesizer = get_response_synthesizer(llm=self.get_llama_index_llm(self.llm))
            self.query_engine = RetrieverQueryEngine(
                retriever=self.retriever, response_synthesizer=response_synthesizer
            )
    
            samples = []
            print(f"Generating responses using {self.llm}")
            for (user_input, reference) in test_queries:
                response = self.get_response(user_input)
                retrieved_contexts = self.get_retrieved_context(user_input)
                generation_prompt = self.get_generation_prompt(retrieved_contexts, user_input)
                sample = RAGSample(
                    user_input=user_input,
                    retrieved_contexts=retrieved_contexts,
                    response=response,
                    generation_prompt=generation_prompt,
                    reference=reference,
                    generator_llm=self.llm,
                )
                samples.append(sample)
            return RAGEvaluationDataset(samples)
    
  • The objective(self) method, which defines the objective function. It returns a scalar value that Not Diamond will try to maximize or minimize, depending on the objective_maximize parameter. The attribute self.evaluation_dataset is the object returned by rag_workflow, which runs before objective in each trial. Here, we use Not Diamond's built-in evaluation tools and metrics to evaluate the faithfulness of the LLM response. The next section goes into more detail about running RAG evaluations with Not Diamond.

    def objective(self):
            """
            The objective function.
            This method must be implemented by the subclass.
            The purpose is to define the metric to optimize for.
            The attribute self.evaluation_dataset is set by the base class to the return value of self.rag_workflow, which runs before objective in each trial.
            Here we are using Not Diamond's evaluation framework to evaluate the RAG workflow and optimize it for the faithfulness metric.
            """
            evaluator_llm = get_llm("openai/gpt-4o")
    
            metrics = [
                Faithfulness(llm=evaluator_llm),
            ]
            results = evaluate(dataset=self.evaluation_dataset, metrics=metrics)
            return results[self.llm]["faithfulness"].mean()
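To make the per-trial sequence concrete (sample parameter values, run rag_workflow, then score the result with objective), here is a toy sketch. It is not Not Diamond's implementation: it uses plain random search rather than Bayesian optimization, and ToyWorkflow, run_trials, and the made-up scoring function are hypothetical stand-ins so the loop runs without any LLM calls. It also assumes, as the example's use of self.chunk_size suggests, that sampled values are exposed as attributes on the workflow.

```python
import random


class ToyWorkflow:
    # Candidate values per parameter (a hypothetical, smaller search space).
    parameter_space = {
        "chunk_size": [1000, 1500, 2000, 2500],
        "top_k": [1, 2, 3, 4, 5],
    }

    def rag_workflow(self):
        # A real workflow would build an index and generate answers here.
        return {"chunk_size": self.chunk_size, "top_k": self.top_k}

    def objective(self):
        # Made-up score that peaks (at 0) for chunk_size=1500, top_k=3.
        d = self.evaluation_dataset
        return -abs(d["chunk_size"] - 1500) / 1000 - abs(d["top_k"] - 3)


def run_trials(workflow, n_trials, maximize=True, seed=0):
    """Random search: sample, run the workflow, score, and keep the best trial."""
    rng = random.Random(seed)
    best_params, best_score = None, None
    for _ in range(n_trials):
        params = {name: rng.choice(values) for name, values in workflow.parameter_space.items()}
        for name, value in params.items():
            setattr(workflow, name, value)  # expose sampled values as attributes
        workflow.evaluation_dataset = workflow.rag_workflow()  # runs before objective
        score = workflow.objective()
        better = best_score is None or (score > best_score if maximize else score < best_score)
        if better:
            best_params, best_score = params, score
    return {"best_params": best_params, "best_score": best_score}
```

A model-based method such as Bayesian optimization replaces rng.choice with proposals informed by earlier scores, which matters when every trial means re-indexing and calling LLMs.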
    

After the workflow is defined, use the auto_optimize function to optimize the workflow parameters. Under the hood, Not Diamond uses derivative-free optimization techniques, such as Bayesian optimization, to search the parameter space efficiently and return the best set of retrieval parameters found for your data.

# Create the workflow instance (this also generates the test data)
example_workflow = ExampleNDRagWorkflow(
    documents, objective_maximize=True
)

# Optimize pipeline parameters defined in parameter_specs
results = auto_optimize(example_workflow, n_trials=10)

# Best retrieval parameters found
print(results['best_params'])
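To see why a guided search matters here, it helps to count the combinations in the example's parameter_specs. A sketch, assuming each range includes its high endpoint (so IntValueRange(1000, 2500, 500) covers 1000, 1500, 2000, and 2500); expand_range is a hypothetical helper, not part of the SDK.

```python
import math


def expand_range(low, high, step):
    """List low, low + step, ..., high (inclusive), rounding away float drift."""
    n_steps = round((high - low) / step)
    return [round(low + i * step, 10) for i in range(n_steps + 1)]


# Mirrors the ranges in ExampleNDRagWorkflow.parameter_specs.
search_space = {
    "chunk_size": expand_range(1000, 2500, 500),
    "chunk_overlap": expand_range(50, 200, 25),
    "top_k": expand_range(1, 5, 1),
    "temperature": expand_range(0.0, 1.0, 0.2),
    "llm": ["openai/gpt-4o", "anthropic/claude-3-5-sonnet-20241022"],
}

total = math.prod(len(values) for values in search_space.values())
print(f"{total} combinations; n_trials=10 covers at most {10 / total:.1%} of them")
```

Under this assumption there are 4 × 7 × 5 × 6 × 2 = 1,680 combinations, and each trial rebuilds the index and runs every test query through a generator and an evaluator LLM, so exhaustive grid search would be costly.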

Wrapping up

Now that we've found good retrieval parameters for our document store, we can auto-evaluate our candidate LLMs to see how they compare to each other and train our custom model router to maximize performance.