文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

python websockets实现server和client的通信

2023-09-02 19:54

关注

项目地址:https://github.com/aaugustin/websockets

文档地址:https://websockets.readthedocs.io/en/stable/

1、websockets

WebSocket是一种在单个TCP连接上进行全双工通讯的协议,使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。主要有如下特点:

1)建立在 TCP 协议之上,服务器端的实现比较容易;

2)与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器;

3)数据格式比较轻量,性能开销小,通信高效;

4)可以发送文本,也可以发送二进制数据;

5)没有同源限制,客户端可以与任意服务器通信;

6)协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

包安装:pip install websockets

2、websockets常用函数

serve:在server端使用,等待客户端的连接。如果连接成功,返回一个websocket。

connect:在client端使用,用于建立连接。

send:发送数据,server和client双方都可以使用。

recv:接收数据,server和client双方都可以使用。

close:关闭连接,server和client双方都可以使用。

3、一个简单的服务端客户端测试程序

Server端代码:

websockets.serve(),用于等待客户端的连接。如果客户端调用connect方法成功,则返回一个websocket。

import asyncioimport websocketsIP_ADDR = "127.0.0.1"IP_PORT = "8888"async def server_hands(websocket):    """ 握手,通过接收hello,发送"123"来进行双方的握手    """    while True:        recv_text = await websocket.recv()        print("recv_text=" + recv_text)        if recv_text == "hello":            print("connected success")            await websocket.send("123")            return True        else:            await websocket.send("connected fail")async def server_recv(websocket):    """ 接收从客户端发来的消息并处理,再返给客户端ok。    """    while True:        try:            recv_text = await websocket.recv()            print("recv:", recv_text)            await websocket.send("ok!!!")        except websockets.ConnectionClosed as e:            # 客户端关闭连接,跳出对客户端的读取,结束函数            print(e.code)            await asyncio.sleep(0.01)            breakasync def server_run(websocket, path):    """ 握手并且接收数据    :param websocket:    :param path:    """    # 下面两个函数顺序执行    await server_hands(websocket)  # 握手    await server_recv(websocket)  # 接收客户端消息并处理# main functionif __name__ == '__main__':    print("======server main begin======")    server = websockets.serve(server_run, IP_ADDR, IP_PORT)  # 服务器端起server    asyncio.get_event_loop().run_until_complete(server)  # 事件循环中调用    asyncio.get_event_loop().run_forever()  # 一直运行

Client端代码:

通过调用connect方法与服务的建立通信。

通过发送"hello",接收"123"来进行双方握手的校验,只有握手成功,才可以继续发送和接收数据。

import asyncioimport websocketsIP_ADDR = "127.0.0.1"IP_PORT = "8888"async def client_hands(websocket):    """ 握手,通过发送hello,接收"123"来进行双方的握手。        若成功,则跳出循环,结束函数;若失败,则继续发送hello.    """    while True:        await websocket.send("hello")        response_str = await websocket.recv()        if "123" in response_str:            print("握手成功")            return Trueasync def client_send(websocket):    """ 向服务器端发送消息    """    while True:        input_text = input("input text: ")        if input_text == "exit":            print(f'"exit", bye!')            await websocket.close(reason="exit")  # 关闭本次连接            return False        # 发送数据并打印服务端返回结果        await websocket.send(input_text)        recv_text = await websocket.recv()        print(f"{recv_text}")async def client_run():    """ 进行websocket连接    """    server_url = "ws://" + IP_ADDR + ":" + IP_PORT    print("websockets server url: ", server_url)    try:        async with websockets.connect(server_url) as websocket:            # 下面两行同步进行            await client_hands(websocket)  # 握手            await client_send(websocket)  # 发数据    except ConnectionRefusedError as e:        # 服务端未启动,或连接失败时退出.        print("e:", e)        return# main functionif __name__ == '__main__':    print("======client main begin======")    asyncio.get_event_loop().run_until_complete(client_run())  # 等价于asyncio.run(client_run())

运行结果:

服务端启动后,客户端依次输入111、exit。

服务端:

python server_1.py======server main begin======recv_text=helloconnected successrecv: 1111000

客户端:

>python client_1.py======client main begin======websockets server url:  ws://127.0.0.1:8888握手成功input text: 111ok!!!input text: exit"exit", bye!

4、重连机制

上面的例子中连接一段时间后,自动断开。再次发送请求时就会失败!所以,对客户端代码进行改进。我们先要知道的是:

1)当服务端拒绝连接(没有打开 ws ),客户端会抛出 ConnectionRefusedError 错误。

2)当服务端将连接上的 ws 关闭时(无论是正常关闭还是异常关闭),会抛出 ConnectionClosed 错误(是 websockets 内的错误类)。

4.1、捕获异常后重连

引用https://sakina.blog.csdn.net/article/details/108090049中的例子,捕获上述两个异常后重连。

import asyncioimport websockets as wsfrom websockets import ConnectionClosedcount = 0async def hello():    uri = "ws://localhost:8765"    while True:        try:            async with ws.connect(uri) as websocket:                await websocket.send('start')                while True:                    try:                        await websocket.recv()                    except ConnectionClosed as e:                        print(e.code)                        if e.code == 1006:print('restart')await asyncio.sleep(2)break        except ConnectionRefusedError as e:            print(e)            global count            if count == 10:                 return            count += 1            await asyncio.sleep(2)asyncio.get_event_loop().run_until_complete(hello())

这段代码的核心是两个 while :

第一层 while 的作用是循环 ws 连接上下文,当服务端拒绝连接时,会抛出 ConnectionRefusedError ,我们每隔 2 秒重试一次,最多重试 10 次。

第二层 while 的作用是保证 ws 连接一直处于接收状态(长连接),当 ws 被服务端关闭时,会抛出 ConnectionClosed ,一般我们会收到 1000 正常关闭码和 1006 服务端内部错误异常关闭码两种,在上文的代码中,我们收到异常 1006 关闭码时,就 break 退出 while 循环,从而自动关闭 ws 连接上下文,进行一次新的 ws 上下文连接。

注:在第二层不可以使用 continue 跳过本次循环,必须要重建 ws 上下文连接,否则 ws 连接总是处于被关闭状态。

4.2、定时任务

如果要加入定时任务,如自定义的心跳,我们需要创建一个异步 task 任务。任务函数如下:

async def ping(ws):    while True:        try:            await ws.send('ping')            await asyncio.sleep(10)        except:            break

之后在 ws 上下文流程中加入异步 task 执行任务。

async with ws.connect(uri) as websocket:    await websocket.send('start')    asyncio.create_task(ping(websocket))    while True:        try:            await websocket.recv()        except ConnectionClosed as e:            print(e.code)            if e.code == 1006:                print('restart')                await asyncio.sleep(2)                break

函数 ping() 是我们自定义的心跳 ping ,他会每隔 10 秒给服务端发送文本为 ping 的消息。

无论服务端将 ws 关闭还是 ws 连接不成功,此 task 必由于 ws 不可用而报错,我们 break 掉该 while 循环即可,从而此 task 执行完毕,当 ws 又被成功建立时,新的 task 定时任务又会被启动。

4.3、测试代码重连实现

支持:服务端未启动时重连;客户端主动退出;服务端异常时重连。

import asyncioimport websocketsfrom enum import Enum, unique@uniqueclass WSStatus(Enum):    Exit = 1  # 用户退出    Interrupt = 2  # 连接中断    Error = 3  # 程序内部错误    Success = 4  # 调用成功    Fail = 5  # 调用失败IP_ADDR = "127.0.0.1"IP_PORT = "8888"async def client_hands(websocket):    """ 握手,通过发送hello,接收"123"来进行双方的握手。        若成功,则跳出循环,结束函数;若失败,则继续发送hello.    """    while True:        await websocket.send("hello")        response_str = await websocket.recv()        if "123" in response_str:            print("握手成功")            return WSStatus.Successasync def client_send(websocket):    """ 向服务器端发送消息    """    input_text = input("input text: ")    if input_text == "exit":        print(f'"exit", bye!')        await websocket.close(reason="exit")  # 关闭本次连接        return WSStatus.Exit    # 发送数据并打印服务端返回结果    r = await websocket.send(input_text)    print("r:", r)    recv_text = await websocket.recv()    print(f"{recv_text}")    return WSStatus.Successasync def client_run():    """ 进行websocket连接    """    con_count = 0    MAX_RECONNECT_COUNT = 10  # 定义最大重连次数    server_url = "ws://" + IP_ADDR + ":" + IP_PORT    print("websockets server url: ", server_url)    while True:        try:            async with websockets.connect(server_url) as websocket:                # 下面两行同步进行                await client_hands(websocket)  # 握手                r = None                while True:                    try:                        r = await client_send(websocket)  # 发数据                        # print("r:", r)                        if r == WSStatus.Exit:  # 程序内部正常退出break                    except websockets.ConnectionClosed as e:                        print(e.code)                        if e.code == 1006:print('restart')await asyncio.sleep(0.1)break                if r == WSStatus.Exit:  # 程序内部正常退出                    break        except ConnectionRefusedError as e:            # 服务端未启动,或连接失败时退出.            print("e:", e)            if con_count == MAX_RECONNECT_COUNT:                return            con_count += 1            print(f"reconnect {con_count} times...")            await asyncio.sleep(0.01)# main functionif __name__ == '__main__':    print("======client main begin======")    asyncio.get_event_loop().run_until_complete(client_run())  # 等价于asyncio.run(client_run())

5、加密,增加headers调用

headers = {    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',    'Origin': 'https://live.bilibili.com',    'Connection': 'Upgrade',    'Accept-Language': 'zh-CN,zh;q=0.9',}...sync with websockets.connect(self.wss_url,                 extra_headers=headers                ) as websocket:

参考:

1、python websockets的使用,实现server和client的通信

https://blog.csdn.net/liranke/article/details/120533682

2、wss连接的一些坑

https://blog.csdn.net/zouzhe121/article/details/103122532

3、测试用例代码

https://blog.csdn.net/liranke/article/details/120533682

4、python websockets 全双工通信

https://blog.csdn.net/qq_42195302/article/details/120150438

5、Python异步高性能websockets库简单入门(含重连机制与定时任务)

https://sakina.blog.csdn.net/article/details/108090049

6、python | websockets库的使用(增加headers调用)

https://www.cnblogs.com/Mz1-rc/p/16472294.html

来源地址:https://blog.csdn.net/weixin_34910922/article/details/128794946

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯