Spark Streaming是一个基于Spark Core之上的实时计算框架。
特点
易用:可以像编写离线批处理一样去编写流式程序,支持java/scala/python语言。
容错:SparkStreaming在没有额外代码和配置的情况下可以恢复丢失的工作。
易整合到Spark体系:流式处理与批处理和交互式查询相结合。
在架构中的位置
在大数据计算模块中的实时计算模块
Spark Streaming中,会有一个接收器组件Receiver,作为一个长期运行的task跑在一个Executor上。Receiver接收外部的数据流形成input DStream
DStream会被按照时间间隔(自定)划分成一批一批的RDD
编写业务代码对DStream进行操作,实际就是对RDD进行操作,有多少个RDD业务代码就会执行多少次。
DStream:持续性的输入的数据流和经过各种Spark算子操作后的输出的结果数据流
本质上就是一系列时间上连续的RDD
准实时性计算/近实时性计算(不是100%的实时计算【5s中之内是可以接受的】)
对DStream的数据的进行操作也是按照RDD为单位来进行的
三者的数据抽象分别是什么?什么关系?
DStream相关操作
- 数据输入:Receiver
2、数据转化:Transformations(转换)
2.1每个批次的处理不依赖于之前批次的数据
2.2当前批次的处理需要使用之前批次的数据或者中间结果
2.2.1 UpdateStateByKey(func)
2.2.2 Window Operations 窗口操作
3、数据输出:Output Operations(输出)/Action
当某个Output Operations被调用时,spark streaming程序才会开始真正的计算过程。
[warningbox title="SparkStreaming案例"]
object SparkStreamingDemo01 {
//用于求DUOGE rdd总和
def sum(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={
// currentValues当前值 hadoop-1,1,1
// historyValue历史值
val result: Int = currentValues.sum + historyValue.getOrElse(0)
Some(result)
}
def main(args: Array[String]): Unit = {
//1 创建sparkConf
var conf=new SparkConf().setAppName("SparkStreamingDemo01").setMaster("local[*]")
//2 创建sparkContext
var sc =new SparkContext(conf)
sc.setLogLevel("WARN")
//3 创建StreamingContext
val ssc= new StreamingContext(sc,Seconds(1))
//设置临时存储数据的路径
ssc.checkpoint("./WordCount")
//4 接收 数据,并根据业务逻辑进行计算
val inputDatas: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
//val WordCount = inputDatas.flatMap(a=>a.split(" ")).map(z=>(z,1)).reduceByKey(_+_)
val WordOne = inputDatas.flatMap(a=>a.split(" ")).map(z=>(z,1)) //没有reduce
//计算最终的总和
val WordAllCount = WordOne.updateStateByKey(sum)
WordAllCount.print()
//5 开启实时任务
ssc.start()
//6 等待关闭任务
ssc.awaitTermination()
}
}
[/warningbox]
每一个方格都是一个rdd,多个则是DStream
def main(args: Array[String]): Unit = {
//1 创建sparkConf
var conf=new SparkConf().setAppName("SparkStreamingDemo01").setMaster("local[*]")
//2 创建sparkContext
var sc =new SparkContext(conf)
sc.setLogLevel("WARN")
//3 创建StreamingContext
val ssc= new StreamingContext(sc,Seconds(5))
//设置临时存储数据的路径
//4 接收 数据,并根据业务逻辑进行计算
val inputDatas: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
//val WordCount = inputDatas.flatMap(a=>a.split(" ")).map(z=>(z,1)).reduceByKey(_+_)
val WordOne = inputDatas.flatMap(a=>a.split(" ")).map(z=>(z,1))
//计算最终的总和
val WordAllCount = WordOne.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
WordAllCount.print()
//5 开启实时任务
ssc.start()
//6 等待关闭任务
ssc.awaitTermination()
}
Receiver接收方式
- 多个Receiver接受数据效率高,但有丢失数据的风险。
- 开启日志(WAL)可防止数据丢失,但写两遍数据效率低。
- Zookeeper维护offset有重复消费数据可能。
- 使用高层次的API
Direct直连方式
- 不使用Receiver,直接到kafka分区中读取数据
- 不使用日志(WAL)机制。
- Spark自己维护offset
- 使用低层次的API
|
文章评论