A Hybrid Approach with Databricks’ Vector Search and Reciprocal Rank Fusion

Matthew McCoy, Ph.D.
6 min readApr 30, 2024

--

Hybrid search methods are rapidly becoming the norm, particularly with the widespread adoption of embeddings across various industries interested in AI solutions. Understanding these methods is crucial for teams seeking to implement customized search solutions (especially those that look to implement open-source solutions). While hybrid search isn’t a novel concept (consider Azure’s AI Search), here we delve into implementing a hybrid search method on Databricks using Vector Search and Best Match 25.

Goal: Provide sample code for how to implement a 2-tier information retrieval strategy using Reciprocal Rank Fusion. The keyword algorithm used here is BM25 and the embeddings are OpenAI’s text-embedding-3-small.

Example Data Source: I will be using this paper from Meta as a part of the code. The file is saved as meta_paper.pdf. This is a paper that was published by Meta in April 2024, so no GPT model has been trained on it (yet).

Reciprocal Rank Fusion (RRF)

What is RRF? This is a method that allows for the combination of various ranking models to enhance the overall performance of your information retrieval method. We’ll be collecting search results from 2 information retrieval strategies:

  1. Vector Search (Databricks)
  2. Traditional Keyword Search (BM25)

Why RRF? This algorithm satisfies two conditions that we’re interested in¹:

  1. High-quality results without the need for any tuning.
  2. Indicator scores can be completely independent of one another.
def reciprocal_rank(rank):
"""
Calculate the reciprocal rank for a given rank.
"""
return 1 / (rank + 1)

def reciprocal_rank_fusion(results_lists):
"""
Perform reciprocal rank fusion for a list of ranked results lists.
"""
fused_scores = {}

# Calculate reciprocal ranks for each result in each list
for i, results_list in enumerate(results_lists):
for rank, (doc, score) in enumerate(results_list):
if doc not in fused_scores:
fused_scores[doc] = 0
fused_scores[doc] += reciprocal_rank(rank)

# Sort the fused scores
sorted_fused_scores = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)

return sorted_fused_scores

Databricks’ Vector Search

This is Databricks’ vector database offering that’s built right into the Databricks Intelligence Platform. This guarantees comprehensive governance with Unity Catalog (UC), which is crucial when building recommender systems. Combine this with Model Serving (a REST API that provides a highly available and low-latency service for deploying models), you have everything you need to start building a hybrid search and begin chatting with your document. Being able to perform a semantic search against parquet files and coupling that with traditional keyword search methods results in low-latency and quality results. It should be noted that LangChain does not currently have a retriever for databricks-vectorsearch.

Here are the steps and sample code for getting Vector Search ready. If you have never completed these steps before, please see the official Databricks documentation:

  1. Upload the pdf to a Volume within UC.
  2. Chunk your document and attach an index to each chunk.
  3. Write to delta.
  4. Have a model-serving endpoint ready to generate the embeddings in the next step.
  5. Go to the Catalog Explorer and use the UI to create a vector search index.

After completing these steps, you will be able to perform a similarity search. Below is some sample code along with a chunking strategy using CharacterTextSplitter.

from PyPDF2 import PdfReader
from langchain.text_splitter import CharacterTextSplitter
from pyspark.sql.types import StructType, StructField, StringType

path = '/Volumes/hybrid_search_cat/default/meta-paper-vol/meta_paper.pdf'

with open(path,'rb') as f:
pdf_reader = PdfReader(f)
full_text = ''
num_pages = len(pdf_reader.pages)

for page_num in range(num_pages):
page = pdf_reader.pages[page_num]
text = page.extract_text()
full_text += text

text_splitter = CharacterTextSplitter(
separator = '\n',
chunk_size= 1000,
chunk_overlap = 100,
length_function = len,
is_separator_regex= False
)

docs = text_splitter.create_documents([full_text])

schema = StructType([
StructField('page_content',
StringType(),
True)
])

doc_data = [(doc.page_content,) for doc in docs]

df = spark.createDataFrame(doc_data, schema)
df.write.format('delta').mode('overwrite').saveAsTable('meta_paper_chunked')
spark.sql('ALTER TABLE `hybrid_search_cat`.`default`.`meta_paper_chunked` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)')
from databricks.vector_search.client import VectorSearchClient

# Vector Search
vsc = VectorSearchClient()
vector_search_result = (vsc.get_index(endpoint_name="meta_paper_endpoint",
index_name="hybrid_search_cat.default.meta_paper_index")
.similarity_search(num_results=num_results,
columns=["page_content"],
query_text=query)
.get('result', {})
.get('data_array',[]))

Keyword Search (BM25)

We will be using BM25, which is a ranking algorithm that’s used in traditional keyword searches. This is a probabilistic approach to traditional methods by taking into account the frequency of keywords in addition to the length of the document. Mathematically, the scoring has a more formal definition, but we’ll skip to the code³.

Here is some sample code for retrieving the top results from BM25 and combined code:

# Keyword search with BM25
loader = PyPDFLoader('/Volumes/hybrid_search_cat/default/meta-paper-vol/meta_paper.pdf')
loader_split = loader.load_and_split()
corpus = [loader_split[i].page_content for i in range(len(loader_split))]
tokenized_corpus = [doc.split(" ") for doc in corpus]
bm25 = BM25Okapi(tokenized_corpus)
tokenized_query = query.split(' ')
doc_score = bm25.get_scores(tokenized_query)
# Find the indices of the top num_results documents with the highest scores
top_k_indices = doc_score.argsort()[-num_results:][::-1]
# Retrieve the corresponding documents from the original corpus
top_k_results_with_scores = [(corpus[i], doc_score[i]) for i in top_k_indices]
import os
from databricks.vector_search.client import VectorSearchClient
from langchain_community.retrievers import BM25Retriever
from langchain.document_loaders import PyPDFLoader
from rank_bm25 import BM25Okapi
import re

def remove_non_alphabetic(text):
# Remove lowercase letters, punctuation, and non-alphabetic characters
cleaned_text = re.sub(r'[^a-z\s]', '', text.lower())
return cleaned_text

def results(query, num_results):

# Vector Search
vsc = VectorSearchClient(disable_notice=True)
vector_search_result = (vsc.get_index(endpoint_name="meta_paper_endpoint",
index_name="hybrid_search_cat.default.meta_paper_index")
.similarity_search(num_results=num_results,
columns=["page_content"],
query_text=query)
.get('result', {})
.get('data_array',[]))



# Keyword search with BM25
loader = PyPDFLoader('/Volumes/hybrid_search_cat/default/meta-paper-vol/meta_paper.pdf')
loader_split = loader.load_and_split()
corpus = [remove_non_alphabetic(loader_split[i].page_content) for i in range(len(loader_split))]
tokenized_corpus = [doc.split(" ") for doc in corpus]
bm25 = BM25Okapi(tokenized_corpus)
cleaned_query = remove_non_alphabetic(query)
tokenized_query = cleaned_query.split(' ')
doc_score = bm25.get_scores(tokenized_query)
# Find the indices of the top k documents with the highest scores
top_k_indices = doc_score.argsort()[-num_results:][::-1]
# Retrieve the corresponding documents from the original corpus
top_k_results_with_scores = [(corpus[i], doc_score[i]) for i in top_k_indices]
return top_k_results_with_scores, vector_search_result

def reciprocal_rank(rank):
"""
Calculate the reciprocal rank for a given rank.
"""
return 1 / (rank + 1)

def reciprocal_rank_fusion(results_lists):
"""
Perform reciprocal rank fusion for a list of ranked results lists.
"""
fused_scores = {}

# Calculate reciprocal ranks for each result in each list
for i, results_list in enumerate(results_lists):
for rank, (doc, score) in enumerate(results_list):
if doc not in fused_scores:
fused_scores[doc] = 0
fused_scores[doc] += reciprocal_rank(rank)

# Sort the fused scores
sorted_fused_scores = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)

return sorted_fused_scores

Results

We’ll perform a basic test to showcase the enhancement from a hybrid search.

import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")
query = 'testing philosphy for exploitation evals'
# Define the number of results to retrieve
num_results = 1
top_k_results_with_scores, vector_search_result = results(query, num_results)

# Perform reciprocal rank fusion
results_lists = [top_k_results_with_scores, vector_search_result]
fused_results = reciprocal_rank_fusion(results_lists)
vs_context_string = fused_results[1][0]

rrf_context_string = ''
for doc, _ in fused_results:
context_string += doc

The returned results are shown below.

Vector Search result.
Fusion result (Vector Search + BM25).

In this case, BM25 pulled the correct information. We can then take this text and feed it to the AI model to return a user-friendly reponse that’s grounded in the user’s question and the retrieved text as a part of the RAG lifecycle. Additionally, changing the number of returned results to a value larger than 1 is recommended when testing your search system.

The correct response here is to pull information from this section of the paper.

Now, there is something to be said about a chunking strategy with respect to both methods. However, this is purely a demonstration of how to integrate BM25 and Vector Search with RRF and a scenario where combining a keyword search with a vector search can ground the model in the correct piece of context from a document.

Conclusion

Here we have demonstrated how to implement a hybrid search method using Vector Search on Databricks and BM25. These are two well-known information retrieval strategies, but combining them with RRF can be extremely effective in extracting relevant data from your document as a part of the RAG lifecycle.

References

[1] Reciprocal rank fusion | Elasticsearch Guide [8.13] | Elastic

[2] Retrieval Augmented Generation (RAG) Done Right: Chunking — Vectara

[3] Okapi BM25 — Wikipedia

--

--