Springboot整合【Kafka】

news2024/9/20 20:44:02

1.添加依赖

在pom.xml文件中添加以下依赖:

<!--  进行统一的版本管理-->
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.3.3</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
<!--  JDK版本-->
 <packaging>pom</packaging>
 <properties>
    <java.version>17</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- kafka -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

2.添加配置类

添加链接kafka的一些配置信息:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public DefaultKafkaProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // Kafka消费者配置
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

并在application.yml文件中配置项目启动端口,若不配置,默认为8080:

spring:
  application:
    name: ProviderTest
server:
  port: 7749

3.消息生产者

模拟生产者发送消息到topic中心的方法:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Component
public class KafkaProducerService {

    @Autowired
    private  KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String topic, Object object) {
        CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object)
                .thenApply(result -> {
                    // 在这里可以添加发送成功后的处理逻辑
                    System.out.println("消息发送成功: " + result);
                    return result;
                })
                .exceptionally(exception -> {
                    // 处理发送失败的情况
                    System.err.println("消息发送失败: " + exception.getMessage());
                    return null; // 或者根据业务需求返回适当的默认值
                });

    }
}

想要发送不同的消息,在这里配置一个controller类,通过访问url地址的方式发送不同的消息,需要如下配置:

@Controller
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    KafkaProducerService kafkaProducerService;

    @GetMapping("/sendMessage/{topic}/{object}")
    public void sendMessage(@PathVariable("topic") String topic, @PathVariable("object") String object)
    {
        kafkaProducerService.sendMessage(topic, object);
    }
}

4.消息消费者

模拟消息消费者消费消息的方法:

@Component
public class KafkaConsumerService {

    /**
     *  @KafkaListener监听用户组为test-group的topic02主题的消息
     * @param message
     */
    @KafkaListener(groupId = "test-group", topics = "topic02")
    public void listen(String message) {
        System.out.println("获取到的消息为:" + message);
    }
}

5.测试

5.1先启动zookeeper客户端

到zookeeper安装目录下:双击zkServer.cmd文件:

看到如下服务端启动成功:

5.2启动kafka

到kafka的安装目录下进入cmd窗口:

输入命令启动kafka:

.\bin\windows\kafka-server-start.bat .\config\server.properties

若出现如下表示启动成功:

5.3在浏览器测试

启动kafka后端项目,在浏览器输入url访问进行发送消息到topic中:

localhost:7749/kafka/sendMessage/topic02/测试

其中7749是后端定义的端口号,“topic02”是在消息消费者端设置的监听的主题名称,“测试”为发送的具体内容并且值可以改为自己想要发送的任何数据:

可以看到控制台打印出消费者消费到的数据:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2108592.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【全网最全】2024年数学建模国赛A题30页完整建模文档+成品论文+代码+可视化图表等(后续会更新)

您的点赞收藏是我继续更新的最大动力&#xff01; 一定要点击如下的卡片&#xff0c;那是获取资料的入口&#xff01; 2024年高教社杯数学建模国赛A题“板凳龙”闹元宵&#xff1a;建立舞龙队的运动轨迹和速度的空间几何、运动学和优化模型 本文文章较长&#xff0c;建议先看…

ardupilot开发 --- MQTT 篇

原图&#xff1a;ardupilot-onboardComputer-4Glink-console.drawio 白嫖党请点赞、收藏、关注 你说在一起要算命 前言参考文献 前言 为什么在ardupilot开发过程中要用到MQTT &#xff1f; 客户要求向他们的指挥中心平台推送视频流和飞控数据&#xff0c;即要将图数传数据推送给…

代码随想录:96. 不同的二叉搜索树

96. 不同的二叉搜索树 class Solution { public:int numTrees(int n) {int dp[30]{0};//由i个结点组成的二叉搜索树有多少种dp[0]1; for(int i1;i<n;i)for(int j0;j<i;j)//j表示根节点左子树有j个结点dp[i]dp[j]*dp[i-j-1];//对根节点左右子树结点数量遍历//数量有左子树…

【计算机网络】TCP连接如何确保传输的可靠性

一、确保可靠传输的机制 TCP&#xff08;传输控制协议&#xff09;是一种面向连接的、提供可靠交付的、面向字节流的、支持全双工的传输层通信协议 1、序列号 seq TCP头部中的序号&#xff0c;占32位&#xff08;4字节&#xff09;&#xff1b; 发送方给报文段分配一个序列号&a…

CSS中 特殊类型的选择器 伪元素如何使用

一、什么是伪元素 在 CSS 中&#xff0c;伪元素是一种特殊类型的选择器&#xff0c;它允许你为元素的特定部分添加样式&#xff0c;而这些部分在 HTML 文档中并不实际存在。伪元素通常用于创建装饰性效果&#xff0c;如添加边框、背景、阴影等&#xff0c;而不需要额外的 HTML…

PHPJWT的使用

今天得空整理整理JWT的代码 首先&#xff0c;我们得知道什么是JWT&#xff1f; JWT&#xff08;JSON Web Token&#xff09;是一种开放标准&#xff08;RFC7519&#xff09;&#xff0c;用于在网络应用环境中安全地传输声明信息。它是一种紧凑的、URL安全的令牌格式&#xff0…

(一)使用Visual Studio创建ASP.NET Core WebAPI项目

1.创建webAPI项目 选择ASP.NET Core Web API项目模版&#xff08;基于.Core框架可以支持多种系统环境&#xff0c;所以我们选择.Core框架&#xff09;&#xff0c;点下一步。 2.项目名称 项目名称设置为&#xff1a;CoreWebAPI&#xff0c;点下一步 3.选择框架 选择.NET6.0框…

分类预测|基于黑翅鸢优化轻量级梯度提升机算法数据预测Matlab程序BKA-LightGBM多特征输入多类别输出 含对比

分类预测|基于黑翅鸢优化轻量级梯度提升机算法数据预测Matlab程序BKA-LightGBM多特征输入多类别输出 含对比 文章目录 一、基本原理BKA&#xff08;Black Kite Algorithm&#xff09;的原理LightGBM分类预测模型的原理BKA与LightGBM的模型流程总结 二、实验结果三、核心代码四、…

IP学习——twoday

双层Vlan标签 路由器常用命令&#xff1a; 查看当前端口&#xff0c;路由等的信息和配置&#xff1a;display this 查看当前路由器的所有信息&#xff1a; display current-configuration 查看当前路由器的指定信息&#xff1a; display current-configuration | include ip a…

HTML第一课 语法规范与常用标签

目录 ◆ HTML 语法规范 ◆ HTML 常用标签 4.2 标题标签 4.3 段落和换行标签 4.4文本格式化标签 4.5<div>和<span>标签 4.6图像标签和路径 4.7超链接标签 1.外部链接 2.内部链接 3.空链接 4.下载链接 5.锚点链接 ◆ HTML 中的注释和特殊字符​编辑 ◆ HTML 语…

Redis中String类型的基本命令

文章目录 一、String字符串简介二、常见命令setgetmgetmsetsetnxincrincrbydecrdecrbyincrbyfloatappendgetrangesetrangestrlen 三、命令小结四、字符串内部编码五、String典型使用场景1. 缓存(Cache)功能2. 计数功能3. 共享会话&#xff08;Session&#xff09;4. 手机验证码…

软件测试学习笔记丨Pytest+Allure测试计算器

本文转自测试人社区&#xff0c;原文链接&#xff1a;https://ceshiren.com/t/topic/31954 项目要求 3.1 项目简介 计算器是近代人发明的可以进行数字运算的机器。 计算器通过对加法、减法、乘法、除法等功能的运算&#xff0c;将正确的结果展示在屏幕上。 可帮助人们更方便的…

FLTRNN:基于大型语言模型的机器人复杂长时任务规划

目录 一、引言二、FLTRNN框架2.1 任务分解2.2 基于语言的递归神经网络&#xff08;Language-Based RNNs&#xff09;长期记忆&#xff08;Long-Term Memory, Ct&#xff09;&#xff1a;短期记忆&#xff08;Short-Term Memory, Ht&#xff09;&#xff1a; 2.3 增强推理能力的…

GAMES104:12 游戏引擎中的粒子和声效系统-学习笔记

文章目录 一&#xff0c;粒子基础Particle System二&#xff0c;粒子渲染三&#xff0c;GPU粒子及生命周期控制四&#xff0c;粒子应用五&#xff0c;声音基础5.1 Sound System5.2 Digital Sound5.3 Audio Rendering QA 一&#xff0c;粒子基础Particle System 网游里你的付费…

[数据结构]红黑树之插入操作(RBTree)

这里只着重介绍插入操作的实现&#xff1a;) 一、红黑树的概念和性质 红黑树&#xff08;Red Black Tree&#xff09;是一种自平衡的二叉搜索树。红黑树最初在1972年由Rudolf Bayer发明&#xff0c;当时被称为平衡二叉B树&#xff08;symmetric binary B-trees&#xff09;。随…

2024 年高教社杯全国大学生数学建模竞赛B题解题思路(第一版)

原文链接&#xff1a;https://www.cnblogs.com/qimoxuan/articles/18399372 赛题&#xff1a; 问题 1&#xff1a;抽样检测方案设计 分析&#xff1a; 抽样检测方案需要在保证决策准确性的同时&#xff0c;尽量减少检测成本。需要考虑抽样误差对决策的影响&#xff0c;以及如…

OCR经典神经网络(一)文本识别算法CRNN算法原理及其在icdar15数据集上的应用

OCR经典神经网络(一)文本识别算法CRNN算法原理及其在icdar15数据集上的应用 文本识别是OCR&#xff08;Optical Character Recognition&#xff09;的一个子任务&#xff0c;其任务为&#xff1a;识别一个固定区域的的文本内容。 在OCR的两阶段方法里&#xff0c;文本识别模型接…

若依框架登录鉴权详解(动态路由)

若依框架登录鉴权&#xff1a;1.获取token&#xff08;过期在响应拦截器中实现&#xff09;,2.基于RBAC模型获取用户、角色和权限信息&#xff08;在路由前置守卫&#xff09;&#xff0c;3.根据用户权限动态生成&#xff08;从字符串->组件&#xff0c;根据permission添加动…

linux搭建深度学习平台

linux搭建深度学习平台&#xff08;Ubuntu&#xff09; /home/guangyao/anaconda3 我服务器的anaconda地址 ~/anaconda3 1 首先就是打开浏览器&#xff0c;我实验室的是火狐&#xff0c;搜索anaconda下载&#xff0c;找到下载目录&#xff0c;cd进去&#xff0c; 2安装 bas…

【佳学基因检测】在bagisto中,grouped products(同组产品)和bundled products(打包产品)有什么不同?

【佳学基因检测】在bagisto中&#xff0c;grouped products&#xff08;同组产品&#xff09;和bundled products&#xff08;打包产品&#xff09;有什么不同&#xff1f; 在Bagisto电商平台中&#xff0c;**grouped products&#xff08;同组产品&#xff09;和bundled prod…