AgentScope基础学习


AgentScope

AgentScope 是阿里巴巴通义实验室开源的企业级多智能体开发框架,核心理念是 "build and run agents you can see, understand and trust"(构建你看得见、听得懂、信得过的智能体)。目前已在 GitHub 收获超过 18k Star,提供从研发到生产的完整工程体系,让 AI Agent 开发从 "能跑" 到 "能用"。

官方文档:AgentScope

1. 核心概念

1.1 消息

消息(message)是 AgentScope 最核心的数据结构,用于

  • 在智能体之间交换信息,
  • 在用户交互界面显示信息,
  • 在记忆中存储信息,
  • 作为 AgentScope 与不同 LLM API 之间的统一媒介。

一个消息由四个字段组成:

字段 类型 描述
name str 消息发送者的名称/身份
role Literal["system","assistant","user"] 消息发送者的角色,必须是 "system"、"assistant" 或 "user" 之一。
content str | list[ContentBlock] 消息包含的数据,可以是字符串或 block 的列表。
metadata dict[str, JSONSerializableObject] | None 包含额外元数据的字典,通常用于结构化输出。

1.1.1文本消息

content字段中支持不同的block数据结构,直接输入文本属于文本消息。例如:

from agentscope.message import Msg

msg = Msg(
    name="游戏主持人",
    role="system",
    content="游戏开始!",
)

1.1.2多模态消息

message 类通过提供不同的 block 结构来支持多模态内容:

描述
TextBlock 纯文本数据
ImageBlock 图像数据
AudioBlock 音频数据
VideoBlock 视频数据

ImageBlockAudioBlockVideoBlock,还可以使用 base64 编码的字符串作为数据源(Source)。

from agentscope.message import (
    Msg,
    Base64Source,
    TextBlock,
    ThinkingBlock,
    ImageBlock,
    AudioBlock,
    VideoBlock,
    ToolUseBlock,
    ToolResultBlock,
)

msg = Msg(
    name="Jarvis",
    role="assistant",
    content=[
        TextBlock(
            type="text",
            text="这是一个包含 base64 编码数据的多模态消息。",
        ),
        ImageBlock(
            type="image",
            source=Base64Source(
                type="base64",
                media_type="image/jpeg",
                data="/9j/4AAQSkZ...",
            ),
        ),
        AudioBlock(
            type="audio",
            source=Base64Source(
                type="base64",
                media_type="audio/mpeg",
                data="SUQzBAAAAA...",
            ),
        ),
        VideoBlock(
            type="video",
            source=Base64Source(
                type="base64",
                media_type="video/mp4",
                data="AAAAIGZ0eX...",
            ),
        ),
    ],
)

详细介绍见下方:

(1)TextBlock

示例:

TextBlock(
   type="text",
   text="Hello, world!"
)

(2)ImageBlock

示例:

# 普通示例
ImageBlock(
   type="image",
   source=URLSource(
       type="url",
       url="https://example.com/image.jpg"
   )
)
# base64示例
ImageBlock(
    type="image",
    source=Base64Source(
        type="base64",
        media_type="image/jpeg",
        data="/9j/4AAQSkZ...",
    ),
)

(3)AudioBlock

示例:

# 普通示例
AudioBlock(
   type="audio",
   source=URLSource(
       type="url",
       url="https://example.com/audio.mp3"
   )
)
# base64示例
AudioBlock(
    type="audio",
    source=Base64Source(
        type="base64",
        media_type="audio/mpeg",
        data="SUQzBAAAAA...",
    ),
)

(4)VideoBlock

示例:

VideoBlock(
   type="video",
   source=URLSource(
       type="url",
       url="https://example.com/video.mp4"
   )
)

1.1.3推理消息

ThinkingBlock 用于支持推理模型,包含模型的思考过程。

msg_thinking = Msg(
    name="Jarvis",
    role="assistant",
    content=[
        ThinkingBlock(
            type="thinking",
            thinking="我正在为 AgentScope 构建一个思考块的示例。",
        ),
        TextBlock(
            type="text",
            text="这是一个思考块的示例。",
        ),
    ],
)

1.1.4工具使用和工具结果消息

ToolUseBlockToolResultBlock 用于支持工具 API:

msg_tool_call = Msg(
    name="Jarvis",
    role="assistant",
    content=[
        ToolUseBlock(
            type="tool_use",
            id="343",
            name="get_weather",
            input={
                "location": "Beijing",
            },
        ),
    ],
)

msg_tool_res = Msg(
    name="system",
    role="system",
    content=[
        ToolResultBlock(
            type="tool_result",
            id="343",
            name="get_weather",
            output="北京的天气是晴天,温度为 25°C。",
        ),
    ],
)

AgentScope 中,通常使用 role 为“system”的消息来记录工具函数的执行结果。

1.1.5 序列化与反序列化

消息对象可以分别通过 to_dictfrom_dict 方法进行序列化和反序列化。

序列化:

serialized_msg = msg.to_dict()

print(type(serialized_msg))
print(json.dumps(serialized_msg, indent=4, ensure_ascii=False))

输出:

<class 'dict'>
{
    "id": "gH8zDp9YSfLSKyGppA2QkL",
    "name": "Jarvis",
    "role": "assistant",
    "content": [
        {
            "type": "text",
            "text": "这是一个包含 base64 编码数据的多模态消息。"
        },
        {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": "image/jpeg",
                "data": "/9j/4AAQSkZ..."
            }
        },
        {
            "type": "audio",
            "source": {
                "type": "base64",
                "media_type": "audio/mpeg",
                "data": "SUQzBAAAAA..."
            }
        },
        {
            "type": "video",
            "source": {
                "type": "base64",
                "media_type": "video/mp4",
                "data": "AAAAIGZ0eX..."
            }
        }
    ],
    "metadata": {},
    "timestamp": "2026-03-12 03:27:29.431"
}

反序列化:

new_msg = Msg.from_dict(serialized_msg)

print(type(new_msg))
print(f'消息的发送者: "{new_msg.name}"')
print(f'发送者的角色: "{new_msg.role}"')
print(f'消息的内容: "{json.dumps(new_msg.content, indent=4, ensure_ascii=False)}"')

输出:

<class 'agentscope.message._message_base.Msg'>
消息的发送者: "Jarvis"
发送者的角色: "assistant"
消息的内容: "[
    {
        "type": "text",
        "text": "这是一个包含 base64 编码数据的多模态消息"
    },
    {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": "image/jpeg",
            "data": "/9j/4AAQSkZ..."
        }
    },
    {
        "type": "audio",
        "source": {
            "type": "base64",
            "media_type": "audio/mpeg",
            "data": "SUQzBAAAAA..."
        }
    },
    {
        "type": "video",
        "source": {
            "type": "base64",
            "media_type": "video/mp4",
            "data": "AAAAIGZ0eX..."
        }
    }
]"

1.1.6Msg函数

函数 参数 描述
get_text_content 将所有 TextBlock 中的内容收集到单个字符串中(用 "\n" 分隔)。
get_content_blocks block_type 返回指定类型的内容块列表。如果未提供 block_type,则以块格式返回全部内容。
has_content_blocks block_type 检查消息是否具有指定类型的内容块。str 内容会被视为 TextBlock 类型。

1.2 工具

1.2.1工具函数

工具函数应该返回一个 ToolResponse 对象或产生 ToolResponse 对象的生成器,并且需要具有描述工具功能和参数的文档字符串注释

def tool_function(a: int, b: str) -> ToolResponse:
    """{函数描述}

    Args:
        a (int):
            {第一个参数的描述}
        b (str):
            {第二个参数的描述}
    """

实例方法和类方法也可以用作工具函数,Toolkit 中将自动忽略 selfcls 参数。

agentscope.tool 模块下提供了几个内置工具函数,如 execute_python_codeexecute_shell_command等。完整的内置工具函数如下表格:

函数名 核心功能 典型使用场景
代码 / 命令执行类
execute_python_code 安全沙箱中执行 Python 代码片段,返回执行结果(输出 / 异常) 智能体需要做数据计算、逻辑验证、简单脚本运行(如批量处理数据)
execute_shell_command 执行系统 Shell 命令(支持 Windows/Linux/macOS),返回命令输出 / 错误 智能体需要操作系统(如查看文件列表、启动服务、安装依赖、读取系统信息)
文件操作类
view_text_file 读取文本文件内容(支持指定编码、读取指定行数) 智能体需要查看配置文件、日志文件、代码文件、文档内容
write_text_file 覆盖式写入文本文件(若文件不存在则创建,支持指定编码) 智能体生成报告、保存配置、创建新的文本文件(如生成 Markdown 文档)
insert_text_file 向文本文件指定位置插入内容(不覆盖原有内容,支持行号定位) 智能体补充文档内容、修改配置文件(如在代码文件中插入函数、在日志中追加记录)
通义千问(DashScope)多媒体类
dashscope_text_to_image 调用通义千问文生图 API,将文本描述转为图片(支持指定尺寸、风格) 智能体根据文字生成插画、海报、示意图(如 “生成一张科技感的 AI 智能体流程图”)
dashscope_text_to_audio 调用通义千问文本转语音 API,将文字转为语音音频(支持指定音色、语速) 智能体生成语音播报、有声读物、语音提示(如将生成的报告转为语音)
dashscope_image_to_text 调用通义千问图生文 API,解析图片内容(OCR + 语义描述) 智能体识别图片中的文字、描述图片内容、分析图片中的图表数据
OpenAI 多媒体类
openai_text_to_image 调用 OpenAI DALL・E API,文本生成图片(支持分辨率、风格、数量) 智能体生成创意图片、产品原型图、场景示意图
openai_text_to_audio 调用 OpenAI TTS API,文本转语音(支持不同语音模型、语速、音量) 生成自然度更高的英文语音、多语种语音播报
openai_edit_image 调用 OpenAI Image Edit API,编辑图片(基于蒙版 + 文本指令修改指定区域) 智能体修改图片局部内容(如 “把图片中的红色按钮改成蓝色”)
openai_create_image_variation 调用 OpenAI Image Variation API,生成图片的变体(保留风格,修改细节) 智能体对现有图片做风格微调、生成多张相似但不同的图片
openai_image_to_text 调用 OpenAI GPT-4V API,解析图片内容(复杂场景理解、多模态分析) 智能体分析图片中的复杂图表、识别手写公式、理解图片中的场景逻辑
openai_audio_to_text 调用 OpenAI Whisper API,语音转文字(支持多语种、实时转录、识别说话人) 智能体转录语音文件、实时解析语音对话、生成音频字幕

1.2.2工具模块

Toolkit 类设计用于管理工具函数,从文档字符串中提取它们的 JSON Schema,并为工具执行提供统一接口。

Toolkit 类的基本功能是注册工具函数并执行它们。

# 准备一个自定义工具函数
async def my_search(query: str, api_key: str) -> ToolResponse:
    """一个简单的示例工具函数。

    Args:
        query (str):
            搜索查询。
        api_key (str):
            用于身份验证的 API 密钥。
    """
    return ToolResponse(
        content=[
            TextBlock(
                type="text",
                text=f"正在使用 API 密钥 '{api_key}' 搜索 '{query}'",
            ),
        ],
    )


# 在工具模块中注册工具函数
toolkit = Toolkit()
toolkit.register_tool_function(my_search)

注册工具函数后,可以通过调用 get_json_schemas 方法获取其 JSON Schema。

print("工具 JSON Schemas:")
print(json.dumps(toolkit.get_json_schemas(), indent=4, ensure_ascii=False))

输出:

工具 JSON Schemas
[
    {
        "type": "function",
        "function": {
            "name": "my_search",
            "parameters": {
                "properties": {
                    "query": {
                        "description": "搜索查询。",
                        "type": "string"
                    },
                    "api_key": {
                        "description": "用于身份验证的 API 密钥。",
                        "type": "string"
                    }
                },
                "required": [
                    "query",
                    "api_key"
                ],
                "type": "object"
            },
            "description": "一个简单的示例工具函数。"
        }
    }
]

1.3智能体

在 AgentScope 中,智能体(Agent)行为被抽象为 AgentBase 类中的三个核心函数:

  • reply:处理传入的消息并生成响应消息。
  • observe:接收来自环境或其它智能体的消息,但不返回响应。
  • print:将消息显示到目标输出(例如终端、Web 界面)。

为了支持用户实时介入(Realtime Steering),AgentScope 提供了额外 的 handle_interrupt 函数来处理智能体回复过程中的用户中断。

此外,ReAct 智能体是 AgentScope 中最重要的智能体,该智能体的回复过程分为两个阶段:

  • 推理(Reasoning):通过调用 LLM 进行推理并生成工具调用
  • 行动(Acting):执行工具函数。

因此, ReActAgentBase 类中提供了两个额外的核心函数,_reasoning_acting

1.3.1 ReAct智能体

AgentScope 在 agentscope.agent 模块下提供了开箱即用的 ReAct 智能体 ReActAgent 供开发者使用。

基础功能

支持围绕 replyobserveprint_reasoning_acting钩子函数(hooks)支持结构化输出

实时介入(Realtime Steering):

支持用户 中断支持自定义 中断处理,基于 asyncio 取消机制实现了该功能。中断逻辑已在 AgentBase 类中作为基本功能实现。

工具:

支持 同步/异步 工具函数支持 流式 工具响应支持 状态化 的工具管理支持 并行 工具调用支持 MCP 服务器

记忆:

支持智能体 自主管理 长期记忆支持“静态”的长期记忆管理

ReActAgent类参数:

参数 描述
name (必需) 智能体的名称
sys_prompt (必需) 智能体的系统提示
model (必需) 智能体用于生成响应的模型
formatter (必需) 提示构建策略,应与使用的模型保持一致
toolkit 用于注册/调用工具函数的工具模块
memory 用于存储对话历史的短期记忆
long_term_memory 长期记忆
long_term_memory_mode 长期记忆的管理模式:agent_control: 允许智能体通过工具函数自己控制长期记忆static_control: 在每次回复(reply)的开始/结束时,会自动从长期记忆中检索/记录信息both: 同时激活上述两种模式
enable_meta_tool 是否启用元工具(Meta tool),即允许智能体自主管理工具函数
parallel_tool_calls 是否允许并行工具调用
max_iters 智能体生成响应的最大迭代次数
plan_notebook 计划模块,允许智能体制定和管理计划与子任务
print_hint_msg 是否在终端打印 plan_notebook 生成的提示消息

对于formatter参数:

在 AgentScope 中,有两种类型的格式化器:"ChatFormatter" 和 "MultiAgentFormatter"。

主要负责:将 Msg 对象转换为不同 LLM API 要求的格式,(可选)截断消息以适应 max_token 的限制,(可选)执行提示工程,例如对长对话进行总结。

(1)ChatFormatter:专为标准的用户-助手场景(聊天机器人)设计,使用 role 字段来识别用户和助手。

(2)MultiAgentFormatter:专为多智能体场景设计,使用 name 字段来识别不同的实体,在格式化的过程中会将多智能体的对话历史合并为单个消息。

API 提供商 用户-助手场景 多智能体场景
OpenAI OpenAIChatFormatter OpenAIMultiAgentFormatter
Anthropic AnthropicChatFormatter AnthropicMultiAgentFormatter
DashScope DashScopeChatFormatter DashScopeMultiAgentFormatter
Gemini GeminiChatFormatter GeminiChatFormatter
Ollama OllamaChatFormatter OllamaMultiAgentFormatter
DeepSeek DeepSeekChatFormatter DeepSeekMultiAgentFormatter
vLLM OpenAIFormatter OpenAIFormatter

以 DashScope API 为例,创建一个智能体对象:

import asyncio
import os

from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.tool import Toolkit, execute_python_code


async def creating_react_agent() -> None:
    """创建一个 ReAct 智能体并运行一个简单任务。"""
    # 准备工具
    toolkit = Toolkit()
    toolkit.register_tool_function(execute_python_code)  # 内置工具函数execute_python_code

    jarvis = ReActAgent(
        name="Jarvis",
        sys_prompt="你是一个名为 Jarvis 的助手",
        model=DashScopeChatModel(
            model_name="qwen-max",
            api_key=os.environ["DASHSCOPE_API_KEY"],
            stream=True,
            enable_thinking=False,
        ),
        formatter=DashScopeChatFormatter(),
        toolkit=toolkit,
        memory=InMemoryMemory(),
    )

    msg = Msg(
        name="user",
        content="你好!Jarvis,用 Python 运行 Hello World。",
        role="user",
    )

    await jarvis(msg)


asyncio.run(creating_react_agent())

1.3.2 自定义智能体

AgentScope 提供了两个基类:AgentBaseReActAgentBase,它们在抽象方法和支持的钩子函数方面有所不同。 具体来说,ReActAgentBase 扩展了 AgentBase,增加了额外的 _reasoning_acting 抽象方法,以及它们的前置和后置钩子函数。开发者可以根据需要选择继承这两个基类中的任一个。

抽象方法 支持的钩子函数 描述
AgentBase reply
observe
print
handle_interrupt
pre_/post_replypre_/post_observepre_/post_print 所有智能体的基类,提供基本接口和钩子。
ReActAgentBase reply
observe
print
handle_interrupt
_reasoning
_acting
pre_/post_replypre_/post_observepre_/post_printpre_/post_reasoningpre_/post_acting ReAct 类智能体的抽象类,扩展了 AgentBase,增加了 _reasoning_acting 抽象方法及其钩子。
ReActAgent - pre_/post_replypre_/post_observepre_/post_printpre_/post_reasoningpre_/post_acting ReActAgentBase 的实现
UserAgent 代表用户的特殊智能体,用于与智能体交互
A2aAgent - pre_/post_replypre_/post_observepre_/post_print 用于与远程 A2A 代理通信的智能体

1.3.3 钩子函数

钩子(Hook)是 AgentScope 中的扩展点,允许开发者在特定位置自定义智能体行为,提供了一种灵活的方式来修改或扩展智能体的功能,而无需更改其核心实现。

AgentScope 中支持的钩子类型:

AgentBase自带 3 组、共 6 个钩子点位,ReActAgentBase额外多 2 组、4 个专属钩子点位。

智能体类 核心函数 钩子类型 描述
AgentBase 及其子类 reply pre_reply
post_reply
智能体回复消息前/后的钩子
print pre_print
post_print
向目标输出(如终端、Web 界面)打印消息前/后的钩子
observe pre_observe``post_observe 从环境或其它智能体观察消息前/后的钩子
ReActAgentBase 及其子类 reply
print
observe
pre_reply
post_reply
pre_print
post_print
pre_observe
post_observe
_reasoning pre_reasoning
post_reasoning
智能体推理过程前/后的钩子
_acting pre_acting
post_acting
智能体行动过程前/后的钩子

(1)前置钩子

所有pre_开头的前置钩子,入参、返回值规则完全一致,核心作用是修改核心函数的输入参数

def 前置钩子名(self, kwargs: dict[str, Any]) -> dict[str, Any] | None:
    # 你的自定义逻辑
    pass
部分 说明
self 就是当前的智能体实例本身,你可以通过 self 拿到智能体的所有属性(比如记忆、名称、模型配置)
kwargs 核心函数的所有入参,都会被打包成这个字典。比如 reply 函数的入参是 msg,那 kwargs 里就有 "msg" 这个 key,直接取 kwargs ["msg"] 就能拿到消息对象
返回值 两种选择:1. 想修改输入:返回修改后的 kwargs 字典,会传给下一个钩子 / 核心函数2. 不想修改:直接返回 None,系统会沿用之前的参数

示例:

def pre_reply_filter(self, kwargs: dict[str, Any]) -> dict[str, Any] | None:
    # 拿到用户输入的消息
    user_msg = kwargs["msg"]
    # 自定义逻辑:敏感词替换
    user_msg.content = user_msg.content.replace("脏话", "**")
    # 返回修改后的参数
    return {**kwargs, "msg": user_msg}

(2)后置钩子

所有post_开头的后置钩子,规则统一,核心作用是修改核心函数的输出结果

def 后置钩子名(self, kwargs: dict[str, Any], output: Any) -> Any | None:
    # 你的自定义逻辑
    pass
部分 说明
self 同上,智能体实例本身
kwargs 核心函数的原始入参字典(和前置钩子的入参一致,仅做参考,修改它不会影响已经执行完的核心函数)
output 核心函数执行完的原始输出结果,比如 reply 函数返回的 Msg 对象,就是这里的 output
返回值 两种选择:1. 想修改输出:返回修改后的 output,会传给下一个钩子 / 最终返回2. 不想修改:直接返回 None,系统会沿用之前的输出

示例:

def post_reply_add_disclaimer(self, kwargs: dict[str, Any], output: Any) -> Any | None:
    # 拿到智能体的原始回复
    reply_msg = output
    # 加免责声明
    reply_msg.content += "\n\n以上内容仅为AI生成,不构成任何建议"
    # 返回修改后的输出
    return reply_msg

(3)钩子管理

实例级和类级钩子两者的核心区别是钩子的生效范围不一样

类型 生效范围 适用场景
实例级钩子 仅对你注册的这一个智能体对象生效,同个类的其他实例完全不受影响 给某个特定智能体加专属逻辑,比如只给客服智能体加敏感词过滤
类级钩子 这个类的所有实例生效,包括已经创建的、还没创建的所有对象 给全量智能体加通用功能,比如给所有智能体加请求日志、耗时统计

当同一个点位,既有实例钩子、又有类钩子、还有多个钩子时,执行顺序是固定的:

调用核心函数比如reply

前置钩子阶段先执行实例级前置钩子按注册顺序先注册先执行

前置钩子阶段再执行类级前置钩子按注册顺序先注册先执行

执行核心函数本身的逻辑

后置钩子阶段先执行实例级后置钩子按注册顺序先注册先执行

后置钩子阶段再执行类级后置钩子按注册顺序先注册先执行

返回最终结果

钩子流程

同一个点位可以注册多个钩子,它们会链式传递结果:

  • 前置钩子:前一个钩子返回的非 None 的 kwargs,会直接传给下一个钩子当入参;如果返回 None,下一个钩子就用最近一次非 None 的结果。
  • 后置钩子:前一个钩子返回的非 None 的 output,会直接传给下一个钩子当 output 参数;如果返回 None,下一个钩子就用最近一次非 None 的结果。

不要在钩子内调用核心函数(reply/speak/observe/_reasoning/_acting)以避免循环调用。

AgentScope 中的内置钩子管理方法:

级别 方法 描述
实例级 register_instance_hook 为当前对象注册具有给定钩子类型和名称的钩子。
remove_instance_hook 移除当前对象具有给定钩子类型和名称的钩子。
clear_instance_hooks 清除当前对象具有给定钩子类型的所有钩子。
类级 register_class_hook 为该类的所有对象注册具有给定钩子类型和名称的钩子。
remove_class_hook 移除该类所有对象具有给定钩子类型和名称的钩子。
clear_class_hooks 清除该类所有对象具有给定钩子类型的所有钩子。

1.4 记忆

AgentScope 中的记忆模块负责:

  • 存储消息对象(Msg
  • 利用标记(mark)管理消息

标记 是与记忆中每条消息关联的字符串标签,可用于根据消息的上下文或目的对消息进行分类、过滤和检索。 可用于实现进阶的记忆管理功能,例如在 ReActAgent 类中,使用hint标签标记一次性的提示消息, 以便在使用完成后将其从记忆中删除。

AgentScope 中的内置记忆类:

描述
InMemoryMemory 简单的内存记忆存储实现。
AsyncSQLAlchemyMemory 基于异步 SQLAlchemy 的记忆存储实现,支持如 SQLite、PostgreSQL、MySQL 等多种关系数据库。
RedisMemory 基于 Redis 的记忆存储实现。

1.4.1 内存记忆

async def in_memory_example():
    """使用InMemoryMemory在内存中存储消息的示例。"""
    memory = InMemoryMemory()
    await memory.add(
        Msg("Alice", "生成一份关于AgentScope的报告", "user"),
    )

    # 添加一条带有标记"hint"的提示消息
    await memory.add(
        [
            Msg(
                "system",
                "<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>",
                "system",
            ),
        ],
        marks="hint",   # 标记一次性提示消息
    )

    msgs = await memory.get_memory(mark="hint")
    print("带有标记'hint'的消息:")
    for msg in msgs:
        print(f"- {msg}")

    # 所有存储的消息都可以通过 ``state_dict`` 和 ``load_state_dict`` 方法导出和加载。
    state = memory.state_dict()
    print("记忆的状态字典:")
    print(json.dumps(state, indent=2, ensure_ascii=False))

    # 通过标记删除消息
    deleted_count = await memory.delete_by_mark("hint")
    print(f"删除了 {deleted_count} 条带有标记'hint'的消息。")

    print("删除后的记忆状态字典:")
    state = memory.state_dict()
    print(json.dumps(state, indent=2, ensure_ascii=False))


asyncio.run(in_memory_example())

输出:

带有标记'hint'的消息
- Msg(id='XqLgHiuTEPEYtN3pM5EUDV', name='system', content='<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>', role='system', metadata={}, timestamp='2026-03-12 03:30:48.641', invocation_id='None')
记忆的状态字典
{
  "_compressed_summary": "",
  "content": [
    [
      {
        "id": "Agmg28QmfUuyqBcc2sMucP",
        "name": "Alice",
        "role": "user",
        "content": "生成一份关于AgentScope的报告",
        "metadata": {},
        "timestamp": "2026-03-12 03:30:48.641"
      },
      []
    ],
    [
      {
        "id": "XqLgHiuTEPEYtN3pM5EUDV",
        "name": "system",
        "role": "system",
        "content": "<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>",
        "metadata": {},
        "timestamp": "2026-03-12 03:30:48.641"
      },
      [
        "hint"
      ]
    ]
  ]
}
删除了 1 条带有标记'hint'的消息
删除后的记忆状态字典
{
  "_compressed_summary": "",
  "content": [
    [
      {
        "id": "Agmg28QmfUuyqBcc2sMucP",
        "name": "Alice",
        "role": "user",
        "content": "生成一份关于AgentScope的报告",
        "metadata": {},
        "timestamp": "2026-03-12 03:30:48.641"
      },
      []
    ]
  ]
}

1.4.2 关系数据库记忆

AgentScope 通过 SQLAlchemy 提供统一的接口来使用关系数据库,支持:

  • 多种数据库,如 SQLite、PostgreSQL、MySQL 等
  • 用户和会话管理
  • 生产环境中的连接池

以SQLite支持的记忆为例:

async def sqlalchemy_example() -> None:
    """使用 AsyncSQLAlchemyMemory 在 SQLite 数据库中存储消息的示例。"""

    # 首先创建一个异步 SQLAlchemy 引擎
    engine = create_async_engine("sqlite+aiosqlite:///./test_memory.db")

    # 然后使用该引擎创建记忆
    memory = AsyncSQLAlchemyMemory(
        engine_or_session=engine,
        # 可选传入指定user_id和session_id
        user_id="user_1",
        session_id="session_1",
    )

    await memory.add(
        Msg("Alice", "生成一份关于AgentScope的报告", "user"),
    )

    await memory.add(
        [
            Msg(
                "system",
                "<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>",
                "system",
            ),
        ],
        marks="hint",
    )

    msgs = await memory.get_memory(mark="hint")
    print("带有标记'hint'的消息:")
    for msg in msgs:
        print(f"- {msg}")

    # 完成后关闭引擎
    await memory.close()


asyncio.run(sqlalchemy_example())

输出:

带有标记'hint'的消息
- Msg(id='BWboEGSoABPMkkseBtoLZD', name='system', content='<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>', role='system', metadata={}, timestamp='2026-03-12 03:30:48.683', invocation_id='None')

1.4.3 NoSQL数据库记忆

示例:

async def redis_memory_example() -> None:
    """使用 RedisMemory 在 Redis 中存储消息的示例。"""
    # 使用fakeredis进行内存测试,无需真实的 Redis 服务器
    fake_redis = fakeredis.aioredis.FakeRedis(decode_responses=True)
    # 创建 Redis 记忆
    memory = RedisMemory(
        # 使用fake redis进行演示
        connection_pool=fake_redis.connection_pool,
        # 也可以通过指定主机和端口连接到真实的Redis服务器
        # host="localhost",
        # port=6379,
        # 可选地指定 user_id 和 session_id
        user_id="user_1",
        session_id="session_1",
    )

    # 向记忆中添加消息
    await memory.add(
        Msg(
            "Alice",
            "生成一份关于AgentScope的报告",
            "user",
        ),
    )

    # 添加一条带有标记"hint"的提示消息
    await memory.add(
        Msg(
            "system",
            "<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>",
            "system",
        ),
        marks="hint",
    )

    # 检索带有标记"hint"的消息
    msgs = await memory.get_memory(mark="hint")
    print("带有标记'hint'的消息:")
    for msg in msgs:
        print(f"- {msg}")


asyncio.run(redis_memory_example())

输出:

带有标记'hint'的消息
- Msg(id='ZkPMW3LtzGnKEq6LEBTDfK', name='system', content='<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>', role='system', metadata={}, timestamp='2026-03-12 03:30:48.710', invocation_id='None')

1.4.4 自定义记忆

要自定义您自己的记忆实现,只需从 MemoryBase 继承并实现以下方法:

方法 描述
add 向记忆中添加 Msg 对象
delete 从记忆中删除 Msg 对象
delete_by_mark 通过标记从记忆中删除 Msg 对象
size 记忆的大小
clear 清空记忆内容
get_memory Msg 对象列表的形式获取记忆内容
update_messages_mark 更新记忆中消息的标记
state_dict 获取记忆的状态字典
load_state_dict 加载记忆的状态字典

2. 管道

当用多个 AI 智能体做项目时,一定会遇到这些麻烦:

  • 做群聊场景:A 说的话,要手动发给 B、C、D,智能体越多代码越乱
  • 做工作流:要让「策划→文案→设计→开发」按顺序干活,前一个的结果传给后一个,得一行行写重复的 await
  • 做专家评审:同一个问题要问多个 AI,得自己写循环、处理并发 / 串行、拷贝消息
  • 做流式展示:想实时拿到大模型逐字输出的内容,不知道怎么捕获

agentscope.pipeline 模块,就是专门解决这些问题的语法糖工具集,核心包含 4 大组件:

  1. MsgHub:多智能体消息自动广播中心
  2. 顺序管道:按固定顺序串联执行智能体
  3. 扇出管道:同一份输入分发给多个智能体,批量执行
  4. 流式消息工具:实时捕获智能体的流式输出内容

2.1 MsgHub

MsgHub,多智能体消息自动广播中心, MsgHub 类是一个 异步上下文管理器,它接收一个智能体列表作为其参与者。 当一个参与者生成回复消息时,将通过调用所有其他参与者的 observe 方法广播该消息。 这意味着在 MsgHub 上下文中,开发者无需手动将回复消息从一个智能体发送到另一个智能体。

示例:

  1. 先创建 4 个智能体:Alice、Bob、Charlie、David
  2. 把前 3 个放进 MsgHub 里,让它们自我介绍
# 让Alice、Bob、Charlie自我介绍
async def example_broadcast_message():
    """使用 MsgHub 广播消息的示例。"""

    # 创建消息中心
    async with MsgHub(
        participants=[alice, bob, charlie],
        announcement=Msg(
            "user",
            "现在请简要介绍一下自己,包括你的姓名、年龄和职业。",
            "user",
        ),
    ) as hub:
        # 无需手动消息传递的群聊
        await alice()
        await bob()
        await charlie()


asyncio.run(example_broadcast_message())


# 询问Alice之外的其他人是否知道Alice信息
async def check_broadcast_message():
    """检查消息是否正确广播。"""
    user_msg = Msg(
        "user",
        "你知道 Alice 是谁吗,她是做什么的?",
        "user",
    )

    await bob(user_msg)
    await charlie(user_msg)
    await david(user_msg)


asyncio.run(check_broadcast_message())

结果:

Bob 和 Charlie 在 MsgHub 里,收到了 Alice 的自我介绍,后续问它们,能说出 Alice 的信息

David 没在 MsgHub 里,完全不知道 Alice 是谁,完美体现了广播的范围控制

2.2 动态管理

MsgHub 支持在运行中修改参与的智能体:

  • hub.add(xxx):添加新的智能体(新加入的智能体收不到之前的历史消息
  • hub.delete(xxx):移除智能体,之后不会再收到广播
  • hub.broadcast(消息):手动给所有当前参与者发一条消息

只有在async with MsgHub(...)代码块里的消息,才会被广播,块外面的对话不会自动同步。

async with MsgHub(participants=[alice]) as hub:
    # 添加新参与者
    hub.add(david)

    # 移除参与者
    hub.delete(alice)

    # 向所有当前参与者广播
    await hub.broadcast(
        Msg("system", "现在我们开始...", "system"),
    )

2.3 顺序管道

顺序管道(Sequential Pipeline)会让智能体列表顺序逐个执行,前一个智能体的输出,自动作为下一个智能体的输入,一行代码就能搞定整个线性流程。

函数式写法:

from agentscope.pipeline import sequential_pipeline

msg = await sequential_pipeline(
    # 按顺序执行的智能体列表
    agents=[alice, bob, charlie, david],
    # 第一个输入消息,可以是 None
    msg=None
)

类式写法:

from agentscope.pipeline import SequentialPipeline

# 先创建管道对象,定义好执行顺序
pipeline = SequentialPipeline(agents=[alice, bob, charlie, david])

# 第一次调用,初始输入为None
msg = await pipeline(msg=None)

# 换个输入,复用同一个管道,不用重新定义顺序
msg2 = await pipeline(msg=Msg("user", "你好!", "user"))

2.4 扇出管道

扇出管道(Fanout Pipeline)会把同一份输入消息,分发给列表里的所有智能体,自动收集所有智能体的回复结果,一行代码搞定批量执行。

函数式写法:

from agentscope.pipeline import fanout_pipeline

msgs = await fanout_pipeline(
    # 要执行的智能体列表
    agents=[alice, bob, charlie, david],
    # 输入消息,可以是 None
    msg=None,
    enable_gather=False,
)

类式写法:

from agentscope.pipeline import FanoutPipeline

# 创建可复用的管道对象
pipeline = FanoutPipeline(agents=[alice, bob, charlie, david])

# 多次调用,换不同的输入
msgs = await pipeline(msg=None)
msgs2 = await pipeline(msg=Msg("user", "你对AI发展怎么看?", "user"))

enable_gather 参数控制扇出管道的执行模式:

  • enable_gather=True (默认): 使用 asyncio.gather() 并发 执行所有智能体。这为 I/O 密集型操作(如 API 调用)提供更好的性能,因为智能体并行运行。
  • enable_gather=False: 逐个 顺序 执行智能体。当你需要确定性的执行顺序或想要避免并发请求压垮外部服务时,这很有用。

选择并发执行以获得更好的性能,或选择顺序执行以获得可预测的顺序和资源控制。

2.3 流式获取打印消息

stream_printing_messages 函数将智能体在回复过程中调用 self.print 打印的消息转换为一个异步生成器。 可以帮助开发者快速以流式方式获取智能体的中间消息。

该函数接受一个或多个智能体和一个协程任务作为输入,并返回一个异步生成器。 该异步生成器返回一个二元组,包含执行协程任务过程中通过 await self.print(...) 打印的消息,以及一个布尔值,表示该消息是否为一组流式消息中的最后一个。

需要注意的是,生成器返回的元组中,布尔值表示该消息是否为一组流式消息中的最后一个,而非此次智能体调用的最后一条消息。

async def run_example_pipeline() -> None:
    """运行流式打印消息的示例。"""
    agent = create_agent("Alice", 20, "student")

    # 我们关闭agent的终端打印以避免输出混乱
    agent.set_console_output_enabled(False)

    async for msg, last in stream_printing_messages(
        agents=[agent],
        coroutine_task=agent(
            Msg("user", "你好,你是谁?", "user"),
        ),
    ):
        print(msg, last)
        if last:
            print()


asyncio.run(run_example_pipeline())

0 条评论

发表评论

暂无评论,欢迎发表您的观点!