Spark(六)

使用Spark处理流数据与信息

Spark Streaming简介

事件处理也被称为流处理,是大数据平台的关键组件之一.Spark项目所包含所包含的子项目Spark Streaming提供了具有容错性和数据保证的低延迟处理
Spark Streaming 提供了一个与Spark基于RDD的批处理框架整合在一起的事件处理系统,保证每个事件恰好被处理以此,即使发生节点崩溃或其他错误.

Spark Streaming包括如下设计初衷:

  • 低延迟(秒级)
  • 精确一次的事件处理
  • 线性伸缩性
  • 整合Spark核心API和DataFrame API

    Spark Streaming架构

    Spark引入了离散流(即DStream)的概念.DStream本质上是存储在一系列RDD中的批数据,每个批代表一个时间窗口内的数据,时间窗口长度通常为秒级.然后,所生成的这一系列RDD就可以用核心的Spark RDD API和转化操作来进行处理.

之前讨论的程序入口SparkContext和SparkSession一样,Spark Streaming应用有一个名为StreamingContext的入口。StreamingContext代表使已有的SparkContext建立的与Spark平台或集群的连接.你可以使用StreamContext来从流式输入数据源创建DStream,并管理流计算和DStream的转化操作

StreamContext还指定了batchDuration参数,它是一个以秒为单位的时间间隔,表示将流数据分为一系列批数据的间隔.在实例化出StreamContext后,你可以创建数据流的连接,并定义一系列要执行的转化操作.在创建StreamContext之后,你可以使用start()方法(或ssc.start())来触发新数据的求值.你也可以在程序中使用ssc.stop()或ssc.awaitTermination()停止StreamContext

1
2
3
4
5
from pyspark.streaming import StreamingContext
ssc=StreamingContext(sc,1)
····
ssc.start()
····

正如sc和sqlContext分别是SparkContext和SQLContext或HiveContext类在对象实例化时的惯用实例名一样,ssc是StreamingContext实例的惯用名.不过和前面两个入口的区别在于,StreamingContext不会在交互式shell pyspark和spark-shell中自动实例化

DStream简介

离散流(DStream)是Spark Streaming API的基本编程对象.DStream表示从连续的数据流创建的连续的一系列RDD,其中每个底层RDD表示数据流在一个时间窗口内的数据.
DStream可以从TCP套接字、消息系统、流API(比如Twitter流API)等创建出来.作为一种RDD抽象,DStream也可以由已有DStream的转化操作创建,如map()、flatMap()、以及其他操作.

DStream支持两种类型的操作:

  • 转化操作
  • 输出操作
    输出操作类似于RDD的行动操作.DStream按输出操作的需要惰性执行,这也类似于Spark RDD的惰性求值.
  1. DStream数据源
    在StreamingContext中,DStream是为特定的输入数据流定义的,在与SparkContext中为输入数据源定义RDD颇为相似.Streaming API中包含许多常见的流式输入源,比如从TCP套接字读取数据的输入源,或在输入写入HDFS时读取的输入源.
    创建DStream的基本输入数据源
    (1)socketTextStream()
    1
    StreamingContext.socketTextStream(hostname,port,storageLevel=StorageLevel(True,True,False,False,2))

使用socketTextStream()方法可以从hostname参数和port参数定义的TCP输入源创建DStream.收到的数据使用UTF8编码来解码,并把换行符作为记录间的分割符.storage-Level参数定义DStream的存储级别,默认为MEMORY_AND_DISK_SER.

1
2
3
4
5
6
7
from pyspark.streaming import StreamContext
ssc=StreamingContext(sc,1)
lines=ssc.socketTextStream('localhost',9999)
counts=lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()

(2)textFileStream()

1
StreamingContext.textFileStream(directory)

使用textFileStream()方法可以监控当前系统或应用配置所指定的HDFS上的一个目录,从而创建DStream.textFileStream()监听directory参数所指定的目录中新文件的创建,并捕获写入的数据,作为流式数据源.

1
2
3
4
5
6
7
from pyspark.streaming import StreamingContext
ssc=StreamingContext(sc,1)
lines=ssc.textFileStream('hdfs:///data/incoming/')
counts=lines.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda a,b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()

Spark提供用于Apach kafka,亚马逊Kinesis、Apache Flume等常见信息系统的内建数据源.

  1. DStream转化操作
    DStream API包括许多来自RDD API的转化操作.DStream转化操作与RDD转化操作类似,对现有DStream运用函数会创建出新的DStream

    1
    2
    3
    4
    5
    6
    7
    from pyspark.streaming import StreamingContext
    ssc=StreamingContext(sc,30)
    lines=ssc.socketTextStream('localhost',9999)
    counts=lines.map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
  2. DStream谱系与检查点
    与RDD和DataFrame的谱系很像,Spark会维护每个DStream的谱系以实现容错.因为流处理应用天生是长时间运行的应用,检查点通常很有必要.DStream的检查点与RDD和DataFrame API的检查点类似.
    (1)StreamingContext.checkpoint()

    1
    StreamingContext.checkpoint(directory)

StreamingContext.checkpoint()方法让DStream操作可以定期保存检查点,以实现耐久性和容错性.应用DAG会在StreamingContext定义的每个批处理间隔保存检查点.directory参数一般配置为HDFS中的目录,用于持久化检查点数据.
(2)DStream.checkpoint()

1
DStream.checkpoint(interval)

DStream.checkpoint()可以定期保存特定DStream中包含的RDD的检查点.interval参数是以秒为单位的时间,每隔这么长时间DStream底层的RDD就会保存一次检查点.
注意interval参数必须设置为StreamingContext所设置的batchDuration参数的正整数倍。

  1. DStream的缓存与持久化
    DStream支持使用RDD中名称相同且用途类似的接口cache()和persist()进行缓存和持久化.如果DStream会在下游处理操作中多次,这些选项尤其方便.DStream的存储级别与RDD的存储级别一样.

  2. 流处理应用中的广播变量和累加器
    DStream的输出操作在概念上类似于RDD的行动操作.DStream输出操作将数据、结果、时间,或其他数据写入控制台、文件系统、数据库,或其他目的地,比如Kafka这样的消息平台.
    (1)pprint()

    1
    DStream.pprint(num=10)

pprint()方法打印DStream中每个RDD的前几个元素,元素数量通过num参数指定,默认值为10.使用pprint()是从流处理应用获取交互式控制台反馈的常用方式.
(2)saveAsTextFile()

1
DStream.saveAsTextFile(prefix,suffix-=None)

-------------本文结束感谢您的阅读-------------