深入Kafka核心设计与实践原理读书笔记第三章消费者

news2024/12/25 1:40:14

消费者

消费者与消费组

消费者Consumer负责定于kafka中的主题Topic,并且从订阅的主题上拉取消息。与其他消息中间件不同的在于它有一个消费组。每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的消费组的一个消费者。

  • 如果有某个主题有4个分区,P0,P1,P2,P3.有两个消费组A和B订阅了这个主题,A消费组有4个消费者,B消费组有2个消费者,那么A消费组中的4个消费者每一个都只会分配到一个分区,而B消费组中的2个消费者会分配到两个分区。

在这里插入图片描述

  • 如果所有消费者都属于一个消费者,那么所有的消息默认会均匀分配给每一个消费者。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者。
    PS:再均衡动作:解释一下名词,指的是当一个主题中有6个分区时,有一个消费组,这个消费组中只有一个消费者,那么主题中的6个分区的消息都会由同一个消费者来消费,当有一个新的消费者加入这个消费组之后,6个主题中会有3个分配个新的消费者,依次类推,这个动作被称为再均衡动作

必要参数说明

kafka消费者客户端有个4个必填参数

  1. bootstrapp.service:该参数的释义和生产者客户端的相同,用来指定链接kafka集群所需要的broker地址清单。
  2. group.id:消费者隶属的消费组名称,默认为""
  3. key.deserializer和value.deserializer与生产者相同。
    其他重要参数
  4. fetch.min.bytes:配置消费者在一次的poll中拉取的最小数据量 默认 1b
  5. fetch.max.bytes:配置消费者在一次的poll中拉取的最大数据量默认50MB.
  6. fetch.max.wait.ms :参数用于指定 Kafka 的等待时间,默认值为 500 )
  7. exclude.internal.topics:Kafka 中有两个内部的主题:一consumer_offsets tr ansaction state o exclude.internal.topics用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true 。如果设置 true ,那么只能使用 subscribe( Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false 则没有这个限制。
  8. receive.buffer.bytes:这个参数用来设置 Socket 接收消息缓冲区的大小,默认值为 65536 (B) 如果设置为 -1,则使用操作系统的默认值。
  9. send.buffer.bytes:,这个参数用来设置 Socket 发送消息缓冲区的大小默认值为 13 1072 (B) ’
  10. request.timeout.ms: 这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为 30000 ms )。
  11. metadata.max.age.ms: 这个参数用来配置元数据的过期时间,默认值为 300000 ms ),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker 加入。
  12. reconnect.backoff.ms 这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间〉,避免频繁地连接主机,默认值为 50 ms )。

订阅主题与分区

订阅主题通过subscribe()方法来订阅一个主题,可以是集合订阅多个主题,也可以是正则。

public void subscribe(Collection<String> topics,ConsumerRebalanceListenergy listener);
public void subscribe(Collection<String> topics);
public void subscribe(Pattern pattern ,ConsumerRebalanceListenergy listener);
public void subscribe(Pattern pattern);
  • 如果前后调用两次 subscribe方法 那么以后一次的为准。
    PS:ConsumerRebalanceListenergy listener 是用来设置相应的再均衡监听器

  • 这里还可以通过assign()方法来指定主题中特定的分区来定义。

public void assign(Collection<TopicPartition> partition);
  • 其中 partition是分区的集合。
  • TopicPartition类有两种属性 topic和partition,分别代表分区所属的主题和自己的分区偏移量也就是编号。
  • 通过partitionsFor(String topic)方法可以查询主题有多少个分区

取消订阅

  1. unsubscribe()方法取消订阅主题
  2. subscribe(new ArrayList<>());
  3. assign(new ArrayList<>());
    以上都可

反序列化

对应生产者的序列化器相反,用来把序列化的内容反序列化,至于序列化与反序列化请自行百度,基础概念不与重复。

消息消费

Kafka中消费方式采取的拉去式消费:消息的消费一般分为两种:拉取式和推送式。

  1. kefka中的消息消费是一个不断轮询的过程。需要重复的效用poll方法。
public ConsumerRecords<K,V> poll(final Duration timeout);
  • 其中timeOut 是用来限制poll方法的阻塞时间的
    其中 Duration 也有Long的方法,Long的timeOut是毫秒值,Duration 可以通过ofMillis、ofSeconds、ofMinutes
    、ofHours等方法来指定不同时间类型。
    ConsumerRecords类中还会提供一个方便开发人员用来对消息进行处理的:count()等 如有兴趣自定查看。

位移提交

offset偏移量也叫位移,消费者可以通过offset来指定消费分区中的某个消息所在的位置。

  • 每次调用poll方法返回的是未被消费的消息集,偏移量不仅要保存在内存中也要做持久化保存,否则消费者重启之后就无法知晓之前的消费位移,如果有新的消费者加入,那么必然会有再均衡动作,那么新加入的消费者也无法知晓之前的消费位移
  • 在旧消费者客户端中消费者偏移量存储在zk中,新版本存放在kafka的主题_consumer_offsets中,这个把偏移量存储起来的动作就时提交。

控制或关闭消费

KafkaConsumer提供了对消费速度进行控制的方法。使用pause()方法resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区想客户端返回数据的操作。

指定位置消费

对应消费位移,主要用在消费者重启之后出发了再均衡动作之后指定偏移量消费分区内消息。

消费者拦截器

对应生产者消费器,主要在消费到消息或提交消费位移的时候进行一些定制化操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/342943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

go gin学习记录1

环境&#xff1a; MAC M1&#xff0c;Go 1.17.2&#xff0c;GoLand 默认执行指令的终端&#xff0c;如果没有特别说明&#xff0c;指的都是goland->Terminal 创建项目 Goland中新建项目&#xff0c;在$GOPATH/src/目录下建立t_gin项目。 进入项目&#xff0c;在goland的T…

spark04-文件读取分区数据分配原理

接 https://blog.csdn.net/oracle8090/article/details/129013345?spm1001.2014.3001.5502通过上一节知道 总字节数为7 每个分区字节数为3代码val conf: SparkConf new SparkConf().setMaster("local").setAppName("wordcount")val sc: SparkContext ne…

日日顺供应链|想要看清供应链发展趋势,先回答这三个问题

技术变革如何支撑供应链及管理服务的发展&#xff1f; 数字化与科技化开始承托供应链管理能力的升级与变革&#xff1f; 如何从客户需求的纬度反推供应链及管理服务的模式变革&#xff1f;在过去的三年中&#xff0c;我国的供应链企业经受了最为极端的挑战&#xff0c;但当下&a…

论文写作——公式编辑器、latex表格、颜色搭配器

1、公式编辑器(网页版mathtype可用于latex公式编辑): MathType demo - For DevelopersLive demonstration about the features of Mathtype which allows edition equations and formulas (PNG, flash, SVG, PDF, EPS), based on MathML and compatible with LaTeX.https:/…

C++之可调用对象、bind绑定器和function包装器

可调用对象在C中&#xff0c;可以像函数一样调用的有&#xff1a;普通函数、类的静态成员函数、仿函数、lambda函数、类的非静态成员函数、可被转换为函数的类的对象&#xff0c;统称可调用对象或函数对象。可调用对象有类型&#xff0c;可以用指针存储它们的地址&#xff0c;可…

孙子兵法-36计

目录 04、攻其无备&#xff0c;出其不意。——《孙子兵法始计篇》 08、不战而屈人之兵&#xff0c;善之善者也。——《孙子兵法谋攻篇》 09、上兵伐谋&#xff0c;其次伐交&#xff0c;其次伐兵&#xff0c;其下攻城。 01、兵者&#xff0c;国之大事&#xff0c;死生之地&…

想要的古风女生头像让你快速get

如今我看到很多人都喜欢用古风女生当作头像&#xff0c;那么今天我就来教大家如何快速得到一张超美的古风女生头像~ 上图就是我使用 APISpace 的 AI作画(图像生成)服务 快速生成的古风女生头像&#xff0c;不仅可以限定颜色&#xff0c;还可以选择『宝石镶嵌』或『花卉造型』这…

【HAL库】STM32CubeMX开发----STM32F407----Uart串口接收空闲中断

一、Uart串口接收空闲中断----详解 首先介绍串口通信的数据传输方式&#xff0c;这样后面的Uart串口空闲中断能更好的理解。 Uart串口通信----数据传输方式 串口通信的数据由发送设备通过自身的TXD接口传输到接收设备得RXD接口。 一个字符一个字符地传输&#xff0c;每个字符…

设计模式C++实现11:观察者模式

参考大话设计模式&#xff1b; 详细内容参见大话设计模式一书第十四章&#xff0c;该书使用C#实现&#xff0c;本实验通过C语言实现。 观察者模式又叫做发布-订阅&#xff08;Publish/Subscribe&#xff09;模式。 观察者模式定义了一种一对多的依赖关系&#xff0c;让多个观察…

Windows安装Gradle(IDEA兼容版)

IDEA兼容版本 IDEA安装目录下查看兼容Gradle版本&#xff1a;D:\win11\program\idea_2022.2.3\plugins\gradle\lib Gradle下载环境变量 1.创建仓库目录 D:\win11\program\gradle-7.4-bin\gradle-7.4\repository2.添加环境变量 GRADLE_HOME&#xff1a;D:\win11\program\gradle…

Java连接Redis

Jedis是Redis官方推荐的Java连接开发工具。api&#xff1a;https://tool.oschina.net/apidocs/apidoc?apijedis-2.1.0一、 导入包<!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency><groupId>redis.clients</groupId><…

在职阿里6年,一个29岁女软件测试工程师的心声

简单的先说一下&#xff0c;坐标杭州&#xff0c;14届本科毕业&#xff0c;算上年前在阿里巴巴的面试&#xff0c;一共有面试了有6家公司&#xff08;因为不想请假&#xff0c;因此只是每个晚上去其他公司面试&#xff0c;所以面试的公司比较少&#xff09;其中成功的有4家&…

新版bing(集成ChatGPT)的申请方法

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,科大讯飞比赛第三名,CCF比赛第四名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

redis知识汇总(部署、高可用、集群)

文章目录一、redis知识汇总什么是redisredis的优缺点&#xff1a;为什么要用redis做缓存redis为什么这么快什么是持久化redis持久化机制是什么&#xff1f;各自优缺点&#xff1f;AOF和RDB怎么选择redis持久化数据和缓存怎么做扩容什么是事务redis事务的概念ACID概念主从复制re…

03 OpenCV图像运算

文章目录1 普通加法1 加号相加2 add函数2 加权相加3 按位运算1 按位与运算2 按位或运算、非运算4 掩膜1 普通加法 1 加号相加 在 OpenCV 中&#xff0c;图像加法可以使用加号运算符&#xff08;&#xff09;来实现。例如&#xff0c;如果要将两幅图像相加&#xff0c;可以使用…

JVM - 类加载,连接和初始化

目录 类加载和类加载器 概述 类加载要完成的功能 加载类的方式 类加载器 类加载器的关系 类加载器说明 双亲委派模型 工作过程如下&#xff1a; 双亲委派模型说明&#xff1a; 破坏双亲委派模型&#xff1a; 类连接和初始化 类连接主要验证的内容 类连接中的解析…

c++重中之重:“换个龟壳继续套娃“:运算符重载等的学习

文章目录 前言一.运算符重载二.const成员三.取地址重载总结前言 上一期我们讲到类的6个默认构造函数中的拷贝构造函数&#xff0c;这一期我们继续往下讲&#xff0c;当然难点肯定是运算符重载了。 一、运算符重载 运算符重载是c为了增强代码的可读性引入了运算符重载&#xf…

笑死,面试官又问我SpringBoot自动配置原理

面试官&#xff1a;好久没见&#xff0c;甚是想念。今天来聊聊SpringBoot的自动配置吧&#xff1f; 候选者&#xff1a;嗯&#xff0c;SpringBoot的自动配置我觉得是SpringBoot很重要的“特性”了。众所周知&#xff0c;SpringBoot有着“约定大于配置”的理念&#xff0c;这一…

亿级高并发电商项目-- 实战篇 --万达商城项目 四(Dashboard服务、设置统一返回格式与异常处理、Postman测试接口 )

专栏&#xff1a;高并发---前后端分布式项目 &#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是小童&#xff0c;Java开发工程师&#xff0c;CSDN博客博主&#xff0c;Java领域新星创作者 &#x1f4d5;系列专栏&#xff1a;前端、Java、Java中间件大全、微信小程序、…

Spring-事务2

文章目录前言一、事务的特性&#xff08;ACID&#xff09;二、事务的隔离级别三、spring中的事务平台事务管理器.事务定义ISOLation_XXX&#xff1a;**事务隔离级别.**PROPAGATION_XXX&#xff1a;**事务的传播行为**.事务状态关系&#xff1a;四、使用XML文件配置事务1、 搭建…