
Amazon Bedrock, Amazon Transcribe, Amazon OpenSearch

Enhance AI with Amazon Transcribe and Amazon OpenSearch for efficient data ingestion and real-time contextual search.

Published Nov 9, 2024
Last Modified Nov 12, 2024
Benefits of this solution:
  • Scalability: Automatically scales to handle varying loads.
  • Cost-Effective: Pay-as-you-go pricing reduces overhead.
  • High Performance: Enables fast, efficient similarity searches.
  • Enhanced AI Responses: Integrates with RAG for contextually accurate outputs.
  • Seamless Transcription: Uses Amazon Transcribe for easy audio-to-text processing.
Introduction
  • Overview: This blog is part two of our series on processing and utilizing audio transcriptions. After Amazon Transcribe completes its task and generates a transcription file, we embed the text using Amazon Bedrock through the LangChain API. The embedded data is then stored in an Amazon OpenSearch Serverless collection, which serves as a vector database. This index enables efficient retrieval and allows for Retrieval-Augmented Generation (RAG) searches against the audio transcription, enhancing the search experience with contextual and relevant results.
  • Security: As discussed in the first article of this series, security is a critical aspect of any solution. In this example, we use AWS Lambda and Amazon OpenSearch Serverless, both deployed within a VPC. This setup includes network policies and data access policies to manage index access effectively; a minimal sketch of such a data access policy appears after this list. For more details, please refer to this article. We also utilize the LangChain API with the BedrockEmbeddings library for text embedding. It is important to consider the type of data being handled, especially if it is confidential or private, to meet regulatory and security requirements. Be sure to use AWS libraries and Bedrock Guardrails, and follow security best practices, so that security is integrated across AWS services and Amazon OpenSearch, safeguarding data security and integrity in your RAG application.
  • Purpose: The purpose of this solution is to enable seamless integration of audio transcription files into a RAG (Retrieval-Augmented Generation) system to perform semantic searches. By embedding transcribed audio content and storing it in a vector database, users can leverage advanced search capabilities that go beyond simple keyword matching. This allows for more meaningful and context-aware retrieval of information, improving the efficiency and relevance of search results within applications that require comprehensive analysis and querying of audio-derived text data.
  • Objective: To demonstrate the embedding process of transcribed audio content and its integration into a vector database for enabling semantic searches within a RAG solution.
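To make the data access policies mentioned in the Security note concrete, here is a minimal, hypothetical sketch of creating one with Boto3. The collection name, policy name, and role ARN are placeholder assumptions, not values from this solution; a companion network policy (not shown) would restrict access to your VPC endpoint.

import json
import boto3

aoss = boto3.client("opensearchserverless", region_name="us-east-1")

# Hypothetical names; replace with your collection and Lambda execution role.
collection_name = "audio-collection"
lambda_role_arn = "arn:aws:iam::123456789012:role/transcribe-ingest-lambda-role"

policy = [
    {
        "Rules": [
            {
                "ResourceType": "index",
                "Resource": [f"index/{collection_name}/*"],
                # Grant only what the ingest function needs.
                "Permission": [
                    "aoss:CreateIndex",
                    "aoss:DescribeIndex",
                    "aoss:ReadDocument",
                    "aoss:WriteDocument",
                ],
            }
        ],
        "Principal": [lambda_role_arn],
    }
]

aoss.create_access_policy(
    name="audio-index-data-access",
    type="data",
    policy=json.dumps(policy),
)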

Architecture

[Architecture diagram: Amazon Transcribe Ingest to Amazon OpenSearch]
  • Workflow Overview: A completed Amazon Transcribe job emits an event through Amazon EventBridge, which triggers an AWS Lambda function; the function embeds the transcript with Amazon Bedrock and indexes it into an Amazon OpenSearch Serverless vector index.
Services Used
  • Amazon S3: Stores the transcribed JSON files. We configure Amazon EventBridge to trigger an AWS Lambda function when an Amazon Transcribe job finishes writing its transcription file.
  • Amazon EventBridge: Triggers the AWS Lambda function on the Transcribe Job State Change event emitted when a transcription job completes (a sample rule sketch follows this list).
  • AWS Lambda: The function is deployed in a VPC to allow secure, private access to the Amazon OpenSearch Serverless database. Developed in Python with the AWS SDK (Boto3), it accepts a JSON payload from Amazon EventBridge and ingests an embedding of the transcript into the Amazon OpenSearch Serverless vector database. This setup ensures seamless integration with event-driven architectures by automatically triggering the function when a transcription completes. The Python code processes the input payload and embeds the data into the Amazon OpenSearch index, supporting efficient data processing, prompt semantic querying, and content updates.
  • Amazon OpenSearch: A serverless vector database stores vector embeddings, which are numerical representations of data (e.g., words or images) capturing their semantics. This type of database is managed by the cloud provider, removing the need for infrastructure management, and scales automatically with use. It integrates with Retrieval-Augmented Generation (RAG) by enabling efficient similarity searches, such as calculating the cosine similarity of vectors. When a query is made, it is converted into a vector, and the database finds the closest matches, allowing the system to quickly retrieve the most contextually relevant information and use it to enhance generated responses. Benefits include scalability, cost-effectiveness, and high performance. A short retrieval sketch follows the Lambda snippet below.
  • IAM Roles: The IAM role for the AWS Lambda function includes the policies needed to access the Amazon OpenSearch Serverless vector database, the Amazon S3 bucket, and the Amazon Bedrock embedding model the function invokes.
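Before the Lambda code itself, here is a hedged sketch of the EventBridge wiring described above. The event pattern mirrors the Transcribe Job State Change test event shown at the bottom of the snippet; the rule name and function ARN are placeholders.

import json
import boto3

events = boto3.client("events", region_name="us-east-1")

# Match only completed Transcribe jobs; the shape mirrors the commented
# test event at the bottom of the Lambda snippet below.
pattern = {
    "source": ["aws.transcribe"],
    "detail-type": ["Transcribe Job State Change"],
    "detail": {"TranscriptionJobStatus": ["COMPLETED"]},
}

events.put_rule(
    Name="transcribe-job-completed",  # hypothetical rule name
    EventPattern=json.dumps(pattern),
    State="ENABLED",
)

events.put_targets(
    Rule="transcribe-job-completed",
    Targets=[{
        "Id": "ingest-lambda",
        # Placeholder ARN; the function also needs a resource-based
        # permission (lambda add_permission) allowing events.amazonaws.com
        # to invoke it.
        "Arn": "arn:aws:lambda:us-east-1:123456789012:function:ingest",
    }],
)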
Here is a code snippet of the AWS Lambda function, for demonstration only:
import json
import logging
import os
import datetime

import boto3
from langchain.docstore.document import Document
from langchain_aws.embeddings.bedrock import BedrockEmbeddings
from langchain_community.document_loaders.text import TextLoader
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_text_splitters import RecursiveCharacterTextSplitter
from opensearchpy import RequestsHttpConnection, AWSV4SignerAuth

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Helper function to convert datetime to string in ISO format
def json_serial(obj):
    """JSON serializer for objects not serializable by default."""
    if isinstance(obj, datetime.datetime):
        return obj.isoformat()  # Convert datetime to ISO 8601 string
    raise TypeError(f"Type {type(obj)} not serializable")

# Get Boto3 clients
def get_s3_client(region):
    return boto3.client("s3", region_name=region)

def get_bedrock_client(region):
    return boto3.client("bedrock-runtime", region_name=region)

def get_transcribe_client(region):
    return boto3.client("transcribe", region_name=region)

# Get a Transcribe job by name
def get_transcribe_jobs_by_name(transcribe_client, job_name: str):
    logger.info("Get Transcribe job: %s", job_name)
    try:
        response = transcribe_client.list_transcription_jobs()
        job_exists = any(
            job["TranscriptionJobName"] == job_name
            for job in response.get("TranscriptionJobSummaries", [])
        )
        if not job_exists:
            raise ValueError(f"Job {job_name} does not exist")
        return transcribe_client.get_transcription_job(
            TranscriptionJobName=job_name
        )
    except Exception as e:
        logger.error("Error getting the job by name: %s", e)
        raise

# Create vector embeddings with Bedrock (Titan text embedding model)
def create_vector_embedding_with_bedrock(bedrock_client):
    model_id = "amazon.titan-embed-text-v1"
    return BedrockEmbeddings(client=bedrock_client, model_id=model_id)

# Create the vector search object, signing requests with SigV4
# for OpenSearch Serverless
def create_vector_search_object(embeddings, host, region, index_name: str):
    service = "aoss"  # OpenSearch Serverless
    credentials = boto3.Session().get_credentials()
    auth = AWSV4SignerAuth(credentials, region, service)
    docsearch = OpenSearchVectorSearch(
        opensearch_url=f"{host}:443",
        index_name=index_name,
        embedding_function=embeddings,
        http_auth=auth,
        http_compress=False,
        use_ssl=True,
        verify_certs=True,
        ssl_assert_hostname=True,
        ssl_show_warn=True,
        connection_class=RequestsHttpConnection,
        timeout=30,
    )
    return docsearch

# Check if the OpenSearch index exists
def check_opensearch_index(opensearch_client, index_name) -> bool:
    return opensearch_client.indices.exists(index=index_name)

# Load a document from a file and split it into chunks
def load_and_split_document(file_path):
    loader = TextLoader(file_path)
    docs = loader.lazy_load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    return text_splitter.split_documents(docs)

# Convert the JSON transcript produced by Transcribe into plain text
def transcript_parser(s3_client, job_name, transcript_s3_url):
    # Extract bucket name and key from the S3 URL
    bucket_name = transcript_s3_url.split("/")[3]
    key = "/".join(transcript_s3_url.split("/")[4:])

    # Get the JSON file from S3
    response = s3_client.get_object(Bucket=bucket_name, Key=key)
    content = response["Body"].read().decode("utf-8")

    # Parse the JSON content
    data = json.loads(content)

    # Check if the job name matches
    if data.get("jobName") == job_name:
        # Extract and return the transcript body
        transcripts = data.get("results", {}).get("transcripts", [])
        if transcripts:
            return transcripts[0].get("transcript")

    return None

# Wrap in-memory text in a Document and split it into chunks
def load_from_in_memory_and_split(text_body):
    docs = [
        Document(
            page_content=text_body,
            metadata={
                "source": "in-memory text",
                "author": "unknown",
                "date": "2024",
            },
        )
    ]
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    return text_splitter.split_documents(docs)

# Embed and index the text body
def embed_and_index_text_body(text_body, region, host, env_index_name):
    # Get embeddings
    logger.info("Getting Bedrock client")
    bedrock_client = get_bedrock_client(region)
    logger.info("Getting embeddings")
    embeddings = create_vector_embedding_with_bedrock(bedrock_client)

    # Create vector search object
    logger.info("Creating vector search object")
    docsearch = create_vector_search_object(
        embeddings, host, region, index_name=env_index_name
    )

    # Read in and split the document
    logger.info("Reading in the document")
    splits = load_from_in_memory_and_split(text_body)
    logger.info("Split size: %s", len(splits))

    # Index the document
    logger.info("Indexing the document")
    docsearch.add_documents(splits)
    logger.info("Done indexing the document")

    return docsearch

# Lambda handler function
def handler(event, context):
    logger.info("Event received: %s", event)

    # Get environment variables
    env_region = os.environ.get("REGION", "us-east-1")
    opensearch_host = os.environ.get("HOST", "endpoint")
    env_index_name = os.environ.get("INDEX_NAME", "audio-index")
    audio_bucket = os.environ.get("AUDIO_BUCKET_NAME", "audio-bucket")

    # Log environment variables
    logger.info("Region: %s", env_region)
    logger.info("Index Name: %s", env_index_name)
    logger.info("Audio Bucket: %s", audio_bucket)

    region = env_region

    # Get clients
    logger.info("Getting Transcribe client")
    transcribe_client = get_transcribe_client(region)
    s3_client = get_s3_client(region)

    if event:
        transcription_job_name = event["detail"]["TranscriptionJobName"]
        transcription_job_status_event = event["detail"]["TranscriptionJobStatus"]

        if transcription_job_name:
            logger.info("Transcription Job Name: %s", transcription_job_name)
            get_jobs_by_name_response = get_transcribe_jobs_by_name(
                transcribe_client, transcription_job_name
            )
            transcription_job_status = get_jobs_by_name_response.get(
                "TranscriptionJob", {}
            ).get("TranscriptionJobStatus")

            logger.info("Transcribe job response: %s", get_jobs_by_name_response)

            if (
                transcription_job_status == "COMPLETED"
                and transcription_job_status_event == "COMPLETED"
            ):
                transcript_file_uri = (
                    get_jobs_by_name_response.get("TranscriptionJob", {})
                    .get("Transcript", {})
                    .get("TranscriptFileUri")
                )
                if transcript_file_uri:
                    logger.info("Transcript File URI: %s", transcript_file_uri)
                    # Parse the transcript file into plain text
                    transcript_body = transcript_parser(
                        s3_client, transcription_job_name, transcript_file_uri
                    )
                    if transcript_body:
                        # Job metadata can be persisted here for auditing; the
                        # store_log_in_dynamodb helper used in the full solution
                        # is not included in this snippet.
                        logger.info("Transcript Body: %s", transcript_body[:50])
                        logger.info("Embedding and indexing the text body")
                        embed_and_index_text_body(
                            transcript_body, region, opensearch_host, env_index_name
                        )
                        logger.info("Done embedding and indexing the text body!")
                    else:
                        logger.error("Transcript body is empty.")
                else:
                    logger.error(
                        "TranscriptFileUri not found in the Transcribe job response."
                    )
            else:
                logger.error(
                    "Transcription job is not completed. Current status: %s",
                    transcription_job_status,
                )
        else:
            logger.error("TranscriptionJobName not found in the event message body.")
    else:
        logger.error("No event message body received.")

# if __name__ == "__main__":
#     test_event = {
#         'version': '0',
#         'id': '9fbad820-dc3e-05f2-cec6-eebdd8358beb',
#         'detail-type': 'Transcribe Job State Change',
#         'source': 'aws.transcribe',
#         'account': 'xxxxxxxxx',
#         'time': '2024-09-25T14:22:34Z',
#         'region': 'us-east-1',
#         'resources': [],
#         'detail': {'TranscriptionJobName': 'test-transcribe-job', 'TranscriptionJobStatus': 'COMPLETED'}
#     }
#     test_context = None
#     handler(test_event, test_context)
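As mentioned in the Amazon OpenSearch description above, once transcripts are indexed you can run similarity searches against them. Here is a minimal, hypothetical sketch reusing the helpers from the snippet; the host, index name, and query text are placeholder assumptions, and the full search-and-retrieval flow is covered in the next part of this series.

# Placeholder values; use your own collection endpoint and index.
region = "us-east-1"
host = "https://your-collection-id.us-east-1.aoss.amazonaws.com"
index_name = "audio-index"

bedrock_client = get_bedrock_client(region)
embeddings = create_vector_embedding_with_bedrock(bedrock_client)
docsearch = create_vector_search_object(
    embeddings, host, region, index_name=index_name
)

# The query is embedded with the same Titan model, then matched against the
# stored vectors; k controls how many nearest chunks are returned.
results = docsearch.similarity_search("What did the speaker say about pricing?", k=4)
for doc in results:
    print(doc.page_content[:200])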

Conclusion

  • Summary: This blog covers how to integrate Amazon Transcribe with an Amazon OpenSearch Serverless vector database for processing and storing transcribed text as vector embeddings. This setup enables efficient similarity searches and retrieval for applications like Retrieval-Augmented Generation (RAG). By converting transcribed audio into embeddings, the system can enhance responses by quickly retrieving contextually relevant content. Benefits include seamless scalability, cost-effectiveness, and high performance, making it well suited for real-time AI solutions that need transcription and contextual search capabilities.
  • Next Steps: You can adapt this design pattern to suit your particular business requirements. In the next part of this series, we'll explore how to search and retrieve data from a vector store using Amazon Bedrock.
 
