SpringBoot日常:集成Kafka

news2025/1/13 12:35:18

文章目录

      • 1、pom.xml文件
      • 2、application.yml
      • 3、生产者配置类
      • 4、消费者配置类
      • 5、消息订阅
      • 6、生产者发送消息
      • 7、测试发送消息

本章内容主要介绍如何在springboot项目对kafka进行整合,最终能达到的效果就是能够在项目中通过配置相关的kafka配置,就能进行消息的生产和消费。

1、pom.xml文件

原本项目用 Spring Boot 的版本为2.6.X,所以这里用spring-cloud-starter-stream-kafka的版本用的是2.2.1.RELEASE,也可以用其他版本,但是注意兼容性,不然会编译运行报错

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.2</version>  <!-- 确保与 Spring Boot 2.6.x 兼容 -->
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
	<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <version>2.2.1.RELEASE</version>
    </dependency>
</dependencies>

2、application.yml

添加kafka的相关配置

spring:
  kafka:
    bootstrap-servers: 192.168.102.179:9092
    producer:
      acks: 1
      retries: 0
      batch-size: 30720000
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消费者配置
    consumer:
      group-id: test-kafka
      #是否开启手动提交 默认自动提交
      enable-auto-commit: true
      #如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔
      auto-commit-interval: 5000
      #earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #一次调用 poll() 操作时返回的最大记录数 默认为 500 条
      max-poll-records: 2
      #kafka session timeout
      session:
        timeout:
          ms: 300000
    listener:
      #kafka 没有创建指定的 topic 下  项目启动是否报错 true  false
      missing-topics-fatal: false
      #Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
      type: single
      ack-mode: manual_immediate

3、生产者配置类

添加一个生产者配置类KafkaProducerConfig ,主要设置消息的序列化方式等消息处理方式

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 码至终章
 * @Date 2025/1/8 11:33
 * @Version 1.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean("myProducerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(4);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }

}

4、消费者配置类

创建一个消费者配置类KafkaConsumerConfig,主要设置一些消息的接收处理配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 码至终章
 * @Date 2025/1/8 12:09
 * @Version 1.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitIntervalMs;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean("myConsumerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(12);
        //是否自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //kafak 服务器
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        //消费组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //一次调用poll()操作时返回的最大记录数,默认值为500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //自动提交时间间隔 默认 5秒
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        return props;
    }


    /**
     * 消费者工厂
     */
    @Bean("myContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
        // 并发创建的消费者数量
        factory.setConcurrency(3);
        // 开启批处理
        factory.setBatchListener(true);
        //拉取超时时间
        factory.getContainerProperties().setPollTimeout(1500);
        //是否自动提交 ACK kafka 默认是自动提交
        if (!enableAutoCommit) {
            //共有其中方式
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        }
        return factory;
    }
}

5、消息订阅

创建一个消费者监听消息类,里面对主题消息监听,这里的测试主题为testone

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Author 码至终章
 * @Date 2025/1/8 14:19
 * @Version 1.0
 */
@Slf4j
@Component
public class MyKafkaConsumer {

    @KafkaListener(id = "my-kafka-consumer",
            idIsGroup = false, topics = "topicone",
            containerFactory = "myContainerFactory")
    public void listen(String message) {
        log.info("接收到主题消息,消息内容:{}", message);
    }
}

6、生产者发送消息

为了方便调用测试,这里在controller编写一个方法发送消息

@RestController
@Slf4j
public class TestController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping ("/sendMessage")
    public void sendMessage(@RequestParam String message) {
        this.kafkaTemplate.send("topicone", message);
    }
}

7、测试发送消息

这里简单用postman调用接口发送一条消息
在这里插入图片描述
从idea的程序控制台可以看到消费者监听可以正常接收到消息
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2275963.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RK3568 Android 13 内置搜狗输入法小计

问&#xff1a;为什么写&#xff1f; 答&#xff1a;网上搜出来的都试过了&#xff0c;不行&#xff01;下面直接上代码和注意事项&#xff01; 首先到这个目录&#xff08;/RK3568/Rockchip_Android13_SDK_Release/device/rockchip/rk356x/tl3568_evm/preinstall&#xff09…

【opencv】第8章 图像轮廓与图像分割修复

8.1 查找并绘制轮廓 一个轮廓一般对应一系列的点&#xff0c;也就是图像中的一条曲线。其表示方法可能 根据不同的情况而有所不同。在OpenCV 中&#xff0c;可以用findContours()函数从二值图 像中查找轮廓 8.1.1 寻找轮廓&#xff1a; findContours() 函数 findContours) 函…

BGP 泄露

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 目录 1. BGP 是什么&#xff1f; 2. 什么是 BGP 泄露&#xff1f; 3. 今天发生了什么&#xff1f; 4. 正常和被劫持状态下的路由示意图 5. 受影响区域 6. 责任在谁&#xff1f; 7. 有办法避免这…

数据结构与算法之二叉树: LeetCode 572. 另一棵树的子树 (Ts版)

另一棵树的子树 https://leetcode.cn/problems/subtree-of-another-tree/description/ 描述 给你两棵二叉树 root 和 subRoot检验 root 中是否包含和 subRoot 具有相同结构和节点值的子树如果存在&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false二叉树 tree …

动植物基因表达调控

1&#xff0c; on and off状态 以及表达的量 2&#xff0c; 基因调控的生物学影响&#xff1f; 超过400多种细胞类型&#xff0c;数目上37万亿 不是所有的基因都表达 为什么多核真核细胞需要基因调控&#xff1f; 单个细胞往多个细胞逐渐进化的过程&#xff0c;形成复杂的…

FreePBX 17 on ubuntu24 with Asterisk 20

版本配置&#xff1a; FreePBX 17&#xff08;最新&#xff09; Asterisk 20&#xff08;最新Asterisk 22&#xff0c;但是FreePBX 17最新只支持Asterisk 21&#xff0c;但是21非LTS版本&#xff0c;所以选择Asterisk 20&#xff09; PHP 8.2 Maria DB (v10.11) Node J…

“AI智能服务平台系统,让生活更便捷、更智能

大家好&#xff0c;我是资深产品经理老王&#xff0c;今天咱们来聊聊一个让生活变得越来越方便的高科技产品——AI智能服务平台系统。这个系统可是现代服务业的一颗璀璨明珠&#xff0c;它究竟有哪些魅力呢&#xff1f;下面我就跟大家伙儿闲聊一下。 一、什么是AI智能服务平台系…

Qt监控系统远程网络登录/请求设备列表/服务器查看实时流/回放视频/验证码请求

一、前言说明 这几个功能是近期定制的功能&#xff0c;也非常具有代表性&#xff0c;核心就是之前登录和设备信息都是在本地&#xff0c;存放在数据库中&#xff0c;数据库可以是本地或者远程的&#xff0c;现在需要改成通过网络API请求的方式&#xff0c;现在很多的服务器很强…

【网络协议】动态路由协议

前言 本文将概述动态路由协议&#xff0c;定义其概念&#xff0c;并了解其与静态路由的区别。同时将讨论动态路由协议相较于静态路由的优势&#xff0c;学习动态路由协议的不同类别以及无类别&#xff08;classless&#xff09;和有类别&#xff08;classful&#xff09;的特性…

安装完docker后,如何拉取ubuntu镜像并创建容器?

1. 先docker拉取ubuntu镜像 docker search ubuntu #搜索ubuntu 镜像 docker pull ubuntu:22.04 #拉取ubuntu 镜像 docker images #下载完成后&#xff0c;查看已经下载的镜像 docker run --name ubuntu_container -dit ubuntu:22.04 /bin/bash # docker container -l 2.…

互联网全景消息(10)之Kafka深度剖析(中)

一、深入应用 1.1 SpringBoot集成Kafka 引入对应的依赖。 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupI…

React Fiber框架中的Render渲染阶段——workLoop(performUnitOfWork【beginWork与completeWork】)

触发渲染过程——renderRoot renderRoot 是一个函数&#xff0c;用于触发渲染工作。它通常会调用并递归地执行一系列的渲染任务&#xff0c;直到完成整个更新过程。这个过程包括执行 Fiber 树中的 beginWork 和 completeWork&#xff0c;以及渲染新状态或 DOM。 function ren…

STM32F1学习——ADC模数转换器

一、ADC模数转换器 ADC的全称 Analog-Digital Converter 模拟-数字转换器&#xff0c;他可以用来将引脚上连续变换的模拟电压转换为内存中存储的数字变量。 ADC有两个重要指标&#xff0c;分辨率和频率。 STM32的ADC是 12位 逐次逼近型&#xff0c;1us转换时间&#xff0c;也就…

[每周一更]-(第131期):Go并发协程总结篇

Go语言的并发是通过协程&#xff08;goroutine&#xff09;实现的。Go协程是轻量级的线程&#xff0c;允许多个任务同时执行&#xff0c;且Go运行时会高效地管理它们。在Go中使用并发协程的方式非常简便&#xff0c;也很强大。以下是一些关于Go协程的基础用法和并发控制方法&am…

Ecdsa密钥在线生成工具

具体前往&#xff1a;ECC公钥私钥对在线生成器

llama.cpp 模型可视化工具 GGUF Visualizer

llama.cpp 模型可视化工具 GGUF Visualizer 1. GGUF Visualizer for VS Code (gguf-viz)1.1. Features1.2. Extension Settings References GGUF Visualizer https://marketplace.visualstudio.com/items?itemNameAgainstEntropy.gguf-viz 1. GGUF Visualizer for VS Code (g…

【DAPM杂谈之三】DAPM的初始化流程

本文主要分析DAPM的设计与实现 内核的版本是&#xff1a;linux-5.15.164&#xff0c;下载链接&#xff1a;Linux内核下载 主要讲解有关于DAPM相关的知识&#xff0c;会给出一些例程并分析内核如何去实现的 /**************************************************************…

HarmonyOS:@LocalBuilder装饰器: 维持组件父子关系

一、前言 当开发者使用Builder做引用数据传递时&#xff0c;会考虑组件的父子关系&#xff0c;使用了bind(this)之后&#xff0c;组件的父子关系和状态管理的父子关系并不一致。为了解决组件的父子关系和状态管理的父子关系保持一致的问题&#xff0c;引入LocalBuilder装饰器。…

Pytorch导出onnx模型并在C++环境中调用(含python和C++工程)

Pytorch导出onnx模型并在C环境中调用&#xff08;含python和C工程&#xff09; 工程下载链接&#xff1a;Pytorch导出onnx模型并在C环境中调用&#xff08;python和C工程&#xff09; 机器学习多层感知机MLP的Pytorch实现-以表格数据为例-含数据集和PyCharm工程中简单介绍了在…

git打补丁

1、应用场景 跨仓库升级 开发项目B使用的是开源项目A。开源项目A发现漏洞&#xff0c;作者进行了修复&#xff0c;我们可以通过使用git补丁的方式&#xff0c;将作者修改的内容复制到我 们的项目B中。 2、TortoiseGit方式 源仓库 格式化补丁 根据提交数量&#xff0c;生成…