Kafka是什么,以及如何使用SpringBoot对接Kafka

news2025/1/20 10:47:02

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle



在这里插入图片描述
继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是使用SpringBoot该怎么对接和使用kafka。当然,在一开始我们也会比较细致的介绍一下kafka本身。那么话不多说,马上开始今天的学习吧

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

在这里插入图片描述

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

在这里插入图片描述

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("test_topic", message);
    }
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。


5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaLis {

    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;

import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private KafkaService kafkaService;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaService.sendMessage(message);
        return "Message sent successfully";
    }
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

在这里插入图片描述

三、启动与验证

首先自然是启动 Kafka ,怎么启动可参考 《上手第一关,手把手教你安装kafka与可视化工具kafka-eagle》,然后是启动我们的Spring Boot项目

在这里插入图片描述

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

在这里插入图片描述

最后检查我们的项目日志:

在这里插入图片描述

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

  • producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
  • defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
  • messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

  • send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
  • send(String topic, V data):向指定的 Kafka 主题发送一条消息。
  • send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
  • execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
  • inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

在这里插入图片描述
在这里插入图片描述

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClassConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结

今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1069718.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP/IP网络协议通信函数接口

创建套接字函数 socket 【头文件】 #include <sys/types.h> #include <sys/socket.h> 【函数原型】 int socket(int domain, int type, int protocol); 【函数功能】 socket 函数创建一个通信端点&#xff0c;并返回一个引用该端点的文件描述符&#xff0c;…

C++入门-day03

引言&#xff1a;本节我们讲一下C中的引用、内联函数、Auto、范围for 一、引用 先看一下下面这段代码&#xff1a; 在这段代码中。我们命名了两个变量&#xff0c;a和_a&#xff0c;其中_a就是a的引用 所谓引用就是a的“别名”&#xff0c;我们看一下这段代码的运行结果&…

互联网云厂商大转向:在海外重燃新「战事」

2023&#xff0c;云厂出海的第七个年头&#xff0c;三朵云的海外布局都在加速&#xff0c;在“主动出海”的大背景下&#xff0c;云厂的海外战场也正在发生新的变化。 作者|思杭 编辑|皮爷 出品|产业家 中国云厂&#xff0c;正在将目光从东南亚转移至中东。 东南亚的互…

代码随想录算法训练营第四十六天 | 518. 零钱兑换 II、377. 组合总和 Ⅳ

518. 零钱兑换 II 视频讲解&#xff1a;动态规划之完全背包&#xff0c;装满背包有多少种方法&#xff1f;组合与排列有讲究&#xff01;| LeetCode&#xff1a;518.零钱兑换II_哔哩哔哩_bilibili 代码随想录 &#xff08;1&#xff09;代码 377. 组合总和 Ⅳ 视频讲解&…

【哈士奇赠书活动 - 41期】- 〖产品设计软技能:创业公司篇〗

文章目录 ⭐️ 赠书 - 《产品设计软技能&#xff1a;创业公司篇》⭐️ 内容简介⭐️ 作者简介⭐️ 编辑推荐⭐️ 赠书活动 → 获奖名单 ⭐️ 赠书 - 《产品设计软技能&#xff1a;创业公司篇》 ⭐️ 内容简介 在创业公司设计产品与在成熟公司设计产品存在明显差异。《产品设计软…

华为防火墙项目

二、知识点 1&#xff0c;会话表&#xff1a;防火墙通过首包建立会话表&#xff0c;其他非首包通过匹配会话表进行通信&#xff0c;就不用查看安全策略啦。 2&#xff0c;长连接 防火墙为各种协议设定了会话老化机制。当一条会话在老化时间内没有被任何报文匹配&#xff0c;则…

【算法|动态规划No.15】leetcode1035. 不相交的线

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【LeetCode】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望…

【教程】在RK3568上部署(C++)语义分割算法BiSeNetv1/v2

引言 在本篇教程中&#xff0c;博主将记录国庆假期前在RK3568上部署分割算法的步骤以及代码。首先说一下&#xff0c;RK3568这个开发板本身的算力大概是0.8T&#xff08;在实际开发中还会用到额外的计算卡&#xff0c;额外的计算卡后面文章再说&#xff0c;本篇文章主要记录在…

AQS的简单说明

1.概述 AQS全称AbstractQueuedSynchronizer&#xff0c;是用来实现锁或者队列同步器的公共基础部分的抽象实现&#xff0c;是整个JUC体系的基石&#xff0c;用于解决锁分配给谁的问题&#xff0c;ReentrantLock底层的实现就是AQS。 2.AQS实现原理 AQS内部有一个由volatile修…

正点原子嵌入式linux驱动开发——Linux内核顶层Makefile详解

之前的几篇学习笔记重点讲解了如何移植uboot到STM32MP157开发板上&#xff0c;从本章就开始学习如何移植Linux内核。 同uboot一样&#xff0c;在具体移植之前&#xff0c;先来学习一下Linux内核的顶层Makefile文件&#xff0c;因为顶层 Makefile控制着Linux内核的编译流程。 L…

如何在Apache和Resin环境中实现HTTP到HTTPS的自动跳转:一次全面的探讨与实践

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

提取歌曲伴奏?用对软件一键帮你搞定~

相信大家经常想获取某首歌曲的伴奏&#xff0c;但是不知从何下手&#xff0c;今天这篇教程给大家分享一个超神奇软件&#xff0c;一键提取歌曲伴奏&#xff01; 第一步&#xff1a;打开【音分轨】APP&#xff0c;进入首页点击【人声分离】 第二步&#xff1a;选择导入方式&…

多电脑之间无线访问文件夹传输文件之“电子神偷”

目录 应用场景说明网络共享文件功能开启步骤1&#xff1a;确保电脑开启网络共享功能步骤2&#xff1a;在自己电脑某个盘创建一个文件夹&#xff0c;作为共享文件夹步骤3&#xff1a;查看当前电脑的用户名和ip地址 访问网络共享文件夹&#xff0c;在电脑B访问获取电脑A的文件数据…

Windows版MySql8.0安装(亲测成功!)

下载 下载地址&#xff1a;点我下载 下载完成后将其解压到自定义目录下,我所有的软件都保存在C:\zhushanglin\WindowsSoft&#xff0c;解压完成后会看见以下目录: 配置环境变量 此电脑 右键,然后点属性&#xff0c;步骤如下: 新建MYSQL_HOME系统变量 编辑Path系统变量&a…

读论文:Real-Time Encrypted Traffic Classification via Lightweight Neural Networks

基于轻量级神经网络的实时加密流量分类 0、摘要 提出一种轻量级模型&#xff0c;设计原则“maximize the reuse of thin modules”&#xff0c;thin modules采用多头注意和一维卷积网络。由于所有数据包的一步交互和多头注意力机制的并行计算&#xff0c;所提出的模型的优势是…

RF元素定位

元素定位方式&#xff1a;id, name, link, partial_link_text, xpath, css id 【登录输入框】id session_email_or_mobile_number input text id session_email_or_mobile_numbername 【登录输入框】name session[email_or_mobile_number] input text name sessi…

react-antd 文件导入按钮增加一个加载状态

1、效果图实例: 2、部分代码 2.1 props : 2.2 handleChange、上传的文件检验 : construction中定义 construction(props) { super(props); this.state { loadingStaus: flase, loadingDisabled: flase, // 作用:按钮如果在加 载状态中&#xff0c;没…

Android多线程学习:线程

一、概念 进程&#xff1a;系统资源分配的基本单位&#xff0c;进程之间相互独立&#xff0c;不能直接访问其他进程的地址空间。 线程&#xff1a;CPU调度的基本单位&#xff0c;线程之间共享所在进程的资源&#xff0c;包括共享内存&#xff0c;公有数据&#xff0c;全局变量…

【Pod】

Pod 一、Pod基本概念二、Pod的使用方式pause容器&#xff08;pod的基础容器&#xff09;核心功能pause容器使得Pod中所有容器可以共享两种资源&#xff1a;网络和存储网络存储 三、Pod分类自主式Pod/静态pod控制器管理的Pod 四、三种容器五、镜像拉取策略&#xff08;image Pul…

云计算安全和云原生安全的关系

云计算安全(Cloud Computing Security)指的是在云环境中保护数据、应用程序和基础设施的安全性。它包括保护云服务提供商的基础设施和平台&#xff0c;以及云服务用户的数据和应用程序。 云原生安全(Cloud-Native Security)则是指在云原生环境中保护应用程序和服务的安全性。云…