Spring Boot整合Kafka,实现单条消费和批量消费,示例教程

news2025/1/11 21:41:01

如何安装Kafka,可以参考docker搭载Kafka集群,一个文件搞定,超简单,亲试可行-CSDN博客

1、在pom.xml中加入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.1.6</version>
        </dependency>

2、配置application.yml文件

在application.yml中加入

spring
  kafka:
    #Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔
    bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095

    producer:
      # 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认
      acks: 1
      # 发送失败时的重试次数,0表示不重试
      retries: 0
      # 批量发送时的批次大小(字节)
      batch-size: 30720000 # 30MB
      # 生产者的内存缓冲区大小(字节)
      buffer-memory: 33554432 # 32MB
      # Key的序列化器类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value的序列化器类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:
      # 消费者所属的组ID
      group-id: test-kafka
      # 禁用自动提交offset,改为手动提交
      enable-auto-commit: false
      # 偏移量重置策略:
      # earliest:从最早的记录开始消费
      # latest:从最新的记录开始消费
      auto-offset-reset: earliest
      # Key的反序列化器类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value的反序列化器类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 每次poll()调用返回的最大消息条数
      max-poll-records: 2
      session:
        # 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)
        timeout:
          ms: 300000 # 5分钟

    listener:
      # 如果指定的主题不存在,是否让应用启动失败,false表示不会报错
      missing-topics-fatal: false
      # 消费模式:single=单条消息,batch=批量消费
      type: single
      # 消费确认模式:
      # manual_immediate:手动确认消息,立即提交offset
      ack-mode: manual_immediate

3、主要示例代码

创建一个目录和四个java文件,可以做测试

3.1、KafkaConfig.java

Kafka监听器配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConfig {

    // 单条消费监听器工厂,手动提交offset
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    // 批量消费监听器工厂,手动提交offset
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // 启用批量消费
        factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

3.2、KafkaProducer.java

生产者

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaProducer {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

    @Bean
    CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            String topic = "test-topic";
            for (int i = 1; i <= 10; i++) {
                String message = "Message " + i;
                kafkaTemplate.send(topic, message);
                System.out.println("Sent: " + message);
                Thread.sleep(500); // 模拟消息发送间隔
            }
        };
    }
}

3.3、SingleConsumer.java

单条消息消费者

autoStartup参数:是是否自动启动;=”true“:自动启动,即生产者启动,该消费者将会开始消费;=”false":不自动启动,不开该模式的消费。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class SingleConsumer {
    @KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("SingleConsumer - Received: " + record.value());
        // 手动提交offset
        acknowledgment.acknowledge();
    }
}

3.4、BatchConsumer.java

批量消息消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class BatchConsumer {

    @KafkaListener(topics = "test-topic", groupId = "test-group", containerFactory = "batchFactory", autoStartup = "false")
    public void batchListen(List<String> messages, Acknowledgment acknowledgment) {
        System.out.println("BatchConsumer - Received batch: " + messages);
        // 手动提交offset
        acknowledgment.acknowledge();
    }
}

4、测试

4.1、单条信息消费模式

在SingleConsumer.java中设置autoStartup = "true",启动KafkaProducer.java

消费成功

4.2、批量信息消费模式

在BatchConsumer.java中设置autoStartup = "true",启动KafkaProducer.java

配置文件中设置了max-poll-records: 2,所有一次只消费两条

消费成功

如果在BatchConsumer.java和SingleConsumer.java中设置autoStartup = "true",Kafka会随机选择消费者组里的一个消费者进行消费,所有可以会导致其中一个消费者没有消费信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2244889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Vue+SpringBoot的求职招聘平台

平台概述 本平台是一个高效、便捷的人才与职位匹配系统&#xff0c;旨在为求职者与招聘者提供一站式服务。平台内设三大核心角色&#xff1a;求职者、招聘者以及超级管理员&#xff0c;每个角色拥有独特的功能模块&#xff0c;确保用户能够轻松完成从信息获取到最终录用的整个…

FPGA FIFO系列 - FIFO使用中需要注意的若干问题

FIFO使用中需要注意的若干问题 文章目录 FIFO使用中需要注意的若干问题前言场景1&#xff1a;包数据FIFO设计之冗余法场景2、FIFO数据传输之流控总结 前言 场景1&#xff1a;包数据FIFO设计之冗余法 场景&#xff1a;类似图像、文字等码流数据是不需要重复被访问的&#xff0c…

.NET 9 - BinaryFormatter移除

1.简单介绍 .NET 9 SDK正式版已经发布, 下载地址是.NET9 同时.NET Conf 2024 大会已经从2024-11-13开始了&#xff0c;感觉Aspire和AI的内容相对挺多的&#xff0c;主题分享演示时候打开的网站大部分都是Blazor制作的。 这次.NET Conf 2024老师也再次说明了一下&#xff0c;…

[免费]SpringBoot+Vue毕业设计论文管理系统【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的SpringBootVue毕业设计论文管理系统&#xff0c;分享下哈。 项目视频演示 【免费】SpringBootVue毕业设计论文管理系统 Java毕业设计_哔哩哔哩_bilibili 项目介绍 现代经济快节奏发展以及不断完善升级的信…

C# 高级--反射 详解

一、反射是什么 1、C#编译运行过程 高级语言->编译->dll/exe文件->CLR/JIT->机器码 2、原理解析metadata&#xff1a;元数据数据清单&#xff0c;记录了dll中包含了哪些东西,是一个描述。IL&#xff1a;中间语言&#xff0c;编译把高级语言编译后得到的C#中最真…

OpenCV与AI深度学习|16个含源码和数据集的计算机视觉实战项目(建议收藏!)

本文来源公众号“OpenCV与AI深度学习”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;分享&#xff5c;16个含源码和数据集的计算机视觉实战项目 本文将分享16个含源码和数据集的计算机视觉实战项目。具体包括&#xff1a; 1. 人…

【软考网工笔记】网络基础理论——应用层

TLv 基本编码规则&#xff08;BER&#xff09;将ASN.1表示的抽象类型值编码为字节串&#xff0c;这种字节串的结构为&#xff1a;类型——长度——值&#xff0c;简称TLv。 其中&#xff0c;值部分还可以递归的在编码为TLv结构&#xff0c;一具有表达复杂结构的能力。 IP地址…

用Python爬虫“偷窥”1688商品详情:一场数据的奇妙冒险

引言&#xff1a;数据的宝藏 在这个信息爆炸的时代&#xff0c;数据就像是一座座等待挖掘的宝藏。而对于我们这些电商界的探险家来说&#xff0c;1688上的商品详情就是那些闪闪发光的金子。今天&#xff0c;我们将化身为数据的海盗&#xff0c;用Python这把锋利的剑&#xff0…

企业网络安全规划建设实践

规划是指较全面或长远的计划。凡事预则立&#xff0c;不预则废&#xff01; 在企业战略规划方面&#xff0c;随着市场环境变化速度的不断加快&#xff0c;人们越来越意识到企业战略规划对企业生存和发展的重要性&#xff0c;战略规划能帮助企业解决影响组织未来发展最重要、最…

QT基本绘图

QT绘图 1.概述 这篇文章介绍如何绘图 2.绘图基本操作 创建一个普通的widget类型的项目 在widget.h 文件中重写绘图事件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : p…

Linux驱动开发(7):使用设备树实现RGB 灯驱动

通过上一小节的学习&#xff0c;我们已经能够编写简单的设备树节点&#xff0c;并且使用常用的of函数从设备树中获取我们想要的节点资源。 这一小节我们带领大家使用设备树编写一个简单的RGB灯驱动程序&#xff0c;加深对设备树的理解。 1. 实验说明 本节实验使用到 EBF6ULL-…

MATLAB实现GARCH(广义自回归条件异方差)模型计算VaR(Value at Risk)

MATLAB实现GARCH(广义自回归条件异方差)模型计算VaR(Value at Risk) 1.计算模型介绍 使用GARCH&#xff08;广义自回归条件异方差&#xff09;模型计算VaR&#xff08;风险价值&#xff09;时&#xff0c;方差法是一个常用的方法。GARCH模型能够捕捉到金融时间序列数据中的波…

Neo4j下载及其Cypher语法介绍

1.部署安装 Neo4j支持众多平台的部署安装&#xff0c;如&#xff1a;Windows、Mac、Linux等系统。Neo4j是基于Java平台的&#xff0c;所以部署安装前先保证已经安装了Java虚拟机。 在神领物流项目中&#xff0c;我们采用docker的方式进行安装。安装命令如下&#xff1a; dock…

【Redis】实现点赞功能

一、实现笔记点赞 使用redis实现点赞功能&#xff0c;对于一个笔记来说&#xff0c;不同用户只能是点赞和没点赞&#xff0c;点赞过的笔记再点击就应该取消点赞&#xff0c;所以实际上根据需求&#xff0c;我们只需要将点赞的数据存到对应的笔记里&#xff0c;查看对应的笔记相…

开源TTS语音克隆神器GPT-SoVITS_V2版本地整合包部署与远程使用生成音频

文章目录 前言1.GPT-SoVITS V2下载2.本地运行GPT-SoVITS V23.简单使用演示4.安装内网穿透工具4.1 创建远程连接公网地址 5. 固定远程访问公网地址 前言 本文主要介绍如何在Windows系统电脑使用整合包一键部署开源TTS语音克隆神器GPT-SoVITS&#xff0c;并结合cpolar内网穿透工…

【Pytorch】torch.utils.data模块

torch.utils.data模块主要用于进行数据集处理&#xff0c;是常用的一个包。在构建数据集的过程中经常会用到。要使用data函数必须先导入&#xff1a; from torch.utils import data 下面介绍几个经常使用到的类。 torch.utils.data.DataLoader DataLoader(dataset, batch_…

XGBOOST、LightGBM、CATBoost

本文介绍几种不同的 GBDT 优化算法&#xff1a; XGBoost XGBoost 对损失函数展开二阶导&#xff0c;使得提升树能逼近真是损失&#xff0c;增加正则项防止过拟合&#xff0c;XGBoost 公式&#xff1a; L( y i y_i yi​, y ^ i \hat{y}_i y^​i​): 损失函数 Ω ( f k ) \Ome…

论文阅读 SimpleNet: A Simple Network for Image Anomaly Detection and Localization

SimpleNet: A Simple Network for Image Anomaly Detection and Localization 摘要&#xff1a; 该论文提出了一个简单且应用友好的网络&#xff08;称为 SimpleNet&#xff09;来检测和定位异常。SimpleNet 由四个组件组成&#xff1a;&#xff08;1&#xff09;一个预先训练的…

多线程4:线程池、并发、并行、综合案例-抢红包游戏

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

Java数据库连接(Java Database Connectivity,JDBC)

1.JDBC介绍 Java数据库连接&#xff08;Java Database Connectivity&#xff0c;JDBC&#xff09;是SUN公司为了简化、统一对数据库的操作&#xff0c;定义的一套Java操作数据库的规范&#xff08;接口&#xff09;。这套接口由数据库厂商去实现&#xff0c;这样&#xff0c;开…