本文与上一篇文章《动态订阅 RocketMQ 实现(消费者组动态上下线)》目的一致:在运行时动态注册和注销 Kafka 消费者。下面直接上代码。
/**
 * Registry of the Kafka listener containers created by this class, keyed by topic name.
 *
 * <p>NOTE(review): this is a plain {@code HashMap} shared through a static field; confirm that
 * registration and removal are never invoked concurrently.
 */
private static final Map<String, ConcurrentMessageListenerContainer<String, String>> topics = new HashMap<>();
public void registerKafkaListeners(BinlogPortDatabaseConfig binlogPortDatabaseConfig) {
/*
BinlogPortDatabaseConfig是自定义的数据结构,即需要动态注册的kafka配置
包含topic、sever、client,自定义即可
*/
ConsumerFactory<String, String> consumerFactory = binlogPortDatabaseConfig.createConsumerFactory();
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setBatchListener(true);
if (consumerFactory == null) {
return;
}
factory.setConsumerFactory(consumerFactory);
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(binlogPortDatabaseConfig.getTopic());
//设置为false,解决client后自动加-0的问题
container.setAlwaysClientIdSuffix(false);
container.setupMessageListener((MessageListener<String, String>) record -> {
//TODO:你的消费逻辑,record即为消息体
}
} catch (IllegalArgumentException e) {
log.error("registerKafkaListeners JSON解析失败", e);
} catch (NullPointerException e) {
log.error("registerKafkaListeners 消息为空或部分字段缺失", e);
} catch (Exception e) {
log.error("registerKafkaListeners 注册异常", e);
}
});
container.start();
topics.put(binlogPortDatabaseConfig.getTopic(), container);
}
/**
 * Stops the listener container registered for the given topic and removes it from
 * {@code topics}.
 *
 * <p>Does nothing when no container is registered under that topic (including a
 * {@code null} or empty topic that was never registered).
 *
 * @param topic name of the topic whose container should be taken offline
 */
public void factoryDel(String topic) {
    ConcurrentMessageListenerContainer<String, String> container = topics.get(topic);
    // Guard on the lookup result, not on the topic string: an unknown topic yields null here.
    if (container == null) {
        return;
    }
    container.stop();
    topics.remove(topic);
}
/**
 * Builds the consumer factory used for this configuration's topic.
 *
 * <p>The settings are fixed in code: String key/value deserializers, at most 100 records per
 * poll, offset reset {@code latest}, and auto-commit disabled. The bootstrap servers, client id
 * and production group id are placeholders to be filled in. The group id depends on the
 * environment reported by {@code SystemEnvUtil}; when none of the checked environments
 * applies, no group id is set here.
 *
 * @return a {@code DefaultKafkaConsumerFactory} built from the assembled settings
 */
public ConsumerFactory<String, String> createConsumerFactory() {
    Properties settings = new Properties();

    // Fixed consumer behaviour.
    settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(100));
    settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false));

    // Connection identity.
    settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, /* your Kafka server */);
    settings.put(ConsumerConfig.CLIENT_ID_CONFIG, /* your Kafka client */);

    // Group id per environment; the production/sandbox value wins if both checks pass.
    if (SystemEnvUtil.isTest()) {
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, Constant.consumerGroupIdOffline + topic);
    }
    if (SystemEnvUtil.isProd() || SystemEnvUtil.isSandbox()) {
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, /* your group id */);
    }

    // The factory takes a String-keyed map, so copy the entries across with their keys cast.
    Map<String, Object> consumerConfigs = new java.util.HashMap<>();
    settings.forEach((key, value) -> consumerConfigs.put((String) key, value));
    return new DefaultKafkaConsumerFactory<>(consumerConfigs);
}