文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

flink怎么使用Event_time处理实时数据

2023-06-02 16:39

关注

本篇内容主要讲解“flink怎么使用Event_time处理实时数据”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink怎么使用Event_time处理实时数据”吧!

//flink中关于时间的三个概念//event time:数据产生的时间//processing time:处理数据的时间 即操作数据的之间 比如一个flink在scala中的map函数处理数据时//ingest time:摄取数据时间,在一个streaming程序中 一个时间段收集数据的时间//而evet time在处理实时数据时是比较有用的,例如在由于网络的繁忙的原因,某些数据未能按时到达,假设迟到了30min,//而我们定义的最大延迟不能超过十分钟,那么一些数据包含了超时的数据那么这些数据是不会在这次操作中处理的而是会//丢弃掉
//kafka生产者代码package kafka.partition.test;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;public class PartitionProducer {public static void main(String[] args) {Map<String,Object> props = new HashMap<>();props.put("acks", "1");props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("bootstrap.servers", "bigdata01:9092");String topic = "event_time";KafkaProducer<String, String> producer = new KafkaProducer<>(props);for(int i = 0 ; i <= 20;i++) {//flink的watermarkassginer里面定义的超时时间是5000毫秒long mills = System.currentTimeMillis();if(i%3==0) {//数据的event time放在字符串的开头 以空格分割//kafka event_time topic的0分区超时4000毫秒String line = (mills-4000)+" "+"partition-0--this is a big +" +i;ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(0), null, i+"", line);producer.send(record);}else if(i%3==1) {//kafka event_time topic的1分区超时5000毫秒String line = (mills-5000)+" "+"partition-1--this is a big +" +i;ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(1), null, i+"", line);producer.send(record);}else if(i%3==2) {//kafka event_time topic的2分区超时8000毫秒String line = (mills-8000)+" "+"partition-2--this is a big +" +i;ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(2), null, i+"", line);producer.send(record);}}producer.close();}}
//自定义的TimestampsAndWatermarkspackage flink.streamingimport org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarksimport org.apache.flink.streaming.api.watermark.Watermarkclass CustomWaterMarks extends AssignerWithPeriodicWatermarks[String]{    //超时时间  val maxOutOrderness = 5000l  //flink过一段时间便会调一次该函数获取水印  def getCurrentWatermark():Watermark ={    val  sysMilssecons =  System.currentTimeMillis()     new Watermark(sysMilssecons-maxOutOrderness)       }  //每条记录多会调用 来获得even time 在生产的数据中 even_time放在字符串的第一个字段 用空格分割  def extractTimestamp(element: String,previousElementTimestamp: Long): Long = {   ((element.split(" ")).head).toLong  }}
package flink.streamingimport java.util.Propertiesimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.sink.RichSinkFunctionimport org.apache.flink.streaming.api.CheckpointingModeimport org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarksobject StreamWithEventTimeAndWaterMarks {    def main(args: Array[String]): Unit = {    val kafkaProps = new Properties()    //kafka的一些属性    kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092")    //所在的消费组    kafkaProps.setProperty("group.id", "group2")    //获取当前的执行环境    val evn = StreamExecutionEnvironment.getExecutionEnvironment    //配制处理数据的时候使用event time    evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    //kafka的consumer,test1是要消费的topic    val kafkaSource = new FlinkKafkaConsumer[String]("event_time",new SimpleStringSchema,kafkaProps)    //添加自定义的 TimestampsAndWatermarks    kafkaSource.assignTimestampsAndWatermarks(new CustomWaterMarks)    //设置从最新的offset开始消费    //kafkaSource.setStartFromGroupOffsets()    kafkaSource.setStartFromLatest()    //自动提交offset    kafkaSource.setCommitOffsetsOnCheckpoints(true)        //flink的checkpoint的时间间隔    //evn.enableCheckpointing(2000)    //添加consumer    val stream = evn.addSource(kafkaSource)    evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)    //stream.setParallelism(3)    val text = stream.flatMap{ _.toLowerCase().split(" ").drop(1).filter { _.nonEmpty} }          .map{(_,1)}          .keyBy(0)          .timeWindow(Time.seconds(5))          .sum(1)          .map(x=>{(x._1,(new Integer(x._2)))})     text.print()     //启动执行              //text.addSink(new Ssinks())         evn.execute("kafkawd")        }  }
打印结果 partition-2中的数据因为超时没有出现1> (big,14)4> (is,14)1> (+0,1)2> (+1,1)3> (partition-1--this,7)4> (+15,1)3> (+12,1)1> (partition-0--this,7)3> (+6,1)1> (+16,1)4> (+10,1)2> (+18,1)4> (+7,1)3> (+3,1)2> (+9,1)3> (+19,1)2> (+13,1)3> (a,14)2> (+4,1)

到此,相信大家对“flink怎么使用Event_time处理实时数据”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯