目录
1. 多线程安全
1.1. 生产者是多线程安全的么?
1.1. 消费者是多线程安全的么?
2. 消费者规避多线程安全方案
2.1. 每个线程维护一个kafkaConsumer
2.2. [单/多]kafkaConsumer实例 + 多worker线程
2.3.方案优缺点对比
1. 多线程安全
1.1. 生产者是多线程安全的么?
Kafka生产者是线程安全的,可以在多个线程中共享一个Kafka生产者实例。这是因为Kafka生产者实例内部使用了一些同步机制来保证线程安全,例如使用了线程安全的队列来缓存消息,使用了同步锁来保护共享资源的访问等。
同时,Kafka生产者的send()方法是非阻塞的,可以在多个线程中并发调用,不会阻塞线程。Kafka生产者还提供了异步发送和同步发送两种发送方式,可以根据实际需求选择不同的发送方式。
然而,如果多个线程共享同一个Kafka生产者实例,需要注意以下几点:
-
同一个线程中不要同时调用send()方法和flush()方法,可能会导致消息发送顺序不一致。
-
不同线程中调用send()方法时,需要注意消息的顺序,可以使用Kafka的分区机制来保证消息的顺序。
-
如果多个线程发送的消息都是针对同一个主题或分区,可能会导致消息的重复发送或丢失。因此,建议在多线程情况下,使用Kafka的分区机制,将消息发送到不同的分区中,以避免消息的重复和丢失。
综上所述,Kafka生产者是线程安全的,可以在多个线程中共享一个Kafka生产者实例,但需要注意消息的顺序和分区的使用,以保证消息的可靠性和顺序性。
1.1. 消费者是多线程安全的么?
Kafka消费者是非线程安全的,主要原因是因为:Kafka消费者使用了内部的状态来跟踪消费进度和偏移量。这种状态包括消费者的位置,消费者的偏移量,以及消费者的订阅主题和分区等信息。如果多个线程同时访问同一个Kafka消费者实例,就会导致这些状态信息的不一致,从而导致消费进度出现错误。
2. 消费者规避多线程安全方案
背景:Kafka消费者中具有poll()方法,该方法是一个阻塞方法,如果在同一个线程中多次调用poll()方法,会导致消费者阻塞在poll()方法中,无法及时消费新到达的消息。因此,为了提高Kafka消费者的并发性能,通常使用多个消费者线程来消费+处理消息。为了保证Kafka消费者的线程安全性,通常采用以下2种方案。
方案的本质:一个线程只能绑定一个消费者实例(不能多个线程共用一个消费者实例)
2.1. 每个线程维护一个kafkaConsumer
1. 创建多个线程,去消费topic
2. 每个线程绑定固定数量的分区(最好的情况是一个消费者绑定一个分区)
代码比较简单,唯一需要关注的点:创建的消费者线程和分区的数量可能不相等,此时,可以采用「分区副本自动分配策略」,该策略会将消费者线程和分区绑定在一起。
场景讨论:
按照现在的代码,假设消费者线程获取了分区A 100个msg,然后处理,若还未处理完,此时,offset未提交。因为此时offset还未提交,消费者线程还会去分区A调用poll,获取到的msg依然会是刚才的100个msg。存在消费重复的场景,因此,消费者需要做好幂等处理。
上面的代码,不会出现这个场景:消费者获取了消息1,2,3,3处理完成了,执行了提交偏移,但是消息1还未提交的场景。
代码示例:
package com.bie.kafka.kafkaThrea;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
*
* 1、KafkaConsumer是非线程安全的,KafkaProducer是线程安全的。
* 2、该案例是创建多个线程,每个线程维护一个KafkaConsumer实例
* 用户创建多个线程消费topic数据,使用分区副本自动分配策略,将消费者线程与分区进行绑定
* 3、ConsumerRunnable,消费线程类,执行真正的消费任务
*/
public class ConsumerRunnable implements Runnable {
// 每个线程维护私有的kafkaConsumer实例
private final KafkaConsumer<String, String> consumer;
/**
* 默认每个消费者的配置参数初始化
*
* @param brokerList
* @param groupId
* @param topic
*/
public ConsumerRunnable(String brokerList, String groupId, String topic) {
// 带参数的构造方法
Properties props = new Properties();
// kafka的列表
props.put("bootstrap.servers", brokerList);
// 消费者组编号
props.put("group.id", groupId);
// 自动提交
props.put("enable.auto.commit", true);
// 提交提交每个一秒钟
props.put("auto.commit.interval.ms", "1000");
// 反序列化key
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 反序列化value
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 将配置信息进行初始化操作
this.consumer = new KafkaConsumer<>(props);
// 定义响应的主题信息topic:使用分区副本自动分配策略
consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
// 消费者保持一直消费的状态
while (true) {
// 将获取到消费的信息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200));
// 遍历出每个消费的消息
for (ConsumerRecord<String, String> record : records) {
// 处理消息:deal msg
}
// 提交offset
}
}
}
package com.bie.kafka.kafkaThrea;
import java.util.ArrayList;
import java.util.List;
/**
* 1、消费线程管理类,创建多个线程类执行消费任务
*/
public class ConsumerGroup {
// 消费者群组,多消费者。
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
// 初始化消费者组
consumers = new ArrayList<>(consumerNum);
// 初始化消费者,创建多少个消费者
for (int i = 0; i < consumerNum; i++) {
// 根据消费者构造方法,创建消费者实例
ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic);
// 将创建的消费者实例添加到消费者组中
consumers.add(consumerRunnable);
}
}
public void execute() {
// 将消费者组里面的消费者遍历出来
for (ConsumerRunnable task : consumers) {
// 创建一个消费者线程,并且启动该线程
new Thread(task).start();
}
}
}
package com.bie.kafka.kafkaThrea;
public class ConsumerMain {
public static void main(String[] args) {
// kafka即broker列表
String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
// group组名称
String groupId = "group1";
// topic主题名称
String topic = "topic1";
// 消费者的数量
int consumerNum = 3;
// 通过构造器创建出一个对象
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
// 执行execute的方法,创建出ConsumerRunnable消费者实例。多线程多消费者实例
consumerGroup.execute();
}
}
上面代码,在实际工程应用中存在问题,描述见下:
- 每个消费者线程,先执行poll拉取一批batchsize命令后
- 批量处理这批消息
- 提交offset
步骤2,处理batchsize的消息,可能中间的某一个失败了,但是步骤3提交了整体的offset,会导致失败的消息丢失了。
解决方案:处理每个消息的流程中,增加重试机制(本地消息表),保证该消息能执行成功
2.2. [单/多]kafkaConsumer实例 + 多worker线程
与2.1方案一的区别在于,将「消息的获取」与「消息的处理」解耦开
1. 消息的获取:维护一个or多个kafkaConsumer实例,获取消息,获取到消息后,将消息丢到消息处理线程池中
2. 消息的处理:创建一个线程池,里面存放了worker线程,每个worker线程处理获取到的消息
ConsumerWorker类
worker线程:执行msg的处理,更新offsets信息
package huxi.test.consumer.multithreaded;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Map;
public class ConsumerWorker<K, V> implements Runnable {
private final ConsumerRecords<K, V> records;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
public ConsumerWorker(ConsumerRecords<K, V> record, Map<TopicPartition, OffsetAndMetadata> offsets) {
this.records = record;
this.offsets = offsets;
}
@Override
public void run() {
for (TopicPartition partition : records.partitions()) {
// 获取到分区的消息记录
List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
// 遍历获取到的消息记录
for (ConsumerRecord<K, V> record : partitionRecords) {
// 消息处理逻辑
}
/* 下面操作,是更新各个分区的offset信息到offsets变量,并没有真正的提交位移 */
/* 真正的更新位移操作,不是在worker线程,而是在消费者线程 */
// 待更新的位移
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 同步锁,锁住offsets位移
synchronized (offsets) {
// 如果offsets位移不包含partition这个key信息,就将位移信息设置到map集合里面
if (!offsets.containsKey(partition)) {
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
} else {
// 否则,offsets位移包含partition这个key信息,获取到offsets的位置信息
long curr = offsets.get(partition).offset();
if (curr <= lastOffset + 1) { // 如果获取到的位置信息小于等于上一次位移信息大小,将这个partition的位置信息设置到map集合中。并保存到broker中。
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
}
}
}
}
}
ConsumerThreadHandler类
一个主线程,定时去poll消息,然后将消息投递到worker线程中,最后,当offsets信息发生变更后,提交offset
package huxi.test.consumer.multithreaded;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConsumerThreadHandler<K, V> {
// KafkaConsumer实例
private final KafkaConsumer<K, V> consumer;
// ExecutorService实例
private ExecutorService executors;
// 位移信息offsets
private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
// 构造kafkaConsumer配置
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "false"); // 非自动提交位移信息
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// 创建kafkaConsumer,赋值给成员变量consumer
consumer = new KafkaConsumer<>(props);
// 消费者订阅消息,并实现重平衡rebalance
// rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。
// 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
// 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(offsets); // 提交位移
}
// rebalance完成后会调用onPartitionsAssigned方法。
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
offsets.clear(); // 清除位移信息
}
});
}
/**
* 消费主方法
* @param threadNumber 线程池中线程数
*/
public void consume(int threadNumber) {
// 创建一个worker线程池,线程数量为threadNumber个
executors = new ThreadPoolExecutor(
threadNumber,
threadNumber,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
// 只有一个消费者线程
try {
// 消费者一直处于等待状态,等待消息消费
while (true) {
// 从主题中获取消息
ConsumerRecords<K, V> records = consumer.poll(1000L);
// 如果获取到的消息不为空
if (!records.isEmpty()) {
// submit: 将一批msg交给worker线程处理,消息处理完后,更新offsets信息
executors.submit(new ConsumerWorker<>(records, offsets));
}
// 调用提交位移信息
commitOffsets();
}
} catch (WakeupException e) {
// swallow this exception
} finally {
commitOffsets(); // 调用提交位移信息
consumer.close(); // 关闭consumer
}
}
private void commitOffsets() {
Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
// 保证线程安全、同步锁,锁住offsets
synchronized (offsets) {
// 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间
if (offsets.isEmpty()) {
return;
}
// 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步
unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
// 清除位移信息offsets
offsets.clear();
}
// 将封装好的位移信息unmodfiedMap集合进行同步提交
// 手动提交位移信息
consumer.commitSync(unmodfiedMap);
}
public void close() {
consumer.wakeup();
// 关闭ExecutorService实例
executors.shutdown();
}
}
Main类
包装了所有的工具类,启动整个程序
package huxi.test.consumer.multithreaded;
public class Main {
public static void main(String[] args) {
// broker列表、topic、group id
String brokerList = "localhost:9092";
String topic = "test-topic";
String groupID = "test-group";
// 1. 根据ConsumerThreadHandler构造方法,创建出消费者handler
final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);
final int cpuCount = Runtime.getRuntime().availableProcessors();
// 创建线程的匿名内部类
Runnable runnable = new Runnable() {
@Override
public void run() {
// 执行consume,在此线程中执行消费者消费消息。
handler.consume(cpuCount);
}
};
new Thread(runnable).start(); // 3. 该函数会调用上面的run()方法
try {
// 20秒后自动停止该测试程序
Thread.sleep(20000L);
} catch (InterruptedException e) {
// swallow this exception
}
System.out.println("Starting to close the consumer...");
handler.close();
}
}
2.3.方案优缺点对比
优点 | 缺点 | |
方案1 | 实现简单 速度较快,因为无线程交互开销 方便位移管理 易于维护分区之间的消息消费顺序 | socket连接开销大 consumer受限于topic的分区数,扩展性差 broker端处理负载高(因为发往broker的请求较多) reblance可能性增大 |
方案2 | 消息获取和处理解耦 可独立扩展consumer和worker数,伸缩性好 | 实现负载 难于维护分区内的消息顺序 处理链路变长,导致位移管理困难 worker线程异常可能导致消费数据丢失 |