从安装到实战:Spring Boot与kafka终极整合指南

news2025/1/15 12:57:14

docker环境下部署kafka

前置条件

Apache Kafka 自 2.8.0 版本开始引入了不依赖 Zookeeper 的“Kafka Raft Metadata Mode”,本文章依然使用Zookeeper 作为集群管理的插件。

#拉去zookeeper镜像
 docker pull wurstmeister/zookeeper
 #运行zookeeper容器
 docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

启动后防火墙开放端口2181

su在这里插入代码片do firewall-cmd --zone=public --add-port=2181/tcp --permanent

重载配置

sudo firewall-cmd --reload

部署kafka容器

#拉去kafka镜像
 docker pull wurstmeister/kafka
 #运行kakfa进行
docker run -d \
  --name kafka \  # 容器名称为 kafka
  --restart=always \  # 自动重启策略,始终重启
  -p 9092:9092 \  # 将容器的 9092 端口映射到主机的 9092 端口
  --link zookeeper \  # 链接到名为 zookeeper 的容器
  -e KAFKA_ZOOKEEPER_CONNECT=192.168.253.166:2181 \  # 指定 Zookeeper 的连接地址
  -e KAFKA_ADVERTISED_HOST_NAME=192.168.253.166 \  # 广播给 Kafka 客户端的主机名
  -e KAFKA_ADVERTISED_PORT=9092 \  # 广播给 Kafka 客户端的端口号
  -v /etc/localtime:/etc/localtime \  # 将主机的时区信息挂载到容器中
  wurstmeister/kafka  # 使用 wurstmeister 提供的 Kafka 镜像

同理需要放开防火墙端口9092,注意需要将命令中的Zookeeper连接ip切换为自己的本机ip,另外在实际生产中为了安全性,还需要给kafka加上用户和密码,此处仅演示使用,不再赘述。

kafka原理解析

kafka 生产与消费的核心架构模型
在这里插入图片描述

核心概念

  • producer:生产者就是产生消息的组件
  • broker:一个broker可以认为就是一个服务节点,服务实例。
  • consumer:消费者 消费信息的组件
  • zookeeper:用于管理和协调Kafka的Broker

逻辑组件

  • topic:生产者创建消息是要发送给特定的主题的,而消费者拉取消息也是要指定主题的。消息就是通过主题来归类的,类似于RabbitMQ中的Exchange的概念

  • partition:是Kafka下数据存储的基本单元,这是个物理上的概念,同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上,kafka中的消息是以键值对的形式存储的,如果没有指定分区,消息是默认按照轮询的方式存储到各个分区上的。
    在这里插入图片描述

  • offset:偏移量, Kafka 的消息是可以持久化并反复消费的,这是因为在每个分区中,当有消息写入就会像追加日志那样顺序写入(顺序IO的写入性能是十分好的),通过 offset 来记录对应消息所在的位置。因此,offset 是消息在 partition 中的唯一标识,并且能看出同一个 partition 内的消息的先后顺序,我们称之为 “Kafka 保证消息在分区内是有序的”。

场景应用

1. 实时数据流管道
日志收集与聚合:Kafka可以用于收集和聚合来自不同系统的日志数据,然后将这些数据传输到集中存储系统(如Hadoop、Elasticsearch)进行分析。
指标监控与报警:通过Kafka传输系统运行指标数据,并实时分析,帮助及时发现并处理系统异常。
2. 数据集成
数据库变更数据捕获(CDC):使用Kafka连接器捕获数据库中的变更(如插入、更新、删除),然后将变更数据流式传输到其他存储系统或服务。
跨数据中心复制:在地理上分散的数据中心之间传输数据,实现数据的实时同步。
3. 流处理与分析
实时分析与机器学习:结合流处理框架(如Apache Flink、Apache Spark Streaming),Kafka可以用于实时数据分析和机器学习模型的在线更新。
用户行为跟踪:收集用户在网站或应用上的行为数据,进行实时分析以优化用户体验或做出业务决策。
4. 消息队列
解耦服务与微服务通信:在微服务架构中使用Kafka作为消息队列,实现服务间松耦合和可靠通信。
事件溯源模式:记录应用程序的所有状态变化作为事件流,以实现事件溯源和回放。
5. 物联网(IoT)
传感器数据收集:通过Kafka收集大量传感器设备的数据,实现实时监控和管理。
边缘计算支持:在边缘设备上进行初步数据处理后,将结果发送到中心服务器进行进一步分析。
6. 金融服务
交易流水处理:金融机构可以使用Kafka处理大量交易流水,确保数据的实时性和一致性。
欺诈检测:实时分析交易行为,快速识别异常以防止欺诈活动。
7. 内容分发
新闻推送与个性化推荐:根据用户兴趣实时推送个性化内容,提高用户参与度。
视频直播流处理:用于视频直播的数据传输和实时处理,确保低延迟和高质量。
Kafka通过其高吞吐量、可扩展性、容错性以及灵活的订阅机制,使得它在这些场景中能够有效地支持复杂的实时数据流处理需求。

整合kafka

引入依赖

  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.13</version>
        </dependency>

引入配置

# Kafka集群的地址,用于指定Kafka服务器的位置。
spring.kafka.bootstrap-servers=192.168.253.166:9092

# 消费者组ID,用于标识消费者所属的组,Kafka通过消费者组来管理消息的消费。
spring.kafka.consumer.group-id=my-group

# 自动偏移量重置策略,当没有初始偏移量或当前偏移量在服务器上不存在时,使用此配置。
# 'earliest'从最早的可用消息开始消费。latest:从最新的消息开始读取。none:没有找到以前的偏移量,抛出异常。
spring.kafka.consumer.auto-offset-reset=earliest

# 生产者键序列化器,定义消息键的序列化方式,这里使用字符串序列化。
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

# 生产者值序列化器,定义消息值的序列化方式,这里使用字符串序列化。
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 消费者键反序列化器,定义如何将字节数组反序列化为消息键,这里使用字符串反序列化。
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 消费者值反序列化器,定义如何将字节数组反序列化为消息值,这里使用字符串反序列化。
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 是否启用幂等性,确保每条消息在Kafka中只会被写入一次,从而避免重复写入。
spring.kafka.producer.enable-idempotence=true

# 事务ID前缀,用于标识生产者事务。每个生产者实例必须有唯一的事务ID前缀,以支持事务性生产者功能。
# 这个设置是可选的,仅在需要事务性保证时使用。
#spring.kafka.producer.transaction-id-prefix=tx-

生产者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducer {

    private static final String TOPIC = "my_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
        System.out.println("Sent message: " + message);
    }
    // 带回调的发送消息方法
    public void sendMessageCallback(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, "A", message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // 发送成功时的处理逻辑
                System.out.println("成功回调=[" + message +
                        "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                // 发送失败时的处理逻辑
                System.err.println("失败回调=["
                        + message + "] due to : " + ex.getMessage());
                // 可选择在此处实现重试机制或记录日志
            }
        });
    }
}

消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my_topic", groupId = "my-group")
    public void listen(String message) {
        System.out.printf("普通A message: %s%n", message);
    }
}

写个方法调用

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String send() {

        kafkaProducer.sendMessageCallback("Hello, Kafka!");
        return "Message sent!";
    }

    @GetMapping("/send2")
    public String send2() {

        kafkaProducer.sendMessage("Hello, Kafka启动");
        return "Message sent!";
    }
}

生产者测试结果
在这里插入图片描述
消费者测试结果
在这里插入图片描述

配置说明

1. 生产者配置(Producer Configuration)
  • bootstrap.servers:Kafka 集群地址列表,格式为 hostname:port。
  • key.serializer:消息键的序列化器类,用于将消息键转换为字节数组。
  • value.serializer:消息值的序列化器类,用于将消息值转换为字节数组。
  • acks:生产者确认消息的策略(0 = 不确认,1 = 主节点确认,all = 所有副本确认)。
  • compression.type:压缩类型(none, gzip, snappy, lz4, zstd)。
  • enable.idempotence:是否启用幂等性,防止重复发送相同消息。
  • transactional.id:事务 ID 前缀,用于支持事务功能。
  • batch.size:每个批次的最大字节数。
  • linger.ms:发送延迟,允许生产者在发送前等待更多消息以填满批次。
  • max.in.flight.requests.per.connection:在同一时间内可以发送到服务器的最大未确认请求数。
2. 消费者配置(Consumer Configuration)
  • bootstrap.servers:Kafka 集群地址列表,格式为 hostname:port。
  • group.id:消费者所属的消费组 ID,用于管理消费者的负载均衡和偏移量。
  • key.deserializer:消息键的反序列化器类,用于将字节数组转换为消息键。
  • value.deserializer:消息值的反序列化器类,用于将字节数组转换为消息值。
  • auto.offset.reset:自动偏移量重置策略(earliest = 最早可用,latest = 最新,none = 抛出异常)。
  • enable.auto.commit:是否启用自动提交偏移量;默认为 true。
  • auto.commit.interval.ms:自动提交偏移量的时间间隔,仅在启用自动提交时有效。
  • max.poll.records:每次调用 poll() 时返回的最大记录数。
  • session.timeout.ms:消费者会话超时时间,超出此时间后会被认为失效。
  • heartbeat.interval.ms:心跳间隔时间,用于与 Kafka 保持连接活跃状态。
3. 代理配置(Broker Configuration)
  • broker.id:唯一标识每个代理的 ID,通常是整数值。
  • listeners:定义代理监听客户端请求的地址和端口,例如:PLAINTEXT://localhost:9092。
  • log.dirs:日志文件存储目录,可以设置多个目录以实现数据分散存储。
  • num.partitions:新主题默认分区数,如果未指定,则使用此值创建新主题时默认分区数。
  • replication.factor:默认副本因子,当主题创建时,如果未指定副本因子,则使用此值。
  • min.insync.replicas:确保最小同步副本数量,以防止数据丢失。
  • zookeeper.connect:Zookeeper 的连接字符串,用于管理集群元数据和协调操作。
  • delete.topic.enable:是否允许删除主题;默认为 false。
  • log.retention.hours:日志保留时间(小时),超过此时间的数据将被删除。
4. 全局和其他设置
  • 连接设置
    Connection timeout: 连接超时时间设置。
    Retry count: 重试次数设置。
  • 安全性设置
    SSL configuration: SSL 配置,用于安全传输数据。
    SASL configuration: SASL 配置,用于身份验证。

springboot整合kafka教程暂且到此结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2232259.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Kettle的安装与使用】使用Kettle实现mysql和hive的数据传输(使用Kettle将mysql数据导入hive、将hive数据导入mysql)

文章目录 一、安装1、解压2、修改字符集3、启动 二、实战1、将hive数据导入mysql2、将mysql数据导入到hive 一、安装 Kettle的安装包在文章结尾 1、解压 在windows中解压到一个非中文路径下 2、修改字符集 修改 spoon.bat 文件 "-Dfile.encodingUTF-8"3、启动…

如何看待AI技术的应用前景?

文章目录 如何看待AI技术的应用前景引言AI技术的现状1. AI的定义与分类2. 当前AI技术的应用领域 AI技术的应用前景1. 经济效益2. 社会影响3. 技术进步 AI技术应用面临的挑战1. 数据隐私与安全2. 可解释性与信任3. 技能短缺与就业影响 AI技术的未来发展方向1. 人工智能的伦理与法…

PyQt5实战——UTF-8编码器UI页面设计以及按钮连接(五)

个人博客&#xff1a;苏三有春的博客 系类往期文章&#xff1a; PyQt5实战——多脚本集合包&#xff0c;前言与环境配置&#xff08;一&#xff09; PyQt5实战——多脚本集合包&#xff0c;UI以及工程布局&#xff08;二&#xff09; PyQt5实战——多脚本集合包&#xff0c;程序…

快速入门CSS

欢迎关注个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗 如有错误&#xff0c;欢迎指出~ 目录 CSS css的三种引入方式 css书写规范 选择器分类 标签选择器 class选择器 id选择器 复合选择器 通配符选择器 color颜色设置 border边框设置 width/heigth 内/外边距 C…

【基础】os模块

前言 1、os是operation system&#xff08;操作系统&#xff09;的缩写&#xff1b;os模块就是python对操作系统操作接口的封装。os模块提供了多数操作系统的功能接口函数。&#xff08;OS模块提供了与操作系统进行交互的函数&#xff09; 2、操作系统属于Python的标准实用程…

Linux---cp命令

Linux cp 命令 | 菜鸟教程 (runoob.com) 命令作用&#xff1a; cp命令主要用于复制文件或目录 语法: cp [options] source dest cp [选项] 源文件 目标文件 source:要复制的文件或目录的名称 dest:复制后的文件或目录的名称 注意&#xff1a;用户使用该指令复制目录时&…

MyBatis-Plus快速入门:从安装到第一个Demo

一、前言 在现代 Java 应用程序中&#xff0c;数据访问层的效率与简洁性至关重要。MyBatis-Plus 作为 MyBatis 的增强工具&#xff0c;旨在简化常见的数据操作&#xff0c;提升开发效率。它提供了丰富的功能&#xff0c;如自动生成 SQL、条件构造器和简单易用的 CRUD 操作&…

【android12】【AHandler】【3.AHandler原理篇AHandler类方法全解】

AHandler系列 【android12】【AHandler】【1.AHandler异步无回复消息原理篇】-CSDN博客 【android12】【AHandler】【2.AHandler异步回复消息原理篇】-CSDN博客 其他系列 本人系列文章-CSDN博客 1.简介 前面两篇我们主要介绍了有回复和无回复的消息的使用方法和源码解析&a…

美发系统——职员绩效和提成——调试过程

一、学会通过现象看本质 首先&#xff0c;通过现象看本质能够让技术研究者更深入地理解问题。在面对技术故障或挑战时&#xff0c;表面的现象往往只是冰山一角&#xff0c;如果只关注表象&#xff0c;可能会采取治标不治本的解决方法。而洞察本质则可以找到问题的根源&#xf…

记一次:Clickhouse同步mysql数据库

ClickHouse可以通过使用MaterializeMySQL引擎来实现与MySQL的数据同步。 前言&#xff1a;因为数据量比较大&#xff0c;既然要分库&#xff0c;为何不让clickhouse同步一下mysql数据库呢&#xff1f; 零、前期准备--mysql的查询和配置 1 查询mysql的配置状态 查询以下语句…

教程:使用 InterBase Express 访问数据库(二)

1. 添加数据模块(IBX 通用教程) 本节将创建一个数据模块(TDataModule),这是一种包含应用程序使用的非可视组件的表单。 以下是完全配置好的 TDataModule 的视图: 创建 TDataModule 后,您可以在其他表单中使用这个数据模块。 2. 添加 TDataModule 要将数据模块添加到…

Matlab实现海马优化算法(SHO)求解路径规划问题

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1内容介绍 海马优化算法&#xff08;SHO&#xff09;是一种受自然界海马行为启发的优化算法&#xff0c;它通过模拟海马在寻找食物和配偶时的探索、跟踪和聚集行为来搜索最优解。SHO因其高效的全局搜索能力和局部搜索能力而…

002-Kotlin界面开发之Kotlin旋风之旅

Kotlin旋风之旅 Compose Desktop中哪些Kotlin知识是必须的&#xff1f; 在学习Compose Desktop中&#xff0c;以下Kotlin知识是必须的&#xff1a; 基础语法&#xff1a;包括变量声明、数据类型、条件语句、循环等。面向对象编程&#xff1a;类与对象、继承、接口、抽象类等。…

Unity XR Interaction Toolkit 开发教程(2):导入 SDK【3.0 以上版本】

文章目录 &#x1f4d5;课程总结&#x1f4d5;安装 Unity 编辑器与打包模块&#x1f4d5;导入 OpenXR&#x1f4d5;导入 XR Interaction Toolkit&#x1f4d5;打包发布 获取完整课程以及答疑&#xff0c;工程文件下载&#xff1a; https://www.spatialxr.tech/ 视频试看链接&a…

clickhouse运维篇(二):多机器手动部署ck集群

熟悉流程并且有真正部署需求可以看一下我的另一篇简化部署的文章&#xff0c;因为多节点配置还是比较麻烦的先要jdk、zookeeper&#xff0c;再ck&#xff0c;还有各种配置文件登录不同机器上手动改配置文件还挺容易出错的。 clickhouse运维篇&#xff08;三&#xff09;&#x…

RabbitMQ交换机类型

RabbitMQ交换机类型 1、RabbitMQ工作模型2、RabbitMQ交换机类型2.1、Fanout Exchange&#xff08;扇形&#xff09;2.1.1、介绍2.1.2、示例2.1.2.1、生产者2.1.2.2、消费者2.1.2.3、测试 2.2、Direct Exchange&#xff08;直连&#xff09;2.2.1、介绍2.2.2、示例2.2.2.1、生产…

MoonNet基准测试更新

基准测试 引言&#xff1a;为了展示MoonNet网络库支持的动态负载均衡功能&#xff0c;我进行了吞吐量测试&#xff0c;以突出其性能表现。由于该库的动态负载均衡策略包括动态线程调度&#xff0c;测试中的线程数变化是不稳定的。这种动态调整使得直接与其他库采用固定线程数的…

RHCE——笔记

nfs服务器 一、简介 NFS&#xff08;网络文件系统&#xff09; 允许网络中的计算机&#xff08;不同的计算机、不同的操作系统&#xff09;之间通过TCP/IP网络共享资源&#xff0c;主要在unix系列操作系统上使用。在NFS的应用中&#xff0c;本地NFS的客户端应用可以透明地读…

2023年SCRM系统排名分析及市场趋势解读

内容概要 当前&#xff0c;SCRM&#xff08;社交客户关系管理&#xff09;系统在企业运营中的重要性日益凸显&#xff0c;尤其是在快速发展的数字经济环境中。2023年的SCRM市场展现出多元化与专业化的趋势&#xff0c;不同企业在客户关系管理方面的需求各有不同&#xff0c;这…

上云管理之Git/GitHub/GitLab 详解(一)

上云管理之Git/GitHub/GitLab 详解(一&#xff09; 引言1. GIT软件安装2.初始化配置与提交代码2.1. 初始化配置2.2 本地仓库代码提交2.2.1 初始化仓库并提交代码2.2.2 再次提交已修改的代码2.2.3 文件夹层次结构代码提交 2.3 GIT 的文件状态 3.GIT 分支3.1. 分支的切换与删除3.…