java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志

news2025/1/13 21:27:33

pom 依赖:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.5.14.RELEASE</version>
</dependency>

 或者

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.5.1</version>
</dependency>

 ps:前面的 spring-kafka 依赖中已经包含了后面的 kafka-clients

KafkaConsumerDemo.java:


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.logging.Logger;

public class KafkaConsumerDemo {

    static Map<String,Object> properties = new HashMap<String,Object>();
    private static KafkaConsumer kafkaConsumer = null;

    /**
     *
     * windows 环境需要将下面 8 行添加到 "C:\Windows\System32\drivers\etc\hosts" 文件中:
     *      xxx.xxx.xxx.xxx1 xxx-data01
     *      xxx.xxx.xxx.xxx2 xxx-data02
     *      xxx.xxx.xxx.xxx3 xxx-data03
     *      xxx.xxx.xxx.xxx4 xxx-data04
     *      xxx.xxx.xxx.xxx5 xxx-data05
     *      xxx.xxx.xxx.xxx6 xxx-data06
     *      xxx.xxx.xxx.xxx7 xxx-data07
     *      xxx.xxx.xxx.xxx8 xxx-data08
     * @param args
     */
    public static void main(String[] args) {
		// 禁止控制台输出一些 org.apache.kafka.xxx 相关的日志
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);

        properties.put("bootstrap.servers","127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192");  // 指定 Broker
        properties.put("group.id", "11111111111111111111111");              // 指定消费组群 ID,为防止自己启动拉取消息导致其他生产环境的消费者无法消费该消息,请设置一个绝对不重复的值,以起到隔离的作用
        properties.put("max.poll.records", "1000");
        // todo 设置可批量拉取???
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象
        properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象

        kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // List<String> topics = queryAllTopics( consumer );
        kafkaConsumer.subscribe( Collections.singletonList( "ods_carbon_rfid_device_record" ) );  // 订阅主题 order-events
        new Thread(new Runnable() {
            @Override
            public void run() {
                receiveMessage();
            }
        }).start();
    }

    /**
     * 查询全部的主题(topic)列表
     * @param kafkaConsumer
     * @return
     */
    private static List<String> queryAllTopics(KafkaConsumer kafkaConsumer) {
        if( kafkaConsumer == null ){
            return null;
        }
        Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();
        if( map == null ){
            return null;
        }
        return new ArrayList<String>( map.keySet() );
    }

    public static void receiveMessage() {
        try {
            while ( true ){
                synchronized (KafkaConsumerDemo.class) {
                    // ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    // 30L 表示超时时间为 30秒,有消息立即返回,没消息最多等 30 秒后返回
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));
                    String date = sdf.format(new Date());
                    if( records == null ){
                        System.out.println( date + " 本次未拉取到任何消息" );
                    }else {
                        System.out.println( date + " 本次拉取到 " + records.count() + " 条消息" );
                        int i = 1;
                        for (ConsumerRecord<String,String> record: records) {
                            String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                            System.out.println( "第" + i + "条消息:" + info );
                            i++;
                        }
                        kafkaConsumer.commitSync();
                    }
                    /**
                     * 当你用 KafkaConsumer从Kafka里读取消息并且处理完后,commitSync 方法会帮你把这些消息的处理进度(也就是偏移量 offset )同步地告诉 Kafka 服务器。
                     * 这样,Kafka 就知道你已经处理到哪儿了。如果消费者(也就是读取消息的程序)突然崩溃或者重启,Kafka 就能根据最后一次提交的偏移量,让你从上一次处理
                     * 完的地方继续开始,而不会漏掉或者重复处理消息。
                     * 简单来说,commitSync 方 法就是用来“保存进度”的,确保消息处理的可靠性和顺序性。
                     */
                    // Thread.sleep( 5000L );
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            kafkaConsumer.close();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2276153.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV的对比度受限的自适应直方图均衡化算法

OpenCV的对比度受限的自适应直方图均衡化&#xff08;CLAHE&#xff09;算法是一种图像增强技术&#xff0c;旨在改善图像的局部对比度&#xff0c;同时避免噪声的过度放大。以下是CLAHE算法的原理、步骤以及示例代码。 1 原理 CLAHE是自适应直方图均衡化&#xff08;AHE&…

【赵渝强老师】什么是NoSQL数据库?

随着大数据技术的兴起&#xff0c;NoSQL数据库得到了广泛的应用。NoSQL的全称是Not Only SQL&#xff0c;中文含义是不仅仅是SQL。它泛指所有的非关系型数据库&#xff0c;即&#xff1a;在NoSQL数据库中存储数据的模型可能不是二维表的行和列。NoSQL数据库不遵循关系型数据库范…

Linux第一个系统程序---进度条

进度条---命令行版本 回车换行 其实本质上回车和换行是不同概念&#xff0c;我们用一张图来简单的理解一下&#xff1a; 在计算机语言当中&#xff1a; 换行符&#xff1a;\n 回车符&#xff1a;\r \r\n&#xff1a;回车换行 这时候有人可能会有疑问&#xff1a;我在学习C…

于交错的路径间:分支结构与逻辑判断的思维协奏

大家好啊&#xff0c;我是小象٩(๑ω๑)۶ 我的博客&#xff1a;Xiao Xiangζั͡ޓއއ 很高兴见到大家&#xff0c;希望能够和大家一起交流学习&#xff0c;共同进步。* 这一节内容很多&#xff0c;文章字数达到了史无前例的一万一&#xff0c;我们要来学习分支与循环结构中…

【学习笔记】理解深度学习和机器学习的数学基础:数值计算

深度学习作为人工智能领域的一个重要分支&#xff0c;其算法的实现和优化离不开数值计算。数值计算在深度学习中扮演着至关重要的角色&#xff0c;它涉及到如何在计算机上高效、准确地解决数学问题。本文将介绍深度学习中数值计算的一些关键概念和挑战&#xff0c;以及如何应对…

DolphinScheduler自身容错导致的服务器持续崩溃重大问题的排查与解决

01 问题复现 在DolphinScheduler中有如下一个Shell任务&#xff1a; current_timestamp() { date "%Y-%m-%d %H:%M:%S" }TIMESTAMP$(current_timestamp) echo $TIMESTAMP sleep 60 在DolphinScheduler将工作流执行策略设置为并行&#xff1a; 定时周期调度设置…

Python学习(三)基础入门(数据类型、变量、条件判断、模式匹配、循环)

目录 一、第一个 Python 程序1.1 命令行模式、Python 交互模式1.2 Python的执行方式1.3 SyntaxError 语法错误1.4 输入和输出 二、Python 基础2.1 Python 语法2.2 数据类型1&#xff09;Number 数字2&#xff09;String 字符串3&#xff09;List 列表4&#xff09;Tuple 元组5&…

LLM - Llama 3 的 Pre/Post Training 阶段 Loss 以及 logits 和 logps 概念

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/145056912 Llama 3 是 Meta 公司发布的开源大型语言模型&#xff0c;包括具有 80 亿和 700 亿参数的预训练和指令微调的语言模型&#xff0c;支持…

[RabbitMQ] RabbitMQ运维问题

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏: &#x1f9ca; Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 &#x1f355; Collection与…

MongoDB如何使用

1.简单介绍 MongoDB是一个开源、高性能、无模式的文档型数据库&#xff0c;当初的设计就是用于简化开发和方便扩展&#xff0c;是NoSQL数据库产品中的一种。是最 像关系型数据库&#xff08;MySQL&#xff09;的非关系型数据库。 MongoDB是一个基于分布式文件存储的数据库由C语…

【2024年华为OD机试】(C卷,100分)- 分割均衡字符串 (Java JS PythonC/C++)

一、问题描述 题目描述 均衡串定义&#xff1a;字符串中只包含两种字符&#xff0c;且这两种字符的个数相同。 给定一个均衡字符串&#xff0c;请给出可分割成新的均衡子串的最大个数。 约定&#xff1a;字符串中只包含大写的 X 和 Y 两种字符。 输入描述 输入一个均衡串…

React Fiber框架中的Commit提交阶段——commitMutationEffect函数

Render阶段 Render阶段可大致归为beginWork&#xff08;递&#xff09;和completeWork&#xff08;归&#xff09;两个阶段 1.beginWork流程&#xff08;递&#xff09; 建立节点的父子以及兄弟节点关联关系 child return sibling属性给fiber节点打上flag标记(当前节点的flag) …

【STM32-学习笔记-6-】DMA

文章目录 DMAⅠ、DMA框图Ⅱ、DMA基本结构Ⅲ、不同外设的DMA请求Ⅳ、DMA函数Ⅴ、DMA_InitTypeDef结构体参数①、DMA_PeripheralBaseAddr②、DMA_PeripheralDataSize③、DMA_PeripheralInc④、DMA_MemoryBaseAddr⑤、DMA_MemoryDataSize⑥、DMA_MemoryInc⑦、DMA_DIR⑧、DMA_Buff…

IoT平台在设备远程运维中的应用

IoT平台是物联网技术的核心组成部分&#xff0c;实现了设备、数据、应用之间的无缝连接与交互。通过提供统一的设备管理、数据处理、安全监控等功能&#xff0c;IoT平台为企业构建了智能化、可扩展的物联网生态系统。在设备远程运维领域&#xff0c;IoT平台发挥着至关重要的作用…

浅谈云计算05 | 云存储等级及其接口工作原理

一、云存储设备 在当今数字化飞速发展的时代&#xff0c;数据已然成为个人、企业乃至整个社会的核心资产。从日常生活中的珍贵照片、视频&#xff0c;到企业运营里的关键业务文档、客户资料&#xff0c;数据量呈爆炸式增长。面对海量的数据&#xff0c;如何安全、高效且便捷地存…

网络传输层TCP协议

传输层TCP协议 1. TCP协议介绍 TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是一个要对数据的传输进行详细控制的传输层协议。 TCP 与 UDP 的不同&#xff0c;在于TCP是有连接、可靠、面向字节流的。具体来说&#xff0c;TCP设置了一大…

【Linux系列】`find / -name cacert.pem` 文件搜索

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【论文笔记】Sign Language Video Retrieval with Free-Form Textual Queries

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Sign Language Video Retr…

Observability:将 OpenTelemetry 添加到你的 Flask 应用程序

作者&#xff1a;来自 Elastic jessgarson 待办事项列表可以帮助管理与假期计划相关的所有购物和任务。使用 Flask&#xff0c;你可以轻松创建待办事项列表应用程序&#xff0c;并使用 Elastic 作为遥测后端&#xff0c;通过 OpenTelemetry 对其进行监控。 Flask 是一个轻量级…

网站目录权限加固

说明 在一个入侵链路中&#xff0c;往往是利用某个安全漏洞&#xff0c;向服务器写入或上传一个webshell&#xff08;后门&#xff09;&#xff0c;再通过webshell提权或进行后续渗透入侵行为。 这个过程中&#xff0c;获取webshell是最关键最重要的一个步骤&#xff0c;如能在…