kafka广播消费组停机后未删除优化

news2025/1/11 8:37:38

背景

kafka广播消息的时候为了保证groupId不重复,再创建的时间采用前缀+时间戳的形式,这样可以保证每次启动的时候是创建的新的,但是

会出现一个问题:就是每次停机或者重启都会新建一个应用实例,关闭应用后并不会删除kafka下面的消费组,导致消费组越来越多,目前

我们有promethes监控kafka消息偏移,一直没有消费的消费组就会进行报警;

解决思路

既然是没有删除消费组就通过优雅停机,应用关闭前采用java的api操作kafka消费组,进行删除

代码实现

1)编写类实现DisposableBean接口,实现destroy方法,注意每个项目定义的id会不一样,此例子中 id = “cfgs-broadcast”

package com.simo.vsim.cfgs.init;
 
import com.alibaba.nacos.api.config.annotation.NacosValue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
 
@Data
@Component
@Slf4j
public class ApplicationListen implements InitializingBean, DisposableBean {
 
    @Resource
    private KafkaListenerEndpointRegistry registry;
 
    @NacosValue(value = "${spring.kafka.bootstrap-servers}", autoRefreshed = true)
    private String servers;
 
    @Override
    public void destroy()  {
        MessageListenerContainer listenerContainer = registry.getListenerContainer("cfgs-broadcast");
        String groupId = listenerContainer.getGroupId();
        Map<String, Object> props = new HashMap<>(1);
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        AdminClient adminClient = AdminClient.create(props);
        DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(groupId));
        KafkaFuture resultFuture = deleteConsumerGroupsResult.all();
        try {
            resultFuture.get();
            log.info("kafka关闭消费组="+groupId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        adminClient.close();
    }
 
    @Override
    public void afterPropertiesSet() {
    }
}

2)接收kafka广播消息的时候指定容器id,用于第一步通过id进行删除,id = “cfgs-broadcast”

/**
 * groupId不一样代表广播模式,earliest 可能重复消费,latest可能漏消费
 * @param message
 * @param ack
 */
@KafkaListener(containerFactory = "manualImmediateListenerContainerFactory" , topics = {"${kafka.topic.cfgs-broadcast}"},properties = {"auto.offset.reset=latest"},
        groupId = "cfgs-broadcast-" + "#{T(java.lang.System).currentTimeMillis()}",idIsGroup = false,id = "cfgs-broadcast")
public void onMessageManualBroadcast(List<Object> message, Acknowledgment ack){
    message.forEach(item -> handleMsg(2,item));
    //直接提交offset
    ack.acknowledge();
}

效果

1)正常启动有这个消费组:cfgs-broadcast-1696754926097

2)重新启动,通过日志显示已经删除(k8s默认是优雅停机)
在这里插入图片描述
如果是iead直接关闭下,不要一下子点击两下停止,点击一次是优雅停机,连续点击2次就是kill -9的效果,就无法看到效果
![在这里插入图片描述](https://img-blog.csdnimg.cn/d36947cdd8f048acaa886eadafeaa34b.png

3)查看kafka消费组,确实已经删除

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1082471.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何调整 Kubernetes StatefulSet 卷的大小

Kubernetes StatefulSet用于在集群内部署有状态应用程序。StatefulSet 中的每个 Pod 都可以访问即使在重新调度后仍坚持使用的本地持久卷。这使得 Pod 能够维护与其集合中的邻居不同的单独状态。 不幸的是,这些卷有一个很大的限制:Kubernetes 没有提供从 StatefulSet 对象调整…

排序算法-冒泡排序法(BubbleSort)

排序算法-冒泡排序法&#xff08;BubbleSort&#xff09; 1、说明 冒泡排序法又称为交换排序法&#xff0c;是从观察水中的气泡变化构思而成的&#xff0c;原理是从第一个元素开始&#xff0c;比较相邻元素的大小&#xff0c;若大小顺序有误&#xff0c;则对调后再进行下一个…

排序算法-选择排序法(SelectionSort)

排序算法-选择排序法&#xff08;SelectionSort&#xff09; 1、说明 选择排序法也是枚举法的应用&#xff0c;就是反复从未排序的数列中取出最小的元素&#xff0c;加入另一个数列中&#xff0c;最后的结果即为已排序的数列。选择排序法可使用两种方式排序&#xff0c;即在所…

基于nodejs+vue驾校预约管理系统

通过科技手段提高自身的优势&#xff1b;对于驾校预约管理系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了驾校预约管理系统&#xff0c; 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;驾校预约管理系统&am…

【Rust】包和模块,文档注释,Rust格式化输出

文章目录 包和模块包 CrateRust 的标准目录结构 模块 Module用路径引用模块使用super引用模块使用self引用模块结构体和枚举的可见性 使用 use 引入模块及受限可见性基本引入方式绝对路径引入模块相对路径引入模块中的函数 避免同名引用 注释和文档文档注释包和模块级别的注释注…

conda: error: argument COMMAND: invalid choice: ‘activate‘

参考:https://github.com/conda/conda/issues/13022 输入后重启terminal即可

Spring Boot 开发环境热部署

Spring Boot 项目无法像前端项目那样&#xff0c;修改源代码后刷新网页就能即时看到效果&#xff0c;需要先暂停运行&#xff0c;再重新启动&#xff0c;最后刷新网页。 为了避免这一麻烦的操作&#xff0c;我们可以设置热部署&#xff0c;启动服务后不论怎么修改源码&#xf…

NodeJs中使用JSONP和Cors实现跨域

跨域是为了解决浏览器请求域名&#xff0c;协议&#xff0c;端口不同的接口&#xff0c;相同的接口是不需要实现跨域的。 1.使用JSONP格式实现跨域 实现步骤 动态创建一个script标签 src指向接口的地址 定义一个函数和后端调用的函数名一样 实现代码 -- 在nodejs中使用http内…

【Mysql】重新认识mysql(一)

参考Mysql是怎么运行的&#xff0c;并结合实际的工作经验对mysql的知识进行总结。 Mysql架构 从大体上来说&#xff0c;Mysql是C/S架构。以我们平时使用的QQ为例&#xff0c;它其实是由两部分组成的&#xff0c;一部分是客户端程序,&#xff0c;一部分是服务器程序。客户端可…

【深蓝学院】手写VIO第6章--视觉前端--作业(SVD分解部分复习)

0. 题目 T1. 奇异值分解需要补&#xff0c;看这篇博客&#xff0c;讲的很好。 总结一下&#xff0c;关于奇异值分解(Singular Value Decomposition&#xff0c;SVD )有以下内容摘抄自该博客&#xff0c;关于SDV分解的部分应该是摘自李航《统计学习方法里面的》&#xff1a; 1…

互联网性能和可用性优化CDN和DNS

当涉及到互联网性能和可用性优化时&#xff0c;DNS&#xff08;Domain Name System&#xff09;和CDN&#xff08;Content Delivery Network&#xff09;是两个至关重要的元素。它们各自发挥着关键作用&#xff0c;以确保用户能够快速、可靠地访问网站和应用程序。在本文中&…

项目管理之常见七大问题挑战

在当今复杂多变的市场环境下&#xff0c;企业为了生存和发展&#xff0c;必须不断应对和解决各种挑战。其中&#xff0c;项目管理作为企业运营及项目交付等的重要组成部分&#xff0c;也面临着七大问题挑战。这些挑战不仅影响着项目的成功实施&#xff0c;也对企业的发展产生着…

[Spring] SpringMVC 简介(一)

目录 一、SpringMVC 简介 1、什么是 MVC 2、什么是 SpringMVC 3、SpringMVC 实现原理 4、SpringMVC 的特点 二、简单案例 1、引入依赖 2、在 web.xml 中配置前端控制器 DispatcherServlet 3、创建 SpringMVC 的配置文件 4、创建请求控制器 5、测试页面 6、访问不到 …

Go 存储系列:B+树存储引擎 boltdb

boltdb 介绍 boltdb是一个纯go编写的支持事务的文件型单机kv数据库 支持事务&#xff1a; boltdb数据库支持两类事务&#xff1a;读写事务、只读事务。这一点就和其他kv数据库有很大区别文件型&#xff1a; boltdb所有的数据都是存储在磁盘上的&#xff0c;所以它属于文件型数…

信号与系统第一章

文章目录 1.1连续信号与离散信号1.1.2信号能量与功率能量讨论无穷区间内功率和能量&#xff1a;无限区间内的平均功率&#xff1a;利用上述定义区分三种重要信号 1.2自变量的变换1.2.1举例基本变换1.2.2周期信号1.2.3偶信号与奇信号 1.3指数信号与正弦信号1.3.1连续时间复指数信…

LeetCode(力扣)416. 分割等和子集Python

LeetCode416. 分割等和子集 题目链接代码 题目链接 https://leetcode.cn/problems/partition-equal-subset-sum/ 代码 class Solution:def canPartition(self, nums: List[int]) -> bool:sum 0dp [0]*10001for num in nums:sum numif sum % 2 1:return Falsetarget …

python之计算市场技术指标

1、MA MA指标是一种常用的技术指标&#xff0c;它是通过计算一定时间内的股价平均值来反映股价趋势的指标。通常&#xff0c;MA指标越平滑&#xff0c;就能更好地反映出股价的长期趋势。 MA指标的作用是帮助投资者识别股票价格的趋势。当股票价格的MA指标向上运动时&#xff…

分类网络的评价指标

之前一直是做目标检测的研究&#xff0c;在目标检测中主要有两个任务&#xff0c;一个是分类回归&#xff0c;一个是位置回归&#xff0c;所用的评价指标有&#xff1a;AP&#xff0c;mAP&#xff0c;Recall&#xff0c;Precision&#xff0c;F1值&#xff0c;前两个用的一般最…

论文学习记录--零样本学习(zero-shot learning)

Socher R, Ganjoo M, Manning C D, et al. Zero-shot learning through cross-modal transfer[J]. Advances in neural information processing systems, 2013, 26. 注&#xff1a;中文为机翻 zero-shot learning&#xff1a;通过学习类别之间的关系和属性&#xff0c;使得模型…

每日leetcode_LCP01猜数字

每日leetcode_LCP01猜数字 记录自己的成长&#xff0c;加油。 题目出处&#xff1a;LCP 01. 猜数字 - 力扣&#xff08;LeetCode&#xff09; 题目 解题 class Solution {public int game(int[] guess, int[] answer) {int count 0;for (int i 0 ; i< guess.length; i){…