文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何解析SparkSQL外部数据源

2023-06-02 21:01

关注

这期内容当中小编将会给大家带来有关如何解析SparkSQL外部数据源,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

场景介绍:

    大数据MapReduce,Hive,Spark作业,首先需要加载数据,数据的存放源可能是HDFS、HBase、S3、OSS mongoDB;数据格式也可能为json、text、csv、parquet、jdbc..或者数据格式经过压缩,不同格式文件需要不同的解析方式,

    如果需要HDFS关联MySQL数据,可以通过sqoop进行一些列转换到,如果使用External Data Source API直接加载为DF拿到数据,简单的说可以通过SparkSQL拿到外部数据源数据加载成DF。

加载方式:

build-in :内置加载外部数据如 json、text、parquet、jdbc、HDFS;

third-party:第三方加载外部数据如HBase、S3、OSS mongoDB

    第三方JAR地址:https://spark-packages.org/  

    Maven工程需要导入gav

    spark-shell:需要外部导入--package g:a:v  

    SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

        优势:下载依赖包到本地

缺点:内网环境没有网络无法下载

一、外部数据源读取parquet文件:

Spark context Web UI available at http://hadoop001:4040

Spark context available as 'sc' (master = local[2], app id = local-1536244013147).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1

      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)

Type in expressions to have them evaluated.

Type :help for more information.

scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.txt").show

提示错误:/people.txt is not a Parquet file

注意:spark.read.load()底层默认读取Parquet file

scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet").show

18/09/06 10:32:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

+------+--------------+----------------+                                        |  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|          null|  [3, 9, 15, 20]||   Ben|           red|              []|+------+--------------+----------------+

scala> val users = spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet")

users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> users.printSchema

root

 |-- name: string (nullable = true)

 |-- favorite_color: string (nullable = true)

 |-- favorite_numbers: array (nullable = true)

 |    |-- element: integer (containsNull = true)

scala> users.show

+------+--------------+----------------+|  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|          null|  [3, 9, 15, 20]||   Ben|           red|              []|+------+--------------+----------------+

-- 查看列,常规操作

scala> users.select("name").show

+------+|  name|+------+|Alyssa||   Ben|+------+

二、转换操作

-- 转成json格式输出

scala> users.select("name","favorite_color").write.format("json").save("file:////home/hadoop/data/parquet/")

[hadoop@hadoop001 parquet]$ cat *{"name":"Alyssa"}{"name":"Ben","favorite_color":"red"}

-- 不采取压缩

.option("compression","none")  

-- 转成text格式输出

scala> users.select("name").write.format("text").save("file:////home/hadoop/data/parquet2/")

[hadoop@hadoop001 parquet2]$ cat *

Alyssa

-- Save Modes

用法:.mode("")

1、default  -- 目标目录存在,抛出异常

2、append   -- 目标目录存在,重跑数据+1,无法保证数据幂等

3、overwrite-- 目标目录存在,覆盖原文件

4、ignore-- 忽略你的模式,目标纯在将不保存

三、spark-shell操作JDBC数据

-- 读取外部MySQL数据为DF

val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/ruozedata").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info").option("user","root").option("password", "root").load()

-- 查看表信息

jdbcDF.show()

-- 获取本地数据 

val deptDF = spark.table("dept") 

-- join关联使用

deptDF.join(jdbcDF,deptDF.col("deptid") === jdbcDF.col("deptid"))

-- DF写入MySQL本地,数据类型有变化,重复写入需要加上.mode("overwrite")

jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/hive_data").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info_bak").option("user","root").option("password", "root").save()

mysql> show tables

+---------------------------+| Tables_in_hive_data       |+---------------------------+| bucketing_cols            || cds                       || city_info_bak             |+---------------------------+

-- 如果想类型不发生变化指定option指定字段类型

.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")

四、spark-sql操作JDBC数据

-- SQL创建临时表视图,单session

CREATE TEMPORARY VIEW emp_sqlUSING org.apache.spark.sql.jdbcOPTIONS (  url "jdbc:mysql://hadoop001:3306/ruozedata",  dbtable "city_info",  user 'root',  password 'root')

show tbales;

INSERT INTO TABLE emp_sql

SELECT * FROM emp_sql

五、Perdicate Push Down(PPD)

               disk         network                  CPU

外部数据外(1T)------->获取本地磁盘(1T)---------->提交到集群(1T)--------->结果(1G)

               disk        network                   CPU

外部数据外(1T)------->经过列裁剪(10G)----------->提交到集群(10G)----------->传结果(1g)

               disk          CPU                 network

外部数据外(1T)------->经过列裁剪(10G)---------->进过计算(1G)----------->传输结果

六、SparkSQL外部数据源实现机制

-- 0.有效的读取外部数据源的数据的

-- 1.buildScan扫描整张表,变成一个RDD[ROW]

trait TableScan {

def buildScan(): RDD[Row]  

}

-- 2.PrunedScan获取表的裁剪列

trait PrunedScan {

def buildScan(requiredColumns: Array[String]): RDD[Row] 

-- 3.PrunedFilteredScan列裁剪,行过滤

trait PrunedFilteredScan {

def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

-- 4.加载外部数据源的数据,定义数据的schema信息

abstract class BaseRelation{

-- 5.Relation产生BaseRelation使用

trait RelationProvider

}

总归:

-- 查询类操作

trait PrunedScan {

  def buildScan(requiredColumns: Array[String]): RDD[Row]

}  

-- 列裁剪,行过滤

trait PrunedFilteredScan {

  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

}  

-- 写入类操作

trait InsertableRelation {

  def insert(data: DataFrame, overwrite: Boolean): Unit

}

上述就是小编为大家分享的如何解析SparkSQL外部数据源了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯