基于springboot2.5.7
废话不多说,直接上干货:
@Slf4j
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@IntegrationComponentScan(basePackages = {"扫描包路径","扫描包路径"})
public class MqttAutoConfig {
@Autowired
private MqttProperties mqttProperties;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private DisruptorProperties disruptorProperties;
@RefreshScope
@Bean(value = "mqttParallelQueueHandler",initMethod = "start",destroyMethod = "shutDown")
@ConditionalOnProperty(prefix = "custom-config.mqtt", name = "disruptor", havingValue = "true")
public ParallelQueueHandler mqttParallelQueueHandler(){
log.info("初始化Disruptor...");
return new ParallelQueueHandler.Builder<DisruptorEventData>()
.setDisruptorProperties(disruptorProperties)
.setWaitStrategy(new BlockingWaitStrategy())
.setListener(new MQTTMsgListener())
.build();
}
@Bean
@ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})
public MqttConnectOptions getReceiverMqttConnectOptionsForSub(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(mqttProperties.getUsername());
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
List<String> hostList = Arrays.asList(mqttProperties.getHostUrl().trim().split(","));
String[] serverURIs = new String[hostList.size()];
hostList.toArray(serverURIs);
mqttConnectOptions.setServerURIs(serverURIs);
mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setAutomaticReconnect(true);
return mqttConnectOptions;
}
/**
* MQTT 连接工厂
* @return MqttPahoClientFactory
*/
@Bean
@ConditionalOnMissingBean
public MqttPahoClientFactory receiverMqttClientFactoryForSub(MqttConnectOptions mqttConnectOptions) {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions);
log.info("【MQTT】-初始化连接工厂...");
return factory;
}
/**
* 出站通道
* @return MessageChannel
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT 消息发送处理器
* @return MessageHandler
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttProperties.getClientId()+"out", factory);
messageHandler.setDefaultQos(1);
//开启异步
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("test");
return messageHandler;
}
/**
* 此处可以使用其他消息通道
* Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。
*
* @return MessageChannel
*/
@Bean
public MessageChannel mqttInBoundChannel() {
return new DirectChannel();
}
/**
* 适配器, 多个topic共用一个adapter
* 客户端作为消费者,订阅主题,消费消息
*/
@Bean
@ConditionalOnMissingBean
public MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory factory) {
List<String> topics = mqttProperties.getSubscribeTopics();
String[] topicArray = new String[topics.size()];
for (int i = 0; i < topics.size(); i++) {
topicArray[i] = "$queue/"+ topics.get(i);
}
log.info("【MQTT】-订阅TOPIC:{}", Arrays.toString(topicArray));
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId(), factory, topicArray);
adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setRecoveryInterval(10000);
adapter.setQos(1);
adapter.setOutputChannel(mqttInBoundChannel());
return adapter;
}
@Autowired(required = false)
@Qualifier("mqttParallelQueueHandler")
private ParallelQueueHandler<DisruptorEventData> parallelQueueHandler;
/**
* mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。
* @return MessageHandler
*/
@Bean
@RefreshScope
@ServiceActivator(inputChannel = "mqttInBoundChannel")
public MessageHandler mqttMessageHandler() {
// 获取配置中的设备品牌
MqttProperties.DeviceBrand deviceBrand = mqttProperties.getDeviceBrand();
boolean disruptor = mqttProperties.isDisruptor();
// 获取所有实现了 CustomMqttMessageHandler 接口的 Bean
return message -> {
log.info("【MQTT】-收到MQTT消息,Topic: {}, Payload: {}",
message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
message.getPayload());
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
Map<String, CustomMqttMessageReceiverHandler> handlers = applicationContext.getBeansOfType(CustomMqttMessageReceiverHandler.class);
boolean handled = false;
if (Objects.nonNull(deviceBrand)){
CustomMqttMessageReceiverHandler handler = handlers.get(deviceBrand.getServiceName());
if (Objects.nonNull(handler)){
if (handler.supportsTopic(topic)) {
handled = this.run(handler,message,topic,disruptor);
}
}else {
log.error("【MQTT】-未找到设备品牌消息接收处理器,deviceBrand->{}",deviceBrand);
}
}else {
for (CustomMqttMessageReceiverHandler handler : handlers.values()) {
if (handler.supportsTopic(topic)) {
handled = this.run(handler,message,topic,disruptor);
}
}
}
if (!handled) {
log.warn("【MQTT】-未找到匹配的处理器来处理Topic {} 的消息", topic);
}
};
}
@Bean
@ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})
public MqttMessageSender mqttMessageSender(){
return new MqttMessageSender();
}
private boolean run(CustomMqttMessageReceiverHandler handler,Message<?> message,String topic,boolean disruptor){
try {
String traceId = MDC.get("traceId");
if (!StringUtils.hasText(traceId)){
traceId = UUID.randomUUID().toString().replaceAll("-", "");
MDC.put("traceId",traceId);
}
if (disruptor && Objects.nonNull(parallelQueueHandler)){
log.info("【MQTT】-使用Disruptor处理...");
DisruptorEventData data = new DisruptorEventData();
Map<String,Object> map = new HashMap<>();
map.put("data",message);
map.put("handler",handler);
map.put("traceId",traceId);
data.setMessage(map);
parallelQueueHandler.add(data);
}else {
handler.handleMessage(message);
}
return true;
} catch (Exception e) {
log.error("【MQTT】-Handler {} 处理Topic {} 的消息时出错", handler.getClass().getSimpleName(), topic, e);
return false;
}finally {
MDC.clear();
}
}
由于涉及隐私,其余代码可以留言