前述

本篇讲述Celery的原理和基本使用

基本原理

Celery基于Broker传递执行任务,当tasks调用被调用时,先序列化,然后传递到Broker,worker取出来执行。

下面是celery基本的架构。

Redis Pub/Sub模式

Broker通常是RabbitMQ和Redis

序列化

Celery通过Python的pickle模块或json序列化。

通过下面的例子简单看看Celery如何序列化Python对象的。

有两个Python模块,task和app。

## task.py
from celery import Celery

app = Celery(
    'demo',
    broker='redis://:@127.0.0.1:6379/1',
    backend='redis://:@127.0.0.1:6379/1')


@app.task
def add(x, y):
    return x + y
# app.py
from task import add

if __name__ == '__main__':
    print('start task')
    result = add.delay(2, 18)

我们在终端执行python app.py,连接Redis我们可以看到多了一个celery key,它是Redis的list类型,后加入的放到最前面。查看某一项内容是这样的。

{\"body\": \"W1syLCAxOF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"test.add\", \"id\": \"fd93e14c-984b-4931-87f6-8547eb8f91e7\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"fd93e14c-984b-4931-87f6-8547eb8f91e7\", \"parent_id\": null, \"argsrepr\": \"(2, 18)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen11663@localhost\"}, \"properties\": {\"correlation_id\": \"fd93e14c-984b-4931-87f6-8547eb8f91e7\", \"reply_to\": \"b99f0911-fce8-3812-8c71-26f80016b102\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"d48c84e6-68d5-4403-9aaa-c40849c1d515\"}}"

启动一个worker

celery work -A task -l info

可以看到结果被执行,查看Redis会看到有一个celery-task-meta-taskid的key,它是一个string,里面存放了task执行的结果和状态。

更多Celery的工作流程,查看celery消息的编码和序列化Python之Celery使用详解

命令

实现机制


Published

Category

redis

Tags

Stay in Touch

Friendship Links