Table of Contents
OpenBB as the infrastructure layer to connect your data and AI agents
Integrating Snowflake's Cortex Search
Setting up on Snowflake
Setting up
Upload Data
Parse PDF Files
Cortex Search Service
Setting up on OpenBB
copilots.json
main.py
Snowflake Search Functions
Running in OpenBB
OpenBB as the infrastructure layer to connect your data and AI agents
As organizations increasingly seek to leverage the power of AI and data, the ability to seamlessly integrate proprietary data sources and custom AI agents into their workflows has become a critical need.
At OpenBB, we understand the challenges firms face in bridging the gap between data intelligence and operational workflows. That's why we've built the OpenBB Workspace as the infrastructure layer that empowers organizations to connect their data and AI strategies effortlessly.
Integrating Snowflake’s Cortex Search
As I was exploring Cortex Analyst and Cortex Search recently, I came across a tutorial from Snowflake on how to create a PDF chatbot. I decided to integrate it into OpenBB, using our open-source agentic framework, to demonstrate how we empower teams to build tailored solutions with their own data and AI models.
In this blog post, I’ll walk you through the whole process of integrating Snowflake's Cortex Search into OpenBB as a custom AI agent.
This example is particularly relevant for firms looking to maximize the value of their Snowflake data using AI.
For a practical demonstration of this integration, check out my video showcasing the AI chatbot using Snowflake's Cortex Search over FOMC minutes.
If you’re a technical user looking for a deeper explanation of how I built this, let’s dive in.
Step-by-step guide
Setting up on Snowflake
1 - Setting up
We are going to follow along with the tutorial for setup. Everything here is available open source.
The first thing we need to do is create our Snowflake objects.
```sql
CREATE DATABASE IF NOT EXISTS cortex_search_tutorial_db;

CREATE OR REPLACE WAREHOUSE cortex_search_tutorial_wh WITH
    WAREHOUSE_SIZE='X-SMALL'
    AUTO_SUSPEND = 120
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED=TRUE;

USE WAREHOUSE cortex_search_tutorial_wh;

CREATE OR REPLACE STAGE cortex_search_tutorial_db.public.fomc
    DIRECTORY = (ENABLE = TRUE)
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
```
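To confirm everything was created, you can list the stage we just set up (an optional quick check):

```sql
-- Optional check that the stage exists
SHOW STAGES IN SCHEMA cortex_search_tutorial_db.public;
```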
2 - Upload Data
The sample data consists of 12 PDFs, FOMC minutes from February 2023 through June 2024, and the link is here.
In Snowsight, navigate to the Data section in the left-hand navigation bar, go to the cortex_search_tutorial_db.public.fomc stage we just created, and upload the 12 PDFs.
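Alternatively, if you prefer the command line, you can upload the files with a PUT from SnowSQL or the Snowflake CLI. The local path below is just a placeholder for wherever you saved the PDFs.

```sql
-- Placeholder path; point this at the directory holding the 12 PDFs
PUT file:///path/to/fomc/*.pdf @cortex_search_tutorial_db.public.fomc
    AUTO_COMPRESS = FALSE;
```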
3 - Parse PDF Files
Here we follow the Snowflake tutorial for chunking. The following code creates a user-defined function (UDF) that reads each PDF with the PyPDF2 library. For each page of a given file, the function extracts the text and then recursively splits it into 2,000-character chunks with a 300-character overlap.
```sql
CREATE OR REPLACE FUNCTION cortex_search_tutorial_db.public.pdf_text_chunker(file_url STRING)
    RETURNS TABLE (chunk VARCHAR)
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.9'
    HANDLER = 'pdf_text_chunker'
    PACKAGES = ('snowflake-snowpark-python', 'PyPDF2', 'langchain')
    AS
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd

class pdf_text_chunker:

    def read_pdf(self, file_url: str) -> str:
        logger = logging.getLogger("udf_logger")
        logger.info(f"Opening file {file_url}")

        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())

        reader = PyPDF2.PdfReader(buffer)
        text = ""
        for page in reader.pages:
            try:
                text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
            except:
                text = "Unable to Extract"
                logger.warn(f"Unable to extract from file {file_url}, page {page}")

        return text

    def process(self, file_url: str):
        text = self.read_pdf(file_url)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 2000,  # Adjust this as needed
            chunk_overlap = 300,  # Overlap to keep chunks contextual
            length_function = len
        )

        chunks = text_splitter.split_text(text)
        df = pd.DataFrame(chunks, columns=['chunk'])

        yield from df.itertuples(index=False, name=None)
$$;
```
Once the function is created, we build the Snowflake table that contains the chunks. Note that the table also keeps the file URL, so we can reference it later!
```sql
CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.docs_chunks_table AS
    SELECT
        relative_path,
        build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path) AS file_url,
        -- preserve file title information by concatenating relative_path with the chunk
        CONCAT(relative_path, ': ', func.chunk) AS chunk,
        'English' AS language
    FROM
        directory(@cortex_search_tutorial_db.public.fomc),
        TABLE(cortex_search_tutorial_db.public.pdf_text_chunker(build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path))) AS func;
```
This process took me about a minute, so don't worry that it is not immediate.
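If you want to sanity-check the result before moving on, a quick query shows how many chunks were produced per document:

```sql
-- How many chunks did each PDF produce?
SELECT relative_path, COUNT(*) AS num_chunks
FROM cortex_search_tutorial_db.public.docs_chunks_table
GROUP BY relative_path
ORDER BY relative_path;
```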
4 - Cortex Search Service
Here we create the Cortex Search Service that we will be using. For more information on creating the search service, including choosing the embedding model, the documentation is here.
```sql
CREATE OR REPLACE CORTEX SEARCH SERVICE cortex_search_tutorial_db.public.fomc_meeting
    ON chunk
    ATTRIBUTES language
    WAREHOUSE = cortex_search_tutorial_wh
    TARGET_LAG = '1 hour'
    AS (
    SELECT
        chunk,
        relative_path,
        file_url,
        language
    FROM cortex_search_tutorial_db.public.docs_chunks_table
    );
```
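Before moving to OpenBB, you can check that the service returns sensible results directly from SQL. Assuming the SNOWFLAKE.CORTEX.SEARCH_PREVIEW function is available in your account, a quick smoke test might look like this (the query text is just an example):

```sql
-- Assumes SEARCH_PREVIEW is available in your account
SELECT PARSE_JSON(
    SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
        'cortex_search_tutorial_db.public.fomc_meeting',
        '{"query": "federal funds rate decision", "columns": ["chunk", "relative_path"], "limit": 3}'
    )
)['results'] AS results;
```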
Setting up on OpenBB
Up to this point, everything has come straight from the linked Snowflake tutorial. Now we'll move over to OpenBB and walk through the key components of the custom chatbot.
For more examples of custom copilots, and how they can be extended to use widgets or other components, see our open source repo.
These are the main components:
1 - copilots.json
Here we define the custom chatbot.
```json
{
  "snowflake_cortex": {
    "name": "Snowflake Cortex",
    "description": "Tutorial Chatbot for FOMC minutes in Snowflake Search",
    "image": "https://upload.wikimedia.org/wikipedia/commons/f/ff/Snowflake_Logo.svg",
    "hasStreaming": true,
    "hasDocuments": true,
    "hasFunctionCalling": false,
    "endpoints": {
      "query": "http://localhost:7777/v1/query"
    }
  }
}
```
The important field to note here is the query endpoint; this is the URL the Workspace will call to get responses from our agent.
2 - main.py
This is our FastAPI application for the chat interface. There are two things to point out. The first is the server-sent events (SSE): the frontend expects data to be streamed back in this format, so we implement the following AsyncGenerator:
```python
async def create_message_stream(
    content: AsyncStreamedStr,
) -> AsyncGenerator[dict, None]:
    async for chunk in content:
        yield {
            "event": "copilotMessageChunk",
            "data": json.dumps({"delta": chunk}),
        }
```
And then the query endpoint:
```python
@app.post("/v1/query")
async def query(request: AgentQueryRequest) -> EventSourceResponse:
    """Query the Copilot."""

    # Prepare chat messages for the LLM.
    chat_messages = []
    for message in request.messages:
        if message.role == "ai":
            chat_messages.append(
                AssistantMessage(content=sanitize_message(message.content))
            )
        elif message.role == "human":
            chat_messages.append(
                UserMessage(content=sanitize_message(message.content))
            )

    # This is the main execution loop for the Copilot.
    async def execution_loop():
        # Right now we only do QA one message at a time
        result = _llm(sanitize_message(message.content))
        for event in result:
            yield event

    # Stream the SSEs back to the client.
    return EventSourceResponse(
        content=create_message_stream(execution_loop()),
        media_type="text/event-stream",
    )
```
Breaking this down:
The input request contains the messages in request.messages. If we wanted to implement chat history, we would use the chat_messages list. For this demo, however, I am showing a simple one-question-at-a-time interface, so we only use the last message. The _llm function, defined next in our Snowflake search functions, returns a stream of message chunks that we then send back to the copilot.
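For reference, here is a minimal sketch of what the request model could look like. OpenBB's actual AgentQueryRequest schema has additional fields, so treat this as illustrative only.

```python
from typing import Literal

from pydantic import BaseModel


class LlmMessage(BaseModel):
    """A single message from the Workspace chat ('human' or 'ai')."""

    role: Literal["ai", "human"]
    content: str


class AgentQueryRequest(BaseModel):
    """Illustrative request body; the real OpenBB schema includes more fields."""

    messages: list[LlmMessage]
```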
3 - Snowflake Search Functions
In this repo, I have created a cortex_search function. Let's look at what it contains:
Connecting to the Snowflake session
We need to define the connection parameters as environment variables:
```python
import os

import dotenv
from snowflake.core import Root
from snowflake.cortex import complete
from snowflake.snowpark import Session

dotenv.load_dotenv()

CONNECTION_PARAMETERS = {
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "role": os.environ["SNOWFLAKE_ROLE"],
}

session = Session.builder.configs(CONNECTION_PARAMETERS).create()
root = Root(session)
```
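The matching .env file just needs those four variables (the values below are placeholders):

```
SNOWFLAKE_ACCOUNT=your_account_identifier
SNOWFLAKE_USER=your_username
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ROLE=your_role
```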
Getting and using the Search Service
This is where you would make changes if you have multiple Cortex Search Services. In this example I only have one, so we look it up and extract the objects we need:
```python
# Assuming we have a single service
services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect()
database, warehouse, schema, service_name = (
    services[0].database_name,
    services[0].warehouse,
    services[0].schema_name,
    services[0].name,
)
```
Below, we use the search service we just found. We pass in the query, and the function returns the search results: the limit most similar document chunks. In a real use case, you could also pass specific columns or filters to narrow the search for better results.
```python
def query_cortex_search_service(
    query,
    columns=[],
    filter={},
    limit=5,
    database=database,
    schema=schema,
    service_name=service_name,
):
    cortex_search_service = (
        root.databases[database].schemas[schema].cortex_search_services[service_name]
    )
    context_documents = cortex_search_service.search(
        query, columns=columns, filter=filter, limit=limit
    )
    return context_documents.results
```
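As a quick illustration of the call (the question and column list below are just examples):

```python
# Example call; results are the `limit` most similar chunks
hits = query_cortex_search_service(
    "What did the committee say about inflation?",
    columns=["chunk", "file_url", "relative_path"],
    limit=3,
)
for hit in hits:
    print(hit["relative_path"])
```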
Generate the prompt and completion
Here we take the user's query and create a prompt for the complete service; the prompt template is taken from the Streamlit example. The function first performs a similarity search over the document chunks using the Cortex Search Service, then injects the most similar chunks into the prompt for summarization.
```python
def create_fomc_prompt(
    query, columns=["chunk", "file_url", "relative_path"], filter={}, limit=5
):
    """
    Create a prompt for the language model by combining the query with context
    retrieved from the cortex search service.

    Args:
        query (str): The user's question to generate a prompt for.
        columns (list): Columns to retrieve from the search service
        filter (dict): Filter criteria for the search
        limit (int): Maximum number of results to return

    Returns:
        tuple: (str, list) The generated prompt and search results
    """
    # Get context from Cortex search service
    if not filter:
        filter = {"@and": [{"@eq": {"language": "English"}}]}

    results = query_cortex_search_service(
        query, columns=columns, filter=filter, limit=limit
    )

    # Create the prompt template
    prompt = f"""
    [INST]
    You are a helpful AI chat assistant with RAG capabilities. When a user asks you a question,
    you will also be given context provided between <context> and </context> tags. Use that context
    to provide a summary that addresses the user's question. Ensure the answer is coherent, concise,
    and directly relevant to the user's question.

    If the user asks a generic question which cannot be answered with the given context, or if the
    user asks a question that is not related to the context, just say "I don't know the answer to
    that question."

    Don't say things like "according to the provided context". But do cite any specific context so
    that the user can find more information. If you cite, only use the chunk, not the file_url.

    <context>
    {results}
    </context>

    <question>
    {query}
    </question>
    [/INST]
    Answer:
    """
    return prompt, results
```
Then the completion (what is returned from the LLM) is obtained by running:
```python
def llm_complete(model, prompt):
    """
    Generate a completion for the given prompt using the specified model.

    Args:
        model (str): The name of the model to use for completion.
        prompt (str): The prompt to generate a completion for.

    Returns:
        str: The generated completion.
    """
    return complete(model, prompt, stream=True)
```
Note that this is where we stream the result for the nice frontend experience.
Putting it all together
Here we define the _llm function, which takes the query, creates the prompt, and then streams back the response chunks. Note that in this case we are hard-coding the model to 'mistral-7b', but this can be changed to whatever you would like!
```python
def _llm(query):
    prompt, results = create_fomc_prompt(query)
    response = complete("mistral-7b", prompt, stream=True)

    for chunk in response:
        yield chunk.replace("$", "\$")

    # After response is complete, yield the references table
    markdown_table = "\n\n###### References \n\n| PDF Title | URL |\n|-------|-----|\n"
    for ref in results:
        markdown_table += f"| {ref['relative_path']} | {ref['file_url']} |\n"

    # Yield the table as a final chunk
    yield markdown_table
```
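Before wiring it into the FastAPI endpoint, you can test _llm directly from a Python shell; the question below is just an example:

```python
# Prints the streamed answer followed by the references table
for chunk in _llm("What did the FOMC decide about interest rates in June 2024?"):
    print(chunk, end="", flush=True)
```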
4 - Running in OpenBB
We launch the copilot with:
uvicorn custom-copilot.main:app --port 7777
We then navigate into OpenBB and add a new copilot in the window, where the URL will be http://localhost:7777, matching the query endpoint we defined in copilots.json.
And now this is available in the agent’s interface:
And we can then query the model with the Cortex Search results!
Conclusion
This integration example showcases the flexibility of OpenBB's infrastructure: the approach can easily be extended to any data in your Snowflake stages or your existing RAG pipelines. It can be further extended with your own processing pipelines using Snowpipe and tasks (maybe even Snowflake's Document AI, as shown here: Tutorial: Create a document processing pipeline with Document AI), or by interacting with chat history or other OpenBB Workspace widgets!
By leveraging OpenBB's open-source agentic framework, you can bring your own AI agent into our workspace while maintaining the highest standards of security and compliance.
If you’re wondering how OpenBB could meet your firm’s specific needs, we’d love to discuss your particular use case in a tailored demo. You can reach out to our Head of Quantitative Research, Ihsan Saracgil at ihsan.saracgil@openbb.finance.