Spark编程基础
RDD
RDD是Spark编程中最基本的数据对象。RDD是Spark应用中的数据集,无论是最初的加载的数据集还是任何中间结果的数据集,或是最终的结果数据集,都是RDD.大多数Spark应用从外部数据加载RDD,然后对已有的RDD进行操作来创建新的RDD.这些操作就是 转化操作(trnsformation).这个过程不断重复,直到需要进行输出操作为止,比如把应用的结果写入文件系统这样,这种操作则是 行动操作.
RDD本质上是对象分布在各节点上的集合,用来表示Spark程序中的数据。
在PySpark中,RDD是由分布在各个节点上Python对象组成的,这里的对象可以是列表、元组、字典等.
RDD内部的对象与列表中的元素一样,可以是任何类型的对象.它可以是原生数据类型.
尽管可以选择把RDD持久化到硬盘,RDD主要还是存储在内存,至少是预期存储在内存中.因为Spark最初的用例之一就是支持机器学习,所以Spark的RDD提供了一种有限制的共享内存,这样Spark可以让连续和迭代的操作更高效地重用数据.
Hadoop的MapReduce实现的一大缺点就是它把中间数据持久化到硬盘上.并在运行时在节点间拷贝数据.虽然MapReduce的这种共享数据的分布式处理方式确定提供了弹性和容错性,但它牺牲了低延迟.这种设计的局限性是Spark项目诞生的主要催化剂之一.
RDD(弹性分布式数据集)是对这一概念的精确而简介的描述.
- 弹性:RDD是有弹性的,意思是Spark中的一个执行任务的节点丢失了,数据集依然可以被重建出来,因为Spark有每个RDD的谱系,也就是从头构建RDD的步骤
- 分布式:RDD是分布式的,RDD中的数据被分到至少一个分区中,在集群上跨工作节点分布式地作为对象集合保存在内存中.
- 数据集:RDD是由记录组成地数据集.记录是数据集中可以唯一区分的数据的集合.一条记录可以是由几个字段组成的,这类似于关系型数据库里面表中的行,或是文件中的一行文本,或其他的一些格式中类似的结构.
RDD的另一个关键特性是不可变,也就是说,在实例化出来并导入数据后,就无法更新了.每次对已有RDD进行转化操作(map或filter)都会生成新的RDD。
行动操作是RDD的另一种可用操作.行动操作产生的输出可以是把RDD中的数据以某种形式返回到驱动程序,也可以是把RDD的内容保存到文件系统(本地文件系统、HDFS、S3及其他一些文件系统均可).还有很多其他的行动操作,比如返回RDD中记录的条数.
加载数据到RDD
在用数据填充RDD后,再创建RDD就很快乐.在Spark程序中,对已有RDD进行转换操作的结果会成为新的RDD.
要开始一个Spark程序,你需要从外部源的数据初始化出至少一个RDD.接下来通过一系列转化操作和行动操作,用这个初始RDD创建更多的中间结果RDD,以及最终的RDD.初始RDD可以使用以下几种方式创建:
- 从文件中读取数据
- 从SQL或NoSQL数据存储等数据源读取数据
- 从流数据中读取数据
从文件中创建RDD
Spark提供了从文件创建RDD的API,这里的文件可以是单个文件、一组文件,也可以是一个目录.文件可以是多种格式的,比如非结构化的文本文件,诸如json这种半结构化的文件,也可以是CSV文件之类的结构化数据源.Spark也支持集中常见的序列化过的二进制编码文件格式.
- Spark与文件压缩
Spark原生支持几种无损压缩格式.Spark可以从集中常见的压缩格式的文件中无缝读取数据.这些压缩格式包括GZIP和ZIP还有BZIP2压缩包
Spark也提供原生的编码器,为数据压缩与解压提供支持,这样Spark就能读写压缩文件了.内置的编码器包括基于LZ77的无损压缩格式LZ4和LZF,还有Snappy. - RDD的数据本地化
默认情况下,Spark会尝试从靠近数据的节点读取数据.因为Spark访问的通常都是分区的分布式数据,比如HDFS或S3上的数据,所以Spark会创建分区来存放底层的分布式文件系统上的数据块,来优化转化操作的执行.
从文本文件创建RDD
Spark从文件中创建RDD的方法支持多种文件系统.URI中的协议指定了对应的文件系统.协议就是”://“前面的前缀.最常见的协议就是网址所用的协议:http:// 和 https:// 了。
- textFile()
1
sc.textFile(name,minPartitions=None,use_unicode=True)
textFile()方法可以用来从文件(压缩或未压缩)、目录、模式匹配路径(通配符匹配的一组文件)等创建RDD.
参数minPartitions决定要创建的RDD的分区.你可以设置比实际数据块数量更大的分区数,但是当设置的值比实际数据块数量少的时候,实际的行为就会和默认情况下一个数据块对应一个分区的情况没有区别.
参数use_unicode规定是使用Unicode编码还是UTF-8编码作为字符的编码协议.
- wholeTextFiles()
语法:1
sc.wholeTextFiles(path,minPartitions=None,use_unicode=True)
可以用wholeTextFiles()方法来读取包含多个文件的整个目录.每个文件会作为一条记录,其中文件名是记录的键,而文件的内容是记录的值.而使用textFile()方法读入一个目录下所有的文件时,每个文件的每一行都成为了一条单独的记录.
从对象文件创建RDD
Spark支持几种常见的对象文件的实现。术语对象文件指序列化后的数据结构,通常无法直接阅读,为数据提供结构或上下文,使得平台对数据的访问更加高效.
sequences 文件是Hadoop中常用的编码后的序列化文件.你可以使用sequenceFile()方法创建RDD.还有一个类似的方法叫做hadoopFile()。
另外,Spark也支持读写pickle文件,它是Python专用的序列化格式.objectFile()方法也对序列化过的java对象提供了类似的功能.
Spark也能原生支持JSON文件.
从数据源创建RDD
在Spark程序中,经常需要以数据库为数据源读取数据到RDD,作为历史数据、主数据、参考数据或查找数据.这种数据可以来自各种托管系统和数据库平台,包括Oracle,MySQL,Postgres,以及SQL Server等.
与使用外部文件创建RDD一样,使用外部数据库(比如MySQL数据库)中的数据创建RDD也会尝试把数据放到跨工作节点的多个分区中.这样可以最大化处理时的并发程度,尤其是第一阶段.另外,如果你对表进行划分(一般按照键的空间划分),把表中分到不同的分区,这些分区也可以并行加载,每个分区负责读取读取不重复的一部分的数据.
使用关系型数据库的表或者查询创建RDD,最好使用SparkSession对象里的函数.SparkSession是在Spark中操作包括表型数据在内的各种类型的数据的入口.SparkSession对象提供了read函数,返回一个DataFrameReader对象.接下来就可以用这个对象把数据读取到DataFrame中,DataFrame是一种特殊的RDD类型.
read()方法提供了jdbc函数,可以连接到任何兼容java数据库连接(JDBC)的数据源并收集数据.
从JSON文件创建RDD
语法1
spark.read.json(path,schema=None)
通过编程创建RDD
- parallelize()语法:
1
sc.parallelize(c,numSlices=None)
parallelize()方法要求列表已经创建好,并作为c参数传入.参数numSlices指定了所需创建的分区数量.
- range()语法
1
sc.range(start,end=None,step=1,numSlices=None)
range()方法会生成一个列表,并从这个列表创建并分发RDD.参数start、end、step定义了一个数值序列,而numSlices指定了所需的分区数量.
RDD操作
RDD核心概念
Spark中的转化操作是操作RDD并返回一个新RDD的函数,而行动操作是操作RDD并返回一个值或进行输出.
需介绍两个概念:粗粒度转化操作与惰性求值
- 粗粒度转化操作:RDD进行的操作被认为是粗粒度的,因为这些操作会把函数作用作用于数据集里的每一个元素,并返回转化操作应用后得到的新数据集.
- 细粒度转化操作:它可以操控单条记录或单元格,比如关系型数据库里面单条记录的更新,或NoSQL数据库中的put操作。
- 惰性求值:在处理Spark程序时,Spark使用惰性求值,也叫作惰性执行.
- 转化操作、行动操作、惰性求值
转化操作是对RDD进行的产生新RDD的操作.常见的转化操作包括map和filter函数.
下面操作时对已有RDD进行转化操作产生新的RDD:1
2originalrdd=sc.parallelize([0,1,2,3,4])
newrdd=originalrdd.filter(lambda x:x%2)
originalrdd源于并行化的数字集合.接着转化操作filter应用到originalrdd的每个元素,略过集合中的所有偶数.这个转化操作生成了名为newrdd的新RDD.
与返回新RDD对象的转化操作相反,行动操作向驱动器程序返回值或数据.常见的行动操作包括reduce(),collect(),count(),还有saveAsTextFile().下面的例子使用行动操作collect()展示newrdd的内容:1
newrdd.collect()
在处理Spark程序时,Spark使用惰性求值(lazy evaluation),也叫作惰性执行.惰性求值将处理过程推到调用行动操作时(也就是需要进行输出时).这可以轻松地使用交互式shell展现出来,你可以接连输入多个对RDD的转化操作方法,没有任何实际处理会发生.在请求了类似count()或saveAsTextFile()这样的行动操作后,Spark会创建出DAG图以及逻辑执行计划和物理执行计划.接下来驱动器进程就跨执行器协调并管理计划的执行.
惰性求值让Spark可以尽可能组合各种操作,这样可以减少处理的阶段,在数据混洗(shuffle)的过程里最小化在Spark执行器间传输的数据量.
- RDD持久化与重用
RDD主要创建和存在于执行器的内存中.默认情况下,RDD是易逝对象,仅在需要的时候存在.在它们被转化为新的RDD并不被其他操作所依赖后,这些RDD就被永远地操作了.如果一个RDD在多个行动操作中用到,就会产生问题,因为这个RDD每次都需要整个重新求值.解决这个问题的一种方式就是使用persist()方法缓存或持久化RDD.
未持久化的RDD用于多个行动操作1
2
3
4
5
6numbers=sc.range(0,10000,1,2)
evens=numbers.filter(lambda x:x%2)
noelements=evens.count()
print('There are %s elements in the collection'%(noelements))
listofelements=evens.collect()
print("The first five elements include "+(str(listofelements[0:5])))
将持久化的RDD用于多个行动操作1
2
3
4
5
6
7numbers=sc.range(0,10000,1,2)
evens=numbers.filter(lambda x:x%2)
evens.persist()
noelements=evens.count()
print('There are %s elements in the collection'%(noelements))
listofelements=evens.collect()
print("The first five elements include "+(str(listofelements[0:5])))
如果使用persist()方法(注意还有个类似的cache()方法)请求了持久化RDD,RDD就会在第一个行动操作调用它之后,驻留在集群里参与对其求值的各节点的内存中.你可以在Spark应用用户界面的Storage标签页看到持久化的RDD
- RDD谱系
Spark维护每个RDD的谱系,也就是获取这个RDD所需的一系列转化操作的序列.前面介绍过,默认情况下每个RDD操作都会重新计算整个谱系,除非调用了RDD持久化
在一个RDD的谱系,每个RDD都有父RDD或子RDD.Spark创建由RDD之间的依赖关系组成的有向无环图(DAG).RDD分阶段处理,这些阶段就是转化操作的集合.RDD之间的依赖关系可以是窄依赖,也可以是宽依赖.
窄依赖,也叫窄操作,可以根据下列特性区分出来:
- 多个操作可以合并为一个阶段,比如对同一个数据集进行的map()操作和filter()操作可以在数据集的各元素的一轮遍历中处理.
- 子RDD仅依赖于一个父RDD,比如一个从文本文件创建的RDD(父RDD)与一个在一个阶段内完成一组转化操作的子RDD
- 不需要进行节点间的数据混洗
窄操作是比较好的,因为它们能最大化地并行执行,同时最小化数据混洗,而数据混洗是开销很大的操作,而且可能成为执行的瓶颈
宽依赖,也就是宽操作有下列特性: - 宽操作定义出新的阶段并通常需要数据混洗
- RDD有多个依赖,比如join()操作里一个RDD需要依赖两个或更多代价.
宽操作在分组、归约、连接数据集时无法避免,但你应该了解使用这些操作的影响和代价.
RDD容错性
Spark记录了每个RDD的谱系,包括所有父RDD的谱系,以及父RDD的父RDD的谱系,以此类推.当发生崩溃时,比如一个节点崩溃时,任何RDD都可以把它的全部分区重建为原来的状态.因为RDD是分布式的,所以RDD可以容忍并从任何单节点崩溃的情况下恢复.RDD的类型
除了包含所有RDD通用的成员(属性与函数)的基本RDD类以外,还有一些特殊的RDD实现,它们可以支持额外的操作符和函数.这些RDD包括了下列附加类型:
- PairRDD:由键值对组成的RDD.
- DoubleRDD:仅由一组双精度浮点数组成的RDD
- DataFrame():按一组有固定名字和类型的列来组织的分布式数据集。DataFrame等价于Spark SQL中的关系型表.
- SequenceFileRDD:从压缩的或未压缩的SequenceFile创建出的RDD
- Hadoop:使用第一版的MapReduce API,提供读取存储在HDFS上的数据的核心功能的RDD
- NewHadoopRDD:使用新版API提供读取存储在Hadoop上的数据的核心功能的RDD
- CoGroupedRDD:对多个父RDD进行共同分组得到的RDD.对于父RDD的每个键,产生的RDD中都对应一个元组,其中包含由键对应的值组成的列表.
- JdbcRDD:从JDBC连接进行SQL查询获得的RDD.它只能在Scala API中使用
- PartitionPruningRDD:用来裁剪RDD分区以避免在所有分区上启动任务的RDD.
- ShuffledRDD:数据混洗产生的RDD,比如对数据进行重新分区
- UnionRDD:对两个以上的RDD进行union()操作产生的RDD.
基本的RDD转化操作
最常使用的函数包括map()、flatMap()、filter(),还有distinct()。你还会了解到groupBy()函数和sortBy()函数,它们常被其他函数实现.
对数据进行分组是执行类似求和、计数等聚合操作或总结操作中常用的前置操作.对数据排序则是另一个有用的操作.
- map()
RDD.map(,preservesPartitioning=False)
转化操作map()是所有转化操作中最基本的.它将一个具名函数或匿名函数对数据集内所有的元素进行求值.一个或多个map()函数可以异步执行,因为它们不会产生副作用,不需要维护状态,也不会尝试与别的map()操作通信或同步.也就是说无共享的操作.
参数preservesPartitioning是可选的,为Boolean类型的参数,用于定义了分区规则的RDD,它们由定义好的键,并按照键的哈希值或范围进行了分组.它们一般是键值对RDD.如果这个参数被设置为True,这些分区会保持完整. - flatMap()
RDD.flatMap(,preservesPartitioning=False)
转化操作flatMap()转化操作map()类似,它们都将函数作用于输入数据集的每一条记录.但是,flatMap()还会”拍平”输出数据,这表示他会去掉一层嵌套. - filter()
转化操作filter将一个Boolean类型的表达式对数据集里的每个元素进行求值,这个表达式通常用匿名函数表示.返回的布尔值决定了该记录是否被包含在产生的输出RDD里.这也是常用的转化操作,用于从RDD中移除不需要的记录作为中间结果,或者移除不需要放在放在最终输出里的记录. - distinct()
RDD.distinct(numPartitions=None)
转化操作distinct()返回一个新的RDD,其中仅包含输入RDD中去重后的元素.他可以用来去除重复的值.重复的值指数据集里的一条记录和另一条记录完全相同元素或字段. - groupby()
RDD.groupby(,numPartitions=None)
转化操作groupby()返回一个按指定函数对元素进行分组的RDD - sortBy()
RDD.sortBy(,ascending=True,numPartitions=None)
转化操作sortBy()将RDD按照参数选出的指定数据集的键进行排序.它根据键对象的类型顺序进行排序. 基本的RDD行动操作
Spark中的行动操作要么返回值,比如count();要么返回数据,比如collect();要么保存数据到外部,比如saveAsTextFile().在所有情况中,行动操作都会对RDD及其所有父RDD强制进行计算.一些行动操作返回计数,或是数据的聚合值,或是RDD中全部或部分数据.与这些不同的是,行动操作foreach()会对RDD中的每个元素执行一个函数 - count()
RDD.count()
返回一个long类型的值,代表RDD中的元素的个数. - collect()
RDD.collect()
行动操作collect()向Spark驱动器进程返回一个由RDD中所有元素组成的列表.collect()没有限制输出 - take()
RDD.take(n)
返回RDD的前n个元素 - top()
RDD.top(n,key=None)
行动操作top()返回一个RDD中的前n个元素,但是和take()不同的是,如果使用top(),元素会排序并按照降序输出.顺序是由对象类型决定. - first()
RDD.first()
返回RDD中的第一个元素 - reduce()
RDD.reduce()
行动操作reduce()使用指定的满足交换律和/或结合律的运算符来归约RDD中的所有元素. - fold()
RDD.fold(zeroValue,)
使用给定的function和zeroValue把RDD中每个分区的元素进行聚合,然后把每个分区的聚合结果再聚合.尽管reduce()和fold()的功能相似,但它们还是有区别的,fold()不满足交换律。 - foreach()
RDD.foreach()
行动操作foreach()把参数指定的具名或匿名函数应用到RDD中的所有元素上. 键值对RDD的转化操作
键值对RDD也就是PairRDD,它的记录由键和值组成.键可以是整型或者字符串对象,也可以是元组这样的复杂对象.而值可以是标量值,也可以是列表、元组、字典或集合等数据结构。PairRDD及其成员函数是Spark函数式编程中不可或缺的.这些函数大致分为如下:
- 字典函数
- 函数式转化操作
- 分组操作、聚合操作与排序操作
- 连接操作
keys()
RDD.keys()
keys()函数返回键值对RDD中所有键组成的RDD,或者说是由键值对RDD中每个二元组中的第一个元素组成的RDD.1
2kvpairs=sc.parallelize([('city','Hayward'),('state','CA'),('zip',94541),('country','USA')])
kvpairs.keys().collect()values()
RDD.values()
values()函数返回键值对RDD中所有值组成的RDD,或者说是由键值对RDD中每个二元组的第二个元素组成的RDD.1
2kvpairs=sc.parallelize([('city','Hayward'),('state','CA'),('zip',94541),('country','USA')])
kvpairs.values().collect()keyBy()
RDD.keyBy()
转化操作keyBy()创建出由从RDD中的元素里提取的键与值组成的元组,其中参数给定的函数将原元素转化为输出元素的键,而原来的整个元组是输出的值. - mapValues()
RDD.mapValues()
转化操作mapValues()把键值对RDD的每个值都传给一个函数(通过参数指定的具名函数或匿名函数),而键保持不变.与更一般化的等价物map()类似,mapValues()对于每个输入元素输出一个元素.原RDD的分区方式不会改变. - flatMapValues()
RDD.flatMapValues()
转化操作flatMapValues()把键值对RDD的每个值都传给一个函数,而键保持不变,并生成拍平的列表.它和之前介绍的flatMap()非常相似,对于每个输入元素,返回0个乃至很多个输出元素 - groupByKey()
RDD.groupBykey(numPartitions=None,partitionFunc=)
转化操作groupByKey()将键值对RDD按各个键对值进行分组,把同组的值整合成一个序列
参数numPartitions指定要创建多少个分区(也就是多个分组).分区使用partitionFunc参数的值创建,默认值为Spark内置的哈希分区函数.如果numPartitions为默认值None,就使用系统默认的分区数. - reduceByKey()
RDD.reduceByKey(,numPartitions=None,partitionFunc= )
转化操作reduceByKey()使用满足结合律的函数合并键对应的值.调用键值对数据集的reduceByKey()方法,返回的是键值对的数据集,其数据按照键聚合了对应的值,这个函数表示如下:求平均值不是满足结合律的操作.我们可以通过创建元组来绕过去,元组中包含每个键对应的值的总和与每个键对应的计数,这两个都满足交换律和结合律,然后在最后一步计算平均值.
注意reduceByKey()比较高效,是因为它在每个执行器本地对值进行了先行组合,然后把组合后的列表发送到远程的执行器来执行最后的结果姐u但。这是一个会产生数据混洗的操作. foldByKey()
RDD.foldByKey(zeroValue,,numPartitions=None,partitionFunc= )
转化操作foldByKey()在功能上和前面讨论的行动操作fold类似,但是foldByKey()是转化操作,操作预先定义的键值对元素.foldByKey()和fold()都提供了相同数据类型的zeroValue参数供RDD为空时使用.
提供的函数是如下所示的一般聚合函数形式:sortBykey()
RDD.sortByKey(ascending=True,numPartitions=None,keyfunc=)
转化操作sortByKey()把键值对RDD根据键进行排序。排序依据取决于键对象的类型,比如数值型对象会按照数值大小排序.该操作与之前介绍的sort()的区别之处在于sotr()要求指定排序依据的键,而sortByKey()的键值对RDD里定义的。
键按照ascending 参数提供的顺序进行排序,该参数默认值为True,表示升序.参数numPartitions指定了输出多少分区,分区函数为范围分区函数.参数keyfunc是一个可选参数,可以通过对原键使用另一个函数而修改原键.
keyfunc=lambda k: k.lower()
连接操作
连接操作对应于SQL编程中常见的JOIN操作.连接函数基于共同的字段(连接键)来组合两个RDD中的记录.因为Spark中的连接函数要求定义键,因此需要操作键值对RDD
- 连接(join)操作两个不同的数据集,对每个数据集中各有一个字段被选为键(连接键).数据集按照指定的顺序来指代.例如,指定的第一个数据集为左实体或者左数据集,第二个数据集则是右实体或者右数据集
- 内连接,通常被称为”连接”(“内连接”是连接的默认行为),返回同时存在指定键的两个数据集中的所有元素或记录
- 外连接,不要求两个数据集中的键一定要匹配.外连接分为左外连接、右外连接、全外连接
- join()
RDD.join(,numPartitions=None)
转化操作join()是内连接的一个实现
在大多数情况下,连接涉及的RDD不止一个分区,此时连接操作会导致数据混洗.Spark一般会自动计划并实现连接操作,来获取最好的性能.然而,有一条公理需要牢记:”用小表去连接大表” - leftOuterJoin()
RDD.leftOuterJoin(,numPartitions=None)
转化操作leftOuterJoin()返回第一个RDD中包含的所有元素和记录.如果第一个RDD(左RDD)中的键在右RDD中存在,那么右RDD中匹配的记录会和左RDD的记录一起返回.否则,右RDD的记录为None(空) - rightOuterJoin()
RDD.rightOuterJoin(,numPartitions=None)
转化操作rightOuterJoin()返回第二个RDD中所有的元素或者记录.如果第二个RDD(右RDD)中包含的键在左RDD中存在,则左RDD的记录也会和右RDD的记录一起返回.否则,左RDD的记录为None(空). fullOuterJoin()
RDD.fullOuterJoin(,numPartitions=None)
fullOuterJoin无论是否有匹配的键,都会返回两个RDD中的所有元素.左数据集或者右数据集中没有匹配的元素都用None(空)来表示.cogroup()
RDD.cogroup(,numPartitions=None)
转化操作cogroup()将多个键值对数据集按键进行分组.他在概念上和fullOuterJoin有些类似,但在实现上有以下几点关键区别:
- 转化操作cogroup()返回可迭代对象,类似前面讲的groupByKey()函数
- 转化操作cogroup()将两个RDD中的多个元素进行组合,而fullOuterJoin()则对同一个键创建出多个分开的输出元素
- 转化操作cogroup()可以通过scala API或者函数别名groupWith()对三个以上的RDD进行分组.
- cartesian()
RDD.cartesian()
转化操作cartesian()即笛卡儿积,有时也被口语化地称为交叉连接,它会根据两个RDD的记录生成所有可能的组合.该操作生成的记录条数等于第一个RDD的记录条数乘以第二个RDD的记录条数.
集合操作
类似数学上的集合操作。
- union()
RDD.union() - intersection()
RDD.intersection() - subtract()
RDD.subtract() - subtractByKey()
RDD.subtractByKey(,numPartitions=None) 数值型RDD的操作
- min()
RDD.min(key=None) - max()
RDD.max(key=None) - mean()
RDD.mean() - sum()
RDD.sum() - stdev()
RDD.stdev()
计算标准差 - variance()
RDD.variance()
求方差 - stats()
RDD.stats()
stats()返回StatCounter对象,一次调用即可获得这样一个包括count()、mean()、stdev()、max()以及min()的结构.