Skip to content

消息传递机制

RPC 消息 vs 广播消息


📖 两种消息类型

AutoGen 支持两种消息传递模式:

特性RPC 消息广播消息
方法send_message()publish_message()
响应期待返回不期待返回
模式一对一一对多
使用场景请求 - 响应通知、事件

1. RPC 消息

发送

python
response = await runtime.send_message(
    message,
    recipient=agent_id,
    sender=sender_id,  # 可选
    cancellation_token=token  # 可选
)

处理

python
@message_handler(match=lambda _, ctx: ctx.is_rpc)
async def on_rpc_message(self, message: MessageType, ctx: MessageContext):
    return response  # 返回响应

2. 广播消息(事件)

发送

python
await runtime.publish_message(
    message,
    topic_id=topic_id,
    sender=sender_id  # 可选
)

处理

python
@message_handler(match=lambda _, ctx: not ctx.is_rpc)
async def on_broadcast_message(self, message: MessageType, ctx: MessageContext):
    pass  # 不返回响应

3. TopicId 结构

python
topic_id = TopicId(
    type="news.tech",    # 主题类型
    source="default"     # 主题来源
)
  • type: 消息的逻辑分类
  • source: 消息的来源标识

4. 代码对比

RPC 示例

python
# 发送
response = await runtime.send_message(
    Greeting("你好"),
    recipient=AgentId("hello", "default")
)
print(f"收到响应:{response.content}")

# 接收方
@message_handler
async def on_greeting(self, message: Greeting, ctx: MessageContext):
    return Greeting("你好!")  # 返回

广播示例

python
# 发送
await runtime.publish_message(
    News("AI 突破!"),
    topic_id=TopicId("news.tech", "default")
)
# 没有响应

# 接收方
@message_handler
async def on_news(self, message: News, ctx: MessageContext):
    print(f"收到新闻:{message.headline}")
    # 不返回

5. 判断消息类型

在处理器中可以通过 ctx.is_rpc 判断:

python
@message_handler
async def on_message(self, message: Msg, ctx: MessageContext):
    if ctx.is_rpc:
        # RPC 消息,需要返回响应
        return Response(...)
    else:
        # 广播消息,不需要返回
        print(f"收到事件:{message}")

6. 最佳实践

使用 RPC 的场景

  • 需要获取结果
  • 一对一通信
  • 同步调用

使用广播的场景

  • 通知多个订阅者
  • 事件驱动架构
  • 不需要响应

📝 练习

参考 02-发布订阅 示例代码进行实践。


🔗 相关链接

Released under the MIT License.