SpringBoot3集成Kafka

news2025/1/16 1:00:04

标签:Kafka3.Kafka-eagle3;

一、简介

Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案;

二、环境搭建

1、Kafka部署

1、下载安装包:kafka_2.13-3.5.0.tgz

2、配置环境变量

open -e ~/.bash_profile

export KAFKA_HOME=/本地路径/kafka3.5
export PATH=$PATH:$KAFKA_HOME/bin

source ~/.bash_profile

3、该目录【kafka3.5/bin】启动zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties

4、该目录【kafka3.5/bin】启动kafka
kafka-server-start.sh ../config/server.properties

2、Kafka测试

1、生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message

2、消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message

3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic

4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message

3、可视化工具

配置和部署

1、下载安装包:kafka-eagle-bin-3.0.2.tar.gz

2、配置环境变量

open -e ~/.bash_profile

export KE_HOME=/本地路径/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/bin

source ~/.bash_profile

3、修改配置文件:system-config.properties

efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle

4、本地新建数据库:kafka-eagle,注意用户名和密码是否一致

5、启动命令
efak-web-3.0.2/bin/ke.sh start
命令语法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}

6、本地访问【localhost:8048】 username:admin password:123456

KSQL语句测试

select * from `test-topic` where `partition` in (0)  order by `date` desc limit 5

select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3

三、工程搭建

1、工程结构

2、依赖管理

这里关于依赖的管理就比较复杂了,首先spring-kafka组件选择与boot框架中spring相同的依赖,即6.0.10版本,在spring-kafka最近的版本中3.0.8符合;

但是该版本使用的是kafka-clients组件的3.3.2版本,在Spring文档的kafka模块中,明确说明spring-boot:3.1要使用kafka-clients:3.4,所以从spring-kafka组件中排除掉,重新依赖kafka-clients组件;

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka-clients.version}</version>
</dependency>

3、配置文件

配置kafka连接地址,监听器的消息应答机制,消费者的基础模式;

spring:
  # kafka配置
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      missing-topics-fatal: false
      ack-mode: manual_immediate
    consumer:
      group-id: boot-kafka-group
      enable-auto-commit: false
      max-poll-records: 10
      properties:
        max.poll.interval.ms: 3600000

四、基础用法

1、消息生产

模板类KafkaTemplate用于执行高级的操作,封装各种消息发送的方法,在该方法中,通过topickey以及消息主体,实现消息的生产;

@RestController
public class ProducerWeb {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send/msg")
    public String sendMsg (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));
            // 发送消息
            kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return "OK" ;
    }
}

2、消息消费

编写消息监听类,通过KafkaListener注解控制监听的具体信息,在实现消息生产和消费的方法测试后,使用可视化工具kafka-eagle查看topic和消息列表;

@Component
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaListener(topics = "boot-kafka-topic")
    public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {
        try {
            String key =  String.valueOf(record.key());
            String body = record.value();
            log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            acknowledgment.acknowledge();
        }
    }
}

五、参考源码

文档仓库:
https://gitee.com/cicadasmile/butte-java-note

源码仓库:
https://gitee.com/cicadasmile/butte-spring-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/896853.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

0基础入门C++之类和对象上篇

目录 1.面向过程和面向对象初步认识2.类的引入3.类的定义3.1类的两种定义方式:3.2成员变量命名规则的建议 4.类的访问限定符及封装4.1访问限定符4.2封装 5.类的作用域6.类的实例化7.类对象模型7.1如何计算类对象的大小7.2 类对象的存储方式猜测 8.this指针8.1this指针的引出8.2…

C语言入门教程,C语言学习教程(非常详细)第六章 C语言数组

什么是数组&#xff1f;C语言数组的基本概念 在《C语言数据输出大汇总以及轻量进阶》一节中我们举了一个例子&#xff0c;是输出一个 44 的整数矩阵&#xff0c;代码如下&#xff1a; #include <stdio.h>#include <stdlib.h>int main(){int a120, a2345, a3700, a…

PHP自己的框架实现debug调试模式和时区(完善篇三)

1、实现效果通过config设置开关debug调试模式 2、debug调试模式设置和时区设置 error_reporting和display_errors点击查看详细讲解 public static function run(){//定义常量self::_set_const();//创建模块目录self::_mk_module();//加载文件self::_import_file();self::_set_…

java请求SAP系统,发起soap的xml报文,实体类转换,idea自动生成教程

1、将接口的网页地址&#xff0c;右键保存&#xff0c;然后修改文件后缀为wsdl文件 2、idea全局搜索 wsdl&#xff0c;找到自动转换javabean插件&#xff1a; 3、点击后&#xff0c;选择下载改完后缀的文件&#xff1a; 4、将无用的class文件删除掉 5、请求sap的地址为&#…

Ae 效果:CC Twister

过渡/CC Twister Transition/CC Twister CC Twister&#xff08;CC 扭曲器&#xff09;效果主要用于创造出扭曲、旋转的动画效果&#xff0c;适用于背景动画、文字动画以及过渡动画等场景。 ◆ ◆ ◆ 效果属性说明 Completion 完成度 控制过渡的进度&#xff0c;0 %时为动画起…

hive中get_json_object函数不支持解析json中文key

问题 今天在 Hive 中 get_json_object 函数解析 json 串的时候&#xff0c;发现函数不支持解析 json 中文 key。 例如&#xff1a; select get_json_object({ "姓名":"张三" , "年龄":"18" }, $.姓名);我们希望的结果是得到姓名对应…

直播系统源码协议探索篇(二):网络套接字协议WebSocket

上一篇我们分析了直播平台的会话初始化协议SIP&#xff0c;他关乎着直播平台的实时通信和多方互动技术的实现&#xff0c;今天我们来讲另一个协议&#xff0c;叫网络套接字协议WebSocket&#xff0c;WebSocket基于TCP在客户端与服务器建立双向通信的网络协议&#xff0c;并且可…

博客系统之自动化测试

背景&#xff1a;针对个人博客项目进行测试&#xff0c;个人博客主要由四个页面构成&#xff1a;登录页、列表页、详情页和编辑页&#xff0c;主要功能包括&#xff1a;用户登录功能、发布博客功能、查看文章详情功能、查看文章列表功能、删除文章功能、退出功能。对于个人博客…

mysql全文检索使用

数据库数据量10万左右&#xff0c;使用like %test%要耗费30秒左右&#xff0c;放弃该办法 使用mysql的全文检索 第一步:建立索引 首先修改一下设置: my.ini中ngram_token_size 1 可以通过 show variables like %token%;来查看 接下来建立索引:alter table 表名 add f…

C#与西门子PLC1500的ModbusTcp服务器通信1--项目背景

最近在一个120万元的项目中&#xff0c;涉及到modbustcp通信&#xff0c;我作为软件总工负责项目的通信程序开发&#xff0c;modbus是一个在工业自动化领域中的通信协议&#xff0c;可以是modbusrtu&#xff0c;modbusascii&#xff0c;modbustcp三个形式&#xff0c;具体来说是…

QT VS编译环境无法打开包括文件type_traits

这问题&#xff0c;别人给的处理方法都是&#xff1a; 添加环境变量执行vsvars32.bat/vcvarsall.bat/vsdevcmd.bat重新安装QT项目&#xff1a;执行qmake。。。。 个人不推荐配置环境编译&#xff0c;除非你非常熟&#xff0c;因为配置环境变量需要你知道有哪些路径需要添加&a…

SpringBoo t+ Vue 微人事 (十一)

职位修改操作 在对话框里面做编辑的操作 添加对话框 <el-dialogtitle"修改职位":visible.sync"dialogVisible"width"30%"><div><el-tag>职位名称</el-tag><el-input size"small" class"updatePosIn…

Vue2-全局事件总线、消息的订阅与发布、TodoList的编辑功能、$nextTick、动画与过渡

&#x1f954;&#xff1a;高度自律即自由 更多Vue知识请点击——Vue.js VUE2-Day9 全局事件总线1、安装全局事件总线2、使用事件总线&#xff08;1&#xff09;接收数据&#xff08;2&#xff09;提供数据&#xff08;3&#xff09;组件销毁前最好解绑 3、TodoList中的孙传父&…

Jenkins改造—nginx配置鉴权

先kill掉8082的端口进程 netstat -natp | grep 8082 kill 10256 1、下载nginx nginx安装 EPEL 仓库中有 Nginx 的安装包。如果你还没有安装过 EPEL&#xff0c;可以通过运行下面的命令来完成安装 sudo yum install epel-release 输入以下命令来安装 Nginx sudo yum inst…

Flutter 测试小结

Flutter 项目结构 pubspec.yaml 类似于 RN 的 package.json&#xff0c;该文件分别在最外层及 example 中有&#xff0c;更新该文件后&#xff0c;需要执行的 Pub get lib 目录下的 dart 文件为 Flutter 插件封装后的接口源码&#xff0c;方便在其他 dart 文件中调用 example 目…

vue 如何适配 web 夜间模式、暗黑模式、黑色主题 prefers-color-scheme: dark

vue 如何适配 web 夜间模式、暗黑模式、黑色主题 prefers-color-scheme: dark 一、暗黑模式越来越普遍 自苹果推出暗黑模式之后&#xff0c;Web 也有了对应的暗黑模式&#xff0c;默认情况下 Web 的暗黑模式是跟随系统的。 你只需要将暗黑模式的样式写到下面这样的媒体选择器中…

Ubuntu软件源、pip源大全,国内网站网址,阿里云、网易163、搜狐、华为、清华、北大、中科大、上交、山大、吉大、哈工大、兰大、北理、浙大

文章目录 一、企业镜像源1、阿里云2、网易1633、搜狐镜像4、华为 二&#xff1a;高校镜像源1、清华源2、北京大学3、中国科学技术大学源 &#xff08;USTC&#xff09;4、 上海交通大学5、山东大学6、 吉林大学开源镜像站7、 哈尔滨工业大学开源镜像站8、 西安交通大学软件镜像…

使用ChatGPT进行创意写作的缺点

Open AI警告ChatGPT的使用者要明白此工具的局限性&#xff0c;更不应完全依赖。作为一位创作者&#xff0c;这一点非常重要&#xff0c;应尽可能地避免让版权问题或不必要的文体问题出现在自己的作品中。[1] 毕竟使用ChatGPT进行创意写作目前还有以下种种局限或缺点[2]&#xf…

prompt工程(持续更新ing...)

诸神缄默不语-个人CSDN博文目录 我准备想办法把这些东西整合到我的ScholarEase项目里。 其实以现在GPT-4的能力来说&#xff0c;直接就当日常对话随便直接说、直接问&#xff0c;基本没有太大的问题。 有时使用更复杂、详细、明确的prompt可能会起到提升作用。 有一些简单的…

Open cv C++安装

注意;要退出conda的虚拟环境 依赖 1.更新系统 sudo apt-get update sudo apt-get upgrade 2.安装相关的依赖 sudo apt-get install build-essential cmake git libgtk2.0-dev pkg-config libavcodec-dev libavformat-dev libswscale-dev sudo apt-get install libjpeg-de…