跳到主要内容

深入理解 Python 异步编程:多线程、多进程与 asyncio

· 阅读需 27 分钟
Wuji
AI Engineer

本文是对 denglj/aiotutorial 系列文章《深入理解Python异步编程》上篇与中篇的梳理与总结,补充了多线程与多进程的独立讨论。原作者驹哥,教程深入浅出,建议阅读原文。


一、核心概念

阻塞 vs 非阻塞

# 阻塞 —— 调用结果返回之前,当前线程被挂起
sock.connect(('example.com', 80)) # 等 TCP 三次握手完成才返回

# 非阻塞 —— 调用立即返回,不论结果是否就绪
sock.setblocking(False)
sock.connect(('example.com', 80)) # 立即返回,但连接可能还没建立
阻塞非阻塞
调用方调用后线程挂起,等待结果调用后立即返回,线程继续执行
CPU 利用率I/O 等待期间 CPU 空闲不等待,但需轮询或事件通知
编程模型简单直观复杂(回调/事件)

同步 vs 异步

同步异步 描述的是被调用方的行为:

  • 同步调用:任务 A 调用任务 B,A 必须等 B 返回才能继续——"你等我有结果"
  • 异步调用:任务 A 调用任务 B,不等 B 返回就继续执行——B 完成后通过回调/事件通知 A——"有结果了我通知你"

并发 vs 并行

  • 并发(Concurrent):多个任务在同一时间段内交替执行,逻辑上同时(单核即可)
  • 并行(Parallel):多个任务在同一时刻同时执行,物理上同时(需要多核)

Python 的多线程是并发但非并行(受 GIL 限制),多进程是真正的并行


二、多线程——最朴素的并发方案

解决什么问题

同步阻塞模型下,每个 I/O 操作(connectsendrecv)都会挂起线程。CPU 在 I/O 等待期间完全空闲。

最直接的想法:一个请求一个线程,让操作系统来调度。

from concurrent import futures

def blocking_way():
sock = socket.socket()
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
return response

def thread_way():
with futures.ThreadPoolExecutor(10) as executor:
futs = {executor.submit(blocking_way) for i in range(10)}
return len([fut.result() for fut in futs])

GIL 是怎么回事

GIL(Global Interpreter Lock) 是 CPython 解释器的一个互斥锁,保证任何时候只有一个线程在执行 Python 字节码。

但这不代表多线程没用:

  • I/O 密集型任务:多线程有效。 因为 I/O 操作(readwritesendrecv)会释放 GIL,等待 I/O 期间其他线程可以执行 Python 代码。
  • CPU 密集型任务:多线程无效。 计算型任务不释放 GIL,多个线程轮流抢占同一把锁,加上上下文切换开销,甚至比单线程还慢。
任务类型多线程效果原因
网络请求(I/O)✅ 有效recv 等待时释放 GIL
文件读写(I/O)✅ 有效read/write 释放 GIL
数学计算(CPU)❌ 无效不释放 GIL,多线程争锁
图像处理(CPU)❌ 无效同上

多线程的代价

问题说明
线程切换开销保存寄存器、TLB、调度器决策——万级线程时 CPU 被切换耗尽
内存占用每个线程约 50KB 栈空间,万级线程需要数百 MB
竞态条件共享变量需要加锁(LockRLock
调试困难死锁、活锁难以复现

这就是著名的 C10K 问题——当并发连接数达到一万以上时,多线程模型力不从心。


三、多进程——真正的并行方案

解决什么问题

多线程受 GIL 限制无法利用多核 CPU。要真正利用多核,必须使用多进程——每个进程有独立的 Python 解释器和内存空间,互不干扰:

from concurrent import futures
import math

def is_prime(n):
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True

def process_way():
numbers = list(range(10_000_000, 10_001_000))
with futures.ProcessPoolExecutor(4) as executor: # 4 进程 = 4 核并行
return list(executor.map(is_prime, numbers))

线程 vs 进程

维度多线程多进程
内存空间同一进程内共享各自独立
GIL受限制不受限制
适用场景I/O 密集型CPU 密集型
创建开销~50KB/线程~10MB+/进程
通信方式直接读写共享变量(需加锁)PipeQueue、共享内存(需序列化)

进程间通信(IPC)

from multiprocessing import Process, Queue, Pipe, Value

# 1. Queue
def worker(q):
q.put('done')
q = Queue()
Process(target=worker, args=(q,)).start()
print(q.get())

# 2. Pipe
def pipe_worker(conn):
conn.send('hello')
conn.close()
parent, child = Pipe()
Process(target=pipe_worker, args=(child,)).start()
print(parent.recv())

# 3. 共享内存
counter = Value('i', 0)

适用场景

场景方案
CPU 密集计算多进程(唯一选择)
大量 I/O + 少量 CPU❌ 多进程浪费(进程切换成本高)
有状态服务❌ 多进程通信复杂
数据科学/ML✅ 多进程(joblibmultiprocessing

四、从同步阻塞到事件循环

4.1 同步阻塞

def blocking_way():
sock = socket.socket()
sock.connect(('example.com', 80)) # 阻塞
sock.send(request.encode('ascii')) # 阻塞
chunk = sock.recv(4096) # 阻塞

这是最直观的编程模型,但效率最低——一个线程只能处理一个请求,大部分时间在空等 I/O。

4.2 非阻塞 + 轮询

def nonblocking_way():
sock = socket.socket()
sock.setblocking(False) # 非阻塞模式
try:
sock.connect(('example.com', 80))
except BlockingIOError:
pass

# 轮询发送
while True:
try:
sock.send(data)
break
except OSError:
pass

# 轮询接收
while True:
try:
chunk = sock.recv(4096)
# ...
break
except OSError:
pass

setblocking(False) 让 I/O 调用立即返回,不再阻塞线程。但缺点也很明显——轮询是空转 CPU

4.3 I/O 多路复用——事件循环的诞生

操作系统提供了 I/O 多路复用(select/poll/epoll/kqueue)——把多个 Socket 注册到内核,内核在 Socket 就绪时通知应用:

from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector() # Linux 用 epoll,macOS 用 kqueue

# 事件循环
def loop():
while not stopped:
events = selector.select() # 阻塞等待就绪事件
for key, mask in events:
callback = key.data # 取出回调
callback(key, mask) # 执行

这个架构——单线程 + 事件循环 + 非阻塞 I/O——是 Nginx、Node.js、Redis、HAProxy 等高性能服务的基础。

4.4 回调模型

class Crawler:
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('example.com', 80))
except BlockingIOError:
pass
selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

def connected(self, key, mask):
selector.unregister(key.fd)
self.sock.send(get.encode('ascii'))
selector.register(key.fd, EVENT_READ, self.read_response)

def read_response(self, key, mask):
chunk = self.sock.recv(4096)
if chunk:
self.response += chunk
else:
selector.unregister(key.fd)

这个模型已经能高效处理万级并发,但回调风格带来了新的问题。

4.5 回调地狱

def callback_1():
def callback_2():
def callback_3():
async_function(callback_3)
async_function(callback_2)
async_function(callback_1)

回调的硬伤:

问题表现
控制流反转不是"我调用",而是"你好了叫我"
错误处理分散每个回调单独处理异常
无法使用语言结构fortry/except 不能跨回调
组合困难两个异步操作顺序执行需要嵌套

我们需要一个方案:既有事件循环的高并发能力,又能像写同步代码一样自然。


五、生成器与协程

5.1 yield 的双向通信

Python 的生成器原本是单向迭代器。但 generator.send() 让它变成了可暂停、可恢复、可双向通信的协程:

def gen():
received = yield 'hello' # send('world') → received = 'world'
yield received

5.2 Future + Task —— 自制 asyncio

基于生成器,可以构建出 asyncio 的核心抽象:

class Future:
"""异步结果的占位符"""
def __init__(self):
self.result = None
self._callbacks = []

def add_done_callback(self, fn):
self._callbacks.append(fn)

def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)

class Task:
"""驱动协程的调度器"""
def __init__(self, coro): # fetch() 生成器对象
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)

def step(self, future):
try:
next_future = self.coro.send(future.result) # 恢复执行
except StopIteration:
return
next_future.add_done_callback(self.step) # Future 完成再 step

用生成器改写爬虫:

class Crawler:
def fetch(self):
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
pass
f = Future()

def on_connected():
f.set_result(None)

selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield f # 🎯 暂停,交出 Future

selector.unregister(sock.fileno())
sock.send(get.encode('ascii'))

while True:
f = Future()
def on_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunk = yield f # 🎯 暂停
selector.unregister(sock.fileno())
if chunk:
self.response += chunk
else:
break

这就是 asyncio 的核心闭环——Future 占位 → Task 驱动 → EventLoop 调度。

5.3 yield from —— 协程组合

yield from 可以把异步操作提取为独立单元,实现真正的协程组合:

def connect(sock, address):
f = Future()
sock.setblocking(False)
try:
sock.connect(address)
except BlockingIOError:
pass
selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield from f # 委托等待
selector.unregister(sock.fileno())

def read(sock):
f = Future()
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunk = yield from f
selector.unregister(sock.fileno())
return chunk

def read_all(sock):
response = []
chunk = yield from read(sock)
while chunk:
response.append(chunk)
chunk = yield from read(sock)
return b''.join(response)

class Crawler:
def fetch(self):
sock = socket.socket()
yield from connect(sock, ('example.com', 80)) # 像同步调用
sock.send(get.encode('ascii'))
self.response = yield from read_all(sock) # 像同步调用

yield from 让异步代码写起来和同步代码几乎一样——没有回调,没有显式注册,控制流是线性的。


六、async/await 与事件循环

6.1 原生协程语法

Python 3.5(PEP 492)引入 async / await,把生成器协程升级为语言原语:

async def fetch(url): # 原生协程函数
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response = await response.read() # await 取代 yield from
return response

await 后面只能跟 awaitable 对象

awaitable 类型说明
原生协程async def 返回的协程对象
@asyncio.coroutine旧版生成器协程(Python 3.4)
实现了 __await__ 的类Future-like 对象

6.2 事件循环

import asyncio

# 方式一:run_until_complete(3.7 之前)
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch('/1'))
loop.close()

# 方式二:asyncio.run(3.7+ 推荐)
asyncio.run(fetch('/1')) # 自动创建和关闭事件循环

事件循环策略——asyncio 通过策略模式抽象事件循环的创建:

asyncio.get_event_loop_policy() # 获取当前策略
asyncio.set_event_loop_policy(p) # 设置策略

# 替换为 uvloop(基于 libuv,比默认快 2x+)
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

事件循环的状态管理:

方法作用类比
run_until_complete(coro)运行到协程完成,返回结果打一次火,跑一趟
run_forever()一直运行直到 stop()引擎不熄火
stop()完成当前批次事件后停止熄火(还可再点火)
close()关闭循环,不能再启动报废

注意:

  • get_event_loop() 在主线程中有默认循环,子线程中调用会抛 RuntimeError——需在子线程内先 set_event_loop(new_loop())
  • 一个进程最好只使用一种事件循环策略

6.3 asyncio.run() 的演化

# Python 3.7:asyncio.run 登场
asyncio.run(main())

# Python 3.8+:asyncio.Runner 上下文管理器
with asyncio.Runner() as runner:
runner.run(main())

# Python 3.11+:asyncio.timeout() 上下文管理器
async with asyncio.timeout(5):
result = await fetch()

七、Future 与 Task

7.1 Future

Future 代表一个异步操作的未来结果。当协程执行到 await 时,会返回一个 Future——"等我有了结果再告诉你":

# 创建 Future
fut = loop.create_future()
fut = asyncio.Future()

# 常用方法
fut.set_result(value) # 设置结果(触发 done callbacks)
fut.result() # 获取结果(未就绪则抛出 InvalidStateError)
fut.add_done_callback(fn) # 注册回调 fn(fut)
fut.done() # 是否完成
fut.cancel() # 取消
fut.cancelled() # 是否已取消
fut.exception() # 获取异常

7.2 Task

TaskFuture 的子类,专门包装协程,驱动其执行:

# 创建 Task 的三种方式
task = asyncio.create_task(coro) # 3.7+ 推荐
task = asyncio.ensure_future(coro) # 通用(Future 也接受)
task = loop.create_task(coro) # 指定 loop

# 区别:
# create_task() 仅接受协程
# ensure_future() 接受协程(包装为 Task)和 Future(直接返回)

# Task 的方法
task.cancel() # 取消任务
asyncio.all_tasks() # 获取所有正在调度的任务
asyncio.current_task() # 获取当前执行的任务
task.add_done_callback(fn) # Future 的方法,Task 继承

八、多协程调度

8.1 gather

全部完成,结果按输入顺序返回:

async def main():
results = await asyncio.gather(
fetch('/1'),
fetch('/2'),
fetch('/3'),
return_exceptions=True # 异常不中断,作为结果返回
)
# results = [response1, response2, response3]

gather 返回的 Future 本身是一个 Future 对象await 它即等待全部结果。

8.2 wait

精细控制——支持超时和返回条件:

# 参数:return_when 可选值:
# ALL_COMPLETED — 全部完成(默认)
# FIRST_COMPLETED — 只要有一个完成
# FIRST_EXCEPTION — 遇到第一个异常

done, pending = await asyncio.wait(
[fetch('/1'), fetch('/2')],
timeout=2.0,
return_when=asyncio.FIRST_COMPLETED
)
# done — 已完成的任务集合
# pending — 未完成的任务集合

wait 返回 (done, pending) 两个集合,不自动抛出异常——需要通过 task.result() 手动检查。

8.3 as_completed

谁先完成先处理谁(顺序无关):

async def main():
for coro in asyncio.as_completed(
[fetch('/1'), fetch('/2'), fetch('/3')],
timeout=5.0
):
result = await coro # 谁先完成先返回

超时时抛 asyncio.TimeoutError

8.4 对比

函数返回值特点
gather结果列表(顺序对应)等全部完成,顺序固定
wait(done, pending)支持超时 + 条件判定
as_completed迭代器按完成顺序产出

九、同步原语

单线程协程之间仍然有竞态——如果两个协程先后 await 同一个操作,中间可能被切换:

counter = 0

async def bad():
global counter
temp = counter # 读
await asyncio.sleep(0) # 切到另一个协程
counter = temp + 1 # 写 ← 另一个协程可能已经改了 counter

9.1 Lock

lock = asyncio.Lock()

async def safe():
global counter
async with lock: # 加锁(协程版 with)
temp = counter
await asyncio.sleep(0)
counter = temp + 1

9.2 Event

event = asyncio.Event()

# 等待事件
async def waiter():
await event.wait() # 阻塞直到 set()
print('event set')

async def setter():
await asyncio.sleep(1)
event.set() # 通知所有 waiter

9.3 Condition

cond = asyncio.Condition()

async def consumer():
async with cond:
await cond.wait() # 等待通知
# 条件满足,继续执行

async def producer():
async with cond:
# 生产数据...
cond.notify() # 通知一个 waiter
# cond.notify_all() # 通知全部

9.4 Semaphore

限制并发数:

sem = asyncio.Semaphore(5) # 最多 5 个并发

async def limited_fetch(url):
async with sem:
return await fetch(url)

# 同时启动 100 个任务,但只有 5 个会在执行
await asyncio.gather(*[limited_fetch(url) for url in 100_urls])

BoundedSemaphoreSemaphore 的区别:Semaphore 可以 release() 超过初始值,BoundedSemaphore 不允许(用于防止逻辑错误)。

9.5 Queue

queue = asyncio.Queue(maxsize=100)

# 生产者
async def producer(q):
for i in range(10):
await q.put(f"item-{i}")

# 消费者
async def consumer(q, name):
while True:
item = await q.get()
print(f"{name}: {item}")
q.task_done()

async def main():
q = asyncio.Queue()
prods = [asyncio.create_task(producer(q)) for _ in range(2)]
cons = [asyncio.create_task(consumer(q, f"w{i}")) for i in range(3)]

await asyncio.gather(*prods)
await q.join() # 等待所有元素被处理
for c in cons:
c.cancel()

asyncio.Queue 的不同变体:

类型特征
Queue先进先出(FIFO)
PriorityQueue按优先级取出
LifoQueue后进先出(栈)

十、与多线程/多进程结合

10.1 为什么需要结合

asyncio 的协程期望所有操作都是非阻塞的。但现实中有大量阻塞操作无法绕过:

  • 文件 I/O(openreadwrite
  • 旧版数据库驱动(psycopg2pymysql
  • requests 库(非 httpx
  • CPU 密集型计算

解决方案:把阻塞操作丢到线程池或进程池,asyncio 的事件循环不被阻塞。

10.2 run_in_executor

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

async def handle():
loop = asyncio.get_running_loop()

# 阻塞 I/O → 线程池
with ThreadPoolExecutor() as pool:
data = await loop.run_in_executor(
pool,
requests.get, 'https://api.example.com/data'
)

# CPU 密集 → 进程池
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool,
heavy_compute, data.text
)

本质run_in_executor 把同步函数提交到独立的线程/进程执行,返回 asyncio.Future 对象——协程可以 await 它。

10.3 实战组合

import asyncio
from concurrent.futures import ProcessPoolExecutor
import hashlib

async def process_large_file(file_path: str):
loop = asyncio.get_running_loop()

# 1. asyncio 做网络 I/O
metadata = await fetch_metadata(file_path)

# 2. 文件读写 → 线程池
raw_data = await loop.run_in_executor(
ThreadPoolExecutor(),
read_file, file_path
)

# 3. CPU 计算 → 进程池
hash_result = await loop.run_in_executor(
ProcessPoolExecutor(),
sha256_hash, raw_data
)

return hash_result
组件模型为什么
网络 I/Oasyncio高并发、非阻塞
文件/DB I/O线程池阻塞操作,释放 GIL
CPU 计算进程池需要用满多核

十一、Python 版本演进

版本asyncio 变化
3.4asyncio 进入标准库,@asyncio.coroutine + yield from
3.5async/await 语法(PEP 492),原生协程诞生
3.6异步生成器(async forasync yield
3.7asyncio.run() 一站式入口,asyncio.create_task()
3.8asyncio.Runner 上下文管理器
3.9asyncio.to_thread() 简化 run_in_executor
3.10asyncio.TaskGroup 结构化并发(PEP 654)
3.11asyncio.Barrierasyncio.timeout() 上下文管理器
3.12TaskGroup 稳定性提升,各类性能优化
3.13逐步移除弃用 API,asyncio 运行效率持续改善

十二、GIL 对异步编程的深层影响

12.1 GIL 的本质

GIL 不是 Python 语言的特性,而是 CPython 解释器的实现细节。Jython、IronPython、PyPy(部分版本)没有 GIL。

GIL 存在的理由:CPython 的内存管理不是线程安全的(引用计数),加一把全局大锁是最简单的解决方案。

12.2 GIL 对三模型的影响

模型GIL 影响有效 CPU 利用率
多线程(CPU)❌ 线程抢同一把锁最多 1 核
多线程(I/O)✅ I/O 时释放 GIL接近多核(等待不占 GIL)
多进程✅ 每个进程独立 GIL满核
asyncio(CPU)❌ 协程里做计算同样阻塞事件循环最多 1 核
asyncio(I/O)✅ await 让出控制权单核但吞吐极高

12.3 asyncio 能否绕过 GIL

不能。asyncio 仍然是单线程,GIL 仍然在。但 asyncio 的优势不在于并行,而在于 I/O 等待时不占 GIL + 没有线程切换开销

# 错误:在协程里做 CPU 密集计算,阻塞整个事件循环
async def bad_coro():
for i in range(10_000_000): # 占用 GIL 不释放
_ = i * i

# 正确:CPU 密集计算丢到进程池
async def good_coro():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
ProcessPoolExecutor(),
lambda: [i * i for i in range(10_000_000)]
)

十三、asyncio 踩坑经验

13.1 事件循环策略冲突

# 坑:主线程有默认循环,子线程没有
import asyncio

def thread_func():
loop = asyncio.get_event_loop() # RuntimeError!子线程没有默认循环

# 正确:在子线程内手动设置
def thread_func():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(some_coro())

经验: 一个 Python 进程最好只有一种事件循环策略,只使用一个事件循环对象。多线程若需各自的事件循环,需手动 new_event_loop() + set_event_loop()

13.2 协程忘记 await

# 坑:忘记 await,协程不会执行
async def main():
fetch('/1') # ❌ 只是创建了协程对象,没执行!
await fetch('/2') # ✅

# 坑:gather 里传协程对象和传 Task 的区别
# 协程对象是惰性的,Task 创建即调度
task = asyncio.create_task(fetch('/1'))
await asyncio.gather(task, fetch('/2')) # 都正确

13.3 阻塞调用阻塞事件循环

async def bad_handler():
# ❌ time.sleep() 是阻塞的,它不会释放 GIL 给其他协程
time.sleep(5)

# ✅ 正确:asyncio.sleep() 会让出控制权
await asyncio.sleep(5)

# ❌ requests.get() 也是阻塞的
response = requests.get('https://api.example.com')

# ✅ 正确:用 aiohttp 或 run_in_executor
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as resp:
...

13.4 回调函数异常丢失

使用 call_soon / call_later 注册的回调函数,如果内部抛出异常,不会自动传播到事件循环外。需要用 try/except 包裹:

def fragile_callback():
1 / 0 # 异常被事件循环吞掉,没有任何输出

loop.call_soon(fragile_callback)

# 正确做法
def safe_callback():
try:
1 / 0
except Exception as e:
print(f"回调异常: {e}")

loop.call_soon(safe_callback)

13.5 Future 状态混乱

fut = asyncio.Future()

# 错误:对已完成的 Future 重复 set_result
fut.set_result('done')
fut.set_result('again') # InvalidStateError!

# 正确:先检查状态
if not fut.done():
fut.set_result('value')

13.6 忘记处理 Task 取消

async def worker():
try:
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
# 必须处理清理逻辑
print("worker 被取消,正在清理...")
await cleanup()
raise # 或者不 raise 则取消被吞掉

task = asyncio.create_task(worker())
await asyncio.sleep(3)
task.cancel()

十四、回调 vs 协程 vs 绿程 vs 线程

14.1 四种并发模型

模型调度方式切换成本并发数代表
回调事件循环驱动函数调用级别十万级Node.js、Twisted
协程程序主动 await函数调用级别十万级asyncio
绿程程序主动 yield函数调用级别万级Gevent、Eventlet
线程OS 抢占式微秒级(内核态)千级CPython threading

14.2 协程 vs 绿程

绿程(Green Thread) 是用户态线程——在单线程内通过 yield 模拟多线程调度。Python 中 Gevent 是典型代表:

from gevent import monkey; monkey.patch_all()
import requests

# 看起来就是同步代码,但内部是协程调度
urls = ['https://api.example.com'] * 100
results = [requests.get(url) for url in urls] # 自动并发
维度asyncioGevent
语法async/await 显式标记monkey-patch,代码无侵入
心智负担必须清楚哪些是异步函数看起来全是同步代码
隐式行为显式 await,清晰可控monkey-patch 可能打补丁到 C 扩展
生态需要 aio 版库(aiohttp、httpx)兼容同步库(requests 自动变协程)
调试asyncio 有标准调试工具monkey-patch 后调用栈混乱
控制力细粒度控制挂起点隐式 yield,难以预测

经验: Gevent 适合已有同步代码库快速异步化(monkey-patch 零改动),asyncio 适合新项目(显式、可控、生态健康发展)。

14.3 协程 vs 线程

维度协程线程
调度协作式(程序自己让出)抢占式(OS 强制切换)
切换成本函数调用级(~50ns)内核态切换(~1μs)
栈大小~几 KB~1MB(默认 8MB)
万级并发✅ 轻松❌ 内存和切换开销爆炸
竞态共享变量仍需锁共享变量仍需锁
多核❌ 单线程❌ GIL 限制(CPython)

十五、uvloop/libuv vs asyncio 默认事件循环

15.1 事件循环的实现

事件循环底层平台性能
asyncio.SelectorEventLoop原生 selectors 模块Linux/macOS基准
asyncio.ProactorEventLoopWindows IOCPWindows 默认基准
uvlooplibuv(C 语言实现)Linux/macOS2x+
为什么更快libuv 减少 Python 函数调用 + 批量事件处理

15.2 uvloop 使用

import uvloop
import asyncio

# 全局替换(一行代码)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 之后所有 asyncio API 自动使用 uvloop
asyncio.run(main())

# 或者只替换当前循环
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())

性能数据(来自 uvloop 官方 benchmark):uvloop 比默认 asyncio 事件循环快 2x 以上,某些场景接近 Go 语言的 goroutine 性能。

15.3 什么时候值得上 uvloop

场景建议
高并发网络服务(>10k QPS)✅ 值得,立竿见影
普通 Web 应用(~1k QPS)差异不大,默认即可
Windows 环境❌ uvloop 不支持 Windows
调试/开发阶段先用默认,上线前切换

十六、异步编程指导细则

16.1 最佳实践

✅ DO
├─ 使用 asyncio.run() 作为入口(不要手动管理 loop)
├─ 使用 asyncio.create_task() 创建任务(不要用 ensure_future)
├─ 阻塞操作用 run_in_executor 丢到线程/进程池
├─ 使用 Semaphore 控制并发数(防雪崩)
├─ 使用 try/except CancelledError 处理任务取消
├─ 使用 TaskGroup 做结构化并发(3.11+)
├─ 优先使用 aiohttp/httpx/aiosqlite 等原生异步库

❌ DON'T
├─ 在协程里用 time.sleep()(用 asyncio.sleep())
├─ 在协程里做 CPU 密集计算(用 run_in_executor)
├─ 忘记 await 协程
├─ 多个线程共用同一个事件循环(用 run_coroutine_threadsafe)
├─ 在回调里抛异常不处理(异常会被吞掉)
├─ 在同一个 Future 上重复 set_result

16.2 结构化并发——TaskGroup(Python 3.11+)

# 传统方式:手动管理 Task,忘记处理异常
async def old_way():
task1 = asyncio.create_task(fetch('/1'))
task2 = asyncio.create_task(fetch('/2'))
task3 = asyncio.create_task(fetch('/3'))
# 如果 task1 抛异常,task2 和 task3 还在后台跑
results = await asyncio.gather(task1, task2, task3)

# TaskGroup:任一任务异常,自动取消所有兄弟任务
async def new_way():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch('/1'))
t2 = tg.create_task(fetch('/2'))
t3 = tg.create_task(fetch('/3'))
# 要么全部成功,要么全部取消并抛异常

TaskGroup 的关键语义: 结构化并发保证——子任务的生命周期不会超出父作用域。要么全成功,要么全失败。

16.3 超时处理

# 方式一:asyncio.timeout()(3.11+)
async with asyncio.timeout(5):
result = await fetch_slow()

# 方式二:asyncio.wait_for()
result = await asyncio.wait_for(fetch_slow(), timeout=5)

16.4 线程安全——跨线程操作事件循环

loop = asyncio.get_event_loop()

def thread_safe_submit():
# 在另一个线程里,不能直接 loop.create_task()
# 必须用 run_coroutine_threadsafe
fut = asyncio.run_coroutine_threadsafe(some_coro(), loop)
result = fut.result() # 阻塞等待结果(线程安全的 Future)

十七、asyncio 可扩展性与性能优化

17.1 瓶颈在哪

17.2 优化策略

策略手段效果
减少 Task 创建复用连接池(aiohttp.TCPConnector减少 socket 创建
控制并发数Semaphore 或自定义限流器防止雪崩
替换事件循环uvloop(Linux)2x+ 性能提升
CPU 计算分离run_in_executor(ProcessPoolExecutor)不阻塞事件循环
连接复用HTTP Keep-Alive、连接池减少握手开销
批处理合并小 I/O 为批量操作减少事件循环轮次

17.3 连接池配置

import aiohttp

# 默认连接数限制为 100,高并发场景需调整
connector = aiohttp.TCPConnector(
limit=1000, # 总连接数上限
limit_per_host=100, # 每台主机的连接上限
ttl_dns_cache=300, # DNS 缓存时间
enable_cleanup_closed=True,
)

async with aiohttp.ClientSession(connector=connector) as session:
tasks = [session.get(f'https://api.example.com/page/{i}')
for i in range(5000)]
results = await asyncio.gather(*tasks)

17.4 大型应用的架构建议

# 1. 分层职责清晰
# ┌─────────────────────┐
# │ Route / Controller │ ← asyncio 处理请求路由
# ├─────────────────────┤
# │ Service / Business │ ← 纯 async 逻辑
# ├─────────────────────┤
# │ Data Access Layer │ ← aiosqlite / motor / asyncpg
# ├─────────────────────┤
# │ Blocking Executor │ ← 线程池/进程池封装
# └─────────────────────┘

# 2. 每个阻塞操作都有超时
async def query_with_timeout(db, sql):
try:
async with asyncio.timeout(10):
return await db.fetch(sql)
except asyncio.TimeoutError:
logger.error(f"DB 查询超时: {sql}")
raise

# 3. 所有外部调用都加 Semaphore 限流
sem = asyncio.Semaphore(100) # 全局并发上限

async def rate_limited_request(url):
async with sem:
return await session.get(url)

# 4. 关键路径打日志 + 监控
async def traced_coro(name):
start = time.monotonic()
try:
return await actual_coro()
finally:
elapsed = time.monotonic() - start
if elapsed > 1.0:
logger.warning(f"慢路径: {name} 耗时 {elapsed:.2f}s")

十八、总结

三个模型的本质

模型本质比喻
多线程OS 在 I/O 等待时帮你切到另一个线程雇佣多个工人,等的时候换人
多进程多个 CPU 核同时干活开多个工厂,各不相干
asyncio程序自己决定什么时候让出 CPU一个工人,知道什么时候该等什么时候该干

Python 异步演进全景

维度上篇中篇下篇
核心主题概念 + 演进 + 原理asyncio API + 调度 + 同步实战 + 对比 + 性能
代码风格手写 EventLoop/Future/Task标准库 asyncio生产级可靠模式
覆盖范围blocking → asyncio事件循环 → 同步原语GIL → 绿程 → uvloop → 优化
学习目标理解 asyncio 是什么、为什么掌握 asyncio 怎么用知道怎么用好、不出错

三个模型的核心判断标准:

场景最佳选择原因
CPU 密集计算多进程GIL 限制,必须用多核
网络 I/O 高并发asyncio单线程万级,无切换开销
阻塞 SDK / 文件 I/O多线程(+asyncio)run_in_executor 组合使用
混合负载三者组合asyncio 主体 + 线程池 + 进程池
已有同步代码库快速异步化Geventmonkey-patch 零改动

最后的话:

  • 多线程是程序员驱动的并发——你告诉操作系统"帮我切"
  • 多进程是计算机驱动的并行——你告诉 CPU "帮我算"
  • asyncio 是程序自己驱动的异步——协程告诉事件循环"好了叫我"

选择并发模型不是技术炫技,是对你的 I/O 模式有清晰认知之后,选择成本最低的那个方案。


本文参考: