智能简历筛选系统
1. 项目概述
1.1 项目简介
这是一个基于大语言模型 (LLM) 和 Agent 工作流 的智能简历筛选系统,能够自动化完成以下任务:
- 📧 从邮箱自动获取简历附件
- 📄 解析 PDF/Word 简历并提取文本
- 🤖 使用 AI 对简历进行结构化解析
- 📊 基于岗位 JD 智能评分(LLM 评分 + RAG 向量相似度)
- 🗂️ 自动筛选符合要求的简历
- 💾 存储和管理简历数据
- 🌐 提供 Web 界面和 API 接口
1.2 应用场景
- 企业 HR 部门:自动化初筛简历,提高效率
- 招聘机构:批量处理候选人简历
- 猎头公司:快速匹配岗位与候选人
- 校园招聘:处理大量应届生简历
1.3 核心优势
| 特性 | 传统方式 | 本系统 |
|---|---|---|
| 处理速度 | 人工逐份阅读,每份 5-10 分钟 | 自动化处理,秒级完成 |
| 评分一致性 | 不同 HR 标准不一 | 统一 AI 评分标准 |
| 筛选维度 | 有限,易遗漏关键信息 | 多维度综合评估 |
| 可扩展性 | 人力有限 | 可无限扩展 |
1.4 开源地址
bird-six/filter_resume_agent: 批量获取邮箱投递的简历并筛选的AI Agent
2. 技术栈详解
2.1 后端技术
2.1.1 Python 3.11
为什么选择 Python 3.11?
- 性能提升:相比 Python 3.10 快 10-60%
- 更好的错误提示
- 更完善的类型提示支持
2.1.2 FastAPI
FastAPI 核心特性:
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def read_item(item_id: int):
return {"item_id": item_id}
优势:
- 🚀 高性能:基于 Starlette 和 Pydantic,性能媲美 Node.js 和 Go
- 📝 自动文档:自动生成 Swagger UI 和 ReDoc
- ✅ 类型安全:基于 Python 类型提示,减少 40% 人为错误
- 🛠️ 内置验证:自动数据验证和序列化
知识拓展:ASGI 与 WSGI
- WSGI (Web Server Gateway Interface):同步 Web 服务器接口,Flask/Django 使用
- ASGI (Asynchronous Server Gateway Interface):异步 Web 服务器接口,FastAPI 使用
- FastAPI 支持
async/await,可处理高并发请求
2.1.3 Pydantic
数据验证模型:
from pydantic import BaseModel, Field
class User(BaseModel):
id: int
name: str = Field(..., min_length=1, max_length=50)
email: str
age: int = Field(ge=0, le=150) # ge: greater or equal, le: less or equal
核心功能:
- 类型验证和转换
- 数据序列化/反序列化
- 自动文档生成
- 嵌套模型支持
2.2 AI 相关框架
2.2.1 LangChain
LangChain 是什么? LangChain 是一个用于开发大语言模型应用的框架,提供:
- 组件抽象:统一的 LLM、嵌入、存储接口
- 链式调用:将多个组件组合成复杂应用
- Agent 能力:让 LLM 能够调用工具和 API
核心概念:
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
# 1. 初始化模型
llm = init_chat_model(model="qwen-plus", model_provider="openai")
# 2. 定义提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个助手"),
("human", "{input}")
])
# 3. 定义输出解析器
parser = PydanticOutputParser(pydantic_object=User)
# 4. 创建链
chain = prompt | llm | parser
# 5. 执行
result = chain.invoke({"input": "你好"})
知识拓展:Prompt Engineering
- Zero-shot:直接提问,不给示例
- Few-shot:提供几个示例后再提问
- Chain of Thought:让模型逐步推理
- ReAct:Reasoning + Acting,结合推理和行动
2.2.2 LangGraph
LangGraph 简介: LangGraph 是 LangChain 的扩展,用于构建有状态的多 Agent 工作流。
核心概念:
- State(状态):工作流中共享的数据结构
- Node(节点):执行具体任务的函数
- Edge(边):节点之间的连接关系
- Graph(图):由节点和边组成的工作流
示例:
from langgraph.graph import StateGraph, END
from typing import TypedDict
# 1. 定义状态
class State(TypedDict):
input: str
output: str
# 2. 定义节点函数
def node_a(state):
return {"output": state["input"] + " - processed by A"}
def node_b(state):
return {"output": state["output"] + " - processed by B"}
# 3. 构建图
graph = StateGraph(State)
graph.add_node("node_a", node_a)
graph.add_node("node_b", node_b)
graph.set_entry_point("node_a")
graph.add_edge("node_a", "node_b")
graph.add_edge("node_b", END)
# 4. 编译并运行
compiled_graph = graph.compile()
result = compiled_graph.invoke({"input": "Hello"})
知识拓展:有状态 vs 无状态
- 无状态:每次请求独立,不保留历史信息(如传统 REST API)
- 有状态:保留上下文信息,支持复杂的多轮交互(如 Agent 工作流)
2.2.3 ChromaDB
ChromaDB 是什么? ChromaDB 是一个开源的向量数据库,用于存储和检索嵌入向量。
核心操作:
import chromadb
# 1. 创建客户端
client = chromadb.PersistentClient(path="./chroma_db")
# 2. 创建集合
collection = client.create_collection(name="my_collection")
# 3. 添加向量
collection.add(
embeddings=[[0.1, 0.2, 0.3, ...]], # 嵌入向量
ids=["doc1"],
documents=["这是文档内容"]
)
# 4. 查询相似向量
results = collection.query(
query_embeddings=[[0.15, 0.25, 0.35, ...]],
n_results=5
)
知识拓展:向量数据库原理
- 嵌入 (Embedding):将文本转换为高维向量,语义相似的文本向量距离更近
- 相似度计算:常用余弦相似度、欧氏距离
- 应用场景:语义搜索、推荐系统、RAG(检索增强生成)
2.3 数据库
2.3.1 MySQL 8.0
SQLAlchemy ORM:
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(50))
知识拓展:ORM vs 原生 SQL
| 特性 | ORM | 原生 SQL |
|---|---|---|
| 开发效率 | 高,面向对象 | 低,需手写 SQL |
| 性能 | 中等,有开销 | 最优,直接执行 |
| 可维护性 | 高,代码清晰 | 低,SQL 分散 |
| 灵活性 | 中等 | 高,可写复杂查询 |
2.4 前端技术
2.4.1 Vue 3
Vue 3 核心特性:
const { createApp, ref, computed } = Vue
createApp({
setup() {
const count = ref(0)
const double = computed(() => count.value * 2)
function increment() {
count.value++
}
return { count, double, increment }
}
}).mount('#app')
知识拓展:响应式原理
- Vue 2:使用
Object.defineProperty(),无法检测数组和对象的变化 - Vue 3:使用
Proxy,完全重写响应式系统,性能更好
2.4.2 Tailwind CSS
实用优先的 CSS 框架:
<div class="flex items-center justify-center p-4 bg-white rounded-lg shadow-md">
<span class="text-gray-700 font-bold">Hello World</span>
</div>
优势:
- 快速原型开发
- 一致的设计系统
- 响应式支持
- 无需离开 HTML 写样式
3. 环境准备
3.1 基础软件安装
3.1.1 Python 3.11
Windows 安装:
# 1. 下载 Python 3.11 安装包
# 访问 https://www.python.org/downloads/windows/
# 2. 运行安装程序,勾选"Add Python to PATH"
# 3. 验证安装
python --version # 应显示 Python 3.11.x
Linux 安装:
# Ubuntu/Debian
sudo apt update
sudo apt install python3.11 python3.11-venv python3.11-dev
# CentOS/RHEL
sudo yum install python3.11
3.1.2 Docker 和 Docker Compose
Docker 安装:
# Windows: 下载 Docker Desktop for Windows
# https://www.docker.com/products/docker-desktop
# Linux
curl -fsSL https://get.docker.com | sh
sudo systemctl start docker
sudo systemctl enable docker
# 验证安装
docker --version
docker compose version
3.2 API 密钥准备
3.2.1 通义千问 API
获取步骤:
- 访问阿里云:https://www.aliyun.com/
- 注册/登录账号
- 开通 DashScope 服务
- 创建 API Key
- 查看模型列表和价格
支持的模型:
qwen-turbo:速度快,成本低qwen-plus:性能均衡(推荐)qwen-max:最强性能
3.2.2 邮箱配置
以 QQ 邮箱为例:
- 登录 QQ 邮箱网页版
- 设置 -> 账户
- 开启 IMAP/SMTP 服务
- 获取授权码(不是登录密码)
常见邮箱 IMAP 服务器:
QQ 邮箱:imap.qq.com
163 邮箱:imap.163.com
Gmail:imap.gmail.com
Outlook:imap-mail.outlook.com
4. 项目搭建步骤
4.1 创建项目结构
# 1. 创建项目目录
mkdir resume_screen_agent
cd resume_screen_agent
# 2. 创建子目录
mkdir -p db utils workflow/nodes static fliter_resumes resumes chroma_db
# 3. 创建__init__.py 文件(使目录成为 Python 包)
touch db/__init__.py utils/__init__.py workflow/__init__.py workflow/nodes/__init__.py
4.2 创建虚拟环境
# 1. 创建虚拟环境
python -m venv venv
# 2. 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate
# 3. 验证
which python # 应指向虚拟环境中的 Python
4.3 安装依赖
创建 requirements.txt:
fastapi==0.135.1
uvicorn==0.34.0
python-dotenv==1.2.2
sqlalchemy==2.0.23
pymysql==1.1.2
langchain==1.2.12
langgraph==1.1.2
langchain-openai==1.1.11
chromadb==1.5.5
dashscope==1.25.14
pypdf2==3.0.1
python-docx==1.2.0
pydantic==2.12.5
numpy==2.4.3
安装依赖:
pip install -r requirements.txt
4.4 配置文件
4.4.1 创建 .env 文件
# 邮箱配置
EMAIL_USER=your_email@example.com
EMAIL_PWD=your_email_password_or_auth_code
EMAIL_SERVER=imap.example.com
# 通义千问 API
DASHSCOPE_API_KEY=your_api_key
DASHSCOPE_API_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
DASHSCOPE_MODEL=qwen-plus
# 数据库配置
DB_USER=root
DB_PASS=your_password
DB_HOST=localhost
DB_NAME=resume_db
知识拓展:环境变量管理
- 开发环境:使用
.env文件 - 生产环境:使用容器环境变量或密钥管理服务
- 最佳实践:
- 永远不要将
.env提交到 Git - 在
.gitignore中添加.env - 提供
.env.example模板
4.4.2 创建 .gitignore
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
env/
ENV/
# 环境变量
.env
.env.local
# IDE
.idea/
.vscode/
*.swp
*.swo
# 数据库
*.db
*.sqlite
# 日志
*.log
# 上传文件
resumes/
fliter_resumes/
# ChromaDB
chroma_db/
4.5 Docker 配置
4.5.1 创建 Dockerfile
# 使用 Python 3.11 作为基础镜像
FROM python:3.11-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制项目代码
COPY . .
# 暴露端口
EXPOSE 8000
# 设置环境变量
ENV PYTHONUNBUFFERED=1
# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
知识拓展:Dockerfile 最佳实践
- 使用多阶段构建减少镜像大小
- 合并 RUN 指令减少层数
- 使用
.dockerignore排除不必要文件 - 指定具体版本号,避免不确定性
4.5.2 创建 docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- "8000:8000"
depends_on:
- db
environment:
- DB_USER=root
- DB_PASS=password
- DB_HOST=db
- DB_NAME=resume_db
volumes:
- ./fliter_resumes:/app/fliter_resumes
- ./static:/app/static
db:
image: mysql:8.0
environment:
- MYSQL_ROOT_PASSWORD=password
- MYSQL_DATABASE=resume_db
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
volumes:
mysql_data:
知识拓展:Docker Compose
- 多容器编排:同时管理应用、数据库、缓存等
- 服务依赖:
depends_on确保启动顺序 - 数据持久化:使用 volumes 避免数据丢失
- 网络隔离:每个服务在独立网络中
4.6 启动项目
4.6.1 使用 Docker Compose(推荐)
# 1. 构建并启动所有服务
docker compose up -d
# 2. 查看日志
docker compose logs -f
# 3. 停止服务
docker compose down
# 4. 停止并删除数据卷
docker compose down -v
4.6.2 本地开发模式
# 1. 启动 MySQL(需要预先安装)
# Windows: 启动 MySQL 服务
# Linux: sudo systemctl start mysql
# 2. 创建数据库
mysql -u root -p -e "CREATE DATABASE resume_db;"
# 3. 运行应用
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# --reload: 代码变化自动重启(开发环境)
# --host: 监听地址
# --port: 监听端口
访问应用:
- API 文档:http://localhost:8000/docs
- 备用文档:http://localhost:8000/redoc
- 前端界面:http://localhost:8000/static
5. 核心模块解析
5.1 数据库模块 (db/database.py)
5.1.1 数据库连接配置
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv
import os
load_dotenv()
# 从环境变量读取配置
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
# 构建数据库 URL
DATABASE_URL = f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}"
# 创建数据库引擎
engine = create_engine(
DATABASE_URL,
pool_size=10, # 连接池大小
max_overflow=20, # 最大溢出连接数
pool_recycle=3600, # 连接回收时间(秒)
echo=False # 是否打印 SQL 日志
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
知识拓展:连接池
- 为什么需要连接池?
- 数据库连接是昂贵资源,创建和销毁开销大
- 连接池复用连接,提高性能
- 关键参数:
pool_size:池中保持的连接数max_overflow:超出 pool_size 后允许创建的额外连接pool_recycle:连接自动回收时间,避免超时断开
5.1.2 数据模型定义
from sqlalchemy import Column, Integer, String, ForeignKey, Text
from sqlalchemy.orm import relationship, declarative_base
Base = declarative_base()
# 部门模型
class Department(Base):
__tablename__ = "departments"
id = Column(String(100), primary_key=True)
name = Column(String(255), nullable=False)
# 一对多关系:一个部门有多个岗位
positions = relationship("Position", back_populates="department", cascade="all, delete-orphan")
# 岗位模型
class Position(Base):
__tablename__ = "positions"
id = Column(String(100), primary_key=True)
name = Column(String(255), nullable=False)
jd = Column(Text, nullable=True) # 岗位描述
department_id = Column(String(100), ForeignKey("departments.id"))
# 多对一关系:多个岗位属于一个部门
department = relationship("Department", back_populates="positions")
# 系统配置模型
class SystemConfig(Base):
__tablename__ = "system_configs"
id = Column(Integer, primary_key=True)
email = Column(String(255), nullable=False)
email_password = Column(String(255), nullable=False)
email_server = Column(String(255), nullable=False)
api_key = Column(String(255), nullable=False)
api_model = Column(String(255), nullable=False)
api_url = Column(String(255), nullable=False)
# 筛选简历模型
class FilteredResume(Base):
__tablename__ = "filtered_resumes"
id = Column(Integer, primary_key=True)
name = Column(String(255))
school = Column(String(255))
age = Column(Integer)
education = Column(String(255))
phone = Column(String(100))
email = Column(String(255))
final_score = Column(Integer)
date = Column(String(100))
email_path = Column(String(500))
position = Column(String(255))
知识拓展:SQLAlchemy 关系
- 一对一:
uselist=False - 一对多:默认,使用
relationship() - 多对多:使用关联表(Association Table)
- 级联操作:
cascade="all, delete-orphan"删除父对象时自动删除子对象
5.2 工具模块 (utils/)
5.2.1 邮件处理工具
IMAP 协议详解:
import imaplib
import email
from email.header import decode_header
# 1. 连接 IMAP 服务器
server = imaplib.IMAP4_SSL("imap.qq.com")
# 2. 登录
server.login("user@qq.com", "password")
# 3. 选择邮箱文件夹
server.select("INBOX")
# 4. 搜索邮件
status, messages = server.search(None, 'ALL')
# 5. 获取邮件内容
status, msg_data = server.fetch("1", "(RFC822)")
# 6. 解析邮件
email_obj = email.message_from_bytes(msg_data[0][1])
# 7. 解码邮件头
subject = decode_header(email_obj["Subject"])
知识拓展:IMAP vs POP3
| 特性 | IMAP | POP3 |
|---|---|---|
| 邮件存储 | 服务器 | 本地 |
| 多设备同步 | 支持 | 不支持 |
| 离线访问 | 有限 | 完全 |
| 服务器空间 | 占用 | 不占用 |
| 推荐使用场景 | 多设备 | 单设备 |
5.2.2 简历文本提取
PDF 文本提取:
import PyPDF2
def extract_pdf_text(filepath):
with open(filepath, 'rb') as f:
reader = PyPDF2.PdfReader(f)
text = '\n'.join([page.extract_text() for page in reader.pages])
return text
Word 文档提取:
from docx import Document
def extract_docx_text(filepath):
doc = Document(filepath)
text = '\n'.join([para.text for para in doc.paragraphs])
return text
知识拓展:PDF 结构
- PDF 是二进制格式,不是纯文本
- 文本存储在内容流 (Content Stream) 中
- 可能包含字体编码、图形、表格等复杂元素
- OCR 需求:扫描件 PDF 需要 OCR 识别
5.3 工作流节点 (workflow/nodes/)
5.3.1 邮件读取节点
def read_resume_emails(state: ResumeProcessState):
"""
从邮箱读取简历附件
1. 查询数据库获取最新邮件时间
2. 登录 IMAP 服务器
3. 搜索新邮件
4. 下载附件
5. 保存信息到数据库
"""
# 获取最新邮件时间(避免重复处理)
latest_date = get_latest_email_from_db()
# 连接邮箱
server = imaplib.IMAP4_SSL(EMAIL_SERVER)
server.login(EMAIL_USER, EMAIL_PWD)
server.select("INBOX", readonly=True)
# 搜索邮件
if latest_date:
imap_date = format_imap_date(latest_date)
status, messages = server.search(None, f'SINCE {imap_date}')
else:
status, messages = server.search(None, 'ALL')
# 处理每封邮件
resume_info_list = []
for msg_id in messages[0].split():
email_data = process_email(server, msg_id)
if email_data:
resume_info_list.append(email_data)
# 保存到数据库
save_to_db(resume_info_list)
return {"email_raw": {"raw_email_resumes": resume_info_list}}
知识拓展:增量同步策略
- 时间戳法:记录最后处理时间,只处理新邮件
- ID 法:记录已处理邮件 ID,避免重复
- 标志法:在邮件上打标签(如
\Seen)
5.3.2 简历解析节点
def parse_resume(state: ResumeProcessState):
"""
使用 LLM 解析简历为结构化数据
"""
# 初始化 LLM
llm = init_chat_model(
model=os.getenv("DASHSCOPE_MODEL"),
model_provider="openai",
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_API_URL"),
)
# 定义输出格式
parser = PydanticOutputParser(pydantic_object=Resume)
# 创建提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你必须按照以下格式输出:{format_instructions}"),
("human", "请提取简历信息:{resume_text}")
])
# 创建处理链
chain = prompt | llm | parser
# 处理每个简历
structured_resumes = []
for item in state.resume_text.resume_texts:
result = chain.invoke({
"resume_text": item["text"],
"format_instructions": parser.get_format_instructions()
})
structured_resumes.append(result)
return {"structured": {"structured_resumes": structured_resumes}}
知识拓展:LLM 输出格式化
- JSON 模式:要求模型输出 JSON 格式
- Pydantic 解析器:自动验证和转换数据类型
- 函数调用:使用 LLM 的 function calling 功能
- 正则提取:后处理提取结构化数据
6. 工作流设计原理
6.1 状态管理
6.1.1 状态定义
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
# 子状态:原始邮件信息
class EmailRawState(BaseModel):
raw_email_resumes: List[Dict[str, str]] = Field(default_factory=list)
# 子状态:简历文本
class ResumeTextState(BaseModel):
resume_texts: List[Dict[str, str]] = Field(default_factory=list)
# 子状态:结构化简历
class StructuredResumeState(BaseModel):
structured_resumes: List[Resume] = Field(default_factory=list)
# 子状态:评分结果
class ScoreState(BaseModel):
llm_scores: List[ResumeLLMScore] = Field(default_factory=list)
final_scores: List[Dict[str, Any]] = Field(default_factory=list)
filtered_resumes: List[Dict[str, Any]] = Field(default_factory=list)
# 子状态:JD 信息
class JdState(BaseModel):
jd_text: Optional[str] = None
# 主状态:整合所有子状态
class ResumeProcessState(BaseModel):
email_raw: EmailRawState = Field(default_factory=EmailRawState)
resume_text: ResumeTextState = Field(default_factory=ResumeTextState)
structured: StructuredResumeState = Field(default_factory=StructuredResumeState)
score: ScoreState = Field(default_factory=ScoreState)
jd: JdState = Field(default_factory=JdState)
知识拓展:状态模式
- 不可变状态:每次更新返回新状态,不修改原状态
- 状态快照:保留历史状态,支持回滚和调试
- 状态分区:将大状态拆分为小状态,便于管理
6.1.2 状态流转
初始状态 → [read_resume_emails] → 包含原始邮件信息
→ [read_resume_file] → 包含简历文本
→ [parse_resume] → 包含结构化简历
→ [llm_evaluate_resume] → 包含 LLM 评分
→ [get_filter_resume] → 包含最终筛选结果
6.2 图构建
from langgraph.graph import StateGraph, END
# 1. 初始化图
graph_builder = StateGraph(ResumeProcessState)
# 2. 添加节点
graph_builder.add_node("read_resume_emails", read_resume_emails)
graph_builder.add_node("read_resume_file", read_resume_file)
graph_builder.add_node("parse_resume", parse_resume)
graph_builder.add_node("llm_evaluate_resume", llm_evaluate_resume)
graph_builder.add_node("get_filter_resume", get_filter_resume)
# 3. 设置边(执行顺序)
graph_builder.set_entry_point("read_resume_emails")
graph_builder.add_edge("read_resume_emails", "read_resume_file")
graph_builder.add_edge("read_resume_file", "parse_resume")
graph_builder.add_edge("parse_resume", "llm_evaluate_resume")
graph_builder.add_edge("llm_evaluate_resume", "get_filter_resume")
graph_builder.add_edge("get_filter_resume", END)
# 4. 编译图
graph = graph_builder.compile()
知识拓展:有向无环图 (DAG)
- 节点:执行单元
- 边:执行顺序
- 无环:不存在循环依赖
- 应用:工作流引擎、任务调度、数据流水线
6.3 条件分支(扩展)
from langgraph.graph import START, END
# 添加条件分支
def should_continue(state):
if state["score"] >= 60:
return "high_score_branch"
else:
return "low_score_branch"
graph.add_conditional_edges(
"evaluate",
should_continue,
{
"high_score_branch": "accept",
"low_score_branch": "reject"
}
)
7. AI 模型集成
7.1 模型选择
7.1.1 通义千问系列
| 模型 | 上下文长度 | 知识截止时间 | 适用场景 |
|---|---|---|---|
| qwen-turbo | 32K | 2024-01 | 快速响应、简单任务 |
| qwen-plus | 32K | 2024-01 | 复杂推理、代码生成 |
| qwen-max | 32K | 2024-01 | 高难度任务、专业领域 |
知识拓展:上下文长度
- 定义:模型一次能处理的最大 token 数
- Token 计算:中文约 1.5 字符/token,英文约 4 字符/token
- 长上下文优势:可处理长文档、多轮对话
- 成本考虑:上下文越长,费用越高
7.2 Prompt 设计
7.2.1 系统提示词
sys_prompt = """
你是专业的简历匹配度评估专家,需要完成以下任务:
1. 分析 JD 要求:{jd_text}
2. 分析简历信息:{resume_info}
3. 从技能匹配度、经验匹配度、学历匹配度、岗位契合度、行业适配度 5 个维度综合评估
4. 严格要求以下格式输出:{format_instructions}
"""
知识拓展:Prompt 设计原则
- 明确角色:定义 AI 的专业身份
- 任务分解:将复杂任务拆分为步骤
- 格式要求:明确输出格式和结构
- 示例引导:提供 Few-shot 示例
- 约束条件:设定边界和限制
7.2.2 Few-shot 示例
examples = """
示例 1:
JD:招聘 Python 工程师,要求 3 年经验,熟悉 FastAPI、SQLAlchemy
简历:张三,5 年 Python 经验,精通 FastAPI、Django、MySQL
评分:{"score": 85}
示例 2:
JD:招聘数据科学家,要求硕士学历,精通机器学习
简历:李四,本科学历,1 年数据分析经验
评分:{"score": 45}
"""
7.3 链式调用
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
# 1. 提示模板
prompt = ChatPromptTemplate.from_messages([
("system", sys_prompt),
("human", "请评估上述 JD 和简历的综合匹配度")
])
# 2. 输出解析器
parser = PydanticOutputParser(pydantic_object=ResumeLLMScore)
# 3. 创建链
chain = prompt | llm | parser
# 4. 执行
result = chain.invoke({
"jd_text": jd_text,
"resume_info": resume_text,
"format_instructions": parser.get_format_instructions()
})
知识拓展:LCEL (LangChain Expression Language)
- 管道操作符:
|连接多个组件 - 流式处理:支持流式输出
- 批量处理:
batch()并行处理多个请求 - 异步支持:
ainvoke()异步执行
8. 向量检索与 RAG
8.1 嵌入向量生成
import dashscope
def get_embedding(text):
"""
使用通义千问 API 生成文本嵌入向量
"""
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
res = dashscope.TextEmbedding.call(
model="text-embedding-v1",
input=text,
)
if res and "output" in res and "embeddings" in res["output"]:
embedding = res["output"]["embeddings"][0]["embedding"]
return embedding # 返回向量列表,如 [0.1, 0.2, 0.3, ...]
return None
知识拓展:嵌入模型
- text-embedding-v1:通义千问嵌入模型,1536 维
- text-embedding-v2:升级版,2048 维
- bge-large-zh:北京智源,中文效果好
- m3e-base:中文语义理解优秀
8.2 余弦相似度计算
import numpy as np
def get_cosine_similarity(vec1, vec2):
"""
计算两个向量的余弦相似度
公式:cos(θ) = (A·B) / (||A|| × ||B||)
"""
vec1 = np.array(vec1)
vec2 = np.array(vec2)
# 点积
dot_product = np.dot(vec1, vec2)
# L2 范数(模长)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
# 余弦相似度
similarity = dot_product / (norm1 * norm2)
# 限制在 [-1, 1] 范围(浮点精度问题)
similarity = np.clip(similarity, -1.0, 1.0)
return float(similarity)
知识拓展:相似度度量
| 方法 | 公式 | 特点 | 适用场景 |
|---|---|---|---|
| 余弦相似度 | A·B/( | A | |
| 欧氏距离 | √Σ(A-B)² | 考虑绝对距离 | 空间距离 |
| 曼哈顿距离 | Σ | A-B | |
| 杰卡德相似度 | A∩B | / |
8.3 ChromaDB 向量存储
import chromadb
import uuid
# 创建客户端
chroma_client = chromadb.PersistentClient(path="../chroma_db")
def add_jd_to_chroma(jd_text: str):
"""
将 JD 添加到 Chroma 数据库
"""
# 获取集合
collection = chroma_client.get_or_create_collection(name="jd_collection")
# 生成嵌入向量
jd_embedding = get_embedding(jd_text)
# 检查是否已存在相似 JD
results = collection.query(
query_embeddings=[jd_embedding],
n_results=1
)
# 如果距离很小(相似度高),复用已有向量
if results['distances'] and results['distances'][0][0] < 0.01:
print("找到相似 JD,复用向量")
return results['embeddings'][0][0]
# 添加新向量
doc_id = str(uuid.uuid4())
collection.add(
embeddings=[jd_embedding],
ids=[doc_id],
documents=[jd_text]
)
return jd_embedding
知识拓展:向量数据库选型
| 数据库 | 特点 | 适用场景 |
|---|---|---|
| ChromaDB | 轻量级,本地存储 | 开发、小规模 |
| Milvus | 高性能,分布式 | 大规模生产 |
| Pinecone | 托管服务,易用 | 快速部署 |
| Weaviate | 支持 GraphQL,多模态 | 复杂查询 |
| Qdrant | Rust 编写,高性能 | 高并发场景 |
8.4 RAG 评分计算
def calculate_final_score(rag_score, llm_score):
"""
加权计算最终评分
RAG 评分(余弦相似度)× 100 × 0.6 + LLM 评分 × 0.4
"""
final_score = round(rag_score * 100 * 0.6 + llm_score * 0.4)
return final_score
知识拓展:RAG(检索增强生成)
- 原理:先从知识库检索相关信息,再让 LLM 基于检索结果生成答案
- 优势:
- 减少幻觉(Hallucination)
- 利用最新知识
- 可追溯信息来源
- 流程:
- 用户提问
- 将问题转换为向量
- 在向量库中检索相似文档
- 将检索结果 + 问题一起发送给 LLM
- LLM 生成答案
9. 数据库设计
9.1 ER 图
┌─────────────────┐ ┌─────────────────┐
│ departments │ │ system_configs │
├─────────────────┤ ├─────────────────┤
│ id (PK) │ │ id (PK) │
│ name │ │ email │
└────────┬────────┘ │ email_password │
│ │ email_server │
│ 1:N │ api_key │
│ │ api_model │
▼ │ api_url │
┌─────────────────┐ └─────────────────┘
│ positions │
├─────────────────┤
│ id (PK) │ ┌─────────────────┐
│ name │ │ filtered_resumes│
│ jd (TEXT) │ ├─────────────────┤
│ department_id │───┐ │ id (PK) │
└─────────────────┘ │ │ name │
│ │ school │
│ │ age │
│ │ education │
│ │ phone │
│ │ email │
│ │ final_score │
│ │ date │
│ │ position │
│ └─────────────────┘
│
│ ┌─────────────────┐
└──▶│ resumes │
├─────────────────┤
│ id (PK) │
│ sender │
│ subject │
│ date │
│ filepath │
│ position │
└─────────────────┘
9.2 表结构详解
9.2.1 departments(部门表)
CREATE TABLE departments (
id VARCHAR(100) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
INDEX idx_name (name)
);
9.2.2 positions(岗位表)
CREATE TABLE positions (
id VARCHAR(100) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
jd TEXT,
department_id VARCHAR(100) NOT NULL,
FOREIGN KEY (department_id) REFERENCES departments(id)
ON DELETE CASCADE,
INDEX idx_department (department_id)
);
9.2.3 filtered_resumes(筛选简历表)
CREATE TABLE filtered_resumes (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
school VARCHAR(255),
age INT,
education VARCHAR(255),
phone VARCHAR(100),
email VARCHAR(255),
final_score INT,
date VARCHAR(100),
email_path VARCHAR(500),
position VARCHAR(255),
INDEX idx_score (final_score DESC),
INDEX idx_position (position)
);
知识拓展:索引优化
- 主键索引:自动创建,唯一且非空
- 普通索引:加速查询
- 复合索引:多列组合索引
- 覆盖索引:查询列都在索引中,无需回表
- 索引选择原则:
- 高频查询列
- 区分度高的列
- 避免在低基数列上建索引
9.3 SQLAlchemy 会话管理
from sqlalchemy.orm import Session
# 依赖注入:获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 在 API 中使用
@app.get("/api/departments")
async def get_departments(db: Session = Depends(get_db)):
departments = db.query(Department).all()
return departments
知识拓展:会话生命周期
- 创建:
SessionLocal() - 使用:执行查询/更新
- 提交:
db.commit() - 回滚:
db.rollback()(异常时) - 关闭:
db.close()(释放连接)
10. API 接口设计
10.1 RESTful 规范
10.1.1 资源命名
# 正确
GET /api/departments # 获取部门列表
GET /api/departments/123 # 获取单个部门
POST /api/departments # 创建部门
# 错误
GET /api/getDepartments # 动词不应出现在 URL 中
POST /api/createDepartment
10.1.2 HTTP 方法
| 方法 | 功能 | 幂等性 |
|---|---|---|
| GET | 获取资源 | 是 |
| POST | 创建资源 | 否 |
| PUT | 更新资源(全量) | 是 |
| PATCH | 更新资源(部分) | 否 |
| DELETE | 删除资源 | 是 |
10.2 核心接口实现
10.2.1 运行工作流
@app.get("/api/run-workflow")
async def run_workflow(position_id: str, db: Session = Depends(get_db)):
"""
运行简历筛选工作流
"""
# 1. 获取岗位 JD
position = db.query(Position).filter(Position.id == position_id).first()
if not position:
raise HTTPException(status_code=404, detail="岗位不存在")
# 2. 创建初始状态
initial_state = create_initial_state(position.jd)
# 3. 执行工作流
final_state = graph.invoke(initial_state)
# 4. 返回结果
return {
"status": "success",
"message": f"处理了 {len(final_state['email_raw']['raw_email_resumes'])} 个简历",
"data": {
"filtered_resumes": final_state["score"]["filtered_resumes"],
"final_scores": final_state["score"]["final_scores"]
}
}
10.2.2 获取候选人列表(分页)
@app.get("/api/candidates")
async def get_candidates(
page: int = 1,
page_size: int = 10,
position_id: str = None,
db: Session = Depends(get_db)
):
"""
获取筛选后的候选人列表(支持分页和岗位筛选)
"""
# 计算偏移量
offset = (page - 1) * page_size
# 构建查询
query = db.query(FilteredResume)
if position_id:
position = db.query(Position).filter(Position.id == position_id).first()
query = query.filter(FilteredResume.position == position.name)
# 获取总数
total = query.count()
# 分页查询
resumes = query.offset(offset).limit(page_size).all()
# 转换为响应格式
items = []
for resume in resumes:
items.append({
"id": resume.id,
"name": resume.name,
"school": resume.school,
"education": resume.education,
"final_score": resume.final_score,
"rating": resume.final_score / 100 * 5, # 转换为 5 星制
"info": f"{resume.education} / {resume.school}"
})
return {
"items": items,
"total": total,
"page": page,
"page_size": page_size
}
知识拓展:分页策略
- 偏移分页:
LIMIT 10 OFFSET 20(适合小数据量) - 游标分页:
WHERE id > last_id LIMIT 10(适合大数据量) - 关键分页:基于唯一键,性能最优
10.3 响应格式规范
# 成功响应
{
"status": "success",
"message": "操作成功",
"data": {...}
}
# 错误响应
{
"status": "error",
"message": "错误描述",
"code": "ERROR_CODE"
}
11. 前端界面实现
11.1 Vue 3 组件结构
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>HR 简历管理系统</title>
<script src="https://cdn.jsdelivr.net/npm/vue@3/dist/vue.global.prod.js"></script>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body>
<div id="app">
<!-- 侧边栏 -->
<aside class="w-64 bg-slate-900">
<!-- 部门列表 -->
<div v-for="dept in departments" :key="dept.id">
<h3>{{ dept.name }}</h3>
<a v-for="position in dept.positions"
@click="selectPosition(position.id)">
{{ position.name }}
</a>
</div>
</aside>
<!-- 主内容区 -->
<main class="flex-1">
<!-- 候选人列表 -->
<div v-for="candidate in candidates" :key="candidate.id">
<h4>{{ candidate.name }}</h4>
<p>{{ candidate.school }}</p>
<span>评分:{{ candidate.final_score }}</span>
</div>
</main>
</div>
<script>
const { createApp, ref, onMounted } = Vue
createApp({
setup() {
const departments = ref([])
const candidates = ref([])
const activePosition = ref(null)
// 加载部门列表
async function loadDepartments() {
const res = await fetch('/api/departments')
departments.value = await res.json()
}
// 选择岗位
async function selectPosition(positionId) {
activePosition.value = positionId
const res = await fetch(`/api/candidates?position_id=${positionId}`)
const data = await res.json()
candidates.value = data.items
}
onMounted(() => {
loadDepartments()
})
return {
departments,
candidates,
selectPosition
}
}
}).mount('#app')
</script>
</body>
</html>
知识拓展:Vue 3 组合式 API
- setup():组件逻辑入口
- ref():创建响应式引用
- reactive():创建响应式对象
- computed():计算属性
- watch():监听变化
- onMounted():生命周期钩子
11.2 Tailwind CSS 实用类
<!-- 布局 -->
<div class="flex h-screen"> <!-- Flex 布局,全屏高度 -->
<div class="w-64 flex-shrink-0"> <!-- 固定宽度 256px -->
<!-- 间距 -->
<div class="p-4"> <!-- 内边距 1rem -->
<div class="m-2"> <!-- 外边距 0.5rem -->
<div class="space-y-2"> <!-- 子元素垂直间距 -->
<!-- 颜色 -->
<div class="bg-blue-500"> <!-- 蓝色背景 -->
<div class="text-white"> <!-- 白色文字 -->
<!-- 响应式 -->
<div class="md:w-1/2 lg:w-1/4"> <!-- 中屏 50%,大屏 25% -->
知识拓展:Tailwind 命名规则
p-4:padding (0.25rem × 4 = 1rem)w-64:width (0.25rem × 64 = 16rem)bg-blue-500:background-color (蓝色 500 色阶)hover:bg-blue-600:悬停状态md:flex:中屏及以上使用 flex
12. Docker 容器化部署
12.1 构建优化
12.1.1 多阶段构建
# 阶段 1:构建依赖
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt
# 阶段 2:运行环境
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY . .
ENV PATH=/root/.local/bin:$PATH
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
优势:
- 减少最终镜像大小
- 分离构建和运行依赖
- 提高构建速度
12.2 生产环境配置
12.2.1 docker-compose.prod.yml
version: '3.8'
services:
app:
build: .
ports:
- "8000:8000"
environment:
- DB_USER=root
- DB_PASS=${DB_PASSWORD}
- DB_HOST=db
- DB_NAME=resume_db
depends_on:
db:
condition: service_healthy
restart: always
deploy:
resources:
limits:
cpus: '2'
memory: 2G
db:
image: mysql:8.0
environment:
- MYSQL_ROOT_PASSWORD=${DB_PASSWORD}
- MYSQL_DATABASE=resume_db
volumes:
- mysql_data:/var/lib/mysql
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 5
restart: always
volumes:
mysql_data:
知识拓展:生产环境最佳实践
- 健康检查:确保依赖服务可用
- 重启策略:
restart: always自动重启 - 资源限制:防止内存泄漏
- 环境变量:使用
.env文件管理敏感信息 - 日志收集:配置日志驱动
12.3 部署流程
# 1. 准备生产环境变量
cp .env.example .env
# 编辑.env 文件,设置生产配置
# 2. 构建并启动
docker compose -f docker-compose.prod.yml up -d --build
# 3. 查看日志
docker compose logs -f app
# 4. 数据库迁移(如果需要)
docker compose exec app alembic upgrade head
# 5. 健康检查
curl http://localhost:8000/health
# 6. 停止服务
docker compose -f docker-compose.prod.yml down
13. 常见问题与调试
13.1 环境配置问题
问题 1:环境变量未加载
症状:
Error: DASHSCOPE_API_KEY is not set
解决方案:
# 确保在文件开头加载环境变量
from dotenv import load_dotenv
load_dotenv() # 加载.env 文件
# 验证环境变量
import os
print(os.getenv("DASHSCOPE_API_KEY"))
13.2 数据库连接问题
问题 2:无法连接 MySQL
症状:
sqlalchemy.exc.OperationalError: (2003, "Can't connect to MySQL server")
解决方案:
# 1. 检查 MySQL 容器是否运行
docker compose ps
# 2. 查看 MySQL 日志
docker compose logs db
# 3. 测试连接
docker compose exec db mysql -u root -p
# 4. 检查网络
docker compose exec app ping db
13.3 LLM API 问题
问题 3:API 调用失败
症状:
dashscope.api_error.APIError: Invalid API key
排查步骤:
# 1. 验证 API Key
import dashscope
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
print(f"API Key: {dashscope.api_key[:10]}...") # 打印前 10 位
# 2. 测试 API
res = dashscope.TextEmbedding.call(
model="text-embedding-v1",
input="测试"
)
print(res)
# 3. 检查余额
# 登录阿里云控制台 -> DashScope -> 用量查询
13.4 工作流调试
问题 4:工作流卡住
解决方案:
# 1. 添加日志
def node_a(state):
print("=== 开始执行 node_a ===")
print(f"输入状态:{state}")
# ... 处理逻辑
print("=== node_a 执行完成 ===")
return {"output": "..."}
# 2. 单步调试
final_state = graph.invoke(initial_state, {"debug": True})
# 3. 检查节点输出
intermediate_states = graph.invoke(
initial_state,
return_intermediate_steps=True
)
13.5 性能优化
问题 5:处理速度慢
优化方案:
# 1. 批量处理(使用 LangChain batch)
results = chain.batch(resume_texts)
# 2. 异步处理
async def process_resume(text):
return await chain.ainvoke({"resume_text": text})
# 3. 并发限制
import asyncio
semaphore = asyncio.Semaphore(5) # 最多 5 个并发
# 4. 缓存嵌入向量
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_embedding_cached(text):
return get_embedding(text)
14. 项目扩展建议
14.1 功能扩展
14.1.1 多 JD 并行处理
# 支持同时处理多个岗位
@app.post("/api/run-workflow-batch")
async def run_workflow_batch(position_ids: list[str]):
tasks = []
for position_id in position_ids:
task = process_position(position_id)
tasks.append(task)
results = await asyncio.gather(*tasks)
return {"results": results}
14.1.2 简历去重
def is_duplicate_resume(email, phone, db):
"""
检查简历是否重复
"""
existing = db.query(FilteredResume).filter(
(FilteredResume.email == email) |
(FilteredResume.phone == phone)
).first()
return existing is not None
14.1.3 自动标签生成
def generate_tags(resume_dict, jd_text):
"""
基于简历和 JD 自动生成标签
"""
tags = []
# 学历标签
if resume_dict.get("education") in ["博士", "硕士"]:
tags.append("高学历")
# 经验标签
work_years = int(resume_dict.get("work_years", 0))
if work_years >= 5:
tags.append("资深")
elif work_years >= 3:
tags.append("中级")
else:
tags.append("初级")
# 技能标签(基于关键词匹配)
skills = ["Python", "Java", "机器学习", "深度学习"]
for skill in skills:
if skill.lower() in jd_text.lower():
tags.append(skill)
return tags
14.2 技术升级
14.2.1 添加 Redis 缓存
import redis
import json
redis_client = redis.Redis(host='redis', port=6379, db=0)
def get_cached_embedding(text):
"""
从 Redis 获取缓存的嵌入向量
"""
cache_key = f"embedding:{hash(text)}"
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 未命中,生成新向量并缓存
embedding = get_embedding(text)
redis_client.setex(cache_key, 3600, json.dumps(embedding)) # 缓存 1 小时
return embedding
14.2.2 添加消息队列
# 使用 Celery 处理异步任务
from celery import Celery
celery = Celery('tasks', broker='redis://redis:6379/0')
@celery.task
def process_resume_async(resume_id):
"""
异步处理简历
"""
# 处理逻辑...
return {"status": "completed"}
# API 中调用
@app.post("/api/process-resume-async")
async def process_resume_api(resume_id: int):
task = process_resume_async.delay(resume_id)
return {"task_id": task.id}
14.3 监控与日志
14.3.1 添加 Prometheus 监控
from prometheus_fastapi_instrumentator import Instrumentator
app = FastAPI()
Instrumentator().instrument(app).expose(app)
# 访问 /metrics 查看指标
14.3.2 结构化日志
import logging
import json
# 配置 JSON 日志
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"level": record.levelname,
"message": record.getMessage(),
"timestamp": self.formatTime(record),
"module": record.module
}
return json.dumps(log_data)
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())
logger.handlers[0].setFormatter(JSONFormatter())
14.4 安全加固
14.4.1 API 认证
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
验证 Bearer Token
"""
token = credentials.credentials
if token != os.getenv("API_TOKEN"):
raise HTTPException(status_code=401, detail="Invalid token")
return token
@app.get("/api/protected")
async def protected_route(token: str = Depends(verify_token)):
return {"message": "访问成功"}
14.4.2 速率限制
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.get("/api/run-workflow")
@limiter.limit("5/minute") # 每分钟最多 5 次
async def run_workflow_limited(request):
# ...
附录
A. 完整项目结构
resume_screen_agent/
├── db/
│ ├── __init__.py
│ └── database.py # 数据库模型和会话管理
├── utils/
│ ├── __init__.py
│ ├── cosine_similarity.py # 余弦相似度计算
│ ├── get_embedding.py # 嵌入向量生成
│ ├── jd_chroma.py # JD 向量存储
│ ├── message_pydantic.py # 简历数据模型
│ └── resume_core_text.py # 简历文本提取
├── workflow/
│ ├── __init__.py
│ ├── graph_desin.py # 工作流图设计
│ ├── graph_state.py # 工作流状态定义
│ └── nodes/
│ ├── __init__.py
│ ├── filter_resume_node.py # 简历筛选节点
│ ├── llm_score_node.py # LLM 评分节点
│ ├── meta_email_node.py # 邮件处理节点
│ ├── resume_text_node.py # 简历文本读取节点
│ └── structured_resume_node.py # 简历结构化节点
├── static/
│ └── index.html # Web 前端界面
├── fliter_resumes/ # 筛选后的简历存储
├── resumes/ # 原始简历存储
├── chroma_db/ # ChromaDB 数据(.gitignore)
├── .env # 环境变量(.gitignore)
├── .env.example # 环境变量模板
├── .gitignore
├── Dockerfile
├── docker-compose.yml
├── docker-compose.prod.yml
├── main.py # 应用入口
├── requirements.txt
└── README.md
B. 常用命令速查
# 开发环境
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
uvicorn main:app --reload
# Docker 部署
docker compose up -d
docker compose logs -f
docker compose down
# 数据库操作
docker compose exec db mysql -u root -p
docker compose exec app python -c "from db.database import Base, engine; Base.metadata.create_all(bind=engine)"
# 测试 API
curl http://localhost:8000/api/departments
curl http://localhost:8000/docs
C. 故障排查清单
- [ ] 检查
.env文件配置是否正确 - [ ] 验证数据库连接
- [ ] 确认 API Key 有效
- [ ] 检查 Docker 容器状态
- [ ] 查看应用日志
- [ ] 测试网络连通性
- [ ] 验证文件路径权限
- [ ] 检查发件人填写主题是否合规
发表评论