把Celery集成到Django里之异步任务

虽然Django3.x开始已经部分支持异步任务了,但是还是不够完善,所以如果您需要执行大量或长时间运行的任务,还是推荐使用Celery,而且Celery还可以设置计划任务。
在本文中,我们主要讲如何把Celery集成到Django并执行异步任务,Celery的消息队列和任务结果我们都使用redis来存储,假设我们的Django项目目录结构如下:

- myproj/
  - manage.py
  - myproj/
    - __init__.py
    - settings.py
    - urls.py
	- celery.py #稍后添加
  - app1/
    - models.py
	- views.py
	- tasks.py  #稍后添加
	......

第一步,安装celery和python操作redis的客户端:

[root@myproj ~]# pip install celery redis

第二步,在myproj/myproj/settings.py添加Celery需要变量

#Celery参数设定
CELERY_BROKER_URL = 'redis://localhost' # 消息中间件控制使用redis
CELERY_CACHE_BACKEND = 'redis://localhost' #缓存用redis存储
CELERY_RESULT_BACKEND = 'redis://localhost' #执行结果,也使用redis存储,之后的文章中我们将介绍使用数据库保存
CELERY_TASK_TRACK_STARTED = True #打开任务开始状态标记
CELERY_TASK_TIME_LIMIT = 3600 #任务限制时间,类似于timeout,我设置的一小时,根据需要变更

第三步,创建文件myproj/myproj/celery.py,内容如下:

#基本照着官方文档即可,对应信息变成我们自己的即可
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')

app = Celery('myproj')

#让任务执行结果里包含任务名,参数,重试次数等更多的信息
app.conf.update(result_extended=True)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 自动加载各个app里的任务模块,比如我们上面的app1
app.autodiscover_tasks()

第四步,在myproj/myproj/__init__.py中追加以下内容:

from .celery import app as myproj_celery

__all__ = ('myproj_celery',)

#之后我们可以在别的app里使用from myproj import myproj_celery来引入创建的celery实例

第五步,在app里创建异步任务,比如我们的示例app1,那么创建文件myproj/app1/tasks.py:

from celery import shared_task
import time

@shared_task
def add(x, y):
	#为了看到异步效果,我们睡一分钟再执行
	time.sleep(60)
    return x + y

第六步,在myproj/目录下启动celery worker(在后面的文章中我们将介绍使用systemd启动)

[root@myproj ~]# celery -A myproj worker -l INFO

第七步,在app的视图里(myproj/app1/views.py)使用刚刚创建的任务:

from django.http import HttpResponse
from .tasks import add

def test(request):
	my_task = add.delay(1900,100)
	task_id = ''
    if my_task:
		#你可以使用这个id去redis里查询结果
        task_id = my_task.id
		
	html = f"<html><body>Task ID:{task_id}</body></html>"
    return HttpResponse(html) 

访问这个视图的页面,然后用id去redis里查询任务执行状况。


除非标明,否则皆为<IT民工の在日生活 - 神户印象>原创之作,转载必须以明文链接标明出处。
本文链接:https://www.lifeinjp.net/share/494

发表评论