Kafka入门二——SpringBoot连接Kafka示例

news2024/11/15 3:29:48

实现

1.引入maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.9</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-demo</name>
    <description>kafka-demo</description>
    <properties>
        <java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <image>
                        <builder>paketobuildpacks/builder-jammy-base:latest</builder>
                    </image>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2.修改application.properties配置

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.bootstrap-servers=10.20.37.85:9092
spring.kafka.consumer.group-id=demo

通过spring.kafka.consumer.auto-offset-reset配置可以指定Kafka消费者的行为,当遇到没有初始偏移量或当前偏移量不再服务器上存在的情况时的处理策略。这个配置有以下三个可选值:

  • earliest:当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最早的可用消息开始消费。
  • latest(默认):当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最新的可用消息开始消费。
  • none:当服务器上找不到消费者的偏移量时,抛出异常。

通过spring.kafka.bootstrap-servers配置可以指定Kafka的地址。这个配置的值应该是一个或多个Kafka服务器的地址,用逗号分隔。

通过spring.kafka.consumer.group-id配置可以指定Kafka消费者的组ID。这个配置的值应该是一个字符串,用于标识消费者所属的组。

定义Configuer引入kafka Bean

@Component
public class MyConfiguer {
   
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }
}

使用了@Bean注解,表示该方法将返回一个Spring管理的bean对象。在这个方法中,通过调用TopicBuilder类的name方法指定了主题的名称为"topic1",然后使用partitions方法设置了主题的分区数为10,使用replicas方法设置了主题的副本数为1。最后,调用build方法构建并返回了一个NewTopic对象。

定义生产者

@Component
public class KafkaProducer {

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }
}

ApplicationRunner接口是Spring Boot提供的一个回调接口,用于在应用程序启动后执行一些操作。这段代码的作用是在应用程序启动后自动发送一条消息到Kafka的"topic1"主题,消息内容为"test"。

定义消费者

@Component
public class KafkaConsumer {
    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }
}

@KafkaListener注解用于标记方法作为Kafka消息的消费者。这段代码的作用是创建一个Kafka消费者,监听名为"topic1"的主题,并将接收到的消息内容打印到控制台。

总结

生产者

Kafka生产者发送消息的流程如下:

  • 创建ProducerRecord:首先,生产者需要创建一个ProducerRecord对象,该对象包含目标主题和要发送的内容。如果需要,还可以指定消息的键(key)或分区(partition)。

  • 序列化:生产者将ProducerRecord对象中的键和值进行序列化,转换为字节数组,以便能够在网络上传输。

  • 拦截器链:消息会通过一个由一个或多个拦截器组成的链。这些拦截器可以对消息进行预处理,例如添加头信息或实施自定义的逻辑。

  • 元数据加载:生产者在发送消息之前,需要加载Kafka集群的元数据,以确定各个主题的分区和副本分布情况。

  • 计算分区号:根据ProducerRecord中的键和分区策略(如果有的话),生产者会计算出消息应该发送到哪个分区。

  • 累加器缓存:消息被缓存到RecordAccumulator累加器中,这是一个按照批次处理消息的组件。

  • Sender线程发送:Sender线程负责将符合条件的消息批次发送给Kafka broker。除了发送消息外,Sender线程也负责更新元数据。

  • 等待确认:生产者可以选择等待broker的确认,以确保消息已经被成功接收并写入日志。

  • 处理响应:生产者处理来自broker的响应,包括错误处理和重试逻辑。

  • 完成发送:一旦消息被成功发送并且确认,生产者会继续发送下一批消息或者结束发送过程。
    在这里插入图片描述

消费者

Kafka消费者接收消息的流程涉及到多个关键步骤,确保了消息能够从broker高效地传送到消费者手中。以下是该过程的详细描述:

  • 创建消费者实例:需要创建一个Kafka消费者实例,这通常涉及到指定一些关键参数,如Kafka集群的地址、消费者组ID、反序列化器等。

  • 订阅主题:消费者需要订阅一个或多个感兴趣的主题。这意味着消费者希望从这些主题接收消息。

  • 协调消费者组:在ConsumerCoordinator的poll方法中,会有一个ensureActiveGroup的过程,消费者通过这个过程向Coordinator发送加入消费者组的请求,并等待Coordinator的响应。

  • 拉取消息:一旦加入了消费者组,消费者开始从订阅的主题中拉取(pull)消息。Kafka采用pull模式而不是push模式,这意味着消费者需要主动去服务器拉取消息。

  • 处理消息:消费者获取消息后,会进行相应的处理。这个处理过程可能包括数据转换、存储或者其他业务逻辑。

  • 提交消费位移:消费者在处理完消息后,需要提交消费位移。这是告诉Kafka它已经消费到了哪个位置,以便下次从正确的位置开始消费。位移的提交是由消费者自己管理的,Kafka提供了接口来更新这些位移。

  • 控制消费速率:消费者可以通过各种方式控制消息的消费速率,例如通过限制拉取操作的频率或者调整消费者的线程数。

  • 错误处理和重试:在消费过程中可能会遇到错误,比如网络问题或者消息格式错误。消费者需要有相应的错误处理机制来处理这些情况,并在必要时进行重试。

  • 关闭消费者:完成消息消费后,应当关闭消费者实例,释放资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1467810.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Sqli-labs靶场第8关详解[Sqli-labs-less-8]

Sqli-labs-Less-8 前言&#xff1a; SQL注入的三个条件&#xff1a; ①参数可控&#xff1b;&#xff08;从参数输入就知道参数可控&#xff09; ②参数过滤不彻底导致恶意代码被执行&#xff1b;&#xff08;需要在测试过程中判断&#xff09; ③参数带入数据库执行。&#…

从源码学习单例模式

单例模式 单例模式是一种设计模式&#xff0c;常用于确保一个类只有一个实例&#xff0c;并提供一个全局访问点。这意味着无论在程序的哪个地方&#xff0c;只能创建一个该类的实例&#xff0c;而不会出现多个相同实例的情况。 在单例模式中&#xff0c;常用的实现方式包括懒汉…

Spring Boot与HikariCP:性能卓越的数据库连接池

点击下载《Spring Boot与HikariCP&#xff1a;性能卓越的数据库连接池》 1. 前言 本文将详细介绍Spring Boot中如何使用HikariCP作为数据库连接池&#xff0c;包括其工作原理、优势分析、配置步骤以及代码示例。通过本文&#xff0c;读者将能够轻松集成HikariCP到Spring Boot…

深度学习中数据的转换

原始&#xff08;文本、音频、图像、视频、传感器等&#xff09;数据被转化成结构化且适合机器学习算法或深度学习模型使用的格式。 原始数据转化为结构化且适合机器学习和深度学习模型使用的格式&#xff0c;通常需要经历以下类型的预处理和转换&#xff1a; 文本数据&#xf…

Java语言实现五子棋

目录 内容 题目 解题 代码 实现 内容 题目 五子棋 使用二维数组,实现五子棋功能. 1.使用二维数组存储五子棋棋盘 如下图 2.在控制台通过Scanner输入黑白棋坐标(例如:1,2 2,1格式 表示二维数组坐标),使用实心五角星和空心五角星表示黑白棋子. 如下图: 输入后重新输出…

Redis信创平替之TongRDS(东方通),麒麟系统安装步骤

我的系统: 银河麒麟桌面系统V10(SP1)兆芯版 1.先进入东方通申请使用 2.客服会发送一个TongRDS包与center.lic给你(我这里只拿到.tar.gz文件,没有网上的什么安装版) 3.上传全部文件到目录中 4.服务节点安装,并启动 tar -zxvf TongRDS-2.2.1.2_P3.Node.tar.gz cd pmemdb/bin/…

chatGPT 使用随想

一年前 chatGPT 刚出的时候&#xff0c;我就火速注册试用了。 因为自己就是 AI 行业的&#xff0c;所以想看看国际上最牛的 AI 到底发展到什么程度了. 自从一年前 chatGPT 火出圈之后&#xff0c;国际上的 AI 就一直被 OpenAI 这家公司引领潮流&#xff0c;一直到现在&#x…

探讨javascript中运算符优先级

如果阅读有疑问的话&#xff0c;欢迎评论或私信&#xff01;&#xff01; 本人会很热心的阐述自己的想法&#xff01;谢谢&#xff01;&#xff01;&#xff01; 文章目录 深入理解JavaScript运算符优先级运算符优先级概述示例演示示例1&#xff1a;加法和乘法运算符的优先级示…

VBA即用型代码手册:立即保护所有工作表Code及插入多工作表Code

我给VBA下的定义&#xff1a;VBA是个人小型自动化处理的有效工具。可以大大提高自己的劳动效率&#xff0c;而且可以提高数据的准确性。我这里专注VBA,将我多年的经验汇集在VBA系列九套教程中。 作为我的学员要利用我的积木编程思想&#xff0c;积木编程最重要的是积木如何搭建…

【Java程序设计】【C00277】基于Springboot的招生管理系统(有论文)

基于Springboot的招生管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的招生管理系统 本系统分为系统功能模块、管理员功能模块以及学生功能模块。 系统功能模块&#xff1a;在系统首页可以查看首页、专业…

UE蓝图 函数调用(CallFunction)节点和源码

系列文章目录 UE蓝图 Get节点和源码 UE蓝图 Set节点和源码 UE蓝图 Cast节点和源码 UE蓝图 分支(Branch)节点和源码 UE蓝图 入口(FunctionEntry)节点和源码 UE蓝图 返回结果(FunctionResult)节点和源码 UE蓝图 函数调用(CallFunction)节点和源码 文章目录 系列文章目录一、Call…

[LWC] Components Communication

目录 Overview ​Summary Sample Code 1. Parent -> Child - Public Setter / Property / Function a. Public Property b. Public getters and setters c. Public Methods 2. Child -> Parent - Custom Event 3. Unrelated Components - LMS (Lightning Message…

LeetCode206: 反转链表.

题目描述 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 解题方法 假设链表为 1→2→3→∅&#xff0c;我们想要把它改成∅←1←2←3。在遍历链表时&#xff0c;将当前节点的 next指针改为指向前一个节点。由于节点没有引用其前一…

websocket了解下

websocket请求长啥样 GET /chat HTTP/1.1 Host: example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ Sec-WebSocket-Version: 13 啥是websocket websocket是http的一种&#xff0c;服务器可以主动向客户端推送信息&#xff0c;…

板块一 Servlet编程:第五节 Cookie对象全解 来自【汤米尼克的JAVAEE全套教程专栏】

板块一 Servlet编程&#xff1a;第五节 Cookie对象全解 一、什么是CookieCookie的源码 二、Cookie的具体操作&#xff08;1&#xff09;创建Cookie&#xff08;2&#xff09;获取Cookie&#xff08;3&#xff09;设置Cookie的到期时间&#xff08;4&#xff09;设置Cookie的路径…

lxml库和Xpath提取网页数据的基础与实战:完整指南与实战【第92篇—提取网页】

使用lxml库和Xpath提取网页数据的基础与实战 在网络爬虫和数据抓取中&#xff0c;从网页中提取所需信息是一项常见的任务。lxml库和Xpath是Python中用于解析和提取HTML/XML数据的强大工具。本文将介绍lxml库的基础知识&#xff0c;以及如何使用Xpath表达式来准确地提取网页数据…

Java----认识异常

目录 一、异常的概念与体系结构 1.异常的概念 2.异常的体系结构 3.异常的分类 1. 编译时异常 2. 运行时异常 二、异常的处理 1.认识防御式编程 2.异常的抛出 3.异常的捕获 3.1 异常声明throws throws与throw的区别&#xff1a; 3.2 try-catch捕获并处理 3.3 finally …

unity学习(38)——创建(create)角色脚本(panel)--EventSystem

1.在scripts文件夹下创建一个脚本CreatePlayerPanel.cs&#xff0c;脚本挂到panel上&#xff01;给panel加个tag&#xff0c;叫createPanel&#xff0c;脚本内容如下&#xff1a; using System.Collections; using System.Collections.Generic; using TMPro; using UnityEngin…

华为配置WLAN AC和AP之间VPN穿越示例

配置WLAN AC和AP之间VPN穿越示例 组网图形 图1 配置WLAN AC和AP之间VPN穿越示例组网图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件 业务需求 企业用户接入WLAN网络&#xff0c;以满足移动办公的最基本需求。且在覆盖区域内移动发生漫游时&#xff0c;不影响…

redis的搭建 RabbitMq搭建 Elasticsearch 搭建

官网 Download | Redis wget https://github.com/redis/redis/archive/7.2.4.tar.gz 编译安装 yum install gcc g tar -zxvf redis-7.2.4.tar.gz -C /usr/localcd /usr/local/redis make && make install 常见报错 zmalloc.h:50:10: fatal error: jemalloc/jemal…