Chatting with your Delta Tables Using Databricks Connect, Streamlit, and Databricks Model Serving
In a previous post, I described how you can use natural language to generate SQL statements within your Databricks notebooks. The essential idea is to rely on the metadata stored in your information_schema along with some prompt engineering to generate the SQL code, run that code using spark.sql(), send the result and query pair to an LLM for interpretation, and return the interpretation to the user. With all of these components running together, we’re ready to build a small application to speak to our data in a natural way.
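The flow described above can be sketched in a few lines before any Databricks pieces are wired in. This is only an illustration of the data flow: answer_question and the three fake_* functions are hypothetical stand-ins for the LLM calls and spark.sql(), not code from the application built later in this post.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def answer_question(
    question: str,
    generate_sql: Callable[[str], str],
    run_sql: Callable[[str], List[Row]],
    interpret: Callable[[str, List[Row]], str],
) -> str:
    """Generate SQL for a question, run it, and summarize the rows."""
    sql = generate_sql(question)      # first LLM call: question -> SQL
    rows = run_sql(sql)               # Spark: SQL -> result rows
    return interpret(question, rows)  # second LLM call: rows -> answer


# Stand-ins for the LLM and Spark, used only to show the data flow.
def fake_generate_sql(question: str) -> str:
    return "SELECT COUNT(customer_id) AS total_customers FROM customers"


def fake_run_sql(sql: str) -> List[Row]:
    return [{"total_customers": 3}]


def fake_interpret(question: str, rows: List[Row]) -> str:
    return f"There are {rows[0]['total_customers']} customers."


print(answer_question("How many customers are there?",
                      fake_generate_sql, fake_run_sql, fake_interpret))
```

Keeping the three steps as separate callables makes it easy to test each one on its own before connecting the real endpoint and cluster.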
In this post, I’ll demonstrate how to query tables stored in Unity Catalog (UC) from VSCode (VSC) and integrate that with a simple Streamlit application that can be run locally on your machine. I’ll be doing this using Databricks’ Model Serving with OpenAI’s gpt-3.5-turbo.
Below are some initial configurations and necessary Python libraries that need to be installed.
- Install the databricks-connect Python library with pip install --upgrade "databricks-connect==14.0.*".
- Set up the Databricks CLI.
- Set up a model serving endpoint using Python or the UI under Serving in Databricks. I chose to use an external model (OpenAI) in this example. The only thing you will need to change is the client.predict() call in the provided code.
- Streamlit is a free and open-source framework that is extremely easy to use and quick if you want an interface that “just works” without much setup.
- MLflow is another open-source tool. I’ll be using its deployments client to query the model serving endpoint, while Databricks Connect establishes the connection to my cluster using the Databricks profile I set up (see step 2) and the cluster ID.
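If you prefer to create the serving endpoint from Python rather than the UI, the following is a rough sketch. It assumes the external-model configuration format of the MLflow deployments client as I understand it, so check it against your MLflow version. The served entity name, secret scope, and secret key are placeholders, and build_endpoint_config and create_chat_endpoint are hypothetical helper names.

```python
def build_endpoint_config(secret_scope: str, secret_key: str) -> dict:
    """Config for an external OpenAI chat model (all names are placeholders)."""
    # The API key is referenced from a Databricks secret, not pasted in plain text.
    secret_ref = "{{secrets/" + secret_scope + "/" + secret_key + "}}"
    return {
        "served_entities": [
            {
                "name": "openai-chat",  # assumed entity name
                "external_model": {
                    "name": "gpt-3.5-turbo",
                    "provider": "openai",
                    "task": "llm/v1/chat",
                    "openai_config": {"openai_api_key": secret_ref},
                },
            }
        ]
    }


def create_chat_endpoint(name: str, secret_scope: str, secret_key: str):
    """Create the endpoint; requires mlflow and a configured Databricks profile."""
    import mlflow.deployments  # imported here so the config helper works without mlflow

    client = mlflow.deployments.get_deploy_client("databricks")
    return client.create_endpoint(
        name=name,
        config=build_endpoint_config(secret_scope, secret_key),
    )
```

Calling create_chat_endpoint("chat_completion", "<your_scope>", "<your_key>") would create an endpoint with the name used in the client.predict() calls later in this post.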
I decided to write this article in a step-by-step approach to help accelerate debugging.
Checkpoint 1:
So far, if you’ve completed the steps above, the following code will run without any issues.
import streamlit as st
import mlflow.deployments
client = mlflow.deployments.get_deploy_client("databricks")
# PySpark and databricks-connect related imports
from databricks.connect.session import DatabricksSession as SparkSession
from databricks.sdk.core import Config
config = Config(profile="<your_profile_name>", cluster_id="<your_cluster_id>")
spark = SparkSession.builder.sdkConfig(config).getOrCreate()
Additionally, you should be able to run the following command in the terminal in VSC.
streamlit run <your_file_name>.py
Checkpoint 2:
Next, I’m going to build some helper functions that will help with the prompt engineering.
This first function provides the metadata text and an example response for the system role.
def sql_template(table_meta_text, column_meta_text):
    # Context prompt: the table and column metadata the LLM should use to build the query
    SQL_TEMPLATE = f"""This is the relevant table and column information for building SQL code.
CONTEXT TO USE TO CONSTRUCT SQL QUERY:
TABLE INFORMATION:
{table_meta_text}
COLUMN INFORMATION:
{column_meta_text}
"""
    # Instruction prompt with two example question/answer pairs
    EXAMPLE_RESPONSE = """You are a SQL assistant. Your only job is to write proper SQL code that can be run in Databricks notebooks. YOU DO NOT DO ANYTHING OTHER THAN WRITE SQL CODE. Here is an example of how you would respond:
<USER'S QUERY>:
How many customers are there?
<YOUR RESPONSE>:
SELECT COUNT(customer_id) AS total_customers
FROM customers
<USER'S QUERY>:
How many trips are there?
<YOUR RESPONSE>:
SELECT COUNT(trip_id) AS total_trips
FROM trips
"""
    return SQL_TEMPLATE, EXAMPLE_RESPONSE
This next function will be used to feed the rows of the returned PySpark DataFrame (converted into a list of dictionaries) along with the user’s query to the LLM for interpretation.
def return_sql_response(df_to_list, user_question):
    # df_to_list is the query result as a list of row dictionaries
    RESPONSE_TEMPLATE = f"""
You are a nice assistant that organizes information into a summary.
The user asked <USER'S QUERY> and the response is given by <DICTIONARY>. You should be conversational and speak as a human. For example, don't mention the dictionary.
<DICTIONARY>:
{df_to_list}
<USER'S QUERY>:
{user_question}
"""
    return RESPONSE_TEMPLATE
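Because the whole result set is pasted into the prompt, a query that returns many rows can easily exceed the model's context. One possible safeguard, sketched below, is to cap the number of rows before building the template. The helper name rows_for_prompt and the default of 20 rows are my own assumptions, not part of the original application.

```python
import json
from typing import Dict, List


def rows_for_prompt(rows: List[Dict], max_rows: int = 20) -> str:
    """Serialize at most max_rows rows as JSON and note any truncation."""
    kept = rows[:max_rows]
    # default=str turns values json cannot encode (dates, decimals) into strings
    text = json.dumps(kept, default=str)
    omitted = len(rows) - len(kept)
    if omitted > 0:
        text += f"\n({omitted} more rows not shown)"
    return text


print(rows_for_prompt([{"a": 1}, {"a": 2}, {"a": 3}], max_rows=2))
```

The returned string could be passed to return_sql_response() in place of the raw list of dictionaries.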
This function will be used to extract stored table and column comments that I mentioned in my previous post.
import json

def get_metadata(catalog, table_name):
    # Table-level comment from the catalog's information_schema
    table_meta = spark.sql(f"select * from {catalog}.information_schema.tables where table_name = '{table_name}'")
    table_meta_text = table_meta.select('comment').collect()[0]['comment']
    # Column names, comments, and data types for the same table
    column_meta = spark.sql(f"select * from {catalog}.information_schema.columns where table_name = '{table_name}'")
    column_rows = column_meta.select('column_name', 'comment', 'data_type').collect()
    # Build one dictionary per column
    data_list = []
    for row in column_rows:
        data_list.append({
            'column_name': row.column_name,
            'comment': row.comment,
            'data_type': row.data_type
        })
    # Convert the list of dictionaries to JSON format
    column_meta_text = json.dumps(data_list, indent=4)
    return table_meta_text, column_meta_text
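To see what the column metadata looks like once it reaches the prompt, here is a small offline sketch of the same row-to-JSON conversion. The namedtuple ColumnRow stands in for the Spark Row objects, and the sample comments and data types are invented for illustration.

```python
import json
from collections import namedtuple

# Stand-in for the Row objects returned by collect(); not a Spark type.
ColumnRow = namedtuple("ColumnRow", ["column_name", "comment", "data_type"])


def columns_to_json(rows) -> str:
    """Convert column metadata rows into the JSON text used in the prompt."""
    data_list = [
        {
            "column_name": row.column_name,
            "comment": row.comment,
            "data_type": row.data_type,
        }
        for row in rows
    ]
    return json.dumps(data_list, indent=4)


# Invented sample metadata; a column without a comment shows up as null.
sample = [
    ColumnRow("trip_id", "Unique identifier of a trip", "BIGINT"),
    ColumnRow("customer_id", None, "BIGINT"),
]
print(columns_to_json(sample))
```

Columns without a comment appear as null in the JSON, which is a quick way to spot the metadata the LLM will be missing.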
With these functions defined, we can run the following code without any issues. I created a sample catalog and two sample tables that revolve around customer information and trip information. I chose to keep the prompt that builds the SQL code separate from the prompt that interprets the output of running the generated SQL code through Spark.
catalog = 'ai_gen_cat' #sample catalog
table_name1 = 'trips' #sample table1
table_name2 = 'customers'#sample table2
table_meta_text1, column_meta_text1 = get_metadata(catalog,table_name1)
table_meta_text2, column_meta_text2 = get_metadata(catalog,table_name2)
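# Separate the two tables' metadata with a newline so they don't run together in the prompt
table_meta_text = table_meta_text1 + "\n" + table_meta_text2
column_meta_text = column_meta_text1 + "\n" + column_meta_text2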
spark.sql("use catalog ai_gen_cat;")
spark.sql("use database ai_gen_db;")
SQL_TEMPLATE, EXAMPLE_RESPONSE = sql_template(table_meta_text,column_meta_text)
messages = [
{
"role": "system",
"content": EXAMPLE_RESPONSE
},
{
"role": "system",
"content": SQL_TEMPLATE
},
]
messages2 = [] # This will be used for interpretation
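When more than two tables are involved, it may help to label each table's metadata so the model can tell which columns belong to which table. The sketch below shows one way to do that. The function name combine_metadata and the label wording are assumptions of mine, not part of the original code.

```python
from typing import Dict, Optional, Tuple


def combine_metadata(
    metadata: Dict[str, Tuple[Optional[str], str]]
) -> Tuple[str, str]:
    """Merge per-table metadata; maps table name -> (table comment, column JSON)."""
    table_parts = []
    column_parts = []
    for name, (table_comment, column_json) in metadata.items():
        # A table without a comment still gets a line so it is not silently dropped
        table_parts.append(f"{name}: {table_comment or 'no comment'}")
        column_parts.append(f"Columns of {name}:\n{column_json}")
    return "\n".join(table_parts), "\n\n".join(column_parts)


tables, columns = combine_metadata(
    {"trips": ("Trip records", "[]"), "customers": (None, "[]")}
)
print(tables)
print(columns)
```

The two returned strings can be passed to sql_template() in the same way as table_meta_text and column_meta_text.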
Checkpoint 3:
Here is the main() definition. There are two parts:
- Send the input query (entered in the Streamlit UI) to the LLM and run the generated SQL on Databricks.
- Send the returned result back to the LLM for interpretation.
I raised the temperature parameter on the second LLM call (0.5 instead of 0.1) so the response is more chat-like.
def main():
    # Set page title
    st.title("Simple Databricks App")
    # Get user input
    query = st.text_input("Ask your question here:")
    # Streamlit reruns the script on every interaction, so do nothing until a question is entered
    if not query:
        return
    messages.append({"role": "user", "content": query})
    # First LLM call: generate the SQL code
    completions_response = client.predict(
        endpoint="chat_completion",
        inputs={
            "messages": messages,
            "temperature": 0.1,
            "max_tokens": 50,
            "n": 1
        }
    )
    # Extract the SQL code from the completion
    sql_result = completions_response.choices[0]['message']['content']
    # print(sql_result)
    # Execute the SQL code using Spark
    df = spark.sql(sql_result)
    df_to_list = [row.asDict() for row in df.collect()]
    #####################################################################
    ###############| Send response to LLM for interpretation |###########
    #####################################################################
    RESPONSE_TEMPLATE = return_sql_response(df_to_list, query)
    messages2.append({"role": "system", "content": RESPONSE_TEMPLATE})
    messages2.append({"role": "user", "content": "Interpret the result please."})
    # Debug output in the terminal
    print(messages)
    # Second LLM call: interpret the query result
    completions_final_response = client.predict(
        endpoint="chat_completion",
        inputs={
            "messages": messages2,
            "temperature": 0.5,
            "max_tokens": 50,
            "n": 1
        }
    )
    # Extract the interpretation from the completion
    final_result = completions_final_response.choices[0]['message']['content']
    # Display the answer
    st.write(final_result)

if __name__ == "__main__":
    main()
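Chat models sometimes wrap their answer in a Markdown code fence even when told to return only SQL, and spark.sql() would fail on that. The sketch below pulls the SQL text out of a response shaped like the one indexed in main(). The helper name extract_sql is hypothetical, and whether your model adds fences at all is an assumption you should verify against real responses.

```python
import re

# Build the three-backtick marker programmatically to keep this snippet readable
FENCE = "`" * 3
FENCED_BLOCK = re.compile(
    FENCE + r"(?:sql)?\s*(.*?)" + FENCE, re.DOTALL | re.IGNORECASE
)


def extract_sql(response: dict) -> str:
    """Return the SQL text from a chat completion, without any code fence."""
    content = response["choices"][0]["message"]["content"].strip()
    match = FENCED_BLOCK.search(content)
    if match:
        # Keep only what is inside the first fenced block
        content = match.group(1).strip()
    return content


fenced = {"choices": [{"message": {"content": FENCE + "sql\nSELECT 1\n" + FENCE}}]}
print(extract_sql(fenced))
```

In main(), sql_result could be assigned from extract_sql(completions_response) instead of indexing the response directly.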
Now, running Streamlit should give you the ability to talk to this data!
streamlit run <your_file_name>.py
Result Demo
Below is a demo of some sample questions along with average response times. This is far from a perfect solution: anyone who has tried to use LLMs over the past year for code generation knows that there is ample room for hallucinations. However, the response time is still fairly quick given how many components are running in the background.
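Since the generated SQL is executed as-is, one way to limit the damage of a hallucinated or unexpected response is to check that it is a single read-only statement before passing it to spark.sql(). The following is a deliberately crude sketch: the helper name is_read_only and the keyword list are my own assumptions, and a keyword check like this can reject valid queries (for example, a string literal containing a semicolon). It is not a substitute for restricting the permissions of the cluster user.

```python
import re

# Keywords that indicate a statement modifies data or schema (assumed list)
FORBIDDEN = re.compile(
    r"\b(insert|update|delete|merge|drop|alter|create|truncate|grant|revoke)\b",
    re.IGNORECASE,
)


def is_read_only(sql: str) -> bool:
    """Return True only for a single SELECT/WITH statement with no write keywords."""
    statement = sql.strip().rstrip(";").strip()
    if ";" in statement:
        # More than one statement was returned
        return False
    if not re.match(r"(select|with)\b", statement, re.IGNORECASE):
        return False
    return FORBIDDEN.search(statement) is None


print(is_read_only("SELECT COUNT(trip_id) AS total_trips FROM trips"))
print(is_read_only("DROP TABLE trips"))
```

In the app, a failed check could show a message with st.write() instead of running the query.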
Conclusion
In this article, I’ve demonstrated how you can use Databricks Connect in Visual Studio Code along with Streamlit to speak to your data stored in Unity Catalog using Databricks Model Serving.