此处测试的mqtt的Broker
是使用的EMQX 5.7.1
,可移步至https://blog.csdn.net/tiantang_1986/article/details/140443513查看详细介绍
一、方式1
添加必要的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置
# mqtt 服务端配置
spring:
# mqtt 配置
mqtt:
url: tcp://127.0.0.1:1883,tcp://127.0.0.2:1883
clientId: "00000001" # 客户端Id(不可重复)
username: <访问用户名> # 认证的用户名
password: <访问密码> # 认证的密码
qos: 1
topic: test/# # 监听的topic
读取配置文件
import org.apache.commons.lang3.StringUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfig {
private String username;
private String password;
private String url;
private String clientId;
private String topic;
private Integer qos;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
if (StringUtils.isNotBlank(url) && url.contains(",")) {
options.setServerURIs(url.split(","));
} else {
options.setServerURIs(new String[]{url});
}
options.setCleanSession(true);
//自动重连
options.setAutomaticReconnect(true);
//设置超时时间,单位为秒
options.setConnectionTimeout(0);
//设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(90);
//设置遗嘱消息
options.setWill("will_topic", (this.clientId + "与服务器断开连接").getBytes(), qos, false);
factory.setConnectionOptions(options);
factory.setPersistence(new MemoryPersistence());
return factory;
}
}
MQTT消息入站配置
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {
@Resource
private MqttConfig mqttConfig;
@Resource
private MqttPahoClientFactory mqttClientFactory;
@Resource
private MqttMessageReceiver mqttMessageReceiver;
@Bean
public MessageChannel mqttInBoundChannel() {
return new PublishSubscribeChannel();
}
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory, mqttConfig.getTopic());
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
//传输Hex数据,如果是String则可使用默认值false
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.setRecoveryInterval(10000);
adapter.setQos(mqttConfig.getQos());
adapter.setOutputChannel(mqttInBoundChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInBoundChannel")
public MessageHandler mqttMessageHandler() {
return this.mqttMessageReceiver;
}
}
消费者
@Slf4j
@Component
public class MqttMessageReceiver implements MessageHandler {
@Resource
private DataConvertStrategyFactory convertStrategyContext;
@Override
public void handleMessage(Message<?> message) throws MessagingException {
MessageHeaders headers = message.getHeaders();
String topic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
if (StringUtils.isNotBlank(topic)) {
return;
}
byte[] payload = (byte[]) message.getPayload();
log.info("topic: {}, message: {}", topic, HexUtils.bytesToHex(payload));
//从topic中获取clientId,topic的格式:{业务}/{clientId}/{事件标识}
Map<String, String> map = MqttDataConverter.covertTopic(topic);
String clientId = map.get("clientId");
log.info("clientId: {}", clientId);
//topic中的事件标识
String eventUrl = map.get("event");
//自定义的enum,主要用来消息处理消息分组,相同组可以使用相同的数据转换服务
Event[] events = Event.values();
String deviceId = clientId;
Arrays.stream(events).filter(item -> item.getEvent().equals(eventUrl)).findFirst().ifPresent(item -> {
//使用策略模式实现
DataConvertService convertService = convertStrategyContext.getStrategy(item.getGroup());
convertService.convert(deviceId, eventUrl, payload);
});
}
}
数据转换服务接口,具体的数据解析只要实现这个接口就行
public interface DataConvertService {
/**
* 转换数据
*
* @param clientId 设备SN
* @param topic topic
* @param data 数据
* @return
*/
Boolean convert(String clientId, String topic, byte[] data);
/**
* 获取转换器
*
* @return
*/
String getConverter();
}
MQTT数据转换策略工厂
@Component
public class DataConvertStrategyFactory implements InitializingBean {
@Resource
private List<DataConvertService> handlers;
private Map<String, DataConvertService> dataConvertServiceMap = new ConcurrentHashMap<>();
/**
* 初始化
*/
@Override
public void afterPropertiesSet() {
//进行初始化
if (CollectionUtils.isNotEmpty(handlers)) {
handlers.forEach(item -> {
dataConvertServiceMap.put(item.getConverter(), item);
});
}
}
/**
* 返回实际处理对象
*
* @param strategy 处理策略
* @return 实际处理对象
*/
public DataConvertService getStrategy(String strategy) {
return dataConvertServiceMap.get(strategy);
}
}
二、方式2
使用EMQX
的Webhook
钩子
首先创建钩子函数,把需要监听的事件加上处理逻辑,示例:
@Slf4j
@RequestMapping("/mqtt/client")
@RestController
public class ClientController {
@PostMapping("/webhook")
public Result webhook(@RequestBody Map<String, Object> message) {
log.info("webhook map:{}", message);
String action = (String) message.get("action");
String clientid = (String) message.get("clientid");
if ("client_connected".equals(action)) {
log.info("client:{} 上线", clientid);
}
if ("client_disconnected".equals(action)) {
log.info("client:{} 下线", clientid);
}
if ("message.publish".equals(action)) {
log.info("已接收到 client:{} 的消息:{}", clientid, message.get("payload"));
}
return Result.success("OK");
}
}
然后在EMQX的Dashboard
中创建Webhook
,可以选择多个触发器
填好URL后可以进行测试,之后使用MQTTX
进行消息发送测试
控制台输出日志
三、方式3
package com.iinplus.mqtt.handler;
import com.iinplus.mqtt.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
public class MqttSubscriber implements InitializingBean {
@Resource
private MqttConfig config;
@Override
public void afterPropertiesSet() {
try {
MqttClient client = new MqttClient(config.getUrl(), config.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(config.getUsername());
options.setPassword(config.getPassword().toCharArray());
options.setCleanSession(true);
options.setAutomaticReconnect(true);
options.setConnectionTimeout(0);
client.connect(options);
client.subscribe(config.getTopic());
//设置消息回调
client.setCallback(new MqttMsgHandler());
} catch (MqttException e) {
log.error("MqttException:", e);
}
}
}
消息回调处理
@Slf4j
public class MqttMsgHandler implements MqttCallback {
@Override
public void connectionLost(Throwable t) {
// 连接丢失
log.info("Connection lost:", t);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
// 接收到消息
log.info("Message arrived:" + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息发送成功
log.info("Delivery complete");
}
}