Kafka原理之消费者

news2025/1/13 10:19:55

一、消费模式

1、pull(拉)模式(kafka采用这种方式)

consumer采用从broker中主动拉取数据。
存在问题:如果kafka中没有数据,消费者可能会陷入循环中,一直返回空数据

2、push(推)模式

由broker决定消息发送频率,很难适应所有消费者的消费速率。

二、总体工作流程

案例一:单独消费者,并订阅主题

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题
		List<String> topicList = new ArrayList<>();
		topicList.add("first");
		kafkaConsumer.subscribe(topicList);
		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record.key() + "---------" + record.value());
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

控制台输出
image.png

案例二:单独消费者,订阅主题+分区

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题+分区
		List<TopicPartition> topicPartitionList = new ArrayList<>();
		topicPartitionList.add(new TopicPartition("first", 0));
		kafkaConsumer.assign(topicPartitionList);
		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record.key() + "---------" + record.value());
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

只消费了发往分区0的数据
image.pngimage.png

案例三:消费者组

启动多个消费案例一的消费者,会自动指定消费的分区(partition)
启动3个消费者,一个消费者消费一个分区

image.png

三、消费者组

由多个consumer组成(条件:groupid相同),是逻辑上的一个订阅者。

  • 每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
  • 消费者组之间互不影响

1、初始化流程

coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择=groupid的hashCode值%50(__consumer_offsets的分区数量)
例如:groupid的hashCode=1,1%50=1,那么__consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大,消费者组下所有的消费者提交offset的时候,就往这个分区去提交offset

  • 1.组内每个消费者向选中的coordinator节点发送joinGroup请求
  • 2.coordinator节点选择一个consumer作为leader
  • 3.coordinator节点把要消费的topic情况,发送给消费者leader
  • 4.消费者leader负责制定消费方案
  • 5.把消费方案发送给coordinator节点
  • 6.coordinator节点把消费方案发送给各consumer
  • 7.每个消费者都会和coordinator节点保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理的时间过长(max.poll.interval.ms=5分钟),也会被移除,并触发再平衡

2、分区分配以及再平衡

到底由哪个消费者来消费哪个partition的数据

  • 分配策略:Range、RoundRobin、Sticky、CooperativeStick
  • 配置参数:partition.assignment.strategy(默认:Range+CooperativeStick)
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//设置分区分配策略,多个策略使用逗号拼接
		properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题
		List<String> topicList = new ArrayList<>();
		topicList.add("first");
        //再平衡的时候,会触发ConsumerRebalanceListener
		kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {
			// 重新分配完分区之前调用
			@Override
			public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
				System.out.println("==============回收的分区=============");
				for (TopicPartition partition : partitions) {
					System.out.println("partition = " + partition);
				}
			}

			// 重新分配完分区后调用
			@Override
			public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
				System.out.println("==============重新得到的分区==========");
				for (TopicPartition partition : partitions) {
					System.out.println("partition = " + partition);
				}
			}
		});
		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record);
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

range

  • 分配策略:对同一个topic里面的分区序号排序,对消费者按字母排序,通过partition数量/consumer数量(如果除不尽,那么前面几个消费者将会多消费1个分区)

这个只是针对一个topic而言,C0消费者多消费一个分区影响不是很大,但是如果这个消费者组消费多个topic,容易产生数据倾斜

  • 再平衡机制:某一个消费者挂掉后,45秒内产生的数据,将会由某一个消费者代为消费;45秒后产生的数据,会重新分配

RoundRobin

  • 分配策略:对集群中所有的Topic而言,把所有的partition和所有的consumer都列出来,然后按照hashCode进行排序,最后通过轮询算法来分配partition给各个消费者
  • 再平衡机制:轮询分配(不是按数据,是按分区)

Sticky

  • 分配策略:分配带粘性,执行一次新的分配时,考虑原有的分配
  • 再平衡机制:打散,尽量均匀分配(不是按数据,是按分区)

四、offset

1、默认维护位置

主题:__consumer_offset
key:group.id + topic + 分区号
value:当前offset的值

每隔一段时间,kafka内部会对这个topic进行压缩(compact),也就是每一个group.id + topic + 分区号保留最新数据

2、自动提交offset

是否开启自动提交:enable.auto.commit默认true
自动提交时间间隔:auto.commit.interval.ms默认5s

基于时间的提交,难以把握

3、手动提交offset

类别:同步提交(commitSync)、异步提交(commitAsync)
相同点:提交一批数据的最高偏移量
不同点:同步阻塞当前现场,失败会自动重试;异步没有重试机制,可以提交失败。

4.指定offset消费

如果没有初始偏移量(消费者第一次消费)或者服务器上不存在当前偏移量(被删除),如何指定offset进行消费
auto.offset.reset=earliest(默认) | latest | none
在代码中设置方式为properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")

  • earliest:自动将偏移量重置为最早的偏移量(--from-beginning)
  • latest:自动将偏移量重置为最新的偏移量
  • none:没有偏移量,抛出异常

除了这三中,还可以自己来指定位置或者指定时间
指定位置开始消费案例:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题
		List<String> topicList = new ArrayList<>();
		topicList.add("first");
		kafkaConsumer.subscribe(topicList);
		Set<TopicPartition> assignment = new HashSet<>();
		while (assignment.size() == 0){
			kafkaConsumer.poll(Duration.ofSeconds(1));
			//获取到消费者分区分配信息(有了分区分配信息才能开始消费)
			assignment = kafkaConsumer.assignment();
		}
		//遍历所有分区,并指定offset从100的位置开始消费
		for (TopicPartition partition : assignment) {
			kafkaConsumer.seek(partition, 100);
		}

		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record);
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

指定时间开始消费案例:把指定的时间转为offset

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class KafkaConsumerTest {

	public static void main(String[] args) {
		Properties properties  = new Properties();
		//集群地址
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//反序列化方式
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		//消费者组,必须指定
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
		//创建消费者
		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
		//订阅主题
		List<String> topicList = new ArrayList<>();
		topicList.add("first");
		kafkaConsumer.subscribe(topicList);
		Set<TopicPartition> assignment = new HashSet<>();
		while (assignment.size() == 0){
			kafkaConsumer.poll(Duration.ofSeconds(1));
			//获取到消费者分区分配信息(有了分区分配信息才能开始消费)
			assignment = kafkaConsumer.assignment();
		}
		HashMap<TopicPartition, Long> timestampMap = new HashMap<>();
		for (TopicPartition partition : assignment) {
			//一天前的毫秒数
			timestampMap.put(partition, System.currentTimeMillis() - 1*24*3600*1000);
		}
		//获取毫秒数对应的offset位置
		Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampMap);
		OffsetAndTimestamp offsetAndTimestamp;
		//给每个patition设置offset位置
		for (TopicPartition partition : assignment) {
			offsetAndTimestamp = offsetAndTimestampMap.get(partition);
			kafkaConsumer.seek(partition, offsetAndTimestamp.offset());
		}

		//消费数据
		while (true){
			try {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
				for (ConsumerRecord<String, String> record : consumerRecords) {
					System.out.println(record);
				}
			}catch (Exception e){
				e.printStackTrace();
			}
		}
	}
}

五、消费者事务

使用消费者事务,进行精准一次消费,将消费过程和提交offset过程做原子操作绑定。解决重复消费和漏消费问题

  • 重复消费:由自动提交offset引起。
  • 漏消费:设置手动提交offset,提交offset时,数据还未落盘,消费者进程被kill,那么offset已经提交,但是数据未处理,导致这部分内存中数据丢失

六、数据挤压

  • 消费能力不足:增加分区数量,同时提高消费者数量(注意:分区数量≥消费者数量)
  • 处理不及时: 拉去数据 / 处理时间 < 生产速度 拉去数据/处理时间<生产速度 拉去数据/处理时间<生产速度,提高每批次拉去的数量。fetch.max.bytes(一次拉取得最大字节数,默认:5242880=50m)max.poll.records(一次poll数据最大条数,默认:500条)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/524949.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MySQL】 InnoDB

学习笔记&#xff0c;来源黑马程序员MySQL教程 文章目录 逻辑存储结构架构内存架构磁盘结构后台线程 事务原理概述redo logundo log MVCC基本概念实现原理1、隐藏字段2、undo log3、readview 总结 逻辑存储结构 一个表空间对应一张表一 页 对应B树上一个 节点Trx id&#xff1a…

Git cat命令的用法

cat (全称 concatenate) 命令是 Linux/类 Unix 操作系统中最常用的命令之一。cat 命令允许我们创建单个或多个文件、查看文件内容、连接文件和重定向终端或文件中的输出。 语法&#xff1a; cat [OPTION] [FILE]...1.终端查看一个文件内容 cat file01.txt2.终端查看多个文件…

熵、信息量、条件熵、联合熵、互信息简单介绍

熵、信息量、条件熵、联合熵、互信息简单介绍 近期在看对比学习论文&#xff0c;发现有不少方法使用了互信息这种方式进行约束&#xff0c;故在此整理一下网上查阅到的关于互信息的相关内容。 一、熵、信息量 关于熵的讨论&#xff0c;这个知乎专栏写的挺不错的。 熵在信息论…

【更新日志】填鸭表单TduckPro v5.1 更新

hi&#xff0c;各位Tducker小伙伴。 填鸭表单pro迎来了v5.1版本&#xff1b;本次我们进行了许多的功能新增和优化&#xff0c;能够让我们在日常使用中获得更好的体验。 让我们一起来康康新功能吧。 01 新增Pro功能 新增登录后才能填写表单。 新增表单卡片一键发布。 新增矩…

【C++学习】CC++内存管理

目录 一、C&C内存管理 二、C语言中动态内存管理方式&#xff1a;malloc/calloc/realloc/free 三、C内存管理方式 3.1 new/delete操作内置类型 3.2 new和delete操作符自定义类型 四、operator new与operator delete函数 4.1 operator new与operator delete函数&#x…

【云原生】使用外网Rancher2.5.12在阿里云自建内网K8s 1.20集群

目录 一、目标二、解决方案三、草图四、版本信息五、资源规划六、必要条件七、开始部署1、安装Docker2、安装Rancher3、解析Rancher Server URL域名4、创建K8s集群5、注册K8s集群节点 八、验证 一、目标 在云平台搭建一套高可用的K8s集群 二、解决方案 第一种&#xff1a;使…

横向移动-利用IPC$

环境主机 本次都是在内网自己搭的靶机实验 上线主机&#xff1a;windows2008R2 - 192.168.31.46 需要移动到的主机&#xff1a;windows2012 - 192.168.31.45 实验演示 1.确定域控 通过命令net time /domain&#xff0c;发现存在域 这里我们通过ping来发现域控的ip&#xff0c;…

UGUI Scroll Rect滚动矩形组件

1、概述 当需要在小区域显示占用大量空间的内容时&#xff0c;可以使用Scroll Rect。滚动矩形提供了滚动浏览此内容的功能。 通常&#xff0c;将Scroll Rect与Mask结合在一起以创建滚动视图&#xff0c;在该视图中&#xff0c;只有Scroll Rect内部的可滚动内容可见。它也可以…

类和对象【1】

全文目录 引言&#xff08;初识面向对象&#xff09;类和对象定义类访问限定及封装类定义的两种方式 类实例化与类对象大小this指针 总结 引言&#xff08;初识面向对象&#xff09; C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通…

NSSCTF之Misc篇刷题记录⑩

NSSCTF之Misc篇刷题记录⑩ [CISCN 2022 初赛]ez_usb[SWPUCTF 2021 新生赛]你喜欢osu吗&#xff1f;[SWPUCTF 2021 新生赛]Bill[SWPUCTF 2021 新生赛]二维码不止有二维码[HGAME 2022 week1]好康的流量[红明谷CTF 2022]MissingFile[广东省大学生攻防大赛 2021]这是道签到题[羊城杯…

TOGAF架构开发方法—阶段 F:迁移规划

本章介绍迁移规划;也就是说&#xff0c;如何通过最终确定一个 详细的实施和迁移计划。 一、目标 F阶段的目标是&#xff1a; 最终确定架构路线图以及支持实施和迁移计划确保实施和迁移计划与企业的管理和实施方法相协调 企业整体变更组合的变化确保关键利益相关者了解工作包和…

【什么是蜂窝移动网络】

从 DataReportal 2021 年 1 月的统计数据来看&#xff0c;全球 78 亿人口中&#xff0c;有 52 亿手机用户&#xff0c;46 亿互联网用户。能够接入网络的设备越来越多&#xff0c;体量越来越大&#xff0c;不知道你有没有好奇过&#xff0c;这样一个庞大的世界是如何被构造出来的…

【Linux】指令(下)

⭐博客主页&#xff1a;️CS semi主页 ⭐欢迎关注&#xff1a;点赞收藏留言 ⭐系列专栏&#xff1a;Linux ⭐代码仓库&#xff1a;Linux 家人们更新不易&#xff0c;你们的点赞和关注对我而言十分重要&#xff0c;友友们麻烦多多点赞&#xff0b;关注&#xff0c;你们的支持是我…

论文阅读:Multimodal Graph Transformer for Multimodal Question Answering

文章目录 论文链接摘要1 contribution3 Multimodal Graph Transformer3.1 Background on Transformers3.2 Framework overview 框架概述3.3 Multimodal graph construction多模态图的构建Text graphSemantic graphDense region graph Graph-involved quasi-attention 总结 论文…

【AIGC提示工程 - MidJourney教程:一】“Midjourney AI“是什么,为何众人皆谈?

关注元壤教育公众号系统学习AIGC提示工程课程。 更多AIGC好博客&#xff0c;请移步访问AIGC博客派 Midjourney AI是一个极富创造性的工具&#xff0c;它能够帮助用户通过指令创建图像。这些图像是基于用户的想象力而创造的。 在本文中&#xff0c;我们将详细了解Midjourney AI。…

软件测试面试面对史上最难求职季,会哪些测试技能更容易拿到offer?

在一线大厂&#xff0c;没有测试这个岗位&#xff0c;只有测开这个岗位。这几年&#xff0c;各互联网大厂技术高速更新迭代&#xff0c;软件测试行业也 如果你在中小型公司&#xff0c;普通的测试工程师20K差不多到极限了&#xff0c;薪资想再进一步提升很困难。而在阿里巴巴P…

【AIGC提示工程 - MidJourney教程:二】《MidJourney参数大全指南:实现最佳图像输出的关键》

关注元壤教育公众号系统学习AIGC提示工程课程。 更多AIGC好博客&#xff0c;请移步访问AIGC博客派 这篇文章介绍了不同的MidJourney参数和提示词&#xff0c;帮助你创建你选择的图像。探索如何使用不同的风格和参数进行操作。 如果你使用Midjourney应用&#xff0c;你就知道提示…

NetSuite SuiteQL 内建函数

之前写过一篇文章介绍SutieQL Query Tool&#xff0c;今天继续挖掘一下SuiteQL的价值。 NetSuite SuiteQL Query Tool_netsuite好用吗_毛岩喆的博客-CSDN博客这是一个非常好的NetSuite数据查询工具&#xff0c;免费、强大&#xff01;所以忍不住安利给大家。首先介绍一下背景&…

Redis系列--redis持久化

一、为什么需要持久化 redis本身运行时数据保存在内存中&#xff0c;如果不进行持久化&#xff0c;那么在redis出现非正常原因宕机或者关闭redis的进程或者关闭计算机后数据肯定被会操作系统从内存中清掉。当然&#xff0c;redis本身默认采用了一种持久化方式&#xff0c;即RD…

11.Kafka系列之Stream实践

Kafka Streams是一个基于Apache Kafka的处理库&#xff0c;可以用于实现高效、可扩展的实时数据处理应用程序。它是一个轻量级的库&#xff0c;允许你在Java和Scala中创建和运行流处理应用程序&#xff0c;这些应用程序可以读取输入流&#xff0c;执行各种数据转换&#xff0c;…