当前位置:首页 > 技术 > 正文内容

Celery分布式任务队列实战指南

访客 技术 2026年6月17日 1

竞品对比

在Celery之外,还有几个值得关注的任务调度框架:

  • Taskiq - 专为FastAPI设计的轻量级异步任务框架
    https://taskiq-python.github.io/
  • Funboost - 国产功能强大的任务调度框架
    https://funboost.readthedocs.io/zh-cn/latest/articles/c4.html

Celery概述

Celery是一个简洁、可靠且强大的分布式消息处理系统,专注于实时异步任务队列,同时支持任务调度功能。其核心架构由三个关键组件构成:消息中间件(Message Broker)、任务执行单元(Worker)以及任务结果存储(Result Backend)。

Celery架构图

架构组件详解

消息中间件

Celery本身不内置消息服务,但支持灵活集成多种第三方消息中间件,包括RabbitMQ、Redis等主流选择。

任务执行单元

Worker是Celery提供的任务执行实体,能够并发运行在分布式系统的多个节点上,实现高效的分布式计算。

任务结果存储

Result Backend用于持久化Worker执行任务后的返回值,Celery支持多种存储后端,如AMQP、Redis、数据库等。

并发与序列化支持

  • 并发方式:Prefork、Eventlet、Gevent、Threads/Single Threaded
  • 序列化格式:Pickle、JSON、YAML、Msgpack,以及zlib、bzip2压缩支持,另有加密消息签名机制

典型应用场景

Celery作为强大的分布式异步任务框架,能够使任务执行完全脱离主程序,甚至可分配至其他主机运行。常见用途包括:

  • 异步任务:将耗时操作(如短信/邮件发送、消息推送、音视频处理)提交至Celery异步执行
  • 定时任务:按预设周期执行数据统计、报表生成等任务

Celery通过消息进行通信协调。客户端(任务发起方)将任务发送至Broker(消息队列),Broker负责将任务分发给Worker(任务执行方)处理。

消息队列(Message Queue)是独立运行的中间层程序,作为消息传输过程中的临时缓存容器。消息可以是简单的文本字符串,也可以是复杂的JSON或哈希数据结构。队列采用FIFO(先进先出)原则,Python的list类型即可方便实现队列结构。

当前主流消息队列包括RabbitMQ、Kafka、RocketMQ、MetaMQ、ZeroMQ、ActiveMQ等。虽然Redis、MySQL、MongoDB也可充当消息中间件,但专业性和稳定性不及上述方案。

选型建议:并发10k以下使用Redis;10k-1000k使用RabbitMQ;1000k以上使用RocketMQ

环境配置

pip install -U Celery
# 或
sudo easy_install Celery

重要提示:Celery自4.0版本起不再支持Windows系统。如必须在Windows环境使用,需安装4.0之前版本,并配合gevent、geventlet或eventlet协程模块。

基础操作指南

数据存储注意事项

数据库字段类型选择

当需要将任务结果存储至MySQL数据库时,需注意以下数据类型容量:

类型存储上限典型用途
TINYTEXT255字节超短文本
TEXT65,535字节(约64KB)普通文本
MEDIUMTEXT16,777,215字节(约16MB)中等文本
LONGTEXT4,294,967,295字节(约4GB)大型文本
TINYBLOB255字节小型二进制
BLOB65,535字节(约64KB)中等二进制(如小图片)
MEDIUMBLOB16,777,215字节(约16MB)大型二进制(音频/视频)
LONGBLOB4,294,967,295字节(约4GB)超大型二进制(完整视频/数据库备份)

关键点:由于Celery默认使用二进制格式存储结果,即使配置了JSON序列化,实际存入数据库的仍是二进制数据。因此应使用LONGBLOB类型而非LONGTEXT,并确保数据库字符集为UTF8或utf8mb4。

序列化格式配置

# 配置任务的序列化与反序列化方式
app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json', 'pickle'],
)

JSON vs Pickle对比

  • JSON:适合存储为可读字符串,兼容性优异,但仅支持基础数据类型
  • Pickle:支持复杂Python对象,但存在安全风险——反序列化时可能执行任意代码,存在代码注入风险,仅在可信环境使用

时间配置要点

使用定时或预约功能时,必须正确配置时区并禁用UTC:

app.conf.update(timezone='Asia/Shanghai', enable_utc=False)

当使用eta参数时,必须传入带有时区信息的datetime对象,否则任务将无法正确执行。

延时任务实现

# 方式一:使用countdown参数(推荐)
task_alpha.apply_async(countdown=10)  # 延时10秒执行

# 方式二:使用eta参数配合datetime
from datetime import datetime, timedelta

eta_time = datetime.now() + timedelta(seconds=5)
task_alpha.apply_async(eta=eta_time)

预约任务实现

from datetime import datetime, timedelta
import pytz

# 使用带有时区信息的datetime对象
预约时间 = datetime.now(pytz.timezone('Asia/Shanghai')) + timedelta(seconds=30)
task_alpha.apply_async(eta=预约时间)

定时任务(周期任务)实现

from celery.schedules import crontab

# 配置定时调度规则
app.conf.beat_schedule = {
    'morning_job': {
        'task': 'task_alpha',
        'schedule': crontab(hour=7, minute=0),  # 每天早晨7点执行
    },
    'periodic_job': {
        'task': 'task_alpha',
        'schedule': timedelta(minutes=10),  # 每10分钟执行一次
    },
}

启动Beat调度器:

celery -A MyProject.main beat -l info

注意:beat_schedule仅将任务放入队列,需要单独启动Worker才能实际执行任务。在Celery 5.4.0及以上版本中,建议分别启动Worker和Beat以提高稳定性。

Pickle安全警告处理

以root用户运行且启用pickle序列化时,会收到安全警告:

worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).

安全建议

  1. 避免root运行:创建非特权用户运行Celery
    sudo adduser celeryuser
    sudo su celeryuser
    celery -A tasks worker -l info
  2. 禁用pickle:改用JSON序列化
    app.conf.update(
        task_serializer='json',
        result_serializer='json',
        accept_content=['json'],
    )
  3. 确需root运行(不推荐):
    export C_FORCE_ROOT=1
    celery -A tasks worker -l info

常用配置参数详解

Task装饰器配置

@app.task()  # 常用配置参数

# name: 自定义任务名称(默认使用函数路径)
@app.task(name='custom_task_name')

# ignore_result: 设为True则不存储执行结果,节省存储空间
@app.task(ignore_result=True)

# bind: 设为True使任务绑定到实例,可通过self访问任务上下文和配置重试
@app.task(bind=True, name='vulnerability_scan')
def vulnerability_scan(self, *args, **kwargs):
    try:
        # 业务逻辑
        pass
    except Exception as e:
        logger.error(f"任务执行失败: {str(e)}")
        # 重试配置
        raise self.retry(exc=e, countdown=60, max_retries=self.max_retries, retry_backoff=True)

# max_retries: 最大重试次数
@app.task(max_retries=3)

# default_retry_delay: 重试默认延迟(秒)
@app.task(default_retry_delay=60)

# serializer: 序列化格式
@app.task(serializer='json')

重试机制参数

  • exc:指定触发重试的异常对象
  • countdown:重试前等待秒数
  • eta:指定下次重试的具体时间点(datetime对象)
  • max_retries:最大重试次数
  • retry_backoff:启用指数退避策略
  • retry_backoff_max:最大退避时间(秒)
  • retry_jitter:在重试时间添加随机抖动,避免同时重试

全局配置参数

app.conf.update(
    # broker_url: 消息代理地址
    broker_url='redis://localhost:6379/0',
    
    # result_backend: 结果存储后端
    result_backend='redis://localhost:6379/0',
    
    # task_routes: 任务路由规则
    task_routes={'myapp.tasks.add': {'queue': 'high_priority'}},
    
    # task_serializer: 任务序列化方式
    task_serializer='json',
    
    # result_serializer: 结果序列化方式
    result_serializer='json',
    
    # accept_content: 接受的序列化格式列表
    accept_content=['json', 'application/text'],
    
    # timezone: 时区设置
    timezone='UTC',
    
    # enable_utc: 是否使用UTC时间
    enable_utc=True,
    
    # task_acks_late: 任务完成后才确认
    task_acks_late=True,
    
    # worker_concurrency: Worker并发数
    worker_concurrency=4,
)

Worker启动方式

方式一:直接启动

# 启动Worker实例
celery -A Apps.CeleryScripts.main:app worker -l info -c 2 -Q notification_queue

# 参数说明:
# -A: 指定Celery应用实例路径
# worker: 启动Worker模式
# -l info: 设置日志级别
# -c 2: 并发数为2
# -Q: 指定任务队列名称
# 同时启动Worker和Beat调度器(旧版方式)
celery -A Apps.CeleryScripts.main:app worker -l info -B

优缺点

优点:命令简洁,适合小型应用快速部署

缺点:扩展性有限,多实例管理复杂

方式二:多进程管理

# 启动Worker实例
celery multi start worker1 -A Apps.CeleryScripts.main:app -l info -c 2 -Q notification_queue --logfile=worker.log

# 启动Worker及Beat调度器
celery multi start worker2 -A Apps.CeleryScripts.main:app -l info -B --logfile=worker_beat.log

管理命令

  • start:启动Worker实例
  • stop:立即停止指定Worker
  • restart:重启指定Worker
  • stopwait:优雅停止,等待当前任务完成

优缺点

优点:支持大规模应用和多实例管理

缺点:配置相对复杂

任务调用方法对比

Celery提供三种任务调用方式,均继承自Task类:

  • delay():适用于简单任务调用
  • apply_async():适用于需要精细控制的高级调用
  • send_task():适用于通过任务名称调用的场景

delay()方法

@app.task
def add_numbers(x, y):
    return x + y

# 简洁调用方式
result = add_numbers.delay(4, 6)
print("执行结果:", result.get())  # 输出: 10

apply_async()方法

# 支持丰富的配置参数
result = add_numbers.apply_async(
    args=(4, 6),
    countdown=10,    # 延时秒数
    eta=datetime_obj, # 指定执行时间
    queue='queue_name',
    retry=True,
    expires=3600
)

send_task()方法

# 通过任务名称调用
result = app.send_task('my_module.add_numbers', args=(4, 6))
print("执行结果:", result.get())

参数传递注意事项

kwargs在传递时采用**kwargs解构方式:

from celery import Celery
import json

app = Celery('demo', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task(rate_limit='2/m', name='send_notification')
def process_message(age, name, extra):
    print("任务执行中")
    print(f"年龄: {age}, 姓名: {name}, 额外: {extra}")
    return f"返回数据: {age}, 完整: {json.dumps({'age': age, 'name': name, 'extra': extra})}"

# 调用方式
ret = app.send_task(
    name="send_notification",
    kwargs={"age": 19, "name": "alex", "extra": {"Data": "@23"}}
)
result = ret.get(timeout=10)
print("结果:", result)

如需传入整体字典而不解构:

@app.task(rate_limit='2/m', name='send_notification')
def process_message(data):
    age = data.get("age")
    return f"返回: {age}, 完整: {json.dumps(data)}"

ret = app.send_task(
    name="send_notification",
    kwargs={"data": {"age": 19, "name": "alex"}}
)

单模块项目实践

创建celery_test目录,包含producer(生产端)、consumer(消费端)、result(结果获取):

目录结构

消费端代码 consumer.py

import celery
import time

# result_backend: 存储任务结果的后端(可选)
result_backend = "redis://:password@ip:port/14"

# broker: 消息中间件,用于传递任务
broker = "redis://:password@ip:port/15"

# 创建Celery应用实例
app = celery.Celery('demo', backend=result_backend, broker=broker)

# 注册任务
@app.task()
def send_sms(name):
    print("开始发送短信")
    time.sleep(3)
    return f"短信发送成功: {name}"

启动Worker(在tasks.py所在目录):

celery -A tasks worker -l info
Worker启动成功

生产端代码 producer.py

from tasks import send_sms

# 异步调用任务
ret = send_sms.delay("hello")
print(ret)  # 返回任务ID

结果获取代码 result_fetcher.py

任务状态包括:PENDING、RECEIVED、STARTED、SUCCESS、FAILURE、REVOKED、REJECTED

from celery.result import AsyncResult
from tasks import app

async_result = AsyncResult(id=task_id, app=app)

if async_result.status == 'SUCCESS':
    try:
        result = async_result.get()
        print("任务结果:", result)
    except Exception as e:
        print("获取结果异常:", str(e))
elif async_result.status == 'FAILURE':
    error = async_result.result
    print(f"任务失败: {error}")
    traceback = async_result.traceback
    print(f"堆栈信息: {traceback}")
elif async_result.status in ('PENDING', 'STARTED', 'RETRY'):
    print(f"任务当前状态: {async_result.status}")
elif async_result.status == 'REVOKED':
    print("任务已被撤销")

任务结果操作

async_result.get()     # 获取结果
async_result.forget()  # 删除结果(默认不会自动删除)
async_result.revoke(terminate=True)  # 强制终止任务
async_result.revoke(terminate=False) # 如果任务未开始则终止
async_result.failed()  # 任务执行失败返回True
async_result.status   # 查看任务状态

多模块项目实践

项目结构:

多模块结构

Celery配置 mycelery.py

import celery

backend = 'redis://:password@ip:port/14'
broker = 'redis://:password@ip:port/15'

app = celery.Celery('demo', backend=backend, broker=broker, include=[
    'celery_tasks.task01',
    'celery_tasks.task02'
])

# 时区配置
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

任务定义 task01.py / task02.py

from celery_tasks.mycelery import app
import time

@app.task()
def send_sms(name):
    print("短信开始发送")
    time.sleep(3)
    return f"短信发送成功: {name}"

启动Worker

# 在项目根目录执行
celery -A celery_tasks.mycelery worker -l info

生产端调用

from celery_tasks.task01 import send_sms

result1 = send_sms.delay("张三")
print(result1.id)
result2 = send_sms.delay("李四")
print(result2.id)

任务终止处理

终止流程

  1. 通过AsyncResult获取任务当前状态
  2. 根据状态判断是否可安全终止
  3. 调用revoke方法执行终止操作

终止代码实现

from celery.result import AsyncResult
from celery.task.control import revoke

def cancel_task(celery_app, task_id):
    """尝试取消指定任务"""
    result = AsyncResult(task_id, app=celery_app)
    task_status = result.status

    if task_status == 'PENDING':
        print(f"任务{task_id}处于PENDING状态,安全撤销")
        revoke(task_id, terminate=False)
    elif task_status == 'RECEIVED':
        print(f"任务{task_id}已接收但未执行,撤销")
        revoke(task_id, terminate=False)
    elif task_status == 'STARTED':
        print(f"任务{task_id}正在执行,尝试终止")
        revoke(task_id, terminate=True, signal='SIGTERM')
    else:
        print(f"任务{task_id}当前状态为{task_status},无法取消")
    
    return f"已执行操作,任务原状态: {task_status}"

终止信号选择

  • SIGTERM:优雅终止信号,允许程序清理资源,建议首选
  • SIGKILL:强制立即终止,不执行清理,可能导致数据损坏
  • SIGINT:中断信号(Ctrl+C),适用于模拟手动终止
  • SIGQUIT:类似SIGTERM但生成核心转储,用于调试
  • SIGHUP:常用于重新加载配置

安全建议:优先使用SIGTERM,确保任务代码能捕获信号并执行清理操作。

定时任务配置

延时执行模式

from datetime import datetime, timedelta
from celery_tasks.task01 import send_sms

current_time = datetime.now()
# 转为UTC时间
utc_time = datetime.utcfromtimestamp(current_time.timestamp())
delay_time = timedelta(seconds=10)
scheduled_time = utc_time + delay_time

# 使用apply_async设置执行时间
result = send_sms.apply_async(args=["测试用户"], eta=scheduled_time)
print(result.id)

Beat调度配置模式

from datetime import timedelta
import celery
from celery.schedules import crontab

backend = 'redis://:password@ip:port/14'
broker = 'redis://:password@ip:port/15'

app = celery.Celery('demo', backend=backend, broker=broker, include=[
    'celery_tasks.task01',
    'celery_tasks.task02'
])

app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

# 配置定时任务
app.conf.beat_schedule = {
    'task-every-6-seconds': {
        'task': 'celery_tasks.task01.send_sms',
        'schedule': timedelta(seconds=6),
        'args': ('定时任务参数',),
    },
    'daily-task': {
        'task': 'celery_tasks.task01.send_email',
        'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        'args': ('定时邮件参数',),
    },
}

启动命令

# 启动Worker
celery -A celery_tasks.mycelery worker -l info

# 启动Beat调度器
celery -A celery_tasks.mycelery beat -l info

# 旧版合并启动(5.4.0前可用)
celery -A celery_tasks.mycelery worker -B -l info

Beat进程会周期性检查配置,将到期任务发送至任务队列。

任务组合模式

Chain(链式任务)

任务按顺序执行,前一个任务的输出作为下一个任务的输入:

from celery import Celery, chain

app = Celery('demo', broker='pyamqp://guest@localhost//')

@app.task(name="step_one")
def process_first(x):
    return x + 1

@app.task(name="step_two")
def process_second(x):
    return x * 2

@app.task(name="step_three")
def process_third(x):
    return x - 3

# 链式调用
result = chain(
    process_first.s(10),
    process_second.s(),
    process_three.s()
)()

# 按任务名称调用
result = chain(
    app.tasks['step_one'].s(10),
    app.tasks['step_two'].s(),
    app.tasks['step_three'].s(3)
)()

Group(并行任务)

任务并行执行,独立运行:

from celery import group

@app.task
def double_value(x):
    return x * 2

# 并行执行5个任务
group_result = group(double_value.s(i) for i in range(5))()
print("结果列表:", group_result.get())  # 输出: [0, 2, 4, 6, 8]

混合使用示例:先获取task1结果,再传给task2和task3并行执行:

from celery import Celery, group, Task
import pymysql
import random

pymysql.install_as_MySQLdb()

result_backend = 'db+mysql://user:password@localhost:3306/celery'
broker_url = 'redis://:@127.0.0.1:6379/0'

app = Celery('demo', backend=result_backend, broker=broker_url)
app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
)

class UserProfile:
    def __init__(self, age, gender):
        self.age = age
        self.gender = gender
    
    def to_dict(self):
        return {'age': self.age, 'gender': self.gender}

@app.task
def generate_user():
    random_age = random.randint(1, 100)
    profile = UserProfile(random_age, "男")
    return random_age, profile.to_dict()

@app.task
def process_user(age_data, info, param):
    print(f"处理用户: 年龄={info['age']}, 性别={info['gender']}, 参数={param}")
    return 3

@app.task
def send_notification(age_data, info, param):
    print(f"发送通知: 年龄={info['age']}, 性别={info['gender']}, 参数={param}")
    return 3
# 生产端调用
result_1 = generate_user.apply_async().get()
print("第一步结果:", result_1)

# 并行执行后续任务
result_2_and_3 = group(
    process_user.s(*result_1, "参数1"),
    send_notification.s(*result_1, "参数2")
)().get()
print("并行任务结果:", result_2_and_3)
执行结果

Chord(回调任务)

所有并行任务完成后执行回调任务:

from celery import chord

@app.task
def calculate_sum(results):
    return sum(results)

@app.task
def process_item(x):
    return x * 2

# 和弦任务:并行执行后执行回调
chord_result = chord((process_item.s(i) for i in range(5)), calculate_sum.s())()
print("结果:", chord_result.get())  # 输出: 20 (0+2+4+6+8)

总结

  • Chain:顺序执行任务
  • Group:并行执行任务
  • Chord:并行执行后执行回调

Django集成实践

独立项目结构

app01/
mycelery/
├── __init__.py
├── config.py          # 配置文件
├── main.py            # 主程序入口
└── sms/               # 短信任务模块
    ├── __init__.py
    └── tasks.py       # 任务文件(必须命名为tasks)

config.py配置

CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "Asia/Shanghai"
broker_url = "redis://:password@ip:port/14"
result_backend = "redis://:password@ip:port/15"

sms/tasks.py

from mycelery.main import app
import time

@app.task(name='send_sms_task')
def send_sms(mobile):
    """发送短信任务"""
    print(f"向手机号{mobile}发送短信成功!")
    time.sleep(5)
    return "send_sms OK"

main.py

import os
from celery import Celery

app = Celery("sms")
app.config_from_object("mycelery.config")
app.autodiscover_tasks(["mycelery.sms"])

# 启动命令(建议在mycelery根目录执行)
# celery -A mycelery.main worker --loglevel=info

Django视图调用

from mycelery.sms.tasks import send_sms

def celery_test(request):
    send_sms.delay("15867416745")
    return HttpResponse("任务已提交")

与Django融合模式

集成思路

在main.py中初始化Django环境:

import os
import django
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings.dev')
django.setup()

app = Celery("project")
app.config_from_object("mycelery.config")
app.autodiscover_tasks(["mycelery.sms", "mycelery.email"])

# 启动命令
# celery -A mycelery.main worker -l info

Django配置集成

将Celery配置写入Django的settings.py:

# settings/dev.py

# 任务队列
CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/14'
CELERY_RESULT_BACKEND = 'redis://:123456@127.0.0.1:6379/15'

# 时区
CELERY_TIMEZONE = TIME_ZONE
CELERY_ENABLE_UTC = True

# 防止死锁
CELERY_FORCE_EXECV = True
CELERYD_CONCURRENCY = 200
CELERY_ACKS_LATE = True
CELERYD_MAX_TASKS_PER_CHILD = 500
CELERYD_TIME_LIMIT = 10 * 60
CELERY_DISABLE_RATE_LIMITS = True

# 序列化配置
CELERY_ACCEPT_CONTENT = ['json', 'pickle']

# 结果过期时间
from datetime import timedelta
CELERY_RESULT_EXPIRES = timedelta(hours=1)

# 定时任务配置
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    "user-add": {
        "task": "add",
        "schedule": 10,
    },
    "backend-cleanup": {
        "task": "celery.backend_cleanup",
        "schedule": crontab(hour=0, minute=0),
    },
}

初始化配置

在Django的__init__.py中添加:

import pymysql
from mycelery import app as celery_app

pymysql.install_as_MySQLdb()

__all__ = ['celery_app']

分布式任务模块

# users/tasks.py
from celery import shared_task
import logging

logger = logging.getLogger("django")

@shared_task(name="send_verification_code")
def send_sms(task_id, mobile, data):
    """异步发送验证码"""
    try:
        from ronglianyunapi import send_sms as sms_send
        return sms_send(task_id, mobile, data)
    except Exception as e:
        logger.error(f"手机号{mobile}发送失败: {e}")

视图调用

from .tasks import send_sms

class SMSAPIView(APIView):
    def get(self, request, mobile):
        redis_conn = get_redis_connection("sms_code")
        
        # 检查发送间隔
        interval = redis_conn.ttl(f"interval_{mobile}")
        if interval != -2:
            return Response(
                {"error": f"发送过于频繁,请{interval}秒后重试"},
                status=status.HTTP_400_BAD_REQUEST
            )
        
        # 生成验证码
        code = f"{random.randint(0, 999999):06d}"
        expire_time = settings.SMS_EXPIRE
        
        # 异步发送
        send_sms.delay(settings.SMS_TASK_ID, mobile, (code, expire_time // 60))
        
        # 存储验证码
        pipe = redis_conn.pipeline()
        pipe.multi()
        pipe.setex(f"sms_{mobile}", expire_time, code)
        pipe.setex(f"interval_{mobile}", settings.SMS_INTERVAL, "_")
        pipe.execute()
        
        return Response({"message": "OK"})

启动方式

# 方式一:普通运行
celery -A project worker -l INFO

# 方式二:守护进程运行
celery multi start worker -A project -E \
    --pidfile="/path/worker.pid" \
    --logfile="/path/celery.log" \
    -l info -n worker1

# 停止
celery multi stop worker -A project --pidfile="/path/worker.pid"

测试调用

$ python manage.py shell
>>> from users.tasks import send_sms
>>> res = send_sms.delay()
>>> res = send_sms.apply_async(countdown=15)
>>> res.id
>>> res.state
>>> res.result

Worker参数详解

celery worker [OPTIONS]

# 基础选项
-n, --hostname HOSTNAME        # 主机名(用于区分多Worker)
-D, --detach                   # 后台运行
-S, --statedb PATH             # 状态数据库路径
-l, --loglevel [级别]          # 日志级别
-O, --optimization [模式]      # 优化配置
--prefetch-multiplier N        # 预取倍数

# 进程池选项
-c, --concurrency N            # 并发数(默认CPU核心数*2)
-P, --pool [类型]              # 进程池类型
    - prefork: 多进程池
    - eventlet: 事件协程池
    - gevent: Gevent协程池
    - solo: 单进程
    - threads: 线程池

# 任务限制
--time-limit FLOAT             # 单任务硬超时
--soft-time-limit FLOAT        # 单任务软超时
--max-tasks-per-child N        # 单Worker最大任务数
--max-memory-per-child N(MB)   # 单Worker最大内存

# 队列选项
--purge                        # 清空队列
-Q, --queues LIST              # 监听的队列
-X, --exclude-queues LIST      # 排除的队列

# 功能选项
--without-gossip               # 禁用节点状态同步
--without-mingle               # 禁用Worker间协调
--without-heartbeat            # 禁用心跳
--autoscale MIN,MAX            # 自动扩缩容

# Beat选项
-B, --beat                     # 启动Beat调度器
-s, --schedule FILE            # 调度文件路径

# 守护进程选项
-f, --logfile PATH             # 日志文件
--pidfile PATH                 # PID文件
--uid/--gid/--umask            # 用户/组/权限

多区域分布式部署

架构设计

通过中心节点统一调度,任务下发至各区域执行:

项目结构/
MyCelery/
├── __init__.py
├── celery_config.py
├── main.py
├── signals.py
└── regions/
    ├── __init__.py
    └── tasks.py

多实例配置

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CloudEye.settings')

def create_celery_instance(name, broker, backend):
    app = Celery(name)
    app.conf.update(
        broker_url=broker,
        result_backend=backend,
        task_default_queue='Celery',
    )
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    return app

# 各区域配置
REGION1_BROKER = 'redis://1.1.1.1:6379/0'
REGION1_BACKEND = 'db+mysql://user:pass@1.1.1.1/dbname'

REGION2_BROKER = 'redis://1.1.10.1:6379/0'
REGION2_BACKEND = 'db+mysql://user:pass@1.1.10.1/dbname'

REGION3_BROKER = 'redis://1.1.20.1:6379/0'
REGION3_BACKEND = 'db+mysql://user:pass@1.1.20.1/dbname'

app_region1 = create_celery_instance('region1', REGION1_BROKER, REGION1_BACKEND)
app_region2 = create_celery_instance('region2', REGION2_BROKER, REGION2_BACKEND)
app_region3 = create_celery_instance('region3', REGION3_BROKER, REGION3_BACKEND)

app_dict = {
    'region1': app_region1,
    'region2': app_region2,
    'region3': app_region3,
}

任务定义

from main import app_region1, app_region2, app_region3

@app_region1.task
def health_check():
    return "区域1巡检完成"

@app_region2.task
def health_check():
    return "区域2巡检完成"

@app_region3.task
def health_check():
    return "区域3巡检完成"

视图接口

def dispatch_task(request):
    if request.method == 'POST':
        region = request.POST.get('region')
        app = app_dict.get(region)
        
        if not app:
            return JsonResponse({'error': '无效区域'}, status=400)
        
        result = app.send_task('regions.tasks.health_check')
        task_result = result.get()
        
        return JsonResponse({'message': f'{region}任务结果: {task_result}'})
    
    return render(request, 'dispatch.html')

启动各区域Worker

# 区域1
celery -A main.app_region1 worker --loglevel=info

# 区域2
celery -A main.app_region2 worker --loglevel=info

# 区域3
celery -A main.app_region3 worker --loglevel=info

Signature对象

Signature对象为Celery任务流管理提供强大支持,实现参数自动传递、任务链组构建、回调错误处理等功能。

参数自动传递

# 使用.s()语法糖
result = chain(
    app.tasks['task_alpha'].s(arg1, arg2),
    app.tasks['task_beta'].s()
).apply_async()

# 使用Signature类
from celery import Signature

result = chain(
    Signature('task_alpha', args=(arg1, arg2)),
    Signature('task_beta')
).apply_async()

任务链构建

from celery import chain

chain_result = chain(
    app.tasks['step_one'].s(value),
    app.tasks['step_two'].s(),
    app.tasks['step_three'].s()
).apply_async()

任务组构建

from celery import group

group_result = group(
    app.tasks['task_a'].s(param1),
    app.tasks['task_b'].s(param2),
    app.tasks['task_c'].s(param3)
).apply_async()

和弦构建

from celery import chord, group

chord_result = chord(
    group(
        app.tasks['task_x'].s(data1),
        app.tasks['task_y'].s(data2),
        app.tasks['task_z'].s(data3)
    )
)(app.tasks['final_callback'].s())

回调与错误处理

# 使用.s()语法糖
sig = app.tasks['target_task'].s(arg1, arg2).on_error(app.tasks['error_handler'].s())
sig.apply_async()

# 使用Signature类
sig = Signature('target_task', args=(arg1, arg2)).on_error(Signature('error_handler'))
sig.apply_async()

序列化支持

sig = app.tasks['some_task'].s(arg1, arg2)
serialized = sig.dumps()
restored = Signature(serialized)
restored.apply_async()

Celery信号机制

Celery基于Blinker库实现信号机制,允许在任务生命周期各阶段插入自定义逻辑。

常用信号列表

  • task_prerun:任务执行前触发
  • task_postrun:任务执行后触发
  • task_success:任务成功完成触发
  • task_failure:任务失败时触发
  • task_retry:任务重试时触发
  • task_revoked:任务被撤销时触发
  • task_unknown:收到未知任务时触发
  • task_rejected:任务被拒绝时触发
  • before_task_publish:任务发布前触发
  • after_task_publish:任务发布后触发

信号连接方式

装饰器方式

from celery import Celery
from celery.signals import task_prerun, task_postrun, task_success, task_failure

app = Celery('demo', broker='pyamqp://guest@localhost//')

@task_prerun.connect
def before_task(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
    print(f"任务{task_id}即将执行,参数: {args}, {kwargs}")

@task_postrun.connect
def after_task(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **extras):
    print(f"任务{task_id}已完成,状态: {state},返回值: {retval}")

@task_success.connect
def on_success(sender=None, result=None, **extras):
    print(f"任务{sender.name}成功,结果: {result}")

@task_failure.connect
def on_failure(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **extras):
    print(f"任务{task_id}失败,异常: {exception}")

直接连接方式

from celery.signals import task_prerun

def task_handler(sender=None, task_id=None, **kwargs):
    print(f"处理任务{task_id}")

task_prerun.connect(task_handler)

信号参数详解

task_prerun

@task_prerun.connect
def handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
    # sender: 任务对象
    # task_id: 任务唯一ID
    # task: 任务实例
    # args: 位置参数
    # kwargs: 关键字参数
    pass

task_postrun

@task_postrun.connect
def handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **extras):
    # retval: 任务返回值
    # state: 任务最终状态
    pass

task_failure

from celery.signals import task_failure
import traceback

@task_failure.connect
def handler(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **extras):
    formatted_tb = ''.join(traceback.format_exception(einfo.type, einfo.exception, einfo.tb))
    print(f'任务{sender.name} ID:{task_id} 失败异常:{exception}')
    print(f'堆栈:{formatted_tb}')

task_retry

@task_retry.connect
def handler(sender=None, request=None, reason=None, **extras):
    # reason: 重试原因
    pass

task_revoked

@task_revoked.connect
def handler(sender=None, request=None, terminated=None, signum=None, expired=None, **extras):
    # terminated: 是否被终止
    # signum: 终止信号
    # expired: 是否已过期
    pass

信号方法

connect - 连接信号处理器

from celery.signals import task_prerun

def handler(sender=None, task_id=None, **kwargs):
    print(f"任务{task_id}执行前")

task_prerun.connect(handler)

disconnect - 断开信号连接

task_prerun.disconnect(handler)

send - 手动触发信号

task_prerun.send(sender="test_task", task_id="12345")

has_listeners - 检查监听器

if task_prerun.has_listeners():
    print("信号有监听器")
else:
    print("信号无监听器")

全局与局部信号

全局信号使用

# signals.py
from celery.signals import task_failure
import logging

error_logger = logging.getLogger('task_error')

@task_failure.connect
def handle_failure(sender, task_id, exception, **kwds):
    error_logger.error(f'任务{sender.name} {task_id}失败: {exception}')
# main.py
import os
from MyCelery import signals
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app = Celery('demo')
app.config_from_object('mycelery.config')
app.autodiscover_tasks(['mycelery.tasks',])

局部信号(按任务区分)

from celery import Celery, signals

app = Celery('demo')
app.conf.broker_url = 'redis://localhost:6379/0'

@signals.task_prerun.connect
def handler(sender, task_id, task, args, kwargs, **options):
    if task.name == 'demo.task_alpha':
        print(f"Task {task.name} 即将运行")

@signals.task_failure.connect
def failure_handler(sender, task_id, args, kwargs, einfo, **options):
    if sender.name == 'demo.task_alpha':
        print(f"Task {sender.name} 失败")

@app.task(name='demo.task_alpha')
def task_alpha():
    raise ValueError("task_alpha错误")

@app.task(name='demo.task_beta')
def task_beta():
    raise ValueError("task_beta错误")

自定义任务类方式

from celery import Celery, Task
import logging

app = Celery('demo')
app.conf.broker_url = 'redis://localhost:6379/0'

logger = logging.getLogger('custom')

class CustomTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f"任务{self.name} [{task_id}] 失败: {exc}")

@app.task(base=CustomTask, name='demo.custom_task')
def custom_task():
    raise ValueError("自定义任务错误")
标签: Celery

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。