Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏

news2024/12/29 0:30:32

Springboot集成kafka

  • 一、前言🔥
  • 二、环境说明🔥
  • 三、概念🔥
  • 四、CentOS7安装kafka🔥
    • 1.下载kafka安装包
    • 2.下载好后,进行解压
  • 六、kafka项目集成🔥
    • 1️⃣pom引入
    • 2️⃣配置kafka
    • 3️⃣一个kafka消息发送端
    • 4️⃣定义一个kafka消息消费端
    • 5️⃣定义一个Controller进行测试
    • 6️⃣测试结果如下

一、前言🔥

上一期,我是带着大家入门了SpringBoot整合WebSocket,今天我再来一期kafka的零基础教学吧。不知道大家对kafka有多少了解,反正我就是从搭建开始,然后再加一个简单演示,这就算是带着大家了个门哈,剩下的我再后边慢慢出教程给大家说。

二、环境说明🔥

演示环境:idea2021 + springboot 2.3.1REALSE + CentOS7 + kafka

三、概念🔥

kafka是linkedin开源的分布式发布-订阅消息系统,目前归属于Apache的顶级项目。主要特点是基于pull模式来处理消息消费,追求高吞吐量,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。

一开始的目的是日志的收集和传输。0.8版本开始支持复制,不支持事务,对消息的丢失,重复,错误没有严格要求 适用于产生大量数据的互联网服务的数据收集业务。在廉价的服务器上都能有很高的性能,这个主要是基于操作系统底层的pagecache,不用内存胜似使用内存。

综上所述,kafka是一款开源的消息引擎系统(消息队列/消息中间件) 分布式流处理平台

四、CentOS7安装kafka🔥

1.下载kafka安装包

下载地址:https://kafka.apache.org/downloads.html
CSDN:kafka_2.12-2.2.1.zip

2.下载好后,进行解压

通过ftp将kafka安装包kafka_2.11-0.9.0.1.tgz上传到服务器 /opt/monitor/kafka目录下
执行命令unzip kafka_2.12-2.2.1.zip 解压上传的kafka安装包

unzip kafka_2.12-2.2.1.zip 

在这里插入图片描述

输入命令ll查询解压情况

在这里插入图片描述
执行命令 cd /opt/monitor/kafka/kafka_2.12-2.2.1 进入kafka目录

 cd /opt/monitor/kafka/kafka_2.12-2.2.1

1 配置并启动zookeeper
执行命令 创建zookeeper日志文件存放路径

mkdir zklogs

在这里插入图片描述
执行命令 修改zookeeper的配置信息

vim config/zookeeper.properties

按一下键盘上的 i 键进入编辑模式,将光标移动到日志文件存放路径配置信息所在行,并修改dataDir=/opt/monitor/kafka/kafka_2.12-2.2.1/zklogs

dataDir=/opt/monitor/kafka/kafka_2.12-2.2.1/zklogs

在这里插入图片描述
修改好后按下键盘上的Esc 键后 输入:wq 并按下Enter键保存修改的信息并退出,注意这里的:也是要输入的

执行sh./zookeeper-server-start.sh ./config/zookeeper.properties & 命令后台启动zookeeper
在这里插入图片描述

注意这里提示报错权限不足,使用命令修改权限(个人建议把bin的权限全部修改成777)
chmod 777 zookeeper-server-start.sh
在这里插入图片描述
显示没有报错启动zookeeper成功

 sh ./zookeeper-server-start.sh  /opt/monitor/kafka/kafka_2.12-2.2.1/config/zookeeper.properties

在这里插入图片描述
执行命令ps -ef | grep zookeeper 查看zookeeper是否启动成功,出现类型如下信息表示成功启动
在这里插入图片描述
2 配置并启动kafka
执行命令 vim config/server.properties 修改kafka的配置信息
在这里插入图片描述
按一下键盘上的 i 键进入编辑模式,修改advertised.listeners=PLAINTEXT://外网IP:9092;
在这里插入图片描述
修改log.dirs=/opt/monitor/kafka/kafka_2.12-2.2.1/logs该参数为kafka日志文件存放路径
在这里插入图片描述
修改每个topic的默认分区参数num.partitions,默认是1,具体合适的取值需要根据服务器配置进程确定
在这里插入图片描述
修改完成后按下键盘上的Esc 键后 输入:wq 并按下Enter键 保存修改的信息并退出,注意这里的:也是要输入的.

cd /opt/monitor/kafka/kafka_2.12-2.2.1/bin #进入kafka启动目录
sh kafka-server-start.sh /opt/monitor/kafka/kafka_2.12-2.2.1/config/server.properties  #启动kafka服务指定配置文件

在这里插入图片描述
执行命令查看kafka是否启动成功

ps -ef | grep kafka  #查看kafka是否启动成功

在这里插入图片描述

六、kafka项目集成🔥

1️⃣pom引入

<!--kafka依赖-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2️⃣配置kafka


spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false
  profiles:
    active: dev
server:
  port: 8070

3️⃣一个kafka消息发送端

package com.suihao.kafka;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @author suihao
 * @Title: KafkaProducer
 * @Description TODO
 * @date: 2023/03/03 17:
 * @version: V1.0
 */
@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    //自定义topic
    public static final String TOPIC_TEST = "topic.test";

    //
    public static final String TOPIC_GROUP1 = "topic.group1";

    //
    public static final String TOPIC_GROUP2 = "topic.group2";

    public void send(Object obj) {
        String obj2String = JSONUtil.toJsonStr(obj);
        log.info("准备发送消息为:{}", obj2String);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}

4️⃣定义一个kafka消息消费端

package com.suihao.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;


/**
 * @author suihao
 * @Title: KafkaConsumer
 * @Description TODO
 * @date: 2023/03/03 17:
 * @version: V1.0
 */
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
    public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }
}

5️⃣定义一个Controller进行测试

package com.suihao.controller;

import com.suihao.kafka.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author suihao
 * @Title: KafkaController
 * @Description TODO
 * @date: 2023/03/03 17:
 * @version: V1.0
 */
@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public void sendMsg(){
        kafkaProducer.send("------------测试消息-----------");
    }
}

6️⃣测试结果如下

在这里插入图片描述
彩蛋: https://gitee.com/suihao666/SpringBoot-Kafka

最后送所有正在努力的大家一句话:

你不一定逆风翻盘,但一定要向阳而生。

期待下次发布好的文章:

山水相逢,我们江湖见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386235.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL45讲笔记04深入浅出索引上

索引的目的: 索引的出现其实就是为了提高数据查询的效率&#xff0c;就像书的目录一样。常见索引模型&#xff1a; hash表&#xff0c;以K-V键值对的形式的一种数据结构&#xff0c;底层是数组加链表形式。通过一定的hash运算找到数据合适的位置放入&#xff0c;如果放入的位置…

[jetson]paddlepaddle2.4.0在jetpack5.0.2源码编译流程

由于官方暂时没有提供jetson对应的jetson jetpack5.0.2预编译包&#xff0c;因此只有源码编译&#xff0c;本次编译不带Tensorrt,编译已经顺利成功&#xff0c;注意本次使用的设备是jetson NX 测试环境&#xff1a; ubuntu20.04 jetpack5.0.2 GCC-8.4 Software part of jet…

Centos7搭建NFS

1.NFS简介Network File System(网络文件系统&#xff0c;通过网络让不同的机器系统之间可以彼此共享文件和目录&#xff0c;类似Samba服务。2.NFS挂载原理 在网络中服务器和客户端进行连接都是通过端口进行数据传输&#xff0c;而NFS服务端的端口是随机的&#xff0c;从而导致N…

Linux----网络基础(2)--应用层的序列化与反序列化--守护进程--0226

文章中有使用封装好的头文件&#xff0c;可以在下面连接处查询。 Linux相关博文中使用的头文件_Gosolo&#xff01;的博客-CSDN博客 1. 应用层 我们程序员写的一个个解决我们实际问题, 满足我们日常需求的网络程序, 都是在应用层 1.2 协议 我们在之前的套接字编程中使用的是…

最适合你的团队云协作工具

团队云协作工具哪个好&#xff1f;使用Zoho Projects的团队云协作软件套件&#xff0c;在一个平台上无缝协作&#xff0c;激励您的团队在任何地方以最好的状态完成他们的工作。 使您的团队能够使用团队云协作软件在任何地方进行协作和沟通。Zoho Projects提供了一套强大…

三天吃透计算机网络八股文

本文已经收录到Github仓库&#xff0c;该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点&#xff0c;欢迎star~ Github地址&#xff1a;https://github.com/…

一文读懂光学天线

天线&#xff0c;按维基百科的定义&#xff0c;"是一种用来发射或接收无线电波—或更广泛来讲—电磁波的器件"。例如&#xff0c;在无线通信系统中&#xff0c;天线被用于发射与接收射频与微波波段的电磁波。而在我们的智能手机中&#xff0c;就有内置的平面倒F天线(…

01-认识产品经理

文章目录引入1.1 合格的产品经理1.2 产品经理的分类按服务对象不同划分按产品平台不同划分按公司所属行业不同按工作内容划分按职级高低划分1.3 产品经理的岗位职责产品的开发流程核心团队成员及其职责产品经理工作中常见误区1.4 产品经理的能力素质专业技能&#xff08;干得了…

Unity Lighting -- 配置平行光源和天空盒

识别不同种类的光源 在游戏或实时应用程序中&#xff0c;我们可能会创建多种不同种类的场景&#xff0c;比如室内场景、室外场景、真实的场景或完全想象的场景。即便项目是一个完全的想象的或是科幻的故事&#xff0c;灯光也是非常重要的一环&#xff0c;它能极大提升沉浸感。 …

Python3-条件控制

Python3 条件控制 Python 条件语句是通过一条或多条语句的执行结果&#xff08;True 或者 False&#xff09;来决定执行的代码块。 可以通过下图来简单了解条件语句的执行过程: 代码执行过程&#xff1a; if 语句 Python中if语句的一般形式如下所示&#xff1a; if condi…

Atlassian Server用户新选择 | 数据中心产品是否适合您的企业(3)?

2024年2月&#xff0c;也就是一年不到&#xff0c;Atlassian将终止对Server产品及插件的所有支持。 此公告发布后&#xff0c;许多用户需要了解怎样的前进方向才是最适合企业的。为此&#xff0c;Atlassian不仅提供云版&#xff0c;还提供了本地部署的数据中心&#xff08;Data…

jupyter lab安装和配置

jupyter lab 安装和配置 一、jupyter lab安装并配置 安装jupyterlab pip install jupyterlab启动 Jupyter lab默认会打开实验环境的&#xff0c;也可以自己在浏览器地址栏输入127.0.0.1:8888/lab 汉化 pip install jupyterlab-language-pack-zh-CN刷新一下网页&#xff0…

ChatGPT解答:PYQT5 组件化实例,Python代码实现,给出100个代码实例

ChatGPT解答&#xff1a; PYQT5 组件化实例&#xff0c;Python代码实现&#xff0c;给出100个代码实例 PYQT5 组件化实例&#xff0c;Python代码实现&#xff0c;给出100个代码实例 实现一个简单的窗口 import sys from PyQt5.QtWidgets import QApplication, QWidgetapp QA…

我90后,零基础成功转行python工程师,从月薪5K到现在月入2W+改变真的难吗?

我是25岁转行学python的。说实在&#xff0c;转行就是奔着挣钱去的。希望我的经历可以给想转行的朋友带来一点启发和借鉴。 先简单介绍下个人背景&#xff0c;三流大学毕业&#xff0c;物流专业&#xff0c;学习能力一般&#xff0c;没啥特别技能&#xff0c;反正就很普通的一…

CSS3新特性-变量

2017年三月&#xff0c;微软宣布 Edge 浏览器将支持 CSS 变量。 这个重要的 CSS 新功能&#xff0c;所有主要浏览器已经都支持了。本文全面介绍如何使用它&#xff0c;你会发现原生 CSS 从此变得异常强大。 一、变量的声明 声明变量的时候&#xff0c;变量名前面要加两根连词…

python入门应该怎么学习

国外Python的使用率非常高&#xff0c;但在国内Python是近几年才火起来&#xff0c;Python正处于高速上升期市场对于Python开发人才的需求量急剧增加&#xff0c;学习Python的前景比较好。 Python应用领域广泛&#xff0c;意味着选择Python的同学在学成之后可选择的就业领域有…

虚函数与多态性

5.1多态性概述&#xff1a; 按实施的机制&#xff0c;多态可以分为两类&#xff1a; 虚函数的定义&#xff1a; &#xff08;前面思维是虚基类&#xff0c;别搞混了&#xff09; 运行时多态的条件&#xff1a; 运行时的多态&#xff1a; 基类中有show(),派生类中也有show&…

叠氮化物标记糖92659-90-0,2-[(Azidoacetyl)amino]-2-deoxy-D-glucose广泛用于体内代谢标记

基础产品数据&#xff1a;CAS号&#xff1a;92659-90-0中文名&#xff1a;2-[(叠氮基乙酰基)氨基]-2-脱氧葡萄糖英文名&#xff1a;2-[(Azidoacetyl)amino]-2-deoxy-D-glucose性 状&#xff1a;白色粉末温馨提示&#xff1a;所有的试剂仅用于科研实验。结构式&#xff08;Struc…

ChatGPT解答:纯前端文档预览,Vue实现,无需后端,支持Word、Excel、PPT、pdf、文本、图片,附接入demo和文档

ChatGPT解答&#xff1a;纯前端文档预览&#xff0c;Vue实现&#xff0c;无需后端&#xff0c;支持Word、Excel、PPT、pdf、文本、图片&#xff0c;附接入demo和文档 ChatGPTDemo Based on OpenAI API (gpt-3.5-turbo). 纯前端文档预览&#xff0c;Vue实现&#xff0c;无需后…

【软件测试】性能测试面试题都问什么?面试官想要什么?回答惊险避坑......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 1、你认为不同角色关…