SpingBoot集成kafka-发送读取消息示例

news2025/1/26 14:33:49

SpingBoot集成kafka开发

    • kafka的几个常见概念
  • 1、springboot和kafka对应版本(重要)
  • 2、创建springboot项目,引入kafka依赖
    • 2.1、生产者EventProducer
    • 2.2、消费者EventConsumer
    • 2.3、启动生产者的方法SpringBoot01KafkaBaseApplication
    • 2.4、application.yml
    • 2.5、pom.xml
    • 2.6、启动springboot项目的启动类(Application)报错
  • 3、springboot集成kafka读取最早的消息
    • 3.1、如何设置消费者auto-offset-reset: earliest
    • 3.2、设置消费者auto-offset-reset: earliest后存在的问题
      • 3.2.1、修改消费组ID
      • 3.2.2、手动重置偏移量
          • 3.2.2.1、手动将偏移量设置为最早
          • 3.2.2.2、手动将偏移量设置为最新

kafka的几个常见概念

在这里插入图片描述

1、springboot和kafka对应版本(重要)

https://spring.io/projects/spring-kafka

在这里插入图片描述

在这里插入图片描述

2、创建springboot项目,引入kafka依赖

在这里插入图片描述

2.1、生产者EventProducer

package com.power.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void sendEvent(){
        kafkaTemplate.send("hello-topic","hello kafka");
    }
}

2.2、消费者EventConsumer

package com.power.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    //采用监听的方式接收事件(消息,数据)
    @KafkaListener(topics = {"hello-topic"},groupId="hello-group")
    public void onEvent(String event){
        System.out.println("读取到的事件:"+event);
    }
}

2.3、启动生产者的方法SpringBoot01KafkaBaseApplication

执行一次该方法,会调用一次生产者发送一次消息。
即每执行一次,会调用EventProducer类下的sendEvent方法一次。

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void test01(){
        eventProducer.sendEvent();
    }
}

2.4、application.yml

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的服务器ip>:9092
    #配置生产者(有24个配置)
    #producer:
    #配置消费者(有24个配置)
    #consumer:

2.5、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath />
    </parent>

    <groupId>org.powernode</groupId>
    <artifactId>spring-boot-01-kafka-base</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkaSpringBootProject</name>
    <description>kafka project for Spring Boot</description>

    <properties>
        <java.version>8</java.version>
    </properties>


    <repositories>
        <repository>
            <id>central</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <layout>default</layout>
            <!-- 是否开启发布版构件下载 -->
            <releases>
                <enabled>true</enabled>
            </releases>
            <!-- 是否开启快照版构件下载 -->
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.8.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2.6、启动springboot项目的启动类(Application)报错

项目启动类

package com.power;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
		System.out.println("启动成功--------------------------");
	}
}

启动服务后发现报错:
在这里插入图片描述

修改server.properties配置文件:
在这里插入图片描述修改前:

在这里插入图片描述
修改后:
在这里插入图片描述

3、springboot集成kafka读取最早的消息

已经被消费者读取/消费的消息,无法被新启动的消费组消息的,那么新启动的消费组该如何读取最早的消息呢,可以通过设置消费者auto-offset-reset: earliest去实现。
在这里插入图片描述

3.1、如何设置消费者auto-offset-reset: earliest

在这里插入图片描述

1、修改application.yml
在这里插入图片描述

3.2、设置消费者auto-offset-reset: earliest后存在的问题

在这里插入图片描述

3.2.1、修改消费组ID

原消费组ID
在这里插入图片描述
修改后的消费组ID
在这里插入图片描述4、新的消费组ID成功读取到之前的消息
在这里插入图片描述

3.2.2、手动重置偏移量

3.2.2.1、手动将偏移量设置为最早
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute

来到kafka安装目录下:

在这里插入图片描述执行如下命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-earliest --execute

执行后报错

在这里插入图片描述
需要先停掉服务,在去手动重置偏移量,此时重置偏移量成功,偏移量为0

3.2.2.2、手动将偏移量设置为最新
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-latest --execute

设置成功,此时偏移量已为最新:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2073065.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

监控电脑屏幕的软件叫什么?6款电脑屏幕监控软件分享!

监控电脑屏幕的软件可以帮助企业和家长监控电脑的使用情况&#xff0c;确保工作和学习的效率与安全。 以下是六款常用的电脑屏幕监控软件及其特点&#xff1a; 1. Keylogger 特点&#xff1a;专注于企业数据安全和员工上网行为管理。 功能&#xff1a;全面的屏幕监控、上网…

Redis持久化(RDB、AOF、混合持久化)

目录 1、持久化机制 &#xff08;1&#xff09;RDB &#xff08;2&#xff09;AOF 2、混合持久化 3、总结 ❓为什么需要持久化&#xff1f; Redis 是一个基于内存的键值存储系统&#xff0c;它提供了非常快的数据访问速度&#xff0c;因为它不需要像传统的磁盘存储那样进…

竞猜足球核心算法源码

需要实现的功能如下&#xff1a; 仅用于学习 竞猜足球核心算法源码 package com.lotterysource.portsadmin.service; import com.aliyun.oss.common.utils.DateUtil; import com.fasterxml.jackson.core.type.TypeReference; import com.lotterysource.portsadmin.dbprovid…

进存销系统

摘 要 伴随着我国全面推动信息化的趋势&#xff0c;我国的很多行业都在朝着互联网的方向进发。商品销售行业也有很多挑战。这次论文介绍的进存销系统就是为了能够解决当前传统商品进存销存在的问题&#xff0c;使得商品进存销能够更加有效率。电商智能化管理必不可少的帮手有进…

功能安全实战系列02-RamTst(RamTest)开发介绍

本文框架 前言1. What(RamTst相关概念)1.1 后台检测1.2 前台检测1.3 RamTst对应状态机2.How?2.1 接口调用2.2 配置开发2.3 测试模式选择前言 在本系列笔者将结合工作中对功能安全实战部分的开发经验进一步介绍常用,包括Memory(Flash,Ram)失效检测,程序运行时序时间检测,及…

数字模拟IC设计前端、后端、前仿、后仿新版虚拟机

虚拟化平台&#xff1a;VMware Workstation 15 Pro以上版本 操作系统&#xff1a;CentOS Linux release 7.9.2009 (Core) 一、射频模拟IC设计必备软件 Cadence IC06.18.350/IC23.10.080&#xff08;virtuoso&#xff09; Cadence SPECTRE23.10.538-isr10 Cadence ASSURA04.…

Python优化算法15——麻雀搜索算法(SSA)

科研里面优化算法都用的多&#xff0c;尤其是各种动物园里面的智能仿生优化算法&#xff0c;但是目前都是MATLAB的代码多&#xff0c;python几乎没有什么包&#xff0c;这次把优化算法系列的代码都从底层手写开始。 需要看以前的优化算法文章可以参考&#xff1a;Python优化算…

Mozilla为本地音频到文本翻译开发Whisperfile引擎

Mozilla Ocho 小组正进行 Mozilla 的"创新和实验"。Llamafile 用于将大型语言模型以单个文件的形式发布&#xff0c;以便在不同的硬件/软件间轻松执行。Whisperfile 是一项将音频轻松转化为文本的新引擎。 正如其名称所暗示的&#xff0c;Whisperfile 是围绕 OpenAI…

嵌入式UI开发-lvgl+wsl2+vscode系列:10、控件(Widgets)(三)

1、scale&#xff08;标尺&#xff09; 示例1 #include "../../lv_examples.h" #if LV_USE_SCALE && LV_BUILD_EXAMPLES/*** 简单的水平标尺*/ void lv_example_scale_1(void) {lv_obj_t * scale lv_scale_create(lv_screen_active());lv_obj_set_size(sca…

MyBatis源码(6)拦截器

1、目标 本文的主要目标是学习MyBatis拦截器的源码&#xff0c;本文将以插入操作为例debug拦截器相关的源码 2、拦截器源码分析 调用mapper接口的insert插入记录方法&#xff0c;会调用SqlSession对象的insert方法 SqlSession执行insert方法 Spring容器会创建SqlSessionTemp…

Python画笔案例-011 绘制草帽

1、绘制草帽 通过 python 的turtle 库绘制一个草帽的图案&#xff0c;如下图&#xff1a; 2、实现代码 绘制以上草帽的图案&#xff0c;代码如下&#xff1a; """草帽.py """ import turtle # 导入海龟模块turtle.delay(20) …

多动症的孩子有哪些症状表现?

在星启帆自闭症儿童康复机构&#xff0c;我们不仅关注自闭症儿童的成长与康复&#xff0c;也深刻认识到多动症对儿童日常生活、学习和社交的深远影响。多动症&#xff0c;全称注意缺陷多动障碍&#xff0c;是一种常见于儿童时期的神经发育性疾病&#xff0c;其症状表现多种多样…

Python优化算法16——鲸鱼优化算法(WOA)

科研里面优化算法都用的多&#xff0c;尤其是各种动物园里面的智能仿生优化算法&#xff0c;但是目前都是MATLAB的代码多&#xff0c;python几乎没有什么包&#xff0c;这次把优化算法系列的代码都从底层手写开始。 需要看以前的优化算法文章可以参考&#xff1a;Python优化算…

ChatGPT不同模型在论文写作中的优势和应用

学境思源&#xff0c;一键生成论文初稿&#xff1a; AcademicIdeas - 学境思源AI论文写作 ChatGPT在论文写作中的应用日益广泛。作为OpenAI开发的先进语言模型&#xff0c;ChatGPT有多个版本&#xff0c;包括GPT-3.5、GPT-4.0和GPT-4.0-mini&#xff0c;每个版本在性能和应用方…

如何解决Docker启动时报Status: unknown flag: --graph问题

最近在进行Docker环境迁移时&#xff0c;用二制对Docker进行了重新安装&#xff0c;一切配置好之后&#xff0c;启动Docker时&#xff0c;服务启动不起来&#xff0c;使用journalctl -xe命令查看&#xff0c;报出以下错误&#xff1a; [rootapp docker]# journalctl -xe 8月 2…

【css】伪元素实现图片悬停文字聚焦效果

实现重点&#xff1a; 文字覆盖在图片上&#xff1a; 通过使用 position: absolute 将 .box 文字盒子定位在图片上方。父容器 .img-wrap 使用了 position: relative 确保子元素的绝对定位在父容器的边界内生效。 创建悬停效果&#xff1a; 通过使用 &::before 和 &::…

Android PopupWindow弹窗动态显示在View的上下方,

序、周末不加班&#xff0c; 效果图如下。 我们要弹出的PopupWindow在View的下方&#xff0c;如果下方区域不够&#xff0c;则弹出在上方。 实现方案思路 我们在显示的时候&#xff0c;首先去计算一下弹窗高度。使用屏幕的高 - popupwind的高并且和popup的高做对比&#xff0…

ASP.NET Core SignalR 构建高效实时通信应用

目录 前言 SignalR的基本概念及其工作原理 1、核心概念 2、工作原理 前端环境准备 1、安装SignalR 2、创建SignalR连接 3、设置自动重新连接 4、监听连接状态 5、初始化连接 后端环境准备 1、注册SignalR 2、设置Hub 3、配置路由 4、发送和接收消息 实现聊天应用…

GraphRAG层级多标签文本分类任务实战(1)

1.概述 GraphRAG的本质是调用LLM生成知识图谱&#xff0c;然后在回答问题时检索相关内容输到prompt里&#xff0c;作为补充知识来辅助回答。那么有没有可能将这运用到层级多标签文本分类&#xff08;HMTC)任务中呢&#xff1f; 当然&#xff0c;乍一听有一点天方夜谭&#xf…

3 pytest Fixture

目录 3.1 通过 conftest.py 共享 fixture3.2 使用 fixture 执行配置及销毁逻辑3.3 使用 --setup-show 回溯 fixture 的执行过程3.4 使用 fixture 传递测试数据3.5 使用多个 fixture3.6 指定 fixture 作用范围3.7 使用 usefixtures 指定 fixture3.8 为常用 fixture 添加 autouse…