Kafka 快速实战及基本原理详解解析-01

news2025/1/8 2:17:58

一、Kafka 介绍

1. MQ 的作用

消息队列(Message Queue,简称 MQ)是一种用于跨进程通信的技术,核心功能是通过异步消息的方式实现系统之间的解耦。它在现代分布式系统中有着广泛的应用,主要作用体现在以下三个方面:

异步处理

在传统的同步调用中,生产者和消费者需要同时在线,并且生产者在完成任务后才能继续执行其他工作。这种模式限制了系统的性能。而引入消息队列后,生产者可以将任务提交到队列中,消费者按需消费任务,从而提升系统的吞吐量。

  • 示例:快递员送快递到客户家,效率低下。而菜鸟驿站的出现让快递员只需将包裹放置在驿站,客户可以根据自己的时间安排取件。这种方式大大提高了效率。
解耦

解耦是消息队列最重要的功能之一。服务之间通过消息队列传递数据,而不是直接调用对方的服务接口,这样可以有效降低系统的耦合度。

  • 示例:《Thinking in JAVA》原书是英文版,但通过翻译社将内容翻译成多种语言,满足不同读者的需求。翻译社起到了桥梁作用,不同语言之间的沟通不再直接依赖于作者和读者。
削峰填谷

在高并发场景下,系统往往会遇到流量高峰,导致系统负载过重。通过消息队列,可以将流量暂存并按固定速率处理,从而避免系统崩溃。

  • 示例:长江每年都会涨水,但通过三峡大坝的调节,下游的出水速度保持稳定,避免了洪水泛滥。

2. 为什么要用 Kafka

Kafka 是一种高吞吐量、低延迟、分布式的消息队列系统,适合在大规模数据处理场景中使用。以下是 Kafka 的典型使用场景和优势:

日志聚合场景

在大规模分布式系统中,各个服务都会产生大量的日志信息。传统的日志收集方式往往存在以下问题:

  • 数据量大:需要快速收集和处理来自各个渠道的海量日志。
  • 容错性要求高:集群中允许少量节点出现故障而不影响整体服务。
  • 功能专注:Kafka 专注于高吞吐量、低延迟的消息传递,不追求复杂的消息处理功能。
核心优势
  • 高吞吐量:Kafka 能够处理数百万 TPS(每秒事务处理量)。
  • 低延迟:通常在毫秒级别的延迟时间内完成消息传递。
  • 可扩展性:通过增加节点和分区数量,可以线性扩展处理能力。
  • 容错性:通过副本机制保证消息的高可用性。
  • 持久化:Kafka 使用磁盘存储消息,保证消息的持久性。

二、Kafka 快速上手

1. 实验环境准备

要快速上手 Kafka,首先需要搭建实验环境。以下是推荐的实验环境配置:

  • 虚拟机数量:3 台
  • 操作系统:CentOS 7
  • Java 版本:Java 8
环境配置步骤
  1. 下载 Kafka 和 Zookeeper。
  2. 将 Kafka 解压到 /app/kafka 目录,将 Zookeeper 解压到 /app/zookeeper 目录。
  3. 配置环境变量,确保系统能够识别 Kafka 和 Zookeeper 的命令。
  4. 关闭防火墙,以避免端口阻塞:
    systemctl stop firewalld.service
    

2. 单机服务体验

为了更直观地理解 Kafka 的工作原理,我们可以先体验单机版 Kafka 服务。

步骤 1:启动 Zookeeper

Kafka 依赖 Zookeeper 进行元数据管理和选举机制。在实际部署中,通常使用独立的 Zookeeper 集群。

启动 Zookeeper 服务:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

检查 Zookeeper 是否正常启动:

jps

确认输出中有 QuorumPeerMain 进程。

步骤 2:启动 Kafka

启动 Kafka 服务前,需要确保 Zookeeper 服务正常运行。

启动 Kafka 服务:

nohup bin/kafka-server-start.sh config/server.properties &

确认 Kafka 是否正常启动:

jps

检查输出中是否包含 Kafka 进程。

步骤 3:创建和使用 Topic

Kafka 的基础工作机制是通过 Topic 进行消息的传递。

  1. 创建 Topic

    bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    
  2. 发送消息 启动生产者端并发送消息:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    > 这是一条测试消息
    
  3. 消费消息 启动消费者端并接收消息:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

3. 理解 Kafka 的消息传递机制

Kafka 的消息传递机制可以通过以下核心组件来理解:

  • 生产者(Producer):将消息发送到指定的 Topic。
  • 消费者(Consumer):从指定的 Topic 消费消息。
  • Topic:逻辑概念,表示一类业务消息的集合。
  • Partition:物理概念,实际存储消息的分区。
  • Broker:Kafka 服务器实例,存储和管理 Partition。

Kafka 的设计目标是通过这些组件实现高效、可靠的消息传递,满足企业级数据管道的需求。


四、Kafka 集群服务

1. 为什么要使用集群

单机部署的 Kafka 在性能上虽然已经非常出色,但在实际生产环境中通常需要使用 Kafka 集群来进一步提升数据存储能力和系统的高可用性。集群可以解决以下问题:

1.1 解决海量数据存储问题

单个 Broker 服务器的存储能力有限,当数据量增长到一定程度时,单机难以承载。通过集群部署,可以将数据分散存储在多个 Broker 中,从而提升整体存储能力。

1.2 提高系统容错能力

单机环境中,如果 Broker 崩溃,所有数据都会丢失。而集群环境下,每个 Partition 都有多个副本,即使部分 Broker 节点宕机,系统依然可以正常运行,保证数据的高可用性。


五、理解服务端的 Topic、Partition 和 Broker

Kafka 的核心架构由 Topic、Partition 和 Broker 组成,这三者之间的关系至关重要:

  • Topic:一个逻辑的消息分类,每个 Topic 包含多条消息。
  • Partition:每个 Topic 可以分成多个 Partition,每个 Partition 是一个消息队列。
  • Broker:Kafka 的服务器实例,负责存储 Partition 数据,并处理客户端请求。

5.1 创建分布式 Topic 示例

bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic distributedTopic

5.2 查看 Topic 信息

bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic distributedTopic

六、章节总结:Kafka 集群的整体结构

通过前面的学习,我们可以总结 Kafka 集群的整体结构:

  1. Topic 是逻辑概念,Producer 和 Consumer 通过 Topic 进行消息传递。
  2. Partition 是实际存储单元,保证数据分散存储和负载均衡。
  3. Broker 是 Kafka 的服务器实例,存储 Partition 数据并处理客户端请求。
  4. Zookeeper 管理 Kafka 集群的元数据和选举过程。
  5. Controller 是 Kafka 集群的核心管理节点,负责管理 Topic 和 Partition 的分配。

七、Spring Boot 实现 Kafka 消息有序性

为了保证 Kafka 的消息有序性,可以使用 Spring Boot 和 Kafka 的整合来实现。在 Java 的 Spring Boot 项目中,我们通过指定消息的 Key 和自定义分区器来确保消息发送到相同的 Partition,从而实现有序性。

7.1 依赖配置

在 Maven 项目中,引入 Kafka 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

7.2 配置 KafkaProducer

创建 Kafka 的生产者配置类:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

7.3 发送有序消息

创建一个消息发送服务,确保消息使用相同的 Key 发送到同一个 Partition:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "test_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String key, String message) {
        kafkaTemplate.send(TOPIC, key, message);
    }
}

7.4 自定义分区器(可选)

如果有更复杂的分区逻辑,可以自定义分区器:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区逻辑
        return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

7.5 设置一个 Topic 对应一个 Partition 的方法

如果业务需求是保证某个 Topic 的消息全局有序,可以在创建 Topic 时将 Partition 数量设置为 1,从而保证所有消息存储在同一个 Partition 中,实现全局有序。

创建一个 Partition 的 Topic
bin/kafka-topics.sh --create --topic singlePartitionTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
在 Spring Boot 中发送消息到该 Topic
@Service
public class KafkaSinglePartitionProducerService {

    private static final String TOPIC = "singlePartitionTopic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

通过这种方式,所有发送到 singlePartitionTopic 的消息都会进入同一个 Partition,确保消息顺序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2272956.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WPS计算机二级•数据查找分析

听说这里是目录哦 通配符&#x1f30c;问号&#xff08;?&#xff09;星号&#xff08;*&#xff09;波形符&#xff08;~&#xff09; 排序&#x1f320;数字按大小排序以当前选定区域排序以扩展选定区域排序 文字按首字母排序 快速筛选分类数据☄️文字筛选数字筛选颜色筛选…

(leetcode算法题)191. 位1的个数 和 338. 比特位计数

对于一个正整数 n&#xff0c;如果对这个正整数执行 n & (n - 1); 的代码&#xff0c;将会得到一个整数m n 和 m的关系&#xff1a;n 的二进制表示 与 m的二进制表示中除了n最右边的1取值不同&#xff0c;其他都相同 比如114514 和 114512 的二进制表示中第2位&#xff0…

mongodb==安装prisma连接

官网下载mongodb,解压安装 Download MongoDB Community Server | MongoDB 修改bin/mongod.cfg # mongod.conf# for documentation of all options, see: # http://docs.mongodb.org/manual/reference/configuration-options/# Where and how to store data. storage:dbPat…

安装Cockpit服务,使用Web页面管理你的Linux服务器

说起管理 Linux 服务器&#xff0c;大家首先想到的使用 SecureCRT、Xshell、MobaXterm 等工具远程到服务器&#xff0c;然后使用命令行管理服务器。今天给大家介绍一个好玩的工具&#xff0c;名字叫Cockpit&#xff0c; Cockpit 是一个免费开源的基于 web 的 Linux 服务器管理…

Excel | 空格分隔的行怎么导入excel?

准备工作&#xff1a;windows&#xff0c;一个记事本程序和微软的Excel软件。 打开记事本&#xff0c;选中所有内容&#xff0c;按CtrlA全选&#xff0c;然后复制(CtrlC)。 在Excel中&#xff0c;定位到你想粘贴的单元格&#xff0c;按CtrlV进行粘贴。粘贴后&#xff0c;你会在…

深度评测uni-app x:开启跨平台开发新篇章

文章目录 一、引言1.1 跨平台开发的崛起1.2 uni-app x 初印象 二、uni-app x 核心特性评测2.1 uts 语言&#xff1a;跨平台编程新利器2.2 uvue 渲染引擎&#xff1a;原生渲染新体验2.3 强大的组件和 API 支持2.4 插件生态&#xff1a;拓展无限可能 三、与 uni-app 对比&#xf…

Mac修改文件权限

查看文件权限 ll -all 修改读写权限 sudo chmod -R arwx /usr/local/mysql-5.7.30-macos10.14-x86_64/data/a_test 修改用户分组 sudo chown -R _mysql:wheel /usr/local/mysql-5.7.30-macos10.14-x86_64/data/b_test

计算机网络——网络层—路由算法和路由协议

一、因特网的路由选择协议 • 不存在一种绝对的最佳路由算法。 • 所谓“最佳”只能是相对于某一种特定要求下得出的较为合理的选择而已。 • 实际的路由选择算法&#xff0c;应尽可能接近于理想的算法。 • 路由选择是个非常复杂的问题 • 它是网络中的所有结点共同协调工…

Linux运维相关基础知识(二)

系列文章目录 Linux常用命令 linux 账号管理与权限设定 Linux运维相关基础知识 文章目录 系列文章目录前言1. 自动任务执行at 与 atdcrontab 与 crond 2. SELinuxtty多任务管理与进程管理相关的命令/proc/* 文件的意义SELinux 3. 守护进程早期SystemV的init管理行为中daemon…

java开发springoot

阅读理解 命令之间空一行&#xff1a;表示前面的是配置 红色背景&#xff1a;表示待验证蓝色背景&#xff1a;表示常用或推荐绿色背景&#xff1a;注意/推荐 json 转 对象 import com.fasterxml.jackson.databind.ObjectMapper; public DebangResp convertJsonToObject(Stri…

MLU上使用MagicMind GFPGANv1.4 onnx加速!

文章目录 前言一、平台环境准备二、环境准备1.GFPGAN代码处理2.MagicMind转换修改env.sh修改run.sh参数解析运行 3.修改后模型运行 前言 MagicMind是面向寒武纪MLU的推理加速引擎。MagicMind能将人工智能框架&#xff08;TensorFlow、PyTorch、Caffe与ONNX等&#xff09;训练好…

Nginx——入门介绍、安装与核心配置文件结构(一/五)

目录 1.Nginx 简介1.1.背景介绍1.2.名词解释1.3.常见服务器对比1.3.1.IIS1.3.2.Tomcat1.3.3.Apache1.3.4.Lighttpd1.3.5.其他的服务器 1.4.Nginx 的优点1.4.1.速度更快、并发更高1.4.2.配置简单&#xff0c;扩展性强1.4.3.高可靠性1.4.4.热部署1.4.5.成本低、BSD 许可证 1.5.Ng…

nginx-限流(请求/并发量)

一. 简述&#xff1a; 在做日常的web运维工作中&#xff0c;难免会遇到服务器流量异常&#xff0c;负载过大等情况。恶意攻击访问/爬虫等非正常性请求&#xff0c;会带来带宽的浪费&#xff0c;服务器压力增大&#xff0c;影响业务质量。 二. 限流方案&#xff1a; 对于这种情…

【学Rust开发CAD】1 环境搭建

文章目录 一、搭建C/C编译环境二、安装Rust三、配置 PATH 环境变量四、验证安装结果五、安装编辑工具 一、搭建C/C编译环境 Rust 的编译工具依赖 C 语言的编译工具&#xff0c;这意味着你的电脑上至少已经存在一个 C 语言的编译环境。如果你使用的是 Linux 系统&#xff0c;往…

模型创新、论文复现、科研辅导、论文代码定制

建模先锋团队长期致力于为用户提供优质的代码定制服务。团队提供全网最低价格的服务&#xff0c;同时保证高性价比和高质量的代码交付&#xff0c;为您提供个性化定制的服务。 以下是定制服务范围&#xff1a; 通过深度学习和信号处理技术&#xff0c;我们能够针对不同行业和场…

基于云效 Windows 构建环境和 Nuget 制品仓库进行 .Net 应用开发

作者&#xff1a;陆冬澄、周静 在现代软件研发体系中&#xff0c;.NET 平台由于其强大的功能、灵活性和丰富的开发工具&#xff0c;成为了构建 Windows 应用程序的热门选择。无论是桌面应用、Web 应用还是服务应用&#xff0c;.NET 提供了一系列强大的框架和工具&#xff0c;帮…

用VS C#构建Windows服务【纯操作版,附带项目地址】

1&#xff0e;点击“创建新项目”&#xff0c;选择“Windows 服务&#xff08;.NET Framework&#xff09;” 2、给项目命名 3、双击“Service1.cs”&#xff0c;右键&#xff0c;选择“添加安装程序”&#xff0c;就会生成一个“ProjectInstaller.cs”文件 4、双击“P…

KUKA机器人如何修改程序并下载到机器人控制器中?

KUKA机器人如何修改程序并下载到机器人控制器中? 如下图所示,首先将使用的网卡的IP地址设置为自动获得, 打开workvisual软件,点击搜索,正常情况下可以搜索到项目文件,选中后双击进入, 如下图所示,此时,workvisual会自动从机器人控制器中下载项目文件到电脑上,耐心等待…

L28.【LeetCode笔记】移动零(三种解法)

目录 1.题目 2.向前覆盖法 分析 代码 提交结果 3.优解:双指针 代码 提交结果 4.其他不符合题意的方法:使用队列 代码 提交结果 1.题目 https://leetcode.cn/problems/move-zeroes/description/ 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾…

js逆向实战(1)-- 某☁️音乐下载

下载某云音乐源文件.mp4格式 首先随便点进一首歌&#xff0c;如图所示获取该音乐id&#xff0c;然后点击播放键&#xff0c;打开F12进行查询XHR 由此可知&#xff0c;实际请求网址是 https://music.163.com/weapi/song/enhance/player/url/v1?csrf_token「你的token」url需带…