Celery+RabbitMQ的多机器worker节点介绍

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。由于在工作的平台中用到Celery系统(用于发送邮件、发送短信等任务),记录一下学习的知识。还有就是“百毒”上关于Celery的博客,基本都没有多机器worker节点怎么部署的介绍,所以有这方面疑惑的童鞋可以参考之。如有疑问请联系我,大家共同学习!

安装

1.Celery Borker:

使用RabbitMQ作为Celery Borker的优点:

  • Highly customizable routing
  • Persistent queues

使用Redis作为Celery Borker的优点:

  • high speed due to in memory datastore
  • can double up as both key-value datastore and job queue

Celery官网首推RabbitMQ,博主这里也使用了RabbitMQ作为Celery Borker
关于RabbitMQ的入门介绍有一篇非常好的译文,强烈推荐:http://simple-is-better.com/news/353
RabbitMQ的安装可以直接参考RabbitMQ的官网,这里以ubuntu平台作为安装示例如下:

1
sudo apt-get install rabbitmq-server

安装完成后,新增一个RabbitMQ用户(用户名:artcm,密码:111)

1
sudo rabbitmqctl add_user artcm 111

设置RabbitMQ用户角色

1
sudo rabbitmqctl set_user_tags artcm administrator

启动RabbitMQ的web监控(访问http://127.0.0.1:15672/即可查看RabbitMQ服务的各种信息)

1
sudo rabbitmq-plugins enable rabbitmq_management

2.Celery:

1
pip install celery

3.celery web监控:

1
pip install flower

原理介绍

mahua

介绍:
带有路由键key1的任务消息请求celery执行,首先交换机根据绑定的规则把任务消息放到指定的队列中去,此队列对应的worker节点取出任务执行

说明:
(1)为什么要定义多个交换机?
每个交换机都会新建一个进程,充分利用服务器多核,提高执行效率
另外,多个交换机可扩展性强

(2)同一个服务器可启动多个worker节点?
是的,启动的时候加上不同的–hostname参数即可

(3)celery默认会创建一个celery交换机、绑定、队列,没有匹配任何绑定的任务会发送到此celery队列中

(4)交换机绑定的方式(原理介绍图中使用的是Direct Exchange)

  • Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
  • Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
  • Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到“audit.irs”。

构建Celery+RabbitMQ的多机器worker节点实例

(1)准备:
假如我们现在有四台服务器,地址分别如下:
192.168.1.190
192.168.1.191
192.168.1.192
192.168.1.193

我们把RabbitMQ服务放在193那台服务器上
我们把flower服务也启动在193那台服务器上
代码中定义的队列有queue_add_reduce、queue_sum (还有个默认队列celery)
190、191、192服务器用于启动worker节点

190服务器上启动处理celery和queue_add_reduce队列的worker节点
191服务器上启动处理celery和queue_sum队列的worker节点
192服务器上启动处理queue_add_reduce和queue_sum队列的worker节点

(2)celery项目文件结构
mahua

(3)代码(在190、191、192服务器都得有celeryservice项目代码)
celery配置文件./celeryservice/celeryconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# -*- coding: utf-8 -*-
import datetime
from celery.schedules import crontab
from kombu import Exchange, Queue
####################################
# Broker settings #
####################################
BROKER_URL = 'amqp://artcm:111@192.168.1.193:5672//'
# BROKER_URL = 'redis://localhost:6379/1'
BROKER_POOL_LIMIT = 10 # 默认celery与borker的连接池连接数
# List of modules to import when celery starts.
CELERY_IMPORTS = ('celeryservice.tasks', )
# 存储revokes状态(Persistent revokes)
CELERYD_STATE_DB = './celeryservice/celerymain/celery_revokes_state_db'
####################################
# 默认task结果存储配置 #
# 建议:存储结果会浪费很多资源,业务上非必须的话,不存储结果
####################################
# CELERY_IGNORE_RESULT = True # celery结果忽略
## Using the database to store task state and results.
# CELERY_RESULT_BACKEND = 'mongodb://192.168.1.121:27017/'
# CELERY_MONGODB_BACKEND_SETTINGS = {
# 'database': 'celery_backend',
# 'taskmeta_collection': 'task_result',
# 'max_pool_size':10,
# }
####################################
# 默认频次限制配置 #
# 建议:频次内部实现灰常复杂,最好不要使用
####################################
CELERY_DISABLE_RATE_LIMITS = True # 不使用频次限制
# '1/s'每秒,'1/m'每分钟, '1/h'每小时 (每个task每秒最多执行频次,多出的任务为received状态,等待一段时间后执行,特别注意:这样就有可能导致有的任务会延迟执行)
# CELERY_DEFAULT_RATE_LIMIT = '10000/m'
####################################
# 一般配置 #
####################################
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# CELERY_MESSAGE_COMPRESSION = 'gzip' # 如果发送的tasks参数信息量较大,则使用压缩传输
####################################
# 默认log配置 #
####################################
CELERYD_LOG_FILE="./celeryservice/celerymain/celery.log"
####################################
# 默认邮件配置 #
# 注意:如果是task中retry了,最后failure,不会发出邮件(待研究) #
####################################
CELERY_SEND_TASK_ERROR_EMAILS = True #celery接收错误邮件
ADMINS = (
("minkedong", "190926234@qq.com"), # celery接收错误邮件地址
)
SERVER_EMAIL = "minkedong89@126.com" # 从哪里发送的错误地址
EMAIL_HOST = "smtp.126.com"
EMAIL_HOST_USER = SERVER_EMAIL
EMAIL_HOST_PASSWORD = 'xxxxxxxx'
EMAIL_PORT = 25
EMAIL_USE_SSL = False
EMAIL_USE_TLS = False
EMAIL_TIMEOUT = 2 # 2秒
####################################
# routing #
# 需要了解一下rabbitMQ的“队列””交换机“”绑定“的概念:http://simple-is-better.com/news/353
####################################
##### 队列属性定义 ####
# 相当于定义RabbitMQ中的"队列、交换机、绑定"机制(跟celery无关)(可以使用同一个交换机绑定多个队列)
# 没有在CELERY_ROUTES中的task,默认发送到"celery"队列中
CELERY_QUEUES = (
Queue('queue_add_reduce', exchange=Exchange('calculate_exchange', type='direct'), routing_key='key1'),
Queue('queue_sum', exchange=Exchange('calculate_exchange', type='direct'), routing_key='key2'),
Queue('celery', Exchange('celery'), routing_key='celery'), # 里面使用的都是默认参数值
)
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'celery'
###### 路由:taks对应发送的队列及使用的routing_key #####
# delivery_mode参数(决定tasks发送到RabbitMQ后,是否存储到磁盘中)(celery默认使用2:持久化方式):
# 1表示rabbitmq不存储celery发送的tasks到磁盘,RabbitMQ重启后,任务丢失(建议使用这种方式)
# 2表示rabbitmq可以存储celery发送的tasks到磁盘,RabbitMQ重启后,任务不会丢失(磁盘IO资源消耗极大,影响性能)
CELERY_ROUTES = {
'celeryservice.tasks.add': {'queue': 'queue_add_reduce', 'routing_key': 'key1', 'delivery_mode': 1},
'celeryservice.tasks.reduce': {'queue': 'queue_add_reduce', 'routing_key': 'key1', 'delivery_mode': 1},
'celeryservice.tasks.sum_all': {'queue': 'queue_sum', 'routing_key': 'key2', 'delivery_mode': 1},
}

任务文件./celeryservice/tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import os
import sys
import datetime
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from celery import Celery
from celery import chain, group, chord, Task
from celeryservice import celeryconfig
app = Celery()
app.config_from_object(celeryconfig)
__all__ = ['add', 'reduce','sum_all', 'other']
####################################
# task定义 #
####################################
@app.task
def add(x, y):
return x + y
@app.task
def reduce(x, y):
return x - y
@app.task
def sum_all(values):
return sum([int(value) for value in values])
@app.task
def other(x, y):
return x * y

flower配置文件./celeryservice/flower/flowerconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-
"""
flower启动命令使用的配置文件
"""
address = '0.0.0.0' # 保证外网可以访问
port = 5555
basic_auth = ['mkd:mkd','artcm:111'] # 用户名:密码
persistent = True # 持久化celery tasks(如果为False的话,重启flower后,监控的tasks全部消失了!)
db = './celeryservice/flower/flowerdb' # 持久化tasks存储的地方

(4)启动(在celeryservice同级目录上运行以下命令)
190服务器上启动处理celery和queue_add_reduce队列的worker节点:

1
celery worker -A celeryservice.tasks --loglevel=info --queues=celery,queue_add_reduce --hostname=celery_worker190 --pidfile=./celeryservice/celerymain/celery_worker.pid >/dev/null 2>&1 &

191服务器上启动处理celery和queue_sum队列的worker节点:

1
celery worker -A celeryservice.tasks --loglevel=info --queues=celery,queue_sum --hostname=celery_worker191 --pidfile=./celeryservice/celerymain/celery_worker.pid >/dev/null 2>&1 &

192服务器上启动处理queue_add_reduce和queue_sum队列的worker节点:

1
celery worker -A celeryservice.tasks --loglevel=info --queues=queue_add_reduce,queue_sum --hostname=celery_worker192 --pidfile=./celeryservice/celerymain/celery_worker.pid >/dev/null 2>&1 &

我们把flower服务也启动在193那台服务器上(可通过http://192.168.1.193:5555/访问):

1
celery flower --broker=amqp://artcm:111@192.168.1.193:5672// --conf=./celeryservice/flower/flowerconfig.py >/dev/null 2>&1 &

(4)运行测试

在任一台有celeryservice项目代码的服务器上,运行add、reduce、sum_all、other任务(测试可简单使用add.delay(1,2)等)

add和reduce任务,可能会在190或192服务器的worker节点运行
sum_all任务,可能会在191或192服务器的worker节点运行
other任务,可能会在190或191服务器的worker节点运行

Celery配置建议

  • task定义代码中不要再发送另外的task(上一个task输出结果作为下一个task输入),
    导致task相互依赖等待结果,运行有可能会有问题(最好使用Canvas Designing Workflows)

  • 使用Persistent revokes,在celeryconfig.py中定义CELERYD_STATE_DB

  • 根据实际业务场景,调整celeryconfig.py中的BROKER_POOL_LIMIT(celery与borker的连接池连接数)

  • 发送到rabbitmq的tasks,使用delivery_mode=1的方式(不存储tasks信息到磁盘),见CELERY_ROUTES设定

  • 注意设置celery的worker预取值“CELERYD_PREFETCH_MULTIPLIER“,如果你的task是很耗时的任务,最好设置为1,避免造成队列拥堵。如果你的task是非耗时的任务,则可根据实际情况调大此值,提高吞吐量

  • 最好设置task的软超时时间”CELERYD_TASK_SOFT_TIME_LIMIT“,如果任务超过此时间没有执行完成,则会报错celery.exceptions.SoftTimeLimitExceeded(可在代码中捕获做相应业务处理),避免造成队列拥堵

  • 一定要设置“CELERYD_MAX_TASKS_PER_CHILD”,此值表示每个worker工作进程在执行了多少task之后便重新建立worker进程,解决celery的内存泄漏问题

坚持原创技术分享,您的支持将鼓励我继续创作!

热评文章