Java集成MQTT和Kafka实现高可用方案
1. 概述
在物联网(IoT)和分布式系统中,消息传递的可靠性和高可用性至关重要。本文将详细介绍如何使用Java集成MQTT和Kafka来构建一个高可用的消息处理系统。
MQTT(消息队列遥测传输)是一种轻量级的发布/订阅协议,适用于资源受限的设备和低带宽、高延迟网络。而Kafka是一个分布式流处理平台,提供高吞吐量、可扩展性和持久性。将两者结合,可以创建一个既能处理大量IoT设备连接,又能保证消息可靠传递和处理的系统。
2. 架构设计
我们的高可用架构设计如下:
主要组件:
- MQTT集群:使用EMQ X等MQTT代理实现集群
- Kafka集群:作为中央消息总线和持久化层
- 桥接组件:将MQTT消息转发到Kafka
- Java应用服务:处理和分析消息
- 监控系统:确保整个系统的健康运行
3. Java集成MQTT实现
3.1 Maven依赖
<dependencies>
<!-- Eclipse Paho MQTT v3 client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- Spring Integration MQTT adapter (channel adapters over Paho) -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.15</version>
</dependency>
<!-- Spring Boot starter for Spring Integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.7.8</version>
</dependency>
</dependencies>
3.2 MQTT配置类
@Configuration
public class MqttConfig {

    // Broker URIs; supplying several enables Paho's client-side failover.
    @Value("${mqtt.broker.urls}")
    private String[] brokerUrls;

    @Value("${mqtt.client.id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    // Topics the inbound adapter subscribes to.
    @Value("${mqtt.topics}")
    private String[] topics;

    /**
     * Shared Paho client factory configured for high availability:
     * multi-broker failover, automatic reconnect, a last-will status message,
     * and a persistent session (cleanSession=false) so the broker retains this
     * client's subscriptions and queued QoS&gt;0 messages across disconnects.
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        // Multiple broker addresses: the client fails over to the next URI on connection loss.
        options.setServerURIs(brokerUrls);
        // Reconnect automatically after an unexpected disconnect.
        options.setAutomaticReconnect(true);
        options.setKeepAliveInterval(30);
        options.setConnectionTimeout(30);
        // Last-will message, published by the broker if this client disconnects abnormally.
        // NOTE(review): getBytes() uses the platform default charset — confirm subscribers expect it.
        options.setWill("clients/status",
                (clientId + ": disconnected").getBytes(),
                1,
                true);
        if (username != null && !username.isEmpty()) {
            options.setUserName(username);
            // Fix: guard against an unset mqtt.password — calling toCharArray() on a
            // null password threw a NullPointerException when only the username was set.
            options.setPassword(password != null ? password.toCharArray() : new char[0]);
        }
        // cleanSession=false: server keeps this client's subscription state across reconnects.
        options.setCleanSession(false);
        factory.setConnectionOptions(options);
        return factory;
    }

    // Outbound channel (application -> MQTT broker).
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // Outbound handler: publishes messages from mqttOutboundChannel asynchronously at QoS 1.
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId + "-pub", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }

    // Inbound channel (MQTT broker -> application).
    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    // Inbound adapter: subscribes to the configured topics at QoS 1 and
    // forwards received messages onto mqttInboundChannel.
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId + "-sub",
                        mqttClientFactory(),
                        topics);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }
}
3.3 MQTT服务类
@Service
@Slf4j
public class MqttService {

    private final MessageChannel mqttOutboundChannel;

    @Autowired
    public MqttService(MessageChannel mqttOutboundChannel) {
        this.mqttOutboundChannel = mqttOutboundChannel;
    }

    /**
     * Publishes a payload to the given MQTT topic at QoS 1, non-retained,
     * by sending it down the outbound integration channel.
     */
    public void publish(String topic, String payload) {
        log.info("Publishing message to topic {}: {}", topic, payload);
        Message<String> outgoing =
                MessageBuilder.withPayload(payload)
                        .setHeader(MqttHeaders.TOPIC, topic)
                        .setHeader(MqttHeaders.QOS, 1)
                        .setHeader(MqttHeaders.RETAINED, false)
                        .build();
        mqttOutboundChannel.send(outgoing);
    }

    /**
     * Consumes messages arriving on the inbound MQTT channel and logs them.
     */
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public void handleMessage(Message<?> message) {
        Object topicHeader = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        String topic = (String) topicHeader;
        String payload = message.getPayload().toString();
        log.info("Received message from topic {}: {}", topic, payload);
        // Message-processing / Kafka-forwarding logic goes here.
    }
}
4. Java集成Kafka实现
4.1 Maven依赖
<dependencies>
<!-- Apache Kafka client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.2</version>
</dependency>
<!-- Spring for Apache Kafka (templates and listener containers) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.5</version>
</dependency>
</dependencies>
4.2 Kafka配置类
// Kafka producer/consumer configuration tuned for high availability:
// acks=all + idempotence on the producer, manual offset commits on the consumer.
@Configuration
public class KafkaConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;
// Kafka producer configuration
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
// Kafka cluster bootstrap addresses
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// High-availability settings
// acks=all: a send succeeds only after all in-sync replicas acknowledge it
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
// Retry transient send failures
configProps.put(ProducerConfig.RETRIES_CONFIG, 10);
// Enable idempotence so retries cannot produce duplicate records
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Batch size in bytes
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
// Linger before sending a batch (ms)
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);
// Total producer buffer memory (bytes)
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// Kafka consumer configuration
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// High-availability settings
// Disable auto-commit: offsets are committed manually (see the listener factory's ack mode)
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Start from the earliest offset when no committed offset exists
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Max records returned per poll
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Heartbeat interval (ms)
configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
// Session timeout (ms)
configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Max interval between polls before the consumer is considered failed (ms)
configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Number of concurrent consumer threads
factory.setConcurrency(3);
// Deliver records to the listener in batches
factory.setBatchListener(true);
// Commit offsets manually, immediately after each acknowledgment
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
4.3 Kafka服务类
@Service