
Supercharging Amazon Bedrock Flows with AWS Multi-Agent Orchestrator
Amazon Bedrock Flows is an intuitive visual design tool that enables users to create complex AI workflows through a user-friendly drag-and-drop interface. By seamlessly integrating with AWS Multi-Agent Orchestrator, it simplifies the coordination of AI agents, making sophisticated AI solutions more accessible to organizations of all technical levels.
On its own, though, Bedrock Flows has two notable gaps:
- No memory across conversations (each request starts fresh)
- No built-in way to manage multiple flows
The `tech_agent` prompt node receives `history` and `question` from the Flow input node:

```
You are an AWS Services Expert Agent, a specialized AI assistant
with comprehensive knowledge of AWS cloud services.
Your primary objective is to provide expert-level guidance,
architectural recommendations, and technical solutions
for complex cloud infrastructure challenges.

Conversation History:
{{history}}

Answer the following question: {{question}}
```
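Before wiring this prompt into the orchestrator, it helps to see the object the Flow input node expects: a dict whose keys match the `{{question}}` and `{{history}}` placeholders above. A minimal, library-free sketch (the `Message` class below is a hypothetical stand-in for the orchestrator's message type, for illustration only):

```python
from dataclasses import dataclass
from typing import List

# Hypothetical stand-in for the orchestrator's conversation message type
@dataclass
class Message:
    role: str
    text: str

def encode_flow_input(question: str, chat_history: List[Message]) -> dict:
    # Flatten prior turns into one "role:text" line each,
    # matching the {{history}} placeholder in the prompt above
    history = '\n'.join(f"{m.role}:{m.text}" for m in chat_history)
    return {"question": question, "history": history}

payload = encode_flow_input(
    "is it cost efficient?",
    [Message("user", "What is AWS Lambda?"),
     Message("assistant", "AWS Lambda is a serverless computing service.")]
)
print(payload["history"])
# user:What is AWS Lambda?
# assistant:AWS Lambda is a serverless computing service.
```

The Flow input node then fans these two keys out to the prompt node's variables.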
```python
from typing import Any, List
from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator
from multi_agent_orchestrator.classifiers import ClassifierResult
from multi_agent_orchestrator.types import ConversationMessage
from multi_agent_orchestrator.agents import (
    Agent,
    BedrockFlowsAgent,
    BedrockFlowsAgentOptions)

orchestrator = MultiAgentOrchestrator()

# Implement a custom Flow input encoder to include our question and history as an object
def flow_input_encoder(agent: Agent, input: str, **kwargs) -> Any:
    global tech_support_flow
    if agent == tech_support_flow:
        chat_history: List[ConversationMessage] = kwargs.get('chat_history', [])
        chat_history_string = '\n'.join(
            f"{message.role}:{message.content[0].get('text')}"
            for message in chat_history
        )
        return {
            "question": input,
            "history": chat_history_string
        }
    else:
        return input  # return input as a string

# Create a tech agent flow
tech_support_flow = BedrockFlowsAgent(BedrockFlowsAgentOptions(
    name="tech-agent",
    description="Handles technical requests about AWS services",
    flowIdentifier='YOUR_FLOW_ID',
    flowAliasIdentifier='YOUR_FLOW_ALIAS_ID',
    flow_input_encoder=flow_input_encoder,  # our custom flow input encoder
    enableTrace=True  # Good for debugging!
))
orchestrator.add_agent(tech_support_flow)

# Call it directly using agent_process_request (inside an async function)
response = await orchestrator.agent_process_request(
    input_text="What is AWS Lambda?",
    user_id="123",
    session_id="abc",
    classifier_result=ClassifierResult(selected_agent=tech_support_flow, confidence=1.0)
)
print(response)  # AWS Lambda is a serverless computing service provided by Amazon Web Services (AWS)...

# Call it again with a follow-up question
response_next = await orchestrator.agent_process_request(
    input_text="is it cost efficient?",
    user_id="123",
    session_id="abc",
    classifier_result=ClassifierResult(selected_agent=tech_support_flow, confidence=1.0)
)
print(response_next)  # Yes, AWS Lambda can be a cost-efficient solution in many cases. Here's why...
```
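Why does the follow-up question work? Because the orchestrator stores each exchange under the `(user_id, session_id)` pair and hands prior turns back to `flow_input_encoder` as `chat_history`. A library-free sketch of that bookkeeping (the names here are illustrative, not the orchestrator's internals):

```python
from collections import defaultdict

# Illustrative session store: (user_id, session_id) -> list of (role, text) turns
sessions = defaultdict(list)

def save_turn(user_id, session_id, role, text):
    sessions[(user_id, session_id)].append((role, text))

def get_history(user_id, session_id):
    return sessions[(user_id, session_id)]

# First exchange is recorded...
save_turn("123", "abc", "user", "What is AWS Lambda?")
save_turn("123", "abc", "assistant", "AWS Lambda is a serverless computing service.")

# ...so the follow-up request ("is it cost efficient?") sees both earlier turns
print(get_history("123", "abc"))
```

A different `session_id` would start from an empty history, which is exactly the "each request starts fresh" problem Flows has on its own.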
In the health flow's prompt node, `question` and `history` are likewise extracted from the Flow input node:

```
You are a Health Agent, an advanced AI assistant specializing in
comprehensive health support and personalized wellness guidance.

Here is the conversation history:
{{history}}

Answer the following question: {{question}}
```
```python
from typing import Any, List
from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator, OrchestratorConfig
from multi_agent_orchestrator.types import ConversationMessage
from multi_agent_orchestrator.agents import (
    Agent,
    BedrockFlowsAgent,
    BedrockFlowsAgentOptions)

# Implement a custom Flow input encoder to include our question and history as an object
def flow_input_encoder(agent: Agent, input: str, **kwargs) -> Any:
    global tech_support_flow, health_agent_flow
    if agent == tech_support_flow or agent == health_agent_flow:
        chat_history: List[ConversationMessage] = kwargs.get('chat_history', [])
        chat_history_string = '\n'.join(
            f"{message.role}:{message.content[0].get('text')}"
            for message in chat_history
        )
        return {
            "question": input,
            "history": chat_history_string
        }
    else:
        return input  # return input as a string

# Create the orchestrator
orchestrator = MultiAgentOrchestrator(options=OrchestratorConfig(
    LOG_AGENT_CHAT=True,
    LOG_CLASSIFIER_OUTPUT=True
))

# Create multiple flow agents
tech_support_flow = BedrockFlowsAgent(BedrockFlowsAgentOptions(
    name="tech-agent",
    description="Handles technical requests about AWS services",
    flowIdentifier='YOUR_FLOW_ID',
    flowAliasIdentifier='YOUR_FLOW_ALIAS_ID',
    flow_input_encoder=flow_input_encoder,  # our custom flow input encoder
))
health_agent_flow = BedrockFlowsAgent(BedrockFlowsAgentOptions(
    name="health-agent",
    description="Specialized in comprehensive health support and personalized wellness guidance.",
    flowIdentifier='HEALTH_FLOW_ID',
    flowAliasIdentifier='HEALTH_FLOW_ALIAS',
    flow_input_encoder=flow_input_encoder,  # our custom flow input encoder
))

# Add all agents to the orchestrator
orchestrator.add_agent(tech_support_flow)
orchestrator.add_agent(health_agent_flow)

# Use it! The classifier routes each request to the right flow.
response = await orchestrator.route_request(
    "How much sleep do I really need for optimal health?",
    user_id="123",
    session_id="abc"
)
response = await orchestrator.route_request(
    "What is AWS Lambda?",
    user_id="123",
    session_id="abc"
)
```
With this setup you get:
- Flows that integrate conversation history
- Smart routing between different flows
- A clean way to mix flows with other agent types
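The routing relies on each agent's `description`: conceptually, the classifier picks the agent whose description best matches the incoming request. A toy, library-free illustration of that idea (the real orchestrator uses an LLM classifier, not keyword overlap):

```python
import re

def toy_route(request: str, agents: dict) -> str:
    # agents maps name -> description; pick the description sharing the
    # most words with the request (a crude stand-in for the LLM classifier)
    words = set(re.findall(r"\w+", request.lower()))
    return max(agents,
               key=lambda name: len(words & set(re.findall(r"\w+", agents[name].lower()))))

agents = {
    "tech-agent": "Handles technical requests about AWS services",
    "health-agent": "Specialized in comprehensive health support and personalized wellness guidance",
}
print(toy_route("What is AWS Lambda?", agents))                   # tech-agent
print(toy_route("How much sleep do I need for health?", agents))  # health-agent
```

This is why clear, distinctive agent descriptions matter: they are the only signal the classifier has when deciding which flow should handle a request.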
Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.