Kakfa详解(一)

news2024/11/15 19:59:31

kafka使用场景

  • canal同步mysql
  • elk日志系统
  • 业务系统Topic

kafka基础概念

  • Producer: 消息生产者,向kafka发送消息
  • Consumer: 从kafka中拉取消息消费的客户端
  • Consumer Group: 消费者组,消费者组是多个消费者的集合。消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费。

    减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic

  • Broker: 一台kafka服务器就是一个Broker,一个集群由多个Broker组成
  • Topic:主题,可以理解为队列,生产者和消费者都是面向Topic
  • Partition:分区,为了实现扩展性。一个非常大的Topic可以分布在多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)
    • 便于在集群中扩展
    • 可以提高并发,以Partition为单位进行读写,类似于多路
    # 默认分区数 server.properties配置
    num.partitions=1
    
  • Replica:副本,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,kafka可以正常的工作,kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
    # 默认副本数 server.properties配置 
    # 默认分区副本数不得超过kafka节点数(副本数如果一个节点放2份,就没意义了)
    default.replication.factor=3
    
  • Leader:每个分区多个副本的主角色,生产者发送数据对象,以及消费者消费数据都是Leader
  • Follower: 每个分区多个副本的从角色,实时的从Leader同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader
  • ISRin sync replica,基本保存同步的Replica列表,是副本与主副本保持同步的列表,默认是30s数据,如果从副本保持同步,那么重新选举leader的时候,会被选择。如果与主副本同步差距较大,会被移除,选举leader将不会被考虑。
  • OSRout of sync replica, 同步有延迟的follower列表
  • LEOLog End Offset,每个副本最后一个offset
  • HWHigh Watermark,高水位,指消费者能见到的最大的offsetISR队列中最小的LEO
    在这里插入图片描述

文件存储

主要是通过logindex等文件保存具体的消息文件

一个topic 对应多个partition
一个partition 对应多个segment
一个segment对应logindex文件

为了防止log文件过大导致定位效率低下,kafka的log文件以1G为一个分界点,当.log文件大小超过1G的时候,此时会创建一个新的.log文件,同时为了快速定位大文件中消息位置,kafka采取了分片和索引的机制来加速定位。

.index文件存储的消息的offset+真实的起始偏移量。.log中存放的是真实的数据。

数据定位步骤,查找offset=6的数据。

  • 通过二分查找,定位.index文件。offset=6(大于4,小于9),定位到第二个文件segement02
  • 然后offset减去segment02的起始偏移量(6-4=2),定位到之后总的偏移量
  • 获取到总的偏移量之后,直接定位到.log文件即可快速获得当前消息大小
    在这里插入图片描述

生产者

发送消息分区策略
  • 指明partition(指明是指第几个分区)的情况下,直接将指明的值作为partition的值
  • 没有指明partition的情况下,但是存在值key,此时将keyhash值与topicpartition总数进行取余得到partition
  • 值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。
生产者消息发送

在这里插入图片描述

为保证producer发送的数据能够可靠的发送到指定的topic中,topic的每个partition收到producer发送的数据后,都需要向producer发送ackacknowledgement,如果producer收到ack就会进行下一轮的发送,否则重新发送数据

发送ack时机
  • 半数follower同步完成即发送ack,容错率低,延时低
  • 全部follower同步完成完成发送ack,容错率高,延时高

kafka采用的是第二种,延迟对kafka影响比较小。

采用了第二种方案进行同步ack之后,如果leader收到数据,所有的follower开始同步数据,但有一个follower因为某种故障,迟迟不能够与leader进行同步,那么leader就要一直等待下去,直到它同步完成,才可以发送ack,此时需要如何解决这个问题呢?

leader中维护了一个ISR(in-sync replica set)同步副本集,即与leader保持同步的follower集合,当ISR中的follower完成数据的同步之后,给leader发送ack,如果follower长时间没有向leader同步数据,则该follower将从ISR中被踢出,该之间阈值由replica.lag.time.max.ms参数设定。当leader发生故障之后,会从ISR中选举出新的leader。

ack参数
  • 0: producer不等待broker的ack,这一操作提供了最低的延迟,broker接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据
  • 1: producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)
  • -1(all): producer等待broker的ack,partition的leader和ISR的follower全部落盘成功才返回ack,但是如果在follower同步完成后,broker发送ack之前,如果leader发生故障,会造成数据重复。(这里的数据重复是因为没有收到,所以继续重发导致的数据重复)
高吞吐量,低延迟
  1. kafka会先写入操作系统页缓存中,操作系统再决定将数据写回到磁盘上
  2. 磁盘顺序写,采用追加的方式写入消息
  3. 零拷贝

kafka消息发送,消息暂时暂存的,批量发送RecordAccumulator.class是专门缓存kafka消息的。

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9200,127.0.0.1:9201,127.0.0.1:9202
    producer: # producer 生产者
      retries: 0 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 # 默认 批量大小 16KB
      buffer-memory: 33554432 # 默认 生产端缓冲区大小  32MB
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

消费者

消费方式

消费者采用pull的方式来从broker中读取数据

push推的模式很难适应消费速率不同的消费者,因为消息发送率是由broker决定的,它的目标是尽可能以最快的速度传递消息,但是这样容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull方式则可以让consumer根据自己的消费处理能力以适当的速度消费消息。

消费流程
  1. 从zookeeper中获取leader的位置和offset的位置,kafka0.9版本之前,consumer默认将offset保存在zookeeper中,从0.9版本之后,consumer默认将offset保存在kafka一个内置的topic中,该topic为__consumer_offsets
  2. 拉取数据,直接从broker的page cache 拉取
  3. 如果page cache数据不全,就会从磁盘中拉取,并发送
  4. 消费完成后,可以手动提交offset,也可以自动提交offset
    在这里插入图片描述
零拷贝

分区分配策略

线上的服务都是多个消费者服务一起消费的,一个topic包含多个partition,分区和消费者存在一个分配的策略,默认采用的是Range范围分配策略

计算公式
n = 分区数/消费者数
m = 分区数%消费者数
前m个消费者,消费n+1个,剩下的消费n个
8个分区(p1 - p8),3个消费者(c1 - c3)
c1 分配 p1 p2 p3
c2 分配 p4 p5 p6
c3 分配 p7 p8

配置参数
spring:
  kafka:
    consumer: # consumer消费者
      group-id: test-group # 默认的消费组ID
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 1000  # 提交offset延时(接收到消息后多久提交offset)
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500 #一次拉取最大数据 500条
offset提交

默认是自动提交,enable.auto.commit=true
手动提交offset的方法有两种:

  • commitSync:同步提交,失败后会自动重试
  • commitAsync: 异步提交,失败后不会自动重试
重复消费

产生的原因

  1. 生产者重复提交
  2. rebalance引起的重复消费
    超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发rebalance,提交offset失败。其他消费者会从没有提交的位置消费,从而导致重复消费。

解决方案

  • 提高消费速度
    • 增加消费者
    • 多线程处理
    • 异步消费
    • 调整消费处理时间
  • 幂等处理
    • 消费者设置幂等校验
    • 开启kafka幂等配置,生产者开启幂等配置,将消息生成md5,然后保存到redis中,处理新消息的时候先校验。这个尽量不要开启,消耗性能
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    
批量消费配置
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    public Map<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<?> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerFactory()));
        // 消费者组中线程数量
        factory.setConcurrency(3);
        //  当使用批量监听器时需要设置为true
        factory.setBatchListener(true);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);
        // 重试次数
        RetryingBatchErrorHandler errorHandler = new RetryingBatchErrorHandler(
                new FixedBackOff(500L, 3L), null);
        factory.setBatchErrorHandler(errorHandler);
        return factory;
    }
}
@KafkaListener(topics = "aloneness-topic02",
        properties = {"max.poll.records=20"},
        containerFactory = "containerFactory")
public void listen02(List<String> list) {
    log.info("处理批量消息:{}", JSON.toJSONString(list));
    List<Message> messages = JSON.parseArray(JSON.toJSONString(list), Message.class);
    System.out.println(messages);
}

如果未配置重试次数,也消费代码中出现异常,会一直重试,一直消费异常

手动创建Topic

kafka版本大于2.2

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic als-test-topic02
  • --zookeeper localhost:2181 指定zookeeper集群
  • --partitions 指定分区数
  • --replication-factor 指定分区副本数
重新分配分区副本

声明需要分配的Topic
topic-generate.json

{
  "topics": [
    {
      "topic": "aloneness-topic"
    }
  ],
  "version": 1
}

通过 --topics-to-move-json-file 参数,生成分区分配策略 --generate

kafka-reassign-partitions.bat --zookeeper localhost:2181 --topics-to-move-json-file topic-generate.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"aloneness-topic","partition":0,"replicas":[0],"log_dirs":["any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"aloneness-topic","partition":0,"replicas":[1],"log_dirs":["any"]}]}

通过 --reassignment-json-file 参数,执行分区分配策略 --execute

kafka-reassign-partitions.bat --zookeeper localhost:2181 --reassignment-json-file partition-replica-reassignment.json --execute

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/382808.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Wireshark+Go捕获本地TCP通信

初学计网&#xff0c;使用Wireshark观察本地端口间TCP通信过程。 目录 步骤1&#xff1a; 步骤2&#xff1a; 步骤3&#xff1a; 步骤1&#xff1a; 使用go语言搭建本地客户端与服务器TCP通信&#xff0c;测试完成后在步骤2先运行服务器&#xff0c;再运行客户端。 服务器…

C语言查漏补缺(进阶)volatile、__attribute__、void*、地址对齐、$$Super$main

最近在学习RT-Thread&#xff0c;在看其源码的时候发现了许多自己不太了解的C语言知识点&#xff0c;在此查漏补缺一下。 1. 关键字 volatile volatile是C90新增关键字&#xff0c;volatile的的中文意思是adj.易变的&#xff1b;无定性的&#xff1b;无常性的&#xff1b;可…

如何使用FarsightAD在活动目录域中检测攻击者部署的持久化机制

关于FarsightAD FarsightAD是一款功能强大的PowerShell脚本&#xff0c;该工具可以帮助广大研究人员在活动目录域遭受到渗透攻击之后&#xff0c;检测到由攻击者部署的持久化机制。 该脚本能够生成并导出各种对象及其属性的CSV/JSON文件&#xff0c;并附带从元数据副本中获取…

Python|每日一练|递归|数学|数组|动态规划|树|深度优先搜索|单选记录:排列序列|三角形最小路径和|求根节点到叶节点数字之和

1、排列序列&#xff08;递归&#xff0c;数学&#xff09; 给出集合 [1,2,3,...,n]&#xff0c;其所有元素共有 n! 种排列。 按大小顺序列出所有排列情况&#xff0c;并一一标记&#xff0c;当 n 3 时, 所有排列如下&#xff1a; "123""132""213…

webpack基本使用和开发环境配置

目录 1 webpack 基本使用 01 webpack 简介 02 webpack 初体验 2 webpack开发环境配置 03 打包样式资源 04 打包html资源 05 打包图片资源 06 打包其他资源&#xff08;以打包icon为例&#xff09; 07 devServer 08.开发环境配置 1 webpack 基本使用 由于笔记文档没有…

批量下载Landsat遥感影像的方法

本文介绍在USGS网站批量下载Landsat系列遥感影像的方法。首先打开EarthExplorer的官网&#xff0c;首先完成注册与登录。接下来点击左侧“Search Criteria”&#xff0c;首先选择研究区域。研究区域的划定有多种方法&#xff0c;可以依据地理名称选定研究区域&#xff0c;也可以…

klipper使用webcam设置多个摄像头方式

一、前言 使用klipper设置多个摄像头&#xff0c;折腾了好些天&#xff0c;网上资料很少&#xff0c;这里写一个帖子记录一下 二、环境 参考链接&#xff1a;https://www.cnblogs.com/sjqlwy/p/klipper_webcam.html 我的klipper安装在香橙派上面&#xff0c;系统是debian&a…

这一次,彻底入门前端测试,覆盖单元测试、组件测试(2.4w 字)

前端测试一直是前端工程化中很重要的话题&#xff0c;但是很多人往往对测试产生误解&#xff0c;认为测试不仅没有什么用而且还浪费时间&#xff0c;或者测试应该让测试人员来做&#xff0c;自己应该专注于开发。所以&#xff0c;文章开头会先从"软件工程语境下的软件测试…

【运筹优化】剩余空间法求解带顺序约束的二维矩形装箱问题 + Java代码实现

文章目录一、带顺序约束的二维矩形装箱问题二、剩余空间法三、完整代码实现3.1 Instance 实例类3.2 Item 物品类3.3 PlaceItem 已放置物品类3.4 Solution 结果类3.5 RSPackingWithWeight 剩余空间算法类3.6 Run 运行类3.7 测试案例3.8 ReadDataUtil 数据读取类3.9 运行结果展示…

Spring boot + mybatis-plus 遇到 数据库字段 创建不规范 大驼峰 下划线 导致前端传参数 后端收不到参数 解决方案

最近使用springboot 连接了一个 sqlserver 数据库 由于数据库年数久远 &#xff0c;建表字段不规范 大驼峰 下划线的字段名都有 但是 java 中 Spring boot mybatis-plus 又严格按照小驼峰 格式 生成实体类 如果不是小驼峰格式 Data 注解 get set 方法 在前端请求参数 使用这个…

如何评估模糊测试工具-unibench的使用

unibench是一个用来评估模糊测试工具的benchmark。这个benchmark集成了20多个常用的测试程序&#xff0c;以及许多模糊测试工具。 这篇文章&#xff08;https://zhuanlan.zhihu.com/p/421124258&#xff09;对unibench进行了简单的介绍&#xff0c;本文就不再赘诉&#xff0c;…

设计模式-第6章(工厂模式)

工厂模式简单工厂实现工厂模式实现简单工厂 VS 工厂方法商场收银程序再再升级&#xff08;简单工厂策略装饰工厂方法&#xff09;工厂方法模式总结简单工厂实现 在简单工厂类中&#xff0c;通过不同的运算符&#xff0c;创建具体的运算类。 public class OperationFactory {pu…

CMMI流程规范—实现与测试

一、概述实现与测试&#xff08;Implementation and Test, IT&#xff09;的目的是依据系统设计文档&#xff0c;编写并测试整个系统的代码。在本规范中&#xff0c;实现与测试是“编程、代码审查、单元测试、集成测试、缺陷管理与改错”的综合表述。实现与测试过程域是SPP模型…

从 AI 绘画到 ChatGPT,聊聊生成式 AI

我们小时候经常有幻想&#xff0c;未来不用再去上班了&#xff0c;在工厂工作的都是机器人。在家也不用打扫卫生&#xff0c;机器人可以包揽一切。不知不觉间&#xff0c;我们小时候的幻想已经慢慢变成现实&#xff0c;工厂里有了多种型号的机械臂&#xff0c;代替了部分流水线…

Vue3中watch的value问题

目录前言一&#xff0c;ref和reactive的简单复习1.ref函数1.2 reactive函数1.3 用ref定义对象类型数据不用reactive二&#xff0c;watch的value问题2.1 ref2.1.1 普通类型数据2.1.2 对象类型数据2.1.3 另一种方式2.2 reactive三&#xff0c;总结后记前言 在Vue3中&#xff0c;…

论文投稿指南——中文核心期刊推荐(中国文学作品)

【前言】 &#x1f680; 想发论文怎么办&#xff1f;手把手教你论文如何投稿&#xff01;那么&#xff0c;首先要搞懂投稿目标——论文期刊 &#x1f384; 在期刊论文的分布中&#xff0c;存在一种普遍现象&#xff1a;即对于某一特定的学科或专业来说&#xff0c;少数期刊所含…

微信小程序通过 node 连接 mysql——方法,简要原理,及一些常见问题

前言 博主自己在22年夏天根据课程要求做了一个小程序连接阿里云服务器的案例&#xff0c;在最近又碰到了相应的需求。 原参考文章&#xff1a;微信小程序 Node连接本地MYSQL_微信小程序nodejs连接数据库_JJJenny0607的博客-CSDN博客 ,还请多多支持原作者&#xff01; 第二次…

vue2 @hook 的解析与妙用

目录前言几种用法用法一 将放在多个生命周期的逻辑&#xff0c;统一到一个生命周期中用法二 监听子组件生命周期运行的情况运用场景场景一 许多时候&#xff0c;我们不得不在不同的生命周期中执行某些逻辑&#xff0c;并且这些逻辑会用到一些通用的变量&#xff0c;这些通用变量…

nginx日志服务之敏感信息脱敏

1. 创建实验资源 开始实验之前&#xff0c;您需要先创建实验相关资源。 日志服务之敏感信息脱敏与审计 2. 创建原始数据 本步骤将指导您如何创建NGINX模拟数据。 双击打开虚拟桌面的Firefox ESR浏览器。 在RAM用户登录框中单击下一步&#xff0c;并复制粘贴页面左上角的子…

使用groovy代码方式解开gradle配置文件神秘面纱

来到这里的是不是都有以下疑问&#xff1a; 1.build.gradle配置文件结构好复杂啊&#xff0c;怎么记&#xff1f; 2.内部是怎么进行分析和执行的&#xff1f; 3.为什么可以在配置文件里面写groovy代码&#xff0c;怎么识别的&#xff1f; 4.怎么才能很方便的记住和快速上手…