基于 Redis 配置异步 Celery

微信扫一扫,分享到朋友圈

基于 Redis 配置异步 Celery


 △点击上方

Python猫
”关注 ,回复“


1

”领取电子书

剧照:鬼灭之刃

作者:匿蟒

来源:https://note.qidong.name/2020/08/celery-with-redis

作为一个分布式异步计算框架,Celery虽然常用于Web框架中,但也可以单独使用。虽然常规搭配的消息队列是RabbitMQ,但是由于某些情况下系统已经包含了Redis,那就可以复用。

以下撇开Web框架,介绍基于Redis配置Celery任务的方法。

pip install celery[redis]

项目结构

$ tree your_project
your_project
├── __init__.py
├── main.py
├── celery.py
└── tasks.py

0 directories, 4 files

其中, main.py
是触发Task的业务代码。当然,文件名可以随意改。 celery.py
是Celery的app定义的位置, tasks.py
是Task定义的位置,文件名不建议修改。

配置Celery

celery.py
中写入如下代码:

from celery import Celery

from .settings import REDIS_URL

APP = Celery(
main=__package__,
broker=REDIS_URL,
backend=REDIS_URL,
include=[f'{__package__}.tasks'],
)

APP.conf.update(task_track_started=True)

其中, REDIS_URL
从同一的配置 settings.py
中引入, 形式大概是 redis://localhost:6379/0

这里既用Redis来当 broker
,又用来当 backend
。即,既当消息队列,又当结果反馈的数据库(默认仅保存1天)。

include=
,需要填一个下游worker的包名列表。这里选择了同一个包的 tasks.py
文件。

额外设置的 task_track_started
,是命令Worker反馈 STARTED
状态。默认情况下,是无法知道任务什么时候开始执行的。

编写任务并调用

tasks.py
文件中,添加异步任务的实现。

from .celery import APP

@APP.task
def do_sth():
pass

在需要发起任务的地方,用 .apply_async
可以触发异步调用。即,实际只是向消息队列发送消息,真正的执行操作在远程。

from celery.result import AsyncResult

from .tasks imprt do_sth

result = do_sth.apply_async()
assert isinstance(result, AsyncResult)

运行Worker:

celery -A your_project worker

运行原理

一次Task从触发到完成,序列图如下:

其中, main
代表业务代码主进程。它可能是Django、Flask这类Web服务,也可能是一个其它类型的进程。 worker
就是指Celery的Worker。

main
发送消息后,会得到一个 AsyncResult
,其中包含 task_id
。仅通过 task_id
,也可以自己构造一个 AsyncResult
,查询相关信息。其中,代表运行过程的,主要是 state

worker
会持续保持对Redis(或其它消息队列,如RabbitMQ)的关注,查询新的消息。如果获得新消息,将其消费后,开始运行 do_sth

运行完成会把返回值对应的结果,以及一些运行信息,回写到Redis(或其它backend,如Django数据库等)上。在系统的任何地方,通过对应的 AsyncResult(task_id)
就可以查询到结果。

Celery Task的状态

以下是状态图:

其中,除 SUCCESS
外,还有失败( FAILURE
)、取消( REVOKED
)两个结束状态。而 RETRY
则是在设置了重试机制后,进入的临时等待状态。

另外,如果保存在Redis的结果信息被清理(默认仅保存1天),那么任务状态又会变成 PENDING
。这在设计上是个巨大的问题,使用时要做对应容错。

常见控制操作

result = AsyncResult(task_id)
# 阻塞等待返回
result.wait()
# 取消任务
result.revoke()
# 删除任务记录
result.forget()

有时,在业务主进程中需要等待异步运行的结果,这时需要使用 wait
。如果要取消一个排队中、或已执行的任务,则可以使用 revoke
。即使任务已经执行完成,也可以使用 revoke
,但不会有任何变化。如果需要提前删除任务记录,可以使用 forget

参考

基本上都是参考官方文档。也有看过一些 StackOverFlow,但很多都是早期版本的方案,过时了。

官方文档:https://docs.celeryproject.org/en/stable

Python猫技术交流群开放啦!
群里既有国内一二线大厂在职员工,也有国内外高校在读学生,既有十多年码龄的编程老鸟,也有中小学刚刚入门的新人,学习氛围良好!想入群的同学,请在公号内回复『
交流群

』,获取猫哥的微信 (谢绝广告党,非诚勿扰!)
~

感谢创作者的好文

微信扫一扫,分享到朋友圈

基于 Redis 配置异步 Celery

加入在线教育烧钱大战,跟谁学的后手是什么?

上一篇

索尼确认PS5未来系统更新后将支持可变刷新率

下一篇

你也可能喜欢

基于 Redis 配置异步 Celery

长按储存图像,分享给朋友