redis数据持久化什么作用??(将内存中的数据写入到硬盘中,进行永久保存)
防止数据丢失!
Rdd数据持久化什么作用?
1、对多次使用的rdd进行缓存,缓存到内存,当后续频繁使用时直接在内存中读取缓存的数据,不需要重新计算。
2、将RDD结果写入硬盘(容错机制),当RDD丢失数据时,或依赖的RDD丢失数据时,可以使用持久化到硬盘的数据恢复。
缓存方法
Persist
Cache
这两个方法被调用时立即缓存,而是触发后面的action时
代码
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect
缓存级别
掌握的级别
MEMORY_ONLY(默认):数据写入内存,内存不足,部分分区数据不缓存。
MEMORY_AND_DISK:数据写入内存,内存不足写入磁盘。
DISK_ONLY:数据写入硬盘。
总结:
1.RDD持久化/缓存的目的是为了提高后续操作的速度
2.缓存的级别有很多,默认只存在内存中,开发中使用memory_and_disk
3.只有执行action操作的时候才会真正将RDD数据进行持久化/缓存
4.实际开发中如果某一个RDD后续会被频繁的使用,可以将该RDD进行持久化/缓存
将数据写入HDFS,利用HDFS永久存储。
操作过程
1、设置持久化的存储路径
2、调用checkpoint()进行数据的保存
SparkContext.setCheckpointDir("目录") //HDFS的目录
RDD.checkpoint()
代码:
sc.setCheckpointDir("hdfs://node01:8020/ckpdir")
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd1.checkpoint()
rdd1.collect
持久化结果
如何保证数据的安全性性及读取效率??
先做缓存/持久化,再做checkpint操作
cache和Checkpoint的区别
位置
Persist 和 Cache将数据保存在内存
Checkpoint将数据保存在HDFS
生命周期
Persist 和 Cache 程序结束后会被清除或手动调用unpersist方法。
Checkpoint永久存储不会被删除。
RDD依赖关系(血统Lineage)
Persist和Cache,不会丢掉RDD间的依赖链/依赖关系
Checkpoint会斩断依赖链
RDD的依赖关系
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖(图一)
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)(图二)
为什么划分宽窄依赖?
1.对于宽依赖
是划分Stage的依据(目的是实现并行化计算)
2.对于窄依赖
Spark Stage可以并行计算(并行计算速度快)
什么是DAG
指的是数据转换执行的过程,有方向,无闭环(其实就是RDD执行的流程)
DAG边界:(在哪里开始?哪里结束)
开始:通过SparkContext创建的RDD
结束:触发Action,一旦触发Action就形成了一个完整的DAG
说明:
一个Spark应用中可以有一到多个DAG,取决于触发了多少次Action
一个DAG中会有不同的阶段/stage,划分阶段/stage的依据就是宽依赖
一个阶段/stage中可以有多个Task,一个分区对应一个Task
为什么要划分Stage?
为了并行计算,提高计算效率。
总结
Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中
1.Application:指的是用户编写的Spark应用程序/代码,包含了Driver功能代码和分布在集群中多个节点上运行的Executor代码。
2.Driver:Spark中的Driver即运行上述Application的Main()函数并且创建SparkContext,SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等
3.Cluster Manager:指的是在集群上获取资源的外部服务,Standalone模式下由Master负责,Yarn模式下ResourceManager负责;
4.Executor:是运行在工作节点Worker上的进程,负责运行任务,并为应用程序存储数据,是执行分区计算任务的进程;
5.RDD:Resilient Distributed Dataset弹性分布式数据集,是分布式内存的一个抽象概念;
6.DAG:Directed Acyclic Graph有向无环图,反映RDD之间的依赖关系和执行流程;
7.Job:作业,按照DAG执行就是一个作业;Job==DAG
8.Stage:阶段,是作业的基本调度单位,同一个Stage中的Task可以并行执行,多个Task组成TaskSet任务集
9.Task:任务,运行在Executor上的工作单元,一个Task计算一个分区,包括pipline上的一系列操作
1.当一个Spark应用被提交时,首先需要为这个Spark Application构建基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext,
2.SparkContext向资源管理器注册并申请运行Executor资源;
3.资源管理器为Executor分配资源并启动Executor进程,Executor运行情况将随着心跳发送到资源管理器上;
4.SparkContext根据RDD的依赖关系构建成DAG图,并提交给DAGScheduler进行解析划分成Stage,并把该Stage中的Task组成Taskset发送给TaskScheduler。
5.TaskScheduler将Task发放给Executor运行,同时SparkContext将应用程序代码发放给Executor。
6.Executor将Task丢入到线程池中执行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
总结:
1.Spark应用被提交-->SparkContext向资源管理器注册并申请资源-->启动Executor
2.RDD-->构建DAG-->DAGScheduler划分Stage形成TaskSet-->TaskScheduler提交Task-->Worker上的Executor执行Task
当资源调度是YARN时,缺失资源申请过程!!
累加器
1.累加器accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//使用scala集合完成累加
var counter1: Int = 0;
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
println(counter1)
println("+++++++++++++++++++++++++")
//使用RDD进行累加
var counter2: Int = 0;
val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
dataRDD.foreach(x => counter2 += x)
println(counter2)//0
//注意:上面的RDD操作运行结果是0
//因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量
//而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
//最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系
//那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!
//如果解决?---使用累加器
val counter3: Accumulator[Int] = sc.accumulator(0)
dataRDD.foreach(x => counter3 += x)
println(counter3)//6
}
}
2.广播变量broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastVariablesTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//不使用广播变量
val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
//scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
//根据水果编号取水果名称
val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
fruitNames.foreach(println)
//注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,
//那么会导致,被各个Task共用到的fruitMap会被多次传输
//应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可
//如何做到?---使用广播变量
println("=====================")
//将Map数据添加到广播变量
val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
//获取广播变量内的数据
val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
fruitNames2.foreach(println)
}
}
广播表变量:通常与用于两个表(一个大表一个小表,将小标广播出去)的join。
文章评论