文章详情

短信预约信息系统项目管理师 报名、考试、查分时间动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

DataStream API(一)

2018-10-20 16:05

关注

DataStream API(一)

在了解DataStream API之前我们先来了解一下Flink API的构成。Flink API是分层的。由最底层的Stateful Stream Process到最顶层的SQL分为四层。如下图:

API-Level

DataStream API 顾名思义,就是DataStream类的API,DataStream表示Flink程序中的流式数据集合。它是一个包含重复项的不可变数据集合,这些数据可以是有界的也可以是无界的,处理他们的API是相同的。

DataStream是不可变的,这意味着一旦它们被创建,就不能添加或删除元素。也不能简单地检查内部的元素,而只能使用DataStream API(Transform)来处理它们。

Flink程序基本部分组成:

  1. 获得执行环境(Environment),
  2. 加载/创建初始数据(Source),
  3. 指定此数据的转换(Transform),
  4. 指定计算结果的存放位置(Sink),
  5. 触发程序执行(Execut)

下面我们一起来了解一下Flink DataStream的执行环境。

Environment

Flink的执行环境包括两种,分别是StreamExecutionEnvironment和ExecutionEnvironment,他们分别对应StreamData和DataSet。StreamData是流式数据集,DataSet是批量数据集。

StreamExecutionEnvironment是所有Flink流式处理程序的基础。它为我们提供了三种实例化的方法,分别是:

getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)

createLocalEnvironment()

这个方法是获取本地的执行环境。它有两个重载,分别是:

//parallelism表示并行度
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration configuration)

createRemoteEnvironment(String host, int port, String... jarFiles)

这个方法是获取集群的执行环境。与createLocalEnvironment()类似,它也有两个重载,分别是:

createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)

getExecutionEnvironment()

getExecutionEnvironment()可以自行判断我们当前程序的执行环境并为我们返回与之相对应的实例。换句话说,通常情况下我们不需要自己判断到底是使用createLocalEnvironment还是使用createRemoteEnvironment,一律用getExecutionEnvironment就OK了。

Environment实例

通过Environment实例我们可以做很多事情,比如:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//获取数据源
//当然获取数据源的方式有很多种,下面会一一介绍
env.readTextFile("FilePath");
//执行程序
env.Execute();
//...

Data Sources

Data Source的核心组件包括三个,分别是Split、SourceReader、SplitEnumerator。

具体的过程可以参考以下图片:

Source

下面我们来介绍一下几个常用的获取数据源的方式。

从集合中读取

ArrayList strList = new ArrayList();
strList.add("are");
strList.add("you");
strList.add("ok");
env.fromCollection(strList);

从文件中读取

DataStreamSource inputData = env.readTextFile("FilePath");

消费kafka中的数据

引入kafka依赖


    org.apache.flink
    flink-connector-kafka_2.11
    1.11.0

代码示例

//kafka配置
Properties pro = new Properties();
pro.setProperty("bootstrap.servers", "localhost:9092");
pro.setProperty("group.id", "consumer-group");
pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("auto.offset.reset", "latest");
//消费kafka数据
DataStreamSource inputStream = env.addSource(new FlinkKafkaConsumer("topic", new SimpleStringSchema(), pro));

当然,出了kafka之外Flink还为我们提供了ElasticSearch、HDFS、RabbitMQ、JDBC等作为数据源的接口。此处就不一一介绍了。

自定义数据源

其实深入研究之后我们会发现,FlinkKafkaConsumer其实是实现了一个SourceFunction接口。so,我们可以通过实现SourceFunction的方式来自定义我们自己的数据源。

有了这个功能我们可以很轻松的模拟真实的业务场景。毕竟,绝大多数的项目在开发阶段并不会有真实的业务场景来提供数据源。

DataStreamSource inputStream = env.addSource(new MyDataSource());

private static class MyDataSource implements SourceFunction {
    private boolean running = true;

    public void run(SourceContext sourceContext) throws Exception {
        while(running){
            //读取数据,可以从cvs文件...自定义数据源读取数据
            Thread.sleep(100);
        }
    }
    public void cancel() {
        running = false;
    }
}
阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯