Skip to content

路由和匹配

使用装饰器实现条件消息路由


📖 概述

RoutedAgent 允许根据消息内容或上下文条件,将消息路由到不同的处理器。


1. @message_handler 装饰器

基础用法

python
@message_handler
async def on_message(self, message: MessageType, ctx: MessageContext):
    # 处理所有 MessageType 的消息
    pass

条件匹配

python
@message_handler(match=lambda msg, ctx: msg.priority == "high")
async def on_high_priority(self, message: TaskMessage, ctx: MessageContext):
    # 只处理高优先级消息
    pass

2. 显式路由装饰器

@rpc - RPC 消息

python
from autogen_core import rpc

@rpc
async def on_rpc_message(self, message: Msg, ctx: MessageContext):
    return Response(...)  # 必须返回

@event - 事件消息

python
from autogen_core import event

@event
async def on_event_message(self, message: Msg, ctx: MessageContext):
    pass  # 不返回

3. 匹配函数

使用 lambda

python
@message_handler(match=lambda msg, ctx: msg.type == "urgent")
async def on_urgent(self, message: Msg, ctx: MessageContext):
    pass

使用静态方法

python
class MyAgent(RoutedAgent):
    @staticmethod
    def is_vip_message(msg: Message, ctx: MessageContext) -> bool:
        return msg.user_id in VIP_USERS

    @message_handler(match=is_vip_message)
    async def on_vip_message(self, message: Message, ctx: MessageContext):
        pass

4. 路由优先级

当多个处理器都匹配时,第一个匹配的处理器会被调用。

python
class PriorityAgent(RoutedAgent):
    # 第一个匹配的处理器
    @message_handler(match=lambda msg, ctx: msg.priority == "critical")
    async def on_critical(self, message: Msg, ctx: MessageContext):
        print("紧急处理")

    # 第二个匹配的处理器
    @message_handler(match=lambda msg, ctx: msg.priority == "high")
    async def on_high(self, message: Msg, ctx: MessageContext):
        print("高优先级处理")

    # 默认处理器(没有 match 条件)
    @message_handler
    async def on_default(self, message: Msg, ctx: MessageContext):
        print("默认处理")

5. 完整示例

优先级路由

python
from dataclasses import dataclass
from autogen_core import (
    RoutedAgent, message_handler, MessageContext,
    SingleThreadedAgentRuntime, AgentId, TypeSubscription
)

@dataclass
class TaskMessage:
    priority: str  # "high" | "low"
    content: str

class PriorityAgent(RoutedAgent):
    def __init__(self):
        super().__init__("优先级 Agent")
        self.high_count = 0
        self.low_count = 0

    @message_handler(match=lambda msg, ctx: msg.priority == "high")
    async def on_high_priority(self, message: TaskMessage, ctx: MessageContext):
        self.high_count += 1
        print(f"[高优先级] {message.content}")

    @message_handler(match=lambda msg, ctx: msg.priority == "low")
    async def on_low_priority(self, message: TaskMessage, ctx: MessageContext):
        self.low_count += 1
        print(f"[低优先级] {message.content}")

async def main():
    runtime = SingleThreadedAgentRuntime()
    await PriorityAgent.register(runtime, "priority", lambda: PriorityAgent())
    await runtime.add_subscription(TypeSubscription("tasks", "priority"))

    runtime.start()
    agent_id = AgentId("priority", "default")

    await runtime.send_message(TaskMessage("high", "紧急任务"), recipient=agent_id)
    await runtime.send_message(TaskMessage("low", "普通任务"), recipient=agent_id)

    await runtime.stop_when_idle()

6. MessageContext

MessageContext 提供:

  • is_rpc - 是否是 RPC 消息
  • cancellation_token - 取消令牌
  • sender - 发送者 ID
python
@message_handler
async def on_message(self, message: Msg, ctx: MessageContext):
    if ctx.is_rpc:
        # RPC 消息处理
        return Response()
    else:
        # 广播消息处理
        print(f"收到事件:{message}")

7. 最佳实践

明确路由条件

python
# ✅ 好的实践:清晰的条件
@message_handler(match=lambda msg, ctx: msg.type == "urgent")
async def on_urgent(self, message: Msg, ctx: MessageContext):
    pass

# ❌ 避免:过于复杂的条件
@message_handler(match=lambda msg, ctx: msg.type == "urgent" or (msg.priority > 5 and msg.user_id in [...]))

使用显式装饰器

python
# 明确声明是 RPC 还是事件
@rpc
async def on_request(self, message: Msg, ctx: MessageContext):
    return Response()

@event
async def on_notification(self, message: Msg, ctx: MessageContext):
    pass

📝 练习

参考 03-消息路由 示例代码进行实践。


🔗 相关链接

  • [01-Agent 和 Runtime](01-Agent 和 Runtime)
  • 02-消息传递机制
  • [04-多 Agent 协作](04-多 Agent 协作)

Released under the MIT License.