微服务框架
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】
SpringCloud微服务架构
文章目录
- 微服务框架
- SpringCloud微服务架构
- 28 数据同步
- 28.5 监听 MQ 消息
- 28.5.1 直接开干
28 数据同步
28.5 监听 MQ 消息
28.5.1 直接开干
案例:利用MQ实现mysql与elasticsearch数据同步
利用课前资料提供的hotel-admin项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
步骤:
- 导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD √
- 声明exchange、queue、RoutingKey √
- 在hotel-admin中的增、删、改业务中完成消息发送 √
- 在hotel-demo中完成消息监听,并更新elasticsearch中数据 【这一步了】
- 启动并测试数据同步功能
【在hotel-demo 中】
新建消息监听类
package cn.itcast.hotel.mq;
import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* ClassName: HotelListener
* date: 2022/11/3 18:29
*
* @author DingJiaxiong
*/
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
//监听酒店新增或修改的业务【参数:酒店id】
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
//监听酒店删除的业务【参数:酒店id】
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
修改业务层接口
package cn.itcast.hotel.service;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
import java.util.Map;
public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParams params);
Map<String , List<String>> filters(RequestParams params);
List<String> getSuggestions(String prefix);
void deleteById(Long id);
void insertById(Long id);
}
修改实现类
@Override
public void deleteById(Long id) {
try {
//1. 准备Request
DeleteRequest request = new DeleteRequest("hotel",id.toString());
//2. 发送请求
client.delete(request,RequestOptions.DEFAULT);
}catch (IOException e){
throw new RuntimeException(e);
}
}
@Override
public void insertById(Long id) {
try {
//0. 根据id 查询酒店数据
Hotel hotel = getById(id);
//转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
//1. 准备Request 对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
//2. 准备Json 文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
//3. 发送请求
client.index(request, RequestOptions.DEFAULT);
}catch (IOException e){
throw new RuntimeException(e);
}
}
实现我们新增的两个接口,OK,这就是监听的代码