elasticsearch & MySQL 数据同步。
文章目录
- elasticsearch & MySQL 数据同步。
- 3. 数据同步。
- 3.1. 思路分析。
- 3.1.1. 同步调用。
- 3.1.2. 异步通知。
- 3.1.3. 监听 binlog。
- 3.1.4. 选择。
- 3.2. 实现数据同步。
- 3.2.1. 思路。
- 3.2.2. 导入 demo。
- 3.2.3. 声明交换机、队列。
- 1)引入依赖。
- 2)声明队列交换机名称。
- 3)声明队列交换机。
- 3.2.4. 发送 MQ 消息。
- 3.2.5. 接收 MQ 消息。
3. 数据同步。
elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步。
在微服务中,负责酒店管理(操作 MySQL)的业务与负责酒店搜索(操作 elasticsearch)的业务可能在两个不同的微服务上,两者数据该如克如何保持同步?
3.1. 思路分析。
常见的数据同步方案有三种。
-
同步调用。
-
异步通知。
-
监听 binlog。
3.1.1. 同步调用。
方案一:同步调用。
基本步骤如下。
-
hotel-demo 对外提供接口,用来修改 elasticsearch 中的数据。
-
酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口,
3.1.2. 异步通知。
方案二:异步通知。
流程如下。
-
hotel-admin 对 mysql 数据库数据完成增、删、改后,发送 MQ 消息。
-
hotel-demo 监听 MQ,接收到消息后完成 elasticsearch 数据修改。
3.1.3. 监听 binlog。
方案三:监听 binlog。
流程如下。
-
给 mysql 开启 binlog 功能。
-
mysql 完成增、删、改操作都会记录在 binlog 中。
-
hotel-demo 基于 canal 监听 binlog 变化,实时更新 elasticsearch 中的内容。
3.1.4. 选择。
方式一:同步调用。
-
优点:实现简单,粗暴。
-
缺点:业务耦合度高。
方式二:异步通知。
-
优点:低耦合,实现难度一般。
-
缺点:依赖 mq 的可靠性。
方式三:监听 binlog。
-
优点:完全解除服务间耦合。
-
缺点:开启 binlog 增加数据库负担、实现复杂度高。
3.2. 实现数据同步。
3.2.1. 思路。
开发 hotel-admin 项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对 elasticsearch 中数据也要完成相同操作。
步骤。
-
导入 hotel-admin 项目,启动并测试酒店数据的 CRUD。
-
声明 exchange、queue、routingKey。
-
在 hotel-admin 中的增、删、改业务中完成消息发送。
-
在 hotel-demo 中完成消息监听,并更新 elasticsearch 中数据。
-
启动并测试数据同步功能。
3.2.2. 导入 demo。
导入 hotel-admin 项目。
运行后,访问 http://localhost:8099。
其中包含了酒店的 CRUD 功能。
3.2.3. 声明交换机、队列。
MQ 结构如图。
1)引入依赖。
在 hotel-admin、hotel-demo 中引入 rabbitmq 的依赖。
<!-- amqp。-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)声明队列交换机名称。
在 hotel-admin 和 hotel-demo 中的 com.geek.elasticsearchgeek.hotel.constatnts
包下新建一个类 MqConstants
。
package com.geek.elasticsearchgeek.hotel.constant;
/**
* @author geek
*/
public interface IMqConstants {
/**
* 交换机。
*/
String TOPIC_EXCHANGE_HOTEL = "topic.exchange.hotel";
/**
* 监听新增和修改的队列。
*/
String QUEUE_HOTEL_INSERT = "queue.hotel.insert";
/**
* 监听删除的队列。
*/
String QUEUE_HOTEL_DELETE = "queue.hotel.delete";
/**
* 新增或修改的 RoutingKey。
*/
String ROUTING_KEY_HOTEL_INSERT = "routing.key.hotel.insert";
/**
* 删除的 RoutingKey。
*/
String ROUTING_KEY_HOTEL_DELETE = "routing.key.hotel.delete";
}
3)声明队列交换机。
在 hotel-demo 中,定义配置类,声明队列、交换机。
package com.geek.elasticsearchgeek.hotel.config;
import com.geek.elasticsearchgeek.hotel.constant.IMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author geek
*/
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(IMqConstants.TOPIC_EXCHANGE_HOTEL, true, false);
}
@Bean
public Queue queueHotelInsert() {
return new Queue(IMqConstants.QUEUE_HOTEL_INSERT, true);
}
@Bean
public Queue queueHotelDelete() {
return new Queue(IMqConstants.QUEUE_HOTEL_DELETE, true);
}
@Bean
public Binding hotelInsertQueueRoutingKeyBinding() {
return BindingBuilder.bind(queueHotelInsert())
.to(topicExchange())
.with(IMqConstants.ROUTING_KEY_HOTEL_INSERT);
}
@Bean
public Binding hotelDeleteQueueRoutingKeyBinding() {
return BindingBuilder.bind(queueHotelDelete())
.to(topicExchange())
.with(IMqConstants.ROUTING_KEY_HOTEL_DELETE);
}
}
3.2.4. 发送 MQ 消息。
在 hotel-admin 中的增、删、改业务中分别发送 MQ 消息。
package com.geek.elasticsearch.hotel.admin.controller;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.geek.elasticsearch.hotel.admin.constant.IMqConstants;
import com.geek.elasticsearch.hotel.admin.dataobject.Hotel;
import com.geek.elasticsearch.hotel.admin.dto.PageResult;
import com.geek.elasticsearch.hotel.admin.service.IHotelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.security.InvalidParameterException;
/**
* @author geek
*/
@Slf4j
@RestController
@RequestMapping("/hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id) {
return this.hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size) {
Page<Hotel> result = this.hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
this.hotelService.save(hotel);
this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,
IMqConstants.ROUTING_KEY_HOTEL_INSERT,
hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
this.hotelService.removeById(id);
this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,
IMqConstants.ROUTING_KEY_HOTEL_DELETE,
id);
}
@PutMapping
public void updateById(@RequestBody Hotel hotel) {
if (hotel.getId() == null) {
throw new InvalidParameterException("id 不能为空。");
}
this.hotelService.updateById(hotel);
this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,
IMqConstants.ROUTING_KEY_HOTEL_INSERT,
hotel.getId());
}
}
3.2.5. 接收 MQ 消息。
hotel-demo 接收到 MQ 消息要做的事情包括。
-
新增消息:根据传递的 hotel 的 id 查询 hotel 信息,然后新增一条数据到索引库。
-
删除消息:根据传递的 hotel 的 id 删除索引库中的一条数据。
1)首先在 hotel-demo 的 com.geek.elasticsearchgeek.hotel.service
包下的 IHotelService
中新增新增、删除业务。
void deleteById(Long id);
void insertById(Long id);
2)给 hotel-demo 中的 com.geek.elasticsearchgeek.hotel.service.impl
包下的 HotelService 中实现业务。
@Override
public void insertById(Long id) {
// 根据 id 查询酒店数据。
Hotel hotel = getById(id);
// 转换为文档类型。
HotelDoc hotelDoc = new HotelDoc(hotel);
// 准备 Request 对象。
IndexRequest indexRequest = new IndexRequest("hotel").id(hotel.getId().toString());
// 准备 Json 文档。
indexRequest.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 发送请求。
try {
this.restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteById(Long id) {
// 准备 Request。
DeleteRequest deleteRequest = new DeleteRequest("hotel", id.toString());
// 发送请求。
try {
this.restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
3)编写监听器。
在 hotel-demo 中的com.geek.elasticsearchgeek.hotel.mq
包新增一个类。
package com.geek.elasticsearchgeek.hotel.mq;
import com.geek.elasticsearchgeek.hotel.constant.IMqConstants;
import com.geek.elasticsearchgeek.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author geek
*/
@Component
public class HotelMqListener {
@Autowired
private IHotelService hotelService;
/**
* 监听酒店新增或修改的业务。
*
* @param id 酒店 id。
*/
@RabbitListener(queues = IMqConstants.QUEUE_HOTEL_INSERT)
public void listenHotelInsertOrUpdate(Long id) {
this.hotelService.insertById(id);
}
/**
* 监听酒店删除的业务。
*
* @param id 酒店 id。
*/
@RabbitListener(queues = IMqConstants.QUEUE_HOTEL_DELETE)
public void listenHotelDelete(Long id) {
this.hotelService.deleteById(id);
}
}