文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何使用数据版本控制管理数据湖中的模式验证

2024-11-30 11:04

关注

审校 | 重楼

数据团队依赖许多其他“第三方”发送数据的情况并不少见,他们经常在没有进行任何沟通或让数据团队知道太晚的情况下更改数据的模式。

每当发生这种情况时,数据管道就会遭到破坏,数据团队需要修复数据湖。这是一个充满繁重任务的人工过程。在通常情况下,数据团队可能会推卸责任,试图证明模式已经改变。

但是随着发展和进步,数据团队意识到,以自动持续集成(CI)/持续交付(CD)的方式简单地阻止模式一起更改是更明智的。

模式更改和模式验证给数据团队带来了很多痛苦,但是市场上有一些解决方案可以帮助解决这个问题——幸运的是,其中一些是开源的。

以下一个循序渐进的教程,介绍如何使用开源数据版本控制工具lakeFS解决模式验证问题。

什么是模式验证?

模式验证允许用户为数据湖创建验证规则,例如允许的数据类型和值范围。它保证保存在数据湖中的数据遵循已建立的模式,该模式描述了数据的结构、格式和限制。

由于用户的数据湖可以填充来自具有不同模式定义的各种来源的数据,因此在数据湖中的所有数据上强制使用统一的模式是一个挑战。

这是一个需要解决的问题——如果不快速采取行动,就会在数据处理过程中看到不一致和错误。

为什么需要处理模式验证?

花费一些时间正确地管理模式是值得的,有以下四个原因:

处理数据湖中的模式并非一帆风顺

在数据仓库中,用户处理的是严格的数据模型和严格的模式。数据湖与之相反。大多数情况下,它们最终包含广泛的数据源。

为什么这很重要?因为在数据湖中,模式的定义可以在数据源之间发生变化,并且当添加新数据时,模式可能会随着时间的推移而变化。这使得在数据湖中的所有数据上实施统一的模式成为一个巨大的挑战。如果不能解决这个问题,将不得不解决数据处理问题。

但这还不是全部。由于构建在数据湖之上的数据管道的复杂性不断增加,无法拥有一个一致的模式。数据管道可以包括多个流程和转换,每个流程和转换都需要一个唯一的模式定义。

模式可能随着数据的处理和修改而变化,因此很难确保跨整个管道进行模式验证。

这就是版本控制系统可以派上用场的地方。

在数据湖中实现模式验证的数据版本控制

lakeFS是一个开源工具,它可以将数据湖转换为类似Git的存储库,让用户像软件工程师管理代码一样管理它。这就是数据版本控制的意义所在。

与其他源代码控制系统一样,lakeFS有一个称为hook的特性,它是定制的脚本或程序,lakeFS平台可以运行这些脚本或程序来响应指定的事件或操作。

这些事件可以包括提交更改、合并分支、创建新分支、添加或删除标记等等。例如,当合并发生时,在合并完成之前,在源分支上运行一个预合并挂钩。

它如何应用于模式验证呢? 用户可以创建一个预合并挂钩来验证Parquet文件的模式与当前模式是否相同。

需要准备什么

在这个场景中,将在一个摄取分支中创建一个delta表,并将其合并到生产中。接下来将更改表的模式,并尝试再次合并它,模拟将数据提升到生产的过程。

1.设置

首先,将设置一些全局变量并安装将在本例中使用的包,这些包将在Python笔记本中运行。

在设置好lakeFS凭证后,可以开始创建一些包含存储库和分支名称的全局变量:

Python 
 repo = "schema-validation-example-repo"

 mainBranch = "main"

 ingestionBranch = "ingestion_branch"

每个lakeFS存储库都需要有自己的存储命名空间,所以也需要创建一个:

Python 
 storageNamespace = 's3://' # e.g. "s3://username-lakefs-cloud/"

在本例中,使用AWS S3存储。为了使一切顺利进行,用户的存储需要配置为与lakeFS一起运行,lakeFS与AWS、Azure、Google Cloud或内部部署对象存储(如MinIO)一起工作。

如果在云中运行lakeFS,则可以通过复制示例存储库的存储名称空间并将字符串附加到其上,将其链接到存储。所以,如果lakeFS Cloud提供了这个sample-repo

可以通过以下方式进行配置:

Python 
 storageNamespace = 's3://lakefs-sample-us-east-1-production/AROA5OU4KHZHHFCX4PTOM:2ae87b7718e5bb16573c021e542dd0ec429b7ccc1a4f9d0e3f17d6ee99253655/my_random_string'

在笔记本中,将使用Python代码,因此也必须导入lakeFS Python客户端包:

Python 
 import lakefs_client

 from lakefs_client import models

 from lakefs_client.client import LakeFSClient



 import os

from pyspark.sql.types import ByteType, IntegerType, LongType, StringType, StructType, StructField

接下来,配置客户端:

Python 
 %xmode Minimal

 if not 'client' in locals():

 # lakeFS credentials and endpoint

 configuration = lakefs_client.Configuration()

 configuration.username = lakefsAccessKey

 configuration.password = lakefsSecretKey

 configuration.host = lakefsEndPoint



 client = LakeFSClient(configuration)

 print("Created lakeFS client.")

以下将在本例中创建delta表,因此需要包括以下包:

Python 
 os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensinotallow=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'

lakeFS公开了一个S3网关,它允许应用程序以与S3通信的方式与lakeFS进行接口。要配置网关,并执行以下步骤:

Python 
 from pyspark.context import SparkContext

 from pyspark.sql.session import SparkSession

 sc = SparkContext.getOrCreate()

 spark = SparkSession(sc)



 sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", lakefsAccessKey)

 sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", lakefsSecretKey)

 sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", lakefsEndPoint)

 sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")

现在已经准备好在笔记本中大规模使用lakeFS版本控制。

2.创建存储库和挂钩

以下将使用Python客户端创建存储库:

Python 
 client.repositories.create_repository(

 repository_creatinotallow=models.RepositoryCreation(

 name=repo,

 storage_namespace=storageNamespace,

 default_branch=mainBranch))

在这种情况下,将使用预合并挂钩来确保架构没有更改。操作文件应提交到lakeFS存储库,前缀为_lakeFS_actions/。未能分析操作文件将导致运行失败。

将提交以下钩子配置操作文件,pre-merge-schema-validation.yaml:

Python 
 #Parquet schema Validator

 #Args:

 # - locations (list of strings): locations to look for parquet files under

 # - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them

 #Example hook declaration: (_lakefs_actions/pre-merge-schema-validation.yaml):

 name: pre merge checks on main branch

 on:
、
 pre-merge:

 branches:

 - main

 hooks:

 - id: check_schema_changes

 type: lua

 properties:

 script_path: scripts/parquet_schema_change.lua # location of this script in the repository

 args:

 sample: false

 locations:

 - tables/customers/

这个文件(pre-merge-schema-validation.yaml)存储在example repo中的子文件夹LuaHooks中。必须将文件提交到文件夹_lakeFS_actions下的lakeFS存储库:

Python 
 hooks_config_yaml = "pre-merge-schema-validation.yaml"

 hooks_prefix = "_lakefs_actions"

 with open(f'./LuaHooks/{hooks_config_yaml}', 'rb') as f:

 client.objects.upload_object(repository=repo,

 branch=mainBranch,

 path=f'{hooks_prefix}/{hooks_config_yaml}',


 cnotallow=f

 )

只是设置了一个动作脚本,在合并到main之前运行scripts/parquet_schema_che.lua。

然后将创建脚本本身(parquet_schema_che.lua)并将其上载到脚本目录中。正如人们所看到的,使用嵌入式LuaVM来运行钩子,而不依赖于其他组件。

此文件也位于ample-repo中的LuaHooks子文件夹中:

Python 
 --[[
 
  Parquet schema validator
 
 
 
 Args:

 - locations (list of strings): locations to look for parquet files under

 - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them

 ]]





 lakefs = require("lakefs")

  strings = require("strings")


parquet = require("encoding/parquet")

 regexp = require("regexp")

 path = require("path")





  visited_directories = {}



 for _, location in ipairs(args.locations) do

    after = ""

  has_more = true

  need_more = true

  print("checking location: " .. location)

  while has_more do

     print("running diff, location = " .. location .. " after = " .. after)

     local code, resp = lakefs.diff_refs(action.repository_id, action.branch_id, action.source_ref, after, location)

     if code ~= 200 then

         error("could not diff: " .. resp.message)

     end



    for _, result in pairs(resp.results) do

        p = path.parse(result.path)

      print("checking: '" .. result.path .. "'")

         if not args.sample or (p.parent and not visited_directories[p.parent]) then

            if result.path_type == "object" and result.type ~= "removed" then

                if strings.has_suffix(p.base_name, ".parquet") then

                     -- check it!

                  code, content = lakefs.get_object(action.repository_id, action.source_ref, result.path)

                   if code ~= 200 then

                        error("could not fetch data file: HTTP " .. tostring(code) .. "body:\n" .. content)

                     end

                   schema = parquet.get_schema(content)

                   for _, column in ipairs(schema) do

                        for _, pattern in ipairs(args.column_block_list) do

                           if regexp.match(pattern, column.name) then

                               error("Column is not allowed: '" .. column.name .. "': type: " .. column.type .. " in path: " .. result.path)

                           end

                        end

                   end

                   print("\t all columns are valid")

                   visited_directories[p.parent] = true

               end

          end

       else

          print("\t skipping path, directory already sampled")
   end

    end



    -- pagination

    has_more = resp.pagination.has_more

    after = resp.pagination.next_offset

 end

  end

把文件(这次是parquet_schema_che.lua)从LuaHooks目录上传到lakeFS存储库中操作配置文件中指定的位置(即脚本文件夹内):

Python 
hooks_config_yaml = "pre-merge-schema-validation.yaml"

 hooks_prefix = "_lakefs_actions"



 with open(f'./LuaHooks/{hooks_config_yaml}', 'rb') as f:

  client.objects.upload_object(repository=repo,

                            branch=mainBranch,

                           path=f'{hooks_prefix}/{hooks_config_yaml}',

                            content=f

                               )

必须在提交操作文件后提交更改才能生效:

Python 
 client.commits.commit(

 repository=repo,

 branch=mainBranch,

 commit_creatinotallow=models.CommitCreation(

 message='Added hook config file and schema validation scripts'))

如果切换到lakeFS UI,应该会在主目录下看到以下目录结构和文件:

LakeFS UI的目录结构

lakeFS UI中显示的合并前架构验证

lakeFS UI中的架构验证脚本

3.使用原始模式运行第一个ETL

在lakeFS中,可以在与生产(主要)分支不同的分支上进行摄取和转化。

以下将建立一个摄取分支:

Python 
 client.branches.create_branch(

  repository=repo,

  branch_creatinotallow=models.BranchCreation(
 
 name=ingestionBranch, source=mainBranch))

接下来,将使用Kaggle数据集Orion Star——运动和户外RDBMS数据集。使用Customer.csv,可以从data/samples/OrionStar/将其上传到示例存储库。

首先,需要定义表模式:

Python 
 customersSchema = StructType([
 
 StructField("User_ID", IntegerType(), False),
 
  StructField("Country", StringType(), False),
 
 StructField("Gender", StringType(), False),

  StructField("Personal_ID", IntegerType(), True),

  StructField("Customer_Name", StringType(), False),

  StructField("Customer_FirstName", StringType(), False),

 StructField("Customer_LastName", StringType(), False),
 
 StructField("Birth_Date", StringType(), False),
 
  StructField("Customer_Address", StringType(), False),

  StructField("Street_ID", LongType(), False),
 
  StructField("Street_Number", IntegerType(), False),

  StructField("Customer_Type_ID", IntegerType(), False)

 ])

然后,从CSV文件中,将创建一个delta表,并将其提交到存储库:

Python 
 customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"

 df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv',header=True,schema=customersSchema)

 df.write.format("delta").mode("overwrite").save(customersTablePath)

在这里需要做出改变:

Python 
 client.commits.commit(

 repository=repo,

 branch=ingestionBranch,

 commit_creatinotallow=models.CommitCreation(

 message='Added customers Delta table',

 metadata={'using': 'python_api'}))

然后,使用合并将数据发送到生产:

Python 
 client.refs.merge_into_branch(
 
  repository=repo,
 
  source_ref=ingestionBranch,
 
  destination_branch=mainBranch)

已经完成的架构验证序列:

4. 修改模式并尝试将表移动到生产环境

为了简化操作,将重命名其中一列。以下将Country_name替换为Country_name:

Python 
 customersSchema = StructType([

 StructField("User_ID", IntegerType(), False),
 
  StructField("Country_Name", StringType(), False), # Column name changes from Country to Country_name

  StructField("Gender", StringType(), False),
 
 StructField("Personal_ID", IntegerType(), True),

  StructField("Customer_Name", StringType(), False),
 
  StructField("Customer_FirstName", StringType(), False),

 StructField("Customer_LastName", StringType(), False),

  StructField("Birth_Date", StringType(), False),

 StructField("Customer_Address", StringType(), False),

  StructField("Street_ID", LongType(), False),

  StructField("Street_Number", IntegerType(), False),

 StructField("Customer_Type_ID", IntegerType(), False)

 ])

在摄取分支中,重新创建delta表:


Python 
 customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
 
 df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv',header=True,schema=customersSchema)

 df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(customersTablePath)

 ])

需要进行修改:

Python 
 client.commits.commit(

 repository=repo,

  branch=ingestionBranch,

 commit_creatinotallow=models.CommitCreation(

 message='Added customers table with schema changes',

 metadata={'using': 'python_api'}))

然后,可以尝试将数据投入生产:

Python 
 client.commits.commit(

 repository=repo,

 branch=ingestionBranch,

 commit_creatinotallow=models.CommitCreation(

 message='Added customer tables with schema changes!',

 metadata={'using': 'python_api'}))

由于模式修改,得到了一个先决条件Failed错误。合并前的挂钩阻碍了晋升。因此,这些数据不会在生产中使用:

从lakeFS UI中,可以导航到存储库并选择“Actions”选项。接下来,单击失败操作的Run ID,选择“主分支上的合并前检查”,展开check_schema_changes,并查看错误消息。

结语

由于存储数据的异构性和原始性,数据湖上的模式验证至关重要,但也很困难。管理模式演变、数据转换和跨多种格式的兼容性检查意味着每个数据从业者都需要一些非常强大的方法和工具。

数据湖的去中心化性质,许多用户和系统可以在其中编辑数据,使模式验证更加复杂。模式的验证对于数据治理、集成和可靠的分析至关重要。

像上面展示的预合并挂钩这样的解决方案有助于在将模式文件合并到生产分支之前验证它们。它在保证数据完整性和防止不兼容的模式更改合并到主分支时非常方便。它还增加了一层额外的质量控制,使数据更加一致。

原文Managing Schema Validation in a Data Lake Using Daa Version Control,作者:Iddo Avneri


来源:51CTO内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯