深入理解 Python 异步编程:多线程、多进程与 asyncio
本文是对 denglj/aiotutorial 系列文章《深入理解Python异步编程》上篇与中篇的梳理与总结,补充了多线程与多进程的独立讨论。原作者驹哥,教程深入浅出,建议阅读原文。
一、核心概念
阻塞 vs 非阻塞
# 阻塞 —— 调用结果返回之前,当前线程被挂起
sock.connect(('example.com', 80)) # 等 TCP 三次握手完成才返回
# 非阻塞 —— 调用立即返回,不论结果是否就绪
sock.setblocking(False)
sock.connect(('example.com', 80)) # 立即返回,但连接可能还没建立
| 阻塞 | 非阻塞 | |
|---|---|---|
| 调用方 | 调用后线程挂起,等待结果 | 调用后立即返回,线程继续执行 |
| CPU 利用率 | I/O 等待期间 CPU 空闲 | 不等待,但需轮询或事件通知 |
| 编程模型 | 简单直观 | 复杂(回调/事件) |
同步 vs 异步
同步 和 异步 描述的是被调用方的行为:
- 同步调用:任务 A 调用任务 B,A 必须等 B 返回才能继续——"你等我有结果"
- 异步调用:任务 A 调用任务 B,不等 B 返回就继续执行——B 完成后通过回调/事件通知 A——"有结果了我通知你"
并发 vs 并行
- 并发(Concurrent):多个任务在同一时间段内交替执行,逻辑上同时(单核即可)
- 并行(Parallel):多个任务在同一时刻同时执行,物理上同时(需要多核)
Python 的多线程是并发但非并行(受 GIL 限制),多进程是真正的并行。
二、多线程——最朴素的并发方案
解决什么问题
同步阻塞模型下,每个 I/O 操作(connect、send、recv)都会挂起线程。CPU 在 I/O 等待期间完全空闲。
最直接的想法:一个请求一个线程,让操作系统来调度。
from concurrent import futures
def blocking_way():
sock = socket.socket()
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
return response
def thread_way():
with futures.ThreadPoolExecutor(10) as executor:
futs = {executor.submit(blocking_way) for i in range(10)}
return len([fut.result() for fut in futs])
GIL 是怎么回事
GIL(Global Interpreter Lock) 是 CPython 解释器的一个互斥锁,保证任何时候只有一个线程在执行 Python 字节码。
但这不代表多线程没用:
- I/O 密集型任务:多线程有效。 因为 I/O 操作(
read、write、send、recv)会释放 GIL,等待 I/O 期间其他线程可以执行 Python 代码。 - CPU 密集型任务:多线程无效。 计算型任务不释放 GIL,多个线程轮流抢占同一把锁,加上上下文切换开销,甚至比单线程还慢。
| 任务类型 | 多线程效果 | 原因 |
|---|---|---|
| 网络请求(I/O) | ✅ 有效 | recv 等待时释放 GIL |
| 文件读写(I/O) | ✅ 有效 | read/write 释放 GIL |
| 数学计算(CPU) | ❌ 无效 | 不释放 GIL,多线程争锁 |
| 图像处理(CPU) | ❌ 无效 | 同上 |
多线程的代价
| 问题 | 说明 |
|---|---|
| 线程切换开销 | 保存寄存器、TLB、调度器决策——万级线程时 CPU 被切换耗尽 |
| 内存占用 | 每个线程约 50KB 栈空间,万级线程需要数百 MB |
| 竞态条件 | 共享变量需要加锁(Lock、RLock) |
| 调试困难 | 死锁、活锁难以复现 |
这就是著名的 C10K 问题——当并发连接数达到一万以上时,多线程模型力不从心。
三、多进程——真正的并行方案
解决什么问题
多线程受 GIL 限制无法利用多核 CPU。要真正利用多核,必须使用多进程——每个进程有独立的 Python 解释器和内存空间,互不干扰:
from concurrent import futures
import math
def is_prime(n):
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
def process_way():
numbers = list(range(10_000_000, 10_001_000))
with futures.ProcessPoolExecutor(4) as executor: # 4 进程 = 4 核并行
return list(executor.map(is_prime, numbers))
线程 vs 进程
| 维度 | 多线程 | 多进程 |
|---|---|---|
| 内存空间 | 同一进程内共享 | 各自独立 |
| GIL | 受限制 | 不受限制 |
| 适用场景 | I/O 密集型 | CPU 密集型 |
| 创建开销 | ~50KB/线程 | ~10MB+/进程 |
| 通信方式 | 直接读写共享变量(需加锁) | Pipe、Queue、共享内存(需序列化) |
进程间通信(IPC)
from multiprocessing import Process, Queue, Pipe, Value
# 1. Queue
def worker(q):
q.put('done')
q = Queue()
Process(target=worker, args=(q,)).start()
print(q.get())
# 2. Pipe
def pipe_worker(conn):
conn.send('hello')
conn.close()
parent, child = Pipe()
Process(target=pipe_worker, args=(child,)).start()
print(parent.recv())
# 3. 共享内存
counter = Value('i', 0)
适用场景
| 场景 | 方案 |
|---|---|
| CPU 密集计算 | ✅ 多进程(唯一选择) |
| 大量 I/O + 少量 CPU | ❌ 多进程浪费(进程切换成本高) |
| 有状态服务 | ❌ 多进程通信复杂 |
| 数据科学/ML | ✅ 多进程(joblib、multiprocessing) |
四、从同步阻塞到事件循环
4.1 同步阻塞
def blocking_way():
sock = socket.socket()
sock.connect(('example.com', 80)) # 阻塞
sock.send(request.encode('ascii')) # 阻塞
chunk = sock.recv(4096) # 阻塞
这是最直观的编程模型,但效率最低——一个线程只能处理一个请求,大部分时间在空等 I/O。
4.2 非阻塞 + 轮询
def nonblocking_way():
sock = socket.socket()
sock.setblocking(False) # 非阻塞模式
try:
sock.connect(('example.com', 80))
except BlockingIOError:
pass
# 轮询发送
while True:
try:
sock.send(data)
break
except OSError:
pass
# 轮询接收
while True:
try:
chunk = sock.recv(4096)
# ...
break
except OSError:
pass
setblocking(False) 让 I/O 调用立即返回,不再阻塞线程。但缺点也很明显——轮询是空转 CPU。
4.3 I/O 多路复用——事件循环的诞生
操作系统提供了 I/O 多路复用(select/poll/epoll/kqueue)——把多个 Socket 注册到内核,内核在 Socket 就绪时通知应用:
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
selector = DefaultSelector() # Linux 用 epoll,macOS 用 kqueue
# 事件循环
def loop():
while not stopped:
events = selector.select() # 阻塞等待就绪事件
for key, mask in events:
callback = key.data # 取出回调
callback(key, mask) # 执行
这个架构——单线程 + 事件循环 + 非阻塞 I/O——是 Nginx、Node.js、Redis、HAProxy 等高性能服务的基础。
4.4 回调模型
class Crawler:
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('example.com', 80))
except BlockingIOError:
pass
selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
def connected(self, key, mask):
selector.unregister(key.fd)
self.sock.send(get.encode('ascii'))
selector.register(key.fd, EVENT_READ, self.read_response)
def read_response(self, key, mask):
chunk = self.sock.recv(4096)
if chunk:
self.response += chunk
else:
selector.unregister(key.fd)
这个模型已经能高效处理万级并发,但回调风格带来了新的问题。
4.5 回调地狱
def callback_1():
def callback_2():
def callback_3():
async_function(callback_3)
async_function(callback_2)
async_function(callback_1)
回调的硬伤:
| 问题 | 表现 |
|---|---|
| 控制流反转 | 不是"我调用",而是"你好了叫我" |
| 错误处理分散 | 每个回调单独处理异常 |
| 无法使用语言结构 | for、try/except 不能跨回调 |
| 组合困难 | 两个异步操作顺序执行需要嵌套 |
我们需要一个方案:既有事件循环的高并发能力,又能像写同步代码一样自然。
五、生成器与协程
5.1 yield 的双向通信
Python 的生成器原本是单向迭代器。但 generator.send() 让它变成了可暂停、可恢复、可双向通信的协程:
def gen():
received = yield 'hello' # send('world') → received = 'world'
yield received
5.2 Future + Task —— 自制 asyncio
基于生成器,可以构建出 asyncio 的核心抽象:
class Future:
"""异步结果的占位符"""
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)
class Task:
"""驱动协程的调度器"""
def __init__(self, coro): # fetch() 生成器对象
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
def step(self, future):
try:
next_future = self.coro.send(future.result) # 恢复执行
except StopIteration:
return
next_future.add_done_callback(self.step) # Future 完成再 step
用生成器改写爬虫:
class Crawler:
def fetch(self):
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
pass
f = Future()
def on_connected():
f.set_result(None)
selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield f # 🎯 暂停,交出 Future
selector.unregister(sock.fileno())
sock.send(get.encode('ascii'))
while True:
f = Future()
def on_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunk = yield f # 🎯 暂停
selector.unregister(sock.fileno())
if chunk:
self.response += chunk
else:
break
这就是 asyncio 的核心闭环——Future 占位 → Task 驱动 → EventLoop 调度。
5.3 yield from —— 协程组合
yield from 可以把异步操作提取为独立单元,实现真正的协程组合:
def connect(sock, address):
f = Future()
sock.setblocking(False)
try:
sock.connect(address)
except BlockingIOError:
pass
selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield from f # 委托等待
selector.unregister(sock.fileno())
def read(sock):
f = Future()
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunk = yield from f
selector.unregister(sock.fileno())
return chunk
def read_all(sock):
response = []
chunk = yield from read(sock)
while chunk:
response.append(chunk)
chunk = yield from read(sock)
return b''.join(response)
class Crawler:
def fetch(self):
sock = socket.socket()
yield from connect(sock, ('example.com', 80)) # 像同步调用
sock.send(get.encode('ascii'))
self.response = yield from read_all(sock) # 像同步调用
yield from 让异步代码写起来和同步代码几乎一样——没有回调,没有显式注册,控制流是线性的。
六、async/await 与事件循环
6.1 原生协程语法
Python 3.5(PEP 492)引入 async / await,把生成器协程升级为语言原语:
async def fetch(url): # 原生协程函数
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response = await response.read() # await 取代 yield from
return response
await 后面只能跟 awaitable 对象:
| awaitable 类型 | 说明 |
|---|---|
| 原生协程 | async def 返回的协程对象 |
@asyncio.coroutine | 旧版生成器协程(Python 3.4) |
实现了 __await__ 的类 | Future-like 对象 |
6.2 事件循环
import asyncio
# 方式一:run_until_complete(3.7 之前)
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch('/1'))
loop.close()
# 方式二:asyncio.run(3.7+ 推荐)
asyncio.run(fetch('/1')) # 自动创建和关闭事件循环
事件循环策略——asyncio 通过策略模式抽象事件循环的创建:
asyncio.get_event_loop_policy() # 获取当前策略
asyncio.set_event_loop_policy(p) # 设置策略
# 替换为 uvloop(基于 libuv,比默认快 2x+)
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
事件循环的状态管理:
| 方法 | 作用 | 类比 |
|---|---|---|
run_until_complete(coro) | 运行到协程完成,返回结果 | 打一次火,跑一趟 |
run_forever() | 一直运行直到 stop() | 引擎不熄火 |
stop() | 完成当前批次事件后停止 | 熄火(还可再点火) |
close() | 关闭循环,不能再启动 | 报废 |
注意:
get_event_loop()在主线程中有默认循环,子线程中调用会抛RuntimeError——需在子线程内先set_event_loop(new_loop())- 一个进程最好只使用一种事件循环策略
6.3 asyncio.run() 的演化
# Python 3.7:asyncio.run 登场
asyncio.run(main())
# Python 3.8+:asyncio.Runner 上下文管理器
with asyncio.Runner() as runner:
runner.run(main())
# Python 3.11+:asyncio.timeout() 上下文管理器
async with asyncio.timeout(5):
result = await fetch()
七、Future 与 Task
7.1 Future
Future 代表一个异步操作的未来结果。当协程执行到 await 时,会返回一个 Future——"等我有了结果再告诉你":
# 创建 Future
fut = loop.create_future()
fut = asyncio.Future()
# 常用方法
fut.set_result(value) # 设置结果(触发 done callbacks)
fut.result() # 获取结果(未就绪则抛出 InvalidStateError)
fut.add_done_callback(fn) # 注册回调 fn(fut)
fut.done() # 是否完成
fut.cancel() # 取消
fut.cancelled() # 是否已取消
fut.exception() # 获取异常
7.2 Task
Task 是 Future 的子类,专门包装协程,驱动其执行:
# 创建 Task 的三种方式
task = asyncio.create_task(coro) # 3.7+ 推荐
task = asyncio.ensure_future(coro) # 通用(Future 也接受)
task = loop.create_task(coro) # 指定 loop
# 区别:
# create_task() 仅接受协程
# ensure_future() 接受协程(包装为 Task)和 Future(直接返回)
# Task 的方法
task.cancel() # 取消任务
asyncio.all_tasks() # 获取所有正在调度的任务
asyncio.current_task() # 获取当前执行的任务
task.add_done_callback(fn) # Future 的方法,Task 继承
八、多协程调度
8.1 gather
全部完成,结果按输入顺序返回:
async def main():
results = await asyncio.gather(
fetch('/1'),
fetch('/2'),
fetch('/3'),
return_exceptions=True # 异常不中断,作为结果返回
)
# results = [response1, response2, response3]
gather 返回的 Future 本身是一个 Future 对象,await 它即等待全部结果。
8.2 wait
精细控制——支持超时和返回条件:
# 参数:return_when 可选值:
# ALL_COMPLETED — 全部完成(默认)
# FIRST_COMPLETED — 只要有一个完成
# FIRST_EXCEPTION — 遇到第一个异常
done, pending = await asyncio.wait(
[fetch('/1'), fetch('/2')],
timeout=2.0,
return_when=asyncio.FIRST_COMPLETED
)
# done — 已完成的任务集合
# pending — 未完成的任务集合
wait 返回 (done, pending) 两个集合,不自动抛出异常——需要通过 task.result() 手动检查。
8.3 as_completed
谁先完成先处理谁(顺序无关):
async def main():
for coro in asyncio.as_completed(
[fetch('/1'), fetch('/2'), fetch('/3')],
timeout=5.0
):
result = await coro # 谁先完成先返回
超时时抛 asyncio.TimeoutError。
8.4 对比
| 函数 | 返回值 | 特点 |
|---|---|---|
gather | 结果列表(顺序对应) | 等全部完成,顺序固定 |
wait | (done, pending) | 支持超时 + 条件判定 |
as_completed | 迭代器 | 按完成顺序产出 |
九、同步原语
单线程协程之间仍然有竞态——如果两个协程先后 await 同一个操作,中间可能被切换:
counter = 0
async def bad():
global counter
temp = counter # 读
await asyncio.sleep(0) # 切到另一个协程
counter = temp + 1 # 写 ← 另一个协程可能已经改了 counter
9.1 Lock
lock = asyncio.Lock()
async def safe():
global counter
async with lock: # 加锁(协程版 with)
temp = counter
await asyncio.sleep(0)
counter = temp + 1
9.2 Event
event = asyncio.Event()
# 等待事件
async def waiter():
await event.wait() # 阻塞直到 set()
print('event set')
async def setter():
await asyncio.sleep(1)
event.set() # 通知所有 waiter
9.3 Condition
cond = asyncio.Condition()
async def consumer():
async with cond:
await cond.wait() # 等待通知
# 条件满足,继续执行
async def producer():
async with cond:
# 生产数据...
cond.notify() # 通知一个 waiter
# cond.notify_all() # 通知全部
9.4 Semaphore
限制并发数:
sem = asyncio.Semaphore(5) # 最多 5 个并发
async def limited_fetch(url):
async with sem:
return await fetch(url)
# 同时启动 100 个任务,但只有 5 个会在执行
await asyncio.gather(*[limited_fetch(url) for url in 100_urls])
BoundedSemaphore 与 Semaphore 的区别:Semaphore 可以 release() 超过初始值,BoundedSemaphore 不允许(用于防止逻辑错误)。
9.5 Queue
queue = asyncio.Queue(maxsize=100)
# 生产者
async def producer(q):
for i in range(10):
await q.put(f"item-{i}")
# 消费者
async def consumer(q, name):
while True:
item = await q.get()
print(f"{name}: {item}")
q.task_done()
async def main():
q = asyncio.Queue()
prods = [asyncio.create_task(producer(q)) for _ in range(2)]
cons = [asyncio.create_task(consumer(q, f"w{i}")) for i in range(3)]
await asyncio.gather(*prods)
await q.join() # 等待所有元素被处理
for c in cons:
c.cancel()
asyncio.Queue 的不同变体:
| 类型 | 特征 |
|---|---|
Queue | 先进先出(FIFO) |
PriorityQueue | 按优先级取出 |
LifoQueue | 后进先出(栈) |
十、与多线程/多进程结合
10.1 为什么需要结合
asyncio 的协程期望所有操作都是非阻塞的。但现实中有大量阻塞操作无法绕过:
- 文件 I/O(
open、read、write) - 旧版数据库驱动(
psycopg2、pymysql) requests库(非httpx)- CPU 密集型计算
解决方案:把阻塞操作丢到线程池或进程池,asyncio 的事件循环不被阻塞。
10.2 run_in_executor
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
async def handle():
loop = asyncio.get_running_loop()
# 阻塞 I/O → 线程池
with ThreadPoolExecutor() as pool:
data = await loop.run_in_executor(
pool,
requests.get, 'https://api.example.com/data'
)
# CPU 密集 → 进程池
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool,
heavy_compute, data.text
)
本质:run_in_executor 把同步函数提交到独立的线程/进程执行,返回 asyncio.Future 对象——协程可以 await 它。
10.3 实战组合
import asyncio
from concurrent.futures import ProcessPoolExecutor
import hashlib
async def process_large_file(file_path: str):
loop = asyncio.get_running_loop()
# 1. asyncio 做网络 I/O
metadata = await fetch_metadata(file_path)
# 2. 文件读写 → 线程池
raw_data = await loop.run_in_executor(
ThreadPoolExecutor(),
read_file, file_path
)
# 3. CPU 计算 → 进程池
hash_result = await loop.run_in_executor(
ProcessPoolExecutor(),
sha256_hash, raw_data
)
return hash_result
| 组件 | 模型 | 为什么 |
|---|---|---|
| 网络 I/O | asyncio | 高并发、非阻塞 |
| 文件/DB I/O | 线程池 | 阻塞操作,释放 GIL |
| CPU 计算 | 进程池 | 需要用满多核 |
十一、Python 版本演进
| 版本 | asyncio 变化 |
|---|---|
| 3.4 | asyncio 进入标准库,@asyncio.coroutine + yield from |
| 3.5 | async/await 语法(PEP 492),原生协程诞生 |
| 3.6 | 异步生成器(async for、async yield) |
| 3.7 | asyncio.run() 一站式入口,asyncio.create_task() |
| 3.8 | asyncio.Runner 上下文管理器 |
| 3.9 | asyncio.to_thread() 简化 run_in_executor |
| 3.10 | asyncio.TaskGroup 结构化并发(PEP 654) |
| 3.11 | asyncio.Barrier、asyncio.timeout() 上下文管理器 |
| 3.12 | TaskGroup 稳定性提升,各类性能优化 |
| 3.13 | 逐步移除弃用 API,asyncio 运行效率持续改善 |
十二、GIL 对异步编程的深层影响
12.1 GIL 的本质
GIL 不是 Python 语言的特性,而是 CPython 解释器的实现细节。Jython、IronPython、PyPy(部分版本)没有 GIL。
GIL 存在的理由:CPython 的内存管理不是线程安全的(引用计数),加一把全局大锁是最简单的解决方案。
12.2 GIL 对三模型的影响
| 模型 | GIL 影响 | 有效 CPU 利用率 |
|---|---|---|
| 多线程(CPU) | ❌ 线程抢同一把锁 | 最多 1 核 |
| 多线程(I/O) | ✅ I/O 时释放 GIL | 接近多核(等待不占 GIL) |
| 多进程 | ✅ 每个进程独立 GIL | 满核 |
| asyncio(CPU) | ❌ 协程里做计算同样阻塞事件循环 | 最多 1 核 |
| asyncio(I/O) | ✅ await 让出控制权 | 单核但吞吐极高 |
12.3 asyncio 能否绕过 GIL
不能。asyncio 仍然是单线程,GIL 仍然在。但 asyncio 的优势不在于并行,而在于 I/O 等待时不占 GIL + 没有线程切换开销。
# 错误:在协程里做 CPU 密集计算,阻塞整个事件循环
async def bad_coro():
for i in range(10_000_000): # 占用 GIL 不释放
_ = i * i
# 正确:CPU 密集计算丢到进程池
async def good_coro():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
ProcessPoolExecutor(),
lambda: [i * i for i in range(10_000_000)]
)
十三、asyncio 踩坑经验
13.1 事件循环策略冲突
# 坑:主线程有默认循环,子线程没有
import asyncio
def thread_func():
loop = asyncio.get_event_loop() # RuntimeError!子线程没有默认循环
# 正确:在子线程内手动设置
def thread_func():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(some_coro())
经验: 一个 Python 进程最好只有一种事件循环策略,只使用一个事件循环对象。多线程若需各自的事件循环,需手动 new_event_loop() + set_event_loop()。
13.2 协程忘记 await
# 坑:忘记 await,协程不会执行
async def main():
fetch('/1') # ❌ 只是创建了协程对象,没执行!
await fetch('/2') # ✅
# 坑:gather 里传协程对象和传 Task 的区别
# 协程对象是惰性的,Task 创建即调度
task = asyncio.create_task(fetch('/1'))
await asyncio.gather(task, fetch('/2')) # 都正确
13.3 阻塞调用阻塞事件循环
async def bad_handler():
# ❌ time.sleep() 是阻塞的,它不会释放 GIL 给其他协程
time.sleep(5)
# ✅ 正确:asyncio.sleep() 会让出控制权
await asyncio.sleep(5)
# ❌ requests.get() 也是阻塞的
response = requests.get('https://api.example.com')
# ✅ 正确:用 aiohttp 或 run_in_executor
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as resp:
...
13.4 回调函数异常丢失
使用 call_soon / call_later 注册的回调函数,如果内部抛出异常,不会自动传播到事件循环外。需要用 try/except 包裹:
def fragile_callback():
1 / 0 # 异常被事件循环吞掉,没有任何输出
loop.call_soon(fragile_callback)
# 正确做法
def safe_callback():
try:
1 / 0
except Exception as e:
print(f"回调异常: {e}")
loop.call_soon(safe_callback)
13.5 Future 状态混乱
fut = asyncio.Future()
# 错误:对已完成的 Future 重复 set_result
fut.set_result('done')
fut.set_result('again') # InvalidStateError!
# 正确:先检查状态
if not fut.done():
fut.set_result('value')
13.6 忘记处理 Task 取消
async def worker():
try:
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
# 必须处理清理逻辑
print("worker 被取消,正在清理...")
await cleanup()
raise # 或者不 raise 则取消被吞掉
task = asyncio.create_task(worker())
await asyncio.sleep(3)
task.cancel()
十四、回调 vs 协程 vs 绿程 vs 线程
14.1 四种并发模型
| 模型 | 调度方式 | 切换成本 | 并发数 | 代表 |
|---|---|---|---|---|
| 回调 | 事件循环驱动 | 函数调用级别 | 十万级 | Node.js、Twisted |
| 协程 | 程序主动 await | 函数调用级别 | 十万级 | asyncio |
| 绿程 | 程序主动 yield | 函数调用级别 | 万级 | Gevent、Eventlet |
| 线程 | OS 抢占式 | 微秒级(内核态) | 千级 | CPython threading |
14.2 协程 vs 绿程
绿程(Green Thread) 是用户态线程——在单线程内通过 yield 模拟多线程调度。Python 中 Gevent 是典型代表:
from gevent import monkey; monkey.patch_all()
import requests
# 看起来就是同步代码,但内部是协程调度
urls = ['https://api.example.com'] * 100
results = [requests.get(url) for url in urls] # 自动并发
| 维度 | asyncio | Gevent |
|---|---|---|
| 语法 | async/await 显式标记 | monkey-patch,代码无侵入 |
| 心智负担 | 必须清楚哪些是异步函数 | 看起来全是同步代码 |
| 隐式行为 | 显式 await,清晰可控 | monkey-patch 可能打补丁到 C 扩展 |
| 生态 | 需要 aio 版库(aiohttp、httpx) | 兼容同步库(requests 自动变协程) |
| 调试 | asyncio 有标准调试工具 | monkey-patch 后调用栈混乱 |
| 控制力 | 细粒度控制挂起点 | 隐式 yield,难以预测 |
经验: Gevent 适合已有同步代码库快速异步化(monkey-patch 零改动),asyncio 适合新项目(显式、可控、生态健康发展)。
14.3 协程 vs 线程
| 维度 | 协程 | 线程 |
|---|---|---|
| 调度 | 协作式(程序自己让出) | 抢占式(OS 强制切换) |
| 切换成本 | 函数调用级(~50ns) | 内核态切换(~1μs) |
| 栈大小 | ~几 KB | ~1MB(默认 8MB) |
| 万级并发 | ✅ 轻松 | ❌ 内存和切换开销爆炸 |
| 竞态 | 共享变量仍需锁 | 共享变量仍需锁 |
| 多核 | ❌ 单线程 | ❌ GIL 限制(CPython) |
十五、uvloop/libuv vs asyncio 默认事件循环
15.1 事件循环的实现
| 事件循环 | 底层 | 平台 | 性能 |
|---|---|---|---|
asyncio.SelectorEventLoop | 原生 selectors 模块 | Linux/macOS | 基准 |
asyncio.ProactorEventLoop | Windows IOCP | Windows 默认 | 基准 |
| uvloop | libuv(C 语言实现) | Linux/macOS | 2x+ |
| 为什么更快 | libuv 减少 Python 函数调用 + 批量事件处理 | — | — |
15.2 uvloop 使用
import uvloop
import asyncio
# 全局替换(一行代码)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 之后所有 asyncio API 自动使用 uvloop
asyncio.run(main())
# 或者只替换当前循环
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())
性能数据(来自 uvloop 官方 benchmark):uvloop 比默认 asyncio 事件循环快 2x 以上,某些场景接近 Go 语言的 goroutine 性能。
15.3 什么时候值得上 uvloop
| 场景 | 建议 |
|---|---|
| 高并发网络服务(>10k QPS) | ✅ 值得,立竿见影 |
| 普通 Web 应用(~1k QPS) | 差异不大,默认即可 |
| Windows 环境 | ❌ uvloop 不支持 Windows |
| 调试/开发阶段 | 先用默认,上线前切换 |
十六、异步编程指导细则
16.1 最佳实践
✅ DO
├─ 使用 asyncio.run() 作为入口(不要手动管理 loop)
├─ 使用 asyncio.create_task() 创建任务(不要用 ensure_future)
├─ 阻塞操作用 run_in_executor 丢到线程/进程池
├─ 使用 Semaphore 控制并发数(防雪崩)
├─ 使用 try/except CancelledError 处理任务取消
├─ 使用 TaskGroup 做结构化并发(3.11+)
├─ 优先使用 aiohttp/httpx/aiosqlite 等原生异步库
❌ DON'T
├─ 在协程里用 time.sleep()(用 asyncio.sleep())
├─ 在协程里做 CPU 密集计算(用 run_in_executor)
├─ 忘记 await 协程
├─ 多个线程共用同一个事件循环(用 run_coroutine_threadsafe)
├─ 在回调里抛异常不处理(异常会被吞掉)
├─ 在同一个 Future 上重复 set_result
16.2 结构化并发——TaskGroup(Python 3.11+)
# 传统方式:手动管理 Task,忘记处理异常
async def old_way():
task1 = asyncio.create_task(fetch('/1'))
task2 = asyncio.create_task(fetch('/2'))
task3 = asyncio.create_task(fetch('/3'))
# 如果 task1 抛异常,task2 和 task3 还在后台跑
results = await asyncio.gather(task1, task2, task3)
# TaskGroup:任一任务异常,自动取消所有兄弟任务
async def new_way():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch('/1'))
t2 = tg.create_task(fetch('/2'))
t3 = tg.create_task(fetch('/3'))
# 要么全部成功,要么全部取消并抛异常
TaskGroup 的关键语义: 结构化并发保证——子任务的生命周期不会超出父作用域。要么全成功,要么全失败。
16.3 超时处理
# 方式一:asyncio.timeout()(3.11+)
async with asyncio.timeout(5):
result = await fetch_slow()
# 方式二:asyncio.wait_for()
result = await asyncio.wait_for(fetch_slow(), timeout=5)
16.4 线程安全——跨线程操作事件循环
loop = asyncio.get_event_loop()
def thread_safe_submit():
# 在另一个线程里,不能直接 loop.create_task()
# 必须用 run_coroutine_threadsafe
fut = asyncio.run_coroutine_threadsafe(some_coro(), loop)
result = fut.result() # 阻塞等待结果(线程安全的 Future)
十七、asyncio 可扩展性与性能优化
17.1 瓶颈在哪
17.2 优化策略
| 策略 | 手段 | 效果 |
|---|---|---|
| 减少 Task 创建 | 复用连接池(aiohttp.TCPConnector) | 减少 socket 创建 |
| 控制并发数 | Semaphore 或自定义限流器 | 防止雪崩 |
| 替换事件循环 | uvloop(Linux) | 2x+ 性能提升 |
| CPU 计算分离 | run_in_executor(ProcessPoolExecutor) | 不阻塞事件循环 |
| 连接复用 | HTTP Keep-Alive、连接池 | 减少握手开销 |
| 批处理 | 合并小 I/O 为批量操作 | 减少事件循环轮次 |
17.3 连接池配置
import aiohttp
# 默认连接数限制为 100,高并发场景需调整
connector = aiohttp.TCPConnector(
limit=1000, # 总连接数上限
limit_per_host=100, # 每台主机的连接上限
ttl_dns_cache=300, # DNS 缓存时间
enable_cleanup_closed=True,
)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [session.get(f'https://api.example.com/page/{i}')
for i in range(5000)]
results = await asyncio.gather(*tasks)
17.4 大型应用的架构建议
# 1. 分层职责清晰
# ┌─────────────────────┐
# │ Route / Controller │ ← asyncio 处理请求路由
# ├─────────────────────┤
# │ Service / Business │ ← 纯 async 逻辑
# ├─────────────────────┤
# │ Data Access Layer │ ← aiosqlite / motor / asyncpg
# ├─────────────────────┤
# │ Blocking Executor │ ← 线程池/进程池封装
# └─────────────────────┘
# 2. 每个阻塞操作都有超时
async def query_with_timeout(db, sql):
try:
async with asyncio.timeout(10):
return await db.fetch(sql)
except asyncio.TimeoutError:
logger.error(f"DB 查询超时: {sql}")
raise
# 3. 所有外部调用都加 Semaphore 限流
sem = asyncio.Semaphore(100) # 全局并发上限
async def rate_limited_request(url):
async with sem:
return await session.get(url)
# 4. 关键路径打日志 + 监控
async def traced_coro(name):
start = time.monotonic()
try:
return await actual_coro()
finally:
elapsed = time.monotonic() - start
if elapsed > 1.0:
logger.warning(f"慢路径: {name} 耗时 {elapsed:.2f}s")
十八、总结
三个模型的本质
| 模型 | 本质 | 比喻 |
|---|---|---|
| 多线程 | OS 在 I/O 等待时帮你切到另一个线程 | 雇佣多个工人,等的时候换人 |
| 多进程 | 多个 CPU 核同时干活 | 开多个工厂,各不相干 |
| asyncio | 程序自己决定什么时候让出 CPU | 一个工人,知道什么时候该等什么时候该干 |
Python 异步演进全景
| 维度 | 上篇 | 中篇 | 下篇 |
|---|---|---|---|
| 核心主题 | 概念 + 演进 + 原理 | asyncio API + 调度 + 同步 | 实战 + 对比 + 性能 |
| 代码风格 | 手写 EventLoop/Future/Task | 标准库 asyncio | 生产级可靠模式 |
| 覆盖范围 | blocking → asyncio | 事件循环 → 同步原语 | GIL → 绿程 → uvloop → 优化 |
| 学习目标 | 理解 asyncio 是什么、为什么 | 掌握 asyncio 怎么用 | 知道怎么用好、不出错 |
三个模型的核心判断标准:
| 场景 | 最佳选择 | 原因 |
|---|---|---|
| CPU 密集计算 | 多进程 | GIL 限制,必须用多核 |
| 网络 I/O 高并发 | asyncio | 单线程万级,无切换开销 |
| 阻塞 SDK / 文件 I/O | 多线程(+asyncio) | run_in_executor 组合使用 |
| 混合负载 | 三者组合 | asyncio 主体 + 线程池 + 进程池 |
| 已有同步代码库快速异步化 | Gevent | monkey-patch 零改动 |
最后的话:
- 多线程是程序员驱动的并发——你告诉操作系统"帮我切"
- 多进程是计算机驱动的并行——你告诉 CPU "帮我算"
- asyncio 是程序自己驱动的异步——协程告诉事件循环"好了叫我"
选择并发模型不是技术炫技,是对你的 I/O 模式有清晰认知之后,选择成本最低的那个方案。
本文参考:
- denglj/aiotutorial —— 《深入理解Python异步编程》上篇、中篇(下篇创作中)
- 《深入理解Python异步编程 上》http://t.cn/R9W0JgN
- 《深入理解Python异步编程 中》https://mp.weixin.qq.com/s/cc_yM0waqSOqq8xfg1G79Q