文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

彻底搞清 Flink 中的 Window 机制

2024-12-02 18:03

关注

在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算

二、Window的分类

2.1 按照time和count分类

time-window:时间窗口:根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据

count-window:数量窗口:根据数量划分窗口,如:每xx个数据统计最近xx个数据

2.2 按照slide和size分类

窗口有两个重要的属性: 窗口大小size和滑动间隔slide,根据它们的大小关系可分为:

tumbling-window:滚动窗口:size=slide,如:每隔10s统计最近10s的数据

sliding-window:滑动窗口:size>slide,如:每隔5s统计最近10s数据

注意:当size

小结

按照上面窗口的分类方式进行组合,可以得出如下的窗口:

注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算

三、WindowAPI

3.1 window和windowAll

使用keyby的流,应该使用window方法

未使用keyby的流,应该调用windowAll方法

区别:

Window算子:是可以设置并行度的

WindowAll 算子:并行度始终为1

3.2 WindowAssigner

Windows Assigner的作用是指定窗口的类型,定义如何将数据流分配到一个或者多个窗口,API中通过window (WindowsAssigner assigner)指定。在Flink中支持两种类型的窗口,一种是基于时间的窗口(TimeWindow),另一种是基于数量的窗口(countWindow)。窗口所表现出的类型特性取决于window assigner的定义。

Flink底层Window模型仅有TimeWindow以及GlobalWindow。

Flink提供了很多各种场景用的WindowAssigner:

如果需要自定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。

3.3 evictor

evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行

用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter两个方法。

Flink 提供了如下三种通用的 evictor:

CountEvictor 保留指定数量的元素

TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元

素,其中 max_ts 是窗口内时间戳的最大值。

DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删 除一个元素。

3.4 trigger

trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,

如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

onEventTime() 当 event-time timer 被触发的时候会调用

onElement() 每次往 window 增加一个元素的时候都会触发

onMerge() 对两个 `rigger 的 state 进行 merge 操作

clear() window 销毁的时候被调用

上面的接口中前三个会返回一个 TriggerResult, TriggerResult 有如下几种可能的选 择:

四、WindowAPI调用案例示例

4.1 基于时间的滚动和滑动窗口

测试数据

  1. 信号灯编号和通过该信号灯的车的数量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口

需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.scala._ 
  5. import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows} 
  6. import org.apache.flink.streaming.api.windowing.time.Time
  7.  
  8.  
  9. object WindowDemo_TimeWindow { 
  10.   def main(args: Array[String]): Unit = { 
  11.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  12.  
  13.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  14.     val socketMap = socketData.map(new MapFunction[String, CartInfo]() { 
  15.       override def map(t: String): CartInfo = { 
  16.         val arr = t.split(","
  17.         CartInfo(arr(0), arr(1).toInt) 
  18.       } 
  19.     }) 
  20.     //需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量 
  21.     val result = socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count"
  22.     //需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量 
  23.     val result2 = socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10))).sum("count"
  24.     result.print() 
  25.     result2.print() 
  26.     env.execute("winds"
  27.   } 
  28.  
  29.  
  30. case class CartInfo(var sensorId: String, var countInt

4.2 基于数量的滚动和滑动窗口

测试数据

  1. 信号灯编号和通过该信号灯的车的数量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口

需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.scala._ 
  5.  
  6.  
  7. object WindowDemo_CountWindow { 
  8.   def main(args: Array[String]): Unit = { 
  9.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  10.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  11.     val socketMap = socketData.map(new MapFunction[String, CartInfo] { 
  12.       override def map(t: String): CartInfo = { 
  13.         val arr = t.split(","
  14.         CartInfo(arr(0), arr(1).toInt) 
  15.       } 
  16.     }) 
  17.      // 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计 
  18.     val result = socketMap.keyBy(_.sensorId).countWindow(5L).sum("count"
  19.      // 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计 
  20.     val result2 = socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count"
  21.     result.print("result"
  22.     result2.print("result2"
  23.     env.execute() 
  24.  
  25.   } 
  26. case class CartInfo(var sensorId: String, var countInt

case class CartInfo(var sensorId: String, var count: Int)

4.3 会话窗口

测试数据

  1. 信号灯编号和通过该信号灯的车的数量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator 
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 
  6. import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8.  
  9.   
  10.  
  11. object WindowDemo_SessionWindow { 
  12.   def main(args: Array[String]): Unit = { 
  13.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  14.  
  15.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  16.     val socketMap: SingleOutputStreamOperator[CartInfo] = socketData.map(new MapFunction[String, CartInfo]() { 
  17.       override def map(t: String): CartInfo = { 
  18.         val arr = t.split(","
  19.         CartInfo(arr(0), arr(1).toInt) 
  20.       } 
  21.     }) 
  22.     //设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算 
  23.     val result = socketMap.keyBy(0) 
  24.       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) 
  25.       .sum("count"
  26.     result.print() 
  27.     env.execute("winds"
  28.   } 
  29.  
  30.  
  31. case class CartInfo(var sensorId: String, var countInt

 

来源: 大数据老哥内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯