Kafka java 配置

news2024/11/13 8:51:10

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {
    // 1. 配置属性参数
    Properties properties = new Properties();

    // 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    // 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    // 设置消费者是否自动提交offset,true表示自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // 设置自动提交offset的时间间隔(单位:毫秒)
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
    // 设置每次poll操作返回的最大记录数
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);

    // 根据配置属性创建Kafka消费者实例
    return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {
    // 创建Kafka消费者实例,通过getCustomer()方法获取
    KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();

    // 订阅要消费的主题,这里是 "test-topic"
    consumer.subscribe(Collections.singletonList("test-topic"));

    // 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息的逻辑
        // 打印消息的offset、key和value
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

        //以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。
        boolean flag = true;
        if (flag){
            // 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息
            // 如果需要手动提交offset,可以取消注释下面的代码
            // consumer.commitAsync();
            // 由于flag为true,这里会跳出循环,不再处理后续的消息
            break;
        }
    }
    // 关闭消费者,释放资源
    consumer.close();
    // 打印结束消费的日志
    System.out.println("结束消费");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2238279.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python的函数(补充浅拷贝和深拷贝)

一、定义 函数的定义&#xff1a;实现【特定功能】的代码块。 形参&#xff1a;函数定义时的参数&#xff0c;没有实际意义 实参&#xff1a;函数调用/使用时的参数&#xff0c;有实际意义 函数的作用&#xff1a; 简化代码提高代码重用性便于维护和修改提高代码的可扩展性…

el-input 正则表达式校验输入框不能输入汉字

<el-form :model"data1" :rules"rules" ref"ruleForm" label-width"210px" class"demo-ruleForm"><el-form-item label"锯路&#xff1a;" prop"sawKref"><el-input class"inptWid…

嵌入式linux系统中I2C控制实现AP3216C传感器方法

大家好,今天主要给大家分享一下,如何使用linux系统里面的I2C进行控制实现。 第一:Linux系统中I2C简介 Linux 内核开发者为了让驱动开发工程师在内核中方便的添加自己的 I2C 设备驱动程序,更容易的在 linux 下驱动自己的 I2C 接口硬件,进而引入了 I2C 总线框架。与 Linux 下…

OceanBase 应用实践:如何处理数据空洞,降低存储空间

问题描述 某保险行业客户的核心系统&#xff0c;从Oracle 迁移到OceanBase之后&#xff0c;发现数据存储空间出现膨胀问题&#xff0c;数据空间 datasize9857715.48M&#xff0c;实际存储占用空间17790702.00M。根据 required_mb - data_mb 值判断&#xff0c;数据空洞较为严重…

【flask开启进程,前端内容图片化并转pdf-会议签到补充】

flask开启进程,前端内容图片化并转pdf-会议签到补充 flask及flask-socketio开启threading页面内容转图片转pdf流程前端主js代码内容转图片-browser端browser端的同步编程flask的主要功能route,def 总结 用到了pdf,来回数据转发和合成,担心flask卡顿,响应差,于是刚好看到threadi…

QT栅格布局的妙用

当groupBox中只有一个控件时&#xff0c;我们想要它满格显示可以对groupBox使用栅格布局

MyBatis快速入门(上)

MyBatis快速入门&#xff08;上&#xff09; 一、MyBatis 简介1、概述2、JDBC、Hibernate、MyBatis 对比 二、MyBatis 框架搭建1、开发环境2、创建maven工程3、创建MyBatis的核心配置文件4、创建mapper接口5、创建MyBatis的映射文件6、通过junit测试功能7、加入log4j2日志功能 …

在Pybullet中加载Cinema4D创建的物体

首先明确我们的目标&#xff0c;是希望在cinema4D中创建自己想要的模型&#xff0c;并生成.obj文件&#xff0c;然后在pybullet中加载.obj文件作为静态物体&#xff0c;可以用于抓取物体&#xff0c;避障物体。&#xff08;本文提到的方法只能实现静态物体的建模&#xff0c;如…

第十三届交通运输研究(上海)论坛┆智能网联汽车技术现状与研究实践

0.简介 交通运输研究&#xff08;上海&#xff09;论坛&#xff08;简称为TRF&#xff09;是按照国际会议的组织原则&#xff0c;为综合交通运输领域学者们构建的良好合作交流平台。交通运输研究&#xff08;上海&#xff09;论坛已经成功举办了十二届&#xff0c;凝聚了全国百…

Pr:视频过渡快速参考(合集 · 2025版)

Adobe Premiere Pro 自带七组约四十多个视频过渡 Video Transitions效果&#xff0c;包含不同风格和用途&#xff0c;可在两个剪辑之间创造平滑、自然的转场&#xff0c;用来丰富时间、地点或情绪的变化。恰当地应用过渡可让观众更好地理解故事或人物。 提示&#xff1a; 点击下…

stm32 踩坑笔记

串口问题&#xff1a; 问题&#xff1a;会改变接收缓冲的下一个字节 串口的初始化如下&#xff0c;位长度选择了9位。因为要奇偶校验&#xff0c;要选择9位。但是接收有用数据只用到1个字节。 问题原因&#xff1a; 所以串口接收时会把下一个数据更改

昇思大模型平台打卡体验活动:项目4基于MindSpore实现Roberta模型Prompt Tuning

基于MindNLP的Roberta模型Prompt Tuning 本文档介绍了如何基于MindNLP进行Roberta模型的Prompt Tuning&#xff0c;主要用于GLUE基准数据集的微调。本文提供了完整的代码示例以及详细的步骤说明&#xff0c;便于理解和复现实验。 环境配置 在运行此代码前&#xff0c;请确保…

后悔没早点知道,Coze 插件 + Cursor 原来可以这样赚钱

最近智能体定制化赛道异常火爆。 打开闲鱼搜索"Coze 定制",密密麻麻的服务报价直接刷屏,即使表明看起来几十块的商家,一细聊,都是几百到上千不等的报价。 有趣的是,这些智能体定制化服务背后,最核心的不只是工作流设计,还有一个被很多人忽视的重要角色 —— …

基于STM32的节能型路灯控制系统设计

引言 本项目基于STM32微控制器设计了一个智能节能型路灯控制系统&#xff0c;通过集成多个传感器模块和控制设备&#xff0c;实现对路灯的自动调节。该系统能够根据周围环境光照强度、车辆和行人活动等情况&#xff0c;自动控制路灯的开关及亮度调节&#xff0c;从而有效减少能…

Qml 模型-视图-代理(贰)之 动态视图学习

目录 动态视图 动态视图用法 ⽅向&#xff08;Orientation&#xff09; 键盘导航和⾼亮 页眉与页脚 网格视图 动态视图 动态视图用法 Repeater 元素适合有限的静态数据&#xff0c; QtQuick 提供了 ListView 和 GridView, 这两个都是基于 Flickable(可滑动) 区域的元素…

新标准大学英语综合教程1课后习题答案PDF第三版

《新标准大学英语&#xff08;第三版&#xff09;综合教程1 》是“新标准大学英语&#xff08;第三版&#xff09;”系列教材之一。本书共包含6个单元&#xff0c;从难度和话题上贴近大一上学生的认知和语言水平&#xff0c;包括与学生个人生活领域和社会文化等相关内容&#x…

Python闭包|你应该知道的常见用例(下)

引言 在 Python 编程语言中&#xff0c;闭包通常指的是一个嵌套函数&#xff0c;即在一个函数内部定义的另一个函数。这个嵌套的函数能够访问并保留其外部函数作用域中的变量。这种结构就构成了一个闭包。 闭包在函数式编程语言中非常普遍。在 Python 中&#xff0c;闭包特别有…

Rocky、Almalinux、CentOS、Ubuntu和Debian系统初始化脚本v9版

Rocky、Almalinux、CentOS、Ubuntu和Debian系统初始化脚本 Shell脚本源码地址&#xff1a; Gitee&#xff1a;https://gitee.com/raymond9/shell Github&#xff1a;https://github.com/raymond999999/shell脚本可以去上面的Gitee或Github代码仓库拉取。 支持的功能和系统&am…

AUTOSAR OS模块详解(一) 概述

AUTOSAR OS模块详解(一) 概述 本文主要介绍AUTOSAR架构下的OS概述。 文章目录 AUTOSAR OS模块详解(一) 概述1 前言1.1 操作系统1.2 嵌入式操作系统1.3 AUTOSAR操作系统 2 AUTOSAR OS2.1 AUTOSAR OS组成2.2 AUTOSAR OS类别2.3 任务管理2.4 调度表2.5 资源管理2.6 多核特性2.7 …

5位机械工程师如何共享一台工作站的算力?

在现代化的工程领域中&#xff0c;算力已成为推动创新与技术进步的关键因素之一。对于机械工程师而言&#xff0c;强大的计算资源意味着能够更快地进行复杂设计、模拟分析以及优化工作&#xff0c;从而明显提升工作效率与项目质量。然而&#xff0c;资源总是有限的&#xff0c;…