celery任务队列学习

学习celery是为了后面写一个抢票系统,先咕咕咕一下。。

本文主要实现在Django中使用celery,安装模块为django-celery celery redis

Celery的基本架构

(此处是一堆废话)

Celery的架构有三个部分:消息中间件任务执行单元任务结果存储

  • 消息中间件(Broker):任务调度队列,接收任务并将任务存入队列。Celery不提供队列任务,本文使用RabbitMQ(因为我配置的redis一直没法让celery连接上)
  • 任务执行单元(Worker):任务执行处理单元,它实时监视消息队列,获取队列中调度的任务并执行。
  • 任务结果存储:用于存储任务的执行结果,以供查询

Django中celery的配置

首先配置环境,python环境安装celery django-celery,本地配置 RabbitMQ数据库,我本地建的项目名为CeleryTest

项目基本目录结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CeleryTest
├── app
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── models.py
│ ├── tasks.py
│ ├── tests.py
│ ├── views.py
├── CeleryTest
│ ├── __init__.py
│ ├── asgi.py
│ ├── celery.py
│ ├── settings.py
│ ├── urls.py
│ ├── wsgi.py
├── templates
└── manage.py

基本配置

在上面的目录结构中,tasks.py自行创建,用于写入任务程序让celery注册,celery.py也是自行创建,用于配置celery相关的东西

先修改settings.py中的配置,文件部分修改如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import djcelery
djcelery.setup_loader()

BROKER_URL= 'amqp://guest@localhost//' # Broker的地址,可以自行修改
CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app',
'djcelery'
]

主要配置Broker和Backend的地址,并将模块导入django

然后编写celery的配置文件celery.py ,需要详细的说明到官方文档查看

1
2
3
4
5
6
7
8
9
10
11
12
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryTest.settings')

app = Celery('CeleryTest')

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

然后修改同一目录下的__init__.py

1
2
3
4
5
from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ('celery_app')

到这里,基本的配置已经完成了

任务函数

app目录下的tasks.py中写入任务函数,这里简单写了一个加法运算的函数

1
2
3
4
5
from celery import task

@task
def add(x, y):
return x + y

除了@task装饰器外,还有@share_task,用于在没有具体的Celery实例的情况下创建任务

运行项目

这里运行项目需要首先运行django项目,然后运行用于监控任务变化的任务调度器celery beat,最后启动worker

1
2
3
python manage.py runserver
python manage.py celery beat
python manage.py celery worker -l debug

(建议启动多个控制台分别运行)

到这里,celery的配置完成了,然后可以在django后台管理页面中添加任务,就能自动运行,这里不做示例了

在Django中实现定时Celery任务

这个的学习我一开始是打算做一个学校讲坛的抢票系统,但是大一的时候已经刷满了所以一直咕咕咕

系统的需求是这样的:讲坛会在微信上发布预告,并一些讲坛会有抢票的二维码,二维码扫描后会进入抢票地址。一般来说会定一个时间比如周六12:00开始抢票,那么就会在这个时间开放一个表单进行填写,先到者得。然而这个基本的抢票过程的程序我已经实现了,但是我觉得这样还不是特别的完美,因为虽然它存储了时间,但是依然需要定时去请求这个抢票网站上的一个连接以激发网站进行请求,这是特别麻烦的。而且我用这个自己写的抢票系统就抢到过一张×。所以想用celery来实现一个定时的任务,用户在输入抢票网址,后台的爬虫拉取了抢票信息后,会添加任务到任务队列,然后定时执行。

在网络上找了很久,都没有找到相关的可以动态添加定时任务的实现,所以自己看一下怎么整一个

在一篇文章里面它实现了通过向djcelery这个数据库的periodictask表中添加一个带有crontab的任务,但是我看了一下,django-celery中的crontab的格式是这样的m/h/d/dM/MY,也就是说这样其实是一个周期任务,不能保证在某一天执行后不再一次执行。

但是到这里想到了一个办法,用户每次添加一个抢票信息,那么就添加这个任务,在任务执行完了之后在表中删除这个任务,但是这个做法的实现还需要看一下celery执行的一些细节才能实现

然后可以考虑Celery中的任务延时,在注册了一个task之后,在调用task的时候可以使用方法apply_async,输入参数以及等待时间以调用task

apply_async支持的参数

countdonw: 等待一段时间后执行

eta: 定义任务开始时间(UTC时间)

expires: 设置超时时间

retry: 任务失败后是否重试

retry_policy : 重试的策略,可以为max_retries:最大重试次数、interval_start:重试等待的时间间隔秒数、interval_step: 每次重试让重试间隔增加的秒数、interval_max: 重试间隔最大的秒数

使用这个方法可以便捷地实现定时任务,该方法会返回一个celery任务的id。以为你之前之前的关注点都是关注在数据库中添加周期任务进行实现,没有查到可以使用这个方法直接实现任务的定时。假设用户在提交了一个任务请求时,可以使用下面的代码来实现任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from utils.bases import BaseAPIView
from utils.handle import get_goal_data
from .tasks import ticket_task
from .models import TaskInfo

class LectureGoalAPI(BaseAPIView):

def post(self, request):
user = request.user
data = request.data

task_data = get_goal_data(data['goal_url'])

goal = TaskInfo.object.create(stu_name=data['stu_name'],
stu_id=data['stu_id'],
task_url=data['goal_url'],
task_time=task_data['time'],
lecture_name=task_data['title'])

celery_task = ticket_task.apply_async(args=goal, eta=task_data['time'])
goal.celery_id = str(celery_task)
goal.save()

return self.success('Task Added Successful')

这里部分函数的实现被封装了,仅仅展示在使用apply_async实现定时任务的过程,后续会将系统的完整代码放到github上(如果我能完成的话)

这里还缺少一个获取任务执行状态的过程,但是实现的方式就比较多了,比如在用户请求列表的时候根据celery的id获取任务状态(这样可能比较耗时),还可以将对数据库内状态的修改在task中实现。

一点题外话:

在没有考虑使用Celery的时候所设计的系统的定时任务实现是比较抵消且极不稳定的,我使用了一个在14年那个时候比较流行的QQ空间秒赞系统的方法。写一个request页面,使用监控去定时访问这个页面,激发系统去执行一些操作。而这个操作的过程就比较麻烦,很多时候这样执行还不是很稳定。但是以前那些用php写的秒赞系统使用这个方法比较的稳定,不太清楚怎么做到的。也因为到了大学后php用的比较少,也没有去研究这些东西的实现,现在写一个网站系统都是习惯使用Django + Vue.js,或许以后有兴趣的话还会学习一些别的框架。

可能遇到的报错

beat启动错误

在运行python manage.py celery beat的时候出现AttributeError: 'DatabaseFeatures' object has no attribute 'autocommits_when_autocommit_is_off错误

貌似是包内代码出现的问题,在djcelerydb文件中删除52行位置的if判断即可正常运行

1
2
3
4
5
6
# if connection.features.autocommits_when_autocommit_is_off:
# ignore stupid warnings and errors
# yield
# else:
with transaction.atomic(using):
yield

参考文章