项目场景:
在项目开发中常常会遇到在一个有数据库操作的方法中,发送MQ消息,如果这种情况消息队列效率比较快,就会出现数据库事务还没提交,消息队列已经执行业务,导致不一致问题。举个应用场景,我们提交一个订单,将流水号放在MQ里,MQ监听到后就会查询订单去做其它业务,如果这时候数据库事务还没提交,也就是没生成订单流水,MQ监听到消息就去执行业务,查询订单,肯定会出现业务不一致问题
问题描述
最近遇到一个业务场景,类似于下单过程,场景是用户注册消息,注册成功后,会发送MQ消息,MQ监听到消息后,会查询用户的信息,如何再做其它业务,但是遇到一个问题,就是mq消费消息的速度是快于数据库事务提交的,就是我们用户注册的信息还没写入数据库,mq已经提前消费了,所以会导致查询不到用户注册的信息
大致的代码:
@Transactional(rollbackFor = Exception.class)
public void register(){
User user = User.builder()
.name("管理员")
.email("123456@qq.com")
.build();
userMapper.insert(user);
// 发送消息给MQ
sendMQMessage();
}
原因分析
MQ消息消费快于事务提交
解决方案
对于这种情况,下面给出两种处理方法,一种是借助于Spring框架提供的TransactionSynchronizationManager
来控制,另外一种方法是借助于Spring框架提供的@TransactionalEventListener
来控制事务
- TransactionSynchronizationManager控制事务
@Transactional(rollbackFor = Exception.class)
public void register() {
User user = User.builder()
.name("管理员")
.email("123456@qq.com")
.build();
userMapper.insert(user);
// after transaction commit
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// 发送消息给MQ
sendMQMessage();
}
});
}
测试一下,通过日志可以看出事务已经提交了,如何发送mq,mq监听到消息,就会去读取用户信息,是可以获取到的
- @TransactionalEventListener控制事务
如果借助Spring框架提供的事件监听机制来实现,就需要用到@TransactionalEventListener
监听器,下面给出例子
创建一个Event,主要来做参数传送
package com.example.eventlistener.event;
import org.springframework.context.ApplicationEvent;
public class SendMsgEvent extends ApplicationEvent {
private Long userId;
private String userName;
public SendMsgEvent(Object source){
super(source);
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
创建一个监听器,注意要加上@Component
,组件类才能被Spring容器管理
package com.example.eventlistener.listener;
import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONUtil;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import javax.annotation.Resource;
@Component
@Slf4j
public class SendMsgListener {
@Resource
private UserMapper userMapper;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT , classes = SendMsgEvent.class)
public void sendMsg(SendMsgEvent sendMsgEvent) {
log.info("sendMsg: {}" , JSONUtil.toJsonStr(sendMsgEvent));
// 发送消息给MQ
sendMQMessage();
}
}
业务类实现业务:
package com.example.eventlistener.service.impl;
import cn.hutool.http.HttpRequest;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.event.UserRegisterEvent;
import com.example.eventlistener.mapper.UserMapper;
import com.example.eventlistener.model.User;
import com.example.eventlistener.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Resource;
@Service
@Slf4j
public class UserServiceImpl implements ApplicationEventPublisherAware , IUserService {
private ApplicationEventPublisher applicationEventPublisher;
@Resource
private UserMapper userMapper;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void sendMsgAfterRegisterByEvent() {
User user = doRegister();
// after transaction commit
SendMsgEvent sendMsgEvent = new SendMsgEvent(this);
sendMsgEvent.setUserId(user.getId());
sendMsgEvent.setUserName(user.getName());
applicationEventPublisher.publishEvent(sendMsgEvent);
}
private User doRegister() {
User user = User.builder()
.name("管理员")
.email("123456@qq.com")
.build();
userMapper.insert(user);
log.info("save user info");
return user;
}
}
经过测试,也可以实现同样的效果,控制数据库的事务提交后,才执行发送MQ消息
补充:
如果执行出现java.lang.IllegalStateException: Transaction synchronization is not active
,说明没加事务控制,加上@Transactional(rollbackFor = Exception.class)
即可