文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

基于Celery、Redis和Florence 2实战异步机器学习推理

2024-11-29 20:40

关注

译者 | 朱先忠

审校 | 重楼

本文将通过一个最小但功能强大的实例教程,引导你进入异步机器学习推理开发领域。

简介

大多数机器学习服务教程都专注于实时同步服务的介绍,这允许对预测请求做出即时响应。然而,这种方法可能难以应对流量激增,对于长时间运行的任务来说并不理想。因此,类似于这样的任务还需要更强大的机器来快速响应;否则,一旦客户端或服务器发生故障,预测结果通常会丢失。

在本文中,我们将演示如何使用分布式任务调度框架Celery和开源分布式键值对数据库Redis作为异步工作线程来运行机器学习模型。试验中,我们将使用微软开源的统一视觉基础模型Florence 2,这是一种以其令人印象深刻的性能而闻名的视觉语言模型。本教程将提供一个最小但功能强大的示例;当然,您可以根据自己的实战场景进一步进行调整和扩展。

您可以在下面链接处查看该应用程序的演示:

https://coral-app-qdy8z.ondigitalocean.app/

总体来看,我们提供的解决方案的核心基于Celery框架,这是一个支持我们实现客户端/工作线程逻辑的Python库。它允许我们将计算工作分配给许多工作线程,从而提高机器学习推理应用场景对高负载和不可预测负载的可扩展性。

总体运行流程如下:

简化实例

让我们从一个简化的例子开始:

图片由作者本人提供

首先,通过如下命令运行Redis:

Docker run -p 6379:6379 redis

下面给出的是工作线程代码:

from celery import Celery
#配置Celery以使用Redis作为代理和后端
app = Celery(
"tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0"
)
# 定义一个简单的任务
@app.task
def add(x, y):
return x + y
if __name__ == "__main__":
app.worker_main(["worker", "--loglevel=info"])

相应的客户端代码如下:

from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
print(f"{app.control.inspect().active()=}")
task_name = "tasks.add"
add = app.signature(task_name)
print("Gotten Task")
#向工作线程发送一个任务
result = add.delay(4, 6)
print("Waiting for Task")
result.wait()
#得到结果
print(f"Result: {result.result}")

运行上面代码,将给出了我们期望的结果:“Result: 10”。

实战案例

下面,我们继续讨论构建一个真正的基于Florence 2模型服务的实用型案例。

具体地说,我们将构建一个多容器图像字幕应用程序,该应用程序使用Redis进行任务排队,使用Celery进行任务分发,并使用本地卷或谷歌云存储实现潜在的图像存储。该应用程序的设计包含几个核心组件:模型推理、任务分配、客户端交互和文件存储。

架构概述

图片由作者本人提供

各组件分工如下:

接下来,我们进行各组件功能的更具体的剖析。

1.模型推理(Model.py)

首先,实现依赖关系和初始化:

import os
from io import BytesIO
import requests
from google.cloud import storage
from loguru import logger
from modeling_florence2 import Florence2ForConditionalGeneration
from PIL import Image
from processing_florence2 import Florence2Processor
model = Florence2ForConditionalGeneration.from_pretrained(
"microsoft/Florence-2-base-ft"
)
processor = Florence2Processor.from_pretrained("microsoft/Florence-2-base-ft")

上面代码完成的任务如下:

然后,进行图像下载(Download_Image):

def download_image(url):
if url.startswith("http://") or url.startswith("https://"):
#处理HTTP/HTTPS URL
#…(从URL下载图像的代码)…
elif url.startswith("gs://"):
#处理谷歌云存储路径
#…(从GCS下载图像的代码)。
else:
#处理本地文件路径
# ... (code to open image from local path) ...

归纳一下的话,上面代码完成的任务如下:

接下来,执行推理(run_Inference):

def run_inference(url, task_prompt):
# …(使用donan_image函数下载图像的代码)。
try:
# …(打开和处理图像的代码)。
inputs = processor(text=task_prompt, images=image, return_tensors="pt")
except ValueError:
#错误处理
# …(使用模型生成字幕的代码)。
generated_ids = model.generate(
input_ids=inputs["input_ids"],
pixel_values=inputs["pixel_values"],
#……(模型生成参数)。
)
#…(解码生成的字幕的代码)。
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
#…(后处理生成的字幕的代码)。
parsed_answer = processor.post_process_generation(
generated_text, task=task_prompt, image_size=(image.width, image.height)
)
return parsed_answer

上面代码实现了编排图像字幕的过程,具体实现如下:

2.任务分配(worker.py)

首先,进行Celery设置:

import os
from celery import Celery
# ... 其他导入...
#从环境变量中获取Redis URL或使用默认值
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# 将Celery配置为使用Redis,作为代理和后端
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
# ... (Celery配置) ...

这段代码完成的任务是:将Celery设置为使用Redis作为任务分发的消息代理。

接下来,定义任务(inference_task):

@app.task(bind=True, max_retries=3)
def inference_task(self, url, task_prompt):
#……(日志记录和错误处理)。
return run_inference(url, task_prompt)
上面代码具体实现了:
l 定义将由Celery工作线程执行的推理任务。
l 此任务从model.py调用run_inference函数。
最后,执行工作线程:
if __name__ == "__main__":
app.worker_main(["worker", "--loglevel=info", "--pool=solo"])

启动一个监听并执行任务的Celery工作线程。

3.客户端交互(Client.py)

首先,实现Celery连接:

import os
from celery import Celery
#从环境变量中获取Redis URL或使用默认值
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
#将Celery配置为使用Redis作为代理和后端
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)

使用Redis作为消息代理建立与Celery的连接。

接下来,进行任务提交(send_inference_Task):

def send_inference_task(url, task_prompt):
task = inference_task.delay(url, task_prompt)
print(f"Task sent with ID: {task.id}")
# 等待结果
result = task.get(timeout=120)
return result

上述代码完成了两项任务:

再接下来,实现Docker集成(Docker compose.yml)。

这一步主要是使用Docker Compose定义多容器设置:

此处花朵图片由RoonZ nl在Unsplash(https://unsplash.com/photos/yellow-and-blue-petaled-flower-vjDbHCjHlEY?utm_cnotallow=creditCopyText&utm_medium=referral&utm_source=unsplash)上提供

图片由作者本人提供

其实,您可以使用以下一句命令运行上面完整的栈操作:

docker-compose up

小结

至此,整个任务完成!归纳一下,我们刚刚探索了使用Celery、Redis和Florence 2构建异步机器学习推理系统的全过程。具体地说,本文演示了如何有效地使用Celery进行任务分配,使用Redis进行消息代理,使用Florence 2模型进行图像字幕处理。通过采用异步工作流方案,您可以处理大量请求,提高性能,并增强ML推理应用程序的整体弹性。最后,我们提供的Docker Compose设置允许您使用单个命令来自行运行整个系统。

准备好下一步操作了吗?将本文介绍的这种架构部署到云端可能会遇到一系列挑战。

项目源码地址:https://github.com/CVxTz/celery_ml_deploy

项目演示地址: https://coral-app-qdy8z.ondigitalocean.app/

译者介绍

朱先忠,51CTO社区编辑,51CTO专家博客、讲师,潍坊一所高校计算机教师,自由编程界老兵一枚。

原文Asynchronous Machine Learning Inference with Celery, Redis, and Florence 2,作者:Youness Mansar

链接:https://towardsdatascience.com/asynchronous-machine-learning-inference-with-celery-redis-and-florence-2-be18ebc0fbab。

想了解更多AIGC的内容,请访问:

51CTO AI.x社区

https://www.51cto.com/aigc/

来源:51CTO内容精选内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯