springboot+mqtt使用总结

news2024/11/16 7:22:23

1.软件的选型

1.1.使用免费版EMQX

1.1.1.下载

百度搜索的目前是会打开官网,这里提供下免费版的使用链接EMQX使用手册

文档很详细,这里不再记录了。

1.2.使用rabbitmq

rabbitmq一般做消息队列用,作为mqtt用我没有找到详细资料,这里总结下使用方法:

1.window安装rabbitmq

首先安装rabbitmq得依赖,也就是opt_win64_24.0.exe,然后傻瓜式安装接可
安装完毕,进入安装目录下,sbin文件夹

1.浏览器查看插件 执行命令
rabbitmq-plugins enable rabbitmq_management

回车,浏览器输入http://127.0.0.1:15672/#/看到此页面及安装成功,默认账号密码均是 guest

2.注意:如果做mqtt使用的话,需安装mqtt插件 安装命令
rabbitmq-plugins enable rabbitmq_mqtt
执行完命令,在浏览器上查看 mqtt及其端口号出现了的话,就证明安装成功,下面就可以开始整合了

2.linux安装rabbitmq

以前公司都是用window服务器,没用过linux,折腾了好久,安装 erlang与rabbitmq不对应 不是最新 等等一系列问题,最后看了一个视频 用 dock安装 根据官网
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
一句话就可以安装
如果后期需要安装插件
docker exec <容器id> rabbitmq-plugins enable rabbitmq_mqtt
ps:查看容器id 方法
1.使用docker ps -aqf “name=containername” -------简短容器id
2.docker inspect --format="{{.Id}}" container_name -------详情容器id

带密码启动dock
docker run -it --rm --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=密码 -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbitmq:management


15672 是rabbitmq management管理界面默认访问端口
5672 是amqp默认端口
1883 是mqtt tcp协议默认端口
15675 是web_mqtt ws协议默认端口

2.springboot集成mqtt

2.1:yml文件集成配置

iot:
  mqtt:
    clientId: mqttClientOutputId
    sendTopic: ktcotrl/dy/#
    topics:
      - /ktcotrl/#
      - gateway/#    
    default:
      topic: "/ktcotrl/dy/*****"
      qos: 1
      receive:
        enable: true
    serverClientId: mqttClientInputId
    servers: tcp://ip:1883
    username: username
    password: password

2.2:主要代码


@Slf4j
@Configuration
public class IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;


    /*
     *
     *  MQTT连接器选项
     * *
     */
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions1() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间 单位为秒
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(mqttConfig.getUsername());
        mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getServers()});
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(10);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions1());
        return factory;
    }

    @Bean
    public MessageChannel iotMqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),
                        mqttClientFactory(),
                        mqttConfig.getTopics().toArray(new String[0]));
//                        mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(iotMqttInputChannel());
        return adapter;
    }

  

 
    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {

                String topic= (String) message.getHeaders().get("mqtt_receivedTopic");
//                 msgid= message.getHeaders().get("id");
                 String messageContents= message.getPayload().toString();
              //操作
            }
        };
    }





    @Bean
    public MessageChannel defaultMqttInputChannel() {
        return new DirectChannel();
    }

    @Value("${iot.mqtt.default.topic}")
    private String defaultTopic;
    /**
     * 说明:
     * ConditionalOnProperty(value = "driver.mqtt.default.receive.enable")
     * 根据配置属性driver.mqtt.default.receive.enable选择是否开启 Default Topic 主题的数据接收逻辑
     *
     * @return
     */
//    @Bean
//    @ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
//    public MessageProducer defaultInbound() {
//        MqttPahoMessageDrivenChannelAdapter adapter =
//                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),
//                        mqttClientFactory(),
//                        defaultTopic);
//        adapter.setCompletionTimeout(5000);
//        adapter.setConverter(new DefaultPahoMessageConverter());
//        adapter.setQos(2);
//        adapter.setOutputChannel(defaultMqttInputChannel());
//        return adapter;
//    }

    /**
     * 说明:
     * ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
     * 根据配置属性driver.mqtt.default.receive.enable选择是否开启 Default Topic 主题的数据接收逻辑
     *
     * @return
     */
//    @Bean
//    @ServiceActivator(inputChannel = "defaultMqttInputChannel")
//    @ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
//    public MessageHandler defaultHandler() {
//
//        return message -> {
//            log.info(
//                    "defaultTopicReceiver\nheader:{},\npayload:{}",
//                    JSON.toJSONString(message.getHeaders(), true),
//                    JSON.toJSONString(message.getPayload(), true)
//            );
//        };
//    }
}
@Getter
@Setter
@Component
@IntegrationComponentScan
@ConfigurationProperties(prefix = "iot.mqtt")
public class MqttConfig {
    /*
     *
     * 服务地址
     */

    private String servers;

    /**
     * 客户端id
     */


    private String clientId;
/*
*
     * 服务端id
     */

    private String serverClientId;
/*
*
     * 默认主题
     */

    private String[] defaultTopic;

    private String sendTopic;
    /*
     *
     * 用户名和密码*/


    private String username;

    private String password;

    private List<String> topics;
}
@Configuration
@IntegrationComponentScan
@EnableIntegration
public class IotMqttSendConfig {
    @Autowired
    private MqttConfig mqttConfig;

    /**
     * 将channel绑定到MqttClientFactory上
     * ServiceActivator 表明当前方法用于处理Mqtt消息,inputChannel用于接收消息的通道
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttConfig.getUsername());
        mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getServers()});
        mqttConnectOptions.setKeepAliveInterval(2);
        factory.setConnectionOptions(mqttConnectOptions);
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(mqttConfig.getServerClientId(), factory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultRetained(false);
        messageHandler.setDefaultTopic(mqttConfig.getSendTopic());
        return messageHandler;
    }

    /* 发布者 */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
@RestController
@RequestMapping("/path")
@Slf4j
public class WkqController {

    @Autowired
    private IotMqttGateway mqttGateway;
    @RequestMapping("/test")
    @ResponseBody
    public void test() {
       
      //topic:主题
        mqttGateway.sendMessage2MqttHex( topic,1, "sendStr");
    }
   
/**
 * @description rabbitmq mqtt协议网关接口
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface IotMqttGateway {

    void sendMessage2Mqtt(String data);

    void sendMessage2Mqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic,
                          @Header(MqttHeaders.QOS) int qos, String payload);
    void sendMessage2MqttHex(@Header(MqttHeaders.TOPIC) String topic,
                          @Header(MqttHeaders.QOS) int qos, byte[] payload);
    void sendMessage3Mqtt(@Header(MqttHeaders.TOPIC) String topic,
                          @Header(MqttHeaders.RECEIVED_TOPIC)String revicetopic,
                          @Header(MqttHeaders.QOS) int qos, String payload);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1788841.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VB.net实战(VSTO):Excel插件的安装与卸载

1. 安装 1.1编程环境&#xff1a;Visual Studio 2022 1.2创建新项目&#xff1a; 1.3 加入一行测试程序&#xff1a;MsgBox&#xff08;“hello”&#xff09;&#xff0c;点击启动&#xff0c;确认可以弹窗 1.4 点击发布 1.5 找到安装程序&#xff0c;点击安装。打开Excel程…

win10环境下nodejs安装过程

打开 https://nodejs.org/en/官网下载node.js 2.下载完成后的安装文件为node-v16.16.0-x64.msi&#xff0c;双击进行安装即可。 3.一直默认安装&#xff0c;记得可以更改安装路径 4.其他不用打勾&#xff0c;一直next&#xff0c;安装完成即可。 5.安装完成后&#xff0c;wi…

软件测试需求管理指南规范(Word原件,项目管理全资料)

3 测试需求 3.1 测试范围 3.2 测试目标 4 测试需求的现状 5 测试需求的内容 5.1 主体内容 5.2 管理内容 6 测试需求的制定 6.1 需求信息来源 6.2 需求分析 6.2.1 功能性需求 6.2.2 系统功能需求 6.2.3 界面需求 6.2.4 安装需求 6.2.5 业务需求 6.2.6 非功能性需求 6.2.7 性能需…

告别复制粘贴:AI论文写作工具的高效应用

写作这件事一直让我们从小学时期就开始头痛&#xff0c;初高中时期800字的作文让我们焦头烂额&#xff0c;一篇作文里用尽了口水话&#xff0c;拼拼凑凑才勉强完成。 大学时期以为可以轻松顺利毕业&#xff0c;结果毕业前的最后一道坎拦住我们的是毕业论文&#xff0c;这玩意不…

企业应用架构模式--详解51种企业应用架构模式

导读&#xff1a;企业应用包括哪些&#xff1f;它们又分别有哪些架构模式&#xff1f; 世界著名软件开发大师Martin Fowler给你答案 目录 01什么是企业应用02 企业应用的种类03企业架构模式 01什么是企业应用 我的职业生涯专注于企业应用&#xff0c;因此&#xff0c;这里所谈…

【Python系列】Python装饰器

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Spring AI 第二讲 之 Chat Model API 第二节Ollama Chat

通过 Ollama&#xff0c;您可以在本地运行各种大型语言模型 (LLM)&#xff0c;并从中生成文本。Spring AI 通过 OllamaChatModel 支持 Ollama 文本生成。 先决条件 首先需要在本地计算机上运行 Ollama。请参阅官方 Ollama 项目 README&#xff0c;开始在本地计算机上运行模型…

使用PyCharm 开发工具创建工程

一. 简介 前面学习了 安装 python解释器。如何安装python的一种开发工具 PyCharm。 本文来简单学习一下&#xff0c;如何使用 PyCharm 开发工具创建一个简单的 python工程。 二. PyCharm 开发工具创建一个工程 1. 首先&#xff0c;首先打开PyCharm 开发工具。选择 创建一…

【计算机毕业设计】谷物识别系统Python+人工智能深度学习+TensorFlow+卷积算法网络模型+图像识别

谷物识别系统&#xff0c;本系统使用Python作为主要编程语言&#xff0c;通过TensorFlow搭建ResNet50卷积神经算法网络模型&#xff0c;通过对11种谷物图片数据集&#xff08;‘大米’, ‘小米’, ‘燕麦’, ‘玉米渣’, ‘红豆’, ‘绿豆’, ‘花生仁’, ‘荞麦’, ‘黄豆’, …

白银票据~

一. 白银票据的原理 白银票据就伪造ST票据&#xff0c; kerberoasting是破解ST票据中的服务用户hash值&#xff0c;有以下区别&#xff1a; 白银票据&#xff1a;伪造的ST使用的是机器用户的Hash值 Kerberoasting:破解的是ST的域用户的hash值二. 白银票据的利用条件 1.域名 …

[数据集][目标检测]旋风检测数据集VOC+YOLO格式157张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;159 标注数量(xml文件个数)&#xff1a;159 标注数量(txt文件个数)&#xff1a;159 标注类别…

【机器学习】GBDT (Gradient Boosting Decision Tree) 深入解析

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 GBDT (Gradient Boosting Decision Tree) 深入解析引言一、GBDT基础理论1.1 梯度…

C++——从C语言快速入门

目录 一、数组 1、声明数组 2、初始化数组 3、访问数组元素 4、示例 5、注意事项 6、数组小练习 计算器支持加减乘除 数组找最大值 二、指针 三、字符串 string 类型 一、数组 在 C 中&#xff0c;数组是一种存储固定大小的相同类型元素的序列。数组的所有元素都存…

Django中使用Celery和APScheduler实现定时任务

在之前的文章我们已经学习了Celery和APScheduler的基本使用&#xff0c;下面让我们来了解一下如何在Django中使用Celery和APScheduler Celery 1.前提工作 python 3.7 pip install celery pip install eventlet #5.0版本以下 pip install importlib-metadata4.8.3&#xff08…

批量修改文件

最近几个月的文章都直接发在公众号上&#xff0c;没有同步到博客上&#xff0c;想去同步时发现已经有不少了&#xff0c;一个个修改太麻烦了。 之前没规划好&#xff0c;所以博客文章都是直接放在仓库一个目录下&#xff0c;数量多了之后&#xff0c;有点乱&#xff0c;不好管…

工业交换机如何防止广播风暴

工业交换机作为网络设备的重要组成部分&#xff0c;在网络中起到了连接各个设备和传输数据的重要作用。然而&#xff0c;广播风暴是工业交换机面临的一个常见问题&#xff0c;会影响网络性能和稳定性。为了防止广播风暴的发生&#xff0c;工业交换机可以采取一系列有效的措施。…

[数据集][目标检测]吉他检测数据集VOC+YOLO格式66张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;66 标注数量(xml文件个数)&#xff1a;66 标注数量(txt文件个数)&#xff1a;66 标注类别数…

qt中使用QSLite时发现query.value(0).toInt()未获取数据问题

1、首先确保数据库名没有问题 2、确保正常连接 3、非常重要的一点&#xff1a;query.value(0).toInt()之前&#xff0c;必须要有query.first()&#xff0c;非常重要&#xff0c;缺少了这个会一直查不到

ruoyi-nbcio基于jeecg的flowable前端支持自定义表单组件的自动获取方法

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://218.75.87.38:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a; h…

什么时候需要用到 @EnableWebSecurity 注解?

有小伙伴在学习 Spring Security 的遇到一个问题&#xff1a; 箭头所指的位置报红&#xff0c;也就是 Spring 容器中没有找到一个类型为 HttpSecurity 的 Bean。 小伙伴说如果他在配置类上加 EnableWebSecurity 注解&#xff0c;就不报错&#xff1b;不加该注解则会报错。那么…