SpringBoot 3.1.7 集成Kafka 3.5.0

news2025/1/16 8:03:03

一、背景

写这边篇文章的目的,是记录我在集成kafka客户端遇到的一些问题,文章会记录整个接入的过程,其中会遇到几个坑,如果需要最终版本,直接看最后一节就行了,感觉Spring-Kafka的文档太少了,如果采用SpringBoot集成的方式接入,一不小可能就会踩坑

二、操作步骤

1 添加依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2 添加配置文件

spring:
  profiles:
    active: dev
  application:
    name: goods-center
  kafka:
    bootstrap-servers: 192.168.31.114:9092
    producer:
      acks: all
      timeout.ms: 5000
      # 值序列化:使用Json
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      enable:
        idempotence: true # 默认为True
    consumer:
      group-id: goods-center
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      enable-auto-commit: false # 取消自动提交

3 生产者代码

package com.ychen.goodscenter.fafka;


import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private KafkaTemplate<Long, SubmitOrderReq> kafkaTemplate;

    public void sendOrderMessage(SubmitOrderReq msg) {
        kafkaTemplate.send(TopicConstants.ORDER_MESSAGE_TOPIC, msg.getOrderId(), msg);
        logger.info("order-message-topic message sent, orderId: {}", msg.getOrderId());
    }
}

4 消费者代码

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic")
    public void processMessage(Consumer<Long, SubmitOrderReq> consumer, SubmitOrderReq submitOrderReq) {
        try {
            logger.info("order-message-topic message received, orderId: {}", submitOrderReq.getOrderId());
            orderService.submitOrder(submitOrderReq);
            // 同步提交
            consumer.commitSync();
            logger.info("order-message-topic message acked: orderId: {}", submitOrderReq.getOrderId());
        } catch (DuplicateKeyException dupe) {
            // 处理异常情况
            logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);
            // 重复数据,忽略掉,同步提交
            consumer.commitSync();
        } catch (Exception e) {
            // 处理异常情况
            logger.error("order-message-topic message processMessage error", e);
        }
    }
}

三、开始踩坑了

1 添加信任自己包

Caused by: java.lang.IllegalArgumentException: The class 'com.ychen.goodscenter.vo.req.SubmitOrderReq' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
	... 15 common frames omitted

原因: 因为我在消费消息时用了json序列化,需要给这个序列化,添加信任自己包,不加json序列号会报错

解决方法:添加配置

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.ychen.**"  

解决途径:百度

2 consumer.commitSync(); 无效

问题发现:当我正在批量消费消息时,强制重启应用进程,发现有部分消息丢失了,没有处理

我发了5000个样本请求,最后只生成了4912 个订单(中途强制重启了2次)

问题分析:有2中可能

第一种:之前配置的enable-auto-commit: false  是无效的。

第二种: consumer.commitSync(); 一次将批量拉取的offset提交了

问题排查:

通过在 consumer.commitSync(); 代码之前和之后分别打一个断点,然后发送一批数据

consumer.commitSync(); 之前:

consumer.commitSync(); 之后

结果发生了突变,说明是consumer.commitSync();执行之后引发的offset突变

翻阅源码:

总体而言,通过官方文档和源代码,我们可以确定 commitSync() 提交的是已经成功拉取到的消息的最大 offset,而不是当前正在处理的消息的 offset。

3 缺少AckMode 配置

既然consumer.commitSync();无法在批量处理消息的环境保证消息不丢失,那么需要寻找新的解决方案:

在org.springframework.kafka.annotation.KafkaListener 类的注释上面有写到可以使用org.springframework.kafka.support.Acknowledgment

然后我们消费者的代码改造后为:

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        try {
            logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
            orderService.submitOrder(record.value());
            // 同步提交
            acknowledgment.acknowledge();
            logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            // 处理异常情况
            logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);
            // 重复数据,忽略掉,同步提交
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 处理异常情况
            logger.error("order-message-topic message processMessage error", e);
        }
    }
}
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

这里我也有点不懂了,明明已经配置自动提交了,还需要配置 ack-mode: MANUAL,既然他说要那就配置吧

在application.yml 增加配置

spring:
  kafka:
    listener:
      ack-mode: MANUAL

现在准备2000个样本,然后让消费者实例强制重启2次,看看数据库的订单数量是否为2000条

现在正确了,支持系统宕机仍然不丢失消息了

四、最终的配置文件和消费者代码

1 配置文件

spring:
  profiles:
    active: dev
  application:
    name: goods-center
  kafka:
    bootstrap-servers: 192.168.31.114:9092
    listener:
      ack-mode: MANUAL
    producer:
      acks: all
      timeout.ms: 5000
      # 值序列化:使用Json
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      enable:
        idempotence: true # 默认为True
    consumer:
      properties:
        spring.json.trusted.packages: "com.ychen.**" # 信任自己包,不加json序列号会报错
      group-id: goods-center
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      enable-auto-commit: false # 取消自动提交

2 消费者代码

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        try {
            logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
            orderService.submitOrder(record.value());
            // 同步提交
            acknowledgment.acknowledge();
            logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            // 处理异常情况
            logger.error("order-message-topic message error DuplicateKeyException", dupe);
            // 重复数据,忽略掉,同步提交
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 处理异常情况
            logger.error("order-message-topic message error unknown ", e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1411083.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MATLAB环境下一种音频降噪优化方法—基于时频正则化重叠群收缩

语音增强是语音信号处理领域中的一个重大分支&#xff0c;这一分支已经得到国内外学者的广泛研究。当今时代&#xff0c;随着近六十年来的不断发展&#xff0c;己经产生了许多有效的语音增强算法。根据语音增强过程中是否利用语音和噪声的先验信息&#xff0c;语音增强算法一般…

Linux shell编程学习笔记42:hdparm命令

ChatGPT 和文心一言哪个更好用&#xff1f; 从智能回复、语言准确性、知识库丰富度等方面比较&#xff0c;两大AI助手哪个更胜一筹&#xff1f;快来和我们分享一下你的看法吧~ 0 前言 获取硬盘序列号是信息资产管理和信息安全检测中经常要收集的信息&#xff0c;对于Linux来说…

2.数据结构 顺序表(自留笔记)

文章目录 一.静态顺序表&#xff1a;长度固定二.动态顺序表1.下面证明原地扩容和异地扩容代码如下&#xff1a;2.下面是写一段Print&#xff0c;打印数字看看&#xff1a;3.头插4.尾删5.头删6.越界一定会报错吗7.下标插入8.下标删除9.查找数字10.应用&#xff1a;利用顺序表写一…

云原生 - 微信小程序 COS 对象存储图片缓存强制更新解决方案

问题描述 遇到一个这样的情况&#xff1a;在微信小程序里图片缓存十分麻烦&#xff0c;网上很多说在腾讯云里的 COS 存储对象服务里设置对应的图片缓存&#xff08;Header 头 Cache-Contorl&#xff09;&#xff0c;说实话真不好用&#xff0c;一会儿生效&#xff0c;一会儿没…

使用宝塔面板部署Node.js+Mysql服务和Vue3-Admin项目到云服务器上

准备工作 一台云服务器&#xff0c;可以先用免费试用一个月的服务器进行练手&#xff1b;我这里选择的是腾讯云的轻量云服务器&#xff1b; 1、在云服务器上安装宝塔面板 宝塔面板官网地址&#xff1a;https://www.kancloud.cn/chudong/bt2017/424209 1.1 安装Xshell脚本工…

SpringCloud Aliba-Seata【下】-从入门到学废【8】

目录 1.数据库创建 1.seata_account库下建表 2.seata_order库下建表 3.seata_storage库下建表 4.在每个库下创建回滚日志 2.创建订单模块 2.1建工程 2.2加pom 2.3改yml 2.4file.conf 2.5registry.conf 2.6domain 2.7Dao 2.8Service 2.9controller 2.10confi…

cidp环境启动步骤及注意事项

1、导入项目 选择file——》import…——》Generate——》Exiting Projects into Workspace——》选择要导入的项目 2、添加tomcat 1&#xff09;点击Serves——》No servers are available. Click this link to create a new server… 2&#xff09;点击“Add…” 3&…

阿里巴巴Java开发手册(详尽版)

点击下载 阿里巴巴Java开发手册

C++输入输出流

输入/输出流类&#xff1a;iostream---------i input&#xff08;输入&#xff09; o output&#xff08;输出&#xff09; stream&#xff1a;流 iostream&#xff1a; istream类&#xff1a;输入流类-------------cin&#xff1a;输入流类的对象 ostre…

【java面试】常见问题(超详细)

目录 一、java常见问题JDK和JRE的区别是什么&#xff1f;Java中的String类是可变的还是不可变的&#xff1f;Java中的equals方法和hashCode方法有什么关系&#xff1f;Java中什么是重载【Overloading】&#xff1f;什么是覆盖【Overriding】&#xff1f;它们有什么区别&#xf…

React进阶 - 14(说一说”虚拟DOM“中的”Diff算法“)

本章内容 目录 一、了解 Diff 算法二、key 值的重要性三、为什么不建议使用 index 做 key 值 上一节我们初步了解了 React中的”虚拟 DOM“ &#xff0c;本节我们来说一说”虚拟DOM“中的”Diff算法“ 一、了解 Diff 算法 在上一篇中&#xff0c;我们有讲到&#xff1a;当 st…

【Computer Networks】FDM、TDM、WDM、CDM

目录 FDM TDM ​WDM CDM FDM TDM WDM CDM

C语言每日一题(47)两数相加II

力扣 445 两数相加II 题目描述 给你两个 非空 链表来代表两个非负整数。数字最高位位于链表开始位置。它们的每个节点只存储一位数字。将这两数相加会返回一个新的链表。 你可以假设除了数字 0 之外&#xff0c;这两个数字都不会以零开头。 示例1&#xff1a; 输入&#xff…

Java如何对OSS存储引擎的Bucket进行创建【OSS学习】

在前面学会了如何开通OSS&#xff0c;对OSS的一些基本操作&#xff0c;接下来记录一下如何通过Java代码通过SDK对OSS存储引擎里面的Bucket存储空间进行创建。 目录 1、先看看OSS&#xff1a; 2、代码编写&#xff1a; 3、运行效果&#xff1a; 1、先看看OSS&#xff1a; 此…

FPGA高端项目:Xilinx Zynq7020系列FPGA多路视频拼接 工程解决方案 提供6套工程源码和技术支持

目录 1、前言版本更新说明给读者的一封信FPGA就业高端项目培训计划免责声明 2、相关方案推荐我已有的FPGA视频拼接叠加融合方案本方案在Xilinx Kintex7 系列FPGA上的应用本方案在Xilinx Artix7 系列FPGA上的应用 3、设计思路框架视频源选择ov5640 i2c配置及采集动态彩条多路视频…

实用工具合集(持续更新...)

一、搜索引擎 1.1、小白盘 网站&#xff1a;https://www.xiaobaipan.com 度盘资源搜索的网站&#xff0c;能够搜索电影、电视剧、小说、音乐等资源&#xff08;注意&#xff1a;评论区很多小伙伴说小白盘有毒&#xff0c;我用谷歌浏览器搜索过几次并无大碍&#xff0c;请慎用…

JeecgBoot集成TiDB,打造高效可靠的数据存储解决方案

TiDB简介 TiDB是PingCAP公司自主设计、研发的开源分布式关系型数据库&#xff0c;同时支持在线事务处理与在线分析处理 (Hybrid Transactional and Analytical Processing, HTAP) 的融合型分布式数据库产品&#xff0c;具备水平扩容或者缩容、金融级高可用、实时 HTAP、云原生…

三篇论文联合复现:高比例新能源下考虑需求侧响应和智能软开关的配电网重构程序代码!

适用平台&#xff1a;MatlabYalmipCplex 程序在高比例新能源接入的情况下提出了考虑需求响应&#xff08;DR&#xff09;和智能软开关&#xff08;SOP&#xff09;的多时段主动配电网重构策略&#xff0c;进一步降低配电系统重构费用&#xff0c;减少弃风率和弃光率&#xff1…

CSS自适应分辨率 postcss-pxtorem(适用于 Vite)

前言 此篇是基于 Vite Vu3 项目的 CSS 自适应分辨率&#xff01; 如果想知道基于 Webpack Vue2 可移步 《CSS自适应分辨率 amfe-flexible 和 postcss-pxtorem&#xff08;适用于 Webpack&#xff09;》 项目对应的主要插件版本如下&#xff1a; "vite": "^4…

PMU || PMIC(Power management IC):电源管理集成电路

1、PMU&#xff08;电源管理芯片&#xff09;是一种高度集成化的电源管理方案&#xff0c;可以简单理解为集成多路的LDO和DC-DC&#xff0c;以及相应的检测和控制电路&#xff0c;其核心结构通常是PWM控制器和MOSFET。高集成度的PMU器件可以有效减小电路板占用面积和器件数量&a…