asyncio 入门
asyncio 入门
并发模型
asyncio
是在 Python 3.4 中引入的,用来在单线程中实现并发的一种方式。 刚接触它时,总是用多线程的思路去理解,结果总是困惑重重。要真正理解它,先得搞清楚 Python 中的并发到底是怎么回事。由于 Python 的 GIL 限制,同一 Python 进程中,任意时刻只能有一个线程在执行 Python 字节码。因此,除了开多个进程,Python 中的并发通常是依赖 I/O 等待来“切换任务”的,而不是多个 CPU 核心同时跑 Python 代码。
总体上,常见的并发模型可以分为两类:
抢占式多任务:在这种模型中,由操作系统来决定任务之间何时切换。操作系统会把 CPU 时间切分成一个个时间片,周期性地暂停当前任务,把执行权交给其他任务。这种“切换”不需要应用程序显式配合,因此叫抢占式。抢占式多任务通常通过多线程和多进程来实现。
协作式多任务:在这种模型中,操作系统不会强行打断当前任务,而是由任务自己决定何时交出执行权。 换句话说,应用程序中的代码会显式写出“我现在遇到 IO 可以暂停一下了,让其他任务先执行”的逻辑。这种方式下,多个任务之间需要互相信任,都要主动在合适的时机“让出”执行权。
因此,可以这样理解:抢占式就像大家一起抢麦克风,谁说着说着被主持人(操作系统)打断了,麦克风就递给下一个人;协作式则像大家按顺序发言,谁觉得自己要翻笔记(I/O 等待)了,就主动把麦克风递给别人。两者的根本区别在于抢占式依赖操作系统调度,任务切换是被动的;协作式依赖应用程序显式切换,任务切换是主动的。
asyncio
正是基于协作式多任务模式实现并发的:当程序运行到一个可等待的时机(例如等待 I/O、网络请求或定时器)时,会在代码中显式使用 await
来让出执行权。此时,事件循环会调度其他任务运行;等这个等待结束后,再回到原任务继续执行。
asyncio 基本概念
我们先来看一个例子,感受一下 asyncio
的运行机制:
1 | import asyncio |
运行结果大致如下:
1 | I'm running other code while I'm waiting! |
可以看到:
first_delay
和second_delay
几乎同时启动,立即输出各自的 “sleeping…”。hello_every_second
在两次 1 秒的等待间隔中打印提示信息。- 大约 3 秒后,两个延迟任务几乎同时完成。
整个程序只耗时约 3 秒,而不是串行运行的 3 + 3 + 2 秒,这就是 asyncio
并发的效果。这篇博客的主要目的,就是通过对一系列 asyncio
基础概念的介绍和梳理,最终理解这个例子的运行流程和逻辑。
协程(Coroutine)
“计算机科学的任何问题,都可以通过增加一个中间层来解决。”
(Any problem in computer science can be solved by another level of indirection.)
—— David Wheeler
在 asyncio
的世界里,这个“中间层”就是协程。
普通的 Python 函数一旦调用,就会从头到尾执行完,中间无法被外部调度。要想把他们能够被 asyncio
调度,就需要在外面再包一层——也就是把函数变成协程函数。协程是一种特殊的函数——它能在执行过程中主动暂停(await
),把控制权交还给事件循环,让其他任务有机会运行,然后在合适的时候再恢复执行。相比于线程,协程拥有更高的执行效率。这是因为线程的切换需要操作系统来调度,涉及到更复杂的上下文切换,如保存和恢复寄存器、栈信息等。而协程切换只需要保存和恢复少量状态,开销更小。
在 Python 中,用 async
关键字定义的函数就是协程函数,调用它不会立即执行,而是返回一个协程对象(coroutine object):
1 | async def my_coroutine(): |
这个协程对象只是“待执行计划”,要想真正运行它,必须把它交给事件循环(event loop)调度。
事件循环
协程的运行
协程只是一个“可运行的对象”,它本身不会自动执行。要让协程真正运行起来,需要交给事件循环调度,而在 asyncio
里,这通常是通过把协程包装成任务(Task)并提交到事件循环中完成的。
1 | import asyncio |
这里我们通过 asyncio.new_event_loop()
创建了一个事件循环,我们通过 loop.create_task(main())
注册任务,通过 loop.run_until_complete(task_1)
来启动任务。用 Python 运行上述代码,我们会发现输出是:
1 | dalu! from task_1 |
两个任务都完成了。这个例子可以很好地帮助我们理解事件循环的工作逻辑:为什么 task_2
也打印了?
这是因为 run_until_complete(x)
的作用是驱动事件循环,直到 x
完成。在这段时间内,事件循环会正常调度所有已注册且就绪的任务,而不仅仅是我们传入的那个任务。由于 task_2
在同一个事件循环中注册,并且处于就绪状态,所以它也会在这个“运行周期”里被调度执行。上述代码的具体执行流程为:
创建
task_1
和task_2
,它们都登记在事件循环的任务队列中(但协程体还没开始执行)。调用
loop.run_until_complete(task_1)
,事件循环开始运转。事件循环从任务队列中取出就绪的任务依次执行:
- 首先调度到
task_1
:打印"dalu! from task_1"
,任务完成。 - 继续调度剩下的就绪任务
task_2
:打印"dalu! from task_2"
,任务完成。
- 首先调度到
此时
task_1
已完成,run_until_complete()
的退出条件满足,函数返回。由于事件循环会统一推进任务队列,所以即使只等待了
task_1
,其他就绪任务也会被执行。
所以事件循环的关键之处在于:它不会只关心我们等待的那个任务,而是在运行过程中,把所有处于就绪状态的任务都推进一轮。 实际上,即使我们把上例中 loop.run_until_complete()
的参数改成 task_2
,事件循环依然会先执行 task_1
,再执行 task_2
。原因是我们在启动事件循环之前,已经先把 task_1
注册到任务队列中,而 loop.run_until_complete()
的作用只是——启动事件循环,并在指定任务完成前一直运行。它本身并不会改变任务的调度顺序,顺序完全由事件循环的调度机制和任务注册的先后顺序决定。
事件循环的功能
从上面的例子可以看出,事件循环是整个异步运行时的“心脏”。在 asyncio
中,事件循环的职责包括:
- 维护任务队列(Task Queue)
- 调度执行任务:从队列中取出就绪任务运行
- 挂起等待 I/O:遇到阻塞 I/O(如
await asyncio.sleep()
)时,挂起任务并交由操作系统处理 - 唤醒任务:当 I/O 完成后,任务会被重新放回队列等待执行
- 循环往复,直到所有任务完成
可以把事件循环想象成一个高速旋转的“调度轮”,每次转动都会处理队列里的任务,只要它们已经准备好运行,就会被推进一步。
在实际开发中,如果不需要手动管理事件循环,可以直接用 asyncio.run()
来启动协程。它会帮我们自动创建一个新的事件循环,同时将协程封装为任务(Task)。之后,它会运行事件循环直到任务完成,并在最后自动关闭事件循环。
1 | import asyncio |
这种方式简单、安全,推荐在绝大多数情况下使用,除非我们有复杂的事件循环管理需求(如在已有的循环中动态添加任务)。在一般情况下,asyncio.run(main())
就是我们的 asyncio
应用程序的入口点。它只执行 main()
这个主协程,然后由该协程来启动应用程序的所有其他组件。
await
与异步调度
前面我们已经看过事件循环的基本运行方式。在没有 await
的情况下,事件循环的行为就像一个普通的任务队列:所有注册的任务(Task)会被按顺序运行,直到它们结束。但 asyncio
的真正威力在于——它通过协作式多任务,让任务在“可以等待的时候”主动交出执行权,从而在单线程内实现并发。这就是 await
的核心作用。
await X
表示暂停当前协程,并把执行权交还给事件循环,直到 X
完成,当前协程才会恢复。其中 X
必须是 awaitable 对象(如协程、asyncio.Future
、Task
等)。事件循环会在这段时间去调度运行其他处于就绪状态的任务,而不是傻等。
我们举一个最简单的例子:
1 | import asyncio |
在这个例子里:
asyncio.run(main())
会把main()
封装成一个 Task 并启动事件循环。main()
开始执行,到await add_one(1)
时:- 暂停
main
这个 Task,把执行权交回事件循环。 - 事件循环运行
add_one(1)
,它是阻塞式协程,因此直接执行完返回结果。 - 恢复
main
,把结果赋给one_plus_one
。
- 暂停
- 再遇到
await add_one(2)
,重复同样的过程。 - 最终打印
2
和3
。
这里 add_one
没有真正的异步等待,所以事件循环每次都是“马上”完成它。
让我们看看 await
遇到非阻塞任务时会发生什么:
1 | import asyncio |
在这个例子里:
- 事件循环启动,把
main()
作为一个 Task 加入队列。 main()
运行,创建了两个新任务:my_sleep
:内部是asyncio.sleep(3)
(非阻塞)my_print_task
:内部是time.sleep(1)
(阻塞)
- 执行到
await my_sleep
:- 暂停
main
,事件循环开始调度其他任务。
- 暂停
- 事件循环发现
my_sleep
要等 3 秒才能完成且是非阻塞的,因此启动它后暂时搁置它,先去运行my_print_task
。 - 问题来了:
my_print_task
用的是time.sleep(1)
,这是阻塞调用,事件循环无法切走,只能等它运行完。 my_print_task
结束后,事件循环回到my_sleep
,等待它的 3 秒计时器结束。- 计时器结束时,事件循环收到 I/O 完成的事件(底层是
Future
的回调机制),把my_sleep
标记为完成。 - 事件循环唤醒
main
,继续执行print('main() Task ends.')
,结束。
在这个两个例子中我们要记住几个关键性的结论,来帮助我们理解 await
:
main
本身就是一个 Task,它可以await
其他 Task。await X
会让当前 Task 挂起,并把执行权交还事件循环。- 事件循环会在当前 Task 等待时,去运行其他就绪任务。
- “这是怎么通知的?”——依赖
Future
的回调机制:当一个异步 I/O 完成时,事件循环会收到事件,标记对应的 Task 为可运行状态,并将它重新放回调度队列。 - 阻塞代码(如
time.sleep
)会卡住整个事件循环,破坏并发性。
任务状态
我们已经知道,事件循环在执行异步任务时,是以“任务(Task)”为单位进行调度的。那么它是如何知道哪些任务该执行、哪些正在等待、哪些已经完成或被取消的呢?这是因为,在 asyncio
中,每个任务都有明确的生命周期状态。事件循环就是通过这些状态,来高效管理和调度任务的。
stateDiagram-v2 [*] --> Created Created --> Scheduled: 被注册到事件循环 Scheduled --> Running: 被调度执行 Running --> Suspended: 遇到 await Suspended --> Scheduled: awaitable 完成,任务重新就绪 Running --> Done: 执行完成或抛出异常 Suspended --> Cancelled: 被取消 Scheduled --> Cancelled: 被取消 Cancelled --> Done
状态 | 含义 |
---|---|
Created | 协程被调用,创建了协程对象,还没被事件循环调度。 |
Scheduled | 通过 create_task() 注册到事件循环,进入等待调度的队列。 |
Running | 当前被事件循环调度执行。 |
Suspended | 执行中遇到 await ,任务被挂起,等待外部操作完成(如 I/O、sleep)。 |
Done | 协程执行完毕或抛出异常,任务完成。 |
Cancelled | 调用 .cancel() 主动取消了任务。未处理的 CancelledError 仍会进入 Done。 |
在我们这一节的第一个例子中:
1 | async def main() -> None: |
我们可能会以为 add_one(1)
、add_one(2)
是单独的任务,其实不是。实际上在这个例子里,main()
是事件循环调度的 唯一一个任务。 add_one(1)
和 add_one(2)
只是 main()
内部 await 的 协程对象,它们没有被注册为独立任务,因此不会被事件循环“并行调度”,而是串行执行的。
举个更直观的例子来理解:
1 | async def main(): |
这里 main()
是被事件循环调度的任务。虽然我们 await asyncio.sleep(1)
,但事件循环只关心 main()
:
- 一旦执行到
await asyncio.sleep(1)
,事件循环会暂停main()
并开始等待这个 sleep 的完成 - 一旦 sleep 完成,事件循环就会恢复
main()
的执行
所以即使 await asyncio.sleep(1)
是非阻塞的,但事件循环里没有其他就绪任务时,它就只能等待这个 sleep 完成,然后再回来继续 main()
。但是如果我们把这个例子稍作修改:
1 | import asyncio |
这个程序运行的流程是:
asyncio.run(main())
启动事件循环,运行main()
协程- 执行
print('main() Task starts.')
- 创建并注册
my_print_task
到事件循环,但此时没有调度它(只是登记) - 执行
await asyncio.sleep(5)
- 事件循环会将当前任务
main()
挂起(进入 Suspended 状态) - 然后为
sleep(5)
安排一个 5 秒的定时器 - 控制权立即返回事件循环,让它可以继续调度其它就绪任务
- 事件循环会将当前任务
- 事件循环看到
my_print_task
是就绪的,就调度它执行 - 执行
my_print()
中的time.sleep(5)
→ 整个事件循环被阻塞 5 秒 - 5 秒后,
my_print()
执行完毕,事件循环恢复运行 - 恰好这时
asyncio.sleep(5)
也“完成”了(本质是定时器 Future 到点) - 恢复
main()
执行,打印'main() Task ends.'
所以在这个例子里,await asyncio.sleep(5)
触发了事件循环开始调度任务,先调度到了 my_print()
,虽然它阻塞了事件循环 5 秒,但刚好 main()
的 sleep 也需要 5 秒,所以这两个任务“刚好并行”。
阻塞与非阻塞协程
在 asyncio
中,并不是所有协程都是“异步”的 —— 协程 ≠ 非阻塞任务。是否阻塞,取决于协程内部的操作是否会主动让出控制权。
一个非阻塞的协程,它的特征是:
- 会在运行中通过
await
调用其它awaitable
对象(如asyncio.sleep
,asyncio.open_connection
等) - 每当遇到
await
,它就把控制权交回给事件循环,允许其他任务被调度执行
比如:
1 | async def non_blocking_task(): |
像 asyncio.sleep()
就是一个模拟的非阻塞协程:它不做实际的 I/O 操作,而是通过内部设定一个定时器事件,挂起当前任务、释放控制权,直到定时器触发再继续执行。
而阻塞的协程,是指虽然形式上是协程(用 async def
定义),但它内部调用了同步阻塞代码,比如:
1 | async def blocking_task(): |
即便是 async def
包裹的函数,如果内部用了 time.sleep()
、文件 IO、网络请求等同步阻塞代码,它依然会阻塞整个线程,导致事件循环卡住、其它协程无法被调度运行。
套接字
套接字(socket)是网络通信的基础抽象,提供了在不同进程之间进行数据交换的能力。通常,我们使用它来构建客户端与服务器之间的连接:
- 发送数据:将字节数据写入套接字,由操作系统通过网络发送出去;
- 接收数据:等待远端服务器返回数据,从套接字中读取字节内容。
套接字支持多种协议,最常见的是基于 TCP 的流式连接(AF_INET + SOCK_STREAM
)。
阻塞与非阻塞 I/O
在默认模式下,Python 中的 socket 是阻塞式的。
这意味着当调用如下代码时:
1 | data = sock.recv(1024) |
如果没有数据可读,程序会一直卡在这行代码上,直到远端返回数据为止。在此期间,当前线程被完全阻塞,无法执行其他操作。
非阻塞 I/O 的模式中,socket 是非阻塞式的,操作系统在后台监视多个套接字,一旦其中一个就绪,再通知我们继续处理。不同操作系统为非阻塞 I/O 提供了不同的事件通知机制:
操作系统 | 底层通知机制 | 描述 |
---|---|---|
Linux | epoll |
高性能、边缘触发的事件通知机制 |
macOS | kqueue |
BSD 系统特有的事件机制 |
Windows | IOCP |
基于完成端口(Completion Port)的异步模型 |
这些系统调用允许我们注册多个非阻塞套接字,一旦某个 socket 可读/可写,操作系统就会通知我们,这样就不需要去反复轮询检查了。
asyncio
正是利用操作系统提供的这些底层 I/O 机制,来实现高效的非阻塞并发。
当我们写下如下代码:
1 | reader, writer = await asyncio.open_connection('example.com', 80) |
实际发生的事情:
asyncio.open_connection
内部创建了 socket,并设置为非阻塞;- 将该 socket 注册到事件循环;
- 当前协程
await
时被挂起,事件循环开始调度其他任务; - 一旦 socket 可读,操作系统(如 epoll)通知事件循环;
- 事件循环恢复该协程,读取数据并继续往下执行。
最初的例子
到博客的这里,我们就可以理解最开始的例子的执行流程和逻辑了。
1 | import asyncio |
最终输出如下(约耗时 3 秒):
1 | I'm running other code while I'm waiting! |
整体流程详解
Step 0. asyncio.run(main())
- 自动创建事件循环;
- 将
main()
封装为一个 Task,注册进事件循环并开始执行。
Step 1. 执行 main()
创建两个新任务并注册到事件循环队列中:
1
2first_delay = asyncio.create_task(delay(3))
second_delay = asyncio.create_task(delay(3))此时,队列中已有三个任务(一个是
main()
本身,另两个是 delay 任务),但仍处于main()
的执行流程中。
Step 2. 执行 await hello_every_second()
开始运行
hello_every_second()
协程(注意,它不是独立任务,是main()
的一部分);进入第一次循环,先打印:
1
I'm running other code while I'm waiting!
遇到
await asyncio.sleep(1)
,协程挂起,控制权交还事件循环;此时,事件循环开始调度其它就绪任务。
Step 3. 调度两个 delay 任务(first_delay 和 second_delay)
它们几乎同时被调度:
1
2sleeping for 3 second(s)
sleeping for 3 second(s)然后遇到
await asyncio.sleep(3)
被挂起,等待 3 秒后唤醒。
Step 4. 一秒过去,恢复 hello_every_second()
的第二轮循环
打印:
1
I'm running other code while I'm waiting!
再次
await asyncio.sleep(1)
,main()
再次挂起。
Step 5. 又过一秒,hello_every_second()
执行完毕
main()
恢复,继续向下执行。
Step 6. await first_delay
- 此时 first_delay 仍处于挂起状态(等待的 sleep 任务还未完成),所以
main()
再次挂起。
Step 7. 第 3 秒结束
delay()
协程的asyncio.sleep(3)
完成;事件循环调度它们继续执行后半段,输出:
1
2finished sleeping for 3 second(s)
finished sleeping for 3 second(s)它们分别返回值后,
main()
中的await first_delay
和await second_delay
依次完成。
Step 8. 所有任务完成
- 事件循环自动退出,程序结束。
🏁 写在最后
在本文中,我们从协程的定义、事件循环的启动方式、任务的调度机制,到 await
如何让出控制权,再到任务的状态变化,逐步揭开了 asyncio
的运行原理。通过一个个简洁的例子,我们厘清了:
- 协程是什么,为什么要用
async def
; - 事件循环如何统一调度所有任务;
await
不只是“等待”,更是让出 CPU 主动权;- 为什么就算只
await
一个任务,其他任务也会运行; - 何为非阻塞协程,如何影响调度;
- 套接字、I/O、多任务的背后,其实藏着操作系统的支持。
在协作式并发模型中,没有“魔法”,只有一层一层精妙的控制权交接。每次 await
,都是一次“换挡”,从当前任务中抽身,把执行的机会交给事件循环,让其它任务得以运行。