[collapse title="分组聚合&分流操作"]
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, ReduceFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
/**
 * Demo job: keyed aggregation, stream splitting, and stream merging
 * (connect / union) over a text file of temperature sensor readings.
 */
object rua01 {

  /** One sensor reading, parsed from a CSV line of the form `id,temperature,time`. */
  case class TemperatureSensorReanding(id: String, temperature: Double, time: Long)

  /**
   * Builds the pipeline and runs it.
   *
   * @param args optional; when present, `args(0)` is used as the input file path
   *             instead of the built-in default path
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Allow the input file to be overridden from the command line; the original
    // hard-coded path remains the default so existing invocations are unaffected.
    val inputPath: String =
      args.headOption.getOrElse("C:\\tool\\dev\\JAVA\\2020.09\\day0911_work01\\data\\sensor.txt")
    val dataStream: DataStream[String] = env.readTextFile(inputPath)

    val tData: DataStream[TemperatureSensorReanding] = dataStream.map { line =>
      // Trim every field: String.toLong rejects surrounding whitespace, so a line
      // such as "a1, 35.8, 1547718199" would otherwise fail to parse.
      val fields = line.split(",").map(_.trim)
      TemperatureSensorReanding(fields(0), fields(1).toDouble, fields(2).toLong)
    } //.filter(new testFilterFunction)

    // Keyed aggregation: minimum temperature per sensor.
    // A typed key selector is used instead of the string field name "id" so that
    // the key is checked by the compiler.
    val byId = tData.keyBy(_.id)
    byId.minBy("temperature").print()
    byId.reduce { (curData, newData) =>
      // Keep the id, the lower of the two temperatures, and the newer timestamp.
      TemperatureSensorReanding(curData.id, curData.temperature.min(newData.temperature), newData.time)
    }.print()

    // Same aggregation expressed through a custom ReduceFunction.
    byId.reduce(new testReduceFunction01).print()

    // Split the stream in two: readings above 30.0 go to "high", the rest to "low".
    // NOTE(review): split/select is deprecated in newer Flink releases in favour of
    // side outputs; consider migrating when the Flink version is upgraded.
    val splitStream: SplitStream[TemperatureSensorReanding] = tData.split { data =>
      if (data.temperature > 30.0) Seq("high") else Seq("low")
    }
    val highStream: DataStream[TemperatureSensorReanding] = splitStream.select("high")
    val lowStream: DataStream[TemperatureSensorReanding] = splitStream.select("low")
    val allStream: DataStream[TemperatureSensorReanding] = splitStream.select("high", "low")
    highStream.print("high")
    lowStream.print("low")
    allStream.print("all")

    // Connect: joins exactly two streams whose element types may differ.
    val warningStream: DataStream[(String, Double)] = highStream.map(data => (data.id, data.temperature))
    val connectedStreams: ConnectedStreams[(String, Double), TemperatureSensorReanding] =
      warningStream.connect(lowStream)

    // CoMap: each side of the connected stream gets its own mapping function.
    // The two functions return tuples of different arity, hence the Any element type.
    val coMapResultStream: DataStream[Any] = connectedStreams.map(
      warningData => (warningData._1, warningData._2, "warn"),
      lowTempData => (lowTempData.id, "healthy")
    )
    coMapResultStream.print("coMAp")

    // Union: all streams must share the same element type; several streams can be
    // passed at once.
    val lowhighStream: DataStream[TemperatureSensorReanding] = highStream.union(lowStream)
    lowhighStream.print()

    env.execute("test")
  }

  /**
   * Reduce function that keeps the first reading's id, the lower of the two
   * temperatures, and the second reading's timestamp.
   */
  class testReduceFunction01 extends ReduceFunction[TemperatureSensorReanding] {
    override def reduce(t: TemperatureSensorReanding, t1: TemperatureSensorReanding): TemperatureSensorReanding =
      TemperatureSensorReanding(t.id, t.temperature.min(t1.temperature), t1.time)
  }

  /** Custom filter: keeps only the readings whose id is "a1". */
  class testFilterFunction extends FilterFunction[TemperatureSensorReanding] {
    // `==` is null-safe, unlike calling `.equals` on a possibly-null id.
    override def filter(value: TemperatureSensorReanding): Boolean = value.id == "a1"
  }

  /**
   * Rich function variant of the mapper: rich functions can access the runtime
   * context and provide lifecycle hooks (`open` / `close`).
   */
  class testRichMongoMapper extends RichMapFunction[TemperatureSensorReanding, String] {

    /** Initialisation hook, e.g. for opening a database connection. Currently a no-op. */
    override def open(parameters: Configuration): Unit = {
      //getRuntimeContext
    }

    /** Clean-up hook, e.g. for closing connections or clearing state. Delegates to the parent. */
    override def close(): Unit = super.close()

    /** Maps a reading to its id followed by the literal suffix "Temperture". */
    override def map(value: TemperatureSensorReanding): String = value.id + "Temperture"
  }

  /** Plain (non-rich) mapper producing the reading's id followed by "Temperture". */
  class testMongoMapper extends MapFunction[TemperatureSensorReanding, String] {
    override def map(value: TemperatureSensorReanding): String = value.id + "Temperture"
  }
}
[/collapse]
[collapse title="自定义SourceFunction"]
package com.kami.demo02
import java.util.Date
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import scala.collection.immutable
import scala.util.Random
/** Demo job that prints the readings emitted by a custom `SourceFunction`. */
object rua04 {

  /** Wires the custom source to a print sink and runs the job with parallelism 1. */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream: DataStream[TemperatureSensorReanding] = env.addSource(new SensorSource())
    dataStream.print()
    env.execute("test")
  }

  /** One generated sensor reading. */
  case class TemperatureSensorReanding(id: String, temperature: Double, time: Long)

  /**
   * Custom source that emits a batch of ten random sensor readings roughly
   * every 100 ms until it is cancelled.
   */
  class SensorSource() extends SourceFunction[TemperatureSensorReanding] {

    // Flag controlling the emit loop. It is written by cancel() and read by run(),
    // which are invoked from different threads, so it must be @volatile for the
    // update to be visible to the loop.
    @volatile var running: Boolean = true

    /** Asks the emit loop in `run` to stop after its current iteration. */
    override def cancel(): Unit = running = false

    /**
     * Generates readings until `cancel()` is called.
     *
     * @param ctx context used to emit each reading downstream
     */
    override def run(ctx: SourceFunction.SourceContext[TemperatureSensorReanding]): Unit = {
      val random = new Random()

      // Initial state: ten sensors, each with a random temperature in [0, 100).
      // NOTE(review): the id prefix "sersor_" looks like a typo for "sensor_";
      // left unchanged because it is emitted data that consumers may rely on.
      var curTemp = (1 to 10).map(i => ("sersor_" + i, random.nextDouble() * 100))

      while (running) {
        // Random walk: nudge every sensor's temperature by a Gaussian step.
        curTemp = curTemp.map { case (id, temperature) =>
          (id, temperature + random.nextGaussian())
        }

        // All readings of one batch share the same timestamp.
        val curTime: Long = System.currentTimeMillis()
        curTemp.foreach { case (id, temperature) =>
          ctx.collect(TemperatureSensorReanding(id, temperature, curTime))
        }

        Thread.sleep(100)
      }
    }
  }
}
[/collapse]
文章评论