Kafka 3.x.x 入门到精通(05)——对标尚硅谷Kafka教程

news2024/12/30 2:41:01

Kafka 3.x.x 入门到精通(05)——对标尚硅谷Kafka教程

  • 2. Kafka基础
    • 2.1 集群部署
    • 2.2 集群启动
    • 2.3 创建主题
    • 2.4 生产消息
    • 2.5 存储消息
    • 2.6 消费消息
      • 2.6.1 消费消息的基本步骤
      • 2.6.2 消费消息的基本代码
      • 2.6.3 消费消息的基本原理
        • 2.6.3.1消费者组
          • 2.6.3.1.1 消费数据的方式:push & pull
          • 2.6.3.1.2 消费者组Consumer Group
        • 2.6.3.2 调度(协调)器Coordinator
        • 2.6.3.3 消费者分配策略Assignor
        • 2.6.3.4 偏移量offset
          • 2.6.3.4.1 起始偏移量
          • 2.6.3.4.3 指定偏移量消费
          • 2.6.3.4.4 偏移量提交
        • 2.6.3.5 消费者事务
        • 2.6.3.6 偏移量的保存
        • 2.6.3.7消费数据

在这里插入图片描述

在这里插入图片描述

本文档参看的视频是:

  • 尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
  • 黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
  • 小朋友也可以懂的Kafka入门教程,还不快来学

本文档参看的文档是:

  • 尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!

在这之前大家可以看我以下几篇文章,循序渐进:

❤️Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(03)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(04)——对标尚硅谷Kafka教程
在这里插入图片描述

2. Kafka基础

2.1 集群部署

❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

2.2 集群启动

❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

2.3 创建主题

❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

2.4 生产消息

❤️Kafka 3.x.x 入门到精通(03)——对标尚硅谷Kafka教程

2.5 存储消息

❤️Kafka 3.x.x 入门到精通(04)——对标尚硅谷Kafka教程

在这里插入图片描述

2.6 消费消息

数据已经存储到了Kafka的数据文件中,接下来应用程序就可以使用Kafka Consumer API 向Kafka订阅主题,并从订阅的主题上接收消息了。

2.6.1 消费消息的基本步骤

(一)建Map类型的配置对象,根据场景增加相应的配置属性:

参数名参数作用类型默认值推荐值
bootstrap.servers集群地址,格式为:brokerIP1:端口号,brokerIP2:端口号必须
key.deserializer对数据Key进行反序列化的类完整名称必须Kafka提供的字符串反序列化类:StringSerializer
value.deserializer对数据Value进行反序列化的类完整名称必须Kafka提供的字符串反序列化类:ValueSerializer
group.id消费者组ID,用于标识完整的消费场景,一个组中可以包含多个不同的消费者对象。必须
auto.offset.reset
group.instance.id消费者实例ID,如果指定,那么在消费者组中使用此ID作为memberId前缀可选
partition.assignment.strategy分区分配策略可选
enable.auto.commit启用偏移量自动提交可选true
auto.commit.interval.ms自动提交周期可选5000ms
fetch.max.bytes消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响可选52428800(50 m)
offsets.topic.num.partitions偏移量消费主题分区数可选50

(二)创建消费者对象

根据配置创建消费者对象KafkaConsumer,向Kafka订阅(subscribe)主题消息,并向Kafka发送请求(poll)获取数据。

(三)获取数据

Kafka会根据消费者发送的参数,返回数据对象ConsumerRecord。返回的数据对象中包括指定的数据。

数据项数据含义
topic主题名称
partition分区号
offset偏移量
timestamp数据时间戳
key数据key
value数据value

在这里插入图片描述

(四)关闭消费者

消费者消费完数据后,需要将对象关闭用以释放资源。一般情况下,消费者无需关闭。

在这里插入图片描述

2.6.2 消费消息的基本代码

之前的代码:

package com.atguigu.test;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class KafkaConsumerTest {
    public static void main(String[] args) {
        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Collections.singletonList("test"));
        // TODO 消费数据
        final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
        // TODO 遍历数据
        for (ConsumerRecord<String, String> record : poll) {
            System.out.println( record.value() );
        }
        // TODO 关闭消费者
        consumer.close();
    }
}

在这里插入图片描述

2.6.3 消费消息的基本原理

从数据处理的角度来讲,消费者和生产者的处理逻辑都相对比较简单。

Producer生产者的基本数据处理逻辑就是向Kafka发送数据,并获取Kafka的数据接收确认响应。

在这里插入图片描述

而消费者的基本数据处理逻辑就是向Kafka请求数据,并获取Kafka返回的数据。

在这里插入图片描述

逻辑确实很简单,但是Kafka为了能够构建高吞吐,高可靠性,高并发的分布式消息传输系统,所以在很多细节上进行了扩展和改善:比如生产者可以指定分区,可以异步和同步发送数据,可以进行幂等性操作和事务处理。对应的,消费者功能和处理细节也进行了扩展和改善。

在这里插入图片描述

2.6.3.1消费者组
2.6.3.1.1 消费数据的方式:push & pull

Kafka的主题如果就一个分区的话,那么在硬件配置相同的情况下,消费者Consumer消费主题数据的方式没有什么太大的差别。

在这里插入图片描述

不过,Kafka为了能够构建高吞吐,高可靠性,高并发的分布式消息传输系统,它的主题是允许多个分区的,那么就会发现不同的消费数据的方式区别还是很大的。

  • 如果数据由Kafka进行推送(push),那么多个分区的数据同时推送给消费者进行处理,明显一个消费者的消费能力是有限的,那么消费者无法快速处理数据,就会导致数据的积压,从而导致网络,存储等资源造成极大的压力,影响吞吐量和数据传输效率。

在这里插入图片描述

  • 如果kafka的分区数据在内部可以存储的时间更长一些,再由消费者根据自己的消费能力向kafka申请(拉取)数据,那么整个数据处理的通道就会更顺畅一些。Kafka的Consumer就采用的这种拉取数据的方式。

在这里插入图片描述

在这里插入图片描述

2.6.3.1.2 消费者组Consumer Group

消费者可以根据自身的消费能力主动拉取Kafka的数据,但是毕竟自身的消费能力有限,如果主题分区的数据过多,那么消费的时间就会很长。对于kafka来讲,数据就需要长时间的进行存储,那么对Kafka集群资源的压力就非常大。

如果希望提高消费者的消费能力,并且减少kafka集群的存储资源压力。所以有必要对消费者进行横向伸缩,从而提高消息消费速率。

在这里插入图片描述

不过这么做有一个问题,就是每一个消费者是独立,那么一个消费者就不能消费主题中的全部数据,简单来讲,就是对于某一个消费者个体来讲,主题中的部分数据是没有消费到的,也就会认为数据丢了,这个该如何解决呢?那如果我们将这多个消费者当成一个整体,是不是就可以了呢?这就是所谓的消费者组 Consumer Group

在kafka中,每个消费者都对应一个消费组,消费者可以是一个线程,一个进程,一个服务实例,如果kafka想要消费消息,那么需要指定消费那个topic的消息以及自己的消费组id(groupId)。

在这里插入图片描述

在这里插入图片描述

2.6.3.2 调度(协调)器Coordinator

消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在kafka中,我们称之为消费者组调度器(协调)(Group Coordinator)

Group Coordinator是Broker上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息每个Broker都有一个Group Coordinator对象,负责管理多个消费者组,但每个消费者组只有一个Group Coordinator

在这里插入图片描述

在这里插入图片描述

2.6.3.3 消费者分配策略Assignor

消费者想要拉取主题分区的数据,首先必须要加入到一个组中。

但是一个组中有多个消费者的话,那么每一个消费者该如何消费呢,是不是像图中一样的消费策略呢?如果是的话,那假设消费者组中只有2个消费者或有4个消费者,和分区的数量不匹配,怎么办?所以这里,我们需要给大家介绍一下,Kafka中基本的消费者组中的消费者和分区之间的分配规则:

  • 同一个消费者组的消费者都订阅同一个主题,所以消费者组中的多个消费者可以共同消费一个主题中的所有数据。

  • 为了避免数据被重复消费,所有主题一个分区的数据只能被组中的一个消费者消费,也就是说不能两个消费者同时消费一个分区的数据。但是反过来,一个消费者是可以消费多个分区数据的。
    在这里插入图片描述

  • 消费者组中的消费者数量最好不要超出主题分区的数据,就会导致多出的消费者是无法消费数据的,造成了资源的浪费。
    在这里插入图片描述

消费者中的每个消费者到底消费哪一个主题分区,这个分配策略其实是由消费者的Leader决定的,这个Leader我们称之为群主。群主是多个消费者中,第一个加入组中的消费者,其他消费者我们称之为Follower,称呼上有点类似与分区的Leader和Follower。

在这里插入图片描述

当消费者加入群组的时候,会发送一个JoinGroup请求。群主负责给每一个消费者分配分区。
每个消费者只知道自己的分配信息,只有群主知道群组内所有消费者的分配信息。

指定分配策略的基本流程:

(1) 第一个消费者设定group.id为test,向当前负载最小的节点发送请求查找消费调度器
在这里插入图片描述

(2) 找到消费调度器后,消费者向调度器节点发出JOIN_GROUP请求,加入消费者组。
在这里插入图片描述

(3) 当前消费者当选为群主后,根据消费者配置中分配策略设计分区分配方案,并将分配好的方案告知调度器
在这里插入图片描述

(4) 此时第二个消费者设定group.id为test,申请加入消费者组

在这里插入图片描述

(5) 加入成功后,kafka将消费者组状态切换到准备rebalance,关闭和消费者的所有链接,等待它们重新加入。客户端重新申请加入,kafka从消费者组中挑选一个作为leader,其它的作为follower。(步骤和之前相同,我们假设还是之前的消费者为Leader)

在这里插入图片描述

(6) Leader会按照分配策略对分区进行重分配,并将方案发送给调度器,由调度器通知所有的成员新的分配方案。组成员会按照新的方案重新消费数据
在这里插入图片描述

Kafka提供的分区分配策略常用的有4个:

  • RoundRobinAssignor(轮询分配策略)
    • 每个消费者组中的消费者都会含有一个自动生产的UUID作为memberid。
      在这里插入图片描述

轮询策略中会将每个消费者按照memberid进行排序,所有member消费的主题分区根据主题名称进行排序。
在这里插入图片描述

将主题分区轮询分配给对应的订阅用户,注意未订阅当前轮询主题的消费者会跳过。
在这里插入图片描述
在这里插入图片描述

从图中可以看出,轮询分配策略是存在缺点的,并不是那么的均衡,如果test1-2分区能够分配给消费者ccc是不是就完美了。

  • RangeAssignor(范围分配策略)
    • 按照每个topic的partition数计算出每个消费者应该分配的分区数量,然后分配,分配的原则就是一个主题的分区尽可能的平均分,如果不能平均分,那就按顺序向前补齐即可。
#所谓按顺序向前补齐就是:
假设【1,2,3,4,55个分区分给2个消费者:
5 / 2 = 2, 5 % 2 = 1 => 剩余的一个补在第一个中[2+1][2] => 结果为[1,2,3][4,5]

假设【1,2,3,4,55个分区分到3个消费者:
5 / 3 = 1, 5 % 3 = 2 => 剩余的两个补在第一个和第二个中[1+1][1+1][1] => 结果为[1,2][3,4][5]

在这里插入图片描述

在这里插入图片描述

缺点:Range分配策略针对单个Topic的情况下显得比较均衡,但是假如Topic多的话, member排序靠前的可能会比member排序靠后的负载多很多。是不是也不够理想。

在这里插入图片描述

还有就是如果新增或移除消费者成员,那么会导致每个消费者都需要去建立新的分区节点的连接,更新本地的分区缓存,效率比较低。

在这里插入图片描述

  • StickyAssignor(粘性分区)
    • 在第一次分配后,每个组成员都保留分配给自己的分区信息。如果有消费者加入或退出,那么在进行分区再分配时(一般情况下,消费者退出45s后,才会进行再分配,因为需要考虑可能又恢复的情况),尽可能保证消费者原有的分区不变,重新对加入或退出消费者的分区进行分配。

在这里插入图片描述
在这里插入图片描述

从图中可以看出,粘性分区分配策略分配的会更加均匀和高效一些。

  • CooperativeStickyAssignor
    • 前面的三种分配策略再进行重分配时使用的是EAGER协议,会让当前的所有消费者放弃当前分区,关闭连接,资源清理,重新加入组和等待分配策略。明显效率是比较低的,所以从Kafka2.4版本开始,在粘性分配策略的基础上,优化了重分配的过程,使用的是COOPERATIVE协议,特点就是在整个再分配的过程中从图中可以看出,粘性分区分配策略分配的会更加均匀和高效一些,COOPERATIVE协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。

Kafka消费者默认的分区分配就是RangeAssignor,CooperativeStickyAssignor

在这里插入图片描述

2.6.3.4 偏移量offset

偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据,如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。

在这里插入图片描述

2.6.3.4.1 起始偏移量

在消费者的配置中,我们可以增加偏移量相关参数auto.offset.reset,用于从最开始获取主题数据,
在这里插入图片描述

package com.atguigu.test;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerTest {
    public static void main(String[] args) {

        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        paramMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Arrays.asList("test"));

        while ( true ) {
            // TODO 消费数据
            final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));

            // TODO 遍历数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record );
            }
        }
    }
}

参数取值有3个:

  • earliest:对于同一个消费者组,从头开始消费。就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费(未提交偏移量的场合)。
    在这里插入图片描述

  • latest:对于同一个消费者组,消费者只能消费到连接topic后,新产生的数据(未提交偏移量的场合)。
    在这里插入图片描述

  • none:生产环境不使用

在这里插入图片描述

2.6.3.4.3 指定偏移量消费

除了从最开始的偏移量或最后的偏移量读取数据以外,Kafka还支持从指定的偏移量的位置开始消费数据。

在这里插入图片描述

package com.atguigu.test;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class KafkaConsumerOffsetTest {
    public static void main(String[] args) {

        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> c = new KafkaConsumer<String, String>(paramMap);

        // TODO 订阅主题
        c.subscribe(Collections.singletonList("test"));
        // TODO 拉取数据,获取基本集群信息
        c.poll(Duration.ofMillis(100));
        // TODO 根据集群的基本信息配置需要消费的主题及偏移量
        final Set<TopicPartition> assignment = c.assignment();
        for (TopicPartition topicPartition : assignment) {
            if ( topicPartition.topic().equals("test") ) {
                c.seek(topicPartition, 0);
            }
        }
        // TODO 拉取数据
        while (true) {
            final ConsumerRecords<String, String> poll = c.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record.value() );
            }
        }
    }
}

在这里插入图片描述

2.6.3.4.4 偏移量提交

生产环境中,消费者可能因为某些原因或故障重新启动消费,那么如果不知道之前消费数据的位置,重启后再消费,就可能重复消费(earliest)或漏消费(latest)。所以Kafka提供了保存消费者偏移量的功能,而这个功能需要由消费者进行提交操作。这样消费者重启后就可以根据之前提交的偏移量进行消费了。注意,一旦消费者提交了偏移量,那么kafka会优先使用提交的偏移量进行消费。此时,auto.offset.reset 参数是不起作用的。

  • 自动提交

所谓的自动提交就是消费者消费完数据后,无需告知kafka当前消费数据的偏移量,而是由消费者客户端 API 周期性地将消费的偏移量提交到Kafka中。这个周期默认为5000ms,可以通过配置进行修改。

在这里插入图片描述

package com.atguigu.test;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerCommitAutoTest {
    public static void main(String[] args) {

        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // TODO 启用自动提交消费偏移量,默认取值为true
        paramMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // TODO 设置自动提交offset的时间周期为1000ms,默认5000ms
        paramMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Arrays.asList("test"));

        while ( true ) {
            // TODO 消费数据
            final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));

            // TODO 遍历数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record );
            }
        }
    }
}
  • 手动提交

    • 基于时间周期的偏移量提交,是我们无法控制的,一旦参数设置的不合理,或单位时间内数据量消费的很多,却没有来及的自动提交,那么数据就会重复消费。所以Kafka也支持消费偏移量的手动提交,也就是说当消费者消费完数据后,自行通过API进行提交。不过为了考虑效率和安全,kafka同时提供了异步提交和同步提交两种方式供我们选择。注意:需要禁用自动提交auto.offset.reset=false,才能开启手动提交
  • 异步提交

    • 向Kafka发送偏移量offset提交请求后,就可以直接消费下一批数据,因为无需等待kafka的提交确认,所以无法知道当前的偏移量一定提交成功,所以安全性比较低,但相对,消费性能会提高

在这里插入图片描述

package com.atguigu.test;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerCommitASyncTest {
    public static void main(String[] args) {

        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // TODO 禁用自动提交消费偏移量,默认取值为true
        paramMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Arrays.asList("test"));
        while ( true ) {
            // TODO 消费数据
            final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
            // TODO 遍历处理数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record );
            }
            // TODO 异步提交偏移量
            //     此处需要注意,需要在拉取数据完成处理后再提交
            //     否则提前提交了,但数据处理失败,下一次消费数据就拉取不到了
            consumer.commitAsync();
        }
    }
}
  • 同步提交
    • 必须等待Kafka完成offset提交请求的响应后,才可以消费下一批数据,一旦提交失败,会进行重试处理,尽可能保证偏移量提交成功,但是依然可能因为以外情况导致提交请求失败。此种方式消费效率比较低,但是安全性高。
package com.atguigu.test;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerCommitSyncTest {
    public static void main(String[] args) {

        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // TODO 禁用自动提交消费偏移量,默认取值为true
        paramMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Arrays.asList("test"));
        while ( true ) {
            // TODO 消费数据
            final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
            // TODO 遍历处理数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record );
            }
            // TODO 同步提交偏移量
            //     此处需要注意,需要在拉取数据完成处理后再提交
            //     否则提前提交了,但数据处理失败,下一次消费数据就拉取不到了
            consumer.commitSync();
        }
    }
}

在这里插入图片描述

2.6.3.5 消费者事务

无论偏移量使用自动提交还是,手动提交,特殊场景中数据都有可能会出现重复消费。

在这里插入图片描述

如果提前提交偏移量,再处理业务,有可能出现数据丢失的情况。

在这里插入图片描述

对于单独的Consumer来讲,事务保证会比较弱,尤其是无法保证提交的信息被精确消费,主要原因就是消费者可以通过偏移量访问信息,而不同的数据文件生命周期不同,同一事务的信息可能会因为重启导致被删除的情况。

所以一般情况下,想要完成kafka消费者端的事务处理,需要将数据消费过程和偏移量提交过程进行原子性绑定,也就是说数据处理完了,必须要保证偏移量正确提交,才可以做下一步的操作,如果偏移量提交失败,那么数据就恢复成处理之前的效果。

对于生产者事务而言,消费者消费的数据也会受到限制。默认情况下,消费者只能消费到生产者提交的数据,也就是未提交完成的数据,消费者是看不到的。如果想要消费到未提交的数据,需要更高消费事务隔离级别

package com.atguigu.test;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class KafkaConsumerTransactionTest {
    public static void main(String[] args) {
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // TODO 隔离级别:已提交读,读取已经提交事务成功的数据(默认)
        //paramMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // TODO 隔离级别:未提交读,读取已经提交事务成功和未提交事务成功的数据
        paramMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        KafkaConsumer<String, String> c = new KafkaConsumer<String, String>(paramMap);
        c.subscribe(Collections.singletonList("test"));
        while (true) {
            final ConsumerRecords<String, String> poll = c.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record.value() );
            }
        }
    }
}

在这里插入图片描述

2.6.3.6 偏移量的保存

由于消费者在消费消息的时候可能会由于各种原因而断开消费,当重新启动消费者时我们需要让它接着上次消费的位置offset继续消费,因此消费者需要实时的记录自己以及消费的位置。

0.90版本之前,这个信息是记录在zookeeper内的,在0.90之后的版本,offset保存在__consumer_offsets这个topic内。

每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,keyconsumerGroupId+topic+分区号

在这里插入图片描述

value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。

在这里插入图片描述

因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),均匀分配到Kafka集群的多个Broker中。Kafka采用hash(consumerGroupId) % __consumer_offsets主题的分区数来计算我们的偏移量提交到哪一个分区。因为偏移量也是保存到主题中的,所以保存的过程和生产者生产数据的过程基本相同。

在这里插入图片描述

2.6.3.7消费数据

消费者消费数据时,一般情况下,只是设定了订阅的主题名称,那是如何消费到数据的呢。我们这里说一下服务端拉取数据的基本流程。

在这里插入图片描述

(1) 服务端获取到用户拉取数据的请求

  • Kafka消费客户端会向Broker发送拉取数据的请求FetchRequest,服务端Broker获取到请求后根据请求标记FETCH交给应用处理接口KafkaApis进行处理。

(2) 通过副本管理器拉取数据

  • 副本管理器需要确定当前拉取数据的分区,然后进行数据的读取操作

(3) 判定首选副本

  • 2.4版本前,数据读写的分区都是Leader分区,从2.4版本后,kafka支持Follower副本进行读取。主要原因就是跨机房或者说跨数据中心的场景,为了节约流量资源,可以从当前机房或数据中心的副本中获取数据。这个副本称之未首选副本。

(4) 拉取分区数据

  • Kafka的底层读取数据是采用日志段LogSegment对象进行操作的。

(5) 零拷贝

  • 为了提高数据读取效率,Kafka的底层采用nio提供的FileChannel零拷贝技术,直接从操作系统内核中进行数据传输,提高数据拉取的效率。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1627475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

凹凸技术揭秘·羚珑智能设计平台·逐梦设计数智化

从技术和功能形态层面&#xff0c;我们把设计数智化分成了两个方向&#xff0c;一个方向是「模板化设计」&#xff0c;另一个方向是「程序化设计」。 2、模板化设计— 「模板化设计」的核心目标&#xff1a;是实现线下设计物料的数字化&#xff0c;在数字化设计资产的基础之上…

WildCard开通GitHub Copilot

更多AI内容请关注我的专栏&#xff1a;《体验AI》 期待您的点赞&#x1f44d;收藏⭐评论✍ WildCard开通GitHub Copilot GitHub Copilot 简介主要功能工作原理 开通过程1、注册Github账号2、准备一张信用卡或虚拟卡3、进入github copilot页4、选择试用5、选择支付方式6、填写卡…

C语言:插入排序

插入排序 1.解释2.步骤3.举例分析示例结果分析 1.解释 插入排序是一种简单直观的排序算法&#xff0c;它的工作原理是通过构建有序序列&#xff0c;对于未排序数据&#xff0c;在已排序序列中从后向前扫描&#xff0c;找到相应位置并插入。插入排序在实现上&#xff0c;通常采…

SSH新功能揭秘:远程工作提升指南【AI写作】

首先&#xff0c;这篇文章是基于笔尖AI写作进行文章创作的&#xff0c;喜欢的宝子&#xff0c;也可以去体验下&#xff0c;解放双手&#xff0c;上班直接摸鱼~ 按照惯例&#xff0c;先介绍下这款笔尖AI写作&#xff0c;宝子也可以直接下滑跳过看正文~ 笔尖Ai写作&#xff1a;…

如何实现直播声卡反向给手机充电功能呢?

在数字化时代的浪潮中&#xff0c;声卡作为多媒体系统的核心组件&#xff0c;扮演着声波与数字信号相互转换的关键角色。它不仅能够将来自各类音源的原始声音信号转换为数字信号&#xff0c;进而输出到各类声响设备&#xff0c;更能够通过音乐设备数字接口(MIDI)发出合成乐器的…

多家企业机密数据遭Lockbit3.0窃取,亚信安全发布《勒索家族和勒索事件监控报告》

本周态势快速感知 本周全球共监测到勒索事件87起&#xff0c;与上周相比勒索事件大幅下降。美国依旧为受勒索攻击最严重的国家&#xff0c;占比45%。 本周Cactus是影响最严重的勒索家族&#xff0c;Lockbit3.0和Bianlian恶意家族紧随其后&#xff0c;从整体上看Lockbit3.0依旧…

BERT-CRF 微调中文 NER 模型

文章目录 数据集模型定义数据集预处理BIO 标签转换自定义Dataset拆分训练、测试集 训练验证、测试指标计算推理其它相关参数CRF 模块 数据集 CLUE-NER数据集&#xff1a;https://github.com/CLUEbenchmark/CLUENER2020/blob/master/pytorch_version/README.md 模型定义 imp…

如何安全进行速卖通自养号测评操作?

对于新加入的卖家而言&#xff0c;进行销量测评显得尤为关键。速卖通平台上的新店往往难以获得活动的扶持&#xff0c;且初始流量相当有限。因此&#xff0c;开店的首要任务便是积极展开测评工作&#xff0c;努力积累初始的评论和销售记录。测评的益处颇为显著&#xff0c;它不…

Android Dalvik虚拟机JNI方法的注册过程分析

Dalvik虚拟机在调用一个成员函数的时候&#xff0c;如果发现该成员函数是一个JNI方法&#xff0c;那么就会直接跳到它的地址去执行。也就是说&#xff0c;JNI方法是直接在本地操作系统上执行的&#xff0c;而不是由Dalvik虚拟机解释器执行。由此也可看出&#xff0c;JNI方法是A…

数据结构(九)---并查集

目录 1.集合 2.集合的相关操作 (1)查(Find)&#xff1a; •Find操作的优化 (2)并(Union)&#xff1a; •Union操作的优化 1.集合 数据元素之间的逻辑关系可以为集合&#xff0c;树形关系&#xff0c;线性关系&#xff0c;图关系。对于集合而言&#xff0c;一个集合可以划…

微信小程序:8.WXSS

WXSS和CSS的关系 WXSS具有CSS大部分特性&#xff0c;同时&#xff0c;WXSS还对CSS进行扩充以及修改&#xff0c;适应微信小程序的开发。 与CSS相比&#xff0c;WXSS扩展的特性有&#xff1a; rpx尺寸单位imprt样式导入 rpx尺寸单位 rpx是微信小程序中独有的&#xff0c;用来…

第三节课,后端登录【1】.2--本人

一、视频链接 网址&#xff1a; 后端用户脱敏和session-CSDN直播 二、代码开始 2.1 新建一个request参数。完成用户登录态键 快捷建&#xff0c; 全局变量 代码&#xff1a; // 3.记录用户的登录态/*** 这段代码是Java Web开发中的一部分&#xff0c;用于在会话&#xff08…

面试:finalize

一、概述 将资源释放和清理放在finalize方法中非常不好&#xff0c;非常影响性能&#xff0c;严重时甚至会引起OOM&#xff08;Out Of Memory&#xff09;&#xff0c;从Java9开始就被标注为Deprecated&#xff0c;不建议被使用了。 二、两个重要的队列 1、unfinalized 队列 当…

为什么 Facebook 不使用 Git?

在编程的世界里&#xff0c;Git 就像水一样常见&#xff0c;以至于我们认为它是创建和管理代码更改的唯一可行的工具。 前 Facebook 员工&#xff0c;2024 年 首先&#xff0c;我为什么关心&#xff1f; 我致力于构建 Graphite&#xff0c;它从根本上受到 Facebook 内部工具的…

FSMC读取FPGA的FIFO

一、硬件说明 FSMC配置 单片机的代码如下&#xff1a; #define VALUE_ADDRESS_AD1 (__IO uint16_t *)0x60400000while (1){if(!HAL_GPIO_ReadPin(GPIOF, GPIO_PIN_8)) //数据非空{data *(__IO uint16_t *)VALUE_ADDRESS_AD1;data2 *(__IO uint16_t *)VALUE_ADDRESS_AD1…

网上订餐系统,基于 SpringBoot+Vue+MySQL 开发的前后端分离的网上订餐系统设计实现

目录 一. 前言 二. 功能模块 2.1. 用户功能模块的实现 2.2. 管理员功能模块的实现 三. 部分代码实现 四. 源码下载 一. 前言 随着我国经济的飞速发展&#xff0c;人们的生活速度明显加快&#xff0c;在餐厅吃饭排队的情况到处可见&#xff0c;近年来由于新兴IT行业的空前…

python的turtle库画直线

1.画一条直线 让画笔从(0,0)划到&#xff08;100,100&#xff09;&#xff0c;在turtle中画笔是一只小乌龟。 import turtle turtle.setup(800,800,0,0)#turtle.setup(width,height,startx,starty)来设置窗口初始位置及大小 turtle.goto(100,100)2.画一条折线 left和right使小…

安装crossover游戏提示容量不足怎么办 如何把游戏放到外置硬盘里 Mac电脑清理磁盘空间不足

CrossOver作为一款允许用户在非原生操作系统上运行游戏和应用程序的软件&#xff0c;为不同平台的用户提供了极大的便利。然而&#xff0c;随着游戏文件大小的不断增加&#xff0c;内置硬盘的容量往往无法满足安装需求。幸运的是&#xff0c;通过一些简单的步骤&#xff0c;我们…

怎么样解决web图片加载未更新问题|浏览器图片未更新问题

问题列举 为什么我本地资源改变但是我在用 tomcat 预览网页时图片仍然未之前的图片? 为什么我当前网页的图片是之前的我换浏览器就变了? tomcat启动后&#xff0c;为什么访问项目中的图片无效解决? 解决方案 问题一 为什么我本地资源改变但是我在用 tomcat 预览网页时…

关于使用SpringSecurity框架发起JSON请求,但因登陆失效导致响应403的问题。

这里记录一个生产中遇到的一个问题。 现有环境是基于SpringBoot 2.6.8&#xff0c;然后是前后台一体化的项目。 安全框架使用的是内置版本的SpringSecurity。 在实际使用过程中遇到一个问题。 就是当用户登陆失效后&#xff0c;前端操作JSON请求获取列表数据&#xff0c;但…