【2023】kafka入门学习与使用(kafka-2)

news2025/1/17 4:02:40

目录💻

  • 一、基本介绍
    • 1、产生背景
    • 2、 消息队列介绍
      • 2.1、消息队列的本质作用
      • 2.2、消息队列的使用场景
      • 2.3、消息队列的两种模式
      • 2.4、消息队列选型:
  • 二、kafka组件
    • 1、核心组件概念
    • 2、架构
    • 3、基本使用
      • 3.1、消费消息
      • 3.2、单播和多播消息的实现
    • 4、主题和分区
      • 4.1、主题Topic
      • 4. 2、分区partition
      • 4.3、集群、副本、分区和topic的关联
      • 4.4、关于分区消费组消费者的细节
      • 4.5、kafka集群中的controller、rebalance、HW
  • 三、Kafka问题与优化方案
    • 1、如何防止消息丢失
    • 2、如何防止重复消费
    • 3、如何做到顺序消费
    • 4、解决消息积压问题
    • 5、实现延迟队列的效果

一、基本介绍

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台(2.8版本以后可以不需要依赖)。目前已经成为 Apache 软件基金会的顶级项目。它被设计用于处理大规模的实时数据流,并具有高吞吐量、低延迟、高可靠性和可扩展性等特点。

kafka如果还没安装的可以看下面的文章
🍅kafka-linux和docker安装

1、产生背景

Apache Kafka 的产生背景可以追溯到 LinkedIn 公司在处理大规模实时数据流时遇到的挑战。在过去,LinkedIn 需要处理大量的实时数据,例如用户活动、网站指标、日志记录等。为了应对这些数据的高吞吐量和低延迟需求,LinkedIn 开发了 Kafka 来解决以下几个主要问题:

  • 数据管道:LinkedIn 需要一种可靠的方式来收集和传输实时数据流,以支持各种数据处理和分析任务。

  • 数据持久性:对于一些重要的数据,LinkedIn 需要一种持久性存储方案,以便即使在系统故障或重启后,数据也不会丢失。

  • 扩展性:LinkedIn 面临着数据量不断增长的挑战,因此需要一种能够水平扩展的数据处理系统,以满足不断增长的需求。

  • 实时性:对于某些业务场景,LinkedIn 需要能够实时地处理和分析数据流,以及及时地发现和响应问题。

2、 消息队列介绍

2.1、消息队列的本质作用

消息队列:用于存放消息的一个组件

  • 消息队列最主要的作用其实是用于帮我们解决通信问题,通过内部封装,定义规范帮我们实现简单异步通信;
  • 消息队列一般也被用作临时处理信息的一个组件;

2.2、消息队列的使用场景

  1. 系统解藕:降低两个系统之间的直接耦合度
  2. 流量削峰:大量大流量到来通过消息队列循序渐进的取出,避免这些流量全部直接到达数据库
  3. 日志处理(大数据领域常用)
  4. 异步处理:相对于同步通行来说,异步的发生,可以让上游快速成功,极大提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务的保障,也能保障业务执行之后的最终一致性
    在这里插入图片描述

2.3、消息队列的两种模式

分类方式一:

  • 点对点模式
    • 一对一
  • 发布订阅模式
    • 一个生产者生产消息,多个订阅该主题的消费者可以获取到消息

分类方式二:

  • 有Broker:
    • 重Topic:Kafka、RocketMQ、ActiveMQ
      • 整个broker,依据topic来进行消息的中转,在重topic的消息队列里必然需要topic来实现
    • 轻Topic:RabbitMQ
      • topic只是一种中转模式
  • 无Broker:
    • 在生产者和消费者之间没有broker,例如zeroMQ,直接使用socket进行通信
      在这里插入图片描述

2.4、消息队列选型:

  • rabbitMQ:内部的可玩性(功能性)是非常强的,但不是分布式的消息队列
  • rocketMQ:阿里出品,根据kafka的内部执行原理,手写的一个消息中间件,性能可以和kafka相比肩
  • kafka:全球消息处理性能最快的一款MQ
  • zeroMQ:ZeroMQ 是一款更加轻量级和灵活,适用于构建简单而高效的分布式和并发应用程序,无需中心化的消息代理。

二、kafka组件

1、核心组件概念

名称解释
Broker(节点)消息中间件处理节点,一个kafka节点就是一个broker,一个或者多个Broker可以组成一个kafka集群
Topic(主题)kafka根据topic对消息归类,发布到kafka集群的每条消息都需要指定一个topic
Producer(生产者)消息生产者,向Broker发送消息的客户端
Consumer(消费者)消息消费者,从Broker读取消息的客户端
ConsumerGroup每个consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息(最后连接的那一个Consumer)
Partition(分区)物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的
offset(偏移量)消费者消费到的下标位置

基础组件图
在这里插入图片描述

2、架构

Kafka 的架构主要由以下几个核心组件组成:

  • Producer(生产者):负责将消息发布到 Kafka 集群中的主题(topic)。
  • Broker(代理):Kafka 集群中的每个节点都是一个代理,用于存储和管理消息。
  • Topic(主题):消息被发布到特定的主题中,每个主题可以分成多个分区(partition)。
  • Partition(分区):每个主题可以分成多个分区,每个分区在物理上都是一个独立的日志文件。
  • Consumer Group(消费者组):消费者组由一组消费者组成,每个消费者都从特定的分区中读取消息。
  • ZooKeeper:Kafka 使用 ZooKeeper 进行集群管理和协调,用于领导者选举、存储集群元数据等。

3、基本使用

3.1、消费消息

  • 方式一:从当前主题中的最后一条消息的offset(偏移量位置)+1开始消费

    ./kafka-console-consumer.sh --bootstrap-server ip:port --topic test
    
  • 方式二:从当前主题中的第一条消息开始消费

    ./kafka-console-consumer.sh --bootstrap-server ip:port --from-beginning --topic test
    

物理层面存储流程图
在这里插入图片描述

物理存储消费流程

  • 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中

    • kafka-logs/主题-分区/00000000.log
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性

  • 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置

3.2、单播和多播消息的实现

  • 单播消息:在一个kafka中一个topic只有一个消费组订阅
  • 多播消息:在一个kafka中一个topic中有多个消费组订阅

在这里插入图片描述

4、主题和分区

4.1、主题Topic

主题-topic44在kafka中是一个逻辑的概念,kafka通过topic将消息就行分类。不同的topic会被订阅该topic的消费者消费。

所以就会出现一个问题,消息可能会非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的,为了解决这个文件过大的问题,kafka提出了partition分区的概念

4. 2、分区partition

  1. 分区的概念

通过partition将一个topic中的消息分区来存储。这样的好处有:

  • 分区存储,可以解决统一存储文件过大的问题
  • 提供了读写的吞吐量:读和写可以同时在多个分区进行(并行读写)

在这里插入图片描述

  1. 创建多分区的主题
./kafka-topics.sh --create --zookeeper 192.168.8.62:3333 --replication-factor 1 --partitions 2 --topic my-replicated-topic2
  1. kafka中消息日志文件中保存的内容
  • 00000.log:这个文件中保存的就是消息数据

  • *consumer*offsets-49:

    kafka内部自己创建了*consumer*offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka的默认主题:

    *consumer_*offsets。为了提高并发性设置了50个主题

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

4.3、集群、副本、分区和topic的关联

在创建主题时,除了指明主题的分区数以外,还指明了福本数,那么副本是一个什么概念?

  • 副本为了为主题中的分区创建多个备份,多个副本在kafka集群中,会有一个副本为leader,其他是follower
  • leader:用作接收发送消息,并且负责把数据同步给follower,当leader挂了之后,会从follower中选举产生一个新的leader
  • follower:普通的partition
  • Isr:可以同步会已经、同步的节点会被存入到isr集合中。如果isr中的节点性能较差,会被剔出isr集合

在这里插入图片描述

4.4、关于分区消费组消费者的细节

在这里插入图片描述

  • 一个partiton只能被一个消费组中的一个消费者消费
  • partition的数量决定了消费组中消费的数量,建议同一个消费组中的消费者的数量不要超过partition的数量,否则多的消费者消费不到消息
  • 如果消费者挂了,那么会触发redalance机制,会让消费组中的其他消费者来消费该分区

4.5、kafka集群中的controller、rebalance、HW

  • controller(控制器):每个brocker启动时会向zk创建一个临时序号节点,获得的序号最小的那个brroker将会作为集群中的controller,负责下面几件事情:

    • 当集群中有一个副本的leader挂掉,需要在集群中选举出一个新的leaber,选举的规则是从isr集合中最左边获得
    • 当集群中有broker新增或者减少,controller会同步信息给其他broker
    • 当集群中有分区新增或者减少,controller会同步信息给其他broker
  • rebalance机制:

    • 前提:消费组中的消费者没有指定分区
    • 触发的条件:当消费组中的消费者和分区的关系发生变化的时候
    • 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略
      • range:根据公式计算得到每个消费者消费哪个分区:前面的消费者是分区总数/消费者数量+1最后的消费者是分区总数/消费者数量
      • 轮训:大家轮着来
      • sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启

    在这里插入图片描述

  • HW和LEO:

    • LEO:LEO是每个副本最后消息的消息位置(log-end-offset)
    • HW:HW是已完成同步的位置,消息在写入broker时,且每个broker完成这条消息的同步后,hw才会发生变化往后移动。在这之前消费者是消费不到LEO位置的消息的,在同步全部完成之后,HW就会更新,更新之后,消费者才能消费到最新的这条消息,这样的目的主要是为了防止消息丢失。

    在这里插入图片描述

三、Kafka问题与优化方案

1、如何防止消息丢失

  • 生产者:
    • 使用同步发送
    • 把ack设置成1或者all,并且设置同步的分区数≥2
  • 消费者:
    • 把自动提交改为手动提交

2、如何防止重复消费

在防止消息丢失的方案中,如果发送者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。此时生产者会进行重试,于是broker就会收到多条相同的消息,而造成消费者的重复消费

解决办法:

  • 生产者关闭重试:会造成丢消息(不建议)
  • 消费者解决非幂等性消费问题:
    所谓的幂等性:多次访问的结果是一样的。对于rest的请求(ge(幂等)、post(非幂等)、put(幂等))
    • 在数据库中创建联合主键,防止相同的主键创建出多条记录
    • 使用分布式锁,以业务id为锁保证只有一条记录能够创建成功

在这里插入图片描述

3、如何做到顺序消费

  • 生产者:保证消息按顺序消费,且消息不丢失一一使用同步的发送,ack设置成非0的值。
  • 消费者:主题只能设置一个分区,消费组只能有一个消费者

4、解决消息积压问题

  • 消息积压问题的出现
    • 消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致kafka中有大量的数据没有被消费,随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个kafka对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩。
  • 消息挤压的解决方案
    • 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息。
    • 创建多个消费组,多个消费者,部署到其他机器,一起消费,提高消费者的消费速度
    • 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者。该消费者将poll下来的消息,不进行消费,直接转发到新建的主题上,此时,新的主题的多个分区的多个消费者就开始一起消费了。 ———不常用

在这里插入图片描述

5、实现延迟队列的效果

  • 应用场景

    订单创建后,超过30分钟没有支付,取消订单

  • 具体方案

    1. kafka中创建相应的主题
    2. 消费者消费该主题的消息(轮训)
    3. 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没支付)
      1. 如果是:去数据库中修改订单为已取消
      2. 如果否:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次向kafka拉取该offset及之后的消息,继续进行判断,以此反复

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1557690.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QT:如何在程序密集响应时,界面不会卡住?

前因: 当调用QApplication::exec()时,就启动了QT的事件循环。在开始的时候QT会发出一些事件命令来显示和绘制窗口部件。 在这之后,事件循环就开始运行,它不断检查是否有事件发生并且把这些事件发生给应用程序的QObject。 当处理…

HarmonyOS 应用开发之FA模型绑定Stage模型ServiceExtensionAbility

本文介绍FA模型的三种应用组件如何绑定Stage模型的ServiceExtensionAbility组件。 PageAbility关联访问ServiceExtensionAbility PageAbility关联访问ServiceExtensionAbility和PageAbility关联访问ServiceAbility的方式完全相同。 import featureAbility from ohos.ability…

Git 核心知识

2024年3月30日 Git 安装 官网下载,Git 选择合适的版本,无脑下一步即可。 安装成功之后,鼠标右键任意的文件夹,会出现 Git GUI 的选项,即安装成功 安装注意事项 安装前,检查环境变量 , 如果…

自定义SpringSecurity异常格式

今天发现spring的异常格式没有跟着mvc的错误格式走,场景是用户权限的时候。查了一下原来是springsecurity定义了一组filter作用在了mvc上层,因此需要处理一下错误格式。 处理前错误返回信息如下: 由于使用了多语言,因此错误格式也…

设计模式9--单例模式

定义 案例一 案例二 优缺点

8LS Three-phase Synchronous 电机Motors MAMOT2-ENG 安装调试接线等说明 146页

8LS Three-phase Synchronous 电机Motors MAMOT2-ENG 安装调试接线等说明 146页

4月1日起,未备案App小程序将下架

关注卢松松,会经常给你分享一些我的经验和观点。 最后2天了、最后2天了。 从2024年4月1日起,工信部要求所有的APP、小程序都要备案,否则下架、关停、限制更新。这是去年8月份出的新规,没想到十个月这么快就过去了。 现在广东省…

JDK和IntelliJ IDEA下载和安装及环境配置教程

一、JDK下载(点击下方官网链接) Java Downloads | Oracle 选择对应自己电脑系统往下拉找到自己想要下载的JDK版本进行下载,我下的是jdk 11,JDK有安装版和解压版,我就直接下安装版的了。 .exe和.zip的区别&#xff1a…

链表(下)

0.上篇引入: 顺序表存在一些问题 1.顺序在插入的数据的时候数据需要进行移动势必会降低效率 2. 扩容存在性能的消耗 3.扩容会存在内存的浪费 所以引入了链表这种更好用的数据存储(由独立的节点组成) 1. 头删 2.查找 3.1指定位置之前插入…

sqli第24关二次注入

注入点 # Validating the user input........$username $_SESSION["username"];$curr_pass mysql_real_escape_string($_POST[current_password]);$pass mysql_real_escape_string($_POST[password]);$re_pass mysql_real_escape_string($_POST[re_password]);if($p…

Adaboost集成学习 | Matlab实现基于ELM-Adaboost极限学习机结合Adaboost集成学习时间序列预测(股票价格预测)

目录 效果一览基本介绍模型设计程序设计参考资料效果一览 基本介绍 基于ELM-Adaboost极限学习机结合Adaboost集成学习时间序列预测(股票价格预测) 单变量时间序列单步预测。 ELM(Extreme Learning Machine,极限学习机)和AdaBoost(Adaptive Boosting,自适应提升)都是机…

文件和文件操作(C语言)

1.什么是文件? 磁盘或者是说硬盘上的文件就是文件。在程序设计中,我们⼀般谈的文件有两种:程序文件、数据文件。 2.文件名 ⼀个文件要有⼀个唯⼀的文件标识,以便识别和引用。 例如: c:\ files\ test.txt 3. ⼆进制…

搜索与图论——Kruskal算法求最小生成树

kruskal算法相比prim算法思路简单,不用处理边界问题,不用堆优化,所以一般稀疏图都用Kruskal。 Kruskal算法时间复杂度O(mlogm) 每条边存结构体里,排序需要在结构体里重载小于号 判断a,b点是否连通以及将点假如集合中…

【教程】最新可用! Python实现QQ扫码登录的群验证

转载请注明出处:小锋学长生活大爆炸[xfxuezhang.cn] 目录 背景说明 使用效果 参考代码 扫码登录(备份) 背景说明 Q群验证就是为了验证某个用户是否加入了指定的群聊。这可以有很多作用,比如限制软件的使用人群,以防滥用。 使用效果 参考代码 from PyQt5.QtCore import …

PinkysPalaceV2靶场详解IDA逆向查看缓存区溢出漏洞原理以及使用kali gdb使用超详细三次提权字典生成

下载链接: Pinkys Palace: v2 ~ VulnHub 安装: 正常用vm虚拟机打开即可,注意导入时所选择的硬盘存储目录应为空目录,否则会导入失败 根据下载链接提示我们需要更改host文件,以便于我们可以正常访问 kali中的host文件位置为 /etc/h…

Linux中常用命令(文件、目录和文件压缩)及功能示例

一、Linux关于文件与目录的常用命令及其功能示例 命令: ls 全名: List (列表) 常用选项: -l: 详细列表格式,显示详细信息。-a: 显示所有文件,包括隐藏文件。 功能: 列出目录内容。 示例: ls -la /home 此命令以详细格式列出/home目录中的所有文件&#x…

css3之2D转换transform

2D转换transform 一.移动(translate)(中间用,隔开)二.旋转(rotate)(有单位deg)1.概念2.注意点3.转换中心点(transform-origin)(中间用空格)4.一些例子(css三角和旋转) 三…

VAE——生成数字(Pytorch+mnist)

1、简介 VAE(变分自编码器)同样由编码器和解码器组成,但与AE不同的是,VAE通过引入隐变量并利用概率分布来学习潜在表示。VAE的编码器学习将输入数据映射到潜在空间的概率分布的参数,而不是直接映射到确定性的潜在表示…

Astro 宣布:将超过 500 多个测试从 Mocha 迁移到了 Node.js

近期,Astro 在其官方博客中宣布,虽然我们对 Mocha 感到满意,但也在寻求让我们的 CI 作业更快的方法。最终将超过 500 多个测试从 Mocha 迁移到了 Node.js。 先了解下 Astro 是什么?Astro 是适合构建像博客、营销网站、电子商务网站…

2_1.Linux中的网络配置

#1.什么是IP ADDRESS# internet protocol ADDRESS ##网络进程地址 ipv4 internet protocol version 4 ip是由32个01组成 11111110.11111110.11111110.11111110 254.254.254.254 #2.子网掩码# 用来划分网络区域 子网掩码非0的位对应的ip上的数字表示这个ip的网络位 子网掩码0位…