文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一种基于布隆过滤器的大表计算优化方法

2024-11-30 12:28

关注

主要有以下挑战:

当前业内流行的优化方案

1.增加集群资源

优点:简单粗暴,对业务和数据开发人员友好,不用调整。

缺点:费钱,看你公司是否有钱。

2.采用增量计算

优点:可以在不大幅增加计算集群成本的情况下,完成日常计算任务。

缺点:对数据和业务都有一定要求,数据一般要求是日志类数据。或者具有一定的生命周期数据(历史数据可归档)。

问题场景和 Spark 算法分析

Spark 经典算法 SortMergeJoin(以大表间的 Join 分析为例)。

该算法也可以简化流程为: Map 一> Shuffle 一> Sort 一> Merge 一> Reduce

该算法的性能瓶颈主要在 Sort Merge Shuffle 阶段(红色流程部分),数据量越大,资源要求越高,性能越低。

大表问题思考

大数据计算优化思路,核心无非就三条:增加计算资源;减少被计算数据量;优化计算算法。其中前两条是我们普通人最常用的方法。

两个大表的 Join ,是不是真的每天都有大量的数据有变更呢?如果是的话,那我们的业务就应该思考一下是否合理了。

其实在我们的日常实践场景中,大部分是两个表里面的数据每天只有少量(十万百万至千万级)数据随机变化,大部分数据是不变的。

说到这里,很多人的第一想法是,我们增加分区,按数据是否有变化进行区分,计算有变化的(今日有更新的业务数据),合并未变化的(昨日计算完成的历史数据),不就可以解决问题了。其实这个想法存在以下问题:

图片

问题读到这里,如果我们分别把表 A、表 B 的有变化记录的关联主键取出来合并在一起,形成一个数组变量。计算的时候用这个变量分别从表 A 和表 B 中过滤出有变化的数据进行计算,并从未变化的表(昨日计算完成的历史数据)中过滤出不存在的(即未变化历史结果数据)。这样两份数据简单合并到一起,不就是表 A 和表 B 全量 Join 计算的结果了吗!

那什么样的数组可以轻易的存下这百万千万级的数据量呢?我们第一个想到的答案: 布隆过滤器!

使用布隆过滤器的优化方案

  1. 构建布隆过滤器:分别读取表 A 和表 B 中有变化的数据的关联主键。
  2. 使用布隆过滤器:分别过滤表 A 和表 B 中的数据(即关联主键命中布隆过滤器),然后进行 join 分析。
  3. 使用布隆过滤器:从未变化的表(昨日计算完成的历史数据)中过滤出数据(即没有命中布隆过滤器)。
  4. 合并 2、 3 步骤的数据结果。

也许这里有人会有疑惑,不是说布隆过滤器是命中并不代表一定存在,不命中才代表一定不存在!其实这个命中不代表一定存在,是一个极少量概率问题,即极少量没有更新的数据也会命中布隆过滤器,从而参与了接下来的数据计算,实际上只要所有变化的数据能命中即可。这个不影响它已经帮我买过滤了绝大部分不需要计算的数据。

回看我们的 Spark 经典算法 SortMergeJoin,我们可以看出,该方案是在 Map 阶段就过滤了数据,大大减少了数据量的,提升了计算效率,减少了计算资源使用!

Spark 函数 Java 代码实现

大家可以根据需要参考、修改和优化,有更好的实现方式欢迎大家分享交流。

程序流程图

图片

Spark 函数 Java 代码实现。

package org.example;

import org.apache.curator.shaded.com.google.common.hash.BloomFilter;
import org.apache.curator.shaded.com.google.common.hash.Funnels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.api.java.*;
import org.apache.spark.SparkConf;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.RamUsageEstimator;


class MyBloomFilter {
    private BloomFilter bloomFilter;

    public MyBloomFilter(BloomFilter b) {
        bloomFilter = b;
    }

    public BloomFilter getBloomFilter() {
        return bloomFilter;
    }
}

public class BloomUdf implements UDF2 {
    //最大记录限制,安全起见
    private static int maxSize = 50000000;

    //布隆过滤器是否开启配置, 1 开启,0 关闭
    private static int udfBloomFilterEnable;

    //布隆过滤器是否开启参数,默认开启
    private static String bloomFilterConfKey = "spark.myudf.bloom.enable";

    //加配置配置参数,目前不起作用?? 
    static {
        SparkConf sparkConf = new SparkConf();
        udfBloomFilterEnable = sparkConf.getInt(bloomFilterConfKey, 1);
        System.out.println("the spark.myudf.bloom.enable value " + udfBloomFilterEnable);
    }

    //布隆过滤器列表,支持多个布隆过滤器
    private static ConcurrentHashMap bloomFilterMap = new ConcurrentHashMap<>();

    
    private synchronized static void buildBloomFilter(String key, String path) throws IOException {
        if (!bloomFilterMap.containsKey(key)) {
            BloomFilter bloomFilter;
            Configuration cnotallow=new Configuration();
            FileSystem hdfs=FileSystem.get(conf);
            Path pathDf=new Path(path);
            FileStatus[] stats=hdfs.listStatus(pathDf);

            //获取记录总数
            long sum = 0;
            for (int i=0; i maxSize) {
                //如果数据量大于期望值,则将布隆过滤器置空(即布隆过滤器不起作用)
                System.out.println("the max number is " + maxSize + ", but target num is too big, the " + key + " bloom will be invalid");
                bloomFilter = null;
            } else {
                //默认 1000 W,超过取样本数据 2 倍的量。这里取 2 倍是为了提高布隆过滤器的效果, 2 倍是一个比较合适的值
                long exceptSize = sum*2>10000000?sum*2:10000000;
                bloomFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), (int) exceptSize);
                for (int i=0; i

使用示例演示

表信息和数据准备。

--建表数据
create table default.A (
    item_id bigint comment '商品ID',
    item_name string comment '商品名称',
    item_price bigint comment '商品价格',
    create_time timestamp comment '创建时间',
    update_time timestamp comment '创建时间'
)

create table default.B (
    item_id bigint comment '商品ID',
    sku_id bigint comment 'skuID',
    sku_price bigint comment '商品价格',
    create_time timestamp comment '创建时间',
    update_time timestamp comment '创建时间'
)

create table default.ot (
    item_id bigint comment '商品ID',
    sku_id bigint comment 'skuID',
    sku_price bigint comment '商品价格',
    item_price bigint comment '商品价格'
) PARTITIONED BY (pt string COMMENT '分区字段') 

--准备数据
insert overwrite table default.A 
values
(1,'测试1',101,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(2,'测试2',102,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(3,'测试2',103,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(4,'测试2',104,'2023-03-25 08:00:00','2023-04-22 08:00:00'),
(5,'测试2',105,'2023-03-25 08:00:00','2023-04-22 08:00:00');

insert overwrite table default.B 
values 
(1,11,201,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(1,12,202,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(1,13,203,'2023-04-22 08:00:00','2023-04-22 08:00:00'),
(2,21,211,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(2,22,212,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(4,42,212,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(5,51,251,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(5,52,252,'2023-04-22 08:00:00','2023-04-22 08:00:00'),
(5,53,253,'2023-04-22 08:00:00','2023-04-22 08:00:00');

insert overwrite table default.ot partition(pt='20230421')
values 
(1,11,201,101),
(1,12,202,101),
(2,21,211,102),
(2,22,212,102),
(4,42,212,114),
(5,51,251,110);

原来处理的 SQL 语句。

insert overwrite table default.ot partition(pt='20230422')
select B.item_id 
,B.sku_id 
,B.sku_price 
,A.item_price
from B 
left join A on(A.item_id=B.item_id)

使用布隆过滤器的 SQL(Java 函数导入 Spark,函数名为 “bloom_filter”)。

--构建布隆过滤器
drop table if exists tmp.tmp_primary_key;
create table tmp.tmp_primary_key stored as TEXTFILE as 
select item_id
from (
    select item_id
    from default.A 
    where update_time>='2023-04-22'
    union all 
    select item_id
    from default.B 
    where update_time>='2023-04-22'
) where length(item_id)>0
group by item_id;

--增量数据计算
insert overwrite table default.ot partition(pt='20230422')
select B.item_id 
,B.sku_id 
,B.sku_price 
,A.item_price
from default.B 
left join default.A on(A.item_id=B.item_id and bloom_filter(A.item_id, "tmp.tmp_primary_key"))
where bloom_filter(B.item_id, "tmp.tmp_primary_key")
union all 
--合并历史未变更数据
select item_id
,sku_id
,sku_price
,item_price
from default.ot
where not bloom_filter(item_id, "tmp.tmp_primary_key")
and pt='20230421'

从上面代码可以看出,使用布隆过滤器的 SQL,核心业务逻辑代码只是在原来全量计算的逻辑中增加了过滤条件而已,使用起来还是比较方便的。

实测效果

以我司的 “dim.dim_itm_sku_info_detail_d” 和 “dim.dim_itm_info_detail_d” 任务为例,使用引擎 Spark2。

图片

总结

从理论分析和实测效果来看,使用布隆过滤器的解决方案可以大幅提升任务的性能,并减少集群资源的使用。

该方案不仅适用大表间 Join 分析计算,也适用大表相关的其它分析计算需求,核心思想就是计算有必要的数据,排除没必要数据,减小无效的计算损耗。

来源:政采云技术内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯