微软Agent Framework多智能体协作框架:基于智谱AI的实现

项目结构.
├── main.py
├── pyproject.toml
└── src
    ├── __init__.py
    ├── my_multi_agent_project.egg-info
    │   ├── dependency_links.txt
    │   ├── PKG-INFO
    │   ├── SOURCES.txt
    │   └── top_level.txt
    ├── __pycache__
    │   ├── __init__.cpython-310.pyc
    │   ├── tools.cpython-310.pyc
    │   ├── web_search_agent.cpython-310.pyc
    │   ├── workflow.cpython-310.pyc
    │   └── zhipu_client.cpython-310.pyc
    ├── tools.py
    ├── web_search_agent.py
    ├── workflow.py
    └── zhipu_client.py
关键代码:
# /main.py 

# 1. 确保在所有其他导入之前加载环境变量
from dotenv import load_dotenv
load_dotenv()

import asyncio
from src.workflow import build_search_workflow
# 我们不再需要从 agent_framework 导入 ChatMessage,因为我们直接使用字符串

async def main():
    print("--- 正在启动“全能研究员”工作流 ---")
    
    # 2. 构建我们的工作流
    search_workflow = build_search_workflow()
    
    # 3. 这是我们写死的问题 (一个纯字符串)
    user_question = "最新的 agent framework 框架是什么?请详细介绍一下它的核心特性和与AutoGen, Semantic Kernel的关系。"
    print(f"\n用户问题: {user_question}")

    # 4. 【核心修正】: 直接将用户问题的字符串传递给 run 方法
    events = await search_workflow.run(user_question)
    
    # 5. 获取并打印最终输出
    final_output = events.get_outputs()
    print("\n--- 全能研究员的最终回答 ---")
    if final_output:
        # 输出现在可能是一个列表,我们打印第一个元素
        print(final_output[0])
    else:
        print("没有收到任何输出。")
    print("-" * 50)

if __name__ == "__main__":
    asyncio.run(main())
# /my-multi-agent-project/pyproject.toml

[project]
name = "my_multi_agent_project"
version = "0.1.0"

[tool.setuptools.packages.find]
where = ["src"]
# /src/tools.py 
from typing import Annotated

# 这个函数体不会被执行,它只是一个“规范”
async def zhipu_web_search(
    query: Annotated[str, "用于在互联网上查找信息的搜索查询字符串。"]
) -> str:
    """当你需要查找实时的、最新的信息时,请使用此工具。"""
    pass
# /src/web_search_agent.py

import os
import httpx
from agent_framework import Executor, WorkflowContext, handler
from typing_extensions import Never

class WebSearchAgent(Executor):
    """
    多智能体框架中的网页搜索智能体。
    直接调用智谱官方 search-std 接口。
    """
    def __init__(self, name: str):
        super().__init__(id=name)
        self.api_key = os.getenv("ZHIPU_API_KEY")
        if not self.api_key:
            raise ValueError("请在 .env 文件中设置 ZHIPU_API_KEY")

    @handler
    async def run_search(self, query: str, ctx: WorkflowContext[Never, str]) -> None:
        try:
            payload = {
                "search_query": query,
                "search_engine": "search_std"
            }
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }

            async with httpx.AsyncClient(timeout=60.0) as client:
                resp = await client.post(
                    "https://open.bigmodel.cn/api/paas/v4/web_search",
                    headers=headers,
                    json=payload
                )
                resp.raise_for_status()
                data = resp.json()

            results = data.get("data", [])
            if not results:
                assistant_content = "没有找到相关搜索结果。"
            else:
                SEARCH_TOP_K = 5
                assistant_content = "\n\n".join([
                    f"标题: {item.get('title', 'N/A')}\n"
                    f"发布日期: {item.get('publish_date', 'N/A')}\n"
                    f"摘要: {item.get('content', '')[:200]}...\n"
                    f"链接: {item.get('link', 'N/A')}"
                    for item in results[:SEARCH_TOP_K]
                ])

            await ctx.yield_output(assistant_content)

        except Exception as e:
            error_text = f"WebSearchAgent 执行搜索时出错: {e}"
            print(error_text)
            await ctx.yield_output(error_text)
# /src/workflow.py (最终极简版)

from agent_framework import WorkflowBuilder, Workflow
from .zhipu_client import ZhipuAIChatClient

def build_search_workflow() -> Workflow:
    """
    构建一个只包含单个“全能研究员” Agent 的工作流。
    """
    client = ZhipuAIChatClient()
    
    # 我们只创建这一个 Agent,并赋予它所有能力
    all_in_one_agent = client.create_executor(
        name="All_In_One_Researcher",
        instructions=(
            "你是一名高级AI研究助理。你的任务是深入理解用户的问题。"
            "如果问题需要最新的、外部的或互联网上的信息,你必须使用你的搜索能力来寻找答案。"
            "在获得信息后,你需要对信息进行提炼、总结,并给出一个全面、清晰、高质量的回答。"
        ),
        use_search=True # 开启搜索能力
    )

    # 工作流只包含这一个 Agent
    workflow = WorkflowBuilder().set_start_executor(all_in_one_agent).build()
    
    return workflow
# /src/zhipu_client.py 
import os
import json
import httpx
from typing_extensions import Never
from agent_framework import Executor, WorkflowContext, handler

class ZhipuExecutor(Executor):
    def __init__(self, name: str, instructions: str, model: str, api_key: str, use_search: bool = False):
        super().__init__(id=name)
        self.instructions = instructions; self.model = model; self.api_key = api_key; self.use_search = use_search
        self.api_endpoint = "https://open.bigmodel.cn/api/paas/v4/chat/completions"

    @handler
    async def run_zhipu_model(self, user_input: str, ctx: WorkflowContext[Never, str]) -> None:
        headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" }
        messages = [{"role": "system", "content": self.instructions}, {"role": "user", "content": user_input}]
        payload = { "model": self.model, "messages": messages }

        if self.use_search:
            # 【核心修正】: 我们尝试构造一个最符合文档规范的 web_search tool
            payload["tools"] = [
                {
                    "type": "web_search",
                    "web_search": {
                        "enable": True,
                        # 根据某些文档示例,可能需要一个 search_query 参数,即使是空的
                        # 我们先不加,如果再报“不能为空”,就回来加上 "search_query": ""
                    }
                }
            ]
            payload["tool_choice"] = "auto"
        
        try:
            print(f"--- 最终发送的 Payload ---\n{json.dumps(payload, indent=2, ensure_ascii=False)}\n--------------------------")
            async with httpx.AsyncClient(timeout=180.0) as client:
                response = await client.post(self.api_endpoint, json=payload, headers=headers)
                if response.status_code != 200:
                    print(f"!!! 服务器返回错误: {response.status_code} - {response.text}")
                response.raise_for_status()
                response_data = response.json()

            choice = response_data.get("choices", [{}])[0]
            message = choice.get("message", {})

            if message.get("tool_calls"):
                # 提取 web_search 返回的结果
                search_results = [tc["web_search"] for tc in message["tool_calls"] if "web_search" in tc]
                # 将搜索结果整合成一个字符串返回
                final_result = "搜索结果: \n" + json.dumps(search_results, ensure_ascii=False, indent=2)
                await ctx.yield_output(final_result)
            else:
                await ctx.yield_output(message.get("content", ""))
        except Exception as e:
            await ctx.yield_output(f"请求智谱 API 时发生错误: {e}")

class ZhipuAIChatClient:
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.getenv("ZHIPU_API_KEY")
        if not self.api_key: raise ValueError("ZHIPU_API_KEY 未设置。")

    def create_executor(self, name: str, instructions: str, model: str = "glm-4.5-air", use_search: bool = False) -> ZhipuExecutor:
        return ZhipuExecutor(name=name, instructions=instructions, model=model, api_key=self.api_key, use_search=use_search)