Spark Streaming 是一个基于Spark Core
Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。
意义:使用SQL计算实时数据,使用简单易学的技术实现实时计算。减低了实时处理的学习、使用难度。
SparkSQL作用是计算离线数据。接收sql转换成算子,对数据进行计算。意义降低了使用、学习成本。
Spark Streaming 作用是计算实时数据。意义在于能够近实时的计算最新的数据(实时计算)
优点
不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可,
提供了快速、可扩展、容错、端到端的一次性流处理。
Spark SQL内部数据抽象(DF/DS)
DF=rdd+结构表
DS=rdd+结构+数据类型=DF+数据类型表
【Structured Streaming是一个基于 表 的可扩展、容错的流处理引擎。】
1.简洁的模型
可以直接把一个流想象成是无限增长的表格。
2.一致的 API
和 Spark SQL 共用大部分 API,同时批处理和流处理程序还可以共用代码。
3.卓越的性能
4.多语言支持
Scala,Java,Python,R 和 SQL
一个流数据本质上就是一个持续增长的动态表格,最新的数据会源源不断写入表格的尾部。
对动态数据源进行实时查询,本质上就是对实时表的执行的SQL 查询
数据查询方式
- 触发器(Trigger)
- 定期周期执行
一个流的输出有多种模式, 查询后的完整结果、查询相比的差异、追加最新的结果。
应用场景
将数据源映射为类似于关系数据库中的表,然后将经过计算得到的结果映射为另一张表,完全以结构化的方式去操作流式数据
WordCount案例图示
第一行表示从socket不断接收数据,
第二行可以看成是之前提到的“unbound table",
第三行为最终的wordCounts是结果集。
Socket数据实时计算
|
读取本地文件
|
数据的类型如下
- 输出文件、
- 输出kafka
- 二次计算
- 控制台
- 内存
集成kafka
def main(args: Array[String]): Unit = {
//1 创建sparksession
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("StructStreaming_socket")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2 读物实时数据 数据(key value)不是字符串
val kafkaDatasRow: DataFrame = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")
.option("subscribe","18BD34")
.load()
import spark.implicits._
//数据类型转换
val kafkaDatasString: Dataset[(String, String)] =
kafkaDatasRow.selectExpr("CAST(key AS String)","CAST(value AS String)").as[(String,String)]
//3 数据预处理和数据计算
val Word: Dataset[String] = kafkaDatasString.flatMap(a=>a._2.split(" "))
//使用DSL (SQL)对数据进行计算
val StructWordCount: Dataset[Row] = Word.groupBy("value").count().sort($"count")
//4 输出(启动-等待关闭)
StructWordCount.writeStream
.trigger(Trigger.ProcessingTime(0)) //尽快执行
.format("console") //数据输出到控制台
.outputMode("complete") //输出所有数据
.start() //开始计算
.awaitTermination() //=等待关闭
}
StructStreaming 集成kafka集成mysql
def main(args: Array[String]): Unit = {
//1 创建sparksession
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("StructStreaming_socket")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2 读物实时数据 数据(key value)不是字符串
val kafkaDatasRow: DataFrame = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")
.option("subscribe","18BD12")
.load()
import spark.implicits._
val kafkaDatasString: Dataset[(String, String)] =
kafkaDatasRow.selectExpr("CAST(key AS String)","CAST(value AS String)").as[(String,String)]
//3 数据预处理和数据计算
val Word: Dataset[String] = kafkaDatasString.flatMap(a=>a._2.split(" "))
//使用DSL (SQL)对数据进行计算
//计算完毕的结果有两个数据1 单词,2 数量
val StructWordCount: Dataset[Row] = Word.groupBy("value").count().sort($"count")
var intoMysql=new intoMysql("jdbc:mysql://node02:3306/bigdata?characterEncoding=UTF-8", "root", "123456")
//4 输出(启动-等待关闭)
StructWordCount.writeStream
.trigger(Trigger.ProcessingTime(0)) //尽快执行
.foreach(intoMysql)
// .format("console") //数据输出到控制台
.outputMode("complete") //输出所有数据
.start() //开始计算
.awaitTermination() //=等待关闭
}
//编写数据更新/入到mysql的class
class intoMysql(url:String,username:String,pass:String) extends ForeachWriter[Row] with Serializable{
//数据库连接 Connection类型引入java.sql.{Connection, DriverManager, PreparedStatement}
var connection:Connection = _ //_表示占位符,后面会给变量赋值
//设置sql
var preparedStatement: PreparedStatement = _
//打开数据库连接
override def open(partitionId: Long, version: Long): Boolean = {
//通过DriverManager (驱动管理)获得连接
connection= DriverManager.getConnection(url,username,pass)
//连接成功返回true
true
}
//数据处理方法(数据库中没有的数据直接插入,以有的数据更新)
//这里的 value: Row 就是前面计算的1 单词,2 数量
override def process(value: Row): Unit = {
//获取数据中的单词
var word=value.get(0).toString
//获取数据中单词的数量
var count=value.get(1).toString.toInt
println("Word: "+word+" count: "+count)
//将数据写入Mysql
//replace 没有数据就插入。有数据就更新
var sql ="replace into t_word_count (id,word,count) values (Null,?,?)"
preparedStatement=connection.prepareStatement(sql);
preparedStatement.setString(1,word)
preparedStatement.setInt(2,count)
preparedStatement.executeUpdate()
}
//数据写入完毕后,关闭连接
override def close(errorOrNull: Throwable): Unit = {
if (connection!=null){
connection.close()
}
if (preparedStatement!=null){
preparedStatement.close()
}
}
}
文章评论