目录
- 1 es与数据库同步的方法
- 2 实践
- 2.1 任务介绍
- 2.2 MQ方面操作
- 2.2.1 声明交换机队列并且绑定
- 2.2.2 hotel_admin端web层设置mq发送消息
- 2.3 hotel_demo端监听接受消息并执行es操作
1 es与数据库同步的方法
方式一:同步调用
- 优点:实现简单,粗暴
- 缺点:业务耦合度高
方式二:异步通知(选择这个折中下)
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
方式三:监听binlog
- 优点:完全解除服务间耦合
- 缺点:开启binlog增加数据库负担、实现复杂度高
2 实践
2.1 任务介绍
当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
同时开启了hotel_admin和hotel_demo两个微服务,利用MQ声明exchange、queue、RoutingKey,在hotel-admin中的增、删、改业务中完成消息发送,在hotel-demo中完成消息监听,并更新elasticsearch中数据,进而完成es和mysql的消息同步
2.2 MQ方面操作
2.2.1 声明交换机队列并且绑定
我打算使用的mq结构如下:
代码:
@Configuration
public class Myconfig {
/**
* 声明交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
/**
* 插入/更新队列
* @return
*/
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
/**
* 删除队列
* @return
*/
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE);
}
/**
* 绑定增/改
* @return
*/
@Bean
public Binding bindingInsert(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_QUEUE);
}
/**
* 绑定删除
* @return
*/
@Bean
public Binding bindingDelete(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
2.2.2 hotel_admin端web层设置mq发送消息
@RestController
@RequestMapping("hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id){
return hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
){
Page<Hotel> result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
}
2.3 hotel_demo端监听接受消息并执行es操作
@Component
public class MsgListener {
@Autowired
private IHotelService hotelService;
/**
* 监听插入或者更新doc的信息
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void receiveInsertMsg(Long id){
hotelService.InsertOrUpdate(id);
}
/**
* 监听删除doc的信息
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void receiveDeleteMsg(Long id){
hotelService.deleteEsById(id);
}
}
之后去service层实现监听类调用的增删方法
@Override
public void InsertOrUpdate(Long id) {
//1 根据id去数据库查信息
Hotel db_hotel = this.getById(id);
if(db_hotel == null){
log.warn("id为:"+id+"的酒店不存在");
return;
}
//2 构建添加对象
HotelDoc hotelDoc = new HotelDoc(db_hotel);
String jsonString = JSON.toJSONString(hotelDoc);
IndexRequest request = new IndexRequest("hotel").id(db_hotel.getId().toString());
request.source(jsonString, XContentType.JSON);
//3 发送添加请求
try {
IndexResponse result = restHighLevelClient.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.warn("同步id为:"+id+"的信息超时");
}
}
@Override
public void deleteEsById(Long id) {
DeleteRequest request = new DeleteRequest("hotel",id.toString());
try {
restHighLevelClient.delete(request,RequestOptions.DEFAULT);
} catch (IOException e) {
log.warn("删除id为:"+id+"的信息超时");
}
}