kafka(七)——消息偏移(消费者)

news2025/1/23 4:56:05

概念

消费者消费完消息后,向_consumer_offset主题发送消息,用来保存每个分区的偏移量。

在这里插入图片描述

流程说明

  1. consumer发送JoinGroup请求;
  2. coordinator选出一个consumer作为leader,并将topics发送给leader消费者;
  3. leader consumer负责制定消费方案;
  4. leader consumer将消费方案发送给coordinator;
  5. coordinator将消费方案发送给CG中的每个consumer;
  6. 每个consumer与coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该consumer被移除,触发再平衡,或者消费者处理消息过长(max.poll.interval.ms=300s),也会触发再平衡;

适用场景

消费者数量发生变化、消费者订阅主题发生变化或者分区数量发生变化时,会触发kafka的再平衡(Rebalance),再平衡后,消费者可能被分到新的分区,为保证高可用和伸缩性,消费者需要读取每个分区最后一次偏移量。

注意:再平衡期间,群组不可用,消费者无法读取消息。

再平衡(Rebalance)

再平衡(Rebalance),是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。

触发场景

  • 消费者个数发生变化,有新的消费者或分组中的消费者停止消费;
  • 订阅的主题(topic)个数发生变化;
  • 订阅的主题分区发生变化(partition);

影响

  • 再平衡时,消费者组下的所有消费者都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配;
  • 再平衡过程会对消费者组产生非常严重的影响,所有的消费者都将停止工作,直到再平衡执行完成;

分区分配策略

Range范围分配策略

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
算法

n = 分区数量 / 消费者数量

m = 分区数量 % 消费者数量

前m个消费者消费n+1个,剩余消费者消费n个

图解

n = 2 = 8/3

m = 2 = 8%3

前2个消费者消费(2+1)个,剩余消费者消费2个。

在这里插入图片描述

RoundRobin轮询策略

将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
图解

在这里插入图片描述

Stricky粘性分配策略

在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.StickyAssignor
图解
  • 故障前

在这里插入图片描述

  • 故障后

在这里插入图片描述

代码示例

// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb 
{
public:
	// 消费者组再平衡回调
	void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
		std::vector<RdKafka::TopicPartition *> &partitions) 
	{
		if (RdKafka::ERR__ASSIGN_PARTITIONS == err)  // 分区分配成功
		{
			// 消费者订阅这些分区
			consumer->assign(partitions);
			// 获取消费者组本次订阅的分区数量,可以属于不同的topic
			m_partitionCount = (int)partitions.size();
		} 
		else   // 分区分配失败
		{
			// 消费者取消订阅所有的分区
			consumer->unassign();
			// 消费者订阅分区的数量为0
			m_partitionCount = 0;
		}
	}

private:
	int m_partitionCount;    // 消费者组本次订阅的分区数量
};


RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{
    printf("create conf failed\n");
    return;
}

std::string errorStr = ""; 
RdKafka::RebalanceCb* rebalance_cb = new ConsumerRebalanceCb;
RdKafka::Conf::ConfResult errorCode = t_config->set("rebalance_cb", rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    printf("set conf(rebalance_cb) failed, err:%s\n", errorStr.c_str());
    delete t_config;
    return;
}

提交方式

自动提交

参数配置

# 默认自动提交,消费者close时也会自动提交
enable.auto.comnit=true

# 自动提交周期,默认5s
auto.commit.interval.ms=5000

代码示例

RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
	// 消费消息
	ConsumeMsg_(msg);

    // 消息消费完后无需手动处理,kafka自动提交偏移
    delete msg;
}

存在的问题

如果在周期5s内发生再平衡,导致偏移量未提交,未提交的消息会被重复消费。

手动提交

参数配置

RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{
    printf("create conf failed\n");
    return;
}

RdKafka::Conf* topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (NULL == topicConfig) 
{
    printf("create topic conf failed\n");
    delete t_config;
    return;
}

std::string errorStr = ""; 
RdKafka::Conf::ConfResult errorCode = topicConfig->set("enable.auto.commit", " false", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{
    printf("set topic conf(enable.auto.commit) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

// 设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
errorCode = topicConfig->set("auto.offset.commit", " earliest", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{
    printf("set topic conf(auto.offset.commit) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

// 默认 topic 配置,用于自动订阅 topics
errorCode = t_config->set("default_topic_conf", topicConfig, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    printf("set conf(default_topic_conf) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

同步提交

  • 消息消费完,手动调用commitSync;
  • 在同步提交未完成的情况下发生再平衡,消息会被重复消费;
  • commitSync会阻塞直到偏移提交成功;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
    // 消费消息
    ConsumeMsg_(msg, NULL);

    // 开启手动提交
    m_consumer->commitSync(); 
    delete msg;
}

异步提交

  • 消息消费完,手动调用commitAsync;
  • commitAsync不会重试提交偏移量;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
    // 消费消息
    ConsumeMsg_(msg, NULL);

    // 开启手动提交
    m_consumer->commitAsync(); 
    delete msg;
}

存在的问题

重复消费(同步提交)

在这里插入图片描述

  • auto.offset.commit参数设置为earliest;
  • 上次提交的偏移量为1;
  • 由于网络故障、超时等原因,2~7已消费完的情况下,8未提交成功,由于设置了参数auto.offset.commit=earliest,分区再平衡后会继续从2开始消费,会导致消息重复消费的问题;
消息丢失(异步提交)

在这里插入图片描述

  • auto.offset.commit参数设置为latest;
  • 上次提交的偏移量为1;
  • 本次消费的偏移量范围为27,消费者立马提交了偏移量8,由于网络故障、超时等原因,27未消费完,由于设置了参数auto.offset.commit=latest,再平衡后会继续从8开始消费,会导致消息重复丢失的问题;

解决方案

根据实际场景选择同步提交还是异步提交。如果对消息可靠性要求比较高,不允许数据丢失,建议选择同步提交+“auto.offset.commit=earliest”,性能略差。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1657554.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用Transformer-TTS语音合成模型

1、技术原理及架构图 ​ Transformer-TTS主要通过将Transformer模型与Tacotron2系统结合来实现文本到语音的转换。在这种结构中&#xff0c;原始的Transformer模型在输入阶段和输出阶段进行了适当的修改&#xff0c;以更好地处理语音数据。具体来说&#xff0c;Transformer-TT…

NSSCTF Web方向的例题和相关知识点(一)

[SWPUCTF 2021 新生赛]jicao 解题&#xff1a; 打开环境&#xff0c;是一段php代码 包含了flag.php文件&#xff0c;设定了一个POST请求的id和GET请求的json 语句会对GET请求的数据进行json解码 如果id和json变量的值都等于设定字符串&#xff0c;则得到 flag 我们可以使用…

如何让加快OpenHarmony编译速度?

OpenHarmony 有两种编译方式&#xff0c;一种是通过 hb 工具编译&#xff0c;一种是通过 build.sh 脚本编译。本文笔者将提升 build.sh 方式编译速度的方法整理如下&#xff1a; 因为笔者只用 build.sh 脚本编译&#xff0c;没用过 hb 工具&#xff0c;好像下面的选项也可以用于…

Python中使用tkinter模块和类结构的结合使用举例——编写制作一个简单的加数GUI界面

Python中使用tkinter模块和类结构的结合使用举例——编写制作一个简单的加数GUI界面 这里写目录标题 Python中使用tkinter模块和类结构的结合使用举例——编写制作一个简单的加数GUI界面一、tkinter模块和类的简述1.1 tkinter的简要介绍1.2 类结构的简要介绍 二、基于类机构和t…

拼多多强付费二阶段断流怎么办?分几种情况解决

关于断流的问题应该有不少人遇到过&#xff0c;即使是强付费&#xff0c;也不是一直有流量&#xff0c;到了二阶段说断流就断流&#xff0c;同样不能幸免。那么强付费二阶段直接断流是什么原因呢?今天跟大家讲一下强付费断流可能遇到的几种情况&#xff0c;要怎么应对。 第一…

利用自动获客软件实现高效精准获客

在数字化时代的浪潮中&#xff0c;企业之间的竞争愈发激烈。客户资源的获取成为企业生存和发展的关键。传统的获客方式如广告投放、线下推广等不仅成本高昂&#xff0c;而且效率和准确性难以保证。随着科技的进步&#xff0c;自动获客软件应运而生&#xff0c;它以其独特的优势…

C语言洛谷题目分享(11)回文质数

目录 1.前言 2.题目&#xff1a;回文质数 1.题目描述 2.输入格式 3.输出格式 4.输入输出样例 5.题解 3.小结 1.前言 哈喽大家好&#xff0c;今儿继续为大家分享一道蛮有价值的一道题&#xff0c;希望大家多多支持喔~ 2.题目&#xff1a;回文质数 1.题目描述 因为 151 …

【MySQL数据库】详解数据库审核工具SQLE的部署及接口调用

SQLE部署及使用 1. 部署SQLE SQLE相信大家都不陌生吧&#xff0c;它是一款开源&#xff0c;支持多场景审核&#xff0c;支持标准化上线流程&#xff0c;原生支持 MySQL 审核且数据库类型可扩展的 SQL审核工具。我们可以基于此工具进行数据库SQL审核&#xff0c;提升SQL脚本质量…

ue引擎游戏开发笔记(36)——为射击落点添加特效

1.需求分析&#xff1a; 在debug测试中能看到子弹落点后&#xff0c;需要给子弹添加击中特效&#xff0c;更真实也更具反馈感。 2.操作实现&#xff1a; 1.思路&#xff1a;很简单&#xff0c;类似开枪特效一样&#xff0c;只要在头文件声明特效变量&#xff0c;在fire函数中…

数据挖掘(一)数据类型与统计

前言 打算新开一个笔记系列&#xff0c;基于国防科技大学 丁兆云老师的《数据挖掘》 数据挖掘 1、数据类型与统计 数据统计 最大值&#xff0c;最小值&#xff0c;平均值&#xff0c;中位数&#xff0c;位数&#xff0c;方差等统计指标 df.describe() #当调用df.describe(…

分布式锁与秒杀

分布式锁与秒杀 1. 分布式锁1.1 常用Redis分布式锁方案三&#xff1a;使用Lua脚本(包含SETNX EXPIRE两条指令) 秒杀 1. 分布式锁 https://www.cnblogs.com/shoshana-kong/p/17519673.html 1.1 常用Redis分布式锁方案三&#xff1a;使用Lua脚本(包含SETNX EXPIRE两条指令) …

【JAVA基础之装箱和拆箱】自动装箱和自动拆箱

&#x1f525;作者主页&#xff1a;小林同学的学习笔录 &#x1f525;mysql专栏&#xff1a;小林同学的专栏 目录 1.包装类 1.1 概述 1.2 Integer类 1.3 装箱和拆箱 1.4 自动装箱和自动拆箱 1.5 基本类型与字符串之间的转换 1.5.1 基本类型转换为字符串 1.5.2 字符串转…

力扣每日一题111:二叉树的最小深度

题目 简单 给定一个二叉树&#xff0c;找出其最小深度。 最小深度是从根节点到最近叶子节点的最短路径上的节点数量。 说明&#xff1a;叶子节点是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;2示例 2&#x…

银行职员向媒体投稿发文章我找到了好方法

作为一名基层银行的媒体联络专员,我的日常工作中有一项至关重要的任务,那就是代表我所在的支行向各大媒体投稿,传播我们的金融服务、产品动态以及社会责任实践。起初,这项看似简单的工作却成了我职业生涯中的一大挑战。传统的邮件投稿方式,不仅耗时费力,而且审核流程严格,稿件从…

python 和 MATLAB 都能绘制的母亲节花束!!

hey 母亲节快到了&#xff0c;教大家用python和MATLAB两种语言绘制花束~这段代码是我七夕节发的&#xff0c;我对代码进行了简化&#xff0c;同时自己整了个python版本 MATLAB 版本代码 function roseBouquet_M() % author : slandarer% 生成花朵数据 [xr,tr]meshgrid((0:24).…

杨辉三角的打印

题目内容&#xff1a; 在屏幕上打印杨辉三角。 思路&#xff1a; 首先我们通过观察发现&#xff0c;每一步的打印都与行列数有关&#xff0c;中间的数据由这一列和上一行的前一列数据控制。所以我们可以使用二维数组进行操作&#xff1a; &#xff08;&#xff11;&#xff…

在k8s中部署hadoop后的使用,包括服务端及客户端(客户端的安装及与k8s服务的对接)

&#xff08;作者&#xff1a;陈玓玏&#xff09; 在https://blog.csdn.net/weixin_39750084/article/details/136744772?spm1001.2014.3001.5502和https://blog.csdn.net/weixin_39750084/article/details/136750613?spm1001.2014.3001.5502这两篇文章中&#xff0c;说明…

Redis + OpenResty 多级缓存

多级缓存 初识 OpenResty OpenResty - 开源官方站 基于 Nginx的高性能 Web 平台&#xff0c;用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。 具备Nginx的完整功能基于Lua语言进行扩展&#xff0c;集成了大量精良的 Lua 库、第三方模块允…

AlibabaCloud微服务下的链路追踪系统实战详解

&#x1f680; 作者 &#xff1a;“二当家-小D” &#x1f680; 博主简介&#xff1a;⭐前荔枝FM架构师、阿里资深工程师||曾任职于阿里巴巴担任多个项目负责人&#xff0c;8年开发架构经验&#xff0c;精通java,擅长分布式高并发架构,自动化压力测试&#xff0c;微服务容器化k…

【深耕 Python】Quantum Computing 量子计算机(3)重要数学公式一览

写在前面 往期量子计算机博客&#xff1a; 【深耕 Python】Quantum Computing 量子计算机&#xff08;1&#xff09;图像绘制基础 【深耕 Python】Quantum Computing 量子计算机&#xff08;2&#xff09;绘制电子运动平面波 正文 偏微分&#xff1a; 交换关系&#xff…