使用SpringBoot对接Kafka

news2025/1/17 21:52:35
Kafka是什么,以及如何使用SpringBoot对接Kafka

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("test_topic", message);
    }
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。

5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaLis {

    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;

import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private KafkaService kafkaService;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaService.sendMessage(message);
        return "Message sent successfully";
    }
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

三、启动与验证

首先自然是启动 Kafka ,然后是启动我们的Spring Boot项目

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

最后检查我们的项目日志:

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

        producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
        defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
        messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

        send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
        send(String topic, V data):向指定的 Kafka 主题发送一条消息。
        send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
        execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
        inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClass ConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结
        今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1832714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Shell脚本(.sh文件)如何执行完毕之后不自动关闭?

Shell脚本异常傲娇&#xff0c;出错后、执行完根本不给你机会让你查看报错信息、输出信息&#xff0c;直接闪退。 废话不多说&#xff0c;调教方法如下&#xff0c;直接在Shell脚本末尾加上如下代码&#xff1a; 1、实现方式一 1.1 使用read命令达到类似bat中的pause命令效果…

构建全面框架 | 简化基因组+线粒体遗传进化联合分析

近日&#xff0c;凌恩生物客户河北农业大学、浙江大学及英国格林威治大学的研究团队合作&#xff0c;在《Insect Science》杂志上发表了题为“A comprehensive framework for the delimitation of species within the Bemisia tabaci cryptic complex, a global pest-species g…

LaTeX 的使用

文章目录 TeX 编辑器文档类型中文编译文档结构preamble 导言区&#xff08;不能放正文内容&#xff09;document body 正文区 正文内容目录段落列表无序列表有序列表 图片表格交叉引用段落图片表格 转义符 数学公式数学符号行内公式行间公式有公式计数器无公式计数器 公式包含文…

OpenGL3.3_C++_Windows(11)

git submodule项目子模块 Git Submodule &#xff08;子模块的代码并不直接存储在父仓库中&#xff0c;而是通过一个指针来维护&#xff09;克隆含有子模块的仓库时&#xff0c;使用git管理Git Clone &#xff08;复制一份完整的Git仓库到本地&#xff09;若仓库包含子模块&am…

MyBatis基础教程

文章目录 一、MyBatis基本使用1.1简介1.2搭建MyBatis环境1.2.1安装MyBatis1.2.2创建MyBatis核心配置文件1.2.3创建mapper接口1.2.4创建MyBatis映射文件1.2.5实现增加功能 1.3整合log4j1.4修改与删除功能1.5查询功能1.5.1查询单个实体类对象1.5.2查询所有用户信息 二、核心配置文…

C# Secs源码 HsmsSecs测试

包含客户端和服务端 启动客户端和服务端即可互相模拟sece 通讯 也可使用secs仿真器进行测试 开启后进行相关操作&#xff0c;创建客户端连接仿真器进行操作 仿真器显示日志 相关文件&#xff0c;源码 4.9 私信即可或者看我博客描述那个地址 我是狗子&#xff0c;希望你幸…

Vue47-修改默认配置webpack.config.js文件

main.js是脚手架项目的入口文件&#xff0c;系统运行时&#xff0c;默认去找src下的main.js文件。这是webpack通过配置文件&#xff1a;webpack.config.js配置的。 脚手架把所有重要的配置文件都隐藏了&#xff0c;方式被开发者修改。 一、查看被隐藏的webpack配置 1-1、webpa…

Matlab进阶绘图第60期—带伪彩图的曲面图

带伪彩图的曲面图是曲面图与伪彩图的组合。 其中&#xff0c;伪彩图与曲面图的颜色用于表示同一个特征。 由于伪彩图无遮挡但不直观&#xff0c;曲面图直观但有遮挡&#xff0c;而将二者组合&#xff0c;可以实现优势互补。 本期就来分享一下带伪彩图的曲面图的绘制方法&…

韩兴国/姜勇团队在《Trends in Plant Science》发表植物根系氮素再分配的观点文章!

氮素是陆地生态系统中的关键限制性营养元素&#xff0c;通过生物固氮和土壤氮供应通常远低高等植物的氮需求。当土壤氮素供应无法充分满足植物茎叶生长需求时&#xff0c;植物会通过自身营养器官&#xff08;如根或根茎&#xff09;再分配来实现氮的内部循环和再利用。尽管植物…

App端接口用例设计方法和测试方法

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 前言 接口测试作为测试的重要一环&#xff0c;重点关注的是数据层面的输入输出&#xff0c;今天…

API接口设计的艺术:如何提升用户体验和系统性能

在数字时代&#xff0c;API接口的设计对于用户体验和系统性能有着至关重要的影响。良好的设计可以显著提升应用程序的响应速度、可靠性和易用性。以下是几个关键点&#xff0c;帮助改善API接口的设计&#xff1a; 1. 理解并定义清晰的要求 用户研究&#xff1a;与最终用户进行…

CesiumJS【Basic】- #006 浏览器控制台查看位置角度

文章目录 浏览器控制台查看位置角度1 目标 浏览器控制台查看位置角度 1 目标 浏览器控制台查看位置角度

详情资料SR560(斯坦福)SR570 低噪声前置放大器

SR560 低噪声前置放大器 SR560 是一款高性能、低噪声前置放大器&#xff0c;非常适合各种应用&#xff0c;包括低温测量、光学检测和音频工程。 输入 SR560 有一个差分前端&#xff0c;输入噪声为 4 nV/√Hz&#xff0c;输入阻抗为 100 MΩ。完整的噪声系数轮廓如下图所示。…

【笔记】复制Edge的网址粘贴后自动变成中文标题超链接

问题 1、从edge复制的网址粘贴直接显示网页内容名称而不是网址url。 2、复制任何网址粘贴到CSDN里面粘贴时直接转换成标题超链接&#xff08;很讨厌的功能习惯&#xff09;。 而如上两种问题不是互相影响的&#xff0c;就算设置了Edge的粘贴方式&#xff0c;复制到CSDN的文章…

【机器学习300问】120、该怎么用RNN来构建语言模型?

一、基本概念补充 在构建语言模型之前补充几个自然语言处理&#xff08;NLP&#xff09;基本概念。 &#xff08;1&#xff09;语料库&#xff08;Corpus&#xff09; ① 语料库的定义 在自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;语料库是一个经过组织和加工…

自定义starter并发布maven私服

一、搭建nexus私服 nexus就是maven的私有服务器&#xff0c;这个搭建教程可以在网络上找到很多&#xff0c;这里就不赘述了。搭建完成之后再进行下一步 二、本地maven的setting配置文件中配置nexus的用户名和密码 <servers><server><id>nexus-releases<…

如何拥有自己的微信小程序

如何拥有自己的微信小程序 ~~话先放在这里~~ 写在前面申请一个属于自己的小程序先去[微信开放平台](https://open.weixin.qq.com/home)申请一个你的小程序扫码申请新小程序小程序该记好的个人信息 安装微信开发者工具下载工具关联你的小程序请求域名配置发布小程序 BUY一个自己…

抛光粉尘可爆性检测 打磨粉尘喷砂粉尘爆炸下限测试

抛光粉尘可爆性检测 抛光粉尘的可爆性检测是一种安全性能测试&#xff0c;用于确定加工过程中产生的粉尘在特定条件下是否会爆炸&#xff0c;从而对生产安全构成威胁。如果粉尘具有可爆性&#xff0c;那么在生产环境中就需要采取相应的防爆措施。粉尘爆炸的条件通常包括粉尘本身…

大模型应用开发技术:Multi-Agent框架流程、源码及案例实战(一)

LlaMA 3 系列博客 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;一&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;二&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;三&#xff09; 基于 LlaMA…

YOLOv10涨点改进轻量化双卷积DualConv,完成涨点且计算量和参数量显著下降

本文独家改进:双卷积由组卷积和异构卷积组成,执行3x3 和 1x1 卷积运算Q代替其他卷积核仅执行 1x1 卷积。 DualIConv 显着降低了深度神经网络的计算成本和参数数量,同时在某些情况下令人惊讶地实现了比原始模型略高的精度。 我们使用 DualConv 将轻量级 MobileNetV2 的参数数量…