Chat with Bedrock in the Amazon Connect Contact Control Panel 🎧 🤖
Enable your contact center agents to have conversations with AI assistants in the same place where they receive customer calls


- Create a DynamoDB table with the partition key "contact_id" (String) and the rest of the settings left as default (see Create a table example).
- Note the Amazon Connect instance ID and contact flow ID (see Find the flow ID). This is the Connect contact flow that will receive the Bedrock conversation. Also check that your Connect agents have access to this contact flow and its queue (see How routing works).
- If you don't have an Amazon Connect instance yet, check the Get started with Amazon Connect guide. You can launch your own cloud contact center instance in just a few minutes!
- Create a standard SNS topic as explained in step 1 of the real-time chat message streaming guide (FIFO topics do not work with chat streaming).
- Create an Amazon Bedrock agent (see the Create an agent in Amazon Bedrock guide).
- Create an SQS queue as the destination of the SNS topic (see Fanout to Amazon SQS queues):
- Set the visibility timeout to 60 seconds; you can leave the rest of the parameters at their defaults.
- In the SNS console, select the SQS subscription and add the following message attribute filter policy:
```json
{
  "ParticipantRole": [
    "AGENT"
  ]
}
```
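The same filter policy can also be applied programmatically instead of through the console. A minimal boto3 sketch (the subscription ARN and region are placeholders you must replace with your own):

```python
import json

# Filter policy for the SQS subscription: only messages typed by the human agent
# (ParticipantRole = AGENT) reach the queue, so the Lambda is not re-triggered
# by the bot's own replies.
FILTER_POLICY = {"ParticipantRole": ["AGENT"]}

def apply_filter_policy(subscription_arn, region="us-east-1"):
    """Attach the filter policy to the SQS subscription (ARN is a placeholder)."""
    import boto3  # deferred import so the policy itself can be inspected without the AWS SDK
    sns = boto3.client("sns", region_name=region)
    sns.set_subscription_attributes(
        SubscriptionArn=subscription_arn,
        AttributeName="FilterPolicy",
        AttributeValue=json.dumps(FILTER_POLICY),
    )
```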
- Increase the Lambda timeout to over 1 minute. Most generative AI models take several seconds to produce an answer (see Configure Lambda function timeout).
- Add the following permissions policies to the Lambda role: Amazon Bedrock access, Amazon Connect access, DynamoDB access, and Amazon SQS access (see Lambda execution role and AWS managed policies).
- Add the previously created SQS queue as the Lambda trigger with the following configuration: batch size = 1.
- Create the following environment variables with your own values. You can get agent_id and alias_id from the Bedrock agent console, contact_flow_id and instance_id from the Amazon Connect console, dynamo_table from DynamoDB, and sns_endpoint_arn from the SNS topic console:
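The handler code reads these six variables at cold start and fails with a KeyError if any is missing. As a sanity check, a small sketch that verifies them up front (the variable names match the code; the values are your own):

```python
import os

# The six environment variables the Lambda reads, and where to find each value:
REQUIRED_VARS = [
    "agent_id",         # Bedrock agent console
    "alias_id",         # Bedrock agent console (agent alias)
    "contact_flow_id",  # Amazon Connect console
    "instance_id",      # Amazon Connect console
    "dynamo_table",     # DynamoDB table name (partition key "contact_id")
    "sns_endpoint_arn", # SNS topic console
]

def check_environment():
    """Raise at cold start if any required variable is missing."""
    missing = [name for name in REQUIRED_VARS if name not in os.environ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {missing}")
```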

```python
import json
import logging
import os
import uuid

import boto3
from boto3.dynamodb.conditions import Key

# Create logger
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
                    level=logging.INFO)
logger = logging.getLogger(__name__)

# boto3 clients and resources for the required AWS services
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
connect_client = boto3.client('connect')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ["dynamo_table"])

# Bedrock agent parameters
agent_id = os.environ["agent_id"]
agent_alias_id = os.environ["alias_id"]
enable_trace: bool = True
end_session: bool = False

# Amazon Connect parameters
instance_id = os.environ["instance_id"]
contact_flow_id = os.environ["contact_flow_id"]
sns_endpoint_arn = os.environ["sns_endpoint_arn"]
participant_details = "Bedrock bot"


def get_session_and_token(table, contact_id):
    """Retrieve the Bedrock session ID and Connect connection token stored for a contact."""
    response = table.query(
        KeyConditionExpression=Key('contact_id').eq(contact_id)
    )
    if response['Items']:
        item = response['Items'][0]
        return item['session_id'], item['connection_token']
    return None, None


def start_agent_conversation(input_text, agent_id, agent_alias_id, session_id, enable_trace, end_session):
    """Invoke the Bedrock agent and collect its answer from the response stream."""
    agent_response = bedrock_agent_runtime_client.invoke_agent(
        inputText=input_text,
        agentId=agent_id,
        agentAliasId=agent_alias_id,
        sessionId=session_id,
        enableTrace=enable_trace,
        endSession=end_session
    )
    event_stream = agent_response['completion']
    agent_answer = ""
    try:
        for event in event_stream:
            if 'chunk' in event:
                # A chunk event carries the final answer and indicates the request finished successfully
                data = event['chunk']['bytes']
                agent_answer = data.decode('utf8')
                logger.info(f"Final answer ->\n{agent_answer}")
            elif 'trace' in event:
                logger.info(json.dumps(event['trace'], indent=2))
            else:
                raise Exception("unexpected event.", event)
    except Exception as e:
        raise Exception("unexpected event.", e)
    return agent_answer


def start_chat_contact(instance_id, contact_flow_id, participant_display_name):
    """Start a new Connect chat contact (StartChatContact)."""
    response = connect_client.start_chat_contact(
        InstanceId=instance_id,
        ContactFlowId=contact_flow_id,
        ParticipantDetails={
            'DisplayName': participant_display_name
        }
    )
    return response['ContactId'], response['ParticipantToken'], response['ParticipantId']


def start_contact_streaming(instance_id, contact_id, sns_endpoint_arn, participant_id):
    """Stream the chat messages of the contact to the SNS topic (StartContactStreaming)."""
    response = connect_client.start_contact_streaming(
        InstanceId=instance_id,
        ContactId=contact_id,
        ChatStreamingConfiguration={
            'StreamingEndpointArn': sns_endpoint_arn
        },
        ClientToken=participant_id
    )
    return response


def create_participant_connection(connectparticipant_client, participant_token):
    """Exchange the participant token for a connection token (CreateParticipantConnection)."""
    response = connectparticipant_client.create_participant_connection(
        Type=[
            'CONNECTION_CREDENTIALS',
        ],
        ParticipantToken=participant_token,
        ConnectParticipant=True
    )
    return response["ConnectionCredentials"]["ConnectionToken"]


def save_conversation_id(table, session_id, contact_id, connection_token):
    """Persist the session/contact mapping in DynamoDB."""
    table.put_item(
        Item={
            'session_id': session_id,
            'contact_id': contact_id,
            'connection_token': connection_token
        }
    )


def lambda_handler(event, context):
    # If the event comes from SQS it has "Records". That means the message belongs
    # to an existing conversation, so we apply a different logic:
    if 'Records' in event:
        response = "No content"
        for record in event['Records']:
            payload = json.loads(record['body'])
            message = json.loads(payload['Message'])
            if 'Content' in message:
                # Extract attributes
                content = message['Content']
                contact_id = payload['MessageAttributes']['ContactId']['Value']
                # Get session tokens
                session_id, connection_token = get_session_and_token(table, contact_id)
                # Send the agent's message to the Bedrock agent
                agent_answer = start_agent_conversation(content, agent_id, agent_alias_id, session_id, enable_trace, end_session)
                print(f"Session_id: {session_id}.\nConnection_token: {connection_token}.\nContent: {content}.\nAgent_answer: {agent_answer}")
                # Send the answer back to the Connect chat
                client_token = str(uuid.uuid4())
                connectparticipant_client = boto3.client('connectparticipant')
                response = connectparticipant_client.send_message(
                    ContentType='text/plain',
                    Content=agent_answer,
                    ClientToken=client_token,
                    ConnectionToken=connection_token
                )
        return response
    # If the event doesn't come from SQS we have to start a new conversation:
    else:
        body = event["body"]
        # New Bedrock session ID
        session_id: str = str(uuid.uuid4())
        connectparticipant_client = boto3.client('connectparticipant')
        # Create the input and start the agent conversation
        input_text = f"Starting conversation with bedrock agent. Use the next information for context: {body}"
        print(f"agent input: {input_text}")
        agent_answer = start_agent_conversation(input_text, agent_id, agent_alias_id, session_id, enable_trace, end_session)
        print(f"agent answer: {agent_answer}")
        print(f"session ID: {session_id}")
        # Create the Connect streaming conversation
        contact_id, participant_token, participant_id = start_chat_contact(instance_id, contact_flow_id, participant_details)
        streaming_response = start_contact_streaming(instance_id, contact_id, sns_endpoint_arn, participant_id)
        connection_token = create_participant_connection(connectparticipant_client, participant_token)
        # Save the conversation IDs
        save_conversation_id(table, session_id, contact_id, connection_token)
        # Send the first answer to the chat
        client_token = str(uuid.uuid4())
        response = connectparticipant_client.send_message(
            ContentType='text/plain',
            Content=agent_answer,
            ClientToken=client_token,
            ConnectionToken=connection_token
        )
        return {
            'statusCode': 200,
            'body': json.dumps("Agent conversation triggered")
        }
```
- Start a new Connect conversation. This step only runs when the event doesn't come from the SQS trigger, which means it is a new conversation (see the if-else logic in the Lambda handler). In the code this is done with the start_chat_contact function.
- Save and read the connection_token (for the Amazon Connect conversation) and the session_id (for the Bedrock conversation). These two values must be matched so that the Connect panel and the Bedrock session share the same conversation. A DynamoDB table stores them persistently. In the code this is done with the save_conversation_id and get_session_and_token functions.
- Send the Connect agent's message to the Bedrock model. In the code this is done with the start_agent_conversation function.
- Send the Bedrock response to Connect. In the code this is done with the send_message API call.
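The SQS branch of the handler unwraps two layers of JSON: the SQS record body holds the SNS envelope, whose Message field holds the Connect chat message. A standalone sketch of that unwrapping, with an illustrative event (the contact ID and text are made up):

```python
import json

def extract_agent_message(record):
    """Unwrap an SQS record carrying an SNS notification from Connect chat streaming.

    Returns (content, contact_id), or (None, None) for notifications without
    Content (e.g. typing indicators or join/leave events).
    """
    payload = json.loads(record["body"])      # SNS envelope
    message = json.loads(payload["Message"])  # Connect chat message
    if "Content" not in message:
        return None, None
    return message["Content"], payload["MessageAttributes"]["ContactId"]["Value"]

# Example shaped like the real event (values are illustrative):
record = {
    "body": json.dumps({
        "Message": json.dumps({"Content": "Hello Bedrock", "ParticipantRole": "AGENT"}),
        "MessageAttributes": {"ContactId": {"Value": "abc-123"}},
    })
}
```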
You can test the flow from the Lambda console with a minimal event such as:

```json
{
  "body": "Hi!"
}
```


Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.