SpringBoot整合Kafka (二)

news2025/1/23 12:13:32

📑前言

本文主要讲了SpringBoot整合Kafka文章,如果有什么需要改进的地方还请大佬指出⛺️
上文链接:SpringBoot整合Kafka (一)

🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点

在这里插入图片描述

目录

文章目录

  • 📑前言
  • **目录**
    • 一、介绍
    • 二、主要功能
    • 三、Kafka基本概念
    • 四、Spring Boot整合Kafka的demo
      • 1、构建项目
        • 1.1、引入依赖
        • 1.2、YML配置
        • 1.3、生产者简单生产
        • 1.4、消费者简单消费
      • 2、消费者
        • 2.1、Kafka应答机制
          • ACK应答级别
        • 2.2、Kafka消息消费确认机制
          • 自动提交
          • 手动提交
        • 2.3、指定消费
          • 监听一个主题,指定分区消费消息
  • 📑文章末尾


一、介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目

二、主要功能

1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种\nconsumer,例如hadoop、Hbase、Solr等。

三、Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。首先,让我们来看一下基础的消息(Message)相关术语:
Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的在这里插入图片描述

四、Spring Boot整合Kafka的demo

1、构建项目

1.1、引入依赖
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
1.2、YML配置
spring:
  kafka:
    bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。
    producer: # 消息提供者key和value序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3 # 生产者发送失败时,重试发送的次数
    consumer: # 消费端反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
1.3、生产者简单生产
@Autowired
private KafkaTemplate kafkaTemplate;

@Test
void contextLoads() {
    ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");
    System.out.println("发送完成");
}
1.4、消费者简单消费
@Component
public class TopicConsumer {

    @KafkaListener(topics = "test01-topic")
    public void readMsg(String msg){
        System.out.println("msg = " + msg);
    }
}

2、消费者

2.1、Kafka应答机制

在生产者(producer)往Kafka发送数据的进程中,为了确保数据能够发送到指定的topic中,topic中的每一个partition在收到数据后,都需要向生产者发送 ack(ackacknowledgement)。

假设 producer 在必定的时间内收不到应对,那么producer会再次向Kafka发送此条数据。这就类似于写信,假定我们写一封信给或人,然后我们会在一段时间后收到一封回信,但假设超过了一个月我们还没有收到回信,就会猜想是不是信件丢掉了,会将这封信进行从头发送,直到收到回信中止。

ACK应答级别

一、0
介绍:生产者发送过来的数据,不需要等数据落盘应答
数据可靠性分析:容易丢数据
丢失数据原因:生产者发送完成后,Leader没有接收到数据,但是生产者认为已经发送成功了

二、1
介绍:生产者发送过来的数据,Leader收到数据后应答
数据可靠性分析:容易丢数据
丢失数据原因:应答完成后,还没开始同步副本,Leader挂了,新的Leader不会收到同步的消息,因为生产者已经认为发送成功了

三、-1(all)
介绍:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
数据可靠性分析:可靠

spring:
  kafka:
    bootstrap-servers: 192.168.***.***:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。
    producer: # 消息提供者key和value序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3 # 生产者发送失败时,重试发送的次数
      properties:
        linger.ms: 0 # spring.kafka.producer.properties.linger.ms=0,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
      acks: 1 # 修改ACK应答级别,默认是1
2.2、Kafka消息消费确认机制

Kafka消费消息确认机制分为两种:自动确认和手动确认。
(1)自动确认:在自动确认模式下,Kafka消费者消费一条消息后,会自动将消息偏移量提交到服务器端,不需要手动进行确认,从而确保消息被有效处理。此种确认机制的优点是操作简单,但是可能会导致消息重复消费,即当消费者处理消息的过程中出现异常,导致偏移量提交失败,下一次启动时就会重新消费之前已经处理过的消息。
(2)手动确认:在手动确认模式下,消费者需要显式地调用commit()方法,将消息的偏移量提交到服务器端,才会被标记为已处理。手动确认模式下,可以避免重复消费的问题,但是需要开发者自己实现确认逻辑,增加了一定的开发复杂度。
总的来说,自动确认适用于对消息的可靠性要求不高、实时性较高的场景;手动确认适用于对消息的可靠性要求较高、不要求实时性的场景

自动提交

这种提交方式有两个很重要的参数:
enable.auto.commit=true(是否开启自动提交,true or false)
auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)
每隔5秒,消费者会自动把从poll方法接收到的最大偏移量提交上去。自动提交是在轮询中进行,消费者每次轮询时都会检查是否提交该偏移量。可是这种情况会发生重复消费和丢失消息的情况。

server:
  port: 18082
spring:
  kafka:
    bootstrap-servers: 192.168.***.***:9092,192.168.***.***:9093,192.168.***.***:9094
    consumer: # consumer消费者
      group-id: consumergroup # 默认的消费组ID
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 120000
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

设置enable-auto-commit: true,开启自动提交,也就是偏移量不需要我们手动提交,程序会自己提交。
设置auto.commit.interval.ms=120000,也就是消费后,不会立即提交,会在2分钟后提交,只要在这期间服务异常终止,偏移量就无法提交到Broker,再次启动,会重复消费。

手动提交

手动提交模式可以有效确保消息不丢失以及不重复消费

MANUAL:poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。
我们可以先测试一下MANUAL模式,只需要需改配置application.yml即可:

spring:
  kafka:
    bootstrap-servers: 192.168.***.***:9092,192.***.***.130:9093,192.***.***.130:9094
    consumer: # consumer消费者
      group-id: consumergroup # 默认的消费组ID
      enable-auto-commit: false # 是否自动提交offset
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual # 手动添加偏移量

消费者代码

@KafkaListener(topics = {"itmentu"},groupId = "itmentuGroup")
public void listener(ConsumerRecord<String,String> record, Acknowledgment ack){
    //获取消息
    String message = record.value();
    //消息偏移量
    long offset = record.offset();
    System.out.println("读取的消息:"+message+"\n当前偏移量:"+offset);

    //手动提交偏移量
    ack.acknowledge();
}
2.3、指定消费

属性解释:
id:消费者ID
groupId:消费组ID
topics:监听的topic,可监听多个
topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。

监听一个主题,指定分区消费消息
    /**
     * 监听一个主题,且指定消费主题的哪些分区。
     * 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
     * @param record
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
            },
            concurrency = "2"
    )
    public void consumeByPattern(ConsumerRecord<String, String> record) {
        System.out.println("consumeByPattern");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );

以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。

📑文章末尾

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1181989.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图论——并查集

参考内容&#xff1a; 图论——并查集(详细版) 并查集&#xff08;Disjoint-set&#xff09;是一种精巧的树形数据结构&#xff0c;它主要用于处理一些不相交集合的合并及查询问题。一些常见用途&#xff0c;比如求联通子图、求最小生成树的 Kruskal 算法和求最近公共祖先&…

【C语言】数据结构——无头单链表实例探究

&#x1f497;个人主页&#x1f497; ⭐个人专栏——数据结构学习⭐ &#x1f4ab;点击关注&#x1f929;一起学习C语言&#x1f4af;&#x1f4ab; 目录 导读&#xff1a;1. 单链表1.1 什么是单链表1.2 优缺点 2. 实现单链表基本功能2.1 定义结构体2.2 单链表打印2.3 销毁单链…

【有源码】基于uniapp的农场管理小程序springboot基于微信小程序的农场检测系统(源码 调试 lw 开题报告ppt)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…

10 路由协议:西出网关无故人,敢问路在何方

1.网络包出了网关之后&#xff0c;就有了一种漂泊的悲凉感 2.之前的场景是比较简单的场景&#xff0c;但是在实际生产环境下&#xff0c;出了网关&#xff0c;会面临着很多路由器&#xff0c;有很多条道路可以选。 3、如何配置路由&#xff1f; 路由表的设计 1.路由器就是一…

高校教务系统登录页面JS分析——西安外国语大学教务系统

高校教务系统密码加密逻辑及JS逆向 本文将介绍高校教务系统的密码加密逻辑以及使用JavaScript进行逆向分析的过程。通过本文&#xff0c;你将了解到密码加密的基本概念、常用加密算法以及如何通过逆向分析来破解密码。 本文仅供交流学习&#xff0c;勿用于非法用途。 一、密码加…

学习笔记:CANOE模拟LIN主节点和实际从节点进行通信测试

先写点感想&#xff0c;在LIN开发阶段&#xff0c;我一般用图莫斯USB工具来进行模拟主机节点发送数据。后来公司买了CANOE工具就边学习边搭建了LIN的测试工程&#xff0c;网上的资料真的很少&#xff0c;主要是靠自己一点点摸索前进&#xff0c;总算入门。几个月后的今天&#…

网工内推 | 售后工程师,IP认证优先,最高15薪,年底有分红

01 威发系统&#xff08;中国&#xff09;有限公司 招聘岗位&#xff1a;售后工程师 职责描述&#xff1a; 1、负责各种规模的项目售后安装、调试和维护工作&#xff1b; 2、解决工程和维护中的一般技术问题&#xff0c;支持、协助处理其他相关的技术问题&#xff1b; 3、与…

如何像优秀测试人员那样思考?

优秀测试和普通测试之间的差别在于测试人员如何思考&#xff1a;测试人员的测试设计选择&#xff0c;解释所观察到的能力&#xff0c;以及非常令人信服地分析描述这些现象的能力。 然而&#xff0c;在实际工作中&#xff0c;我们更多的看到了测试人员在电脑前点点点&#xff0…

文件批量改名:轻松批量重命名快手素材文件,提高工作效率

文件名太长&#xff0c;文件太多有什么办法可以一键改名呢&#xff1f; 在日常繁琐的工作中&#xff0c;我们经常需要整理大量的文件&#xff0c;而为了更高效地管理和快速查找&#xff0c;对文件进行重命名显得尤为关键。然而&#xff0c;传统的手动逐个重命名方式不仅耗时&a…

算法学习打卡day41|栈和队列:栈和队列相互实现、括号匹配、逆波兰表达式、滑动窗口最大值问题、求前 K 个高频元素

栈和队列相互实现 力扣题目链接&#xff1a;用栈实现队列、用队列实现栈 题目描述&#xff1a; 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、peek、empty&#xff09;&#xff1a; 实现 MyQueue 类&#xff1a; void push(…

2 任务2: 使用趋动云GPU进行猫狗识别实践

使用趋动云GPU进行猫狗识别实践 1 创建项目2 初始化开发环境3 调试代码4 提交离线任务5 结果集存储与下载 使用趋动云提供的免费GPU&#xff0c;进行猫狗识别实践。 虽然例程里面提供的是基于tensorflow的&#xff0c;但是你也可以使用pytorch的代码 使用这个平台的一个优点就是…

1-前端基本知识-CSS

1-前端基本知识-CSS 文章目录 1-前端基本知识-CSS总体概述什么是CSS&#xff1f;CSS引入方式行内式内嵌式连接式/外部样式表 CSS选择器元素选择器id选择器class选择器&#xff08;使用较广&#xff09; CSS浮动CSS定位静态定位&#xff1a;static绝对定位&#xff1a;absolute相…

“精准分割视频,误差降低至零——视频剪辑的新革命!”

在数字媒体的时代&#xff0c;视频剪辑已经成为我们日常生活和工作中不可或缺的一部分。无论是制作一部电影、剪辑一个纪录片&#xff0c;还是编辑一个家庭视频&#xff0c;我们都需要一个精准、高效的视频剪辑工具。今天&#xff0c;我们向您推荐一款全新的视频剪辑软件——精…

[NLP] 使用Llama.cpp和LangChain在CPU上使用大模型

一 准备工作 下面是构建这个应用程序时将使用的软件工具: 1.Llama-cpp-python 下载llama-cpp, llama-cpp-python [NLP] Llama2模型运行在Mac机器-CSDN博客 2、LangChain LangChain是一个提供了一组广泛的集成和数据连接器&#xff0c;允许我们链接和编排不同的模块。可以常…

关于卷积神经网络的步幅(stride)

认识步幅&#xff08;stride&#xff09; 卷积核从输入数组的最左上方开始&#xff0c;按从左往右、从上往下的顺序&#xff0c;依次在输入数组上滑动&#xff0c;我们将每次滑动的行数和列数称为步幅。 计算步幅 假设输入的形状n∗n&#xff0c;卷积核的形状为f∗f&#xff0…

域渗透06-协议(NTLM hash利用)

前言&#xff1a; 当我们获取到一台域内主机打算干什么&#xff0c;毫无疑问当然是拿域控&#xff0c;如果域控未发现漏洞应该怎么办&#xff0c;首先我们需要查看我们拿到主机的权限和在域中的组&#xff0c;如果本机权限够我们就需要利用工具抓取本机的hash&#xff0c;然后…

HCIE-CCE

1、创建集群 svc网络&#xff0c;10.247 pod网络&#xff0c;10.244 节点网络&#xff0c;192.168.66&#xff08;master和node一致&#xff09; 2、创建节点 上面集群选择了最新版本1.27&#xff0c;CCE从1.27版本开始不再支持docker容器引擎&#xff0c;仅支持containered&…

渗透实战靶机3wp

0x00 简介 目标IP&#xff1a;xxxx.95 测试IP&#xff1a;xxxx.96 测试环境&#xff1a;win10、kali等 测试时间&#xff1a;2021.7.23-2021.7.26 测试人员&#xff1a;ruanruan 0x01 信息收集 1、端口扫描 21&#xff0c;ftp&#xff0c;ProFTPD&#xff0c;1.3.3c22&a…

Oracle 安装及 Spring 使用 Oracle

参考内容&#xff1a; docker安装oracle数据库史上最全步骤&#xff08;带图文&#xff09; Mac下oracle数据库客户端 Docker安装Oracle docker能安装oracle吗 Batch script for add a auto-increased primary key for exist table with records Docker 安装 Oracle11g 注意&a…

基于单片机的甲醛检测器设计

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 技术交流认准下方 CSDN 官方提供的联系方式 文章目录 概要 一、设计的主要内容二、系统硬件设计三、软件设计4.1 程序结构流程图原理图 四、结论五、 文章目录 概要 本文将要提…