Spark(四)

Spark核心API高级编程

Spark中的共享变量

Spark API提供了两种在Spark集群中创建和使用共享变量(就是Spark集群中不同的工作节点都可以访问或者修改的变量)的机制.这两种机制是广播变量和累加器

广播变量

广播变量是由Spark驱动器程序设置的只读变量,可供Spark集群内的工作节点访问,也就是说工作节点上的执行器内运行的所有任务都可以访问这些变量.广播变量是由驱动器设置后,就只能读了.广播变量通过一种高效的点到点(P2P)共享协议在工作节点间共享,这种协议来自BitTorrent.相比简单地从Spark驱动器直接向所有执行器进程推送这些变量,P2P的方式具有更好的伸缩性.

广播变量在SparkContext内创建,然后再该Spark应用的环境中可以作为对象进行访问

  1. broadcast()
    1
    sc.broadcast(value)

broadcast()方法会在指定的SparkContext内创建出一个Broadcast对象实例.value参数可以是任意的Python对象,他会把序列化并封装到Broadcast对象中.在创建完成后,这些变量可以被应用内所有运行的任务访问.

1
2
stations=sc.broadcast({'83':'Mezes Park','84':'Ryland Park'})
stations

也可以基于文件内容创建广播变量,不论文件在本地、网络,还是在分布式文件系统上.

1
2
3
4
stationsfile='/opt/spark/data/stations.csv'
stationsdata=dict(map(lambda x:(x[0],x[1]),map(lambda x:x.split(','),open(stationsfile))))
stations=sc.broadcast(stationsdata)
stations.value['83']

上面展示了如何从csv文件(stations.csv)创建广播变量,这个文件包含由站点ID和站点名称组成的键值对字典.有了广播变量,你就可以在任何map()或者filter()这样的RDD操作中访问该字典了.

  1. unpersist()
    1
    Broadcast.unpersist(blocking=False)

Broadcast对象的unpersist()方法用来把广播变量从集群中所有保存该广播变量的工作节点的内存中移除.
布尔类型的blocking参数指定该操作是堵塞直至变量已经从所有节点删除,还是作为异步非堵塞操作执行.如果你希望立刻释放内存,应该把这个参数设置为True

1
2
3
stations=sc.broadcast({'83':'Mezes Park','84':'Ryland Park'})
stations.value['84']
stations.unpersist()

还有几个Spark配置项和广播变量有关
spark.broadcast.compress:指定在向工作节点传送广播变量时是否先进行压缩,默认为True
spark.broadcast.factory:指定使用何种广播实现
spark.broadcast.blockSize:指定广播变量每个数据块的大小
spark.broadcast.port:指定驱动器的HTTP广播服务器要监听的端口

广播的优点是:有时我们要把两个数据集组合起来获取结果数据集。
记住广播变量的下列要点:

  • 使用广播变量避免数据混洗操作
  • 广播变量使用了一种高效而伸缩性强的点到点分发机制
  • 每个工作节点只会复制一次数据,而不是每个任务复制一次.一个Spark应用的任务可能数以千计,所以每个任务复制一次的代价不容小觑.
  • 广播变量可以被多个任务多次重用
  • 广播变量是序列化过的对象,因此可以高效读取

累加器

Spark中另一种共享变量是累加器.和广播变量不同的是,你可以更新累加器。具体地说,累加器是可以增长的数值.
可以把累加器看作Spark编程中的通用型计数器.累加器让开发者可以在程序运行时聚合各种值.
累加器由驱动器程序设置,可以由相应的SparkContext中运行任务的执行器更新.驱动器程序可以读回累加器的最终结果,而这通常发生在程序结束时.

在Spark应用中,每成功完成一个任务只能更新一次累加器.工作节点把对累加器的更新量发送回驱动器程序,而只有驱动器程序可以读取累加器的值.累加器可以使用整型或浮点型的数值.

1
2
3
4
5
6
7
def addone(x):
global acc
acc+=1
return x+1
myrdd=sc.parallelize([1,2,3,4,5])
myrdd.map(lambda x:addone(x)).collect()
print("records processed: "+str(acc.value))

  1. accumulator()
    1
    sc.accumulator(value,accum_param=None)

accumulator()方法在指定的SparkContext中创建出Accumulator对象的一个实例,并把它根据value参数给定的初始值初始化。accum_param参数可以用来定义自定义累加器

  1. value()
    1
    Accumulator.value()

value()方法用来获取累加器的值.

  1. 自定义累加器
    在SparkContext中创建的标准累加器支持原生数值类型,例如int和float.而自定义累加器可以对标量数值以外的数据类型执行聚合操作.自定义累加器需要通过Accumulatorparam辅助对象创建.唯一的要求是执行的操作需满足交换律和结合律,也就是说改变操作顺序或改变前后次序都不会影响结果.
    要使用自定义累加器,需要扩展AccumulatorParam类为一个自定义类。这个类需要包含两个特殊的成员函数:一个是addInPlace(),定义如何操作自定义累加器对应数据类型的两个对象获得新的结果;另一个是zero(),提供对应类型的”零值”,比如map类型的”零值”是空的map

  2. 累加器的用处
    累加器一般用于运维场景,比如统计所处理的记录总数,或者跟踪错误记录的条数.你也可以用它们对记录的类型进行大致的计数,比如在映射处理日志事件时,统计发现的各种响应码的数量.

    Spark中的数据分区

    大多数情况下,分区对Spark处理不可或缺。高效的分区可以把应用性能提高几个数量级.反过来,低效的分区会导致程序无法跑完,过大的分区会引起执行器内存不足的错误等问题

分区概述

RDD的转化操作创建的分区数一般是可以配置的。不过Spark还有一些默认行为需要我们了解。
在使用HDFS时,Spark会把每个数据块(HDFS中一个数据块一般是128MB)作为一个RDD分区

1
2
myrdd=sc.textFile("hdfs:///dir/filescontaining10blocks")
myrdd.getNumPartitions()

groupByKey()、reduceByKey()等一系列操作都会导致数据混洗,而且没有指定num-Partitions值,这些操作产生的分区数等于配置项spark.default.parallelism对应的值.
如下:

1
2
3
4
配置spark.default.parallelism=4
myrdd=sc.textFile("hdfs:///dir/filescontaining10blocks")
mynewrdd=myrdd.flatMap(lambda x:x.split()).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
mynewrdd.getNumPartitions()

如果spark.default.parallelism配置参数没有设置,那么转化操作产生的分区数与当前RDD谱系中的上游RDD的最大分区数相等.

1
2
3
4
#不配置spark.default.parallelism的值
myrdd=sc.textFile("hdfs:///dir/filescontaining10blocks")
mynewrdd=myrdd.flatMap(lambda x:x.split()).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
mynewrdd.getNumPartitions()

Spark使用的默认分区方式类是HashPartitioner,它把所有的键以确定性的哈希方法求哈希值,然后使用键的哈希值创建出一组大致均衡的桶.目的是根据键把数据均匀地分到制定数量的分区中.

filter()等一些Spark转化操作不允许用户改变所产生的RDD的分区行为.比如,如果你对一个有4个分区的RDD使用filter()函数,所产生的过滤后的新RDD仍然为4个分区,分区方式也和原RDD一样(也就是哈希分区)

掌控分区

一个RDD应该有多少分区?对于这个问题来说,分区过多或过少都会导致问题,如果分区过少,则单个分区过大,可能导致执行器内存不足.而小分区过多也不好,因为输入集即使规模很小,也会生成多任务.大小分区混合会在打开了预测执行的情况下导致发生不必要的预测执行.预测执行是集群调度器对于执行较慢的进程的抢占机制.如果Spark应用中少数进程慢的根本原因是低效的分区,那么预测执行在这种情况下也无济于事.

filter()操作对每个输入分区一对一地创建一个新分区,把满足过滤条件地记录写入对应地新分区里.这会导致一些分区中的数据明显少于其他分区,造成不好地结果,比如数据倾斜,这可能在后续阶段中引起预测执行和性能表现欠佳.

在这种场景下,你可以使用Spark API中提供的重分区方法,包括partitionBy()、coalesce()、repartition()以及repartitionAndSortWithinPartitions()
这些函数接收已分区的输入RDD,创建分区数为n的新RDD,这里的n可以比原始的分区数更多,也可以更少.

重分区函数

  1. partitionBy()
    1
    RDD.partitionBy(numPartitions,partitionFunc=portable_hash)

partitionBy()方法返回的RDD包含的数据与输入RDD相同,但是分区数变成了numPartitions

1
2
3
4
kvrdd=sc.parallelize([(1,'A'),(2,'B'),(3,'C'),(4,'D')],4)
kvrdd.getNumPartitions()
kvrdd.partitionBy(2).getNumPartitions()
#返回2

partitionBy()函数也会被其他函数调用,比如sortByKey()就会使用rangePartitioner函数而不是portable_hash函数来调用partitionBy().rangePartitioner把记录根据键排序,分入大小均匀的范围分区.这是哈希分区的一种替代方法.

  1. repartition()
    1
    RDD.repartition(numPartitions)

repartition()方法返回的RDD包含的数据与输入RDD相同,分区数与numPartitions指定的完全一样.repartition()方法会引起数据混洗,并且它不像partitionBy()可以改变分区函数,也就是无法改变分区依据.repartition()方法也允许创建比输入RDD更多的分区数.

1
2
kvrdd=sc.parallelize([(1,'A'),(2,'B'),(3,'C'),(4,'D')],4)
kvrdd.repartition(2).getNumPartitions()

  1. coalesce()
    1
    RDD.coalesce(numPartitions,shuffle=False)

coalesce()方法返回的RDD的分区数由numPartitions参数指定.coalesce()方法也允许用户用布尔类型的shuffle参数控制是否触发数据混洗.coalesce(n,shuffle=True)操作等价于repartition(n)
coalesce()方法是对repartition()优化的实现.不过,与repartition()不同的是,coalesce()让用户能更多地控制混洗行为,同时在很多情况下允许避免数据移动.另外,coalesce()只允许使用比输入RDD更少的目标分区数,这也和repartition()不同.

1
2
3
kvrdd=sc.parallelize([(1,'A'),(2,'B'),(3,'C'),(4,'D')],4)
kvrdd.coalesce(2,shuffle=False).getNumPartitions()
#返回2

  1. repartitionAndSortWithinPartitions
1
RDD.repartitionAndSortWithinPartitions(numPartitions=None,partitionFunc=portable_hash,ascending=True,keyfunc=<lambda function>)

repartitionAndSortWithinPartitions()方法把输入RDD根据partitionFunc参数指定的函数,重新分区为numPartitions参数指定的分区数.在生成的每个分区中,记录根据键按照keyfunc参数定义的函数和ascending参数定义的顺序排序.

针对分区的API方法

Spark中的许多方法都是把分区作为原子单位进行交互的,这些方法中既有行动操作,也有转化操作.

  1. foreachPartition()
    RDD.foreachPartition(func)
    foreachPartition()方法是一个行动操作,它类似于行动操作foreach(),会把func参数指定的函数应用到RDD的每个分区.

    1
    2
    3
    4
    5
     def f(x):
    for rec in x:
    print(rec)
    kvrdd=sc.parallelize([((1,99),'A'),((1,101),'B'),((2,99),'C'),((2,101),'D')],2)
    kvrdd.foreachPartition(f)

    牢记foreachPartition()是行动操作,而不是转化操作,因此它会触发对输入RDD及其整个谱系的计算.另外,这个函数还会导致数据传输到驱动器端,因此在运行这个函数时需要注意最终RDD的数据量。

  2. glom()

    1
    RDD.glom()

    glom()方法把RDD的每个分区中的元素合并为一个列表,以新的RDD返回.这个方法可以用于以列表的形式查看RDD分区

  3. lookup()

    1
    RDD.lookup(key)

    lookup()方法返回RDD中与key参数指定的键相匹配的数据的列表.如果操作的RDD的分区方式是已知的,那么lookup()会利用它来收紧对键所属的分区的搜索.

  4. mapPartitions()

    1
    RDD.mapPartitions(func,preservesPartitioning=False)

    mapPartitions()方法将func参数指定的方法用于输入RDD的每个分区,返回一个新的RDD.

    1
    2
    3
    4
    kvrdd=sc.parallelize([(1,'A'),(1,'B'),(2,'C'),(2,'D')],2)
    def f(iterator): yield [(b,a) for (a,b) in iterator]
    kvrdd.mapPartitions(f).collect()
    #返回[[('A',1),('B',1)],[('C',2),('D',2)]]

    mapPartitions()方法的一大优势在于它只对每个分区使用一次指定函数,而不是每个元素一次.如果创建这个函数有很大的额外开销,那么使用这种方式就好得多.

RDD的存储选项

回顾RDD谱系

Spark会把程序的执行过程以DAG(有向无环图)的形式进行规划,DAG把参数按阶段和阶段依赖进行了划分.像map()这样的一些操作可以完全地并发执行,而像reduceByKey()这样的一些操作则需要数据混洗.这样自然而然就引入了阶段依赖.
Spark驱动器记录着每个RDD的谱系,也就是为生成一个RDD或其分区所需的一系列转化操作。这使得每个RDD的每个姐u但都可以在发生故障时进行重算,提供了弹性分布式数据集的所谓弹性.

RDD存储选项

不论Spark集群部署在YARN、独立集群还是Mesos上,RDD都以分区的形式存储在集群中不同的工作节点上.
RDD存储级别
(1)MEMORY_ONLY:仅把RDD分区存储在内存中.这是存储级别的缺省值
(2)MEMORY_AND_DISK:把内存里存不下的RDD分区存储在硬盘上
(3)MEMORY_ONLY_SER:把RDD分区以序列化的对象的形式存储在内存中.使用这个选项可以节省内存,因为序列化的对象会比未序列化的对象占用更少的空间
(4)MEMORY_AND_DISK_SER
:把RDD分区以序列化的对象的形式存储在内存里.内存中放不下的对象溢写到硬盘上.
(5)DISK_ONLY:仅把RDD分区存储在硬盘上
(6)OFF_HEAP:把RDD分区以序列化的对象的形式存储在内存里.该选项要求使用堆外内存.注意这个存储选项仅供试验性目的使用.

  1. 存储级别标记值
    存储级别是由一组控制RDD存储的标记值实现的.这些标记值决定是否使用内存、是否在内存放不下时溢写到硬盘、是否以序列化的形式存储对象,还有是否把RDD分区复制到多个节点上.这些标记值在StorageLevel的构造函数中实现.

    1
    StorageLevel(useDisk,useMemory,useOffHeap,deserialized,replication=1)
  2. getStorageLevel()

    1
    RDD.getStorageLevel()

Spark API包含一个名为getStorageLevel()的函数,你可以用它查看RDD的存储级别

  1. 选择存储级别
    RDD存储级别让用户可以调优Spark作业,并且可以容纳集群所有内存都放不下的大规模操作.此外,可用存储级别的复制选项可以减少任务或节点发生故障时的恢复时间。
    一般来说,如果RDD能保存在集群的可用内存中,那么使用默认的仅使用内存的存储级别就足够了,所提供的性能也是最好的.

RDD缓存

Spark里的RDD及其所有父RDD会在同一个会话或应用中每次调用行动操作时重新计算.缓存RDD会把数据持久化到内存中.当后续调用行动操作时,其他要用到这个RDD的计算就可以多次重用缓存,而无需重新计算.
缓存不会触发执行或计算.实际上,缓存更像是一种建议.如果没有足够的内存可以缓存RDD,RDD还会再每次有行动操作触发时计算整个谱系.缓存不会溢写到硬盘,因为缓存只使用内存.缓存的RDD使用存储级别MEMORY_ONLY持久化.

1
2
3
4
5
6
doc=sc.textFile("file:///opt/spark/data/shakespeare.txt")
words=doc.flatMap(lambda x:x.split()).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
words.cache()
words.count()
words.take(3)
words.count()

持久化RDD

缓存的分区,也就是调用了cache()方法的RDD的分区,存储在Spark工作节点上的执行器的JVM中.如果有一个工作节点要宕机或变为不可用状态,Spark需要从对应RDD的谱系重新计算缓存的分区
persist()方法提供了其他的存储选项,包括MEMORY_AND_DISK、DISK_ONLY、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER,还有与cache()方法一样的MEMORY_ONLY.在使用任意一种需要硬盘的存储选项时,持久化的分区会以本地文件的形式,存储在运行对应应用的Spark执行器的工作节点上.你可以使用持久化在硬盘上的数据,重建出因执行器或内存出故障而丢弃的分区.

另外,persist()可以使用备份,把同一个分区持久化到多个节点.备份可以进一步减少重新计算的发生,因为重新计算的触发需要不止一个节点发生故障或进入不可用状态.
持久化在提供了比缓存更高的耐用性的同时,依旧提供了性能提升.不论Spark中的RDD是否持久化,它们都是容错的,总是可以在发生故障时重建.持久化只是加快了重建的过程.
持久化和缓存一样只是一个建议,只会在有行动操作触发该RDD计算时才真正发生.如果没有足够的资源,持久化就不会发生,比如在内存不足时.

  1. persist()
    1
    RDD.persist(storageLevel=StorageLevel.MEMORY_ONLY_SER)

persist()方法指定了RDD所需的存储级别和存储属性.在RDD第一次被计算时,所需的存储选项才真正达成.如果无法实现这样的存储级别,比如如果要把RDD持久化在内存中而恰好内存不足,那么Spark会回到没有持久化时的那种仅仅在内存中保留所需分区的行为.

  1. unpersist()
    1
    RDD.unpersist()

unpersist()方法”解除持久化”RDD.当某个RDD不再需要持久化时调用这个方法.还有,如果你想要改变一个已经持久化的RDD的存储选项,你必须先解除这个RDD的持久化.如果尝试改变已经标记为持久化的RDD的存储级别,则会遇到异常提示.

选择合适持久化或缓存RDD

缓存可以提高性能,减少恢复时间。如果一个RDD可能重复使用,并且集群的工作节点上有充足的内存,缓存这些RDD一般会有不错的效果.机器学习中经常使用迭代算法,这种算法就很适合缓存.
缓存减少了发生故障的恢复时间,因为需要重算的RDD可以从缓存住的RDD开始计算.然而,如果想获得更高一级的耐用性,可以考虑使用一种基于硬盘的持久化选项,或是更高的备份级别,这都可以提高RDD在Spark集群上的某处存在持久化的备份的可能性.

保存RDD检查点

保存检查点会把数据保存到文件里.和刚刚介绍的基于硬盘的持久化选项的不同之处在于,这种持久化会在Spark驱动器程序完成时删除持久化的RDD数据,而检查点保存的数据在应用结束后依然保存着.
检查点机制让Spark可以不再维护RDD谱系,因为谱系过长时会导致很多问题,例如在流式处理或迭代处理应用中就可能发生.长长的谱系一般会导致很长的恢复时间,并且可能导致栈溢出.
把数据的检查点保存到HDFS这样的分布式文件系统上,也可以让我们获得更好的存储容错性.保存检查点的代价较大,因此在要保存RDD检查点时,三思而后行.
与缓存和持久化选项一样,检查点也只会在有例如count()这样的行动操作触发了该RDD的计算时真正保存.

  1. setCheckpointDir()
    1
    sc.setCheckpointDir(dirName)

setCheckpointDir()方法设置把RDD的检查点存到哪个目录内.如果在Hadoop集群上运行Spark,dirName参数指定的目录需要是一个HDFS路径

  1. checkpoint()
    1
    RDD.checkpoint()

checkpoint()方法把RDD标记为需要保存检查点.在执行第一个用到该RDD的行动操作时,它的检查点就会保存下来,而文件保存的目录是通过setCheckpointDir()方法设置的.
当检查点保存完毕时,包括所有引用该RDD和父RDD的完整的RDD谱系都会被删除.

  1. isCheckpointed()
    1
    RDD.isCheckpointed()

isCheckpointed()函数返回一个布尔值,表示该RDD是否被设置了检查点.

  1. getCheckpointFile()
    1
    RDD.getCheckpointFile()

getCheckpointFile()函数返回RDD检查点所保存的文件的文件名.

使用外部程序处理RDD

Spark提供了一种使用Spark原生支持的语言(Scala、Python以及Java)之外的其他语言运行函数(转化操作)的机制.你可以使用Ruby、Perl或Bash,还有其他语言.也不一定需要使用脚本,如果要在Spark里使用C或者FORTRAN也是可以的.
Spark程序中使用已有的而Python、Scala或Java中不存在的代码库,而不是使用Spark原生语言重新实现一遍.
可以通过pipe()函数实现在Spark中使用外部程序.

1
RDD.pipe(command,env=None,checkCode=False)

pipe()方法返回一个RDD,它通过把输入RDD的元素通过“管道”传给command参数指定的外部进程,获取新RDD的对应元素.参数env是一个由环境变量组成的字典对象,默认值为None.checkCode参数指定是否检查shell命令的返回值.

使用Spark进行数据采样

在使用Spark进行开发和探索时,你可能想要在对整个输入数据集执行处理之前,对RDD中的数据先进行采样.Spark的API提供了几个可以用于采样RDD,生成包含采样数据的新RDD的函数。这些采样函数包括返回新RDD的转换操作和返回数据到Spark驱动器程序的行动操作.

  1. sample()
    1
    RDD.sample(withReplacement,fraction,seed=None)

转化操作sample()根据原RDD整体数据集的百分比,创建出由采样数据子集组成的新RDD.
参数withReplacement是布尔型的值,有放回还是无放回采样
参数fraction是一个在0和1之间的双精度浮点数,代表一个元素被选中的概率

1
2
3
4
doc=sc.textFile("file:///opt/spark/data/shakespeare.txt")
doc.count()
sampled_doc=doc.sample(False,0.1,seed=None)
sampled_doc.count()
  1. takeSample()
    1
    RDD.takeSample(withReplacement,num,seed=None)

行动操作takeSample()从被采样的RDD中返回一个随机的值列表.
参数num是随机选择的返回记录的条数
参数withReplacement和seed的行为与刚才介绍的sample()函数中的类似

-------------本文结束感谢您的阅读-------------