Celery Backend Result 存储时区错误问题及函数重写

这是一个古老的 bug,在网上已经有很多解决方法
Celery Repo 负责修复这个 bug 的 PR 因为提交 PR 的人没有写 Unit Test,所以就没有 merge。

本文讲述的是 MongoBackend 重写,使用 MongoDB 的朋友可以参考。其他 custom backend 也可以参考。

问题重现

配置好了 Celery 后,设置了 timezone
enable_utc = True,
timezone = "Asia/Shanghai",
启动 Celery,并尝试触发 Task,然后查看数据库

上面是修改后的国内时间,下面是没有修改的
发现没有做任何修改的时候,我们在 backend database 存resultdate_done的时区是默认的 utc

celery/backends/base.py

1
2
3
4
5
6
7
8
9
def _get_result_meta(self, result,
state, traceback, request, format_date=True,
encode=False):
if state in self.READY_STATES:
date_done = datetime.utcnow()
if format_date:
date_done = date_done.isoformat()
else:
date_done = None

date_done = datetime.utcnow() 改成 date_done = datetime.now()就可以了,但是要在 python package 的目录下修改。

解决办法

在一个与别人合作的项目里,如果告诉别人去自己 python package 的安装目录下修改代码,那必然是不优雅的
因此,想要使用 MongoDB 存储 Celery Task 的results,那就额外加一个 python 文件,例如:patch.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from celery.backends.mongodb import MongoBackend
import pymongo
from datetime import datetime

class MongoResult(MongoBackend):
def _get_result_meta(self, result,
state, traceback, request, format_date=True,
encode=False):
if state in self.READY_STATES:
date_done = datetime.now()
if format_date:
date_done = date_done.isoformat()
else:
date_done = None

meta = {
'status': state,
'result': result,
'traceback': traceback,
'children': self.current_task_children(request),
'date_done': date_done,
}

if request and getattr(request, 'group', None):
meta['group_id'] = request.group
if request and getattr(request, 'parent_id', None):
meta['parent_id'] = request.parent_id

if self.app.conf.find_value_for_key('extended', 'result'):
if request:
request_meta = {
'name': getattr(request, 'task', None),
'args': getattr(request, 'args', None),
'kwargs': getattr(request, 'kwargs', None),
'worker': getattr(request, 'hostname', None),
'retries': getattr(request, 'retries', None),
'queue': request.delivery_info.get('routing_key')
if hasattr(request, 'delivery_info') and
request.delivery_info else None,
}
if getattr(request, 'stamps', None):
request_meta['stamped_headers'] = request.stamped_headers
request_meta.update(request.stamps)

if encode:
# args and kwargs need to be encoded properly before saving
encode_needed_fields = {"args", "kwargs"}
for field in encode_needed_fields:
value = request_meta[field]
encoded_value = self.encode(value)
request_meta[field] = ensure_bytes(encoded_value)

meta.update(request_meta)

return meta

然后在需要使用 Celery 的地方修改result_backend ,其中patch 就是 module 名字

1
2
3
4
5
from extensions import MongoBackend

# ....
result_backend = "patch.MongoResult",
# ....

运行以后,timezone 就是正确的了。