这期内容当中小编将会给大家带来有关如何解析SparkSQL外部数据源,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
场景介绍:
大数据MapReduce,Hive,Spark作业,首先需要加载数据,数据的存放源可能是HDFS、HBase、S3、OSS mongoDB;数据格式也可能为json、text、csv、parquet、jdbc..或者数据格式经过压缩,不同格式文件需要不同的解析方式,
如果需要HDFS关联MySQL数据,可以通过sqoop进行一些列转换到,如果使用External Data Source API直接加载为DF拿到数据,简单的说可以通过SparkSQL拿到外部数据源数据加载成DF。
加载方式:
build-in :内置加载外部数据如 json、text、parquet、jdbc、HDFS;
third-party:第三方加载外部数据如HBase、S3、OSS mongoDB
第三方JAR地址:https://spark-packages.org/
Maven工程需要导入gav
spark-shell:需要外部导入--package g:a:v
SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
优势:下载依赖包到本地
缺点:内网环境没有网络无法下载
一、外部数据源读取parquet文件:
Spark context Web UI available at http://hadoop001:4040
Spark context available as 'sc' (master = local[2], app id = local-1536244013147).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.txt").show
提示错误:/people.txt is not a Parquet file
注意:spark.read.load()底层默认读取Parquet file
scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet").show
18/09/06 10:32:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+------+--------------+----------------+ | name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa| null| [3, 9, 15, 20]|| Ben| red| []|+------+--------------+----------------+
scala> val users = spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet")
users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> users.printSchema
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)
scala> users.show
+------+--------------+----------------+| name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa| null| [3, 9, 15, 20]|| Ben| red| []|+------+--------------+----------------+
-- 查看列,常规操作
scala> users.select("name").show
+------+| name|+------+|Alyssa|| Ben|+------+
二、转换操作
-- 转成json格式输出
scala> users.select("name","favorite_color").write.format("json").save("file:////home/hadoop/data/parquet/")
[hadoop@hadoop001 parquet]$ cat *{"name":"Alyssa"}{"name":"Ben","favorite_color":"red"}
-- 不采取压缩
.option("compression","none")
-- 转成text格式输出
scala> users.select("name").write.format("text").save("file:////home/hadoop/data/parquet2/")
[hadoop@hadoop001 parquet2]$ cat *
Alyssa
-- Save Modes
用法:.mode("")
1、default -- 目标目录存在,抛出异常
2、append -- 目标目录存在,重跑数据+1,无法保证数据幂等
3、overwrite-- 目标目录存在,覆盖原文件
4、ignore-- 忽略你的模式,目标纯在将不保存
三、spark-shell操作JDBC数据
-- 读取外部MySQL数据为DF
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/ruozedata").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info").option("user","root").option("password", "root").load()
-- 查看表信息
jdbcDF.show()
-- 获取本地数据
val deptDF = spark.table("dept")
-- join关联使用
deptDF.join(jdbcDF,deptDF.col("deptid") === jdbcDF.col("deptid"))
-- DF写入MySQL本地,数据类型有变化,重复写入需要加上.mode("overwrite")
jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/hive_data").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info_bak").option("user","root").option("password", "root").save()
mysql> show tables
+---------------------------+| Tables_in_hive_data |+---------------------------+| bucketing_cols || cds || city_info_bak |+---------------------------+
-- 如果想类型不发生变化指定option指定字段类型
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
四、spark-sql操作JDBC数据
-- SQL创建临时表视图,单session
CREATE TEMPORARY VIEW emp_sqlUSING org.apache.spark.sql.jdbcOPTIONS ( url "jdbc:mysql://hadoop001:3306/ruozedata", dbtable "city_info", user 'root', password 'root')
show tbales;
INSERT INTO TABLE emp_sql
SELECT * FROM emp_sql
五、Perdicate Push Down(PPD)
disk network CPU
外部数据外(1T)------->获取本地磁盘(1T)---------->提交到集群(1T)--------->结果(1G)
disk network CPU
外部数据外(1T)------->经过列裁剪(10G)----------->提交到集群(10G)----------->传结果(1g)
disk CPU network
外部数据外(1T)------->经过列裁剪(10G)---------->进过计算(1G)----------->传输结果
六、SparkSQL外部数据源实现机制
-- 0.有效的读取外部数据源的数据的
-- 1.buildScan扫描整张表,变成一个RDD[ROW]
trait TableScan {
def buildScan(): RDD[Row]
}
-- 2.PrunedScan获取表的裁剪列
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
-- 3.PrunedFilteredScan列裁剪,行过滤
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
-- 4.加载外部数据源的数据,定义数据的schema信息
abstract class BaseRelation{
}
-- 5.Relation产生BaseRelation使用
trait RelationProvider {
}
总归:
-- 查询类操作
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
-- 列裁剪,行过滤
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
-- 写入类操作
trait InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit
}
上述就是小编为大家分享的如何解析SparkSQL外部数据源了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注编程网行业资讯频道。