1.软件的选型
1.1.使用免费版EMQX
1.1.1.下载
百度搜索的目前是会打开官网,这里提供下免费版的使用链接EMQX使用手册
文档很详细,这里不再记录了。
1.2.使用rabbitmq
rabbitmq一般做消息队列用,作为mqtt用我没有找到详细资料,这里总结下使用方法:
1.window安装rabbitmq
首先安装rabbitmq得依赖,也就是opt_win64_24.0.exe,然后傻瓜式安装接可
安装完毕,进入安装目录下,sbin文件夹
1.浏览器查看插件 执行命令
rabbitmq-plugins enable rabbitmq_management
回车,浏览器输入http://127.0.0.1:15672/#/看到此页面及安装成功,默认账号密码均是 guest
2.注意:如果做mqtt使用的话,需安装mqtt插件 安装命令
rabbitmq-plugins enable rabbitmq_mqtt
执行完命令,在浏览器上查看 mqtt及其端口号出现了的话,就证明安装成功,下面就可以开始整合了
2.linux安装rabbitmq
以前公司都是用window服务器,没用过linux,折腾了好久,安装 erlang与rabbitmq不对应 不是最新 等等一系列问题,最后看了一个视频 用 dock安装 根据官网
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
一句话就可以安装
如果后期需要安装插件
docker exec <容器id> rabbitmq-plugins enable rabbitmq_mqtt
ps:查看容器id 方法
1.使用docker ps -aqf “name=containername” -------简短容器id
2.docker inspect --format="{{.Id}}" container_name -------详情容器id
带密码启动dock
docker run -it --rm --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=密码 -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbitmq:management
15672 是rabbitmq management管理界面默认访问端口
5672 是amqp默认端口
1883 是mqtt tcp协议默认端口
15675 是web_mqtt ws协议默认端口
2.springboot集成mqtt
2.1:yml文件集成配置
iot:
mqtt:
clientId: mqttClientOutputId
sendTopic: ktcotrl/dy/#
topics:
- /ktcotrl/#
- gateway/#
default:
topic: "/ktcotrl/dy/*****"
qos: 1
receive:
enable: true
serverClientId: mqttClientInputId
servers: tcp://ip:1883
username: username
password: password
2.2:主要代码
@Slf4j
@Configuration
public class IotMqttSubscriberConfig {
@Autowired
private MqttConfig mqttConfig;
/*
*
* MQTT连接器选项
* *
*/
@Bean(value = "getMqttConnectOptions")
public MqttConnectOptions getMqttConnectOptions1() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(mqttConfig.getUsername());
mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getServers()});
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
mqttConnectOptions.setKeepAliveInterval(10);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions1());
return factory;
}
@Bean
public MessageChannel iotMqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),
mqttClientFactory(),
mqttConfig.getTopics().toArray(new String[0]));
// mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(iotMqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic= (String) message.getHeaders().get("mqtt_receivedTopic");
// msgid= message.getHeaders().get("id");
String messageContents= message.getPayload().toString();
//操作
}
};
}
@Bean
public MessageChannel defaultMqttInputChannel() {
return new DirectChannel();
}
@Value("${iot.mqtt.default.topic}")
private String defaultTopic;
/**
* 说明:
* ConditionalOnProperty(value = "driver.mqtt.default.receive.enable")
* 根据配置属性driver.mqtt.default.receive.enable选择是否开启 Default Topic 主题的数据接收逻辑
*
* @return
*/
// @Bean
// @ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
// public MessageProducer defaultInbound() {
// MqttPahoMessageDrivenChannelAdapter adapter =
// new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),
// mqttClientFactory(),
// defaultTopic);
// adapter.setCompletionTimeout(5000);
// adapter.setConverter(new DefaultPahoMessageConverter());
// adapter.setQos(2);
// adapter.setOutputChannel(defaultMqttInputChannel());
// return adapter;
// }
/**
* 说明:
* ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
* 根据配置属性driver.mqtt.default.receive.enable选择是否开启 Default Topic 主题的数据接收逻辑
*
* @return
*/
// @Bean
// @ServiceActivator(inputChannel = "defaultMqttInputChannel")
// @ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
// public MessageHandler defaultHandler() {
//
// return message -> {
// log.info(
// "defaultTopicReceiver\nheader:{},\npayload:{}",
// JSON.toJSONString(message.getHeaders(), true),
// JSON.toJSONString(message.getPayload(), true)
// );
// };
// }
}
@Getter
@Setter
@Component
@IntegrationComponentScan
@ConfigurationProperties(prefix = "iot.mqtt")
public class MqttConfig {
/*
*
* 服务地址
*/
private String servers;
/**
* 客户端id
*/
private String clientId;
/*
*
* 服务端id
*/
private String serverClientId;
/*
*
* 默认主题
*/
private String[] defaultTopic;
private String sendTopic;
/*
*
* 用户名和密码*/
private String username;
private String password;
private List<String> topics;
}
@Configuration
@IntegrationComponentScan
@EnableIntegration
public class IotMqttSendConfig {
@Autowired
private MqttConfig mqttConfig;
/**
* 将channel绑定到MqttClientFactory上
* ServiceActivator 表明当前方法用于处理Mqtt消息,inputChannel用于接收消息的通道
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(mqttConfig.getUsername());
mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getServers()});
mqttConnectOptions.setKeepAliveInterval(2);
factory.setConnectionOptions(mqttConnectOptions);
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), factory);
messageHandler.setAsync(true);
messageHandler.setDefaultRetained(false);
messageHandler.setDefaultTopic(mqttConfig.getSendTopic());
return messageHandler;
}
/* 发布者 */
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
@RestController
@RequestMapping("/path")
@Slf4j
public class WkqController {
@Autowired
private IotMqttGateway mqttGateway;
@RequestMapping("/test")
@ResponseBody
public void test() {
//topic:主题
mqttGateway.sendMessage2MqttHex( topic,1, "sendStr");
}
/**
* @description rabbitmq mqtt协议网关接口
*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface IotMqttGateway {
void sendMessage2Mqtt(String data);
void sendMessage2Mqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos, String payload);
void sendMessage2MqttHex(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos, byte[] payload);
void sendMessage3Mqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.RECEIVED_TOPIC)String revicetopic,
@Header(MqttHeaders.QOS) int qos, String payload);
}