asyncio 入门

asyncio 入门

并发模型

asyncio 是在 Python 3.4 中引入的,用来在单线程中实现并发的一种方式。 刚接触它时,总是用多线程的思路去理解,结果总是困惑重重。要真正理解它,先得搞清楚 Python 中的并发到底是怎么回事。由于 Python 的 GIL 限制,同一 Python 进程中,任意时刻只能有一个线程在执行 Python 字节码。因此,除了开多个进程,Python 中的并发通常是依赖 I/O 等待来“切换任务”的,而不是多个 CPU 核心同时跑 Python 代码。

总体上,常见的并发模型可以分为两类:

抢占式多任务:在这种模型中,由操作系统来决定任务之间何时切换。操作系统会把 CPU 时间切分成一个个时间片,周期性地暂停当前任务,把执行权交给其他任务。这种“切换”不需要应用程序显式配合,因此叫抢占式。抢占式多任务通常通过多线程和多进程来实现。

协作式多任务:在这种模型中,操作系统不会强行打断当前任务,而是由任务自己决定何时交出执行权。 换句话说,应用程序中的代码会显式写出“我现在遇到 IO 可以暂停一下了,让其他任务先执行”的逻辑。这种方式下,多个任务之间需要互相信任,都要主动在合适的时机“让出”执行权。

因此,可以这样理解:抢占式就像大家一起抢麦克风,谁说着说着被主持人(操作系统)打断了,麦克风就递给下一个人;协作式则像大家按顺序发言,谁觉得自己要翻笔记(I/O 等待)了,就主动把麦克风递给别人。两者的根本区别在于抢占式依赖操作系统调度,任务切换是被动的;协作式依赖应用程序显式切换,任务切换是主动的。

asyncio 正是基于协作式多任务模式实现并发的:当程序运行到一个可等待的时机(例如等待 I/O、网络请求或定时器)时,会在代码中显式使用 await 来让出执行权。此时,事件循环会调度其他任务运行;等这个等待结束后,再回到原任务继续执行。

asyncio 基本概念

我们先来看一个例子,感受一下 asyncio 的运行机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def delay(delay_seconds: int) -> int:
print(f'sleeping for {delay_seconds} second(s)')
await asyncio.sleep(delay_seconds)
print(f'finished sleeping for {delay_seconds} second(s)')
return delay_seconds

async def hello_every_second():
for i in range(2):
print("I'm running other code while I'm waiting!")
await asyncio.sleep(1)

async def main():
first_delay = asyncio.create_task(delay(3))
second_delay = asyncio.create_task(delay(3))
await hello_every_second()
await first_delay
await second_delay

asyncio.run(main())

运行结果大致如下:

1
2
3
4
5
6
I'm running other code while I'm waiting!
sleeping for 3 second(s)
sleeping for 3 second(s)
I'm running other code while I'm waiting!
finished sleeping for 3 second(s)
finished sleeping for 3 second(s)

可以看到:

  • first_delaysecond_delay 几乎同时启动,立即输出各自的 “sleeping…”。
  • hello_every_second 在两次 1 秒的等待间隔中打印提示信息。
  • 大约 3 秒后,两个延迟任务几乎同时完成。

整个程序只耗时约 3 秒,而不是串行运行的 3 + 3 + 2 秒,这就是 asyncio 并发的效果。这篇博客的主要目的,就是通过对一系列 asyncio 基础概念的介绍和梳理,最终理解这个例子的运行流程和逻辑。

协程(Coroutine)

“计算机科学的任何问题,都可以通过增加一个中间层来解决。”

(Any problem in computer science can be solved by another level of indirection.)

​ —— David Wheeler

asyncio 的世界里,这个“中间层”就是协程

普通的 Python 函数一旦调用,就会从头到尾执行完,中间无法被外部调度。要想把他们能够被 asyncio 调度,就需要在外面再包一层——也就是把函数变成协程函数。协程是一种特殊的函数——它能在执行过程中主动暂停await),把控制权交还给事件循环,让其他任务有机会运行,然后在合适的时候再恢复执行。相比于线程,协程拥有更高的执行效率。这是因为线程的切换需要操作系统来调度,涉及到更复杂的上下文切换,如保存和恢复寄存器、栈信息等。而协程切换只需要保存和恢复少量状态,开销更小。

在 Python 中,用 async 关键字定义的函数就是协程函数,调用它不会立即执行,而是返回一个协程对象(coroutine object):

1
2
3
4
5
6
async def my_coroutine():
return 1 + 1

coro = my_coroutine()
print(coro)
# <coroutine object my_coroutine at 0x...>

这个协程对象只是“待执行计划”,要想真正运行它,必须把它交给事件循环(event loop)调度。

事件循环

协程的运行

协程只是一个“可运行的对象”,它本身不会自动执行。要让协程真正运行起来,需要交给事件循环调度,而在 asyncio 里,这通常是通过把协程包装成任务(Task)并提交到事件循环中完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def main(name):
print(f'dalu! from {name}')

loop = asyncio.new_event_loop()

task_1 = loop.create_task(main('task_1')) # 把协程包装成 Task 并注册到 loop
task_2 = loop.create_task(main('task_2')) # 再注册一个任务

try:
loop.run_until_complete(task_1) # 直到 task_1 完成才返回
finally:
loop.close()

这里我们通过 asyncio.new_event_loop() 创建了一个事件循环,我们通过 loop.create_task(main()) 注册任务,通过 loop.run_until_complete(task_1) 来启动任务。用 Python 运行上述代码,我们会发现输出是:

1
2
dalu! from task_1
dalu! from task_2

两个任务都完成了。这个例子可以很好地帮助我们理解事件循环的工作逻辑:为什么 task_2 也打印了?

这是因为 run_until_complete(x) 的作用是驱动事件循环,直到 x 完成。在这段时间内,事件循环会正常调度所有已注册且就绪的任务,而不仅仅是我们传入的那个任务。由于 task_2 在同一个事件循环中注册,并且处于就绪状态,所以它也会在这个“运行周期”里被调度执行。上述代码的具体执行流程为:

  1. 创建 task_1task_2,它们都登记在事件循环的任务队列中(但协程体还没开始执行)。

  2. 调用 loop.run_until_complete(task_1),事件循环开始运转。

  3. 事件循环从任务队列中取出就绪的任务依次执行:

    • 首先调度到 task_1:打印 "dalu! from task_1",任务完成。
    • 继续调度剩下的就绪任务 task_2:打印 "dalu! from task_2",任务完成。
  4. 此时 task_1 已完成,run_until_complete() 的退出条件满足,函数返回。

  5. 由于事件循环会统一推进任务队列,所以即使只等待了 task_1,其他就绪任务也会被执行。

所以事件循环的关键之处在于:它不会只关心我们等待的那个任务,而是在运行过程中,把所有处于就绪状态的任务都推进一轮。 实际上,即使我们把上例中 loop.run_until_complete() 的参数改成 task_2,事件循环依然会先执行 task_1,再执行 task_2。原因是我们在启动事件循环之前,已经先把 task_1 注册到任务队列中,而 loop.run_until_complete() 的作用只是——启动事件循环,并在指定任务完成前一直运行。它本身并不会改变任务的调度顺序,顺序完全由事件循环的调度机制和任务注册的先后顺序决定。

事件循环的功能

从上面的例子可以看出,事件循环是整个异步运行时的“心脏”。在 asyncio 中,事件循环的职责包括:

  1. 维护任务队列(Task Queue)
  2. 调度执行任务:从队列中取出就绪任务运行
  3. 挂起等待 I/O:遇到阻塞 I/O(如 await asyncio.sleep())时,挂起任务并交由操作系统处理
  4. 唤醒任务:当 I/O 完成后,任务会被重新放回队列等待执行
  5. 循环往复,直到所有任务完成

可以把事件循环想象成一个高速旋转的“调度轮”,每次转动都会处理队列里的任务,只要它们已经准备好运行,就会被推进一步。

在实际开发中,如果不需要手动管理事件循环,可以直接用 asyncio.run() 来启动协程。它会帮我们自动创建一个新的事件循环,同时将协程封装为任务(Task)。之后,它会运行事件循环直到任务完成,并在最后自动关闭事件循环。

1
2
3
4
5
6
import asyncio

async def main():
print('dalu!')

asyncio.run(main())

这种方式简单、安全,推荐在绝大多数情况下使用,除非我们有复杂的事件循环管理需求(如在已有的循环中动态添加任务)。在一般情况下,asyncio.run(main()) 就是我们的 asyncio 应用程序的入口点。它只执行 main() 这个主协程,然后由该协程来启动应用程序的所有其他组件。

await 与异步调度

前面我们已经看过事件循环的基本运行方式。在没有 await 的情况下,事件循环的行为就像一个普通的任务队列:所有注册的任务(Task)会被按顺序运行,直到它们结束。但 asyncio 的真正威力在于——它通过协作式多任务,让任务在“可以等待的时候”主动交出执行权,从而在单线程内实现并发。这就是 await 的核心作用。

await X 表示暂停当前协程,并把执行权交还给事件循环,直到 X 完成,当前协程才会恢复。其中 X 必须是 awaitable 对象(如协程、asyncio.FutureTask 等)。事件循环会在这段时间去调度运行其他处于就绪状态的任务,而不是傻等。

我们举一个最简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def add_one(number: int) -> int:
return number + 1

async def main() -> None:
one_plus_one = await add_one(1)
two_plus_one = await add_one(2)
print(one_plus_one)
print(two_plus_one)

asyncio.run(main())

在这个例子里:

  1. asyncio.run(main()) 会把 main() 封装成一个 Task 并启动事件循环。
  2. main() 开始执行,到 await add_one(1) 时:
    • 暂停 main 这个 Task,把执行权交回事件循环。
    • 事件循环运行 add_one(1),它是阻塞式协程,因此直接执行完返回结果。
    • 恢复 main,把结果赋给 one_plus_one
  3. 再遇到 await add_one(2),重复同样的过程。
  4. 最终打印 23

这里 add_one 没有真正的异步等待,所以事件循环每次都是“马上”完成它。


让我们看看 await 遇到非阻塞任务时会发生什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time

async def my_print():
print('my_print() Task starts.')
time.sleep(1) # 阻塞
print('my_print() Task ends.')

async def main():
print('main() Task starts.')
my_sleep = asyncio.create_task(asyncio.sleep(3))
my_print_task = asyncio.create_task(my_print())
await my_sleep
print('main() Task ends.')

asyncio.run(main())

在这个例子里:

  1. 事件循环启动,把 main() 作为一个 Task 加入队列。
  2. main() 运行,创建了两个新任务:
    • my_sleep:内部是 asyncio.sleep(3)非阻塞
    • my_print_task:内部是 time.sleep(1)阻塞
  3. 执行到 await my_sleep
    • 暂停 main,事件循环开始调度其他任务
  4. 事件循环发现 my_sleep 要等 3 秒才能完成且是非阻塞的,因此启动它后暂时搁置它,先去运行 my_print_task
  5. 问题来了my_print_task 用的是 time.sleep(1),这是阻塞调用,事件循环无法切走,只能等它运行完。
  6. my_print_task 结束后,事件循环回到 my_sleep,等待它的 3 秒计时器结束。
  7. 计时器结束时,事件循环收到 I/O 完成的事件(底层是 Future 的回调机制),把 my_sleep 标记为完成。
  8. 事件循环唤醒 main,继续执行 print('main() Task ends.'),结束。

在这个两个例子中我们要记住几个关键性的结论,来帮助我们理解 await

  • main 本身就是一个 Task,它可以 await 其他 Task。
  • await X 会让当前 Task 挂起,并把执行权交还事件循环。
  • 事件循环会在当前 Task 等待时,去运行其他就绪任务。
  • “这是怎么通知的?”——依赖 Future 的回调机制:当一个异步 I/O 完成时,事件循环会收到事件,标记对应的 Task 为可运行状态,并将它重新放回调度队列。
  • 阻塞代码(如 time.sleep)会卡住整个事件循环,破坏并发性。

任务状态

我们已经知道,事件循环在执行异步任务时,是以“任务(Task)”为单位进行调度的。那么它是如何知道哪些任务该执行、哪些正在等待、哪些已经完成或被取消的呢?这是因为,在 asyncio 中,每个任务都有明确的生命周期状态。事件循环就是通过这些状态,来高效管理和调度任务的。

stateDiagram-v2
    [*] --> Created
    Created --> Scheduled: 被注册到事件循环
    Scheduled --> Running: 被调度执行
    Running --> Suspended: 遇到 await
    Suspended --> Scheduled: awaitable 完成,任务重新就绪
    Running --> Done: 执行完成或抛出异常
    Suspended --> Cancelled: 被取消
    Scheduled --> Cancelled: 被取消
    Cancelled --> Done
状态 含义
Created 协程被调用,创建了协程对象,还没被事件循环调度。
Scheduled 通过 create_task() 注册到事件循环,进入等待调度的队列。
Running 当前被事件循环调度执行。
Suspended 执行中遇到 await,任务被挂起,等待外部操作完成(如 I/O、sleep)。
Done 协程执行完毕或抛出异常,任务完成。
Cancelled 调用 .cancel() 主动取消了任务。未处理的 CancelledError 仍会进入 Done。

在我们这一节的第一个例子中:

1
2
3
4
5
async def main() -> None:
one_plus_one = await add_one(1)
two_plus_one = await add_one(2)
print(one_plus_one)
print(two_plus_one)

我们可能会以为 add_one(1)add_one(2) 是单独的任务,其实不是。实际上在这个例子里,main() 是事件循环调度的 唯一一个任务。 add_one(1)add_one(2) 只是 main() 内部 await 的 协程对象,它们没有被注册为独立任务,因此不会被事件循环“并行调度”,而是串行执行的。

举个更直观的例子来理解:

1
2
async def main():
await asyncio.sleep(1)

这里 main() 是被事件循环调度的任务。虽然我们 await asyncio.sleep(1),但事件循环只关心 main()

  • 一旦执行到 await asyncio.sleep(1),事件循环会暂停 main() 并开始等待这个 sleep 的完成
  • 一旦 sleep 完成,事件循环就会恢复 main() 的执行

所以即使 await asyncio.sleep(1) 是非阻塞的,但事件循环里没有其他就绪任务时,它就只能等待这个 sleep 完成,然后再回来继续 main()。但是如果我们把这个例子稍作修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import time

async def my_print():
print('my_print() Task starts.')
time.sleep(5) # ⚠️ 阻塞
print('my_print() Task ends.')

async def main():
print('main() Task starts.')
my_print_task = asyncio.create_task(my_print())
await asyncio.sleep(5)
print('main() Task ends.')

asyncio.run(main())

这个程序运行的流程是:

  1. asyncio.run(main()) 启动事件循环,运行 main() 协程
  2. 执行 print('main() Task starts.')
  3. 创建并注册 my_print_task 到事件循环,但此时没有调度它(只是登记)
  4. 执行 await asyncio.sleep(5)
    • 事件循环会将当前任务 main() 挂起(进入 Suspended 状态)
    • 然后为 sleep(5) 安排一个 5 秒的定时器
    • 控制权立即返回事件循环,让它可以继续调度其它就绪任务
  5. 事件循环看到 my_print_task 是就绪的,就调度它执行
  6. 执行 my_print() 中的 time.sleep(5)整个事件循环被阻塞 5 秒
  7. 5 秒后,my_print() 执行完毕,事件循环恢复运行
  8. 恰好这时 asyncio.sleep(5) 也“完成”了(本质是定时器 Future 到点)
  9. 恢复 main() 执行,打印 'main() Task ends.'

所以在这个例子里,await asyncio.sleep(5) 触发了事件循环开始调度任务,先调度到了 my_print(),虽然它阻塞了事件循环 5 秒,但刚好 main() 的 sleep 也需要 5 秒,所以这两个任务“刚好并行”。

阻塞与非阻塞协程

asyncio 中,并不是所有协程都是“异步”的 —— 协程 ≠ 非阻塞任务。是否阻塞,取决于协程内部的操作是否会主动让出控制权。

一个非阻塞的协程,它的特征是:

  • 会在运行中通过 await 调用其它 awaitable 对象(如 asyncio.sleep, asyncio.open_connection 等)
  • 每当遇到 await,它就把控制权交回给事件循环,允许其他任务被调度执行

比如:

1
2
3
async def non_blocking_task():
await asyncio.sleep(1) # 非阻塞,立即交还控制权
print("woke up!")

asyncio.sleep() 就是一个模拟的非阻塞协程:它不做实际的 I/O 操作,而是通过内部设定一个定时器事件,挂起当前任务、释放控制权,直到定时器触发再继续执行。

而阻塞的协程,是指虽然形式上是协程(用 async def 定义),但它内部调用了同步阻塞代码,比如:

1
2
3
async def blocking_task():
time.sleep(3) # 阻塞整个线程!
print("done")

即便是 async def 包裹的函数,如果内部用了 time.sleep()、文件 IO、网络请求等同步阻塞代码,它依然会阻塞整个线程,导致事件循环卡住、其它协程无法被调度运行。

套接字

套接字(socket)是网络通信的基础抽象,提供了在不同进程之间进行数据交换的能力。通常,我们使用它来构建客户端与服务器之间的连接:

  • 发送数据:将字节数据写入套接字,由操作系统通过网络发送出去;
  • 接收数据:等待远端服务器返回数据,从套接字中读取字节内容。

套接字支持多种协议,最常见的是基于 TCP 的流式连接(AF_INET + SOCK_STREAM)。

阻塞与非阻塞 I/O

在默认模式下,Python 中的 socket 是阻塞式的。

这意味着当调用如下代码时:

1
data = sock.recv(1024)

如果没有数据可读,程序会一直卡在这行代码上,直到远端返回数据为止。在此期间,当前线程被完全阻塞,无法执行其他操作。

非阻塞 I/O 的模式中,socket 是非阻塞式的,操作系统在后台监视多个套接字,一旦其中一个就绪,再通知我们继续处理。不同操作系统为非阻塞 I/O 提供了不同的事件通知机制:

操作系统 底层通知机制 描述
Linux epoll 高性能、边缘触发的事件通知机制
macOS kqueue BSD 系统特有的事件机制
Windows IOCP 基于完成端口(Completion Port)的异步模型

这些系统调用允许我们注册多个非阻塞套接字,一旦某个 socket 可读/可写,操作系统就会通知我们,这样就不需要去反复轮询检查了。

asyncio 正是利用操作系统提供的这些底层 I/O 机制,来实现高效的非阻塞并发。

当我们写下如下代码:

1
2
reader, writer = await asyncio.open_connection('example.com', 80)
data = await reader.read(1024)

实际发生的事情:

  1. asyncio.open_connection 内部创建了 socket,并设置为非阻塞;
  2. 将该 socket 注册到事件循环;
  3. 当前协程 await 时被挂起,事件循环开始调度其他任务;
  4. 一旦 socket 可读,操作系统(如 epoll)通知事件循环;
  5. 事件循环恢复该协程,读取数据并继续往下执行。

最初的例子

到博客的这里,我们就可以理解最开始的例子的执行流程和逻辑了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def delay(delay_seconds: int) -> int:
print(f'sleeping for {delay_seconds} second(s)')
await asyncio.sleep(delay_seconds)
print(f'finished sleeping for {delay_seconds} second(s)')
return delay_seconds

async def hello_every_second():
for i in range(2):
await asyncio.sleep(1)
print("I'm running other code while I'm waiting!")

async def main():
first_delay = asyncio.create_task(delay(3))
second_delay = asyncio.create_task(delay(3))
await hello_every_second()
await first_delay
await second_delay

asyncio.run(main())

最终输出如下(约耗时 3 秒):

1
2
3
4
5
6
I'm running other code while I'm waiting!
sleeping for 3 second(s)
sleeping for 3 second(s)
I'm running other code while I'm waiting!
finished sleeping for 3 second(s)
finished sleeping for 3 second(s)

整体流程详解

Step 0. asyncio.run(main())

  • 自动创建事件循环;
  • main() 封装为一个 Task,注册进事件循环并开始执行。

Step 1. 执行 main()

  • 创建两个新任务并注册到事件循环队列中:

    1
    2
    first_delay = asyncio.create_task(delay(3))
    second_delay = asyncio.create_task(delay(3))
  • 此时,队列中已有三个任务(一个是 main() 本身,另两个是 delay 任务),但仍处于 main() 的执行流程中。


Step 2. 执行 await hello_every_second()

  • 开始运行 hello_every_second() 协程(注意,它不是独立任务,是 main() 的一部分);

  • 进入第一次循环,先打印:

    1
    I'm running other code while I'm waiting!
  • 遇到 await asyncio.sleep(1),协程挂起,控制权交还事件循环;

  • 此时,事件循环开始调度其它就绪任务。


Step 3. 调度两个 delay 任务(first_delay 和 second_delay)

  • 它们几乎同时被调度:

    1
    2
    sleeping for 3 second(s)
    sleeping for 3 second(s)
  • 然后遇到 await asyncio.sleep(3) 被挂起,等待 3 秒后唤醒。


Step 4. 一秒过去,恢复 hello_every_second() 的第二轮循环

  • 打印:

    1
    I'm running other code while I'm waiting!
  • 再次 await asyncio.sleep(1)main() 再次挂起。


Step 5. 又过一秒,hello_every_second() 执行完毕

  • main() 恢复,继续向下执行。

Step 6. await first_delay

  • 此时 first_delay 仍处于挂起状态(等待的 sleep 任务还未完成),所以 main() 再次挂起。

Step 7. 第 3 秒结束

  • delay() 协程的 asyncio.sleep(3) 完成;

  • 事件循环调度它们继续执行后半段,输出:

    1
    2
    finished sleeping for 3 second(s)
    finished sleeping for 3 second(s)
  • 它们分别返回值后,main() 中的 await first_delayawait second_delay 依次完成。


Step 8. 所有任务完成

  • 事件循环自动退出,程序结束。

🏁 写在最后

在本文中,我们从协程的定义、事件循环的启动方式、任务的调度机制,到 await 如何让出控制权,再到任务的状态变化,逐步揭开了 asyncio 的运行原理。通过一个个简洁的例子,我们厘清了:

  • 协程是什么,为什么要用 async def
  • 事件循环如何统一调度所有任务;
  • await 不只是“等待”,更是让出 CPU 主动权;
  • 为什么就算只 await 一个任务,其他任务也会运行;
  • 何为非阻塞协程,如何影响调度;
  • 套接字、I/O、多任务的背后,其实藏着操作系统的支持。

在协作式并发模型中,没有“魔法”,只有一层一层精妙的控制权交接。每次 await,都是一次“换挡”,从当前任务中抽身,把执行的机会交给事件循环,让其它任务得以运行。