目录
- 一、生产者自定义分区器代码示例
- 1.1、自定义分区器类
- 1.2、生产者发送消息代码(生产者的配置中添加分区器参数)
- 1.3、测试
一、生产者自定义分区器代码示例
1.1、自定义分区器类
-
代码
package com.xz.kafka.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 1. 实现接口 Partitioner * 2. 实现 3 个方法:partition,close,configure * 3. 编写 partition 方法,返回分区号 */ public class MyPartitioner implements Partitioner { /** * 返回信息对应的分区 * @param topic 主题 * @param key 消息的 key * @param keyBytes 消息的 key 序列化后的字节数组 * @param value 消息的 value * @param valueBytes 消息的 value 序列化后的字节数组 * @param cluster 集群元数据可以查看分区信息 * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 获取消息 String msgValues = value.toString(); int partition; // 判断消息是否包含 hello if (msgValues.contains("hello")){ partition = 0; }else { partition = 1; } // 返回分区号 return partition; } /** * 关闭资源 */ @Override public void close() { } /** * 配置方法 */ @Override public void configure(Map<String, ?> configs) { } }
1.2、生产者发送消息代码(生产者的配置中添加分区器参数)
-
代码
package com.xz.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * @author: xz * @since: 2023/4/9 20:56 * @description: 使用自定义的分区器方法,在生产者的配置中添加分区器参数。 */ public class CustomProducerMyPartitioner { public static void main(String[] args) throws InterruptedException { //1、创建 kafka 生产者的配置对象 Properties properties = new Properties(); //2、给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092"); //3、指定对应的key和value的序列化类型 key.serializer value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); //4、添加自定义分区器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class); //5、创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //6、调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null){ System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition()); } } }); Thread.sleep(2); } //7、关闭资源 kafkaProducer.close(); } }
1.3、测试
-
在 kafka集群上开启 Kafka 消费者
[root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.28:9092 --topic news
-
启动main方法,在 IDEA 控制台观察回调信息,发送消息内容包含hello,则发送到0号分区,如下图:
-
发送消息内容不包含hello,则发送到1号分区,如下图: