文章目录
一、同步实现思路
elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。
1、方案一:同步调用
操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用
同步调用方式下,业务耦合太多。
2、方案二:异步通知
引入消息队列,hotel-admin将数据写入mysql,并且自己再往MQ发条消息,说"数据更新了",任务就算完成,至于后面的hotel-demo什么时候更新ES,花费了多久,那是hotel-demo自己的事情。
3、方案三:监听binlog
使用canal中间件去监听mysql的binlog,当binlog发生改变,就通知hotel-demo,和上面不同的时,更加解放了hotel-admin这个上游服务,它往mysql写完数据就算任务完成,不用发消息,也不用调用其他服务,达到了完全解耦合。
其实mysql的binlog发生改变,搭配cancel,就像方案二的hotel-admin服务发了一条消息到MQ。
三种实现方式的对比:
二、实现ES与MySQL数据同步
1、导入hotel-admin工程
启动服务,访问localhost:{spring.service.port}
在hotel-admin服务中,模拟MySQL数据的增删改查。
2、项目分析
mysql的增删改动作需要同步到es中,但对es来说,增和改可以归为一类,就看文档id存在不存在了。接下来使用方案二,及MQ实现数据同步,思路如下;
- 声明exchange、queue、RoutingKey
- 在hotel-admin中的增、删、改业务中完成消息发送
- 在hotel-demo中完成消息监听,并更新elasticsearch中数据
模型如下:
3、SpringAMQP整合
- 引入依赖
<dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-amqpartifactId>dependency>
- 临时启动个rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management# 访问host:15672,用户和密码为默认的guest
- 在hotel-admin中的application.yml,添加mq连接信息
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: guest # 用户名 password: guest # 密码
- 最后,记得给消费方也引入AMQP依赖,并添加上mq的连接信息
4、声明队列和交换机
- 在常量目录下定义队列和交换机的名字
package cn.llg.hotel.constants;public class HotelMqConstants {//交换机名称 public static final String EXCHANGE_NAME = "hotel.topic"; //新增和修改队列 public static final String INSERT_QUEUE_NAME = "hotel.insert.queue"; //删除队列 public static final String DELETE_QUEUE_NAME = "hotel.delete.queue"; //RoutingKey public static final String INSERT_KEY = "hotel.insert"; public static final String DELETE_KEY = "hotel.delete";}
- 接下来声明队列和交换机,可以基于注解,也可以基于Bean,后者复杂些,这里演示后者
package cn.llg.hotel.config;import cn.llg.hotel.constants.HotelMqConstants;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class MqConfig { @Bean public TopicExchange topicExchange(){ return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false); } @Bean public Queue insertQueue(){ return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true); } @Bean public Queue deleteQueue(){ return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true); } @Bean public Binding insertQueueBinding(){ return BindingBuilder .bind(insertQueue()) .to(topicExchange()) .with(HotelMqConstants.INSERT_KEY); } @Bean public Binding deleteQueueBinding(){ return BindingBuilder .bind(deleteQueue()) .to(topicExchange()) .with(HotelMqConstants.DELETE_KEY); }}
5、发送消息MQ
注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:
- 交换机名称
- routingKey
- 消息内容,这里消息体尽量小些,别把一整个对象发过去
package cn.llg.hotel.web;import cn.llg.hotel.constants.HotelMqConstants;import cn.llg.hotel.pojo.Hotel;import cn.llg.hotel.pojo.PageResult;import cn.llg.hotel.service.IHotelService;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.security.InvalidParameterException;@RestController@RequestMapping("hotel")public class HotelController { @Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PostMapping public void saveHotel(@RequestBody Hotel hotel){ // 新增酒店 hotelService.save(hotel); // 发送MQ消息,MQ是基于内存的,你把整个酒店对象hotel发过去很容易占满队列,发个主键ID就好,消息体尽量小些 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId()); } @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id); } //其他接口 @GetMapping("/list") public PageResult hotelList( @RequestParam(value = "page", defaultValue = "1") Integer page, @RequestParam(value = "size", defaultValue = "1") Integer size ){ Page<Hotel> result = hotelService.page(new Page<>(page, size)); return new PageResult(result.getTotal(), result.getRecords()); } @GetMapping("/{id}") public Hotel queryById(@PathVariable("id") Long id){ return hotelService.getById(id); }}
6、监听MQ消息
hotel-demo整合完SpringAMQP后,在hotel-demo中监听消息。
- 新建类HotelListener类,并加@Component注解以Bean的形式管理
package cn.llg.hotel.mq;import cn.llg.hotel.constants.HotelMqConstants;import cn.llg.hotel.service.IHotelService;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import javax.annotation.Resource;@Componentpublic class HotelListener { @Resource IHotelService hotelService; @RabbitListener(queues = HotelMqConstants.INSERT_QUEUE_NAME) public void listenHotelInsertAndUpdate(Long id){ hotelService.insertDocById(id); } @RabbitListener(queues = HotelMqConstants.DELETE_QUEUE_NAME) public void listenHotelDelete(Long id){ hotelService.deleteDocById(id); }}
- 拿到MQ中的酒店id后,使用JavaHighLevelClient对象来更新ES数据
package cn.llg.hotel.service;import cn.llg.hotel.domain.dto.RequestParams;import cn.llg.hotel.domain.pojo.Hotel;import cn.llg.hotel.domain.vo.PageResult;import com.baomidou.mybatisplus.extension.service.IService;public interface IHotelService extends IService<Hotel> { void insertDocById(Long id); void deleteDocById(Long id);}
@Servicepublic class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService { @Resource RestHighLevelClient client; @Override public void insertDocById(Long id) { try { //0.根据ID查数据,并转为文档类型 Hotel hotel = getById(id); HotelDoc hotelDoc = new HotelDoc(hotel); //1.准备request IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString()); //2.准备DSL request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); //3.发送请求 client.index(request,RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void deleteDocById(Long id) { try { //1.准备request DeleteRequest request = new DeleteRequest("hotel",id.toString()); //2.发送请求 client.delete(request,RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }}
最后补充下上面的Hotel和HotelDoc之间的转换关系:
@Data@TableName("tb_hotel")public class Hotel { @TableId(type = IdType.INPUT) private Long id; private String name; private String address; private Integer price; private Integer score; private String brand; private String city; private String starName; private String business; private String longitude; private String latitude; private String pic;}
@Data@NoArgsConstructorpublic class HotelDoc { private Long id; private String name; private String address; private Integer price; private Integer score; private String brand; private String city; private String starName; private String business; private String location; private String pic; //距离 private Object distance; //是否充广告 private Boolean isAD; //ES中的completion,后面存数组,这里可以对应成List private List<String> suggestion; public HotelDoc(Hotel hotel) { this.id = hotel.getId(); this.name = hotel.getName(); this.address = hotel.getAddress(); this.price = hotel.getPrice(); this.score = hotel.getScore(); this.brand = hotel.getBrand(); this.city = hotel.getCity(); this.starName = hotel.getStarName(); this.business = hotel.getBusiness(); this.location = hotel.getLatitude() + ", " + hotel.getLongitude(); this.pic = hotel.getPic(); if(this.business.contains("/")){ //此时business有多个值,需要分开后放入suggestion String[] arr = this.business.split("/"); //添加元素 this.suggestion = new ArrayList<>(); Collections.addAll(this.suggestion,arr); this.suggestion.add(this.brand); }else{ this.suggestion = Arrays.asList(this.brand,this.business); } }}
7、测试同步功能
重启两个服务,查看MQ:
点击队列查看详情,可以看到绑定交换机成功:
接下来去酒店管理页面修改一条酒店信息(我直接调接口了,不去页面改)
在酒店搜索页面搜一下:
可以看到ES数据跟随MySQL更新成功!