文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Flink SQL 知其所以然:Deduplication去重以及如何获取最新状态操作

2024-12-01 19:19

关注

大家好,我是老羊,今天我们来学习 Flink SQL 中的 Deduplication 去重以及如何通过 Deduplication 操作获取最新的状态。

  1. Deduplication 定义(支持 Batch\Streaming):Deduplication 其实就是去重,也即上文介绍到的 TopN 中 row_number = 1 的场景,但是这里有一点不一样在于其排序字段一定是时间属性列,不能是其他非时间属性的普通列。在 row_number = 1 时,如果排序字段是普通列 planner 会翻译成 TopN 算子,如果是时间属性列 planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。
  2. 应用场景:比如上游数据发重了,或者计算 DAU 明细数据等场景,都可以使用 Deduplication 语法去做去重。
  3. SQL 语法标准:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1

其中:

  1. 实际案例:

博主这里举两个案例:

-- 数据源:当每一个用户的等级初始化及后续变化的时候的数据,即用户等级变化明细数据。
CREATE TABLE source_table (
user_id BIGINT COMMENT '用户 id',
level STRING COMMENT '用户等级',
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件时间戳',
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.level.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000000'
);
-- 数据汇:输出即每一个等级的用户数
CREATE TABLE sink_table (
level STRING COMMENT '等级',
uv BIGINT COMMENT '当前等级用户数',
row_time timestamp(3) COMMENT '时间戳'
) WITH (
'connector' = 'print'
);
-- 处理逻辑:
INSERT INTO sink_table
select
level
, count(1) as uv
, max(row_time) as row_time
from (
SELECT
user_id,
level,
row_time,
row_number() over(partition by user_id order by row_time) as rn
FROM source_table
)
where rn = 1
group by
level

输出结果:

+I[等级 1, 6928, 2021-1-28T22:34]
-I[等级 1, 6928, 2021-1-28T22:34]
+I[等级 1, 8670, 2021-1-28T22:34]
-I[等级 1, 8670, 2021-1-28T22:34]
+I[等级 1, 77287, 2021-1-28T22:34]
...

可以看到其有回撤数据。

其对应的 SQL 语义如下:

-- 数据源:原始日志明细数据
CREATE TABLE source_table (
user_id BIGINT COMMENT '用户 id',
name STRING COMMENT '用户姓名',
server_timestamp BIGINT COMMENT '用户访问时间戳',
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.server_timestamp.min' = '1',
'fields.server_timestamp.max' = '100000'
);

-- 数据汇:根据 user_id 去重的第一条数据
CREATE TABLE sink_table (
user_id BIGINT,
name STRING,
server_timestamp BIGINT
) WITH (
'connector' = 'print'
);
-- 处理逻辑:
INSERT INTO sink_table
select user_id,
name,
server_timestamp
from (
SELECT
user_id,
name,
server_timestamp,
row_number() over(partition by user_id order by proctime) as rn
FROM source_table
)
where rn = 1

输出结果:

+I[1, 用户 1, 2021-1-28T22:34]
+I[2, 用户 2, 2021-1-28T22:34]
+I[3, 用户 3, 2021-1-28T22:34]
...

可以看到这个处理逻辑是没有回撤数据的。其对应的 SQL 语义如下:

注意:

在 Deduplication 关于是否会出现回撤流,博主总结如下:

  1. ⭐ Order by 事件时间 DESC:会出现回撤流,因为当前 key 下可能会有 比当前事件时间还大的数据。
  2. ⭐ Order by 事件时间 ASC:会出现回撤流,因为当前 key 下可能会有 比当前事件时间还小的数据。
  3. ⭐ Order by 处理时间 DESC:会出现回撤流,因为当前 key 下可能会有 比当前处理时间还大的数据。
  4. ⭐ Order by 处理时间 ASC:不会出现回撤流,因为当前 key 下不可能会有 比当前处理时间还小的数据。
来源:大数据羊说内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯