Celery分布式任务队列实战指南
竞品对比
在Celery之外,还有几个值得关注的任务调度框架:
- Taskiq - 专为FastAPI设计的轻量级异步任务框架
https://taskiq-python.github.io/ - Funboost - 国产功能强大的任务调度框架
https://funboost.readthedocs.io/zh-cn/latest/articles/c4.html
Celery概述
Celery是一个简洁、可靠且强大的分布式消息处理系统,专注于实时异步任务队列,同时支持任务调度功能。其核心架构由三个关键组件构成:消息中间件(Message Broker)、任务执行单元(Worker)以及任务结果存储(Result Backend)。
架构组件详解
消息中间件
Celery本身不内置消息服务,但支持灵活集成多种第三方消息中间件,包括RabbitMQ、Redis等主流选择。
任务执行单元
Worker是Celery提供的任务执行实体,能够并发运行在分布式系统的多个节点上,实现高效的分布式计算。
任务结果存储
Result Backend用于持久化Worker执行任务后的返回值,Celery支持多种存储后端,如AMQP、Redis、数据库等。
并发与序列化支持
- 并发方式:Prefork、Eventlet、Gevent、Threads/Single Threaded
- 序列化格式:Pickle、JSON、YAML、Msgpack,以及zlib、bzip2压缩支持,另有加密消息签名机制
典型应用场景
Celery作为强大的分布式异步任务框架,能够使任务执行完全脱离主程序,甚至可分配至其他主机运行。常见用途包括:
- 异步任务:将耗时操作(如短信/邮件发送、消息推送、音视频处理)提交至Celery异步执行
- 定时任务:按预设周期执行数据统计、报表生成等任务
Celery通过消息进行通信协调。客户端(任务发起方)将任务发送至Broker(消息队列),Broker负责将任务分发给Worker(任务执行方)处理。
消息队列(Message Queue)是独立运行的中间层程序,作为消息传输过程中的临时缓存容器。消息可以是简单的文本字符串,也可以是复杂的JSON或哈希数据结构。队列采用FIFO(先进先出)原则,Python的list类型即可方便实现队列结构。
当前主流消息队列包括RabbitMQ、Kafka、RocketMQ、MetaMQ、ZeroMQ、ActiveMQ等。虽然Redis、MySQL、MongoDB也可充当消息中间件,但专业性和稳定性不及上述方案。
选型建议:并发10k以下使用Redis;10k-1000k使用RabbitMQ;1000k以上使用RocketMQ
环境配置
pip install -U Celery
# 或
sudo easy_install Celery
重要提示:Celery自4.0版本起不再支持Windows系统。如必须在Windows环境使用,需安装4.0之前版本,并配合gevent、geventlet或eventlet协程模块。
基础操作指南
数据存储注意事项
数据库字段类型选择
当需要将任务结果存储至MySQL数据库时,需注意以下数据类型容量:
| 类型 | 存储上限 | 典型用途 |
|---|---|---|
| TINYTEXT | 255字节 | 超短文本 |
| TEXT | 65,535字节(约64KB) | 普通文本 |
| MEDIUMTEXT | 16,777,215字节(约16MB) | 中等文本 |
| LONGTEXT | 4,294,967,295字节(约4GB) | 大型文本 |
| TINYBLOB | 255字节 | 小型二进制 |
| BLOB | 65,535字节(约64KB) | 中等二进制(如小图片) |
| MEDIUMBLOB | 16,777,215字节(约16MB) | 大型二进制(音频/视频) |
| LONGBLOB | 4,294,967,295字节(约4GB) | 超大型二进制(完整视频/数据库备份) |
关键点:由于Celery默认使用二进制格式存储结果,即使配置了JSON序列化,实际存入数据库的仍是二进制数据。因此应使用LONGBLOB类型而非LONGTEXT,并确保数据库字符集为UTF8或utf8mb4。
序列化格式配置
# 配置任务的序列化与反序列化方式
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json', 'pickle'],
)
JSON vs Pickle对比:
- JSON:适合存储为可读字符串,兼容性优异,但仅支持基础数据类型
- Pickle:支持复杂Python对象,但存在安全风险——反序列化时可能执行任意代码,存在代码注入风险,仅在可信环境使用
时间配置要点
使用定时或预约功能时,必须正确配置时区并禁用UTC:
app.conf.update(timezone='Asia/Shanghai', enable_utc=False)
当使用eta参数时,必须传入带有时区信息的datetime对象,否则任务将无法正确执行。
延时任务实现
# 方式一:使用countdown参数(推荐)
task_alpha.apply_async(countdown=10) # 延时10秒执行
# 方式二:使用eta参数配合datetime
from datetime import datetime, timedelta
eta_time = datetime.now() + timedelta(seconds=5)
task_alpha.apply_async(eta=eta_time)
预约任务实现
from datetime import datetime, timedelta
import pytz
# 使用带有时区信息的datetime对象
预约时间 = datetime.now(pytz.timezone('Asia/Shanghai')) + timedelta(seconds=30)
task_alpha.apply_async(eta=预约时间)
定时任务(周期任务)实现
from celery.schedules import crontab
# 配置定时调度规则
app.conf.beat_schedule = {
'morning_job': {
'task': 'task_alpha',
'schedule': crontab(hour=7, minute=0), # 每天早晨7点执行
},
'periodic_job': {
'task': 'task_alpha',
'schedule': timedelta(minutes=10), # 每10分钟执行一次
},
}
启动Beat调度器:
celery -A MyProject.main beat -l info
注意:beat_schedule仅将任务放入队列,需要单独启动Worker才能实际执行任务。在Celery 5.4.0及以上版本中,建议分别启动Worker和Beat以提高稳定性。
Pickle安全警告处理
以root用户运行且启用pickle序列化时,会收到安全警告:
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
安全建议
- 避免root运行:创建非特权用户运行Celery
sudo adduser celeryuser sudo su celeryuser celery -A tasks worker -l info - 禁用pickle:改用JSON序列化
app.conf.update( task_serializer='json', result_serializer='json', accept_content=['json'], ) - 确需root运行(不推荐):
export C_FORCE_ROOT=1 celery -A tasks worker -l info
常用配置参数详解
Task装饰器配置
@app.task() # 常用配置参数
# name: 自定义任务名称(默认使用函数路径)
@app.task(name='custom_task_name')
# ignore_result: 设为True则不存储执行结果,节省存储空间
@app.task(ignore_result=True)
# bind: 设为True使任务绑定到实例,可通过self访问任务上下文和配置重试
@app.task(bind=True, name='vulnerability_scan')
def vulnerability_scan(self, *args, **kwargs):
try:
# 业务逻辑
pass
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
# 重试配置
raise self.retry(exc=e, countdown=60, max_retries=self.max_retries, retry_backoff=True)
# max_retries: 最大重试次数
@app.task(max_retries=3)
# default_retry_delay: 重试默认延迟(秒)
@app.task(default_retry_delay=60)
# serializer: 序列化格式
@app.task(serializer='json')
重试机制参数
- exc:指定触发重试的异常对象
- countdown:重试前等待秒数
- eta:指定下次重试的具体时间点(datetime对象)
- max_retries:最大重试次数
- retry_backoff:启用指数退避策略
- retry_backoff_max:最大退避时间(秒)
- retry_jitter:在重试时间添加随机抖动,避免同时重试
全局配置参数
app.conf.update(
# broker_url: 消息代理地址
broker_url='redis://localhost:6379/0',
# result_backend: 结果存储后端
result_backend='redis://localhost:6379/0',
# task_routes: 任务路由规则
task_routes={'myapp.tasks.add': {'queue': 'high_priority'}},
# task_serializer: 任务序列化方式
task_serializer='json',
# result_serializer: 结果序列化方式
result_serializer='json',
# accept_content: 接受的序列化格式列表
accept_content=['json', 'application/text'],
# timezone: 时区设置
timezone='UTC',
# enable_utc: 是否使用UTC时间
enable_utc=True,
# task_acks_late: 任务完成后才确认
task_acks_late=True,
# worker_concurrency: Worker并发数
worker_concurrency=4,
)
Worker启动方式
方式一:直接启动
# 启动Worker实例
celery -A Apps.CeleryScripts.main:app worker -l info -c 2 -Q notification_queue
# 参数说明:
# -A: 指定Celery应用实例路径
# worker: 启动Worker模式
# -l info: 设置日志级别
# -c 2: 并发数为2
# -Q: 指定任务队列名称
# 同时启动Worker和Beat调度器(旧版方式)
celery -A Apps.CeleryScripts.main:app worker -l info -B
优缺点
优点:命令简洁,适合小型应用快速部署
缺点:扩展性有限,多实例管理复杂
方式二:多进程管理
# 启动Worker实例
celery multi start worker1 -A Apps.CeleryScripts.main:app -l info -c 2 -Q notification_queue --logfile=worker.log
# 启动Worker及Beat调度器
celery multi start worker2 -A Apps.CeleryScripts.main:app -l info -B --logfile=worker_beat.log
管理命令
- start:启动Worker实例
- stop:立即停止指定Worker
- restart:重启指定Worker
- stopwait:优雅停止,等待当前任务完成
优缺点
优点:支持大规模应用和多实例管理
缺点:配置相对复杂
任务调用方法对比
Celery提供三种任务调用方式,均继承自Task类:
- delay():适用于简单任务调用
- apply_async():适用于需要精细控制的高级调用
- send_task():适用于通过任务名称调用的场景
delay()方法
@app.task
def add_numbers(x, y):
return x + y
# 简洁调用方式
result = add_numbers.delay(4, 6)
print("执行结果:", result.get()) # 输出: 10
apply_async()方法
# 支持丰富的配置参数
result = add_numbers.apply_async(
args=(4, 6),
countdown=10, # 延时秒数
eta=datetime_obj, # 指定执行时间
queue='queue_name',
retry=True,
expires=3600
)
send_task()方法
# 通过任务名称调用
result = app.send_task('my_module.add_numbers', args=(4, 6))
print("执行结果:", result.get())
参数传递注意事项
kwargs在传递时采用**kwargs解构方式:
from celery import Celery
import json
app = Celery('demo', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task(rate_limit='2/m', name='send_notification')
def process_message(age, name, extra):
print("任务执行中")
print(f"年龄: {age}, 姓名: {name}, 额外: {extra}")
return f"返回数据: {age}, 完整: {json.dumps({'age': age, 'name': name, 'extra': extra})}"
# 调用方式
ret = app.send_task(
name="send_notification",
kwargs={"age": 19, "name": "alex", "extra": {"Data": "@23"}}
)
result = ret.get(timeout=10)
print("结果:", result)
如需传入整体字典而不解构:
@app.task(rate_limit='2/m', name='send_notification')
def process_message(data):
age = data.get("age")
return f"返回: {age}, 完整: {json.dumps(data)}"
ret = app.send_task(
name="send_notification",
kwargs={"data": {"age": 19, "name": "alex"}}
)
单模块项目实践
创建celery_test目录,包含producer(生产端)、consumer(消费端)、result(结果获取):
消费端代码 consumer.py
import celery
import time
# result_backend: 存储任务结果的后端(可选)
result_backend = "redis://:password@ip:port/14"
# broker: 消息中间件,用于传递任务
broker = "redis://:password@ip:port/15"
# 创建Celery应用实例
app = celery.Celery('demo', backend=result_backend, broker=broker)
# 注册任务
@app.task()
def send_sms(name):
print("开始发送短信")
time.sleep(3)
return f"短信发送成功: {name}"
启动Worker(在tasks.py所在目录):
celery -A tasks worker -l info
生产端代码 producer.py
from tasks import send_sms
# 异步调用任务
ret = send_sms.delay("hello")
print(ret) # 返回任务ID
结果获取代码 result_fetcher.py
任务状态包括:PENDING、RECEIVED、STARTED、SUCCESS、FAILURE、REVOKED、REJECTED
from celery.result import AsyncResult
from tasks import app
async_result = AsyncResult(id=task_id, app=app)
if async_result.status == 'SUCCESS':
try:
result = async_result.get()
print("任务结果:", result)
except Exception as e:
print("获取结果异常:", str(e))
elif async_result.status == 'FAILURE':
error = async_result.result
print(f"任务失败: {error}")
traceback = async_result.traceback
print(f"堆栈信息: {traceback}")
elif async_result.status in ('PENDING', 'STARTED', 'RETRY'):
print(f"任务当前状态: {async_result.status}")
elif async_result.status == 'REVOKED':
print("任务已被撤销")
任务结果操作
async_result.get() # 获取结果
async_result.forget() # 删除结果(默认不会自动删除)
async_result.revoke(terminate=True) # 强制终止任务
async_result.revoke(terminate=False) # 如果任务未开始则终止
async_result.failed() # 任务执行失败返回True
async_result.status # 查看任务状态
多模块项目实践
项目结构:
Celery配置 mycelery.py
import celery
backend = 'redis://:password@ip:port/14'
broker = 'redis://:password@ip:port/15'
app = celery.Celery('demo', backend=backend, broker=broker, include=[
'celery_tasks.task01',
'celery_tasks.task02'
])
# 时区配置
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
任务定义 task01.py / task02.py
from celery_tasks.mycelery import app
import time
@app.task()
def send_sms(name):
print("短信开始发送")
time.sleep(3)
return f"短信发送成功: {name}"
启动Worker
# 在项目根目录执行
celery -A celery_tasks.mycelery worker -l info
生产端调用
from celery_tasks.task01 import send_sms
result1 = send_sms.delay("张三")
print(result1.id)
result2 = send_sms.delay("李四")
print(result2.id)
任务终止处理
终止流程
- 通过AsyncResult获取任务当前状态
- 根据状态判断是否可安全终止
- 调用revoke方法执行终止操作
终止代码实现
from celery.result import AsyncResult
from celery.task.control import revoke
def cancel_task(celery_app, task_id):
"""尝试取消指定任务"""
result = AsyncResult(task_id, app=celery_app)
task_status = result.status
if task_status == 'PENDING':
print(f"任务{task_id}处于PENDING状态,安全撤销")
revoke(task_id, terminate=False)
elif task_status == 'RECEIVED':
print(f"任务{task_id}已接收但未执行,撤销")
revoke(task_id, terminate=False)
elif task_status == 'STARTED':
print(f"任务{task_id}正在执行,尝试终止")
revoke(task_id, terminate=True, signal='SIGTERM')
else:
print(f"任务{task_id}当前状态为{task_status},无法取消")
return f"已执行操作,任务原状态: {task_status}"
终止信号选择
- SIGTERM:优雅终止信号,允许程序清理资源,建议首选
- SIGKILL:强制立即终止,不执行清理,可能导致数据损坏
- SIGINT:中断信号(Ctrl+C),适用于模拟手动终止
- SIGQUIT:类似SIGTERM但生成核心转储,用于调试
- SIGHUP:常用于重新加载配置
安全建议:优先使用SIGTERM,确保任务代码能捕获信号并执行清理操作。
定时任务配置
延时执行模式
from datetime import datetime, timedelta
from celery_tasks.task01 import send_sms
current_time = datetime.now()
# 转为UTC时间
utc_time = datetime.utcfromtimestamp(current_time.timestamp())
delay_time = timedelta(seconds=10)
scheduled_time = utc_time + delay_time
# 使用apply_async设置执行时间
result = send_sms.apply_async(args=["测试用户"], eta=scheduled_time)
print(result.id)
Beat调度配置模式
from datetime import timedelta
import celery
from celery.schedules import crontab
backend = 'redis://:password@ip:port/14'
broker = 'redis://:password@ip:port/15'
app = celery.Celery('demo', backend=backend, broker=broker, include=[
'celery_tasks.task01',
'celery_tasks.task02'
])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
# 配置定时任务
app.conf.beat_schedule = {
'task-every-6-seconds': {
'task': 'celery_tasks.task01.send_sms',
'schedule': timedelta(seconds=6),
'args': ('定时任务参数',),
},
'daily-task': {
'task': 'celery_tasks.task01.send_email',
'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'args': ('定时邮件参数',),
},
}
启动命令
# 启动Worker
celery -A celery_tasks.mycelery worker -l info
# 启动Beat调度器
celery -A celery_tasks.mycelery beat -l info
# 旧版合并启动(5.4.0前可用)
celery -A celery_tasks.mycelery worker -B -l info
Beat进程会周期性检查配置,将到期任务发送至任务队列。
任务组合模式
Chain(链式任务)
任务按顺序执行,前一个任务的输出作为下一个任务的输入:
from celery import Celery, chain
app = Celery('demo', broker='pyamqp://guest@localhost//')
@app.task(name="step_one")
def process_first(x):
return x + 1
@app.task(name="step_two")
def process_second(x):
return x * 2
@app.task(name="step_three")
def process_third(x):
return x - 3
# 链式调用
result = chain(
process_first.s(10),
process_second.s(),
process_three.s()
)()
# 按任务名称调用
result = chain(
app.tasks['step_one'].s(10),
app.tasks['step_two'].s(),
app.tasks['step_three'].s(3)
)()
Group(并行任务)
任务并行执行,独立运行:
from celery import group
@app.task
def double_value(x):
return x * 2
# 并行执行5个任务
group_result = group(double_value.s(i) for i in range(5))()
print("结果列表:", group_result.get()) # 输出: [0, 2, 4, 6, 8]
混合使用示例:先获取task1结果,再传给task2和task3并行执行:
from celery import Celery, group, Task
import pymysql
import random
pymysql.install_as_MySQLdb()
result_backend = 'db+mysql://user:password@localhost:3306/celery'
broker_url = 'redis://:@127.0.0.1:6379/0'
app = Celery('demo', backend=result_backend, broker=broker_url)
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
)
class UserProfile:
def __init__(self, age, gender):
self.age = age
self.gender = gender
def to_dict(self):
return {'age': self.age, 'gender': self.gender}
@app.task
def generate_user():
random_age = random.randint(1, 100)
profile = UserProfile(random_age, "男")
return random_age, profile.to_dict()
@app.task
def process_user(age_data, info, param):
print(f"处理用户: 年龄={info['age']}, 性别={info['gender']}, 参数={param}")
return 3
@app.task
def send_notification(age_data, info, param):
print(f"发送通知: 年龄={info['age']}, 性别={info['gender']}, 参数={param}")
return 3
# 生产端调用
result_1 = generate_user.apply_async().get()
print("第一步结果:", result_1)
# 并行执行后续任务
result_2_and_3 = group(
process_user.s(*result_1, "参数1"),
send_notification.s(*result_1, "参数2")
)().get()
print("并行任务结果:", result_2_and_3)
Chord(回调任务)
所有并行任务完成后执行回调任务:
from celery import chord
@app.task
def calculate_sum(results):
return sum(results)
@app.task
def process_item(x):
return x * 2
# 和弦任务:并行执行后执行回调
chord_result = chord((process_item.s(i) for i in range(5)), calculate_sum.s())()
print("结果:", chord_result.get()) # 输出: 20 (0+2+4+6+8)
总结:
- Chain:顺序执行任务
- Group:并行执行任务
- Chord:并行执行后执行回调
Django集成实践
独立项目结构
app01/
mycelery/
├── __init__.py
├── config.py # 配置文件
├── main.py # 主程序入口
└── sms/ # 短信任务模块
├── __init__.py
└── tasks.py # 任务文件(必须命名为tasks)
config.py配置
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "Asia/Shanghai"
broker_url = "redis://:password@ip:port/14"
result_backend = "redis://:password@ip:port/15"
sms/tasks.py
from mycelery.main import app
import time
@app.task(name='send_sms_task')
def send_sms(mobile):
"""发送短信任务"""
print(f"向手机号{mobile}发送短信成功!")
time.sleep(5)
return "send_sms OK"
main.py
import os
from celery import Celery
app = Celery("sms")
app.config_from_object("mycelery.config")
app.autodiscover_tasks(["mycelery.sms"])
# 启动命令(建议在mycelery根目录执行)
# celery -A mycelery.main worker --loglevel=info
Django视图调用
from mycelery.sms.tasks import send_sms
def celery_test(request):
send_sms.delay("15867416745")
return HttpResponse("任务已提交")
与Django融合模式
集成思路
在main.py中初始化Django环境:
import os
import django
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings.dev')
django.setup()
app = Celery("project")
app.config_from_object("mycelery.config")
app.autodiscover_tasks(["mycelery.sms", "mycelery.email"])
# 启动命令
# celery -A mycelery.main worker -l info
Django配置集成
将Celery配置写入Django的settings.py:
# settings/dev.py
# 任务队列
CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/14'
CELERY_RESULT_BACKEND = 'redis://:123456@127.0.0.1:6379/15'
# 时区
CELERY_TIMEZONE = TIME_ZONE
CELERY_ENABLE_UTC = True
# 防止死锁
CELERY_FORCE_EXECV = True
CELERYD_CONCURRENCY = 200
CELERY_ACKS_LATE = True
CELERYD_MAX_TASKS_PER_CHILD = 500
CELERYD_TIME_LIMIT = 10 * 60
CELERY_DISABLE_RATE_LIMITS = True
# 序列化配置
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
# 结果过期时间
from datetime import timedelta
CELERY_RESULT_EXPIRES = timedelta(hours=1)
# 定时任务配置
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
"user-add": {
"task": "add",
"schedule": 10,
},
"backend-cleanup": {
"task": "celery.backend_cleanup",
"schedule": crontab(hour=0, minute=0),
},
}
初始化配置
在Django的__init__.py中添加:
import pymysql
from mycelery import app as celery_app
pymysql.install_as_MySQLdb()
__all__ = ['celery_app']
分布式任务模块
# users/tasks.py
from celery import shared_task
import logging
logger = logging.getLogger("django")
@shared_task(name="send_verification_code")
def send_sms(task_id, mobile, data):
"""异步发送验证码"""
try:
from ronglianyunapi import send_sms as sms_send
return sms_send(task_id, mobile, data)
except Exception as e:
logger.error(f"手机号{mobile}发送失败: {e}")
视图调用
from .tasks import send_sms
class SMSAPIView(APIView):
def get(self, request, mobile):
redis_conn = get_redis_connection("sms_code")
# 检查发送间隔
interval = redis_conn.ttl(f"interval_{mobile}")
if interval != -2:
return Response(
{"error": f"发送过于频繁,请{interval}秒后重试"},
status=status.HTTP_400_BAD_REQUEST
)
# 生成验证码
code = f"{random.randint(0, 999999):06d}"
expire_time = settings.SMS_EXPIRE
# 异步发送
send_sms.delay(settings.SMS_TASK_ID, mobile, (code, expire_time // 60))
# 存储验证码
pipe = redis_conn.pipeline()
pipe.multi()
pipe.setex(f"sms_{mobile}", expire_time, code)
pipe.setex(f"interval_{mobile}", settings.SMS_INTERVAL, "_")
pipe.execute()
return Response({"message": "OK"})
启动方式
# 方式一:普通运行
celery -A project worker -l INFO
# 方式二:守护进程运行
celery multi start worker -A project -E \
--pidfile="/path/worker.pid" \
--logfile="/path/celery.log" \
-l info -n worker1
# 停止
celery multi stop worker -A project --pidfile="/path/worker.pid"
测试调用
$ python manage.py shell
>>> from users.tasks import send_sms
>>> res = send_sms.delay()
>>> res = send_sms.apply_async(countdown=15)
>>> res.id
>>> res.state
>>> res.result
Worker参数详解
celery worker [OPTIONS]
# 基础选项
-n, --hostname HOSTNAME # 主机名(用于区分多Worker)
-D, --detach # 后台运行
-S, --statedb PATH # 状态数据库路径
-l, --loglevel [级别] # 日志级别
-O, --optimization [模式] # 优化配置
--prefetch-multiplier N # 预取倍数
# 进程池选项
-c, --concurrency N # 并发数(默认CPU核心数*2)
-P, --pool [类型] # 进程池类型
- prefork: 多进程池
- eventlet: 事件协程池
- gevent: Gevent协程池
- solo: 单进程
- threads: 线程池
# 任务限制
--time-limit FLOAT # 单任务硬超时
--soft-time-limit FLOAT # 单任务软超时
--max-tasks-per-child N # 单Worker最大任务数
--max-memory-per-child N(MB) # 单Worker最大内存
# 队列选项
--purge # 清空队列
-Q, --queues LIST # 监听的队列
-X, --exclude-queues LIST # 排除的队列
# 功能选项
--without-gossip # 禁用节点状态同步
--without-mingle # 禁用Worker间协调
--without-heartbeat # 禁用心跳
--autoscale MIN,MAX # 自动扩缩容
# Beat选项
-B, --beat # 启动Beat调度器
-s, --schedule FILE # 调度文件路径
# 守护进程选项
-f, --logfile PATH # 日志文件
--pidfile PATH # PID文件
--uid/--gid/--umask # 用户/组/权限
多区域分布式部署
架构设计
通过中心节点统一调度,任务下发至各区域执行:
项目结构/
MyCelery/
├── __init__.py
├── celery_config.py
├── main.py
├── signals.py
└── regions/
├── __init__.py
└── tasks.py
多实例配置
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CloudEye.settings')
def create_celery_instance(name, broker, backend):
app = Celery(name)
app.conf.update(
broker_url=broker,
result_backend=backend,
task_default_queue='Celery',
)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
return app
# 各区域配置
REGION1_BROKER = 'redis://1.1.1.1:6379/0'
REGION1_BACKEND = 'db+mysql://user:pass@1.1.1.1/dbname'
REGION2_BROKER = 'redis://1.1.10.1:6379/0'
REGION2_BACKEND = 'db+mysql://user:pass@1.1.10.1/dbname'
REGION3_BROKER = 'redis://1.1.20.1:6379/0'
REGION3_BACKEND = 'db+mysql://user:pass@1.1.20.1/dbname'
app_region1 = create_celery_instance('region1', REGION1_BROKER, REGION1_BACKEND)
app_region2 = create_celery_instance('region2', REGION2_BROKER, REGION2_BACKEND)
app_region3 = create_celery_instance('region3', REGION3_BROKER, REGION3_BACKEND)
app_dict = {
'region1': app_region1,
'region2': app_region2,
'region3': app_region3,
}
任务定义
from main import app_region1, app_region2, app_region3
@app_region1.task
def health_check():
return "区域1巡检完成"
@app_region2.task
def health_check():
return "区域2巡检完成"
@app_region3.task
def health_check():
return "区域3巡检完成"
视图接口
def dispatch_task(request):
if request.method == 'POST':
region = request.POST.get('region')
app = app_dict.get(region)
if not app:
return JsonResponse({'error': '无效区域'}, status=400)
result = app.send_task('regions.tasks.health_check')
task_result = result.get()
return JsonResponse({'message': f'{region}任务结果: {task_result}'})
return render(request, 'dispatch.html')
启动各区域Worker
# 区域1
celery -A main.app_region1 worker --loglevel=info
# 区域2
celery -A main.app_region2 worker --loglevel=info
# 区域3
celery -A main.app_region3 worker --loglevel=info
Signature对象
Signature对象为Celery任务流管理提供强大支持,实现参数自动传递、任务链组构建、回调错误处理等功能。
参数自动传递
# 使用.s()语法糖
result = chain(
app.tasks['task_alpha'].s(arg1, arg2),
app.tasks['task_beta'].s()
).apply_async()
# 使用Signature类
from celery import Signature
result = chain(
Signature('task_alpha', args=(arg1, arg2)),
Signature('task_beta')
).apply_async()
任务链构建
from celery import chain
chain_result = chain(
app.tasks['step_one'].s(value),
app.tasks['step_two'].s(),
app.tasks['step_three'].s()
).apply_async()
任务组构建
from celery import group
group_result = group(
app.tasks['task_a'].s(param1),
app.tasks['task_b'].s(param2),
app.tasks['task_c'].s(param3)
).apply_async()
和弦构建
from celery import chord, group
chord_result = chord(
group(
app.tasks['task_x'].s(data1),
app.tasks['task_y'].s(data2),
app.tasks['task_z'].s(data3)
)
)(app.tasks['final_callback'].s())
回调与错误处理
# 使用.s()语法糖
sig = app.tasks['target_task'].s(arg1, arg2).on_error(app.tasks['error_handler'].s())
sig.apply_async()
# 使用Signature类
sig = Signature('target_task', args=(arg1, arg2)).on_error(Signature('error_handler'))
sig.apply_async()
序列化支持
sig = app.tasks['some_task'].s(arg1, arg2)
serialized = sig.dumps()
restored = Signature(serialized)
restored.apply_async()
Celery信号机制
Celery基于Blinker库实现信号机制,允许在任务生命周期各阶段插入自定义逻辑。
常用信号列表
- task_prerun:任务执行前触发
- task_postrun:任务执行后触发
- task_success:任务成功完成触发
- task_failure:任务失败时触发
- task_retry:任务重试时触发
- task_revoked:任务被撤销时触发
- task_unknown:收到未知任务时触发
- task_rejected:任务被拒绝时触发
- before_task_publish:任务发布前触发
- after_task_publish:任务发布后触发
信号连接方式
装饰器方式
from celery import Celery
from celery.signals import task_prerun, task_postrun, task_success, task_failure
app = Celery('demo', broker='pyamqp://guest@localhost//')
@task_prerun.connect
def before_task(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
print(f"任务{task_id}即将执行,参数: {args}, {kwargs}")
@task_postrun.connect
def after_task(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **extras):
print(f"任务{task_id}已完成,状态: {state},返回值: {retval}")
@task_success.connect
def on_success(sender=None, result=None, **extras):
print(f"任务{sender.name}成功,结果: {result}")
@task_failure.connect
def on_failure(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **extras):
print(f"任务{task_id}失败,异常: {exception}")
直接连接方式
from celery.signals import task_prerun
def task_handler(sender=None, task_id=None, **kwargs):
print(f"处理任务{task_id}")
task_prerun.connect(task_handler)
信号参数详解
task_prerun
@task_prerun.connect
def handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
# sender: 任务对象
# task_id: 任务唯一ID
# task: 任务实例
# args: 位置参数
# kwargs: 关键字参数
pass
task_postrun
@task_postrun.connect
def handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **extras):
# retval: 任务返回值
# state: 任务最终状态
pass
task_failure
from celery.signals import task_failure
import traceback
@task_failure.connect
def handler(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **extras):
formatted_tb = ''.join(traceback.format_exception(einfo.type, einfo.exception, einfo.tb))
print(f'任务{sender.name} ID:{task_id} 失败异常:{exception}')
print(f'堆栈:{formatted_tb}')
task_retry
@task_retry.connect
def handler(sender=None, request=None, reason=None, **extras):
# reason: 重试原因
pass
task_revoked
@task_revoked.connect
def handler(sender=None, request=None, terminated=None, signum=None, expired=None, **extras):
# terminated: 是否被终止
# signum: 终止信号
# expired: 是否已过期
pass
信号方法
connect - 连接信号处理器
from celery.signals import task_prerun
def handler(sender=None, task_id=None, **kwargs):
print(f"任务{task_id}执行前")
task_prerun.connect(handler)
disconnect - 断开信号连接
task_prerun.disconnect(handler)
send - 手动触发信号
task_prerun.send(sender="test_task", task_id="12345")
has_listeners - 检查监听器
if task_prerun.has_listeners():
print("信号有监听器")
else:
print("信号无监听器")
全局与局部信号
全局信号使用
# signals.py
from celery.signals import task_failure
import logging
error_logger = logging.getLogger('task_error')
@task_failure.connect
def handle_failure(sender, task_id, exception, **kwds):
error_logger.error(f'任务{sender.name} {task_id}失败: {exception}')
# main.py
import os
from MyCelery import signals
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('demo')
app.config_from_object('mycelery.config')
app.autodiscover_tasks(['mycelery.tasks',])
局部信号(按任务区分)
from celery import Celery, signals
app = Celery('demo')
app.conf.broker_url = 'redis://localhost:6379/0'
@signals.task_prerun.connect
def handler(sender, task_id, task, args, kwargs, **options):
if task.name == 'demo.task_alpha':
print(f"Task {task.name} 即将运行")
@signals.task_failure.connect
def failure_handler(sender, task_id, args, kwargs, einfo, **options):
if sender.name == 'demo.task_alpha':
print(f"Task {sender.name} 失败")
@app.task(name='demo.task_alpha')
def task_alpha():
raise ValueError("task_alpha错误")
@app.task(name='demo.task_beta')
def task_beta():
raise ValueError("task_beta错误")
自定义任务类方式
from celery import Celery, Task
import logging
app = Celery('demo')
app.conf.broker_url = 'redis://localhost:6379/0'
logger = logging.getLogger('custom')
class CustomTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error(f"任务{self.name} [{task_id}] 失败: {exc}")
@app.task(base=CustomTask, name='demo.custom_task')
def custom_task():
raise ValueError("自定义任务错误")