答疑vx群:adoresever
# AutoGen Hotspot Project
本项目利用微软 AutoGen 框架,构建一个用于分析和响应热点问题的多智能体(Multi-Agent)AI 系统。系统集成了语音合成(TTS)功能。
## 1. 先决条件 (Prerequisites)
在开始之前,请确保您的系统满足以下条件:
– **操作系统**: 一个基于 Linux 的操作系统(本项目在 Ubuntu 上开发和测试)。
– **Conda**: 已安装 Miniconda 或 Anaconda。推荐使用 [Miniconda](https://docs.conda.io/projects/miniconda/en/latest/),因为它更轻量。
– **Git**: 用于克隆本代码仓库。
## 2. 安装与配置 (Installation & Configuration)
我们提供了一个自动化的安装脚本 `setup.sh`,它将为您处理所有复杂的环境配置。请严格按照以下步骤操作。
### 步骤一:克隆代码仓库
cd autogen_hotspot_project
步骤二:运行自动化安装脚本
这是最关键的一步。 请先确保您处于 Conda 的 (base) 环境下(如果不是,请运行 conda deactivate 直到回到 base)。
然后,为脚本赋予执行权限并运行它:
chmod +x setup.sh
./setup.sh
该脚本会自动完成以下所有工作:
检查并清理任何旧的 autogen_env 环境。
创建一个全新的、纯净的 Conda 环境。
使用 conda-forge 频道安装所有核心的、复杂的依赖,从根本上保证环境的稳定性。
使用 pip 补全剩余的 Python 包。
脚本执行成功后,您将看到环境配置完毕的提示。
步骤三:配置环境变量
本项目需要使用 API 密钥和其他配置项。
复制环境变量示例文件:
cp .env.example .env
编辑新建的 .env 文件,填入您自己的密钥和配置信息。例如:
# .env 文件示例
DASHSCOPE_API_KEY=”sk-xxxxxxxxxxxxxxxxxxxxxxxx”
ZHIPU_API_KEY=”zz-xxxxxxxxxxxxxxxxxxxxxxxx”
OPENAI_API_KEY=”sk-xxxxxxxxxxxxxxxxxxxxxxxx”
OPENAI_API_BASE=”https://api.openai.com/v1″
3. 运行项目 (Running the Application)
所有配置完成后,按以下步骤启动程序:
激活 Conda 环境 (每次开启新的终端都需要执行此步骤):
conda activate autogen_env
运行主程序:
python main.py
程序启动后,您将看到提示 💭 您好!请输入您想了解的热点问题:,此时项目已成功运行。
4. 项目结构 (Project Structure)
.
├── agents/ # 存放所有 Agent 的定义文件
├── tools/ # 存放所有工具函数(搜索、语音、文件)
├── output/ # ❗ 自动生成:用于存放输出的报告和语音文件
│ ├── audio/ # -> 存放 .wav 语音文件
│ └── *.txt # -> 存放 .txt 文本报告
├── main.py # ✅ 项目主入口文件,负责编排整个流程
├── setup.sh # ✅ 推荐的一键安装脚本
├── README.md # 📖 本说明文档
├── requirements.txt # 📄 【参考】项目依赖列表,主要由 setup.sh 在内部使用
└── .env.example # 🔑 【重要】环境变量示例文件,请复制为 .env 并修改
5. 故障排除 (Troubleshooting)
问题: 运行 ./setup.sh 或其他 Conda 命令时出现 CondaHTTPError: HTTP 000 CONNECTION FAILED 错误。
原因: 这是网络问题,连接 Conda 官方服务器超时或失败。
解决方案: 更换为国内镜像源(如清华大学源),可以大幅提升下载速度和稳定性。
# 配置 Conda 镜像源
conda config –add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/
conda config –add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config –set show_channel_urls yes
.env
# Qwen API 配置
# 获取方式:https://dashscope.console.aliyun.com/
BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"
MODEL="qwen-plus"
QWEN_API_KEY=
# 智谱 API 配置
# 获取方式:https://open.bigmodel.cn/
ZHIPU_API_KEY=
DASHSCOPE_API_KEY=
# 搜索结果数量(默认5)
SEARCH_TOP_K=5
# 调试模式(默认True)
DEBUG_MODE=True
requirements.txt
# 配置完成后,重新运行安装脚本 ./setup.sh
# AutoGen 框架的核心包
pyautogen
# 用于加载 .env 文件中的环境变量
python-dotenv>=1.0.0
# AutoGen的核心依赖,用于连接兼容接口
openai>=1.0.0
# 用于向智谱 API 发送 HTTP 请求
requests>=2.31.0
# 阿里云官方 SDK,用于文本转语音
dashscope
# 用于实时播放音频流的核心库
pyaudio
# 用于高效处理音频数据
numpy
setup.sh
#!/bin/bash
# ===================================================================
# AutoGen Hotspot Project - 自动化安装脚本
# 执行此脚本: ./setup.sh
# ===================================================================
# 如果任何命令失败,则立即退出脚本
set -e
# 定义环境名称
ENV_NAME="autogen_env"
echo "--> 第 1 步:清理旧环境(如果存在)..."
# 检查环境是否存在,如果存在则删除
if conda info --envs | grep -q "^$ENV_NAME\s"; then
echo "发现旧环境 '$ENV_NAME',正在删除..."
conda env remove --name $ENV_NAME --yes
echo "旧环境删除完毕。"
else
echo "未发现旧环境,跳过清理。"
fi
echo "--> 第 2 步:创建一个纯净的 Python 3.10 环境..."
conda create --name $ENV_NAME python=3.10 --yes
echo "--> 第 3 步:激活环境并执行使用 conda-forge 安装核心依赖..."
# conda run 会在指定的 conda 环境中执行命令
conda run -n $ENV_NAME conda install -c conda-forge pyautogen python-dotenv requests openai pyaudio numpy --yes
echo "--> 第 4 步:使用 pip 安装剩余的补充依赖..."
conda run -n $ENV_NAME pip install dashscope
echo ""
echo "====================================================="
echo " ✅ 环境 '$ENV_NAME' 已成功创建并配置完毕!"
echo ""
echo "请手动激活环境以开始使用:"
echo "conda activate $ENV_NAME"
echo "====================================================="
main.py
# main.py
import os
import autogen
import dotenv
from datetime import datetime
import logging
# --- 基础配置 ---
# 抑制 FLAML 和 AutoGen 的一些冗余日志输出,让终端更干净
logging.getLogger("flaml").setLevel(logging.ERROR)
logging.getLogger("autogen.oai.client").setLevel(logging.ERROR)
dotenv.load_dotenv()
# --- 导入 Agent 和工具 ---
from agents.user_proxy import create_user_proxy
from agents.query_analyst import create_query_analyst
from agents.info_synthesizer import create_info_synthesizer
from agents.report_generator import create_report_generator
from agents.announcer import create_announcer
from tools.search_tool import zhipu_web_search
from tools.file_tool import save_report_to_file
from tools.speaker_tool import text_to_speech
# --- 加载 LLM 配置 ---
QWEN_API_KEY = os.getenv("QWEN_API_KEY")
BASE_URL = os.getenv("BASE_URL")
MODEL = os.getenv("MODEL")
if not QWEN_API_KEY:
raise ValueError("错误: 未在 .env 文件中找到 QWEN_API_KEY")
# 共享的 LLM 配置
llm_config = {
"config_list": [{"model": MODEL, "api_key": QWEN_API_KEY, "base_url": BASE_URL}],
"temperature": 0.2
}
# ========== 主程序入口 ==========
if __name__ == "__main__":
initial_question = input("💭 您好!请输入您想了解的热点问题: ")
if not initial_question.strip():
print("错误:输入不能为空。")
exit()
print("\n" + "="*20 + " 流程启动 " + "="*20)
# ======================================================================================
# 阶段一:意图分析 (单 Agent 对话,获取清晰的任务描述)
# ======================================================================================
print("\n> [阶段一] 正在分析您的问题意图...")
# 1. 创建此阶段专用的 Agent
# - User Proxy 作为一个简单的任务发起者
# - Query Analyst (新版) 负责分析问题
analysis_user_proxy = autogen.UserProxyAgent(
name="Analysis_Task_Initiator",
human_input_mode="NEVER",
max_consecutive_auto_reply=2, # 只需要一轮对话
code_execution_config=False, # 此阶段不需要执行代码
is_termination_msg=lambda x: True # Analyst 回复后立即终止
)
query_analyst = create_query_analyst(llm_config)
# 2. 发起一次性的对话来获取分析结果
analysis_user_proxy.initiate_chat(
query_analyst,
message=initial_question,
)
# 3. 从对话历史中提取出分析师的最终结论
analyzed_task_description = analysis_user_proxy.last_message()["content"]
print(f"> [分析完成] 明确的任务目标是: '{analyzed_task_description}'")
# ======================================================================================
# 阶段二:工具执行 (由确定性的 Python 代码完成)
# ======================================================================================
print(f"\n> [阶段二] 正在围绕任务目标执行网络搜索...")
# 直接调用工具函数,传入分析后的、高质量的任务描述
search_results = zhipu_web_search(query=analyzed_task_description)
print(f"> [搜索完成] 已获取相关资料。")
# ======================================================================================
# 阶段三:内容生成 (GroupChat 负责文本创作)
# ======================================================================================
print(f"\n> [阶段三] 启动 AI 团队进行报告生成...")
# 1. 定义好所有在 GroupChat 中需要被执行的工具
# 未来如果有更多工具,也添加到这个字典里
tools_for_groupchat = {
"text_to_speech": text_to_speech
# 如果 announcer 还需要其他工具,也在这里添加
}
# 2. 创建此阶段需要的 Agent,并在创建 User_Proxy 时直接注入工具
user_proxy = create_user_proxy(function_map=tools_for_groupchat) # <-- 关键改动在这里
info_synthesizer = create_info_synthesizer(llm_config)
report_generator = create_report_generator(llm_config)
announcer = create_announcer(llm_config)
# 3. 定义 GroupChat 和 Manager (这部分不变)
groupchat = autogen.GroupChat(
agents=[user_proxy, info_synthesizer, report_generator, announcer],
messages=[],
max_round=10,
speaker_selection_method=lambda last_speaker, groupchat:
announcer if groupchat.messages[-1].get("role") == "tool" else
info_synthesizer if last_speaker is user_proxy else
report_generator if last_speaker is info_synthesizer else
announcer if last_speaker is report_generator else
user_proxy
)
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)
# 4. 构造启动消息 (这部分不变)
initial_message_for_groupchat = f"""
用户的原始问题是:"{initial_question}"
经过分析,我们明确了核心任务是:"{analyzed_task_description}"
为了完成这个任务,我已经执行了网络搜索,相关资料如下:
--- 搜索结果 ---
{search_results}
--- 搜索结果结束 ---
现在,请团队开始工作,基于以上资料完成最终的报告。
"""
# 5. 启动 AI 团队的对话协作 (这部分不变)
user_proxy.initiate_chat(
manager,
message=initial_message_for_groupchat,
)
# ======================================================================================
# 阶段四:后处理
# ======================================================================================
print("\n" + "="*20 + " AI协作完成,开始执行最终任务 " + "="*20)
final_report_content = None
# 遍历所有消息,找到 Report_Generator 的发言
all_messages = user_proxy.chat_messages[manager]
for msg in all_messages:
if msg.get("name") == "Report_Generator":
final_report_content = msg["content"]
break
if final_report_content:
print("\n> [后处理] 已成功从对话历史中提取最终报告。")
# 执行保存
save_status = save_report_to_file(final_report_content, initial_question)
print(f"> {save_status}")
else:
print("\n> [警告] 未能在对话历史中找到报告内容,文件未保存,语音未播报。")
print("\n🎉 " + "="*20 + " 所有任务已成功完成! " + "="*20 + " 🎉")
agents/announcer.py
import autogen
def create_announcer(llm_config):
return autogen.AssistantAgent(
name="Announcer",
system_message="""你是一个专业的播音员。你的唯一任务是:
1. 接收`Report_Generator`生成的最终报告文本。
2. **必须**调用`text_to_speech`工具,并将**完整的报告文本**作为`content`参数传递给它。
3. 在你确认工具调用已发出后,你**必须在最后、且只说 'TERMINATE'** 来结束整个流程。""",
llm_config={
"config_list": llm_config["config_list"], "temperature": 0,
"tools": [{
"type": "function",
"function": {
"name": "text_to_speech",
"description": "将最终的报告文本内容转换为语音并实时播放。",
"parameters": {
"type": "object", "properties": {
"content": {"type": "string", "description": "需要朗读的完整报告文本。"}
}, "required": ["content"]},
}
}],
"tool_choice": "auto"
}
)
agents/info_synthesizer.py
import autogen
def create_info_synthesizer(llm_config):
return autogen.AssistantAgent(
name="Info_Synthesizer",
system_message="""你是一个信息整合师。你的任务是接收`zhipu_web_search`工具返回的原始搜索结果。
你必须对这些信息进行分析、去重和筛选,提炼出最核心、最相关的内容,并形成一个结构化的要点列表。""",
llm_config=llm_config
)
agents/query_analyst.py
# agents/query_analyst.py (终极时效性版)
import autogen
from datetime import datetime
def create_query_analyst(llm_config):
"""
创建一个 Query_Analyst Agent。
... (文档字符串不变) ...
"""
current_date_str = datetime.now().strftime('%Y年%m月%d日')
return autogen.AssistantAgent(
name="Query_Analyst",
system_message=f"""你是高级任务规划专家,今天是 {current_date_str}。
你的核心使命是确保所有信息都具备最高的时效性。
你的唯一职责是分析用户的原始问题,并遵循以下严格的逻辑树来生成最终的任务描述:
**1. 分析用户的输入中是否包含“近期”、“本周”、“本月”或类似的、指代一段时间的词语。**
**A) 如果是(例如,用户问“近期AI热点”):**
- 你的任务描述必须明确限定时间范围为“过去一个月内”。
- **你的输出格式必须是**: "搜索并总结过去一个月内关于 [用户主题] 的热点新闻和进展。"
- 示例: "搜索并总结过去一个月内关于AI领域的热点新闻和进展。"
**B) 如果否(默认情况,例如,用户问“今日热点”或“AI”):**
- 你的任务描述必须严格限定在“今天”,也就是“{current_date_str}”。
- **你的输出格式必须是**: "搜索并总结 {current_date_str} 关于 [用户主题] 的热点新闻。"
- 示例1 (用户问“今日热点”): "搜索并总结 {current_date_str} 的科技、财经和国际头条新闻。"
- 示例2 (用户问“AI”): "搜索并总结 {current_date_str} 关于AI领域的热点新闻。"
**你的最终输出,只能是根据上述逻辑生成的、格式完全匹配的任务描述字符串,绝对不能包含任何其他解释、问候或对话。**""",
llm_config=llm_config
)
agents/report_generator.py
import autogen
from datetime import datetime
def create_report_generator(llm_config):
return autogen.AssistantAgent(
name="Report_Generator",
system_message=f"""你是一个报告生成专家。今天是 {datetime.now().strftime('%Y年%m月%d日')}。
你的任务是接收`信息整合师`提供的要点列表,并将这些要点撰写成一篇通顺、易读的最终报告。
你的任务仅限于生成报告文本本身。完成后,直接输出报告内容即可。""",
llm_config=llm_config
)
agents/user_proxy.py
# agents/user_proxy.py
import autogen
def create_user_proxy(function_map=None):
"""
创建一个 UserProxyAgent。
这个新版本可以接收一个可选的 function_map 参数,
以便在创建时就为其注册所有需要的工具函数。
"""
return autogen.UserProxyAgent(
name="User_Proxy_For_Initiating_Chat",
human_input_mode="NEVER",
max_consecutive_auto_reply=15,
is_termination_msg=lambda x: "TERMINATE" in x.get("content", "").upper(),
code_execution_config={"work_dir": "temp_code_execution", "use_docker": False},
# 在这里,我们将 function_map 直接传递给 agent 的构造函数
function_map=function_map
)
tools/file_tool.py
# tools/file_tool.py
import os
from datetime import datetime
def save_report_to_file(content: str, topic: str) -> str:
print(f"\n> [工具执行] 正在保存报告...")
try:
os.makedirs("output", exist_ok=True)
safe_topic = "".join(c for c in topic if c.isalnum())[:20]
filename = f"output/{safe_topic}_report_{datetime.now().strftime('%Y%m%d')}.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
return f"报告已成功保存至 {filename}"
except Exception as e:
return f"保存文件出错: {e}"
tools/search_tool.py
# tools/search_tool.py
import os
import requests
# 这个工具需要智谱的API Key
ZHIPU_API_KEY = os.getenv("ZHIPU_API_KEY")
def zhipu_web_search(query: str) -> str:
"""一个调用智谱Web Search API的工具。"""
if not ZHIPU_API_KEY:
return "错误:未在 .env 文件中找到 ZHIPU_API_KEY"
print(f"\n> [工具执行中] 正在使用智谱搜索: '{query}'")
try:
response = requests.post(
"https://open.bigmodel.cn/api/paas/v4/web_search",
headers={"Authorization": f"Bearer {ZHIPU_API_KEY}"},
json={"search_query": query, "search_engine": "search_std"},
timeout=30
)
response.raise_for_status()
results = response.json().get("search_result", [])
if not results:
return "没有找到相关结果。"
SEARCH_TOP_K = int(os.getenv("SEARCH_TOP_K", 5))
return "\n\n".join([
f"标题: {item.get('title', 'N/A')}\n"
f"发布日期: {item.get('publish_date', 'N/A')}\n"
f"摘要: {item.get('content', 'N/A')[:200]}...\n"
f"链接: {item.get('link', 'N/A')}"
for item in results[:SEARCH_TOP_K]
])
except Exception as e:
return f"搜索工具出错: {e}"
tools/speaker_tool.py
import os
import dashscope
import pyaudio
import base64
import re
from datetime import datetime
def text_to_speech(content: str, voice: str = "Ethan") -> str:
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
return "错误:未在 .env 文件中找到 DASHSCOPE_API_KEY。"
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
sentences = re.split(r'([。?!;.!?])', content)
processed_sentences = [sentences[i] + (sentences[i+1] if i+1 < len(sentences) else '') for i in range(0, len(sentences), 2)]
processed_sentences = [s.strip() for s in processed_sentences if s.strip()]
full_audio_data = bytearray()
print(f"\n> [工具执行中] 准备实时朗读报告...")
try:
print("> 正在合成并实时播放语音...")
for i, sentence in enumerate(processed_sentences):
if not sentence: continue
response_generator = dashscope.audio.qwen_tts.SpeechSynthesizer.call(
model="qwen-tts", text=sentence, api_key=api_key, voice=voice, stream=True
)
for chunk in response_generator:
audio_data_b64 = chunk.get("output", {}).get("audio", {}).get("data")
if audio_data_b64:
wav_bytes = base64.b64decode(audio_data_b64)
stream.write(wav_bytes)
full_audio_data.extend(wav_bytes)
print(f"> ...已播放第 {i+1}/{len(processed_sentences)} 句...")
stream.stop_stream()
os.makedirs("output/audio", exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filepath = f"output/audio/report_{timestamp}.wav"
with open(filepath, "wb") as f:
f.write(full_audio_data)
print(f"> 语音文件已成功保存至: {filepath}")
return f"报告已朗读完毕,并已保存至 {filepath}"
except Exception as e:
import traceback
traceback.print_exc()
return f"文本转语音工具出错: {e}"
finally:
stream.close()
p.terminate()