请先在.env文件中设置好 OPENAI_API_KEY 1.extract_tool.py : “”” 通用信息提取工具 使用方法: python extract_tool.py <配置名称> <输入文件> “”” import langextract as lx import os import sys import argparse from datetime import datetime from pathlib import Path from dotenv import load_dotenv from collections import Counter # 导入配置 from config import EXTRACTION_CONFIGS # 自动加载 .env 文件,同时加载 OPENAI_API_KEY 和 LANGEXTRACT_API_KEY load_dotenv() def process_document(config_name, input_file): “”” 根据配置处理文件 (已整合所有成功秘诀) “”” # — 1. 配置和文件检查 — if config_name not in EXTRACTION_CONFIGS: print(f”❌ 错误: 未找到配置 ‘{config_name}'”) sys.exit(1) if not Path(input_file).exists(): print(f”❌ 错误: 文件不存在 ‘{input_file}'”) sys.exit(1) config = EXTRACTION_CONFIGS[config_name] print(“-” * 50) print(f”📁 处理文件: {input_file} | ⚙️ 配置: {config_name}”) print(f”📝 任务描述: {config.get(‘description’, ‘无描述’)}”) with open(input_file, ‘r’, encoding=’utf-8′) as f: text = f.read() print(f”📄 文件长度: {len(text):,} 字符”) # — 2. 构建提取参数 — model_id = config.get(“model”, “gpt-4o-mini”) extract_params = { “text_or_documents”: text, “prompt_description”: config[“prompt”], “examples”: config[“examples”], “model_id”: model_id, “extraction_passes”: config.get(“extraction_passes”, 1), “max_workers”: config.get(“max_workers”, 10), “max_char_buffer”: config.get(“max_char_buffer”, 2000) } # — 3. 针对不同模型应用特殊配置 (核心秘诀) — if ‘gpt’ in model_id.lower(): print(f”🔧 检测到 OpenAI 模型 ({model_id}),应用特殊配置…”) from langextract.inference import OpenAILanguageModel api_key = os.environ.get(“OPENAI_API_KEY”) if not api_key: print(“\n❌ 致命错误: 未在 .env 文件中找到 ‘OPENAI_API_KEY’。”) print(” 请在 .env 文件中添加一行: OPENAI_API_KEY=sk-…”) sys.exit(1) extract_params.update({ “language_model_type”: OpenAILanguageModel, “api_key”: api_key, “fence_output”: True, “use_schema_constraints”: False }) else: # 默认使用 Gemini 或其他模型,langextract 会自动寻找 LANGEXTRACT_API_KEY print(f”🔧 使用默认或 Gemini 模型 ({model_id}) 配置…”) api_key = os.environ.get(“LANGEXTRACT_API_KEY”) if not api_key: print(f”\n❌ 致命错误: 未找到适用于 ‘{model_id}’ 的 API 密钥。”) print(” 请在 .env 文件中添加: LANGEXTRACT_API_KEY=…”) sys.exit(1) # — 4. 执行提取 — print(“🚀 开始提取,请耐心等待…”) result = lx.extract(**extract_params) if not result.extractions: print(“\n⚠️ 警告:本次运行未能提取到任何实体。”) print(” 请检查: 1. Prompt和示例是否清晰。 2. API密钥和账户额度。 3. 源文件内容是否匹配任务。”) return # — 5. 保存和可视化 (使用最新、正确的方式) — output_dir = Path(“extraction_results”) output_dir.mkdir(exist_ok=True) timestamp = datetime.now().strftime(“%Y%m%d_%H%M%S”) output_base_name = f”{config_name}_{Path(input_file).stem}_{timestamp}” jsonl_path = output_dir / f”{output_base_name}.jsonl” lx.io.save_annotated_documents([result], output_name=str(jsonl_path.name), output_dir=str(output_dir)) html_path = output_dir / f”{output_base_name}.html” html_content = lx.visualize(str(jsonl_path)) with open(html_path, “w”, encoding=”utf-8″) as f: f.write(html_content) print(“\n✅ 提取完成!”) print(f” • 数据文件: {jsonl_path}”) print(f” • 可视化报告: {html_path}”) # — 6. 结果统计 — class_counts = Counter(e.extraction_class for e in result.extractions) print(“\n📊 提取统计:”) print(f” 总实体数: {len(result.extractions)}”) for class_name, count in sorted(class_counts.items()): print(f” – {class_name}: {count} 个”) def main(): parser = argparse.ArgumentParser(description=”通用信息提取工具 (最终增强版)”) parser.add_argument(“config_name”, help=”配置名称 (如 medical, fable 等)”) parser.add_argument(“input_file”, help=”输入文件路径”) args = parser.parse_args() process_document(args.config_name, args.input_file) if __name__ == “__main__”: main() 2.config.py : “”” 信息提取配置文件 在这里定义不同类型文档的提取规则 “”” import langextract as lx # 提取配置字典 EXTRACTION_CONFIGS = { # ========== 医疗记录配置 ========== “medical”: { “description”: “提取医疗记录中的药物、诊断和治疗信息”, “model”: “gpt-4o-mini”, # 可选: gpt-4o, gpt-4, gpt-3.5-turbo “extraction_passes”: 2, # 多轮提取提高准确率 “max_workers”: 10, “max_char_buffer”: 1500, “prompt”: “”” 提取以下医疗信息: 1. 药物名称、剂量、用药频率、用药途径 2. 诊断结果 3. 症状描述 4. 治疗方案 使用原文精确提取,不要改写或总结。 为每个实体提供相关属性。 “””, “examples”: [ lx.data.ExampleData( text=”患者主诉头痛3天,诊断为偏头痛。处方:布洛芬400mg,口服,每日3次,饭后服用。”, extractions=[ lx.data.Extraction( extraction_class=”症状”, extraction_text=”头痛3天”, attributes={“持续时间”: “3天”} ), lx.data.Extraction( extraction_class=”诊断”, extraction_text=”偏头痛”, attributes={“类型”: “神经系统疾病”} ), lx.data.Extraction( extraction_class=”药物”, extraction_text=”布洛芬”, attributes={ “剂量”: “400mg”, “途径”: “口服”, “频率”: “每日3次”, “注意事项”: “饭后服用” } ), ] ) ] }, # ========== 合同文件配置 ========== “contract”: { “description”: “提取合同中的关键条款和信息”, “model”: “gpt-4o-mini”, # 合同一般比较规范,可以用便宜的模型 “extraction_passes”: 1, “max_workers”: 5, “max_char_buffer”: 2000, “prompt”: “”” 提取合同中的以下信息: 1. 甲方和乙方信息(公司名称、代表人) 2. 合同金额和支付条款 3. 合同期限和重要日期 4. 主要权利和义务 5. 违约条款 保持原文的准确性,提取完整条款。 “””, “examples”: [ lx.data.ExampleData( text=”甲方:北京科技有限公司(法定代表人:张明),乙方:上海贸易有限公司,合同总金额:人民币100万元整,合同期限:2024年1月1日至2024年12月31日。”, extractions=[ lx.data.Extraction( extraction_class=”甲方”, extraction_text=”北京科技有限公司”, attributes={“法定代表人”: “张明”} ), lx.data.Extraction( extraction_class=”乙方”, extraction_text=”上海贸易有限公司”, attributes={} ), lx.data.Extraction( extraction_class=”金额”, extraction_text=”人民币100万元整”, attributes={“币种”: “人民币”, “数额”: “100万元”} ), lx.data.Extraction( extraction_class=”期限”, extraction_text=”2024年1月1日至2024年12月31日”, attributes={“开始日期”: “2024年1月1日”, “结束日期”: “2024年12月31日”} ), ] ) ] }, # ========== 新闻文章配置 ========== “news”: { “description”: “提取新闻文章的关键要素”, “model”: “gpt-4o-mini”, “extraction_passes”: 1, “max_workers”: 15, “max_char_buffer”: 2500, “prompt”: “”” 提取新闻文章中的: 1. 人物(姓名、职位、所属机构) 2. 地点 3. 时间 4. 事件(发生了什么) 5. 引用的言论 标注每个事件的重要性(高/中/低)。 “””, “examples”: [ lx.data.ExampleData( text=”昨天下午,苹果公司CEO库克在加州总部宣布,公司将投资50亿美元开发AI技术。他表示:’这是苹果历史上最重要的战略转型。'”, extractions=[ lx.data.Extraction( extraction_class=”时间”, extraction_text=”昨天下午”, attributes={“精确度”: “相对时间”} ), lx.data.Extraction( extraction_class=”人物”, extraction_text=”库克”, attributes={“职位”: “CEO”, “公司”: “苹果公司”} ), lx.data.Extraction( extraction_class=”地点”, extraction_text=”加州总部”, attributes={“类型”: “公司总部”} ), lx.data.Extraction( extraction_class=”事件”, extraction_text=”投资50亿美元开发AI技术”, attributes={“重要性”: “高”, “金额”: “50亿美元”, “领域”: “AI技术”} ), lx.data.Extraction( extraction_class=”言论”, extraction_text=”这是苹果历史上最重要的战略转型”, attributes={“发言人”: “库克”, “态度”: “积极”} ), ] ) ] }, # ========== 学术论文配置 ========== “academic”: { “description”: “提取学术论文的关键信息”, “model”: “gpt-4o-mini”, “extraction_passes”: 2, “max_workers”: 10, “max_char_buffer”: 3000, “prompt”: “”” 提取学术论文中的: 1. 研究问题和假设 2. 研究方法 3. 主要发现和结论 4. 引用的重要文献 5. 数据和统计结果 保持学术术语的准确性。 “””, “examples”: [ lx.data.ExampleData( text=”本研究采用随机对照试验(RCT)方法,样本量n=500,结果显示治疗组相比对照组症状改善率提高了35% (p<0.001)。", extractions=[ lx.data.Extraction( extraction_class=”方法”, extraction_text=”随机对照试验(RCT)”, attributes={“类型”: “实验研究”} ), lx.data.Extraction( extraction_class=”样本”, extraction_text=”n=500″, attributes={“样本量”: “500”} ), lx.data.Extraction( extraction_class=”结果”, extraction_text=”症状改善率提高了35%”, attributes={“改善幅度”: “35%”, “统计显著性”: “p<0.001"} ), ] ) ] }, # ========== 客户反馈配置 ========== “feedback”: { “description”: “提取客户反馈中的情感和问题”, “model”: “gpt-3.5-turbo”, “extraction_passes”: 1, “max_workers”: 20, “max_char_buffer”: 1000, “prompt”: “”” 提取客户反馈中的: 1. 情感倾向(正面/负面/中性) 2. 产品或服务名称 3. 具体问题或建议 4. 客户需求 为每个提取内容标注情感强度(1-5分)。 “””, “examples”: [ lx.data.ExampleData( text=”你们的客服态度太差了!我打了3次电话都没解决问题,产品质量倒是不错,就是售后服务需要改进。”, extractions=[ lx.data.Extraction( extraction_class=”负面情感”, extraction_text=”客服态度太差了”, attributes={“对象”: “客服”, “强度”: “5”} ), lx.data.Extraction( extraction_class=”问题”, extraction_text=”打了3次电话都没解决问题”, attributes={“频次”: “3次”, “状态”: “未解决”} ), lx.data.Extraction( extraction_class=”正面情感”, extraction_text=”产品质量倒是不错”, attributes={“对象”: “产品质量”, “强度”: “3”} ), lx.data.Extraction( extraction_class=”建议”, extraction_text=”售后服务需要改进”, attributes={“改进点”: “售后服务”} ), ] ) ] }, # ========== 文学作品配置 ========== “literary”: { “description”: “提取文学作品(如小说、诗歌、戏剧)中的核心元素”, “model”: “gpt-4o-mini”, # 文学分析需要更强的理解能力 “extraction_passes”: 2, # 多轮提取以捕捉深层含义 “max_workers”: 10, “max_char_buffer”: 3000, “prompt”: “”” 提取文学作品中的以下要素: 1. 主要人物 (姓名、角色、性格特点) 2. 场景和氛围 (地点、时间、环境描述、情感基调) 3. 关键情节 (发生了什么,转折点) 4. 主题思想 (作品探讨的核心议题,如爱、死亡、成长、社会批判等) 5. 象征或意象 (具有特殊含义的物品或概念,如“红灯笼”象征压抑) 6. 修辞手法 (如比喻、拟人、排比等) 请精确引用原文,并为每个提取项提供相关属性。 “””, “examples”: [ lx.data.ExampleData( text=”我冒着严寒,回到相隔二千余里,别了二十余年的故乡去。时候既然是深冬,渐近故乡时,天气又阴晦了,冷风吹进船舱中,呜呜的响,从篷隙向外一望,苍黄的天底下,远近横着几个萧索的荒村,没有一些活气。”, extractions=[ lx.data.Extraction( extraction_class=”关键情节”, extraction_text=”回到…故乡去”, attributes={“事件类型”: “返乡”, “人物”: “我”} ), lx.data.Extraction( extraction_class=”场景”, extraction_text=”相隔二千余里,别了二十余年的故乡”, attributes={“地点”: “故乡”, “时间跨度”: “二十余年”, “距离”: “二千余里”} ), lx.data.Extraction( extraction_class=”氛围”, extraction_text=”萧索的荒村,没有一些活气”, attributes={“情感基调”: “荒凉、衰败”, “天气”: “严寒, 阴晦”} ), lx.data.Extraction( extraction_class=”意象”, extraction_text=”苍黄的天”, attributes={“色彩”: “苍黄”, “暗示”: “压抑、沉闷”} ), lx.data.Extraction( extraction_class=”修辞手法”, extraction_text=”冷风吹进船舱中,呜呜的响”, attributes={“类型”: “拟声/拟人”, “对象”: “冷风”} ), ] ) ] }, } # ========== 添加你自己的配置 ========== # 复制上面的模板,修改为你需要的提取规则 # “your_config”: { # “description”: “你的配置描述”, # “model”: “gpt-4o-mini”, # “prompt”: “你的提取指令…”, # “examples”: […] # }
import requests
import time
import os
class PDFProcessor:
def __init__(self, token):
self.token = token
self.headers = {
'Authorization': f'Bearer {token}'
}
def upload_file(self, file_path):
"""上传本地文件到临时存储"""
print(f"📤 正在上传文件: {os.path.basename(file_path)}")
# 使用 tmpfiles.org
try:
with open(file_path, 'rb') as f:
response = requests.post(
'https://tmpfiles.org/api/v1/upload',
files={'file': f}
)
if response.status_code == 200:
result = response.json()
# 获取直接下载链接
url = result['data']['url']
direct_url = url.replace('tmpfiles.org/', 'tmpfiles.org/dl/')
print(f"✅ 上传成功: {direct_url}")
return direct_url
except Exception as e:
print(f"❌ 上传失败: {e}")
return None
def process_pdf(self, file_path):
"""处理本地PDF文件"""
# 1. 上传文件
pdf_url = self.upload_file(file_path)
if not pdf_url:
return None
# 2. 创建MinerU任务
print("📄 创建解析任务...")
task_url = 'https://mineru.net/api/v4/extract/task'
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.token}'
}
data = {
'url': pdf_url,
'is_ocr': True,
'enable_formula': True,
'enable_table': True,
'language': 'auto' # 中文文档
}
response = requests.post(task_url, headers=headers, json=data)
result = response.json()
if result['code'] != 0:
print(f"❌ 创建任务失败: {result['msg']}")
return None
task_id = result['data']['task_id']
print(f"✅ 任务ID: {task_id}")
# 3. 等待处理完成
print("⏳ 等待处理...")
while True:
time.sleep(5)
status_url = f'https://mineru.net/api/v4/extract/task/{task_id}'
status_response = requests.get(status_url, headers=headers)
status_data = status_response.json()
state = status_data['data']['state']
if state == 'done':
zip_url = status_data['data']['full_zip_url']
print(f"✅ 处理完成!")
print(f"📦 下载地址: {zip_url}")
# 下载结果
self.download_result(zip_url, task_id)
return status_data
elif state == 'failed':
print(f"❌ 处理失败: {status_data['data']['err_msg']}")
return None
elif state == 'running':
progress = status_data['data'].get('extract_progress', {})
extracted = progress.get('extracted_pages', 0)
total = progress.get('total_pages', 0)
print(f"⏳ 正在处理... {extracted}/{total} 页")
else:
print(f"📊 状态: {state}")
def download_result(self, zip_url, task_id):
"""下载结果文件"""
save_path = f"mineru_result_{task_id}.zip"
try:
response = requests.get(zip_url, stream=True)
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ 结果已保存到: {save_path}")
except Exception as e:
print(f"❌ 下载失败: {e}")
# 使用您的Token和文件
TOKEN = "eyJ0eXBlIjoiSldUIiwiYWxnIjoiSFM1MTIifQ.eyJqdGkiOiIyOTkwMzQ1NiIsInJvbCI6IlJPTEVfUkVHSVNURVIiLCJpc3MiOiJPcGVuWExhYiIsImlhdCI6MTc1MTM3NDYyOCwiY2xpZW50SWQiOiJsa3pkeDU3bnZ5MjJqa3BxOXgydyIsInBob25lIjoiMTg5NTIxNTUyNTAiLCJvcGVuSWQiOm51bGwsInV1aWQiOiI1YTA0YmUxMC1kMTJkLTQ3NzktYjYyYi1mM2U4NTRmZWI0YTQiLCJlbWFpbCI6IiIsImV4cCI6MTc1MjU4NDIyOH0.kKzBBW2Jp2sVh3HXRVRlz-Df8WMHcDB7PM1pZbrmn3_QEt39bw3OrNAf8OkrmgY9Kign1fpTxPRzxopOenqO7Q"
# 处理您的PDF文件
processor = PDFProcessor(TOKEN)
local_pdf = "/home/wang/1_time/00bjfei/宁德时代宁德时代新能源科技股份有限公司2024年半年度报告174页(1).pdf"
# 检查文件是否存在
if os.path.exists(local_pdf):
processor.process_pdf(local_pdf)
else:
print(f"❌ 文件不存在: {local_pdf}")
微信:adoresever
我的开源项目:
https://github.com/adoresever/DataGraphX_Learn
https://github.com/adoresever/Pretuning
fastapi-mcp安装部署
sudo snap install astral-uv (如果已经安装了uv,执行第二步)
如果失败,出现Command ‘uv’ not found之类
sudo snap install astral-uv –classic
用上面的命令可以无视风险继续安装uv
uv pip install fastapi-mcp “uvicorn[standard]”
使用uv安装mcp-proxy
uv tool install mcp-proxy
如果出现warning: /home/a/.local/bin is not on your PATH.
uv tool update-shell
关闭并重新打开终端
执行 which mcp-proxy
如果 PATH 设置正确,它应该会输出 /home/a/.local/bin/mcp-proxy)
创建 .py文件 和.env文件
python 文件名.py
代码
import os
import httpx
from fastapi import FastAPI, HTTPException, Query
from fastapi_mcp import FastApiMCP
from pydantic import BaseModel, Field
from dotenv import load_dotenv
import logging
import json
# --- 配置与设置 ---
load_dotenv()
API_KEY = os.getenv("TAVILY_API_KEY")
if not API_KEY:
raise ValueError("在环境变量或 .env 文件中未找到 TAVILY_API_KEY")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
TAVILY_API_URL = "https://api.tavily.com/search"
# --- Pydantic 模型 (目前保持不变) ---
class NewsResponse(BaseModel):
"""描述 Tavily 搜索工具返回的新闻摘要。"""
query: str = Field(..., description="用于搜索新闻的原始查询。")
news_summary: str = Field(..., description="由 Tavily AI 基于查询找到的近期新闻的更详细摘要,合并了多个来源。")
# --- FastAPI 应用 ---
app = FastAPI(
title="通过 MCP 使用 Tavily 搜索详细新闻", # 更新标题
description="使用 Tavily AI 搜索查找并合并近期新闻细节,基于查询,并通过 MCP 暴露。",
version="1.0.0",
)
# --- 新闻获取逻辑 (修改以获取更多细节) ---
async def fetch_detailed_news_from_tavily(news_query: str) -> str:
"""使用 Tavily AI 搜索新闻并合并结果以获取更多细节。"""
payload = json.dumps({
"api_key": API_KEY,
"query": news_query,
"search_depth": "advanced", # 使用高级搜索以获取可能更多的细节
"include_answer": False, # 如果你只想依赖结果内容,设为 False
"max_results": 7 # 增加考虑的结果数量
# "include_raw_content": True # 可选:可能获取更多原始内容,但需要仔细解析
})
headers = {'Content-Type': 'application/json'}
async with httpx.AsyncClient(timeout=30.0) as client: # 为高级搜索增加超时时间
response_data = None
try:
logger.info(f"正在向 Tavily AI 请求关于查询 '{news_query}' 的详细新闻摘要...")
response = await client.post(TAVILY_API_URL, headers=headers, content=payload)
response.raise_for_status()
response_data = response.json()
logger.debug(f"Tavily 对于查询 '{news_query}' 的原始响应: {response_data}")
# --- 重点在于合并结果 ---
combined_summary = ""
if response_data.get("results"):
summaries = []
for i, res in enumerate(response_data["results"]):
title = res.get('title', f"来源 {i+1}")
content = res.get('content', '').strip() # 获取内容并去除首尾空格
# url = res.get('url', '#') # 如果需要,可以包含 URL
if content: # 仅当有内容时才包含
#清晰地格式化每个结果
summaries.append(f"--- 结果 {i+1}: {title} ---\n{content}")
if summaries:
# 使用两个换行符连接所有摘要
combined_summary = "\n\n".join(summaries)
# 如果需要,可以限制总长度
# max_len = 2000 # 示例长度限制
# if len(combined_summary) > max_len:
# combined_summary = combined_summary[:max_len] + "..."
logger.info(f"已合并来自 Tavily 对于查询 '{news_query}' 的 {len(summaries)} 条结果。")
return combined_summary
else:
logger.warning(f"Tavily 对于查询 '{news_query}' 的结果没有可用的内容进行合并。")
# 如果根本没有找到可用的结果
logger.warning(f"Tavily 对于查询 '{news_query}' 没有返回可用的结果。")
raise HTTPException(status_code=404, detail=f"无法使用 Tavily AI 找到与查询 '{news_query}' 匹配的近期新闻细节。")
# --- 异常处理 (与天气示例大部分相同) ---
except httpx.HTTPStatusError as exc:
error_detail = "与 Tavily AI 服务通信时出错。"
try: error_body = exc.response.json(); error_detail = error_body.get("error", error_detail)
except ValueError: error_detail = exc.response.text or error_detail
logger.error(f"从 Tavily 获取查询 '{news_query}' 的新闻时发生 HTTP 错误: {exc.response.status_code} - 详情: {error_detail}")
if exc.response.status_code in [401, 403]: raise HTTPException(status_code=exc.response.status_code, detail=f"Tavily API 密钥错误: {error_detail}") from exc
raise HTTPException(status_code=exc.response.status_code, detail=error_detail) from exc
except httpx.RequestError as exc:
logger.error(f"从 Tavily 获取查询 '{news_query}' 的新闻时发生网络错误: {exc}")
raise HTTPException(status_code=503, detail="无法连接到 Tavily AI 服务。") from exc
except ValueError as exc:
logger.error(f"解码来自 Tavily 对查询 '{news_query}' 的 JSON 响应时出错: {exc}")
raise HTTPException(status_code=500, detail="从 Tavily AI 服务收到无效的数据格式。") from exc
except Exception as exc:
logger.exception(f"获取查询 '{news_query}' 的 Tavily 新闻时发生意外错误: {exc}")
raise HTTPException(status_code=500, detail="获取新闻摘要时发生意外的内部错误。") from exc
# --- FastAPI 端点 (调用新的获取函数) ---
@app.get(
"/news/search",
response_model=NewsResponse,
operation_id="search_detailed_recent_news_tavily", # 更新 operation_id
summary="搜索详细近期新闻 (通过 Tavily AI)", # 更新摘要
tags=["News Tools", "Tavily AI"]
)
async def search_news(
q: str = Query(..., min_length=3, description="用于搜索近期新闻的主题或查询。")
):
"""
使用 Tavily AI 的高级搜索查找并合并来自近期新闻的细节,基于用户查询。
返回一个合并了多个来源的更长的文本摘要。
"""
logger.info(f"收到关于查询的详细新闻搜索请求: {q}")
# 调用修改后的获取函数
news_summary_text = await fetch_detailed_news_from_tavily(q)
response_data = NewsResponse(
query=q,
news_summary=news_summary_text
)
logger.info(f"已通过 Tavily 成功检索到查询 '{q}' 的详细新闻摘要。")
return response_data
# --- FastAPI-MCP 集成 ---
logger.info("正在初始化 FastAPI-MCP...")
mcp = FastApiMCP(app)
mcp.mount()
logger.info("FastAPI-MCP 服务器已挂载到 /mcp")
# --- 运行服务器 ---
if __name__ == "__main__":
import uvicorn
logger.info("正在启动用于 Tavily 详细新闻的 Uvicorn 服务器...")
uvicorn.run(
"news_mcp:app", # 确保文件名匹配
host="127.0.0.1",
port=8000, # 如果天气服务也在运行,可以更改端口,例如 8001
reload=True
)
核心概念
智能体(Agents) :配置了指令、工具、防护栏和移交功能的LLM
移交(Handoffs) :允许智能体将控制权转移给其他智能体来处理特定任务
防护栏(Guardrails) :可配置的输入/输出验证安全检查
追踪(Tracing) :内置的智能体运行跟踪,用于调试和优化
安装
git clone https://github.com/openai/openai-agents-python.git
cd openai-agents-python
设置Python环境 :
python -m venv env source env/bin/activate # Linux/Mac # 或者在Windows上: # env\Scripts\activate
安装SDK :
pip install openai-agents
设置API密钥 :
# Linux/Mac export OPENAI_API_KEY=你的密钥 # Windows # set OPENAI_API_KEY=你的密钥
运行示例 :
# 例如,运行hello world示例 cd examples/basic python hello_world.py
实例分析 :
agents_as_tools.py :
展示了”代理即工具”模式
前线代理(orchestrator_agent)接收用户消息,然后从一组翻译代理中选择合适的工具
包含西班牙语、法语和意大利语翻译代理
最后使用合成代理检查翻译并产生最终响应
deterministic.py :
展示了确定性流程,每个步骤由特定代理执行
流程:生成故事大纲 → 检查大纲质量和类型 → 如果符合条件则创作故事
包含质量门控检查,如果大纲不是高质量或不是科幻故事则终止流程
input_guardrails.py :
展示了如何使用输入防护机制
防护机制在代理执行的同时并行运行
实现了一个检测用户是否请求解决数学作业的防护措施
如果防护触发,会用拒绝消息响应而不是正常处理请求
llm_as_a_judge.py :
展示了”LLM作为评判者”模式
第一个代理生成故事大纲,第二个代理评判并提供反馈
循环迭代直到评判代理对大纲满意为止
强制要求至少一次修改(”第一次尝试永远不给通过”)
output_guardrails.py :
展示了如何使用输出防护机制
检查代理输出是否包含敏感数据(这里是电话号码)
如果检测到敏感数据,防护措施会触发
parallelization.py :
展示了并行化模式
并行运行同一代理三次,产生三个不同的翻译结果
然后使用选择代理挑选最佳翻译
routing.py :
展示了交接/路由模式
分流代理接收第一条消息,然后根据请求的语言交给合适的代理
支持法语、西班牙语和英语代理
响应实时流式传输给用户
hello_world.py
这是最基本的例子,展示了如何创建一个简单的代理并运行它:
创建一个只用海口(haiku)回答的代理
使用 Runner.run()
执行代理
获取最终输出结果
dynamic_system_prompt.py
展示了如何基于上下文动态设置代理的指令:
创建自定义上下文类(CustomContext
),包含代理的风格
定义动态指令函数,根据风格返回不同的指令
随机选择风格(海口、海盗或机器人)并运行代理
agent_lifecycle_example.py
和 lifecycle_example.py
这两个文件展示了代理生命周期的监控:
定义自定义钩子类来监控代理的各种事件
使用工具(如 random_number
和 multiply_by_two
)
设置代理之间的交接逻辑(如果数字为奇数则交给乘法代理)
监控和输出代理执行的完整生命周期
区别在于 agent_lifecycle_example.py
使用 AgentHooks
绑定到特定代理,而 lifecycle_example.py
使用 RunHooks
绑定到整个运行过程。
stream_items.py
和 stream_text.py
展示了框架的流式输出功能:
stream_items.py
展示如何处理各种流式事件(工具调用、工具输出、消息输出等)
stream_text.py
展示如何直接流式输出文本内容
计算机控制 :通过 ComputerTool
和 AsyncComputer
接口,代理可以直接控制浏览器执行复杂的网页操作任务,这对于自动化网络交互、数据收集和UI测试非常有用。
向量搜索 :通过 FileSearchTool
,代理可以搜索预先准备的向量数据库,适用于文档检索、知识库问答等场景。
网络搜索 :通过 WebSearchTool
,代理可以直接搜索互联网获取实时信息,适合需要最新数据的场景。
演示
research_bot
web_search.py