SpingBoot集成kafka发送读取消息

news2025/1/11 20:54:30

SpingBoot集成kafka开发

    • kafka的几个常见概念
  • 1、springboot和kafka对应版本(重要)
  • 2、创建springboot项目,引入kafka依赖
    • 2.1、生产者EventProducer
    • 2.2、消费者EventConsumer
    • 2.3、启动生产者的方法SpringBoot01KafkaBaseApplication
    • 2.4、application.yml
    • 2.5、pom.xml
    • 2.6、启动springboot项目的启动类(Application)报错
  • 3、springboot集成kafka读取最早的消息
    • 3.1、如何设置消费者auto-offset-reset: earliest
    • 3.2、设置消费者auto-offset-reset: earliest后存在的问题
      • 3.2.1、修改消费组ID
      • 3.2.2、手动重置偏移量
          • 3.2.2.1、手动将偏移量设置为最早
          • 3.2.2.2、手动将偏移量设置为最新

kafka的几个常见概念

在这里插入图片描述

1、springboot和kafka对应版本(重要)

https://spring.io/projects/spring-kafka

在这里插入图片描述

2、创建springboot项目,引入kafka依赖

在这里插入图片描述

2.1、生产者EventProducer

package com.power.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void sendEvent(){
        kafkaTemplate.send("hello-topic","hello kafka");
    }
}

2.2、消费者EventConsumer

package com.power.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    //采用监听的方式接收事件(消息,数据)
    @KafkaListener(topics = {"hello-topic"},groupId="hello-group")
    public void onEvent(String event){
        System.out.println("读取到的事件:"+event);
    }
}

2.3、启动生产者的方法SpringBoot01KafkaBaseApplication

执行一次该方法,会调用一次生产者发送一次消息。
即每执行一次,会调用EventProducer类下的sendEvent方法一次。

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void test01(){
        eventProducer.sendEvent();
    }
}

2.4、application.yml

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的服务器ip>:9092
    #配置生产者(有24个配置)
    #producer:
    #配置消费者(有24个配置)
    #consumer:

启动服务后发现报错:
在这里插入图片描述

2.5、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath />
    </parent>

    <groupId>org.powernode</groupId>
    <artifactId>spring-boot-01-kafka-base</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkaSpringBootProject</name>
    <description>kafka project for Spring Boot</description>

    <properties>
        <java.version>8</java.version>
    </properties>


    <repositories>
        <repository>
            <id>central</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <layout>default</layout>
            <!-- 是否开启发布版构件下载 -->
            <releases>
                <enabled>true</enabled>
            </releases>
            <!-- 是否开启快照版构件下载 -->
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.8.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2.6、启动springboot项目的启动类(Application)报错

项目启动类

package com.power;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
		System.out.println("启动成功--------------------------");
	}
}

修改server.properties配置文件:
在这里插入图片描述修改前:

在这里插入图片描述
修改后:
在这里插入图片描述

3、springboot集成kafka读取最早的消息

已经被消费者读取/消费的消息,无法被新启动的消费组消息的,那么新启动的消费组该如何读取最早的消息呢,可以通过设置消费者auto-offset-reset: earliest去实现。
在这里插入图片描述

3.1、如何设置消费者auto-offset-reset: earliest

在这里插入图片描述

1、修改application.yml
在这里插入图片描述

3.2、设置消费者auto-offset-reset: earliest后存在的问题

在这里插入图片描述

3.2.1、修改消费组ID

原消费组ID
在这里插入图片描述
修改后的消费组ID
在这里插入图片描述4、新的消费组ID成功读取到之前的消息
在这里插入图片描述

3.2.2、手动重置偏移量

3.2.2.1、手动将偏移量设置为最早
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute

来到kafka安装目录下:

在这里插入图片描述执行如下命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-earliest --execute

执行后报错

在这里插入图片描述
需要先停掉服务,在去手动重置偏移量,此时重置偏移量成功,偏移量为0

3.2.2.2、手动将偏移量设置为最新
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-latest --execute

设置成功,此时偏移量已为最新:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2061058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小程序学习day10-自定义组件的data数据、方法、属性,data与properties的关系、自定子组件之数据监听器

39、WXS脚本&#xff08;小程序独有的一套脚本语言&#xff09;&#xff08;续&#xff09; &#xff08;6&#xff09;自定义组件的data数据、方法、属性 1&#xff09;自定义组件的data数据&#xff08;在小程序组件中&#xff0c;用于组件模版渲染的私有数据&#xff09; …

定格精彩瞬间!详解六自由度技术原理及应用

在体育赛事中&#xff0c;观赏各项目的精彩瞬间&#xff0c;欣赏运动员的卓越表现是观众们最为关注的焦点。以体操跳马为例&#xff0c;运动员们全力助跑&#xff0c;然后奋力起跳、腾空&#xff0c;接着精准的推手和转体动作&#xff0c;最后稳稳落地&#xff0c;整个动作行云…

【JAVA CORE_API】Day19 多线程API(2)、多线程并发安全问题、同步

多线程API 进程和线程 进程&#xff1a;进程就像是一个程序在电脑里运行时的一个实例。你可以把它想象成一个独立的小工人&#xff0c;专门负责完成某项任务&#xff08;比如打开浏览器、播放音乐&#xff09;。每个进程都有自己独立的资源&#xff08;比如内存&#xff09;和…

python 可迭代,迭代器,生成器,装饰器

1. 可迭代&#xff08;Iterable&#xff09; 可迭代 是指一个对象可以返回一个迭代器的对象。也就是说&#xff0c;它实现了 __iter__() 方法或 __getitem__() 方法。常见的可迭代对象有列表、元组、字符串、字典和集合。 from collections.abc import Iterablei 100 s &qu…

墙裂推荐!云上机密计算,阿里云上体验了一下海光内存加密和远程认证

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 机密计算目录 前言1、构…

哈希原理实现

本节主要看源代码实现 哈希特点 哈希&#xff08;Hashing&#xff09;是一种将数据映射到固定大小的表中以实现快速查找的数据结构和算法方法。哈希的主要特点包括&#xff1a; 1. 高效的查找、插入和删除 时间复杂度&#xff1a;哈希表通常提供近乎常数时间的查找、插入和…

app安全评估报告的常见留存措施(内附独家资料)

对用户账号、操作时间、操作类型、网络源地址和目标地址、网络源端口、客户端硬件特征等日志信息以及用户发布信息记录的留存措施 1**. 用户账号信息**&#xff1a;我们将用户账号信息安全存储&#xff0c;只有授权的人员能够访问。这些信息包括用户名、电子邮件地址等&#xf…

【C++ 面试 - 面向对象】每日 3 题(六)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏&…

VAuditDemo审计之二次注入漏洞

目录 VAuditDemo二次注入漏洞 搜索危险函数&#xff0c;用户可控点 regCheck.php messageSub.php message.php 漏洞调用链 漏洞错误利用过程 注册用户 xxxx, 发表payload留言 漏洞正确利用过程 注册用户 wwww\ 退出用户 wwww\\ 使用 wwww\ 登录 发表留言 替换dat…

《javaEE篇》--定时器

定时器概念 当我们不需要某个线程立刻执行&#xff0c;而是在指定时间点或指定时间段之后执行&#xff0c;假如我们要定期清理数据库里的一些信息时&#xff0c;如果每次都手动清理的话就太麻烦&#xff0c;所以就可以使用定时器。定时器就可以比作一个闹钟&#xff0c;可以让…

C++ 设计模式(6. 适配器模式)

适配器模式Adapter Pattern是一种结构型设计模式&#xff0c;它可以将一个类的接口转换成客户希望的另一个接口&#xff0c;主要目的是充当两个不同接口之间的桥梁&#xff0c;使得原本接口不兼容的类能够一起工作。基本结构 Target 是目标接口&#xff0c;Adaptee 是需要适配的…

微信小程序实例代码解读

以微信 小程序开发工具给的示例代码为例&#xff1a; 主页代码&#xff1a; index.wxml 这个文件是一个微信小程序页面的 WXML 结构,主要功能是展示一个快速开始教程的步骤和内容。 源代码&#xff1a; <!--index.wxml--> <view class"container">&l…

ZK-Rollups测评

1. 引言 Matter Labs团队和多个高校研究人员一起&#xff0c;发布2024年论文《Analyzing and Benchmarking ZK-Rollups》&#xff0c;开源代码见&#xff1a; https://github.com/StefanosChaliasos/zkrollup-benchmarking&#xff08;Python&#xff09; 其中&#xff1a; …

安装MySQL入门基础指令

一.安装MySQL(以5.7版本为例) 1.一路默认安装&#xff0c;截图供大家参考 修改自己window安装名字即可 2.配置环境变量 C:\Program Files\MySQL\MySQL Server 5.7\bin 写入系统环境变量即可在window窗口使用其服务了 3.登录MySQL服务 进入控制台输入命令 mysql -u root …

运维小技能:基于Windows系统和‌Linux系统,以tomcat为案例,讲解如何新增自启动服务。

文章目录 引言‌I Linux系统‌(以CentOS为例)基础知识:运行级别(run level)基于chkconfig 工具,设置服务启动类型。基于systemctl 新增系统服务II 基于Windows系统设置服务自启动的常规操作安装多个tomcat服务,并设置自启动。III 扩展制定定时任务优化停止Tomcat服务命令引…

ESP32Cam人工智能教学20

ESP32Cam人工智能教学20 ESP32Cam专用APP 这次我们专门为ESP32Cam量身定制一个手机APP。手机APP是客户端&#xff0c;利用Socket连接ESP32Cam&#xff0c;ESP32Cam成了服务器&#xff0c;实现Socket全双工的数据传输模式&#xff0c;还可以一边显示摄像头图像&#xff0c;一边…

【Canvas与诗词】北岛诗《献给遇罗克》节选(以太阳的名义...)

【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>以太阳的名义</title><style type"text/css">…

基类没有虚析构,即使派生类使用智能指针也一定会内存泄漏

实验 定义一个基类和一个派生类 class Base { public:virtual ~Base() default; };class Derive :public Base { public:std::shared_ptr<int> sp{new int{0},[](int *p){delete p;std::cout << "删除器" << endl;},}; };main 函数执行如下代码…

作业08.21

服务器&#xff1a; #include <myhead.h>#define SER_PORT 6666 #define SER_IP "127.0.0.1"int find_client(int *client_arr, int len, int client) {for(int i0; i<len; i){if(client_arr[i] client){return i;}}return -1; }void remove_client(int *…

Mac 使用vscode 创建vue项目后修改文件提示:权限不足,以超级用户身份重试

项目场景&#xff1a; Mac 安装了全局 vue-cli 插件后&#xff0c;使用webpack 创建vue项目&#xff0c;打开项目&#xff0c;选择信任所有文件夹&#xff0c;然后正常编写代码&#xff0c;并对项目中的文件进行修改&#xff0c;点击保存的时候提示&#xff1a;保存“webpack.…