Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides the tools needed to maintain such a system.

It is a task queue focused on real-time processing, and it also supports task scheduling.

Celery has a broad and diverse community of users and contributors; you can join via IRC or the mailing list.

Celery is open source and licensed under the BSD license.
Chinese documentation: http://docs.jinkan.org/docs/celery/

Example

Create task.py

from celery import Celery

# Use the local Redis instance as both the message broker and the result backend
app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')


@app.task
def add(x, y):
    return x + y
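
The same configuration could also be set through app.conf instead of the Celery() constructor arguments. The sketch below is just an illustrative variant, assuming the same local Redis instance and the Celery 4.x lowercase setting names; it is not part of the original example:

from celery import Celery

app = Celery('tasks')
# Same Redis instance as above, expressed as configuration
app.conf.update(
    broker_url='redis://127.0.0.1:6379/0',
    result_backend='redis://127.0.0.1:6379/0',
)


@app.task
def add(x, y):
    return x + y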

Start the worker

[root@localhost python]# celery -A task worker --loglevel=info
/usr/lib/python2.7/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

uid=uid, euid=euid, gid=gid, egid=egid,

-------------- celery@localhost.localdomain v4.1.1 (latentcall)
---- **** -----
--- * *** * -- Linux-3.10.0-693.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2018-06-06 05:25:17
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f5c3038c150
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. task.add

[2018-06-06 05:25:17,206: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2018-06-06 05:25:17,218: INFO/MainProcess] mingle: searching for neighbors
[2018-06-06 05:25:18,241: INFO/MainProcess] mingle: all alone
[2018-06-06 05:25:18,281: INFO/MainProcess] celery@localhost.localdomain ready.
[2018-06-06 05:26:01,848: INFO/MainProcess] Received task: task.add[69c90eef-309c-45e7-a888-1357129aae6d]

Note: the redis Python module must be installed first: pip install redis
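
As a quick sanity check (not shown in the original post), the broker connection can be verified directly with the redis module. This sketch assumes Redis is listening on 127.0.0.1:6379, database 0, as in the broker URL above:

import redis

# Ping the same Redis instance that Celery uses as broker and result backend
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
print(r.ping())   # True when the connection works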

Dispatching a task

Import the module and dispatch the task:

In [1]: from task import add  # note the module path
In [2]: add.delay(2,3)
Out[2]: <AsyncResult: 4e4ab6aa-266e-4178-b668-39ada6382113>
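
For reference, delay() is shorthand for the more general apply_async(), so the two calls in this sketch should be equivalent (illustrative only, not output from the original session):

from task import add

# delay(*args, **kwargs) is a convenience wrapper around apply_async()
result = add.delay(2, 3)
result = add.apply_async(args=(2, 3))   # same call in its explicit form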

Check the Celery worker's terminal output

[2018-06-06 05:32:20,583: INFO/ForkPoolWorker-1] Task task.add[4e4ab6aa-266e-4178-b668-39ada6382113] succeeded in 0.00385068000105s: 5

Check the result

In [3]: result = add.delay(2,3)

In [4]: print(result.get())
5
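
get() blocks until the task finishes. If blocking indefinitely is undesirable, the state can be polled first, or a timeout passed to get(); a rough sketch using the AsyncResult API (the timeout value is chosen arbitrarily):

from task import add

result = add.delay(2, 3)

if result.ready():                   # has the task finished yet?
    print(result.status)             # e.g. 'SUCCESS'
    print(result.get(timeout=5))     # raises celery.exceptions.TimeoutError if it takes too long
else:
    print('still running: ' + result.id)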

List the attributes of the result object:

In [5]: dir(result)

Out[5]:
['TimeoutError',
'__class__',
'__copy__',
'__delattr__',
'__dict__',
'__doc__',
'__eq__',
'__format__',
'__getattribute__',
'__hash__',
'__init__',
'__module__',
'__ne__',
'__new__',
'__reduce__',
'__reduce_args__',
'__reduce_ex__',
'__repr__',
'__setattr__',
'__sizeof__',
'__str__',
'__subclasshook__',
'__unicode__',
'__unicode_repr__',
'__weakref__',
'_cache',
'_get_task_meta',
'_iter_meta',
'_maybe_reraise_parent_error',
'_maybe_set_cache',
'_on_fulfilled',
'_parents',
'_set_cache',
'_to_remote_traceback',
'app',
'as_tuple',
'backend',
'build_graph',
'children',
'collect',
'failed',
'forget',
'get',
'get_leaf',
'graph',
'id',
'info',
'iterdeps',
'maybe_reraise',
'maybe_throw',
'on_ready',
'parent',
'ready',
'result',
'revoke',
'state',
'status',
'successful',
'supports_native_join',
'task_id',
'then',
'throw',
'traceback',
'wait']
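
A few of the attributes listed above in use; a small sketch continuing the same kind of session (exact values will differ per run):

from task import add

result = add.delay(2, 3)
result.get(timeout=10)         # wait for completion so the fields below are populated

print(result.id)               # the task id
print(result.state)            # 'SUCCESS' (the same value as result.status)
print(result.successful())     # True when the task returned without raising
print(result.result)           # the return value, here 5
print(result.traceback)        # None unless the task raised an exception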

Get the task id

In [6]: print(result.id)
30f7fce2-2687-448c-816f-11334efc6442
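
The id alone is enough to look a result up again later, for example from another process, by constructing an AsyncResult bound to the same app. A sketch, using the id printed above purely as a placeholder:

from celery.result import AsyncResult
from task import app

task_id = '30f7fce2-2687-448c-816f-11334efc6442'   # placeholder: use an id from your own run
result = AsyncResult(task_id, app=app)
print(result.state)    # e.g. 'SUCCESS'
print(result.result)   # 5 for the add(2, 3) example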

Log in to Redis and inspect the stored keys

[root@localhost python]# redis-cli
127.0.0.1:6379> KEYS *
1) "_kombu.binding.celery.pidbox"
2) "celery-task-meta-69c90eef-309c-45e7-a888-1357129aae6d"
3) "_kombu.binding.celery"
4) "_kombu.binding.celeryev"
5) "celery-task-meta-4e4ab6aa-266e-4178-b668-39ada6382113"
127.0.0.1:6379> get celery-task-meta-4e4ab6aa-266e-4178-b668-39ada6382113
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 5, \"task_id\": \"4e4ab6aa-266e-4178-b668-39ada6382113\", \"children\": []}"
