文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

【ElasticSearch】ES与MySQL数据同步方案及Java实现

2023-08-30 13:38

关注

一、同步实现思路

elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。
在这里插入图片描述

1、方案一:同步调用

操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用

在这里插入图片描述
同步调用方式下,业务耦合太多。

2、方案二:异步通知

引入消息队列,hotel-admin将数据写入mysql,并且自己再往MQ发条消息,说"数据更新了",任务就算完成,至于后面的hotel-demo什么时候更新ES,花费了多久,那是hotel-demo自己的事情。
在这里插入图片描述

3、方案三:监听binlog

使用canal中间件去监听mysql的binlog,当binlog发生改变,就通知hotel-demo,和上面不同的时,更加解放了hotel-admin这个上游服务,它往mysql写完数据就算任务完成,不用发消息,也不用调用其他服务,达到了完全解耦合
在这里插入图片描述
其实mysql的binlog发生改变,搭配cancel,就像方案二的hotel-admin服务发了一条消息到MQ。


三种实现方式的对比:
在这里插入图片描述

二、实现ES与MySQL数据同步

1、导入hotel-admin工程

启动服务,访问localhost:{spring.service.port}

在这里插入图片描述
在hotel-admin服务中,模拟MySQL数据的增删改查。

2、项目分析

mysql的增删改动作需要同步到es中,但对es来说,增和改可以归为一类,就看文档id存在不存在了。接下来使用方案二,及MQ实现数据同步,思路如下;

模型如下:

在这里插入图片描述

3、SpringAMQP整合

<dependency>    <groupId>org.springframework.bootgroupId>        <artifactId>spring-boot-starter-amqpartifactId>dependency>
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management# 访问host:15672,用户和密码为默认的guest
spring:  rabbitmq:      host: 192.168.150.101 # 主机名      port: 5672 # 端口      virtual-host: / # 虚拟主机       username: guest # 用户名      password: guest # 密码

4、声明队列和交换机

package cn.llg.hotel.constants;public class HotelMqConstants {//交换机名称    public static final String EXCHANGE_NAME = "hotel.topic";    //新增和修改队列    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";    //删除队列    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";    //RoutingKey    public static final String INSERT_KEY = "hotel.insert";    public static final String DELETE_KEY = "hotel.delete";}
package cn.llg.hotel.config;import cn.llg.hotel.constants.HotelMqConstants;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class MqConfig {    @Bean    public TopicExchange topicExchange(){        return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);    }    @Bean    public Queue insertQueue(){        return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);    }    @Bean    public Queue deleteQueue(){        return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);    }        @Bean    public Binding insertQueueBinding(){        return BindingBuilder                .bind(insertQueue())                .to(topicExchange())                .with(HotelMqConstants.INSERT_KEY);    }    @Bean    public Binding deleteQueueBinding(){        return BindingBuilder                .bind(deleteQueue())                .to(topicExchange())                .with(HotelMqConstants.DELETE_KEY);    }}

5、发送消息MQ

注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:

package cn.llg.hotel.web;import cn.llg.hotel.constants.HotelMqConstants;import cn.llg.hotel.pojo.Hotel;import cn.llg.hotel.pojo.PageResult;import cn.llg.hotel.service.IHotelService;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.security.InvalidParameterException;@RestController@RequestMapping("hotel")public class HotelController {    @Autowired    private IHotelService hotelService;    @Autowired    private RabbitTemplate rabbitTemplate;    @PostMapping    public void saveHotel(@RequestBody Hotel hotel){        // 新增酒店        hotelService.save(hotel);        // 发送MQ消息,MQ是基于内存的,你把整个酒店对象hotel发过去很容易占满队列,发个主键ID就好,消息体尽量小些        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());    }    @PutMapping()    public void updateById(@RequestBody Hotel hotel){        if (hotel.getId() == null) {            throw new InvalidParameterException("id不能为空");        }        hotelService.updateById(hotel);        // 发送MQ消息        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());    }    @DeleteMapping("/{id}")    public void deleteById(@PathVariable("id") Long id) {        hotelService.removeById(id);        // 发送MQ消息        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);    }    //其他接口    @GetMapping("/list")    public PageResult hotelList(            @RequestParam(value = "page", defaultValue = "1") Integer page,            @RequestParam(value = "size", defaultValue = "1") Integer size    ){        Page<Hotel> result = hotelService.page(new Page<>(page, size));        return new PageResult(result.getTotal(), result.getRecords());    }        @GetMapping("/{id}")    public Hotel queryById(@PathVariable("id") Long id){        return hotelService.getById(id);    }}

6、监听MQ消息

hotel-demo整合完SpringAMQP后,在hotel-demo中监听消息。

package cn.llg.hotel.mq;import cn.llg.hotel.constants.HotelMqConstants;import cn.llg.hotel.service.IHotelService;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import javax.annotation.Resource;@Componentpublic class HotelListener {    @Resource    IHotelService hotelService;        @RabbitListener(queues = HotelMqConstants.INSERT_QUEUE_NAME)    public void listenHotelInsertAndUpdate(Long id){        hotelService.insertDocById(id);    }        @RabbitListener(queues = HotelMqConstants.DELETE_QUEUE_NAME)    public void listenHotelDelete(Long id){        hotelService.deleteDocById(id);    }}
package cn.llg.hotel.service;import cn.llg.hotel.domain.dto.RequestParams;import cn.llg.hotel.domain.pojo.Hotel;import cn.llg.hotel.domain.vo.PageResult;import com.baomidou.mybatisplus.extension.service.IService;public interface IHotelService extends IService<Hotel> {    void insertDocById(Long id);    void deleteDocById(Long id);}
@Servicepublic class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {    @Resource    RestHighLevelClient client;    @Override    public void insertDocById(Long id) {        try {            //0.根据ID查数据,并转为文档类型            Hotel hotel = getById(id);            HotelDoc hotelDoc = new HotelDoc(hotel);            //1.准备request            IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());            //2.准备DSL            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);            //3.发送请求            client.index(request,RequestOptions.DEFAULT);        } catch (IOException e) {            throw new RuntimeException(e);        }    }    @Override    public void deleteDocById(Long id) {        try {            //1.准备request            DeleteRequest request = new DeleteRequest("hotel",id.toString());            //2.发送请求            client.delete(request,RequestOptions.DEFAULT);        } catch (IOException e) {            throw new RuntimeException(e);        }    }}

最后补充下上面的Hotel和HotelDoc之间的转换关系:

@Data@TableName("tb_hotel")public class Hotel {    @TableId(type = IdType.INPUT)    private Long id;    private String name;    private String address;    private Integer price;    private Integer score;    private String brand;    private String city;    private String starName;    private String business;    private String longitude;    private String latitude;    private String pic;}
@Data@NoArgsConstructorpublic class HotelDoc {    private Long id;    private String name;    private String address;    private Integer price;    private Integer score;    private String brand;    private String city;    private String starName;    private String business;    private String location;    private String pic;    //距离    private Object distance;    //是否充广告    private Boolean isAD;    //ES中的completion,后面存数组,这里可以对应成List    private List<String> suggestion;    public HotelDoc(Hotel hotel) {        this.id = hotel.getId();        this.name = hotel.getName();        this.address = hotel.getAddress();        this.price = hotel.getPrice();        this.score = hotel.getScore();        this.brand = hotel.getBrand();        this.city = hotel.getCity();        this.starName = hotel.getStarName();        this.business = hotel.getBusiness();        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();        this.pic = hotel.getPic();        if(this.business.contains("/")){            //此时business有多个值,需要分开后放入suggestion            String[] arr = this.business.split("/");            //添加元素            this.suggestion = new ArrayList<>();            Collections.addAll(this.suggestion,arr);            this.suggestion.add(this.brand);        }else{            this.suggestion = Arrays.asList(this.brand,this.business);        }    }}

7、测试同步功能

重启两个服务,查看MQ:

在这里插入图片描述

点击队列查看详情,可以看到绑定交换机成功:

在这里插入图片描述
接下来去酒店管理页面修改一条酒店信息(我直接调接口了,不去页面改)

在这里插入图片描述
在酒店搜索页面搜一下:

在这里插入图片描述

可以看到ES数据跟随MySQL更新成功!

来源地址:https://blog.csdn.net/llg___/article/details/131691624

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯