
Build Retrieval Augmented Generation (RAG) based Generative AI (GenAI) application with Amazon Bedrock

Quick GenAI RAG-based application prototype using Large Language Models (LLM) with Amazon Bedrock

Published May 19, 2024

ABSTRACT

The main objective of this article is to share a quick and easy way to prototype a Large Language Model (LLM) application with Retrieval Augmented Generation (RAG) using Amazon Bedrock. We use information from a few HTML documents with general descriptions of the Transformer model architecture and the attention mechanism. These documents are embedded and form our knowledge base on the subject. This approach can easily be adapted to use different models with advanced RAG to evaluate the LLM responses for the required tasks.

Python Libraries

The following libraries are required for the LLM RAG application prototype.
# AWS Bedrock runtime client, Chunking, Embedding, Retrieval
import boto3
import json
import uuid
import hnswlib
from typing import List, Dict
from unstructured.partition.html import partition_html
from unstructured.chunking.title import chunk_by_title

# LangChain with ConversationBufferMemory
from langchain_aws import ChatBedrock
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import PromptTemplate
from langchain.chains import LLMChain

Create AWS client for making inference requests

  • Create a boto3 client to connect programmatically and make inference requests to the Foundation Models (FM) hosted in Amazon Bedrock (e.g. Cohere Command R).
bedrock_runtime = boto3.client(service_name='bedrock-runtime', region_name='us-east-1')

Explore Cohere Models with Embeddings

  • Load and chunk the HTML documents with the unstructured library (a sketch of the assumed vector-store setup follows the code below).
def load_and_chunk(self) -> None:
    """
    Loads the text from the sources and chunks the HTML content.
    """
    for raw_document in self.raw_documents:
        elements = partition_html(url=raw_document["url"])
        chunks = chunk_by_title(elements)
        for chunk in chunks:
            self.docs.append(
                {
                    "title": raw_document["title"],
                    "text": str(chunk),
                    "url": raw_document["url"],
                }
            )
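The methods in this section assume a vector-store class that holds the source list, the chunked documents, and their embeddings. Below is a minimal sketch of that setup; the class name Vectorstore, the retrieve_top_k default, and the URLs are assumptions and placeholders, not the article's actual sources.

# Hypothetical vector-store setup used by load_and_chunk(), embed(), index() and retrieve()
class Vectorstore:
    def __init__(self, raw_documents: List[Dict[str, str]]):
        self.raw_documents = raw_documents  # [{"title": ..., "url": ...}, ...]
        self.docs = []                      # chunked documents with title/text/url
        self.docs_embs = []                 # embeddings for each chunk
        self.retrieve_top_k = 3             # number of chunks returned per query (assumed default)
        self.load_and_chunk()
        self.embed()
        self.index()

# Placeholder sources (replace with the actual Transformer / attention HTML pages)
raw_documents = [
    {"title": "Transformer model architecture", "url": "https://example.com/transformer.html"},
    {"title": "Attention mechanism", "url": "https://example.com/attention.html"},
]
vectorstore = Vectorstore(raw_documents)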
  • Embed the document chunks in batches using the Cohere Embed (English) model hosted in Amazon Bedrock.
def embed(self) -> None:
    """
    Embeds the document chunks using the Cohere API.
    """
    modelId = 'cohere.embed-english-v3'
    contentType = 'application/json'
    accept = '*/*'
    num_embed_called = 1

    batch_size = 90
    self.docs_len = len(self.docs)
    for i in range(0, self.docs_len, batch_size):
        batch = self.docs[i : min(i + batch_size, self.docs_len)]
        texts = [item["text"] for item in batch]

        print("Docs_len :{}, Embed counter : {}".format(len(self.docs), num_embed_called))
        cohere_body = json.dumps({
            "texts": texts,
            "input_type": "search_document"  # search_query | classification | clustering
        })
        response = bedrock_runtime.invoke_model(body=cohere_body, modelId=modelId,
                                                accept=accept, contentType=contentType)
        embed_response_body = json.loads(response.get('body').read())
        docs_embs_batch = embed_response_body.get('embeddings')
        num_embed_called += 1

        self.docs_embs.extend(docs_embs_batch)
  • Use the hnswlib package to index the document chunk embeddings. This ensures efficient similarity search during retrieval. For simplicity, we use hnswlib as the vector library for our knowledge database.
def index(self) -> None:
    """
    Indexes the document chunks for efficient retrieval.
    """
    self.idx = hnswlib.Index(space="ip", dim=1024)
    self.idx.init_index(max_elements=self.docs_len, ef_construction=512, M=64)
    self.idx.add_items(self.docs_embs, list(range(len(self.docs_embs))))
  • The chatbot decides whether it needs to consult external information from the knowledge database before responding. If so, it determines an optimal set of search queries to use for document retrieval.
# Generate search queries (if any) from the user query
modelId = "cohere.command-r-v1:0"
cohere_body = json.dumps({
    "temperature": 0.0,
    "p": 0.99,
    "k": 250,
    "max_tokens": 1000,

    "preamble": "You are an AI assistant with expertise in Transformers and attention models. \
You should say you do not know if you do not know and answer only if \
you are very confident. Answer in number bulleted form.",
    # "chat_history" is not used for "search_queries_only" with empty []
    "message": message,
    "search_queries_only": True,
})
response = bedrock_runtime.invoke_model(body=cohere_body, modelId=modelId,
                                        accept=accept, contentType=contentType)
search_response_body = json.loads(response.get('body').read())
  • The document search is performed with the knn_query() method from the hnswlib library. Given a user query, it returns the document chunks that are most similar to that query. The number of document chunks to return is set by the retrieve_top_k parameter. If there are matched documents, the retrieved document chunks are passed as documents in a new query message sent to the FM (Cohere Command R+); a sketch of the retrieve() method follows the code block below.
# Use Cohere Command R+ for continuous chat after query search
modelId = "cohere.command-r-plus-v1:0"
# If there are search queries, retrieve document chunks and respond
if search_response_body["search_queries"]:
    print("Retrieving information...\n", end="")

    # Retrieve document chunks for each query
    documents = []
    for query in search_response_body["search_queries"]:
        documents.extend(self.vectorstore.retrieve(query["text"]))

    # Use document chunks to respond
    cohere_body = json.dumps({
        "temperature": 0.0,
        "p": 0.99,
        "k": 250,
        "max_tokens": 1000,

        "preamble": "You are an AI assistant with expertise in Transformers and attention models. \
You should say you do not know if you do not know and answer only if \
you are very confident. Answer in number bulleted form.",

        "chat_history": self.prev_docs_chat_history_response,
        "message": message,
        "documents": documents,
    })
    response = bedrock_runtime.invoke_model(body=cohere_body, modelId=modelId,
                                            accept=accept, contentType=contentType)
    docs_response_body = json.loads(response.get('body').read())
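The vectorstore.retrieve() call above is not shown elsewhere in the article. A minimal sketch, assuming the query is embedded with the same Cohere Embed model (with input_type "search_query") and that retrieve_top_k is an integer attribute of the vector store:

def retrieve(self, query: str) -> List[Dict[str, str]]:
    """
    Embeds the user query and returns the most similar document chunks from the hnswlib index.
    """
    # Embed the query (assumption: same Cohere Embed model, query-side input type)
    query_body = json.dumps({
        "texts": [query],
        "input_type": "search_query"
    })
    response = bedrock_runtime.invoke_model(body=query_body, modelId='cohere.embed-english-v3',
                                            accept='*/*', contentType='application/json')
    query_emb = json.loads(response.get('body').read()).get('embeddings')

    # knn_query returns (labels, distances); take the ids of the retrieve_top_k nearest chunks
    doc_ids = self.idx.knn_query(query_emb, k=self.retrieve_top_k)[0][0]
    return [self.docs[doc_id] for doc_id in doc_ids]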
  • Display the external information from RAG, with citations and the retrieved document chunks (up to retrieve_top_k per search query); printing the generated answer itself is sketched after the code below.
# Display citations and source documents
if 'citations' in docs_response_body.keys():
    citations = docs_response_body.get('citations')

    if citations:
        cit_cnt = 1
        print("\n\nCITATIONS:")
        for citation in citations:
            print("[{}]:\n{}".format(cit_cnt, citation))
            print()
            cit_cnt += 1

        # Print retrieved documents
        print("\nDOCUMENTS:")
        doc_cnt = 1
        for document in documents:
            print("[{}]:\n{}".format(doc_cnt, document))
            print()
            doc_cnt += 1
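The snippet above prints citations and documents but not the generated answer itself. Assuming the reply text is returned in the response body's 'text' field (alongside 'citations' and 'chat_history'), it can be displayed with:

# Display the generated answer (assumption: the reply text is in the 'text' field)
print("\nANSWER:\n{}".format(docs_response_body.get('text')))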
  • The chat history is updated for the next user query.
# print("\nConversationID : {} with prev chat history :\n{}".format(self.conversation_id, self.prev_docs_chat_history_response))
docs_chat_history_response = docs_response_body.get('chat_history')

# The chat_history returned by each invoke_model call is saved and passed as "chat_history" in the next query
self.prev_docs_chat_history_response = docs_chat_history_response

Alternative LLM model to handle general questions

  • Use an alternative Foundation Model (FM) hosted in Amazon Bedrock (e.g. Claude 3 Sonnet) to handle general questions whenever the Cohere models do not know the answers. LangChain's LLMChain and ConversationBufferMemory are used to establish the conversation and store the chat history; a sketch of the memory setup and a usage example follow the code block below.
def run_GenModel(self, message):
    modelId = 'anthropic.claude-3-sonnet-20240229-v1:0'
    anthropic_version = 'bedrock-2023-05-31'

    # ChatBedrock with Claude 3 Sonnet as the alternative LLM for general questions
    claude3_sonnet_llm = ChatBedrock(
        client=self.bedrock_runtime,
        model_id=modelId,
        region_name="us-east-1",
        model_kwargs={"temperature": 0.01,
                      "top_p": 0.999,
                      "top_k": 250,

                      "anthropic_version": anthropic_version,
                      "max_tokens": 1000,
                      },
    )

    prompt_template = """You are an AI assistant with expertise in stock market trends and share prices analysis.
You should say you do not know if you do not know and answer only if you are very confident.
Answer in number bulleted form.

Previous conversation:
{chat_history}

Human: {input}
AI assistant:\n"""

    prompt = PromptTemplate.from_template(prompt_template)

    # Use LLMChain for the conversation and store it in buffer memory
    LLMChain_conversation = LLMChain(
        llm=claude3_sonnet_llm,
        prompt=prompt,
        memory=self.memory_chat_history,
        verbose=True
    )

    gen_llm_response = LLMChain_conversation.predict(input=message)
    return gen_llm_response
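A minimal sketch of how the buffer memory referenced above (self.memory_chat_history) can be created and how run_GenModel() might be called; the memory_key must match the {chat_history} placeholder in the prompt template, and the example question is hypothetical:

# Assumed setup: buffer memory whose memory_key matches {chat_history} in the prompt template
self.memory_chat_history = ConversationBufferMemory(memory_key="chat_history")

# Hypothetical usage: route a general (non-knowledge-base) question to the alternative model
answer = self.run_GenModel("What factors generally drive share price movements?")
print(answer)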

LLM Chatbot with RAG User interface

Streamlit is used to build the user interface for the LLM chatbot with RAG; a hedged sketch of how the chat page might be wired up is shown below the screenshot.
[Screenshot: Amazon Bedrock RAG Chatbot user interface]
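A minimal Streamlit sketch, assuming a Chatbot class that wraps the snippets above and exposes a respond() method (both names are assumptions, not the article's actual code):

import streamlit as st

st.title("Amazon Bedrock RAG Chatbot")

# Keep the chatbot (vector store, chat history) across Streamlit reruns
if "chatbot" not in st.session_state:
    st.session_state.chatbot = Chatbot()  # assumed wrapper around the snippets above
if "messages" not in st.session_state:
    st.session_state.messages = []

# Replay the conversation so far
for msg in st.session_state.messages:
    with st.chat_message(msg["role"]):
        st.markdown(msg["content"])

# Handle a new user query
if user_query := st.chat_input("Ask about Transformers and attention..."):
    st.session_state.messages.append({"role": "user", "content": user_query})
    with st.chat_message("user"):
        st.markdown(user_query)

    answer = st.session_state.chatbot.respond(user_query)  # assumed method name
    st.session_state.messages.append({"role": "assistant", "content": answer})
    with st.chat_message("assistant"):
        st.markdown(answer)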
