这篇文章主要讲解了“flink连接消费kafka实例”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink连接消费kafka实例”吧!
package flink.streamingimport java.util.Propertiesimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.windowing.time.Timeobject StreamingTest { def main(args: Array[String]): Unit = { val kafkaProps = new Properties() //kafka的一些属性 kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092") //所在的消费组 kafkaProps.setProperty("group.id", "group1") //获取当前的执行环境 val evn = StreamExecutionEnvironment.getExecutionEnvironment //kafka的consumer,test1是要消费的topic val kafkaSource = new FlinkKafkaConsumer[String]("test1",new SimpleStringSchema,kafkaProps) //设置从最新的offset开始消费 kafkaSource.setStartFromLatest() //自动提交offset kafkaSource.setCommitOffsetsOnCheckpoints(true) //flink的checkpoint的时间间隔 evn.enableCheckpointing(5000) //添加consumer val stream = evn.addSource(kafkaSource) stream.setParallelism(3) val text = stream.flatMap{ _.toLowerCase().split("\\W+")filter { _.nonEmpty} } .map{(_,1)} .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) text.print() //启动执行 evn.execute("kafkawd") }}
//
pom.xml<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>hgs</groupId> <artifactId>flink_lesson</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <name>flink_lesson</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.1</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.7.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.7.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.7.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.7.1</version></dependency><!-- https://mvnrepository.com/artifact/io.netty/netty-all --><dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.32.Final</version></dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <archive> <manifest> <!-- 我运行这个jar所运行的主类 --> <mainClass>hgs.flink_lesson.WordCount</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef> <!-- 必须是这样写 --> jar-with-dependencies </descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal> </goals><configuration><args><!-- <arg>-make:transitive</arg> --> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><!-- If you have classpath issue like NoDefClassError,... --><!-- useManifestOnlyJar>false</useManifestOnlyJar --><includes><include>***Suite.*</include></includes></configuration></plugin> </plugins> </build></project>
感谢各位的阅读,以上就是“flink连接消费kafka实例”的内容了,经过本文的学习后,相信大家对flink连接消费kafka实例这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!