Django 2 Scheduled Tasks with the Celery Distributed Task Queue

1. Preface

  Celery is a distributed task queue written in Python (if you're new to it, read my earlier post, "Getting started with Celery"). Django is the most popular framework for Python web development, but Django's request handling is entirely synchronous and cannot run asynchronous jobs by itself; asynchronous work has to be handled by other means (on the front end the usual answer is AJAX), and on the back end Celery is the natural choice. If a user must wait a long time for some operation to return, the site's throughput drops sharply. Here is the rough flow of Django's request handling (image from the web):

[Figure: Django request-handling flow]

A rough description of the flow: the browser sends a request --> request processing --> the request passes through middleware --> URL routing --> the view runs the business logic --> the response is returned (template or response).

There are plenty of tutorials online for periodic tasks with celery and django, but most of them are based on djcelery + celery 3,
or on django_celery_beat, which is comparatively fiddly to configure.

All the demonstrations here are based on Django 2.0.

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides the tools needed to maintain such a system.
It is a task queue focused on real-time processing that also supports task scheduling.
The above is Celery's own introduction from its official site.

[Celery distributed task queue]

2. Configuration and usage

  celery integrates easily into the Django framework; if you also want periodic tasks, you additionally need the django-celery-beat plugin, explained later. Note that Celery 4.0 supports only Django >= 1.8; for versions below 1.8, use Celery 3.1.

Clearly, simplicity and speed are what we are ultimately after, and celery 4 no longer needs any extra plugin to work with django for periodic tasks: the native celery beat implements periodic tasks perfectly well.

celery is a simple, flexible, and reliable distributed task queue framework written in Python; it supports scheduling work onto distributed machines, processes, and threads through task queues. It follows the producer-consumer model and consists of three main parts: the message broker, the workers, and the result backend.

celery's use cases are very broad.

1. Introduction to Celery and basic usage

Celery is a distributed asynchronous message task queue written in Python; with it, asynchronous task processing becomes easy. If your business scenario needs asynchronous work, celery is worth considering. Two example scenarios:

  1. You want to run a batch command on 100 machines, which may take a long while, but you don't want your program to block waiting for the result. Instead it returns a task ID immediately, and any time later you can use that task ID to fetch the execution result; while the task is running, you can carry on with other work.
  2. You want a scheduled job, say checking all your customers' details every day and, whenever it turns out to be a customer's birthday, texting them congratulations.

Celery needs a message broker to receive and send task messages and to store task results; RabbitMQ or Redis is the usual choice.

1.1 Celery has the following advantages:

  Simple: once you're familiar with celery's workflow, it is fairly easy to configure and use

  Highly available: if a task fails or the connection is interrupted mid-execution, celery automatically retries (a retry sketch follows this list)

  Fast: a single-process celery can handle up to a million tasks per minute

  Flexible: nearly every component of celery can be extended or customized
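As a footnote to the high-availability point, task-level retries are usually requested explicitly; here is a minimal sketch using Celery's bind/self.retry API (the flaky_fetch task and its URL handling are illustrative assumptions):

import urllib.request

from celery import Celery

app = Celery('demo', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=10)
def flaky_fetch(self, url):
    try:
        # a network call that may fail transiently
        return len(urllib.request.urlopen(url).read())
    except OSError as exc:
        # ask celery to re-queue this task, up to max_retries times
        raise self.retry(exc=exc)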

Celery's basic workflow:

  [Figure: Celery basic workflow]

Configuration

  Create a project named taskproj with the structure below (each app gains a tasks.py file in which its tasks are defined):

taskproj
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── taskproj
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

在项目目录taskproj/taskproj/目录下新建celery.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # set the Django environment

app = Celery('taskproj')

app.config_from_object('django.conf:settings', namespace='CELERY')  # read configuration from settings, using the CELERY_ prefix

app.autodiscover_tasks()  # discover the tasks.py file under each app

taskproj/taskproj/__init__.py:

from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ['celery_app']

taskproj/taskproj/settings.py

CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0'  # broker configuration; Redis as the message broker

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0'  # backend configuration, also redis here

CELERY_RESULT_SERIALIZER = 'json'  # result serialization format

From inside the project's taskproj directory, start a worker:

celery worker -A taskproj -l debug

Using the native approach does mean giving up a few conveniences the plugin brought (more on those below). First, celery's three components:

  • the message broker: the broker is essentially an MQ service; redis, rabbitmq, and others can act as the broker
  • the workers that consume tasks: the broker notifies the workers that the queue holds tasks, and the workers pull tasks off the queue and execute them, one process per worker
  • the result backend: execution results are stored in the backend, by default in the same MQ service the broker uses, though a separate service can be configured

And celery's typical workloads:

  • asynchronous task processing
  • task scheduling
  • periodic task handling
  • distributed scheduling

1.2 Installing and using Celery

Celery's default broker is RabbitMQ; one configuration line is all it takes:

broker_url = 'amqp://guest:guest@localhost:5672//'

 

Redis also works as the broker.

  Install the redis extra:

$ pip3 install -U "celery[redis]"

 

配置

Configuration is easy, just configure the location of your Redis database:

app.conf.broker_url = 'redis://localhost:6379/0'

Where the URL is in the format of:

redis://:password@hostname:port/db_number

all fields after the scheme are optional, and will default to localhost on port 6379, using database 0.

 

 

If you want each task's execution result, you also need to configure where results are stored:

If you also want to store the state and return values of tasks in Redis, you should configure these settings:

app.conf.result_backend = 'redis://localhost:6379/0'

 

Defining and triggering tasks

  Tasks are defined in each app's tasks file, e.g. app01/tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

Triggering a task from a view:

from django.http import JsonResponse
from app01 import tasks

# Create your views here.

def index(request,*args,**kwargs):
    res=tasks.add.delay(1,3)
    #任务逻辑
    return JsonResponse({'status':'successful','task_id':res.task_id})

Visit the view:

[Screenshot: the JSON response containing the task_id]

 To fetch a task's result, you can use AsyncResult with the task_id, or read it directly from the backend:

[Screenshot: fetching the task result]
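For illustration, a hedged sketch of such a result-polling view (the get_res name matches the /res route that appears in urls.py further below; the id query parameter is an assumption):

from celery.result import AsyncResult
from django.http import JsonResponse

from taskproj.celery import app as celery_app

def get_res(request):
    task_id = request.GET.get('id')
    res = AsyncResult(task_id, app=celery_app)  # looks the task up via the backend
    if res.ready():
        return JsonResponse({'status': res.state, 'result': res.get()})
    return JsonResponse({'status': res.state})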

 

  • 插件提供的按期职务处理将不在可用,当大家只必要职责定时实践而没有要求人工资调治度的时候这一点忽略不计。
  • 没辙快速的管住或追踪定期义务,定期职责的追踪其实交给日志更客观,不过对职分的修改就从未有过那么便宜了,不过只要无需通常转移/增减职务的话那点也在可接受范围内。

澳门新浦京娱乐场网站 5

利润也大多,越发在运用python营造的运用种类中,无缝过渡,使用一定有益。

1.3 Getting started with Celery

  Install the celery module:

    pip3 install celery

Create a celery application to define your task list.

  Create a task file named tasks.py:

 

from celery import Celery

app = Celery('tasks',
             broker='redis://localhost',
        # with a password: broker="redis://:password@127.0.0.1"
             backend='redis://localhost')

@app.task
def add(x, y):
    print("running...", x, y)
    return x + y

 

Start a Celery worker to begin listening for and executing tasks:

$ celery -A tasks worker --loglevel=info

 

Calling tasks

  Open another terminal, start an interactive shell, and call the task:

>>> from tasks import add
>>> add.delay(4, 4)

Your worker terminal will show the task being received. To see the task's result, assign the call to a variable:

>>> result = add.delay(4, 4)

 

The ready() method returns whether the task has finished processing or not:

>>> result.ready()
False

You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

>>> result.get(timeout=1)
8

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

>>> result.get(propagate=False)

If the task raised an exception you can also gain access to the original traceback:

>>> result.traceback
…

Extension

  Besides redis and rabbitmq, Django's ORM can serve as result storage too (it needs a plugin dependency). The advantage is that task state is visible directly in django's database, and you can build richer operations on top. Here is how to use the ORM as the result store.

1. Install:

pip install django-celery-results

2. Configure settings.py and register the app:

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

3. Change the backend configuration from redis to django-db:

# CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0'  # the old backend configuration using redis

CELERY_RESULT_BACKEND = 'django-db'  # use the django ORM as the result store

4. Migrate the database:

python3 manage.py migrate django_celery_results

You will then see new tables created in the database:

[Screenshot: the django_celery_results tables] Sometimes you will need to operate on the task table yourself; this is its structure as defined in the source:

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(_('task id'), max_length=255, unique=True)
    task_name = models.CharField(_('task name'), null=True, max_length=255)
    task_args = models.TextField(_('task arguments'), null=True)
    task_kwargs = models.TextField(_('task kwargs'), null=True)
    status = models.CharField(_('state'), max_length=50,
                              default=states.PENDING,
                              choices=TASK_STATE_CHOICES
                              )
    content_type = models.CharField(_('content type'), max_length=128)
    content_encoding = models.CharField(_('content encoding'), max_length=64)
    result = models.TextField(null=True, default=None, editable=False)
    date_done = models.DateTimeField(_('done at'), auto_now=True)
    traceback = models.TextField(_('traceback'), blank=True, null=True)
    hidden = models.BooleanField(editable=False, default=False, db_index=True)
    meta = models.TextField(null=True, default=None, editable=False)

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)
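Because the results are ordinary model rows, a quick ORM query is enough to inspect them; a small sketch (the task id value is illustrative):

from django_celery_results.models import TaskResult

# the ten most recent results (Meta.ordering is ['-date_done'])
for row in TaskResult.objects.all()[:10]:
    print(row.task_id, row.status, row.result)

# or look a single task up by its id
row = TaskResult.objects.filter(task_id='some-task-id').first()
if row is not None:
    print(row.as_dict())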

 

Celery定期职分布署

在进展示公布局前先来看看项目组织:

.├── linux_news│   ├── celery.py│   ├── __init__.py│   ├── settings.py│   ├── urls.py│   └── wsgi.py├── manage.py├── news│   ├── admin.py│   ├── apps.py│   ├── __init__.py│   ├── migrations│   ├── models│   ├── tasks.py│   ├── tests.py│   └── views└── start-celery.sh

里面news是大家的app,用于从一些rss订阅源获取情报音讯,linux_news则是大家的project。我们必要关注的入眼是celery.py,settings.py,tasks.py和start-celery.sh。

第3是celery.py,想让celery推行职分就不可能不实例化二个celery app,并把settings.py里的布署传入app:

import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'linux_news.settings')

app = Celery('linux_news')

# 'django.conf:settings' refers to django.conf.settings, i.e. the django project's
# configuration; celery locates and imports it via the environment variable set above.
# namespace is the common prefix of the celery options in settings.py, here 'CELERY_';
# the option names must also be uppercase.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

The configuration really is that simple. To use this app inside django, import it in __init__.py:

from .celery import app as celery_app

Next comes tasks.py, which should sit in your app's directory. Auto-discovery was configured earlier, so celery finds these tasks by itself. Our tasks are written in this module; the code touches the ORM a little and has been simplified to stay on topic:

from linux_news.celery import celery_app as app
from .models import *
import time
import feedparser
import pytz
import html


@app.task(ignore_result=True)
def fetch_news(origin_name):
    """
    fetch all news from origin_name
    """
    origin = get_feeds_origin(origin_name)
    feeds = feedparser.parse(origin.feed_link)
    for item in feeds['entries']:
        entry = NewsEntry()
        entry.title = item.title
        entry.origin = origin
        entry.author = item.author
        entry.link = item.link
        # add timezone
        entry.publish_time = item.time.replace(tzinfo=pytz.utc)
        entry.summary = html.escape(item.summary)
        entry.save()


@app.task(ignore_result=True)
def fetch_all_news():
    """
    this is our periodic task:
    fetch all origins' news to db
    """
    origins = NewsOrigin.objects.all()
    for origin in origins:
        fetch_news.delay(origin.origin_name)

tasks里是局地耗费时间操作,比方网络IO可能数据库读写,因为大家不拥戴职责的重回值,所以利用@app.task(ignore_result=True)将其屏蔽了。

职责安顿完毕后大家就要陈设celery了,大家挑选redis作为天职队列,小编刚强建议在生育条件中央银行使rabbitmq或许redis作为任务队列或结果缓存后端,而不应有运用关系型数据库:

# redisREDIS_PORT = 6379REDIS_DB = 0# 从环境变量中取得redis服务器地址REDIS_HOST = os.environ.get('REDIS_ADDR', 'redis')# celery settings# 这两项必须设置,否则不能正常启动celery beatCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = TIME_ZONE# 任务队列配置CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_ACCEPT_CONTENT = ['application/json', ]CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_TASK_SERIALIZER = 'json'

然后是我们的按时职务设置:

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'fetch_news_every-1-hour': {
        'task': 'news.tasks.fetch_all_news',
        'schedule': crontab(minute=0, hour='*/1'),
    }
}

The periodic task configuration object is a dict of task names to settings; the main options are:

  • task: the full module path of the task function; without it the task cannot be found and will not run
  • schedule: the timing policy, usually celery.schedules.crontab; the example above runs the task at minute 0 of every hour, with syntax much like Linux's crontab (see the docs)
  • args: a tuple of the task's arguments; if the task takes none it can be omitted, as in the example (a sketch of an entry that does pass args follows this list)
  • the remaining options are rarely needed; see the documentation

That concludes the celery beat configuration.
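As promised above, a sketch of a schedule entry that passes args (the entry name and the origin value are hypothetical):

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'fetch-one-origin-nightly': {
        'task': 'news.tasks.fetch_news',
        'schedule': crontab(minute=30, hour=2),  # 02:30 every day
        'args': ('linux_cn',),                   # hypothetical origin name
    },
}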

异步职务

Celery

在等级次序中采纳celery 

能够把celery配置成3个运用

目录格式如下

proj/__init__.py
    /celery.py
    /tasks.py

3. Using scheduled tasks in Django

  Using scheduled tasks in django likewise relies on beat to dispatch the tasks; to run scheduled tasks inside Django, install the django-celery-beat plugin. The process is described below.

Starting celery beat

With configuration done, all that's left is to start celery.

Set up the environment before starting. Don't run celery as root! Don't run celery as root! Don't run celery as root! Important things are worth saying three times.

start-celery.sh:

export REDIS_ADDR=127.0.0.1
celery -A linux_news worker -l info -B -f /path/to/log

-A names the app; -B also starts celery beat to run the periodic tasks.
Once celery is up, the logs show whether the tasks are running properly:

[2018-12-21 13:00:00,022: INFO/MainProcess] Received task: news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d]
[2018-12-21 13:00:00,046: INFO/MainProcess] Received task: news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b]
[2018-12-21 13:00:00,051: INFO/ForkPoolWorker-2] Task news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] succeeded in 0.02503809699555859s: None
[2018-12-21 13:00:00,052: INFO/MainProcess] Received task: news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25]
[2018-12-21 13:00:00,449: INFO/ForkPoolWorker-5] Task news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] succeeded in 0.39487219898728654s: None
[2018-12-21 13:00:00,606: INFO/ForkPoolWorker-3] Task news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] succeeded in 0.5523456179944333s: None

That is all it takes to run periodic tasks with celery 4; corrections for errors and omissions are welcome.

My own asynchronous use case is deployments: the web front end has a deploy button; clicking it sends a request to the back end, and the deployment itself takes five minutes. On receiving the request, the back end puts the job on the queue to run asynchronously and immediately returns a "task running" response to the front end. Without asynchronous execution, the front end would sit waiting for the back end the whole time, and the page would spin until it timed out.
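A hedged sketch of that deploy view (the deploy app and the deploy_project task are hypothetical names):

from django.http import JsonResponse

from deploy.tasks import deploy_project  # hypothetical task doing the 5-minute deployment

def deploy(request):
    res = deploy_project.delay()  # enqueue and return immediately
    return JsonResponse({'status': 'running', 'task_id': res.task_id})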

Installation

Contents of proj/celery.py:

 

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

 

Setup

1. Install the beat plugin:

pip3 install django-celery-beat

2. Register the app:

INSTALLED_APPS = [
    ....
    'django_celery_beat',
]

3. Migrate the database:

python3 manage.py migrate django_celery_beat

4. Start the worker and beat separately:

celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  # start beat with the database scheduler

celery worker -A taskproj -l info  # start the worker

5. Configure admin

urls.py

# urls.py
from django.conf.urls import url
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
]

6. Create a superuser:

python3 manage.py createsuperuser

7. Log in to the admin to manage the tasks:

[Screenshot: periodic-task management in the admin]

 

 Usage example:

[Screenshot: adding a periodic task in the admin]

[Screenshot: the task's schedule and arguments]

 Viewing the result:

[Screenshot: stored task results]

 

Configuring asynchronous tasks

Installing Celery

pip is the recommended installer; if you are using a virtualenv, install inside the virtualenv:

$ pip install celery

Contents of proj/tasks.py:

 

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

 

Start the worker:

$ celery -A proj worker -l info

Output:

 

-------------- celery@Zhangwei-MacBook-Pro.local v4.0.2 (latentcall)
---- **** -----
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x103a020f0
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

 

Running the worker in the background

Building on the plugin

  The django-celery-beat plugin essentially watches its database tables for changes; whenever a table changes, the scheduler re-reads the tasks and re-schedules. So if you want your own custom task pages, you only need to operate on the plugin's four tables. You can even define your own scheduler; the plugin's models are built in, and importing them is all it takes to use the ORM. Below I demonstrate with django rest framework:

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]

urls.py

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index$', views.index),
    url(r'^res$', views.get_res),
    url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]

views.py

from django_celery_beat.models import PeriodicTask  # import the plugin's model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """自定义分页"""
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination

Visit the endpoint:

[Screenshot: the paginated task list returned by the API]
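Beyond the read-only ViewSet above, the same models can create schedules programmatically; a hedged sketch (the schedule values and task path are illustrative):

from django_celery_beat.models import CrontabSchedule, PeriodicTask

schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='0', hour='*/1', day_of_week='*',
    day_of_month='*', month_of_year='*',
)
PeriodicTask.objects.get_or_create(
    name='hourly-add',            # must be unique
    task='app01.tasks.add',       # full dotted path to the task
    crontab=schedule,
    defaults={'args': '[1, 3]'},  # args are stored as a JSON string
)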

 

1. Install rabbitmq; here we use rabbitmq as the broker. It starts automatically after installation and needs no further configuration.

Installing a message broker

Celery supports RabbitMQ, Redis, and even some database systems as its message broker middleware.

Install whichever broker and backend you prefer; redis or RabbitMQ are the usual choices.

In the background

In production you’ll want to run the worker in the background, this is described in detail in the daemonization tutorial.

The daemonization scripts uses the celery multi command to start one or more workers in the background:

$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK

You can restart it too:

$ celery  multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

or stop it:

$ celery multi stop w1 -A proj -l info

The stop command is asynchronous so it won't wait for the worker to shutdown. You'll probably want to use the stopwait command instead; this ensures that all currently executing tasks are completed before exiting:

$ celery multi stopwait w1 -A proj -l info
# apt-get install rabbitmq-server

Installing Redis

On Ubuntu, apt-get does it:

$ sudo apt-get install redis-server

If you use redis as the broker you also need the redis support package, again installed with pip:

$ pip install redis

Seeing the following prompt means success:

redis 127.0.0.1:6379>

Further redis topics are out of scope here; explore on your own if interested.

If you use RabbitMQ instead, install RabbitMQ as well.

Celery periodic tasks

celery supports periodic tasks: set a task's execution time and celery executes it on schedule for you. This scheduling module is called celery beat.

Write a script called periodic_task.py:

 

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

 

add_periodic_task adds one periodic task.

The above adds periodic tasks by calling a function; they can also be added in configuration-file style. Below is a task that runs every 30s:

 

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

 

  With the tasks added, celery needs a separate process to launch them on schedule. Note: it launches tasks, it does not execute them. This process just keeps checking your task schedule, and whenever a task is due it emits a task-invocation message for a celery worker to execute.

Start the task scheduler, celery beat:

$ celery -A periodic_task beat

The output looks like below:

 

celery beat v4.0.2 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2017-02-08 18:39:31
Configuration ->
    . broker -> redis://localhost:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

 

 

One step is still missing: a worker must be started to execute the tasks that celery beat launches.

Start a celery worker to execute the tasks:

 

$ celery -A periodic_task worker

 -------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
---- **** -----
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-02-08 18:42:08
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x104d420b8
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

 

Now watch the worker's output: every little while, the periodic task runs!

Note: Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:

$ celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule

2. Install celery:

Installing RabbitMQ

$ sudo apt-get install rabbitmq-server

More complex schedules

# pip3 install celery

Using Celery

The periodic task above is simple, just running a task every N seconds. But what if you want an email sent to you at 8 pm every Monday, Wednesday, and Friday? That's easy too: use crontab, which works the same way as the Linux crontab and lets you customize execution times.
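For instance, the Monday/Wednesday/Friday 8 pm mail just described could be scheduled like this (send_report_mail is a hypothetical task name):

from celery.schedules import crontab

app.conf.beat_schedule = {
    'mail-report-mon-wed-fri-8pm': {
        'task': 'tasks.send_report_mail',  # hypothetical task
        'schedule': crontab(hour=20, minute=0, day_of_week='mon,wed,fri'),
    },
}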

3. Use celery in the django project; the simplified django project layout appears further below.

Simple direct use

You can import Celery wherever it is needed and use it directly; the simplest setup requires nothing more than a task and a broker:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x + y

I used redis as the broker here; swap in whatever you are used to.

Since the default configuration will not fit a project's actual needs, you will generally want some settings of your own; but to keep the project decoupled and maintainable, it is best to write the configuration in a standalone file.

Linux crontab-style schedules:

 

 

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

 

The entry above means: run tasks.add at 7:30 am every Monday.

There are many more scheduling forms:

Example                                        Meaning

crontab()                                      Execute every minute.
crontab(minute=0, hour=0)                      Execute daily at midnight.
crontab(minute=0, hour='*/3')                  Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21')  Same as previous.
crontab(minute='*/15')                         Execute every 15 minutes.
crontab(day_of_week='sunday')                  Execute every minute (!) on Sundays.
crontab(minute='*', hour='*', day_of_week='sun')
                                               Same as previous.
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri')
                                               Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0, hour='*/2,*/3')              Execute every even hour, and every hour divisible by three. This means: every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm.
crontab(minute=0, hour='*/5')                  Execute every hour divisible by 5. This means it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of "15", which is divisible by 5).
crontab(minute=0, hour='*/3,8-17')             Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0, day_of_month='2')                Execute on the second day of every month.
crontab(0, 0, day_of_month='2-30/3')           Execute on every even numbered day.
crontab(0, 0, day_of_month='1-7,15-21')        Execute on the first and third weeks of the month.
crontab(0, 0, day_of_month='11', month_of_year='5')
                                               Execute on the eleventh of May every year.
crontab(0, 0, month_of_year='*/3')             Execute on the first month of every quarter.

The table above covers the vast majority of periodic-task needs; you can even schedule tasks by solar events such as sunrise and sunset; see the Celery docs for details.
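For reference, a solar schedule is built from celery.schedules.solar with an event name and coordinates; a brief sketch (the coordinates are illustrative):

from celery.schedules import solar

app.conf.beat_schedule = {
    'run-at-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),  # latitude, longitude
        'args': (16, 16),
    },
}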

website/
|-- deploy
|  |-- admin.py
|  |-- apps.py
|  |-- __init__.py
|  |-- models.py
|  |-- tasks.py
|  |-- tests.py
|  |-- urls.py
|  `-- views.py
|-- manage.py
|-- README
`-- website
  |-- celery.py
  |-- __init__.py
  |-- settings.py
  |-- urls.py
  `-- wsgi.py

A standalone configuration file

Slightly more complex than the above: we create two files, a config.py celery configuration file filled with the settings that suit our project, and a tasks.py file in which to write our tasks. You can name the files to your own liking.

config.py contents:

# coding=utf-8
# celery configuration, kept in a config file of its own
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# route the "dirty work" to a dedicated queue:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# rate-limit the task, so that only 10 tasks of this type are processed per minute:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

Once everything is configured, you can check that the configuration file is valid with the following command (config is the configuration module's name):

$ python -m config

tasks.py contents:

# coding=utf-8
from celery import Celery

app = Celery()
# the argument is the configuration module's name
app.config_from_object('config')

@app.task
def add(x, y):
    return x + y

There is one more, equivalent way to set the configuration, though it is not particularly recommended:

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

The configuration must be batch-updated this way before the app is used.

Integrating Celery with django

django combines easily with celery for asynchronous tasks; only minimal configuration is needed.

If you have a modern Django project layout like:

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
- manage.py

then the recommended way is to create a new proj/proj/celery.py module that defines the Celery instance:

file: proj/proj/celery.py 

 

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

 

Then you need to import this app in your proj/proj/__init__.py module. This ensures that the app is loaded when Django starts so that the @shared_task decorator (mentioned later) will use it:

proj/proj/__init__.py: 

  

 

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

 

Note that this example project layout is suitable for larger projects, for simple projects you may use a single contained module that defines both the app and tasks, like in the First Steps with Celery tutorial.

Let’s break down what happens in the first module, first we import absolute imports from the future, so that our celery.py module won’t clash with the library:

from __future__ import absolute_import

Then we set the default DJANGO_SETTINGS_MODULE environment variable for the celery command-line program:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

You don't need this line, but it saves you from always passing in the settings module to the celery program. It must always come before creating the app instances, which is what we do next:

app = Celery('proj')

This is our instance of the library.

We also add the Django settings module as a configuration source for Celery. This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings; but you can also separate them if wanted.

The uppercase name-space means that all Celery configuration options must be specified in uppercase instead of lowercase, and start with CELERY_, so for example the task_always_eager setting becomes CELERY_TASK_ALWAYS_EAGER, and the broker_url setting becomes CELERY_BROKER_URL.

You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.

app.config_from_object('django.conf:settings', namespace='CELERY')

Next, a common practice for reusable apps is to define all tasks in a separate tasks.py module, and Celery does have a way to auto-discover these modules:

  app.autodiscover_tasks()

With the line above Celery will automatically discover tasks from all of your installed apps, following the tasks.py convention:

  

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

Finally, the debug_task example is a task that dumps its own request information. This is using the new bind=True task option introduced in Celery 3.1 to easily refer to the current task instance.

Then write your tasks in each app's own tasks.py:

 

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

 

 

Call the celery task in your django views:

 

from django.shortcuts import render,HttpResponse

# Create your views here.

from bernard import tasks

def task_test(request):

    res = tasks.add.delay(228,24)
    print("start running task")
    print("async task res",res.get() )

    return HttpResponse('res %s'%res.get())

 

 

4. Create the main file, website/celery.py

Using it in a project

The project layout:

proj/
    __init__.py
    # celery configuration and startup code
    celery.py
    # the tasks live here
    tasks.py

celery.py为:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py为:

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

To start celery, run from the directory that contains proj:

$ celery -A proj worker -l info

Using the scheduled-task feature in django

There's the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

To install and use this extension:

  1. Use pip to install the package:

    $ pip install django-celery-beat
    
  2. Add the django_celery_beat module to INSTALLED_APPS in your Django project's settings.py:

        INSTALLED_APPS = (
            ...,
            'django_celery_beat',
        )
    
    Note that there is no dash in the module name, only underscores.
    
  3. Apply Django database migrations so that the necessary tables are created:

    $ python manage.py migrate
    
  4. Start the celery beat service using the django scheduler:

    $ celery -A proj beat -l info -S django
    
  5. Visit the Django-Admin interface to set up some periodic tasks.

 

In the admin pages there are three new tables:

[Screenshot: the django-celery-beat tables in the admin]

A finished configuration looks like this:

[Screenshot: a configured periodic task]

 

 

Now start your celery beat and worker: every two minutes, beat emits a task message that makes the worker run the scp_task task.

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

app = Celery('website')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# allow celery to run as the root user
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))

Using celery in django

Our django project's directory layout generally looks like this:

proj/
    manage.py
    myapp/
    proj/
        __init__.py
        settings.py
        urls.py
        wsgi.py

To use celery in a django project, we first need to configure celery within django.

We add a celery.py file to the subdirectory that shares the project's name,
in this example proj/proj/celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# the second argument is '<project name>.settings'
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# the argument in parentheses is the project name
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# the configuration is written in settings.py, and each option needs the `CELERY_` prefix
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# automatically loads tasks from every app registered with django, i.e. INSTALLED_APPS in settings.py
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Then we need to add the following to the __init__.py in the same directory, proj/proj/__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

Now we can put the tasks into the tasks.py of whichever app needs them; the project layout becomes:

proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__.py
        settings.py
        urls.py
        wsgi.py

A typical tasks.py looks like this;
myapp1/tasks.py:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # sleep for 5s to verify the call is asynchronous and does not block the main process
    time.sleep(5)
    print(x + y)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

The @shared_task decorator lets you define tasks without a concrete app instance.

Call the tasks wherever they're needed, for example in myapp1/views.py:

from django.shortcuts import render
from .tasks import add


def index(request):
    # test the celery task
    add.delay(4,5)
    return render(request,'index.html')

接下来就能够运营项目,celery需求单独运转,所以供给开四个顶峰,分别

运转web应用服务器

$ python manage.py runserver

启动celery

$ celery -A proj worker -l info

接下来访问浏览器就能够在起步celery的终点中看看输出

澳门新浦京娱乐场网站 14

测试结果

5.在 website/__init__.py 文件中扩充如下内容,确认保障django运行的时候那个app能够被加载到

Extensions

  • If your project needs schedule management in the admin, use django-celery-beat:
  1. Install django-celery-beat with pip:
$ pip install django-celery-beat

(Don't use django-celery any more; that project stopped being updated many years ago....)

  2. Add the app in settings.py:
INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
  3. Migrate the database:
$ python manage.py migrate
  4. Start the celery beat service with the django_celery_beat.schedulers:DatabaseScheduler scheduler:
$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

You can then see the schedules in the admin interface.

  • 假使您想选拔Django-OMuranoM大概Django Cache作为后端,供给安装django-celery-results增添(我不提议)
  1. 使用pip安装django-celery-results
$ pip install django-celery-results

不用在应用django-celery,这一个种类曾经终止更新好大多年。。。。

  1. 在settings.py中增加那些app
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
  1. 协助实行一下数据库
$ python manage.py migrate django_celery_results
  1. 配备后端,在settings.py中安排
# 使用数据库作为结果后端
CELERY_RESULT_BACKEND = 'django-db'

# 使用缓存作为结果后端
CELERY_RESULT_BACKEND = 'django-cache'

Basic usage is roughly what is covered above; for concrete configuration and operation, read the official documentation.

Notes:

  • the environment above was built and tested on ubuntu 16.04 LTS with django 1.9
  • the text above is personal opinion; if there are errors or suggestions, please contact the author promptly
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

6. Create a tasks.py in each app, here deploy/tasks.py:

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
  return x + y

Note that tasks.py must sit in each app's root directory and must be called tasks.py; it cannot be named arbitrarily.

7. Reference and use these tasks in views.py for asynchronous processing:

from deploy.tasks import add

def post(request):
  result = add.delay(2, 3)


result.ready()
result.get(timeout=1)
result.traceback

8. Start celery:

# celery -A website worker -l info

9. Now whenever this post method is called, the add inside it is processed asynchronously.

Periodic tasks

Periodic tasks have very broad uses; for example, I need to send a report to the boss on a schedule~

Configuring periodic tasks

  1. Add the following configuration to website/celery.py to support crontab periodic tasks:
from celery.schedules import crontab
from datetime import timedelta

app.conf.update(
  CELERYBEAT_SCHEDULE = {
    'sum-task': {
      'task': 'deploy.tasks.add',
      'schedule': timedelta(seconds=20),
      'args': (5, 6)
    },
    'send-report': {
      'task': 'deploy.tasks.report',
      'schedule': crontab(hour=4, minute=30, day_of_week=1),
    }
  }
)

This defines two tasks:

  • one named 'sum-task' that runs the add function every 20 seconds, passing the two arguments 5 and 6
  • one named 'send-report' that runs the report function at 4:30 every Monday morning

timedelta is an object from datetime, imported with from datetime import timedelta; it accepts the following parameters:

  • days
  • seconds
  • microseconds
  • milliseconds
  • minutes
  • hours
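A timedelta built from any mix of these parameters simply collapses to a run interval in seconds:

from datetime import timedelta

interval = timedelta(hours=1, minutes=30)
print(interval.total_seconds())  # 5400.0, i.e. beat fires the task every 5400 seconds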

crontab's parameters are:

month_of_year
day_of_month
day_of_week
hour
minute

  2. Add the report method to deploy/tasks.py:
@shared_task
def report():
  return 5

3. Start celery beat; celery runs a beat process that continually checks whether any task needs to be executed:

# celery -A website beat -l info

Tips

1. If you use both asynchronous and periodic tasks, there is a simpler way to start everything: celery -A website worker -B -l info starts the worker and beat together.

2. If the queue is not rabbitmq, the broker and backend must be configured in the main config file website/celery.py, like this:

# redis as the MQ
app = Celery('website', backend='redis', broker='redis://localhost')
# rabbitmq as the MQ
app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')

3. If celery refuses to run as the root user, add platforms.C_FORCE_ROOT = True to the main config file.

4. celery can leak memory after running for a long time and needs the setting CELERYD_MAX_TASKS_PER_CHILD = 10, meaning each worker process is recycled after executing 10 tasks.

That's all for this article; I hope it helps with your study.
