Web 后台任务管理
Web 后台任务管理
在 Web 开发中,经常会遇到这样的场景:用户点击一个按钮,背后需要执行一个耗时几秒甚至几分钟的操作,比如生成复杂报表、处理上传的视频、调用一个缓慢的第三方 API,或者运行数值求解器做大规模仿真。解决这类问题的核心思路是将耗时操作交给后台异步执行,让 Web 服务快速响应用户请求。为了达到这个目的,可以有多种方案,比如单线程同步执行、子进程异步执行、或者引入消息队列,通过中间件协调 Web 服务和后台任务进程等。这篇文章结合三个案例,从最简单的串行执行方式到基于 Redis + RQ 的任务队列,初步梳理后台任务管理的常见思路。这个文章暂时不涉及守护进程配置、任务结果持久化和后续任务控制等方面的内容,这些内容在后续的文章中整理。
项目准备
我们将构建一个极简的 Flask 应用,它有两个接口: * /task
: 触发一个模拟的耗时任务。 * /health
: 一个能立即返回的健康检查接口,用来检测我们的服务器是否“活着”。
我们创建一个简单的shell脚本来模拟一个需要10秒钟才能完成的任务。
long_task.sh
:
1 |
|
在 Shell 脚本中,$$
表示当前执行脚本或 Shell 的 PID(Process ID)。别忘了给它执行权限:chmod +x long_task.sh
案例一:同步阻塞
Werkzeug 实现
我们直接在API里调用任务并等待它完成。
1 | # case1_app.py |
在 Flask 开发服务器(Werkzeug)中,默认是使用多线程模式的。也就是说,即使某个请求处理过程中发生了阻塞(比如执行一个长时间运行的子进程),也不会阻塞整个服务器的进程,其他请求依然可以由其他线程并发处理,不会受到影响。例如,下面这个命令启动了一个默认的 Flask 开发服务器(开启了多线程):
1 | flask --app case1_app run --host=0.0.0.0 --port=9090 |
此时访问 /task
会同步执行脚本,但仍然可以同时访问 /health
获取立即响应。
为了演示阻塞对整个服务器的影响,我们可以强制关闭多线程模式,将服务器改成串行运行:
1 | flask --app case1_app run --host=0.0.0.0 --port=9090 --without-threads |
此时,在浏览器或 curl 中访问 http://127.0.0.1:9090/task
,会发现这个请求“卡住”了10秒钟才返回结果。在卡住的这10秒内,如果访问 http://127.0.0.1:9090/health
,会发现接口没有响应,直到 /task
执行完成,/health
才会返回 "OK"
。这说明所有请求都被串行处理了,整个服务被阻塞。
1 | 收到创建任务请求,开始同步执行... |
虽然 Flask 内置的开发服务器默认支持多线程,可以并发处理请求,但它并不适用于生产环境。原因如下:
- ❌ 缺乏进程隔离机制:所有请求都在同一个进程中处理,一个请求的异常可能导致整个服务崩溃。
- ❌ 性能有限:无法利用多核 CPU,处理高并发请求能力较弱。
- ❌ 功能缺失:没有连接管理、请求超时、守护进程、负载调节等必要的生产级特性。
- ❌ 稳定性不足:没有故障自愈机制,容易被单点故障拖垮整个服务。
因此,在生产环境中,我们应使用专业的 WSGI 服务器,如 Gunicorn、uWSGI 等。
Gunicorn 服务器
Gunicorn(Green Unicorn)是一个用于 Unix 的 Python WSGI HTTP 服务器,适合部署 Flask、Django 等 Web 应用。它采用 Pre-fork(预派生)模型,具备优秀的稳定性和可扩展性。
Pre-fork 模型
Pre-fork 是一种并发处理模型,其核心思想是由一个主进程预先创建多个子进程(Worker),这些子进程共享端口并独立处理请求。生命周期如下:
- 主进程启动:负责初始化、监听端口、管理 Worker。
- 预创建 Worker 子进程:主进程 fork 多个 Worker,每个都是主进程的副本,拥有独立的内存空间。
- 等待请求:Worker 保持空闲状态,等待接收主进程分发的客户端请求。
- 处理请求:有请求时,主进程将其分配给某个空闲 Worker,由它负责完整处理。
- 复用 Worker:Worker 处理完请求后不会退出,而是继续等待新的请求。
- 动态调节:主进程可根据负载情况动态增加或减少 Worker 数量。
Gunicorn 启动示例
1 | gunicorn app:app -w 2 -b 0.0.0.0:9090 --timeout 3 |
含义说明:
app:app
:Flask 应用实例(模块名:变量名)-w 2
:启动 2 个 Worker 进程(不含主进程)-b
:绑定地址和端口--timeout 3
:设置 Worker 的超时时间为 3 秒
启动日志示例:
1 | [2025-08-07 11:12:31 +0800] [28895] [INFO] Starting gunicorn 21.2.0 |
这里 28895
是 master 进程,28897
和 28898
是两个 Worker 进程。
自动重启 Worker:超时保护机制
当某个 Worker 卡死或超时,Gunicorn 会自动杀死并重启它。例如:
1 | [2025-08-07 11:12:37 +0800] [28895] [CRITICAL] WORKER TIMEOUT (pid:28898) |
我们可以通过 --timeout <seconds>
来设置 Worker 的空闲超时时间。如果一个 Worker 在处理请求时,两次 I/O 间隔超过该值,Gunicorn 认为它“失联”,会将其终止并启动一个新的 Worker 进程。注意,这个 timeout 是基于 I/O 活跃性,而不是“总请求时间”。 举个例子,如果一个请求需要 10 分钟处理,但期间一直有网络/文件 I/O,则不会超时。常见导致超时的场景主要包括:
- Worker 执行了长时间的 纯 CPU 运算,没有任何 I/O
- 调用外部服务(数据库、API)发生了阻塞,无响应
- 死循环或逻辑错误,导致 Worker 无法返回
对比总结
项目 | Flask 开发服务器(Werkzeug) | Gunicorn(生产服务器) |
---|---|---|
目标用途 | 开发调试 | 生产部署 |
并发模型 | 单进程 + 多线程(默认) | 多进程(Pre-fork) |
多核利用 | ❌ | ✅ |
稳定性和容错性 | 差 | 高 |
进程隔离 | 无 | 有 |
超时与恢复机制 | 无 | 有(自动重启 Worker) |
性能与扩展性 | 较弱 | 强 |
Gunicorn 实现
我们把上面的例子切换到 Gunicorn (生产环境) 中运行:
1 | # 使用2个worker进程来启动 |
我们同时打开2个终端,都去请求 /task
。我们发现这两个请求会分别被两个Worker进程处理,它们都会卡住10秒。在这两个请求还在处理时,立即打开第3个终端,访问 /health
。结果我们会发现,健康检查请求同样被卡住!因为它在等待一个空闲的Worker,但所有Worker都在忙。
1 | $ gunicorn --workers 2 --bind 0.0.0.0:9090 case1_app:app |
更糟的情况是 Gunicorn有 --timeout
机制(默认30秒)。如果我们的任务耗时过长,Gunicorn 的 Master 进程会认为那个 Worker 卡死了,然后会强行杀死它。用户会收到一个 502 Bad Gateway
错误,而任务可能只执行了一半。
1 | $ gunicorn --timeout 5 --workers 2 --bind 0.0.0.0:9090 case1_app:app |
案例一总结:同步执行长任务,无论在开发还是生产环境,都会轻易地阻塞服务,导致服务在一段时间内完全不可用,这是不可接受的。
案例二:异步子进程
既然同步执行会阻塞请求线程,我们不妨让任务在后台运行,这样 Flask 就可以立刻返回响应,而不必等待脚本执行完毕。
1 | # case2_app.py |
Werkzeug 实现
运行 flask --app case2_app run --host=0.0.0.0 --port=9090
。
此时,访问 /task
会立刻返回成功信息,同时访问 /health
也毫无压力。
1 | 收到创建任务请求,将任务放入后台... |
不过虽然通过 subprocess.Popen()
实现了非阻塞调用,表面上看任务已经成功后台执行,但本质上这种方式仍然非常脆弱,存在多个关键性问题:
问题 | 描述 |
---|---|
❌ 无任务追踪能力 | Flask 不知道任务是否成功、失败,无法返回任务状态 |
❌ 无日志记录 | 子进程输出没有保存,出错也不会被发现 |
❌ 受 Flask 生命周期影响 | 按下 Ctrl+C 停止服务时,后台子进程也会被杀掉 |
❌ 资源不可控 | 并发请求可能产生大量子进程,容易导致资源耗尽 |
❌ 无持久性或任务管理 | 无法重新尝试失败任务,无法查询执行历史 |
当然,我们可以开发一个简单的任务管理系统,来弥补这些短板,比如使用 SQLite 数据库来存储任务状态(状态、命令、结果、时间等),子进程执行完后更新数据库中的状态和输出。可以提供 API 接口,让用户查询任务执行结果和任务日志等。
Gunicorn 实现
运行 gunicorn --workers 2 --timeout 3 --bind 0.0.0.0:9090 case2_app:app
。
整体和Werkzeug里一样,一切看起来都很快、很正常。
1 | $ gunicorn --workers 2 --timeout 3 --bind 0.0.0.0:9090 case2_app:app |
但是,在 Gunicorn 中使用 subprocess.Popen()
启动子进程,实际上是一件很危险的事情:
- Flask 应用运行在 Gunicorn 的 worker 进程中;
- 用
subprocess.Popen()
启动的子进程,其 parent PID(PPID)就是这个 worker; - 如果该 worker 被 kill(timeout、崩溃、重启、升级等),子进程仍在运行,但已无人管理;
- 操作系统会将它交给系统的根进程
PID 1
管理(变成孤儿进程); - 我们将无法感知它的状态、无法终止它、也无法记录其执行结果
每发生一次 Worker 重启,就可能在服务器上留下一个或者多个这样的幽灵进程。日积月累,可能会最终导致整个服务器崩溃。
我们可以用一个例子来复现上述提到的过程:
1 | # case2_crush.py |
我们只启动一个 Worker 进程,并且设置超时为 5s:
1 | gunicorn -w 1 -b 0.0.0.0:9090 --timeout 5 case2_crush:app |
我们先访问 /task
,启动后台子进程。
1 | gunicorn -w 1 -b 0.0.0.0:9090 --timeout 5 case2_crush:app |
然后可以在终端中用以下命令来查看该子进程:
1 | ps -ef | grep sleep |
ps
是 process status 的缩写,-e
或者 -a
是系统中所有用户的所有进程(--everyone
),-f
是显示完整格式,包括 UID、PID、PPID、CMD 等等。这里 sleep 300
子进程的 PPID 就是 Gunicorn Worker 的 PID:
1 | shen 17724 17636 0 16:34 pts/22 00:00:00 sleep 300 |
接下来,我们访问 /block
,杀死这个 Worker 进程:
1 | 开始执行阻塞请求,模拟卡死... |
我们发现,这个 sleep 300
的进程并没有消失,而是被 PID 1
接管了:
1 | shen 17724 1 0 16:34 pts/22 00:00:00 sleep 300 |
案例二总结:使用“发射后不管”的异步子进程,比同步阻塞更危险。因为子进程很可能脱离服务器的掌控,问题被隐藏起来,直到最终服务器崩溃时才暴露出来。
案例三:消息队列
中间件和消息队列
“计算机科学的任何问题,都可以通过增加一个中间层来解决。”
(Any problem in computer science can be solved by another level of indirection.)
—— David Wheeler
中间件(Middleware)正是这种“中间层”思想的典型体现。想象一个场景:
- 有一个讲中文的演讲者(系统 A)
- 面对一群只懂英文的听众(系统 B)
要让他们顺利交流,有两个选择:
- 演讲者去学英文,或者让听众都学中文 —— 这会让系统高度耦合,一旦角色变化,整个结构就要重写;
- 引入一个会中英文的同声传译 —— 他在中间完成沟通解码,从而实现两端的解耦。
在 Web 后台任务的开发中,这种中间层也无处不在:
- Gunicorn Web 服务(演讲者):负责接收 HTTP 请求,处理业务逻辑,但不适合执行耗时的后台任务。
- 后台仿真进程(听众):专注计算,但不需要知道谁发起了请求,也不关心用户状态。
- Redis + Redis Queue(RQ)(中间层 / 翻译者):承担任务传递、排队、状态维护的职责。Gunicorn 把任务信息传给 Redis,后台 worker 从 Redis 中读取并执行。
RQ 是一个使用 Redis 作为消息队列(Message Queue,MQ)的 Python 库,它使用 Redis 来跟踪队列中需要执行的任务。MQ 是一种典型的中间件形式,它的本质就是在生产者(Producer)和消费者(Consumer)之间建立一个中间缓冲区(队列),实现解耦、异步和削峰。在我们的案例中:
- 生产者:Web 请求处理函数,收到用户请求后,立即将任务推入队列。
- 消费者:后台 worker 进程,独立运行,监听并处理队列中的任务。
- 消息队列:Redis 中的一个 List 结构,承担任务传递的角色。
这种设计有三个核心优势:
- 解耦:前端不再直接调用后端逻辑,只是丢一个“请求”进队列。
- 异步:请求响应速度快,不用等待任务完成。
- 削峰:即使同时有大量请求,队列可以缓存它们,逐步处理。
安装 Redis 和 RQ 很简单:
1 | sudo yum -y install redis |
Redis:高性能的中间通信组件
Redis 的全称是 Remote Dictionary Server,即“远程字典服务”。这个名字其实已经暗示了它的本质:它是一个可以通过网络远程访问的、内存中的键值对数据库。
从开发者的角度来看,它的行为很像 Python 中的 dict
:
1 | # Python 中的本地字典 |
而 Redis 提供了一个“全局的字典”,它可以被多个进程、多个主机、多个系统共享访问:
1 | # Redis 命令行 |
相比于传统的数据库,Redis 有以下几个显著特点:
特性 | 描述 |
---|---|
内存存储 | 所有数据都存放在内存中,读取速度极快(微秒级),适用于高并发场景 |
多种数据结构 | 不仅有 String,还包括 List、Set、Hash、Sorted Set 等复杂结构 |
轻量级通信 | 通过 TCP 使用简单的文本协议(RESP),即使是脚本语言也能轻松接入 |
持久化机制 | 虽然运行在内存中,但支持 RDB(快照)和 AOF(追加日志)两种持久化方式 |
多功能角色 | 既可以做缓存,也可以做消息队列、中间件、分布式锁、计数器、排行榜等 |
在我们的后台任务调度中,Redis 被用作 消息中转站。比如 RQ 框架中,它会:
- 将任务序列化成字符串,并存入 Redis 中的一个
List
(列表结构) - RQ 的 Worker 会不断监听这个列表,一旦有任务加入,就弹出执行
- 执行结果、状态等信息也会临时存储在 Redis 中,供客户端查询
这种模式天然适合分布式架构,因为 Redis 的 List 是线程安全的、支持原子性操作的,非常适合做“先进先出”(FIFO)的队列。
Redis 常用数据类型包括:
类型 | 作用举例 |
---|---|
String | 基本的键值对(可存储字符串、数字等) |
List | 消息队列(FIFO 弹出/插入任务) |
Hash | 类似字典结构,适合存储对象 |
Set | 去重集合,如“在线用户集合” |
Sorted Set | 排行榜/任务优先级队列 |
Redis 默认监听在 6379
端口,在各类语言中,我们都可以很简单地操纵 Redis:
1 | from redis import Redis |
这就是最原始的“任务队列”模型:lpush
添加任务,rpop
取出任务。
RQ:基于 Redis 的任务队列框架
基本图像
RQ 是一个基于 Redis 的轻量级任务队列框架,RQ 系统由三个核心角色组成:
组件 | 作用 |
---|---|
Producer(生产者) | 通常是 Web 应用,调用 enqueue() 将任务提交到 Redis 队列中 |
Redis | 用作中间件,缓存任务队列、任务状态、执行结果 |
Worker(消费者) | 独立运行的进程,监听 Redis 中的任务队列,并执行其中的任务函数 |
使用 RQ 一般分成以下几个步骤,首先我们需要定义任务函数:
1 | # tasks.py |
接下来,我们就可以提交任务:
1 | # run.py |
执行 python run.py
后,我们会得到类似于下方所示的输出:
1 | 任务已提交到 my_tasks 队列,任务 ID 为 9e4ce28a-604b-45a4-aff6-efd4ea7325f7 |
接下来在当前目录运行:
1 | rq worker my_tasks |
这会启动一个 worker,监听名为 my_tasks
的队列,并处理任务。
1 | $ rq worker my_tasks |
我们可以看到,RQ 在启动后,立即连接到 Redis,发现了 my_tasks
队列中存在挤压的任务,然后马上开始一个一个地取出并执行。当任务执行完了后,RQ Worker 会继续监听这个队列,并实时从中弹出任务、执行并更新状态。
数据结构
RQ 利用了 Redis 原生的、高性能的数据结构,主要是 Lists 和 Hashes。
A. 任务队列本身:一个 Redis List
每个 RQ 队列都对应 Redis 中的一个 List 数据结构。这个 List 里存放的是等待被执行的 Job ID。
- Key 的命名规则:
rq:queue:<queue_name>
- 例如:对于我们之前创建的 my_tasks 队列,它的 Key 就是
rq:queue:my_tasks
- 如何查看:我们可以通过
redis-cli
进入 Redis 命令行工具,使用 LRANGE 命令 (List RANGE):
1 | # 查看 my_tasks 队列中所有等待的 Job ID |
0 -1 的意思是“从第一个元素到最后一个元素”。如果队列里有任务,我们会看到一个 Job ID 列表。注意,这个队列只存储还未被处理的任务(即排队等待中的任务)。执行完毕后,该任务会从队列列表中移除。
B. 每个任务的详细信息:一个 Redis Hash
每个被推入队列的任务(Job),其所有的详细信息都被存储在一个 Hash 数据结构中。Hash 就像一个键值对字典。
- Key 的命名规则:
rq:job:<job_id>
- 例如:如果我们有一个 Job ID 是
a1b2c3d4-e5f6-....
,它的 Key 就是rq:job:a1b2c3d4-e5f6-....
- 如何查看: 使用 HGETALL 命令 (Hash GET ALL):
1 | # 查看某个特定 Job ID 的所有信息 |
执行后,我们会看到非常丰富的任务信息,包括:
- data: 序列化后的函数调用信息,看起来像 (
b'\x80\x04\x95...\x8c\x05tasks\x94\x8c\x11long_running_task\x94...'
。这里面包含了要调用的函数名、参数等。 - status: 任务的当前状态,比如 queued, started, finished, 或 failed。
- created_at: 创建时间。
- enqueued_at: 入队时间。
- ended_at: 结束时间(如果已完成)。
- result: 任务成功后的返回值(如果已完成)。
- exc_info: 任务失败后的异常堆栈信息(如果失败了)。
- origin: 它来自哪个队列,比如 my_tasks。
默认情况下,RQ 会保留 job 500 秒,这个可以通过启动 RQ 时的 result_ttl
参数来设定。在这段时间内,我们可以通过 job ID 查询它的状态、返回值等。 超时后,RQ 就会把这些储存在 Redis 中的任务都清理掉。
C. 任务状态存储
所以整个流程是,在 Web 中,调用 queue.enqueue(my_task, "参数")
RQ 会做两件事:
- 把任务信息序列化,写入 Redis Hash,如
rq:job:<job_id>
- 把 job_id 加入队列列表
rq:queue:my_tasks
Worker 启动时,会从 rq:queue:my_tasks
中 rpop()
一个任务 ID,并:
- 从对应
rq:job:<job_id>
中加载任务详情 - 执行任务
- 更新任务状态:比如
finished
、failed
- 结果仍然存在
rq:job:<job_id>
中
执行完后,该任务 会从队列列表中移除,不再存在于 rq:queue:my_tasks
中。这些任务的信息存在什么地方呢?还有其他键值:
键名 | 作用 | 是否定期清理 |
---|---|---|
rq:queue:<name> |
等待中的任务队列(List) | ❌ 只有任务取走才会移除 |
rq:job:<job_id> |
任务详情(函数名、参数、状态、结果) | ✅ 按 result_ttl 清理 |
rq:finished |
已完成任务的 ID 列表(Registry 类型) | ✅ 按 result_ttl 清理 |
rq:failed |
失败任务的 ID 列表(Registry 类型) | ✅ 按 failure_ttl 清理 |
rq:workers |
Worker 列表(活跃 worker 的心跳记录) | ✅ 按 worker_ttl 清理 |
消息队列实现
我们把任务逻辑单独放到一个文件里:
1 | import time |
这是因为 RQ 在执行任务时,并不是调用当前运行在 Flask 进程中的函数,而是用 Worker 进程重新导入任务函数。比如当我们调用
1 | job = q.enqueue(long_running_task, param) |
RQ 会把函数的导入路径(模块名+函数名,比如 tasks.long_running_task
)和参数序列化后写进 Redis。Worker 在另一端取到任务时,会用 importlib
根据模块路径重新导入这个模块,再调用函数执行。因此,如果任务函数定义在了 Flask 主文件 app.py
里,而在 app.py
里又导入了队列的逻辑,这时 Worker 在导入 app
模块时会触发一堆和 Web 服务相关的初始化代码(比如启动 Flask、连接数据库、加载蓝图等),还可能出现循环导入或者环境不一致等问题。
1 | from flask import Flask |
使用 Gunicorn 启动Web应用:gunicorn --workers 2 --bind 0.0.0.0:9090 case3_app:app
;
同时在目录中执行:rq worker my_tasks
。
访问 http://localhost:8000/task
,我们可以发现,
- Web服务器终端:Gunicorn立刻打印了"收到创建任务请求...",并且API请求瞬间返回了成功信息和一个job_id。
- RQ Worker终端:几乎在同时,
rq worker
打印出 "后台任务开始执行...",等待10秒后,又打印出 "任务成功完成!"。 - 健康检查:在任务执行的10秒内,
/health
接口始终是可用的。
案例三总结:这是生产环境处理后台任务的比较好的方式:
- 解耦:Web服务器和任务执行器分离,互不影响。Web服务器的崩溃不会影响正在执行的任务,反之亦然。
- 高可用:Web服务器始终保持响应,不会因为后台任务而被阻塞。
- 可扩展:如果任务太多处理不过来,只需要多开几个
rq worker
进程,而无需改动Web服务器。 - 可观测:RQ提供了工具来查看任务状态、失败的任务等,管理起来非常方便。
当然,这里还并没有涉及 systemd 配置和任务持久化等的相关处理,后面的文章再进行总结。
总结
通过这三个案例,我们可以总结出一个架构原则:
不要在 Web 请求–响应周期内执行耗时且不可预测的任务。
Web 服务器的职责,就像一位高效的接待员——它的使命是快速、准确地接收并转交请求,而不是亲自去后厨炒一盘要花二十分钟的菜。任何可能拖慢响应的工作,都应该通过消息队列交给独立的后台任务执行系统去处理。这种设计可以让网站在高并发下依然保持轻快响应,还能让任务执行逻辑与 Web 层彻底解耦,形成一个健壮、可扩展、易维护的架构基础。