路由和匹配
使用装饰器实现条件消息路由
📖 概述
RoutedAgent 允许根据消息内容或上下文条件,将消息路由到不同的处理器。
1. @message_handler 装饰器
基础用法
python
@message_handler
async def on_message(self, message: MessageType, ctx: MessageContext):
# 处理所有 MessageType 的消息
pass条件匹配
python
@message_handler(match=lambda msg, ctx: msg.priority == "high")
async def on_high_priority(self, message: TaskMessage, ctx: MessageContext):
# 只处理高优先级消息
pass2. 显式路由装饰器
@rpc - RPC 消息
python
from autogen_core import rpc
@rpc
async def on_rpc_message(self, message: Msg, ctx: MessageContext):
return Response(...) # 必须返回@event - 事件消息
python
from autogen_core import event
@event
async def on_event_message(self, message: Msg, ctx: MessageContext):
pass # 不返回3. 匹配函数
使用 lambda
python
@message_handler(match=lambda msg, ctx: msg.type == "urgent")
async def on_urgent(self, message: Msg, ctx: MessageContext):
pass使用静态方法
python
class MyAgent(RoutedAgent):
@staticmethod
def is_vip_message(msg: Message, ctx: MessageContext) -> bool:
return msg.user_id in VIP_USERS
@message_handler(match=is_vip_message)
async def on_vip_message(self, message: Message, ctx: MessageContext):
pass4. 路由优先级
当多个处理器都匹配时,第一个匹配的处理器会被调用。
python
class PriorityAgent(RoutedAgent):
# 第一个匹配的处理器
@message_handler(match=lambda msg, ctx: msg.priority == "critical")
async def on_critical(self, message: Msg, ctx: MessageContext):
print("紧急处理")
# 第二个匹配的处理器
@message_handler(match=lambda msg, ctx: msg.priority == "high")
async def on_high(self, message: Msg, ctx: MessageContext):
print("高优先级处理")
# 默认处理器(没有 match 条件)
@message_handler
async def on_default(self, message: Msg, ctx: MessageContext):
print("默认处理")5. 完整示例
优先级路由
python
from dataclasses import dataclass
from autogen_core import (
RoutedAgent, message_handler, MessageContext,
SingleThreadedAgentRuntime, AgentId, TypeSubscription
)
@dataclass
class TaskMessage:
priority: str # "high" | "low"
content: str
class PriorityAgent(RoutedAgent):
def __init__(self):
super().__init__("优先级 Agent")
self.high_count = 0
self.low_count = 0
@message_handler(match=lambda msg, ctx: msg.priority == "high")
async def on_high_priority(self, message: TaskMessage, ctx: MessageContext):
self.high_count += 1
print(f"[高优先级] {message.content}")
@message_handler(match=lambda msg, ctx: msg.priority == "low")
async def on_low_priority(self, message: TaskMessage, ctx: MessageContext):
self.low_count += 1
print(f"[低优先级] {message.content}")
async def main():
runtime = SingleThreadedAgentRuntime()
await PriorityAgent.register(runtime, "priority", lambda: PriorityAgent())
await runtime.add_subscription(TypeSubscription("tasks", "priority"))
runtime.start()
agent_id = AgentId("priority", "default")
await runtime.send_message(TaskMessage("high", "紧急任务"), recipient=agent_id)
await runtime.send_message(TaskMessage("low", "普通任务"), recipient=agent_id)
await runtime.stop_when_idle()6. MessageContext
MessageContext 提供:
is_rpc- 是否是 RPC 消息cancellation_token- 取消令牌sender- 发送者 ID
python
@message_handler
async def on_message(self, message: Msg, ctx: MessageContext):
if ctx.is_rpc:
# RPC 消息处理
return Response()
else:
# 广播消息处理
print(f"收到事件:{message}")7. 最佳实践
明确路由条件
python
# ✅ 好的实践:清晰的条件
@message_handler(match=lambda msg, ctx: msg.type == "urgent")
async def on_urgent(self, message: Msg, ctx: MessageContext):
pass
# ❌ 避免:过于复杂的条件
@message_handler(match=lambda msg, ctx: msg.type == "urgent" or (msg.priority > 5 and msg.user_id in [...]))使用显式装饰器
python
# 明确声明是 RPC 还是事件
@rpc
async def on_request(self, message: Msg, ctx: MessageContext):
return Response()
@event
async def on_notification(self, message: Msg, ctx: MessageContext):
pass📝 练习
参考 03-消息路由 示例代码进行实践。
🔗 相关链接
- [01-Agent 和 Runtime](01-Agent 和 Runtime)
- 02-消息传递机制
- [04-多 Agent 协作](04-多 Agent 协作)