文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

记录一次MySQL+Redis实现优化百万数据统计的方式

2024-11-28 13:38

关注

刚接到这个任务后真的是一脸懵逼,因为既没有文档,也没有相关的开发人员,甚至连需求都不了解。唯一的解决办法就是向上面多要时间,有了足够的时间就可以通过代码梳理出业务逻辑。

背景

客户在我司采购了WAF防火墙产品,用于拦截和阻断非法请求和一些具有攻击行为的请求。随着系统的不断运作,数据量也随之增长,这就导致客户系统部分报表页面加载时间过长,用户体验极差。

技术栈

SSM + Gateway + Redis + Kafka + MySQL

其中Gateway负责安全防护和限流,当请求经过Gateway时,Gateway会将该请求的原参数,以及安全状态,是否存在攻击,请求ip等信息通过Kafka发送到后台系统并当作日志记录到数据库中。

优化思路

当我看到报表接口的第一眼,就被惊呆了。先不说业务逻辑,单单一个函数中的代码行数将近1000行,在这1000行的代码中依稀残留着几行简洁而又模糊的注释,并且函数内对象的命名也是惨不忍睹,比如format1,data1,data2,collect1, collect2......。即使冒着涉密的风险,我也要复制出来,与大家一起分享。

图片图片

if (pageResultDTO.isPresent()) {
    List data = pageResultDTO.get().getData();
    Long count = Long.parseLong(pageResultDTO.get().getCount().toString());
    long normalCount = data.stream().filter(log -> log.getType().equals("正常")).count();
    response.setTotalCount(count);
    response.setNormalCount(normalCount);
    response.setAbNormalCount(count - normalCount);
    Map> collect = data.stream()
            .filter(log -> log.getType().equals("正常"))
            .collect(Collectors.groupingBy(
                    item -> new SimpleDateFormat(
                            "yyyy-MM-dd HH").format(
                            com.payegis.antispider.admin.common.utils.DateUtil
                                    .pars2Calender(item.getTime())
                                    .getTime())));

    Map> collect1 = data.stream()
            .filter(log -> !log.getType().equals("正常"))
            .collect(Collectors.groupingBy(
                    item -> new SimpleDateFormat(
                            "yyyy-MM-dd HH").format(
                            com.payegis.antispider.admin.common.utils.DateUtil
                                    .pars2Calender(item.getTime())
                                    .getTime())));
    
    Map> ipMap = data.stream()
            .filter(log -> !log.getType().equals("正常"))
            .collect(Collectors.groupingBy(
                    SecurityIncidentDTO::getSourceIp));
    for (String s : ipMap.keySet()) {
        List tempList = ipMap.get(s);
        int size = tempList.size();
        ApiStatisticDataVO apiStatisticDataVO = new ApiStatisticDataVO();
        apiStatisticDataVO.setValue(size);
        apiStatisticDataVO.setMsg(s);
        apiStatisticDataVO.setId(s);
        ipList.add(apiStatisticDataVO);

    }
    List collect3 = ipList.stream()
            .sorted(Comparator.comparing(ApiStatisticDataVO::getValue)
                    .reversed())
            .limit(5)
            .collect(Collectors.toList());
    ipList = new ArrayList<>(5);
    for (int i = 0; i < 5; i++) {
        ApiStatisticDataVO apiStatisticDataVO = new ApiStatisticDataVO();
        apiStatisticDataVO.setId(i + "");
        apiStatisticDataVO.setValue(0);
        apiStatisticDataVO.setMsg("");
        ipList.add(i, apiStatisticDataVO);
    }
    for (int i = 0; i < collect3.size(); i++) {
        ipList.set(i, collect3.get(i));
    }

    for (String hour2 : list) {
        boolean falg = false;
        for (String hour : collect.keySet()) {
            if (hour2.substring(0, 2).equals(hour.substring(hour.length() - 2))) {
                data1.add(collect.get(hour).size());
                falg = true;
            }
        }
        if (!falg) {
            data1.add(0);
        }
    }

    for (String hour2 : list) {
        boolean falg = false;
        for (String hour : collect1.keySet()) {
            if (hour2.substring(0, 2).equals(hour.substring(hour.length() - 2))) {
                data2.add(collect1.get(hour).size());
                falg = true;
            }
        }
        if (!falg) {
            data2.add(0);
        }
    }

吐槽完了,下面开始正式步入正题。经过不知道多久的时间,该方法的逻辑也慢慢变得清晰起来,其主要实现的是:将Kafka接受并存储在数据库中的日志数据,进行分类统计,具体包括事件状态统计(正常访问量,异常访问量,总访问量),近12/24小时内各时间段的事件统计,攻击IP地址TOP5,接口访问TOP5,安全类型分布等报表。

原有逻辑是直接查询数据库,通过sql来实现统计,这种方式如果在数据量小的情况下并不会出现什么问题并且实现方式也相对简单。但是,当数据量上去之后,sql的查询效率就会随之下降,即使通过优化索引的方式也无济于事。

那么在不能引入其他组件或框架情况下,该如何优化查询呢?

经过短暂的思考后,决定以归档的方式进行数据处理,即在存储日志前,先对日志数据进行分门别类的处理,比如需要统计每个时段的事件访问量,那么就以小时和事件状态为标识进行存储,假设在12:30分有一条异常的访问,那么在消费端接收到消息后,先查询数据库中是否存在12点且访问异常的数据,如果存在,那么次数加一,否则将该数据插入到数据库中,这样在一小时内统一时间状态只会存在一条数据。

图片图片

上面的方式是可以减少一定的数据量并且可以提高查询效率,但是如果请求量很大,消息在不断的消费那么就意味着需要不断的查询数据库,更新数据库,这样就会造成一定的性能消耗,而且还会出现并发问题,造成数据重复。

本打算先用这种方式来解决的,有并发就加锁。但是在划了一小时水之后突然想到,当前小时的数据是不是可以存到redis中?

经过片刻的构想,发现确实可以,毕竟变得只是一个数量,可以用redis自增去做。存到缓存后,定时在同步到数据库中不就搞定了吗,这样既可以大大减少数据库操作,还能提高查询效率。

图片图片


@Override
public void handleWebEventStatus(Log log) {
    String siteId = antispiderDetailLog.getSiteId();
    Date curr = new Date();
    DateTime beginOfHour = DateUtil.beginOfHour(curr);
    Integer eventStatus = log.getAntispiderRule().intValue() == 0 ? 0 : 1;
    // 不同站点事件(区分站点)
    String cacheKey = StrUtil.format(RedisConstant.REPORT_WEB_TIME_EXIST, siteId, DateUtil.format(beginOfHour, timeFormat), eventStatus.intValue());
    // 所有站点事件(不区分站点)
    String cacheKeyAll = StrUtil.format(RedisConstant.REPORT_WEN_TIME_ALL, DateUtil.format(beginOfHour, timeFormat), eventStatus.intValue());
    if (redisService.exist(cacheKeyAll)) {
        redisService.increment(cacheKeyAll, 1L);
    } else {
        redisService.setValueByHour(cacheKeyAll, 1, 2L);
    }
    if (redisService.exist(cacheKey)) {
        redisService.increment(cacheKey, 1L);
    } else {
        redisService.setValueByHour(cacheKey, 1, 2L);
    }
}



@Scheduled(cron = "0 0/30 * * * ?")
public void synRedisDataToDB() {
    synchronized (lock) {
        reportWebEventStatusService.synRedisDataToDB();
        reportWebEventTopService.synRedisDataToDB();
        reportWebIpTopService.synRedisDataToDB();
    }
}

上面的搞完后,我突然又发现,如果要统计24小时内的数据,那前23小时的数据肯定都已经固定了,不会在发生变化了。那完全可以将前23小时的数据统计完后存入redis,查询的时候只需要在数据库中查询当前所属小时的数据即可。更新缓存的时间可以设定为1小时1更新,这样就可以保证到一个新时段时,可以保证缓存中的数据为近23小时内的数据。

图片图片


@Scheduled(cron = "0 0 0/1 * * ?")
public void synAllSiteWebEventDataToRedis() {
    synchronized (lock) {
        synReportWebDataToRedis();
    }
}

现在经过优化以后,几乎所有的数据都通过定时任务的方式来统计和存储了,不在需要通过sql的方式实时统计了。最后还是会有个地方存在优化的空间,由于原业务接口是将所有统计报表的数据放在一个接口里面返回的,那么在不改变原参数和不拆分接口的情况下,可以使用Future做异步处理,毕竟每个报表的数据查询统计操作都是独立的,可以按照预估的查询效率做个排序。那么,最终的一个方法就是将上述几个报表数据进行组装,并统一返回给前端。

@Override
public ApiDashboardResponse webDashboardV2(DashboardRequest request) throws Exception {
    ApiDashboardResponse response = new ApiDashboardResponse();

    // 1. 统计近12/24小时事件防护数量排名
    Future reportWebEventTopVoFuture = reportTaskExecutor.submit(() -> {
        ReportWebEventTopVo webEventTopVo = reportWebEventTopService.getWebEventTopVo(request.getSiteId(), request.getTimeType());
        return webEventTopVo;
    });

    // 2. 统计近12/24小时内各时段安全事件状态
    Future webEventTopVoFuture = reportTaskExecutor.submit(() -> {
        ReportWebEventStatusVo reportWebEventStatus = reportWebEventStatusService.getReportWebEventStatus(request.getSiteId(), request.getTimeType());
        return reportWebEventStatus;
    });

    // 3.统计top5的攻击ip地址
    Future reportWebIpTopVoFuture = reportTaskExecutor.submit(() -> {
        ReportWebIpTopVo reportWebIpTop5 = reportWebIpTopService.getReportWebIpTop5(request.getSiteId(), request.getTimeType());
        return reportWebIpTop5;
    });

    // 4. 统计访问top5的站点
    Future reportWebSiteTopVoFuture = reportTaskExecutor.submit(() -> {
        ReportWebSiteTopVo webSiteTop5 = reportWebSiteTopService.getWebSiteTop5Vo(request.getSiteId(), request.getTimeType());
        return webSiteTop5;
    });


    // 拼装响应数据
    // 站点访问量的数据都存储在redis处理速度应该最快
    ReportWebSiteTopVo reportWebSiteTopVo = reportWebSiteTopVoFuture.get();

    ReportWebEventTopVo reportWebEventTopVo = reportWebEventTopVoFuture.get();

    ReportWebEventStatusVo reportWebEventStatusVo = webEventTopVoFuture.get();

    //攻击源ip的数据可能相对较多
    ReportWebIpTopVo reportWebIpTopVo = reportWebIpTopVoFuture.get();

    //......
    return response;
}

小结

由于是公司项目的代码,所以在这里只能粘贴一小部分。但代码不是关键,关键在于如何在不借助其他数据处理的中间件的情况下,如何优化大量数据查询速度。数据分类归档确实是一种可行的解决方式,如果你的项目中有一些需要以月,以天,以人或者其他标准来进行统计的话,不妨可以尝试一下。

如果有更好的方法方式,可以忽略。下面的两张图是测试人员提供的优化前后对比,发现150万的日志量,查询时间在1秒内,比老版本提高很多倍。

优化前

图片图片

优化后

图片 图片

来源:JAVA日知录内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯