👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人
Java知识图谱点击链接:体系化学习Java(Java面试专题)
💕💕 感兴趣的同学可以收藏关注下 ,不然下次找不到哟💕💕
✊✊ 感觉对你有帮助的朋友,可以给博主一个三连,非常感谢 🙏🙏🙏
文章目录
- 1、生产者写入分区的策略有哪些?
- 2、轮询分区策略
- 3、随机分区策略
- 4、按 key 分区策略
- 5、自定义分区策略
- 写在最后
1、生产者写入分区的策略有哪些?
生产者写入分区的策略主要有以下几种:
-
轮询分区策略:生产者可以使用轮询策略将消息依次写入每个分区,实现负载均衡。在每次发送消息时,生产者会按照轮询的方式选择下一个可用的分区,并将消息写入该分区。这样可以确保消息均匀地分布在各个分区中。
-
随机分区策略:Kafka生产者随机的将消息写入分区,有可能会造成消息的分布不均,所以这个策略基本上也很少用。
-
按 key 分区策略:Kafka生产者基于消息的键(key)进行哈希计算,然后将消息写入对应的分区。这种策略可以保证具有相同键的消息被写入到相同的分区,从而保证消息的顺序性。
-
自定义分区策略:Kafka生产者可以使用自定义分区策略来决定将消息写入哪个分区。
2、轮询分区策略
轮询分区的代码如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class RoundRobinPartitioner implements Partitioner {
private int currentPartition;
@Override
public void configure(Map<String, ?> configs) {
// 初始化当前分区索引
currentPartition = 0;
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 轮询选择下一个分区
int selectedPartition = currentPartition;
currentPartition = (currentPartition + 1) % numPartitions;
return selectedPartition;
}
@Override
public void close() {
// 可选:清理资源
}
}
partition 方法会使用一个变量 currentPartition 来记录当前选择的分区索引。每次调用 partition 方法时,会将 currentPartition 增加 1,并通过取模运算来确保选择的分区索引始终在分区数范围内。
要使用轮询分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RoundRobinPartitioner");
3、随机分区策略
随机分区的代码如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class RandomPartitioner implements Partitioner {
private final Random random = new Random();
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return random.nextInt(numPartitions);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
partition 方法会随机选择一个分区返回。 random.nextInt(numPartitions) 方法会生成一个小于分区数的随机数,作为分区的索引。
要使用随机分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RandomPartitioner");
4、按 key 分区策略
按 key 分区的代码如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class KeyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 如果 key 为 null,则使用轮询分区策略
return Math.abs(key.hashCode()) % numPartitions;
} else {
// 使用 key 的哈希码来确定分区
return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override
public void close() {
// 可选:清理资源
}
@Override
public void configure(Map<String, ?> configs) {
// 可选:配置方法
}
}
partition 方法会检查 key 是否为 null。如果 key 为 null,就会使用轮询分区策略,通过计算 key 的哈希码并对分区数取模来确定分区。如果 key 不为 null,则使用 key 的字节数组的哈希码来确定分区。
要使用基于 key 的分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.KeyPartitioner");
5、自定义分区策略
自定义分区的代码如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 自定义分区逻辑
// 根据消息的 key 或 value 来选择分区
// 这里以 key 的哈希值作为分区选择依据
int partition = Math.abs(key.hashCode()) % numPartitions;
return partition;
}
@Override
public void close() {
// 可选:清理资源
}
@Override
public void configure(Map<String, ?> configs) {
// 可选:配置分区器
}
}
partition 方法根据消息的 key 或 value 来选择分区。这里使用 key 的哈希值进行取模运算,以确保选择的分区索引在分区数范围内。
要使用自定义分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");
写在最后
通过y以上这些实现,生产者将根据自定义的分区策略来选择分区来发送消息。您可以根据自己的需求,实现不同的分区逻辑。
💕💕 本文由激流原创,原创不易,希望大家关注、点赞、收藏,给博主一点鼓励,感谢!!!
🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃