import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

// Enclosing object added so the main method compiles; the object name is arbitrary.
object KafkaWordCountToMysql {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().master("local[*]").appName("wula").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    sparkContext.setLogLevel("WARN")
    // Subscribe to the "test" topic; Kafka key/value arrive as binary, so cast both to string.
    val kafkaDF: DataFrame = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092").option("subscribe", "test").load()
    import sparkSession.implicits._
    val kafkaDataS: Dataset[(String, String)] = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    // Split each message into words and count occurrences, sorted by count.
    val wordCount = kafkaDataS.flatMap(_._2.split(" ")).groupBy("value").count().sort($"count")
    // Push every micro-batch result to MySQL through the ForeachWriter defined below.
    val toMysql = new dataToMysql("jdbc:mysql://node01:3306/test?characterEncoding=UTF-8", "root", "123456")
    wordCount.writeStream.trigger(Trigger.ProcessingTime(0)).foreach(toMysql).outputMode("complete").start().awaitTermination()
  }
}
// ForeachWriter that writes each aggregated (word, count) row to MySQL.
class dataToMysql(url: String, user: String, password: String) extends ForeachWriter[Row] with Serializable {
  var connection: Connection = _
  var preparedStatement: PreparedStatement = _

  // Open one JDBC connection per partition and epoch.
  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, password)
    true
  }

  // Called once per output row: upsert the word and its count.
  // REPLACE INTO only behaves as an upsert if `word` carries a unique key.
  override def process(value: Row): Unit = {
    val word = value.get(0).toString
    val wordC = value.get(1).toString.toInt
    println(word + "____" + wordC)
    val sql = "replace into `wordcount` (`id`,`word`,`wordcount`) values(null,?,?);"
    preparedStatement = connection.prepareStatement(sql)
    preparedStatement.setString(1, word)
    preparedStatement.setInt(2, wordC)
    preparedStatement.executeUpdate()
    preparedStatement.close()
  }

  // Release JDBC resources; close the statement before the connection.
  override def close(errorOrNull: Throwable): Unit = {
    if (preparedStatement != null) {
      preparedStatement.close()
    }
    if (connection != null) {
      connection.close()
    }
  }
}
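// ---------------------------------------------------------------------------
// Minimal sketch (not part of the original post): the writer above assumes a
// `wordcount` table already exists in the `test` database. Because the insert
// uses REPLACE INTO with a NULL auto-increment id, `word` needs a UNIQUE key
// for the statement to act as an upsert; the DDL below reflects that assumption.
// Column types and the object name here are illustrative, not prescriptive.
object CreateWordCountTable {
  def main(args: Array[String]): Unit = {
    val connection = DriverManager.getConnection(
      "jdbc:mysql://node01:3306/test?characterEncoding=UTF-8", "root", "123456")
    try {
      val ddl =
        """CREATE TABLE IF NOT EXISTS `wordcount` (
          |  `id` INT AUTO_INCREMENT PRIMARY KEY,
          |  `word` VARCHAR(255) NOT NULL UNIQUE,
          |  `wordcount` INT NOT NULL
          |) DEFAULT CHARSET = utf8""".stripMargin
      connection.createStatement().execute(ddl)
    } finally {
      connection.close()
    }
  }
}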