分布式计算需要:
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD的分区是RDD数据存储的最小单位。一份RDD的数据,本质上分隔成了多个分区。
在初始化RDD(读取数据的时候)规划的时候,分区会尽量规划到存储数据所在的服务器上。
因为这样可以走本地读取,避免网络读取。
本地读取:Executor所在的服务器,同样是一个DataNode,同时这个DataNode上有它要读的数据,所以可以直接读取到机器硬盘即可,无需走网络传输。
总结:
Spark会在确保并行计算能力的前提下,尽可能确保本地读取。
Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言),只有构建出SparkContext, 基于它才能执行后续的API调用和计算。
本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来。
from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")src = SparkContext(conf=conf)
RDD的创建主要有2种方式:
概念:并行化创建,是指:将本地集合转成分布式RDD
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :01_RDD_create_parallelize.py
@Date :2022/11/18 6:28
@Author :wuk
并行化集合,将本地集合转成分布式对象RDD
"""
from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")src = SparkContext(conf=conf)# 演示通过并行化集合的方式去创建RDD, 本地集合 -> 分布式对象(RDD)rdd = src.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])# parallelize方法, 没有给定 分区数, 默认分区数是多少? 根据CPU核心来定print(f"默认分区数量:{rdd.getNumPartitions()}")rdd = src.parallelize([1, 2, 3], 3)print("分区数: ", rdd.getNumPartitions())# collect方法, 是将RDD(分布式对象)中每个分区的数据, 都发送到Driver中, 形成一个Python List对象# collect: 分布式 转 -> 本地集合print("rdd的内容是: ", rdd.collect())
parallelize:
参数1:集合对象,比如list
参数2:分区数量
rdd.getNumPartitions()
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :02_RDD_create_textFile.py
@Date :2022/11/18 6:28
@Author :wuk
读取本地文件
"""
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 构建SparkContext对象conf = SparkConf().setAppName("text").setMaster("local[*]")context = SparkContext(conf=conf)# 通过textFile API 读取数据# 读取本地文件数据file = context.textFile("./data/words.txt")print(f"默认分区数量:{file.getNumPartitions()}")print(f"内容是{file.collect()}")# 加最小分区数参数的测试text_file = context.textFile("./data/words.txt", 100)print(f"默认分区数量:{text_file.getNumPartitions()}")print(f"内容是{text_file.collect()}")# 读取HDFS文件数据测试context_text_file = context.textFile("hdfs://master:8020/input/word.txt")print(context_text_file.getNumPartitions())print(context_text_file.collect())
textFile:
参数1:必填,读取文件路径,可以是本地文件,也可以是HDFS路径。
参数2:选填,表示最小分区数量(一般不会去设置)
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :03_RDD_create_wholeTextFile.py
@Date :2022/11/18 6:28
@Author :wuk
wholeTextFiles 读取文件夹下的所有文件内容
"""
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")context = SparkContext(conf=conf)files_rdd = context.wholeTextFiles("./data")print(files_rdd.collect())print(files_rdd.map(lambda x: x[1]).collect())
注意:该API适用于少量分区读取数据,是小文件专用。
注意:
定义:RDD的算子,返回值仍旧是一个RDD,称之为转换算子。
特性:这类算子是lazy懒加载的,如果没有action算子,转换算子是不工作的。
功能:将RDD数据一条条处理,处理逻辑基于map算子中接收的处理函数,返回新的RDD.
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :04_RDD_operators_map.py
@Date :2022/11/18 6:29
@Author :wuk
map的使用
"""
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf= SparkConf().setAppName("test").setMaster("local[*]")context = SparkContext(conf=conf)rdd1 = context.parallelize([1, 2, 3, 4, 5, 6], 3)print(rdd1.getNumPartitions())print(rdd1.map(lambda x: x * 10).collect())
语法如下:
功能:对rdd执行map操作,然后进行解嵌套
操作。
解除嵌套:
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :05_RDD_operators_flatMap.py
@Date :2022/11/19 14:21
@Author :wuk
@Description : flatMap的使用
"""
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")context = SparkContext(conf=conf)rdd = context.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "hadoop flink spark"])# 得到所有的单词, 组成RDD, flatMap的传入参数 和map一致, 就是给map逻辑用的, 解除嵌套无需逻辑(传参)print(rdd.map(lambda x: x.split(" ")).collect())print(rdd.flatMap(lambda x: x.split(" ")).collect())
功能:针对KV型RDD
,自动按照key进行分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合。
用法:
注意:reduceByKey中接收的函数,只负责聚合,不理会分组,分组是自动by key来分组的。
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :06_RDD_operators_reduceByKey.py
@Date :2022/11/19 14:32
@Author :wuk
@Description : reduceByKey的使用
"""
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")context = SparkContext(conf=conf)rdd = context.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])# reduceByKey 对相同key 的数据执行聚合相加print(rdd.reduceByKey(lambda a, b: a + b).collect())
聚合逻辑:
功能:将RDD的数据进行分组
语法:
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :08_RDD_operators_groupBy.py
@Date :2022/11/19 14:33
@Author :wuk
@Description : group by的使用
"""
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 2), ('b', 3)])# 通过groupBy对数据进行分组# groupBy传入的函数的 意思是: 通过这个函数, 确定按照谁来分组(返回谁即可)# 分组规则 和SQL是一致的, 也就是相同的在一个组(Hash分组)print(rdd.groupBy(lambda t: t[0]).map(lambda t: (t[0], list(t[1]))).collect())rdd = sc.parallelize([1, 2, 3, 4, 5, 6])print(rdd.groupBy(lambda t: "even" if (t % 2 == 0) else "odd").map(lambda t: (t[0], list(t[1]))).collect())
功能:过滤想要的数据进行保留
语法:
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :09_RDD_operators_filter.py
@Date :2022/11/19 14:33
@Author :wuk
@Description : filter的使用
"""
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])print(rdd.filter(lambda x: x % 2 == 0).collect())
功能:对Rdd数据进行去重,返回新RDD。
语法:
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :10_RDD_operators_distinct.py
@Date :2022/11/19 14:33
@Author :wuk
@Description : distinct的使用
"""
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 1, 1, 2, 2, 2, 3, 3, 3])# distinct 进行RDD数据去重操作print(rdd.distinct().collect())rdd2 = sc.parallelize([('a', 1), ('a', 1), ('a', 3), ('b', 3)])print(rdd2.distinct().collect())
功能:两个RDD合并成一个RDD返回
注意:
只合并,不去重,不同类型的依旧可以混合。
# _*_ coding:utf-8 _*_
"""
@Software :pyspark
@FileName :11_RDD_operators_union.py
@Date :2022/11/19 14:34
@Author :wuk
@Description : union的使用
"""
from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1, 1, 3, 3])rdd2 = sc.parallelize(["a", "b", "a"])rdd3 = rdd1.union(rdd2)print(rdd3.collect())
功能:对于两个RDD执行join操作,实现内外连接。
注意:join只适合于二元元组。
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu")])rdd2 = sc.parallelize([(1001, "销售部"), (1002, "科技部")])# 通过join算子来进行rdd之间的关联# 对于join算子来说 关联条件 按照二元元组的key来进行关联print(rdd1.join(rdd2).collect())# 左外连接, 右外连接 可以更换一下rdd的顺序 或者调用rightOuterJoin即可print(rdd1.leftOuterJoin(rdd2).collect())print(rdd1.rightOuterJoin(rdd2).collect())
功能:求两个RDD的交集。
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([('a', 1), ('a', 3)])rdd2 = sc.parallelize([('a', 1), ('b', 3)])# 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDDprint(rdd1.intersection(rdd2).collect())
功能:将RDD的数据,加上嵌套,这个嵌套按照分区
来进行。
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)# 将数据进行分区print(rdd.glom().collect())
功能:针对kv型RDD,自动按照key分组
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])rdd2 = rdd.groupByKey()print(rdd2.collect())print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
功能:对RDD数据进行排序,基于你指定的排序依据。
语法:
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('c', 3), ('f', 1), ('b', 11), ('c', 3), ('a', 1), ('c', 5), ('e', 1), ('n', 9), ('a', 1)], 3)# 使用sortBy对rdd执行排序# 按照value 数字进行排序# 参数1函数, 表示的是 , 告知Spark 按照数据的哪个列进行排序# 参数2: True表示升序 False表示降序# 参数3: 排序的分区数"""注意: 如果要全局有序, 排序分区数请设置为1"""print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1).collect())# 按照key来进行排序print(rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=1).collect())
功能:针对kv型的rdd,根据key进行排序。
语法:
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)print(rdd.sortByKey(ascending=True, numPartitions=3, keyfunc=lambda key: str(key).lower()).collect())
方式1:在pyCharm中直接执行
如果在PyCharm中直接提交yarn,依赖了其他的python文件,可以通过设置属性来指定依赖的代码
方式2:在服务器上通过spark-submit提交到集群运行
定义:返回值不是RDD的都是动作算子。
功能:统计key出现的次数,一般适用于KV型RDD
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.textFile("./data/words.txt")rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))# 通过countByKey来对key进行计数, 这是一个Action算子result = rdd2.countByKey()print(result)print(type(result))
功能:将RDD各个分区的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
功能:对RDD数据按照传入的逻辑进行聚合。
语法:
功能:和reduce一样,接收传入逻辑进行聚合,聚合是带有初始值的。
功能:取出RDD第一个元素
用法:
功能:取出前N个元素,组成List返给你
用法:
功能:对RDD数据集进行降序排序,取前N个
用法:
功能:计算RDD有多少条数据,返回值是一个数字。
用法:
功能:随机抽样RDD的数据
用法:
功能:对RDD进行排序取前N个
用法:
功能:对RDD每一个元素,执行你提供的逻辑操作,和map一致,不过没有返回值。
用法:
功能:将RDD的数据写入文本文件中,支持本地写出,HDFS等文件系统。
注意点:
对RDD的分区执行重新分区(仅数量)
# coding:utf8from pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5], 3)# repartition 修改分区print(rdd.repartition(1).getNumPartitions())print(rdd.repartition(5).getNumPartitions())# coalesce 修改分区print(rdd.coalesce(1).getNumPartitions())print(rdd.coalesce(5, shuffle=True).getNumPartitions())
groupByKey+聚合逻辑(Shuffle)
的,原因是reduceByKey在分组前已经做了预聚合,那么在Shuffle分组节点,被Shuffle的数据可以极大的减少
,如下图所示:rdd之间进行迭代计算,当执行开启以后,新的rdd生成,老的就会消失,rdd的数据是过程数据,只在处理的过程中存在,一旦处理完成了,数据就不存在了
。
该特性可以最大化利用资源,老旧rdd没有用了,就从内存中清理,给后续的计算腾出空间。
如图所示:
如上图,rdd3被两次使用,第一次使用后,其实rdd3就已经不存在了,第二次使用的时候,只能基于rdd的血缘关系,从rdd1重新执行,构建出rdd3。
针对上述rdd的过程数据问题,肯定需要优化,优化就是不要让rdd3消失,所以提供了rdd缓存技术,可以将指定的rdd数据保留在硬盘或者内存中。
缓存API如下:
所以缓存有一个特点就是,保留RDD的血缘关系,一旦发生丢失,就可以基于血缘关系,重新计算这个RDD的数据
。RDD是将自己的分区数据,每个分区自行将其数据保存在所在的Executor内存和硬盘上,这就是分散存储。
# coding:utf8
import timefrom pyspark import SparkConf, SparkContext, StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.textFile("./data/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))rdd3.cache()# 缓存到磁盘和内存中print(rdd3.persist().getStorageLevel())rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())# 清理缓存rdd3.unpersist()
checkPoint技术,也是将RDD数据保存下来,但是它只支持硬盘存储,并且被设定是安全的,不会保留血缘关系
。
checkPoint存储RDD数据,是集中收集各个分区数据进行存储,而缓存是分散存储
。
缓存和checkPoint的对比:
API实现如下:
注意:
checkPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用checkPoint比较合适,或者数据量很大,使用checkPoint比较合适。
如果数据量比较小,或者RDD重新计算速度比较快,用checkPoint没有必要,直接用缓存即可。
# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 告知spark, 开启CheckPoint功能sc.setCheckpointDir("hdfs://master:8020/output/ckp")rdd1 = sc.textFile("./data/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 调用checkpoint API 保存数据即可rdd3.checkpoint()rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())rdd3.unpersist()