Spark(一)

初识Spark

Spark是一个开源的强大的分布式查询和处理引擎.他提供了MapReduce 的灵活性和可扩展性,但速度明显更高:当数据存储在内存中时,他比Hadoop快100倍,访问磁盘时高达10倍。

Spark允许用户读取、转换、聚合数据,还可以轻松地训练和部署复杂地统计模型.

pandas处理千万级以下的数据还是很快的,差不得一个G的数据量

如果追求更好的体验,就要用spark,pyspark可以满足需求,类似于sql语句的操作,主要应用的数据类型还是dataframe,处理方法和pandas类似,因为是集群处理,所以速度快,数据存储和读取都存在hadoop实现的hdfs上,主要文件形式,是csv文件

Spark作业和API

执行过程

任何Spark应用程序都会分离主节点上的单个驱动进程(可以包含多个作业),然后将执行进程(包含多个任务)分配给多个工作节点.

驱动进程会确定任务进程的数量和组成,这些任务进程是根据为指定作业生成的图像分配给执行节点.(注意:任何工作节点都可以执行来自多个不同作业的多个任务)
Spark作业与一系列对象依赖相关联,这些依赖关系是以有有向无环图(DAG)的方式组织.

Pyspark原理

转自:https://blog.csdn.net/pelick/article/details/38307631
这是前段时间在看spark的python支持的时候,简单过了一下pyspark里的python代码,整理了一个大致流程。虽然几乎不会python,但基本上能看懂pyspark是怎么让不同虚拟机之间传输数据的、如何在python环境调用java类的、pyspark SDK的丰富程度取决于什么、需要做些什么流程和封装等。
pic1
(Py4J 是一个用 Python 和 Java 编写的库。通过 Py4J,Python程序 能够动态访问 Java虚拟机 中的 Java对象,Java程序 也能够回调 Python对象。)

在python driver端,SparkContext利用Py4J启动一个JVM并产生一个JavaSparkContext。Py4J只使用在driver端,用于本地python与java SparkContext objects的通信。大量数据的传输使用的是另一个机制。

RDD在python下的转换会被映射成java环境下PythonRDD。在远端worker机器上,PythonRDD对象启动一些子进程并通过pipes与这些子进程通信,以此send用户代码和数据。

大致选择

java_gateway.py里启动了py4j.JavaGateWay,并从java里导入了所需要的主要类,

pic2
python能通过py4j访问jvm的前提是,jvm开启了GatewayServer,而在core工程的deploy工程下,PythonRunner单例里启动了GatewayServer。可能可以理解为py4j是基于socket的一套简单封装了调用java类和方法的协议吧,而且走的本地不同端口。

py4j的包为$SPARK_HOME/python/lib/py4j-0.8.1-src.zip,里面是py4j源码的几个类。

上述java_gateway的launch_gateway()方法是在context.py初始化的时候调用。

context.py初始化的时候,把SparkContext和其部分主要方法加入到了python环境中,所以大多数的调用都是通过py4j直接调用java的类。java的类主要是指core项目里的java api里的内容。序列化采用了cPickle库的PickleSerializer。
像python下使用spark sql的话,在sql.py里,从jvm里获取了SQLContext/HiveContext类,从而得到spark sql里的关键方法。

另一方面,worker.py里,worker启动的时候会起一个socket,从socket里,可以获取工作目录名字;可以获取PYTHONPATH下的其他要引入的.zip或.egg文件,将其加到file_dir里,这里的反序列化使用的是UTF8Deserializer;可以获取广播的变量,这里的反序列化使用的是PickleSerializer。

在daemon.py里,通过分配socket端口,启动POOLSIZE个worker,(里面还有很多其他细节),使用os.fork的方式创建子进程来启动。

在rdd.py里,声明了rdd的很多action和transformations,有些操作会触发数据在python worker上的传输。

传输大量数据的时候,Py4J很慢,因为socket.readline()很低效。传输的时候,把数据(序列化后)dump成一个文件。后续把这个文件反序列化回来后,可以转成python的类型和结构进行查看和输出(如collect),也可以调用PythonRDD的asJavaRDD方法(如PipelinedRDD计算时),在各个worker上启动python进程执行反序列化之后的函数,通过管道与python进程进行通信,最后得到JavaRDD.

管道传输利用的是Popen,这样做标准输入

过了一下pyspark是怎么让不同虚拟机之间传输数据并在python环境调用java类的,两者使用的是不同的网络实现方式。

弹性分布式数据集(RDD)

RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。
RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。

RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。

RDD特性

RDD是分布式只读且已分区集合对象。这些集合是弹性(容错)的,如果数据集一部分丢失,则可以对它们进行重建。具有自动容错、位置感知调度和可伸缩性,而容错性是最难实现的,大多数分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。对于大规模数据分析系统,数据检查点操作成本很高,主要原因是大规模数据在服务器之间的传输带来的各方面的问题,相比记录数据的更新,RDD 也只支持粗粒度的转换,也就是记录如何从其它 RDD 转换而来(即 Lineage),以便恢复丢失的分区。

  • 数据存储结构不可变
  • 支持跨集群的分布式数据操作
  • 可对数据记录按key进行分区
  • 提供了粗粒度的转换操作
  • 数据存储在内存中,保证了低延迟性
    我们需要注意的是这里弹性的概念,与粗粒度概念,弹性就是对于丢失的数据集,可以很快的重建,而在容错的下面,记录数据的更新是只记录数据集的来源RDD ,并不会记录具体转换过程之类的细节,这里还要提到的概念就是Lineage血缘关系

    RDD的好处

  • RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
  • RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
  • RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
  • RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。

RDD编程接口

对于RDD,有两种类型的动作,一种是Transformation,一种是Action。它们本质区别是:
Transformation返回值还是一个RDD。它使用了链式调用的设计模式,对一个RDD进行计算后,变换成另外一个RDD,然后这个RDD又可以进行另外一次转换。这个过程是分布式的
Action返回值不是一个RDD。它要么是一个Scala的普通集合,要么是一个值,要么是空,最终或返回到Driver程序,或把RDD写入到文件系统中

所以我可以根据算子的返回类型来判断这个算子是Transformation还是action:
Transformations转换操作,返回值还是一个 RDD,如 map、 filter、 union;
Actions行动操作,返回结果或把RDD持久化起来,如 count、 collect、 save。

某种意义上来说,RDD转换操作是惰性的,因为他们不立即计算其结果。只有动作执行了并且需要将结果返回给驱动程序时,才会计算转换。

DataFrame

DataFrame也是分布式的数据集,但是DataFrame更像是一个传统的数据库里面的表,他除了数据之外还能够知道更多的信息,比如说列名、列值和列的属性,这一点就和hive很类似了,而且他也能够支持一些复杂的数据格式。从API应用的角度来说DataFrame提供的API他的层次更高,比RDD编程还要方便,学习的门槛更低。下面举个例子进行对比:
pic3

上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。

sparksql执行的时候就能了解到更多的信息,sparksql的查询优化器(Catalyst)能够更好的优化,比如你只是想要那么字段,它仅仅只需要把那么那一列取出来就可以了,age和height根本就不需要去读取了,有了这些信息以后在编译的时候能够做更多的优化,比如filter下推、裁剪等。
使用RDD的方式如果你用java/Scala那你需要运行在jvm上,python就是python runtime,所以性能上会有所差别。
但是DataFrame底层采用同一个优化器,在性能上都是一样的。

Dataset

Spark1.6引入的Spark Dataset旨在提供一个API,允许用户轻松地表达域对象地转换,同时还提供了具有强大性能和优点的Spark SQL执行引擎。

Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:

  1. DataSet可以在编译时检查类型
  2. 并且是面向对象的编程接口。
  3. 后面版本DataFrame会继承DataSet,DataFrame是面向Spark SQL的接口。

Catalyst优化器

Spark SQL的核心是Catalyst优化器.优化器基于函数式编程结构,并且旨在实现两个目的:简化向Spark SQL添加新的优化技术和特性的条件,并允许外部开发人员扩展优化器。

RDD、DataFrame和Dataset怎么选择还好

RDD

从一开始RDD就是Spark提供的面向用户的主要API。从根本上来说,一个RDD就是你的数据的一个不可变的分布式元素集合,在集群中跨节点分布,可以通过若干提供了转换和处理的底层API进行并行处理。

什么情况下使用RDD?
下面是使用RDD的场景和常见案例:

  1. 你希望可以对你的数据集进行最基本的转换、处理和控制;
  2. 你的数据是非结构化的,比如流媒体或者字符流;
    你想通过函数式编程而不是特定领域内的表达来处理你的数据;
  3. 你不希望像进行列式处理一样定义一个模式,通过名字或字段来处理或访问数据属性;
  4. 你并不在意通过DataFrame和Dataset进行结构化和半结构化数据处理所能获得的一些优化和性能上的好处;

DataFrame

与RDD相似,DataFrame也是数据的一个不可变分布式集合。但与RDD不同的是,数据都被组织到有名字的列中,就像关系型数据库中的表一样。设计DataFrame的目的就是要让对大型数据集的处理变得更简单,它让开发者可以为分布式的数据集指定一个模式,进行更高层次的抽象。它提供了特定领域内专用的API来处理你的分布式数据,并让更多的人可以更方便地使用Spark,而不仅限于专业的数据工程师。

pic4
在Spark 2.0中,DataFrame和Dataset的API将融合到一起,完成跨函数库的数据处理能力的整合。在整合完成之后,开发者们就不必再去学习或者记忆那么多的概念了,可以通过一套名为Dataset的高级并且类型安全的API完成工作。

Dataset

如下面的表格所示,从Spark 2.0开始,Dataset开始具有两种不同类型的API特征:有明确类型的API和无类型的API。从概念上来说,你可以把DataFrame当作一些通用对象Dataset[Row]的集合的一个别名,而一行就是一个通用的无类型的JVM对象。与之形成对比,Dataset就是一些有明确类型定义的JVM对象的集合,通过你在Scala中定义的Case Class或者Java中的Class来指定。
pic5
进一步可以认为:
DataFrame=Dataset[Row]

总结:Dataset/DataFrame/RDD的区别

(1)相同点:

  1. 都是分布式数据集
  2. DataFrame底层是RDD,但是DataSet不是,不过他们最后都是转换成RDD运行
  3. DataSet和DataFrame的相同点都是有数据特征、数据类型的分布式数据集(schema)
    (2)不同点:
  4. schema信息:
    (a)RDD中的数据是没有数据类型的
    (b)DataFrame中的数据是弱数据类型,不会做数据类型检查.虽然有schema规定了数据类型,但是编译时是不会报错的,运行时才会报错
    (c)DataSet中的数据类型是强数据类型
  5. 序列化机制:
    RDD和DataFrame默认的序列化机制是java的序列化,可以修改为Kyro的机制
    DataSet使用自定义的数据编码器进行序列化和反序列化

使用RDD的一般场景:

  • 你需要使用low-level的transformation和action来控制你的数据集;
  • 你的数据集非结构化,比如:流媒体或者文本流;
  • 你想使用函数式编程来操作你的数据,而不是用特定领域语言(DSL)表达;
  • 你不在乎schema,比如,当通过名字或者列处理(或访问)数据属性不在意列式存储格式;
  • 你放弃使用DataFrame和Dataset来优化结构化和半结构化数据集。

什么时候使用DataFrame或者Dataset?

  • 你想使用丰富的语义,high-level抽象,和特定领域语言API,那你可以使用DataFrame或者Dataset;
  • 你处理的半结构化数据集需要high-level表达,filter,map,aggregation,average,sum,SQL查询,列式访问和使用lambda函数,那你可以使用DataFrame或者Dataset;
  • 你想利用编译时高度的type-safety,Catalyst优化和Tungsten的code生成,那你可以使用DataFrame或者Dataset;
  • 你想统一和简化API使用跨Spark的Library,那你可以使用DataFrame或者Dataset;
  • 如果你是一个R使用者,那你可以使用DataFrame或者Dataset;
  • 如果你是一个Python使用者,那你可以使用DataFrame或者Dataset。

通过上面的分析,什么情况选择RDD,DataFrame还是Dataset已经很明显了。RDD适合需要low-level函数式编程和操作数据集的情况;DataFrame和Dataset适合结构化数据集,使用high-level和特定领域语言(DSL)编程,空间效率高和速度快。

弹性分布式数据集(RDD)

创建RDD

pyspark中,有两种方法可以创建RDD

  1. .parallelize(…)集合(元素list或array)

    1
    data=sc.parallelize([('Amber',22),('Alfred',23)])
  2. 要么引用位于本地或者外部的某个文件(或多个文件)

    1
    data_from_file=sc.textFile('/Users/drabast/VS14MORT.txt.gz',4)

sc.textFile(…,n)最后的参数n代表该数据集被划分的分区个数

Spark可以从多个文件系统中读取:如NTFS、FAT这类的本地文件系统,或者MacOS Extended(HFS+),或者如HDFS、S3、Cassandra这类的分布式文件系统,还有其他各类的文件系统.

根据数据读取方式的不同,持有的对象将以略有不同的方式表示.从文件中读取的数据表示为MapPartitionsRDD,而不是使用.paralellize(…)方法对一个集合进行操作时的ParallelCollectionRDD

Schema

RDD是无schema的数据结构.因此在以下的代码片段中的并行数据集,通过Spark使用RDD非常适用:

1
data_heterogenous=sc.parallelize([('Ferrrari','fast'),{'Porsche':10000},['Spain','visited',4504]]).collect()

所以这里混合适用了任何类型的数据结构
如果对数据集适用方法.collect()(执行把数据集送回驱动的操作),可以访问对象中的数据.
所谓的.collect()方法就是把RDD的所有元素返回给驱动程序,驱动程序将其序列化成了一个列表.

从文件读取

从文本文件读取数据时,文件中的每一行形成了RDD的一个元素.
data_from_file.take(1)命令输出一个元素

全局作用域和局部作用域

Spark有两种模式下运行:本地的和集群的.本地运行你的Spark代码时,和你目前的python没有什么不同:相比任何其他部分来说,变化最有可能是语法上的,只是加上了一个交织的部分,数据和代码在不同的工作者进程之间复制.

然而,如果你不小心将相同的代码部署到集群,便可能会导致大量的困扰.这就需要了解Spark是怎么在集群上执行工作的任务

在集群模式中,提交执行任务时,任务被发送给了驱动程序节点(或者主节点).该驱动程序节点为任务创建(DAG),并且决定哪一个执行者(工作者)节点将运行特定的任务
然后,该驱动程序指示工作者执行它们的任务,并且在结束时将结果返回给驱动程序.然而在这之前,驱动程序为每一个任务的终止做准备:驱动程序中有一组变量和方法,以便工作者在RDD上执行任务.
这组变量和方法在执行者的上下文中本质上是静态的,即每个执行器从驱动程序中获得一份变量和方法的副本。运行任务时,如果执行者改变这些变量或者覆盖这些方法,它不影响任何其他执行者的副本或者驱动程序的变量和方法.这可能会导致一些意想不到的行为和运行错误,这些行为和错误通常都很难被追踪到。

转换

转换可以调整数据集.包括映射、筛选、连接、转换数据集中的值.

由于RDD是无schema的,在这一节中我们假设你已经知道产生数据集的schema.

.map(….)转换

该方法应用在每个RDD元素上:在data_from_file_conv的数据集情况里,你可以认为这是每一行的转换

1
data_2014=data_from_file_conv.map(lambda row:int(row[16]))

.filter(…)转换

另外一个经常需要适用的转换方法是.filter(….),该方法可以让你从数据集中选择元素,该元素符合特定的标准。

.flatMap(..)

.flatMap(…)方法和.Map()方法的工作类似.但是.flatMap()返回一个扁平的结果,而不是一个列表。

.distinct(…)转换

该方法返回指定列中不同的列表.如果逆向知道你的数据集或者验证这个数据集,.distinct(…)方法会非常有用

.sample(…)转换

.sample(….)方法返回数据集的随机样本.第一个参数指定采样是否应该替换,第二个参数定义返回数据的分数(指数学中的分数),第三个参数是伪随机数产生器的种子.

1
2
fraction=0.1
data_sample=data_from_file_conv.sample(False,fraction,666)

.leftOuterJoin(…)转换

.leftOuterJoin(….),就像在SQL中一样,根据两个数据集中都有的值来连接两个RDD,并返回左侧的RDD记录,而右边的记录附加在两个RDD匹配的地方:

1
2
3
rdd1=sc.parallelize([('a',1),('b',4),('c',10)])
rdd2=sc.parallelize([('a',4),('a',1),('b','6'),('d',15)])
rdd3=rdd1.leftOuterJoin(rdd2)

另外,.join()方法相当于内联(inner)取rdd1与rdd2的交集.

而.intersection(…)返回的是两个RDD中相等的记录

.repartition(…)转换

重新对数据集进行分区,改变了数据集分区的数量.此功能应该谨慎适用,因为他会重组数据,导致对性能方面产生巨大的影响.

操作

和转换不同,操作执行数据集上的计划任务.一旦完成数据转换,则可以执行相应转换.这可能不包含转换(例如,take(n)会仅仅返回n条来自RDD的记录,即使你没有对RDD做任何转换)或者直接执行一系列转换.

.take()方法

这可以说是最有用的方法(也是用的最多的方法,如.map(…)方法).该方法优于.collect(..),因为它只返回单个数据分区的前n行,对比之下,.collect(…)返回的是整个RDD.处理大数据集时,这个区别很重要.

1
data_take_sampled=data_from_file_conv.take(1)

可以使用.takeSample(…)这个方法有三个参数,第一个参数代表采样是否应该被替换;第二个参数指定要返回的记录数量;第三个参数是伪随机数发生器的种子:

1
data_take_sampled=data_from_file_conv.takeSample(False,1,667)

.collect()方法

将所有RDD的元素返回给驱动程序

.reduce方法

.reduce(..)方法使用指定的方法减少RDD中的元素.
你可以利用该方法计算RDD总的元素数量

1
rdd1.map(lambda row:row[1]).reduce(lambda x,y:x+y)

这行代码输出的总数是15

首先通过.map(..)转换,创建一个包含rdd1所有值得列表,然后使用.reduce(..)方法对结果进行处理.在每个分区里,reduce(…)方法运行求和方法,将该总和返回给最终聚合所在得驱动程序节点。
.reduceByKey(..)和.reduce(..)方法类似,但.reduceByKey(…)是在键-键基础上进行.

.count(..)方法

.count(…)统计出了RDD里得元素数量

.saveAsTextFile(…)方法

在RDD执行.saveAsTextFile(..)可以让RDD保存为文本文件:每个文件一个分区.

.foreach(…)方法

这个方法对RDD里得每个元素,用迭代得方式应用相同的函数;和.map(…)比较,.foreach(…)方法按照一个接一个的方式,对每一条记录应用一个定义好的函数。当您希望将数据保存到PySpark本身不支持的数据库时,该方法很有用.

总结

转换的作用是从现有数据集创建新数据集.转换是惰性的,因为它们仅在动作需要将结果返回到驱动程序时才计算。

Action操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。

  1. Transformation用于对RDD的创建,RDD只能使用Transformation创建,同时还提供大量操作方法,包括map,filter,groupBy,join等,RDD利用这些操作生成新的RDD,但是需要注意,无论多少次Transformation,在RDD中真正数据计算Action之前都不可能真正运行。

  2. Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。实际上,RDD中所有的操作都是Lazy模式进行,运行在编译中不会立即计算最终结果,而是记住所有操作步骤和方法,只有显示的遇到启动命令才执行。这样做的好处在于大部分前期工作在Transformation时已经完成,当Action工作时,只需要利用全部自由完成业务的核心工作。

hadoop提供的接口只有map和reduce函数,spark是mapreduce的扩展,提供两类操作,而不是两个,使使用更方便,开发时的代码量会尽量的被spark的这种多样的API减少数十倍。

回顾Spark
Spark是以RDD概念为中心运行的。RDD是一个容错的、可以被并行操作的元素集合。创建一个RDD有两个方法:在你的驱动程序中并行化一个已经存在的集合;从外部存储系统中引用一个数据集。RDD的一大特性是分布式存储,分布式存储在最大的好处是可以让数据在不同工作节点并行存储,以便在需要数据时并行运算。弹性指其在节点存储时,既可以使用内存,也可已使用外存,为使用者进行大数据处理提供方便。除此之外,RDD的另一大特性是延迟计算,即一个完整的RDD运行任务被分为两部分:Transformation和Action.

Spark资源调度和任务调度过程介绍

参考文献:https://blog.csdn.net/helloxiaozhe/article/details/81533438

pic6

Spark资源调度和任务调度的流程:

  1. 启动集群后,Worker节点会向Master节点汇报资源情况,Master掌握了集群资源情况
  2. 当Spark提交一个Application后,根据RDD之间的依赖关系将Application形成一个DAG(有向无环图).任务提交后,Spark会在Driver端创建两个对象:DAGScheduler和TaskScheduler。
  3. DAGScheduler是任务调度的高层调度器,是一个对象.DAGScheduler的主要作用就是将DAG根据RDD之间的宽窄依赖关系划分为一个个的Stage,然后将这些Stage以TaskSet的形式提交给TaskScheduler(TaskScheduler是任务调度的低层调度器,这里TaskSet其实就是一个集合,里面封装的就是一个个的task任务,也就是stage中的并行度task任务)
  4. TaskSchedule会遍历TaskSet集合,拿到每个task后会将task发送到计算节点Executor中去执行(其实就是发送到Executor中的线程池ThreadPool去执行)。
  5. task在Executor线程池中的运行情况会向TaskScheduler反馈,
  6. 当task执行失败时,则由TaskScheduler负责重试,将task重新发送给Executor去执行,默认重试3次。如果重试3次依然失败,那么这个task所在的stage就失败了。
  7. stage失败了则由DAGScheduler来负责重试,重新发送TaskSet到TaskSchdeuler,Stage默认重试4次。如果重试4次以后依然失败,那么这个job就失败了。job失败了,Application就失败了。
  8. TaskScheduler不仅能重试失败的task,还会重试straggling(落后,缓慢)task(也就是执行速度比其他task慢太多的task)。如果有运行缓慢的task那么TaskScheduler会启动一个新的task来与这个运行缓慢的task执行相同的处理逻辑。两个task哪个先执行完,就以哪个task的执行结果为准。这就是Spark的推测执行机制。在Spark中推测执行默认是关闭的。推测执行可以通过spark.speculation属性来配置。
    总结:
  9. 对于ETL类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。
  10. 如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有task重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。(所以一般关闭推测执行)
  11. 一个job中多个action, 就会有多个job,一般一个action对应一个job,如果一个application中有多个job时,按照顺序一次执行,即使后面的失败了,前面的执行完了就完了,不会回滚。
  12. 有SparkContext端就是Driver端。

粗粒度资源申请和细粒度资源申请

  • 粗粒度资源申请(Spark):在Application执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task执行完成后,才会释放这部分资源。
    优点:在Application执行之前,所有的资源都申请完毕,每一个task运行时直接使用资源就可以了,不需要task运行时在执行前自己去申请资源,task启动就快了,task执行快了,stage执行就快了,job就快了,application执行就快了。
    缺点:直到最后一个task执行完成才会释放资源,集群的资源无法充分利用。当数据倾斜时更严重。
  • 细粒度资源申请(MapReduce)
    Application执行之前不需要先去申请资源,而是直接执行,让job中的每一个task在执行前自己去申请资源,task执行完成就释放资源。
    优点:集群的资源可以充分利用。
    缺点:task自己去申请资源,task启动变慢,Application的运行就相应的变慢了。

DataFrame

DataFrame也是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库的表。Spark DataFrame是一个近似的概念,即允许用户轻松地使用结构化数据(如数据表).不过也存在一些差异.
通过在分布式数据集上施加结构,让Spark用户利用Spark SQL来查询结构化的数据或使用Spark表达式方法(而不是lambda).

Python到RDD之间的通信

每当使用RDD执行PySpark程序时,潜在地需要巨大地开销来执行作业。在PySpark驱动器中,Spark Context通过Py4j启动一个使用JavaSparkContext的JVM。所有的RDD转换最初都映射到java中的PythonRDD对象.

一旦这些任务被推送到Spark工作节点,PythonRDD对象就使用管道(pipe)启动Python的子进程,发送代码和数据到Python中进行处理:
虽然该方法允许PySpark将数据处理分布到多个工作节点的多个Python子进程中,Python和JVM之间还是有很多上下文切换和通信开销。

利用DataFrame加速PySpark

引入DataFrame之前,Python查询速度普遍比使用RDD的Scala查询慢(后者快两倍)。通常情况,这种查询性能的降低源于Python和JVM之间的通信开销

常见DataFrame

通常情况下,通过使用SparkSession导入数据(或调用PySpark的shell脚本Spark)来创建DataFrame

生成自己的JSON数据

1
stringJSONRDD=sc.parallelize(("""{"id":"123","name":"Katie","age":19,"eyeColor":"brown"}""","""{"id":"234","name":"Michael","age":22,"eyeColor":"green"}"""))

现在已经创建了RDD,利用SparkSession read.json方法,RDD将会转换成一个DataFrame(即Spark.read.json(….)).还可以利用.createOrReplaceTempView方法创建一个临时表

1
2
swimmersJson=spark.read.json(stringJSONRDD)
swimmersJson.createOrReplaceTempView("swimmersJson")#创建临时表

许多的RDD操作都是有相关转换的,直到行为操作执行,这些RDD操作都不会被执行
重要的是注意parallelize、map和mapPartitions都是RDD的转换而来.spark.read.json包裹在DataFrame中,不仅仅是RDD转换,还是RDD转换成DataFrame的行动.
注意:创建临时表是一次DataFrame转换,并且只有直到执行DataFrame动作时,创建临时表才会被执行.

简单的DataFrame查询

.show()

swimmersJson.show()
默认显示前10行

SQL查询

如果你愿意编写SQL语句,则可以编写以下查询

1
spark.sql("select * from swimmersJson").collect()

使用.collect()方法,返回行对象列表所有的记录.也可以使用.show()方法

RDD的交互操作

有两种从RDD变换到DataFrame(或者Dataset[T])的不同方法:使用反射推断模式或以编程方式指定模式.

使用反射来推断模式

来建立DataFrame和运行查询的过程中,我们略过了DataFrame的模式是自动定义的这一事实.最初,行对象通过传递一列键/值对作为行类的**kwargs来构造.然后,SparkSQL将行对象的RDD转变为一个DataFrame,在DataFrame中键就是列,数据类型通过采用数据来推断.

在开始创建swimmersJSON DataFrame之后,没有指定模式,你会注意到利用printSchema()方法的模式定义

1
2
#打印模式
swimmersJson.printSchema()

编写指定模式

在这种情况下,我们通过Spark SQL中引入数据类型(pyspark.sql.types),以编程方式来指定模式,并生成一些.csv数据

1
2
3
4
5
6
from pyspark.sql.types import *
stringCSVRDD=sc.parallelize([
(123,'Katie',19,'brown'),
(234,'Michael',22,'green'),
(345,'Simone',23,'blue')
])

首先,我们根据以下的[schema]变量将模式编码成一个字符串.然后我们会利用StructType和StructField定义模式.

1
2
3
4
5
6
schema=StructType([
StructField("id",LongType(),True),
StructField("name",StringType(),True),
StructField("age",LongType(),True),
StructField("eyeColor",StringType(),True)
])

注意,StructField被分解为以下方面:

  • name:该字段的名字
  • dataType:该字段的数据类型
  • nullable:指示此字段的值是否为空
1
2
3
4
#对RDD应用该模式并创建DataFrame
swimmers=spark.createDataFrame(stringCSVRDD,scheam)
#利用DataFrame创建一个临时视图
swimmers.createOrReplaceTempView("swimmers")

利用DataFrame API查询

行数

得到DataFrame中的行数

1
swimmers.count()

运行筛选语句

运行一个筛选语句,可以使用filter子句(筛选子句);在下面的代码段中,我们使用select子句来指定要返回的列

1
2
3
4
#获取age22的id
swimmers.select("id","age").filter("age=22").show()
#上述查询的另外一种方法如下:
swimmers.select(swimmers.id,swimmers.age).filter(swimmers.age==22).show()

利用SQL查询

行数

1
spark.sql('select count(1) from swimmers').show()

利用where子句运行筛选语句

1
spark.sql("select id,age from swimmers where age=22").show()

和DataFrame API查询一样,如果指示想要取回眼睛颜色以字母b开头的游泳运动员的名字,我们还可以使用like语法:

1
2
spark.sql(
"select name,eyeColor from swimmers where eyeColor like 'b%'").show()

-------------本文结束感谢您的阅读-------------