SparkSQL入门详解

[infobox title=”什么是SparkSQL?”]

用于处理结构化数据的Spark模块。
可以通过DataFrame和DataSet处理数据。

 

SparkSQL可以看做是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式

[/infobox]

[infobox title=”SparkSQL特点”]

1、易整合
可以使用java、scala、python、R等语言的API操作。
2、统一的数据访问
连接到任何数据源的方式相同。
3、兼容Hive
4、标准的数据连接(JDBC/ODBC)

[/infobox]

[infobox title=”SQL优缺点”]

优点:表达非常清晰,难度低、易学习。
缺点:复杂的业务需要复杂的SQL, 复杂分析,SQL嵌套较多。机器学习较难实现。

[/infobox]

[infobox title=”Hive和SparkSQL的对比”]

Hive是将sql转化成MapReduce进行计算
SparkSQL是将sql转化成rdd集进行计算

[/infobox]

[infobox title=”SparkSQL中的两个抽象”]

什么RDD??
弹性分布式数据集。

DataFrame
什么是DataFrame??
DataFrame是以RDD为基础的带有Schema元信息的分布式数据集。
(DataFrame=Schema+RDD*n)

什么是DataSaet??
含有类型信息的DataFrame就是DataSet
(DataSaet=DataFrame+类型= Schema+RDD*n+类型)
DataSet包含了DataFrame的功能

 

[warningbox title=”详解”]

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet;他们和RDD有什么区别呢?首先从版本的产生上来看:RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式
注意:在后期的Spark版本中,DataSet会逐步取代RDD和DataFrame成为唯一的API接口

 

[/warningbox]

[/infobox]

[infobox title=”Spark初体验”]

SparkSQL驱动为SparkSession
SparkSession可以执行SparkSQL也可以执行HiveSQL

//读取数据
val lineRDD= sc.textFile("hdfs://node01:8020/tt.txt").map(_.split(" "))
//实例样例类(类似于表的结构)
case class Person(id:Int, name:String, age:Int)
//遍历数据,将数据填充到样例类中
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
//将RDD转换成DataFrame
val personDF = personRDD.toDF
//查看数据
personDF.show
//输出表结构
personDF.printSchema
//将DataFrame注册为张表
personDF.createOrReplaceTempView("t_person")
//通过SQL语句进行查询
spark.sql("select id,name from t_person where id > 3").show

使用SparkSession对象(spark)直接读取数据,读取文本文件是没有元数据信息,读取json文件有元数据信息。

[/infobox]

[infobox title=”创建DataSet”]

1.通过spark.createDataset创建Dataset

val fileRdd = sc.textFile("hdfs://node01:8020/person.txt") //RDD[String]
val ds1 = spark.createDataset(fileRdd) //DataSet[String] 
ds1.show

2.通RDD.toDS方法生成DataSet

case class Person(name:String, age:Int)
val data = List(Person("zhangsan",20),Person("lisi",30)) //List[Person]
val dataRDD = sc.makeRDD(data)
val ds2 = dataRDD.toDS //Dataset[Person]
ds2.showS

3.通过DataFrame.as[泛型]转化生成DataSet

case class Person(name:String, age:Long)
val jsonDF= spark.read.json("file:///export/servers/spark/examples/src/main/resources/people.json")
val jsonDS = jsonDF.as[Person] //DataSet[Person]
jsonDS.show

[/infobox]

[infobox title=”SparkSQL查询数据的形态”]

1、类似方法调用,领域特定语言(DSL)。
准备数据
val lineRDD= sc.textFile(“hdfs://node01:8020/tt.txt”).map(_.split(” “))
case class Person(id:Int, name:String, age:Int)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
val personDF = personRDD.toDF
personDF.show
查询
personDF.select(“id”,”name”,”age”).show
personDF.select($”id”,$”name”,$”age”+1).show
personDF.select($”id”,$”name”,$”age”+1).filter($”age”>25).show

2、SQL语句
注册成一张表
personDF.createOrReplaceTempView(“t_person”)
查询
spark.sql(“select * from t_person”).show
spark.sql(“select * from personDFT “).show
spark.sql(“select * from personDFT where age >25″).show
总结:
1.DataFrame和DataSet都可以通过RDD来进行创建
2.也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过RDD+Schema
3.通过josn/parquet会有完整的约束
4.不管是DataFrame还是DataSet都可以注册成表,之后就可以使用SQL进行查询了! 也可以使用DSL!

[/infobox]

[infobox title=”通过IDEA编写SparkSQL代码”]

创建DataFrame/DataSet
第1种:指定列名添加Schema
第2种:通过StructType指定Schema
第3种:编写样例类,利用反射机制推断Schema

第一种:指定列名添加Schema

    //创建sparkSession
    val sparkSession: SparkSession = SparkSession.builder().master("local[*]").appName("wula").getOrCreate()
    //创建sparkContext
    val sc: SparkContext = sparkSession.sparkContext
    //读取数据并操作
    val fileRdd: RDD[String] = sc.textFile("file:///D:\\vedio\\2020\\4月\\04.13\\words.txt")
    val fileRddC: RDD[(String, String, Int)] = fileRdd.map(_.split(" ")).map(x => (x(0), x(1), x(2).toInt))
    //隐式转换
    import sparkSession.implicits._
    val frame: DataFrame = fileRddC.toDF("id", "name", "age")
    //查看数据
    frame.show()
    frame.printSchema()
    //关闭sparkSession sparkContext
    sc.stop()
    sparkSession.stop()

StructType指定Schema

 //创建sparkSession
    val sparkSession: SparkSession = SparkSession.builder().master("local[*]").appName("wula").getOrCreate()
    //创建sparkContext
    val sc: SparkContext = sparkSession.sparkContext
    //读取数据并操作
    val fileRdd: RDD[String] = sc.textFile("file:///D:\\vedio\\2020\\4月\\04.13\\words.txt")
    val fileRddC: RDD[Row] = fileRdd.map(_.split(" ")).map(x => Row(x(0), x(1), x(2).toInt))
    val schema: StructType = StructType(Seq(StructField("id", StringType, true), StructField("name", StringType, true), StructField("age", IntegerType, true)))
    //    val schema: StructType = StructType(List(StructField("id", StringType, true), StructField("name", StringType, true), StructField("age", IntegerType, true)))
    val dataDF: DataFrame = sparkSession.createDataFrame(fileRddC, schema)
    //查看数据
    dataDF.show()
    dataDF.printSchema()
    //关闭sparkSession sparkContext
    sc.stop()
    sparkSession.stop()

反射推断Schema

case class Person(word: String, letter: String, num: Int)

  def main(args: Array[String]): Unit = {
    //创建sparkSession
    val sparkSession: SparkSession = SparkSession.builder().master("local[*]").appName("wula").getOrCreate()
    //创建sparkContext
    val sc: SparkContext = sparkSession.sparkContext
    //读取数据并操作
    val fileRdd: RDD[String] = sc.textFile("file:///D:\\vedio\\2020\\4月\\04.13\\words.txt")
    val personRdd: RDD[Person] = fileRdd.map(_.split(" ")).map(x => Person(x(0), x(1), x(2).toInt))
    //隐式转换
    import sparkSession.implicits._
    val personDf = personRdd.toDF()
    //查看数据
    personDf.show()
    personDf.printSchema()
    //关闭sparkSession sparkContext
    sc.stop()
    sparkSession.stop()
  }

[/infobox]

//———————————————————————————————————————

[infobox title=”SparkSQL查询的方式”]

1、
//0.注册表
    personDF.createOrReplaceTempView("t_person")
    //1.查询所有数据
    spark.sql("select * from t_person").show()

	2、
//1.查询所有数据
    personDF.select("name","age")
    //2.查询age+1
    personDF.select($"name",$"age" + 1)

 

RDD DF DS(比DF多了类型) 三者之间的转化

import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    //=========================相互转换======================
    //1.RDD-->DF
    val personDF: DataFrame = personRDD.toDF
    //2.DF-->RDD
    val rdd: RDD[Row] = personDF.rdd
    //3.RDD-->DS
    val DS: Dataset[Person] = personRDD.toDS()
    //4.DS-->RDD
    val rdd2: RDD[Person] = DS.rdd
    //5.DF-->DS(DS比DF多了类型)
    val DS2: Dataset[Person] = personDF.as[Person]
    //6.DS-->DF
    val DF: DataFrame = DS2.toDF()

.toDF
.rdd
.toDS
    .as[Person]

[/infobox]

//———————————————————————————————————————

[infobox title=”RDD DF DS相互转化”]

//RDD    DF    DS三者之间的转化

  //RDD转换成DF     DS
  //前提是调用隐式转换
    personRDD.toDF()
    personRDD.toDS()
  //DF转换成RDD     DS
    DataDF.rdd
    val DSValue: Dataset[Person] = DataDF.as[Person]
  //DS转换成RDD     DF
    DSValue.rdd
    DSValue.toDF()

    //总结
    // 1 转换成RDD  .rdd
    // 2 转换成DF   .toDF()
    // 3 转换成DS
    //   3.1-rdd     .toDS()
    //   3.2-DF     .as[Person]

[/infobox]

[warningbox title=”Spark SQL多数据源交互”]

多数据源交互指的是可以读取多种数据源,可以将数据保存成多种数据格式

[infobox title=”读取多种数据源”]

def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    spark.read.json("D:\\data\\output\\json").show()
    spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()
    spark.read.parquet("D:\\data\\output\\parquet").show()
    val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
    spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
    sc.stop()
    spark.stop()
  }

[/infobox]

[infobox title=”将数据保存成多种数据格式(personDF.write.XXX)”]

personDF.write.json("D:\\data\\output\\json")
    personDF.write.csv("D:\\data\\output\\csv")
    personDF.write.parquet("D:\\data\\output\\parquet")
    val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
    personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)

[/infobox]

[/warningbox]

[infobox title=”Spark SQL自定义函数”]

自定义函数
1.UDF(User-Defined-Function)
输入一行,输出一行 (将小写转为大写)
2.UDAF(User-Defined Aggregation Funcation)
输入多行,输出一行 (求班级平均身高)
3.UDTF(User-Defined Table-Generating Functions)
输入一行,输出多行 (数据拆分)

.[warningbox title=”UDF案例”]

def main(args: Array[String]): Unit = {
  //1、创建sparksession
  val spark: SparkSession = SparkSession.builder().master("local[*]").appName("demo01").getOrCreate()
  //2、创建sparkcontext
  val sc: SparkContext = spark.sparkContext
  //3、读取数据。并操作
  val ttRDD: RDD[String] = sc.textFile("file:///F:\\传智播客\\传智专修学院\\第二学期\\34\\05-Spark\\资料\\udf.txt")
  import spark.implicits._
  val UDFDS: Dataset[String] = ttRDD.toDS()

  //自定义函数
  spark.udf.register("toUpperAdd123",(str:String)=>{
    //根据业务需求对数据进行加工
  str.toUpperCase +" 123"

  })


  UDFDS.createOrReplaceTempView("UDF")
  //调用函数
  spark.sql("SELECT  value,toUpperAdd123(value) as length_10 FROM UDF").show()
  sc.stop()
  spark.stop()

}

[/warningbox]

[warningbox title=”UDAF案例”]

object Udaf {
  //编写计算平均工资的方法SalaryAvg
  class  SalaryAvg extends  UserDefinedAggregateFunction{
    //输入的数据的类型
    override def inputSchema: StructType = {
      StructType(StructField("input",LongType)::Nil)
    }
    //中间结果缓存的类型
    override def bufferSchema: StructType = {
      //sum缓存总金额
      //total缓存总次数
      StructType(List(StructField("sum",LongType),(StructField("total",LongType))))
    }

    //数据返回的类型
    override def dataType: DataType = {
        DoubleType
    }
    //是否有相同的输出   true
    override def deterministic: Boolean = {
      true
    }
    /*
    List(1,2,3,4,5).reduce((a,b)=>a+b)
    1  a=1  b=2
    2  a=3  b=3
    3  a=6  b=4
    4  a=10 b=5
    5  a=51
     */
    //数据的初始化
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      //用于存储总金额
      buffer(0)=0L   //=> a
      //用于存储次数
      buffer(1)=0L   //=>b
    }
    //rdd是分区的    此方法是计算一个分区内的数据和    和数据数量
    /*
    {"name":"Michael","salary":3000}
    {"name":"Andy","salary":4500}

    {"name":"Justin","salary":3500}
    {"name":"Berta","salary":4000}
     */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
      //计算次分区的总金额
      buffer(0)=buffer.getLong(0)+input.getLong(0)
      //计算次分区的总数量
      buffer(1)=buffer.getLong(1)+1
    }

    //汇总所有分区内的总金额   和  总次数
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      //汇聚所有分区的总金额
      buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
      //汇聚所有分区的总次数
      buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
    }

    //求最后的平均值
      //计算平均薪资
      //总金额/总数量
    override def evaluate(buffer: Row): Any = {
      buffer.getLong(0).toDouble/buffer.getLong(1).toDouble
    }
  }


  def main(args: Array[String]): Unit = {
    //1、创建sparksession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("demo01").getOrCreate()
    val JsonDatas: DataFrame = spark.read.json("file:///F:\\传智播客\\传智专修学院\\第二学期\\34\\05-Spark\\资料\\udaf.json")
    JsonDatas.createOrReplaceTempView("UDAFTable")
    //注册程UDAF函数
    spark.udf.register("SalaryAvg",new SalaryAvg)

    //计算平均工资的算法名称为SalaryAvg
    spark.sql("select SalaryAvg(salary)  from UDAFTable").show()
    spark.sql("select avg(salary)  from UDAFTable").show()

    spark.stop()
  }
}

[/warningbox]
[/infobox]
[collapse title=”开窗函数”]

开窗函数

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。

[warningbox title=”聚合函数和开窗函数”]

聚合函数是将多行变成一行,count,avg….
开窗函数是将一行变成多行;
聚合函数如果要显示其他的列必须将列加入到group by中
开窗函数可以不使用group by,直接将所有信息显示出来

[/warningbox]

[warningbox title=”开窗函数分类”]

1.聚合开窗函数
聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。
2.排序开窗函数
排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

[/warningbox]

 

[infobox title=”聚合开窗函数”]

OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。

spark.sql(“select name, class, score, count(name) over() name_count from scores”).show

OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。
如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。
开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。

下面的 SQL 语句用于显示按照班级分组后每组的人数:
OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。
spark.sql(“select name, class, score, count(name) over(partition by class) name_count from scores”).show

[/infobox]
[dangerbox title=”排序开窗函数”]

[successbox title=”ROW_NUMBER顺序排序”]

row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号
注意:
在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。
spark.sql(“select name,successbox class, score, row_number() over(order by score) rank from scores”).show()

[/successbox]
[successbox title=”RANK跳跃排序”]
rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。
这个函数求出来的排名结果可以并列(并列第一/并列第二),并列排名之后的排名将是并列的排名加上并列数
spark.sql(“select name, class, score, rank() over(order by score) rank from scores”).show()
[/successbox]
[successbox title=”DENSE_RANK连续排序”]
dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。
这个函数并列排名之后的排名是并列排名加1
spark.sql(“select name, class, score, dense_rank() over(order by score) rank from scores”).show()
[/successbox]
[successbox title=”NTILE分组排名”]
ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。
spark.sql(“select name, class, score, ntile(6) over(order by score) rank from scores”).show()
[/successbox]
[/dangerbox]
[/collapse]


已发布

分类

, , ,

作者:

标签

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注