分布式消息队列Kafka(四)- 消费者

news2024/11/26 16:38:10

1.Kafka消费方式

在这里插入图片描述

2.Kafka消费者工作流程

(1)总体工作流程

在这里插入图片描述

(2)消费者组工作流程

在这里插入图片描述

3.消费者API

(1)单个消费者消费

在这里插入图片描述

实现代码

package com.zrclass.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {
	public static void main(String[] args) {
		// 1.创建消费者的配置对象
		Properties properties = new Properties();
		// 2.给消费者配置对象添加参数
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
		"hadoop102:9092");
		// 配置序列化 必须
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
		StringDeserializer.class.getName());
		
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
		StringDeserializer.class.getName());
		// 配置消费者组(组名任意起名) 必须
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
		// 创建消费者对象
		KafkaConsumer<String, String> kafkaConsumer = new 
		KafkaConsumer<String, String>(properties);
		// 注册要消费的主题(可以消费多个主题)
		ArrayList<String> topics = new ArrayList<>();
		topics.add("first");
		kafkaConsumer.subscribe(topics);
		// 拉取数据打印
		while (true) {
			// 设置 1s 中消费一批数据
			ConsumerRecords<String, String> consumerRecords = 
			kafkaConsumer.poll(Duration.ofSeconds(1));
			// 打印消费到的数据
			for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
				System.out.println(consumerRecord);
			}
		}
	} 
 }

(2)单个消费者指定分区消费

在这里插入图片描述

代码实现:

 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
 // 消费某个主题的某个分区数据
 ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
 topicPartitions.add(new TopicPartition("first", 0));
 kafkaConsumer.assign(topicPartitions);

(3)消费者组消费

在这里插入图片描述

复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个消费者消费

4.kafka的分区分配及再平衡策略

(1)kafka的分区分配策略(默认策略是Range + CooperativeSticky)

  • 一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个 partition的数据。
  • Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略。

在这里插入图片描述

1)每个consumer都发送JoinGroup请求

2)选出一个consumer作为leader

3)coordinator把要消费的topic情况发送给leader消费者

4)leader消费者会负责制定消费方案

5)leader消费者把消费方案发给coordinator

6)Coordinator把leader消费者的消费方案下发给各个consumer

7)每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的过长(max.poll.interval.ms5分钟),也会触发再 平衡

在这里插入图片描述

(2)range分区策略及再平衡

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IxCP3AOK-1682405725356)(分布式消息队列Kafka.assets/image-20230425112937754.png)]

分区策略测试:

1)修改主题 first 为 7 个分区。

2)复制 CustomConsumer 类,创建由三个消费者 CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。

3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducer {
 public static void main(String[] args) throws InterruptedException {
	Properties properties = new Properties();
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
	for (int i = 0; i < 7; i++) {
		kafkaProducer.send(new ProducerRecord<>("first", i, "test", "java"));
	}
	kafkaProducer.close();
 } 
}

4)观看 3 个消费者分别消费哪些分区的数据

CustomConsumer消费0,1,2号分区数据;

CustomConsumer1消费3,4分区数据;

CustomConsumer2消费5,6分区数据;

分区分配再平衡策略:

1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 3、4 号分区数据。

2 号消费者:消费到 5、6 号分区数据。

0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 0、1、2、3 号分区数据。

2 号消费者:消费到 4、5、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

(3)RoundRobin 分区分配再平衡

轮询的方式

(4)Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

5.kafka offset偏移量

(1)kafaka内置主题__consumer_offsets

在这里插入图片描述

kafka内置主题__consumer_offsets里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

(2)消费内置主题__consumer_offsets

0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,

默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

2)采用命令行方式,创建一个新的 topic。

[zrclass@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server  hadoop102:9092 --create --topic first--partitions 2 --replication-factor 2 

3)启动生产者往 first生产数据。

[zrclass@hadoop102 kafka]$ bin/kafka-console-producer.sh --topic  first --bootstrap-server hadoop102:9092 

4)启动消费者消费 first数据。

[zrclass@hadoop104 kafka]$ bin/kafka-console-consumer.sh -- bootstrap-server hadoop102:9092 --topic first --group test 

注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。

5)查看消费者消费主题__consumer_offsets。

[zrclass@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter 
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm
atter" --from-beginning

[offset,first,1]::OffsetAndMetadata(offset=7, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)

[offset,first,0]::OffsetAndMetadata(offset=8, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)

(3)kafka默认自动提交偏移量

在这里插入图片描述

 // 配置消费者组
 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
 // 是否自动提交 offset
 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
 // 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
 //3. 创建 kafka 消费者
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
 //4. 设置消费主题 形参是列表
 consumer.subscribe(Arrays.asList("first"));

(4)手动提交offset

在这里插入图片描述

1)同步提交offset

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset 的示例。

package com.zrclass.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerByHandSync {
	public static void main(String[] args) {
		// 1. 创建 kafka 消费者配置类
		Properties properties = new Properties();
		// 2. 添加配置参数
		// 添加连接
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
		// 配置序列化 必须
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		// 配置消费者组
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
		// 是否自动提交 offset
		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		//3. 创建 kafka 消费者
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
		//4. 设置消费主题 形参是列表
		consumer.subscribe(Arrays.asList("first"));
		//5. 消费数据
		while (true){
		// 读取消息
		ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
		// 输出消息
		for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
			System.out.println(consumerRecord.value());
		}
		// 同步提交 offset
		consumer.commitSync();
	  }
	}
 }

2)异步提交offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

// 异步提交 offset
 consumer.commitAsync();

(5)指定offset消费

auto.offset.reset = earliest | latest | none 默认是 latest。

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。

2)latest(默认值):自动将偏移量重置为最新偏移量。

3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

在这里插入图片描述

(6)指定消费时间重新消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

package com.zrclass.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerForTime {
	public static void main(String[] args) {
		// 0 配置信息
		Properties properties = new Properties();
		// 连接
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
		// key value 反序列化
		
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
		// 1 创建一个消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		// 2 订阅一个主题
		ArrayList<String> topics = new ArrayList<>();
		topics.add("first");
		kafkaConsumer.subscribe(topics);
		Set<TopicPartition> assignment = new HashSet<>();
		while (assignment.size() == 0) {
		kafkaConsumer.poll(Duration.ofSeconds(1));
		// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
		assignment = kafkaConsumer.assignment();
		}
		HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
		// 封装集合存储,每个分区对应一天前的数据
		for (TopicPartition topicPartition : assignment) {
			timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
		}
		// 获取从 1 天前开始消费的每个分区的 offset
		Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
		// 遍历每个分区,对每个分区设置消费时间。
		for (TopicPartition topicPartition : assignment) {
			OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
			// 根据时间指定开始消费的位置
			if (offsetAndTimestamp != null){
			kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
			}
		}
		// 3 消费该主题数据
		while (true) {
			ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
			for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
			System.out.println(consumerRecord);
			}
		}
	} 
 }

(7)重复消费和漏消费

重复消费:已经消费了数据,但是 offset 没提交。

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

在这里插入图片描述

怎么能做到既不漏消费也不重复消费呢?使用消费者事务。

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如 MySQL)。

在这里插入图片描述

(8)消费者调优(提高吞吐量)

数据积压(消费者如何提高吞吐量)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/460715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Golang项目实战】手把手教你写一个备忘录程序|附源码——建议收藏

博主简介&#xff1a;努力学习的大一在校计算机专业学生&#xff0c;热爱学习和创作。目前在学习和分享&#xff1a;数据结构、Go&#xff0c;Java等相关知识。博主主页&#xff1a; 是瑶瑶子啦所属专栏: Go语言核心编程近期目标&#xff1a;写好专栏的每一篇文章 前几天瑶瑶子…

0Ω电阻在PCB板中的5大常见作用

在PCB板中&#xff0c;时常见到一些阻值为0Ω的电阻。我们都知道&#xff0c;在电路中&#xff0c;电阻的作用是阻碍电流&#xff0c;而0Ω电阻显然失去了这个作用。那它存在于PCB板中的原因是什么呢&#xff1f;今天我们一探究竟。 1、充当跳线 在电路中&#xff0c;0Ω电阻…

CCF-CSP 2013-12-3 最大的矩形(暴力枚举)

首先我们可以先根据数据范围反推时间复杂度&#xff0c;比如&#xff0c;数据范围n < 1000,我们可以将时间复杂度控制在O(n), O(n)logn 思路上比较容易想到的是枚举所有情况&#xff0c;然后输出面积最大的情况即可 可以在第一重循环枚举i&#xff0c;表示从第i个矩形开始往…

云HIS系统源码,部署云端,支持多租户,实现医疗数据共享与交换

云HIS系统源码&#xff0c;医院信息管理系统源码。采用云端SaaS服务的方式提供&#xff0c;采用前后端分离架构&#xff0c;前端由Angular语言、JavaScript开发&#xff1b;后端使用Java语言开发。 文末获取联系&#xff01; 基于云计算技术的B/S架构的云HIS系统&#xff0c;采…

zynqmp 外接fpga linux内核驱动修改

325t配置: 使用内核自带的linux-xlnx-xilinx-v2021.2/drivers/fpga/xilinx-spi驱动&#xff0c;做serial slave模式&#xff0c;设备树更改如下(根据 linux-xlnx-xilinx-v2021.2/Documentation/devicetree/bindings/fpga/xilinx-slave-serial.txt,修改)slave-serial需要将fpga的…

【Java实战篇】Day13.在线教育网课平台--生成支付二维码与完成支付

文章目录 一、需求&#xff1a;生成支付二维码1、需求分析2、表设计3、接口定义4、接口实现5、完善controller 二、需求&#xff1a;查询支付结果1、需求分析2、表设计与模型类3、接口定义4、接口实现步骤一&#xff1a;查询支付结果步骤二&#xff1a;保存支付结果&#xff08…

控制工程有哪些SCI期刊推荐? - 易智编译EaseEditing

控制工程是一门涵盖广泛的学科&#xff0c;其研究内容涉及控制理论、控制工程应用、自动化技术等多个方面&#xff0c;因此相关的SCI期刊也比较多。以下是一些推荐的控制工程SCI期刊&#xff1a; IEEE Transactions on Automatic Control&#xff1a; 该期刊是自动控制领域顶…

【springcloud微微服务】分布式事务框架Seata使用详解

目录 一、前言 二、事务简介 2.1 原子性 2.2 一致性 2.3 隔离性 2.4 持久性 三、分布式事务场景 3.1 分布式事务起源 3.2 分布式事务典型场景 3.2.1 跨库事务 3.2.2 分库分表 3.2.3 服务化 四、分布式事务常用解决方案 4.1 分布式事务理论基础 4.1.1 2PC两阶段提…

降低风险和最大化成功:如何解决项目管理中的成本差异

作为项目经理&#xff0c;你知道让项目按计划进行并按预算进行对于项目管理的成功至关重要。你可以使用的关键工具之一是成本差异分析。但成本差异到底是什么&#xff0c;如何利用它来发挥优势呢&#xff1f; 定义成本差异 成本差异是项目实际成本与预算或计划成本之间的差异…

Linux shell命令行基础

shell简介 shell 与内核沟通的界面、应用程序等。用于将用户操作传递给内核执行。 shell是面向过程 的若类型的解释性语言&#xff0c;不需要编译即可直接执行&#xff0c;常用于作脚本 Linux中的shell 在/etc/shells文件中 存储Linux包含的shell。 最常用的是bash&#xff0c;…

Docker虚拟化技术

1.3 Docker虚拟化技术概念 Docker是一款轻量级、高性能的虚拟化技术&#xff0c;是目前互联网使用最多的虚拟化技术&#xff0c;Docker虚拟化技术的本质类似集装箱机制&#xff0c;最早集装箱没有出现的时候&#xff0c;码头上有许多搬运的工人在搬运货物&#xff0c;集装箱出…

Karl Guttag:现有Micro LED/LCoS+光波导AR眼镜对比解析

轻量化是未来AR眼镜的发展趋势&#xff0c;为了缩减尺寸&#xff0c;AR眼镜厂商尝试了多种方案&#xff0c;长期来看Micro LED光机在小型化上更有优势&#xff0c;但现阶段LCoS光机的图像表现更好。在CES 2023期间&#xff0c;DigiLens、Lumus、Vuzix、OPPO、Avegant也展出了不…

进程通信(同一主机)

1.概述 进程通信机制包括&#xff1a; 传统的UNIX进程间通信&#xff1a;无名管道、有名管道、信号 System V 进程间通信&#xff1a;消息队列、信号量、共享内存 2.管道通信 2.1无名管道 1.特点 &#xff08;1&#xff09;适用具有亲缘关系的进程 &#xff08;2&#x…

C++高精度减法

高精度减法指的是大整数的相减&#xff0c;大整数是用基本数据类型无法存储其精度的整数&#xff0c;位数不超过10^6&#xff0c;注意是位数&#xff0c;不是数值的大小。 因为其精度超过基本的数据类型规定的大小&#xff0c;所以常规的计算方法是不可以实现的&#xff0c;这…

UE4 架构初识(二)

目录 UE4 引擎学习 一、架构基础 1. Pawn &#xff08;1&#xff09;DefaultPawn &#xff08;2&#xff09;SpectatorPawn &#xff08;3&#xff09;Character 2. AController 3. APlayerState 4. 总结 UE4 引擎学习 一、架构基础 1. Pawn UE也是从Actor中再派生…

Zynq-7000、国产zynq-7000的GPIO控制(二)

本文详细说明一下使用SDK中使用MIO/EMIO作为输入中断 SDK中使用MIO/EMIO作为输入中断 这个使用场景可以扩展到PL的可以通过EMIO或者MIO&#xff0c;告知PS中断来了&#xff0c;需要PS处理一些特定事物&#xff0c;当然也可以连接最简单的按键。 这个可以参考SDK自带例程来实…

Python入门教程+项目实战-11.2节: 元组的操作符

目录 11.2.1 元组的常用操作符 11.2.2 []操作符: 索引访问元组 11.2.3 [:]操作符&#xff1a;元组的切片 11.2.4 操作符&#xff1a;元组的加法 11.2.5 *操作符&#xff1a;元组的乘法 11.2.6 元组的关系运算 11.2.7 in操作符&#xff1a;查找元素 11.2.8 知识要点 11…

企业的信息化和数字化有什么区别

数字化是业务新的存在形式&#xff0c;如果说信息化是对业务的局部支撑&#xff0c;那么数字化就是对业务的整体重塑&#xff0c;这是数字化和信息化之间最大的区别&#xff0c;也决定了数字化转型在实施时有着与信息化建设完全不同的底层逻辑。信息化建设和数字化转型有着相同…

手把手教你编写SQLMap的Tamper脚本过狗

本文仅用于技术讨论与学习 测试环境 最新版某狗 测试方法 安全狗其实是比较好绕的WAF&#xff0c;绕过方法很多&#xff0c;但这里我们就用一种&#xff1a;注释混淆 一招鲜吃遍天 注释混淆&#xff0c;其实就是在敏感位置添加垃圾字符注释&#xff0c;常用的垃圾字符有/、…

关于Vue中使用全屏容器无法占满屏幕以及样式不生效问题解决方案

先来看示例问题 App.vue文件 global.css文件 网页效果 可以看到即使设置了宽度和高度为100%都无法占满屏幕&#xff0c;而且容器还超出了屏幕&#xff0c;上拉才可以看到下边框。查看网上解决方法&#xff1a; 1.height设置为100vh&#xff0c; 或者设置为calc&#xff08;10…