文章详情

短信预约信息系统项目管理师 报名、考试、查分时间动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Flink DataStream API

2019-07-15 02:05

关注

Flink DataStream API

1.  API基本概念

Flink程序可以对分布式集合进行转换(例如: filtering, mapping, updating state, joining, grouping, defining windows, aggregating)

集合最初是从源创建的(例如,从文件、kafka主题或本地内存集合中读取)

结果通过sink返回,例如,可以将数据写入(分布式)文件,或者写入标准输出(例如,命令行终端)

根据数据源的类型(有界或无界数据源),可以编写批处理程序流处理程序,其中使用DataSet API进行批处理,并使用DataStream API进行流处理

Flink有特殊的类DataSetDataStream来表示程序中的数据。在DataSet的情况下,数据是有限的,而对于DataStream,元素的数量可以是无限的。 

Flink程序看起来像转换数据集合的常规程序。每个程序都包含相同的基本部分:

 

为了方便演示,先创建一个项目,可以从maven模板创建,例如:

mvn archetype:generate 
      -DarchetypeGroupId=org.apache.flink 
      -DarchetypeArtifactId=flink-quickstart-java 
      -DarchetypeVersion=1.10.0 
      -DgroupId=com.cjs.example 
      -DartifactId=flink-quickstart 
      -Dversion=1.0.0-SNAPSHOT 
      -Dpackage=com.cjs.example.flink 
      -DinteractiveMode=false

也可以直接创建SpringBoot项目,自行引入依赖:

<dependency>
    <groupId>org.apache.flinkgroupId>
    <artifactId>flink-javaartifactId>
    <version>1.10.0version>
    <scope>providedscope>
dependency>
<dependency>
    <groupId>org.apache.flinkgroupId>
    <artifactId>flink-streaming-java_2.11artifactId>
    <version>1.10.0version>
    <scope>providedscope>
dependency>
<dependency>
    <groupId>org.apache.flinkgroupId>
    <artifactId>flink-connector-kafka-0.10_2.11artifactId>
    <version>1.10.0version>
dependency>

StreamExecutionEnvironment是所有Flink程序的基础。你可以在StreamExecutionEnvironment上使用以下静态方法获得一个:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常,只需要使用getExecutionEnvironment()即可,因为该方法会根据上下文自动推断出当前的执行环境

从文件中读取数据,例如:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream text = env.readTextFile("file:///path/to/file");

对DataStream应用转换,例如:

DataStream input = ...;

DataStream parsed = input.map(new MapFunction() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

通过创建一个sink将结果输出,例如:

writeAsText(String path)

print()

最后,调用StreamExecutionEnvironment上的execute()执行:

//  Triggers the program execution
env.execute();

//  Triggers the program execution asynchronously
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();

下面通过单词统计的例子来加深对这一流程的理解,WordCount程序之于大数据就相当于是HelloWorld之于Java,哈哈哈

package com.cjs.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet text = env.readTextFile("/Users/asdf/Desktop/input.txt");
        DataSet> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);
        
        counts.writeAsCsv("/Users/asdf/Desktop/aaa", "
", " ");
        env.execute();
    }

    static class Tokenizer implements FlatMapFunction> {
        @Override
        public void flatMap(String value, Collector> out) throws Exception {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

为Tuple定义keys

Python中也有Tuple(元组)

DataStream> input = // [...]
KeyedStream,Tuple> keyed = input.keyBy(0)

元组按第一个字段(整数类型的字段)分组

还可以使用POJO的属性来定义keys,例如:

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream words = // [...]
DataStream wordCounts = words.keyBy("word").window();

先来了解一下KeyedStream

因此可以通过KeySelector方法来自定义

// some ordinary POJO
public class WC {public String word; public int count;}
DataStream words = // [...]
KeyedStream keyed = words
  .keyBy(new KeySelector() {
     public String getKey(WC wc) { return wc.word; }
   });

如何指定转换方法呢?

方式一:匿名内部类

data.map(new MapFunction () {
    public Integer map(String value) { return Integer.parseInt(value); }
});

方式二:Lamda

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

2.  DataStream API

下面这个例子,每10秒钟统计一次来自Web Socket的单词次数

package com.cjs.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream> dataStream = env.socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    static class Splitter implements FlatMapFunction> {
        @Override
        public void flatMap(String value, Collector> out) throws Exception {
            String[] words = value.split("\W+");
            for (String word : words) {
                out.collect(new Tuple2(word, 1));
            }
        }
    }
}

为了运行此程序,首先要在终端启动一个监听

nc -lk 9999

 

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html 

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯