三、Kafka的消费全流程

news2024/11/23 12:16:01

Kafka的消费全流程

我们接着继续去理解最后这条消息是如何被消费者消费掉的。其中最核心的有以下内容。

1、多线程安全问题

2、群组协调

3、分区再均衡

多线程安全问题

当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。

对于线程安全,还可以进一步定义:

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替进行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

生产者

KafkaProducer的实现是线程安全的。

KafkaProducer就是一个不可变类。线程安全的,可以在多个线程中共享单个KafkaProducer实例

所有字段用private final修饰,且不提供任何修改方法,这种方式可以确保多线程安全。

image.png

如何节约资源的多线程使用KafkaProducer实例

package com.msb.concurrent;

import com.msb.selfserial.User;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 类说明:多线程下使用生产者
 */
public class KafkaConProducer {

    //发送消息的个数
    private static final int MSG_SIZE = 1000;
    //负责发送消息的线程池
    private static ExecutorService executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors());
    private static CountDownLatch countDownLatch  = new CountDownLatch(MSG_SIZE);

    private static User makeUser(int id){
        User user = new User(id);
        String userName = "msb_"+id;
        user.setName(userName);
        return user;
    }

    /*发送消息的任务*/
    private static class ProduceWorker implements Runnable{

        private ProducerRecord<String,String> record;
        private KafkaProducer<String,String> producer;

        public ProduceWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {
            this.record = record;
            this.producer = producer;
        }

        public void run() {
            final String id = Thread.currentThread().getId() +"-"+System.identityHashCode(producer);
            try {
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if(null!=exception){
                            exception.printStackTrace();
                        }
                        if(null!=metadata){
                            System.out.println(id+"|" +String.format("偏移量:%s,分区:%s", metadata.offset(),
                                    metadata.partition()));
                        }
                    }
                });
                System.out.println(id+":数据["+record+"]已发送。");
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // 设置属性
        Properties properties = new Properties();
        // 指定连接的kafka服务器的地址
        properties.put("bootstrap.servers","127.0.0.1:9092");
        // 设置String的序列化
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        // 构建kafka生产者对象
        KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);
        try {
            for(int i=0;i<MSG_SIZE;i++){
                User user = makeUser(i);
                ProducerRecord<String,String> record = new ProducerRecord<String,String>("concurrent-test",null,
                        System.currentTimeMillis(), user.getId()+"", user.toString());
                executorService.submit(new ProduceWorker(record,producer));
            }
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
            executorService.shutdown();
        }
    }




}

消费者

KafkaConsumer的实现不是线程安全的

实现消费者多线程最常见的方式: 线程封闭 ——即为每个线程实例化一个 KafkaConsumer对象

package com.msb.concurrent;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 类说明:多线程下正确的使用消费者,需要记住,一个线程一个消费者
 */
public class KafkaConConsumer {

    public static final int CONCURRENT_PARTITIONS_COUNT = 2;

    private static ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_PARTITIONS_COUNT);

    private static class ConsumerWorker implements Runnable{

        private KafkaConsumer<String,String> consumer;

        public ConsumerWorker(Map<String, Object> config, String topic) {
            Properties properties = new Properties();
            properties.putAll(config);
            this.consumer = new KafkaConsumer<String, String>(properties);
            consumer.subscribe(Collections.singletonList(topic));
        }

        public void run() {
            final String ThreadName = Thread.currentThread().getName();
            try {
                while(true){
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(ThreadName+"|"+String.format(
                                "主题:%s,分区:%d,偏移量:%d," +
                                        "key:%s,value:%s",
                                record.topic(),record.partition(),
                                record.offset(),record.key(),record.value()));
                        //do our work
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {

        /*消费配置的实例*/
        Map<String,Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"c_test");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        for(int i = 0; i<CONCURRENT_PARTITIONS_COUNT; i++){
            //一个线程一个消费者
            executorService.submit(new ConsumerWorker(properties, "concurrent-test"));
        }
    }




}

群组协调

消费者要加入群组时,会向群组协调器发送一个JoinGroup请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化(新加入或者掉线),主题中分区发生了变化(增加)时发生。

image.png

组协调器

组协调器是Kafka服务端自身维护的。

组协调器( GroupCoordinator )可以理解为各个消费者协调器的一个中央处理器, 每个消费者的所有交互都是和组协调器( GroupCoordinator )进行的。

  1. 选举Leader消费者客户端
  2. 处理申请加入组的客户端
  3. 再平衡后同步新的分配方案
  4. 维护与客户端的心跳检测
  5. 管理消费者已消费偏移量,并存储至 __consumer_offset

kafka上的组协调器( GroupCoordinator )协调器有很多,有多少个 __consumer_offset分区, 那么就有多少个组协调器( GroupCoordinator )

默认情况下, __consumer_offset有50个分区, 每个消费组都会对应其中的一个分区,对应的逻辑为 hash(group.id)%分区数。

消费者协调器

每个客户端(消费者的客户端)都会有一个消费者协调器, 他的主要作用就是向组协调器发起请求做交互, 以及处理回调逻辑

  1. 向组协调器发起入组请求
  2. 向组协调器发起同步组请求(如果是Leader客户端,则还会计算分配策略数据放到入参传入)
  3. 发起离组请求
  4. 保持跟组协调器的心跳线程
  5. 向组协调器发送提交已消费偏移量的请求

消费者加入分组的流程

1、客户端启动的时候, 或者重连的时候会发起JoinGroup的请求来申请加入的组中。

2、当前客户端都已经完成JoinGroup之后, 客户端会收到JoinGroup的回调, 然后客户端会再次向组协调器发起SyncGroup的请求来获取新的分配方案

3、当消费者客户端关机/异常 时, 会触发离组LeaveGroup请求。

当然有主动的消费者协调器发起离组请求,也有组协调器一直会有针对每个客户端的心跳检测, 如果监测失败,则就会将这个客户端踢出Group。

4、客户端加入组内后, 会一直保持一个心跳线程,来保持跟组协调器的一个感知。

并且组协调器会针对每个加入组的客户端做一个心跳监测,如果监测到过期, 则会将其踢出组内并再平衡。

消费者消费的offset的存储

__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。
__consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。

kafka-consumer-groups.bat --bootstrap-server :9092 --group c_test --describe

image.png

那么如何使用 kafka 提供的脚本查询某消费者组的元数据信息呢?

Math.abs(groupID.hashCode()) % numPartitions,

image.png

image.png

image.png

__consumer_offsets 的每条消息格式大致如图所示

可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值

分区再均衡

当消费者群组里的消费者发生变化,或者主题里的分区发生了变化,都会导致再均衡现象的发生。从前面的知识中,我们知道,Kafka中,存在着消费者对分区所有权的关系,

这样无论是消费者变化,比如增加了消费者,新消费者会读取原本由其他消费者读取的分区,消费者减少,原本由它负责的分区要由其他消费者来读取,增加了分区,哪个消费者来读取这个新增的分区,这些行为,都会导致分区所有权的变化,这种变化就被称为 再均衡

再均衡对Kafka很重要,这是消费者群组带来高可用性和伸缩性的关键所在。不过一般情况下,尽量减少再均衡,因为再均衡期间,消费者是无法读取消息的,会造成整个群组一小段时间的不可用。

消费者通过向称为群组协调器的broker(不同的群组有不同的协调器)发送心跳来维持它和群组的从属关系以及对分区的所有权关系。如果消费者长时间不发送心跳,群组协调器认为它已经死亡,就会触发一次再均衡。

心跳由单独的线程负责,相关的控制参数为max.poll.interval.ms。

消费者提交偏移量导致的问题

当我们调用poll方法的时候,broker返回的是生产者写入Kafka但是还没有被消费者读取过的记录,消费者可以使用Kafka来追踪消息在分区里的位置,我们称之为 偏移量 。消费者更新自己读取到哪个消息的操作,我们称之为 提交

消费者是如何提交偏移量的呢?消费者会往一个叫做_consumer_offset的特殊主题发送一个消息,里面会包括每个分区的偏移量。发生了再均衡之后,消费者可能会被分配新的分区,为了能够继续工作,消费者者需要读取每个分区最后一次提交的偏移量,然后从指定的地方,继续做处理。

分区再均衡的例子:

某软件公司,有一个项目,有两块的工作,有两个码农,一个小王、一个小李,一个负责一块(分区消费),干得好好的。突然一天,小王桌子一拍不干了,老子中了5百万了,不跟你们玩了,立马收拾完电脑就走了。这个时候小李就必须承担两块工作,这个时候就是发生了分区再均衡。

过了几天,你入职,一个萝卜一个坑,你就入坑了,你承担了原来小王的工作。这个时候又会发生了分区再均衡。

1)如果提交的偏移量小于消费者实际处理的最后一个消息的偏移量,处于两个偏移量之间的消息会被重复处理,

2)如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

image.png

再均衡监听器实战

我们创建一个分区数是3的主题rebalance

kafka-topics.bat --bootstrap-server localhost:9092  --create --topic rebalance --replication-factor 1 --partitions 3

image.png

在为消费者分配新分区或移除旧分区时,可以通过消费者API执行一些应用程序代码,在调用 subscribe()方法时传进去一个 ConsumerRebalancelistener实例就可以了。

ConsumerRebalancelistener有两个需要实现的方法。

  1. public void
    onPartitionsRevoked( Collection< TopicPartition> partitions)方法会在

再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了

  1. public void
    onPartitionsAssigned( Collection< TopicPartition> partitions)方法会在重新分配分区之后和消费者开始读取消息之前被调用。

具体使用,我们先创建一个3分区的主题,然后实验一下,

在再均衡开始之前会触发onPartitionsRevoked方法

在再均衡开始之后会触发onPartitionsAssigned方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/668867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习实战39-U-Net模型在医学影像识别分割上的应用技巧,以细胞核分割任务为例

大家好,我是微学AI,今天给大家介绍一下深度学习实战39-U-Net模型在医学影像识别分割上的应用技巧,以细胞核分割任务为例。本文将介绍在医学影像分割领域中应用U-Net模型的方法。我们将从U-Net模型原理出发,并使用PyTorch搭建模型,详细展示模型代码。接着,我们将展示一些医…

I.MX RT1170之FlexSPI(2):LUT表格的组成和FlexSPI结构体配置

从上一节FlexSPI的框图中可知&#xff0c;SEQ_CTL实现了对外部存储器的时序控制。不同的存储器有着不同的时序&#xff0c;这个时序就是由LUT(Look Up Table)指定的。LUT有它自己的寄存器&#xff0c;当我们设置好之后&#xff0c;外部存储器的读、写和擦除等操作就会根据LUT寄…

《网络安全0-100》安全事件案例

网络安全事件案例分析 2017年Equifax数据泄露事件 Equifax是美国一家信用评级机构&#xff0c;2017年9月&#xff0c;该公司披露发生了一起重大的数据泄露事件&#xff0c;涉及1.43亿美国人的个人信息&#xff0c;包括姓名、出生日期、社会安全号码等敏感信息。经过调查&#…

《OpenCV 计算机视觉编程攻略》学习笔记(一:图像编程入门)

1、参考引用 OpenCV 计算机视觉编程攻略&#xff08;第3版&#xff09;本书结合 C 和 OpenCV 3.2 全面讲解计算机视觉编程所有代码均在 Ubuntu 系统中用 g 编译执行 0. 安装 OpenCV 库 在Ubuntu上安装OpenCV及使用OpenCV 库分为多个模块&#xff0c;常见模块如下 opencv_core …

Python数据可视化 - 使用Python dash搭建交互式地图可视化看板

1.前言 前几年刚接触Dash库的时候&#xff0c;Dash生态还不太成熟&#xff0c;做些简单的web还行&#xff0c;复杂的、系统性还是得用flask或django来实现。随着这两年dash的不断迭代更新&#xff0c;以及dash大佬feffery相继开发了feffery_antd_components、feffery_leaflet_…

编译原理笔记7:语法分析(1)语法分析器的任务、语法错误的处理

目录 语法分析器是编译器前端的核心语法错误的处理语法错误的处理目标语法错误的基本恢复策略 语法分析器是编译器前端的核心 语法分析器的两项主要任务&#xff0c;分别&#xff1a; 是根据词法分析器提供的记号流&#xff0c;为语法正确的输入构造分析树&#xff08;或语法树…

uboot下UCLASS框架详解---结合项目工作中spi master和flash驱动开发

文章目录 一、综述二、UCLASS架构解析2.1 uclass2.2 udevice2.3 uclass driver2.4 driver2.4.1 spi master driver 三、uboot代码解析3.1 DM的初始化3.2 spi norflash设备识别3.3 设备树内容3.4 .config配置3.5 spi读写测试 四、其他相关链接1、SPI协议详细总结附实例图文讲解通…

IDEA合并分支和.gitignore可能遇到的问题

将本地的 v1 分支合并到 master 分支上 1.确认你在 master 分支上&#xff0c;在命令行执行以下命令&#xff0c;这将切换到 master 分支。 git checkout master 2.拉取最新代码 在合并分支之前&#xff0c;请确保您的代码库是最新的。在命令行执行以下命令&#xff0c;这将从…

一文解决C/C++中所有指针相关知识点

本篇会对C/C中【常见指针相关知识】一直进行总结迭代&#xff0c;记得收藏吃灰不迷路&#xff0c;一起学习分享喔 请大家批评指正&#xff0c;一起学习呀~ 一、指针基本知识1.1 指针的定义1.2 &#xff08;*&#xff09; 和&#xff08; &&#xff09; 运算符1.3 如何声明…

使用omp并行技术加速bfs广度优先算法

基本思想 从初始状态S开始&#xff0c;利用规则&#xff0c;生成所有可能的状态。构成树的下一层节点&#xff0c;检查是否出现目标状态G&#xff0c;若未出现&#xff0c;就对该层所有状态节点&#xff0c;分别顺序利用规则。生成再下一层的所有状态节点&#xff0c;对这一层的…

chatgpt赋能python:Python在SEO中的排名

Python在SEO中的排名 Python作为一门高级编程语言&#xff0c;已经有近三十年的历史&#xff0c;被广泛用于各个领域的开发&#xff0c;包括 Web开发、数据分析、机器学习等。在 SEO 中&#xff0c;Python 也具有很高的应用价值。本文将会介绍 Python 在 SEO 中的应用以及其排…

C语言进阶教程(一个可执行文件生成的具体步骤)

文章目录 前言一、预处理二、编译三、汇编四、链接总结 前言 本篇文章来讲解一个.c文件生成一个可执行文件的完整过程&#xff0c;我们学习了那么久&#xff0c;只知道在编译器中按下编译运行就可以将一个.c文件运行起来了&#xff0c;但是我们并不了解其中的具体步骤&#xf…

【openGauss简单数据库管理---快速入门】

【openGauss简单数据库管理---快速入门】 &#x1f53b; 一、openGauss数据库管理&#x1f530; 1.1 连接openGauss数据库&#x1f530; 1.2 创建数据库&#x1f530; 1.3 查看数据库和切换数据库&#x1f530; 1.4 修改数据库&#x1f530; 1.5 删除数据库&#x1f530; 1.6 启…

高等代数复习(二)

本篇复习内容有 求解标准正交基 证明标准正交基 证明正交变换 利用共轭变换证明 求解与给定矩阵的相似矩阵--对角矩阵 1.求标准正交基 在求标准正交基时&#xff0c;通常要先正交化&#xff0c;然后单位化&#xff0c;即可求出标准正交基。 2.证明标准正交基 标准正交基单位…

chatgpt赋能python:Python提取指定位置字符

Python 提取指定位置字符 Python 是一种高级程序语言&#xff0c;其易读性、简单易学性和易维护性使其成为最受欢迎的编程语言之一。它可以用于各种数据分析和科学计算&#xff0c;包括搜索引擎优化&#xff08;SEO&#xff09;。 在SEO中&#xff0c;提取和处理数据是一个重…

C++——string容器模拟实现

目录 1. 基本成员变量 2. 默认成员函数 2.1 构造函数 2.2 析构函数 2.3 拷贝构造函数 2.4 赋值运算符重载 3. 容量与大小相关函数 3.1 size 3.2 capacity 4. 字符串访问相关函数 4.1 operator[ ]重载 4.2 迭代器 5. 增加函数接口 5.1 reserve 5.2 resize 5.3 …

【二叉树part01】| 二叉树的递归遍历、二叉树的迭代遍历

目录 ✿二叉树的递归遍历❀ ☞LeetCode144.前序遍历 ☞LeetCode145.二叉树的后序遍历 ☞LeetCode94.二叉树的中序遍历 ✿二叉树的迭代遍历❀ ☞LeetCode144.前序遍历 ☞LeetCode145.二叉树的后序遍历 ☞LeetCode94.二叉树的中序遍历 ✿二叉树的递归遍历❀ ☞LeetCode…

docker-compose启动opengauss数据库——华为“自研”数据库

文章目录 1. 启动数据库2. 登录2.1 本地登录2.2 远程登录 1. 启动数据库 yml文件 创建opengauss目录&#xff0c;里边创建docker-compose.yml文件内容如下&#xff1a; 华为开源数据库&#xff0c;默认5432端口&#xff0c;是不是很熟悉&#xff0c;疑似又是个套壳子的事件。果…

Cortext-M3系列:调试系统架构(8)

1、调试特性概述 单片机的调试功能在程序开发中有着十分重要的地位&#xff0c;好的调试工具&#xff0c;能让程序开发大大加快。笔者在刚开始学单片机相关知识时&#xff0c;使用的是pintf打印相关参数&#xff0c;进行调试&#xff08;虽然现在很多时候也这样&#xff09;&am…

MyBatis 的使用方法

观前提示:本篇博客演示使用的 IDEA 版本为2021.3.3版本,使用的是Java8(又名jdk1.8) 前端使用 VSCode(Visual Studio Code1.78.2) 电脑使用的操作系统版本为 Windows 10 目录 Mybatis是什么? Mybatis 有什么用? Mybatis 框架交流 Mybatis 项目环境搭建 1. 添加 Mybatis…