项目结构.
├── main.py
├── pyproject.toml
└── src
├── __init__.py
├── my_multi_agent_project.egg-info
│ ├── dependency_links.txt
│ ├── PKG-INFO
│ ├── SOURCES.txt
│ └── top_level.txt
├── __pycache__
│ ├── __init__.cpython-310.pyc
│ ├── tools.cpython-310.pyc
│ ├── web_search_agent.cpython-310.pyc
│ ├── workflow.cpython-310.pyc
│ └── zhipu_client.cpython-310.pyc
├── tools.py
├── web_search_agent.py
├── workflow.py
└── zhipu_client.py
关键代码:
# /main.py
# 1. 确保在所有其他导入之前加载环境变量
from dotenv import load_dotenv
load_dotenv()
import asyncio
from src.workflow import build_search_workflow
# 我们不再需要从 agent_framework 导入 ChatMessage,因为我们直接使用字符串
async def main():
print("--- 正在启动“全能研究员”工作流 ---")
# 2. 构建我们的工作流
search_workflow = build_search_workflow()
# 3. 这是我们写死的问题 (一个纯字符串)
user_question = "最新的 agent framework 框架是什么?请详细介绍一下它的核心特性和与AutoGen, Semantic Kernel的关系。"
print(f"\n用户问题: {user_question}")
# 4. 【核心修正】: 直接将用户问题的字符串传递给 run 方法
events = await search_workflow.run(user_question)
# 5. 获取并打印最终输出
final_output = events.get_outputs()
print("\n--- 全能研究员的最终回答 ---")
if final_output:
# 输出现在可能是一个列表,我们打印第一个元素
print(final_output[0])
else:
print("没有收到任何输出。")
print("-" * 50)
if __name__ == "__main__":
asyncio.run(main())
# /my-multi-agent-project/pyproject.toml
[project]
name = "my_multi_agent_project"
version = "0.1.0"
[tool.setuptools.packages.find]
where = ["src"]
# /src/tools.py
from typing import Annotated
# 这个函数体不会被执行,它只是一个“规范”
async def zhipu_web_search(
query: Annotated[str, "用于在互联网上查找信息的搜索查询字符串。"]
) -> str:
"""当你需要查找实时的、最新的信息时,请使用此工具。"""
pass
# /src/web_search_agent.py
import os
import httpx
from agent_framework import Executor, WorkflowContext, handler
from typing_extensions import Never
class WebSearchAgent(Executor):
"""
多智能体框架中的网页搜索智能体。
直接调用智谱官方 search-std 接口。
"""
def __init__(self, name: str):
super().__init__(id=name)
self.api_key = os.getenv("ZHIPU_API_KEY")
if not self.api_key:
raise ValueError("请在 .env 文件中设置 ZHIPU_API_KEY")
@handler
async def run_search(self, query: str, ctx: WorkflowContext[Never, str]) -> None:
try:
payload = {
"search_query": query,
"search_engine": "search_std"
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(
"https://open.bigmodel.cn/api/paas/v4/web_search",
headers=headers,
json=payload
)
resp.raise_for_status()
data = resp.json()
results = data.get("data", [])
if not results:
assistant_content = "没有找到相关搜索结果。"
else:
SEARCH_TOP_K = 5
assistant_content = "\n\n".join([
f"标题: {item.get('title', 'N/A')}\n"
f"发布日期: {item.get('publish_date', 'N/A')}\n"
f"摘要: {item.get('content', '')[:200]}...\n"
f"链接: {item.get('link', 'N/A')}"
for item in results[:SEARCH_TOP_K]
])
await ctx.yield_output(assistant_content)
except Exception as e:
error_text = f"WebSearchAgent 执行搜索时出错: {e}"
print(error_text)
await ctx.yield_output(error_text)
# /src/workflow.py (最终极简版)
from agent_framework import WorkflowBuilder, Workflow
from .zhipu_client import ZhipuAIChatClient
def build_search_workflow() -> Workflow:
"""
构建一个只包含单个“全能研究员” Agent 的工作流。
"""
client = ZhipuAIChatClient()
# 我们只创建这一个 Agent,并赋予它所有能力
all_in_one_agent = client.create_executor(
name="All_In_One_Researcher",
instructions=(
"你是一名高级AI研究助理。你的任务是深入理解用户的问题。"
"如果问题需要最新的、外部的或互联网上的信息,你必须使用你的搜索能力来寻找答案。"
"在获得信息后,你需要对信息进行提炼、总结,并给出一个全面、清晰、高质量的回答。"
),
use_search=True # 开启搜索能力
)
# 工作流只包含这一个 Agent
workflow = WorkflowBuilder().set_start_executor(all_in_one_agent).build()
return workflow
# /src/zhipu_client.py
import os
import json
import httpx
from typing_extensions import Never
from agent_framework import Executor, WorkflowContext, handler
class ZhipuExecutor(Executor):
def __init__(self, name: str, instructions: str, model: str, api_key: str, use_search: bool = False):
super().__init__(id=name)
self.instructions = instructions; self.model = model; self.api_key = api_key; self.use_search = use_search
self.api_endpoint = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
@handler
async def run_zhipu_model(self, user_input: str, ctx: WorkflowContext[Never, str]) -> None:
headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" }
messages = [{"role": "system", "content": self.instructions}, {"role": "user", "content": user_input}]
payload = { "model": self.model, "messages": messages }
if self.use_search:
# 【核心修正】: 我们尝试构造一个最符合文档规范的 web_search tool
payload["tools"] = [
{
"type": "web_search",
"web_search": {
"enable": True,
# 根据某些文档示例,可能需要一个 search_query 参数,即使是空的
# 我们先不加,如果再报“不能为空”,就回来加上 "search_query": ""
}
}
]
payload["tool_choice"] = "auto"
try:
print(f"--- 最终发送的 Payload ---\n{json.dumps(payload, indent=2, ensure_ascii=False)}\n--------------------------")
async with httpx.AsyncClient(timeout=180.0) as client:
response = await client.post(self.api_endpoint, json=payload, headers=headers)
if response.status_code != 200:
print(f"!!! 服务器返回错误: {response.status_code} - {response.text}")
response.raise_for_status()
response_data = response.json()
choice = response_data.get("choices", [{}])[0]
message = choice.get("message", {})
if message.get("tool_calls"):
# 提取 web_search 返回的结果
search_results = [tc["web_search"] for tc in message["tool_calls"] if "web_search" in tc]
# 将搜索结果整合成一个字符串返回
final_result = "搜索结果: \n" + json.dumps(search_results, ensure_ascii=False, indent=2)
await ctx.yield_output(final_result)
else:
await ctx.yield_output(message.get("content", ""))
except Exception as e:
await ctx.yield_output(f"请求智谱 API 时发生错误: {e}")
class ZhipuAIChatClient:
def __init__(self, api_key: str = None):
self.api_key = api_key or os.getenv("ZHIPU_API_KEY")
if not self.api_key: raise ValueError("ZHIPU_API_KEY 未设置。")
def create_executor(self, name: str, instructions: str, model: str = "glm-4.5-air", use_search: bool = False) -> ZhipuExecutor:
return ZhipuExecutor(name=name, instructions=instructions, model=model, api_key=self.api_key, use_search=use_search)