SpringBoot整合Kafka (一)

news2024/11/28 12:55:16

📑前言

本文主要讲了SpringBoot整合Kafka文章,如果有什么需要改进的地方还请大佬指出⛺️

🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点

在这里插入图片描述

目录

文章目录

  • 📑前言
  • **目录**
    • 一、介绍
    • 二、主要功能
    • 三、Kafka基本概念
    • 四、Spring Boot整合Kafka的demo
      • 1、构建项目
        • 1.1、引入依赖
        • 1.2、YML配置
        • 1.3、生产者简单生产
        • 1.4、消费者简单消费
      • 2、生产者
        • 2.1、Kafka生产者消息监听
        • 2.2、生产者写入分区策略
          • 指定写入分区
          • 根据key写入分区
          • 随机选择分区
        • 2.3、带回调的生产者
  • 📑文章末尾


Kafka

一、介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

二、主要功能

1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种
consumer,例如hadoop、Hbase、Solr等。

三、Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。
首先,让我们来看一下基础的消息(Message)相关术语:

Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的

在这里插入图片描述

四、Spring Boot整合Kafka的demo

1、构建项目

1.1、引入依赖
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
1.2、YML配置
spring:
  kafka:
    bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。
    producer: # 消息提供者key和value序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3 # 生产者发送失败时,重试发送的次数
    consumer: # 消费端反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""```
1.3、生产者简单生产
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    void contextLoads() {
        ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");
        System.out.println("发送完成");
    }

1.4、消费者简单消费
@Component
public class TopicConsumer {

    @KafkaListener(topics = "test01-topic")
    public void readMsg(String msg){
        System.out.println("msg = " + msg);
    }
}

2、生产者

2.1、Kafka生产者消息监听

@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());

    }
    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.info("Message send error : " + producerRecord.toString());
    }
}

2.2、生产者写入分区策略

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka
为我们提供了默认的分区策略,同时它也支持自定义分区策略。

指定写入分区

100条数据全部写入到2号分区

for (int i = 0; i < 100; i++) {
    ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic",2,null, "toString");
    listenableFuture.addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送失败");
        }
        @Override
        public void onSuccess(Object result) {
            SendResult sendResult = (SendResult) result;
            int partition = sendResult.getRecordMetadata().partition();
            String topic = sendResult.getRecordMetadata().topic();
            System.out.println("topic:"+topic+",分区:" + partition);
        }
    });
    Thread.sleep(1000);
}

根据key写入分区
String key = "kafka-key";
System.out.println("根据key计算分区:" + (key.hashCode() % 3));
for (int i = 0; i < 10; i++) {
    ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic", key, "toString");
    listenableFuture.addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送失败");
        }

        @Override
        public void onSuccess(Object result) {
            SendResult sendResult = (SendResult) result;
            int partition = sendResult.getRecordMetadata().partition();
            String topic = sendResult.getRecordMetadata().topic();
            System.out.println("topic:" + topic + ",分区:" + partition);
        }
    });
    Thread.sleep(1000);
}
随机选择分区
 for (int i = 0; i < 10; i++) {
     ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic", "toString");
     listenableFuture.addCallback(new ListenableFutureCallback() {
         @Override
         public void onFailure(Throwable ex) {
             System.out.println("发送失败");
         }

         @Override
         public void onSuccess(Object result) {
             SendResult sendResult = (SendResult) result;
             int partition = sendResult.getRecordMetadata().partition();
             String topic = sendResult.getRecordMetadata().topic();
             System.out.println("topic:" + topic + ",分区:" + partition);
         }
     });
     Thread.sleep(1000);
 }
2.3、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理

for (int i = 0; i < 10; i++) {
    ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic",2,null, "toString");
    listenableFuture.addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送失败");
        }
        @Override
        public void onSuccess(Object result) {
            SendResult sendResult = (SendResult) result;
            int partition = sendResult.getRecordMetadata().partition();
            String topic = sendResult.getRecordMetadata().topic();
            System.out.println("topic:"+topic+",分区:" + partition);
        }
    });
    Thread.sleep(1000);
}

以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。

📑文章末尾

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1177019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库备份和恢复

备份&#xff1a; 完全备份 增量备份 完全备份&#xff1a;将整个数据库完整的进行备份。 增量备份&#xff1a;在完全备份的基础之上&#xff0c;对后续新增的内容进行备份。 备份的需求&#xff1a; 备份的需求&#xff1a; 1、在生产环境中&#xff0c;数据的安全至关重…

计算领域高质量科技期刊分级目录

最后需要文件的小伙伴可以私信我哦&#xff0c;也可以自行在官网下载

汽车网络安全渗透测试概述

目录 1.汽车网络安全法规概述 1.1 国外标准 1.2 国内标准 2.汽车网络安全威胁分析 2.1 汽车网络安全资产定义 2.2 汽车网络安全影响场景及评级示例 3.汽车网络安全渗透测试描述 3.1 参考法规 3.2 渗透测试内容 4.小结 1.汽车网络安全法规概述 近年来&#xff0c;汽车…

UE5——源码阅读——100——渲染——高清截图

创建事件&#xff0c;用于代码的调试 获取当前客户端所属的World 标记是否在进行重入绘制 是否开始缓存区可视化转存帧&#xff0c;主要针对请求屏幕截图或电影转存 判断是否需要高清截图 这下面这个函数执行高清截图 是否需要缓存区的可视化转存 判断是否开始渲染 如果…

总结Kibana DevTools如何操作elasticsearch的常用语句

一、操作es的工具 ElasticSearch HeadKibana DevToolsElasticHQ 本文主要是总结Kibana DevTools操作es的语句。 二、搜索文档 1、根据ID查询单个记录 GET /course_idx/_doc/course:202、term 匹配"name"字段的值为"6789999"的文档 类似于sql语句中的等…

自查看看自己转本复习中存在哪些问题

缺乏清晰、强力的奋斗目标 “我想考本科”只不过是一种笼统的泛化的模糊的专转本目标&#xff0c;对潜意识学习潜能的刺激力度不大。 专转本备考中&#xff0c;更需要一个恒久、量化、清晰、明确、具体的目标牵引自我潜意识去努力&#xff0c;克服学习中的重重困难。比如一所…

软件测试丨从外包到自研再到大厂——我是怎么从6k到25k的

功能测试&#xff1a;理论上说&#xff0c;该定位的测试人员应该是对业务需求理解最透彻的群体&#xff0c;专注于用户角度的测试&#xff0c;组织整体质量实践&#xff0c;分析测试运行结果&#xff0c;驱动测试执行。当然除了业务技能过硬&#xff0c;常用的测试工具也是必须…

VMware安装CentOS最小化开发环境导引

目录 一、概要 二、介绍 三、下载 四、安装 4.1 创建虚拟机 4.2 安装CentOS 五、配置网卡 六、配置本地安装源 七、安装软件 7.1 gcc/g 7.2 C的atomic库 7.3 java 7.4 Cmake 7.5 MariaDB客户端&#xff08;兼容mysql&#xff09; 八、用户配置文件.bash_profile…

Python小试牛刀:GUI(图形界面)实现计算器UI界面(三)

上一篇&#xff1a;Python小试牛刀&#xff1a;GUI&#xff08;图形界面&#xff09;实现计算器UI界面(二)-CSDN博客 回顾前两篇文章&#xff0c;第一篇文章主要实现了计算器UI界面如何布局&#xff0c;以及简单概述Python常用的GUI库。第二篇文章主要实现了计算器UI界面按钮组…

shiro-cve2016-4437漏洞复现

一、漏洞特征 Apache Shiro是一款开源强大且易用的Java安全框架&#xff0c;提供身份验证、授权、密码学和会话管理。Shiro框架直观、易用&#xff0c;同时也能提供健壮的安全性。 因为在反序列化时,不会对其进行过滤,所以如果传入恶意代码将会造成安全问题 在 1.2.4 版本前, 加…

Disk Drill v5.3.1313(数据恢复备份)

Disk Drill是一款功能强大的数据恢复软件&#xff0c;它可以帮助用户恢复已删除、丢失、格式化或损坏的文件&#xff0c;并支持多种存储设备&#xff0c;包括计算机硬盘驱动器、外部硬盘、USB闪存驱动器、内存卡和其他存储介质。它和很多的文件系统都兼容&#xff0c;比如&…

如何快速搭建本地书库结合内网穿透实现公网随时远程访问

Kindle中国电子书店停运不要慌&#xff0c;十分钟搭建自己的在线书库随时随地看小说&#xff01; 文章目录 Kindle中国电子书店停运不要慌&#xff0c;十分钟搭建自己的在线书库随时随地看小说&#xff01;1.网络书库软件下载安装2.网络书库服务器设置3.内网穿透工具设置4.公网…

高压放大器在微机电工程中有哪些应用

高压放大器在微机电工程中有许多重要的应用。微机电工程是一种利用微电子加工技术制造微米级或纳米级机械结构并与电子元件集成的领域。高压放大器在MEMS领域发挥着关键的作用&#xff0c;下面将介绍几个高压放大器在MEMS工程中的应用。 MEMS驱动器&#xff1a;MEMS驱动器常用于…

CLIP Surgery论文阅读

CLIP Surgery for Better Explainability with Enhancement in Open-Vocabulary Tasks&#xff08;CVPR2023&#xff09; M norm ⁡ ( resize ⁡ ( reshape ⁡ ( F i ˉ ∥ F i ‾ ∥ 2 ⋅ ( F t ∥ F t ‾ ∥ 2 ) ⊤ ) ) ) M\operatorname{norm}\left(\operatorname{resize}\…

Linux认识协议

目录 TCP协议通信流程TCP三次握手数据传输过程四次挥手过程TCP 和 UDP 对比 认识协议协议的概念结构化数据的传输序列化和反序列化 网络版计算器服务端代码面向字节流 协议定制客户端代码编写代码测试守护进程守护进程创建 关于协议制定中使用现成方法实现 TCP协议通信流程 下…

华为H12-831题库

单选&#xff09;当IS-IS网络中有多条冗余链路时&#xff0c;可能会出现多条等价路由。关于IS-IS网络内的等价路由&#xff0c;以下哪个描述是错误的? A、当组网中存在的等价路由数量大于通过命令配置的数量&#xff0c;且这些路由优先级相同时&#xff0c;优选下一跳设备Sys…

三、机器学习基础知识:Python常用机器学习库(SKlearn)

文章目录 1、Scikit learn简介2、主要步骤3、数据预处理4、模型选择与算法评价 1、Scikit learn简介 Scikit learn 的简称是 SKlearn&#xff0c;专门提供了 Python 中实现机器学习的模块。Sklearn 是一个简单高效的数据分析算法工具&#xff0c;建立在 NumPy、SciPy和 Matplo…

深入研究Android内存

深入研究Android内存 Android应用程序开发时&#xff0c;我们始终意识到自己是Android硬件和操作系统的一部分。 从硬件角度来看&#xff0c;我们可以将Android手机分为不同级别。基本上&#xff0c;我们可以将它们分类为适用于低端硬件或API的入门级、中级和高级硬件-API手机…

Python+Selenium WebUI自动化框架 -- 基础操作封装

前言&#xff1a; 封装Selenium基本操作&#xff0c;让所有页面操作一键调用&#xff0c;让UI自动化框架脱离高成本、低效率时代&#xff0c;将用例的重用性贯彻到极致&#xff0c;让烦人的PO模型变得无所谓&#xff0c;让一个测试小白都能编写并实现自动化。 知识储备前提&a…