SpringBoot中使用EMQX实现MQTT通讯

news2025/1/20 5:57:46

简述

之前写过一篇SpringBoot通过Netty实现TCP服务的文章,本篇与之前那篇实现的场景类似,都是服务器与客户端之间双向交互,但个人觉得MQTT的方式实现更好,优雅。

基础

MQTT协议是通过MQTT服务器转发消息,MQTT服务器作为三方,为每个客户端转发消息。与TCP不同的是,TCP编码,java大部分场景是作为服务端,设备作为客户端,而MQTT是一台单独的服务器,java跟设备都作为客户端与之保持长连接。

准备

下载EMQX,对应有windows、mac、linux的版本

下载网址:EMQX下载网址

解压,启动

bin文件夹下执行:(稍等五秒钟,出现两行英文时表示启动完成)

emqx start

浏览器打开

http://localhost:18083/

启动成功。

账号密码:admin/public

若代码里连接MQTT服务时带上用户名密码需要如下操作:

创建客户端认证

编码

maven依赖

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

监听器

package com.dpkj.mqtt.mqtt;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dpkj.mqtt.service.DpDeviceService;
import com.mdd.common.entity.DpDevice;
import com.mdd.common.enums.CtrlTypeEnum;
import com.mdd.common.enums.MqttTopicEnum;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * 项目启动 连接mqtt服务器 订阅指定主题
 * 此处为项目启动就订阅主题,这种情况是主题已经明确的情况,
 * 若项目启动时主题不明确,需要在代码里动态订阅,
可直接在类中注入MQTTConnect server,之后执行server.sub("xxxx")
 */
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {

    private final MQTTConnect server;

    @Autowired
    public MQTTListener(MQTTConnect server) {
        this.server = server;
    }

    @Override
    public void onApplicationEvent(@NotNull ContextRefreshedEvent contextRefreshedEvent) {
        try {
            server.setMqttClient(new Callback());

            //订阅主题
            server.sub(“test/test/test”);

        } catch (MqttException e) {
            log.error(e.getMessage(), e);
        }
    }
}
 
 

发布及订阅主题

package com.dpkj.mqtt.mqtt;

import com.dpkj.mqtt.config.async.AsyncTaskService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.io.UnsupportedEncodingException;

import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;

/**
 * WQ
 * 2023/2/14 17:10
 * MQTT工具类操作
 */
@Slf4j
@Component
public class MQTTConnect {

    @Value("${mqtt.host}")
    private String mqttHost;
    @Value("${mqtt.port}")
    private String mqttPort;
    @Value("${mqtt.username}")
    private String mqttUsername;
    @Value("${mqtt.password}")
    private String mqttPassword;

    //唯一标识
    private final String clientId = "MqttClient" + (int) (Math.random() * 100000000);

    private MqttClient mqttClient;

    @Resource
    private AsyncTaskService asyncTaskService;

    /**
     * 客户端connect连接mqtt服务器
     **/
    public void setMqttClient(MqttCallback mqttCallback) {
        MqttConnectOptions options = mqttConnectOptions();
        if (mqttCallback == null) {
            mqttClient.setCallback(new Callback());
        } else {
            mqttClient.setCallback(mqttCallback);
        }
        try {
            mqttClient.connect(options);
            log.error("MQTT服务连接成功。tcp://{}:{},clientId:{},username:{},password:{}", mqttHost, mqttPort, clientId, mqttUsername, mqttPassword);
        } catch (MqttException e) {
            e.printStackTrace();
            log.error("MQTT服务连接失败,{}。tcp://{}:{},clientId:{},username:{},password:{}", e.getMessage(), mqttHost, mqttPort, clientId, mqttUsername, mqttPassword);
        }
    }

    /**
     * MQTT连接参数设置
     */
    public MqttConnectOptions mqttConnectOptions() {
        try {
            mqttClient = new MqttClient("tcp://" + mqttHost + ":" + mqttPort, clientId, new MemoryPersistence());
        } catch (MqttException e) {
            log.error("建立mqtt客户端出错。{}", e.getMessage());
            e.printStackTrace();
        }
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttUsername);
        options.setPassword(mqttPassword.toCharArray());
        options.setConnectionTimeout(0);
        options.setAutomaticReconnect(true);
        options.setKeepAliveInterval(90);
        return options;
    }

    /**
     * 关闭MQTT连接
     */
    public void close() throws MqttException {
        mqttClient.disconnect();
        mqttClient.close();
    }

    /**
     * 向某个主题发布消息 默认qos:1
     *
     * @param sn:道品云泊控制卡设备的唯一标识
     * @param msg:发布的消息
     * @param msgId:发布的消息唯一标识
     */
    public void pub(String sn, String msg, String msgId) {
        MqttMessage mqttMessage = new MqttMessage();
        try {
            //GBK格式下发
            mqttMessage.setPayload(msg.getBytes("GBK"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        String topic = "/test/" + sn + "/demo";
        log.error("向主题[{}],发送消息 {}", topic, msg);
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token;
        try {
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅某一个主题 ,此方法默认的的Qos等级为:1
     *
     * @param topic 主题
     */
    public void sub(String topic) throws MqttException {
        log.error("主题订阅:{}", topic);
        mqttClient.subscribe(topic);
    }

//    /**
//     * 订阅某一个主题,可携带Qos
//     *
//     * @param topic 所要订阅的主题
//     * @param qos   消息质量:0、1、2
//     */
//    public void sub(String topic, int qos) throws MqttException {
//        mqttClient.subscribe(topic, qos);
//    }
}

yml

mqtt:
  host: 127.0.0.1
  port: 1883
  username: xxx
  password: 'xxx'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/991297.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++动态内存管理+模板

&#x1f493;博主个人主页:不是笨小孩&#x1f440; ⏩专栏分类:数据结构与算法&#x1f440; C&#x1f440; 刷题专栏&#x1f440; C语言&#x1f440; &#x1f69a;代码仓库:笨小孩的代码库&#x1f440; ⏩社区&#xff1a;不是笨小孩&#x1f440; &#x1f339;欢迎大…

Emgu调用摄像头

1&#xff0c;安装EMgu 2,调用摄像头 public FaceLoad(){InitializeComponent();try{capture new Capture();capture.Start();//摄像头开始工作capture.ImageGrabbed frameProcess;//实时获取图像}catch (NullReferenceException excpt){//MessageBox.Show(excpt.Message);}}…

数据结构算法刷题:背包问题

整数和是p&#xff0c;负数和是s-p&#xff0c;那么target p - (s-p)&#xff0c;求出p (st)//2 class Solution: def findTargetSumWays(self, nums: List[int], target: int) -> int: target sum(nums) if target < 0 or target % 2: #target 一定是偶数而且是大于…

界面控件DevExpress WinForms工具栏菜单组件,模拟流行办公软件!

DevExpress WinForms的工具栏和菜单组件灵感来自于Microsoft Office&#xff0c;并针对WinForms开发人员进行了优化&#xff0c;可以帮助开发者快速模拟当下流行的办公软件应用程序。 DevExpress WinForms有180组件和UI库&#xff0c;能为Windows Forms平台创建具有影响力的业…

《向量数据库指南》——向量数据库Milvus Cloud 2.3的可运维性:从理论到实践

一、引言 在数据科学的大家庭中,向量数据库扮演着重要角色。它们通过独特的向量运算机制,为复杂的机器学习任务提供了高效的数据处理能力。然而,如何让这些数据库在生产环境中稳定运行,成为了运维团队的重要挑战。本文将深入探讨向量数据库的可运维性,并分享一些有趣的案…

基于STM32设计的格力空调遥控器

一、格力空调协议介绍 格力空调的红外控制协议被称为格力红外通讯协议或者格力红外遥控协议。这个协议定义了一系列红外信号&#xff0c;可以用来控制格力空调的各种操作&#xff0c;例如开关、温度控制、模式选择、风速控制等等。 格力空调的红外控制协议是一种自定义协议&a…

进程基本概念

一、什么是进程&#xff08;任务&#xff09; 进程&#xff1a;一个被加载到内存中的程序/正在运行中的程序。 开机时&#xff0c;先将操作系统加载到内存中。 ps -ajx 查询运行中的进程 二、操作系统如何管理进程&#xff1f; 前提&#xff1a;如何利用属性认识事…

使用 crontab 定时任务使用 curl 发送请求

crontab 简单用法 crontab 一般是 linux 系统自带的 输入以下命令可以添加定时任务&#xff0c;里面有 crontab 的说明及示例 crontab -e示例格式如下 # 前面五个分别代表分、时、天、月、周&#xff0c;后面就是命令 * * * * * command例如 * * * * * command就是每分钟执行…

图的应用(最小生成树,最短路径,有向无环图)

目录 一.最小生成树 1.生成树 2.无向图的生成树 3.最小生成树算法 二.最短路径 1.单源最短路径---Dijkstra&#xff08;迪杰斯特拉&#xff09;算法 2.所有顶点间的最短路径---Floyd&#xff08;弗洛伊德&#xff09;算法 三.有向无环图的应用 1.AOV网&#xff08;拓扑…

国内CRM软件系统厂商排名

我们知道CRM软件成为了企业管理中不可或缺的一部分&#xff0c;目前国内CRM厂商排名是怎样的呢&#xff1f;经过评估名列前茅的分别是Zoho CRM、Salesforce CRM、Microsoft Dynamics 、SAP CRM、HubSpot CRM。 1.Zoho Zoho CRM凭借先进的技术和创新的解决方案&#xff0c;帮…

2023年母婴亲子产业研究报告

第一章 行业发展概况 母婴亲子领域是一个综合性的产业&#xff0c;主要聚焦于为孕产妇、婴幼儿及家庭提供全方位的服务和产品。该产业致力于为孕产妇和家庭在孕育、育儿及亲子时期提供必要的支持和便捷。其核心业务涉及婴幼儿商品、孕产妇健康、亲子教育、家庭旅行体验以及亲子…

递归算法学习——图像渲染,岛屿的数量,最大的岛屿

目录 ​编辑 一&#xff0c;图像渲染 1.题意 2.解释 3.题目接口 4.解题思路及代码 二&#xff0c;岛屿的数量 1.题意 2.解释 3.题目接口 4.解题思路及代码 三&#xff0c;最大的岛屿 1.题意 2.解释 3.题目接口 4.解题代码即思路 一&#xff0c;图像渲染 1.题意…

MySQL——笔试测试题

解析&#xff1a; 要查询各科目的最大分数&#xff0c;可以使用如下的SQL语句&#xff1a; SELECT coursename, MAX(score) FROM t_stuscore GROUP BY coursename; 这条SQL语句使用了MAX()聚合函数来获取每个科目的最大分数&#xff0c;并使用GROUP BY子句按照科目进行分组…

核货宝:收银系统后台一般是怎样的,有哪些功能

收银系统后台是一个重要的管理工具&#xff0c;它为企业提供了对收银机的全面控制和配置。收银系统后台是一个用于管理和配置收银机的软件界面。它通常由以下几个主要部分组成&#xff1a; 1. 登录和权限管理 收银系统后台需要一个安全的登录系统&#xff0c;以确保只有授权人…

Mojo 语言官网

Mojo面向 AI 开发者的新型编程语言&#xff0c;无缝支持CPU、GPU&#xff0c;兼容Python&#xff0c;跟Python类似的语法&#xff0c;但是比Python快68000倍。目前Mojo仅支持Ubuntu&#xff0c;暂不支持Windows和Mac&#xff0c;可以在Mojo Playground先体验一下。 Mojo 语言…

ostringstream 多线程下性能问题探究

文章目录 背景火焰图ostringstream 的结构引用 背景 在实习过程中&#xff0c;有一个业务场景需要用到 ostringstream&#xff0c;但经过导师提醒&#xff0c;ostringstream 在多线程关系下&#xff0c;竞态消耗较大&#xff0c;但对于当前业务场景&#xff0c;每次操作&#…

【美团3.18校招真题2】

大厂笔试真题网址&#xff1a;https://codefun2000.com/ 塔子哥刷题网站博客&#xff1a;https://blog.codefun2000.com/ 最多修改两个字符&#xff0c;生成字典序最小的回文串 提交网址&#xff1a;https://codefun2000.com/p/P1089 由于字符串经过修改一定为回文串&#x…

[Linux]动静态库

[Linux]动静态库 文章目录 [Linux]动静态库见一见库存在库的原因编写库模拟编写静态库模拟使用静态库模拟编写动态库模拟使用静态库 库的加载原理静态库的加载原理动态库的加载原理 库在可执行程序中的编址策略静态库在可执行程序中的编址策略动态库在可执行程序中的编址策略 见…

本地启动 Falcon-180B

本地启动 Falcon-180B 通过 Gradio 的 load 函数&#xff0c;我们可以在本地加载 HuggingFace 的 Spaces 上面的 demo。 那就运行 Falcon-180B 来试试吧。 创建 falcon_demo.py 文件&#xff0c; cat << EOF > falcon_demo.py import gradio as grdemo gr.load(&q…

小节3:数据类型

Python的数据类型包括&#xff1a;字符串&#xff08;str&#xff09;、整数&#xff08;int&#xff09;、浮点数&#xff08;float&#xff09;、布尔类型&#xff08;bool&#xff09;、空值类型&#xff08;NoneType&#xff09;、列表&#xff08;list&#xff09;、字典&…