Chat with Bedrock in the Amazon Connect contact control panel 🎧 🤖

Enable your contact center agents to have conversations with AI assistants in the same place that they receive customer calls

Published Apr 24, 2024
Last Modified May 16, 2024
Let's see how we can link an Amazon Bedrock assistant with our Amazon Connect contact center. Although Amazon Connect already has built-in generative AI capabilities, with this integration Connect agents can also chat directly with a generative AI model from the contact control panel.

Services used

For this solution I used the Amazon Connect real-time chat message streaming feature, which allows Amazon Connect chats to be integrated with external chat services.
I also used AWS Lambda to act as the link between Bedrock and the Amazon Connect APIs.
Finally, I used DynamoDB to match the conversation IDs between Bedrock and the Connect chat.
Note: for the Bedrock side I decided to leverage the Agents for Amazon Bedrock capability, as it comes with a managed chat feature that automatically handles conversation history and keeps things simple. It would also be possible to call the Bedrock InvokeModel API directly and implement the conversation history ourselves.
Architecture diagram

Let's build

Prerequisites

First, some initial requirements:
  1. Create a DynamoDB table with the partition key "contact_id" (String) and the rest of the settings at their defaults (see Create a table example).
  2. Note down the Amazon Connect instance ID and contact flow ID (see Find the flow ID). This is the Connect contact flow that will receive the Bedrock conversation. Also check that your Connect agents have access to this contact flow and its queue (see How routing works).
    • If you don't have an Amazon Connect instance yet, check the Get started with Amazon Connect guide. In just a few minutes you can start your own contact center instance in the cloud!
  3. Create a standard SNS topic as explained in step 1 of the real-time chat message streaming guide (FIFO topics don't work with this!).
  4. Create an Amazon Bedrock agent (see the Create an agent in Amazon Bedrock guide).
  5. Create an SQS queue as the destination of the SNS topic (see Fanout to Amazon SQS queues):
    • Set the visibility timeout to 60 seconds; you can leave the rest of the parameters at their defaults.
    • In the SNS console, select the SQS subscription and add the following message attribute filter policy:
{
  "ParticipantRole": [
    "AGENT"
  ]
}
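To see what this policy does, here is a minimal local sketch of the matching SNS performs, simplified to exact string matching (`matches` is a hypothetical helper for illustration, not an AWS API):

```python
# The subscription filter policy from the step above: SNS only forwards
# messages whose ParticipantRole attribute is "AGENT" to the SQS queue.
filter_policy = {"ParticipantRole": ["AGENT"]}

def matches(policy, message_attributes):
    # Simplified exact-string matching; the real evaluation is done by SNS.
    for key, allowed_values in policy.items():
        attribute = message_attributes.get(key)
        if attribute is None or attribute.get("Value") not in allowed_values:
            return False
    return True

# Connect tags every streamed chat message with the sender's role, so only
# the human agent's messages reach the queue; customer and system messages
# are filtered out before they ever trigger the lambda.
print(matches(filter_policy, {"ParticipantRole": {"Type": "String", "Value": "AGENT"}}))     # True
print(matches(filter_policy, {"ParticipantRole": {"Type": "String", "Value": "CUSTOMER"}}))  # False
```

This is why the lambda only wakes up when the human agent types something, instead of echoing its own Bedrock responses back to itself.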

Orchestration lambda

Now let's create the main Lambda function of the solution. This function uses the StartChatContact API to start a new Connect conversation, then the Bedrock InvokeAgent API to send the message to the Bedrock model, and finally the SendMessage API to send the Bedrock response back to the Connect agent.
Go to the Lambda console and create a new Lambda function with Python as the runtime and the rest of the configuration parameters at their defaults. Once the function is created, a few things need to be configured:
  1. Increase the timeout value to over 1 minute. Most generative AI models take several seconds to elaborate an answer (see Configure Lambda function timeout).
  2. Add the following permissions policies to the Lambda role: Bedrock access, Amazon Connect access, DynamoDB access, and Amazon SQS access (see Lambda execution role and AWS managed policies).
  3. Add the previously created SQS queue as the Lambda trigger, with batch size = 1.
  4. Create the following environment variables with your own parameters. You can get the agent_id and alias_id from the Bedrock agent console, the contact_flow_id and instance_id from the Amazon Connect console, the dynamo_table name from DynamoDB, and the sns_endpoint_arn from the SNS topic console:
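The variable names below are the ones the function code reads with os.environ, so they must match exactly; the values shown are placeholders:

```
agent_id          = <Bedrock agent ID>
alias_id          = <Bedrock agent alias ID>
instance_id       = <Amazon Connect instance ID>
contact_flow_id   = <Amazon Connect contact flow ID>
dynamo_table      = <DynamoDB table name>
sns_endpoint_arn  = <ARN of the SNS streaming topic>
```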
With this done, you can copy-paste the following code into the Lambda code editor:
import json
import logging
import boto3
import uuid
import os
from boto3.dynamodb.conditions import Key

## creating logger
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# getting boto3 clients for required AWS services
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
connect_client = boto3.client('connect')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ["dynamo_table"])

def get_session_and_token(table, contact_id):
    response = table.query(
        KeyConditionExpression=Key('contact_id').eq(contact_id)
    )

    if response['Items']:
        item = response['Items'][0]
        session_id = item['session_id']
        connection_token = item['connection_token']
        return session_id, connection_token
    else:
        return None, None

# Bedrock agent session settings
enable_trace: bool = True
end_session: bool = False

# Agent and alias IDs
agent_id = os.environ["agent_id"]
agent_alias_id = os.environ["alias_id"]

# Start agent conversation function

def start_agent_conversation(input_text, agent_id, agent_alias_id, session_id, enable_trace, end_session):

    agentResponse = bedrock_agent_runtime_client.invoke_agent(
        inputText=input_text,
        agentId=agent_id,
        agentAliasId=agent_alias_id,
        sessionId=session_id,
        enableTrace=enable_trace,
        endSession=end_session
    )

    event_stream = agentResponse['completion']
    agent_answer = ""
    try:
        for event in event_stream:
            if 'chunk' in event:
                # A chunk event carries the final answer and indicates
                # that the request finished successfully
                data = event['chunk']['bytes']
                logger.info(f"Final answer ->\n{data.decode('utf8')}")
                agent_answer = data.decode('utf8')
            elif 'trace' in event:
                logger.info(json.dumps(event['trace'], indent=2))
            else:
                raise Exception("unexpected event.", event)
    except Exception as e:
        raise Exception("unexpected event.", e)

    return agent_answer

# Connect chat configuration

instance_id = os.environ["instance_id"]
contact_flow_id = os.environ["contact_flow_id"]
participant_details = "Bedrock bot"
sns_endpoint_arn = os.environ["sns_endpoint_arn"]

# StartChatContact

def start_chat_contact(instance_id, contact_flow_id, participant_display_name):

    response = connect_client.start_chat_contact(
        InstanceId=instance_id,
        ContactFlowId=contact_flow_id,
        ParticipantDetails={
            'DisplayName': participant_display_name
        }
    )
    return response['ContactId'], response['ParticipantToken'], response['ParticipantId']

# StartContactStreaming

def start_contact_streaming(instance_id, contact_id, sns_endpoint_arn, participant_id):
    response = connect_client.start_contact_streaming(
        InstanceId=instance_id,
        ContactId=contact_id,
        ChatStreamingConfiguration={
            'StreamingEndpointArn': sns_endpoint_arn
        },
        ClientToken=participant_id
    )

    return response

def create_participant_connection(connectparticipant_client, participant_token):
    response = connectparticipant_client.create_participant_connection(
        Type=[
            'CONNECTION_CREDENTIALS',
        ],
        ParticipantToken=participant_token,
        ConnectParticipant=True
    )

    connection_token = response["ConnectionCredentials"]["ConnectionToken"]
    return connection_token

# Saving session and contact ID in DynamoDB

def save_conversation_id(table, session_id, contact_id, connection_token):
    table.put_item(
        Item={
            'session_id': session_id,
            'contact_id': contact_id,
            'connection_token': connection_token
        }
    )

def lambda_handler(event, context):
    # If the event comes from SQS it will have Records. That means the
    # message belongs to an existing conversation, so we apply a different logic:
    if 'Records' in event:
        response = "No content"
        for record in event['Records']:
            payload = json.loads(record['body'])
            message = json.loads(payload['Message'])

            if 'Content' in message:
                # extract attributes
                content = message['Content']
                contact_id = payload['MessageAttributes']['ContactId']['Value']

                # get session tokens
                session_id, connection_token = get_session_and_token(table, contact_id)

                # send the Connect agent's message to the Bedrock agent
                agent_answer = start_agent_conversation(content, agent_id, agent_alias_id, session_id, enable_trace, end_session)
                print(f"Session_id: {session_id}.\nConnection_token: {connection_token}.\nContent: {content}.\nAgent_answer: {agent_answer}")

                # sending the Bedrock answer back to the Connect chat
                client_token = str(uuid.uuid4())

                connectparticipant_client = boto3.client('connectparticipant')

                response = connectparticipant_client.send_message(
                    ContentType='text/plain',
                    Content=agent_answer,
                    ClientToken=client_token,
                    ConnectionToken=connection_token
                )

        return response

    # If the event doesn't come from SQS we have to start a new conversation,
    # so we apply the following logic:
    else:
        body = event["body"]

        # new session ID
        session_id: str = str(uuid.uuid4())
        connectparticipant_client = boto3.client('connectparticipant')

        # Creating input and starting agent conversation
        input_text = f"Starting conversation with bedrock agent. Use the next information for context: {body}"
        print(f"agent input: {input_text}")
        agent_answer = start_agent_conversation(input_text, agent_id, agent_alias_id, session_id, enable_trace, end_session)
        print(f"agent answer: {agent_answer}")
        print(f"session ID: {session_id}")

        # Creating Connect streaming conversation
        contact_id, participant_token, participant_id = start_chat_contact(instance_id, contact_flow_id, participant_details)
        streaming_response = start_contact_streaming(instance_id, contact_id, sns_endpoint_arn, participant_id)
        connection_token = create_participant_connection(connectparticipant_client, participant_token)

        # saving conversation ID
        save_conversation_id(table, session_id, contact_id, connection_token)

        # sending message
        client_token = str(uuid.uuid4())

        response = connectparticipant_client.send_message(
            ContentType='text/plain',
            Content=agent_answer,
            ClientToken=client_token,
            ConnectionToken=connection_token
        )

        return {
            'statusCode': 200,
            'body': json.dumps("Agent conversation triggered")
            #'body': json.dumps(agent_answer)
        }
You can explore the comments in the code to better understand what the function is doing. The most important points are:
  • Start a new Connect conversation. This step only executes when the message doesn't come from the SQS trigger, which means it is a new conversation (see the if-else logic in the lambda handler). In the code this is done with the functions start_chat_contact, start_contact_streaming and create_participant_connection.
  • Save and read the connection_token (for the Amazon Connect conversation) and the session_id (for the Bedrock conversation). We have to match these two parameters to keep the same conversation going in the Connect panel and in the Bedrock session. We use a DynamoDB table to store these values persistently. In the code this is done with the functions save_conversation_id and get_session_and_token.
  • Send the Connect agent's message to the Bedrock model. In the code this is done with the function start_agent_conversation.
  • Send the Bedrock response to Connect. In the code this is done with the send_message API call.
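The nesting of the SQS-delivered payload is worth seeing once: each SQS record body is an SNS envelope, and the envelope's Message field is itself a JSON string holding the Connect chat message. A small local sketch of the unwrapping the handler performs (the sample record is hand-built for illustration, not a captured payload):

```python
import json

# Hand-built sample of an SQS record carrying an SNS-wrapped Connect chat
# message (illustrative only; real payloads contain more fields).
record = {
    "body": json.dumps({
        "Message": json.dumps({
            "Content": "Please check order #1234",
            "ContentType": "text/plain",
        }),
        "MessageAttributes": {
            "ContactId": {"Type": "String", "Value": "11111111-2222-3333-4444-555555555555"}
        },
    })
}

# Same two-level unwrapping as the lambda handler: SQS body -> SNS
# envelope -> Connect chat message.
payload = json.loads(record["body"])
message = json.loads(payload["Message"])
content = message["Content"]
contact_id = payload["MessageAttributes"]["ContactId"]["Value"]

print(content)     # Please check order #1234
print(contact_id)  # 11111111-2222-3333-4444-555555555555
```

The contact_id recovered here is the DynamoDB partition key, which is what lets the handler find the matching Bedrock session_id and Connect connection_token.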

Trigger the lambda and receive the call in Connect

We are ready to start receiving Bedrock calls in our Connect Control Panel. We just need to trigger the lambda, and the agent will receive a new conversation message from Bedrock.
You can use the Lambda test feature to do the trigger. Here is a test event example; this is the first message that the Bedrock model will receive, so you can tailor it to your specific use case:
{
  "body": "Hi!"
}
For a production environment, one common approach would be to use Amazon API Gateway to trigger the lambda. This API can then be integrated into a web app with a trigger button, into an application with automated triggers, or even into a Connect contact flow so the Connect agents themselves can trigger the Bedrock conversation. The flexibility is huge!
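Whichever trigger you choose, the event shape decides which branch of the handler runs: SQS deliveries wrap each message in a "Records" list, while test invocations and API Gateway proxy events expose the request body at the top level. A quick sketch with hand-built sample events (`is_sqs_event` is an illustrative helper, not part of the solution code):

```python
def is_sqs_event(event):
    # SQS triggers wrap each message in a "Records" list; direct test
    # invocations and API Gateway proxy events do not, so the handler
    # branches on the presence of this key.
    return "Records" in event

test_event = {"body": "Hi!"}               # lambda test / API Gateway style
sqs_event = {"Records": [{"body": "{}"}]}  # SQS trigger style

print(is_sqs_event(test_event))  # False -> starts a new conversation
print(is_sqs_event(sqs_event))   # True  -> continues an existing one
```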
Once the lambda has been triggered, we will receive a call in our Amazon Connect queue.
Let's see how this finally looks:
For this specific conversation I created a Bedrock assistant for a retail business that has access to different company systems, such as the customer's currently open issues. I configured this assistant to be triggered every X hours to call the Connect agents and inform them about the current status of the issues.
As this is a generative AI assistant, the Connect agents can then engage in a conversation to obtain guidance on resolving those issues. See the example below:

Conclusions

There are a lot of different possibilities and use cases to implement thanks to the Amazon Bedrock models and the agent feature:
from assistants that review open issues in your contact center, to productivity helpers, to models that access company systems and perform actions on your behalf.
The API-based usage of Amazon Bedrock gives the service a lot of flexibility. This was an example of an external integration with Amazon Connect, but agents can similarly be embedded in other communication applications like Slack, WhatsApp, Teams, etc.
Keep an eye out for new posts where I will explore more of these other integrations.
For more information on this topic, check the following resources:

Code references

Building with Amazon Bedrock:

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.
