阿里 DeepResearch 智能体模型API调用

“””
通义千问 Deep Research 智能体调用示例
功能:联网调研 → 报告生成 → 提供来源

使用前准备:
1. 注册阿里云账号:https://www.aliyun.com
2. 开通百炼大模型服务
3. 获取 API Key
4. 安装依赖:pip install dashscope
“””

import os
import sys
import json
import time
from datetime import datetime
from typing import Dict, List, Optional

try:
import dashscope
from dashscope import Generation
except ImportError:
print(“错误:未安装 dashscope 库”)
print(“请运行:pip install dashscope”)
sys.exit(1)

# ==================== 配置区域 ====================
# 方式1:直接配置 API Key(不推荐在生产环境使用)
API_KEY = “sk-xxxxxxxxxxxxx” # 在这里填入你的 API Key,格式:sk-xxxxxxxxxxxxx

# 方式2:从环境变量读取(推荐)
if not API_KEY:
API_KEY = os.getenv(‘DASHSCOPE_API_KEY’)

# 模型配置
MODEL_NAME = “qwen-deep-research” # Deep Research 智能体模型
TIMEOUT = 180 # 超时时间(秒)
MAX_RETRIES = 3 # 最大重试次数

# 输出配置
SAVE_TO_FILE = True # 是否保存结果到文件
OUTPUT_DIR = “research_outputs” # 输出目录
SHOW_DETAILS = True # 是否显示详细过程

# ==================== 工具函数 ====================

def check_api_key():
“””检查 API Key 是否配置”””
if not API_KEY or API_KEY.startswith(“sk-xxx”):
print(“=” * 60)
print(“错误:未配置有效的 API Key”)
print(“-” * 60)
print(“请按以下步骤配置:”)
print(“1. 访问阿里云百炼平台获取 API Key”)
print(“2. 在代码中设置 API_KEY 变量”)
print(” 或设置环境变量 DASHSCOPE_API_KEY”)
print(“=” * 60)
return False
return True

def ensure_output_dir():
“””确保输出目录存在”””
if SAVE_TO_FILE and not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)

def save_research_result(content: str, topic: str):
“””保存研究结果到文件”””
if not SAVE_TO_FILE:
return

ensure_output_dir()
timestamp = datetime.now().strftime(“%Y%m%d_%H%M%S”)
filename = f”{OUTPUT_DIR}/research_{timestamp}.md”

with open(filename, ‘w’, encoding=’utf-8′) as f:
f.write(f”# 研究报告:{topic}\n\n”)
f.write(f”生成时间:{datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}\n\n”)
f.write(“—\n\n”)
f.write(content)

print(f”\n📄 报告已保存到:{filename}”)

class DeepResearchAgent:
“””Deep Research 智能体封装类”””

def __init__(self, api_key: str = None):
self.api_key = api_key or API_KEY
self.research_goal = “”
self.web_sites = []
self.final_report = “”

def call_model(self, messages: List[Dict], step_name: str = “研究”) -> str:
“””调用 Deep Research 模型”””
print(f”\n{‘=’*60}”)
print(f”📊 {step_name}”)
print(‘=’*60)

for attempt in range(MAX_RETRIES):
try:
responses = Generation.call(
api_key=self.api_key,
model=MODEL_NAME,
messages=messages,
stream=True, # Deep Research 仅支持流式输出
timeout=TIMEOUT
)

return self._process_responses(responses, step_name)

except dashscope.DashScopeError as e:
print(f”\n❌ API 错误:{e}”)
if “InvalidApiKey” in str(e):
print(“API Key 无效,请检查配置”)
sys.exit(1)
elif “Throttling” in str(e):
wait_time = (attempt + 1) * 5
print(f”⏳ 请求频率限制,等待 {wait_time} 秒后重试…”)
time.sleep(wait_time)
else:
print(f”错误详情:{e}”)
if attempt == MAX_RETRIES – 1:
raise

except Exception as e:
print(f”\n❌ 未知错误:{e}”)
if attempt == MAX_RETRIES – 1:
raise
time.sleep(2)

return “”

def _process_responses(self, responses, step_name: str) -> str:
“””处理流式响应”””
current_phase = None
phase_content = “”
full_content = “”

for response in responses:
# 检查响应状态
if hasattr(response, ‘status_code’) and response.status_code != 200:
self._handle_error_response(response)
continue

if not hasattr(response, ‘output’) or not response.output:
continue

message = response.output.get(‘message’, {})
phase = message.get(‘phase’)
content = message.get(‘content’, ”)
status = message.get(‘status’)
extra = message.get(‘extra’, {})

# 处理阶段切换
if phase != current_phase:
if current_phase and phase_content:
self._display_phase_complete(current_phase, step_name)

current_phase = phase
phase_content = “”

if phase and phase != “KeepAlive”:
self._display_phase_start(phase, step_name)

# 处理不同阶段的内容
if phase == “WebResearch”:
self._process_web_research(extra, status)

elif phase == “answer”:
if content:
phase_content += content
full_content += content
if SHOW_DETAILS:
print(content, end=”, flush=True)

# 处理完成状态
if status == “finished”:
self._display_usage_stats(response)

if current_phase and phase_content:
self._display_phase_complete(current_phase, step_name)

self.final_report = full_content
return full_content

def _process_web_research(self, extra: Dict, status: str):
“””处理网络研究阶段的信息”””
if not SHOW_DETAILS:
return

research_info = extra.get(‘deep_research’, {}).get(‘research’, {})

if status == “streamingQueries”:
goal = research_info.get(‘researchGoal’, ”)
if goal:
self.research_goal += goal
print(f”\n🎯 研究目标:{goal}”)

elif status == “streamingWebResult”:
sites = research_info.get(‘webSites’, [])
if sites and sites != self.web_sites:
self.web_sites = sites
print(f”\n🔍 找到 {len(sites)} 个相关信息源:”)
for i, site in enumerate(sites[:5], 1): # 只显示前5个
print(f”\n [{i}] {site.get(‘title’, ‘无标题’)}”)
print(f” {site.get(‘description’, ‘无描述’)[:100]}…”)
print(f” 🔗 {site.get(‘url’, ”)}”)

if len(sites) > 5:
print(f”\n … 还有 {len(sites)-5} 个信息源”)

def _display_phase_start(self, phase: str, step_name: str):
“””显示阶段开始信息”””
if not SHOW_DETAILS:
return

phase_names = {
‘WebResearch’: ‘🌐 网络调研阶段’,
‘answer’: ‘📝 生成报告阶段’,
‘Planning’: ‘📋 规划阶段’
}
print(f”\n{phase_names.get(phase, phase)}”)
print(“-” * 40)

def _display_phase_complete(self, phase: str, step_name: str):
“””显示阶段完成信息”””
if not SHOW_DETAILS:
return
print(f”\n✅ {phase} 阶段完成”)

def _display_usage_stats(self, response):
“””显示 Token 使用统计”””
if not SHOW_DETAILS:
return

if hasattr(response, ‘usage’) and response.usage:
usage = response.usage
print(f”\n\n📊 Token 使用统计:”)
print(f” 输入:{usage.get(‘input_tokens’, 0)} tokens”)
print(f” 输出:{usage.get(‘output_tokens’, 0)} tokens”)
print(f” 总计:{usage.get(‘total_tokens’, 0)} tokens”)

def _handle_error_response(self, response):
“””处理错误响应”””
print(f”\n❌ HTTP 错误码:{response.status_code}”)
if hasattr(response, ‘code’):
print(f”错误码:{response.code}”)
if hasattr(response, ‘message’):
print(f”错误信息:{response.message}”)

def research(self, topic: str, clarification: str = None) -> str:
“””执行完整的研究流程”””
print(f”\n🔬 开始研究:{topic}”)

# 第一步:初步分析和反问
messages = [{‘role’: ‘user’, ‘content’: topic}]
initial_response = self.call_model(messages, “第一步:初步分析”)

# 第二步:深入研究
messages.append({‘role’: ‘assistant’, ‘content’: initial_response})

if clarification:
messages.append({‘role’: ‘user’, ‘content’: clarification})
else:
# 默认要求全面研究
messages.append({‘role’: ‘user’, ‘content’: ‘请全面深入地研究这个主题,包括最新进展、关键技术、应用案例和未来趋势’})

final_report = self.call_model(messages, “第二步:深入研究”)

# 保存结果
if SAVE_TO_FILE and final_report:
save_research_result(final_report, topic)

return final_report

# ==================== 便捷函数 ====================

def quick_research(topic: str, save: bool = True) -> str:
“””快速研究函数”””
global SAVE_TO_FILE
SAVE_TO_FILE = save

if not check_api_key():
return “”

agent = DeepResearchAgent()
return agent.research(topic)

def interactive_research():
“””交互式研究”””
if not check_api_key():
return

print(“\n” + “=”*60)
print(“🤖 通义 Deep Research 智能体”)
print(“=”*60)
print(“输入 ‘quit’ 或 ‘exit’ 退出”)
print(“输入 ‘help’ 查看帮助”)
print(“-“*60)

agent = DeepResearchAgent()

while True:
topic = input(“\n📝 请输入研究主题:”).strip()

if topic.lower() in [‘quit’, ‘exit’, ‘q’]:
print(“👋 再见!”)
break

if topic.lower() == ‘help’:
print(“\n使用说明:”)
print(“1. 输入任何想研究的主题”)
print(“2. 系统会先分析并可能提出细化问题”)
print(“3. 你可以回答细化问题或要求全面研究”)
print(“4. 等待生成完整的研究报告”)
continue

if not topic:
print(“❌ 主题不能为空”)
continue

try:
# 执行研究
result = agent.research(topic)

if result:
print(“\n” + “=”*60)
print(“✅ 研究完成!”)

if agent.web_sites:
print(f”\n📚 参考了 {len(agent.web_sites)} 个信息源”)

except KeyboardInterrupt:
print(“\n\n⚠️ 研究被中断”)
except Exception as e:
print(f”\n❌ 研究失败:{e}”)

# ==================== 主程序 ====================

def main():
“””主函数:提供多种使用方式”””

# 检查 API Key
if not check_api_key():
return

print(“\n请选择使用方式:”)
print(“1. 交互式研究(推荐)”)
print(“2. 快速研究示例”)
print(“3. 自定义研究”)

choice = input(“\n请输入选择 (1/2/3):”).strip()

if choice == “1”:
interactive_research()

elif choice == “2”:
# 示例研究主题
topics = [
“人工智能在教育中的应用”,
“区块链技术的最新发展”,
“量子计算的商业应用前景”
]

print(“\n示例主题:”)
for i, t in enumerate(topics, 1):
print(f”{i}. {t}”)

idx = input(“\n选择主题编号:”).strip()
if idx.isdigit() and 1 <= int(idx) <= len(topics):
result = quick_research(topics[int(idx)-1])
else:
print(“❌ 无效选择”)

elif choice == “3”:
# 自定义完整流程
topic = input(“\n研究主题:”).strip()
if topic:
agent = DeepResearchAgent()

# 第一步
messages = [{‘role’: ‘user’, ‘content’: topic}]
response1 = agent.call_model(messages, “初步分析”)

# 询问是否需要细化
clarify = input(“\n是否需要补充说明?(直接回车跳过):”).strip()

# 第二步
messages.append({‘role’: ‘assistant’, ‘content’: response1})
messages.append({‘role’: ‘user’, ‘content’: clarify or “请全面研究”})

final = agent.call_model(messages, “深入研究”)

if final and SAVE_TO_FILE:
save_research_result(final, topic)
else:
print(“❌ 无效选择”)

if __name__ == “__main__”:
# 可以直接运行交互式界面
# interactive_research()

# 或者运行主菜单
main()

# 或者直接调用快速研究
# result = quick_research(“你的研究主题”)

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注