Celery 与 Redis、RabbitMQ

celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。采用典型的生产者-消费者模型,主要由三部分组成:
消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis、rabbitmq等作为broker
处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程
存储结果的backend:执行结果存储在backend,默认也会存储在broker使用的MQ队列服务中,也可以单独配置用何种服务做backend

1.安装celery、eventlet

(py3) [root@netman 2019]# pip install celery
Collecting celery
  Downloading https://files.pythonhosted.org/packages/47/53/6e2ccc87b18ddc582d2de31b3ed9144c72a68062659e9e4a68e19312d254/celery-4.4.0-py2.py3-none-any.whl (421kB)
     |████████████████████████████████| 430kB 190kB/s
Collecting billiard<4.0,>=3.6.1
  Downloading https://files.pythonhosted.org/packages/9a/bb/2a016ac912fca48e06ff5a662407f3d1681aa47df97fb19feba7cc931ee1/billiard-3.6.1.0-py3-none-any.whl (89kB)
     |████████████████████████████████| 92kB 473kB/s
Requirement already satisfied: pytz>dev in /opt/py3/lib/python3.6/site-packages (from celery) (2018.9)
Collecting vine==1.3.0
  Downloading https://files.pythonhosted.org/packages/7f/60/82c03047396126c8331ceb64da1dc52d4f1317209f32e8fe286d0c07365a/vine-1.3.0-py2.py3-none-any.whl
Collecting kombu<4.7,>=4.6.7
  Downloading https://files.pythonhosted.org/packages/96/94/b899b1d962f6b9ca0171a4796ca086c9785f0cc0b7705562e0a70d144910/kombu-4.6.7-py2.py3-none-any.whl (182kB)
     |████████████████████████████████| 184kB 184kB/s
Collecting importlib-metadata>=0.18; python_version < "3.8"
  Downloading https://files.pythonhosted.org/packages/e9/71/1a1e0ed0981bb6a67bce55a210f168126b7ebd2065958673797ea66489ca/importlib_metadata-1.3.0-py2.py3-none-any.whl
Collecting amqp<2.6,>=2.5.2
  Downloading https://files.pythonhosted.org/packages/fc/a0/6aa2a7923d4e82dda23db27711d565f0c4abf1570859f168e3d0975f1eb6/amqp-2.5.2-py2.py3-none-any.whl (49kB)
     |████████████████████████████████| 51kB 242kB/s
Collecting zipp>=0.5
  Downloading https://files.pythonhosted.org/packages/74/3d/1ee25a26411ba0401b43c6376d2316a71addcc72ef8690b101b4ea56d76a/zipp-0.6.0-py2.py3-none-any.whl
Collecting more-itertools
  Downloading https://files.pythonhosted.org/packages/68/03/0604cec1ea13c9f063dd50f900d1a36160334dd3cfb01fd0e638f61b46ba/more_itertools-8.0.2-py3-none-any.whl (40kB)
     |████████████████████████████████| 40kB 243kB/s
Installing collected packages: billiard, vine, more-itertools, zipp, importlib-metadata, amqp, kombu, celery
Successfully installed amqp-2.5.2 billiard-3.6.1.0 celery-4.4.0 importlib-metadata-1.3.0 kombu-4.6.7 more-itertools-8.0.2 vine-1.3.0 zipp-0.6.0


(py3) [root@netman 2019]# pip install eventlet
Collecting eventlet
  Downloading https://files.pythonhosted.org/packages/ef/01/83454d11bac9015f34e93cf11efcae169390ddf2df97cb73ca86de6465ed/eventlet-0.25.1-py2.py3-none-any.whl (222kB)
     |████████████████████████████████| 225kB 90kB/s
Collecting greenlet>=0.3
  Downloading https://files.pythonhosted.org/packages/bf/45/142141aa47e01a5779f0fa5a53b81f8379ce8f2b1cd13df7d2f1d751ae42/greenlet-0.4.15-cp36-cp36m-manylinux1_x86_64.whl (41kB)
     |████████████████████████████████| 51kB 133kB/s
Collecting monotonic>=1.4
  Downloading https://files.pythonhosted.org/packages/ac/aa/063eca6a416f397bd99552c534c6d11d57f58f2e94c14780f3bbf818c4cf/monotonic-1.5-py2.py3-none-any.whl
Requirement already satisfied: six>=1.10.0 in /opt/py3/lib/python3.6/site-packages (from eventlet) (1.12.0)
Collecting dnspython>=1.15.0
  Downloading https://files.pythonhosted.org/packages/ec/d3/3aa0e7213ef72b8585747aa0e271a9523e713813b9a20177ebe1e939deb0/dnspython-1.16.0-py2.py3-none-any.whl (188kB)
     |████████████████████████████████| 194kB 109kB/s
Installing collected packages: greenlet, monotonic, dnspython, eventlet
Successfully installed dnspython-1.16.0 eventlet-0.25.1 greenlet-0.4.15 monotonic-1.5
(py3) [root@netman 2019]#

2.编写tasks.py

from celery import Celery

brokers='redis://:redis@2019@127.0.0.1:6379/5'
backend='redis://:redis@2019@127.0.0.1:6379/6'

app = Celery('tasks', broker=brokers, backend=backend)
@app.task
def add(x,y):
    return x+y

3.启动

# celery -A tasks worker --loglevel=info
/iron/python_env/lib/python3.6/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@netman v4.4.0 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-2.6.32-431.el6.x86_64-x86_64-with-centos-6.5-Final 2019-12-31 15:29:52
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f944f7aaa90
- ** ---------- .> transport:   redis://:**@127.0.0.1:6379/5
- ** ---------- .> results:     redis://:**@127.0.0.1:6379/6
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2019-12-31 15:29:52,962: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/5
[2019-12-31 15:29:52,974: INFO/MainProcess] mingle: searching for neighbors
[2019-12-31 15:29:53,996: INFO/MainProcess] mingle: all alone
[2019-12-31 15:29:54,005: INFO/MainProcess] celery@netman ready.

4. Ipython交互环境

# ipython 
Python 3.6.8 (default, Apr  7 2019, 12:01:10) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.11.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from tasks import add

In [2]: result = add.delay(23, 79)

In [3]: result.ready()
Out[3]: True

In [4]: result.get()
Out[4]: 102

In [5]:

5. 后台响应日志

[2019-12-31 15:31:04,361: INFO/MainProcess] Received task: tasks.add[f1fb997c-c05a-4946-86bc-77d511bd213d]
[2019-12-31 15:31:04,371: INFO/ForkPoolWorker-1] Task tasks.add[f1fb997c-c05a-4946-86bc-77d511bd213d] succeeded in 0.007257293909788132s: 102

# redis-cli 
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> SELECT 5
OK
127.0.0.1:6379[5]> keys *
1) "_kombu.binding.celery"
2) "_kombu.binding.celery.pidbox"
3) "_kombu.binding.celeryev"
4) "unacked_mutex"
127.0.0.1:6379[5]>

127.0.0.1:6379[5]> SELECT 6
OK
127.0.0.1:6379[6]> keys *
1) "celery-task-meta-7aabff69-e5b1-4d8e-b51e-0639f878034c"
127.0.0.1:6379[6]> get celery-task-meta-7aabff69-e5b1-4d8e-b51e-0639f878034c
"{\"status\": \"SUCCESS\", \"result\": 77, \"traceback\": null, \"children\": [], \"date_done\": \"2019-12-31T09:20:04.146564\", \"task_id\": \"7aabff69-e5b1-4d8e-b51e-0639f878034c\"}"
127.0.0.1:6379[6]>

6. Celery 与 RabbitMQ


- Broker:标识消息队列服务器实体.
- Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
- Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。新到是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过新到发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
- Connection:网络连接,比如一个TCP连接。
- Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
- Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
- Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

6.1 创建任务:taskmq.py

from celery import Celery

brokers = 'amqp://admin:rabbit@2019@10.20.1.250/'

app = Celery('tasks', broker=brokers, backend='amqp')
@app.task
def add(x,y):
    return x+y

6.2 使用Celery参数启动程序

# celery -A taskmq worker --loglevel=info
/iron/python_env/lib/python3.6/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
/iron/python_env/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning: 
    The AMQP result backend is scheduled for deprecation in     version 4.0 and removal in version v5.0.     Please use RPC backend or a persistent backend.

  alternative='Please use RPC backend or a persistent backend.')

 -------------- celery@netman v4.4.0 (cliffs)
--- ***** -----
-- ******* ---- Linux-2.6.32-431.el6.x86_64-x86_64-with-centos-6.5-Final 2019-12-31 16:39:51
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f1b49a77b00
- ** ---------- .> transport:   amqp://admin:**@10.20.1.250:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . taskmq.add

[2019-12-31 16:39:51,243: INFO/MainProcess] Connected to amqp://admin:**@10.20.1.250:5672//
[2019-12-31 16:39:51,256: INFO/MainProcess] mingle: searching for neighbors
[2019-12-31 16:39:52,279: INFO/MainProcess] mingle: all alone
[2019-12-31 16:39:52,296: INFO/MainProcess] celery@netman ready.

6.3 RabbitMQ后台连接

6.4 进入Ipython交互

# ipython
Python 3.6.8 (default, Apr  7 2019, 12:01:10) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.11.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from taskmq import add

In [2]: result = add.delay(38,91)

In [3]: result
Out[3]: <AsyncResult: 18e9a25e-c4fa-4995-a006-5677d2f4fd7a>

In [4]: result.ready()
Out[4]: True

In [5]: result.get
Out[5]: <bound method AsyncResult.get of <AsyncResult: 18e9a25e-c4fa-4995-a006-5677d2f4fd7a>>

In [6]: result.get()
Out[6]: 129

6.5 后台日志

[tasks]
  . taskmq.add

[2019-12-31 16:39:51,243: INFO/MainProcess] Connected to amqp://admin:**@10.20.1.250:5672//
[2019-12-31 16:39:51,256: INFO/MainProcess] mingle: searching for neighbors
[2019-12-31 16:39:52,279: INFO/MainProcess] mingle: all alone
[2019-12-31 16:39:52,296: INFO/MainProcess] celery@netman ready.
[2019-12-31 16:50:53,812: INFO/MainProcess] Received task: taskmq.add[18e9a25e-c4fa-4995-a006-5677d2f4fd7a]
[2019-12-31 16:50:53,839: INFO/ForkPoolWorker-2] Task taskmq.add[18e9a25e-c4fa-4995-a006-5677d2f4fd7a] succeeded in 0.024155164137482643s: 129
点赞

发表评论

电子邮件地址不会被公开。必填项已用 * 标注