微软Agent Framework多智能体协作框架:基于智谱AI的实现

项目结构.
├── main.py
├── pyproject.toml
└── src
    ├── __init__.py
    ├── my_multi_agent_project.egg-info
    │   ├── dependency_links.txt
    │   ├── PKG-INFO
    │   ├── SOURCES.txt
    │   └── top_level.txt
    ├── __pycache__
    │   ├── __init__.cpython-310.pyc
    │   ├── tools.cpython-310.pyc
    │   ├── web_search_agent.cpython-310.pyc
    │   ├── workflow.cpython-310.pyc
    │   └── zhipu_client.cpython-310.pyc
    ├── tools.py
    ├── web_search_agent.py
    ├── workflow.py
    └── zhipu_client.py
关键代码:
# /main.py 

# 1. 确保在所有其他导入之前加载环境变量
from dotenv import load_dotenv
load_dotenv()

import asyncio
from src.workflow import build_search_workflow
# 我们不再需要从 agent_framework 导入 ChatMessage,因为我们直接使用字符串

async def main():
    print("--- 正在启动“全能研究员”工作流 ---")
    
    # 2. 构建我们的工作流
    search_workflow = build_search_workflow()
    
    # 3. 这是我们写死的问题 (一个纯字符串)
    user_question = "最新的 agent framework 框架是什么?请详细介绍一下它的核心特性和与AutoGen, Semantic Kernel的关系。"
    print(f"\n用户问题: {user_question}")

    # 4. 【核心修正】: 直接将用户问题的字符串传递给 run 方法
    events = await search_workflow.run(user_question)
    
    # 5. 获取并打印最终输出
    final_output = events.get_outputs()
    print("\n--- 全能研究员的最终回答 ---")
    if final_output:
        # 输出现在可能是一个列表,我们打印第一个元素
        print(final_output[0])
    else:
        print("没有收到任何输出。")
    print("-" * 50)

if __name__ == "__main__":
    asyncio.run(main())
# /my-multi-agent-project/pyproject.toml

[project]
name = "my_multi_agent_project"
version = "0.1.0"

[tool.setuptools.packages.find]
where = ["src"]
# /src/tools.py 
from typing import Annotated

# 这个函数体不会被执行,它只是一个“规范”
async def zhipu_web_search(
    query: Annotated[str, "用于在互联网上查找信息的搜索查询字符串。"]
) -> str:
    """当你需要查找实时的、最新的信息时,请使用此工具。"""
    pass
# /src/web_search_agent.py

import os
import httpx
from agent_framework import Executor, WorkflowContext, handler
from typing_extensions import Never

class WebSearchAgent(Executor):
    """
    多智能体框架中的网页搜索智能体。
    直接调用智谱官方 search-std 接口。
    """
    def __init__(self, name: str):
        super().__init__(id=name)
        self.api_key = os.getenv("ZHIPU_API_KEY")
        if not self.api_key:
            raise ValueError("请在 .env 文件中设置 ZHIPU_API_KEY")

    @handler
    async def run_search(self, query: str, ctx: WorkflowContext[Never, str]) -> None:
        try:
            payload = {
                "search_query": query,
                "search_engine": "search_std"
            }
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }

            async with httpx.AsyncClient(timeout=60.0) as client:
                resp = await client.post(
                    "https://open.bigmodel.cn/api/paas/v4/web_search",
                    headers=headers,
                    json=payload
                )
                resp.raise_for_status()
                data = resp.json()

            results = data.get("data", [])
            if not results:
                assistant_content = "没有找到相关搜索结果。"
            else:
                SEARCH_TOP_K = 5
                assistant_content = "\n\n".join([
                    f"标题: {item.get('title', 'N/A')}\n"
                    f"发布日期: {item.get('publish_date', 'N/A')}\n"
                    f"摘要: {item.get('content', '')[:200]}...\n"
                    f"链接: {item.get('link', 'N/A')}"
                    for item in results[:SEARCH_TOP_K]
                ])

            await ctx.yield_output(assistant_content)

        except Exception as e:
            error_text = f"WebSearchAgent 执行搜索时出错: {e}"
            print(error_text)
            await ctx.yield_output(error_text)
# /src/workflow.py (最终极简版)

from agent_framework import WorkflowBuilder, Workflow
from .zhipu_client import ZhipuAIChatClient

def build_search_workflow() -> Workflow:
    """
    构建一个只包含单个“全能研究员” Agent 的工作流。
    """
    client = ZhipuAIChatClient()
    
    # 我们只创建这一个 Agent,并赋予它所有能力
    all_in_one_agent = client.create_executor(
        name="All_In_One_Researcher",
        instructions=(
            "你是一名高级AI研究助理。你的任务是深入理解用户的问题。"
            "如果问题需要最新的、外部的或互联网上的信息,你必须使用你的搜索能力来寻找答案。"
            "在获得信息后,你需要对信息进行提炼、总结,并给出一个全面、清晰、高质量的回答。"
        ),
        use_search=True # 开启搜索能力
    )

    # 工作流只包含这一个 Agent
    workflow = WorkflowBuilder().set_start_executor(all_in_one_agent).build()
    
    return workflow
# /src/zhipu_client.py 
import os
import json
import httpx
from typing_extensions import Never
from agent_framework import Executor, WorkflowContext, handler

class ZhipuExecutor(Executor):
    def __init__(self, name: str, instructions: str, model: str, api_key: str, use_search: bool = False):
        super().__init__(id=name)
        self.instructions = instructions; self.model = model; self.api_key = api_key; self.use_search = use_search
        self.api_endpoint = "https://open.bigmodel.cn/api/paas/v4/chat/completions"

    @handler
    async def run_zhipu_model(self, user_input: str, ctx: WorkflowContext[Never, str]) -> None:
        headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" }
        messages = [{"role": "system", "content": self.instructions}, {"role": "user", "content": user_input}]
        payload = { "model": self.model, "messages": messages }

        if self.use_search:
            # 【核心修正】: 我们尝试构造一个最符合文档规范的 web_search tool
            payload["tools"] = [
                {
                    "type": "web_search",
                    "web_search": {
                        "enable": True,
                        # 根据某些文档示例,可能需要一个 search_query 参数,即使是空的
                        # 我们先不加,如果再报“不能为空”,就回来加上 "search_query": ""
                    }
                }
            ]
            payload["tool_choice"] = "auto"
        
        try:
            print(f"--- 最终发送的 Payload ---\n{json.dumps(payload, indent=2, ensure_ascii=False)}\n--------------------------")
            async with httpx.AsyncClient(timeout=180.0) as client:
                response = await client.post(self.api_endpoint, json=payload, headers=headers)
                if response.status_code != 200:
                    print(f"!!! 服务器返回错误: {response.status_code} - {response.text}")
                response.raise_for_status()
                response_data = response.json()

            choice = response_data.get("choices", [{}])[0]
            message = choice.get("message", {})

            if message.get("tool_calls"):
                # 提取 web_search 返回的结果
                search_results = [tc["web_search"] for tc in message["tool_calls"] if "web_search" in tc]
                # 将搜索结果整合成一个字符串返回
                final_result = "搜索结果: \n" + json.dumps(search_results, ensure_ascii=False, indent=2)
                await ctx.yield_output(final_result)
            else:
                await ctx.yield_output(message.get("content", ""))
        except Exception as e:
            await ctx.yield_output(f"请求智谱 API 时发生错误: {e}")

class ZhipuAIChatClient:
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.getenv("ZHIPU_API_KEY")
        if not self.api_key: raise ValueError("ZHIPU_API_KEY 未设置。")

    def create_executor(self, name: str, instructions: str, model: str = "glm-4.5-air", use_search: bool = False) -> ZhipuExecutor:
        return ZhipuExecutor(name=name, instructions=instructions, model=model, api_key=self.api_key, use_search=use_search)

crawl4ai负责爬取,openai库调用DeepSeek进行分析,LaTeX负责生成报告

整体流程:

创建环境: conda create -n crawl_env python=3.11 -y
激活环境: conda activate crawl_env
初始化Conda:conda init bash
从网页抓取核心内容并转换为Markdown:pip install “crawl4ai>=0.6.0”
setup 用于安装浏览器依赖,doctor 用于诊断环境是否配置正确:crawl4ai-setup 和 crawl4ai-doctor
安装OpenAI官方的Python库用它来调用DeepSeek等兼容OpenAI API规范的模型:pip install openai
切换到root用户: sudo -i
安装思源系列中文字体:sudo apt-get install fonts-noto-cjk

目录结构:

源码:

# ai_analyzer.py (修正版,增强章节定位能力)
# 负责与DeepSeek API交互,进行多任务分析

import asyncio
import re
import json
import os
from openai import AsyncOpenAI
from config import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL, DEFAULT_MODEL, CACHE_FILE

client = AsyncOpenAI(api_key=DEEPSEEK_API_KEY, base_url=DEEPSEEK_BASE_URL)

def _load_cache() -> dict:
    # ... (这部分函数保持不变) ...
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, 'r', encoding='utf-8') as f:
            try:
                return json.load(f)
            except json.JSONDecodeError:
                return {}
    return {}

def _save_cache(cache_data: dict):
    # ... (这部分函数保持不变) ...
    with open(CACHE_FILE, 'w', encoding='utf-8') as f:
        json.dump(cache_data, f, ensure_ascii=False, indent=4)

async def _call_ai(system_prompt: str, user_content: str, model: str) -> str | None:
    # ... (这部分函数保持不变) ...
    try:
        completion = await client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content}
            ],
            stream=False
        )
        return completion.choices[0].message.content
    except Exception as e:
        print(f"❌ 调用AI时发生错误: {e}")
        return None

async def analyze_content(full_markdown: str, url: str, model: str = DEFAULT_MODEL) -> dict | None:
    cache = _load_cache()
    cache_key = url
    if cache_key in cache and all(k in cache[cache_key] for k in ["abstract_translation", "main_body_summary", "conclusion_summary"]):
        print("✅ 从本地缓存中加载AI分析结果!")
        return cache[cache_key]

    print(f"➡️ [步骤 2/4] 本地无缓存,正在连接AI进行分析 (模型: {model})...")
    
    prompts = {
        "abstract": "You are a professional academic translator. Your task is to accurately translate the following research paper abstract into simplified Chinese.",
        "main_body": "You are an expert academic analyst. Summarize the core contributions, methods, and key findings from the main body of the following article in about 300-500 words. Present your summary in a structured, easy-to-read format in simplified Chinese.",
        "conclusion": "You are an expert academic analyst. Your task is to summarize the conclusion section of the following article, highlighting the main takeaways and future work mentioned. Provide the summary in simplified Chinese."
    }
    abstract_regex = r'(?i)(?:#+\s*|\n)\s*(?:\d*\.?\s*)?Abstract\n(.*?)(?=\n#+\s|\n\d*\.?\s*Introduction)'
    conclusion_regex = r'(?i)(?:#+\s*|\n)\s*(?:\d*\.?\s*)?Conclusion(?:s)?\n(.*?)(?=\n#+\s|\n\d*\.?\s*(?:References|Acknowledgements|Appendix))'

    abstract_content_match = re.search(abstract_regex, full_markdown, re.DOTALL)
    conclusion_content_match = re.search(conclusion_regex, full_markdown, re.DOTALL)
    
    # 提取内容,如果找不到匹配项则提供明确提示
    abstract_text = abstract_content_match.group(1).strip() if abstract_content_match else "Abstract not found in the document."
    conclusion_text = conclusion_content_match.group(1).strip() if conclusion_content_match else "Conclusion not found in the document."

    # 主体内容逻辑保持不变
    main_body_content = full_markdown
    if abstract_content_match and conclusion_content_match:
       main_body_start = abstract_content_match.end()
       main_body_end = conclusion_content_match.start()
       main_body_content = full_markdown[main_body_start:main_body_end]

    tasks = {
        "abstract_translation": _call_ai(prompts["abstract"], abstract_text, model),
        "main_body_summary": _call_ai(prompts["main_body"], main_body_content, model),
        "conclusion_summary": _call_ai(prompts["conclusion"], conclusion_text, model),
    }
    
    results = await asyncio.gather(*tasks.values())
    summaries = dict(zip(tasks.keys(), results))

    if not all(summaries.values()):
        print("❌ AI总结失败,部分内容未能生成。")
        return None
    
    print("✅ AI分析完成!正在将结果存入本地缓存...")
    
    cache[cache_key] = summaries
    _save_cache(cache)
    
    return summaries

修改你需要的模型:

# config.py
# 存放所有配置信息

import os

# ==============================================================================
#  API与模型配置
# ==============================================================================

# 您的API密钥。
DEEPSEEK_API_KEY = "  "

# DeepSeek的API服务器地址
DEEPSEEK_BASE_URL = "https://api.deepseek.com/v1"

# 要使用的AI模型名称
DEFAULT_MODEL = "deepseek-chat"

# ==============================================================================
#  输出配置
# ==============================================================================

# 生成的报告存放的文件夹名称
OUTPUT_DIR = "latex_reports"

# ==============================================================================
#  缓存配置
# ==============================================================================
# AI分析结果的缓存文件
CACHE_FILE = "ai_cache.json"
# crawler.py
# 负责爬取网页内容

import re
from crawl4ai import AsyncWebCrawler

async def fetch_article_data(url: str) -> tuple[str | None, str | None]:
    """
    爬取指定URL的网页,提取Markdown内容和标题。
    
    返回: (markdown_content, title) 或 (None, None)
    """
    print(f"➡️ [步骤 1/4] 正在爬取文献内容: {url}")
    crawler = AsyncWebCrawler()
    try:
        result = await crawler.arun(url=url)
        if not result or not result.markdown:
            print(f"❌ 爬取失败: 未能从 {url} 提取到有效内容。")
            return None, None
        
        # 尝试从Markdown中提取第一个一级标题
        title_match = re.search(r"^#\s+(.*)", result.markdown, re.MULTILINE)
        title = title_match.group(1).strip() if title_match else "Untitled Document"
        
        print("✅ 内容爬取成功!")
        return result.markdown, title
    except Exception as e:
        print(f"❌ 爬取时发生错误: {e}")
        return None, None
# report_generator.py
# 负责生成LaTeX源码并编译成PDF

import os
import re
import subprocess
from datetime import datetime
from config import OUTPUT_DIR

def _latex_escape(text: str) -> str:
    """对文本进行转义以安全插入LaTeX。"""
    replacements = {
        '&': r'\&', '%': r'\%', '$': r'\$', '#': r'\#', '_': r'\_',
        '{': r'\{', '}': r'\}', '~': r'\textasciitilde{}',
        '^': r'\textasciicircum{}', '\\': r'\textbackslash{}',
    }
    return re.sub(r'[&%$#_{}\\~^]', lambda m: replacements[m.group(0)], text)

def _create_latex_source(data: dict) -> str:
    """根据数据生成LaTeX源文件内容。"""
    title_escaped = _latex_escape(data['title'])
    url_escaped = _latex_escape(data['url'])
    abstract_escaped = _latex_escape(data.get('abstract_translation', ''))
    main_body_escaped = _latex_escape(data.get('main_body_summary', ''))
    conclusion_escaped = _latex_escape(data.get('conclusion_summary', ''))

    latex_template = rf"""
\documentclass[12pt, a4paper]{{article}}
\usepackage{{ctex}}
\usepackage[top=2.5cm, bottom=2.5cm, left=2.5cm, right=2.5cm]{{geometry}}
\usepackage{{fancyhdr}}
\usepackage{{hyperref}}
\usepackage{{titling}}
\setmainfont{{Times New Roman}}

\pagestyle{{fancy}}
\fancyhf{{}}
\fancyhead[C]{{{title_escaped}}}
\fancyfoot[C]{{\thepage}}
\renewcommand{{\headrulewidth}}{{0.4pt}}
\renewcommand{{\footrulewidth}}{{0.4pt}}

\pretitle{{\begin{{center}}\LARGE\bfseries}}\posttitle{{\end{{center}}}}
\preauthor{{\begin{{center}}\large}}\postauthor{{\end{{center}}}}
\predate{{\begin{{center}}\large}}\postdate{{\end{{center}}}}

\title{{{title_escaped}}}
\author{{文献来源: \href{{{url_escaped}}}{{{url_escaped}}}}}
\date{{AI总结报告生成于: {data['date']}}}

\begin{{document}}
\maketitle
\thispagestyle{{fancy}}
\section*{{摘要翻译}}
{abstract_escaped}
\section*{{核心内容总结}}
{main_body_escaped}
\section*{{结论总结}}
{conclusion_escaped}
\end{{document}}
"""
    return latex_template


def generate_pdf_report(report_data: dict):
    print("➡️ [步骤 3/4] 正在生成LaTeX报告源文件...")
    tex_source = _create_latex_source(report_data)
    print("✅ LaTeX源文件生成完毕!")
    
    print("➡️ [步骤 4/4] 正在编译PDF报告...")
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    
    title = report_data.get('title', 'report')
    filename_base = re.sub(r'[\\/*?:"<>|]', "", title).replace(" ", "_")[:50]
    tex_filepath = os.path.join(OUTPUT_DIR, f"{filename_base}.tex")
    
    with open(tex_filepath, 'w', encoding='utf-8') as f:
        f.write(tex_source)

    command = ['xelatex', '-interaction=nonstopmode', f'-output-directory={OUTPUT_DIR}', tex_filepath]
    
    for i in range(2):
        print(f"   ... LaTeX编译中 (第 {i+1}/2 轮)")
        result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
        if result.returncode != 0:
            log_path = os.path.join(OUTPUT_DIR, f'{filename_base}.log')
            print(f"❌ PDF编译失败!请查看日志: {log_path}")
            print("-" * 20 + " LaTeX 错误日志 " + "-" * 20)
            if os.path.exists(log_path):
                with open(log_path, 'r', encoding='utf-8') as log_file:
                    print("".join(log_file.readlines()[-30:]))
            print("-" * 55)
            return
            
    for ext in ['.aux', '.log', '.out', '.tex']:
        try:
            os.remove(os.path.join(OUTPUT_DIR, f"{filename_base}{ext}"))
        except OSError:
            pass

    pdf_filepath = os.path.join(OUTPUT_DIR, f"{filename_base}.pdf")
    print(f"🎉 报告生成成功!文件已保存至: {os.path.abspath(pdf_filepath)}")

# run.py (带缓存功能)
# 主程序入口,负责调度所有模块

import asyncio
import sys
import argparse
import subprocess
import os
import json
from datetime import datetime

import config
from crawler import fetch_article_data
from ai_analyzer import analyze_content
from report_generator import generate_pdf_report

def check_dependencies():
    """检查必要的外部依赖(API密钥和LaTeX)。"""
    if not config.DEEPSEEK_API_KEY:
         print("❌ 错误: API密钥未在 config.py 中配置!")
         sys.exit(1)
    
    try:
        subprocess.run(['xelatex', '-version'], check=True, capture_output=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("❌ 错误: 系统中未找到 'xelatex' 命令。请先安装LaTeX发行版。")
        sys.exit(1)

async def main():
    """主执行流程"""
    parser = argparse.ArgumentParser(description="学术文献AI总结报告生成器 V2.1 (带缓存)")
    parser.add_argument('url', help="要处理的学术文献URL。")
    parser.add_argument('--model', default=config.DEFAULT_MODEL, help=f"使用的DeepSeek模型 (默认: {config.DEFAULT_MODEL})。")
    parser.add_argument('--force-reanalyze', action='store_true', help="强制重新进行AI分析,忽略此URL的现有缓存。")
    args = parser.parse_args()

    # 如果用户选择强制刷新,我们先从缓存中删除该URL的记录
    if args.force_reanalyze and os.path.exists(config.CACHE_FILE):
        print("🌀 用户选择强制重新分析,将更新此URL的缓存。")
        try:
            with open(config.CACHE_FILE, 'r', encoding='utf-8') as f:
                cache = json.load(f)
            if args.url in cache:
                del cache[args.url]
                with open(config.CACHE_FILE, 'w', encoding='utf-8') as f:
                    json.dump(cache, f, ensure_ascii=False, indent=4)
                print(f"   已从缓存中移除URL: {args.url}")
        except (json.JSONDecodeError, FileNotFoundError):
            pass # 如果缓存文件有问题,忽略即可

    # 1. 爬取
    markdown, title = await fetch_article_data(args.url)
    if not markdown:
        return

    # 2. AI分析
    summaries = await analyze_content(markdown, args.url, args.model)
    if not summaries:
        return

    # 3. 整合数据并生成报告
    report_data = {
        "title": title,
        "url": args.url,
        "date": datetime.now().strftime('%Y年%m月%d日'),
        **summaries
    }
    generate_pdf_report(report_data)

if __name__ == "__main__":
    print("--- 启动报告生成器 (带缓存功能) ---")
    check_dependencies()
    asyncio.run(main())
    print("--- 报告生成器运行完毕 ---")

爬取生成PDF:在latex_reports文件夹下

AutoGen框架深度解析:我如何用Python构建一个“AI足球分析师”团队?

*注意修改配置栏*

import asyncio
import json
import os
from datetime import datetime
from tavily import TavilyClient
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.messages import TextMessage

# ================== 配置 ==================
API_KEY = “sk-your API_KEY”
BASE_URL = “模型对应的URL”
MODEL = “模型名称”
TAVILY_API_KEY = “tvly-your TAVILY_API_KEY”

# 初始化客户端
model_client = OpenAIChatCompletionClient(model=MODEL, base_url=BASE_URL, api_key=API_KEY, model_info={“vision”: False, “function_calling”: True, “json_output”: True, “family”: “gpt-4”}, temperature=0.3)
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)

# ================== 定义专家Agent团队 (全自动版) ==================

# Agent 1: 足球数据研究员 (负责规划与搜索)
research_planner = AssistantAgent(
name=”Research_Planner”,
model_client=model_client,
system_message=”””你是顶级的足球数据研究员。你的任务是接收用户关于一场比赛的预测问题,
然后制定一个全面的数据搜集计划,用于Tavily搜索引擎。

你的计划必须涵盖以下几个方面,以确保分析的全面性:
1. 两队之间的**历史交锋记录** (Head-to-Head)。
2. 每支球队**各自最近的比赛战绩**和状态 (Recent Form)。
3. 任何关于球队的**最新新闻**,如关键球员伤病、教练变动等 (Latest News)。

你的输出必须是标准的JSON格式,只包含一个’search_queries’键,其值为一个关键词列表。

示例输入: “预测 皇家马德里 vs 巴塞罗那 的比分”
你的输出:
{
“search_queries”: [
“Real Madrid vs Barcelona head to head results”,
“Real Madrid recent match results La Liga”,
“Barcelona recent match results La Liga”,
“Real Madrid team news injuries”,
“Barcelona team news formation”
]
}
“””
)

# Agent 2: 首席战术分析与评论员 (负责分析与报告)
final_reporter = AssistantAgent(
name=”Final_Reporter”,
model_client=model_client,
system_message=”””你是世界顶级的足球战术分析师兼专栏作家。
你的任务是接收一堆从网络上搜集来的、关于一场比赛的原始、非结构化文本资料。

你需要执行以下两个核心步骤来完成任务:
1. **数据提取与分析**: 首先,你必须仔细阅读所有资料,从中提取出结构化的关键信息,特别是【以往战绩】(包括历史交锋和近期战绩)。然后,基于这些提取出的数据进行深度战术分析,并形成你自己的胜率和比分预测。
2. **报告撰写**: 接着,你需要将你的所有分析和提取出的数据,撰写成一篇精彩、完整的赛前分析报告。

你的最终报告**必须**包含一个清晰的【以往战绩】部分,详细列出你找到的比赛记录。
报告的整体结构应包括:标题、核心看点、以往战绩回顾、战术分析、胜率与比分预测、总结。
“””
)

# ================== 工具与辅助函数 ==================

def perform_web_search_tool(queries: list) -> str:
print(f”\n— [Tool: Web Search] 正在执行深度搜索… —“)
raw_content = “”
try:
# 为了获取更全面的信息,我们对每个查询请求更多结果
all_results = []
for query in queries:
response = tavily_client.search(query=query, search_depth=”advanced”, max_results=5)
all_results.extend(response[‘results’])
raw_content = “\n\n—\n\n”.join([f”来源: {item.get(‘url’, ‘N/A’)}\n内容: {item.get(‘content’, ”)}” for item in all_results])
print(f”— [Tool: Web Search] 搜索完成,共找到 {len(all_results)} 条结果 —“)
except Exception as e:
print(f”— [Tool: Web Search] 搜索出错: {e}”)
return raw_content

def save_report_to_file(report_content: str, teams_input: str) -> None:
os.makedirs(“soccer_automated_reports”, exist_ok=True)
timestamp = datetime.now().strftime(“%Y%m%d_%H%M%S”)
safe_subject = “”.join([c if c.isalnum() else “_” for c in teams_input.replace(“vs”, “”)[:30]])
filename = f”soccer_automated_reports/{safe_subject}_{timestamp}.txt”
with open(filename, ‘w’, encoding=’utf-8′) as f: f.write(report_content)
print(f”\n— [System] 最终分析报告已保存至: {filename} —“)

# ================== 主工作流 (全自动版) ==================

async def process_prediction_request(core_question: str):
print(f”\n> 收到预测任务: {core_question}”)

# — 阶段一: 研究员规划并执行搜索 —
print(“\n— [Research Planner] 正在规划数据搜集策略… —“)

planner_input = TextMessage(content=core_question, source=”user”)
response_planner = await research_planner.on_messages([planner_input], None)
plan_json = response_planner.chat_message.content

print(“— [Research Planner] 搜集计划已生成 —“)
print(plan_json)

try:
plan = json.loads(plan_json.strip().lstrip(““`json”).rstrip(““`”))
search_queries = plan.get(“search_queries”, [])
except json.JSONDecodeError:
print(“错误:研究员未能生成有效的JSON计划,任务中断。”)
return

if not search_queries:
print(“未能从计划中提取到搜索关键词。”)
return

# (执行工具)
raw_data_from_web = perform_web_search_tool(search_queries)

# — 阶段二: 最终报告人进行分析与撰写 —
print(“\n— [Final Reporter] 正在分析数据并撰写最终报告… —“)

reporter_prompt = f”””
请基于以下从网络上搜集到的关于 ‘{core_question}’ 的原始资料,
提取关键战绩,进行深度分析,并撰写一份包含【以往战绩】的最终报告。

【原始资料】:

{raw_data_from_web}

“””
reporter_input = TextMessage(content=reporter_prompt, source=”user”)
response_reporter = await final_reporter.on_messages([reporter_input], None)
final_report = response_reporter.chat_message.content

print(“\n” + “=”*60 + “\n📋 最终分析报告 📋\n” + “=”*60)
print(final_report)

# 提取队名用于保存文件
try:
teams_input = core_question.split(“预测”)[1].split(“的”)[0].strip()
except:
teams_input = “match_prediction”

save_report_to_file(final_report, teams_input)


async def main():
print(“=”*60 + “\n⚽ 足球赛事预测系统启动 ⚽\n” + “=”*60)

while True:
print(“\n” + “#”*60)
core_question = input(“请输入您想预测的比赛 (例如: 预测 徐州队 vs 无锡队 的胜负和比分),或输入’exit’退出:\n> “)
if core_question.strip().lower() in [‘exit’, ‘quit’]:
print(“感谢使用,再见!”)
break
if not core_question.strip():
continue

await process_prediction_request(core_question)

if __name__ == “__main__”:
try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n程序被中断。”)

如果下载失败,复制下面代码命名requirements.txt与上段代码(soccer_analyst_v3.py)保存在同一文件夹下

aiofiles==24.1.0
annotated-types==0.7.0
anyio==4.10.0
attrs==25.3.0
autogen-agentchat==0.7.2
autogen-core==0.7.2
autogen-ext==0.7.2
beautifulsoup4==4.13.4
certifi==2025.8.3
cffi==1.17.1
charset-normalizer==3.4.2
coloredlogs==15.0.1
cryptography==45.0.6
ddddocr==1.5.6
Deprecated==1.2.18
distro==1.9.0
exceptiongroup==1.3.0
flatbuffers==25.2.10
greenlet==3.2.4
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
humanfriendly==10.0
idna==3.10
img2pdf==0.6.1
importlib_metadata==8.7.0
jiter==0.10.0
jsonref==1.1.0
lxml==6.0.0
mpmath==1.3.0
numpy==2.2.6
onnxruntime==1.22.1
openai==1.99.3
opencv-python-headless==4.12.0.88
opentelemetry-api==1.36.0
outcome==1.3.0.post0
packaging==25.0
pdfminer.six==20250506
pdfplumber==0.11.7
pikepdf==9.10.2
pillow==11.3.0
playwright==1.54.0
protobuf==5.29.5
pycparser==2.22
pydantic==2.11.7
pydantic_core==2.33.2
pyee==13.0.0
PyMuPDF==1.26.3
pypdfium2==4.30.0
PySocks==1.7.1
pytesseract==0.3.13
python-dotenv==1.1.1
regex==2025.7.34
requests==2.32.4
selenium==4.34.2
sniffio==1.3.1
sortedcontainers==2.4.0
soupsieve==2.7
sympy==1.14.0
tavily-python==0.7.10
tiktoken==0.10.0
tqdm==4.67.1
trio==0.30.0
trio-websocket==0.12.2
typing-inspection==0.4.1
typing_extensions==4.14.1
undetected-chromedriver==3.5.5
urllib3==2.5.0
webdriver-manager==4.0.2
websocket-client==1.8.0
websockets==15.0.1
wrapt==1.17.2
wsproto==1.2.0
zipp==3.23.0

指令1: (如果需要) 安装兼容的Python版本 (以Python 3.11为例)

如果你的系统已经有python3.11,可以跳过此步

sudo apt update
sudo apt install python3.11 python3.11-venv

指令2: 创建一个名为 .venv 的虚拟环境

python3.11 -m venv .venv

指令3: 激活(进入)这个虚拟环境

source .venv/bin/activate

指令4: (推荐) 升级环境中的pip工具

pip install –upgrade pip

指令5: 安装项目所需的所有依赖库

pip install -r requirements.txt

指令6:运行脚本

python soccer_analyst_v3.py

大模型微调利器LLaMA-Factory深度解析:一个集成可视化训练、多模型适配、高效PEFT算法与端到端评估的语言模型微调框架。

分享我的Colab文件:

https://colab.research.google.com/drive/1XIi1E_L0dod2CoJIz9QVEYCjQKTKAuhT?usp=sharing

把它另存到你的本地Colab笔记本(详细步骤见2025-08-11文章)

点击左侧上传你的数据集文件.json格式。

一定要修改第二个单元格代码filepath = ” “(你上传的数据集的名称)

免费 T4 申请教程:https://zhuanlan.zhihu.com/p/642542618

点击Run all一键运行所有代码格

点击Running on public URL的链接: https://b1b7e1e391675e8e31.gradio.live

GitHub提供文档有详细介绍:https://github.com/hiyouga/LLaMA-Factory/blob/main/README_zh.md

提供的视频非常详细,一看就会。

LangExtract第二期代码文件

可加V:adoresever

请先在.env文件中设置好 OPENAI_API_KEY
1.extract_tool.py :
“””
通用信息提取工具
使用方法: python extract_tool.py <配置名称> <输入文件>
“””

import langextract as lx
import os
import sys
import argparse
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from collections import Counter

# 导入配置
from config import EXTRACTION_CONFIGS

# 自动加载 .env 文件,同时加载 OPENAI_API_KEY 和 LANGEXTRACT_API_KEY
load_dotenv()

def process_document(config_name, input_file):
“””
根据配置处理文件 (已整合所有成功秘诀)
“””
# — 1. 配置和文件检查 —
if config_name not in EXTRACTION_CONFIGS:
print(f”❌ 错误: 未找到配置 ‘{config_name}'”)
sys.exit(1)

if not Path(input_file).exists():
print(f”❌ 错误: 文件不存在 ‘{input_file}'”)
sys.exit(1)

config = EXTRACTION_CONFIGS[config_name]

print(“-” * 50)
print(f”📁 处理文件: {input_file} | ⚙️ 配置: {config_name}”)
print(f”📝 任务描述: {config.get(‘description’, ‘无描述’)}”)

with open(input_file, ‘r’, encoding=’utf-8′) as f:
text = f.read()
print(f”📄 文件长度: {len(text):,} 字符”)

# — 2. 构建提取参数 —
model_id = config.get(“model”, “gpt-4o-mini”)

extract_params = {
“text_or_documents”: text,
“prompt_description”: config[“prompt”],
“examples”: config[“examples”],
“model_id”: model_id,
“extraction_passes”: config.get(“extraction_passes”, 1),
“max_workers”: config.get(“max_workers”, 10),
“max_char_buffer”: config.get(“max_char_buffer”, 2000)
}

# — 3. 针对不同模型应用特殊配置 (核心秘诀) —
if ‘gpt’ in model_id.lower():
print(f”🔧 检测到 OpenAI 模型 ({model_id}),应用特殊配置…”)
from langextract.inference import OpenAILanguageModel

api_key = os.environ.get(“OPENAI_API_KEY”)
if not api_key:
print(“\n❌ 致命错误: 未在 .env 文件中找到 ‘OPENAI_API_KEY’。”)
print(” 请在 .env 文件中添加一行: OPENAI_API_KEY=sk-…”)
sys.exit(1)

extract_params.update({
“language_model_type”: OpenAILanguageModel,
“api_key”: api_key,
“fence_output”: True,
“use_schema_constraints”: False
})
else:
# 默认使用 Gemini 或其他模型,langextract 会自动寻找 LANGEXTRACT_API_KEY
print(f”🔧 使用默认或 Gemini 模型 ({model_id}) 配置…”)
api_key = os.environ.get(“LANGEXTRACT_API_KEY”)
if not api_key:
print(f”\n❌ 致命错误: 未找到适用于 ‘{model_id}’ 的 API 密钥。”)
print(” 请在 .env 文件中添加: LANGEXTRACT_API_KEY=…”)
sys.exit(1)

# — 4. 执行提取 —
print(“🚀 开始提取,请耐心等待…”)
result = lx.extract(**extract_params)

if not result.extractions:
print(“\n⚠️ 警告:本次运行未能提取到任何实体。”)
print(” 请检查: 1. Prompt和示例是否清晰。 2. API密钥和账户额度。 3. 源文件内容是否匹配任务。”)
return

# — 5. 保存和可视化 (使用最新、正确的方式) —
output_dir = Path(“extraction_results”)
output_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime(“%Y%m%d_%H%M%S”)
output_base_name = f”{config_name}_{Path(input_file).stem}_{timestamp}”

jsonl_path = output_dir / f”{output_base_name}.jsonl”
lx.io.save_annotated_documents([result], output_name=str(jsonl_path.name), output_dir=str(output_dir))

html_path = output_dir / f”{output_base_name}.html”
html_content = lx.visualize(str(jsonl_path))
with open(html_path, “w”, encoding=”utf-8″) as f:
f.write(html_content)

print(“\n✅ 提取完成!”)
print(f” • 数据文件: {jsonl_path}”)
print(f” • 可视化报告: {html_path}”)

# — 6. 结果统计 —
class_counts = Counter(e.extraction_class for e in result.extractions)
print(“\n📊 提取统计:”)
print(f” 总实体数: {len(result.extractions)}”)
for class_name, count in sorted(class_counts.items()):
print(f” – {class_name}: {count} 个”)

def main():
parser = argparse.ArgumentParser(description=”通用信息提取工具 (最终增强版)”)
parser.add_argument(“config_name”, help=”配置名称 (如 medical, fable 等)”)
parser.add_argument(“input_file”, help=”输入文件路径”)
args = parser.parse_args()
process_document(args.config_name, args.input_file)

if __name__ == “__main__”:
main()


2.config.py :
“””
信息提取配置文件
在这里定义不同类型文档的提取规则
“””

import langextract as lx

# 提取配置字典
EXTRACTION_CONFIGS = {

# ========== 医疗记录配置 ==========
“medical”: {
“description”: “提取医疗记录中的药物、诊断和治疗信息”,
“model”: “gpt-4o-mini”, # 可选: gpt-4o, gpt-4, gpt-3.5-turbo
“extraction_passes”: 2, # 多轮提取提高准确率
“max_workers”: 10,
“max_char_buffer”: 1500,
“prompt”: “””
提取以下医疗信息:
1. 药物名称、剂量、用药频率、用药途径
2. 诊断结果
3. 症状描述
4. 治疗方案
使用原文精确提取,不要改写或总结。
为每个实体提供相关属性。
“””,
“examples”: [
lx.data.ExampleData(
text=”患者主诉头痛3天,诊断为偏头痛。处方:布洛芬400mg,口服,每日3次,饭后服用。”,
extractions=[
lx.data.Extraction(
extraction_class=”症状”,
extraction_text=”头痛3天”,
attributes={“持续时间”: “3天”}
),
lx.data.Extraction(
extraction_class=”诊断”,
extraction_text=”偏头痛”,
attributes={“类型”: “神经系统疾病”}
),
lx.data.Extraction(
extraction_class=”药物”,
extraction_text=”布洛芬”,
attributes={
“剂量”: “400mg”,
“途径”: “口服”,
“频率”: “每日3次”,
“注意事项”: “饭后服用”
}
),
]
)
]
},

# ========== 合同文件配置 ==========
“contract”: {
“description”: “提取合同中的关键条款和信息”,
“model”: “gpt-4o-mini”, # 合同一般比较规范,可以用便宜的模型
“extraction_passes”: 1,
“max_workers”: 5,
“max_char_buffer”: 2000,
“prompt”: “””
提取合同中的以下信息:
1. 甲方和乙方信息(公司名称、代表人)
2. 合同金额和支付条款
3. 合同期限和重要日期
4. 主要权利和义务
5. 违约条款
保持原文的准确性,提取完整条款。
“””,
“examples”: [
lx.data.ExampleData(
text=”甲方:北京科技有限公司(法定代表人:张明),乙方:上海贸易有限公司,合同总金额:人民币100万元整,合同期限:2024年1月1日至2024年12月31日。”,
extractions=[
lx.data.Extraction(
extraction_class=”甲方”,
extraction_text=”北京科技有限公司”,
attributes={“法定代表人”: “张明”}
),
lx.data.Extraction(
extraction_class=”乙方”,
extraction_text=”上海贸易有限公司”,
attributes={}
),
lx.data.Extraction(
extraction_class=”金额”,
extraction_text=”人民币100万元整”,
attributes={“币种”: “人民币”, “数额”: “100万元”}
),
lx.data.Extraction(
extraction_class=”期限”,
extraction_text=”2024年1月1日至2024年12月31日”,
attributes={“开始日期”: “2024年1月1日”, “结束日期”: “2024年12月31日”}
),
]
)
]
},

# ========== 新闻文章配置 ==========
“news”: {
“description”: “提取新闻文章的关键要素”,
“model”: “gpt-4o-mini”,
“extraction_passes”: 1,
“max_workers”: 15,
“max_char_buffer”: 2500,
“prompt”: “””
提取新闻文章中的:
1. 人物(姓名、职位、所属机构)
2. 地点
3. 时间
4. 事件(发生了什么)
5. 引用的言论
标注每个事件的重要性(高/中/低)。
“””,
“examples”: [
lx.data.ExampleData(
text=”昨天下午,苹果公司CEO库克在加州总部宣布,公司将投资50亿美元开发AI技术。他表示:’这是苹果历史上最重要的战略转型。'”,
extractions=[
lx.data.Extraction(
extraction_class=”时间”,
extraction_text=”昨天下午”,
attributes={“精确度”: “相对时间”}
),
lx.data.Extraction(
extraction_class=”人物”,
extraction_text=”库克”,
attributes={“职位”: “CEO”, “公司”: “苹果公司”}
),
lx.data.Extraction(
extraction_class=”地点”,
extraction_text=”加州总部”,
attributes={“类型”: “公司总部”}
),
lx.data.Extraction(
extraction_class=”事件”,
extraction_text=”投资50亿美元开发AI技术”,
attributes={“重要性”: “高”, “金额”: “50亿美元”, “领域”: “AI技术”}
),
lx.data.Extraction(
extraction_class=”言论”,
extraction_text=”这是苹果历史上最重要的战略转型”,
attributes={“发言人”: “库克”, “态度”: “积极”}
),
]
)
]
},

# ========== 学术论文配置 ==========
“academic”: {
“description”: “提取学术论文的关键信息”,
“model”: “gpt-4o-mini”,
“extraction_passes”: 2,
“max_workers”: 10,
“max_char_buffer”: 3000,
“prompt”: “””
提取学术论文中的:
1. 研究问题和假设
2. 研究方法
3. 主要发现和结论
4. 引用的重要文献
5. 数据和统计结果
保持学术术语的准确性。
“””,
“examples”: [
lx.data.ExampleData(
text=”本研究采用随机对照试验(RCT)方法,样本量n=500,结果显示治疗组相比对照组症状改善率提高了35% (p<0.001)。",
extractions=[
lx.data.Extraction(
extraction_class=”方法”,
extraction_text=”随机对照试验(RCT)”,
attributes={“类型”: “实验研究”}
),
lx.data.Extraction(
extraction_class=”样本”,
extraction_text=”n=500″,
attributes={“样本量”: “500”}
),
lx.data.Extraction(
extraction_class=”结果”,
extraction_text=”症状改善率提高了35%”,
attributes={“改善幅度”: “35%”, “统计显著性”: “p<0.001"}
),
]
)
]
},

# ========== 客户反馈配置 ==========
“feedback”: {
“description”: “提取客户反馈中的情感和问题”,
“model”: “gpt-3.5-turbo”,
“extraction_passes”: 1,
“max_workers”: 20,
“max_char_buffer”: 1000,
“prompt”: “””
提取客户反馈中的:
1. 情感倾向(正面/负面/中性)
2. 产品或服务名称
3. 具体问题或建议
4. 客户需求
为每个提取内容标注情感强度(1-5分)。
“””,
“examples”: [
lx.data.ExampleData(
text=”你们的客服态度太差了!我打了3次电话都没解决问题,产品质量倒是不错,就是售后服务需要改进。”,
extractions=[
lx.data.Extraction(
extraction_class=”负面情感”,
extraction_text=”客服态度太差了”,
attributes={“对象”: “客服”, “强度”: “5”}
),
lx.data.Extraction(
extraction_class=”问题”,
extraction_text=”打了3次电话都没解决问题”,
attributes={“频次”: “3次”, “状态”: “未解决”}
),
lx.data.Extraction(
extraction_class=”正面情感”,
extraction_text=”产品质量倒是不错”,
attributes={“对象”: “产品质量”, “强度”: “3”}
),
lx.data.Extraction(
extraction_class=”建议”,
extraction_text=”售后服务需要改进”,
attributes={“改进点”: “售后服务”}
),
]
)
]
},

# ========== 文学作品配置 ==========
“literary”: {
“description”: “提取文学作品(如小说、诗歌、戏剧)中的核心元素”,
“model”: “gpt-4o-mini”, # 文学分析需要更强的理解能力
“extraction_passes”: 2, # 多轮提取以捕捉深层含义
“max_workers”: 10,
“max_char_buffer”: 3000,
“prompt”: “””
提取文学作品中的以下要素:
1. 主要人物 (姓名、角色、性格特点)
2. 场景和氛围 (地点、时间、环境描述、情感基调)
3. 关键情节 (发生了什么,转折点)
4. 主题思想 (作品探讨的核心议题,如爱、死亡、成长、社会批判等)
5. 象征或意象 (具有特殊含义的物品或概念,如“红灯笼”象征压抑)
6. 修辞手法 (如比喻、拟人、排比等)
请精确引用原文,并为每个提取项提供相关属性。
“””,
“examples”: [
lx.data.ExampleData(
text=”我冒着严寒,回到相隔二千余里,别了二十余年的故乡去。时候既然是深冬,渐近故乡时,天气又阴晦了,冷风吹进船舱中,呜呜的响,从篷隙向外一望,苍黄的天底下,远近横着几个萧索的荒村,没有一些活气。”,
extractions=[
lx.data.Extraction(
extraction_class=”关键情节”,
extraction_text=”回到…故乡去”,
attributes={“事件类型”: “返乡”, “人物”: “我”}
),
lx.data.Extraction(
extraction_class=”场景”,
extraction_text=”相隔二千余里,别了二十余年的故乡”,
attributes={“地点”: “故乡”, “时间跨度”: “二十余年”, “距离”: “二千余里”}
),
lx.data.Extraction(
extraction_class=”氛围”,
extraction_text=”萧索的荒村,没有一些活气”,
attributes={“情感基调”: “荒凉、衰败”, “天气”: “严寒, 阴晦”}
),
lx.data.Extraction(
extraction_class=”意象”,
extraction_text=”苍黄的天”,
attributes={“色彩”: “苍黄”, “暗示”: “压抑、沉闷”}
),
lx.data.Extraction(
extraction_class=”修辞手法”,
extraction_text=”冷风吹进船舱中,呜呜的响”,
attributes={“类型”: “拟声/拟人”, “对象”: “冷风”}
),
]
)
]
},
}

# ========== 添加你自己的配置 ==========
# 复制上面的模板,修改为你需要的提取规则
# “your_config”: {
# “description”: “你的配置描述”,
# “model”: “gpt-4o-mini”,
# “prompt”: “你的提取指令…”,
# “examples”: […]
# }

详细教程:用Unsloth快速微调OpenAI首个开源大模型gpt-oss!低配显卡也能拥有专属GPT

分享我的Colab文件:

https://colab.research.google.com/drive/11jEpOECqYKEWh_75Bc3hSw2CmWqE4oF3?usp=sharing

点开链接会进入Colab界面,在这所做的修改不会保存,需要将它复制保存到Drive中。

上传文件

在这个单元格data_files=填入你的数据集名称;如果是别的数据集格式需要修改代码

修改微调细节:

最后一个单元格是测试微调效果,在原项目上是没有的,如果不需要可以直接删除:

如果使用T4 GPU会有概率超出可使用显存,导致失败;推荐选择L4 GPU性价比最高;A100 GPU速度更快,收费更高,具体看你需求。

点击Restart session and run all即可一键运行:

时序知识图谱测试解析

创建虚拟环境: python -m venv tkg_agent_env
激活虚拟环境: source tkg_agent_env/bin/activate
克隆代码库 git clone https://github.com/openai/openai-cookbook.git


进入项目目录: cd openai-cookbook/examples/partners/temporal_agents_with_knowledge_graphs/
安装Jupyter Lab: pip install jupyterlab
启动Jupyter Lab服务: jupyter lab


作用: 本地启动Web服务器,在浏览器打开Jupyter Lab操作界面,与项目文件进行交互。

双击打开temporal_agents_with_konwledge_graphs.ipynb文件

点击“双箭头”图标,重启内核并一键运行所有单元格

等待输出结果

需要OpenAI的API key调用模型分块、问答

分块时如遇到API速率上限可选择更换模型,需自己手动添加

后续没有问题就会生成图谱

可在单元格修改问题测试问答

在作者Huggingface上也有上传的已经处理好的数据集,代码可以直接下载调用

关于过程步骤,整个文件有详细的英文说明。

DataGraphX怎么帮企业从‘信息堆’迈向‘智能洞察’?

数据是新时代的石油,但未经提炼的原油毫无价值。今天,几乎所有企业都坐拥海量数据,却发现自己被困在了一座巨大的“信息堆”中——数据彼此割裂,难以查询,无法形成有效的洞察力来指导决策。

如何才能让数据开口说话,甚至能听懂我们的问题并给出智慧的答案?答案是知识图谱。它能将杂乱的数据连接成一张智能网络,让每一次查询都成为一次深度洞察的开始。

DataGraphX 正是为此而生的利器。它不仅仅是一个工具,更是一套帮助企业从“信息堆”迈向“智能洞察”的完整方法论和技术引擎。接下来,我们将探讨DataGraphX如何赋能企业,将数据资产转化为真正的决策优势。