文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

2023-09-21 21:52

关注

目录

前言

一、PySpark基础功能

 1.Spark SQL 和DataFrame

2.Pandas API on Spark

3.Streaming

4.MLBase/MLlib

5.Spark Core

二、PySpark依赖

Dependencies

三、DataFrame

1.创建

创建不输入schema格式的DataFrame

创建带有schema的DataFrame

从Pandas DataFrame创建

通过由元组列表组成的RDD创建

2.查看

DataFrame.show()

spark.sql.repl.eagerEval.enabled

纵向显示

 查看DataFrame格式和列名

查看统计描述信息

PySpark DataFrame转换为Pandas DataFrame

 3.查询

添加新列实例:

条件查询DataFrame.filter()

 4.运算

Pandas_udf

DataFrame.mapInPandas

5.分组

 联合分组和应用函数

 6.获取数据输入/输出

CSV

 Parquet

 ORC

 四、结合Spark SQL

点关注,防走丢,如有纰漏之处,请留言指教,非常感谢


前言

要想了解PySpark能够干什么可以去看看我之前写的文章,里面很详细介绍了Spark的生态:

Spark框架深度理解一:开发缘由及优缺点

Spark框架深度理解二:生态圈

Spark框架深度理解三:运行架构、核心数据集RDD

PySpark只是通过JVM转换使得Python代码能够在Spark集群上识别运行。故Spark的绝大多数功能都可以被Python程序使用。

上篇文章:一文速学-PySpark数据分析基础:PySpark原理详解

已经把PySpark运行原理讲的很清楚了,现在我们需要了解PySpark语法基础来逐渐编写PySpark程序实现分布式数据计算。

已搭建环境:

Spark:3.3.0

Hadoop:3.3.3

Scala:2.11.12

JDK:1.8.0_201

PySpark:3.1.2


一、PySpark基础功能

PySpark是Python中Apache Spark的接口。它不仅可以使用Python API编写Spark应用程序,还提供了PySpark shell,用于在分布式环境中交互分析数据。PySpark支持Spark的大多数功能,如Spark SQL、DataFrame、Streaming、MLlib(机器学习)和Spark Core。

 1.Spark SQL 和DataFrame

Spark SQL是用于结构化数据处理的Spark模块。它提供了一种称为DataFrame的编程抽象,是由SchemaRDD发展而来。不同于SchemaRDD直接继承RDD,DataFrame自己实现了RDD的绝大多数功能。可以把Spark SQL DataFrame理解为一个分布式的Row对象的数据集合。

Spark SQL已经集成在spark-shell中,因此只要启动spark-shell就可以使用Spark SQL的Shell交互接口。如果在spark-shell中执行SQL语句,需要使用SQLContext对象来调用sql()方法。Spark SQL对数据的查询分成了两个分支:SQLContext和HiveContext,其中HiveContext继承了SQLContext,因此HiveContext除了拥有SQLContext的特性之外还拥有自身的特性。

Spark SQL允许开发人员直接处理RDD,同时也可查询例如在 Apache Hive上存在的外部数据。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。

2.Pandas API on Spark

Spark上的pandas API可以扩展使用 python pandas库。

3.Streaming

Apache Spark中的Streaming功能运行在Spark之上,支持跨Streaming和历史数据的强大交互和分析应用程序,同时继承了Spark的易用性和容错特性。Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。

4.MLBase/MLlib

MLlib构建在Spark之上,是一个可扩展的机器学习库,它提供了一组统一的高级API,帮助用户创建和调整实用的机器学习管道。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。

5.Spark Core

Spark Core是Spark平台的底层通用执行引擎,所有其他功能都构建在其之上。它提供了RDD(弹性分布式数据集)和内存计算能力。

二、PySpark依赖

Dependencies

Package最低版本限制Note
pandas1.0.5支撑Spark SQL
Numpy1.7满足支撑MLlib基础API
pyarrow1.0.0支撑Spark SQL
Py4j0.10.9.5要求
pandas1.0.5pandas API on Spark需要
pyarrow1.0.0pandas API on Spark需要
Numpy1.14pandas API on Spark需要

请注意,PySpark需要Java 8或更高版本,并正确设置Java_HOME。如果使用JDK 11,请设置Dio.netty.tryReflectionSetAccessible=true 以获取与箭头相关的功能。

AArch64(ARM64)用户注意:PyArrow是PySpark SQL所必需的,但PyArrow 4.0.0中引入了对AArch64的PyArrow支持。如果由于PyArrow安装错误导致PyArrow安装在AArch64上失败,可以按如下方式安装PyArrow>=4.0.0:

pip install "pyarrow>=4.0.0" --prefer-binary

三、DataFrame

PySpark应用程序从初始化SparkSession开始,SparkSession是PySpark的入口点,如下所示。如果通过PySpark可执行文件在PySpark shell中运行它,shell会自动在变量spark中为用户创建会话。

from pyspark.sql import SparkSessionspark = SparkSession.builder.getOrCreate()

1.创建

PySpark DataFrame能够通过pyspark.sql.SparkSession.createDataFrame创建,通常通过传递列表(list)、元组(tuples)和字典(dictionaries)的列表和pyspark.sql.Rows,Pandas DataFrame,由此类列表组成的RDD转换。pyspark.sql.SparkSession.createDataFrame接收schema参数指定DataFrame的架构(优化可加速)。省略时,PySpark通过从数据中提取样本来推断相应的模式。

创建不输入schema格式的DataFrame

from datetime import datetime, dateimport pandas as pdfrom pyspark.sql import Rowdf = spark.createDataFrame([    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))])df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

创建带有schema的DataFrame

df = spark.createDataFrame([    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))], schema='a long, b double, c string, d date, e timestamp')df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

从Pandas DataFrame创建

pandas_df = pd.DataFrame({    'a': [1, 2, 3],    'b': [2., 3., 4.],    'c': ['string1', 'string2', 'string3'],    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]})df = spark.createDataFrame(pandas_df)df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

通过由元组列表组成的RDD创建

rdd = spark.sparkContext.parallelize([    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))])df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

 以上的DataFrame格式创建的都是一样的。

df.printSchema()
root |-- a: long (nullable = true) |-- b: double (nullable = true) |-- c: string (nullable = true) |-- d: date (nullable = true) |-- e: timestamp (nullable = true)

2.查看

DataFrame.show()

使用格式:

df.show()
df.show(1)
+---+---+-------+----------+-------------------+|  a|  b|      c|         d|                  e|+---+---+-------+----------+-------------------+|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|+---+---+-------+----------+-------------------+only showing top 1 row

spark.sql.repl.eagerEval.enabled

spark.sql.repl.eagerEval.enabled用于在notebooks(如Jupyter)中快速生成PySpark DataFrame的配置。控制行数可以使用spark.sql.repl.eagerEval.maxNumRows。

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)df

 

spark.conf.set('spark.sql.repl.eagerEval.maxNumRows',1)df

 

纵向显示

行也可以垂直显示。当行太长而无法水平显示时,纵向显示就很明显。

df.show(1, vertical=True)
-RECORD 0------------------ a   | 1 b   | 2.0 c   | string1 d   | 2000-01-01 e   | 2000-01-01 12:00:00only showing top 1 row

 查看DataFrame格式和列名

df.columns
['a', 'b', 'c', 'd', 'e']
df.printSchema()
root |-- a: long (nullable = true) |-- b: double (nullable = true) |-- c: string (nullable = true) |-- d: date (nullable = true) |-- e: timestamp (nullable = true)

查看统计描述信息

df.select("a", "b", "c").describe().show()
+-------+---+---+-------+|summary|  a|  b|      c|+-------+---+---+-------+|  count|  3|  3|      3||   mean|2.0|3.0|   null|| stddev|1.0|1.0|   null||    min|  1|2.0|string1||    max|  3|4.0|string3|+-------+---+---+-------+

DataFrame.collect()将分布式数据收集到驱动程序端,作为Python中的本地数据。请注意,当数据集太大而无法容纳在驱动端时,这可能会引发内存不足错误,因为它将所有数据从执行器收集到驱动端。

df.collect()
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)), Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)), Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

 为了避免引发内存不足异常可以使用DataFrame.take()或者是DataFrame.tail():

df.take(1)
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]
df.tail(1)
[Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

PySpark DataFrame转换为Pandas DataFrame

 PySpark DataFrame还提供了到pandas DataFrame的转换,以利用pandas API。注意,toPandas还将所有数据收集到driver端,当数据太大而无法放入driver端时,很容易导致内存不足错误。

df.toPandas()

 

 3.查询

PySpark DataFrame是惰性计算的,仅选择一列不会触发计算,但它会返回一个列实例:

df.a
Column<'a'>

大多数按列操作都返回列:

from pyspark.sql import Columnfrom pyspark.sql.functions import uppertype(df.c) == type(upper(df.c)) == type(df.c.isNull())
True

上述生成的Column可用于从DataFrame中选择列。例如,DataFrame.select()获取返回另一个DataFrame的列实例:

df.select(df.c).show()
+-------+|      c|+-------+|string1||string2||string3|+-------+

添加新列实例:

df.withColumn('upper_c', upper(df.c)).show()
+---+---+-------+----------+-------------------+-------+|  a|  b|      c|         d|                  e|upper_c|+---+---+-------+----------+-------------------+-------+|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1||  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2||  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|+---+---+-------+----------+-------------------+-------+

条件查询DataFrame.filter()

df.filter(df.a == 1).show()
+---+---+-------+----------+-------------------+|  a|  b|      c|         d|                  e|+---+---+-------+----------+-------------------+|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|+---+---+-------+----------+-------------------+

 4.运算

Pandas_udf

PySpark支持各种UDF和API,允许用户执行Python本机函数。另请参阅最新的Pandas UDF( Pandas UDFs)和Pandas Function API( Pandas Function APIs)。例如,下面的示例允许用户在Python本机函数中直接使用pandas Series中的API。

Apache Arrow in PySpark

import pandas as pdfrom pyspark.sql.functions import pandas_udf@pandas_udf('long')def pandas_plus_one(series: pd.Series) -> pd.Series:    # Simply plus one by using pandas Series.    return series + 1df.select(pandas_plus_one(df.a)).show()
+------------------+|pandas_plus_one(a)|+------------------+|                 2||                 3||                 4|+------------------+

DataFrame.mapInPandas

DataFrame.mapInPandas允许用户在pandas DataFrame中直接使用API,而不受结果长度等任何限制。

def pandas_filter_func(iterator):    for pandas_df in iterator:        yield pandas_df[pandas_df.a == 1]df.mapInPandas(pandas_filter_func, schema=df.schema).show()
+---+---+-------+----------+-------------------+|  a|  b|      c|         d|                  e|+---+---+-------+----------+-------------------+|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|+---+---+-------+----------+-------------------+

5.分组

PySpark DataFrame还提供了一种使用常见方法,即拆分-应用-合并策略来处理分组数据的方法。它根据特定条件对数据进行分组,对每个组应用一个函数,然后将它们组合回DataFrame。

df = spark.createDataFrame([    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])df.show()
+-----+------+---+---+|color| fruit| v1| v2|+-----+------+---+---+|  red|banana|  1| 10|| blue|banana|  2| 20||  red|carrot|  3| 30|| blue| grape|  4| 40||  red|carrot|  5| 50||black|carrot|  6| 60||  red|banana|  7| 70||  red| grape|  8| 80|+-----+------+---+---+

 分组,然后将avg()函数应用于结果组。

df.groupby('color').avg().show()
+-----+-------+-------+|color|avg(v1)|avg(v2)|+-----+-------+-------+|  red|    4.8|   48.0|| blue|    3.0|   30.0||black|    6.0|   60.0|+-----+-------+-------+

还可以使用pandas API对每个组应用Python自定义函数。

def plus_mean(pandas_df):    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
+-----+------+---+---+|color| fruit| v1| v2|+-----+------+---+---+|black|carrot|  0| 60|| blue|banana| -1| 20|| blue| grape|  1| 40||  red|banana| -3| 10||  red|carrot| -1| 30||  red|carrot|  0| 50||  red|banana|  2| 70||  red| grape|  3| 80|+-----+------+---+---+

 联合分组和应用函数

df1 = spark.createDataFrame(    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],    ('time', 'id', 'v1'))df2 = spark.createDataFrame(    [(20000101, 1, 'x'), (20000101, 2, 'y')],    ('time', 'id', 'v2'))def asof_join(l, r):    return pd.merge_asof(l, r, on='time', by='id')df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(    asof_join, schema='time int, id int, v1 double, v2 string').show()
+--------+---+---+---+|    time| id| v1| v2|+--------+---+---+---+|20000101|  1|1.0|  x||20000102|  1|3.0|  x||20000101|  2|2.0|  y||20000102|  2|4.0|  y|+--------+---+---+---+

 6.获取数据输入/输出

CSV简单易用。Parquet和ORC是高效紧凑的文件格式,读写速度更快。

PySpark中还有许多其他可用的数据源,如JDBC、text、binaryFile、Avro等。另请参阅Apache Spark文档中最新的Spark SQL、DataFrames和Datasets指南。Spark SQL, DataFrames and Datasets Guide

CSV

df.write.csv('foo.csv', header=True)spark.read.csv('foo.csv', header=True).show()

这里记录一个报错:

java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

 将Hadoop安装目录下的 bin 文件夹中的 hadoop.dll 和 winutils.exe 这两个文件拷贝到 C:\Windows\System32 下,问题解决。

+---+---+-------+----------+--------------------+|  a|  b|      c|         d|                   e|+---+---+-------+----------+--------------------+|  1|2.0|string1|2000-01-01|2000-01-01T12:00:...||  2|3.0|string2|2000-02-01|2000-01-02T12:00:...||  3|4.0|string3|2000-03-01|2000-01-03T12:00:...|+---+---+-------+----------+--------------------+

 Parquet

df.write.parquet('bar.parquet')spark.read.parquet('bar.parquet').show()
+-----+------+---+---+|color| fruit| v1| v2|+-----+------+---+---+|black|carrot|  6| 60|| blue|banana|  2| 20|| blue| grape|  4| 40||  red|carrot|  5| 50||  red|banana|  7| 70||  red|banana|  1| 10||  red|carrot|  3| 30||  red| grape|  8| 80|+-----+------+---+---+

 ORC

df.write.orc('zoo.orc')spark.read.orc('zoo.orc').show()
+-----+------+---+---+|color| fruit| v1| v2|+-----+------+---+---+|  red|banana|  7| 70||  red| grape|  8| 80||black|carrot|  6| 60|| blue|banana|  2| 20||  red|banana|  1| 10||  red|carrot|  5| 50||  red|carrot|  3| 30|| blue| grape|  4| 40|+-----+------+---+---+

 四、结合Spark SQL

DataFrame和Spark SQL共享同一个执行引擎,因此可以无缝地互换使用。例如,可以将数据帧注册为表,并按如下方式轻松运行SQL:

df.createOrReplaceTempView("tableA")spark.sql("SELECT count(*) from tableA").show()
+--------+|count(1)|+--------+|       8|+--------+

 此外UDF也可在现成的SQL中注册和调用

@pandas_udf("integer")def add_one(s: pd.Series) -> pd.Series:    return s + 1spark.udf.register("add_one", add_one)spark.sql("SELECT add_one(v1) FROM tableA").show()

 

这些SQL表达式可以直接混合并用作PySpark列。

from pyspark.sql.functions import exprdf.selectExpr('add_one(v1)').show()df.select(expr('count(*)') > 0).show()


点关注,防走丢,如有纰漏之处,请留言指教,非常感谢

以上就是本期全部内容。我是fanstuck ,有问题大家随时留言讨论 ,我们下期见。

来源地址:https://blog.csdn.net/master_hunter/article/details/125855069

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯