Kafka 消费者组开发

news2025/1/11 20:44:41

Kafka consumer - 消费者组

上一篇文章学习到kafka消费者、消费者组之间处理消息的差异,总结起来就是:

  • 同一个消费组的不同消费实例 共同消费topiic的消息, 一个消息只会消费一次; 也叫做集群消费
  • 同一个消息被不同的消费组同时消费,一个消息会消费多次; 也叫做广播消费

今天以实际代码案例来学习一下,二者之间的区别。在开始之前,先创建一个分区为2的topic

./bin/kafka-topics.sh --create --topic topic_t40 --bootstrap-server localhost:9092 --partitions 2

创建完成后,查看topic信息

./bin/kafka-topics.sh --describe --topic topic_t40 --bootstrap-server localhost:9092

Topic: topic_t40	TopicId: 4-8Xi003Te6i0lEV4YwDHQ	PartitionCount: 2	ReplicationFactor: 1	Configs: 
	Topic: topic_t40	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: topic_t40	Partition: 1	Leader: 0	Replicas: 0	Isr: 0

集群消费

生产者代码

public static void main(String[] args) throws Exception{
        String topicName = "topic_t40";

        Properties props = new Properties();
        //指定kafka 服务器连接地址
        props.put("bootstrap.servers", "localhost:9092");
        // 序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName ,"key" + i,"message : " + i);
            Future<RecordMetadata> send = producer.send(record);
            RecordMetadata metadata = send.get();
            System.out.println(String.format("sent record(key=%s value=%s) 分区数: %d, 偏移量: %d, 时间戳: %d",
                    record.key(), record.value(),
                    metadata.partition(),metadata.offset(), metadata.timestamp()
            ));
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("Message sent successfully");
        producer.close();
    }

消费者代码

  • 第一个消费者 - ConsumerExample.java

    public class ConsumerExample {
    
        public static void main(String[] args){
    
            String topicName = "topic_t40";
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_c");
            props.put("client.id", "client_01");
            props.put("enable.auto.commit", true);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(record -> {
                        System.out.println("Message received " + record.value() + ", partition " + record.partition());
                    });
                }
            }finally {
                consumer.close();
            }
        }
    }
    
  • 第二个消费者 - ConsumerExample02.java

    // 其他代码 一致 只需要替换下面一行代码即可
     props.put("client.id", "client_02");
    

验证测试

先启动两个消费者程序,然后启动生产者,两个消费者控制台打印输出截图如下

在这里插入图片描述

在这里插入图片描述

总结

从上面的日志可以分析出,kafka是以分区为维度来进行多进程消费的,topic 两个分区,两个消费者。即每一个消费者实例分担topic一个或者多个分区数据,从而最终合力达成集群消费的目的。

广播消费

消费者代码

广播消费的代码跟集群消费差不多,唯一需要更改的是将 group.id 改成不一样即可

  • 第一个消费者 - ConsumerExample.java

    //...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_h");
    props.put("client.id", "client_01");
    //...
    
  • 第二个消费者 - ConsumerExample02.java

    //...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_g");
    props.put("client.id", "client_01");
    //...
    

验证测试

生产者代码跟之前一致,无需做任何更改。先启动两个消费者程序,然后启动生产者,两个消费者控制台打印输出截图如下

在这里插入图片描述

在这里插入图片描述

Rebalance 重平衡

kafka 消费者组 rebalance 重平衡本质上是一组协议,该协议主要确定消费者组多个实例时间如何分配订阅topic所有分区数据。

在这里插入图片描述

如上图,topic下面有4个分区,一个消费者组下面有两个消费者,那么正常情况下每个消费者消费2个分区。但是当某个消费者意外宕机的情况下,kafka会感知到消费这的下线情况,此时,存活的消费者组将消费topic所有分区的数据。简单地理解,这就是Rebalance重平衡做的事情。

触发条件

  • 消费者组发生变更 - 如加入新的消费者实例;消费者实例崩溃等
  • 订阅关系发生变化 - 如使用基于正则表达式的订阅,当匹配新topic时 触发重平衡
  • topic 分区数发生变化 - 对已有topic集群进行动态扩容 触发重平衡

重平衡日志

手动关闭其中任何一个消费者,等待一会儿(45秒),观察kafka服务器日志,显示已经发生重平衡。集群模式下,此时存在的消费者实例将消费topic所有分区数据。

在这里插入图片描述

Preparing to rebalance group app_g in state PreparingRebalance with old generation 1 (__consumer_offsets-9) (reason: removing member client_01-ecec69f7-b72d-4184-a4ec-5e57d26a85f1 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)

session.timeout.ms

kafka消费者管理者用于检测客户端故障的时间间隔。一般而言客户端发送周期性心跳给服务端,表示其存活状态。如果在会话超时到期之前服务端没有收到心跳,则服务端将从消费者组中删除该客户端,并重新启动重平衡。默认值 45秒

重平衡监听器

kafka提供接口对重平衡进行监听,rebalance监听器有一个主要的接口回调类 - ConsumerRebalanceListener,该类定义了两个方法

public interface ConsumerRebalanceListener {

    /**
     * 用户可以实现的回调方法,以提供对自定义存储的偏移提交的处理。当消费者必须放弃某些分区时,将在重新平衡操作期间调用此方法。
     */
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    /**
     * 用户可以实现一种回调方法,以在分区重新分配成功后提供自定义偏移量的处理。此方法将在分区重新分配完成后、使用者开始获取数据之前调用,并且仅作为轮询(长)调用的结果。
     */
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    /**
     * 可以实现一个回调方法,为已经重新分配给其他使用者的分区提供清理资源的处理。在正常执行期间不会调用此方法,因为所拥有的分区将首先通过调用onPartitionsRevoked来撤销,然后在重新平衡事件期间重新分配给其他使用者。然而,在例外情况下,当消费者意识到不再拥有此分区时,即不会通过正常的重新平衡事件撤销,则会调用此方法。
     */
    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}

代码测试 - 使用集群消费模式,修改订阅代码,然后一次启动两个消费者

consumer.subscribe(Arrays.asList(topicName),new ConsumerRebalanceListener(){
            /**
             * 开启新一轮重平衡前调用
             */
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                partitions.stream().forEach(p -> {
                    System.out.println("开始进行重平衡: " + p.topic() + "," + p.partition());
                });
            }
            // 重平衡结束后调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                partitions.stream().forEach(p -> {
                    System.out.println("重平衡结果: " + p.topic() + "," + p.partition());
                });
            }
        });

测试场景:

  • 分别启动消费者实例 - 中间间隔1分钟

在这里插入图片描述

  • 关闭一个消费者实例 - 等待一分钟

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/118200.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习基石1(ML基本概念和VC dimension)

文章目录一、什么是机器学习?二、什么时候可以使用机器学习?三、感知机perceptron四、机器学习的输入形式五、机器真的可以学习吗&#xff1f;六、vc dimension一、什么是机器学习? 其实第一个问题和第二个问题是穿插到一块儿回答的&#xff0c;首先机器学习要解决的是常规的…

RedisTemplate操作redis

目录 Redis Repositories方式 a、启用 Repository 功能 b、注解需要缓存的实体 c、创建一个 Repository 接口 d、测试类中测试 Redis Repositories方式 Spring Data Redis 从 1.7 开始提供 Redis Repositories &#xff0c;可以无缝的转换并存储 domain objects&#xff0…

TOPSIS法(熵权法)(模型+MATLAB代码)

TOPSIS可翻译为逼近理想解排序法&#xff0c;国内简称为优劣解距离法 TOPSIS法是一种常用的综合评价方法&#xff0c;其能充分利用原始数据的信息&#xff0c;其结果能精确地反映各评价方案之间的距离 一、模型介绍 极大型指标&#xff08;效益型指标&#xff09; &#xff…

OR-Tools工具介绍以及实战(从入门到超神Python版)

目录前言0、安装一、什么是优化问题&#xff1f;1-1、优化问题介绍1-2、举例1-2-1、导入所需要的库1-2-2、声明求解器1-2-3、创建变量1-2-4、定义约束条件1-2-5、定义目标函数1-2-6、调用求解器&#xff0c;并且显示结果。二、python有关于各种优化问题示例2-1、简单的线性编程…

AlexNet 深度学习奠基作之一(1)

2012年发表 imagenet 一百二十万的图片 1000个种类 cv 对于刷榜 是非常在意的 AlexNet 有六千万的参数&#xff0c;和65000的神经元 为了减少过拟合 我们使用了dropout 很work 很有效 而且 这篇文章没有conclusion 只有 讨论 当他发现 从5个层里面去掉一个 performence会降…

ubuntu设置core文件

长久不写代码&#xff0c;突然发现ubuntu当前运行目录下没有生成core文件&#xff0c;记录如下&#xff0c;避免下次重找。 1、首先开启core文件权限&#xff1a; ulimit -c 如果结果为0&#xff0c; 则表示没有开启&#xff0c;需要开启&#xff0c;如下&#xff1a; 通过u…

【Linux】Linux系统SSH客户端断开后保持进程继续运行配置方法;Python等脚本在终端后台运行的方法

0. 概述 在Linux系统中&#xff0c;通常我们在执行一些运行时间比较长的任务时&#xff0c;必须等待执行完毕才能断开SSH连接或关闭客户端软件&#xff0c;否则可能会导致执行中断。本文介绍几种保障程序在用户退出登录后持续运行的方法。 一、方法 1. 使用nohup执行 nohup…

网络编程 事件选择模型

目录 1.概念分析 2.事件选择代码逻辑 1.WSACreateEvent函数 2.WSACloseEvent函数 3.WSAEventSelect函数 4.WSAWaitForMultipleEvents()函数 5.WSAEnumNetworkEvents函数 事件分类 3.##模型代码样例 1.概念分析 本质上是操作系统处理用户行为&#xff0c;详细如下 事件选…

目标检测之YOLOv5算法分析

YOLOv5共有5个版本的网络模型及其权重文件&#xff0c;即&#xff08;n,s,m,l,x&#xff09;。 &#xff08;下图来自github上yolov5官方开源项目的性能截图&#xff09; 其中n,s,m,l,x网络模型结构如出一辙&#xff0c;差异在参数上。另外的n6,s6,m6,l6,x6模型是对于更大分辨…

人生黄金十年,你有考虑来社科院与杜兰大学金融管理硕士项目汲取能量吗

在人生长河中&#xff0c;你觉得黄金的十年是哪个阶段呢&#xff1f;在一篇文章中看到人生最宝贵的十年&#xff0c;就是30岁到40岁这十年&#xff0c;一切都未确定&#xff0c;一切都还有机会&#xff0c;这个年龄段&#xff0c;寸阴寸金&#xff0c;流金年华&#xff0c;应该…

FFmpeg学习笔记--视频传输的基本概念

目录 1--容器&#xff08;container&#xff09;和文件&#xff08;file&#xff09; 2--媒体流&#xff08;stream&#xff09; 3--数据帧&#xff08;frame&#xff09;和数据包&#xff08;packet&#xff09;&#xff1a; 4--编解码器&#xff08;Codec&#xff09; 5…

7万人随访发现,每周高强度运动15分钟,死亡风险降低24%

*仅供医学专业人士阅读参考我们已经知道&#xff0c;无论是日常规律运动&#xff0c;还是周末集中一两天运动&#xff0c;只要每周能完成至少75-150分钟的高强度有氧运动&#xff0c;都可以降低全因死亡率和特定原因死亡率[1]。每周300-600分钟中强度运动或150-300分钟高强度运…

RocketMQ原理篇

文章目录broker与NameServerMessageQueue与Topic的关系生产者、消费者写入读取 消息CommitLog生产者消费者组broker与NameServer 基于 Dledger 实现 RocketMQ 高可用自动切换 broker 会每隔 30 秒向 NameServer 发送一个的心跳 &#xff0c;NameServer 收到一个心跳 会更新对…

kafka再浅析

在日常开发中&#xff0c;经常使用kafka&#xff0c;对它是既熟悉又陌生&#xff0c;下面继续聊&#xff0c;继续总结。 1、消息中间件 分布式消息是一种通信机制&#xff0c;和RPC、HTTP不一样&#xff0c;消息中间件采用分布式中间代理的方式进行通信。采用消息中间件后&…

MarkDown语法(自用)

目录结构展示 tree -a 显示所有tree -d 只显示文档夹tree -L n 显示项目的层级&#xff0c;n表示层级数&#xff0c;比如想要显示项目三层结构&#xff0c;可以用tree -l 3&#xff1b;tree -I pattern 用于过滤不想要显示的文档或者文档夹。比如你想要过滤项目中的 node_modu…

Linux之(17)系统服务

Linux之(17)系统服务 Author&#xff1a;onceday Date&#xff1a;2022年12月24日 漫漫长路&#xff0c;有多少人对你笑过… 参考文档&#xff1a; Systemd 入门教程&#xff1a;命令篇 - 阮一峰的网络日志 (ruanyifeng.com)可能是史上最全面易懂的 Systemd 服务管理教程&a…

MergeTree原理之一级索引

一级索引 MergeTree的主键使用PRIMARY KEY定义&#xff0c;待主键定义之后&#xff0c;MergeTree会依据index_granularity间隔&#xff08;默认8192行&#xff09;&#xff0c;为数据表生成一级索引并保存至primary.idx文件内&#xff0c;索引数据按照PRIMARY KEY排序。相比使…

【PotPlayer】采集Switch图像及录制

【PotPlayer】采集Switch图像及录制下载potplayer使用方法连接设备录制视频无边框设置阳&#xff0c;休&#xff0c;懂&#xff1f;QAQ。阳之前买了个Switch&#xff0c;正好有好玩的想录下来&#xff0c;然后就…自行某宝&#xff0c;某东去买个采集卡。本文只管连软件&#x…

【运维有小邓】ADSelfService Plus身份管理

一、身份管理挑战&#xff1a; 由于企业需要越来越高的安全性&#xff0c;以保护用户帐户免遭入侵者的任何恶意攻击&#xff0c;因此身份管理正日益变得重要。在所有密码相关的身份挑战中&#xff0c;帮助台工作单量成为重中之重&#xff0c;它们在组织的年同比财务预算中不堪…

(二)ElasticSearch使用

一、ES的基本使用 1.创建索引 创建一个test索引http://localhost:9200/test 2.删除索引 http://localhost:9200/test 3.查看索引 http://localhost:9200/_all 4.向索引中新增数据 http://localhost:9200/person/_doc/ 5.搜索数据 http://localhost:9200/person/_doc/_sear…