AgentScope
AgentScope 是阿里巴巴通义实验室开源的企业级多智能体开发框架,核心理念是 "build and run agents you can see, understand and trust"(构建你看得见、听得懂、信得过的智能体)。目前已在 GitHub 收获超过 18k Star,提供从研发到生产的完整工程体系,让 AI Agent 开发从 "能跑" 到 "能用"。
官方文档:AgentScope
1. 核心概念
1.1 消息
消息(message)是 AgentScope 最核心的数据结构,用于
- 在智能体之间交换信息,
- 在用户交互界面显示信息,
- 在记忆中存储信息,
- 作为 AgentScope 与不同 LLM API 之间的统一媒介。
一个消息由四个字段组成:
| 字段 | 类型 | 描述 |
|---|---|---|
| name | str |
消息发送者的名称/身份 |
| role | Literal["system","assistant","user"] |
消息发送者的角色,必须是 "system"、"assistant" 或 "user" 之一。 |
| content | str | list[ContentBlock] |
消息包含的数据,可以是字符串或 block 的列表。 |
| metadata | dict[str, JSONSerializableObject] | None |
包含额外元数据的字典,通常用于结构化输出。 |
1.1.1文本消息
content字段中支持不同的block数据结构,直接输入文本属于文本消息。例如:
from agentscope.message import Msg
msg = Msg(
name="游戏主持人",
role="system",
content="游戏开始!",
)
1.1.2多模态消息
message 类通过提供不同的 block 结构来支持多模态内容:
| 类 | 描述 |
|---|---|
| TextBlock | 纯文本数据 |
| ImageBlock | 图像数据 |
| AudioBlock | 音频数据 |
| VideoBlock | 视频数据 |
ImageBlock、AudioBlock 和 VideoBlock,还可以使用 base64 编码的字符串作为数据源(Source)。
from agentscope.message import (
Msg,
Base64Source,
TextBlock,
ThinkingBlock,
ImageBlock,
AudioBlock,
VideoBlock,
ToolUseBlock,
ToolResultBlock,
)
msg = Msg(
name="Jarvis",
role="assistant",
content=[
TextBlock(
type="text",
text="这是一个包含 base64 编码数据的多模态消息。",
),
ImageBlock(
type="image",
source=Base64Source(
type="base64",
media_type="image/jpeg",
data="/9j/4AAQSkZ...",
),
),
AudioBlock(
type="audio",
source=Base64Source(
type="base64",
media_type="audio/mpeg",
data="SUQzBAAAAA...",
),
),
VideoBlock(
type="video",
source=Base64Source(
type="base64",
media_type="video/mp4",
data="AAAAIGZ0eX...",
),
),
],
)
详细介绍见下方:
(1)TextBlock
示例:
TextBlock(
type="text",
text="Hello, world!"
)
(2)ImageBlock
示例:
# 普通示例
ImageBlock(
type="image",
source=URLSource(
type="url",
url="https://example.com/image.jpg"
)
)
# base64示例
ImageBlock(
type="image",
source=Base64Source(
type="base64",
media_type="image/jpeg",
data="/9j/4AAQSkZ...",
),
)
(3)AudioBlock
示例:
# 普通示例
AudioBlock(
type="audio",
source=URLSource(
type="url",
url="https://example.com/audio.mp3"
)
)
# base64示例
AudioBlock(
type="audio",
source=Base64Source(
type="base64",
media_type="audio/mpeg",
data="SUQzBAAAAA...",
),
)
(4)VideoBlock
示例:
VideoBlock(
type="video",
source=URLSource(
type="url",
url="https://example.com/video.mp4"
)
)
1.1.3推理消息
ThinkingBlock 用于支持推理模型,包含模型的思考过程。
msg_thinking = Msg(
name="Jarvis",
role="assistant",
content=[
ThinkingBlock(
type="thinking",
thinking="我正在为 AgentScope 构建一个思考块的示例。",
),
TextBlock(
type="text",
text="这是一个思考块的示例。",
),
],
)
1.1.4工具使用和工具结果消息
ToolUseBlock 和 ToolResultBlock 用于支持工具 API:
msg_tool_call = Msg(
name="Jarvis",
role="assistant",
content=[
ToolUseBlock(
type="tool_use",
id="343",
name="get_weather",
input={
"location": "Beijing",
},
),
],
)
msg_tool_res = Msg(
name="system",
role="system",
content=[
ToolResultBlock(
type="tool_result",
id="343",
name="get_weather",
output="北京的天气是晴天,温度为 25°C。",
),
],
)
AgentScope 中,通常使用 role 为“system”的消息来记录工具函数的执行结果。
1.1.5 序列化与反序列化
消息对象可以分别通过 to_dict 和 from_dict 方法进行序列化和反序列化。
序列化:
serialized_msg = msg.to_dict()
print(type(serialized_msg))
print(json.dumps(serialized_msg, indent=4, ensure_ascii=False))
输出:
<class 'dict'>
{
"id": "gH8zDp9YSfLSKyGppA2QkL",
"name": "Jarvis",
"role": "assistant",
"content": [
{
"type": "text",
"text": "这是一个包含 base64 编码数据的多模态消息。"
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": "/9j/4AAQSkZ..."
}
},
{
"type": "audio",
"source": {
"type": "base64",
"media_type": "audio/mpeg",
"data": "SUQzBAAAAA..."
}
},
{
"type": "video",
"source": {
"type": "base64",
"media_type": "video/mp4",
"data": "AAAAIGZ0eX..."
}
}
],
"metadata": {},
"timestamp": "2026-03-12 03:27:29.431"
}
反序列化:
new_msg = Msg.from_dict(serialized_msg)
print(type(new_msg))
print(f'消息的发送者: "{new_msg.name}"')
print(f'发送者的角色: "{new_msg.role}"')
print(f'消息的内容: "{json.dumps(new_msg.content, indent=4, ensure_ascii=False)}"')
输出:
<class 'agentscope.message._message_base.Msg'>
消息的发送者: "Jarvis"
发送者的角色: "assistant"
消息的内容: "[
{
"type": "text",
"text": "这是一个包含 base64 编码数据的多模态消息。"
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": "/9j/4AAQSkZ..."
}
},
{
"type": "audio",
"source": {
"type": "base64",
"media_type": "audio/mpeg",
"data": "SUQzBAAAAA..."
}
},
{
"type": "video",
"source": {
"type": "base64",
"media_type": "video/mp4",
"data": "AAAAIGZ0eX..."
}
}
]"
1.1.6Msg函数
| 函数 | 参数 | 描述 |
|---|---|---|
get_text_content |
将所有 TextBlock 中的内容收集到单个字符串中(用 "\n" 分隔)。 |
|
get_content_blocks |
block_type |
返回指定类型的内容块列表。如果未提供 block_type,则以块格式返回全部内容。 |
has_content_blocks |
block_type |
检查消息是否具有指定类型的内容块。str 内容会被视为 TextBlock 类型。 |
1.2 工具
1.2.1工具函数
工具函数应该返回一个 ToolResponse 对象或产生 ToolResponse 对象的生成器,并且需要具有描述工具功能和参数的文档字符串注释
def tool_function(a: int, b: str) -> ToolResponse:
"""{函数描述}
Args:
a (int):
{第一个参数的描述}
b (str):
{第二个参数的描述}
"""
实例方法和类方法也可以用作工具函数,Toolkit 中将自动忽略 self 和 cls 参数。
agentscope.tool 模块下提供了几个内置工具函数,如 execute_python_code、execute_shell_command等。完整的内置工具函数如下表格:
| 函数名 | 核心功能 | 典型使用场景 |
|---|---|---|
| 代码 / 命令执行类 | ||
| execute_python_code | 安全沙箱中执行 Python 代码片段,返回执行结果(输出 / 异常) | 智能体需要做数据计算、逻辑验证、简单脚本运行(如批量处理数据) |
| execute_shell_command | 执行系统 Shell 命令(支持 Windows/Linux/macOS),返回命令输出 / 错误 | 智能体需要操作系统(如查看文件列表、启动服务、安装依赖、读取系统信息) |
| 文件操作类 | ||
| view_text_file | 读取文本文件内容(支持指定编码、读取指定行数) | 智能体需要查看配置文件、日志文件、代码文件、文档内容 |
| write_text_file | 覆盖式写入文本文件(若文件不存在则创建,支持指定编码) | 智能体生成报告、保存配置、创建新的文本文件(如生成 Markdown 文档) |
| insert_text_file | 向文本文件指定位置插入内容(不覆盖原有内容,支持行号定位) | 智能体补充文档内容、修改配置文件(如在代码文件中插入函数、在日志中追加记录) |
| 通义千问(DashScope)多媒体类 | ||
| dashscope_text_to_image | 调用通义千问文生图 API,将文本描述转为图片(支持指定尺寸、风格) | 智能体根据文字生成插画、海报、示意图(如 “生成一张科技感的 AI 智能体流程图”) |
| dashscope_text_to_audio | 调用通义千问文本转语音 API,将文字转为语音音频(支持指定音色、语速) | 智能体生成语音播报、有声读物、语音提示(如将生成的报告转为语音) |
| dashscope_image_to_text | 调用通义千问图生文 API,解析图片内容(OCR + 语义描述) | 智能体识别图片中的文字、描述图片内容、分析图片中的图表数据 |
| OpenAI 多媒体类 | ||
| openai_text_to_image | 调用 OpenAI DALL・E API,文本生成图片(支持分辨率、风格、数量) | 智能体生成创意图片、产品原型图、场景示意图 |
| openai_text_to_audio | 调用 OpenAI TTS API,文本转语音(支持不同语音模型、语速、音量) | 生成自然度更高的英文语音、多语种语音播报 |
| openai_edit_image | 调用 OpenAI Image Edit API,编辑图片(基于蒙版 + 文本指令修改指定区域) | 智能体修改图片局部内容(如 “把图片中的红色按钮改成蓝色”) |
| openai_create_image_variation | 调用 OpenAI Image Variation API,生成图片的变体(保留风格,修改细节) | 智能体对现有图片做风格微调、生成多张相似但不同的图片 |
| openai_image_to_text | 调用 OpenAI GPT-4V API,解析图片内容(复杂场景理解、多模态分析) | 智能体分析图片中的复杂图表、识别手写公式、理解图片中的场景逻辑 |
| openai_audio_to_text | 调用 OpenAI Whisper API,语音转文字(支持多语种、实时转录、识别说话人) | 智能体转录语音文件、实时解析语音对话、生成音频字幕 |
1.2.2工具模块
Toolkit 类设计用于管理工具函数,从文档字符串中提取它们的 JSON Schema,并为工具执行提供统一接口。
Toolkit 类的基本功能是注册工具函数并执行它们。
# 准备一个自定义工具函数
async def my_search(query: str, api_key: str) -> ToolResponse:
"""一个简单的示例工具函数。
Args:
query (str):
搜索查询。
api_key (str):
用于身份验证的 API 密钥。
"""
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"正在使用 API 密钥 '{api_key}' 搜索 '{query}'",
),
],
)
# 在工具模块中注册工具函数
toolkit = Toolkit()
toolkit.register_tool_function(my_search)
注册工具函数后,可以通过调用 get_json_schemas 方法获取其 JSON Schema。
print("工具 JSON Schemas:")
print(json.dumps(toolkit.get_json_schemas(), indent=4, ensure_ascii=False))
输出:
工具 JSON Schemas:
[
{
"type": "function",
"function": {
"name": "my_search",
"parameters": {
"properties": {
"query": {
"description": "搜索查询。",
"type": "string"
},
"api_key": {
"description": "用于身份验证的 API 密钥。",
"type": "string"
}
},
"required": [
"query",
"api_key"
],
"type": "object"
},
"description": "一个简单的示例工具函数。"
}
}
]
1.3智能体
在 AgentScope 中,智能体(Agent)行为被抽象为 AgentBase 类中的三个核心函数:
reply:处理传入的消息并生成响应消息。observe:接收来自环境或其它智能体的消息,但不返回响应。print:将消息显示到目标输出(例如终端、Web 界面)。
为了支持用户实时介入(Realtime Steering),AgentScope 提供了额外 的 handle_interrupt 函数来处理智能体回复过程中的用户中断。
此外,ReAct 智能体是 AgentScope 中最重要的智能体,该智能体的回复过程分为两个阶段:
- 推理(Reasoning):通过调用 LLM 进行推理并生成工具调用
- 行动(Acting):执行工具函数。
因此, ReActAgentBase 类中提供了两个额外的核心函数,_reasoning 和 _acting。
1.3.1 ReAct智能体
AgentScope 在 agentscope.agent 模块下提供了开箱即用的 ReAct 智能体 ReActAgent 供开发者使用。
基础功能:
支持围绕 reply、observe、print、_reasoning 和 _acting 的 钩子函数(hooks)支持结构化输出
实时介入(Realtime Steering):
支持用户 中断支持自定义 中断处理,基于 asyncio 取消机制实现了该功能。中断逻辑已在 AgentBase 类中作为基本功能实现。
工具:
支持 同步/异步 工具函数支持 流式 工具响应支持 状态化 的工具管理支持 并行 工具调用支持 MCP 服务器
记忆:
支持智能体 自主管理 长期记忆支持“静态”的长期记忆管理
ReActAgent类参数:
| 参数 | 描述 |
|---|---|
name (必需) |
智能体的名称 |
sys_prompt (必需) |
智能体的系统提示 |
model (必需) |
智能体用于生成响应的模型 |
formatter (必需) |
提示构建策略,应与使用的模型保持一致 |
toolkit |
用于注册/调用工具函数的工具模块 |
memory |
用于存储对话历史的短期记忆 |
long_term_memory |
长期记忆 |
long_term_memory_mode |
长期记忆的管理模式:agent_control: 允许智能体通过工具函数自己控制长期记忆static_control: 在每次回复(reply)的开始/结束时,会自动从长期记忆中检索/记录信息both: 同时激活上述两种模式 |
enable_meta_tool |
是否启用元工具(Meta tool),即允许智能体自主管理工具函数 |
parallel_tool_calls |
是否允许并行工具调用 |
max_iters |
智能体生成响应的最大迭代次数 |
plan_notebook |
计划模块,允许智能体制定和管理计划与子任务 |
print_hint_msg |
是否在终端打印 plan_notebook 生成的提示消息 |
对于formatter参数:
在 AgentScope 中,有两种类型的格式化器:"ChatFormatter" 和 "MultiAgentFormatter"。
主要负责:将 Msg 对象转换为不同 LLM API 要求的格式,(可选)截断消息以适应 max_token 的限制,(可选)执行提示工程,例如对长对话进行总结。
(1)ChatFormatter:专为标准的用户-助手场景(聊天机器人)设计,使用 role 字段来识别用户和助手。
(2)MultiAgentFormatter:专为多智能体场景设计,使用 name 字段来识别不同的实体,在格式化的过程中会将多智能体的对话历史合并为单个消息。
| API 提供商 | 用户-助手场景 | 多智能体场景 |
|---|---|---|
| OpenAI | OpenAIChatFormatter |
OpenAIMultiAgentFormatter |
| Anthropic | AnthropicChatFormatter |
AnthropicMultiAgentFormatter |
| DashScope | DashScopeChatFormatter |
DashScopeMultiAgentFormatter |
| Gemini | GeminiChatFormatter |
GeminiChatFormatter |
| Ollama | OllamaChatFormatter |
OllamaMultiAgentFormatter |
| DeepSeek | DeepSeekChatFormatter |
DeepSeekMultiAgentFormatter |
| vLLM | OpenAIFormatter |
OpenAIFormatter |
以 DashScope API 为例,创建一个智能体对象:
import asyncio
import os
from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.tool import Toolkit, execute_python_code
async def creating_react_agent() -> None:
"""创建一个 ReAct 智能体并运行一个简单任务。"""
# 准备工具
toolkit = Toolkit()
toolkit.register_tool_function(execute_python_code) # 内置工具函数execute_python_code
jarvis = ReActAgent(
name="Jarvis",
sys_prompt="你是一个名为 Jarvis 的助手",
model=DashScopeChatModel(
model_name="qwen-max",
api_key=os.environ["DASHSCOPE_API_KEY"],
stream=True,
enable_thinking=False,
),
formatter=DashScopeChatFormatter(),
toolkit=toolkit,
memory=InMemoryMemory(),
)
msg = Msg(
name="user",
content="你好!Jarvis,用 Python 运行 Hello World。",
role="user",
)
await jarvis(msg)
asyncio.run(creating_react_agent())
1.3.2 自定义智能体
AgentScope 提供了两个基类:AgentBase 和 ReActAgentBase,它们在抽象方法和支持的钩子函数方面有所不同。 具体来说,ReActAgentBase 扩展了 AgentBase,增加了额外的 _reasoning 和 _acting 抽象方法,以及它们的前置和后置钩子函数。开发者可以根据需要选择继承这两个基类中的任一个。
| 类 | 抽象方法 | 支持的钩子函数 | 描述 |
|---|---|---|---|
AgentBase |
replyobserveprinthandle_interrupt |
pre_/post_replypre_/post_observepre_/post_print | 所有智能体的基类,提供基本接口和钩子。 |
ReActAgentBase |
replyobserveprinthandle_interrupt_reasoning_acting |
pre_/post_replypre_/post_observepre_/post_printpre_/post_reasoningpre_/post_acting | ReAct 类智能体的抽象类,扩展了 AgentBase,增加了 _reasoning 和 _acting 抽象方法及其钩子。 |
ReActAgent |
- | pre_/post_replypre_/post_observepre_/post_printpre_/post_reasoningpre_/post_acting | ReActAgentBase 的实现 |
UserAgent |
代表用户的特殊智能体,用于与智能体交互 | ||
A2aAgent |
- | pre_/post_replypre_/post_observepre_/post_print | 用于与远程 A2A 代理通信的智能体 |
1.3.3 钩子函数
钩子(Hook)是 AgentScope 中的扩展点,允许开发者在特定位置自定义智能体行为,提供了一种灵活的方式来修改或扩展智能体的功能,而无需更改其核心实现。
AgentScope 中支持的钩子类型:
AgentBase自带 3 组、共 6 个钩子点位,ReActAgentBase额外多 2 组、4 个专属钩子点位。
| 智能体类 | 核心函数 | 钩子类型 | 描述 |
|---|---|---|---|
AgentBase 及其子类 |
reply |
pre_replypost_reply |
智能体回复消息前/后的钩子 |
print |
pre_printpost_print |
向目标输出(如终端、Web 界面)打印消息前/后的钩子 | |
observe |
pre_observe``post_observe |
从环境或其它智能体观察消息前/后的钩子 | |
ReActAgentBase 及其子类 |
replyprintobserve |
pre_replypost_replypre_printpost_printpre_observepost_observe |
|
_reasoning |
pre_reasoningpost_reasoning |
智能体推理过程前/后的钩子 | |
_acting |
pre_actingpost_acting |
智能体行动过程前/后的钩子 |
(1)前置钩子
所有pre_开头的前置钩子,入参、返回值规则完全一致,核心作用是修改核心函数的输入参数。
def 前置钩子名(self, kwargs: dict[str, Any]) -> dict[str, Any] | None:
# 你的自定义逻辑
pass
| 部分 | 说明 |
|---|---|
| self | 就是当前的智能体实例本身,你可以通过 self 拿到智能体的所有属性(比如记忆、名称、模型配置) |
| kwargs | 核心函数的所有入参,都会被打包成这个字典。比如 reply 函数的入参是 msg,那 kwargs 里就有 "msg" 这个 key,直接取 kwargs ["msg"] 就能拿到消息对象 |
| 返回值 | 两种选择:1. 想修改输入:返回修改后的 kwargs 字典,会传给下一个钩子 / 核心函数2. 不想修改:直接返回 None,系统会沿用之前的参数 |
示例:
def pre_reply_filter(self, kwargs: dict[str, Any]) -> dict[str, Any] | None:
# 拿到用户输入的消息
user_msg = kwargs["msg"]
# 自定义逻辑:敏感词替换
user_msg.content = user_msg.content.replace("脏话", "**")
# 返回修改后的参数
return {**kwargs, "msg": user_msg}
(2)后置钩子
所有post_开头的后置钩子,规则统一,核心作用是修改核心函数的输出结果。
def 后置钩子名(self, kwargs: dict[str, Any], output: Any) -> Any | None:
# 你的自定义逻辑
pass
| 部分 | 说明 |
|---|---|
| self | 同上,智能体实例本身 |
| kwargs | 核心函数的原始入参字典(和前置钩子的入参一致,仅做参考,修改它不会影响已经执行完的核心函数) |
| output | 核心函数执行完的原始输出结果,比如 reply 函数返回的 Msg 对象,就是这里的 output |
| 返回值 | 两种选择:1. 想修改输出:返回修改后的 output,会传给下一个钩子 / 最终返回2. 不想修改:直接返回 None,系统会沿用之前的输出 |
示例:
def post_reply_add_disclaimer(self, kwargs: dict[str, Any], output: Any) -> Any | None:
# 拿到智能体的原始回复
reply_msg = output
# 加免责声明
reply_msg.content += "\n\n以上内容仅为AI生成,不构成任何建议"
# 返回修改后的输出
return reply_msg
(3)钩子管理
实例级和类级钩子两者的核心区别是钩子的生效范围不一样
| 类型 | 生效范围 | 适用场景 |
|---|---|---|
| 实例级钩子 | 仅对你注册的这一个智能体对象生效,同个类的其他实例完全不受影响 | 给某个特定智能体加专属逻辑,比如只给客服智能体加敏感词过滤 |
| 类级钩子 | 对这个类的所有实例生效,包括已经创建的、还没创建的所有对象 | 给全量智能体加通用功能,比如给所有智能体加请求日志、耗时统计 |
当同一个点位,既有实例钩子、又有类钩子、还有多个钩子时,执行顺序是固定的:
调用核心函数(比如reply)
↓
【前置钩子阶段】先执行实例级前置钩子(按注册顺序,先注册先执行)
↓
【前置钩子阶段】再执行类级前置钩子(按注册顺序,先注册先执行)
↓
执行核心函数本身的逻辑
↓
【后置钩子阶段】先执行实例级后置钩子(按注册顺序,先注册先执行)
↓
【后置钩子阶段】再执行类级后置钩子(按注册顺序,先注册先执行)
↓
返回最终结果

同一个点位可以注册多个钩子,它们会链式传递结果:
- 前置钩子:前一个钩子返回的非 None 的 kwargs,会直接传给下一个钩子当入参;如果返回 None,下一个钩子就用最近一次非 None 的结果。
- 后置钩子:前一个钩子返回的非 None 的 output,会直接传给下一个钩子当 output 参数;如果返回 None,下一个钩子就用最近一次非 None 的结果。
不要在钩子内调用核心函数(reply/speak/observe/_reasoning/_acting)以避免循环调用。
AgentScope 中的内置钩子管理方法:
| 级别 | 方法 | 描述 |
|---|---|---|
| 实例级 | register_instance_hook |
为当前对象注册具有给定钩子类型和名称的钩子。 |
remove_instance_hook |
移除当前对象具有给定钩子类型和名称的钩子。 | |
clear_instance_hooks |
清除当前对象具有给定钩子类型的所有钩子。 | |
| 类级 | register_class_hook |
为该类的所有对象注册具有给定钩子类型和名称的钩子。 |
remove_class_hook |
移除该类所有对象具有给定钩子类型和名称的钩子。 | |
clear_class_hooks |
清除该类所有对象具有给定钩子类型的所有钩子。 |
1.4 记忆
AgentScope 中的记忆模块负责:
- 存储消息对象(
Msg) - 利用标记(mark)管理消息
标记 是与记忆中每条消息关联的字符串标签,可用于根据消息的上下文或目的对消息进行分类、过滤和检索。 可用于实现进阶的记忆管理功能,例如在 ReActAgent 类中,使用hint标签标记一次性的提示消息, 以便在使用完成后将其从记忆中删除。
AgentScope 中的内置记忆类:
| 类 | 描述 |
|---|---|
InMemoryMemory |
简单的内存记忆存储实现。 |
AsyncSQLAlchemyMemory |
基于异步 SQLAlchemy 的记忆存储实现,支持如 SQLite、PostgreSQL、MySQL 等多种关系数据库。 |
RedisMemory |
基于 Redis 的记忆存储实现。 |
1.4.1 内存记忆
async def in_memory_example():
"""使用InMemoryMemory在内存中存储消息的示例。"""
memory = InMemoryMemory()
await memory.add(
Msg("Alice", "生成一份关于AgentScope的报告", "user"),
)
# 添加一条带有标记"hint"的提示消息
await memory.add(
[
Msg(
"system",
"<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>",
"system",
),
],
marks="hint", # 标记一次性提示消息
)
msgs = await memory.get_memory(mark="hint")
print("带有标记'hint'的消息:")
for msg in msgs:
print(f"- {msg}")
# 所有存储的消息都可以通过 ``state_dict`` 和 ``load_state_dict`` 方法导出和加载。
state = memory.state_dict()
print("记忆的状态字典:")
print(json.dumps(state, indent=2, ensure_ascii=False))
# 通过标记删除消息
deleted_count = await memory.delete_by_mark("hint")
print(f"删除了 {deleted_count} 条带有标记'hint'的消息。")
print("删除后的记忆状态字典:")
state = memory.state_dict()
print(json.dumps(state, indent=2, ensure_ascii=False))
asyncio.run(in_memory_example())
输出:
带有标记'hint'的消息:
- Msg(id='XqLgHiuTEPEYtN3pM5EUDV', name='system', content='<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>', role='system', metadata={}, timestamp='2026-03-12 03:30:48.641', invocation_id='None')
记忆的状态字典:
{
"_compressed_summary": "",
"content": [
[
{
"id": "Agmg28QmfUuyqBcc2sMucP",
"name": "Alice",
"role": "user",
"content": "生成一份关于AgentScope的报告",
"metadata": {},
"timestamp": "2026-03-12 03:30:48.641"
},
[]
],
[
{
"id": "XqLgHiuTEPEYtN3pM5EUDV",
"name": "system",
"role": "system",
"content": "<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>",
"metadata": {},
"timestamp": "2026-03-12 03:30:48.641"
},
[
"hint"
]
]
]
}
删除了 1 条带有标记'hint'的消息。
删除后的记忆状态字典:
{
"_compressed_summary": "",
"content": [
[
{
"id": "Agmg28QmfUuyqBcc2sMucP",
"name": "Alice",
"role": "user",
"content": "生成一份关于AgentScope的报告",
"metadata": {},
"timestamp": "2026-03-12 03:30:48.641"
},
[]
]
]
}
1.4.2 关系数据库记忆
AgentScope 通过 SQLAlchemy 提供统一的接口来使用关系数据库,支持:
- 多种数据库,如 SQLite、PostgreSQL、MySQL 等
- 用户和会话管理
- 生产环境中的连接池
以SQLite支持的记忆为例:
async def sqlalchemy_example() -> None:
"""使用 AsyncSQLAlchemyMemory 在 SQLite 数据库中存储消息的示例。"""
# 首先创建一个异步 SQLAlchemy 引擎
engine = create_async_engine("sqlite+aiosqlite:///./test_memory.db")
# 然后使用该引擎创建记忆
memory = AsyncSQLAlchemyMemory(
engine_or_session=engine,
# 可选传入指定user_id和session_id
user_id="user_1",
session_id="session_1",
)
await memory.add(
Msg("Alice", "生成一份关于AgentScope的报告", "user"),
)
await memory.add(
[
Msg(
"system",
"<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>",
"system",
),
],
marks="hint",
)
msgs = await memory.get_memory(mark="hint")
print("带有标记'hint'的消息:")
for msg in msgs:
print(f"- {msg}")
# 完成后关闭引擎
await memory.close()
asyncio.run(sqlalchemy_example())
输出:
带有标记'hint'的消息:
- Msg(id='BWboEGSoABPMkkseBtoLZD', name='system', content='<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>', role='system', metadata={}, timestamp='2026-03-12 03:30:48.683', invocation_id='None')
1.4.3 NoSQL数据库记忆
示例:
async def redis_memory_example() -> None:
"""使用 RedisMemory 在 Redis 中存储消息的示例。"""
# 使用fakeredis进行内存测试,无需真实的 Redis 服务器
fake_redis = fakeredis.aioredis.FakeRedis(decode_responses=True)
# 创建 Redis 记忆
memory = RedisMemory(
# 使用fake redis进行演示
connection_pool=fake_redis.connection_pool,
# 也可以通过指定主机和端口连接到真实的Redis服务器
# host="localhost",
# port=6379,
# 可选地指定 user_id 和 session_id
user_id="user_1",
session_id="session_1",
)
# 向记忆中添加消息
await memory.add(
Msg(
"Alice",
"生成一份关于AgentScope的报告",
"user",
),
)
# 添加一条带有标记"hint"的提示消息
await memory.add(
Msg(
"system",
"<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>",
"system",
),
marks="hint",
)
# 检索带有标记"hint"的消息
msgs = await memory.get_memory(mark="hint")
print("带有标记'hint'的消息:")
for msg in msgs:
print(f"- {msg}")
asyncio.run(redis_memory_example())
输出:
带有标记'hint'的消息:
- Msg(id='ZkPMW3LtzGnKEq6LEBTDfK', name='system', content='<system-hint>首先创建一个计划来收集信息,然后逐步生成报告。</system-hint>', role='system', metadata={}, timestamp='2026-03-12 03:30:48.710', invocation_id='None')
1.4.4 自定义记忆
要自定义您自己的记忆实现,只需从 MemoryBase 继承并实现以下方法:
| 方法 | 描述 |
|---|---|
add |
向记忆中添加 Msg 对象 |
delete |
从记忆中删除 Msg 对象 |
delete_by_mark |
通过标记从记忆中删除 Msg 对象 |
size |
记忆的大小 |
clear |
清空记忆内容 |
get_memory |
以 Msg 对象列表的形式获取记忆内容 |
update_messages_mark |
更新记忆中消息的标记 |
state_dict |
获取记忆的状态字典 |
load_state_dict |
加载记忆的状态字典 |
2. 管道
当用多个 AI 智能体做项目时,一定会遇到这些麻烦:
- 做群聊场景:A 说的话,要手动发给 B、C、D,智能体越多代码越乱
- 做工作流:要让「策划→文案→设计→开发」按顺序干活,前一个的结果传给后一个,得一行行写重复的 await
- 做专家评审:同一个问题要问多个 AI,得自己写循环、处理并发 / 串行、拷贝消息
- 做流式展示:想实时拿到大模型逐字输出的内容,不知道怎么捕获
而 agentscope.pipeline 模块,就是专门解决这些问题的语法糖工具集,核心包含 4 大组件:
- MsgHub:多智能体消息自动广播中心
- 顺序管道:按固定顺序串联执行智能体
- 扇出管道:同一份输入分发给多个智能体,批量执行
- 流式消息工具:实时捕获智能体的流式输出内容
2.1 MsgHub
MsgHub,多智能体消息自动广播中心, MsgHub 类是一个 异步上下文管理器,它接收一个智能体列表作为其参与者。 当一个参与者生成回复消息时,将通过调用所有其他参与者的 observe 方法广播该消息。 这意味着在 MsgHub 上下文中,开发者无需手动将回复消息从一个智能体发送到另一个智能体。
示例:
- 先创建 4 个智能体:Alice、Bob、Charlie、David
- 把前 3 个放进 MsgHub 里,让它们自我介绍
# 让Alice、Bob、Charlie自我介绍
async def example_broadcast_message():
"""使用 MsgHub 广播消息的示例。"""
# 创建消息中心
async with MsgHub(
participants=[alice, bob, charlie],
announcement=Msg(
"user",
"现在请简要介绍一下自己,包括你的姓名、年龄和职业。",
"user",
),
) as hub:
# 无需手动消息传递的群聊
await alice()
await bob()
await charlie()
asyncio.run(example_broadcast_message())
# 询问Alice之外的其他人是否知道Alice信息
async def check_broadcast_message():
"""检查消息是否正确广播。"""
user_msg = Msg(
"user",
"你知道 Alice 是谁吗,她是做什么的?",
"user",
)
await bob(user_msg)
await charlie(user_msg)
await david(user_msg)
asyncio.run(check_broadcast_message())
结果:
Bob 和 Charlie 在 MsgHub 里,收到了 Alice 的自我介绍,后续问它们,能说出 Alice 的信息
David 没在 MsgHub 里,完全不知道 Alice 是谁,完美体现了广播的范围控制
2.2 动态管理
MsgHub 支持在运行中修改参与的智能体:
hub.add(xxx):添加新的智能体(新加入的智能体收不到之前的历史消息)hub.delete(xxx):移除智能体,之后不会再收到广播hub.broadcast(消息):手动给所有当前参与者发一条消息
只有在async with MsgHub(...)代码块里的消息,才会被广播,块外面的对话不会自动同步。
async with MsgHub(participants=[alice]) as hub:
# 添加新参与者
hub.add(david)
# 移除参与者
hub.delete(alice)
# 向所有当前参与者广播
await hub.broadcast(
Msg("system", "现在我们开始...", "system"),
)
2.3 顺序管道
顺序管道(Sequential Pipeline)会让智能体列表顺序逐个执行,前一个智能体的输出,自动作为下一个智能体的输入,一行代码就能搞定整个线性流程。
函数式写法:
from agentscope.pipeline import sequential_pipeline
msg = await sequential_pipeline(
# 按顺序执行的智能体列表
agents=[alice, bob, charlie, david],
# 第一个输入消息,可以是 None
msg=None
)
类式写法:
from agentscope.pipeline import SequentialPipeline
# 先创建管道对象,定义好执行顺序
pipeline = SequentialPipeline(agents=[alice, bob, charlie, david])
# 第一次调用,初始输入为None
msg = await pipeline(msg=None)
# 换个输入,复用同一个管道,不用重新定义顺序
msg2 = await pipeline(msg=Msg("user", "你好!", "user"))
2.4 扇出管道
扇出管道(Fanout Pipeline)会把同一份输入消息,分发给列表里的所有智能体,自动收集所有智能体的回复结果,一行代码搞定批量执行。
函数式写法:
from agentscope.pipeline import fanout_pipeline
msgs = await fanout_pipeline(
# 要执行的智能体列表
agents=[alice, bob, charlie, david],
# 输入消息,可以是 None
msg=None,
enable_gather=False,
)
类式写法:
from agentscope.pipeline import FanoutPipeline
# 创建可复用的管道对象
pipeline = FanoutPipeline(agents=[alice, bob, charlie, david])
# 多次调用,换不同的输入
msgs = await pipeline(msg=None)
msgs2 = await pipeline(msg=Msg("user", "你对AI发展怎么看?", "user"))
enable_gather 参数控制扇出管道的执行模式:
enable_gather=True(默认): 使用asyncio.gather()并发 执行所有智能体。这为 I/O 密集型操作(如 API 调用)提供更好的性能,因为智能体并行运行。enable_gather=False: 逐个 顺序 执行智能体。当你需要确定性的执行顺序或想要避免并发请求压垮外部服务时,这很有用。
选择并发执行以获得更好的性能,或选择顺序执行以获得可预测的顺序和资源控制。
2.3 流式获取打印消息
stream_printing_messages 函数将智能体在回复过程中调用 self.print 打印的消息转换为一个异步生成器。 可以帮助开发者快速以流式方式获取智能体的中间消息。
该函数接受一个或多个智能体和一个协程任务作为输入,并返回一个异步生成器。 该异步生成器返回一个二元组,包含执行协程任务过程中通过 await self.print(...) 打印的消息,以及一个布尔值,表示该消息是否为一组流式消息中的最后一个。
需要注意的是,生成器返回的元组中,布尔值表示该消息是否为一组流式消息中的最后一个,而非此次智能体调用的最后一条消息。
async def run_example_pipeline() -> None:
"""运行流式打印消息的示例。"""
agent = create_agent("Alice", 20, "student")
# 我们关闭agent的终端打印以避免输出混乱
agent.set_console_output_enabled(False)
async for msg, last in stream_printing_messages(
agents=[agent],
coroutine_task=agent(
Msg("user", "你好,你是谁?", "user"),
),
):
print(msg, last)
if last:
print()
asyncio.run(run_example_pipeline())
发表评论