动态订阅kafka mq实现(消费者组动态上下线)

news2025/2/22 1:41:46

和上篇文章 动态订阅rocket mq实现(消费者组动态上下线) 目的一致,直接上代码

    /**
     * Kafka topic container集合
     */
    private static final Map<String, ConcurrentMessageListenerContainer<String, String>> topics = new HashMap<>();

	public void registerKafkaListeners(BinlogPortDatabaseConfig binlogPortDatabaseConfig) {
	/*
		BinlogPortDatabaseConfig是自定义的数据结构,即需要动态注册的kafka配置
		包含topic、sever、client,自定义即可
	*/
        ConsumerFactory<String, String> consumerFactory = binlogPortDatabaseConfig.createConsumerFactory();
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setBatchListener(true);
        if (consumerFactory == null) {
            return;
        }
        factory.setConsumerFactory(consumerFactory);
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(binlogPortDatabaseConfig.getTopic());
        //设置为false,解决client后自动加-0的问题
        container.setAlwaysClientIdSuffix(false);
        container.setupMessageListener((MessageListener<String, String>) record -> {
           //TODO:你的消费逻辑,record即为消息体
                }
            } catch (IllegalArgumentException e) {
                log.error("registerKafkaListeners JSON解析失败", e);
            } catch (NullPointerException e) {
                log.error("registerKafkaListeners 消息为空或部分字段缺失", e);
            } catch (Exception e) {
                log.error("registerKafkaListeners 注册异常", e);
            }
        });
        container.start();
        topics.put(binlogPortDatabaseConfig.getTopic(), container);
    }


    public void factoryDel(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = topics.get(topic);
        if (!topic.isEmpty()) {
            container.stop();
            topics.remove(topic);
        }
    }

    public ConsumerFactory<String, String> createConsumerFactory() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, /*你的kafka server*/);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, /*你的kafka client*/);
        if (SystemEnvUtil.isTest()) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, Constant.consumerGroupIdOffline + topic);
        }
        if (SystemEnvUtil.isProd() || SystemEnvUtil.isSandbox()) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG,/*你的group id*/);
        }
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(100));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));

        Map<String, Object> configMap = new java.util.HashMap<>();
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            configMap.put((String) entry.getKey(), entry.getValue());
        }
        return new DefaultKafkaConsumerFactory<>(configMap);
    }



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2301483.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【python碎碎笔记】

1.交互模式和编辑器模式 2. 保存文件格式.py &#xff08;表示python文件&#xff09; 3.缩进是python的命&#xff01; 4.内置函数 dir(__builtins__) [ArithmeticError, AssertionError, AttributeError, BaseException, BaseExceptionGroup, BlockingIOError, Broken…

【OS安装与使用】part3-ubuntu安装Nvidia显卡驱动+CUDA 12.4

文章目录 一、待解决问题1.1 问题描述1.2 解决方法 二、方法详述2.1 必要说明2.2 应用步骤2.2.1 更改镜像源2.2.2 安装NVIDIA显卡驱动&#xff1a;nvidia-550&#xff08;1&#xff09;查询显卡ID&#xff08;2&#xff09;PCI ID Repository查询显卡型号&#xff08;3&#xf…

python-leetcode 37.翻转二叉树

题目&#xff1a; 给定一颗二叉树的根节点root,翻转这棵二叉树&#xff0c;并返回根节点 方法一&#xff1a;递归 从根节点开始&#xff0c;递归地对树进行遍历&#xff0c;并从叶子节点先开始翻转。如果当前遍历到的节点root的左右两棵子树都已经翻转&#xff0c;那么我们只…

Vue 实现通过URL浏览器本地下载 PDF 和 图片

1、代码实现如下&#xff1a; 根据自己场景判断 PDF 和 图片&#xff0c;下载功能可按下面代码逻辑执行 const downloadFile async (item: any) > {try {let blobUrl: any;// PDF本地下载if (item.format pdf) {const response await fetch(item.url); // URL传递进入i…

android,flutter 混合开发,pigeon通信,传参

文章目录 app效果native和flutter通信的基础知识1. 编解码器 一致性和完整性&#xff0c;安全性&#xff0c;性能优化2. android代码3. dart代码 1. 创建flutter_module2.修改 Android 项目的 settings.gradle&#xff0c;添加 Flutter module3. 在 Android app 的 build.gradl…

unity学习47:寻路和导航,unity2022后版本如何使用 Navmesh 和 bake

目录 1 寻路和导航对移动的不同 1.1 基础的移动功能 1.1.1 基础移动 1.1.2 智能导航寻路 1.1.3 智能导航寻路还可以 2 如何实现这个效果&#xff1f; 2.1 通过地图网格的形式 2.1.1 警告信息 the static value has been deprecated的对应搜索 2.1.2 新的navigation ba…

跟着李沐老师学习深度学习(十二)

循环神经网络 序列模型 序列数据 实际中很多数据是有时序结构的 比如&#xff1a;电影的评价随时间变化而变化 拿奖后评分上升&#xff0c;直到奖项被忘记看了很多好电影后&#xff0c;人们的期望变高季节性:贺岁片、暑期档导演、演员的负面报道导致评分变低 核心思想&#…

深入解析NoSQL数据库:从文档存储到图数据库的全场景实践

title: 深入解析NoSQL数据库:从文档存储到图数据库的全场景实践 date: 2025/2/19 updated: 2025/2/19 author: cmdragon excerpt: 通过电商、社交网络、物联网等12个行业场景,结合MongoDB聚合管道、Redis Stream实时处理、Cassandra SSTable存储引擎、Neo4j路径遍历算法等42…

STM32物联网终端实战:从传感器到云端的低功耗设计

STM32物联网终端实战&#xff1a;从传感器到云端的低功耗设计 一、项目背景与挑战分析 1.1 物联网终端典型需求 &#xff08;示意图说明&#xff1a;传感器数据采集 → 本地处理 → 无线传输 → 云端存储&#xff09; 在工业物联网场景中&#xff0c;终端设备需满足以下核心需…

[实现Rpc] 客户端划分 | 框架设计 | common类的实现

目录 3. 客户端模块划分 3.1 Network模块 3.2 Protocol模块 3.3 Dispatcher模块 3.4 Requestor模块 3.5 RpcCaller模块 3.6 Publish-Subscribe模块 3.7 Registry-Discovery模块 3.8 Client模块 4. 框架设计 4.1 抽象层 4.2 具象层 4.3 业务层 ⭕4.4 整体设计框架…

【SFRA】笔记

GK_SFRA_INJECT(x) SFRA小信号注入函数,向控制环路注入一个小信号。如下图所示,当前程序,小信号注入是在固定占空比的基础叠加小信号,得到新的占空比,使用该占空比控制环路。 1.2 GK_SFRA_COLLECT(x, y) SFRA数据收集函数,将小信号注入环路后,该函数收集环路的数据,以…

基于Python的Diango旅游数据分析推荐系统设计与实现+毕业论文(15000字)

基于Python的Diango旅游数据分析推荐系系统设计与实现毕业论文指导搭建视频&#xff0c;带爬虫 配套论文1w5字 可定制到某个省份&#xff0c;加40 基于用户的协同过滤算法 有后台管理 2w多数据集 可配套指导搭建视频&#xff0c;加20 旅游数据分析推荐系统采用了Python语…

国自然青年基金|针对罕见神经上皮肿瘤的小样本影像深度数据挖掘关键技术研究|基金申请·25-02-15

小罗碎碎念 今天和大家分享一个国自然青年基金项目&#xff0c;执行年限为2021.01&#xff5e;2023.12&#xff0c;直接费用为24万元。 该项目聚焦罕见神经上皮肿瘤小样本影像深度数据挖掘技术&#xff0c;致力于攻克小样本数据和临床经验缺乏带来的难题。项目围绕影像规范化、…

Linux 网络安全技巧

网络安全是一个非常重要的课题,基本上你运行的服务后台越多,你就可能打开更多的安全漏洞.如果配置的恰当的话,Linux本身是非常安全可靠的,假使在Linux系统中有某个安全缺陷,由于Linux的源码是开放的&#xff0c;有成千上万的志愿者会立刻发现并修补它。本文旨在介绍用来增强你的…

Windows桌面系统管理7:国产操作系统与Linux操作系统

Windows桌面系统管理0&#xff1a;总目录-CSDN博客 Windows桌面系统管理1&#xff1a;计算机硬件组成及组装-CSDN博客 Windows桌面系统管理2&#xff1a;VMware Workstation使用和管理-CSDN博客 Windows桌面系统管理3&#xff1a;Windows 10操作系统部署与使用-CSDN博客 Wi…

百度百舸 DeepSeek 一体机发布,支持昆仑芯 P800 单机 8 卡满血版开箱即用

在私有云环境中成功部署 DeepSeek 满血版并实现性能调优&#xff0c;并不是一件容易的事情。选择合适的 GPU 配置、安装相应的环境、成功部署上线业务、加速推理任务加速、支撑多用户并发 …… 完成业务测试&#xff0c;成功融入生产业务中。 为了帮助企业快速实现 DeepSeek 服…

解锁 AIoT 无限可能,乐鑫邀您共赴 Embedded World 2025

2025 年 3 月 11-13 日&#xff0c;全球规模最大的嵌入式展览会——Embedded World 2025 将在德国纽伦堡盛大开幕。作为物联网和嵌入式技术领域的领先企业&#xff0c;乐鑫信息科技 (688018.SH) 将展示在 AI LLM、HMI、双频 Wi-Fi 6、低功耗 MCU 和 Matter 等领域的最新技术及解…

LlamaFactory可视化模型微调-Deepseek模型微调+CUDA Toolkit+cuDNN安装

LlamaFactory https://llamafactory.readthedocs.io/zh-cn/latest/ 安装 必须保证版本匹配&#xff0c;否则到训练时&#xff0c;找不到gpu cuda。 否则需要重装。下面图片仅供参考。因为cuda12.8装了没法用&#xff0c;重新搞12.6 cudacudnnpytorch12.69.612.6最新&#xf…

【GPT】从GPT1到GPT3

every blog every motto: Although the world is full of suffering&#xff0c; it is full also of the overcoming of it 0. 前言 从GPT1 到GPT3 1. GPT1 论文&#xff1a; https://s3-us-west-2.amazonaws.com/openai-assets/research-covers/language-unsupervised/lan…

openGauss 3.0 数据库在线实训课程18:学习视图管理

前提 我正在参加21天养成好习惯| 第二届openGauss每日一练活动 课程详见&#xff1a;openGauss 3.0.0数据库在线实训课程 学习目标 掌握openGauss视图的管理&#xff1a;创建视图、删除视图、查询视图的信息、修改视图的信息。 课程作业 1.创建表&#xff0c;创建普通视图…