文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

怎么使用python中的生成器实现周期性报文发送功能

2023-07-05 10:17

关注

这篇文章主要介绍了怎么使用python中的生成器实现周期性报文发送功能的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用python中的生成器实现周期性报文发送功能文章都会有所收获,下面我们一起来看看吧。

使用python中的生成器实现周期性发送列表中数值的报文发送功能。

功能开发背景:提取cantest工具采集到的现场报文数据,希望使用原始的现场数据模拟验证程序现有逻辑,需要开发一个工具能够自动按照报文发送周期依次发送采集到的报文数据中的一个数值。

功能开发需求:多个报文发送对象共用同一个报文发送线程,多个对象间的报文发送周期不同,多个对象间的总报文发送数据长度不同,能够允许报文发送过程中断及恢复某个对象的报文发送。

功能开发实现逻辑:在固定发送对象某个数值的基础程序版本上增加新的功能,考虑使用python中生成器实现周期性提取对象数值发送报文的功能。

目前只需要发送两个对象的报文数据,先定义两个使用yield生成器:

    def yield_item_value_1(self):        item_value_list = self.item_value_dict[item_list[0]]        for i in range(len(item_value_list)):            yield item_value_list[i]    def yield_item_value_2(self):        item_value_list = self.item_value_dict[item_list[1]]        for i in range(len(item_value_list)):            yield item_value_list[i]

报文发送线程中的run()函数:

    def run(self):        # 实时更新item的被选状态        self.get_checkbox_res_func()        # 获取每个对象的实际物理值        self.get_item_value_dict()        self.item1_value_func = self.yield_item_value_1()        self.item2_value_func = self.yield_item_value_2()        while self.Flag:            if any(msg_send_flag_dict.values()):                # 每隔second秒执行func函数                timer = Timer(0.01, self.tick_10ms_func)                timer.start()                self.send_working_msg(self.working_can_device, self.working_can_channel)                timer.join()            else:                mes_info = "Goodbye *** 自动发送所有报文数据结束!!!"                toastone = wx.MessageDialog(None, mes_info, "信息提示",                                            wx.YES_DEFAULT | wx.ICON_QUESTION)                if toastone.ShowModal() == wx.ID_YES:  # 如果点击了提示框的确定按钮                    toastone.Destroy()  # 则关闭提示框                break

报文周期性发送函数:

    def send_working_msg(self, can_device, device_id):        for idx in range(len(item_list)):            if msg_send_flag_dict[item_list[idx]] == 1:                msg_id_idx = msg_operation_list.index("报文ID") - 1                msg_id = eval(str(self.operation_dict[item_list[idx]][msg_id_idx]).strip())                # 获取报文发送帧类型                msg_type_idx = msg_operation_list.index("帧类型") - 1                msg_type = str(self.operation_dict[item_list[idx]][msg_type_idx])                msg_type = 1 if msg_type == "扩展帧" else 0                # 获取报文发送周期                msg_cycle_idx = msg_operation_list.index("周期(ms)") - 1                msg_cycle = int(self.operation_dict[item_list[idx]][msg_cycle_idx])                send_cycle = msg_cycle / 10                if msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] >= send_cycle:                    # 开始喂值                    if idx == 0:                        try:                            item_phyValue = next(self.item1_value_func)                        except StopIteration:                            msg_send_flag_dict[item_list[idx]] = 0                            continue                    else:                        try:                            item_phyValue = next(self.item2_value_func)                        except StopIteration:                            msg_send_flag_dict[item_list[idx]] = 0                            continue                    msg_data = self.get_item_msg(item_list[idx], item_phyValue)                    if send_msg(msg_id, msg_type, msg_data, can_device, device_id, 0):                        print("发送报文成功")                        # print("msg_data", msg_data)                        msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] = 0                    else:                        pass                        # print("发送报文失败")                        # mes_info = "发送报文失败"                        # toastone = wx.MessageDialog(None, mes_info, "信息提示",                        #                             wx.YES_DEFAULT | wx.ICON_QUESTION)                        # if toastone.ShowModal() == wx.ID_YES:  # 如果点击了提示框的确定按钮                        #     toastone.Destroy()  # 则关闭提示框

功能实现逻辑的待优化点:存在多个对象就需要定义多个存储报文数据的生成器。

上述功能实现逻辑优化如下:

    def set_yield_func(self):        item_yield_func_dict = dict()        for i in range(len(item_list)):            item_yield_func_dict[item_list[i]] = self.yield_item_value(i)        return item_yield_func_dict    def yield_item_value(self, item_idx):        item_value_list = self.item_value_dict[item_list[item_idx]]        for i in range(len(item_value_list)):            yield item_value_list[i]

报文发送线程的run()函数中调用这个存储对象报文发送数据生成器的字典item_yield_func_dict:

    def run(self):        # 实时更新item的被选状态        self.get_checkbox_res_func()        # 获取每个对象的实际物理值        self.get_item_value_dict()        self.item_yield_func_dict = self.set_yield_func()        …………

从存储每个对象生成器的字典item_yield_func_dict中获取生成器对象:

                    try:                        item_phyValue = next(self.item_yield_func_dict[item_list[idx]])                    except StopIteration:                        msg_send_flag_dict[item_list[idx]] = 0                        continue

关于“怎么使用python中的生成器实现周期性报文发送功能”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“怎么使用python中的生成器实现周期性报文发送功能”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯