文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)

2023-10-18 18:13

关注

目录

前言

题目:

一、读题分析

二、处理过程

1.采用SparkSQL使用max函数找到最大的日期然后转换成时间类型在变成字符串

2.这里提供除了SQL方法外的另一种过滤不满足条件的方法

三、重难点分析

总结 


前言

本题来源于全国职业技能大赛之大数据技术赛项电商赛题-离线数据处理-抽取

题目:


提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写) 

一、读题分析

涉及组件:MYSQL,HIVE,SCALA,SPARK

涉及知识点:

  1. Spark读取数据库数据
  2. DataFrameAPI的使用(重点)
  3. Spark写入数据库数据
  4. Hive数据库的基本操作
  5. 增量数据的概念(思考:与全量数据有什么区别?)

二、处理过程

  与全量数据类似,唯一不同的点在于抽取增量的数据只是在全量数据中的一部分(形象来说)。个人认为,这样在实际应用中,抽取增量数据比抽取全量数据,更节省时间,带宽,硬件处理频率。总来说,抽取增量数据而不是全量数据的目的就是减少资源的浪费。

1.采用SparkSQL使用max函数找到最大的日期然后转换成时间类型在变成字符串

import org.apache.spark.sql.SparkSessionimport java.text.SimpleDateFormatimport java.util.{Calendar, Properties}object MysqlToHive {  def main(args: Array[String]): Unit = {    import org.apache.spark.sql.SaveMode    import org.apache.spark.sql.functions.lit    val spark =SparkSession.builder().appName("mysqltoHive").master("spark://bigdata1:7077").enableHiveSupport().getOrCreate()    //    读取mysql的配置    val jdbcurl = "jdbc:mysql://bigdata1:3306/db"    val tablename = "table1"    val properties = new Properties()    properties.setProperty("user", "root")    properties.setProperty("password", "123456")    properties.setProperty("driver", "com.mysql.jdbc.Driver")    //    读取mysql数据创建dataframe    val mysqlDF = spark.read.jdbc(jdbcurl, tablename, properties)    mysqlDF.createOrReplaceTempView("mysqldata")    //    读取hive数据ods库中最大的时间    spark.sql("use ods")    val hiveDF = spark.read.table("ods.table1")    hiveDF.createOrReplaceTempView("hivedata")    //    获取最大值    val maxValue = spark.sql("select max(modified_time) from hivedata").head().getTimestamp(0).toString     println("Hive最大的时间为:" + maxModifiedTime)    //    3. 使用Spark SQL查询获取customer_inf表中modified_time的最大值。    //    4. 使用head()方法获取结果集中的第一行数据。    //    5. 使用getTimestamp(0)方法获取第一列数据的Timestamp类型值。    //    6. 使用toString()方法将Timestamp类型值转换为字符串类型。    //    7. 打印最大修改时间的字符串值。    //    找到增量数据    val resultDF = spark.sql(s"select * from mysqldata where momdified_time > '$maxValue'")    //    取得昨天的日期    //    法1:    val sdf = new SimpleDateFormat("yyyyMMdd")    val str = sdf.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)    //    法2:    val str = new SimpleDateFormat("yyyyMMdd").format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)    val reDF = resultDF.withColumn("etl_date", lit(str))    reDF.write.mode(SaveMode.Append).partitionBy("etl_date").saveAsTable("ods.table1")  }}

2.这里提供除了SQL方法外的另一种过滤不满足条件的方法

    //    这里可以写死来模拟增量数据    //    val givenTime = "2022-08-23 00:00:00"    val maxValue = spark.sql("select max(modified_time) from hivedata").head().getTimestamp(0).toString    //    gt获取比givenTime时间大的数据    //    lt小于    val dataf = df.filter(col("modified_time").lt(max)).toDF()

三、重难点分析

  1. 增量数据与全量数据的不同
  2. SparkSQL函数的使用
  3. 解决增量数据的方法

总结 

什么是全量数据、增量数据?

全量数据和增量数据是在数据库系统迁移时的概念。

1.全量数据:

        当前需要迁移的数据库系统的全部数据。

2.增量数据:

        在数据库系统迁移过程中,对比原数据,新产生的数据即为增量数据。

原创作品如需引用请标明出处

来源地址:https://blog.csdn.net/qq_36920766/article/details/130386324

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯