前言
分布式事务是分布式系统中非常常见的问题。是非常必要钱常见的。实现的方式也是多种多样。今天这个视频主要来分享一下RocketMQ实现事务消息来保证分布式事务的一致性。不知道大家使用过这种方式没有。这种分布式事务的原理其实和本地消息表一样。
本地消息表实现分布式事务的基本原理
本地消息表实现分布式事务的基本原理是通过两个阶段的事务处理来保证分布式环境中的数据一致性。以下是其基本步骤:
大致就是将本地消息表和要执行的第一个业务逻辑放在一个事务中,这样就可以一起成功一起失败。当第一阶段成功后。根据本地消息表中的记录去让下游的业务执行成功。扫描本地消息表中的消息然后执行下游业务。执行成功后在删除本地消息表中消息。不成功则重试。
1.本地事务:
在开始分布式事务时,首先执行本地操作。例如,更新某个服务的数据。
如果本地操作成功,事务进入下一步;如果失败,则回滚本地事务,并结束流程。
消息记录:
创建一条消息记录,通常称为“本地消息”,将需要在后续阶段执行的远程操作信息保存在本地数据库的一个消息表中。这个消息记录包含了执行远程操作所需的所有数据。
消息发送:
将本地消息发送到消息队列,如RocketMQ或其他消息中间件。此时,消息队列并不保证消息已经被消费,只是简单地将消息放入队列。
消息消费:
消息队列的消费者监听并处理消息。消费者通常是另一个服务,它接收消息并执行相应的远程操作,比如更新另一个服务的数据。
确认与补偿:
如果远程操作成功,消费者会发送一个确认信号(ACK),通知生产者操作已完成。这时,生产者可以删除本地消息表中的记录。
如果远程操作失败,消费者可能会尝试重新消费消息,或者根据策略回滚本地事务,然后通知生产者消息处理失败。
最终一致性:
尽管可能有短暂的延迟,但最终所有服务的数据状态会达到一致,因为本地操作和远程操作都会成功完成,或者在失败时都会回滚。
异常处理:
为了处理异常情况,系统通常会有超时和重试机制。如果消费者长时间没有确认,生产者可能会重新发送消息,或者在一定时间后回滚本地事务。
本地消息表方案的优点在于它避免了分布式事务的复杂性,实现了最终一致性,而不是强一致性。但是,它也有一些缺点,比如增加了系统的复杂性,需要维护额外的消息表,以及可能出现消息丢失或重复消费的问题。因此,它更适合对实时性要求不高,但对最终一致性有要求的场景。
本地消息表是一种最终一致性方案。并不是强一致性方案。
rocketmq事务消息
今天重点来说一下rocketmq事务消息是怎么做的。先理解一下Rocketmq事务消息
这种类似的图片挺多的。简单的来看一下 然后一会结合代码看一下。生产者先送消息到MQserve。然后mq去执行本地事务。通过回查的方式来保证第一阶段消息执行的成功。然后下游消费者来消费这个消息。
代码
我们需要实现分布式事务的两个服务分别是用户中心的服务以及im业务服务。功能是注册的功能。用户的注册信息基本信息存储在用户中心表。然后其他信息存储在im_user表里面。这个听起来有点奇怪。因为我这套代码是计划用户中心存储多个app的用户信息。通义提供鉴权服务什么的。然后基本信息存储在自己的业务用户表里面。大概是这样的设计思路。可以看代码。
/**
* 使用rocketmq实现事务
* @param dto
* @return
* @throws Exception
*/
@ApiOperation("使用邮箱和密码注册")
@PostMapping("/sys/registByWeb")
public GenericResponse registByWebTX(@RequestBody SysRegisterForm dto) throws Exception {
String uuid = UUID.randomUUID().toString() + new Random().nextInt();
SysUserEntity sysUserEntity = new SysUserEntity();
sysUserEntity.setPassword(dto.getPassword());
sysUserEntity.setUsername(dto.getUsername());
sysUserEntity.setOpenid(uuid);
//注册需要的实体类
RegisterFeign registerFeign = new RegisterFeign();
registerFeign.setOpenid(uuid);
registerFeign.setUsername(dto.getUsername());
registerFeign.setEmail(dto.getEmail());
TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,
MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);
String sendStatus = sendResult.getSendStatus().name();
String localTXState = sendResult.getLocalTransactionState().name();
logger.info("sendStatus---" + sendStatus);
logger.info("localTXState---"+localTXState);
// 注意:这里不能立即返回成功,因为事务还未完成,实际应用中可能需要设计异步回调通知客户端事务结果
// 以下仅为示例逻辑,实际应用中需根据业务需求调整
return GenericResponse.response(ServiceError.NORMAL);
}
这里实现注册功能。然后
TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,
MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);
这行代码用来发送事务消息;
需要给rocketmq配置一个生产者端的消息监听器
@Slf4j
@RocketMQTransactionListener
public class UserRegistrationTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private SysUserService sysUserService;
@Autowired
SysUserDao sysUserDao;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
// 执行本地事务
RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
try {
String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
// OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);
SysUserEntity sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);
sysUserService.saveUser(sysUserEntity);
} catch (Exception e) {
log.error(">>>> exception message={} <<<<",e.getMessage());
result = RocketMQLocalTransactionState.UNKNOWN;
}
// return RocketMQLocalTransactionState.UNKNOWN;
return result;
}
/**
* 步骤四
* 描述:mq回调检查本地事务执行情况
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());
// 检查本地事务
RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
try {
String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
SysUserEntity sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);
// OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);
// List<OrderEntity> list = orderService.selectOrder(order);
List<Map> list = sysUserDao.queryUserByOpenid(sysUserEntity.getOpenid(),sysUserEntity.getUsername());
if(list.size()<=0){
result = RocketMQLocalTransactionState.UNKNOWN;
}
} catch (Exception e) {
// 异常就回滚
log.error(">>>> exception message={} <<<<",e.getMessage());
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
}
@RocketMQTransactionListener注意这个注解不能落下。
然后可以配置一下下游消费者。
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumeRegister", topic = "TX_REGISTER_ADD",consumeMode = ConsumeMode.ORDERLY)
public class RegisterListener implements RocketMQListener<RegisterFeign> {
@Autowired
private WeChatService weChatService;
/**
*
* @param dto
*/
@Override
public void onMessage(RegisterFeign dto) {
log.info("接收到消息,开始消费..dto" + dto);
weChatService.registByOpenid(dto);
}
}
我们在这个地方来接受一下消息。然后调用这个服务的保存。