Kafka基础/1

news2025/1/22 21:09:34

Kafka

概念

Kafka 是一个分布式的流媒体平台。

应用:消息系统、日志收集、用户行为追踪、流式处理

特点:高吞吐量、消息持久化、高可靠性、高扩展性

术语

  • broker:Kafka 的服务器,Kafka 当中每一台服务器,我们称其为broker

  • Zookeeper:管理集群(Kafka 内置的 Zookeeper)

  • Topic:点对点(很像阻塞队列),发布订阅模式(Kafka采用的。生产者把消息放到某个位置,然后很多个消费者同时关注这个位置,订阅这个位置,然后读取消息,这个时候这个消息可以被多个消费者同时读到或者先后读到,相当于一个文件夹,存放消息的位置)

  • Partition:分区,是对主题这个位置的一个分区,可以把主题分为n个区域,然后就可以采用多线程的方式同时向这n个分区里面写数据,增强并发能力。每一个它是从前往后按照顺序往队尾里追加数据的,然后按照索引顺序读取数据。

  • offset:这个消息在这个分区内存放的这个索引序列

  • Leader Replica:主副本,当想从分区中读取数据的时候,主副本可以给出数据,处理请求

  • Follower Replica:从副本,只是备份,只是从主副本备份数据,不负责做响应。当主副本挂掉,集群就会从从副本中选一个新的作为主副本

具体使用:

创建主题:
在这里插入图片描述

Kafka 默认端口90922

生产者:
在这里插入图片描述

消费者:
在这里插入图片描述

Spring 整合 Kafka

配置的时候:

  • server:项目中的 server 只有一个
  • 消费者的组
  • 是否自动提交
  • 自动提交的频率

Kafka 小样例:

一、首先定义一个对象,对事件进行封装

package com.conquer.community.entity;

import java.util.HashMap;
import java.util.Map;

public class Event {

    private String topic;  //主题/事件的类型
    private int userId;  //事件触发的人
    private int entityType;  //事件发生在哪个实体(帖子、点赞、评论)
    private int entityId;  //实体的 ID
    private int EntityUserId;   //实体的作者
    private Map<String, Object> data = new HashMap<>();  //把其它的额外的数据全都存到这个 map 中,这样就具有了一定的扩展性

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;  //有了一些改动,好处:当用了 set 方法之后,当我 set 了 topic 我当然还要 set 其它的属性,当我set了这个属性之后,又返回这个对象,我又可以调用当前对象的其它 set 方法,所以我们写的时候可以 event.setTopic().setUserId.set...(链式编程) 就比较方便
    }

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityUserId() {
        return EntityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        EntityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Event setData(String key, Object value) {
        this.data.put(key, value);
        return this;   //这样的话一次传一个值,还想传的话继续点就可以
    }
}

Kafka 零拷贝原理

在实际应用中,如果我们需要把磁盘中的某个文件内容发送到远程服务器上,那么它必须要经过这样几个拷贝过程
在这里插入图片描述

1、从磁盘中读取目标文件内容拷贝到内核缓冲区

2、CPU 控制器再把内核缓冲区的数据复制到用户空间的缓冲区中

3、在应用程序中,调用 write() 方法,把用户空间缓冲区中的数据拷贝到内核下的 Socket Buffer 中

4、把在内核模式下的 SocketBuffer 中的数据复制到网卡缓冲区,网卡缓冲区再把数据传输到目标服务器上

这这个过程中我们可以发现,数据从磁盘到最终发送出去,要经历四次拷贝,而这四次拷贝过程中,有两次拷贝是浪费的。分别是:

  • 从内核空间复制到用户空间
  • 从用户空间再次复制到内核空间

除此之外,由于用户空间和内核空间的切换,会带来上下文的一个切换,对 CPU 的一个性能也会造成一些影响。而零拷贝就是把这两次多余的拷贝省略掉,应用程序可以直接把磁盘中的数据从内核直接传送到 Socket,而不再需要经过应用程序所在的用户空间。

**零拷贝通过 DMA 技术把文件内容复制到内核空间的 Read Buffer,接着,把包含数据位置和长度信息的文件描述符加载到 Socket Buffer 中,DMA 引擎直接把数据从内核空间传递给网卡设备。**在这样的一个流程中,数据只经历了两次拷贝就发送到了网卡中,并且减少了两次 CPU 的上下文切换,极大的提高了效率。

结论:

所谓零拷贝,并不是完全没有数据复制,只是相对于用户空间来说,不再需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是减少了不必要的拷贝次数而已。

实现零拷贝:
  • 在 Linux 中,零拷贝技术依赖于低层的 sendfile() 方法实现

  • 在 Java 中,FileChannal.transferTo() 方法的底层实现就是 sendfile() 方法

  • mmap 文件映射机制:原理 —— 将磁盘文件映射到内存,用户通过修改内存就能修改磁盘文件。使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销。

ISR 机制

ISR 机制是 Kafka 保障数据可靠性的关键概念之一。从三个方面来说:

1、ISR 的概念

ISR 代表着一组与 Topic 分区 Leader 保持同步的 Follower 分区的副本

2、ISR 的工作原理

ISR 列表中的副本会定期向 Leader 同步数据,确保数据的一致性。只有 ISR 列表中的副本才会参与数据的同步操作,如果某个副本无法及时同步,那么它将会从 ISR 列表中移除。

3、ISR 的重要性

ISR 机制对于数据的可靠性非常重要,当某个副本无法及时同步或者发生故障的时候,Kafka 会从 ISR 列表中去选择另外一个副本作为 Leader,确保高可用性和数据的一致性。

总的来说,Kafka 的 ISR 机制是确保数据可靠性和一致性的关键概念,它通过维护与 Leader 同步的副本列表去确保数据的及时同步和可靠性。

Kafka 快在哪里

Kafka 是一个分布式流处理组件,它设计之初,就是用在一些高吞吐量的数据流应用和数据管道上,所以在早期很多互联网企业对于一些高吞吐量需求的场景清一色采用了 Kafka,在早期很多场景中,大部分用的是 ActiveMQ,发现性能是在满足不了需求,就直接切到 Kafka 上。

有几个比较好的设计:

1、数据的分片存储:Kafka 采用了 partition 的物理存储机制,把一个 topic 中的消息,分成多个数据分片,类似于分库分表逻辑

2、消息的持久化存储方案中,Kafka 的消息都是顺序追加的方式存储到磁盘上的。利用了磁盘的数据写入,减少了磁盘寻道的时间;再结合批量刷盘的操作,节省了磁盘的 IO 次数。

3、Kafka 使用操作系统的零拷贝机制来优化数据传输。这意味着再将消息从磁盘发送到网络接口的时候,可以减少数据的拷贝次数

4、Kafka 整体架构的可扩展性很强。它可以再不停机的情况下进行水平扩容,我们可以动态地添加更多的 Kafka Broker 以及分区来处理更高的负载

5、Kafka 为每个分区维护了一套简洁高效的索引。使得即使在非常大的数据量下,也能快速定位和检索消息。

这些特性共同工作使得 Kafka 在消息传输方面非常高效,特别适合需要处理大量数据流的场景。比如说日志搜集、事件源、实时分析、监控系统等

Kafka 中的 Partition 分区副本的 Leader 选举算法

Kafka 首先会选择一个具有最新数据的副本作为新的 Leader,也就是 ISR 集合的副本。其中 ISR 集合指的是与 Leader 同步的副本集合,也就是说,它们的数据同步状态与 Leader 最接近,并且它们与 Leader 副本的网络通信的延迟是最小的。

如果 ISR 集合没有可用副本,Kafka 会从所有副本中去选择一个具有最新数据的副本作为新的 Leader。在这种情况下选举出来的 Leader 由于和原来老的节点的数据存在较大的延迟,所以可能会造成一部分的数据丢失的情况。

所以 Kafka 设计者把这个功能开关的选择交给了开发人员,如果愿意接受这种情况,可以通过 unclean.leader.election.enable 这样一个参数来设置,开启之后虽然会造成数据丢失,但是至少可以保证依然能够对外提供服务,保证可用性。

MQ 如何保证消费顺序性

这个问题我想从两个方面来回答,一个是 Kafka 为什么会存在无序消费,第二个是 Kafka 如何保证有序消费

Kafka 为什么会存在无序消费

在 Kafka 的整个架构里面,用到了 Partition 的分区机制 实现消息的物理存储。也就是说在同一个 topic 里面可以维护多个 Partition 来去实现消息的一个分片。那么生产者在发送消息的时候会根据消息的 key 来进行取模,来决定把当前的消息存储到哪一个 Partition 里面。

而且消息是按照先后有序的去存储在 Partition 里面。假设一个 topic 里面有三个 Partition,而消息正好被路由到三个独立的 Partition 里面,然后消费端有三个消费者去通过 balance 的一个机制去分别指派了对应的消费分区,因为消费者是完全独立的网络节点,所以可能会出现消息的消费顺序不是按照发送顺序来实现的,从而导致消息的消息乱序的一个问题。

解决方法

自定义消息分区的一个路由算法,然后把指定的 key 都发送到同一个 Partition 里面,然后我们去指定一个消费者专门去消费某一个分区的数据,这样的话就保证了消息的顺序消费了。

另外在有些设计方案里面,在消费端会采用异步线程的方式来消费数据,以提高消息的处理效率。在这样的情况下,因为每个线程的消息处理效率是不同的。所以即便是采用单个分区的存储和消费也可能会出现无序访问的一个问题。对这样的问题的解决方法就是在消息消费端采用一个阻塞队列,把获取到的消息,先保存到阻塞队列里面,然后采用异步线程从阻塞队列里面去获取消息来进行消费。

参考 by B站 跟着Mic 学架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1581612.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Visual Studio Code 终端为管理员权限

第一部 1、 Visual Studio Code 快捷方式启动选项加上管理员启动 第二步 管理员方式运行 powershell Windows 10的任务栏自带了搜索。或者开始菜单选搜索只需在搜索框中输入powershell。 在出来的搜索结果中右击Windows PowerShell&#xff0c;然后选择以管理员方式运行。 执…

《前端面试题》- JS基础 - call()、apply()、bind() 的区别

call 、bind 、 apply 这三个函数的功能都是改变this的指向问题&#xff0c;但是也存在一定的区别。 call 的参数是直接放进去的&#xff0c;第二第三第 n 个参数全都用逗号分隔,apply 的所有参数都必须放在一个数组里面传进去bind 除了返回是函数以外&#xff0c;它 的参数和…

Training - 使用 WandB 配置 可视化 模型训练参数

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://blog.csdn.net/caroline_wendy/article/details/137529140 WandB (Weights&Biases) 是轻量级的在线模型训练可视化工具&#xff0c;类似于 TensorBoard&#xff0c;可以帮助用户跟踪…

js语法---简单理解promise

promise语法结构 创建一个promise对象 let p new Promise(function(resolve,reject){// 执行的操作...// 判断操作的结果并执行对应的回调函数if(){resolve()}else{reject()} } 以上实例化了一个promise对象&#xff0c;其中包含了一个参数function&#xff0c;这个函数会在…

【配电网故障定位】基于二进制粒子群算法的配电网故障定位 12节点配电系统故障定位【Matlab代码#76】

文章目录 【获取资源请见文章第5节&#xff1a;资源获取】1. 配电网故障定位2. 二进制粒子群算法3. 部分代码展示4. 仿真结果展示5. 资源获取 【获取资源请见文章第5节&#xff1a;资源获取】 1. 配电网故障定位 配电系统故障定位&#xff0c;即在配电网络发生故障的时候&…

了解Vue中的 computed 计算属性

目录 1. computed计算属性介绍和基础语法 1.1. 概念 1.2. 语法 2. “计算属性”和“方法”的对比 2.1. computed 计算属性 2.1.1. 作用 2.1.2. 语法 2.2. methods 方法 2.2.1. 作用 2.2.2. 语法 2.2.3. 缓存特性&#xff08;提升性能&#xff09; 3. computed 计算…

ssm“最多跑一次”微信小程序

采用技术 ssm“最多跑一次”微信小程序的设计与实现~ 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringMVCMyBatis 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统实现的功能 本次设计任务是要设计一个“最多跑一次”微信小程序&#xff0c;…

AI大模型日报#0409:Llama 3下周发布、特斯联20亿融资、Karpathy新项目

导读&#xff1a; 欢迎阅读《AI大模型日报》&#xff0c;内容基于Python爬虫和LLM自动生成。目前采用“文心一言”生成了每条资讯的摘要。标题: 120亿Stable LM 2上线即开源&#xff01;2万亿token训练&#xff0c;碾压Llama 2 70B 摘要: Stable LM 2 12B参数版本发布&#x…

OpenMesh 计算网格顶点Voronoi面积

文章目录 一、简介二、实现代码三、实现代码参考资料一、简介 在计算离散的微分算子时(如拉普拉斯算子、高斯曲率等),总是会需要计算某个网格顶点的局部面积,主要有以下几种: 该操作类似于点云中的邻域操作,只不过点云的邻域一般是基于一个圆或者一个圆柱,而这里则是某个…

VSCode+Cmake 调试时向目标传递参数

我有一个遍历文件层次结构的程序&#xff0c;程序根据传入的文件路径&#xff0c;对该路径下的所有文件进行遍历。这个程序生成一个名为 ftw 的可执行文件&#xff0c;如果我要遍历 /bin 目录&#xff0c;用法为&#xff1a; ftw /bin问题是&#xff0c;如果我想单步跟踪&…

vue将html生成pdf并分页

jspdf html2canvas 此方案有很多的css兼容问题&#xff0c;比如虚线边框、svg、页数多了内容显示不全、部分浏览器兼容问题&#xff0c;光是解决这些问题就耗费了我不少岁月和精力 后面了解到新的技术方案&#xff1a; jspdf html-to-image npm install --save html-to-i…

Linux查看系统配置信息的命令【lscpu】【free】【df】【uname】【lsblk】

目录 1.查看CPU信息【lscpu】 2.查看内存信息【free】 3.查看文件系统信息【df】 4.查看系统信息【uname】 知识扩展&#xff1a;Red Hat Enterprise Linux 和 Debian GNU/Linux 两者的发展介绍 知识扩展&#xff1a;Centos 和 ubuntu的区别 知识扩展&#xff1a;更多 …

Jenkins使用-绑定域控与用户授权

一、Jenkins安装完成后&#xff0c;企业中使用&#xff0c;首先需要绑定域控以方便管理。 操作方法&#xff1a; 1、备份配置文件&#xff0c;防止域控绑定错误或授权策略选择不对&#xff0c;造成没办法登录&#xff0c;或登录后没有权限操作。 [roottest jenkins]# mkdir ba…

最短编辑距离(线性dp)-java

最短编辑问题也是一种非常经典的二维线性dp问题。 文章目录 前言 一、最短编辑距离问题 二、算法思路 1.dp[i][j]的情况 2.边界问题初始化 3.状态转移方程 三、代码如下 1.代码如下 2.读入数据 3.代码运行结果 总结 前言 最短编辑问题也是一种非常经典的二维线性dp问题。 提示&…

NzN的数据结构--插入排序

排序排序我要Disney&#xff0c;今天我们先来看看经典排序算法里的插入排序&#xff0c;先三连后看才是好习惯&#xff01;&#xff01;&#xff01; 目录 一、排序的概念及应用 1. 排序的概念 2. 排序的应用 3. 常见的排序算法 二、插入排序 1. 基本思想 2. 直接插入排…

算法打卡day40|动态规划篇08| Leetcode 139.单词拆分|多重背包理论|背包问题总结篇

目录 算法题 Leetcode 139.单词拆分 个人思路 解法 动态规划 回溯法 多重背包理论基础 背包问题总结篇 解题思路 背包递推公式 遍历顺序 01背包 完全背包 算法题 Leetcode 139.单词拆分 题目链接:139.单词拆分 大佬视频讲解&#xff1a;单词拆分视频讲解 个人思…

使用pytorch构建控制生成GAN(Controllable GAN)网络模型

本文为此系列的第四篇Controllable GAN&#xff0c;上一篇为Conditional GAN。文中使用训练好的模型和优化噪声向量来操纵生成图像的特定属性&#xff0c;若有不懂的无监督知识点可以看本系列第一篇。 原理 本文主要讲什么是控制生成&#xff0c;以及如何做到控制生成。 什么是…

华为S5735S核心交换配置实例

以下脚本实现创建vlan2,3&#xff0c;IP划分&#xff0c;DHCP启用&#xff0c;接口划分&#xff0c;ssh,telnet,http,远程登录启用 默认用户创建admin/admin123提示首次登录需要更改用户密码 sysname test-Hxvlan 2 description to test1…

JavaScript(1)神秘的编程技巧

大家都感兴趣的箭头函数 箭头函数在许多场景中都可以发挥作用&#xff0c;尤其适用于简化函数声明和提高代码的可读性。以下是箭头函数可以使用的一些常见方面&#xff1a; &#xff08;1&#xff09;回调函数&#xff1a; 箭头函数特别适合作为回调函数&#xff0c;例如在事…

html基础(2)(链接、图像、表格、列表、id、块)

1、链接 <a href"https://www.example.com" target"_blank" title"Example Link">Click here</a> 在上示例中&#xff0c;定义了一个链接&#xff0c;在网页中显示为Click here&#xff0c;鼠标悬停指示为Example Link&#xff0c…