Routing between RAG agents

This guide walks you through creating your own router that routes queries between two custom RAG agents, and subsequently routes to a weak or strong model within those agents to optimize accuracy, cost, and latency. Agents often leverage LLMs and various other tools, such as a context retriever and other tools in order to allow LLMs to reason and plan. For agents to be effective, they need to be specialized, which means routing a query to the most suitable agent is incredibly important.

While it is possible to use an LLM to route queries using prompt engineering, it is not feasible in practice because it leads to unacceptable latency. Not Diamond is specifically designed for query routing and beats GPT-4o at not only accuracy but also latency.

🚀

Faster speeds

The average latency of Not Diamond’s API is only 300ms, while GPT-4o is 1500ms. You can also deploy to your VPC for faster routing speeds of 10-50ms.

You can follow along with the example below or in this notebook.

What you will learn

By the end of this guide, you will:

  1. Train a custom router that routes between two RAG agents.
  2. Use the Not Diamond + LangChain integration to build a workflow that routes queries to the most suitable agent.
  3. Use Not Diamond to save cost by routing simple queries to cheaper models within each agent.

Setup

Install the prerequisite packages and setup your API keys

!pip install --quiet --upgrade langchain langchain-community langchain-chroma pandas notdiamond[create]
import os

os.environ["NOTDIAMOND_API_KEY"] = 'NOTDIAMOND_API_KEY'
os.environ["OPENAI_API_KEY"] = "OPENAI_API_KEY"
os.environ["ANTHROPIC_API_KEY"] = 'ANTHROPIC_API_KEY'

Define your agents

In this example, we will define 2 RAG agents, one that answers questions about LLM powered agents, and another about RoRF, Not Diamond’s open-source LLM router.

We will use LangChain to build these retrievers

import bs4
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Load, chunk and index the contents of the blog.
agents_blog_loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
agent_blog_docs = agents_blog_loader.load()

agent_blog_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
agent_blog_splits = agent_blog_splitter.split_documents(agent_blog_docs)
agent_blog_vectorstore = Chroma.from_documents(documents=agent_blog_splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the blog.
agent_blog_retriever = agent_blog_vectorstore.as_retriever()
rorf_loader = WebBaseLoader(
    web_paths=("https://www.notdiamond.ai/blog/rorf",)
)
rorf_docs = rorf_loader.load()

rorf_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=900)
rorf_splits = rorf_splitter.split_documents(rorf_docs)
rorf_vectorstore = Chroma.from_documents(documents=rorf_splits, embedding=OpenAIEmbeddings())

rorf_retriever = rorf_vectorstore.as_retriever()

Next, we will define the retrievers as custom models using notdiamond.llms.config.LLMConfig

from notdiamond.llms.config import LLMConfig

agentic_blog_agent = LLMConfig(
    provider="agent",
    model="agentic_blog",
    is_custom=True,
    context_length=128000,
    input_price=2.5, # USD per million tokens
    output_price=10.0, # USD per million tokens
    latency=1 # time to first token (seconds)
)

rorf_agent = LLMConfig(
    provider="agent",
    model="rorf_blog",
    is_custom=True,
    context_length=200000,
    input_price=3., # USD per million tokens
    output_price=15., # USD per million tokens
    latency=1 # time to first token (seconds)
)

agents = [
    agentic_blog_agent,
    rorf_agent
]

Training your agent router

Not Diamond provides a convenient interface for training your own custom router. It supports not only models that are supported officially, but also any arbitrary custom models or workflows. Here, we’ll simply define an agent as a “custom model” and use the documents in the retrievers as the training dataset.

import pandas as pd
from pprint import pprint

train_data = {}
for provider in agents:
    df = {"input": [], "response": [], "score": []}

    for doc in agent_blog_splits:
        score = 1 if provider.model == "agentic_blog" else 0
        df["input"].append(doc.page_content)
        df["response"].append("")
        df["score"].append(score)

    for doc in rorf_splits:
        score = 1 if provider.model == "rorf_blog" else 0
        df["input"].append(doc.page_content)
        df["response"].append("")
        df["score"].append(score)

    train_data[provider] = pd.DataFrame(df)

provider = agents[0]
pprint(f"Train samples: {len(train_data[provider])}")

Here we simply use the document chunk in the retriever as the input prompt to the agent and give each agent a score of 1 depending on whether the chunk is from the agentic_blog_retriever or the rorf_retriever.

To start the training run, simply use the notdiamond.toolkit.CustomRouter class to initiate the training

from notdiamond.toolkit import CustomRouter

# Initialize the CustomRouter object for training
trainer = CustomRouter(
    language="english",
    maximize=True,  # Indicate if higher scores are better (setting to False indicates the opposite)
)

# Train the model using your dataset
preference_id = trainer.fit(
    dataset=train_data,  # The dataset containing inputs, responses, and scores
    prompt_column="input",  # Column name for the input prompts
    response_column="response",  # Column name for the model responses
    score_column="score"  # Column name for the scores
)
print(preference_id)

Calling the trainer.fit method will return a preference_id, which is used to call the trained router. You can monitor the training status on your Not Diamond Dashboard.

Once the status shows Deployed, you can proceed to use the custom router.

Not Diamond + LangChain integration

Not Diamond has built in integration with LangChain through NotDiamondRunnable and NotDiamondRoutedRunnable. The key difference between NotDiamondRunnable and NotDiamondRoutedRunnable is that NotDiamondRunnable only returns the recommended model, while NotDiamondRoutedRunnable will also invoke the recommended model and return the response.

We will use the integration to build a workflow that

  1. Routes a given query to the most suitable agent using the custom router we trained in the previous section.
  2. Saves cost by routing simple queries to cheaper models. This can often make a big difference as RAG queries tend to carry a lot of input tokens.

First we will define the components we need to build this workflow

from notdiamond import NotDiamond
from notdiamond.toolkit.langchain import NotDiamondRoutedRunnable, NotDiamondRunnable

from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate

# This client is instantiated with the preference_id, which will call the custom router we trained
custom_router_client = NotDiamond(preference_id=preference_id)

# This client uses Not Diamond's out-of-the-box (OOB) router
oob_client = NotDiamond()

system_prompt = """
You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise.
Context: {context}
"""

# Define the prompt template to use for the RAG request
rag_prompt = ChatPromptTemplate(
    [
        ("system", system_prompt),
        ("human", "{question}")
    ]
)

def format_docs(docs):
		"""Concatenate all the relevant documents into a single string"""
    return "\n\n".join(doc.page_content for doc in docs)

# These are the LLMs we want to route between depending on query difficulty
# in the rorf_agent
rorf_llms = [
    "anthropic/claude-3-haiku-20240307", 
    "anthropic/claude-3-5-sonnet-20240620"
]

# Define the runnable that will return the recommended LLM
rorf_router = NotDiamondRunnable(
    nd_client=oob_client,
    nd_kwargs={
        'tradeoff': "cost", # By setting 'tradeoff': 'cost' Not Diamond will route queries to cheaper LLMs when doing so does not affect quality
        "model": rorf_llms
        }
)

# Define the runnable that will return the recommended LLM's response
rorf_routed_runnable = NotDiamondRoutedRunnable(
    nd_client=oob_client, 
    nd_kwargs={
        'tradeoff': "cost", 
        "model": rorf_llms
        }
    )
    
def rorf_prompt_formatter(input):
		"""Print the recommended model and format the prompt template"""
    model = rorf_router.invoke(input["question"])
    print(f"This prompt is routed to the RoRF blog agent using {model}")
    return rag_prompt.invoke(input)

agentic_blog_llms = [
    "openai/gpt-4o-mini", 
    "openai/gpt-4o"
]

agentic_blog_router = NotDiamondRunnable(
    nd_client=oob_client,
    nd_kwargs={
        'tradeoff': "cost", 
        "model": agentic_blog_llms
        }
)

agentic_blog_routed_runnable = NotDiamondRoutedRunnable(
    nd_client=oob_client,
    nd_kwargs={
        'tradeoff': "cost", 
        "model": agentic_blog_llms
        }
    )

def agentic_prompt_formatter(input):
    model = agentic_blog_router.invoke(input["question"])
    print(f"This prompt is routed to the agentic blog agent using {model}")
    return rag_prompt.invoke(input)

Next we will define the workflow

agent_blog_agent = (
    {"context": agent_blog_retriever | format_docs, "question": RunnablePassthrough()}
    | RunnableLambda(agentic_prompt_formatter)
    | agentic_blog_routed_runnable
    | StrOutputParser()
)

rorf_agent = (
    {"context": rorf_retriever | format_docs, "question": RunnablePassthrough()}
    | RunnableLambda(rorf_prompt_formatter)
    | rorf_routed_runnable
    | StrOutputParser()
)

def routes(query):
    if query["agent"] == "agent/agentic_blog":
        return agent_blog_agent.invoke(query["question"])
    else:
        return rorf_agent.invoke(query["question"])

def agent_router(query):
    messages = [
        {"role": "user", "content": query}
    ]
    session_id, provider = custom_router_client.chat.completions.model_select(
        messages=messages,
        model=agents
    )
    return str(provider)

full_chain = (
    {"agent": RunnableLambda(agent_router), "question": RunnablePassthrough()} 
    | RunnableLambda(routes)
    )

Let’s test it!

full_chain.invoke("How many models does RoRF LLM router support?")
This prompt is routed to the RoRF blog agent using anthropic/claude-3-haiku-20240307

According to the context, the RoRF LLM router supports routing between two models. The passage states that "RoRF can only route between two models and it does not have the ability to learn in real-time based on feedback."

Our router is working as expected and a cheap model was used as it was a relatively simple question. What about the agentic blog agent?

full_chain.invoke("What is Task Decomposition?")
This prompt is routed to the agentic blog agent using openai/gpt-4o-mini

Task decomposition is the process of breaking down a complicated task into smaller, more manageable steps. It often involves techniques like Chain of Thought (CoT), where the model is prompted to "think step by step" to enhance its performance on complex tasks. This approach helps clarify the model\'s reasoning process and makes it easier to tackle difficult tasks.

It got routed to the right agent using a cheap model. What if we ask a complicated question?

full_chain.invoke("Explain self-reflection for agentic workflows and write a short example using langchain")
This prompt is routed to the agentic blog agent using openai/gpt-4o

Self-reflection in agentic workflows refers to the process by which autonomous agents iteratively refine their decision-making by analyzing and learning from past actions and mistakes. This capability is crucial for improving performance over time, particularly in complex environments where trial and error are common. Through self-reflection, agents can dynamically adjust their strategies to better achieve their objectives.\n\nExample using Langchain:\nImagine an agent using Langchain to manage a customer support chatbot. After each interaction, the agent logs the conversation and evaluates whether the customer issue was resolved satisfactorily. If not, the agent reviews the conversation to identify where the response could be improved, updates its response database, and applies this learning in future interactions to provide better support.

A more powerful model is used here to maximize output quality on this more challenging question.

Conclusion

In this example, we showed how you can train a custom router to route queries between two custom RAG agents, and subsequently routes to a weak or strong model within those agents to optimize accuracy, cost, and latency..

To try out this example, sign up to get a Not Diamond API key and run the example in this notebook.