文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

大文件下载以及进度条展示和MD5校验

2023-01-30 22:01

关注

使用socket网络,上传一个视频,大小在3G左右

能够显示进度条,显示花费时间

下载使用TCP协议

server向client发送文件


新建文件server.py,代码如下:

import os
import json
import socket
import struct

filepath = r'E:\BaiduYunDownload\[电影天堂www.dy2018.com]移动迷宫3:死亡解药BD国英双语中英双字.mp4'

sk = socket.socket()
sk.bind(('127.0.0.1', 9000))
sk.listen()

conn, addr = sk.accept()
filename = os.path.basename(filepath)
filesize = os.path.getsize(filepath)
dic = {'filename': filename, 'filesize': filesize}
str_dic = json.dumps(dic).encode('utf-8')
len_dic = len(str_dic)
length = struct.pack('i', len_dic)
conn.send(length)  # dic的长度
conn.send(str_dic)  # dic
with open(filepath, 'rb') as f:  # 文件
    while filesize:
        content = f.read(4096)
        conn.send(content)
        filesize -= len(content)
        '''
        这里不能减等4096,因为文件,最后可能只有3字节。
        要根据读取的长度len(content),来计算才是合理的。
        '''
conn.close()

新建文件client.py,代码如下:

import json
import struct
import socket
import sys
import time
 
def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    if rate_num == 100:
        r = '\r%s>%d%%\n' % ('=' * rate_num, rate_num,)
    else:
        r = '\r%s>%d%%' % ('=' * rate_num, rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush
 
start_time = time.time()  # 开始时间
 
sk = socket.socket()
sk.connect(('127.0.0.1',9000))
 
dic_len = sk.recv(4)
dic_len = struct.unpack('i',dic_len)[0]
dic = sk.recv(dic_len)
str_dic = dic.decode('utf-8')
dic = json.loads(str_dic)
with open(dic['filename'],'wb') as f:  # 使用wb更严谨一些,虽然可以使用ab
    content_size = 0
    while True:
        content = sk.recv(4096)
        f.write(content)  # 写入文件
        content_size += len(content)  # 接收大小
        processBar(content_size,dic['filesize'])  # 执行进度条函数
        if content_size == dic['filesize']:break  # 当接收的总大小等于文件大小时,终止循环
             
sk.close()  # 关闭连接
 
end_time = time.time()  # 结束时间
print('本次下载花费了{}秒'.format(end_time - start_time))

先执行server.py,再执行client.py,效果如下:

1.gif

上面效果展示了100个等号,太长了,那么要缩减到1/3呢?

修改进度条函数

def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    if rate_num == 100:
        r = '\r%s>%d%%\n' % ('=' * int(rate_num / 3), rate_num,) # 控制等号输出数量,除以3,表示显示1/3
    else:
        r = '\r%s>%d%%' % ('=' * int(rate_num / 3), rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush

再次执行,效果如下:

2.gif

再来一个高级版,显示绿色的飞机

代码如下:

def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    pretty = '✈'
    if rate_num == 100:
        r = '\r\033[32m{}\033[0m{}%\n'.format(pretty * int(rate_num / 5), rate_num,)
    else:
        r = '\r\033[32m{}\033[0m{}%'.format(pretty * int(rate_num / 5), rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush

再次执行,效果如下:

3.gif

再来一个每秒换色

导入一个随机换色类

import random
 
 
class Prompt(object):  # 提示信息显示
    colour_dic = {
        'red': 31,
        'green': 32,
        'yellow': 33,
        'blue': 34,
        'purple_red': 35,
        'bluish_blue': 36,
        'white': 37,
    }
 
    def __init__(self):
        pass
 
    @staticmethod
    def display(msg, colour='white'):
        choice = Prompt.colour_dic.get(colour)
        # print(choice)
        if choice:
            info = "\033[1;{};1m{}\033[1;0m".format(choice, msg)
            return info
        else:
            return False
 
    def random_color(msg):  # 随机换色
        colour_list = []
        for i in Prompt.colour_dic:
            colour_list.append(i)
 
        length = len(colour_list) - 1  # 最大索引值
        index = random.randint(0, length)  # 随机数
 
        ret = Prompt.display(msg, colour_list[index])  # 随机颜色
        return ret

修改client.py

from Prompt import Prompt
 
def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    pretty = Prompt.random_color('✈')  # 随机换色
    if rate_num == 100:
        r = '\r{}{}%\n'.format(pretty * int(rate_num / 5), rate_num,)
    else:
        r = '\r{}{}%'.format(pretty * int(rate_num / 5), rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush

再次执行,效果如下:

4.gif


增加MD5校验

server.py

import os
import json
import socket
import struct
import hashlib

sk = socket.socket()
sk.bind(('127.0.0.1', 9000))
sk.listen()

conn, addr = sk.accept()
filename = '[电影天堂www.dy2018.com]移动迷宫3:死亡解药BD国英双语中英双字.mp4'  # 文件名
absolute_path = os.path.join('E:\BaiduYunDownload',filename)  # 文件绝对路径
buffer_size = 1024*1024  # 缓冲大小,这里表示1MB

md5obj = hashlib.md5()
with open(absolute_path, 'rb') as f:
    while True:
        content = f.read(buffer_size)  # 每次读取指定字节
        if content:
            md5obj.update(content)
        else:
            break  # 当内容为空时,终止循环

md5 = md5obj.hexdigest()
print(md5)  # 打印md5值

dic = {'filename':filename,
       'filename_md5':str(md5),'buffer_size':buffer_size,
       'filesize':os.path.getsize(absolute_path)}
str_dic = json.dumps(dic).encode('utf-8')
len_dic = len(str_dic)
length = struct.pack('i', len_dic)
conn.send(length)  # dic的长度
conn.send(str_dic)  # dic
with open(absolute_path, 'rb') as f:  # 文件
    while dic['filesize']:
        content = f.read(dic['buffer_size'])
        conn.send(content)
        dic['filesize'] -= len(content)
        '''
        这里不能减等4096,因为文件,最后可能只有3字节。
        要根据读取的长度len(content),来计算才是合理的。
        '''
conn.close()

client.py

import json
import struct
import socket
import sys
import time
import hashlib
import os
from Prompt import Prompt

def processBar(num, total):  # 进度条
    rate = num / total
    rate_num = int(rate * 100)
    pretty = Prompt.random_color('✈')
    if rate_num == 100:
        r = '\r{}{}%\n'.format(pretty * int(rate_num / 5), rate_num,)
    else:
        r = '\r{}{}%'.format(pretty * int(rate_num / 5), rate_num,)
    sys.stdout.write(r)
    sys.stdout.flush

start_time = time.time()  # 开始时间

sk = socket.socket()
sk.connect(('127.0.0.1',9000))

dic_len = sk.recv(4)
dic_len = struct.unpack('i',dic_len)[0]
dic = sk.recv(dic_len)
str_dic = dic.decode('utf-8')
dic = json.loads(str_dic)

md5 = hashlib.md5()
with open(dic['filename'],'wb') as f:  # 使用wb更严谨一些,虽然可以使用ab
    content_size = 0
    while True:
        content = sk.recv(dic['buffer_size'])  # 接收指定大小
        f.write(content)  # 写入文件
        content_size += len(content)  # 接收大小
        md5.update(content)  # 摘要

        processBar(content_size,dic['filesize'])  # 执行进度条函数
        if content_size == dic['filesize']:break  # 当接收的总大小等于文件大小时,终止循环

    md5 = md5.hexdigest()
    print(md5)  # 打印md5值
    if dic['filename_md5'] == str(md5):
        print(Prompt.display('md5校验正确--下载成功','green'))
    else:
        print(Prompt.display('文件验证失败', 'red'))
        os.remove(dic['filename'])  # 删除文件

sk.close()  # 关闭连接

end_time = time.time()  # 结束时间
print('本次下载花费了{}秒'.format(end_time - start_time))

执行输出:

doload8.gif


阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯