后端专题
FastAPI 面试
FastAPI 的依赖注入系统是怎么工作的?—— 从 Starlette 到多层 DI 容器
第一层:函数即依赖
FastAPI 的依赖注入(DI)核心思想极其简单——一个函数就是一个依赖:
from fastapi import Depends, FastAPI
app = FastAPI()
# 这就是一个"依赖"
def get_db():
db = create_connection()
try:
yield db
finally:
db.close()
# 使用 Depends 声明依赖
@app.get("/users")
def list_users(db=Depends(get_db)):
return db.query("SELECT * FROM users")
不需要像 Spring 那样的注册中心、XML 配置或装饰器工厂。FastAPI 的 DI 就是函数调用——框架负责调用 get_db(),然后把返回值注入到路径函数中。
第二层:层级依赖
依赖可以嵌套,形成一个 DAG(有向无环图):
def get_current_user(token: str = Depends(oauth2_scheme)):
user = decode_token(token)
if not user:
raise HTTPException(status_code=401)
return user
def get_user_permissions(user=Depends(get_current_user)):
return load_permissions(user)
@app.get("/admin")
def admin_dashboard(
user=Depends(get_current_user),
permissions=Depends(get_user_permissions)
):
return {"user": user, "permissions": permissions}
框架会解析依赖图,按拓扑顺序执行,保证每个依赖只实例化一次(单次请求范围内):
请求到达 → oauth2_scheme() → get_current_user() → get_user_permissions()
↘ ↗
admin_dashboard
第三层:三种作用域
FastAPI 的 Depends 有三种生命周期:
| 作用域 | 行为 | 使用场景 |
|---|---|---|
| 请求级(默认) | 每次请求创建新实例 | 数据库会话、请求验证 |
| 全局单例 | 应用启动时创建一次 | HTTP 客户端、配置对象、模型加载 |
| 连接级(WebSocket) | WebSocket 生命周期内共享 | WebSocket 会话管理 |
# 全局单例——用普通函数即可
# FastAPI 会在首次依赖时缓存它
@lru_cache
def get_settings():
return Settings()
@app.get("/config")
def read_config(settings: Settings = Depends(get_settings)):
return settings.dict()
第四层:Sub-dependencies 和缓存
FastAPI 使用 FastAPI DI cache:在同一请求中,同一个依赖函数只执行一次。
call_count = 0
def get_db():
global call_count
call_count += 1
print(f"called {call_count} times") # 只会输出一次
return Database()
@app.get("/items")
def get_items(db=Depends(get_db)):
...
@app.get("/users")
def get_users(db=Depends(get_db)):
...
但这里的"一次"是按路径函数算的——每个路径函数都有自己的依赖缓存。所以两个不同的路径里调用 get_db 会各执行一次。
第五层:依赖覆盖与测试
FastAPI DI 最强大的特性是依赖可覆盖,这对测试至关重要:
# 生产代码
def get_db():
return ProductionDB()
# 测试覆盖
app.dependency_overrides[get_db] = lambda: TestDB()
# 也可以用更优雅的方式
def override_get_db():
db = TestDB()
db.init()
yield db
db.cleanup()
app.dependency_overrides[get_db] = override_get_db
依赖覆盖使得不需要任何 mock 框架——只需要把生产依赖替换为测试依赖即可。
高手视野
FastAPI 的 DI 本质上是对函数式编程中 Partial Application 的封装——Depends 只是声明了"这个参数将来由框架注入"。它与路径参数的绑定是在应用启动时通过类型签名静态确定的,不是运行时动态查找的。
对比 Spring:Spring 的 DI 是类级别、全局容器的 IoC;FastAPI 的 DI 是函数级别、请求范围的显式注入。两者理念相反,但各有优势。
Pydantic V2 相比 V1 核心变化是什么?—— Rust 核心到性能飞跃
第一层:V2 是重写,不是升级
Pydantic V2 的核心变化:验证引擎从 Python 重写为 Rust。
Pydantic V1: Python 实现验证逻辑
Pydantic V2: Rust 核心 (pydantic-core) + Python 绑定
这个变化带来了 5-50x 的性能提升,特别是对于复杂嵌套模型的验证。
第二层:Validator 写法的变化
V1 的 @validator 在 V2 中变为 @field_validator,且行为有差异:
# Pydantic V1
from pydantic import BaseModel, validator
class User(BaseModel):
name: str
age: int
@validator('name')
def name_must_not_empty(cls, v):
if not v:
raise ValueError('name is empty')
return v
# Pydantic V2
from pydantic import BaseModel, field_validator
class User(BaseModel):
name: str
age: int
@field_validator('name')
@classmethod
def name_must_not_empty(cls, v):
if not v:
raise ValueError('name is empty')
return v
关键区别:
- V2 的 validator 默认是 实例方法(
self),不是类方法——需要显式加@classmethod - V2 的
@field_validator不再支持each_item=True(用@field_validator('items', mode='wrap')替代) - V2 validator 的返回值只影响当前字段,不再支持修改其他字段(用
@model_validator替代)
第三层:model_validator 替代整个模型验证
V2 引入了更清晰的分离:
from pydantic import BaseModel, model_validator
class Order(BaseModel):
items: list[str]
total: float
discount: float = 0.0
@model_validator(mode='after')
def calculate_discount(self):
"""验证整个模型后才执行的逻辑"""
if self.total > 100:
self.discount = 0.1
return self
mode='after':模型字段都验证完再执行,可以安全地访问 self
mode='before':在字段验证之前执行,接收原始字典
第四层:类型窄化和 discriminated union
V2 原生支持 Discriminated Union(可辨别联合),替代了 V1 中的复杂自定义:
from typing import Literal, Union
from pydantic import BaseModel, Field
class Cat(BaseModel):
pet_type: Literal['cat']
meows: int
class Dog(BaseModel):
pet_type: Literal['dog']
barks: int
class Animal(BaseModel):
pet: Union[Cat, Dog] = Field(discriminator='pet_type')
# 自动根据 pet_type 决定用哪个模型
animal = Animal.model_validate({'pet': {'pet_type': 'cat', 'meows': 3}})
底层使用 Tagged Union(标记联合)模式——Rust 枚举的标准做法,比 V1 的穷举匹配更高效。
第五层:Config 的变更
V2 中 Config 类的写法变了:
# Pydantic V1
class Model(BaseModel):
class Config:
orm_mode = True
allow_population_by_field_name = True
# Pydantic V2
class Model(BaseModel):
model_config = {
"from_attributes": True, # 原名 orm_mode
"populate_by_name": True, # 原名 allow_population_by_field_name
"extra": "forbid", # 禁止额外字段
}
model_config 是类属性,支持字典和 ConfigDict 两种写法。
第六层:序列化方式变化
V2 推荐使用 model_dump() 替代 dict(),model_dump_json() 替代 json():
# V1 写法(仍可用,但已废弃)
user.dict()
user.json()
# V2 推荐写法
user.model_dump()
user.model_dump_json()
并且 V2 支持自定义序列化器 @field_serializer:
from pydantic import BaseModel, field_serializer
from datetime import datetime
class Event(BaseModel):
name: str
timestamp: datetime
@field_serializer('timestamp')
def serialize_dt(self, value: datetime) -> str:
return value.isoformat()
高手视野
Pydantic V2 的核心差异是从纯 Python 的类型注解验证 → Rust 引擎 + Python 胶水。这个架构变化意味着:
- V2 的
model_validate比 V1 的parse_obj快 5-50x - V2 的内存占用更低(Rust 侧直接管理内存)
- V2 的错误信息格式变了(Rust 风格的
ValidationError结构)
迁移建议:不要手动迁移,用 bump-pydantic 工具自动转换。
FastAPI 如何处理异步请求?—— ASGI、Event Loop 与线程池
第一层:ASGI 是基础
FastAPI 底层是 Starlette,Starlette 的底层是 ASGI(Asynchronous Server Gateway Interface):
WSGI(Django/Flask):
Client → WSGI Server (Gunicorn) → WSGI App → 同步处理
ASGI(FastAPI/Starlette):
Client → ASGI Server (Uvicorn) → ASGI App → 异步/同步处理
ASGI 的接口非常简单——就是一个异步函数:
# ASGI 规范:一个可调用对象,接收 scope, receive, send
async def app(scope, receive, send):
assert scope['type'] == 'http'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [(b'content-type', b'text/plain')]
})
await send({
'type': 'http.response.body',
'body': b'Hello, ASGI!'
})
第二层:async def vs def —— 自动线程池
FastAPI 的精妙设计:自动判断同步/异步执行路径。
@app.get("/async")
async def async_endpoint():
# 直接在主事件循环中运行
await asyncio.sleep(1)
return {"result": "async"}
@app.get("/sync")
def sync_endpoint():
# FastAPI 自动放入线程池执行
import time
time.sleep(1)
return {"result": "sync"}
当你定义 def(同步函数)时,FastAPI 会:
- 检测到这个函数不是
async def - 自动用 线程池执行器(
run_in_executor)包装它 - 主事件循环不会被阻塞,仍然可以处理其他请求
async def → 直接 await → 事件循环直接调用
def → run_in_executor → 丢给线程池 → 事件循环不阻塞
第三层:为什么同步会阻塞事件循环
@app.get("/bad")
def blocking_operation():
# 这是个同步函数,FastAPI 会在线程池执行
# 但是如果在 async def 中调用同步阻塞代码:
time.sleep(10) # 这会阻塞事件循环!
return {"result": "done"}
@app.get("/also-bad")
async def bad_async():
# ❌ 在 async 函数中调用同步阻塞代码
time.sleep(10) # 阻塞整个事件循环!
return {"result": "done"}
@app.get("/correct")
async def correct_async():
# ✅ 用 asyncio 的异步版本
await asyncio.sleep(10)
# 或者用 run_in_executor 包装
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, time.sleep, 10)
return {"result": "done"}
第四层:协程与数据库驱动
FastAPI 异步性能的关键不在框架,而在于 IO 操作是否真正异步:
# ❌ 不匹配:urllib 是同步的
import urllib.request
@app.get("/bad-external")
async def bad_external():
resp = urllib.request.urlopen("https://api.example.com") # 阻塞!
data = resp.read()
return {"data": data}
# ✅ 匹配:httpx 支持异步
import httpx
@app.get("/good-external")
async def good_external():
async with httpx.AsyncClient() as client:
resp = await client.get("https://api.example.com")
return {"data": resp.json()}
对于数据库:
同步 ORM: async def + 同步 DB → 阻塞事件循环 ❌
异步 ORM (SQLAlchemy 1.4+ async):
async def + async DB → 真正非阻塞 ✅
同步 ORM + 线程池: def + sync DB → FastAPI 自动线程池 ✅(性能稍差)
第五层:并发模型对比
Flask (WSGI, 多进程/多线程):
请求 1 → 线程 1 (完成) 请求 2 → 线程 2 (完成)
↑ 每个请求消耗一个 OS 线程,上下文切换开销大
FastAPI (ASGI, 事件循环):
请求 1 → 协程 A (await IO) ─→ 协程 B (继续) → 协程 A 继续
↑ 协程切换是用户态操作,微秒级,没有锁竞争
FastAPI + sync def (线程池):
请求 1 → 线程池... 请求 2 → 线程池...
↑ 同步操作仍然走线程,但主循环不阻塞
高手视野
FastAPI 不是"异步框架" 而是 "混合同步/异步框架"。它的核心能力是允许你先用同步代码写业务逻辑,性能瓶颈处逐步替换为异步。这是实践中最务实的策略——100% 异步化很难,也没有必要。
选择原则:
CPU 密集型 → 用多进程/走线程池(asyncio 解决不了 CPU 瓶颈)
IO 密集型 → 用 async/await(网络 IO、数据库查询、文件读写)
简单操作 → 用 def(FastAPI 自动线程池,够用了)
FastAPI WebSocket 如何处理?—— 从连接管理到广播
第一层:基础 WebSocket 端点
FastAPI 的 WebSocket 支持基于 Starlette,使用起来非常直接:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"你说了: {data}")
except WebSocketDisconnect:
print("客户端断开连接")
核心流程:
客户端发起 WebSocket 握手
→ FastAPI 验证路由
→ 调用 websocket.accept() 确认连接
→ 循环:receive → 处理 → send
→ WebSocketDisconnect 异常 → 清理
第二层:连接管理
生产环境中需要管理多个连接:
from fastapi import FastAPI, WebSocket
from typing import Set
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: dict[str, WebSocket] = {}
async def connect(self, user_id: str, ws: WebSocket):
await ws.accept()
self.active_connections[user_id] = ws
def disconnect(self, user_id: str):
self.active_connections.pop(user_id, None)
async def send_to_user(self, user_id: str, message: str):
if ws := self.active_connections.get(user_id):
await ws.send_text(message)
async def broadcast(self, message: str):
for ws in self.active_connections.values():
await ws.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{user_id}")
async def ws_endpoint(ws: WebSocket, user_id: str):
await manager.connect(user_id, ws)
try:
while True:
data = await ws.receive_text()
# 向特定用户发送
await manager.send_to_user(some_user_id, data)
except WebSocketDisconnect:
manager.disconnect(user_id)
第三层:WebSocket 与 HTTP 端点的依赖
WebSocket 也支持 Depends,但行为与 HTTP 不同:
from fastapi import Depends, WebSocket, WebSocketException, status
async def get_cookie_or_token(websocket: WebSocket):
# WebSocket 的 query params 常用于认证
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=1008) # Policy Violation
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return token
@app.websocket("/ws")
async def ws_with_auth(
websocket: WebSocket,
token: str = Depends(get_cookie_or_token)
):
await websocket.accept()
# ...
注意:异常处理在 WebSocket 中不一样——HTTPException 不适用,要用 WebSocketException 或直接 websocket.close()。
第四层:WebSocket 与异步消息队列(生产级)
Client → WebSocket → FastAPI → Redis Pub/Sub → 其他服务
↕
广播给所有 WebSocket 连接
生产级别的 WebSocket 通常会结合消息队列做广播:
import asyncio
import json
import redis.asyncio as aioredis
class PubSubManager:
def __init__(self, redis_url="redis://localhost:6379"):
self.redis = aioredis.from_url(redis_url)
self.pubsub = self.redis.pubsub()
self.channels: dict[str, set[WebSocket]] = {}
async def subscribe(self, channel: str, ws: WebSocket):
if channel not in self.channels:
self.channels[channel] = set()
await self.pubsub.subscribe(channel)
self.channels[channel].add(ws)
async def unsubscribe(self, channel: str, ws: WebSocket):
self.channels[channel].discard(ws)
if not self.channels[channel]:
await self.pubsub.unsubscribe(channel)
del self.channels[channel]
async def listen(self):
while True:
message = await self.pubsub.get_message(
ignore_subscribe_messages=True
)
if message:
channel = message["channel"].decode()
data = message["data"].decode()
for ws in self.channels.get(channel, set()):
try:
await ws.send_text(data)
except Exception:
pass
await asyncio.sleep(0.01)
第五层:性能与扩展
WebSocket 长连接在服务器端是有限资源:
| 资源 | 单机限制 | 优化策略 |
|---|---|---|
| 文件描述符 | 默认 1024 (ulimit) | 调整 ulimit -n 65536 |
| 内存 | 每个连接 ~20-50 KB RAM | 用 epoll/kqueue,不用每个连接一个线程 |
| CPU | 维持连接开销极低 | 大量连接时用 asyncio 的 read/write 不占 CPU |
WebSocket 的扩展方向:
单机 (Uvicorn + 事件循环)
↓ 连接数 > 10,000
Uvicorn workers (多进程)
↓ 需要跨进程通信
ASGI 网关 (Daphne / Hypercorn)
↓ 需要水平扩展
WebSocket 网关层 (API Gateway + Redis Pub/Sub + K8s)
高手视野
WebSocket 的真正难点不在 FastAPI 本身,而在于:
- 重新连接策略:客户端重连时如何恢复状态
- 心跳保活:30s 一次 ping/pong 防止代理断开
- 消息有序性:WebSocket 保证帧有序,但多个连接间的消息需要应用层保证
- 优雅关闭:服务器关闭时先通知所有客户端,再等待清理完成
FastAPI 的 WebSocket 设计是"轻量可用的"——简单场景直接上手,复杂场景需要自己补全缺失的抽象层(连接池、重连、心跳、序列化协议)。
FastAPI 的 BackgroundTasks 与 Celery 如何选?—— 从同步提交到任务队列
第一层:BackgroundTasks —— 轻量级后处理
FastAPI 内置的 BackgroundTasks 适用于"用户不需要等待结果"的轻量任务:
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", "a") as f:
f.write(f"{message}\n")
@app.post("/send-email")
def send_email(email: str, bg: BackgroundTasks):
# 先返回响应
bg.add_task(write_log, f"Email sent to {email}")
return {"message": "Email sent"}
特性:
BackgroundTasks:
├── 同一进程内执行
├── 与请求共享事件循环(async def 时)或线程池(def 时)
├── 没有持久化——服务器崩溃任务丢失
├── 没有重试机制
├── 没有任务状态追踪
└── 适合:写日志、发通知、缓存预热
第二层:为什么不是"真正的异步"
BackgroundTasks 不是 asyncio.create_task 或 threading.Thread,而是在返回响应后才执行的任务:
# BackgroundTasks 的执行时机
@app.post("/task")
async def create_task(bg: BackgroundTasks):
bg.add_task(heavy_task) # 排队
return {"message": "done"}
# ↑ 返回响应
# ↓ 然后才执行 heavy_task(在同一个 asyncio 循环中)
所以如果 heavy_task 是同步且耗时的,它仍然会阻塞事件循环,影响后续请求。
def heavy_task():
time.sleep(10) # ❌ 虽然不影响响应,但阻塞了事件
# ✅ 正确的做法:用 async def 定义后台任务
async def heavy_task_async():
await asyncio.sleep(10) # 不阻塞事件循环
第三层:Celery —— 分布式任务队列
当 BackgroundTasks 不够用时(需要持久化、重试、定时调度、分布式执行),就是 Celery 上场的时候:
from celery import Celery
celery = Celery(
"tasks",
broker="redis://localhost:6379/0", # 消息队列
backend="redis://localhost:6379/1" # 结果存储
)
@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def process_pdf(self, pdf_path: str):
try:
# 耗时处理
result = heavy_pdf_processing(pdf_path)
return {"status": "done", "result": result}
except Exception as e:
raise self.retry(exc=e)
Celery 架构:
┌──────────────┐ ┌──────────┐ ┌─────────────┐
│ FastAPI APP │───▶│ Broker │───▶│ Worker 1 │
│ (生产者) │ │ (Redis) │ │ Worker 2 │
└──────────────┘ └──────────┘ │ Worker 3 │
└──────┬──────┘
▼
┌─────────────┐
│ Backend │
│ (Redis/DB) │
└─────────────┘
第四层:结合 BackgroundTasks 和 Celery
生产中的常见模式——BackgroundTasks 负责触发 Celery 任务:
from celery.result import AsyncResult
@celery.task
def generate_report(user_id: int):
# 耗时任务
report = create_report(user_id)
save_to_db(report)
return report
@app.post("/reports")
async def request_report(user_id: int, bg: BackgroundTasks):
# 提交 Celery 任务
task = generate_report.delay(user_id)
# 用 BackgroundTasks 启动查询轮询
bg.add_task(poll_task_completion, task.id)
return {"task_id": task.id, "status": "queued"}
@app.get("/reports/{task_id}")
async def get_report_status(task_id: str):
result = AsyncResult(task_id, app=celery)
if result.ready():
return {"status": "done", "result": result.get()}
return {"status": "processing"}
第五层:选型决策矩阵
BackgroundTasks Celery Dramatiq ARQ
─────────────── ─────── ──────── ───
持久化 ✗ ✅ ✅ ✅
重试机制 ✗ ✅ ✅ ✅
定时调度 ✗ ✅ ✅ ✅
任务优先级 ✗ ✅ ✅ ✅
任务链/工作流 ✗ ✅ ✅ ✅
分布式执行 ✗ ✅ ✅ ✅
监控面板 ✗ ✅(Flower) ✅(WebUI) ✅(WebUI)
外部依赖 无 Redis/RabbitMQ Redis Redis
部署复杂度 零 中 低 低
选择原则:
- 任务在秒级、可丢失、不需要重试 → BackgroundTasks
- 任务在分钟级、需要磁盘或 GPU 资源、需要重试 → Celery(生态最成熟)
- 任务在秒-分级、不想写太多配置、只需要 Redis → ARQ(轻量异步,与 FastAPI 生态匹配好)
- "我就跑个定时任务" → APScheduler 就够了
高手视野
Celery 最大的问题是配置复杂和任务死锁(任务依赖同类型资源时容易饿死)。对于现代 Python 异步项目,ARQ(基于 Redis 和 asyncio)正在成为 Celery 的轻量替代品:
# ARQ —— 纯异步、轻量、与 FastAPI 天然配合
from arq import create_pool
from arq.connections import RedisSettings
async def process_video(ctx, video_id: str):
# ctx 中包含 redis 连接
await heavy_processing(video_id)
return {"video_id": video_id, "status": "done"}
class WorkerSettings:
functions = [process_video]
redis_settings = RedisSettings()
如果项目已经是全异步栈(FastAPI + SQLAlchemy async + httpx),ARQ 比 Celery 更匹配。
FastAPI 如何做 API 版本管理?—— 从 URL 路径到 Header 协商
第一层:URL 路径版本
最直观的方式——在路由上加版本前缀:
from fastapi import APIRouter, FastAPI
app = FastAPI()
# V1 路由
v1_router = APIRouter(prefix="/v1")
@v1_router.get("/users")
def list_users_v1():
return [{"name": "Alice", "age": 30}]
# V2 路由
v2_router = APIRouter(prefix="/v2")
@v2_router.get("/users")
def list_users_v2():
return [
{"name": "Alice", "age": 30, "email": "alice@example.com"}
]
app.include_router(v1_router)
app.include_router(v2_router)
第二层:子应用挂载
更彻底的隔离——不同版本用不同的 ASGI 应用:
from fastapi import FastAPI
app_v1 = FastAPI()
app_v2 = FastAPI()
app = FastAPI()
# 挂载为子应用
app.mount("/v1", app_v1)
app.mount("/v2", app_v2)
@app_v1.get("/users")
def list_users():
return [{"name": "Alice", "age": 30}]
@app_v2.get("/users")
def list_users_v2():
return [{"name": "Alice", "age": 30, "email": "alice"}]
优点是真隔离——中间件、异常处理、依赖都可以不同。缺点是不能共享代码。
第三层:Accept Header 版本(内容协商)
更优雅的方式——不想污染 URL,可以用 Header:
from fastapi import APIRouter, FastAPI, Request, HTTPException
from typing import Callable
app = FastAPI()
class VersionedRouter:
def __init__(self):
self._handlers: dict[tuple[str, str, int], Callable] = {}
def add(self, method: str, path: str, version: int):
def decorator(func: Callable):
self._handlers[(method.upper(), path, version)] = func
return func
return decorator
def get_handler(self, method: str, path: str, version: int):
return self._handlers.get((method.upper(), path, version))
router = VersionedRouter()
@router.add("GET", "/users", version=1)
def list_users_v1():
return [{"name": "Alice", "age": 30}]
@router.add("GET", "/users", version=2)
def list_users_v2():
return [{"name": "Alice", "age": 30, "email": "alice"}]
@app.get("/users")
async def users(request: Request):
version = int(request.headers.get("Accept-Version", "1"))
handler = router.get_handler("GET", "/users", version)
if not handler:
raise HTTPException(406, "Version not supported")
return handler()
第四层:生产中的版本策略
├── /v1/ —— 稳定接口,只修 bug,不新增
├── /v2/ —— 当前稳定版本
├── /v3/ —— 最新版本(可能有不兼容变更)
└── /v3/experimental/ —— 实验特性,提前预览
生产中的建议:
from fastapi import APIRouter, FastAPI
from datetime import datetime
app = FastAPI()
# V1 冻结 —— 只修 bug
v1 = APIRouter(prefix="/v1")
@v1.get("/ping")
def ping():
return {"version": "1", "deprecated_at": "2025-06-01"}
# V2 当前版本
v2 = APIRouter(prefix="/v2")
@v2.get("/users")
def get_users():
...
# V3 — 最新特性
v3 = APIRouter(prefix="/v3")
@v3.get("/users")
def get_users_v3():
...
app.include_router(v1)
app.include_router(v2)
app.include_router(v3)
高手视野
API 版本管理最重要的决策不是"用什么技术",而是什么时候需要新版本:
- 兼容变更(加字段、加可选参数)→ 不需要新版本
- 不兼容但可共存(改字段名、改类型) → URL 路径版本
- 完全不兼容且需要(改协议、改认证方式)→ 子应用或 Header 版本
实际项目中最常见的是:永远不需要多版本共存。大部分团队的做法是:标记废弃(deprecated),保留旧版本一段时间,到期强制升级。这不是偷懒,而是维护多个版本的成本远超收益。
FastAPI 的 Middleware 执行顺序是怎样的?—— ASGI 洋葱模型
第一层:基础用法
from fastapi import FastAPI, Request
import time
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
第二层:洋葱模型
FastAPI / Starlette 的 middleware 是典型的洋葱模型(Onion Model):
@app.middleware("http")
async def middleware_a(request: Request, call_next):
print("A before")
response = await call_next(request)
print("A after")
return response
@app.middleware("http")
async def middleware_b(request: Request, call_next):
print("B before")
response = await call_next(request)
print("B after")
return response
@app.get("/")
async def main():
print("handler")
return {"message": "OK"}
输出顺序:
A before
B before
handler
B after
A after
执行流(洋葱剖面):
Middleware A ──→ Middleware B ──→ Route Handler ──→ Middleware B ──→ Middleware A
外层 内层 核心 内层反回 外层反回
第三层:注册顺序决定内外层
注意:先注册的 middleware 是外层,后注册的是内层。
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
# 这是最外层
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["example.com", "*.example.com"],
)
# 这是 TrustedHost 的内层 / CORSMiddleware 的外层
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
)
# 自定义 middleware 在内层
@app.middleware("http")
async def custom_middleware(request, call_next):
...
TrustedHostMiddleware (最外层)
└── CORSMiddleware
└── custom_middleware (最内层)
└── Route Handler
第四层:CLI / ASGI 级别的 middleware
除了 @app.middleware("http"),还有更底层的 ASGI middleware:
from starlette.middleware.base import BaseHTTPMiddleware
class CustomMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# 前置处理
start = time.time()
response = await call_next(request)
# 后置处理
elapsed = time.time() - start
response.headers["X-Elapsed"] = str(elapsed)
return response
app.add_middleware(CustomMiddleware)
BaseHTTPMiddleware 是 @app.middleware("http") 的类版本,两者执行顺序模型相同。
第五层:Middleware 与 Depends 的执行时机
注意:Middleware 在 Depends 之前执行。
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
print("1 - middleware before")
response = await call_next(request)
print("4 - middleware after")
return response
async def verify_token(token: str = Header(...)):
print("2 - dependency")
return token
@app.get("/secure")
async def secure_endpoint(token: str = Depends(verify_token)):
print("3 - handler")
return {"token": token}
输出:
1 - middleware before
2 - dependency
3 - handler
2 (dep 执行完,但已经过)
4 - middleware after
高手视野
理解执行顺序对调试和安全性至关重要:
- 想在所有请求前检查 IP 黑名单 → 最外层 middleware
- 想在所有 handler 后添加通用 header → 最外层 middleware 的 after 部分
- 想在认证之后做请求日志 → 用 Depends 而非 middleware
- 想在middleware 里访问认证用户 → 确保 middleware 在认证 Depends 之后——但 middleware 在 Depends 之前执行,所以需要通过
request.state传递:
# 在依赖中设置
async def get_current_user(request: Request):
user = await verify_token(request)
request.state.user = user
return user
# 在 middleware 中读取
@app.middleware("http")
async def log_user(request: Request, call_next):
response = await call_next(request)
if hasattr(request.state, "user"):
print(f"User: {request.state.user}")
return response
FastAPI 项目的最佳目录结构是什么?—— 从单体到模块化
第一层:快速原型(单文件够了)
app.py # 所有代码在一个文件
这不丢人——MVP / 原型阶段保持单文件是最快的。
第二层:工程化结构(推荐)
project/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 应用实例、启动配置
│ ├── config.py # 配置加载(Pydantic Settings)
│ ├── database.py # 数据库连接、Session 管理
│ ├── dependencies.py # 全局依赖(认证、数据库获取)
│ ├── models/ # SQLAlchemy 模型
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── order.py
│ ├── schemas/ # Pydantic 模型(请求/响应)
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── order.py
│ ├── routers/ # 路由模块
│ │ ├── __init__.py
│ │ ├── users.py
│ │ └── orders.py
│ ├── services/ # 业务逻辑层
│ │ ├── __init__.py
│ │ ├── user_service.py
│ │ └── order_service.py
│ └── utils/ # 工具函数
│ ├── __init__.py
│ └── security.py
├── tests/
│ ├── conftest.py # pytest fixtures(覆盖依赖)
│ ├── test_users.py
│ └── test_orders.py
├── alembic/ # 数据库迁移
├── alembic.ini
├── pyproject.toml
└── Dockerfile
第三层:模块化(大项目)
project/
├── app/
│ ├── main.py
│ ├── config.py
│ ├── database.py
│ ├── common/ # 跨模块共享
│ │ ├── dependencies.py
│ │ ├── exceptions.py
│ │ └── middleware.py
│ └── modules/ # 按业务域拆分
│ ├── users/
│ │ ├── __init__.py
│ │ ├── router.py
│ │ ├── schemas.py
│ │ ├── service.py
│ │ └── models.py
│ └── orders/
│ ├── __init__.py
│ ├── router.py
│ ├── schemas.py
│ ├── service.py
│ └── models.py
第四层:service 层 vs router 层
路由层(routers/)只做三件事:
# ✅ 好的 router 代码
@router.get("/users/{user_id}")
def get_user(
user_id: int,
service: UserService = Depends(get_user_service)
):
"""只负责:参数验证、调用 service、返回响应"""
user = service.get_user_by_id(user_id)
if not user:
raise HTTPException(404, "User not found")
return UserResponse.model_validate(user)
# ❌ 坏的 router 代码——业务逻辑泄漏到了路由层
@router.get("/users/{user_id}")
def get_user_bad(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(404)
# 业务逻辑混在这里...
if user.role == "admin" and not user.is_active:
user.is_active = True
db.commit()
return user
高手视野
路由层 → 参数处理 + 请求/响应结构
Service 层 → 业务逻辑 + 事务管理
Repository 层 → 数据访问(可选,当 ORM 不够用才加)
Model 层 → 数据库表映射
Schema 层 → 数据验证 + 序列化
原则:
- 路由层应该薄——可以厚的是 service 层
- Service 层不感知 HTTP——不引用
Request、Response、Header - Service 层是可测试的——传什么依赖,出什么结果,不需要启动服务器
- Schema 只做数据描述——不包含业务逻辑
Django 面试
Django ORM 的 N+1 查询问题是什么?—— 从懒加载到查询优化
第一层:N+1 是什么
# models.py
class Author(models.Model):
name = models.CharField(max_length=100)
class Book(models.Model):
title = models.CharField(max_length=100)
author = models.ForeignKey(Author, on_delete=models.CASCADE)
# 查询所有书籍及其作者姓名
books = Book.objects.all() # 1 次查询
for book in books:
print(book.author.name) # N 次查询(每本书查询一次作者)
总共执行了 1 + N 次 SQL 查询——这就是 N+1 问题。
SQL 执行:
SELECT * FROM book; ← 1 次
SELECT * FROM author WHERE id = 1; ← 第 1 本书
SELECT * FROM author WHERE id = 2; ← 第 2 本书
SELECT * FROM author WHERE id = 3; ← 第 3 本书
... ← N 本书 → N 次
第二层:select_related —— JOIN 解决
select_related 通过 SQL JOIN 一次性查完所有关联数据:
# ✅ 用 select_related
books = Book.objects.select_related('author').all() # 1 次 SQL
for book in books:
print(book.author.name) # 不会产生额外查询
生成的 SQL:
SELECT book.id, book.title, book.author_id,
author.id, author.name
FROM book
INNER JOIN author ON book.author_id = author.id;
适用场景:ForeignKey、OneToOneField(一对多、一对一)
第三层:prefetch_related —— 额外查询解决
prefetch_related 无法用 JOIN 实现时使用——比如反向关联和多对多:
class Author(models.Model):
name = models.CharField(max_length=100)
class Book(models.Model):
title = models.CharField(max_length=100)
author = models.ForeignKey(Author, on_delete=models.CASCADE, related_name='books')
# 查询所有作者及其书籍列表
authors = Author.objects.prefetch_related('books').all() # 2 次 SQL
for author in authors:
print(author.books.all()) # 不会产生额外查询
生成的 SQL:
SELECT * FROM author; -- 1 次
SELECT * FROM book WHERE author_id IN (1, 2, 3); -- 1 次(而不是 N 次)
prefetch_related 在 Python 中做关联,而不是在数据库中 JOIN。
第四层:Prefetch 对象——精细控制
Prefetch 对象允许对预取做更精细的控制:
from django.db.models import Prefetch
# 只预取最近 5 本书,且只取已出版的
authors = Author.objects.prefetch_related(
Prefetch(
'books',
queryset=Book.objects.filter(published=True).order_by('-pub_date')[:5],
to_attr='recent_books'
)
)
for author in authors:
print(author.recent_books) # 使用自定义属性名
第五层:only/defer —— 只加载需要的字段
# 只加载 name,其他字段按需加载
users = User.objects.only('id', 'name').all()
# 排除 content 字段,其他字段正常加载
articles = Article.objects.defer('content').all()
配合 select_related 可以进一步减少数据传输量:
books = Book.objects.select_related('author').only(
'title', 'author__name' # 只取书的 title 和作者的 name
)
第六层:annotate —— 聚合查询避免循环
from django.db.models import Count
# ❌ 坏做法:循环中查询
authors = Author.objects.all()
for author in authors:
book_count = author.book_set.count() # N 次查询
# ✅ 好做法:用 annotate 聚合
authors = Author.objects.annotate(
book_count=Count('book')
)
for author in authors:
print(author.book_count) # 无额外查询
高手视野
选择器 本质 适用关联
──────── ──── ──────
select_related SQL JOIN 外键正向、一对一
prefetch_related 额外 IN 查询 + Python 关联 多对多、外键反向
Prefetch 带过滤/排序/限量的预取 复杂预取场景
annotate SQL 聚合 计数、求和等统计
only/defer 字段选择 减少数据传输
判断标准:
# 当你只需要访问关联对象的属性 → select_related
book.author.name
# 当你需要遍历关联对象集合 → prefetch_related
author.books.all()
# 当你需要统计关联数量 → annotate
Author.objects.annotate(book_count=Count('book'))
始终记住:Django ORM 是懒加载的,模板中每访问一个关联属性都可能触发一次 SQL。这就是为什么在 Django 项目中,N+1 最常见的地方是模板渲染。
Django REST Framework 的 ViewSet 和 View 有什么区别?—— 从函数到类到路由集
第一层:APIView —— 最底层的类视图
from rest_framework.views import APIView
from rest_framework.response import Response
class UserList(APIView):
def get(self, request):
users = User.objects.all()
serializer = UserSerializer(users, many=True)
return Response(serializer.data)
def post(self, request):
serializer = UserSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=201)
return Response(serializer.errors, status=400)
APIView 是 DRF 的基础类——需要手动处理 HTTP 方法的映射。
第二层:GenericAPIView —— 加上了通用逻辑
from rest_framework import generics
class UserList(generics.ListCreateAPIView):
queryset = User.objects.all()
serializer_class = UserSerializer
只需要声明 queryset 和 serializer_class,框架自动处理:
GET /users/ → list (查询列表)
POST /users/ → create (创建)
GET /users/1/ → retrieve (查询单个)
PUT /users/1/ → update (全量更新)
PATCH /users/1/ → partial_update(部分更新)
DELETE /users/1/ → destroy (删除)
第三层:ViewSet —— 路由自动生成
from rest_framework import viewsets
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
ViewSet 和 GenericAPIView 的区别不在于功能,而在于路由注册方式:
# GenericAPIView 路由——每个视图自己注册
urlpatterns = [
path('users/', UserList.as_view()),
path('users/<int:pk>/', UserDetail.as_view()),
]
# ViewSet 路由——用 Router 自动生成
from rest_framework.routers import DefaultRouter
router = DefaultRouter()
router.register(r'users', UserViewSet)
urlpatterns = router.urls
# 自动生成:
# GET /users/ → UserViewSet.list
# POST /users/ → UserViewSet.create
# GET /users/{id}/ → UserViewSet.retrieve
# PUT /users/{id}/ → UserViewSet.update
# PATCH /users/{id}/ → UserViewSet.partial_update
# DELETE /users/{id}/ → UserViewSet.destroy
第四层:自定义 Action
ViewSet 允许添加自定义方法(非 CRUD 操作):
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
@action(detail=True, methods=['post'])
def set_password(self, request, pk=None):
user = self.get_object()
serializer = PasswordSerializer(data=request.data)
if serializer.is_valid():
user.set_password(serializer.validated_data['password'])
user.save()
return Response({'status': 'password set'})
return Response(serializer.errors, status=400)
@action(detail=False, methods=['get'])
def admins(self, request):
admins = self.get_queryset().filter(is_admin=True)
serializer = self.get_serializer(admins, many=True)
return Response(serializer.data)
自动生成的路由:
POST /users/{id}/set_password/
GET /users/admins/
detail=True → 路径需要 {id}
detail=False → 路径不需要 {id}
第五层:ViewSet 层次体系
View (Django 原生)
└── APIView (DRF 基础)
└── GenericAPIView (加 queryset + serializer_class)
├── Mixins (ListModelMixin, CreateModelMixin...)
│ └── 各种 Generic 视图
│ ├── ListAPIView
│ ├── CreateAPIView
│ ├── RetrieveAPIView
│ ├── UpdateAPIView
│ ├── DestroyAPIView
│ └── ListCreateAPIView... (组合)
│
└── ViewSet (把 Mixins 打包成一个类)
└── ModelViewSet (CRUD + 自动路由)
└── ReadOnlyModelViewSet (只读: list + retrieve)
高手视野
选择建议:
小型项目(< 10 个端点) → View 够用,简单直接
中型项目(10-50 个端点) → ViewSet + Router,少写大量 URL 配置
大型项目(> 50 个端点) → ViewSet 仍推荐,但需注意:
1. 避免在 ViewSet 里塞太多自定义 action
2. 自定义 action 超过 3 个 → 拆成独立的 View
3. 序列化器在不同 action 不同 → 用 get_serializer_class
DRF 的设计哲学:ViewSet 不是"视图",而是"资源端点集合"。它把对一个资源的所有操作(CRUD + 自定义)放在一个类里,Router 负责 URL 分发。弊端是 ViewSet 容易膨胀成上帝类——一个 ViewSet 里放 10 个 action,每个 action 做不同的事。
Django 的中间件执行顺序是怎样的?—— 从 WSGI 到响应返回的完整流程
第一层:中间件是什么
Django 中间件是一个处理请求/响应的插件系统,在全局层面拦截和处理 HTTP 请求:
# settings.py
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
第二层:洋葱模型
Django 中间件也是洋葱模型:
Request 进入
↓
SecurityMiddleware.process_request ← ① 从上到下执行 process_request
SessionMiddleware.process_request
CommonMiddleware.process_request
...
View(包括 URL 路由 → View 函数 → 处理)
...
XFrameOptionsMiddleware.process_response ← ② 从下到上执行 process_response
↓
Response 返回
# 一个自定义中间件演示
class SimpleMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 请求阶段(process_request)
print("① 请求到达中间件")
request.custom_flag = "set_by_middleware"
# 传递给下一个中间件或视图
response = self.get_response(request)
# 响应阶段(process_response)
print("③ 响应离开中间件")
response["X-Custom"] = "added_by_middleware"
return response
第三层:Django 1.10 前后变化
Django 1.10 之前:
# 旧的 middleware 方式(每个方法单独定义)
class OldStyleMiddleware:
def process_request(self, request):
pass
def process_view(self, request, view_func, view_args, view_kwargs):
pass
def process_exception(self, request, exception):
pass
def process_response(self, request, response):
return response
Django 1.10 之后:
# 新的 middleware 方式(可调用对象)
class NewStyleMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# process_request
response = self.get_response(request)
# process_response
return response
新方式更接近 WSGI 中间件的标准模式。
第四层:完整的处理钩子
Django 请求生命周期(完整版):
① process_request → 在 URL 路由之前执行
如果返回 HttpResponse,直接跳过后续中间件和视图
② URL 路由 → 匹配视图函数
③ process_view → 在视图执行之前,传入视图函数本身
可用于权限检查、日志等
④ 视图执行 → 核心业务逻辑
⑤ process_template_response → 仅在视图返回 TemplateResponse 时调用
用于注入额外的模板上下文
⑥ process_response → 在所有场景下执行
任何中间件都不能跳过它
⑦ process_exception → 当视图抛出异常时触发
如果返回 None → Django 继续传播异常
如果返回 HttpResponse → 替换异常响应
┌────────────────────────────────────┐
│ │
▼ │
process_request → URL路由 → process_view → View → process_template_response
│ │
└─ (短路返回 HttpResponse) │
▼
process_response
│
▼
客户端
第五层:短路与异常
某些中间件会短路请求处理链:
# SecurityMiddleware 对非 HTTPS 请求的处理
class SecurityMiddleware(MiddlewareMixin):
def process_request(self, request):
# 如果不是 HTTPS,且设置了 SECURE_SSL_REDIRECT
if (not request.is_secure() and
settings.SECURE_SSL_REDIRECT):
# 直接返回重定向响应
return HttpResponsePermanentRedirect(
request.build_absolute_uri().replace('http://', 'https://')
)
# 返回 None → 继续执行下一个中间件
return None
如果 process_request 返回 HttpResponse,后续的中间件和视图都不会执行,直接进入响应阶段。
高手视野
调试中间件问题的常见模式:
现象 → 排查方向
──────────────────────────────────────────────────
请求没有到达视图 → process_request 返回了 response
视图执行了,但响应被修改 → process_response 修改了响应
部分用户被重定向了 → SecurityMiddleware 的 HTTPS 处理
CSRF token 相关错误 → csrf 中间件的位置(必须在 session 之后)
自定义 header 失效 → 顺序不对,被后面的中间件覆盖了
关键经验:多数中间件问题都是顺序问题。AuthenticationMiddleware 必须在 SessionMiddleware 之后,因为需要 session 才能获取用户。CsrfViewMiddleware 必须在所有能修改数据的视图之前。
Django 的信号(Signal)机制是什么?—— 从观察者模式到解耦
第一层:信号是什么
Django 的信号实现了观察者模式(Observer Pattern)——允许某些事件发生时通知监听者:
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth.models import User
# 监听器:当 User 创建时自动创建 Profile
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
if created:
Profile.objects.create(user=instance)
# 或者用连接方式
from django.db.models.signals import post_save
post_save.connect(create_user_profile, sender=User)
第二层:内置信号
Django 提供了丰富的内置信号:
| 信号 | 触发时机 | 常用场景 |
|---|---|---|
pre_save / post_save | 模型 save() 前/后 | 自动填充字段、创建关联对象 |
pre_delete / post_delete | 模型 delete() 前/后 | 级联清理、日志记录 |
m2m_changed | ManyToMany 变更 | 多对多关系维护 |
request_started / request_finished | HTTP 请求开始/结束 | 请求级别统计、连接管理 |
user_logged_in / user_logged_out | 用户登录/登出 | 登录日志、最后登录时间 |
user_login_failed | 登录失败 | 登录安全告警 |
第三层:自定义信号
from django.dispatch import Signal, receiver
# 1. 定义信号
order_cancelled = Signal()
# 2. 发送信号
class OrderService:
def cancel_order(self, order_id):
order = Order.objects.get(id=order_id)
order.status = 'cancelled'
order.save()
# 发送信号
order_cancelled.send(
sender=self.__class__,
order_id=order_id,
reason='user_request'
)
# 3. 监听信号
@receiver(order_cancelled)
def on_order_cancelled(sender, order_id, reason, **kwargs):
# 释放库存
InventoryService.release_stock(order_id)
# 发退款通知
notify_refund(order_id)
# 记录日志
logger.info(f"Order {order_id} cancelled: {reason}")
第四层:信号的执行流程
save() 调用
↓
pre_save 信号 → 所有监听器同步执行
↓
模型字段验证
↓
数据库 INSERT/UPDATE
↓
post_save 信号 → 所有监听器同步执行
↓
返回保存的对象
关键点:信号是同步执行的——所有监听器按注册顺序在当前线程中逐个执行,不会异步。
@receiver(post_save, sender=Order)
def send_email_notification(sender, instance, created, **kwargs):
# 如果这里花了 5 秒,整个请求就卡 5 秒
send_email(instance.user.email, "订单已创建")
所以信号中不要做耗时操作——应该放到 Celery 任务中。
第五层:信号的问题
# 问题 1:隐式执行——难以追踪
class Order(models.Model):
status = models.CharField(max_length=20)
# apps.py
class OrdersConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'orders'
def ready(self):
import orders.signals # 导入以确保信号被注册
# 问题:信号在看不见的地方执行,新人很难 debug
# 问题 2:信号中的异常会冒泡
@receiver(pre_save, sender=Order)
def validate_on_save(sender, instance, **kwargs):
if instance.total < 0:
raise ValueError("订单总额不能为负数")
# 这个异常会中断 save() 的调用
# 问题 3:bulk_create/update 不触发的信号
Order.objects.bulk_create([order1, order2]) # ❌ 不触发 post_save
Order.objects.update(status='cancelled') # ❌ 不触发 pre_save
高手视野
信号 vs 重写 save():
信号适用:
├── 跨应用解耦(如:auth.User 创建 → 通知 profile 应用)
├── 不直接相关但需要响应的事件(如:登录日志)
├── 无法修改 save() 源码的第三方 app
重写 save() 适用:
├── 同应用内的业务逻辑
├── 需要访问 save() 的具体参数(update_fields 等)
├── 需要修改即将保存的值
推荐的信号策略:
- 信号只做调度,不做执行——信号里只
delayCelery 任务 - 信号处理函数要幂等——可能被多次调用
- 信号的
sender尽可能精确——避免监听所有模型的post_save - 考虑用
dispatch_uid防止重复注册:
@receiver(post_save, sender=User, dispatch_uid='create_profile_signal')
def create_profile(sender, instance, created, **kwargs):
...
- 最新趋势:避免使用信号。很多 Django 团队(包括我)的经验是,信号在项目早期很美好,项目大了以后信号导致的隐式副作用是最难 debug 的问题。显式调用 Service 方法代替信号更可控。
Django 的迁移系统是怎么工作的?—— 从 detect changes 到 SQL 生成
第一层:迁移的本质
Django 迁移是一系列对数据库模式变更的 Python 描述文件:
# migrations/0001_initial.py
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = []
operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False
)),
('name', models.CharField(max_length=100)),
],
),
]
第二层:makemigrations 做了什么
python manage.py makemigrations 的执行流程:
① 加载所有已应用的迁移 → 构建"当前数据库状态"
② 加载当前 models.py → 构建"目标数据库状态"
③ 比较两个状态 → 生成 Operation 列表
④ 写入新的迁移文件
检测逻辑(简化):
┌─────────────────────────────────────────────────┐
│ 数据库状态 │ 模型状态 │
│ ──────────────── │ ──────────── │
│ user: │ user: │
│ id: BigAutoField │ id: BigAutoField │
│ name: CharField │ name: CharField │
│ │ email: CharField ← 新增 │
│ │ │
│ 差异 → 生成 AddField("email") │
└─────────────────────────────────────────────────┘
第三层:migrate 做了什么
① 查询 django_migrations 表 → 获取已应用的迁移列表
② 找到未应用的迁移文件 → 按依赖顺序排列
③ 逐个执行迁移:
├── 读取 migration 文件的 operations
├── 将每个 operation 转换为 SQL
├── 执行 SQL
└── 在 django_migrations 记录已应用
django_migrations 表的结构:
CREATE TABLE django_migrations (
id INTEGER PRIMARY KEY,
app VARCHAR(255),
name VARCHAR(255),
applied DATETIME
);
第四层:合并与冲突
当多人并行开发时,可能出现迁移冲突:
$ python manage.py makemigrations --merge
迁移文件的依赖关系形成了 DAG:
0002_create_post.py
↗ ↖
0001_initial.py 0003_merge.py
↖ ↗
0002_add_email_to_user.py
--merge 不会改变数据库结构,只是创建一个合并迁移文件来解决两个分支的依赖关系:
class Migration(migrations.Migration):
dependencies = [
('myapp', '0002_create_post'),
('myapp', '0002_add_email_to_user'),
]
operations = [] # 空的 operations,只是合并依赖
第五层:SQLite 与复杂迁移
某些迁移在 SQLite 上受限——因为 SQLite 不支持 ALTER TABLE 的大部分操作:
# 对 SQLite 来说,这个操作是"重写整个表"
class Migration(migrations.Migration):
operations = [
migrations.AlterField(
model_name='user',
name='email',
field=models.EmailField(unique=True),
),
]
Django 在 SQLite 上的处理策略是:
① 创建新表(带新 schema)
② 复制所有数据
③ 删除旧表
④ 重命名新表
这就是为什么 SQLite 上的迁移(尤其是大表)特别慢。
第六层:SQL 查看——migrate 到底要做什么
# 查看迁移对应的 SQL(不执行)
python manage.py sqlmigrate myapp 0002
输出示例:
BEGIN;
--
-- Add field email to user
--
ALTER TABLE "myapp_user" ADD COLUMN "email" varchar(100) NOT NULL DEFAULT '';
ALTER TABLE "myapp_user" ALTER COLUMN "email" DROP DEFAULT;
COMMIT;
高手视野
迁移陷阱:
1. 不能在迁移中删除列后马上在同一迁移中重建
→ 拆成两个迁移文件
2. 大表加字段且不能为 NULL
→ 分三步:AddField(null=True) → 填充数据 → AlterField(null=False)
3. 不要在迁移中对大表执行 RunPython
→ 会导致长时间锁表
4. --fake 的陷阱
→ 告诉 Django"这个迁移已应用",但不执行 SQL
→ 用于手工处理数据库后的状态同步,但很容易造成状态不一致
5. 迁移文件应该提交到版本控制
→ 它们和代码一样是项目的资产
→ 生产环境的数据库状态 = 所有迁移文件的累积效果
一个实用的"重命名列"的迁移模式:
# 步骤 1:先在模型中添加新字段(别删旧的)
class User(models.Model):
old_name = models.CharField(max_length=100) # 旧字段(暂不删除)
name = models.CharField(max_length=100, null=True) # 新字段
# 步骤 2:创建数据迁移
from django.db import migrations
def migrate_names(apps, schema_editor):
User = apps.get_model('myapp', 'User')
for user in User.objects.all():
user.name = user.old_name
user.save()
class Migration(migrations.Migration):
operations = [
migrations.RunPython(migrate_names),
]
# 步骤 3:删除旧字段
class Migration(migrations.Migration):
operations = [
migrations.RemoveField('User', 'old_name'),
migrations.AlterField(
model_name='User',
name='name',
field=models.CharField(max_length=100), # 去掉 null=True
),
]
Django 的 QuerySet 是何时被求值的?—— 惰性求值到数据库查询
第一层:QuerySet 是惰性的
# 这里没有 SQL 查询
qs = User.objects.filter(is_active=True)
# 这里也没有
qs = qs.exclude(age__lt=18)
# 这里也没有
qs = qs.order_by('name')
# 到这里才触发 SQL
users = list(qs) # 或者 for user in qs: 或者 qs[0]
QuerySet 生命周期:
qs = User.objects.all() → 创建 QuerySet(无 SQL)
qs = qs.filter(...) → 追加过滤条件(无 SQL)
qs = qs.order_by(...) → 追加排序条件(无 SQL)
qs[0] → 触发 SQL(LIMIT 1)
len(qs) → 触发 SQL(COUNT)
for user in qs: → 触发 SQL(SELECT)
第二层:什么操作会触发查询
# 显式求值
list(qs) # 转换为列表
tuple(qs) # 转换为元组
set(qs) # 转换为集合
# 迭代
for obj in qs: # 遍历
next(iter(qs)) # 获取下一个
# 切片(步进切片除外)
qs[0] # 获取第一个(LIMIT 1)
qs[0:5] # 获取前 5 个(LIMIT 5 OFFSET 0)
qs[::2] # ❌ 不支持步进切片
# 序列化
pickle.dumps(qs) # Pickle
bool(qs) # 布尔值(触发 EXISTS)
# 长度
len(qs) # 长度(触发 COUNT)
qs.count() # 显式 COUNT
# 其他
repr(qs) # 表示(触发 LIMIT 21 获取预览数据)
str(qs.query) # 查看 SQL(不触发查询)
第三层:缓存机制
# 第一次求值时,QuerySet 会缓存结果
qs = User.objects.filter(is_active=True)
users1 = list(qs) # SQL 查询,结果缓存到 qs._result_cache
users2 = list(qs) # 不查询,直接从缓存返回
# 但不同的 QuerySet 不共享缓存
qs1 = User.objects.filter(is_active=True)
qs2 = User.objects.filter(is_active=True)
list(qs1) # SQL 查询
list(qs2) # 另一次 SQL 查询(不同的 QuerySet 对象)
特别注意:
# ❌ 每个循环迭代都重新求值
for user in User.objects.filter(is_active=True):
print(user.name) # 正常——只触发一次查询
# ✅ 尽早求值,后面复用
active_users = list(User.objects.filter(is_active=True))
for user in active_users: # 无额外查询
print(user.name)
# ⚠️ 更隐蔽的——使用查询集作为条件判断
if User.objects.filter(is_active=True): # 第一次查询
users = User.objects.filter(is_active=True) # ❌ 还是一个新的查询集
第四层:查询集的切片与索引
# 索引会触发查询
first = User.objects.all()[0] # SQL: SELECT ... LIMIT 1
# 切片也会,但结果是新的 QuerySet
first_five = User.objects.all()[:5] # SQL: SELECT ... LIMIT 5
# 但是切片后再切片会怎样?
qs = User.objects.all()
first_five = qs[:5] # 求值
first_three = qs[:3] # 不会触发查询!因为 qs 已经缓存了
# ⚠️ 如果切片前 qs 没有被求值,切片就是惰性的
qs = User.objects.all()[:5] # 仍未触发查询
user = qs[0] # 触发查询(LIMIT 5 OFFSET 0)
user2 = qs[1] # 缓存命中,无查询
第五层:求值与 Django 模板
N+1 问题最常发生的地方——模板中的求值:
{% for book in books %} {# ① books 在这里求值 #}
<p>{{ book.title }}</p>
<p>{{ book.author.name }}</p> {# ② 每个 book 触发一次 author 查询 #}
{% endfor %}
如果在视图中没有 select_related:
# views.py(问题版)
def book_list(request):
books = Book.objects.all() # 惰性的
return render(request, 'books.html', {'books': books})
# SQL:1 + N 次
SELECT * FROM book; -- 模板中求值
SELECT * FROM author WHERE id=1 -- 模板中 book.author
SELECT * FROM author WHERE id=2 -- 模板中 book.author
...
# views.py(优化版)
def book_list(request):
books = Book.objects.select_related('author').all()
return render(request, 'books.html', {'books': books})
# SQL:1 次(JOIN)
SELECT * FROM book INNER JOIN author ON book.author_id = author.id;
高手视野
QuerySet 求值检查清单(面试高频):
① qs = Model.objects.all()
有无 SQL?→ 无
② qs = qs.filter(x=1)
有无 SQL?→ 无
③ qs.exists()
有无 SQL?→ 有(SELECT 1 FROM ... LIMIT 1)
④ if qs: ...
有无 SQL?→ 有(SELECT 1 FROM ... LIMIT 1)
⑤ len(qs)
有无 SQL?→ 有(SELECT COUNT(*) FROM ...)
⑥ qs[0:10]
有无 SQL?→ 无(切片惰性)
⑦ str(qs.query)
有无 SQL?→ 无(只生成 SQL 字符串)
⑧ qs.count()
有无 SQL?→ 有(SELECT COUNT(*))
⑨ for obj in qs.iterator():
有无缓存?→ 无(iterator() 不缓存,逐块读取)
适用场景:大结果集(> 10000 行),避免内存溢出
Django 的认证系统是如何工作的?—— Session、Token 到权限矩阵
第一层:Session-based 认证
Django 默认的认证基于 Session + Cookie:
登录请求:
① 提交用户名/密码
② authenticate() 验证凭据
③ login() 创建 Session 记录(存数据库)
④ 设置 Session ID Cookie(返回浏览器)
后续请求:
① 浏览器携带 Session ID Cookie
② SessionMiddleware 从 Cookie 中提取 session_id
③ 从 session 表查找对应的数据
④ AuthenticationMiddleware 从 session 中获取 user_id
⑤ request.user 被设置为当前用户
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.decorators import login_required
def login_view(request):
if request.method == 'POST':
user = authenticate(
request,
username=request.POST['username'],
password=request.POST['password']
)
if user is not None:
login(request, user) # 创建 session
return redirect('home')
return render(request, 'login.html')
@login_required
def dashboard(request):
return render(request, 'dashboard.html')
def logout_view(request):
logout(request) # 清除 session
return redirect('login')
第二层:Session 存储后端
# settings.py
SESSION_ENGINE = 'django.contrib.sessions.backends.db' # 默认:数据库
# 其他选项:
# 'django.contrib.sessions.backends.cache' # 缓存(速度快,但可能丢失)
# 'django.contrib.sessions.backends.cached_db' # 缓存 + 数据库(推荐)
# 'django.contrib.sessions.backends.file' # 文件系统
# 'django.contrib.sessions.backends.signed_cookies' # 签名 Cookie(小数据)
第三层:Token 认证(DRF)
DRF 的 Token 认证更适用于 API:
# settings.py
INSTALLED_APPS = [
'rest_framework',
'rest_framework.authtoken',
]
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
'rest_framework.authentication.SessionAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
}
from rest_framework.authtoken.models import Token
from rest_framework.decorators import api_view
@api_view(['POST'])
def login_api(request):
user = authenticate(
username=request.data['username'],
password=request.data['password']
)
if user:
token, _ = Token.objects.get_or_create(user=user)
return Response({'token': token.key})
return Response({'error': 'Invalid credentials'}, status=400)
第四层:JWT 认证(django-rest-framework-simplejwt)
最广泛使用的 API 认证方案:
from rest_framework_simplejwt.views import TokenObtainPairView
from rest_framework_simplejwt.tokens import RefreshToken
# URL 配置
urlpatterns = [
path('api/token/', TokenObtainPairView.as_view()),
path('api/token/refresh/', TokenRefreshView.as_view()),
path('api/token/verify/', TokenVerifyView.as_view()),
]
JWT 流程:
登录 → 服务端签发 JWT Token(包含用户 ID + 过期时间 + 签名)
→ 返回 { access_token, refresh_token }
API 请求:
Authorization: Bearer <access_token>
→ 服务端验证签名和过期时间
→ 无需查数据库即完成认证
→ 从 token payload 中获取用户信息
Token 刷新:
access_token 过期 → 用 refresh_token 换新的 access_token
→ refresh_token 也可设置过期时间,到期重新登录
第五层:权限系统
Django 的权限是三层体系:
第一层:是否登录(IsAuthenticated)
第二层:模型权限(Django Permissions)
├── add_<model>
├── change_<model>
├── delete_<model>
└── view_<model>
第三层:自定义权限(Object-level permissions)
预定义权限通过 Migration 自动创建。
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
# 检查权限
user.has_perm('app.add_post') # 是否有添加 Post 的权限
user.has_perms(['app.add_post', 'app.change_post'])
# 给用户/组分配权限
from django.contrib.auth.models import Group
editors = Group.objects.create(name='Editors')
content_type = ContentType.objects.get_for_model(Post)
permission = Permission.objects.get(
codename='change_post',
content_type=content_type,
)
editors.permissions.add(permission)
user.groups.add(editors)
# 视图级别检查
from django.contrib.auth.decorators import permission_required
@permission_required('app.add_post')
def create_post(request):
...
第六层:自定义权限
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
# 扩展用户模型
is_editor = models.BooleanField(default=False)
department = models.ForeignKey('Department', null=True, on_delete=models.SET_NULL)
class Meta:
permissions = [
("can_publish_post", "Can publish post"), # 自定义权限
("can_approve_comment", "Can approve comment"), # 自定义权限
("can_view_analytics", "Can view analytics"),
]
DRF 中的对象级权限:
from rest_framework.permissions import BasePermission
class IsOwner(BasePermission):
"""只有资源所有者才能操作"""
def has_object_permission(self, request, view, obj):
return obj.author == request.user
class IsEditorOrReadOnly(BasePermission):
"""编辑者有写入权限,其他人只读"""
def has_permission(self, request, view):
if request.method in ('GET', 'HEAD', 'OPTIONS'):
return True
return request.user.is_editor
# 在 ViewSet 中使用
class PostViewSet(ModelViewSet):
queryset = Post.objects.all()
serializer_class = PostSerializer
permission_classes = [IsAuthenticated, IsEditorOrReadOnly]
def perform_create(self, serializer):
serializer.save(author=self.request.user)
高手视野
选择哪种认证方式?
├── 传统 Django Web 应用(带模板) → Session-based
├── 纯 API / SPA 前端 → JWT (simplejwt)
├── 微服务间认证 → JWT 或 mTLS
├── 第三方登录 → django-allauth
└── OAuth2 提供商 → django-oauth-toolkit
安全要点:
├── 始终使用 HTTPS(防止 Token 泄露)
├── JWT 的 access_token 过期时间要短(15-30 分钟)
├── refresh_token 可以设置 7-30 天
├── Session 的 SECRET_KEY 必须轮换
├── 使用 HttpOnly + Secure + SameSite Cookie
└── 密码不要明文存储(Django 默认已用 PBKDF2)
Django 的缓存框架如何使用?—— 从 Redis 到模板片段的层级策略
第一层:缓存后端配置
# settings.py
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'TIMEOUT': 300, # 默认 5 分钟
},
'local': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'unique-snowflake',
}
}
Django 支持的缓存后端:
MemcachedCache → 分布式内存缓存(速度极快,但重启数据丢失)
RedisCache → 常用,支持持久化+更多数据结构
DatabaseCache → 存储在数据库表中(慢,但可靠)
FileBasedCache → 文件系统缓存(小项目可用)
LocMemCache → 进程内内存缓存(开发测试,不跨进程)
DummyCache → 什么都不做(开发时关闭缓存)
第二层:视图缓存
from django.views.decorators.cache import cache_page
# 缓存整个视图 15 分钟
@cache_page(60 * 15)
def article_list(request):
articles = Article.objects.filter(published=True)
return render(request, 'articles.html', {'articles': articles})
在 URL 配置中:
from django.views.decorators.cache import cache_page
urlpatterns = [
path('articles/',
cache_page(60 * 15)(ArticleListView.as_view())),
]
第三层:模板片段缓存
更精细的控制——只缓存模板的一部分:
{% load cache %}
{% cache 500 sidebar %}
<div class="sidebar">
{% for category in categories %}
<a href="/category/{{ category.slug }}/">
{{ category.name }}
</a>
{% endfor %}
</div>
{% endcache %}
带参数的变体:
{% cache 500 sidebar request.user.id %}
{# 每个用户的 sidebar 缓存不同 #}
<div class="sidebar">
{% for item in user.recent_items %}
...
{% endfor %}
</div>
{% endcache %}
第四层:底层缓存 API
最灵活的缓存方式:
from django.core.cache import cache
# 设置
cache.set('my_key', 'hello', timeout=300)
# 获取
value = cache.get('my_key')
# 不存在时设置默认值
value = cache.get('my_key', default='default_value')
# 原子操作——如果不存在才设置
cache.add('my_key', 'value') # 已存在则返回 False
# 批量
cache.set_many({'a': 1, 'b': 2, 'c': 3})
result = cache.get_many(['a', 'b', 'c'])
# 删除
cache.delete('my_key')
cache.clear() # 清空整个缓存
# 自增/自减
cache.incr('counter')
cache.decr('counter')
第五层:缓存策略模式
# 1. Cache-Aside(旁路缓存)——最常用
def get_article(article_id):
# 先查缓存
article = cache.get(f'article:{article_id}')
if article is None:
# 缓存未命中 → 查数据库
article = Article.objects.get(id=article_id)
# 写入缓存
cache.set(f'article:{article_id}', article, timeout=300)
return article
# 2. 缓存穿透保护
def get_user(user_id):
# 对不存在的用户也缓存空值(短时间),防止恶意攻击穿透
cache_key = f'user:{user_id}'
result = cache.get(cache_key)
if result is None:
try:
user = User.objects.get(id=user_id)
cache.set(cache_key, user, timeout=300)
return user
except User.DoesNotExist:
# 缓存空值,防止缓存穿透
cache.set(cache_key, 'NOT_FOUND', timeout=60)
return None
if result == 'NOT_FOUND':
return None
return result
第六层:缓存失效策略
# 1. 主动失效——在更新数据时删除缓存
class ArticleService:
def update_article(self, article_id, data):
article = Article.objects.get(id=article_id)
for key, value in data.items():
setattr(article, key, value)
article.save()
# 删除相关缓存
cache.delete(f'article:{article_id}')
cache.delete('article_list') # 列表缓存也失效
def publish_article(self, article_id):
article = Article.objects.get(id=article_id)
article.published = True
article.save()
# 删除缓存的列表
cache.delete_pattern('article_list:*') # Redis 支持模式删除
# 2. 信号驱动的失效
@receiver(post_save, sender=Article)
def clear_article_cache(sender, instance, **kwargs):
cache.delete(f'article:{instance.id}')
@receiver(post_delete, sender=Article)
def clear_article_cache_on_delete(sender, instance, **kwargs):
cache.delete(f'article:{instance.id}')
cache.delete('article_list')
高手视野
多级缓存架构:
┌──────────┐ ┌───────────┐ ┌──────────┐
│ 浏览器缓存 │ │ Django │ │ Redis │
│ (CDN/本地) │ ←── │ LocMem │ ←── │ (集中式) │ ←── 数据库
│ │ │ (进程内) │ │ │
│ 30s │ │ 10s │ │ 5 分钟 │
└──────────┘ └───────────┘ └──────────┘
第一层 第二层 第三层
缓存键规范:
├── 单一对象: {model_name}:{object_id}
├── 列表: {model_name}:list:{page}:{query_params_hash}
├── 计数: {model_name}:count
├── 用户相关: {model_name}:{user_id}:{action}
└── 模板: template:{template_name}:{checksum}
缓存常见问题:
-
缓存雪崩:大量缓存同时过期 → 瞬间请求全部打到数据库
- 解决:设置随机过期时间
timeout=300 + random.randint(0, 60)
- 解决:设置随机过期时间
-
缓存穿透:查询一个根本不存在的数据 → 每次都穿透到数据库
- 解决:缓存空值(短过期时间)
-
缓存击穿:热点 key 恰好过期 → 大量请求打到数据库
- 解决:互斥锁(只有第一个请求去查数据库,其他等缓存重建)
-
数据不一致:数据库更新了,缓存还是旧的
- 解决:先更新数据库 → 再删除缓存(Cache-Aside 的 Write-Around 策略)
- 或者:先删除缓存 → 再更新数据库(但存在竞态——并发读可能补入旧数据)
Django 生产部署的最佳实践是什么?—— 从静态文件到 WSGI/ASGI
第一层:部署架构
生产环境部署(简化):
┌──────────────┐
│ Nginx │
│ (反向代理) │
│ + 静态文件 │
│ + SSL 终止 │
└──────┬───────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Gunicorn │ │ Gunicorn │ │ Gunicorn │
│ (worker) │ │ (worker) │ │ (worker) │
│ WSGI │ │ WSGI │ │ WSGI │
└──────────┘ └──────────┘ └──────────┘
│ │ │
└────────────┼────────────┘
▼
┌──────────────┐
│ PostgreSQL │
└──────────────┘
┌──────────────┐
│ Redis │
│ (缓存/会话) │
└──────────────┘
第二层:Gunicorn 配置
# gunicorn.conf.py
import multiprocessing
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1 # 经典公式
worker_class = "gevent" # 协程 worker(IO 密集型场景)
timeout = 120
keepalive = 5
max_requests = 1000 # 每个 worker 处理 1000 请求后重启(防止内存泄漏)
max_requests_jitter = 200
# 使用 Uvicorn(ASGI)替代 Gunicorn(WSGI)
# 如果项目用了 async 特性
uvicorn config.asgi:application \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--loop uvloop \
--http httptools
第三层:settings 环境分离
# settings/base.py —— 共用配置
INSTALLED_APPS = [...]
MIDDLEWARE = [...]
DATABASES = { ... }
# settings/development.py
from .base import *
DEBUG = True
ALLOWED_HOSTS = ['*']
# settings/production.py
from .base import *
DEBUG = False
ALLOWED_HOSTS = ['yourdomain.com']
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SECURE_HSTS_SECONDS = 31536000
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
# 环境变量加载
import environ
env = environ.Env()
SECRET_KEY = env('SECRET_KEY')
DATABASES['default'] = env.db()
启动时指定配置:
# 生产启动
DJANGO_SETTINGS_MODULE=config.settings.production gunicorn config.wsgi:application
# 开发启动
python manage.py runserver --settings=config.settings.development
第四层:静态文件处理
# settings.py
STATIC_URL = '/static/'
STATIC_ROOT = '/var/www/static/' # collectstatic 目标目录
STATICFILES_DIRS = [BASE_DIR / 'static'] # 额外的静态文件源
# 媒体文件
MEDIA_URL = '/media/'
MEDIA_ROOT = '/var/www/media/'
# 收集静态文件到 STATIC_ROOT
python manage.py collectstatic --noinput
# Nginx 配置
location /static/ {
alias /var/www/static/;
expires 30d;
add_header Cache-Control "public, immutable";
}
location /media/ {
alias /var/www/media/;
}
第五层:数据库连接池
# 生产环境推荐配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': env('DB_NAME'),
'USER': env('DB_USER'),
'PASSWORD': env('DB_PASSWORD'),
'HOST': env('DB_HOST'),
'PORT': env('DB_PORT'),
'CONN_MAX_AGE': 60, # 连接池保活 60 秒
'OPTIONS': {
'pool': {
'min_size': 5, # 最小连接数
'max_size': 20, # 最大连接数
'timeout': 30, # 获取连接超时
}
}
}
}
第六层:Django 检查清单
# Django 自带的生产环境检查
python manage.py check --deploy
输出示例及需要关注的项目:
❌ SECRET_KEY: 生产环境不应该硬编码 → 使用环境变量
❌ DEBUG = True → 生产环境必须 False
❌ ALLOWED_HOSTS 为空 → 必须设置
❌ CSRF_COOKIE_SECURE = False → 生产环境必须 True
❌ SESSION_COOKIE_SECURE = False → 生产环境必须 True
❌ SECURE_SSL_REDIRECT = False → 建议开启
⚠ Not using HTTPS → Nginx 配置 SSL
完整的 Django 安全检查列表:
# settings/production.py —— 安全检查清单
# 安全基础
SECRET_KEY = env('SECRET_KEY') # 不要硬编码!
DEBUG = False
ALLOWED_HOSTS = env.list('ALLOWED_HOSTS')
# HTTPS
SECURE_SSL_REDIRECT = True
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
# HSTS(HTTP Strict Transport Security)
SECURE_HSTS_SECONDS = 31536000 # 1 年
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
# Cookie
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
CSRF_COOKIE_SECURE = True
CSRF_COOKIE_HTTPONLY = True
# 内容安全
SECURE_CONTENT_TYPE_NOSNIFF = True
SECURE_BROWSER_XSS_FILTER = True
X_FRAME_OPTIONS = 'DENY'
第七层:日志配置
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'verbose',
},
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/app.log',
'maxBytes': 1024 * 1024 * 100, # 100MB
'backupCount': 10,
'formatter': 'verbose',
},
},
'root': {
'handlers': ['console', 'file'],
'level': 'INFO',
},
'loggers': {
'django.request': {
'handlers': ['file'],
'level': 'WARNING',
'propagate': False,
},
'django.db.backends': {
'handlers': ['file'],
'level': 'WARNING', # 开发时可以设为 DEBUG 看 SQL
},
},
}
高手视野
现代 Django 部署演进:
传统模式:Nginx → Gunicorn → Django (WSGI)
→ Celery (Worker)
→ Redis (Cache + Broker)
改进模式:Nginx → Daphne/Uvicorn → Django (ASGI, 同时处理 HTTP + WebSocket)
→ Django Q/ARQ (异步 Worker)
→ Redis
Docker 化:
├── 每个服务一个容器(Nginx、Django、Celery、Redis、PostgreSQL)
├── 用 Docker Compose 编排(dev)
├── 用 Kubernetes 编排(production)
└── 使用 Django 的健康检查端点:/health/
部署中最常见的问题:
1. 忘记 collectstatic → 404 静态文件
2. 忘记 migrate → OperationalError
3. SECRET_KEY 在不同 worker 间不一致 → Session 丢失
4. ALLOWED_HOSTS 配置错误 → 403 响应
5. 没有文件存储策略 → 多台服务器媒体文件不同步(用 S3/MinIO)
Django 的 Form 组件和 ModelForm 有什么区别?—— 从表单渲染到验证链
第一层:Form —— 纯表单
from django import forms
class ContactForm(forms.Form):
name = forms.CharField(max_length=100)
email = forms.EmailField()
message = forms.CharField(widget=forms.Textarea)
def clean_email(self):
# 字段级验证
email = self.cleaned_data['email']
if not email.endswith('@company.com'):
raise forms.ValidationError('只接受公司邮箱')
return email
def clean(self):
# 跨字段验证
cleaned_data = super().clean()
# 跨字段逻辑
return cleaned_data
第二层:ModelForm —— 从模型自动创建
from django import forms
from .models import Article
class ArticleForm(forms.ModelForm):
class Meta:
model = Article
fields = ['title', 'content', 'category', 'published']
# 或排除某些字段
# exclude = ['author', 'created_at']
# 仍然可以自定义验证
def clean_title(self):
title = self.cleaned_data['title']
if len(title) < 10:
raise forms.ValidationError('标题至少 10 个字符')
return title
第三层:两者核心区别
Form ModelForm
────── ──────────
字段定义 手动声明 从模型自动生成
save() 不提供 instance.save()
验证规则 手动写 clean 继承模型约束 + 自定义
关联模型 无 ForeignKey, ManyToMany
代码量 多 少,尤其在字段多的时候
灵活度 高 受模型约束
ModelForm.save() 的细节:
# 创建
form = ArticleForm(request.POST)
article = form.save() # 直接创建并返回
# 更新
article = Article.objects.get(id=1)
form = ArticleForm(request.POST, instance=article)
form.save() # 更新已有实例
# 提交前修改
if form.is_valid():
article = form.save(commit=False) # 暂不提交
article.author = request.user # 补充自动字段
article.save() # 手动保存
# 多对多需要额外处理
form.save_m2m() # 如果有 ManyToMany 字段
第四层:验证链
form.is_valid() 的完整流程:
① 运行 full_clean()
├── 每个字段的 to_python() → 类型转换
├── 每个字段的 validate() → 验证器规则(max_length 等)
├── 每个字段的 run_validators() → 自定义验证器
├── 每个字段的 clean_<field>() → 字段级 clean 方法
└── 模型的 clean() 和 clean_fields()
└── 表单的 clean() → 跨字段验证
② 如果有错误 → errors 不为空
③ 如果没有错误 → cleaned_data 有值
class RegistrationForm(forms.ModelForm):
password1 = forms.CharField(widget=forms.PasswordInput)
password2 = forms.CharField(widget=forms.PasswordInput)
class Meta:
model = User
fields = ['username', 'email']
def clean_password2(self):
"""字段级验证:两个密码一致"""
password1 = self.cleaned_data.get('password1')
password2 = self.cleaned_data.get('password2')
if password1 and password2 and password1 != password2:
raise forms.ValidationError('两次密码不一致')
return password2
def clean_email(self):
"""字段级验证:邮箱唯一"""
email = self.cleaned_data.get('email')
if User.objects.filter(email=email).exists():
raise forms.ValidationError('邮箱已被注册')
return email
def save(self, commit=True):
"""重写 save 以正确设置密码"""
user = super().save(commit=False)
user.set_password(self.cleaned_data['password1'])
if commit:
user.save()
return user
第五层:表单在 Class-based View 中的使用
from django.views.generic.edit import CreateView, UpdateView
from django.views.generic import FormView
# CreateView —— 自动处理 ModelForm
class ArticleCreateView(CreateView):
model = Article
form_class = ArticleForm
template_name = 'articles/create.html'
success_url = '/articles/'
def form_valid(self, form):
form.instance.author = self.request.user
return super().form_valid(form)
# FormView —— 不需要模型的表单
class ContactView(FormView):
form_class = ContactForm
template_name = 'contact.html'
success_url = '/thanks/'
def form_valid(self, form):
form.send_email() # 自定义逻辑
return super().form_valid(form)
高手视野
Django Form 的演进与变化:
Django 1.x: Form + ModelForm 是前端表单验证的唯一方式
Django 2.x: Form 仍然重要,但 View 层开始使用 Class-based View
Django 3.x: DRF Serializer 开始替代 API 场景下的 ModelForm
Django 4.x+: 前端框架(React/Vue)流行 → DRF Serializer 成为主流
→ Django Form 在后端管理界面和传统项目中仍然活跃
当前场景:
├── 传统 Django 项目(带模板):Form / ModelForm 仍然是主力
├── SPA + DRF 前后端分离:Serializer 替代 Form
├── Django Admin:仍然重度依赖 Form
├── Django + HTMX:Form 重新焕发生机——服务端渲染表单
└── 纯 API 后端:Serializer 为主,Form 用于文件上传等特殊场景
核心认识:Django Form 的核心价值不是在前端渲染(那是 JS 框架的领域),而在后端验证层——它提供了一套声明式、可组合、可继承的验证体系,这在任何 Web 框架中都是非常重要的。