Python 神器 Celery 源码解析(5)

Python猫

共 19477字,需浏览 39分钟

 · 2021-11-27

Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。

本文是是celery源码解析的第篇,在前4篇里分别介绍了vine, py-amqp和kombu:

  1. 神器 celery 源码解析- vine实现Promise功能
  2. 神器 celery 源码解析- py-amqp实现AMQP协议
  3. 神器 celery 源码解析- kombu,一个python实现的消息库
  4. 神器 celery 源码解析- kombu的企业级算法

基本扫清celery的基础库后,我们正式进入celery的源码解析,本文包括下面几个部分:

  • celery应用示例
  • celery项目概述
  • worker启动流程跟踪
  • client启动流程跟踪
  • celery的app
  • worker模式启动流程
  • 小结

celery应用示例

启动celery之前,我们先使用docker启动一个redis服务,作为broker:

$ docker run -p 6379:6379 --name redis -d redis:6.2.3-alpine

使用telnet监控redis服务,观测任务调度情况:

$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
monitor
+OK

下面是我们的celery服务代码 myapp.py :

# myapp.py
from celery import Celery

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0'
)

@app.task
def add(x, y):
    print("add", x, y)
    return x + y

if __name__ == '__main__':
    app.start()

打开一个新的终端,使用下面的命令启动celery的worker服务:

$ python myapp.py worker -l DEBUG

正常情况下,可以看到worker正常启动。启动的时候会显示一些banner信息,包括AMQP的实现协议,任务等:

$ celery -A myapp worker -l DEBUG
 
 -------------- celery@bogon v5.1.2 (sun-harmonics)
--- ***** ----- 
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2021-09-08 20:33:45
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f855079e730
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . myapp.add

[2021-09-08 20:33:46,220: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-09-08 20:33:46,234: INFO/MainProcess] mingle: searching for neighbors
[2021-09-08 20:33:47,279: INFO/MainProcess] mingle: all alone
[2021-09-08 20:33:47,315: INFO/MainProcess] celery@bogon ready.

再开启一个终端窗口,作为client执行下面的代码, 可以看到add函数正确的执行,获取到计算 16+16 的结果 32。注意: 这个过程是远程执行的,使用的是delay方法,函数的打印print("add", x, y)并没有输出:

$ python
>>> from myapp import add
>>> task = add.delay(16,16)
>>> task

>>> task.get()
32

在celery的worker服务窗口,可以看到类似下面的输出。收到一个执行任务 myapp.add 的请求, 请求的uuid是 5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b ,参数数组是 [16, 16] 正常执行后返回结果32。

[2021-11-11 20:13:48,040: INFO/MainProcess] Task myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b] received
[2021-11-11 20:13:48,040: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x7fda086baa60> (args:('myapp.add''5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b', {'lang''py''task''myapp.add''id''5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b''shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id''5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b''parent_id': None, 'argsrepr''(16, 16)''kwargsrepr''{}''origin''gen63119@localhost''ignore_result': False, 'reply_to''97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16''correlation_id''5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b''hostname''celery@localhost''delivery_info': {'exchange''''routing_key''celery''priority': 0, 'redelivered': None}, 'args': [16, 16], 'kwargs': {}}, b'[[16, 16], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]''application/json''utf-8') kwargs:{})
[2021-11-11 20:13:49,059: INFO/ForkPoolWorker-8] Task myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b] succeeded in 1.0166977809999995s: 32

在redis的monitor窗口,也可以可以看到类似的输出,展示了过程中一些对redis的操作命令:

+1636632828.304020 [0 172.16.0.117:51127] "SUBSCRIBE" "celery-task-meta-5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b"
+1636632828.304447 [0 172.16.0.117:51129] "PING"
+1636632828.305448 [0 172.16.0.117:51129] "LPUSH" "celery" "{\"body\": \"W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"myapp.add\", \"id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"parent_id\": null, \"argsrepr\": \"(16, 16)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen63119@localhost\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"reply_to\": \"97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"20dbd584-b669-4ef0-8a3b-41d19b354690\"}}"
+1636632828.307040 [0 172.16.0.117:52014] "MULTI"
+1636632828.307075 [0 172.16.0.117:52014] "ZADD" "unacked_index" "1636632828.038743" "20dbd584-b669-4ef0-8a3b-41d19b354690"
+1636632828.307088 [0 172.16.0.117:52014] "HSET" "unacked" "20dbd584-b669-4ef0-8a3b-41d19b354690" "[{\"body\": \"W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"myapp.add\", \"id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"parent_id\": null, \"argsrepr\": \"(16, 16)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen63119@localhost\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"reply_to\": \"97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"20dbd584-b669-4ef0-8a3b-41d19b354690\"}}, \"\", \"celery\"]"
...

我们再一次回顾下图,对比一下示例,加强理解:



hello-world-example-routing
  • 我们先启动一个celery的worker服务作为消费者
  • 再启动一个窗口作为生产者执行task
  • 使用redis作为broker,负责生产者和消费者之间的消息通讯
  • 最终生成者的task,作为消息发送到远程的消费者上执行,执行的结果又通过网络回传给生产者

上面示例展示了celery作为一个分布式任务调度系统的执行过程,本地的任务调用,通过AMQP协议的包装,作为消息发送到远程的消费者执行。


celery项目概述

解析celery采用的代码版本5.0.5, 主要模块结构:

模块描述
appcelery的app实现
appscelery服务的三种主要模式,worker,beat和multi
backends任务结果存储
bin命令行工具实现
concurrency各种并发实现,包括线程,gevent,asyncpool等
events事件实现
worker服务启动环节实现
beat.py&&schedules.py定时和调度实现
result.py任务结果实现
signals.py一些信号定义
status.py一些状态定义

从项目结构看,模块较多,功能复杂。不过我们已经搞定了vine, py-amqp和kombu三个库,接下来只需要理解worker,beat和multi三种服务模型,就可以较好的了解celery这个分布式系统如何构建。


worker启动流程跟踪

worker的启动命令 celery -A myapp worker -l DEBUG 使celery作为一个模块,入口在main文件的main函数:

# ch23-celery/celery-5.0.5/celery/__main__.py
def main():
    """Entrypoint to the ``celery`` umbrella command."""
    """celery命令入口"""
    ...
    # 具体执行的main函数
    from celery.bin.celery import main as _main
    sys.exit(_main())

celery命令作为主命令,加载celery-app的同时,还会启动worker子命令:

# ch23-celery/celery-5.0.5/celery/bin/celery.py
def celery(ctx, app, broker, result_backend, loader, config, workdir,
           no_color, quiet, version):
    """Celery command entrypoint."""
    ...
    ctx.obj = CLIContext(app=app, no_color=no_color, workdir=workdir,
                         quiet=quiet)
    # worker/beat/events三个主要子命令参数
    # User options
    worker.params.extend(ctx.obj.app.user_options.get('worker', []))
    beat.params.extend(ctx.obj.app.user_options.get('beat', []))
    events.params.extend(ctx.obj.app.user_options.get('events', []))

def main() -> int:
    """Start celery umbrella command.

    This function is the main entrypoint for the CLI.

    :return: The exit code of the CLI.
    "
""
    return celery(auto_envvar_prefix="CELERY")

在worker子命令中创建worker并启动:

# ch23-celery/celery-5.0.5/celery/bin/worker.py
def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
           loglevel=None, logfile=None, pidfile=None, statedb=None,
           **kwargs):
    # 创建和启动worker
    worker = app.Worker(
        hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
        logfile=logfile,  # node format handled by celery.app.log.setup
        pidfile=node_format(pidfile, hostname),
        statedb=node_format(statedb, hostname),
        no_color=ctx.obj.no_color,
        quiet=ctx.obj.quiet,
        **kwargs)
    worker.start()

下面是创建worker的方式,创一个 celery.apps.worker:Worker 对象:

# ch23-celery/celery-5.0.5/celery/app/base.py
def Worker(self):
    # 创建worker
    return self.subclass_with_self('celery.apps.worker:Worker')

服务启动过程中,调用链路如下:

                                 +----------+
                             +--->app.celery|
                             |   +----------+
+---------+   +----------+   |
|main.main+--->bin.celery+---+
+---------+   +----------+   |
                             |   +----------+   +-----------+
                             +--->bin.worker+--->apps.worker|
                                 +----------+   +-----------+

在这个服务启动过程中,创建了celery-application和worker-application两个应用程序。至于具体的启动流程,我们暂时跳过,先看看客户端的流程。


client启动流程分析

示例client的启动过程包括下面4步: 1 创建celery-application, 2 创建task 3 调用task的delay方法执行任务得到一个异步结果 4 最后使用异步结果的get方法获取真实结果

task是通过app创建的装饰器创建的Promise对象:

# ch23-celery/celery-5.0.5/celery/app/base.py
task_cls = 'celery.app.task:Task'

def task(self, *args, **opts):
    """Decorator to create a task class out of any callable.
    "
""
    def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
        
        def _create_task_cls(fun):
            
            ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                       __doc__=fun.__doc__)
            return ret

        return _create_task_cls
    return inner_create_task_cls(**opts)

task实际上是一个由Task基类动态创建的子类:

def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
    base = base or self.Task
    task = type(fun.__name__, (base,), dict({
                'app': self,
                'name': name,
                'run': run,
                '_decorated': True,
                '__doc__': fun.__doc__,
                '__module__': fun.__module__,
                '__annotations__': fun.__annotations__,
                '__header__': staticmethod(head_from_fun(fun, bound=bind)),
                '__wrapped__': run}, **options))
    add_autoretry_behaviour(task, **options)
    # 增加task
    self._tasks[task.name] = task
    task.bind(self)  # connects task to this app
    add_autoretry_behaviour(task, **options)
    return task

任务的执行使用app的send_task方法进行:

# ch23-celery/celery-5.0.5/celery/app/task.py
def delay(self, *args, **kwargs):
    ...
    return app.send_task(
                self.name, args, kwargs, task_id=task_id, producer=producer,
                link=link, link_error=link_error, result_cls=self.AsyncResult,
                shadow=shadow, task_type=self,
                **options
            )

可以看到,client作为生产者启动任务,也需要创建celery-application,下面我们就先看celery-application的实现。


celery的app两大功能

Celery的构造函数:

class Celery:
    
    # 协议类
    amqp_cls = 'celery.app.amqp:AMQP'
    backend_cls = None
    # 事件类
    events_cls = 'celery.app.events:Events'
    loader_cls = None
    log_cls = 'celery.app.log:Logging'
    # 控制类
    control_cls = 'celery.app.control:Control'
    # 任务类
    task_cls = 'celery.app.task:Task'
    # 任务注册中心
    registry_cls = 'celery.app.registry:TaskRegistry'
    ...
    
    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, tasks=None, broker=None, include=None,
                 changes=None, config_source=None, fixups=None, task_cls=None,
                 autofinalize=True, namespace=None, strict_typing=True,
                 **kwargs):
        # 启动步骤
        self.steps = defaultdict(set)
        # 待执行的task
        self._pending = deque()
        # 所有任务
        self._tasks = self.registry_cls(self._tasks or {})
        ...
        self.__autoset('broker_url', broker)
        self.__autoset('result_backend', backend)
        ...
        self.on_init()
        _register_app(self)

可以看到celery类提供了一些默认模块类的名称,可以根据这些类名动态创建对象。app对象任务的处理使用一个队列作为pending状态的任务容器,使用TaskRegistry来管理任务的注册。

任务通过task装饰器,记录到celery的TaskRegistry中:

def task(self, *args, **opts):
    ...
    # 增加task
    self._tasks[task.name] = task
    task.bind(self)  # connects task to this app
    add_autoretry_behaviour(task, **options)
    ...

celery另外一个核心功能是提供到broker的连接:

def _connection(self, url, userid=None, password=None,
                virtual_host=None, port=None, ssl=None,
                connect_timeout=None, transport=None,
                transport_options=None, heartbeat=None,
                login_method=None, failover_strategy=None, **kwargs):
    conf = self.conf
    return self.amqp.Connection(
        url,
        userid or conf.broker_user,
        password or conf.broker_password,
        virtual_host or conf.broker_vhost,
        port or conf.broker_port,
        transport=transport or conf.broker_transport,
        ssl=self.either('broker_use_ssl', ssl),
        heartbeat=heartbeat,
        login_method=login_method or conf.broker_login_method,
        failover_strategy=(
            failover_strategy or conf.broker_failover_strategy
        ),
        transport_options=dict(
            conf.broker_transport_options, **transport_options or {}
        ),
        connect_timeout=self.either(
            'broker_connection_timeout', connect_timeout
        ),
    )
broker_connection = connection

@cached_property
def amqp(self):
    """AMQP related functionality: :class:`~@amqp`."""
    return instantiate(self.amqp_cls, app=self)

AMQP的实现,是依赖kombu提供的AMQP协议封装:

from kombu import Connection, Consumer, Exchange, Producer, Queue, pools

class AMQP:
    """App AMQP API: app.amqp."""

    Connection = Connection

然后使用我们熟悉的Queue,Consumer,Producer进行消息的生成和消费:

def Queues(self, queues, create_missing=None,
           autoexchange=None, max_priority=None):
    ...
    return self.Queues(
            queues, self.default_exchange, create_missing,
            autoexchange, max_priority, default_routing_key,
        )
        
def TaskConsumer(self, channel, queues=None, accept=None, **kw):
    ...
    return self.Consumer(
        channel, accept=accept,
        queues=queues or list(self.queues.consume_from.values()),
        **kw
    )

def _create_task_sender(self):
    ...
    producer.publish(
                body,
                exchange=exchange,
                routing_key=routing_key,
                serializer=serializer or default_serializer,
                compression=compression or default_compressor,
                retry=retry, retry_policy=_rp,
                delivery_mode=delivery_mode, declare=declare,
                headers=headers2,
                **properties
            )
    ...

celery-app的两大功能,管理task和管理AMQP连接,我们有一个大概的了解。


worker模式启动流程

worker模式启动在WorkController中,将服务分成不同的阶段,然后将各个阶段组装成一个叫做蓝图(Blueprint)的方式进行管理:

class WorkController:
    # 内部类
    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""

        name = 'Worker'
        default_steps = {
            'celery.worker.components:Hub',
            'celery.worker.components:Pool',
            'celery.worker.components:Beat',
            'celery.worker.components:Timer',
            'celery.worker.components:StateDB',
            'celery.worker.components:Consumer',
            'celery.worker.autoscale:WorkerComponent',
        }
    
    def __init__(self, app=None, hostname=None, **kwargs):
        self.blueprint = self.Blueprint(
            steps=self.app.steps['worker'],
            on_start=self.on_start,
            on_close=self.on_close,
            on_stopped=self.on_stopped,
        )
        self.blueprint.apply(self, **kwargs)

启动蓝图:

def start(self):
    try:
        # 启动worker
        self.blueprint.start(self)
    except WorkerTerminate:
        self.terminate()
    except Exception as exc:
        logger.critical('Unrecoverable error: %r', exc, exc_info=True)
        self.stop(exitcode=EX_FAILURE)
    except SystemExit as exc:
        self.stop(exitcode=exc.code)
    except KeyboardInterrupt:
        self.stop(exitcode=EX_FAILURE)

启动步骤,比较简单,大概代码如下:

class StepType(type):
    """Meta-class for steps."""

    name = None
    requires = None

class Step(metaclass=StepType):
    ...
    
    def instantiate(self, name, *args, **kwargs):
        return symbol_by_name(name)(*args, **kwargs)
    
    def include_if(self, parent):
        return self.enabled
        
    def _should_include(self, parent):
        if self.include_if(parent):
            return True, self.create(parent)
        return False, None

    def create(self, parent):
        """Create the step."""

从Step大概可以看出:

  • 每个步骤,可以有依赖requires
  • 每个步骤,可以有具体的动作instantiate
  • 步骤具有树状的父子结构,可以自动创建上级步骤

比如一个消费者步骤, 依赖Connection步骤。启动的时候对Connection进行消费。两者代码如下:

class ConsumerStep(StartStopStep):
    """Bootstep that starts a message consumer."""

    requires = ('celery.worker.consumer:Connection',)
    consumers = None

    def start(self, c):
        channel = c.connection.channel()
        self.consumers = self.get_consumers(channel)
        for consumer in self.consumers or []:
            consumer.consume()

class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def __init__(self, c, **kwargs):
        c.connection = None
        super().__init__(c, **kwargs)

    def start(self, c):
        c.connection = c.connect()
        info('Connected to %s', c.connection.as_uri())

在Blueprint中创建和管理这些step:

class Blueprint:
    
    def __init__(self, steps=None, name=None,
                 on_start=None, on_close=None, on_stopped=None):
        self.name = name or self.name or qualname(type(self))
        # 并集
        self.types = set(steps or []) | set(self.default_steps)
        ...
        self.steps = {}

    def apply(self, parent, **kwargs):
        steps = self.steps = dict(symbol_by_name(step) for step in self.types)

        self._debug('Building graph...')
        for S in self._finalize_steps(steps):
            step = S(parent, **kwargs)
            steps[step.name] = step
            order.append(step)
        self._debug('New boot order: {%s}',
                    ', '.join(s.alias for s in self.order))
        for step in order:
            step.include(parent)
        return self

启动Blueprint:

def start(self, parent):
    self.state = RUN
    if self.on_start:
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self._debug('Starting %s', step.alias)
        self.started = i + 1
        step.start(parent)
        logger.debug('^-- substep ok')

通过将启动过程拆分成多个step单元,然后组合单元构建成graph,逐一启动。


小结

本篇我们正式学习了一下celery的使用流程,了解celery如果使用redis作为broker,利用服务作为消费者,使用客户端作为生成者,完成一次远程任务的执行。简单探索worker服务模式的启动流程,重点分析celery-application的管理task和管理连接两大功能实现。

小技巧

celery中展示了一种动态创建类和对象的方法:

task = type(fun.__name__, (Task,), dict({
                'app': self,
                'name': name,
                'run': run,
                '_decorated': True,
                '__doc__': fun.__doc__,
                '__module__': fun.__module__,
                '__annotations__': fun.__annotations__,
                '__header__': staticmethod(head_from_fun(fun, bound=bind)),
                '__wrapped__': run}, **options))()

通过type函数创了一个动态的task子类,然后执行 () 实例化一个task子对象。

参考链接

  • 以编程方式定义类 https://python3-cookbook.readthedocs.io/zh_CN/latest/c09/p18_define_classes_programmatically.html
Python猫技术交流群开放啦!群里既有国内一二线大厂在职员工,也有国内外高校在读学生,既有十多年码龄的编程老鸟,也有中小学刚刚入门的新人,学习氛围良好!想入群的同学,请在公号内回复『交流群』,获取猫哥的微信(谢绝广告党,非诚勿扰!)~


还不过瘾?试试它们




分享几款超好用的 REST API 工具

Python进阶:自定义对象实现切片功能

len(x) 击败 x.len(),从内置函数看 Python 的设计思想

当谈论迭代器时,我谈些什么?

面向对象编程是否走向了消亡?

别再问了,万字长文教你用 Celery 执行和周期任务(多图)


如果你觉得本文有帮助
请慷慨分享点赞,感谢啦
浏览 89
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报