文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

超实用 Demo:使用 FastAPI、Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流

2024-12-01 14:02

关注

今天分享一份代码,使用 Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流,你可以修改 task.py 来实现你自己的异步任务。

架构图如下:

其中 Celery 来执行异步任务,RabbitMQ 作为消息队列,MongoDB 存储任务执行结果,FastAPI 提供 Web 接口。

以上所有模块均可使用 Docker 一键部署。

下面为 Demo 使用方法:

1、确保本机已安装 Docker、Git

2、下载源代码:

git clone https://github.com/aarunjith/async-demo.git

3、部署并启动:

cd async-demo
docker compose up --build

4、启动一个异步任务:

$ curl -X POST http://localhost:8080/process

任务会发送到消息队列,同时会立即返回一个任务 id:

❯ curl -X POST http://localhost:8080/process
{"status":"PENDING","id":"a129c666-7b5b-45f7-ba54-9d7b96a1fe58","error":""}%

5、查询任务状态:

curl -X POST http://localhost:8080/check_progress/<task_id>

任务完成后的返回结果如下:

 ❯ curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58
{"status":"SUCEESS","data":"\"hello\""}%

代码目录结构如下:

其中 app.py 如下:

from fastapi import FastAPI
from celery.result import AsyncResult
from tasks import start_processing
from loguru import logger
from pymongo import MongoClient
import uvicorn

# Lets create a connection to our backend where celery stores the results
client = MongoClient("mongodb://mongodb:27017")

# Default database and collection names that Celery create
db = client['task_results']
coll = db["celery_taskmeta"]

app = FastAPI()


@app.post('/process')
async def process_text_file():
'''
Process endpoint to trigger the start of a process
'''
try:
result = start_processing.delay()
logger.info(f'Started processing the task with id {result.id}')
return {
"status": result.state,
'id': result.id,
'error': ''
}
except Exception as e:
logger.info(f'Task Execution failed: {e}')
return {
"status": "FAILURE",
'id': None,
'error': e
}


@app.post('/check_progress/{task_id}')
async def check_async_progress(task_id: str):
'''
Endpoint to check the task progress and fetch the results if the task is
complete.
'''
try:
result = AsyncResult(task_id)
if result.ready():
data = coll.find({'_id': task_id})[0]
return {'status': 'SUCEESS', 'data': data['result']}
else:
return {"status": result.state, "error": ''}
except Exception as e:
data = coll.find({'_id': task_id})[0]
if data:
return {'status': 'SUCEESS', 'data': data['result']}
return {'status': 'Task ID invalid', 'error': e}

if __name__ == "__main__":
uvicorn.run("app:app", host='0.0.0.0', port='8080')

如果要实现自己的任务队列,就修改 task.py 来添加自己的异步任务,可以整合到自己的项目中。

来源:Python七号内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯