文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何在pyspark中创建DataFrame

2023-06-15 02:54

关注

如何在pyspark中创建DataFrame?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

pyspark创建DataFrame

为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作。

RDD和DataFrame

在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式数据对象。

这里简单看一下RDD和DataFrame的类型。

print(type(rdd))  # <class 'pyspark.rdd.RDD'>print(type(df))   # <class 'pyspark.sql.dataframe.DataFrame'>

翻阅了一下源码的定义,可以看到他们之间并没有继承关系。

class RDD(object):    """    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.    Represents an immutable, partitioned collection of elements that can be    operated on in parallel.    """
class DataFrame(object):    """A distributed collection of data grouped into named columns.    A :class:`DataFrame` is equivalent to a relational table in Spark SQL,    and can be created using various functions in :class:`SparkSession`:: ...    """

RDD是一种弹性分布式数据集,Spark中的基本抽象。表示一种不可变的、分区储存的集合,可以进行并行操作。
DataFrame是一种以列对数据进行分组表达的分布式集合, DataFrame等同于Spark SQL中的关系表。相同点是,他们都是为了支持分布式计算而设计。

但是RDD只是元素的集合,但是DataFrame以列进行分组,类似于MySQL的表或pandas中的DataFrame。

如何在pyspark中创建DataFrame

实际工作中,我们用的更多的还是DataFrame。

使用二元组创建DataFrame

尝试第一种情形发现,仅仅传入二元组,结果是没有列名称的。
于是我们尝试第二种,同时传入二元组和列名称。

a = [('Alice', 1)]output = spark.createDataFrame(a).collect()print(output)# [Row(_1='Alice', _2=1)]output = spark.createDataFrame(a, ['name', 'age']).collect()print(output)# [Row(name='Alice', age=1)]

这里collect()是按行展示数据表,也可以使用show()对数据表进行展示。

spark.createDataFrame(a).show()# +-----+---+# |   _1| _2|# +-----+---+# |Alice|  1|# +-----+---+spark.createDataFrame(a, ['name', 'age']).show()# +-----+---+# | name|age|# +-----+---+# |Alice|  1|# +-----+---+

使用键值对创建DataFrame

d = [{'name': 'Alice', 'age': 1}]output = spark.createDataFrame(d).collect()print(output)# [Row(age=1, name='Alice')]

使用rdd创建DataFrame

a = [('Alice', 1)]rdd = sc.parallelize(a)output = spark.createDataFrame(rdd).collect()print(output)output = spark.createDataFrame(rdd, ["name", "age"]).collect()print(output)# [Row(_1='Alice', _2=1)]# [Row(name='Alice', age=1)]

基于rdd和ROW创建DataFrame

from pyspark.sql import Rowa = [('Alice', 1)]rdd = sc.parallelize(a)Person = Row("name", "age")person = rdd.map(lambda r: Person(*r))output = spark.createDataFrame(person).collect()print(output)# [Row(name='Alice', age=1)]

基于rdd和StructType创建DataFrame

from pyspark.sql.types import *a = [('Alice', 1)]rdd = sc.parallelize(a)schema = StructType(    [        StructField("name", StringType(), True),        StructField("age", IntegerType(), True)    ])output = spark.createDataFrame(rdd, schema).collect()print(output)# [Row(name='Alice', age=1)]

基于pandas DataFrame创建pyspark DataFrame

df.toPandas()可以把pyspark DataFrame转换为pandas DataFrame。

df = spark.createDataFrame(rdd, ['name', 'age'])print(df)  # DataFrame[name: string, age: bigint]print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'># 传入pandas DataFrameoutput = spark.createDataFrame(df.toPandas()).collect()print(output)# [Row(name='Alice', age=1)]

创建有序的DataFrame

output = spark.range(1, 7, 2).collect()print(output)# [Row(id=1), Row(id=3), Row(id=5)]output = spark.range(3).collect()print(output)# [Row(id=0), Row(id=1), Row(id=2)]

通过临时表得到DataFrame

spark.registerDataFrameAsTable(df, "table1")df2 = spark.table("table1")b = df.collect() == df2.collect()print(b)# True

配置DataFrame和临时表

创建DataFrame时指定列类型

在createDataFrame中可以指定列类型,只保留满足数据类型的列,如果没有满足的列,会抛出错误。

a = [('Alice', 1)]rdd = sc.parallelize(a)# 指定类型于预期数据对应时,正常创建output = spark.createDataFrame(rdd, "a: string, b: int").collect()print(output)  # [Row(a='Alice', b=1)]rdd = rdd.map(lambda row: row[1])print(rdd)  # PythonRDD[7] at RDD at PythonRDD.scala:53# 只有int类型对应上,过滤掉其他列。output = spark.createDataFrame(rdd, "int").collect()print(output)   # [Row(value=1)]# 没有列能对应上,会抛出错误。output = spark.createDataFrame(rdd, "boolean").collect()# TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>

注册DataFrame为临时表

spark.registerDataFrameAsTable(df, "table1")spark.dropTempTable("table1")

获取和修改配置

print(spark.getConf("spark.sql.shuffle.partitions"))  # 200print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 10print(spark.setConf("spark.sql.shuffle.partitions", u"50"))  # Noneprint(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 50

注册自定义函数

spark.registerFunction("stringLengthString", lambda x: len(x))output = spark.sql("SELECT stringLengthString('test')").collect()print(output)# [Row(stringLengthString(test)='4')]spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())output = spark.sql("SELECT stringLengthString('test')").collect()print(output)# [Row(stringLengthString(test)=4)]spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())output = spark.sql("SELECT stringLengthInt('test')").collect()print(output)# [Row(stringLengthInt(test)=4)]

查看临时表列表

可以查看所有临时表名称和对象。

spark.registerDataFrameAsTable(df, "table1")print(spark.tableNames())  # ['table1']print(spark.tables())  # DataFrame[database: string, tableName: string, isTemporary: boolean]print("table1" in spark.tableNames())  # Trueprint("table1" in spark.tableNames("default"))  # Truespark.registerDataFrameAsTable(df, "table1")df2 = spark.tables()df2.filter("tableName = 'table1'").first()print(df2)  # DataFrame[database: string, tableName: string, isTemporary: boolean]

从其他数据源创建DataFrame

MySQL

前提是需要下载jar包。
Mysql-connector-java.jar

from pyspark import SparkContextfrom pyspark.sql import SQLContextimport pyspark.sql.functions as Fsc = SparkContext("local", appName="mysqltest")sqlContext = SQLContext(sc)df = sqlContext.read.format("jdbc").options(    url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"        "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"        "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()df.show(n=5)sc.stop()

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注编程网行业资讯频道,感谢您对编程网的支持。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯