spark是基于内存的用于大规模数据处理(离线计算、实时计算、快速查询)的统一分析引擎。
也是一个生态系统。
1、速度快
比MapReduce块10-100倍
2、易用(算法多)
MR只支持一种计算 算法,Spark支持多种算法。
3、通用
Spark可以支持离线计算、实时计算、快速查询(交互式)、机器学习、图计算
4、兼容性强
支持大数据中现有的Yarn. Mesos等多种调度平台,可以处理hadoop支持的数据。
2009 年诞生于加州大学伯克利分校AMP 实验室
2014年成为 Apache 的顶级项目
原因1:优秀的数据模型和计算抽
支持多种计算模型,而且基于内存(内存比硬盘速度快)
RDD 是一个可以容错且并行的数据结构
原因2:完善的生态圈(Spark生态圈)
Spark Core:实现Spark 基本功能(RDD)
SparK SQL: 操作结构化数据
Spark Streaming : 对实时数据进行流式计算
Spark MLlib : 机器学习(ML)功能
GraphX(图计算) : 用于图计算的API
Hadoop(HDFS-MR-YARN) | Spark | |
类型 | 基础平台, 包含计算, 存储, 调度 | 分布式计算工具 |
场景 | 大规模数据集上的批处理 | 迭代计算, 交互式计算, 流计算 |
价格 | 对机器要求低, 便宜 | 对内存有要求, 相对较贵 |
编程范式 | Map+Reduce, API 较为底层, 算法适应性差 | RDD组成DAG有向无环图, API 较为顶层, 方便使用 |
数据存储结构 | MapReduce中间计算结果存在HDFS磁盘上, 延迟大 | RDD中间运算结果存在内存中 , 延迟小 |
运行方式 | Task以进程方式维护, 任务启动慢 | Task以线程方式维护, 任务启动快 |
1.local本地模式(单机)--开发测试使用
2.standalone独立集群模式--开发测试使用
3.standalone-HA高可用模式--生产环境使用
4.on yarn集群模式--生产环境使用
5.on mesos集群模式--国内使用较少
6.on cloud集群模式--中小公司未来会更多的使用云服务
[infobox title="Local模式安装部署"]
使用CDH5.14.0-Spark2.2版本
第一步:上传解压
第二步:开箱即用(local模式)
进入spark-shell方式
1、Spark-shell
2、Spark-shell --master local[*]
*表示使用当前机器上所有可用的资源
3、Spark-shell --master local[n]
数字N表示在本地模拟N个线程来运行当前任务
本地数据计算
val textFile = sc.textFile("file:////opt/spark01/tt.txt")
val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
集群上的数据计算
val textFile = sc.textFile("hdfs://node01:8020/tt.txt")
val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://node01:8020/ttt")
[/infobox]
[warningbox title="standalone集群模式部署"]
第一步:上传并解压
第二步:修改配置
#配置java环境变量
export JAVA_HOME=/export/servers/jdk1.8
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的端口
export SPARK_MASTER_PORT=7077
第三步:分发到其他节点
scp -r /export/servers/spark node02:/export/servers
scp -r /export/servers/spark node03:/export/servers
说明:Spark的环境变量可以添加到服务器环境变量内,但是spark和hadoop有部分脚冲突,需要修改冲突的脚本中的一个。
第四步:启动
sbin/start-all.sh
sbin/stop-all.sh
[/warningbox]
[dangerbox title="standaloneHA集群模式部署 "]
第一步:上传并解压
第二步:修改配置
#配置java环境变量
export JAVA_HOME=/export/servers/jdk1.8
#指定zookeeper
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
#指定spark Master的端口
export SPARK_MASTER_PORT=7077
第三步同步到其他节点
scp -r spark-2.2.0-bin-2.6.0-cdh5.14.0 node02:$PWD
scp -r spark-2.2.0-bin-2.6.0-cdh5.14.0 node03:$PWD
第四步: 先启动ZK ,再启动spark
[/dangerbox]
[infobox title="on yarn集群模式 安装部署"]
准备工作
Hadoop正常安装、 单机版本的spark安装成功
第一步:上传解压
第二步:修改配置
#配置java环境变量
export JAVA_HOME=/export/servers/jdk1.8
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的端口
export SPARK_MASTER_PORT=7077
#设置hadoop配置路径
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
第三步:使用spark-submit提交任务(不需要开启spark)
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master S \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10
参数说明
-deploy-mode 任务运行模式
cluster模式:生产环境中使用该模式
1.Driver程序在YARN集群中
2.应用的运行结果不能在客户端显示
3.该模式下Driver运行ApplicattionMaster这个进程中,如果出现问题,yarn会重启ApplicattionMaster(Driver)
●client模式:
1.Driver运行在Client上的Spark Submit进程中
2.应用程序运行结果会在客户端显示
spark-submit命令用来提交jar包给spark集群/YARN
--master spark://node01:7077 指定 Master 的地址
--name "appName" 指定程序运行的名称
--class 程序的main方法所在的类
--jars xx.jar 程序额外使用的 jar 包
--driver-memory 512m Driver运行所需要的内存, 默认1g
--executor-memory 2g 指定每个 executor 可用内存为 2g, 默认1g
--executor-cores 1 指定每一个 executor 可用的核数
--total-executor-cores 2 指定整个集群运行任务使用的 cup 核数为 2 个
--queue default 指定任务的对列
--deploy-mode S 指定运行模式(client/cluster)
[/infobox]
前提:创建一个maven项目
编写代码
1、创建spark conf
2、实例一个sparkcontext
3、读物数据,对数据进行操作(业务逻辑)
4、保存最终的结果
Jar包执行
讲代码到导出成为jar文件,上传到集群,通过spark-submit提交任务
。。。。。。
什么是RDD
弹性分布式数据集(保存在内存中)
弹性的,RDD中的数据可以保存在内存中或者磁盘里面
分布式存储的,可以用于分布式计算
集合,可以存放很多元素
代表一个不可变、可分区、里面的元素可并行计算的集合。
rdd1 rdd2 rdd3 不能改变
1、数据集的基本组成单位,一组分片或多分区
每个分片(每个分区)都会被一个计算任务处理,分片数决定并行度(与kafka相同)
用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值(默认值是2)
2、Spark中RDD的计算是以分区为单位的,计算函数会被作用在每一个分区。
3、一个RDD会依赖于其他多个RDD。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算(Spark的容错机制)
4、对于KV类型的RDD会有一个Partitioner函数, 即RDD的分区函数(可选项)
非key-value的RDD的Parititioner的值是None
Partitioner函数决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。
5、一个列表,存储每个Partition的位置(preferred location)。
计算程序通过列表找到数据
创建RDD
1、由外部存储系统的数据集创建
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
2、通过已有的RDD经过算子转换生成新的RDD
val rdd2=rdd1.flatMap(_.split(" "))
3、由一个已经存在的Scala集合创建
A: val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
B: val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
RDD的算子分为两类:
1.Transformation转换操作:返回一个新的RDD
2.Action动作操作:返回值不是RDD(无返回值或返回其他的)
如何判断一个方法是Transformation?还是Action?
当经过转换后返回 值是rdd表示此操作是个Transformation,反之就是一个Actions。
如何理解Spark的惰性计算?
RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算
遇到Action动作时,这些转换才会真正运行。没有遇到不执行。
之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行。
[success]
Transformation
转换 | 含义 |
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是
(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | 对rdd进行管道操作 |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 |
repartition(numPartitions) | 重新给 RDD 分区 |
[/success]
[info]
Action
动作 | 含义 |
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
[/info]
1.启动的时候指定的CPU核数确定了一个参数值:
spark.default.parallelism=指定的CPU核数(集群模式最小2)
2.对于Scala集合调用parallelize(集合,分区数)方法,
如果没有指定分区数,就使用spark.default.parallelism,
如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism)
3.对于textFile(文件,分区数) defaultMinPartitions
如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2)
如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数
rdd的分区数
对于本地文件:
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
对于HDFS文件:
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2
文章评论