Web 后台任务管理

Web 后台任务管理

在 Web 开发中,经常会遇到这样的场景:用户点击一个按钮,背后需要执行一个耗时几秒甚至几分钟的操作,比如生成复杂报表、处理上传的视频、调用一个缓慢的第三方 API,或者运行数值求解器做大规模仿真。解决这类问题的核心思路是将耗时操作交给后台异步执行,让 Web 服务快速响应用户请求。为了达到这个目的,可以有多种方案,比如单线程同步执行、子进程异步执行、或者引入消息队列,通过中间件协调 Web 服务和后台任务进程等。这篇文章结合三个案例,从最简单的串行执行方式到基于 Redis + RQ 的任务队列,初步梳理后台任务管理的常见思路。这个文章暂时不涉及守护进程配置、任务结果持久化和后续任务控制等方面的内容,这些内容在后续的文章中整理。

项目准备

我们将构建一个极简的 Flask 应用,它有两个接口: * /task: 触发一个模拟的耗时任务。 * /health: 一个能立即返回的健康检查接口,用来检测我们的服务器是否“活着”。

我们创建一个简单的shell脚本来模拟一个需要10秒钟才能完成的任务。

long_task.sh:

1
2
3
4
#!/bin/bash
echo "后台任务开始... (PID: $$)"
sleep 10
echo "后台任务完成!"

在 Shell 脚本中,$$ 表示当前执行脚本或 Shell 的 PID(Process ID)。别忘了给它执行权限:chmod +x long_task.sh

案例一:同步阻塞

Werkzeug 实现

我们直接在API里调用任务并等待它完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# case1_app.py

import subprocess
from flask import Flask

app = Flask(__name__)

@app.route('/task')
def create_task():
print("收到创建任务请求,开始同步执行...")
# subprocess.run 是阻塞的
result = subprocess.run(['./long_task.sh'], capture_output=True, text=True)
print("同步任务执行完成。")
return {
"message": "任务已同步完成",
"output": result.stdout
}

@app.route('/health')
def health_check():
return "OK"

在 Flask 开发服务器(Werkzeug)中,默认是使用多线程模式的。也就是说,即使某个请求处理过程中发生了阻塞(比如执行一个长时间运行的子进程),也不会阻塞整个服务器的进程,其他请求依然可以由其他线程并发处理,不会受到影响。例如,下面这个命令启动了一个默认的 Flask 开发服务器(开启了多线程):

1
flask --app case1_app run --host=0.0.0.0 --port=9090

此时访问 /task 会同步执行脚本,但仍然可以同时访问 /health 获取立即响应。

为了演示阻塞对整个服务器的影响,我们可以强制关闭多线程模式,将服务器改成串行运行:

1
flask --app case1_app run --host=0.0.0.0 --port=9090 --without-threads

此时,在浏览器或 curl 中访问 http://127.0.0.1:9090/task,会发现这个请求“卡住”了10秒钟才返回结果。在卡住的这10秒内,如果访问 http://127.0.0.1:9090/health,会发现接口没有响应,直到 /task 执行完成,/health 才会返回 "OK"。这说明所有请求都被串行处理了,整个服务被阻塞。

1
2
3
4
收到创建任务请求,开始同步执行...
同步任务执行完成。
127.0.0.1 - - [07/Aug/2025 14:19:43] "GET /task HTTP/1.1" 200 -
127.0.0.1 - - [07/Aug/2025 14:19:43] "GET /health HTTP/1.1" 200 -

虽然 Flask 内置的开发服务器默认支持多线程,可以并发处理请求,但它并不适用于生产环境。原因如下:

  • 缺乏进程隔离机制:所有请求都在同一个进程中处理,一个请求的异常可能导致整个服务崩溃。
  • 性能有限:无法利用多核 CPU,处理高并发请求能力较弱。
  • 功能缺失:没有连接管理、请求超时、守护进程、负载调节等必要的生产级特性。
  • 稳定性不足:没有故障自愈机制,容易被单点故障拖垮整个服务。

因此,在生产环境中,我们应使用专业的 WSGI 服务器,如 Gunicorn、uWSGI 等。

Gunicorn 服务器

Gunicorn(Green Unicorn)是一个用于 Unix 的 Python WSGI HTTP 服务器,适合部署 Flask、Django 等 Web 应用。它采用 Pre-fork(预派生)模型,具备优秀的稳定性和可扩展性。

Pre-fork 模型

Pre-fork 是一种并发处理模型,其核心思想是由一个主进程预先创建多个子进程(Worker),这些子进程共享端口并独立处理请求。生命周期如下:

  1. 主进程启动:负责初始化、监听端口、管理 Worker。
  2. 预创建 Worker 子进程:主进程 fork 多个 Worker,每个都是主进程的副本,拥有独立的内存空间。
  3. 等待请求:Worker 保持空闲状态,等待接收主进程分发的客户端请求。
  4. 处理请求:有请求时,主进程将其分配给某个空闲 Worker,由它负责完整处理。
  5. 复用 Worker:Worker 处理完请求后不会退出,而是继续等待新的请求。
  6. 动态调节:主进程可根据负载情况动态增加或减少 Worker 数量。

Gunicorn 启动示例

1
gunicorn app:app -w 2 -b 0.0.0.0:9090 --timeout 3

含义说明:

  • app:app:Flask 应用实例(模块名:变量名)
  • -w 2:启动 2 个 Worker 进程(不含主进程)
  • -b:绑定地址和端口
  • --timeout 3:设置 Worker 的超时时间为 3 秒

启动日志示例:

1
2
3
4
5
[2025-08-07 11:12:31 +0800] [28895] [INFO] Starting gunicorn 21.2.0
[2025-08-07 11:12:31 +0800] [28895] [INFO] Listening at: http://0.0.0.0:9090 (28895)
[2025-08-07 11:12:31 +0800] [28895] [INFO] Using worker: sync
[2025-08-07 11:12:31 +0800] [28897] [INFO] Booting worker with pid: 28897
[2025-08-07 11:12:31 +0800] [28898] [INFO] Booting worker with pid: 28898

这里 28895 是 master 进程,2889728898 是两个 Worker 进程。

自动重启 Worker:超时保护机制

当某个 Worker 卡死或超时,Gunicorn 会自动杀死并重启它。例如:

1
2
3
4
[2025-08-07 11:12:37 +0800] [28895] [CRITICAL] WORKER TIMEOUT (pid:28898)
[2025-08-07 11:12:37 +0800] [28898] [INFO] Worker exiting (pid: 28898)
[2025-08-07 11:12:37 +0800] [28895] [ERROR] Worker (pid:28898) exited with code 1
[2025-08-07 11:12:37 +0800] [28966] [INFO] Booting worker with pid: 28966

我们可以通过 --timeout <seconds> 来设置 Worker 的空闲超时时间。如果一个 Worker 在处理请求时,两次 I/O 间隔超过该值,Gunicorn 认为它“失联”,会将其终止并启动一个新的 Worker 进程。注意,这个 timeout 是基于 I/O 活跃性,而不是“总请求时间”。 举个例子,如果一个请求需要 10 分钟处理,但期间一直有网络/文件 I/O,则不会超时。常见导致超时的场景主要包括:

  • Worker 执行了长时间的 纯 CPU 运算,没有任何 I/O
  • 调用外部服务(数据库、API)发生了阻塞,无响应
  • 死循环或逻辑错误,导致 Worker 无法返回

对比总结

项目 Flask 开发服务器(Werkzeug) Gunicorn(生产服务器)
目标用途 开发调试 生产部署
并发模型 单进程 + 多线程(默认) 多进程(Pre-fork)
多核利用
稳定性和容错性
进程隔离
超时与恢复机制 有(自动重启 Worker)
性能与扩展性 较弱

Gunicorn 实现

我们把上面的例子切换到 Gunicorn (生产环境) 中运行:

1
2
# 使用2个worker进程来启动
gunicorn --workers 2 --bind 0.0.0.0:9090 case1_app:app

我们同时打开2个终端,都去请求 /task。我们发现这两个请求会分别被两个Worker进程处理,它们都会卡住10秒。在这两个请求还在处理时,立即打开第3个终端,访问 /health。结果我们会发现,健康检查请求同样被卡住!因为它在等待一个空闲的Worker,但所有Worker都在忙。

1
2
3
4
5
6
7
8
9
10
11
$ gunicorn --workers 2 --bind 0.0.0.0:9090 case1_app:app
[2025-08-07 14:40:04 +0800] [19410] [INFO] Starting gunicorn 21.2.0
[2025-08-07 14:40:04 +0800] [19410] [INFO] Listening at: http://0.0.0.0:9090 (19410)
[2025-08-07 14:40:04 +0800] [19410] [INFO] Using worker: sync
[2025-08-07 14:40:04 +0800] [19411] [INFO] Booting worker with pid: 19411
[2025-08-07 14:40:04 +0800] [19412] [INFO] Booting worker with pid: 19412
收到创建任务请求,开始同步执行...
收到创建任务请求,开始同步执行...
同步任务执行完成。
同步任务执行完成。
OK

更糟的情况是 Gunicorn有 --timeout 机制(默认30秒)。如果我们的任务耗时过长,Gunicorn 的 Master 进程会认为那个 Worker 卡死了,然后会强行杀死它。用户会收到一个 502 Bad Gateway 错误,而任务可能只执行了一半。

1
2
3
4
5
6
7
8
9
10
11
12
$ gunicorn --timeout 5 --workers 2 --bind 0.0.0.0:9090 case1_app:app
[2025-08-07 14:41:41 +0800] [19882] [INFO] Starting gunicorn 21.2.0
[2025-08-07 14:41:41 +0800] [19882] [INFO] Listening at: http://0.0.0.0:9090 (19882)
[2025-08-07 14:41:41 +0800] [19882] [INFO] Using worker: sync
[2025-08-07 14:41:41 +0800] [19883] [INFO] Booting worker with pid: 19883
[2025-08-07 14:41:41 +0800] [19885] [INFO] Booting worker with pid: 19885
收到创建任务请求,开始同步执行...
[2025-08-07 14:41:50 +0800] [19882] [CRITICAL] WORKER TIMEOUT (pid:19885)
[2025-08-07 14:41:50 +0800] [19885] [INFO] Worker exiting (pid: 19885)
[2025-08-07 14:41:50 +0800] [19882] [ERROR] Worker (pid:19885) exited with code 1
[2025-08-07 14:41:50 +0800] [19882] [ERROR] Worker (pid:19885) exited with code 1.
[2025-08-07 14:41:50 +0800] [19953] [INFO] Booting worker with pid: 19953

案例一总结:同步执行长任务,无论在开发还是生产环境,都会轻易地阻塞服务,导致服务在一段时间内完全不可用,这是不可接受的。

案例二:异步子进程

既然同步执行会阻塞请求线程,我们不妨让任务在后台运行,这样 Flask 就可以立刻返回响应,而不必等待脚本执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# case2_app.py

import subprocess
from flask import Flask

app = Flask(__name__)

@app.route('/task')
def create_task():
print("收到创建任务请求,将任务放入后台...")
# Popen是非阻塞的,它会立即返回
subprocess.Popen(['./long_task.sh'])
return {"message": "任务已提交到后台运行"}

@app.route('/health')
def health_check():
return "OK"

Werkzeug 实现

运行 flask --app case2_app run --host=0.0.0.0 --port=9090

此时,访问 /task 会立刻返回成功信息,同时访问 /health 也毫无压力。

1
2
3
4
5
收到创建任务请求,将任务放入后台...
后台任务开始... (PID: 957)
101.6.35.52 - - [07/Aug/2025 15:32:51] "GET /task HTTP/1.1" 200 -
101.6.35.52 - - [07/Aug/2025 15:32:55] "GET /health HTTP/1.1" 200 -
后台任务完成!

不过虽然通过 subprocess.Popen() 实现了非阻塞调用,表面上看任务已经成功后台执行,但本质上这种方式仍然非常脆弱,存在多个关键性问题:

问题 描述
无任务追踪能力 Flask 不知道任务是否成功、失败,无法返回任务状态
无日志记录 子进程输出没有保存,出错也不会被发现
受 Flask 生命周期影响 按下 Ctrl+C 停止服务时,后台子进程也会被杀掉
资源不可控 并发请求可能产生大量子进程,容易导致资源耗尽
无持久性或任务管理 无法重新尝试失败任务,无法查询执行历史

当然,我们可以开发一个简单的任务管理系统,来弥补这些短板,比如使用 SQLite 数据库来存储任务状态(状态、命令、结果、时间等),子进程执行完后更新数据库中的状态和输出。可以提供 API 接口,让用户查询任务执行结果和任务日志等。

Gunicorn 实现

运行 gunicorn --workers 2 --timeout 3 --bind 0.0.0.0:9090 case2_app:app

整体和Werkzeug里一样,一切看起来都很快、很正常。

1
2
3
4
5
6
7
8
9
10
11
12
$ gunicorn --workers 2 --timeout 3 --bind 0.0.0.0:9090 case2_app:app
[2025-08-07 15:58:13 +0800] [7777] [INFO] Starting gunicorn 21.2.0
[2025-08-07 15:58:13 +0800] [7777] [INFO] Listening at: http://0.0.0.0:9090 (7777)
[2025-08-07 15:58:13 +0800] [7777] [INFO] Using worker: sync
[2025-08-07 15:58:13 +0800] [7778] [INFO] Booting worker with pid: 7778
[2025-08-07 15:58:13 +0800] [7779] [INFO] Booting worker with pid: 7779
收到创建任务请求,将任务放入后台...
后台任务开始... (PID: 7834)
收到创建任务请求,将任务放入后台...
后台任务开始... (PID: 7844)
后台任务完成!
后台任务完成!

但是,在 Gunicorn 中使用 subprocess.Popen() 启动子进程,实际上是一件很危险的事情:

  • Flask 应用运行在 Gunicorn 的 worker 进程中;
  • subprocess.Popen() 启动的子进程,其 parent PID(PPID)就是这个 worker;
  • 如果该 worker 被 kill(timeout、崩溃、重启、升级等),子进程仍在运行,但已无人管理;
  • 操作系统会将它交给系统的根进程 PID 1 管理(变成孤儿进程);
  • 我们将无法感知它的状态、无法终止它、也无法记录其执行结果

每发生一次 Worker 重启,就可能在服务器上留下一个或者多个这样的幽灵进程。日积月累,可能会最终导致整个服务器崩溃。


我们可以用一个例子来复现上述提到的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# case2_crush.py

import subprocess
from flask import Flask
import time

app = Flask(__name__)

@app.route('/task')
def create_task():
print("收到创建任务请求,将任务放入后台...")
subprocess.Popen(['sleep', '300']) # 模拟长任务
return {"message": "任务已提交到后台运行"}

@app.route('/block')
def block():
print("开始执行阻塞请求,模拟卡死...")
time.sleep(100) # 用来触发 gunicorn 超时
return "done"

我们只启动一个 Worker 进程,并且设置超时为 5s:

1
gunicorn -w 1 -b 0.0.0.0:9090 --timeout 5 case2_crush:app

我们先访问 /task,启动后台子进程。

1
2
3
4
5
6
gunicorn -w 1 -b 0.0.0.0:9090 --timeout 5 case2_crush:app
[2025-08-07 16:34:04 +0800] [17635] [INFO] Starting gunicorn 21.2.0
[2025-08-07 16:34:04 +0800] [17635] [INFO] Listening at: http://0.0.0.0:9090 (17635)
[2025-08-07 16:34:04 +0800] [17635] [INFO] Using worker: sync
[2025-08-07 16:34:04 +0800] [17636] [INFO] Booting worker with pid: 17636
收到创建任务请求,将任务放入后台...

然后可以在终端中用以下命令来查看该子进程:

1
ps -ef | grep sleep

ps 是 process status 的缩写,-e 或者 -a 是系统中所有用户的所有进程(--everyone),-f 是显示完整格式,包括 UID、PID、PPID、CMD 等等。这里 sleep 300 子进程的 PPID 就是 Gunicorn Worker 的 PID:

1
shen     17724 17636  0 16:34 pts/22   00:00:00 sleep 300

接下来,我们访问 /block,杀死这个 Worker 进程:

1
2
3
4
5
6
开始执行阻塞请求,模拟卡死...
[2025-08-07 16:34:44 +0800] [17635] [CRITICAL] WORKER TIMEOUT (pid:17636)
[2025-08-07 16:34:44 +0800] [17636] [INFO] Worker exiting (pid: 17636)
[2025-08-07 16:34:44 +0800] [17635] [ERROR] Worker (pid:17636) exited with code 1
[2025-08-07 16:34:44 +0800] [17635] [ERROR] Worker (pid:17636) exited with code 1.
[2025-08-07 16:34:44 +0800] [17853] [INFO] Booting worker with pid: 17853

我们发现,这个 sleep 300 的进程并没有消失,而是被 PID 1 接管了:

1
shen     17724     1  0 16:34 pts/22   00:00:00 sleep 300

案例二总结:使用“发射后不管”的异步子进程,比同步阻塞更危险。因为子进程很可能脱离服务器的掌控,问题被隐藏起来,直到最终服务器崩溃时才暴露出来。

案例三:消息队列

中间件和消息队列

“计算机科学的任何问题,都可以通过增加一个中间层来解决。”

(Any problem in computer science can be solved by another level of indirection.)

—— David Wheeler

中间件(Middleware)正是这种“中间层”思想的典型体现。想象一个场景:

  • 有一个讲中文的演讲者(系统 A)
  • 面对一群只懂英文的听众(系统 B)

要让他们顺利交流,有两个选择:

  1. 演讲者去学英文,或者让听众都学中文 —— 这会让系统高度耦合,一旦角色变化,整个结构就要重写;
  2. 引入一个会中英文的同声传译 —— 他在中间完成沟通解码,从而实现两端的解耦。

在 Web 后台任务的开发中,这种中间层也无处不在:

  • Gunicorn Web 服务(演讲者):负责接收 HTTP 请求,处理业务逻辑,但不适合执行耗时的后台任务。
  • 后台仿真进程(听众):专注计算,但不需要知道谁发起了请求,也不关心用户状态。
  • Redis + Redis Queue(RQ)(中间层 / 翻译者):承担任务传递、排队、状态维护的职责。Gunicorn 把任务信息传给 Redis,后台 worker 从 Redis 中读取并执行。

RQ 是一个使用 Redis 作为消息队列(Message Queue,MQ)的 Python 库,它使用 Redis 来跟踪队列中需要执行的任务。MQ 是一种典型的中间件形式,它的本质就是在生产者(Producer)和消费者(Consumer)之间建立一个中间缓冲区(队列),实现解耦、异步和削峰。在我们的案例中:

  • 生产者:Web 请求处理函数,收到用户请求后,立即将任务推入队列。
  • 消费者:后台 worker 进程,独立运行,监听并处理队列中的任务。
  • 消息队列:Redis 中的一个 List 结构,承担任务传递的角色。

这种设计有三个核心优势:

  1. 解耦:前端不再直接调用后端逻辑,只是丢一个“请求”进队列。
  2. 异步:请求响应速度快,不用等待任务完成。
  3. 削峰:即使同时有大量请求,队列可以缓存它们,逐步处理。

安装 Redis 和 RQ 很简单:

1
2
3
sudo yum -y install redis
sudo systemctl start redis
pip install redis rq

Redis:高性能的中间通信组件

Redis 的全称是 Remote Dictionary Server,即“远程字典服务”。这个名字其实已经暗示了它的本质:它是一个可以通过网络远程访问的、内存中的键值对数据库。

从开发者的角度来看,它的行为很像 Python 中的 dict

1
2
3
4
# Python 中的本地字典
d = {}
d["name"] = "Alice"
print(d["name"]) # 输出 Alice

而 Redis 提供了一个“全局的字典”,它可以被多个进程、多个主机、多个系统共享访问:

1
2
3
# Redis 命令行
SET name Alice
GET name

相比于传统的数据库,Redis 有以下几个显著特点:

特性 描述
内存存储 所有数据都存放在内存中,读取速度极快(微秒级),适用于高并发场景
多种数据结构 不仅有 String,还包括 List、Set、Hash、Sorted Set 等复杂结构
轻量级通信 通过 TCP 使用简单的文本协议(RESP),即使是脚本语言也能轻松接入
持久化机制 虽然运行在内存中,但支持 RDB(快照)和 AOF(追加日志)两种持久化方式
多功能角色 既可以做缓存,也可以做消息队列、中间件、分布式锁、计数器、排行榜等

在我们的后台任务调度中,Redis 被用作 消息中转站。比如 RQ 框架中,它会:

  1. 将任务序列化成字符串,并存入 Redis 中的一个 List(列表结构)
  2. RQ 的 Worker 会不断监听这个列表,一旦有任务加入,就弹出执行
  3. 执行结果、状态等信息也会临时存储在 Redis 中,供客户端查询

这种模式天然适合分布式架构,因为 Redis 的 List 是线程安全的、支持原子性操作的,非常适合做“先进先出”(FIFO)的队列。

Redis 常用数据类型包括:

类型 作用举例
String 基本的键值对(可存储字符串、数字等)
List 消息队列(FIFO 弹出/插入任务)
Hash 类似字典结构,适合存储对象
Set 去重集合,如“在线用户集合”
Sorted Set 排行榜/任务优先级队列

Redis 默认监听在 6379 端口,在各类语言中,我们都可以很简单地操纵 Redis:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from redis import Redis

r = Redis(host='localhost', port=6379, db=0)

# 设置一个键值
r.set('username', 'Alice')

# 获取一个键值
name = r.get('username')
print(name.decode()) # 输出 Alice

# 模拟消息队列
r.lpush('task_queue', 'run_simulation_123')
task = r.rpop('task_queue')
print(task.decode())

这就是最原始的“任务队列”模型:lpush 添加任务,rpop 取出任务。

RQ:基于 Redis 的任务队列框架

基本图像

RQ 是一个基于 Redis 的轻量级任务队列框架,RQ 系统由三个核心角色组成:

组件 作用
Producer(生产者) 通常是 Web 应用,调用 enqueue() 将任务提交到 Redis 队列中
Redis 用作中间件,缓存任务队列、任务状态、执行结果
Worker(消费者) 独立运行的进程,监听 Redis 中的任务队列,并执行其中的任务函数

使用 RQ 一般分成以下几个步骤,首先我们需要定义任务函数:

1
2
3
4
5
6
7
8
9
# tasks.py

import time

def long_running_task(param="默认参数"):
print(f"开始执行任务:{param}")
time.sleep(5) # 模拟耗时操作
print("任务完成")
return f"任务完成,参数是:{param}"

接下来,我们就可以提交任务:

1
2
3
4
5
6
7
8
9
10
# run.py

from redis import Redis
from rq import Queue
from tasks import long_running_task # 导入任务函数

redis_conn = Redis()
queue = Queue('my_tasks', connection=redis_conn)
job = queue.enqueue(long_running_task, "重要参数123")
print(f'任务已提交到 my_tasks 队列,任务 ID 为 {job.id}')

执行 python run.py 后,我们会得到类似于下方所示的输出:

1
任务已提交到 my_tasks 队列,任务 ID 为 9e4ce28a-604b-45a4-aff6-efd4ea7325f7

接下来在当前目录运行:

1
rq worker my_tasks

这会启动一个 worker,监听名为 my_tasks 的队列,并处理任务。

1
2
3
4
5
6
7
8
9
10
11
$ rq worker my_tasks
21:44:25 Worker 7c3e83d4fc244e8ca420f0b9ab100dd7: started with PID 3862, version 2.4.1
21:44:25 Worker 7c3e83d4fc244e8ca420f0b9ab100dd7: subscribing to channel rq:pubsub:7c3e83d4fc244e8ca420f0b9ab100dd7
21:44:25 *** Listening on my_tasks...
21:44:25 Worker 7c3e83d4fc244e8ca420f0b9ab100dd7: cleaning registries for queue: my_tasks
21:44:25 my_tasks: tasks.long_running_task('重要参数123') (9e4ce28a-604b-45a4-aff6-efd4ea7325f7)
开始执行任务:重要参数123
任务完成
21:44:30 Successfully completed tasks.long_running_task('重要参数123') job in 0:00:05.006365s on worker 7c3e83d4fc244e8ca420f0b9ab100dd7
21:44:30 my_tasks: Job OK (9e4ce28a-604b-45a4-aff6-efd4ea7325f7)
21:44:30 Result is kept for 500 seconds

我们可以看到,RQ 在启动后,立即连接到 Redis,发现了 my_tasks 队列中存在挤压的任务,然后马上开始一个一个地取出并执行。当任务执行完了后,RQ Worker 会继续监听这个队列,并实时从中弹出任务、执行并更新状态。

数据结构

RQ 利用了 Redis 原生的、高性能的数据结构,主要是 ListsHashes

A. 任务队列本身:一个 Redis List

每个 RQ 队列都对应 Redis 中的一个 List 数据结构。这个 List 里存放的是等待被执行的 Job ID。

  • Key 的命名规则rq:queue:<queue_name>
  • 例如:对于我们之前创建的 my_tasks 队列,它的 Key 就是 rq:queue:my_tasks
  • 如何查看:我们可以通过 redis-cli 进入 Redis 命令行工具,使用 LRANGE 命令 (List RANGE):
1
2
# 查看 my_tasks 队列中所有等待的 Job ID
LRANGE rq:queue:my_tasks 0 -1

0 -1 的意思是“从第一个元素到最后一个元素”。如果队列里有任务,我们会看到一个 Job ID 列表。注意,这个队列只存储还未被处理的任务(即排队等待中的任务)。执行完毕后,该任务会从队列列表中移除。

B. 每个任务的详细信息:一个 Redis Hash

每个被推入队列的任务(Job),其所有的详细信息都被存储在一个 Hash 数据结构中。Hash 就像一个键值对字典。

  • Key 的命名规则rq:job:<job_id>
  • 例如:如果我们有一个 Job ID 是 a1b2c3d4-e5f6-....,它的 Key 就是 rq:job:a1b2c3d4-e5f6-....
  • 如何查看: 使用 HGETALL 命令 (Hash GET ALL):
1
2
# 查看某个特定 Job ID 的所有信息
HGETALL rq:job:a1b2c3d4-e5f6-....

执行后,我们会看到非常丰富的任务信息,包括:

  • data: 序列化后的函数调用信息,看起来像 (b'\x80\x04\x95...\x8c\x05tasks\x94\x8c\x11long_running_task\x94...'。这里面包含了要调用的函数名、参数等。
  • status: 任务的当前状态,比如 queued, started, finished, 或 failed。
  • created_at: 创建时间。
  • enqueued_at: 入队时间。
  • ended_at: 结束时间(如果已完成)。
  • result: 任务成功后的返回值(如果已完成)。
  • exc_info: 任务失败后的异常堆栈信息(如果失败了)。
  • origin: 它来自哪个队列,比如 my_tasks。

默认情况下,RQ 会保留 job 500 秒,这个可以通过启动 RQ 时的 result_ttl 参数来设定。在这段时间内,我们可以通过 job ID 查询它的状态、返回值等。 超时后,RQ 就会把这些储存在 Redis 中的任务都清理掉。

C. 任务状态存储

所以整个流程是,在 Web 中,调用 queue.enqueue(my_task, "参数")

RQ 会做两件事:

  • 把任务信息序列化,写入 Redis Hash,如 rq:job:<job_id>
  • 把 job_id 加入队列列表 rq:queue:my_tasks

Worker 启动时,会从 rq:queue:my_tasksrpop() 一个任务 ID,并:

  • 从对应 rq:job:<job_id> 中加载任务详情
  • 执行任务
  • 更新任务状态:比如 finishedfailed
  • 结果仍然存在 rq:job:<job_id>

执行完后,该任务 会从队列列表中移除,不再存在于 rq:queue:my_tasks 中。这些任务的信息存在什么地方呢?还有其他键值:

键名 作用 是否定期清理
rq:queue:<name> 等待中的任务队列(List) ❌ 只有任务取走才会移除
rq:job:<job_id> 任务详情(函数名、参数、状态、结果) ✅ 按 result_ttl 清理
rq:finished 已完成任务的 ID 列表(Registry 类型) ✅ 按 result_ttl 清理
rq:failed 失败任务的 ID 列表(Registry 类型) ✅ 按 failure_ttl 清理
rq:workers Worker 列表(活跃 worker 的心跳记录) ✅ 按 worker_ttl 清理

消息队列实现

我们把任务逻辑单独放到一个文件里:

1
2
3
4
5
6
7
8
import time

def long_running_task(some_arg):
print(f"后台任务开始执行,参数: {some_arg}")
time.sleep(10) # 用Python的sleep模拟耗时IO
result = "任务成功完成!"
print(result)
return result

这是因为 RQ 在执行任务时,并不是调用当前运行在 Flask 进程中的函数,而是用 Worker 进程重新导入任务函数。比如当我们调用

1
job = q.enqueue(long_running_task, param)

RQ 会把函数的导入路径(模块名+函数名,比如 tasks.long_running_task)和参数序列化后写进 Redis。Worker 在另一端取到任务时,会用 importlib 根据模块路径重新导入这个模块,再调用函数执行。因此,如果任务函数定义在了 Flask 主文件 app.py 里,而在 app.py 里又导入了队列的逻辑,这时 Worker 在导入 app 模块时会触发一堆和 Web 服务相关的初始化代码(比如启动 Flask、连接数据库、加载蓝图等),还可能出现循环导入或者环境不一致等问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from flask import Flask
from redis import Redis
from rq import Queue
from tasks import long_running_task # 导入我们的任务函数

app = Flask(__name__)

# 连接到Redis
redis_conn = Redis()
# 创建一个名为 'my_tasks' 的队列
q = Queue('my_tasks', connection=redis_conn)

@app.route('/task')
def create_task():
print("收到创建任务请求,将任务加入队列...")
# 将任务函数和其参数放入队列
# enqueue是非阻塞的,会立即返回一个job对象
job = q.enqueue(long_running_task, "一个重要的参数")
return {
"message": "任务已成功加入队列",
"job_id": job.id
}

@app.route('/health')
def health_check():
return "OK"

使用 Gunicorn 启动Web应用:gunicorn --workers 2 --bind 0.0.0.0:9090 case3_app:app

同时在目录中执行:rq worker my_tasks

访问 http://localhost:8000/task,我们可以发现,

  • Web服务器终端:Gunicorn立刻打印了"收到创建任务请求...",并且API请求瞬间返回了成功信息和一个job_id。
  • RQ Worker终端:几乎在同时, rq worker 打印出 "后台任务开始执行...",等待10秒后,又打印出 "任务成功完成!"。
  • 健康检查:在任务执行的10秒内,/health 接口始终是可用的。

案例三总结:这是生产环境处理后台任务的比较好的方式:

  • 解耦:Web服务器和任务执行器分离,互不影响。Web服务器的崩溃不会影响正在执行的任务,反之亦然。
  • 高可用:Web服务器始终保持响应,不会因为后台任务而被阻塞。
  • 可扩展:如果任务太多处理不过来,只需要多开几个 rq worker 进程,而无需改动Web服务器。
  • 可观测:RQ提供了工具来查看任务状态、失败的任务等,管理起来非常方便。

当然,这里还并没有涉及 systemd 配置和任务持久化等的相关处理,后面的文章再进行总结。

总结

通过这三个案例,我们可以总结出一个架构原则:

不要在 Web 请求–响应周期内执行耗时且不可预测的任务。

Web 服务器的职责,就像一位高效的接待员——它的使命是快速、准确地接收并转交请求,而不是亲自去后厨炒一盘要花二十分钟的菜。任何可能拖慢响应的工作,都应该通过消息队列交给独立的后台任务执行系统去处理。这种设计可以让网站在高并发下依然保持轻快响应,还能让任务执行逻辑与 Web 层彻底解耦,形成一个健壮、可扩展、易维护的架构基础。