文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Python如何实现实时增量数据加载工具

2023-06-29 06:48

关注

这篇文章主要介绍Python如何实现实时增量数据加载工具,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

结合单例模式实际应用案例:实现实时增量数据加载工具的解决方案。最关键的是实现一个可进行添加、修改、删除等操作的增量ID记录表。

单例模式:提供全局访问点,确保类有且只有一个特定类型的对象。通常用于以下场景:日志记录或数据库操作等,避免对用一资源请求冲突。

创建增量ID记录表

import sqlite3import datetimeimport pymssqlimport pandas as pdimport timepd.set_option('expand_frame_repr', False)

导入所需模块

 # 创建数据表database_path = r'.\Database\ID_Record.db'from sqlite3 import connectwith connect(database_path) as conn:    conn.execute(        'CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record(id INTEGER PRIMARY KEY AUTOINCREMENT,F_SDaqID_MAX TEXT,record_date datetime)')

增量最新记录ID-F_SDaqID_MAX数据库存储

#数据保存到本地txtdef text_save(filename, record):#filename为写入txt文件的路径,record为要写入F_SDaqID_MAX、record_date数据列表.    file = open(filename,'a') 追加方式    # file = open(filename, 'w')  #覆盖方式    for i in range(len(record)):        s = str(record[i]).replace('[','').replace(']','')        s = s.replace("'",'').replace(',','') +'\n'   #去除单引号,逗号,每行末尾追加换行符        file.write(s)    file.close()

增量最新记录ID-F_SDaqID_MAX临时文件存储

增量ID记录提供了两种实现方案 ,一个是数据持久化存储模式,另一个是临时文件存储模式。数据持久化模式顾名思义,也就是说在创建对象的时候,能将操作关键信息如增量ID-F_SDaqID_MAX记录下来,这种flag记录映射是常选择的设计模式。

数据库连接类

实现实时增量数据获取需要实现两个数据库连接类:增量数据ID存储类和增量目标数据源类。这里利用单例模式实现数据库操作类,将增量服务记录信息按照顺序存储到数据库或特定的日志文件中,以维护数据的一致性。

增量数据ID存储sqlite连接类代码

class Database_sqlite(metaclass=MetaSingleton):    database_path = r'.\Database\energy_rc_configure.db'    connection = None    def connect(self):        if self.connection is None:            self.connection = sqlite3.connect(self.database_path,check_same_thread=False,isolation_level=None)            self.cursorobj =  self.connection.cursor()        return self.cursorobj,self.connection    # 插入最大记录    @staticmethod    def Insert_Max_ID_Record(f1, f2):        cursor = Database_sqlite().connect()        print(cursor)        sql = f"""insert into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values("{f1}","{f2}")"""        cursor[0].execute(sql)        # sql = "insert  into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values(?,?)"        # cursor[0].execute(sql,(f"{f1}",f"{f2}"))        cursor[1].commit()        print("插入成功!")        # cursor[0].close()        return     # 取出增量数据库中最新一次ID记录    @staticmethod    def View_Max_ID_Records():        cursor = Database_sqlite().connect()        sql = "select max(F_SDaqID_MAX) from Incremental_data_max_id_record"        cursor[0].execute(sql)        results = cursor[0].fetchone()[0]        # #单例模式不用关闭数据库连接        # cursor[0].close()        print("最新记录ID", results)        return results    #删除数据记录ID    @staticmethod    def Del_Max_ID_Records():        cursor = Database_sqlite().connect()        sql = "delete from Incremental_data_max_id_record where record_date = (select MAX(record_date) from Incremental_data_max_id_record)"        cursor[0].execute(sql)        # results = cursor[0].fetchone()[0]        # # cursor[0].close()        cursor[1].commit()        print("删除成功")        return

增量数据源sqlserver连接类代码

class Database_sqlserver(metaclass=MetaSingleton):    """    #实时数据库    """    connection = None    # def connect(self):    def __init__(self):        if self.connection is None:            self.connection = pymssql.connect(host="xxxxx",user="xxxxx",password="xxxxx",database="xxxxx",charset="utf8")            if self.connection:                print("连接成功!")            # 打开数据库连接            self.cursorobj = self.connection.cursor()        # return self.cursorobj, self.connection    # 获取数据源中最大ID    @staticmethod    def get_F_SDaqID_MAX():        # cursor_insert = Database_sqlserver().connect()        cursor_insert = Database_sqlserver().cursorobj        sql_MAXID = """select MAX(F_SDaqID) from T_DaqDataForEnergy"""        cursor_insert.execute(sql_MAXID)  # 执行查询语句,选择表中所有数据        F_SDaqID_MAX = cursor_insert.fetchone()[0]  # 获取记录        print("最大ID值:{0}".format(F_SDaqID_MAX))        return F_SDaqID_MAX    # 提取增量数据    @staticmethod    def get_incremental_data(incremental_Max_ID):        # 开始获取增量数据        sql_incremental_data = """select F_ID,F_Datetime,F_Data from T_DaqDataForEnergy  where F_ID > {0}""".format(            incremental_Max_ID)        # cursor_find = Database_sqlserver().connect()        cursor_find = Database_sqlserver().cursorobj        cursor_find.execute(sql_incremental_data)  # 执行查询语句,选择表中所有数据        Target_data_source = cursor_find.fetchall()  # 获取所有数据记录        # cursor_find.close()        cursor_find.close()        df = pd.DataFrame(            Target_data_source,            columns=[                "F_ID",                "F_Datetime",                "F_Data"])        print("提取数据", df)        return df

数据资源应用服务设计主要考虑数据库操作的一致性和优化数据库的各种操作,提高内存或CPU利用率。

实现多种读取和写入操作,客户端操作调用API,执行相应的DB操作。

注:

使用metaclass实现创建具有单例特征的类

Database_sqlserver(metaclass=MetaSingleton)

Database_sqlite(metaclass=MetaSingleton)

使用class定义新类时,数据库类Database_sqlserver由MetaSingleton装饰后即指定了metaclass,那么MetaSingleton的特殊方法__call__方法将自动执行。

class MetaSingleton(type):    _instances={}    def __call__(cls, *args, **kwargs):        if cls not in cls._instances:            cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)        return cls._instances[cls]

以上代码基于元类的单例实现,当客户端对数据库执行某些操作时,会多次实例化数据库类,但是只创建一个对象,所以对数据库的调用是同步的。

多线程使用同一数据库连接资源需采取一定同步机制

如果没采用同步机制,可能出现一些意料之外的情况

1)with cls.lock加锁

class MetaSingleton(type):    _instances={}    lock = threading.Lock()    def __call__(cls, *args, **kwargs):        with cls.lock:            if cls not in cls._instances:                time.sleep(0.05)  #模拟耗时                cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)            return cls._instances[cls]

锁的创建和释放需要消耗资源,上面代码每次创建都必须获得锁。

如果我们开发的程序非单个应用,而是集群化的,即多个客户端共享单个数据库,导致数据库操作无法同步,而数据库连接池是更好的选择。大大节省了内存,提高了服务器地服务效率,能够支持更多的客户服务。

数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并讲这些连接组成一个连接池,由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。

增量数据服务客户端

增量处理策略:第一次加载先判断增量数据表中是否存在最新记录,若有直接加载;否则,记录一下最大/最新的数据记录ID或时间点,保存到一个增量数据库或记录文件中。

 从第二次加载开始只加载最大/最新的ID或时间点以后的数据。当加载过程全部成功完成之后并同步更新增量数据库或记录文件,更新这次数据记录的最后记录ID或时间点。

一般这类数据记录表有自增长列,那么也可以使用自增长列来实现这个标识特征。比如本次我用到数据表增长列F_ID。

class IncrementalRecordServer:    _servers = []    _instance = None    def __new__(cls, *args, **kwargs):        if not IncrementalRecordServer._instance:            # IncrementalRecordServer._instance = super().__new__(cls)            IncrementalRecordServer._instance = super(IncrementalRecordServer,cls).__new__(cls)        return IncrementalRecordServer._instance    def __init__(self,changeServersID=None):        """        变量初始化过程        """        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')        self.changeServersID = changeServersID    # 回调更新本地记录,清空记录替换,临时记录    def record(func):        def Server_record(self):            v = func(self)            text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",record=IncrementalRecordServer._servers)            print("保存成功")            return v        return Server_record    #增加服务记录    @record    def addServer(self):        self._servers.append([int(self.F_SDaqID_MAX),self.record_date])        print("添加记录")        Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX, f2=self.record_date)    #修改服务记录    @record    def changeServers(self):        # self._servers.pop()        # 此处传入手动修改的记录ID        self._servers.append([self.changeServersID,self.record_date])        #先删除再插入实现修改        Database_sqlite.Del_Max_ID_Records()        Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID, f2=self.record_date)        print("更新记录")    #删除服务记录    @record    def popServers(self):        # self._servers.pop()        print("删除记录")        Database_sqlite.Del_Max_ID_Records()    # 最新服务记录    def getServers(self):        # print(self._servers[-1])        Max_ID_Records = Database_sqlite.View_Max_ID_Records()        print("查看记录",Max_ID_Records)        return Max_ID_Records    #提取数据    def Incremental_data_client(self):        """        # 提取数据(增量数据MAXID获取,并提取增量数据)        """        # 实时数据库        # 第一次加载先判断是否存在最新记录        if self.getServers() == None:            # 插入增量数据库ID            self.addServer()            # 提取增量数据            data = Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX)            return data        # 获取增量数据库中已有的最新最大ID记录        incremental_Max_ID = self.getServers()        #添加记录        self.addServer()        # 提取增量数据        Target_data_source = Database_sqlserver.get_incremental_data(incremental_Max_ID)        return Target_data_source

优化策略:

延迟加载方式

以上增量记录服务类IncrementalRecordServer通过覆盖__new__方法来控制对象的创建,我们在创建对象的时候会先检查对象是否存在。也可以通过懒加载的方式实现,节约资源优化如下。

class IncrementalRecordServer:    _servers = []    _instance = None    def __init__(self,changeServersID=None):        """        变量初始化过程        """        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')        self.changeServersID = changeServersID        if not IncrementalRecordServer._instance:            print("__init__对象创建")        else:            print("对象已经存在:",IncrementalRecordServer._instance)            self.getInstance()    @classmethod    def getInstance(cls):        if not cls._instance:            cls._instance = IncrementalRecordServer()        return cls._instance

懒汉式实例化能够确保实际需要时才创建对象,实例化a= IncrementalRecordServer()时,调用初始化__init__方法,但是没有新的对象创建。懒汉式这种方式加载类对象,也称为延迟加载方式。

单例模式能有效利用空间资源,每次利用同一空间资源。

不同操作对象的内存地址相同,且不同对象初始化将上一个对象初始化变量覆盖,确保最新记录实时更新。表面上以上代码实现了单例模式没问题,但多线程并发情况下,存在线程安全问题,可能同时创建不同的对象空间。考虑到线程安全,也可以进一步加锁处理.

适用范围及注意事项

本次代码适用于部署生产指定时间点运行之后产出的增量数据,长时间未启用再启动需要清空历史记录即增量数据库或文件ID需清空,一般实时数据增量实现一次加载没有什么问题,所以这一点也不用很关注(文件方式代码可自行完善);当加载历史数据库或定时间隔产生数据量过大时,需要进一步修改代码,需要判断数据规模,指定起始节点及加载数据量,综合因素考虑,下次分享一下亿级数据量提取方案。

进一步了解Python垃圾回收机制;并发情况下,通过优化线程池来管理资源。

最后可以添加一个函数来释放资源

def __del__(self):    class_name = self.__class__.__name__    print(class_name,"销毁")

del obj 调用__del__() 销毁对象,释放其空间;只有Python 对象在不再引用对象时被释放。当程序中有其它变量引用该实例对象时,即便手动调用 __del__() 方法,该方法也不会立即执行。这和 Python 的垃圾回收机制的实现有关。

结果测试

if __name__ == '__main__':    for i in range(6):        hc1 = IncrementalRecordServer()        hc1.addServer()        print("Record_ID",hc1._servers[i])        # del hc1        time.sleep(60)    #Server2-客户端client    # 最新服务记录    hc2 = IncrementalRecordServer()    hc2.getServers()    #查看增量数据    hc2.Incremental_data_client()

插入记录

模拟每1分钟插入一条记录,向增量数据库插入7条

Python如何实现实时增量数据加载工具

Python如何实现实时增量数据加载工具

if __name__ == '__main__':    # Server3-客户端client    # 手动添加增量起始ID记录    hc3 = IncrementalRecordServer(changeServersID='346449980')    hc3.changeServers()

Python如何实现实时增量数据加载工具

Python如何实现实时增量数据加载工具

if __name__ == '__main__':    #删除ID    hc3 = IncrementalRecordServer(changeServersID='346449980')    # hc3.changeServers()    hc3.popServers()

Python如何实现实时增量数据加载工具

以上是“Python如何实现实时增量数据加载工具”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网行业资讯频道!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯