小结
按照上面窗口的分类方式进行组合,可以得出如下的窗口:
- 基于时间的滚动窗口tumbling-time-window--用的较多
- 基于时间的滑动窗口sliding-time-window--用的较多
- 基于数量的滚动窗口tumbling-count-window--用的较少
- 基于数量的滑动窗口sliding-count-window--用的较少
注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算
三、WindowAPI
3.1 window和windowAll
使用keyby的流,应该使用window方法
未使用keyby的流,应该调用windowAll方法
区别:
Window算子:是可以设置并行度的
WindowAll 算子:并行度始终为1
3.2 WindowAssigner
Windows Assigner的作用是指定窗口的类型,定义如何将数据流分配到一个或者多个窗口,API中通过window (WindowsAssigner assigner)指定。在Flink中支持两种类型的窗口,一种是基于时间的窗口(TimeWindow),另一种是基于数量的窗口(countWindow)。窗口所表现出的类型特性取决于window assigner的定义。
Flink底层Window模型仅有TimeWindow以及GlobalWindow。
Flink提供了很多各种场景用的WindowAssigner:
如果需要自定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。
3.3 evictor
evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行
用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter两个方法。
Flink 提供了如下三种通用的 evictor:
CountEvictor 保留指定数量的元素
TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元
素,其中 max_ts 是窗口内时间戳的最大值。
DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删 除一个元素。
3.4 trigger
trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,
如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:
onEventTime() 当 event-time timer 被触发的时候会调用
onElement() 每次往 window 增加一个元素的时候都会触发
onMerge() 对两个 `rigger 的 state 进行 merge 操作
clear() window 销毁的时候被调用
上面的接口中前三个会返回一个 TriggerResult, TriggerResult 有如下几种可能的选 择:
- CONTINUE 不做任何事情
- FIRE 触发 window
- PURGE 清空整个 window 的元素并销毁窗口
- PURGE 清空整个 window 的元素并销毁窗口
四、WindowAPI调用案例示例
4.1 基于时间的滚动和滑动窗口
测试数据
- 信号灯编号和通过该信号灯的车的数量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
- package com.flink.source
-
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
- import org.apache.flink.streaming.api.windowing.time.Time;
-
-
- object WindowDemo_TimeWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap = socketData.map(new MapFunction[String, CartInfo]() {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- //需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量
- val result = socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count")
- //需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量
- val result2 = socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10))).sum("count")
- result.print()
- result2.print()
- env.execute("winds")
- }
-
- }
-
- case class CartInfo(var sensorId: String, var count: Int)
4.2 基于数量的滚动和滑动窗口
测试数据
- 信号灯编号和通过该信号灯的车的数量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
- package com.flink.source
-
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.scala._
-
-
- object WindowDemo_CountWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap = socketData.map(new MapFunction[String, CartInfo] {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- // 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计
- val result = socketMap.keyBy(_.sensorId).countWindow(5L).sum("count")
- // 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计
- val result2 = socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count")
- result.print("result")
- result2.print("result2")
- env.execute()
-
- }
- }
- case class CartInfo(var sensorId: String, var count: Int)
case class CartInfo(var sensorId: String, var count: Int)
4.3 会话窗口
测试数据
- 信号灯编号和通过该信号灯的车的数量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
- package com.flink.source
-
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
- import org.apache.flink.streaming.api.windowing.time.Time
-
-
-
- object WindowDemo_SessionWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap: SingleOutputStreamOperator[CartInfo] = socketData.map(new MapFunction[String, CartInfo]() {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- //设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
- val result = socketMap.keyBy(0)
- .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
- .sum("count")
- result.print()
- env.execute("winds")
- }
- }
-
-
- case class CartInfo(var sensorId: String, var count: Int)