【Springboot】整合kafka

news2025/1/22 22:51:18

目录

  • 安装zookeeper
    • jdk安装
    • zookeeper安装
  • 安装kafka(非集群)
  • springboot项目整合配置

安装zookeeper

jdk安装

环境准备:CentOS7,jdk1.8
步骤如下:

  • 下载自己需要的版本
    这里使用的jdk1.8,获取链接如下
    链接:https://pan.baidu.com/s/1Bkvy0ChTVH0hrKkxiE4ZDg 提取码:1zws

  • 上传到安装目录并解压
    使用linux连接工具,将下载的JDK压缩包上传到要安装的目录下,这里使用的是windTerm,我是将压缩包上传到/usr/java目录下,解压命令如下

    tar -zxvf jdk-8u152-linux-x64.tar.gz
    

    解压后目录下面多个文件夹
    在这里插入图片描述

  • 配置环境变量

    vim /etc/profile
    

    在文件最后面添加如下配置,JAVA_HOME是自己安装的目录,需要改为自己的目录

    export JAVA_HOME=/usr/java/jdk1.8.0_152
    export CLASSPATH=$:CLASSPATH:$JAVA_HOME/lib/    
    export PATH=$PATH:$JAVA_HOME/bin
    

    重新加载系统环境变量

    source /etc/profile
    
  • 验证是否安装成功

    java -version
    

    出现如下内容说明安装成功
    在这里插入图片描述

zookeeper安装

  • 下载压缩包:
    官网地址:https://zookeeper.apache.org/releases.html

  • 上传到安装目录并解压
    我的是/usr/zookeeper,解压压缩包
    在这里插入图片描述

  • 修改配置文件
    进入解压后的文件夹里面的conf目录下,该目录下有一个名zoo_sample.cfg文件,将该文件重新名为zoo.cfg,或者重新复制一份命名为zoo.cfg。使用vim命令打开zoo.cfg文件编辑,主要修改两个地方,data,log需要自己创建,我是直接在zookeeper安装目录下创建的
    在这里插入图片描述

  • 启动zookeeper

    进入zookeeper安装目录下的bin目录
    在这里插入图片描述

    进入bin目录如下
    在这里插入图片描述

    启动zookeeper服务端:./zkServer.sh start
    在这里插入图片描述

    查看启动状态:./zkServer.sh status
    在这里插入图片描述

    启动zookeeper客户端

    	[root@localhost bin]# ./zkCli.sh
    

安装kafka(非集群)

  • 首先是下载自己需要的压缩包
    官网下载地址:https://kafka.apache.org/downloads
  • 上传到安装目录并解压
    tar -zxvf kafka压缩包名称
    
  • 修改kafka的server.properties配置文件
    进入解压后的文件能看到如下内容,server.properties配置文件在config目录下
    在这里插入图片描述
    我这里是非集群,修改配置文件也比较简单,使用vi或者vim文本编辑命令打开server.properties文件,修改下面配置
    #kafka服务器地址
    listeners=PLAINTEXT://10.3.1.156:9092 
    
    #zk服务器地址,我这里zk跟kafka在一台虚拟机上
    zookeeper.connect=10.3.1.156:2181
    
    log.dirs=/usr/kafka/kafka-logs
    
    非集群方法配置就完成了,下面就是启动kafka服务,进入kafka文件下的bin目录下,使用命令的方式体验消息发送和接收
    启动kafka
    ./kafka-server-start.sh -daemon ../config/server.properties
    
    创建topic
    ./kafka-topics.sh --bootstrap-server 10.3.1.156:9092 --create --topic test --replication-factor 1 --partitions 1
    
    查看topic
    ./kafka-topics.sh --list --bootstrap-server 10.3.1.156:9092
    
    发消息
    ./kafka-console-producer.sh --bootstrap-server 10.3.1.156:9092 --topic test
    
    接收消息
    ./kafka-console-consumer.sh --bootstrap-server 10.3.1.156:9092 --topic test
    
    删除topic
    ./kafka-topics.sh --bootstrap-server 10.3.1.156:9092 --delete --topic test
    

springboot项目整合配置

  • 新建的项目pom文件中导入kafka依赖
    <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
    </dependency>
    
  • 配置文件添加kafka相关配置
    server:
      port: 8080
    spring:
      kafka:
        bootstrap-servers: 10.3.1.156:9092 #kafka服务地址
        producer: #生产者
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer: #消费者
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          auto-offset-reset: earliest
    
  • 客户端发送消息
    @RestController
    public class KafkaProducerController {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @GetMapping(value = "/sentMessage")
        public String sentMessage(@RequestParam("msg") String msg){
            kafkaTemplate.send("test", msg);
            return "发送成功";
        }
    }
    
  • 消费者接收消息
    @Component
    public class KafkaConsumerListener {
        @KafkaListener(topics = "test", groupId = "my-test-group")
        public void consumer(ConsumerRecord<String,String> record){
            System.out.println("接收到的消息:" + record.value());
        }
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1012091.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Maxwell 概述、安装、数据同步【一篇搞定】!

文章目录 什么是 Maxwell&#xff1f;Maxwell 输出格式Maxwell 工作原理Maxwell 安装Maxwell 历史数据同步Maxwell 增量数据同步 什么是 Maxwell&#xff1f; Maxwell 在大数据领域通常指的是一个用于数据同步和数据捕获的开源工具&#xff0c;由美国 Zendesk 开源&#xff0c…

千巡翼X1协调转弯功能

近年来&#xff0c;随着技术的飞速发展&#xff0c;无人机航测已成为现代测绘领域的一项重要应用。 无人机的出现极大地提高了航测的效率和精度&#xff0c;极大地减少了人力资源的投入。通过搭载各种高精度的航测仪器和传感器&#xff0c;无人机可以在短时间内完成大面积的航…

使用vscode以16进制方式查看bin文件内容

简介 方便对bin文件内容进行分析。 使用 VSCODE&#xff1a;插件下载 Hex Editor,下载完后使用vscode打开bin文件。 使用快捷键CtrlShiftP&#xff0c; 并在上方命令框输入>hex 选择 结果如下

微信小程序隐私授权

微信开发者平台新公告&#xff1a;2023年9月15之后&#xff0c;隐私协议将被启用&#xff0c;所以以后的小程序都要加上隐私协议的内容提示用户&#xff0c; 首先设置好隐私协议的内容&#xff0c;登录小程序的开发者后台&#xff0c;在设置--》服务内容声明--》用户隐私保护指…

前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS基础(一)

&#xfecc;&#xfecc;&#xfecc;&#xfecc;♡‎&#xfecc;&#xfecc;&#xfecc;&#xfecc;♡‎‎&#xfecc;&#xfecc;&#xfecc;&#xfecc;♡‎&#xfecc;&#xfecc;&#xfecc;&#xfecc;♡&#xfecc;&#xfecc;&#xfecc;&#xfecc;…

OPC HDA扫盲

目录 1 基本概念 1.1 历史数据服务器类型 1.2 数据源 1.3 对象和接口概述 1.4 所需接口定义 1.5 可选接口定义 1.6 定义 1.7 边界值和时域 2 HDA聚合 2.1 生成间隔 2.2 数据类型 2.3 数据质量 3 聚合示例 3.1 示例数据 3.2 内插&#xff08;INTERPOLATIVE&#x…

构造与析构

在类的声明中&#xff0c;构造函数和析构函数是一类特殊的函数&#xff1a;由系统自动执行&#xff0c;在程序中不可显示地调用它们。 构造函数 作用&#xff1a;建立对象时对对象的数据成员进行初始化 特点&#xff1a; 构造函数是与类同名的特殊成员函数&#xff0c;没有…

Xamarin.Android实现App内版本更新

目录 1、具体的效果2、代码实现2.1 基本原理2.2 开发环境2.3 具体代码2.3.1 基本设置2.3.2 系统的权限授予2.3.3 进度条的layout文件2.3.4 核心的升级文件 3、代码下载4、知识点5、参考文献 1、具体的效果 有事需要在程序内集成自动更新的功能&#xff0c;网上找了下&#xff…

【ARM AMBA AXI 入门 11 - AXI 总线 AWCACHE 和 ARCACHE 介绍】

文章目录 1.1 AXI 传输事务属性1.1.1 slave type1.1.2 系统级缓存 1.2 Memory Attributes1.2.1 Bufferable&#xff0c;AxCACHE[0]1.2.2 Modifiable, AxCACHE[1]1.2.3 cache-allocate 1.3 Memory types 转自&#xff1a;https://zhuanlan.zhihu.com/p/148813963 如有侵权请联系…

学习记忆——英语篇

文章目录 英语字母形象起源右脑记忆单词的原则四大步骤第一步&#xff1a;摄取信息第二步&#xff1a;处理信息第三步&#xff1a;储存信息第四步&#xff1a;提取信息 训练例子字母形象训练 右脑记忆单词5大方法字源法编码法字母编码法字母组合编码法 拼音法全拼法拼音组合 熟…

前K个高频单词-c++实现

692. 前K个高频单词 - 力扣&#xff08;LeetCode&#xff09; 给定一个单词列表 words 和一个整数 k &#xff0c;返回前 k 个出现次数最多的单词。 返回的答案应该按单词出现频率由高到低排序。如果不同的单词有相同出现频率&#xff0c; 按字典顺序 排序。 示例 1&#xff…

关于Linux服务器.sh文件启动问题

问题描述 在linux服务器上使用文本编辑&#xff08;并非vim操作&#xff09;对.sh脚本文件进行修改后无法启动&#xff0c;显示’\r’识别错误等。 错误如下&#xff1a; 错误原因 因为.sh文件在经过这种编辑后格式产生了错误&#xff0c;由unix转为了doc格式&#xff0c;需…

Ae 效果:CC Particle Systems II

模拟/CC Particle Systems II Simulation/CC Particle Systems II CC Particle Systems II&#xff08;CC 粒子系统 II&#xff09;可用于生成和模拟各种类型的粒子系统&#xff0c;包括火焰、雨、雪、爆炸、烟雾等等。 与 CC Particle World 效果相比有许多类似的属性。最大的…

华为云云耀云服务器L实例评测|部署功能强大的监控和可视化工具Grafana

应用场景 Grafana介绍 Grafana是一个功能强大的监控和可视化工具&#xff0c;适用于各种行业和应用场景&#xff0c;如IT运维监控、网络监控、能源管理、金融市场分析等。它提供了灵活的数据源支持、强大的可视化功能和告警机制&#xff0c;以及注释和过滤功能&#xff0c;使…

阿里云服务器全方位介绍_优势_使用场景_限制说明

阿里云服务器是什么&#xff1f;云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务&#xff0c;云服务器可以降低IT成本提升运维效率&#xff0c;免去企业或个人前期采购IT硬件的成本&#xff0c;阿里云服务器让用户像使用水、电、天然气等公共资源一样便捷、高效地使用服务器…

使用cpolar配合Plex打造个人媒体站,畅享私人影音娱乐空间

文章目录 1.前言2. Plex网站搭建2.1 Plex下载和安装2.2 Plex网页测试2.3 cpolar的安装和注册 3. 本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 1.前言 用手机或者平板电脑看视频&#xff0c;已经算是生活中稀松平常的场景了&#xff0c;特别是各…

langchain+GPT+neo4j 图数据库

neo4j版本是5.11.0,langchain的版本 0.0.288下载apoc插件 https://neo4j.com/docs/apoc/current/installation/ neo4j.conf文件把apoc.*添加到dbms.security.procedures.unrestricted配置项 使用return apoc.version()来查看是否安装成功 pip install neo4j图 参考官网&…

以太网的简单概念、MAC地址与IP地址的关系

以太网 DIX Ethernet V2标准的局域网------以太网。 IEEE 802.3标准和DIX Ethernet V2标准很相似&#xff0c;只有些许区别&#xff0c;不严格的来说&#xff0c;802.3局域网也叫做以太网。以太网是一个局域网&#xff0c;信息以广播的形式发送。 IEEE 802标准定义的局域网参…

哈工大校园网显示IP地址错误连接不上

您当前获取到的IP地址有误&#xff0c;请重新开关无线获取IP地址(注:电脑端还可以通过cmd窗口&#xff0c;输入ipconfig /release、ipconfig /renew命令)。如未解决此问题请联系网络安全和信息化办公室处理。 当校园网登录时会出现如上情况&#xff0c;并且当你按照他的方法尝试…

vue2使用vuedraggable实现拖拽删除添加重置功能

需求&#xff1a;要输入xx阶段&#xff0c;之后输入后显示但是要可以自己手动排序和删除&#xff0c;以免写错了&#xff0c;并且做了判断&#xff0c;如果重复输入的话会提示&#xff0c;不会让他添加&#xff0c;点击重置功能后一键清空所有输入 1.效果 2.下载插件 我直接下…