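SpringBoot + Canal (listening to the MySQL binlog) + RabbitMQ (processing and saving change records)
The goal is to record data changes in Spring Boot in a way that is decoupled from the business code. Each record holds the new data and, for update operations, the old data as well.
Listening to the MySQL binlog with Canal meets this requirement. However, unless extra handling logic is added, the change record has to be saved as soon as a change is detected, so RabbitMQ is brought in to handle saving the change records.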
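- Start the MySQL environment and enable the binlog
- Start the Canal environment, create a MySQL account for it, and connect it to MySQL as a slave
- Set the Canal server mode to TCP and write a Java client that listens for MySQL binlog changes
- Set the Canal server mode to RabbitMQ, start the RabbitMQ environment, configure the connection between Canal and RabbitMQ, and receive binlog change events through a message queue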
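To make the shape of a change record concrete, here is a minimal plain-Java sketch of the rule described above: the new data is always stored, and the old data is stored only for updates. The class name `ChangeRecordSketch`, the `EventType` enum, and the map keys are hypothetical and chosen for illustration; this is not Canal's client API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical change-record builder; names are illustrative, not part of Canal.
final class ChangeRecordSketch {

    enum EventType { INSERT, UPDATE, DELETE }

    /**
     * Builds one change record. The new column values are always stored;
     * the old column values are stored only for UPDATE events.
     */
    static Map<String, Object> build(String table, EventType type,
                                     Map<String, String> before,
                                     Map<String, String> after) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("table", table);
        record.put("type", type.name());
        record.put("after", after);
        if (type == EventType.UPDATE) {
            record.put("before", before);
        }
        return record;
    }

    public static void main(String[] args) {
        Map<String, String> before = Map.of("available_number", "10");
        Map<String, String> after = Map.of("available_number", "9");
        System.out.println(build("schedule", EventType.UPDATE, before, after));
    }
}
```

In a real setup the `before` and `after` maps would be filled from the row data that Canal parses out of the binlog, and the resulting record would be handed to the message queue instead of being printed.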
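Prepare the following entities in the model module in advance
SMS entity
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Map;

@Data
@ApiModel(description = "SMS entity")
public class MsmVo{
    @ApiModelProperty(value = "phone")
    private String phone;
    @ApiModelProperty(value = "SMS template code")
    private String templateCode;
    @ApiModelProperty(value = "SMS template parameters")
    private Map<String,Object> param;
}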
Schedule entity
@Data
@ApiModel(description = "OrderMqVo")
public class OrderMqVo{
    @ApiModelProperty(value = "Number of reservable slots")
    private Integer reservedNumber;
    @ApiModelProperty(value = "Number of remaining slots")
    private Integer availableNumber;
    @ApiModelProperty(value = "Schedule id")
    private String scheduleId;
    @ApiModelProperty(value = "SMS entity")
    private MsmVo msmVo;
}
1. Install RabbitMQ
docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
Access the management console at http://IP:15672
2. Encapsulate the rabbit-util module
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>
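Create a RabbitService for sending messages
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitService{
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //Send a message to the given exchange with the given routing key
    public boolean sendMessage(String exchange,String routingKey,Object message){
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
        return true;
    }
}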
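Create the MQ message converter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig{
    //Serialize message payloads as JSON instead of Java serialization
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}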
Add the constants class
public class MqConst{
    //Placing an appointment order
    public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
    public static final String ROUTING_ORDER = "order";
    //Queue
    public static final String QUEUE_ORDER = "queue.order";
    //SMS
    public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
    public static final String ROUTING_MSM_ITEM = "msm.item";
    public static final String QUEUE_MSM_ITEM = "queue.msm.item";
}
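The constants above come in triples: an exchange, a routing key, and a queue. As a rough illustration of how a direct exchange uses them, the following in-memory sketch routes a message to a queue only when both the exchange name and the routing key match a binding. `DirectExchangeSketch` and its methods are hypothetical stand-ins written for this example; they are not the RabbitMQ or Spring AMQP API.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory stand-in for direct-exchange routing; not the RabbitMQ client API.
final class DirectExchangeSketch {
    // exchange name -> (routing key -> queue name)
    private final Map<String, Map<String, String>> bindings = new HashMap<>();
    // queue name -> pending messages
    private final Map<String, Queue<Object>> queues = new HashMap<>();

    void bind(String exchange, String routingKey, String queue) {
        bindings.computeIfAbsent(exchange, k -> new HashMap<>()).put(routingKey, queue);
        queues.computeIfAbsent(queue, k -> new ArrayDeque<>());
    }

    /** Returns true if a binding matched and the message was queued, false otherwise. */
    boolean publish(String exchange, String routingKey, Object message) {
        Map<String, String> byKey = bindings.getOrDefault(exchange, Collections.emptyMap());
        String queue = byKey.get(routingKey);
        if (queue == null) {
            return false;
        }
        queues.get(queue).add(message);
        return true;
    }

    int size(String queue) {
        Queue<Object> q = queues.get(queue);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch broker = new DirectExchangeSketch();
        broker.bind("exchange.direct.msm", "msm.item", "queue.msm.item");
        // Publishing to the exchange name with the bound routing key is routed.
        System.out.println(broker.publish("exchange.direct.msm", "msm.item", "hello")); // true
        // Passing the queue name where the exchange name belongs matches nothing.
        System.out.println(broker.publish("queue.msm.item", "msm.item", "hello")); // false
    }
}
```

The practical takeaway for `RabbitService.sendMessage(exchange, routingKey, message)` is that its first argument should be one of the `EXCHANGE_*` constants, never a `QUEUE_*` constant.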
3. SMS module service-sms
Add the module from section 2 as a dependency
<dependency>
    <groupId>com.michael</groupId>
    <artifactId>rabbit_util</artifactId>
    <version>xxx</version>
</dependency>
Configuration file application.properties
spring.rabbitmq.host=192.168.44.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Service for sending messages
public interface MsmService{
    //Send a verification code to a phone number
    boolean send(String phone,String code);
    //SMS-sending entry point used by the MQ listener
    boolean send(MsmVo msmVo);
}
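import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.CommonRequest;
import com.aliyuncs.CommonResponse;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.profile.DefaultProfile;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; //assumed to be Spring's StringUtils
import java.util.HashMap;
import java.util.Map;

@Service
public class MsmServiceImpl implements MsmService{
    @Override
    public boolean send(String phone,String code){
        //Reject an empty phone number
        if(StringUtils.isEmpty(phone)){
            return false;
        }
        //Assemble the Alibaba Cloud SMS service parameters
        DefaultProfile profile = DefaultProfile.getProfile(ConstantPropertiesUtils.REGION_Id,
                ConstantPropertiesUtils.ACCESS_KEY_ID,
                ConstantPropertiesUtils.SECRET
        );
        IAcsClient client = new DefaultAcsClient(profile);
        CommonRequest request = new CommonRequest();
        request.setMethod(MethodType.POST);
        request.setDomain("dysmsapi.aliyuncs.com");
        //API version of the Alibaba Cloud SMS service (Dysmsapi)
        request.setVersion("2017-05-25");
        request.setAction("SendSms");
        //Phone number
        request.putQueryParameter("PhoneNumbers",phone);
        //Signature name (must match the signature registered with the SMS service)
        request.putQueryParameter("SignName","我的网站");
        //Template
        request.putQueryParameter("TemplateCode","SMS_180051135");
        //The verification code is passed as JSON, e.g. {"code":"123456"}
        Map<String,Object> param = new HashMap<>();
        param.put("code",code);
        request.putQueryParameter("TemplateParam",JSONObject.toJSONString(param));
        //Call the API to send the SMS
        try{
            CommonResponse response = client.getCommonResponse(request);
            System.out.println(response.getData());
            return response.getHttpResponse().isSuccess();
        }catch(ServerException e){
            e.printStackTrace();
        }catch(ClientException e){
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean send(MsmVo msmVo){
        if(!StringUtils.isEmpty(msmVo.getPhone())){
            //The verification code travels in the template parameters under the key "code"
            String code = (String)msmVo.getParam().get("code");
            return this.send(msmVo.getPhone(),code);
        }
        return false;
    }
}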
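One detail in `send(MsmVo)` is worth a small illustration: the `(String)` cast on the `code` entry fails with a `ClassCastException` if the producer put a number into the map, and a missing `param` map causes a `NullPointerException`. The sketch below shows one possible defensive way to read the value. `TemplateParamSketch` and `extractCode` are hypothetical names introduced here, not part of the original service.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing a tolerant way to read the "code" template parameter.
final class TemplateParamSketch {

    /** Returns the "code" entry as text, or null when the map or the entry is missing. */
    static String extractCode(Map<String, Object> param) {
        if (param == null) {
            return null;
        }
        Object code = param.get("code");
        // String.valueOf accepts both String and numeric values.
        return code == null ? null : String.valueOf(code);
    }

    public static void main(String[] args) {
        Map<String, Object> param = new HashMap<>();
        param.put("code", 123456); // an Integer, which a plain (String) cast would reject
        System.out.println(extractCode(param)); // prints 123456
    }
}
```

Whether this leniency is desirable depends on how strictly the producer side is controlled; if every producer is known to send strings, the plain cast is sufficient.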
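Create the MQ listener
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MsmReceiver{
    @Autowired
    private MsmService msmService;

    //Listen on the SMS queue; the binding declares the queue, the exchange, and the routing key
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConst.QUEUE_MSM_ITEM,durable = "true"),
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
            key = {MqConst.ROUTING_MSM_ITEM}
    ))
    public void send(MsmVo msmVo,Message message,Channel channel){
        msmService.send(msmVo);
    }
}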
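4. Business classes
After an order is generated, send the SMS and update the slot counts
① Add the dependency to the business module
rabbit-util
② Add the configuration
spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
③ Service interface and implementation class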
@Override
public void update(Schedule schedule){
    //Refresh the update timestamp before saving (assumes Schedule exposes setUpdateTime)
    schedule.setUpdateTime(new Date());
    scheduleRepository.save(schedule);
}
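④ Create the MQ listener in the receiver package
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class HospitalReceiver{
    @Autowired
    private ScheduleService scheduleService;
    @Autowired
    private RabbitService rabbitService;

    //Listen on the order queue
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = MqConst.QUEUE_ORDER,durable = "true"),
                    exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
                    key = {MqConst.ROUTING_ORDER}
            )
    )
    public void receiver(OrderMqVo orderMqVo,Message message,Channel channel) throws IOException{
        //Order placed successfully: update the schedule counts
        Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
        schedule.setReservedNumber(orderMqVo.getReservedNumber());
        schedule.setAvailableNumber(orderMqVo.getAvailableNumber());
        scheduleService.update(schedule);
        //Forward the SMS payload to the SMS exchange (the first argument is the exchange, not the queue)
        MsmVo msmVo = orderMqVo.getMsmVo();
        if(null != msmVo){
            rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_MSM,MqConst.ROUTING_MSM_ITEM,msmVo);
        }
    }
}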
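The receiver's two steps, updating the schedule counts and then forwarding the SMS payload when one is attached, can be traced without a broker. The following is a plain-Java sketch of that flow only. `OrderFlowSketch` and its nested classes are hypothetical, trimmed-down stand-ins for `Schedule`, `MsmVo`, and `OrderMqVo`, and the `forwarded` list stands in for the publish to the SMS exchange.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the receiver flow; no Spring or RabbitMQ involved.
final class OrderFlowSketch {

    // Hypothetical, reduced versions of Schedule, MsmVo and OrderMqVo.
    static final class Schedule {
        int reservedNumber;
        int availableNumber;
    }

    static final class Sms {
        final String phone;
        Sms(String phone) { this.phone = phone; }
    }

    static final class OrderMsg {
        final int reservedNumber;
        final int availableNumber;
        final Sms sms; // null when no SMS should be sent

        OrderMsg(int reservedNumber, int availableNumber, Sms sms) {
            this.reservedNumber = reservedNumber;
            this.availableNumber = availableNumber;
            this.sms = sms;
        }
    }

    // Collects what would be published to the SMS exchange.
    final List<Sms> forwarded = new ArrayList<>();

    void onOrder(Schedule schedule, OrderMsg msg) {
        // Step 1: copy the new counts onto the schedule.
        schedule.reservedNumber = msg.reservedNumber;
        schedule.availableNumber = msg.availableNumber;
        // Step 2: forward the SMS payload only when one is attached.
        if (msg.sms != null) {
            forwarded.add(msg.sms);
        }
    }

    public static void main(String[] args) {
        OrderFlowSketch flow = new OrderFlowSketch();
        Schedule schedule = new Schedule();
        flow.onOrder(schedule, new OrderMsg(30, 29, new Sms("13800000000")));
        System.out.println(schedule.availableNumber + " " + flow.forwarded.size());
    }
}
```

Splitting the work this way keeps the order service unaware of how the SMS is delivered: it only publishes a payload, and the SMS module's listener picks it up from its own queue.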