Kafka-Consumer理论知识

news2024/11/28 6:12:24

一、上下文

之前的博客我们分析了Kafka的设计思想、Kafka的Producer端、Kafka的Server端的分析,为了完整性,我们接下来分析下Kafka的Consumer。《Kafka-代码示例》中有对应的Consumer示例代码,我们以它为入口进行分析

二、KafkaConsumer是什么?

一个从Kafka集群中消费记录的客户端,KafkaConsumer与broker交互,会随着获取的TopicPartition在集群中因故障迁移而自己调整。

KafkaConsumer允许消费者组使用consumer groups对消费进行负载平衡

三、offset

Kafka为分区中的每条记录维护一个offset,该offset充当该分区内记录的唯一标识符,也表示consumer的消费进度。例如:如果offset=5就表示offset为0-4的记录已经被消费。

consumer的position(TopicPartition)给出了下一条记录的offset,这将比消费者在该分区中看到的最高偏移量大一个。每次consumer在调用poll(Duration) 收到消息时,都会推进offset的增长。

consumer可以交由 kafka 定期自动提交offset 。也可以 调用 commitSync() 手动提交。

如果consumer进程失败并重新启动,会从维护的offset处开始继续消费。

1、自动提交offset

如下的示例依赖于自动提交offset

Properties props = new Properties();
//指定broker列表,用于连接kafka集群,1个或多个都可以,尽量多个
props.setProperty("bootstrap.servers", "localhost:9092");
//定义的组为test
props.setProperty("group.id", "test");
//配置自动提交
props.setProperty("enable.auto.commit", "true");
//自动提交频率,1000表示1秒提交1次
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅 foo bar 两个topic的数据
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

 当程序正常运行,offset会随着consumer的消费自动向前推进,如果consumer发生故障,会出现重复消费或数据丢失的情况。

如果Kafka以auto.commit.interval.ms的频率提交了offset,consumer在下一次自动提交前出现故障,当consumer再次启动后会出现重复消费的现象。

如果consumer刚刚拉回来一批数据准备处理,此刻Kafka正好自动提交offset,但是consumer出现了故障,当consumer再次启动后会出现这批数据丢失的现象。

2、手动提交offset

如下的示例依赖于手动提交offset

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
//关闭自动提交
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        //手动提交
        consumer.commitSync();
        buffer.clear();
    }
}

用户可以根据自己处理数据的逻辑,控制何时提交offset。

在这个例子中,我们需要积累一定数量的数据后再批量插入数据库中。因此只有插入数据库后才需要提交offset。

这里还会发生一个小概率事件:这批数据插入数据库成功,但是再手动提交offset时,consumer发生故障。这会导致consumer重启后发生重复消费现象。因此最好把插入数据和提交offset放入到一个事务中,要么都成功,要么都失败。

上面的示例使用 commitSync() 将所有接收到的记录标记为已提交。在某些情况下,可能希望通过明确指定offset来更好地控制已提交的记录。比如下面示例:

try {
    while(running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            //提交的偏移量应始终是应用程序将读取的下一条消息的偏移量
            //因此这里给的是lastOffset + 1
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
  consumer.close();
}

四、consumer groups

Kafka使用consumer groups的概念来允许一个进程池来划分消费和处理记录的工作。这些进程可以在同一台机器上运行,也可以分布在许多机器上,为处理提供可扩展性和容错性。

共享统一 group.id  的所有consumer 都将属于同一消费者组。组中的每个消费者都可以通过其中一个subscribe(Collection, ConsumerRebalanceListener) API动态设置要订阅的topic列表。Kafka会将订阅topic中的每条消息传递给每个消费者组中的一个进程。这是通过平衡消费者组中所有成员之间的分区来实现的,这样每个分区都被分配给组中的一个消费者。

如果一个topic有四个分区,一个消费者组有两个进程,那么每个进程会被分配两个分区来消费。

consumer group中的consumer是动态维护的,如果要给consumer出现故障,分配给它的分区将被重新分配给同一组中的其他consumer。同样,如果新的consumer加入该组,分区将从现有consumer移动到新的consumer。这被称 重新平衡 组。

当topic中的分区增多时也会触发 组的 重新平衡

当组重新分配自动发生时,可以通过ConsumerRebalanceListener通知消费者,这允许他们完成必要的应用程序级逻辑,如状态清理、手动偏移提交等。

消费者也可以使用assign()手动分配特定分区。在这种情况下,动态分区分配和消费者组协调将被禁用。

五、监测consumer故障

consumer订阅了一组topic后,当调用poll(Duration)  时,消费者将自动加入一个组。poll(Duration) 可以让Kafka认为该consumer是活动状态,consumer就会留在组中,并继续从分配给它的分区接收消息。

consumer定期向服务器发送心跳,如果consumer崩溃或在 session.timeout.ms  的持续时间内无法发送心跳,则consumer将被视为已死亡,其分区将被重新分配。

consumer也可能遇到“livelock”情况,即它继续发送心跳,但没有调用poll(Duration)

为了防止consumer在这种情况下无限期地保留其分区,Kafka使用 max.poll.interval.ms 设置提供了一种活性检测机制。如果consumer在max.poll.interval.ms内没有执行poll(Duration)消费数据,那么consumer将主动离开该组,以便另一个consumer可以接管其分区。

当这种情况发生时,可能会看到偏移提交失败(如调用commitSync() 时抛出的CommitFailedException 异常。这是一种安全机制,保证只有组中的活跃成员才能提交偏移。因此,要想留在组中,就必须持续调用 poll(Duration)

consumer提供两个配置来控制轮询循环的行为:

max.poll.interval.ms:

        增加轮询之间的间隔,让consumer可以有更多的事件来处理拉取的数据,缺点:增加此值可能会延迟组重新平衡。

max.poll.records:

        使用此设置可限制单次轮询调用返回的总记录数,通过调整此值,您可以减少轮询间隔,这将减少组重新平衡的影响

如果拉回的消息处理时间变化不可预测,这两种选项可能都不能完美的解决。处理这种情况的推荐方法是将消息处理转移到另一个线程或者线程池。此consumer只负责拉取数据。如果使用这种方法,需要关闭自动提交offset,当数据处理完成进行手动提交offset。

六、为consumer手动指定分区

一般情况下,consumer只要订阅topic或topic列表,并让Kafka根据组中的活动consumer为这些主题动态分配公平的分区份额。但某些情况下,可能需要对特定分区进行更精细的控制。

1、如果进程正在维护与该分区相关联的某种本地状态(如本地磁盘键值存储),那么它应该只获取它在磁盘上维护的分区的记录。

2、如果进程本身具有高可用性,并且在失败时将重新启动(可能使用YARN、Mesos或AWS设施等集群管理框架,或者作为流处理框架的一部分)。在这种情况下,Kafka不需要检测故障并重新分配分区,因为消费进程将在另一台机器上重新启动。

使用示例:

String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

手动分区分配不使用组协调,因此消费者故障不会导致分配的分区重新平衡。每个消费者独立行动,即使它与另一个消费者共享一个groupId。为了避免偏移提交冲突,通常应该确保每个消费者实例的groupId都是唯一的。

请注意:不可能将手动分区分配和订阅topic混合使用。

七、将offset维护转移到Kafka外部

Kafka自身会维护一个内部topic,用于存储offset。当然我们也可以选择将offset存储在外部存储系统,例如mysql。也可以将消费结果和offset存放在同一个存储系统,以原子方式存储结果和偏移量(用数据库事务 将 数据消费和offset提交绑定在一起)。这将使消费完全原子化,并给出“恰好一次”的语义,该语义比您使用Kafka的偏移提交功能获得的默认“至少一次”语义更强。

此时的做法是:

1、配置手动提交 enable.auto.commit=false

2、使用每个 ConsumerRecord提供的偏移量来保存您的位置

3、重新启动时,使用seek(TopicPartition, long)恢复消费者的位置

如果consumer手动指定分区,那么将offset维护到外部是简单的,因为指定的这个分区一直都是归属与你这个consumer来消费的。如果consumer消费的分区时Kafka给你自动分配的,则会因为consumer的变化等因素导致你当下的consumer被分到的分区变化。这时,需要通过对subscribe(Collection, ConsumerRebalanceListener)和subscribe(Pattern, ConsumerRebalanceListener)的调用提供一个ConsumerRebalanceListener

例如:当consumer获取分区时,consumer希望通过实现ConsumerRebalanceListener.onPartitionsRevoked(Collection)来提交这些分区的偏移量。当将分区分配给消费者时,消费者将希望查找这些新分区的偏移量,并通过实现ConsumerRebalanceListener.onPartitionsAssigned(Collection)将consumer正确初始化到该位置。

ConsumerRebalanceListener的另一个常见用途是清除应用程序为移动到其他地方的分区维护的任何缓存

八、控制consumer的消费位置

在大多数用例中,consumer只需从头到尾消费记录,定期提交其位置(自动或手动)。然而,Kafka允许consumer手动控制其位置,在分区中随意向前或向后移动。这意味着consumer可以重新消费旧记录,或者跳到最近的记录,而无需实际消费中间记录。

例如对于时间敏感的记录处理,如果数据已经生产了很长时间,对于一个新的consumer来说就很有意义。它可以直接处理最新的数据,或者它只关心中午12点之后的数据。

九、流量控制

如果一个consumer被分配了多个分区来获取数据,它将尝试同时从所有分区中消费,从而有效地赋予这些分区相同的消费优先级。然而,在某些情况下,consumer可能希望首先全速处理某一个分区。当这个分区没有数据时才处理其他分区。

还有一种情况,那就是流处理,一个consumer从两个topic消费数据,并对这两个流进行连接。当其中一个topic远远落后与另一个topic时,consumer会暂停从速度快的topic获取数据,以便让滞后的topic跟上。

Kafka支持消费流的动态控制,在未来的 poll(Duration)调用中,分别使用pause(Collection)和resume()来暂停指定分配分区上的消费和恢复指定暂停分区上的消费。

十、消费事务性消息

Kafka 0.11.0引入了事务,其中producer可以原子地写入多个topic和partition。为了实现这一点,从这些分区读取的consumer应该被配置为只读取已提交的数据。既:isolation.level=read_committed

在read_committed模式下,consumer将只读取已成功提交的事务消息。consumer的分区结束偏移量将是分区中属于打开事务的第一条消息的偏移量。这种偏移被称为“最后稳定偏移”(LSO)

LSO 是 Last Stable Offset 的缩写

consumer只会读取LSO并过滤掉任何已中止的事务消息。LSO还影响consumer的seekToEnd(Collection)和endOffsets(Collection)行为。

带有事务消息的分区将包括表示事务结果的提交或中止标记。这些标记不会返回给consumer,但在日志中有偏移。

因此,从具有事务性消息的topic中读取的应用程序将看到所消耗的偏移量存在缺口。这些缺失的消息将成为事务标记,并在两个隔离级别中为consumer过滤掉。此外,消费者也可能会看到由于事务中止而导致的空白,因为这些消息不会被consumer返回,但会有有效的偏移量。

十一、多线程消费

Kafka消费者不是线程安全的。我们有责任确保多线程访问正确同步。不同步访问将导致ConcurrentModificationException

此规则的唯一例外是 wakeup(),它可以从外部线程安全地用于中断活动操作。在这种情况下,将从操作的线程阻塞中抛出org.apache.kafka.common.errors.WakeupException这可用于从另一个线程关闭消费者。

如下示例:

public class KafkaConsumerRunner implements Runnable {
	private final AtomicBoolean closed = new AtomicBoolean(false);
	private final KafkaConsumer consumer;
	
	public KafkaConsumerRunner(KafkaConsumer consumer) {
	this.consumer = consumer;
	}
	
	{@literal}@Override
	public void run() {
		try {
			consumer.subscribe(Arrays.asList("topic"));
			while (!closed.get()) {
				ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
				// 处理新记录 ......
			}
		} catch (WakeupException e) {
			// 如果关闭,忽略异常
			if (!closed.get()) throw e;
		} finally {
			consumer.close();
		}
	}
	
	// Shutdown hook which can be called from a separate thread
	// 关闭钩子,可以从单独的线程调用
	public void shutdown() {
		closed.set(true);
		consumer.wakeup();
	}
}

然后在一个单独的线程中,可以通过设置closed标志并唤醒消费者来关闭消费者。

closed.set(true);
consumer.wakeup();

我们看以下几种情况:

1、1个consumer一个线程

优点:

1、最容易实施

2、它通常是最快的,因为不需要线程间的协调

3、它使得基于每个分区的按顺序处理非常容易实现(每个线程只按接收消息的顺序处理消息)。

缺点:

1、更多的consumer意味着更多的TCP连接到集群(每个线程一个)。一般来说,Kafka非常有效地处理连接,因此这通常是一个很小的成本。

2、多个consumer意味着向服务器发送的请求更多,数据批处理略有减少,这可能会导致I/O吞吐量下降。

3、所有进程中的线程总数将受到分区总数的限制

2、将消费和处理解耦

consumer只负责接收数据,处理数据的逻辑让另一个线程池来处理

优点:

        此选项允许独立扩展consumer和处理器的数量。这使得有可能有一个为多个处理器线程提供数据的单一consumer,从而避免了对分区的任何限制。

缺点:

        1、保证处理器之间的顺序需要特别小心,因为线程将独立执行,由于线程执行时间的运气,较早的数据块实际上可能会在较晚的数据块之后被处理。对于没有订购要求的加工,这不是问题。

        2、手动提交 offset 变得更加困难,因为它要求所有线程协调以确保该分区的处理完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2248868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot教程之十一:获取Request 请求 和 Put请求

如何在 Spring Boot 中获取Request Body&#xff1f; Java 语言是所有编程语言中最流行的语言之一。使用 Java 编程语言有几个优点&#xff0c;无论是出于安全目的还是构建大型分发项目。使用 Java 的优点之一是 Java 试图借助类、继承、多态等概念将语言中的每个概念与现实世…

【JAVA进阶篇教学】第二十篇:如何高效处理List集合数据及明细数据

博主打算从0-1讲解下java进阶篇教学&#xff0c;今天教学第二十篇&#xff1a;如何高效处理List集合数据及明细数据。 Java 8 Stream API 助力高效处理集合数据&#xff08;订单明细查询优化案例&#xff09; 目录 一、前言 二、问题回顾 三、优化思路与 Stream API 的运用…

Linux的介绍及虚拟机centOS系统的下载与应用

1、什么是Linux Linux 是一种类 Unix 操作系统&#xff0c;它的内核&#xff08;Kernel&#xff09;由 Linus Torvalds 于 1991 年首次发布。作为一个开源、免费的操作系统&#xff0c;Linux 被广泛用于服务器、桌面计算机、嵌入式设备、移动设备等各种场景。 1、操作系统 操…

ORACLE数据库直接取出数据库字段JSON串中的 VALUE内容

字段内容类似这种&#xff1a; 如果是12c以上版本可以使用 SELECT JSON_VALUE(MEMO, $.supplyExercisePrice) AS supplyExercisePrice FROM your_table;如果是11g版本可以使用 SELECT REGEXP_SUBSTR(MEMO, "supplyExercisePrice":"([^"])", 1, 1, …

业务分组:流量隔离

RPC中常用的保护手段“熔断限流”&#xff0c;熔断是调用方为了避免在调用过程中&#xff0c;服务提供方出现问题的时候&#xff0c;自身资源被耗尽的一种保护行为&#xff1b;而限流则是服务提供方为防止自己被突发流量打垮的一种保护行为。虽然这两种手段作用的对象不同&…

数据结构——排序算法第二幕(交换排序:冒泡排序、快速排序(三种版本) 归并排序:归并排序(分治))超详细!!!!

文章目录 前言一、交换排序1.1 冒泡排序1.2 快速排序1.2.1 hoare版本 快排1.2.2 挖坑法 快排1.2.3 lomuto前后指针 快排 二、归并排序总结 前言 继上篇学习了排序的前面两个部分:直接插入排序和选择排序 今天我们来学习排序中常用的交换排序以及非常稳定的归并排序 快排可是有多…

【JavaEE初阶 — 网络编程】Socket 套接字 & UDP数据报套接字编程

1. Socket套接字 1.1 概念 Socket 套接字&#xff0c;是由系统提供用于网络通信的技术&#xff0c;是基于TCP / IP协议的网络通信的基本操作单元。基于 Socket 套接字的网络程序开发就是网络编程。 1.2 分类 Socket套接字主要针对传输层协议划分为如下三类&#x…

熔断限流:业务实现自我保护

服务端-限流 服务端主要是通过限流来进行自我保护&#xff0c;实现限流时要考虑到应用和IP级别&#xff0c;方便在服务治理的时候&#xff0c;对部分访问量特别大的应用进行合理的限流&#xff1b;服务端的限流阈值配置都是作用于单机的&#xff0c;而在有些场景下&#xff0c…

linux系统误操作,设置nofile值超过限制,导致无法登录,permission denied

1.问题描述&#xff08;虚拟机复现&#xff09; 在k8s集群运行某些服务时&#xff0c;对文件描述符要求比较大&#xff0c;在提高这个值前未查询这个值的限制&#xff0c;最后设置了一个超过限制的值导致登录被拒绝 [roottest4 ~]# tail -3 /etc/security/limits.conf * sof…

从零开始配置Qt+VsCode环境

从零开始配置QtVsCode环境 文章目录 从零开始配置QtVsCode环境写在前面扩展安装及配置Qt Configure配置 VsCode创建Qt工程VsCodeQMakeMinGwVsCodeQMakeMsvcVsCodeCMakeMinGwVsCodeCMakeMsvcQtCreatorQMakeMinGw->VsCodeQtCreatorQMakeMsvc->VsCodeQtCreatorCMakeMinGw-&g…

如何借助AI生成PPT,让创作轻松又高效

PPT是现代职场中不可或缺的表达工具&#xff0c;但同时也可能是令人抓狂的时间杀手。几页幻灯片的制作&#xff0c;常常需要花费数小时调整字体、配色与排版。AI的飞速发展为我们带来了革新——AI生成PPT的技术不仅让制作流程大大简化&#xff0c;还重新定义了效率与创意的关系…

【Linux】Make/Makefile

这个3/4行的语法和1/2行是一样的。也是依赖关系和依赖方法。 make命令扫描makefile文件时&#xff0c;从上向下扫描&#xff0c;默认形成一个目标文件。 指定make clean的时候才回去执行对应的清除。 为什么要给我们的clean.PHONY:clean声明它是伪目标呢&#xff1f; PHONY类…

HarmonyOS:@Provide装饰器和@Consume装饰器:与后代组件双向同步

一、前言 Provide和Consume&#xff0c;应用于与后代组件的双向数据同步&#xff0c;应用于状态数据在多个层级之间传递的场景。不同于上文提到的父子组件之间通过命名参数机制传递&#xff0c;Provide和Consume摆脱参数传递机制的束缚&#xff0c;实现跨层级传递。 其中Provi…

如何做好一份技术文档?

打造出色技术文档的艺术 在当今技术驱动的世界中&#xff0c;技术文档扮演着至关重要的角色。它不仅是工程师和开发人员之间交流的桥梁&#xff0c;更是产品和技术成功的隐形推手。一份优秀的技术文档宛如一张精准的航海图&#xff0c;能够引导读者穿越技术的迷雾&#xff0c;…

泰山众筹怎样吸引用户参与

泰山众筹项目要吸引用户参与&#xff0c;需要采取一系列策略来增强项目的吸引力、提高用户信任度&#xff0c;并激发用户的参与热情。以下是一些建议&#xff1a; 1. 明确项目价值与愿景 展示独特性&#xff1a;明确泰山众筹项目的独特卖点&#xff0c;如创新性、社会影响力或…

抓包之验证content-length响应头的作用

写在前面 根据http协议的规范&#xff0c;content-length响应头用来标记固定长度响应信息长度&#xff0c;http客户端&#xff0c;比如浏览器也会解析这个字段来进行数据的解析。 1&#xff1a;测试 1.1&#xff1a;content-length等于实际内容匹配时 使用python脚本testco…

T3 TensorFlow入门实战——天气识别

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習紀錄博客&#x1f356; 原作者&#xff1a;K同学啊 | 接輔導、項目定制 一、前期准备 1. 导入数据 # Import the required libraries import numpy as np import os,PIL,pathlib import matplotlib.pyplot as …

✨系统设计时应时刻考虑设计模式基础原则

目录 &#x1f4ab;单一职责原则 (Single Responsibility Principle, SRP)&#x1f4ab;开放-封闭原则 (Open-Closed Principle, OCP)&#x1f4ab;依赖倒转原则 (Dependency Inversion Principle, DIP)&#x1f4ab;里氏代换原则 (Liskov Substitution Principle, LSP)&#x…

fatal error in include chain (rtthread.h):rtconfig.h file not found

项目搜索这个文件 rtconfig 找到后将其复制粘贴到 你的目录\Keil\ARM\ARMCC\include 应该还有cJSON&#xff0c;rtthread.h和 等也复制粘贴下

【回文数组——另类递推】

题目 代码 #include <bits/stdc.h> using namespace std; using ll long long; const int N 1e510; int a[N], b[N]; int main() {int n;cin >> n;for(int i 1; i < n; i)cin >> a[i];for(int i 1; i < n / 2; i)b[i] a[i] - a[n1-i];ll ans 0;…