文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Flink SQL 知其所以然之流 join 很难嘛???(上)

2024-12-02 16:31

关注

下面即是文章目录,也对应到本文的结论,小伙伴可以先看结论快速了解本文能给你带来什么帮助:

2.背景及应用场景介绍

在我们的日常场景中,应用最广的一种操作必然有 join 的一席之地,例如

计算曝光数据和点击数据的 CTR,需要通过唯一 id 进行 join 关联

事实数据关联维度数据获取维度,进而计算维度指标

上述场景,在离线数仓应用之广就不多说了。

那么,实时流之间的关联要怎么操作呢?

flink sql 为我们提供了四种强大的关联方式,帮助我们在流式场景中达到流关联的目的。如下图官网截图所示:

join

在实时数仓中,regular join 以及 interval join,以及两种 join 的结合使用是最常使用的。所以本文主要介绍这两种(太长的篇幅大家可能也不想看,所以之后的文章就以简洁,短为目标)。

3.先来一个实战案例

先来一个实际案例来看看在具体输入值的场景下,输出值应该长啥样。

场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。

来一波输入数据:

曝光数据:

log_id timestamp show_params
1 2021-11-01 00:01:03 show_params
2 2021-11-01 00:03:00 show_params2
3 2021-11-01 00:05:00 show_params3

点击数据:

log_id timestamp click_params
1 2021-11-01 00:01:53 click_params
2 2021-11-01 00:02:01 click_params2

预期输出数据如下:

log_id timestamp show_params click_params
1 2021-11-01 00:01:00 show_params click_params
2 2021-11-01 00:01:00 show_params2 click_params2
3 2021-11-01 00:02:00 show_params3 null

熟悉离线 hive sql 的同学可能 10s 就写完上面这个 sql 了,如下 hive sql

  1. INSERT INTO sink_table 
  2. SELECT 
  3.     show_log.log_id as log_id, 
  4.     show_log.timestamp as timestamp
  5.     show_log.show_params as show_params, 
  6.     click_log.click_params as click_params 
  7. FROM show_log 
  8. LEFT JOIN click_log ON show_log.log_id = click_log.log_id; 

那么我们看看上述需求如果要以 flink sql 实现需要怎么做呢?

虽然不 flink sql 提供了 left join 的能力,但是在实际使用时,可能会出现预期之外的问题。下节详述。

4.flink sql join

4.1.flink sql

还是上面的案例,我们先实际跑一遍看看结果:

  1. INSERT INTO sink_table 
  2. SELECT 
  3.     show_log.log_id as log_id, 
  4.     show_log.timestamp as timestamp
  5.     show_log.show_params as show_params, 
  6.     click_log.click_params as click_params 
  7. FROM show_log 
  8. LEFT JOIN click_log ON show_log.log_id = click_log.log_id; 

flink web ui 算子图如下:

flink web ui

结果如下:

  1. +[1 | 2021-11-01 00:01:03 | show_params | null
  2.  
  3. -[1 | 2021-11-01 00:01:03 | show_params | null
  4.  
  5. +[1 | 2021-11-01 00:01:03 | show_params | click_params] 
  6.  
  7. +[2 | 2021-11-01 00:03:00 | show_params | click_params] 
  8.  
  9. +[3 | 2021-11-01 00:05:00 | show_params | null

从结果上看,其输出数据有 +,-,代表其输出的数据是一个 retract 流的数据。分析原因发现是,由于第一条 show_log 先于 click_log 到达, 所以就先直接发出 +[1 | 2021-11-01 00:01:03 | show_params | null],后面 click_log 到达之后,将上一次未关联到的 show_log 撤回, 然后将关联到的 +[1 | 2021-11-01 00:01:03 | show_params | click_params] 下发。

但是 retract 流会导致写入到 kafka 的数据变多,这是不可被接受的。我们期望的结果应该是一个 append 数据流。

为什么 left join 会出现这种问题呢?那就要从 left join 的原理说起了。

来定位到具体的实现源码。先看一下 transformations。

transformations

可以看到 left join 的具体 operator 是 org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator。

其核心逻辑就集中在 processElement 方法上面。并且源码对于 processElement 的处理逻辑有详细的注释说明,如下图所示。

StreamingJoinOperator#processElement

注释看起来逻辑比较复杂。我们这里按照 left join,inner join,right join,full join 分类给大家解释一下。

4.2.left join

首先是 left join,以上面的 show_log(左表) left join click_log(右表) 为例:

4.3.inner join

以上面的 show_log(左表) inner join click_log(右表) 为例:

首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。

相同 key 下,当 show_log 来一条数据,如果 click_log 有数据:则 show_log 与 click_log 中的所有数据进行遍历关联一遍输出[+(show_log,click_log)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。

相同 key 下,当 show_log 来一条数据,如果 click_log 中没有数据:则 show_log 不会输出数据,会把 show_log 保存到左表的状态中(以供后续 join 使用)。

相同 key 下,当 click_log 来一条数据,如果 show_log 有数据:则 click_log 与 show_log 中的所有数据进行遍历关联一遍输出[+(show_log,click_log)]数据,并且把 click_log 保存到右表的状态中(以供后续 join 使用)。

相同 key 下,当 click_log 来一条数据,如果 show_log 没有数据:则 click_log 不会输出数据,会把 click_log 保存到右表的状态中(以供后续 join 使用)。

4.4.right join

right join 和 left join 一样,只不过顺序反了,这里不再赘述。

4.5.full join

以上面的 show_log(左表) full join click_log(右表) 为例:

4.6.regular join 的总结

总的来说上述四种 join 可以按照以下这么划分。

inner join 会互相等,直到有数据才下发。

left join,right join,full join 不会互相等,只要来了数据,会尝试关联,能关联到则下发的字段是全的,关联不到则另一边的字段为 null。后续数据来了之后,发现之前下发过为没有关联到的数据时,就会做回撤,把关联到的结果进行下发

4.7.怎样才能解决 retract 导致数据重复下发到 kafka 这个问题呢?

既然 flink sql 在 left join、right join、full join 实现上的原理就是以这种 retract 的方式去实现的,就不能通过这种方式来满足业务了。

我们来转变一下思路,上述 join 的特点就是不会相互等,那有没有一种 join 是可以相互等待的呢。以 left join 的思路为例,左表在关联不到右表的时候,可以选择等待一段时间,如果超过这段时间还等不到再下发 (show_log,null),如果等到了就下发(show_log,click_log)。

interval join 闪亮登场。关于 interval join 是如何实现上述场景,及其原理实现,本篇的(下)会详细介绍,敬请期待。

5.总结与展望

源码公众号后台回复1.13.2 sql join 的奇妙解析之路获取。

本文主要介绍了 flink sql regular 的在满足 join 场景时存在的问题,并通过解析其实现说明了运行原理,主要包含下面两部分:

 

 

来源: 大数据羊说内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯