文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Flink SQL 知其所以然:Group 聚合操作

2024-12-13 22:35

关注

tumble window + key

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将窗口聚合转换为 Group 聚合。假如一个窗口聚合是按照 1 分钟的粒度进行聚合,如下 SQL:

-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)

-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)

-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口写法划分窗口
tumble(row_time, interval '1' minute)

转换为 Group 聚合的写法如下:

Group 聚合

-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);

-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);

-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dim,
-- 将秒级别时间戳 / 60 转化为 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。

运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由时间(Watermark)推动的。Group by 聚合完全由数据推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

也是拿离线和实时做对比,Orders 为 kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:

数据源算子​(From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。

Group 聚合算子​(group by key + sum\count\max\min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum\count\max\min 结果。如果有结果oldResult​,拿出来和当前的数据进行sum\count\max\min​ 计算出这个 key 的新结果newResult​,并将新结果[key, newResult]​ 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息-[key, oldResult]​,然后再将新结果发往下游+[key, newResult]​;如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum\max\min 结果newResult​,并将新结果[key, newResult]​ 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送+[key, newResult]。

数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

特别注意:

如果这个 SQL 放在 Hive 中执行时,其中 Orders 为 Hive,target_table 也为 Hive,其也会生成三个相同的算子,但是其和实时任务的执行方式完全不同:

Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping sets、Rollup、Cube。

举一个 Grouping sets 的案例:

SELECT 
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)

来源:大数据羊说内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯