MQTT(二)Java整合MQTT

news2025/1/9 2:16:39

Java整合MQTT

上一节知道MQTT是一个通信协议,需要一个代理服务Broker;通信设备作为客户端Client,后台系统服务器也作为客户端Client。

经过了解选用EMQX作为代理服务Broker(支持WEB界面查看)

后台服务使用Spring Integration链接EMQX

1.EMQX

简介,EMQX 是一个开源的分布式物联网 MQTT 消息服务器,它实现了 MQTT 协议的各种功能,并提供了可靠的消息传递、灵活的消息路由、可扩展的集群和高可用性等特性。EMQ X 可以作为物联网应用的消息中间件,用于连接和管理大规模的物联网设备,实现设备之间的实时通信和数据传输。

语言上,EMQ X 是使用 Erlang/OTP 编程语言实现的。Erlang 是一种通用的函数式编程语言,它具有高并发、分布式和容错性的特点,非常适合构建可扩展和可靠的分布式系统。Erlang/OTP 是一个开发框架和平台,提供了一系列工具、库和标准库,用于开发并运行 Erlang 应用程序。

EMQ X 选择使用 Erlang/OTP 的主要原因是 Erlang 在处理并发和分布式通信方面具有优秀的性能和可靠性。Erlang 提供了轻量级进程(而非操作系统线程)的并发模型,每个进程具有独立的状态和消息传递机制,可以高效地处理大量的并发连接和消息处理。此外,Erlang 的可扩展性和容错性使得 EMQ X 能够在分布式环境中实现高可用性和高性能。

Erlang/OTP 还提供了许多用于网络通信、并发控制和错误处理的库和工具,这些功能对于构建物联网 MQTT 服务器非常有用。通过使用 Erlang/OTP,EMQ X 能够提供可靠的消息传递、灵活的消息路由和高可用性等关键功能,以满足物联网应用的需求

主要特点:

  1. MQTT 协议支持:EMQ X 是一个完全兼容 MQTT 协议的消息服务器,支持 MQTT 3.1.1 和 MQTT 5.0 版本。
  2. 分布式架构:EMQ X 可以在多台服务器上进行水平扩展,形成分布式集群,以处理大规模的设备连接和消息传输。
  3. 高可用性:EMQ X 支持主从复制和自动故障切换,以实现高可用性和容错性,保证设备和应用程序的持续可靠性。
  4. 灵活的消息路由:EMQ X 提供了丰富的消息路由功能,可以根据主题、内容、设备属性等灵活地进行消息过滤和转发。
  5. 安全性和权限控制:EMQ X 支持 SSL/TLS 加密和身份验证,可以对连接和消息进行安全保护,同时提供细粒度的权限控制,确保只有授权的设备和用户可以访问特定的主题和功能。
  6. 插件系统:EMQ X 提供了丰富的插件系统,可以通过插件扩展和定制功能,满足不同场景和需求的应用。
  7. 实时监控和统计:EMQ X 提供了实时监控和统计功能,可以监控设备连接数、消息发布和订阅情况等指标,帮助用户了解系统运行状态和性能。
  8. 多协议支持:除了 MQTT,EMQ X 还支持 AMQP、CoAP、WebSocket 等多种协议,以满足不同设备和应用的通信需求。

由于是测试关系,我选择下载WINDOWS版本

默认账户密码为

admin
public

2.Spring Integration

Spring Integration 是一个基于 Spring 框架的集成框架,用于构建企业级应用程序中的消息驱动和事件驱动的系统。它提供了一组丰富的组件和工具,用于实现应用程序之间的异步消息传递、事件驱动的处理和系统集成。

Spring Integration 提供了一种声明式的方式来定义和管理消息流,它基于传统的企业集成模式(Enterprise Integration Patterns,EIP),使开发者能够通过简单的配置来实现消息的路由、转换、过滤、聚合、分割等操作。开发者可以使用 Spring Integration 提供的各种消息通道、消息适配器、消息处理器和消息端点来搭建一个灵活、可扩展和可靠的消息驱动系统。

Spring Integration 支持多种消息协议和传输方式,包括 JMS、AMQP、MQTT、HTTP、TCP、UDP 等,可以与各种消息代理、消息队列和消息中间件进行集成。它还提供了与 Spring 框架的其他模块(如 Spring Boot、Spring MVC、Spring Batch 等)的无缝集成,使得构建复杂的企业应用程序变得更加简单和高效。

Spring Integration 的主要目标是帮助开发者构建可扩展和可维护的集成解决方案,通过解耦和模块化的方式来处理异步消息和事件。它广泛应用于各种领域,包括企业集成、消息驱动的微服务架构、大数据处理等。

POM引入依赖

        <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

配置类MqttConfiguration

/**
 *  mqtt配置类
 */
@Component
@Configuration
@Data
@ConfigurationProperties("mqtt")
public class MqttConfiguration {
    @Autowired
    private MqttCustomerClient mqttCustomerClient;
 
    private String host;
    private String clientId;
    private String username;
    private String password;
    private String topic;

    private int timeout;
    private int keepAlive;
 
    @Bean
    public MqttCustomerClient getMqttCustomerClient() {
        mqttCustomerClient.connect(host, clientId, username, password, timeout,keepAlive);
        // 以/#结尾表示订阅所有以test开头的主题
        mqttCustomerClient.subscribe("test/#");
        return mqttCustomerClient;
    }
}

封装客户端Bean对象MqttCustomerClient

package com.wn.mqtt.util;
 
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
/**
 *  mqtt客户端
 */
@Slf4j
@Component
public class MqttCustomerClient {
    @Autowired
    private PushCallback pushCallback;
 
 
    private static MqttClient client;
 
    public  static MqttClient getClient(){
        return  client;
    }
 
    public static void setClient(MqttClient client){
        MqttCustomerClient.client=client;
    }
 
    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientID  客户端Id
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keeplive 保留数
     */
    public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
        MqttClient client;
 
        try {
            client=new MqttClient(host,clientID,new MemoryPersistence());
            MqttConnectOptions options=new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keeplive);
            MqttCustomerClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            }catch (Exception e){
                e.printStackTrace();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
 
    /**
     * 发布,默认qos为0,非持久化
     * @param topic
     * @param pushMessage
     */
    public void pushlish(String topic,String pushMessage){
        pushlish(0,false,topic,pushMessage);
    }
 
    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public void pushlish(int qos,boolean retained,String topic,String pushMessage){
        MqttMessage message=new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mqttTopic= MqttCustomerClient.getClient().getTopic(topic);
        if(null== mqttTopic){
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token=mqttTopic.publish(message);
            token.waitForCompletion();
        }catch (MqttPersistenceException e){
            e.printStackTrace();
        }catch (MqttException e){
            e.printStackTrace();
        }
    }
 
    /**
     * 订阅某个主题,qos默认为0
     * @param topic
     */
    public void subscribe(String topic){
        log.error("开始订阅主题" + topic);
        subscribe(topic,0);
    }
 
    public void subscribe(String topic,int qos){
        try {
            MqttCustomerClient.getClient().subscribe(topic,qos);
        }catch (MqttException e){
            e.printStackTrace();
        }
    }
}

消息监听类PushCallback

/**
 *  消费监听
 */
@Component
public class PushCallback implements MqttCallback {
 
    private static MqttClient client;
 
    @Override
    public void connectionLost(Throwable throwable) {
        if (client == null || !client.isConnected()) {
            System.out.println("连接断开,正在重连....");
        }
    }
 
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }
 
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

测试类

@Autowired
private MqttCustomerClient mqttCustomerClient;

@Test
void pushlish() {
    for (int i = 0; i < 10; i++) {
        mqttCustomerClient.pushlish("test/device1", "hello mqtt............" + i);
        try {
            Thread.*sleep*(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


EMQX WEB监控界面接收消息数据

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/683562.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【教学类-36-01】Midjounery生成的四张图片切片成四张小图

作品展示&#xff1a; 把一张正方形图片的四个等大小图切割成四张图片 背景需求 最近在学习ChatGPT的绘画&#xff08;midjounery AI艺术&#xff09; 我想给中班孩子找卡通动物图片&#xff08;黑白线条&#xff09;&#xff0c;打印下来&#xff0c;孩子们练习描边、涂色…

【发布】ChatGLM2-6B:性能大幅提升,8-32k上下文,推理提速42%

自3月14日发布以来&#xff0c; ChatGLM-6B 深受广大开发者喜爱&#xff0c;截至 6 月24日&#xff0c;来自 Huggingface 上的下载量已经超过 300w。 为了更进一步促进大模型开源社区的发展&#xff0c;我们再次升级 ChatGLM-6B&#xff0c;发布 ChatGLM2-6B 。 在主要评估LLM模…

《C++ Primer》--学习7

顺序容器 容器库概览 迭代器 与容器一样&#xff0c;迭代器有着公共的接口&#xff1a;如果一个迭代器提供某个操作&#xff0c;那么所有提供相同操作的迭代器对这个操作的实现方式都是相同的。 迭代器范围 一个迭代器范围是由一对迭代器表示&#xff0c;两个迭代器分别指向…

剪辑必备技巧:轻松去除视频中的多余物体

在视频剪辑过程中&#xff0c;有时我们需要去除视频中的多余物体&#xff0c;以提升视觉效果和观赏体验。今天将为您介绍一些实用的技巧&#xff0c;帮助您轻松去除视频中的多余物体&#xff0c;让您的剪辑作品更加精彩。 一、选择适当的剪辑软件进行剪辑操作 一些专业的剪辑…

基于MATLAB实现KECA、PCA和KPCA的多阶段发酵过程监测方法毕业设计(完整源码+说明文档+PPT+开题报告+数据)

文章目录&#xff0c;完整源码在文末 1. 研究目标2. 主要研究内容3. 技术路线4. 预期成果5. 功能说明6. 参考文献7. 完整仿真源码下载 1. 研究目标 实现基于KECA的青霉素发酵过程故障监测 2. 主要研究内容 1.针对KPCA监测算法在数据降维过程中簇结构信息丢失的问题&#xff…

BootStrap案例

BootStrap是已经写好的css样式 &#xff08;1&#xff09;下载BootStrap 解压后放在 static文件夹–>plugins(存放插件)–>bootstrap-3.4.1 &#xff08;2&#xff09;使用 在页面上引入BootStrap 编写HTML时&#xff0c;按照BootStrap的规定来编写自定制 开发版本(一…

Web服务器群集:部署LNMP平台(yum方式安装)

目录 一、理论 1.yum安装与源码安装的区别 二、实验 1.Nginx安装&#xff08;yum方式&#xff09; 2.MySQL安装&#xff08;yum方式&#xff09; 3.PHP安装&#xff08;yum方式&#xff09; 4.Nginx 配置 三、问题 1.客户端 404 报错 四、总结 一、理论 1.yum安装与…

转行网络安全,报班之后就万事大吉了吗?

最近在网上看到很多人问&#xff0c;“是不是报了培训班就可以高枕无忧&#xff0c;坐等毕业之后拿高工资了&#xff1f;”“是不是学了网络安全&#xff0c;就一定能够实现月入过万了&#xff1f;” 其实&#xff0c;无论你是选择网络安全也好&#xff0c;还是选择其他的Java、…

自我管理型团队:企业组织力提升利器

近年来&#xff0c;软件项目的规模和复杂性在以前所未有的速度增长。因此&#xff0c;快速响应需求变化已经成为互联网行业的常态。在这样的环境下&#xff0c;软件产品的快速开发和迭代对于公司迅速占领市场、抢占商机来说具有至关重要的意义。 所以&#xff0c;越来越多的研…

Mysql高阶语句(二)

Mysql高阶语句&#xff08;二&#xff09; 1、别名2、子查询3、EXISTS4、连接查询5、CREATE VIEW 视图6、UNION 联集7、交集值8、无交集值9、CASE10、算排名12、算累积总计13、算总合百分比14、算累计总合百分比15、空值&#xff08;null&#xff09;和无值&#xff08;’’&am…

大中型灌区信息化监测系统-智慧灌区

系统概述 大中型灌区信息化监测系统主要对对灌区的水情、雨情、土壤墒情、气象等信息进行监测&#xff0c;对重点区域进行视频监控&#xff0c;同时对泵站、闸门进行远程控制&#xff0c;实现了信息的测量、统计、分析、控制、调度等功能。为灌区管理部门科学决策提供了依据&a…

从0到1精通自动化测试,pytest自动化测试框架,skip跳过用例(八)

一、前言 pytest.mark.skip可以标记无法在某些平台上运行的测试功能&#xff0c;或者希望自己失败的测试功能 skip意味着只有在满足某些条件时才希望测试通过&#xff0c;否则pytest应该跳过运行测试。 常见示例是在非Windows平台上跳过仅限Windows的测试&#xff0c;或跳过测…

Nginx反向代理解决客户端ip获取问题

希望大家可以去我个人网站看本篇博客&#x1f600;&#xff0c;纯手撸了一个月&#xff0c;希望大家能去看看&#xff0c;评论一两句/(ㄒoㄒ)/~~&#xff1a; RoCBlog-Nginx反向代理解决客户端ip获取问题 任务 有访客记录的需求&#xff0c;所以需要获取客户端IP以及地理位置…

站台「亚马逊云科技中国峰会」,我成了「开发者大讲堂」演讲嘉宾~

文章目录 ⭐️ 中国峰会可持续发展论坛亮点抢先看&#xff01;⭐️ 创业者之日亮点抢先看⭐️ 开发者专属板块 | 灵感碰撞⭐️ 峰会现场 | 5大板块实现技能跃迁⭐️ 峰会报名全面启动 | 亮点抢先看 没有废话&#xff0c;咱直接奔主题&#xff0c;报名入口在文末&#xff1b;非常…

5.6.2 传输层编址--端口

5.6.2 传输层编址 传输层为应用进程提供了端到端的逻辑通信&#xff0c;两个主机之间的通信实际上是两个主机中的应用进程之间的相互通信&#xff0c;因此一个主机中可能有多个应用进程同时和另一个主机中多个应用进程进行通信&#xff0c;而网络层我们学习的网际协议能够保证…

Java设计模式之结构型-适配器模式(UML类图+案例分析)

目录 一、概念 二、UML类图 1、类适配器 2、对象适配器 三、角色设计 四、代码实现 案例一 案例二 五、总结 一、概念 将一个类的接口转换为另一个接口&#xff0c;使得原本由于接口不兼容的类进行兼容。 适配器模式主要分为类适配器模式和对象适配器模式&#xff0…

信息技术教师答辩题目及答案解析

小学信息技术教师《制作图文并茂的幻灯片》答辩题目 第一题 在空白幻灯片中输入输入文字? 【参考答案】 1.打开演示文稿&#xff0c;新建幻灯片。 2.单击“绘图”工具栏中的“横排文本框”按钮。 3.在幻灯片的任意位置拖动鼠标&#xff0c;出现的虚线框就是文本框。 4.在文本框…

树的基本概念和表示方法,二叉树的基本概念以及堆的概念和插入。

文章目录 树的基本概念树的基本术语树的表示双亲表示法:孩子兄弟表示法:树的典型应用——目录树 二叉树的概念及结构二叉树的概念两种特殊的二叉树二叉树的存储结构 堆的概念堆的插入 树的基本概念 树是数据结构中的一个重要组成部分&#xff0c;它具有一对多的特点&#xff0c…

解密软件工程的秘密武器:UML图

文章目录 一 综述二 用例图2.1 细化用例说明2.2 包含、扩展、泛化关系2.3 题目列举 三 类图和对象图四 顺序图五 状态图5.1 栗子 六 活动图七 练习7.1 用例图综合题7.2 状态图综合题 一 综述 二 用例图 用例图描述一组用例、参与者及它们之间的关系。 用户角度描述系统功能&am…

Qt自定义窗口部件/控件(实现一个十六进制微调框SpinBox)

目录 1、自定义Qt窗口部件/控件2、十六进制微调框(SpinBox)2.1、实现思路2.2、源码 3、使用方法3.1、代码添加自定义窗口部件/控件3.2、Qt设计师添加自定义窗口部件/控件3.3、运行效果 4、缺点 1、自定义Qt窗口部件/控件 在某些情况下,我们发现Qt窗口控件需要更多的自定义定制…