Getting the most out of Databricks Vector Search: Using Writeback Tables as Feature Tables
Introduction
Anyone who has been paying attention to industry AI development over the past 2–3 years is likely aware of the advancements in document search technology, moving beyond simple keyword searches to searching based on the meaning of the user’s query. This concept, known as semantic search, can be summarized as finding the closest result based on the intent of the input. For example, “find all the positive reviews of Sony TVs that talk about the screen quality” is a request you simply can’t satisfy with keyword filters without a lot of effort. However, with vector search (and hybrid search), this becomes a fairly simple task.
I’ve written a few blog posts about Vector Search on Databricks already, but in this one I want to stack several of the services to give a deeper demonstration of how all the components fit together. I also provide some sample architecture with one goal in mind: simplicity. Our view of the data/AI ecosystem often changes once we have a firm foundation and a reference architecture to build from. Hopefully, this post can help with that.
I’ve numbered the steps in the following sections for readability and reference.
1. Set up a VS Endpoint
Before we can get to the writeback table, we first need to create a Vector Search (VS) endpoint. In the UI, the steps are as follows (a programmatic sketch follows the list).
- Go to Compute.
- Click on Vector Search.
- Give the endpoint a name.
- Wait for the provisioning process to complete. This usually takes 5–10 minutes.
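If you prefer to script this step, the databricks-vectorsearch Python SDK can create the endpoint as well. A minimal sketch, assuming the package is installed; the endpoint name below is just an example.
# pip install databricks-vectorsearch
from databricks.vector_search.client import VectorSearchClient

# Authenticates from the notebook context (or via host/token environment variables)
client = VectorSearchClient()

# "reviews-vs-endpoint" is an example name; use whatever you want to call yours
client.create_endpoint(
    name="reviews-vs-endpoint",
    endpoint_type="STANDARD",
)

# Provisioning still takes 5-10 minutes; check the status under Compute > Vector Search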
2. Get some data and process it for VS
Hopefully, you have an idea of the type of data you want to search against. This data needs to land in a Delta table. I will share some code and architecture to help demonstrate what’s happening. The architecture is intentionally simple: getting started in a still-developing ecosystem can be challenging without a model architecture to start from.
Example
As an example, I’ll use product reviews from Amazon. In this scenario, a user submits a review through an API such as FastAPI and gets back recommended products (via vector search) based on their review and purchase.
Here’s some code to help speed up your own demo if you want to use this example as a reference. If we’re using Auto Loader to read these reviews as they land in the source location and write them to a Delta table, we could do the following.
from pyspark.sql.types import StructType, StructField, StringType

# Schema for the raw Amazon reviews CSV files
schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("asins", StringType(), True),
        StructField("brand", StringType(), True),
        StructField("categories", StringType(), True),
        StructField("keys", StringType(), True),
        StructField("manufacturer", StringType(), True),
        StructField("reviews_date", StringType(), True),
        StructField("reviews_dateAdded", StringType(), True),
        StructField("reviews_dateSeen", StringType(), True),
        StructField("reviews_didPurchase", StringType(), True),
        StructField("reviews_doRecommend", StringType(), True),
        StructField("reviews_id", StringType(), True),
        StructField("reviews_numHelpful", StringType(), True),
        StructField("reviews_rating", StringType(), True),
        StructField("reviews_sourceURLs", StringType(), True),
        StructField("reviews_text", StringType(), True),
        StructField("reviews_title", StringType(), True),
        StructField("reviews_userCity", StringType(), True),
        StructField("reviews_userProvince", StringType(), True),
        StructField("reviews_username", StringType(), True),
    ]
)

# Use Auto Loader to incrementally ingest new review files into a Delta table
(
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", True)
    .schema(schema)
    .load(external_raw_path)  # path to the raw files, e.g. an external volume
    .writeStream
    .option("checkpointLocation", "/Volumes/main/default/sample_data/amazon_reviews_checkpoint")
    # .trigger(availableNow=True)  # uncomment to process available files once and stop
    .toTable("main.default.amazon_reviews")
)
Next, we’ll add a primary key since the id column in this dataset has repeated values. I’m going to limit my dataset to 100 rows since this is just a demo, and save the result as main.default.amazon_reviews_bronze. Finally, don’t forget to enable Change Data Feed (CDF), which Vector Search requires on the source table of a Delta Sync index.
from pyspark.sql.functions import monotonically_increasing_id

# Add a surrogate primary key, since "id" in this dataset has repeated values
df = (
    spark.read.table('main.default.amazon_reviews')
    .withColumn("primary_id", monotonically_increasing_id())
)

(
    df
    .select('primary_id', 'reviews_text', 'reviews_rating')
    .limit(100)
    .write
    .mode('append')
    .saveAsTable('main.default.amazon_reviews_bronze')
)

# Delta Sync requires Change Data Feed on the source table
spark.sql("ALTER TABLE `main`.`default`.`amazon_reviews_bronze` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
Note: Currently, Databricks will not carry the primary key configuration from the source Delta table over to the index/writeback table. That means even if you run something like the following, you will lose the primary key constraint on the primary_id column when the index/writeback tables are created.
%sql
CREATE TABLE main.default.amazon_reviews_bronze (
  primary_id BIGINT NOT NULL,
  reviews_text STRING,
  reviews_rating STRING,
  PRIMARY KEY (primary_id)
)
3. Vectorization
Now we’re ready to create our embeddings. You can use an external embedding model or one whose endpoint is available by default in Databricks. For example, databricks-bge-large-en is currently in Preview and requires no setup. In this step, be sure to turn on Sync computed embeddings: this is how the writeback table (a Delta table) gets created. I selected the Triggered sync mode to do this once, but of course, we could set this to Continuous, even in a demo. The purpose of the writeback table is to let the user query the embeddings; you cannot query the index table to obtain embedding information.
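If you prefer to create the index from code rather than the UI, a Delta Sync index with Databricks-computed embeddings can be created roughly as follows. This is only a sketch: the endpoint and index names are examples, and the sync_computed_embeddings flag (the programmatic counterpart of the Sync computed embeddings toggle) may be named differently or missing in older versions of the databricks-vectorsearch SDK, so treat it as an assumption and check your SDK version.
from databricks.vector_search.client import VectorSearchClient

client = VectorSearchClient()

# Delta Sync index with Databricks-computed embeddings over the review text.
# "reviews-vs-endpoint" and the index name are examples; adjust to your setup.
index = client.create_delta_sync_index(
    endpoint_name="reviews-vs-endpoint",
    index_name="main.default.amazon_reviews_bronze_index",
    source_table_name="main.default.amazon_reviews_bronze",
    pipeline_type="TRIGGERED",  # or "CONTINUOUS"
    primary_key="primary_id",
    embedding_source_column="reviews_text",
    embedding_model_endpoint_name="databricks-bge-large-en",
    sync_computed_embeddings=True,  # assumed flag; this is what produces the writeback table
)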
Note: The Delta table that gets created, called main.default.amazon_reviews_bronze_index_writeback_table, is linked to your vector search index table. If you delete the index table, the writeback table disappears with it, erasing the embeddings. So if we want to use these embeddings for, say, classification or cluster analysis, we need to register the writeback table in the Databricks Feature Store.
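If you want a quick look at what the writeback table contains, you can read it like any other Delta table. The sketch below just inspects the schema; the embedding column’s name is generated by the sync, so inspect it rather than hard-coding it.
# Read the writeback table like any other Delta table
writeback_df = spark.read.table("main.default.amazon_reviews_bronze_index_writeback_table")

# The sync adds an embedding column (array<float>) alongside the synced columns;
# its exact name is generated, so check the schema rather than hard-coding it
writeback_df.printSchema()
writeback_df.show(5)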
4. Register the writeback table in the Feature Store
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

fe = FeatureEngineeringClient()

# Read the writeback table produced by the vector search sync
df = spark.read.table('main.default.amazon_reviews_bronze_index_writeback_table')

# Register it as a feature table keyed on primary_id
fe.create_table(
    name='main.default.embeddings_feature_table',
    primary_keys=["primary_id"],
    df=df,
    schema=df.schema,
    description="test table"
)
Running the code creates the feature table, which we can then use for ML tasks. Notice in the image below that primary_id is the primary key for this table. In fact, any Unity Catalog table that has a primary key is automatically a feature table in the Databricks Feature Store.
Going to Features within the Databricks UI will verify that this table is stored there.
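Since the embeddings now live in a feature table, we can pull them into a training set with a FeatureLookup. The sketch below assumes a hypothetical labels table, main.default.review_labels, keyed on primary_id with a label column (for example, a flag derived from reviews_rating); everything else comes from the feature table we just registered.
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

fe = FeatureEngineeringClient()

# Hypothetical labels DataFrame with columns: primary_id, label
label_df = spark.read.table("main.default.review_labels")

training_set = fe.create_training_set(
    df=label_df,
    feature_lookups=[
        FeatureLookup(
            table_name="main.default.embeddings_feature_table",
            lookup_key="primary_id",
        )
    ],
    label="label",
)

# Materialize the joined embeddings + label for model training
training_df = training_set.load_df()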
Example Revisited
Going back to our example (see the architecture diagram), we can now do all of this in an automated way with the following setup.
- A user submits a review using FastAPI (a minimal sketch of such an endpoint follows this list).
- The submission is sent to an external volume location within Azure.
- The submission is ingested into the source table (which has CDF enabled) and picked up by the continuous sync on our vector search index.
- The feature table is appended with the new review so it can be used for model training.
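For completeness, here is a minimal sketch of what the submission endpoint could look like. The landing path, file layout, and review model are all assumptions for illustration; in practice you would write to the Azure storage location backing the external volume (for example with the Azure SDK or the Databricks Files API) rather than a local directory.
# Hypothetical FastAPI endpoint that lands each review as a small CSV file
# for Auto Loader to pick up. Paths and fields are illustrative only.
import csv
import uuid
from pathlib import Path

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# In practice this would be the Azure storage path backing the external volume
LANDING_DIR = Path("/tmp/amazon_reviews_landing")
LANDING_DIR.mkdir(parents=True, exist_ok=True)

class Review(BaseModel):
    reviews_text: str
    reviews_rating: str

@app.post("/reviews")
def submit_review(review: Review):
    # One file per submission; Auto Loader ingests new files incrementally
    out_path = LANDING_DIR / f"review_{uuid.uuid4().hex}.csv"
    with out_path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["reviews_text", "reviews_rating"])
        writer.writeheader()
        writer.writerow(review.model_dump())  # pydantic v2; use .dict() on v1
    return {"status": "received", "file": out_path.name}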
Summary
In this post, I have provided some sample architecture along with code to demonstrate how one can create a writeback table and deploy it as a feature table. This is useful for clustering and classification tasks that take a user’s embeddings and train a model on them.