5、Kafka集成 SpringBoot

news2025/1/6 20:05:02

SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以
用于 SpringBoot 的消费者。
在这里插入图片描述
1)在 IDEA 中安装 lombok 插件
在 Plugins 下搜索 lombok 然后在线安装即可,安装后注意重启
2)SpringBoot 环境准备
(1)创建一个 Spring Initializr
注意:有时候 SpringBoot 官方脚手架不稳定,我们切换国内地址 https://start.aliyun.com
(2)项目名称 springboot
(3)添加项目依赖
(4)检查自动生成的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.6.1</version>
 <relativePath/> <!-- lookup parent from repository -->
 </parent>
 <groupId>com.atguigu</groupId>
 <artifactId>springboot</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <name>springboot</name>
 <description>Demo project for Spring Boot</description>
 <properties>
 <java.version>1.8</java.version>
 </properties>
 <dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 <optional>true</optional>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka-test</artifactId>
 <scope>test</scope>
 </dependency>
 </dependencies>
 <build>
 <plugins>
 <plugin>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-maven-plugin</artifactId>
 <configuration>
 <excludes>
 <exclude>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </exclude>
 </excludes>
 </configuration>
 </plugin>
 </plugins>
 </build>
</project>

3.1 SpringBoot 生产者
(1)修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息

# 应用名称
spring.application.name=atguigu_springboot_kafka
# 指定 kafka 的地址
spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#指定 key 和 value 的序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer

(2)创建 controller 从浏览器接收数据, 并写入指定的 topic

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
 
 // Kafka 模板用来向 kafka 发送数据
 @Autowired
 KafkaTemplate<String, String> kafka;
 
 @RequestMapping("/prince")
 public String data(String msg) {
 kafka.send("first", msg);
 return "ok";
 }
}

3.2 SpringBoot 消费者
(1)修改 SpringBoot 核心配置文件 application.propeties

# =========消费者配置开始=========
# 指定 kafka 的地址
spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserial
izer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserial
izer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=prince
# =========消费者配置结束=========

(2)创建类消费 Kafka 中指定 topic 的数据

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConsumer {
 
 // 指定要监听的 topic
 @KafkaListener(topics = "first")
 public void consumeTopic(String msg) { // 参数: 收到的 value
 System.out.println("收到的信息: " + msg);
 }
}

(3)向 first 主题发送数据

[hadoop102 kafka]$ bin/kafka-console-producer.sh --
bootstrap-server hadoop102:9092 --topic first
>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1116468.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue 3.0中Treeshaking特性是什么?

一、是什么 Tree shaking 是一种通过清除多余代码方式来优化项目打包体积的技术&#xff0c;专业术语叫 Dead code elimination 简单来讲&#xff0c;就是在保持代码运行结果不变的前提下&#xff0c;去除无用的代码 如果把代码打包比作制作蛋糕&#xff0c;传统的方式是把鸡…

VTK8.0.0编译+QT5.9.2+VS2017

背景 VTK网上资料较多并且使用较多的版本可能是VTK8.2.0&#xff0c;但是由于之前先配置了QT 5.9.2 msvc2017 PCL1.8.1 VTK8.0.0环境&#xff0c;听说有人PCL1.8.1配置VTK8.2.0实测版本不兼容&#xff0c;需修改源码调试&#xff0c;比较麻烦&#xff0c;所以之前就使用的VT…

Unity 中3D数学基础-向量

本文主要全面讲解向量的数学运算已经对应的实际应用意义! 1.向量的认识 向量(矢量) 有1、2、3维! 向量既可以表示大小也可以表示方向,也可以表示一个空间坐标点!因此他虽然是一个数学数字表示,但是实际空间意义有这三种! 坐标点A(x,y,z) 表示与原点连线构成的方向…

前端常见的数据类型有哪些?

在前端开发中,常见的数据类型包括: 1:字符串(String):表示文本数据,用引号(单引号或双引号)括起来,例如:“Hello, World!”。 创建字符串:let str = ‘Hello, World!’;获取字符串长度:let length = str.length;字符串拼接:let newStr = str1 + str2;2:数字(N…

SystemVerilog(2)——数据类型

一、概述 和Verilog相比&#xff0c;SV提供了很多改进的数据结构。它们具有如下的优点&#xff1a; 双状态数据类型&#xff1a;更好的性能&#xff0c;更低的内存消耗队列、动态和关联数组&#xff1a;减少内存消耗&#xff0c;自带搜索和分类功能类和结构&#xff1a;支持抽…

微信小程序之首页-后台交互及WXS的使用

目录 前言 一. 前后台数据交互及封装request 1.准备后台 1.1 配置数据源 1.2 部分后台获取数据方法编写 2.准备前端 2.1封装Request 2.2 前端JS方法编写 2.3 前端页面展示index.wxml 二.WXS的使用 1.简介 2.WXS优化OA系统 2.1 使用及定义 2.2 导入要使用的项目 2.…

【代码随想录第45天】动态规划5

代码随想录第45天| 动态规划5 1049. 最后一块石头的重量 II494. 目标和474.一和零 1049. 最后一块石头的重量 II LeetCode题目&#xff1a;1049. 最后一块石头的重量 II 代码随想录&#xff1a;1049. 最后一块石头的重量 II 思路就是尽量把石头分成重量总和相等的两堆&#xff…

计算机操作系统-第十二天

目录 进程控制的基本概念 什么是进程控制 如何实现进程控制 如何实现原语的”原子性“ 与进程控制相关的原语 进程创建中的原语 进程终止中的原语 进程的阻塞和唤醒中的原语 进程的切换中的原语 知识滚雪球-程序是如何运行的&#xff1f;&#xff1a; 本节思维导图 进…

数据可视化与GraphQL:利用Apollo创建仪表盘

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

第二证券:风电概念强势拉升,威力传动“20cm”涨停,双一科技等大涨

风电概念20日盘中强势拉升&#xff0c;到发稿&#xff0c;威力传动“20cm”涨停&#xff0c;双一科技涨超17%&#xff0c;顺发恒业亦涨停&#xff0c;金雷股份、大金重工涨约7%&#xff0c;新强联、海力风电涨超5%。 音讯面上&#xff0c;9月以来江苏、广东海风项目加快推动&a…

前端数据可视化之【title、legend、tooltip、toolbox 】配置项

目录 &#x1f31f;Echarts配置项&#x1f31f;Echarts配置项之 title组件&#x1f31f;Echarts配置项之 legend组件&#x1f31f;Echarts配置项之 tooltip组件&#x1f31f;Echarts配置项之 toolbox组件&#x1f31f;写在最后 &#x1f31f;Echarts配置项 ECharts开源来自百度…

如何解决香港服务器使用的常见问题

​  站长们在选择香港服务器租用时会考虑到它的各种性能以及稳定性&#xff0c;这是必须的。但是使用过程中还有些问题也不容忽视&#xff0c;比如&#xff1a;带宽资源是否短缺&#xff0c;是否存在安全漏洞&#xff0c;连接是否正常等这些问题也要考虑到。 香港服务器使用中…

百度地图API:JavaScript开源库几何运算判断点是否在多边形内(电子围栏)

百度地图JavaScript开源库&#xff0c;是一套基于百度地图API二次开发的开源的代码库。目前提供多个lib库&#xff0c;帮助开发者快速实现在地图上添加Marker、自定义信息窗口、标注相关开发、区域限制设置、几何运算、实时交通、检索与公交驾车查询、鼠标绘制工具等功能。 判…

搭建一个windows的DevOps环境记录

边搭建边记录&#xff0c;整个DevOps环境的搭建可能会很久。。。 一、安装Jenkins&#xff1a; 参考&#xff1a;Jenkins基础篇--windows安装Jenkins-CSDN博客 注意上面选择JDK的路径&#xff0c;选择到安装目录&#xff0c;该目录并不一定要在path中配置了&#xff08;就是…

Qt基础 QScatterSeries

QScatterSeries类是Qt Charts模块的一部分&#xff0c;用于表示散点图。 QScatterSeries* series new QScatterSeries(); // 创建离散点数据series->setName("圆点样式");series->setMarkerShape(QScatterSeries::MarkerShapeCircle); //设置位圆形series-&…

SpringBoot + MyBatis 在 jar 中可以启动但是 Idea中无法启动的原因

现象 在idea中启动始终卡住&#xff0c;查看线程堆栈发现一直在mybatis的处理过程中&#xff0c;细究了一下堆栈发现mybatis有使用远程方式加载类的情况&#xff0c;并且此时cpu会飙升&#xff0c; 在命令行中使用java -jar 的形式可以正常启动&#xff0c;但是在idea中启动始…

论文阅读 | RAFT: Recurrent All-Pairs Field Transforms for Optical Flow

RAFT: Recurrent All-Pairs Field Transforms for Optical Flow ECCV2020光流任务best paper 论文地址&#xff1a;【here】 代码地址&#xff1a;【here】 介绍 光流是对两张相邻图像中的逐像素运动的一种估计。目前碰到的一些困难包括&#xff1a;物体的快速运动&#xff…

UE4 UltraDynamicSky 天气与水体交互

最上面的Lerp的A通道为之前的水面效果&#xff0c;B是做的冰面效果 用Dynamic_Landscape_Weather_Effects的BaseColor的R通道四舍五入作为Lerp的Alpha值 使用一张贴图&#xff0c;乘以RadialGradientExponential对材质边缘做弱化&#xff0c;RadialGradientExponential的Raid…

4.7 IP多播

思维导图&#xff1a; **4.7.1 IP多播的基本概念** --- **1. 定义和背景** - IP多播&#xff1a;从一个源点发送信息至多个终点的技术。 - 1988年&#xff1a;Steve Deering首次提及IP多播。 - 1992年&#xff1a;IETF进行了首次IP多播试验&#xff0c;当时有20个网点参与。 …

Python算法:八大排序算法以及速度比较

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ &#x1f434;作者&#xff1a;秋无之地 &#x1f434;简介&#xff1a;CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作&#xff0c;主要擅长领域有&#xff1a;爬虫、后端、大数据…