kafka如何动态消费新增topic主题

news2025/1/22 21:36:44

一、背景

使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长,对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件

二、核心设计点

1、动态接入消息,无需重新发布服务

2、适配spring-kafka配置,和生产运行中的spring-kafka客户端不冲突

三、接入步骤

1、引入maven依赖坐标

<dependency>
    <groupId>io.gitee.qrkcn</groupId>
    <artifactId>kafka-batch-starter</artifactId>
    <version>1.0.0</version>
</dependency>

2、客户端配置,适配spring-kafka原有的配置文件

spring:
  kafka:
    bootstrap-servers: 172.111.0.1:9092
    consumer:
      prefix: auto
      group-id: kafka-local1
      # 手动提交
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        auto.commit.interval.ms: 1000
        session.timeout.ms: 60000
        request.timeout.ms: 1000000
        max.poll.interval.ms: 300000   #默认5分钟 300000ms
        max.poll.records: 30   #默认500条

spring.kafka.consumer.prefix 配置说明:

  • 必填,topic名称前缀符合配置,才会动态监听

  • 不填为null,组件抛出自定义异常:Caused by: java.lang.InterruptedException: spring.kafka.consumer.prefix can not be null if use kafka-batch-starter

  • 填入""空字符串,则会动态监听所有主题

3、实现动态消费方法DynamicConsumerMethod

在processMessage中处理业务逻辑

@Component
@Slf4j
public class DynamicConsumerMethodImpl implements DynamicConsumerMethod {
    @Override
    public void processMessage(ConsumerRecords<String, String> records, String topicName) {
        for (ConsumerRecord<String, String> record : records) {
            String key = record.key();
            String value = record.value();
            int partition = record.partition();
            String topic = record.topic();
            log.info("consumer.topicName={},key={},value={}",topicName, key,value);
        }
    }
}

四、验证

1、新增topic名称为:auto.topic1(由于配置spring.kafka.consumer.prefix 为auto,因此只有auto前缀的topic,才会被组件动态监听。)

2、应用输出日志,监听到新增auto.topic1,并初始化客户端 (主题刷新间隔为10s)

3、发新的消息给auto.topic1,可以看到消费成功,不影响原有kafka客户端的消费

接入成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/363902.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Scala模式匹配详解(第八章:基本语法、模式守卫、模式匹配类型)(尚硅谷笔记)

模式匹配第 8 章 模式匹配8.1 基本语法8.2 模式守卫8.3 模式匹配类型8.3.1 匹配常量8.3.2 匹配类型8.3.3 匹配数组8.3.4 匹配列表8.3.5 匹配元组8.3.6 匹配对象及样例类8.4 变量声明中的模式匹配8.5 for 表达式中的模式匹配8.6 偏函数中的模式匹配(了解)第 8 章 模式匹配 Scal…

Redis:缓存穿透、缓存雪崩和缓存击穿(未完待续)

Redis的缓存穿透、缓存雪崩和缓存击穿一. 缓存穿透1.1 概念1.2 造成的问题1.3 解决方案1.4 案例&#xff1a;查询商铺信息&#xff08;缓存穿透的实现&#xff09;二. 缓存雪崩2.1 概念2.2 解决方案三. 缓存击穿&#xff08;热点key&#xff09;3.1 概念3.2 解决方案3.3 案例&a…

网络基础概述

1.计算机网络背景 ​ 计算机刚刚发展的时候&#xff0c;是没有网络的&#xff0c;每一台计算机都是相互独立的。后来&#xff0c;人们有了多人协作的需求&#xff0c;人们就想办法把多台计算机用“线”连接起来&#xff0c;实现数据共享。后来&#xff0c;连接到一起的电脑越来…

地球板块运动vr交互模拟体验教学提高学生的学习兴趣

海陆变迁是地球演化史上非常重要的一个过程&#xff0c;它不仅影响着地球的气候、地貌、生物多样性等方面&#xff0c;还对人类文明的演化产生了深远的影响。为了帮助学生更加深入地了解海陆变迁的过程和机制&#xff0c;很多高校教育机构开始采用虚拟现实技术进行教学探究。 V…

Go语言进阶与依赖管理-学习笔记

1 语言进阶 1.1 Goroutine 线程&#xff1a;内核态&#xff0c;栈MB级别 协程&#xff1a;用户态&#xff0c;轻量级线程&#xff0c;栈KB级 1.2 CSP 提倡通信实现共享内存 1.3 Channel 创建方法 make(chan 元素类型&#xff0c;缓冲区大小&#xff09; 无缓冲通道&#x…

【Storm】【二】安装

1 准备 1.1 准备linux服务器 本文搭建的是3节点的集群&#xff0c;需要3台linux服务器&#xff0c;我这里使用的是centos7版本的linux虚拟机&#xff0c;虚拟机网络配置如下&#xff1a; 主节点&#xff1a; master 192.168.92.90 从节点&#xff1a; slave1 192.168.92.…

Git 教程

目录1.简介&#xff1a;2.安装Git3.Git 如何工作状态区域4.使用Git5.Git配置5.1 创建仓库 - repository5.2 配置5.2.1 --global5.2.2 检查配置6. 查看工作区的文件状态6.1什么是工作区6.2 如果显示乱码的解决方式7.在工作区添加单个文件8. 添加工作区文件到暂存区9. 创建版本10…

Docker启动RabbitMQ,实现生产者与消费者

目录 一、Docker拉取镜像并启动RabbitMQ 二、Hello World &#xff08;一&#xff09;依赖导入 &#xff08;二&#xff09;消息生产者 &#xff08;三&#xff09;消息消费者 三、实现轮训分发消息 &#xff08;一&#xff09;抽取工具类 &#xff08;二&#xff09;启…

网络安全——数链路层据安全协议

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 一.数据链路层安全协议简介 1.数据链路安全性 二.局域网数据链路层协议 1.…

git应用笔记(三)

在新增虚拟机linux的基础上&#xff0c;做git的下载与提交 1、初始化自己的用户名和邮箱。 git config --global user.name “输入你的用户名” git config --global user.email “输入你的邮箱” 2、将本地公钥及配置如图1复制粘贴到虚拟机当前用户.ssh\目录下 4929a0205f43…

面渣逆袭:分布式十二问,万字图文详解

大家好&#xff0c;我是老三&#xff0c;不管今年金三银四如何&#xff0c;面渣逆袭系列继续&#xff0c;这期我们来看看分布式。 分布式理论 1. 说说CAP原则&#xff1f; CAP原则又称CAP定理&#xff0c;指的是在一个分布式系统中&#xff0c;Consistency&#xff08;一致性…

硬件学习 软件Cadence day07 PCB 底板电路图布线

1.根据原理图的元器件&#xff0c; 选择在 PCB 芯片制作的元器件 &#xff08;allegro中原理图和pcb中元件的交互&#xff09; 1.首先完成下列操作 可以尝试先关闭再打开&#xff0c; 等下操作的时候就好 发现新增的发光物体&#xff01;&#xff01; 2.完成操作 &#xff0c;…

Web3中文|香港拟允许比特币交易,瞄准“全球web3中心”

香港再次成为全球加密行业关注的焦点。在美国SEC对于加密货币交易所Kraken、BUSD发行商Paxos以及BA的重磅打击对比下&#xff0c;香港从去年开始持续拥抱Web3的姿态&#xff0c;让投资者开始押注香港。2023年2月20日&#xff0c;香港证监会宣布&#xff0c;就适用于虚拟资产交易…

Linux:基于libevent读写管道代码

基于libevent读写管道代码&#xff1a; 读端&#xff1a; #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <event2/event.h> #include…

gdb的简单练习

题目来自《ctf安全竞赛入门》1.用vim写代码vim gdb.c#include "stdio.h" #include "stdlib.h" void main() {int i 100;int j 101;if (i j){printf("bingooooooooo.");system("/bin/sh");}elseprintf("error............&quo…

Python 算法交易实验48 表字段设计

说明 虽然说的是表&#xff0c;实际上用的是Mongo集合 基于ADBS(APIFunc DataBase Service)可以构造一个供后续研究、生产长时间使用的数据基础&#xff0c;这个基础包括了&#xff1a; 1 队列服务。通过队列&#xff0c;数据可以通过API实现零担和批量两种模式的快速存储。2 …

ideal创建maven项目

前置工作本机安装mavenIdea 设置使用本机maven 工具Settings--->Maven开始创建maven项目创建maven项目&#xff0c;勾选通过模板创建&#xff0c;选择 maven-archetype-webapp 模板GroupId: 公司名倒序ArtifactId: 项目名设置本地maven仓库配置项目文件显示名&#xff0c;和…

外卖点餐小程序开发

前言 餐饮行业是一个传统的行业。根据当前发展现状,网络信息时代的全面普及,餐饮行业也在发生着变化,单就点餐这一方面,利用手机点单正在逐步进入人们的生活。传统的点餐方式,不仅会耗费大量的人力、时间,有时候还会出错。小程序系统伴随智能手机为我们提供了新的方向。 手机…

如何使用码匠连接 MySQL

目录 在码匠中集成 MySQL 在码匠中使用 MySQL 关于码匠 目前码匠已经实现了与 MySQL 数据源的连接&#xff0c;支持书写 SQL 语句&#xff0c;也支持通过图形化界面对数据进行增、删、改、查&#xff0c; 同时还支持将数据绑定至各种组件&#xff0c;并通过简单的代码实现数据…

Collecting package metadata (current_repodata.json): failed

一、问题描述 安装anaconda之后&#xff0c;想创建环境&#xff0c;用了下面这段代码&#xff1a; conda create -n pytorch python3.7 conda创建环境报错了&#xff0c;报了如下这一堆&#xff1a; Collecting package metadata (current_repodata.json): failedUnavailab…