1、需求分析
我们知道es中的数据来自于mysql数据库,因此mysql数据发生改变时,es也必须跟着改变,否则会导致数据不一致问题,这个就是elasticsearch与mysql之间的数据同步。
如何实现数据同步:
方案一:同步调用
缺点:
得按顺序实行图中的三步,依次执行,容易引起数据耦合;
原来只是hotel-admin的业务只是写数据库写完就结束,现在还得在写数据库代码的后面加上调用hotel-demo的代码,hotel-admin和hotel-demo下的两个业务发生业务耦合;
影响性能,而且一个业务出错整个流程都会收到影响。
方案二:
利用消息队列MQ实现 hotel-admin和hotel-demo的业务解耦,但是会依赖MQ的可靠性,而且复杂度会相应的上升一点。
在hotel-admin和hotel-demo中都得声明交换机和监听队列:
public class MqConstants {
/**
* 声明交换机名称
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 监听新增和修改的队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
2、利用MQ实现数据同步:hotel-admin发布消息
在上面的示意图可以看出hotel-admin负责发出消息,所以hotel-admin是作为publicer的角色,
hotel-demo负责监听hotel-admin发出的消息,所以hotel-demo是作为consumer的角色。
因为新增和修改的逻辑差不多,所以将他们两个放入同一个消费者中,delete放在另外一个消费者中,代码如下:
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
在方法体内部,首先调用了 hotelService.save(hotel)
,这将酒店信息保存到数据库的业务逻辑。接着调用了 rabbitTemplate.convertAndSend()
方法,它用于向RabbitMQ消息队列发送消息。具体来说,它将 hotel.getId()
作为消息发送到了名为 MqConstants.HOTEL_EXCHANGE
的交换机,并使用 MqConstants.HOTEL_INSERT_KEY
作为路由键。
3、利用MQ实现数据同步:hotel-demo实现consumer对消息监听
因为新增和修改的逻辑差不多,所以将他们两个放入同一个消费者中,delete放在另外一个消费者中,代码如下:
/**
* 监听酒店新增或修改的业务
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
* 监听酒店删除的业务
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
在监听到酒店数据在Mysql发生修改后,对应的也要将ES索引库中的数据进行更新,代码如下:
@Override
public void insertById(Long id) {
try {
// 0.根据id查询酒店数据
Hotel hotel = getById(id);
// 转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.准备Request对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 2.准备Json文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
这段代码是一个方法,用于向Elasticsearch索引中插入酒店数据。以下是该方法的主要步骤:
-
首先,通过调用
getById(id)
方法,根据提供的ID从数据库中获取相应的酒店数据,并将其转换为HotelDoc
类型的对象,可能是为了适配Elasticsearch的文档格式。 -
接下来,创建一个
IndexRequest
对象,该对象用于表示将要插入到索引中的文档。指定索引名称为 "hotel",文档ID为酒店的ID(转换为字符串形式)。 -
然后,准备要插入的JSON文档,使用
JSON.toJSONString(hotelDoc)
将hotelDoc
对象转换为JSON格式的字符串,并使用XContentType.JSON
指定内容类型。 -
最后,通过调用
client.index(request, RequestOptions.DEFAULT)
方法向Elasticsearch发送插入请求。其中,client
是一个Elasticsearch的客户端对象,request
是表示插入请求的IndexRequest
对象。
总之,这段代码通过将酒店数据转换为JSON格式,并使用Elasticsearch的Java客户端将其插入到指定的索引中。