Spark Code Examples

[successbox title="Spark SQL with MySQL: create a table, write data, and query it"]

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wula")
    val sparkContext = new SparkContext(sparkConf)
    //set the log level
    sparkContext.setLogLevel("warn")

[infobox title="Create the MySQL table"]

    //open a JDBC connection to the local MySQL instance
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8", "root", "123456")
    val sql = "CREATE TABLE test (\n  `id` VARCHAR(255) NOT NULL,\n  `title` VARCHAR(255) NOT NULL COMMENT 'title',\n  PRIMARY KEY (`id`)\n) ENGINE=INNODB DEFAULT CHARSET=utf8;"
    conn.prepareStatement(sql).execute()
    conn.close()

[/infobox]

[infobox title="Write data to MySQL"]

case class user(id: String, title: String) extends Serializable
    //read the file and drop dirty records: wrong column count or any empty field
    val lineRdd: RDD[String] = sparkContext.textFile("D:\\data.txt").filter(x => x.split("\t").length == 2).filter(x => {
      var noEmptyField = true
      x.split("\t").foreach(y => if (y.trim == "") noEmptyField = false)
      noEmptyField
    })
    val userRdd: RDD[user] = lineRdd.map(x => {
      val dataLine = x.split("\t")
      user(dataLine(0), dataLine(1))
    })
    userRdd.foreach(data => {
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8", "root", "123456")
      val sql = "INSERT INTO `test` (`id`,`title`) VALUES (?,?);"
      //PreparedStatement is precompiled by the database, which speeds up repeated execution; see the batched foreachPartition sketch after this box
      val ps: PreparedStatement = conn.prepareStatement(sql)
      ps.setString(1, data.id)
      ps.setString(2, data.title)
      ps.execute()
      ps.close()
      conn.close()
    })

[/infobox]
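[infobox title="Batched write with foreachPartition (sketch)"]

The loop above opens and closes one connection per record. A minimal sketch of a batched variant, assuming the same test table and credentials: foreachPartition shares a single connection and PreparedStatement per partition, and addBatch/executeBatch lets the precompiled statement do the inserts in bulk.

    userRdd.foreachPartition(partition => {
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8", "root", "123456")
      val ps: PreparedStatement = conn.prepareStatement("INSERT INTO `test` (`id`,`title`) VALUES (?,?);")
      partition.foreach(data => {
        ps.setString(1, data.id)
        ps.setString(2, data.title)
        ps.addBatch()
      })
      //send the whole partition in one round of batched inserts
      ps.executeBatch()
      ps.close()
      conn.close()
    })

[/infobox]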

[infobox title="Read data from MySQL"]

    //helper that opens a connection to the MySQL database
    def getConn(): Connection = {
      DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8", "root", "123456")
    }
    //the two "?" placeholders are filled with lowerBound (0) and upperBound (0); numPartitions is 1
    val value: JdbcRDD[Unit] = new JdbcRDD(sparkContext, getConn, "select * from test where id != ? and id != ?;", 0, 0, 1, x => {
      println(x.getString(1) + "_" + x.getString(2))
    })
    value.collect()

[/infobox]
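[infobox title="Alternative: read the table with the DataFrame JDBC source (sketch)"]

JdbcRDD needs the two-placeholder query and manual row mapping; the DataFrame JDBC source reads the same table with less ceremony. A minimal sketch, assuming a SparkSession is acceptable here (the Structured Streaming examples below build one the same way):

    val sparkSession = org.apache.spark.sql.SparkSession.builder().master("local[*]").appName("wula").getOrCreate()
    val props = new java.util.Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    //read the whole test table into a DataFrame and print it
    sparkSession.read.jdbc("jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8", "test", props).show()

[/infobox]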

sparkContext.stop()

[/successbox]

[infobox title="Use Structured Streaming to read socket data and write each word together with its reversal as JSON into the file folder under the current directory"]

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

    val sparkSession: SparkSession = SparkSession.builder().master("local[*]").appName("wula").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    sparkContext.setLogLevel("warn")
    //read a socket stream from node01:9999
    val sData: DataFrame = sparkSession.readStream.option("host", "node01").option("port", "9999").format("socket").load()
    import sparkSession.implicits._
    val frame = sData.as[String].map(x => {
      (x, x.reverse)
    }).toDF("word", "reversed")
    //write each micro-batch as JSON files under ./file, with the checkpoint under ./file/checkpointLocation
    frame.writeStream.format("json").outputMode("append").trigger(Trigger.ProcessingTime(0))
      .option("path", "file").option("checkpointLocation", "file/checkpointLocation")
      .start().awaitTermination()

[/infobox]
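[infobox title="Debugging with the console sink (sketch)"]

For a quick check it can be easier to print the stream than to inspect JSON files. A minimal sketch, assuming the frame DataFrame from the box above; only the sink is swapped, everything upstream stays the same.

    frame.writeStream.format("console").outputMode("append")
      .trigger(Trigger.ProcessingTime(0)).start().awaitTermination()

[/infobox]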


[successbox title="Structured Streaming: read Kafka data, count words, and write the counts to MySQL"]

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object rua01 {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().master("local[*]").appName("wula").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    sparkContext.setLogLevel("warn")
    val kafkaDF: DataFrame = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "node01:9092,node02:9092,node03:9092").option("subscribe", "test").load()
    import sparkSession.implicits._
    val kafkaDataS: Dataset[(String, String)] = kafkaDF.selectExpr("CAST(key as string)", "CAST(value as string)").as[(String, String)]
    val wordCount = kafkaDataS.flatMap(_._2.split(" ")).groupBy("value").count().sort($"count")

    val toMysql = new dataToMysql("jdbc:mysql://node01:3306/test?characterEncoding=UTF-8", "root", "123456")
    wordCount.writeStream.trigger(Trigger.ProcessingTime(0)).foreach(toMysql).outputMode("complete").start().awaitTermination()

  }

  class dataToMysql(url: String, user: String, password: String) extends ForeachWriter[Row] with Serializable {
    var connection: Connection = _
    var preparedStatement: PreparedStatement = _

    override def open(partitionId: Long, version: Long): Boolean = {
      connection = DriverManager.getConnection(url, user, password)
      true
    }

    override def process(value: Row): Unit = {
      val word = value.get(0).toString
      val wordC = value.get(1).toString.toInt
      println(word + "____" + wordC)
      //REPLACE INTO relies on a unique key on `word` so that an existing count is overwritten; see the table sketch after this box
      val sql = "replace into `wordcount` (`id`,`word`,`wordcount`) values(null,?,?);"
      preparedStatement = connection.prepareStatement(sql)
      preparedStatement.setString(1, word)
      preparedStatement.setInt(2, wordC)
      preparedStatement.executeUpdate()
    }

    override def close(errorOrNull: Throwable): Unit = {
      //close the statement before the connection
      if (preparedStatement != null) {
        preparedStatement.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }

}

[/successbox]
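[infobox title="Assumed wordcount table (sketch)"]

The REPLACE INTO statement above only overwrites old counts if word carries a unique index; the table itself is not shown in the original, so this DDL is an assumption. A minimal sketch that creates it with the same JDBC pattern as the first example:

    import java.sql.{Connection, DriverManager}

    val conn: Connection = DriverManager.getConnection("jdbc:mysql://node01:3306/test?characterEncoding=UTF-8", "root", "123456")
    //assumed schema: auto-increment id plus a unique key on word, so REPLACE INTO replaces rather than appends
    val ddl = "CREATE TABLE IF NOT EXISTS `wordcount` (\n  `id` INT NOT NULL AUTO_INCREMENT,\n  `word` VARCHAR(255) NOT NULL,\n  `wordcount` INT NOT NULL,\n  PRIMARY KEY (`id`),\n  UNIQUE KEY `uk_word` (`word`)\n) ENGINE=INNODB DEFAULT CHARSET=utf8;"
    conn.prepareStatement(ddl).execute()
    conn.close()

[/infobox]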


[warningbox title="Use Structured Streaming to read the csv files written to the student_info folder"]

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType

    val sparkSession = SparkSession.builder().appName("wula").master("local[*]").getOrCreate()
    val sparkContext = sparkSession.sparkContext
    sparkContext.setLogLevel("warn")

    //columns: student id, name, sex, class id, enrollment date
    val structType: StructType = new StructType().add("id", "string").add("name", "string").add("sex", "string").add("sclass", "string").add("sdata", "string")
    //    val dataFrame = sparkSession.readStream.schema(structType).option("header","true").csv("D:\\vedio\\2020\\4月\\04.17\\4.16号练习题50道2.0\\student_info")
    val dataFrame = sparkSession.readStream.format("csv").option("header", "true") //the csv files have a header row
      .schema(structType).load("D:\\vedio\\2020\\4月\\04.17\\4.16号练习题50道2.0\\student_info")
    import sparkSession.implicits._
    //count how many male and female students there are
    //    val sdf = dataFrame.groupBy($"sex").count().sort($"count")
    //count how many male and female students have the surname 王 (Wang)
    //val sdf = dataFrame.filter($"name" like "王%").groupBy($"sex").count().sort($"count")
    //sdf.writeStream.format("console").outputMode("complete").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()
    //--------
    dataFrame.createOrReplaceTempView("student")
    val sdf: DataFrame = sparkSession.sql("select sex,count(sex) from student where name like '王%' group by sex")
    sdf.writeStream.format("console").outputMode("complete").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()

[/warningbox]
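[infobox title="Schema as a DDL string (sketch)"]

The same five-column schema can also be declared as a DDL string instead of chained .add calls. A minimal sketch, assuming the same sparkSession, columns, and input path as the box above:

    val dataFrame2 = sparkSession.readStream.format("csv").option("header", "true")
      .schema("id string, name string, sex string, sclass string, sdata string")
      .load("D:\\vedio\\2020\\4月\\04.17\\4.16号练习题50道2.0\\student_info")

[/infobox]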


[successbox title="Structured Streaming: read JSON files and count records per hobby"]

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType

object StructuredStreaming02_TextFile {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("StructuredStreaming02_TextFile")
      .config(new SparkConf())
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
	
    //define the schema of the JSON records
    val Schema: StructType = new StructType()
      .add("name","string")
      .add("age","integer")
      .add("hobby","string")
    //read the streaming JSON input
    import spark.implicits._
    val dataDF: DataFrame =
      spark.readStream.schema(Schema).json("input/json")
    //aggregate: count records per hobby
    val result: Dataset[Row] =
      dataDF.groupBy("hobby").count().sort($"count".desc)
    //output the result to the console
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
    spark.stop()
  }
}

[/successbox]
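[infobox title="Writing sample input for the job above (sketch)"]

The shape of the input files is not shown in the original, so the records here are hypothetical; they simply match the declared schema (name string, age integer, hobby string). Running this once drops a small JSON file into input/json for the streaming job above to pick up.

import org.apache.spark.sql.SparkSession

object WriteSampleJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("WriteSampleJson").getOrCreate()
    import spark.implicits._
    //each row becomes one JSON line, e.g. {"name":"tom","age":20,"hobby":"reading"}
    Seq(("tom", 20, "reading"), ("jerry", 18, "running"), ("anna", 21, "reading"))
      .toDF("name", "age", "hobby")
      .write.mode("append").json("input/json")
    spark.stop()
  }
}

[/infobox]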

