First Look at the Distributed Task Queue Celery

Author: admin  Category: Dev Notes  Published: 2018-06-10 12:44

Getting to Know Celery

What is Celery?

Celery is a simple, flexible, and reliable distributed system written in Python for processing vast amounts of messages, and it also provides the tools needed to operate and maintain such a system.
Celery focuses on real-time task processing, and it supports task scheduling as well.
In plain terms, it is a tool for managing distributed queues: using the interfaces Celery provides, we can quickly build and manage a distributed task queue.

The official site's introduction to Celery

Celery: Distributed Task Queue
Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing, Eventlet, or gevent. Tasks can execute asynchronously (in the background) or synchronously (wait until ready).

Celery is used in production systems to process millions of tasks a day.


Celery Concepts

Before starting with Celery, there are a few basic concepts to understand.

Brokers

The queue itself: queued tasks need somewhere to be stored, e.g. Redis.
The basic Celery flow: produce tasks -> queue (broker) -> consume tasks.
Common brokers include RabbitMQ, Redis, Amazon SQS, and Zookeeper.
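The produce -> queue -> consume flow above can be sketched with nothing but the Python standard library, no real broker required (a conceptual sketch only; a real broker like Redis lives in a separate process and survives restarts):

```python
import queue
import threading

broker = queue.Queue()   # stands in for the broker (e.g. Redis)
results = []

def worker():
    # the worker pulls tasks off the queue and executes them
    while True:
        func, args = broker.get()
        if func is None:          # sentinel: stop the worker
            break
        results.append(func(*args))

t = threading.Thread(target=worker)
t.start()

broker.put((sum, ([1, 2, 3],)))   # "produce" a task
broker.put((None, ()))            # tell the worker to stop
t.join()
print(results)                    # [6]
```

Celery's job is essentially this loop, made distributed, durable, and observable.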

Result Stores / backend

Where Celery stores a task's result after processing it (the result store), so the task sender can find out the outcome.

Built-in result stores: SQLAlchemy / Django ORM, Memcached, Redis, RPC (RabbitMQ / AMQP), or you can define your own.
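The idea behind a result backend is simple: the worker stores each result under a task id, and the sender uses that id to look the result up later. A minimal sketch, with a plain dict standing in for Redis or a database:

```python
import uuid

backend = {}   # stands in for Redis / Memcached / a database

def run_task(func, *args):
    # the "worker" executes the task and stores the result by task id
    task_id = str(uuid.uuid4())
    backend[task_id] = func(*args)
    return task_id                # the sender keeps only the id

task_id = run_task(pow, 2, 10)
print(backend[task_id])           # 1024
```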

Workers

The processes that take tasks off the queue and execute them.

Tasks

The tasks themselves.

Basic Usage

Install the Redis server:
https://redis.io/download

Install celery:

pip install celery
pip install redis

Install flower, a web UI for monitoring and administering Celery:

pip install flower

Directory layout

celery_test # project root
    ├── proj
    │   ├── celery.py   # app / service entry point
    │   └── tasks.py    # task definitions

Create the app

# celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/10',   # where tasks wait to be consumed
             backend='redis://localhost:6379/10',  # where results are stored
             include=['proj.tasks'])

app.conf.update(
    result_expires=3600,  # stored results expire after one hour
)

if __name__ == '__main__':
    app.start()

Create a task

# tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y
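The flower output later in this post also lists proj.tasks.mul and proj.tasks.xsum, which the article never shows. Plausible definitions (my guess, not from the original; in tasks.py each would also carry the @app.task decorator):

```python
# Hypothetical: names inferred from the flower "Registered tasks" output;
# in tasks.py each function would be decorated with @app.task.
def mul(x, y):
    return x * y

def xsum(numbers):
    return sum(numbers)
```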

At this point the broker, backend, and task are all configured. From the project root, run:

celery -A proj worker -l info

Output:

➜  celery_test celery -A proj worker -l info                                                                    

 -------------- celery@localhost v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Darwin-16.7.0-x86_64-i386-64bit 2018-05-29 07:04:53
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x102923518
- ** ---------- .> transport:   redis://localhost:6379/10
- ** ---------- .> results:     redis://localhost:6379/10
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> common_check     exchange=route_check(direct) key=common_check
                .> default          exchange=default(direct) key=default
                .> route_check      exchange=route_check(direct) key=route_check
                .> route_check_ignore_result exchange=route_check(direct) key=route_check_ignore_result

[tasks]
  . proj.tasks.add

[2018-05-29 07:04:53,527: INFO/MainProcess] Connected to redis://localhost:6379/10
[2018-05-29 07:04:53,569: INFO/MainProcess] mingle: searching for neighbors
[2018-05-29 07:04:54,621: INFO/MainProcess] mingle: sync with 1 nodes
[2018-05-29 07:04:54,621: INFO/MainProcess] mingle: sync complete
[2018-05-29 07:04:54,703: INFO/MainProcess] celery@localhost ready.
[2018-05-29 07:04:56,526: WARNING/MainProcess] Substantial drift from celery@worker1 may mean clocks are out of sync.  Current drift is
196 seconds.  [orig: 2018-05-29 07:04:56.525915 recv: 2018-05-29 07:08:12.599086]

[2018-05-29 07:04:57,777: INFO/MainProcess] Events of group {task} enabled by remote.

Celery is now running. To make tasks easier to inspect, start flower next for a visual view of task execution.

Run flower

celery flower -A proj

Output:

 ➜  celery_test celery flower -A proj                                                                            
[I 180529 07:12:53 command:139] Visit me at http://localhost:5555
[I 180529 07:12:53 command:144] Broker: redis://localhost:6379/10
[I 180529 07:12:53 command:147] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'proj.tasks.add',
     'proj.tasks.mul',
     'proj.tasks.xsum']
[I 180529 07:12:53 mixins:224] Connected to redis://localhost:6379/10
[W 180529 07:12:55 state:112] Substantial drift from celery@worker1 may mean clocks are out of sync.  Current drift is
    196 seconds.  [orig: 2018-05-29 07:12:55.493550 recv: 2018-05-29 07:16:11.581825]

Open http://localhost:5555 in a browser to watch task execution.
Of course, there are no tasks yet, so let's produce some.

# test.py
from proj.tasks import add
import time

res = add.apply_async((1, 2))  # add.delay(1, 2) is shorthand for this
while not res.ready():
    time.sleep(1)
print('task done: {0}'.format(res.get()))

Run the script to trigger the task:

python test.py

Wait for it to finish; you can watch the task's progress in flower.
And that's a simple Celery application, done!

(End)
