使用kafka改造分布式事务

news2025/1/21 6:29:25

文章目录

  • 1、kafka确保消息不丢失?
    • 1.1、生产者端确保消息不丢失
    • 1.2、kafka服务端确保消息不丢失
    • 1.3、消费者确保正确无误的消费
  • 2、生产者发送消息 KafkaService
  • 3、UserInfoServiceImpl -> login()
  • 4、service-account - > AccountListener.java

1、kafka确保消息不丢失?

1.1、生产者端确保消息不丢失

  1. 发送模式:发后即忘、同步阻塞确认、异步非阻塞确认
  2. 生产者acks模式:props.put(“acks”, “all”)、acks: all(-1)
  3. 配置重试:props.put(“retries”, 3)、retries: 3

1.2、kafka服务端确保消息不丢失

  1. kafka是文件型的消息中间件,不会单纯的因为服务器宕机导致消息丢失
  2. 消息的log日志文件损坏:搭建kafka集群(副本)

1.3、消费者确保正确无误的消费

  1. 偏移量提交
     自动提交:enable-auto-commit: true
     手动提交:ack-mode: manual_immediate:同步提交 异步提交(推荐)
  2. 偏移量重置:
     auto-offset-reset: earliest -> 如果有偏移量则继续消费,如果偏移量没了,从头重新进行消费,可能会存在幂等性问题
     auto-offset-reset: latest -> 如果有偏移量则继续消费,如果偏移量不存在,只消费新消息,旧消息没消费完就丢掉了
     auto-offset-reset: none -> 如果有偏移量则继续消费,如果偏移量不存在,抛出异常
  3. 消费者重试:重试主题和死信主题, @RetryableTopic()

2、生产者发送消息 KafkaService

package com.atguigu.tingshu.common.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class KafkaService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 向指定主题发送消息
     * 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键
     *
     * @param topic 发送消息的主题
     * @param msg   需要发送的消息内容
     */
    public void sendMsg(String topic, String msg){
        // 调用重载的sendMsg方法,传入默认值以简化调用
        this.sendMsg(topic, null, null, msg);
    }

    /**
     * 发送消息到指定的Kafka主题
     *
     * @param topic 消息主题
     * @param partition 分区编号
     * @param key 消息键值
     * @param msg 消息内容
     */
    public void sendMsg(String topic, Integer partition, String key, String msg){
        // 发生消息并返回异步结果
        CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);

        // 异步处理发送结果
        future.whenCompleteAsync((result, ex) -> {
            if (ex != null){
                // 如果发送过程中出现异常
                logger.error("生产者发送消息失败!原因:{}", ex.getMessage());
            }
        });
    }

}

  • whenCompleteAsync:异步完成时的处理、当异步操作完成时
    在这里插入图片描述

3、UserInfoServiceImpl -> login()

  • 此时 service-user 是生产者 发送消息

在这里插入图片描述

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class UserInfoServiceImpl extends ServiceImpl<UserInfoMapper, UserInfo> implements UserInfoService {

	@Autowired
	private WxMaService wxMaService;

	@Autowired
	private RedisTemplate redisTemplate;

	@Autowired
	private UserAccountFeignClient userAccountFeignClient;

	@Autowired
	private KafkaService kafkaService;


	/**
	 * 根据微信返回的code进行用户登录
	 * @param code 微信登录凭证
	 * @return 返回包含登录令牌的Map对象
	 */
	//@GlobalTransactional
	//@Transactional
	@Override
	public Map<String, Object> login(String code) {
	    // 创建一个HashMap对象用于存放返回的数据
	    HashMap<String, Object> map = new HashMap<>();

	    try {
	        // 通过微信服务获取用户的会话信息
	        WxMaJscode2SessionResult sessionInfo = this.wxMaService.getUserService().getSessionInfo(code);
	        // 获取用户的openid
	        String openid = sessionInfo.getOpenid();

	        // 查询数据库中是否存在该openid对应的用户信息
	        UserInfo userInfo = this.getOne(new LambdaQueryWrapper<UserInfo>().eq(UserInfo::getWxOpenId, openid));
	        if (userInfo == null) {
	            // 如果用户不存在,则创建一个新的UserInfo对象
	            userInfo = new UserInfo();
	            // 设置用户的openid
	            userInfo.setWxOpenId(openid);
	            // 设置用户的昵称,其中包含一个随机生成的ID
	            userInfo.setNickname("这家伙太懒"+ IdWorker.getIdStr());
	            // 设置用户的头像URL
	            userInfo.setAvatarUrl("https://img0.baidu.com/it/u=1633409170,3159960019&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500");
	            // 保存用户信息到数据库
	            this.save(userInfo);

	            // 初始化用户账号信息
	            //userAccountFeignClient.initAccount(userInfo.getId());
				this.kafkaService.sendMsg(KafkaConstant.QUEUE_USER_REGISTER,userInfo.getId().toString());


				//int i = 1 / 0;
	        }
	        // 生成一个随机的登录令牌
	        String token = UUID.randomUUID().toString();
	        // 创建一个UserInfoVo对象,用于存放用户信息
	        UserInfoVo userInfoVo = new UserInfoVo();
	        // 将UserInfo对象的属性复制到UserInfoVo对象中
	        BeanUtils.copyProperties(userInfo, userInfoVo);
	        // 将用户信息存储到Redis中,设置过期时间为30分钟
	        this.redisTemplate.opsForValue().set(RedisConstant.USER_LOGIN_KEY_PREFIX + token, userInfoVo,RedisConstant.USER_LOGIN_KEY_TIMEOUT, TimeUnit.SECONDS);

	        // 将生成的登录令牌放入Map对象中
	        map.put("token", token);

	        // 返回包含登录令牌的Map对象
	        return map;
	    } catch (WxErrorException e) {
	        // 如果发生微信错误异常,抛出自定义的异常
	        throw new GuiguException(ResultCodeEnum.LOGIN_AUTH);
	    }
	}

}

在这里插入图片描述

4、service-account - > AccountListener.java

  • 此时 service-account 是消费者 接收消息

在这里插入图片描述

@Slf4j
@Component
public class AccountListener {

    @Autowired
    private UserAccountService userAccountService;

    @RetryableTopic(backoff = @Backoff(2000))
    @KafkaListener(topics = KafkaConstant.QUEUE_USER_REGISTER)
    public void listen(String userId, Acknowledgment ack){

        // 如果是空消息直接确认掉,后续不用再执行
        if (StringUtils.isBlank(userId)) {
            ack.acknowledge();
            return;
        }

        // 初始化账户
        this.userAccountService.saveAccount(Long.valueOf(userId));

        ack.acknowledge();// 手动确认
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2066902.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电磁炮设计

视频链接&#xff1a; 电磁炮视频 项目简介 这个项目电磁炮主要是&#xff0c;测试电磁炮原理部分的简易制作&#xff0c;对原理有初步认识&#xff0c;升压电路采用的是boost电路&#xff0c;IGBT作为开关管&#xff0c;电解电容作为储能元件。 项目功能 本设计是基于STM32F4…

Chapter 02 Vue指令(上)

欢迎大家订阅【Vue2Vue3】入门到实践 专栏&#xff0c;开启你的 Vue 学习之旅&#xff01; 文章目录 前言一、v-text指令二、v-html指令三、v-show指令四、v-if指令五、v-else指令六、v-else-if指令 前言 在 Vue.js 中&#xff0c;指令是带有 v- 前缀的特殊属性&#xff0c;不…

【大数据】数据仓库的定义、数据模型及其建设与设计

1. 数据仓库 1.1 定义 数据仓库不是数据的简单堆积&#xff0c;而是从大量的事务型数据库中抽取数据&#xff0c;并将其清理、转换为新的存储格式,即为决策目标把数据聚合在一种特殊的格式中。公认的数据仓库之父 W.H. Inmon 将其定义为&#xff1a;“数据仓库是支持管理决策…

【秋招笔试】8.19蔚来秋招-三语言题解

🍭 大家好这里是 春秋招笔试突围,一起备战大厂笔试 💻 ACM金牌团队🏅️ | 多次AK大厂笔试 | 编程一对一辅导 ✨ 本系列打算持续跟新 春秋招笔试题 👏 感谢大家的订阅➕ 和 喜欢💗 和 手里的小花花🌸 ✨ 笔试合集传送们 -> 🧷春秋招笔试合集 🍒 本专栏已收…

git submodule

文章目录 环境准备用法添加子模块添加b添加c提交总结 其它用户获取子模块其它总结 更新子模块内容方式1&#xff1a;独立更新其它 方式2&#xff1a;在主模块嵌套下更新总结 总结参考 写的有点乱&#xff0c;凑合理解一下吧。另外常用命令总结一下&#xff1a; git submodule …

开发者空间实践指导:基于华为云3大PaaS主流服务轻松实现文字转换语音

案例简介 开发者将在云主机中&#xff0c;基于CodeArts API设计语音合成接口&#xff0c;基于API Explorer调试接口&#xff0c;并利用CodeArts IDE实现数据流转换为音频。在此过程中&#xff0c;开发者可体验API设计、开发、调试等全生命周期&#xff0c;对华为云产品API体系…

vue文件打包后怎么运行

找到打包后的文件 并在此处打开cmd控制台 输入 npm run serve 按住" ctrl " 再点击网址及可访问。 ------------------------------

代码行数计数器

做了个记录代码函数的小程序&#xff0c;后缀名记得设置为.pyw&#xff0c;如果你装了python的话可以直接拿来用&#xff0c;免费自取。 功能说明&#xff1a; 1.记录总行数、当前行数、目标行数三个值 2.具有进度条功能 3.行数的多少能激发不同的反馈&#xff0c;如great&am…

基于分数Talbot效应的阵列光学涡旋产生matlab模拟与仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 基于分数Talbot效应的阵列光学涡旋产生matlab模拟与仿真&#xff0c;分别测试正方形&#xff0c;旋转正方形以及六边形三种阵列形状下的光学涡旋。 2.测试软件版…

(论文研读)解决transform训练的不稳定性问题:SAMformer(时序预测)

论文链接&#xff1a;https://arxiv.org/abs/2402.10198 作者团队&#xff1a;华为诺亚方舟实验室&#xff08;华为巴黎研究中心&#xff09;&#xff0c;Laboratory of Informatics Paris Descartes (LIPADE) 巴黎笛卡尔大学&#xff08;第五大学&#xff09;信息学实验室 文…

【笔记篇】Davinci Configurator SomeIpXf模块

目录 1 简介1.1 架构概览2 功能描述2.1 特性2.2 初始化2.3 状态机2.4 主函数2.5 故障处理3 集成4 API描述5 配置1 简介 本文主要描述了AUTOSAR SomeIpXf模块的功能。 SomeIpXf主要用途是对数据进行SOME/IP格式的序列化和反序列化。 1.1 架构概览 SomeIpXf在AUTOSAR软件架构…

环绕音效是什么意思,电脑环绕音效怎么开

Boom 3D是一款专业的音效增强软件&#xff0c;它拥有先进的音效处理技术和丰富的音效设置选项&#xff0c;可以为用户打造出高度定制化的音频体验&#xff0c;Boom 3D还拥有简洁直观的界面&#xff0c;操作简单易懂&#xff0c;即使是音频技术的新手也能轻松上手。本篇文章就将…

Mybatis实现员工管理系统

文章目录 1.案例需求2.编程思路3.案例源码4.小结 1.案例需求 在上次做的父子模块的maven以及Ajax实现人工管理系统的基础上使用Mybatis实现员工管理系统的增删改查&#xff0c;具体运行效果如下&#xff1a; 2.编程思路 Mybatis框架的一般执行流程&#xff1a; 创建MyBati…

基于改进字典的大数据多维分析加速实践

一、背景 OLAP场景是大数据应用中非常重要的一环&#xff0c;能够快速、灵活地满足业务各种分析需求&#xff0c;提供复杂的分析操作和决策支持。B站主流湖仓使用Iceberg存储&#xff0c;通过建表优化可以实现常规千万级的指标统计秒级查询&#xff0c;这样就能快速搭建可视化报…

WRF输出结果的可视化展示与分析:以风速为例

1.前言 天气研究与预报 (WRF) 模型是一种功能强大的数值天气预报系统&#xff0c;用于模拟各种尺度的大气现象。WRF 生成大量输出数据&#xff0c;可为气象和气候研究、天气预报和环境管理提供宝贵信息。 WRF 输出数据通常存储在 netCDF 文件中&#xff0c;其中包含具有不同单位…

AI生成PPT怎么用?5款AI PPT工具助你轻松制作演示文稿

当你站在山西应县木塔之下&#xff0c;仰望这座千年古塔的雄伟与震撼&#xff0c;心中不禁涌起一股对历史与建筑艺术的敬畏之情。 想象一下&#xff0c;如果将这份震撼与敬仰融入到你的演示文稿中&#xff0c;那将是多么引人入胜的体验。而这一切&#xff0c;只需借助AI生成PP…

Kubernetes 运维工程师必备:K8s 基础面试题精编(三)

Kubernetes 运维工程师必备:K8s 基础面试题精编(三) 1. 在Kubernetes集群中如何查看Pod的日志?2. 如何将一个已经部署的应用程序从一个命名空间迁移到另一个命名空间?3. 如何更新Kubernetes集群中的应用程序镜像版本?4. 如何通过Kubernetes进行自动扩容?5. 如何手动扩容…

震惊!!大模型玩转JS逆向

不知道大家有没有被JS代码混淆折磨过&#xff0c;我之前搞爬虫的时候&#xff0c;也经常被OB代码混淆搞到心态崩溃&#xff0c;但是自从接触了大模型&#xff0c;腰不疼了&#xff0c;腿不酸了&#xff0c;OB代码直接交给大模型&#xff0c;简直不要太爽 这是一段经过OB混淆之…

盘点12个国内外主流CRM系统,哪一个能免费试用?

客户关系管理&#xff08;CRM&#xff09;系统已成为企业成功的关键工具。它们不仅帮助企业改善客户关系&#xff0c;还提高了销售效率和业务分析能力。在众多选择中&#xff0c;我们特意盘点了12个国内外主流的CRM系统&#xff0c;从地位、业务应用以及特点三方面进行解析&…

车牌号字符检测系统源码分享 # [一条龙教学YOLOV8标注好的数据集一键训练_70+全套改进创新点发刊_Web前端展示]

车牌号字符检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 研究背景与意义 随着智能交通系统的快速发展&#xff0c;车牌号字…