搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式

news2025/1/15 19:42:15

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、原项目
      • 四、修改项目
      • 五、测试一下
      • 五、小结


前言

本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>multi-kafka-consumer-starter</artifactId>
    <version>最新版本号</version>
</dependency>

例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## pb 消息消费者
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量
  • Springboot 取消限定符
  • Springboot 支持消费protobuf类型的kafka消息
  • Springboot Aware设计模式
  • Springboot 获取kafka消息中的topic、offset、partition、header等参数

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、原项目

1、接前文,我们修改了抽象父类,并下沉了kafka相关依赖,使得可以支持消费pb类型的格式数据,根据自己的需求解析出实体类。但也有小伙伴反馈,有时候需要获取kafka消息中的partition、offset、topic、header等参数,这个怎样办?


@Slf4j
@Service
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {

	// 需要在DemoMsg获取partition、offset、topic、header等参数
	// 但不又不想重写父类的doParse()方法,这样显得太笨重
	// 因为需要获取kafka消息中的参数毕竟场景不算太多
    @Override
    protected void dealMessage(List<DemoMsg> datas) {

        datas.forEach(x -> {
            log.info("dealMessage one: {}", x);
        });

    }


}

2、在Spring Boot中,Aware接口是一种特殊的接口,它允许我们在Spring容器初始化时获取特定的bean或属性。Spring Boot提供了多种Aware接口,用于获取不同类型的bean或属性。这些接口的实现原理主要依赖于Spring框架的生命周期回调机制。所以,我们是否可以参考这种方式,让子类按需选择需要哪些参数能力,类似下面这样呢?

// 获取消息去重能力
@Data
class DemoMsg implements MmcMsgDistinctAware{

    private String routekey;

    private String name;

    private Long timestamp;

}

// 获取kafka消息的topic、offset参数
@Data
class DemoMsg implements MmcMsgKafkaAware {

    private String routekey;

    private String name;

    private Long timestamp;

    private String topic;

    private long offset;
}

答案是可以的、但我们要升级和优化一下。

四、修改项目

1、重命名MmcKafkaMsg类为MmcMsgDistinctAware,使得更加符合规范,只要子类实体类实现了本接口,那么就可以具备消息去重的能力。

public interface MmcMsgDistinctAware {

    /**
     * 代表kafka消息的唯一键,用于批次内分组.

     * @return 唯一键
     */
    String getRoutekey();

    /**
     * kafka消息生产或接收时间,用于批次内分组,根据时间去重,取最新的消息.
     *
     * @return 消息时间
     */
    Long getTimestamp();
}

2、新增MmcMsgKafkaAware,只要子类的实体类实现本接口,就可以方便获取kafka消息中的topic、offset等参数。

public interface MmcMsgKafkaAware {

    /**
     * 注入topic.
     *
     * @param topic topic名称
     */
    void setTopic(String topic);

    /**
     * 注入offset.
     *
     * @param offset offset
     */
    void setOffset(long offset);
}

3、修改KafkaAbastrctProcessor抽象父类,重写解析消息方法,使得可以根据实体类的Aware接口标记来获取对应的能力;

@Slf4j
@Setter
abstract class KafkaAbstractProcessor<T> implements MmcInputer {
   
    // 重写解析消息方法,使得可以根据实体类的Aware接口标记来获取对应的能力
	/**
     * 将kafka消息解析为实体,支持json对象或者json数组.
     *
     * @param map kafka消息对象,包含key、value、topic、partition、offset等
     * @return 实体类
     */
    protected Stream<T> doParse(ConsumerRecord<String, Object> map) {

        // 消息对象
        Object record = map.value();

        // 如果是pb格式
        if (record instanceof byte[]) {

            return doParseProtobuf((byte[]) record);

        } else if (record instanceof String) {

            // 普通kafka消息
            String json = record.toString();

            if (json.startsWith("[")) {

                // 数组
                List<T> datas = doParseJsonArray(json);
                if (CommonUtil.isEmpty(datas)) {

                    log.warn("{} doParse error, json={} is error.", name, json);
                    return Stream.empty();
                }

                // 反序列对象后,做一些初始化操作
                datas = datas.stream().peek(x -> doKafkaAware(x, map)).peek(this::doAfterParse).collect(Collectors.toList());

                return datas.stream();

            } else {

                // 对象
                T data = doParseJsonObject(json);
                if (null == data) {

                    log.warn("{} doParse error, json={} is error.", name, json);
                    return Stream.empty();
                }

                // 注入kafka相关
                doKafkaAware(data, map);

                // 反序列对象后,做一些初始化操作
                doAfterParse(data);

                return Stream.of(data);
            }

        } else if (record instanceof MmcKafkaMsg) {

            // 如果本身就是MmcKafkaMsg对象,直接返回
            //noinspection unchecked
            return Stream.of((T) record);

        } else {


            throw new UnsupportedForMessageFormatException("not support message type");
        }

    }

    protected void doKafkaAware(T x, ConsumerRecord<String, Object> record) {
		// 根据自己诉求去扩展,可以增加无限xxxAware,获取任意record的参数
        if (x instanceof MmcMsgKafkaAware) {
            ((MmcMsgKafkaAware) x).setOffset(record.offset());
            ((MmcMsgKafkaAware) x).setTopic(record.topic());
        }

    }

五、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.18.0</version>
            <scope>test</scope>
        </dependency>

2、修改DemoMsg,让它实现MmcMsgKafkaAware接口,用来获取topic和offset参数,Processor不用修改,保持不变;

@Data
class DemoMsg implements MmcMsgKafkaAware {

    private String routekey;

    private String name;

    private Long timestamp;

    private String topic;

    private long offset;
}

3、消费者配置保持不变。

## 消费者配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

4、编写测试类,测试类保持不变。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiConsumerAutoConfiguration.class, DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application.properties")
@DirtiesContext
@EmbeddedKafka(topics = {"${spring.kafka.one.topic}"})
class AppTest {


    @Resource
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.one.topic}")
    private String topicOne;

    @Value("${spring.kafka.two.topic}")
    private String topicTwo;

    @Test
    void testDealMessage() throws Exception {

        Thread.sleep(2 * 1000);
        
        // 模拟生产数据
        produceMessage();

        Thread.sleep(10 * 1000);
    }

    void produceMessage() {

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        for (int i = 0; i < 10; i++) {

            DemoMsg msg = new DemoMsg();
            msg.setRoutekey("routekey" + i);
            msg.setName("name" + i);
            msg.setTimestamp(System.currentTimeMillis());

            String json = JsonUtil.toJsonStr(msg);
            producer.send(new ProducerRecord<>(topicOne, "my-aggregate-id", json));
            producer.send(new ProducerRecord<>(topicTwo, "my-aggregate-id", json));
            producer.flush();

        }
    }
}


5、运行一下,测试通过,可以看到已经打印topic和offset。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1791137.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【成品设计】基于物联网技术的养老院老人健康监控系统的设计与实现

《基于物联网技术的养老院老人健康监控系统的设计与实现》 老年人监护系统的核心任务是分析老年人在日常生活中是否存在意外事件和异常行为并提供相关帮助。正确分析出老年人的突发意外和行为异常&#xff0c;不仅可以有效地降低老年人因意外事故造成的生命风险&#xff0c;还…

【一步一步了解Java系列】:重磅多态

看到这句话的时候证明&#xff1a;此刻你我都在努力 加油陌生人 个人主页&#xff1a;Gu Gu Study专栏&#xff1a;一步一步了解Java 喜欢的一句话&#xff1a; 常常会回顾努力的自己&#xff0c;所以要为自己的努力留下足迹 喜欢的话可以点个赞谢谢了。 作者&#xff1a;小闭…

Vue的APP实现下载文件功能,并将文件保存到手机中

Vue的APP实现下载文件功能&#xff0c;并将文件保存到手机中 文字说明后台核心代码前台核心代码运行截图项目链接 文字说明 本文介绍Vue实现的APP&#xff0c;将文件下载并保存到手机中&#xff0c;为系统提供导出功能&#xff1b;同时支持导入&#xff0c;即选择本地的文件后&…

C++STL---list知识汇总

前言 学习完list&#xff0c;我们会对STL中的迭代器有进一步的认识。list底层有很多经典的东西&#xff0c;尤其是他的迭代器。而list的结构是一个带头双向循环链表。 list没有reserve和resize&#xff0c;因为它底层不是连续的空间&#xff0c;它是用时随时申请&#xff0c;…

18.使用__asm实现调用hp减伤害

上一个内容&#xff1a;17.调用游戏本身的hp减伤害函数实现秒杀游戏角色 17.调用游戏本身的hp减伤害函数实现秒杀游戏角色 以它的代码为基础进行修改 首先禁用安全检查 然后再把优化关闭 编译代码时使用Release方式&#xff0c;debug方式会加一些代码&#xff0c;如果这些代码…

【机器学习】深度探索:从基础概念到深度学习关键技术的全面解析——梯度下降、激活函数、正则化与批量归一化

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 一、机器学习的基本概念与原理二、深度学习与机器学习的关系2.1 概念层次的关系2.2 技术特点差异2.3 机器学习示例&#xff1a;线性回归&#xff08;使用Python和scikit-learn库&#xff09;2.4 深度学习示例&#xff1a;简…

【网络编程开发】4.socket套接字及TCP的实现框架 5.TCP多进程并发

4.socket套接字及TCP的实现框架 Socket套接字 Socket套接字是网络编程中用于实现不同计算机之间通信的一个基本构建块。 在现代计算机网络中&#xff0c;Socket套接字扮演着至关重要的角色。它们为应用程序提供了一种方式&#xff0c;通过这种方式&#xff0c;程序能够通过网…

【技巧】系统语音是英文 影刀如何设置中文-作者:【小可耐教你学影刀RPA】

写在前面 嘿哈&#xff01; 有些跨境或香港的小伙伴&#xff0c;可能需要使用英文操作界面的影刀 该功能目前还没有现成的可视化按钮&#x1f518; 但其实这个效果可以实现&#xff5e; 1、效果图 2、实现原理 %影刀安装目录%\ShadowBot-版本号\ShadowBot.Shell.dll.confi…

十四、返回Insert操作自增索引值

分为两部分&#xff0c;解析初始化和使用 拿含有selectkey标签的insert语句解析来说 解析部分 1.解析时看有没有selectkey标签&#xff0c;有的话先解析selectkey的内容&#xff0c;包括对其SQL的解析并封装成一个MappedStatement和创建KeyGenerator放入configuration中 2.解…

Android WebView上传文件/自定义弹窗技术,附件的解决方案

安卓内核开发 其实是Android的webview默认是不支持<input type"file"/>文件上传的。现在的前端页面需要处理的是&#xff1a; 权限 文件路径AndroidManifest.xml <uses-permission android:name"android.permission.WRITE_EXTERNAL_STORAGE"/&g…

NumPy应用(一)

NumPy学习篇1 NumPy是一个强大的Python库&#xff0c;它提供了高效的多维数组对象和各种用于数组操作的函数。以下是NumPy学习大纲&#xff0c;详细介绍了NumPy的核心功能和概念。 1. NumPy 简介 NumPy是一个用于处理多维数组的Python库&#xff0c;它提供了一个强大的数组对…

测试记录3:WLS2运行Linux界面

1.WLS1转到WLS2 &#xff08;1&#xff09;根据自己的平台&#xff0c;下载WLS2安装包 x64: https://wslstorestorage.blob.core.windows.net/wslblob/wsl_update_x64.msi arm64: https://wslstorestorage.blob.core.windows.net/wslblob/wsl_update_arm64.msi &#xff08;2&…

高清矩阵是什么?

在数学中&#xff0c;矩阵是一个按照长方阵列排列的复数或实数集合&#xff0c;最早来自于方程组的系数及常数所构成的方阵。如图为m行n列的矩阵&#xff1a; 由此延伸可以想到矩阵图片是把一个三维空间分切成多个行和列的区域进行图像捕获&#xff0c;将捕获图像再进行拼合成为…

Spring系统学习 - Spring入门

什么是Spring&#xff1f; Spring翻译过来就是春天的意思&#xff0c;字面意思&#xff0c;冠以Spring的意思就是想表示使用这个框架&#xff0c;代表程序员的春天来了&#xff0c;实际上就是让开发更加简单方便&#xff0c;实际上Spring确实做到了。 官网地址&#xff1a;ht…

k8s 1.28.x 配置nfs

1.安装nfs&#xff0c;在每个节点上安装 yum install -y nfs-utils 2.创建共享目录(主节点上操作) mkdir -p /opt/nfs/k8s 3.编写NFS的共享配置 /opt/nfs/k8s *(rw,no_root_squash) #*代表对所有IP都开放此目录&#xff0c;rw是读写 4.启动nfs systemctl enable nfs-ser…

基于STC12C5A60S2系列1T 8051单片机实现串口调试助手软件与单片机相互发送数据的RS485通信功能

基于STC12C5A60S2系列1T 8051单片机实现串口调试助手软件与单片机相互发送数据的RS485通信功能 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机串口通信介绍STC12C5A60S2系列1T 8051单片机串口通信的结构基于STC12C5A60S2系列1T 8051单片机串口通信的特殊功…

Wireshark Lua插件入门

摘要 开发中经常通过抓包分析协议&#xff0c;对于常见的协议如 DNS wireshark 支持自动解析&#xff0c;便于人类的理解&#xff0c;对于一些私有协议&#xff0c;wireshark 提供了插件的方式自定义解析逻辑。 1 动手 废话少说&#xff0c;直接上手。 第一步当然是装上wiresh…

Java基础27,28(多线程,ThreadMethod ,线程安全问题,线程状态,线程池)

目录 一、多线程 1. 概述 2. 进程与线程 2.1 程序 2.2 进程 2.3 线程 2.4 进程与线程的区别 3. 线程基本概念 4.并发与并行 5. 线程的创建方式 方式一&#xff1a;继承Thread类 方式二&#xff1a;实现Runable接口 方式三&#xff1a;实现Callable接口 方式四&…

【wiki知识库】05.分类管理模块--后端SpringBoot模块

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 目录 一、&#x1f525;今日目标 二、☀SpringBoot代码修改 1.使用逆向工程生成Category表结构 2. 新增CategoryQueryParam 3.新增CategorySaveParam 4.新增CategotyQueryVo 三、&#x1f916;新增分类管理的相关接口…

数字化营销有哪些模式?企业采用数字营销方式有什么意义?

在当今快速发展的商业环境中&#xff0c;营销已经远远超越了传统的推广和销售概念&#xff0c;演变成一种复杂而全面的组织职能。随着信息技术的飞速发展&#xff0c;数字化营销应运而生&#xff0c;为企业与消费者之间的互动带来了革命性的改变。数字化营销不仅为企业提供了全…