Skip to content

订阅和主题

Topic 是消息通道,Subscription 决定谁能接收


📖 核心概念

Topic (主题)

消息的逻辑通道,由 TopicId 标识。

python
topic_id = TopicId(type="news.tech", source="default")

Subscription (订阅)

决定哪些 Agent 能接收哪些 Topic 的消息。

python
subscription = TypeSubscription(
    topic_type="news.tech",    # 订阅的主题类型
    agent_type="tech_agent"    # 接收的 Agent 类型
)

1. Subscription 类型

TypeSubscription

订阅特定类型的 Topic。

python
# 订阅 news.tech 类型的所有消息
await runtime.add_subscription(
    TypeSubscription("news.tech", "tech_agent")
)

TypePrefixSubscription

订阅特定前缀的 Topic。

python
# 订阅所有以 "news." 开头的主题
await runtime.add_subscription(
    TypePrefixSubscription("news.", "news_agent")
)

DefaultSubscription

使用默认订阅配置。

python
@default_subscription
class MyAgent(RoutedAgent):
    pass

2. 添加订阅的时机

注册时自动添加

python
# 注册 Agent 时添加
await MyAgent.register(
    runtime,
    "my_agent",
    lambda: MyAgent("desc"),
    skip_class_subscriptions=False  # 添加类定义的订阅
)

手动添加

python
await runtime.add_subscription(
    TypeSubscription("topic_type", "agent_type")
)

3. 订阅模式

一对一

Agent A ---subscription---> Topic X

一对多

Topic X ---subscription---> Agent A
          ---subscription---> Agent B
          ---subscription---> Agent C

多对一

Topic X ---subscription--─┐
Topic Y ---subscription--─┼──> Agent A
Topic Z ---subscription--─┘

4. 消息流示例

发布消息                    接收消息
    ↓                          ↑
┌─────────────┐          ┌──────────┐
│ publish_    │  ──────→ │ 订阅匹配  │
│ message()   │  消息     │ 规则检查  │
└─────────────┘          └──────────┘

                          ┌──────────┐
                          │ Agent 处理│
                          └──────────┘

5. 代码示例

新闻订阅系统

python
from dataclasses import dataclass
from autogen_core import (
    RoutedAgent, message_handler, MessageContext,
    SingleThreadedAgentRuntime, TopicId,
    TypeSubscription, AgentId
)

@dataclass
class News:
    category: str
    headline: str

class NewsSubscriber(RoutedAgent):
    def __init__(self, categories: list[str]):
        super().__init__("新闻订阅者")
        self.categories = categories
        self.news_received = []

    @message_handler
    async def on_news(self, message: News, ctx: MessageContext):
        if message.category in self.categories:
            self.news_received.append(message)
            print(f"[{self.id}] 收到:{message.headline}")

async def main():
    runtime = SingleThreadedAgentRuntime()

    # 注册订阅者
    await NewsSubscriber.register(
        runtime,
        "tech_subscriber",
        lambda: NewsSubscriber(["tech", "ai"])
    )

    # 添加订阅
    await runtime.add_subscription(
        TypeSubscription("news.tech", "tech_subscriber")
    )

    runtime.start()

    # 发布消息
    await runtime.publish_message(
        News("tech", "新科技发布"),
        topic_id=TopicId("news.tech", "default")
    )

    await runtime.stop_when_idle()

6. 关键点

Subscription 的作用域

  • Subscription 是全局的,影响所有相同类型的 Agent
  • 一个 Agent 可以有多个 Subscription

Topic 匹配规则

  • TypeSubscription: 精确匹配 topic_type
  • TypePrefixSubscription: 前缀匹配

📝 练习

参考 02-发布订阅 示例代码进行实践。


🔗 相关链接

Released under the MIT License.