文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Python中怎么使用Flask实现进度条

2023-06-30 12:32

关注

本篇内容主要讲解“Python中怎么使用Flask实现进度条”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Python中怎么使用Flask实现进度条”吧!

使用Flask实现进度条

问题描述

Python异步处理,新起一个进程返回处理进度

解决方案

使用 tqdm 和 multiprocessing.Pool

安装

pip install tqdm

代码

import timeimport threadingfrom multiprocessing import Poolfrom tqdm import tqdmdef do_work(x):    time.sleep(x)    return xdef progress():    time.sleep(3)  # 3秒后查进度    print(f'任务有: {pbar.total} 已完成:{pbar.n}')tasks = range(10)pbar = tqdm(total=len(tasks))if __name__ == '__main__':    thread = threading.Thread(target=progress)    thread.start()    results = []    with Pool(processes=5) as pool:        for result in pool.imap_unordered(do_work, tasks):            results.append(result)            pbar.update(1)    print(results)

效果

Python中怎么使用Flask实现进度条

Flask

安装

pip install flask

main.py

import timefrom multiprocessing import Poolfrom tqdm import tqdmfrom flask import Flask, make_response, jsonifyapp = Flask(__name__)def do_work(x):    time.sleep(x)    return xtotal = 5  # 总任务数tasks = range(total)pbar = tqdm(total=len(tasks))@app.route('/run/')def run():    """执行任务"""    results = []    with Pool(processes=2) as pool:        for _result in pool.imap_unordered(do_work, tasks):            results.append(_result)            if pbar.n >= total:                pbar.n = 0  # 重置            pbar.update(1)    response = make_response(jsonify(dict(results=results)))    response.headers.add('Access-Control-Allow-Origin', '*')    response.headers.add('Access-Control-Allow-Headers', '*')    response.headers.add('Access-Control-Allow-Methods', '*')    return response@app.route('/progress/')def progress():    """查看进度"""    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))    response.headers.add('Access-Control-Allow-Origin', '*')    response.headers.add('Access-Control-Allow-Headers', '*')    response.headers.add('Access-Control-Allow-Methods', '*')    return response

启动(以 Windows 为例)

set FLASK_APP=mainflask run

接口列表

test.html

<!DOCTYPE html><html lang="zh"><head>    <meta charset="UTF-8">    <title>进度条</title>    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="external nofollow"  rel="stylesheet"></head><body><button id="run">执行任务</button><br><br><div class="progress">    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"         >0.00%    </div></div></body><script>    function set_progress_rate(n, total) {        //设置进度        var rate = (n / total * 100).toFixed(2);        if (n > 0) {            $(".progress-bar").attr("aria-valuenow", n);            $(".progress-bar").attr("aria-valuemax", total);            $(".progress-bar").text(rate + "%");            $(".progress-bar").css("width", rate + "%");        }    }    $("#run").click(function () {        //执行任务        $.ajax({            url: "http://127.0.0.1:5000/run/",            type: "GET",            success: function (response) {                set_progress_rate(100, 100);                console.log('执行完成,结果为:' + response['results']);            }        });    });    setInterval(function () {        //每1秒请求一次进度        $.ajax({            url: "http://127.0.0.1:5000/progress/",            type: "GET",            success: function (response) {                console.log(response);                var n = response["n"];                var total = response["total"];                set_progress_rate(n, total);            }        });    }, 1000);</script></html>

效果

Python中怎么使用Flask实现进度条

Flask使用简单异步任务

在Flask中使用简单异步任务最简洁优雅的原生实现:

from flask import Flaskfrom time import sleepfrom concurrent.futures import ThreadPoolExecutor# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutorexecutor = ThreadPoolExecutor(2)app = Flask(__name__)@app.route('/jobs')def run_jobs():    executor.submit(some_long_task1)    executor.submit(some_long_task2, 'hello', 123)    return 'Two jobs was launched in background!'def some_long_task1():    print("Task #1 started!")    sleep(10)    print("Task #1 is done!")def some_long_task2(arg1, arg2):    print("Task #2 started with args: %s %s!" % (arg1, arg2))    sleep(5)    print("Task #2 is done!")if __name__ == '__main__':    app.run()

到此,相信大家对“Python中怎么使用Flask实现进度条”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯