文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦

2023-10-27 05:52

关注

在这里插入图片描述

前言

本篇博客是一篇elasticsearch的使用案例,包括结合MybatisPlus使用ES,如何保证MySQL和es的数据一致性,另外使用了RabbitMQ进行解耦,自定义了发消息的方法。

其他相关的Elasticsearch的文章列表如下:

目录

引出


elasticsearch的使用案例,包括结合MybatisPlus使用ES;
2.如何保证MySQL和es的数据一致性;
3.使用了RabbitMQ进行解耦,自定义了发消息的方法。

结合MybatisPlus使用ES

1.引入依赖

        <dependency>            <groupId>org.springframework.bootgroupId>            <artifactId>spring-boot-starter-data-elasticsearchartifactId>        dependency>                <dependency>            <groupId>mysqlgroupId>            <artifactId>mysql-connector-javaartifactId>            <scope>runtimescope>        dependency>                <dependency>            <groupId>com.alibabagroupId>            <artifactId>druid-spring-boot-starterartifactId>        dependency>                <dependency>            <groupId>com.baomidougroupId>            <artifactId>mybatis-plus-boot-starterartifactId>        dependency>        <dependency>            <groupId>org.springframework.bootgroupId>            <artifactId>spring-boot-starter-data-redisartifactId>        dependency>

2.进行配置

package com.tianju.es.config;import org.elasticsearch.client.RestHighLevelClient;import org.springframework.context.annotation.Configuration;import org.springframework.data.elasticsearch.client.ClientConfiguration;import org.springframework.data.elasticsearch.client.RestClients;import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;@Configurationpublic class ESConfig extends AbstractElasticsearchConfiguration {    @Override    public RestHighLevelClient elasticsearchClient() {        ClientConfiguration clientConfiguration =                ClientConfiguration.builder()                        .connectedTo("192.168.111.130:9200")                        .build();        return RestClients.create(clientConfiguration).rest();    }}

3.实体类上加入注解

在这里插入图片描述

package com.tianju.es.entity;import com.baomidou.mybatisplus.annotation.IdType;import com.baomidou.mybatisplus.annotation.TableField;import com.baomidou.mybatisplus.annotation.TableId;import com.baomidou.mybatisplus.annotation.TableName;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.springframework.data.elasticsearch.annotations.Document;import org.springframework.data.elasticsearch.annotations.Field;import org.springframework.data.elasticsearch.annotations.FieldType;import java.math.BigDecimal;@Data@NoArgsConstructor@AllArgsConstructor@TableName("finance_sku")@Document(indexName = "finance_sku")public class FinanceSkuES {    @TableId(value = "ID",type = IdType.AUTO)    private Long id;    @TableField("finance_sku_describe")    @Field(index = true,analyzer = "ik_smart",            searchAnalyzer = "ik_smart",type = FieldType.Text)    private String detail; // 详情    @TableField("finance_sku_price")    private BigDecimal price;    @TableField("finance_sku_stock")    private Long stock;    @TableField("finance_state")    private Integer status;}

参数解释

@Document(indexName = "books", shards = 1, replicas = 0)@Datapublic class Book {    @Id    @Field(type = FieldType.Integer)    private Integer id;        @Field(type = FieldType.Keyword)    private String title;        @Field(type = FieldType.Text)    private String press;        @Field(type = FieldType.Keyword)    private String author;        @Field(type = FieldType.Keyword,index=false)    private BigDecimal price;        @Field(type = FieldType.Text)    private String description;}

4.创建操作的 Repository

在这里插入图片描述

从它的祖先们那里继承了大量的现成的方法,除此之外,它还可以按 spring data 的规则定义特定的方法。

在这里插入图片描述

package com.tianju.es.mapper;import com.tianju.es.entity.FinanceSkuES;import org.springframework.data.domain.Page;import org.springframework.data.domain.Pageable;import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;import org.springframework.stereotype.Repository;@Repositorypublic interface SkuESMapper extends ElasticsearchRepository<FinanceSkuES, Long> {        Page<FinanceSkuES> findFinanceSkuESByDetail(String detail, Pageable pageable);        void removeFinanceSkuESById(Long id);}

5.初始化es中的数据

在这里插入图片描述

运行的后台信息

在这里插入图片描述

查看es页面的信息,index management

在这里插入图片描述

6.进行全查询以及分页

在这里插入图片描述

进行全查询

在这里插入图片描述

{  "content": [    {      "id": 1,      "detail": "HUAWEI MateBook X Pro 2023 微绒典藏版 13代酷睿i7 32GB 2TB 14.2英寸3.1K原色全面屏 墨蓝",      "price": 13999.0,      "stock": 50,      "status": 1    },    {      "id": 2,      "detail": "HUAWEI Mate 60 Pro+ 16GB+1TB 宣白",      "price": 9999.0,      "stock": 60,      "status": 1    },    {      "id": 3,      "detail": "iPhone 15 Pro Max 超视网膜 XDR 显示屏",      "price": 9299.0,      "stock": 46,      "status": 1    },    {      "id": 4,      "detail": "MacBook Air Apple M2 芯片 8 核中央处理器 8 核图形处理器 8GB 统一内存 256GB 固态硬盘",      "price": 8999.0,      "stock": 60,      "status": 1    }  ],  "pageable": {    "sort": {      "empty": true,      "sorted": false,      "unsorted": true    },    "offset": 0,    "pageSize": 4,    "pageNumber": 0,    "paged": true,    "unpaged": false  },  "totalElements": 4,  "last": true,  "totalPages": 1,  "number": 0,  "size": 4,  "sort": {    "empty": true,    "sorted": false,    "unsorted": true  },  "numberOfElements": 4,  "first": true,  "empty": false}

带条件分页查询

在这里插入图片描述

注意分页查询的page从0开始,尝试发现需要输入分词器分词后最小单元,比如hu不是最小单元,而HUAWEI是

在这里插入图片描述

分词器进行分词的结果

在这里插入图片描述

es和mysql的数据一致性

延迟双删

在这里插入图片描述

    @Override    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {        // 把es看做是缓存,如何保证es 和 mysql的 数据一致性?        // 延迟双删的模式        // 1.先删除缓存 es        skuESMapper.deleteAll();        // 2.更新数据库 mysql        updateById(financeSkuES);        // 3.延时操作        try {            Thread.sleep(3000);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }        // 4.再次删除缓存 es        skuESMapper.deleteAll();        // 5.最后更新缓存 es        skuESMapper.saveAll(list());        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());        log.debug("byId: "+byId);        return byId.get();    }

上面代码有不妥的地方,我这里是修改,结果一开始直接从es中全部删除,应该是根据id把修改的数据删除,然后把修改好的数据set进es里面

加锁的方式

感觉好像没什么用的样子,就是用了一下加锁

在这里插入图片描述

用rabbitmq进行解耦

在这里插入图片描述

配置yml文件

在这里插入图片描述

spring:  main:    allow-circular-references: true  datasource:    driver-class-name: com.mysql.cj.jdbc.Driver    ### 本地的数据库    url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true    username: root    password: 123  # redis的相关配置  redis:    host: 119.3.162.127    port: 6379    database: 0    password: Pet3927  # rabbitmq相关  rabbitmq:    host: 192.168.111.130    port: 5672    username: admin    password: 123    virtual-host: /test    # 生产者保证消息可靠性    publisher-returns: true    publisher-confirm-type: correlated    # 设置手动确认    listener:      simple:        acknowledge-mode: manual

rabbitmq的配置类

在这里插入图片描述

将Java对象转换成json字符串传输

在这里插入图片描述

package com.tianju.es.rabbit;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitConfig {    public static final String ES_EXCHANGE = "es_exchange";    public static final String ES_QUEUE = "es_queue";    public static final String ES_KEY = "es_key";    @Bean    public DirectExchange directExchange(){        return new DirectExchange(ES_EXCHANGE);    }    @Bean    public Queue esQueue(){        return new Queue(ES_QUEUE);    }    @Bean    public Binding esQueueToDirectExchange(){        return BindingBuilder.bind(esQueue())                .to(directExchange())                .with(ES_KEY);    }        @Bean    public MessageConverter messageConverter(){        return  new Jackson2JsonMessageConverter();    }    @Bean    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);        rabbitTemplate.setMessageConverter(messageConverter());// 修改转换器        return rabbitTemplate;    }}

在这里插入图片描述

callback回调方法

package com.tianju.es.rabbit;import com.alibaba.fastjson2.JSON;import com.alibaba.fastjson2.JSONObject;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;// RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback@Configuration@Slf4jpublic class CallbackConfig        implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {    @Autowired    private RabbitTemplate rabbitTemplate;    // 初始化    @PostConstruct    public void init(){        rabbitTemplate.setConfirmCallback(this);        rabbitTemplate.setReturnCallback(this);        rabbitTemplate.setMandatory(true);    }        @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        log.debug("ack是否成功:"+ack);        log.debug("cause信息:"+cause);        if (correlationData!=null){            JSONObject jsonObject = JSON.parseObject(correlationData.getReturnedMessage().getBody());            String exchange = correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange();            String routingKey = correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey();            log.debug("消息体:"+jsonObject);            log.debug("交换机:"+exchange);            log.debug("路由key:"+routingKey);        }        if (ack){            return;        }        // 失败了        // 1、重试重试上限次数(默认值5)每重试一次时间间隔会增加        // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。        // 重发上限次数(默认值5)超过阈值会转人工处理        // 2、把消息体、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。        // 重发上限次数(默认值5)超过阈值会转人工处理        // 2.1需要把相关的信息存放到数据中,表字段:消息体、交换机名称、路由键、状态、次数        // 2.2定时任务(单体:spring定时任务  分布式:XxL-job),发送消息    }        @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。    }}

自定义发消息工具类

在这里插入图片描述

package com.tianju.common.util;import com.alibaba.fastjson2.JSON;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.data.redis.core.StringRedisTemplate;@Slf4jpublic class RabbitUtil {        public static  <T> void sendMsg(RabbitTemplate rabbitTemplate,        StringRedisTemplate redisTemplate,        T msg,String token,Integer ttl,        String exchange,String routingKey) {        log.debug("给交换机[{}]通过路由键[{}]发送消息 {},token为{}",exchange,routingKey,msg,token);        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {            @Override            public Message postProcessMessage(Message message) throws AmqpException {                redisTemplate.opsForValue().set(token, token,5*60000);                message.getMessageProperties().setMessageId(token);                if (ttl!=null){                    message.getMessageProperties().setExpiration(ttl.toString());                }                return message;            }        };        CorrelationData correlationData = new CorrelationData();        // 消息体        Message message = new Message(JSON.toJSONBytes(msg));        // 交换机名称        message.getMessageProperties().setReceivedExchange(exchange);        // 路由键        message.getMessageProperties().setReceivedRoutingKey(routingKey);        correlationData.setReturnedMessage(message);        // 发送MQ消息        rabbitTemplate.convertAndSend(exchange, // 发给交换机                routingKey, // 根据这个routingKey就会给到TTL队列,到时间成死信,发给死信交换机,到死信队列                msg,                messagePostProcessor,                correlationData        );    }}

进行消息的发送

在这里插入图片描述

接口

package com.tianju.es.service;import com.baomidou.mybatisplus.extension.service.IService;import com.tianju.es.entity.FinanceSkuES;public interface SkuService extends IService<FinanceSkuES> {        FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES);        FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES);        String updateByIdRabbitMQ(FinanceSkuES financeSkuES);}

实现类

package com.tianju.es.service.impl;import cn.hutool.core.util.IdUtil;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;import com.tianju.common.util.RabbitUtil;import com.tianju.es.entity.FinanceSkuES;import com.tianju.es.mapper.SkuESMapper;import com.tianju.es.mapper.SkuMybatisPlusMapper;import com.tianju.es.rabbit.RabbitConfig;import com.tianju.es.service.SkuService;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Service;import java.util.Collection;import java.util.Optional;import java.util.UUID;@Servicepublic class SkuServiceImpl extends ServiceImpl<SkuMybatisPlusMapper,FinanceSkuES>        implements SkuService {    @Autowired    private SkuESMapper skuESMapper;    @Autowired    private StringRedisTemplate stringRedisTemplate;    @Autowired    private RabbitTemplate rabbitTemplate;    @Override    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {        // 把es看做是缓存,如何保证es 和 mysql的 数据一致性?        // 延迟双删的模式        // 1.先删除缓存 es        skuESMapper.deleteAll();        // 2.更新数据库 mysql        updateById(financeSkuES);        // 3.延时操作        try {            Thread.sleep(3000);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }        // 4.再次删除缓存 es        skuESMapper.deleteAll();        // 5.最后更新缓存 es        skuESMapper.saveAll(list());        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());        log.debug("byId: "+byId);        return byId.get();    }    @Override    public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) {        // 第二种方式加锁        String uuid = UUID.randomUUID().toString();        // 相当于setnx指令        Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid);        try {            if (skuLock){ // 抢到了锁                skuESMapper.deleteAll();                updateById(financeSkuES);            }        }finally {            if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))){                stringRedisTemplate.delete("skuLock");            }        }        skuESMapper.saveAll(list());        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());        log.debug("byId: "+byId);        return byId.get();    }    @Override    public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) {        // 采用rabbitmq进行解耦        updateById(financeSkuES);        FinanceSkuES skuES = getById(financeSkuES.getId());        String uuid = IdUtil.fastUUID();        RabbitUtil.sendMsg(                rabbitTemplate,stringRedisTemplate,skuES,uuid,null,                RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY        );        return "已经发送消息:"+skuES;    }}

在这里插入图片描述

接收到消息,更新es

接收到消息进行es的更新,把原来的删除,把最新的set进去

在这里插入图片描述

package com.tianju.es.rabbit;import com.rabbitmq.client.Channel;import com.tianju.es.entity.FinanceSkuES;import com.tianju.es.mapper.SkuESMapper;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Component;import java.io.IOException;@Slf4j@Componentpublic class ESListener {    @Autowired    private StringRedisTemplate redisTemplate;    @Autowired    private SkuESMapper skuESMapper;    @RabbitListener(queues = RabbitConfig.ES_QUEUE)    public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) {        String messageId = message.getMessageProperties().getMessageId();        log.debug("进行业务----> 监听到队列{}的消息,messageId为{}",financeSkuES,messageId);        try {            // 幂等性            if (redisTemplate.delete(messageId)){                // 根据id删除原有的 es 数据                // 然后把新的数据set进来                log.debug("处理es的业务,删除原有的,替换最新的");                skuESMapper.removeFinanceSkuESById(financeSkuES.getId());                skuESMapper.save(financeSkuES);            }            // 手动签收消息            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }catch (Exception e){            // 幂等性            redisTemplate.opsForValue().set(messageId,messageId,5*60000);            // 1、重试重试上限次数(默认值5) 每重试一次时间间隔会增加            // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。            // 重发上限次数(默认值5)超过阈值会转人工处理            // 已知的消息,交换机,路由器,消息 message.getBody()  消息发送给的是监听的队列            try {                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);            } catch (IOException ex) {                throw new RuntimeException(ex);            }        }    }}

在这里插入图片描述

后台打印的日志

在这里插入图片描述


总结

elasticsearch的使用案例,包括结合MybatisPlus使用ES;
2.如何保证MySQL和es的数据一致性;
3.使用了RabbitMQ进行解耦,自定义了发消息的方法。

来源地址:https://blog.csdn.net/Pireley/article/details/133885282

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯