LLM Foundations: Vector Databases for Caching and Retrieval Augmented Generation (RAG)

Introduction

Scope

  • Vector and vector search concepts review
  • Concepts and Setup of Milvus DB
  • Milvus DB data manipulation and search
  • Vector DB as an LLM cache
  • Vector DB for Retrieval Augmented Generation (RAG)

Prerequisites

  • NLP for machine learning
  • LLMs and embeddings
  • Python, Jupyter notebooks, Docker
  • LangChain

Introduction to Vector Databases

What is a vector?

A vector is an object that has both magnitude (size, quantity) and direction (line, angle, trend)

Vectors in Programming:

  • A one-dimensional data structure
  • Homogeneous (has elements of the same type)
  • Defined position for each element
  • Stored and accessed differently from lists and arrays
  • For example: [1.0, 2.1, 2.2]

Vectorization in NLP

  • ML algorithm can only handle numeric data
  • Text data needs to be converted to equivalent numeric representations for ML purposes
  • Vectorization converts text to numeric values
  • Captures structure and / or semantics of original text

Vectorization Techniques

  • Bag of words
  • TF-IDF (term frequency - inverse document frequency)
    • Creates sparse matrices of documents
  • Word embeddings
    • Captures semantic information in vectors
  • Sentence embeddings
    • Popular with large language model (LLM)-based applications
  • Each vector has a series of data points

  • A sentence can be represented as a single embedding vector

  • Similarity measures how close two vectors are

  • Distance measures are used to measure similarity

    • Euclidean distance (L2)

      $$
      \begin{equation}d(p,q)=\sqrt {(p_1-q_1)^2 + (p_2-q_2)^2+\dots+(p_n-q_n)^2}\end{equation}
      $$

    • Inner product (IP)

      $$
      \begin{equation}A \cdot B = \sum_{i=1}^{n} {A_i}{B_i}\end{equation}
      $$

    • Cosine similarity (COSINE)

      $$
      \begin{equation}\cos(A,B) = \frac{A\cdot B}{|A||B|}=\frac{\sum_{i=1}^{n}{A_iB_i}}{\sqrt{\sum_{i=1}^{n}A_i^2}\cdot\sqrt{\sum_{i=1}^{n}B_i^2}}\end{equation}
      $$

  • Vectorize strings using any of the vectorization techniques

    • List of strings to search
    • Query string to compare against
  • Compare vectors using approximate nearest neighbor (ANN) algorithms

  • Use distance measures with ANN to determine similarity

  • Retrieve top-K results ordered by similarity (see the sketch below)
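
A minimal sketch of the retrieval steps above, assuming the strings have already been embedded with any of the techniques listed earlier. A real system replaces this exhaustive scan with an ANN index; the toy vectors are illustrative only.

import numpy as np

def top_k(query_vec, doc_vecs, k=3):
    query = np.asarray(query_vec, dtype=float)
    docs = np.asarray(doc_vecs, dtype=float)
    # Cosine similarity between the query and every document vector
    sims = (docs @ query) / (np.linalg.norm(docs, axis=1) * np.linalg.norm(query))
    order = np.argsort(-sims)[:k]  # indices of the k most similar vectors
    return [(int(i), float(sims[i])) for i in order]

# Toy 3-dimensional "embeddings"
doc_vectors = [[1.0, 0.0, 0.1], [0.9, 0.1, 0.0], [0.0, 1.0, 0.2]]
print(top_k([1.0, 0.0, 0.0], doc_vectors, k=2))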

Vector databases

Vector databases are specialized database products that are optimized for storage and querying of vector data.

  • Vector Database Features
    • Support for vector data types
    • Support for regular datatypes
    • CRUD operations on vector and scalar data
    • Semantic search on vector data

Vector Databases Available

                                             Open Source                          Commercial
Specialized vector databases                 Milvus, Chroma, Vespa, Qdrant        Pinecone, Weaviate
General databases supporting vector search   PostgreSQL, Cassandra, OpenSearch    Elasticsearch, Redis, SingleStore

Pros and cons of vector databases

Vector DB Advantages

  • Semantic search support (ANN, distance measures)
  • Bulk data loading
  • Indexing
  • Efficient data retrieval
  • Scalability
  • Clustering and fault tolerance

Vector DB Shortcomings

  • Limited support for traditional querying
  • Limited transactional support
  • Insert latency when handling large datasets
  • Semantic searches are computationally expensive
  • Memory intensive
  • Fewer ecosystem integrations than mature databases

Milvus Database Concepts

Introduction to Milvus DB

Milvus is a specialized database that is built for storing, indexing, and searching vectors.

  • Open source and commercial
  • Standalone, cluster, and managed (Zilliz cloud) options
  • Highly scalable for vector storage and search
  • Euclidean distance (L2), inner product (IP), and cosine (COSINE) metrics
  • Hybrid data storage and search
  • Access with SDKs (Python, Node.js, Go, Java)

Milvus architecture


  • SDK
  • Access Layer
  • Coordinator Service
  • Metadata storage (ETCD)
  • Message Queue (RocksMQ (default), Kafka, Pulsar)
  • Worker Node
  • Object Storage (MinIO (default), S3, Azure Blob)


Collections in Milvus

  • Databases in Milvus
    • Each Milvus instance can manage multiple databases; a single instance can have up to 64 databases.
    • The default database is named default and is created automatically. If a new entity is created without a specified database name, it is stored in the default database.
    • A database is a container for data: it stores the collections, partitions, and indexes within it.
    • RBAC is implemented per database. Users can be created and configured at the database level, and roles with specific permissions can be created for each database and assigned to users.
    • Multi-tenancy option: each tenant can be given its own database, and data belonging to that tenant is stored there. This provides the highest level of tenant isolation within a Milvus instance.
  • Collections in Milvus
    • A Milvus collection is like a table in a traditional database. It is the logical entity used to store and manage data.
    • Each collection is created with a schema that defines the fields for data storage. The schema can be modified later, with certain restrictions.
    • Fields have datatypes, sizes, and default values
    • Scalar and vector datatypes: a given collection can hold a combination of scalar and vector fields
    • Primary keys and auto-generated keys are available
    • Dynamic fields allow ad hoc fields to be added

Scalar datatypes: INT8, INT16, INT32, INT64, FLOAT, DOUBLE, VARCHAR, BOOL, JSON, ARRAY

Vector datatypes: BINARY_VECTOR, FLOAT_VECTOR

Partitions in Milvus

  • Each collection can be split into multiple partitions
  • Data in the same partition is stored physically together
  • The default partition is _default
  • Data can be inserted into and queried from specific partitions
  • Partition keys can be used for automatic allocation
  • Partitions help optimize storage and search (see the sketch below)
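
A minimal sketch of partition usage with pymilvus; the partition, collection, and connection names are illustrative, and insert_data, query_vector, and search_params are assumed to be prepared as shown later in these notes.

from pymilvus import Collection

collection = Collection("courses_list", using="learn")

# Create a partition and insert into it explicitly
collection.create_partition("ml_courses")
collection.insert(insert_data, partition_name="ml_courses")

# Restrict a search to specific partitions
results = collection.search(
    data=[query_vector],
    anns_field="desc_embedding",
    param=search_params,
    limit=5,
    partition_names=["ml_courses"]  # only this partition is scanned
)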

Indexes in Milvus

  • Indexes help speed up search operations

  • Can be created on scalar or vector fields

  • Only one index is allowed per field

  • Organizes vectors based on the metric type chosen for approximate nearest neighbor (ANN) search (L2, IP, COSINE)

  • An index is a prerequisite for ANN searches

  • Index Types

    Type Use
    FLAT Small dataset, 100% recall rate
    IVF_FLAT Large dataset, fast query, high recall rate
    GPU_IVF_FLAT Same as IVF_FLAT, for GPUs
    IVF_SQ8 Fast query with limited resources
    IVF_PQ Fast query, limited resources, and low recall rate
    HNSW Fast query, high recall, high memory
    SCANN Fast query, high recall, high memory
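
As a hedged illustration, the index type chosen from the table above is passed through index_params when building the index. The HNSW parameters below (M, efConstruction) are typical starting values from the Milvus documentation, not tuned recommendations.

index_params = {
    "metric_type": "L2",
    "index_type": "HNSW",
    "params": {"M": 16, "efConstruction": 200}  # graph degree and build-time search width
}
# collection.create_index(field_name="desc_embedding", index_params=index_params)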

Managing Data in Milvus

  • Rows are called entities in Milvus
  • Bulk inserts are possible and recommended
  • A flush operation is needed to index newly inserted data. Milvus automatically flushes once the pending records reach a certain size, but if the data must be queried immediately, it is recommended to trigger the flush manually.
  • Upsert is available, based on the primary key. If a record is inserted with an existing primary key, the existing record is updated rather than a new record being created.
  • Records can be deleted by primary key or by a boolean expression (see the sketch below).
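
A minimal sketch of these entity-level operations with pymilvus 2.3+, assuming the courses collection built later in these notes; new_embedding stands for a vector produced by the same embedding model used at ingestion.

from pymilvus import Collection

collection = Collection("courses_list", using="learn")

# Upsert: the entity with primary key 1001 is updated if present, inserted otherwise
collection.upsert([[1001], ["New title"], ["New description"], [new_embedding]])

# Delete by primary key list or by a boolean expression
collection.delete("course_id in [1001, 1002]")
collection.flush()  # persist and make the changes visible to queries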

Query and Search in Milvus

  • Query
    • Scalar-based filtering and retrieval process (like RDBMS)
    • Specify output fields, offset and limits
    • Restrict query to partitions by partition key or name
    • count(*) is available to aggregate data; other aggregates such as sum() or avg() are not available.
    • Query features are limited compared to RDBMS systems.
  • Filters in Query
    • Comparison Operators (>, >=, <, <=, ==, !=, in)
    • Logical Operators (&&, ||)
    • Match Operators (like)
    • Array Operator (ARRAY_CONTAINS)
    • JSON Operator (JSON_CONTAINS)
    • https://milvus.io/docs/boolean.md
  • Search on Vector Fields
    • Search any vector field with a query vector, using distance measures.
    • An input string can be compared to strings in the database, and related strings retrieved with semantic search. The input string (the search query) must first be converted to a vector using the same embedding model that was used when ingesting the vector field.
    • The metric used for comparison should be the same metric that was used when creating the index on the vector field (such as L2 or IP). Note that an index is a prerequisite before search can be performed on a vector field.
    • Specify limit and offset
    • Radius can be used to filter based on similarity (distance). The smaller the distance, the higher the similarity.
    • Returns distance to the original query in addition to results

https://milvus.io/docs

Set up Milvus

https://milvus.io/docs/install_standalone-docker.md

# Download the installation script
$ curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh

# Start the Docker container
$ bash standalone_embed.sh start

# Run the Attu web UI
$ docker run -d -p 8000:3000 -e MILVUS_URL={milvus server IP}:19530 zilliz/attu:v2.4.0

# Stop Milvus
$ bash standalone_embed.sh stop

# Delete Milvus data
$ bash standalone_embed.sh delete

After running the installation script:

  • A docker container named milvus has been started at port 19530.
  • An embed etcd is installed along with Milvus in the same container and serves at port 2379. Its configuration file is mapped to embedEtcd.yaml in the current folder.
  • The Milvus data volume is mapped to volumes/milvus in the current folder.
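
A quick sanity check that the standalone server is reachable (a sketch, assuming the default host and port):

from pymilvus import connections, utility

connections.connect(host="localhost", port="19530")
print("Server version:", utility.get_server_version())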

3. Milvus Database Operations

Create a connection

!pip install pymilvus==2.3.5
!pip install openai==1.6.1
!pip install langchain==0.0.354
!pip install tiktoken==0.5.2
!pip install transformers==4.36.2
!pip install pandas==2.1.4
!pip install pdfminer==20191125
!pip install pdfminer.six==20221105
  • Connecting to Milvus

#Creating a connection

#Import the pymilvus package
from pymilvus import connections

#Create list of connections
connections.add_connection(
    #Specify a name for the connection
    learn={
        "host": "localhost",
        "port": "19530",
        "username": "",
        "password": ""
    })

#Connect
connection_id = "learn"  # connection name for future reference
connections.connect(connection_id)

#List all connections
connections.list_connections()

Create databases and users

#Database operations
from pymilvus import db

#Get the current list of databases available to the connection
current_dbs = db.list_database(using=connection_id)
print("Current databases: ", current_dbs)

db_name = "course_db"

if db_name not in current_dbs:
    print("Creating database :", db_name)
    course_db = db.create_database(db_name, using=connection_id)

#Switch to use the new database
db.using_database(db_name, using=connection_id)

output:

Current databases:  ['default']
Creating database : course_db

Create a new user:

#User management
from pymilvus import Role, utility

current_users = utility.list_usernames(using=connection_id)
print("Current user list: ", current_users)

new_user = "course_public"

if new_user not in current_users:
    utility.create_user(new_user, "password", using=connection_id)

#Assign a role to the user
public_role = Role("public", using=connection_id)
print("Role public exists? ", public_role.is_exist())

#Add user to role
public_role.add_user(new_user)

We can access Attu (the UI container started earlier, mapped to host port 8000, e.g. http://192.168.3.4:8000) to view the database and user. A hedged example of granting privileges to the role follows.
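A hedged sketch of granting a privilege to the role: the object type ("Collection") and privilege name ("Search") follow the Milvus RBAC documentation, and "*" targets all collections; adjust these to your setup.

# Allow members of the role to run vector searches on any collection
public_role.grant("Collection", "*", "Search")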

Create collections

from pymilvus import CollectionSchema, FieldSchema, DataType, Collection
import json

#Define fields
course_id = FieldSchema(
    name="course_id",
    dtype=DataType.INT64,
    is_primary=True)

title = FieldSchema(
    name="title",
    dtype=DataType.VARCHAR,
    max_length=256)

description = FieldSchema(
    name="description",
    dtype=DataType.VARCHAR,
    max_length=2048)

#dim should match the embedding size, to store the vector for the description
desc_embedding = FieldSchema(
    name="desc_embedding",
    dtype=DataType.FLOAT_VECTOR,
    dim=1536  # OpenAI's embedding model produces 1536-dimensional vectors
)

#Define schema
course_schema = CollectionSchema(
    fields=[course_id, title, description, desc_embedding],
    description="Courses List",
    enable_dynamic_field=True
)

collection_name = "courses_list"

#Create the collection
course_collection = Collection(
    name=collection_name,
    schema=course_schema,
    using=connection_id,
    shards_num=2  # the number of shards sets the parallelism possible during DML operations
)

from pymilvus import utility

#List all collections
print("Current collections: ", utility.list_collections(using=connection_id))

#Load the existing collection into another object
r_collection = Collection(collection_name, using=connection_id)
print("\n", r_collection.schema)

Inserting data into Milvus

#read the input course CSV
import pandas as pd
course_descriptions = pd.read_csv("course-descriptions.csv")
course_descriptions.head()
#Use LangChain to create embeddings
from langchain.llms import OpenAI
from langchain.embeddings import OpenAIEmbeddings
import os

#Set up the OpenAI API key to use OpenAI's LLM
#Use your own key for OpenAI.

#If you use the free tier, you may hit rate limits on the number of requests

openai_api_key = ""
os.environ["OPENAI_API_KEY"] = openai_api_key

embeddings_model = OpenAIEmbeddings()
#Prepare data for insert

i_course_id = course_descriptions["Course ID"].tolist()
i_title = course_descriptions["Title"].tolist()
i_description = course_descriptions["Description"].tolist()

i_desc_embedding = [embeddings_model.embed_query(i)
                    for i in i_description]

#Format for data input (column-ordered lists)
insert_data = [i_course_id, i_title, i_description, i_desc_embedding]
#Instantiate a collection object and insert data
course_collection = Collection(collection_name,using=connection_id)

#Insert
mr=course_collection.insert(insert_data)

#Flush the data after insert
print("Inserted data. Now flushing")
course_collection.flush(timeout=180)

Build an index

#Build an index
index_params = {
    "metric_type": "L2",  # or IP, COSINE
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}

course_collection.create_index(
    field_name="desc_embedding",
    index_params=index_params
)

utility.index_building_progress(collection_name, using=connection_id)
  • nlist: the number of clusters (buckets) used to build the index. Higher values can improve search efficiency but may lower search effectiveness (recall); see the sizing sketch below.
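
A common starting point suggested in the Milvus documentation is nlist ≈ 4 * sqrt(row_count); this is a heuristic to begin experimentation with, not a guarantee.

import math

row_count = 1_000_000                  # assumed dataset size
nlist = 4 * int(math.sqrt(row_count))  # -> 4000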


Querying scalar data

#Load the Collection
# NOTE: A collection should first be loaded into memory before
# queries can be executed against it

course_collection.load()
print("Course collection loaded..")
q_result = course_collection.query(
    expr="course_id == 1001",
    output_fields=["title", "description"]
)
print(q_result)
print("\n Result object :", type(q_result[0]))
q_result2 = course_collection.query(
    expr="(title like 'MLOps%') && (course_id > 1001)",
    output_fields=["title", "description"]
)
print(q_result2)

Searching vector fields

#Make sure that the collection is already loaded

search_params = {
    "metric_type": "L2",
    "offset": 0,
    "ignore_growing": False,
    "params": {"nprobe": 10}
}

#Embed the input search string
search_string = "machine learning"
search_embed = embeddings_model.embed_query(search_string)

#Perform search
s_results = course_collection.search(
    data=[search_embed],          # input query vector to search for
    anns_field="desc_embedding",  # field to search with ANN
    param=search_params,
    limit=10,                     # limit output
    expr=None,                    # use additional scalar conditions
    output_fields=["title"],
    consistency_level="Strong"
)

print("Search result object:", type(s_results[0]), "\n")
#Print results in order of match
for i in s_results[0]:
    print(i.id, str(round(i.distance, 2)), "\t", i.entity.get("title"))
  • ignore_growing: whether the search should ignore segments that are not yet fully populated. Milvus internally processes data in segments; if set to True, the search may skip some newly added data. Setting it to False includes all new data, at an additional query cost.
  • nprobe: the number of clusters to search, starting from the cluster that best matches the query. Lowering nprobe improves efficiency but may miss matches that lie beyond the clusters searched.
  • consistency_level: controls whether data still being processed is considered by the search.


#Search with an unrelated query

#Embed the input search string
search_string2 = "best movies of the year"
search_embed2 = embeddings_model.embed_query(search_string2)

#Perform search
s_results2 = course_collection.search(
    data=[search_embed2],         # input query vector to search for
    anns_field="desc_embedding",  # field to search with ANN
    param=search_params,
    limit=10,                     # limit output
    expr=None,                    # use additional scalar conditions
    output_fields=["title"],
    consistency_level="Strong"
)

#Print results in order of match
for i in s_results2[0]:
    print(i.id, str(round(i.distance, 2)), "\t", i.entity.get("title"))


So how do we ensure that we only get results that are genuinely similar to the search string? We need to use the distances returned and apply a similarity cutoff threshold, as in the sketch below.
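A minimal sketch of applying such a cutoff to the L2 distances returned above; 0.45 is an illustrative threshold that must be tuned per embedding model and dataset.

DISTANCE_THRESHOLD = 0.45  # illustrative value; tune by experimentation

relevant = [hit for hit in s_results2[0] if hit.distance < DISTANCE_THRESHOLD]
if not relevant:
    print("No sufficiently similar courses found")
for hit in relevant:
    print(hit.id, round(hit.distance, 2), hit.entity.get("title"))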

Deleting objects and entities

#Delete a single record
course_collection.delete("course_id in [1002]")
#Drop a collection
utility.drop_collection(collection_name,using=connection_id)
#drop a database
#Make sure to drop all collections in the database first

db.drop_database(db_name, using=connection_id)

4. Vector DB for LLM Query Caching

LLMs and Caching

Shortcomings of using LLMs, and how a vector DB can help:

  • LLMs have revolutionized the use of AI
  • Several apps are being built with LLMs in the backend
  • LLMs are expensive to build, deploy, maintain, and use
  • Cost per inference call is high
  • Latency per inference is also high, given the nature of LLMs

How does caching help?

  • In a given organization or context, users trigger similar prompts to the LLM, resulting in the same responses
  • Caching prompts and responses and serving similar prompts from the cache helps reduce cost and latency
  • Prompt/response caching is becoming an essential component of generative AI applications

Prompt caching workflow

(Figure: prompt caching workflow)

Set up the Milvus cache

#Set up the database and collection
from pymilvus import connections
from pymilvus import db, Collection

from pymilvus import utility

#Names for the connection, database and collection
conn_name = "cache_conn"
db_name = "cache_db"
collection_name = "llm_cache"

#Create a connection to Milvus
connections.add_connection(
    cache_conn={
        "host": "localhost",
        "port": "19530",
        "username": "username",
        "password": "password"
    })

#Connect
connections.connect(conn_name)

#Create the DB if not already present
current_dbs = db.list_database(using=conn_name)

if db_name not in current_dbs:
    print("Creating database :", db_name)
    cache_db = db.create_database(db_name, using=conn_name)  # the default db is "default"
else:
    print(db_name, ": Database already exists")

#Switch to the new database
db.using_database(db_name, using=conn_name)
#Create a collection for the cache
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection
import json

#Define fields in the cache
#Auto-generated ID field for each entity
cache_id = FieldSchema(
    name="cache_id",
    dtype=DataType.INT64,
    auto_id=True,
    is_primary=True)

#Text of the input prompt
prompt_text = FieldSchema(
    name="prompt_text",
    dtype=DataType.VARCHAR,
    max_length=2048)

#Text of the LLM response
response_text = FieldSchema(
    name="response_text",
    dtype=DataType.VARCHAR,
    max_length=2048)

#Embedding of the input prompt
prompt_embedding = FieldSchema(
    name="prompt_embedding",
    dtype=DataType.FLOAT_VECTOR,
    dim=1536  # define based on the embedding model used
)

#Define the schema for the cache collection
cache_schema = CollectionSchema(
    fields=[cache_id, prompt_text, response_text, prompt_embedding],
    description="Cache for LLM",
    enable_dynamic_field=True
)

#Create the collection
cache_collection = Collection(
    name=collection_name,
    schema=cache_schema,
    using=conn_name,
    shards_num=2
)

print("Schema : ", cache_collection.schema, "\n")

#Build an index for the prompt embedding field
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}

cache_collection.create_index(
    field_name="prompt_embedding",
    index_params=index_params
)

#Flush the collection to persist it
cache_collection.flush()
#Load the collection into memory
cache_collection.load()

Inference processing and caching

from transformers import AutoTokenizer
from langchain.llms import OpenAI
from langchain.embeddings import OpenAIEmbeddings
import os
import time

#Set up the OpenAI API key to use OpenAI's LLM
openai_api_key = ""
os.environ["OPENAI_API_KEY"] = openai_api_key

#Create an LLM object
llm = OpenAI(temperature=0., model="text-davinci-003")

#Set up the embedding model for creating embeddings
embeddings_model = OpenAIEmbeddings()

#Set up the threshold for similarity between vectors
similarity_threshold = 0.3

search_params = {
    "metric_type": "L2",
    "offset": 0,
    "ignore_growing": False,
    "params": {"nprobe": 20, "radius": similarity_threshold}
}

#Create a function to run the inference loop
def get_response(prompt):

    start_time = time.time()
    #Create an embedding for the incoming prompt
    prompt_embed = embeddings_model.embed_query(prompt)

    #Check the cache for an existing result
    cache_results = cache_collection.search(
        data=[prompt_embed],
        anns_field="prompt_embedding",
        param=search_params,
        limit=1,  # look for the top result only
        expr=None,
        output_fields=["prompt_text", "response_text"],
        consistency_level="Strong"
    )

    returned_response = "None"

    if len(cache_results[0]) > 0:

        #Cache hit
        print(prompt, " :\n Cache hit : ", cache_results[0])
        returned_response = cache_results[0][0].entity.get("response_text")

    else:
        #Cache miss: find the answer with the LLM
        llm_response = llm(prompt)
        print(prompt, ":\n LLM returned :", llm_response)
        returned_response = llm_response

        #Save the prompt/response pair to the cache
        prompt_text = [prompt]
        prompt_embedding = [prompt_embed]
        response_text = [llm_response]

        insert_data = [prompt_text, response_text, prompt_embedding]
        mr = cache_collection.insert(insert_data)

    end_time = time.time()
    print("Time elapsed :", end_time - start_time, "\n")
    return returned_response
  • radius: only return matches whose distance is below this threshold (with L2, a smaller distance means higher similarity)
#Build up the cache
response=get_response("In which year was Abraham Lincoln born?")
response=get_response("What is the distance between the sun and the moon?")
response=get_response("How many years has LeBron James played in the NBA?")
response=get_response("What are the advantages of the python language?")
response=get_response("What is the typical height of an elephant?")
response=get_response("List some advantages of the python language")
response=get_response("How tall is an elephant?")

Cache management

  • Track cache hit ratio to measure cache effectiveness
  • Benchmark/test to find the right similarity threshold (radius)
  • Limit size of cached entries
  • Track last used timestamp (another scalar)
  • Prune entries based on age and last use (see the sketch below)
  • Get user feedback to measure if cached answers are accurate
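
A hedged pruning sketch, assuming the cache schema is extended with a hypothetical last_used INT64 scalar field (epoch seconds) that is updated on every cache hit.

import time

MAX_AGE_SECONDS = 7 * 24 * 3600  # prune entries unused for a week
cutoff = int(time.time()) - MAX_AGE_SECONDS

cache_collection.delete(f"last_used < {cutoff}")  # boolean-expression delete
cache_collection.flush()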

5. Introduction to Retrieval Augmented Generation (RAG)

LLM as a knowledge source

LLM capabilities:

  • Language capabilities: Understanding, reasoning, generating, and translating text
  • Knowledge capabilities: Question answering, knowledge distillation

LLM as a Knowledge Base: shortcomings

  • Can only answer questions based on the data they were trained on
  • Answers may not be current
  • LLMs can hallucinate
  • Cannot answer based on enterprise/confidential data
  • Building custom LLMs or fine-tuning them on organizational data is expensive

Introduction to retrieval augmented generation (RAG)

Retrieval augmented generation (RAG) is a framework that combines knowledge from a curated knowledge base with the generation capabilities of an LLM to provide accurate and well-structured answers.

When users provide prompts, the knowledge base provides contextual knowledge and the LLM provides well-structured answers.

  • RAG Features
    • Use enterprise and confidential data sources
    • Combine data from multiple data sources in different formats
    • Curate/prune data to ensure up-to-date and accurate knowledge
    • To find answers to queries, combine scalar and vector search: vector search finds semantically relevant content, while scalar filters narrow the context. For example, if a user asks a troubleshooting question about a specific product, a scalar filter can restrict answers to that product (see the sketch after this list).
    • RAG can use standard, out-of-the-box LLMs for language generation, with no need to build or fine-tune custom models. This significantly reduces cost.
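
To illustrate the hybrid search point above, a sketch that narrows a vector search with a scalar filter; it anticipates the collection built in the next section, and the product field and its value are hypothetical.

results = rag_collection.search(
    data=[question_embedding],      # embedded user question
    anns_field="rag_embedding",     # vector similarity on the knowledge chunks
    param=search_params,
    limit=3,
    expr='product == "WidgetPro"',  # hypothetical scalar filter narrows the context
    output_fields=["rag_text"]
)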

RAG: Knowledge curation process

How do we build a RAG system? There are two parts:

  • The knowledge curation process
  • The inference process

The knowledge curation process:

We can have one or more sources of data for the RAG system: websites, ticketing systems, traditional RDBMS databases, document hubs like SharePoint or Google Drive, and word-processing documents.

Note that the structures of these data sources differ vastly. For each data source, we build an acquisition module that fetches data from the source, filters it for relevant information, and cleanses it to eliminate noise (a minimal sketch follows).
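
A minimal, illustrative cleansing step for one such acquisition module; the patterns below are examples, not a general-purpose cleaner.

import re

def cleanse(raw_text: str) -> str:
    text = re.sub(r"\s+", " ", raw_text)         # collapse runs of whitespace
    text = re.sub(r"Page \d+ of \d+", "", text)  # drop page-footer noise
    return text.strip()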

(Figure: knowledge curation pipeline)

RAG question-answering process

(Figure: RAG question-answering workflow)

Applications of RAG

  • Interactive chatbots
  • Automated email responses for customer queries
  • Root cause analysis (based on observations and manuals)
  • Ecommerce search
  • Automated help desks (HR, legal, logistics)
  • Document hub searches

6. Implementing RAG with Milvus

Set up Milvus for RAG

#Create the connection and database for RAG
from pymilvus import connections
from pymilvus import db, Collection

from pymilvus import utility

connections.add_connection(
    rag_conn={
        "host": "localhost",
        "port": "19530",
        "username": "username",
        "password": "password"
    })

conn_name = "rag_conn"
db_name = "rag_db"

connections.connect(conn_name)
connections.list_connections()

current_dbs = db.list_database(using=conn_name)
print("Databases: ", current_dbs)

if db_name not in current_dbs:
    print("Creating database :", db_name)
    rag_db = db.create_database(db_name, using=conn_name)

#Switch to the new database
db.using_database(db_name, using=conn_name)
#Create a new collection for RAG
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection
import json

chunk_id_field = FieldSchema(
    name="chunk_id",
    dtype=DataType.INT64,
    is_primary=True)

rag_text_field = FieldSchema(
    name="rag_text",
    dtype=DataType.VARCHAR,
    max_length=2048)

rag_embedding_field = FieldSchema(
    name="rag_embedding",
    dtype=DataType.FLOAT_VECTOR,
    dim=1536  # define based on the embedding model used
)

rag_schema = CollectionSchema(
    fields=[chunk_id_field, rag_text_field, rag_embedding_field],
    description="RAG Schema",
    enable_dynamic_field=True
)

collection_name = "rag_collection"

rag_collection = Collection(
    name=collection_name,
    schema=rag_schema,
    using=conn_name,
    shards_num=2
)

from pymilvus import utility
print("Collections: ", utility.list_collections(using=conn_name))

r_collection = Collection(collection_name, using=conn_name)
print("\n Schema :", r_collection.schema)

Prepare data for the knowledge base

#Load up the PDF document
from langchain.document_loaders import PDFMinerLoader

loader = PDFMinerLoader("Large Language Models.pdf")
pdf_docs = loader.load()
#Split the document into chunks
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,      # character chunk size
    chunk_overlap=32,    # allowed overlap across chunks
    length_function=len  # function used to measure chunk size (here, characters)
)

pdf_docs = text_splitter.split_documents(pdf_docs)

#Create a list of chunk texts
rag_text = []
for i in pdf_docs:
    rag_text.append(i.page_content)

print("Total chunks :", len(rag_text))
print("Sample chunk text: ", rag_text[1])
#Create embeddings
from langchain.embeddings import OpenAIEmbeddings
import os

openai_api_key = ""
os.environ["OPENAI_API_KEY"] = openai_api_key

embeddings_model = OpenAIEmbeddings()

rag_embedding = [embeddings_model.embed_query(i)
                 for i in rag_text]

#Create chunk IDs
record_ids = [i for i in range(len(rag_text))]

Populate the Milvus database

insert_data = [record_ids, rag_text, rag_embedding]

i_collection = Collection(collection_name, using=conn_name)

#Insert the records
mr = i_collection.insert(insert_data)
#Flush the inserted records
i_collection.flush()

#Build an index on the embedding field
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}

i_collection.create_index(
    field_name="rag_embedding",
    index_params=index_params
)

utility.index_building_progress(collection_name, using=conn_name)

Answer questions with RAG

#The retrieval process
search_params = {
    "metric_type": "L2",
    "offset": 0,
    "ignore_growing": False,
    "params": {"nprobe": 20, "radius": 0.5}
}

query = "What is gender bias?"
search_embed = embeddings_model.embed_query(query)
#print(search_embed)

q_collection = Collection(collection_name, using=conn_name)
q_collection.load()

results = q_collection.search(
    data=[search_embed],
    anns_field="rag_embedding",
    param=search_params,
    limit=3,  # get the top 3 results only
    expr=None,
    output_fields=["rag_text"],
    consistency_level="Strong"
)

print("Top result :", results[0][0])
#Prepare the prompt for the LLM

context = []

#Append all returned chunks
for i in range(len(results[0])):
    context.append(results[0][i].entity.get("rag_text"))

#Create a prompt
prompt = ("Based on only the context provided, answer the query below: "
          + " Context: " + str(context)
          + "\n\n Query: " + query)

print(prompt)
#Generate with LLM

from langchain.llms import OpenAI

llm= OpenAI(temperature=0., model="text-davinci-003")

completion=llm(prompt)
print(completion)

7. Vector Databases Best Practices

Choose a vector database

  • Several vector DB options available
    • Cloud vs standalone, embedded vs cluster, specialized vs general
  • Use case decides the choice of the database
    • Storage, scalability and reliability needs
    • Frequency of hybrid queries
    • Is it OK to store data in the cloud?
    • Can you provide resources for local hosting and management?

Combine vector and scalar data

  • Specialized vector databases
    • Excellent support for vector search
    • Lack the extensive query capabilities that traditional databases provide
  • Does the use case require hybrid search?
  • Keep scalar and vector data in separate databases?
  • Choose carefully, since it has significant implications

Distance measure considerations

  • Vector search always returns hits as long as records exist in the database: with a limit of 10, the query returns 10 records whenever at least 10 are stored, regardless of how similar they actually are.
  • A distance or similarity threshold is therefore needed to check whether the vectors in the DB genuinely match the query vector. In Milvus, the radius search parameter sets this value.
  • What exactly is similar? Depends on the use case.
  • Embedding models and metric type impact similarity thresholds
    • Custom embedding by domain (examples: healthcare, finance)

Tune vector DB performance

  • The effectiveness of search depends on the data, the embedding model, the metric type, and the thresholds
  • Find the best combination by experimentation
    • Use a good test dataset that matches real-world data
    • Experiment with embedding models and metric types
    • Experiment with different distance thresholds to find the optimal value
    • Continue to monitor this performance in production as well

Conclusion

Keep exploring:

  • Other vector database products beyond Milvus, to understand how they compare
  • Tools like LangChain and LlamaIndex that help in building applications with vector databases
  • Retrieval augmented generation applications for your organization, built with vector databases