mqtt的使用与二次封装

news2025/1/12 22:49:32

前提:先安装Mosquitto并启动服务,可使用mqttx进行接收发送的测试。
Mosquitto以配置启动命令

mosquitto -c mosquitto.conf -v

原文链接:mqtt的使用
本文为测试使用固无账号密码,可在原文查看

封装后实现效果,加入一个新的topic时新建一个类进行自动订阅,该类写所订阅topic方法的处理。发送消息看原文使用方法。

所用依赖

<!--mqtt包-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>
        <!--gson包-->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.9.0</version>
        </dependency>

配置文件

spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    clientId: mqttx_867916f3
    completionTimeout: 2000

封装后代码,链接mqtt工具类

package Ceshi.configure.mqtt;

import javax.annotation.PostConstruct;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConsumerConfig {

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.clientId}")
    private String clientId;

    /**
     * 客户端对象
     */
    private MqttClient client;

    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客户端连接服务端
     */
    public void connect(){
        try {
            //创建MQTT客户端对象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接到服务端都是以新的身份
            options.setCleanSession(true);
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(new MqttConsumerCallBack());
            client.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 断开连接
     */
    public void disConnect(){
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    /**
     * 订阅主题
     */
    public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
    * @Description: 消息发布
    */
    public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}


监听消息工具类

package Ceshi.configure.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;

import java.util.HashMap;
import java.util.Map;

public class MqttConsumerCallBack implements MqttCallback{

    @Value("${spring.mqtt.clientId}")
    private String clientId;
    private static Map<String,JIantingChuli> maps=new HashMap<>();
    public static void setJIantingChulis(String k,JIantingChuli y){
        maps.put(k,y);
    }
    /**
     * 客户端断开连接的回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println(clientId+"与服务器断开连接,可重连");
    }

    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主题 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
        maps.get(topic).xiaoxifenhua(new String(message.getPayload()));//调用方法进行消费
    }

    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        IMqttAsyncClient client = iMqttDeliveryToken.getClient();
        System.out.println(client.getClientId()+"发布消息成功!");
    }

}

统一处理消息接口

public  interface JIantingChuli {
    //根据fname找到要执行的方法,后者为传参
    void xiaoxifenhua(String message);
}

接收报文格式类

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

@Data
@AllArgsConstructor
public class MqResult implements Serializable{

    private String key;//方法名
    private Object data;//方法所需参数
}

一个测试订阅类(重点)(自己根据需求几个topic复制几个,内部topic与qos看情况修改,xiaoxifenhua内有几个方法就写几个分支,k为方法名)

import Ceshi.configure.mqtt.JIantingChuli;
import Ceshi.configure.mqtt.MqResult;
import Ceshi.configure.mqtt.MqttConsumerCallBack;
import Ceshi.configure.mqtt.MqttConsumerConfig;
import Ceshi.param.AppCodeApiParam;
import com.google.gson.Gson;
import lombok.Data;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;


@Component
public class Ceshi1 implements JIantingChuli {
    @Resource
    private MqttConsumerConfig mqttConsumerConfig;
    private Gson gson=new Gson();
    private String topic="ceshi";
    private int qos=2;

    @PostConstruct
    public void chushihua(){
        mqttConsumerConfig.subscribe(topic,qos);//添加到订阅
        MqttConsumerCallBack.setJIantingChulis(topic,this);//监听到消息时找到此类对象
        System.out.println(topic+","+qos+"已注册订阅");
    }
    @Override
    public void xiaoxifenhua(String message) {
        MqResult mqres=gson.fromJson(message,MqResult.class);//json解析为对象
        switch (mqres.getKey()){
            case "ceshi":
                ceshi(gson.fromJson(mqres.getData().toString(), AppCodeApiParam.class));//AppCodeApiParam是随便一个类,内部有个getPassword方法显示参数值
                break;
        }
    }
    //随便写的测试方法t
    public void ceshi(AppCodeApiParam appCodeApiParam){
        System.out.println(appCodeApiParam.getPassword());
    }
}

QoS0,At most once,至多一次;
QoS1,At least once,至少一次;
QoS2,Exactly once,确保只有一次。
因上方测试类topic=“ceshi”,所以我们用MqttX进行测试一下(试了一下发送时qos等于啥都能接收到,只要topic相等)
在这里插入图片描述
参数(按照参数格式来key是要调用的方法名,data是所传参数)

{
  "key": "ceshi",//方法名
  "data": {//要传的参数
    "password":"132465"
  }
}

启动后可看见已注册订阅
在这里插入图片描述

发送消息测试结果,可见触发的方法与传递的参数均无误
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/112281.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode题解 16(15,22) 三数之和,括号生成

文章目录三数之和(15)代码解答:括号生成(22)代码解答:三数之和(15) 该题是让从1个数组中找到和为0的不重复的三个数,这次我们使用排序和指针的方法来解决 先将该数组从小到大进行排序 Arrays.sort(nums);我们需要遍历一遍该数组,同时我们还要去重的操作(例如[-1,-1,-1,2],这里面…

helm本地debug template渲染小记

前提条件 1&#xff0c; 安装helm 2&#xff0c;要能连接的k8s 3&#xff0c;本地有完成charts文件目录 具体步骤 本文因为是在项目流程中helm渲染出的deployment语法报错或者最终生成的不符合预期&#xff0c;因此本地使用helm命令进行debug验证测试。 我们先看一下基本的c…

怎么把word里面的彩色图转化为灰度图,直接在word里面操作,无需转其他软件,超简单!(位图和矢量图都可以)

怎么把word里面的彩色图转化为灰度图&#xff0c;直接在word里面操作&#xff0c;无需转其他软件&#xff0c;超简单&#xff01;&#xff08;位图和矢量图都可以&#xff09; Microsoft Office Word是微软公司的一个文字处理器应用程序。它最初是由Richard Brodie为了运行DOS…

深入理解自编码器(用变分自编码器生成图像)

文章目录自编码器欠完备自编码器正则自编码器稀疏自编码器去噪自编码器收缩自编码器变分自编码器References内容总结自花书《Deep Learning》以及《Python 深度学习》。 自编码器 自编码器&#xff08;autoencoder&#xff09;是神经网络的一种&#xff0c;经过训练后能尝试将…

机器学习经典算法:决策树(2)

1. 概述 决策树&#xff08;Decision Tree&#xff09;是有监督学习中的一种算法&#xff0c;并且是一种基本的分类与回归的方法。决策树有两种&#xff1a;分类树和回归树。 决策树是用于分类和回归的工具&#xff0c;它将数据特征值拆分为决策节点处的分支&#xff08;例如&a…

六、Kubernetes核心技术Pod详解、实例

1、概述 Pod 是 k8s 系统中可以创建和管理的最小单元&#xff0c;是资源对象模型中由用户创建或部署的最 小资源对象模型&#xff0c;也是在 k8s 上运行容器化应用的资源对象&#xff0c;其他的资源对象都是用来支 撑或者扩展 Pod 对象功能的&#xff0c;比如控制器对象是用来管…

某大型政务网站的优化咨询案例(视频点播VOD+GZIP压缩+静态文件CDN+Redis缓存+全文索引)

2022年圣诞节到来啦&#xff0c;很高兴这次我们又能一起度过~ 这次分享关于一个对某大型政务网站的优化咨询的案例&#xff0c;发生在今年的下半年&#xff0c;已过去一段时间&#xff0c;并取得了良好的成果&#xff01;* 项目背景 某大型政务网站准备上线&#xff0c;需要…

08-Golang中的运算符

[TOC](Golang中的运算符运算符介绍算数运算符基本介绍细节说明关系运算符(比较运算符&#xff09;基本介绍细节说明逻辑运算符基本介绍细节说明赋值运算符基本介绍细节说明运算符优先级运算符介绍 运算符是一种特殊的符号&#xff0c;用来表示数据的运算、赋值和比较 1.算数运…

Vue事件处理的基本使用

前言 事件处理在vue中也是非常重要的一项技术&#xff0c;它类似于js的事件处理&#xff0c;但是也有不同&#xff0c;下面就简单介绍一下在vue中如何进行事件使用以及一些要点 1 事件基本使用 在这里我们使用单击事件为例&#xff0c;简单讲讲在vue中单击事件的编写以及细节…

最近面试遇到一个算法题,简单写一点。

第⼀题&#xff08;必答&#xff09; 请针对有重复数字的数组设计⼀个快排算法&#xff0c;⽐如&#xff1a;[34, 34, 89, 1, 1, 20, 12]&#xff0c;排序后结果为 [89,34,34,20,12,1,1] 第⼆题&#xff08;必答&#xff09; 请利⽤Redis 实现⼀个通⽤分布式锁&#xff0c;并…

技术进步、研发计划启动及政策支持 共促我国合成生物学市场容量加速扩张

合成生物学是对生物体进行有目标的设计、改造乃至重新合成&#xff0c;这一名词最早出现于DNA重组技术发展的上世纪70年代。合成生物学汇聚并融合了生命科学、工程学和信息科学等诸多学科&#xff0c;在天然产物合成、化学工业、生物能源、生物医药等诸多领域有广泛的应用前景。…

【Animejs】——Anime.js照片墙案例实现

目录 一、实现的效果&#xff1a; 二、实现js的具体步骤 1、需要实现的逻辑梳理 2、切换风格的逻辑 三、完整代码&#xff1a; 用js编写使用animejs实现图片复杂的切换效果 一、实现的效果&#xff1a; 点击——> <——箭头&#xff0c;实现不同动画效果的炫酷切换 …

【小5聊】C++ 输入矩阵数字,然后回环方式输出

C 输入矩阵数字&#xff0c;然后回环方式输出 1、题目内容 输入 第一行是两个m和n&#xff0c;分别代表矩阵的行数和列数。 第二行开始输入对应矩阵 输出 第二行回转输出。 相邻的两个整数之间用一个空格分开&#xff0c;行尾无空格 样例输入 5 6 4 8 9 4 5 6 1 2 5 6…

控制算法-PID算法总结-从公式原理到参数整定解析

目录 一、控制系统 1.1控制系统的分类 1.2 性能指标 二、PID算法的起源及特点 三、PID应用 四、PID公式原理 五、PID源码 六、PID整定方法 6.1 经验法 6.2 衰减曲线法 6.3 响应曲线法 参考文献&#xff1a; 一、控制系统 1.1控制系统的分类 分为开环控制、闭环控制和…

Java 8 Stream 从入门到进阶——像SQL一样玩转集合

0.阅读完本文你将会 了解Stream的定义和它的特征了解Stream的基础和高阶用法 1. 前言 在我们日常使用Java的过程中&#xff0c;免不了要和集合打交道。对于集合的各种操作有点类似于SQL——增删改查以及聚合操作&#xff0c;但是其方便性却不如SQL。 所以有没有这样一种方式…

【每日一题Day66】LC1754构造字典序最大的合并字符串 | 贪心 双指针模拟

构造字典序最大的合并字符串【LC1754】 You are given two strings word1 and word2. You want to construct a string merge in the following way: while either word1 or word2 are non-empty, choose one of the following options: If word1 is non-empty, append the fir…

10.2、Django入门--前台管理

文章目录1、URLconf 路由管理展示首页2、视图函数处理业务逻辑展示书籍的详细页3、模板管理实现好看的HTML页面3.1 模板引擎配置3.2 模板语法&#xff1a;变量3.3 模板语法: 常用标签3.4 主页与详情页前端HTML设计常用的HTML编写基础标题标签列表标签图片标签链接标签表格标签表…

耗时二周,万字总结Maven简明教程,与君共勉!

什么是Mavne Maven 是一个项目管理工具&#xff0c;它包含了一个项目对象模型 (POM&#xff1a;Project Object Model)&#xff0c;一组标准集合。由于 Maven 使用标准目录布局和默认构建生命周期&#xff0c;开发团队几乎可以立即自动化项目的构建基础设施。在多个开发团队环…

代码随想录训练营第60天|LeetCode 84.柱状图中最大的矩形

LeetCode 84.柱状图中最大的矩形 双指针 注意&#xff0c;双指针解法可行&#xff0c;但是在力扣上提交会超时。 以heights[i]为中心&#xff0c;用两个指针向两边扩散&#xff0c;直到heights[left]和heights[right]小于heights[i]为止&#xff0c;这样就构成了以left和rig…

第11章_数据库的设计规范(理论了解)

第11章_数据库的设计规范 范式 2.3键和相关属性的概念 范式的定义会使用到主键和候选键&#xff0c;数据库中的键(Key)由一个或者多个属性组成。数据表中常用的几种键和属性的定义: 超键︰能唯─标识元组的属性集叫做超键。候选键︰如果超键不包括多余的属性&#xff0c;那…