文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何在windowns中配置PySpark环境

2023-06-15 02:58

关注

如何在windowns中配置PySpark环境?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

首先需要下载hadoop和spark,解压,然后设置环境变量。
hadoop清华源下载
spark清华源下载

HADOOP_HOME => /path/hadoopSPARK_HOME => /path/spark

安装pyspark。

pip install pyspark

基本使用

可以在shell终端,输入pyspark,有如下回显:

如何在windowns中配置PySpark环境

输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点。

>>> from pyspark import SparkContext>>> sc = SparkContext("local", "First App")

如果以上不会报错,恭喜可以开始使用pyspark编写代码了。
不过,我这里使用IDE来编写代码,首先我们先在终端执行以下代码关闭SparkContext。

>>> sc.stop()

下面使用pycharm编写代码,如果修改了环境变量需要先重启pycharm。
在pycharm运行如下程序,程序会起本地模式的spark计算引擎,通过spark统计abc.txt文件中a和b出现行的数量,文件路径需要自己指定。

from pyspark import SparkContextsc = SparkContext("local", "First App")logFile = "abc.txt"logData = sc.textFile(logFile).cache()numAs = logData.filter(lambda s: 'a' in s).count()numBs = logData.filter(lambda s: 'b' in s).count()print("Line with a:%i,line with b:%i" % (numAs, numBs))

运行结果如下:

20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1

这里说一下,同样的工作使用python可以做,spark也可以做,使用spark主要是为了高效的进行分布式计算。
戳pyspark教程
戳spark教程

RDD

RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素,RDD是spark计算的操作对象。
一般,我们先使用数据创建RDD,然后对RDD进行操作。
对RDD操作有两种方法:
Transformation(转换) - 这些操作应用于RDD以创建新的RDD。例如filter,groupBy和map。
Action(操作) - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序,例如count,collect等。

创建RDD

parallelize是从列表创建RDD,先看一个例子:

from pyspark import SparkContextsc = SparkContext("local", "count app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"     ])print(words)

结果中我们得到一个对象,就是我们列表数据的RDD对象,spark之后可以对他进行操作。

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Count

count方法返回RDD中的元素个数。

from pyspark import SparkContextsc = SparkContext("local", "count app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"     ])print(words)counts = words.count()print("Number of elements in RDD -> %i" % counts)

返回结果:

Number of elements in RDD -> 8

Collect

collect返回RDD中的所有元素。

from pyspark import SparkContextsc = SparkContext("local", "collect app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"     ])coll = words.collect()print("Elements in RDD -> %s" % coll)

返回结果:

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

foreach

每个元素会使用foreach内的函数进行处理,但是不会返回任何对象。
下面的程序中,我们定义的一个累加器accumulator,用于储存在foreach执行过程中的值。

from pyspark import SparkContextsc = SparkContext("local", "ForEach app")accum = sc.accumulator(0)data = [1, 2, 3, 4, 5]rdd = sc.parallelize(data)def increment_counter(x):    print(x)    accum.add(x) return 0s = rdd.foreach(increment_counter)print(s)  # Noneprint("Counter value: ", accum)

返回结果:

None
Counter value:  15

filter

返回一个包含元素的新RDD,满足过滤器的条件。

from pyspark import SparkContextsc = SparkContext("local", "Filter app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"])words_filter = words.filter(lambda x: 'spark' in x)filtered = words_filter.collect()print("Fitered RDD -> %s" % (filtered)) Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

也可以改写成这样:

from pyspark import SparkContextsc = SparkContext("local", "Filter app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"])def g(x):    for i in x:        if "spark" in x:            return iwords_filter = words.filter(g)filtered = words_filter.collect()print("Fitered RDD -> %s" % (filtered))

map

将函数应用于RDD中的每个元素并返回新的RDD。

from pyspark import SparkContextsc = SparkContext("local", "Map app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"])words_map = words.map(lambda x: (x, 1, "_{}".format(x)))mapping = words_map.collect()print("Key value pair -> %s" % (mapping))

返回结果:

Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]

Reduce

执行指定的可交换和关联二元操作后,然后返回RDD中的元素。

from pyspark import SparkContextfrom operator import addsc = SparkContext("local", "Reduce app")nums = sc.parallelize([1, 2, 3, 4, 5])adding = nums.reduce(add)print("Adding all the elements -> %i" % (adding))

 这里的add是python内置的函数,可以使用ide查看:

def add(a, b):    "Same as a + b."    return a + b

reduce会依次对元素相加,相加后的结果加上其他元素,最后返回结果(RDD中的元素)。

Adding all the elements -> 15

Join

返回RDD,包含两者同时匹配的键,键包含对应的所有元素。

from pyspark import SparkContextsc = SparkContext("local", "Join app")x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)])y = sc.parallelize([("spark", 2), ("hadoop", 5)])print("x =>", x.collect())print("y =>", y.collect())joined = x.join(y)final = joined.collect()print( "Join RDD -> %s" % (final))

返回结果:

x => [('spark', 1), ('hadoop', 4), ('python', 4)]
y => [('spark', 2), ('hadoop', 5)]
Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]

看完上述内容,你们掌握如何在windowns中配置PySpark环境的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程网行业资讯频道,感谢各位的阅读!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯