文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

celery动态添加任务

2023-01-31 08:19

关注

celery是一个基于Python的分布式调度系统,文档在这 ,最近有个需求,想要动态的添加任务而不用重启celery服务,找了一圈没找到什么好办法(也有可能是文档没看仔细),所以只能自己实现囉

为celery动态添加任务,首先我想到的是传递一个函数进去,让某个特定任务去执行这个传递过去的函数,就像这样

@app.task
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

很可惜,会出现这样的错误

kombu.exceptions.EncodeError: Object of type 'function' is not JSON serializable

换一种序列化方式

@app.task(serializer='pickle')
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

结果又出现一大串错误信息

ERROR/MainProcess] Pool callback raised exception: ContentDisallowed('Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)',)
Traceback (most recent call last):
  File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'chord'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: '_payload'

换一种思路

func = import_string(func)

不知道这样是否可以,结果测试: No

哎,流年不利.

最后一直测试,一直测试,终于找到了一种办法,直接上代码

from importlib import import_module, reload

app.conf.CELERY_IMPORTS = ['task', 'task.all_task']

def import_string(import_name):
    import_name = str(import_name).replace(':', '.')
    modules = import_name.split('.')
    mod = import_module(modules[0])
    for comp in modules[1:]:
        if not hasattr(mod, comp):
            reload(mod)
        mod = getattr(mod, comp)
    return mod

@app.task
def execute(func, *args, **kwargs):
    func = import_string(func)
    return func(*args, **kwargs)

项目结构是这样的

├── celery_app.py
├── config.py
├── task
│   ├── all_task.py
│   ├── __init__.py

注意: 任务必须大于等于两层目录

以后每次添加任务都可以先添加到all_task.py里,调用时不用再重启celery服务

# task/all_task.py

def ee(c, d):
    return c, d, '你好'

# example
from celery_app import execute

execute.delay('task.all_task.ee', 2, 444)

ok,另外发现celery也支持任务定时调用,就像这样

execute.apply_async(args=['task.all_task.aa'], eta=datetime(2017, 7, 9, 8, 12, 0))

简单实现一个任务重复调用的功能

@app.task
def interval(func, seconds, args=(), task_id=None):
    next_run_time = current_time() + timedelta(seconds=seconds)
    kwargs = dict(args=(func, seconds, args), eta=next_run_time)
    if task_id is not None:
        kwargs.update(task_id=task_id)
    interval.apply_async(**kwargs)
    func = import_string(func)
    return func(*args)

大概意思就是先计算下次运行的时间,然后把任务添加到celery队列里,这里有个task_id有些问题,因为假设添加了每隔3s执行一个任务,
它的task_id默认会使用uuid生成,如果想要再移除这个任务就不太方便,自定task_id可能会好一些,另外也许需要判断task_id是否存在

AsyncResult(task_id).state

ok,再献上一个好用的函数

from inspect import getmembers, isfunction

def get_tasks(module='task'):
    return [{
        'name': 'task:{}'.format(f[1].__name__),
        'doc': f[1].__doc__,
    } for f in getmembers(import_module(module), isfunction)]

就这样.

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯