使用HiveMQ实现Android MQTT

news2025/1/8 5:45:45

MQTT官网:https://mqtt.org/

百度Android MQTT,或者B站上搜索,发现大多使用https://github.com/eclipse/paho.mqtt.android,这是Eclipse的一个Android MQTT客户端实现库,但是我发现这个库在运行到高版本的手机上时报错了,这个库也是N年没有更新的了,而且这个库不支持MQTT5.0的,所以我找了新的库。

在查看MQTT官网的时候,发现关于MQTT的很多介绍是链接到了HiveMQ上面的,不知道它们是什么关系,我发现HiveMQ即有提供MQTT的服务器端,也有提供客户端,而且官方都给他跳转了,那我就用它的库来实现吧!使用了之后才发现,这个库是真的好用啊,封装的非常好,代码写起来特别简洁,响应式编程,支持异步,可以使用Java自带的,也可以使用RxJava或Reactor,HiveMQ的断线自动重连做的也比较好。库地址:https://github.com/hivemq/hivemq-mqtt-client,这个库好像没有限定Android,所以在普通的Java项目中也是可以使用的。android示例代码如下:

  1. 添加依赖

    implementation("com.hivemq:hivemq-mqtt-client:1.3.0")
    
  2. 权限声明

    <uses-permission android:name="android.permission.INTERNET"/>
    

    可以看到,相比paho.mqtt.android,HiveMQ的只需要声明一个互联网权限。

  3. 界面UI
    在这里插入图片描述

  4. MQTT实现代码:Mqtt.kt

    object Mqtt {
    
        private const val clientId = "9527"
        private const val host = "192.168.1.188"
        private const val port = 1883
        private const val topic = "message/topic"
    
        private var client: Mqtt3AsyncClient? = null
    
        private fun createMqttClient(): Mqtt3AsyncClient =
            Mqtt3Client.builder()
                .identifier(clientId)
                .serverHost(host)
                .serverPort(port)
    
                // 认证设置
                /*.simpleAuth()
                .username("admin")
                .password("password".toByteArray())
                .applySimpleAuth()*/
    
                // 重连设置
                .automaticReconnect()
                .initialDelay(1, TimeUnit.SECONDS) // 断线1秒后开始自动重连,如果重连还失败,则下次会等时间会按指数增长,比如2秒、4秒、8秒,双倍增长等待时间,但是不会超过最大值,由maxDelay函数来指定最大值。
                .maxDelay(32, TimeUnit.SECONDS)    // 断线后最多32秒就会自动重连,第5次连会来到32的位置,前面4次已用掉31秒的等待时间了。
                .applyAutomaticReconnect()
    
                // 连接状态监听器设置
                .addConnectedListener {
                    println("MQTT${it.clientConfig.serverHost}:${it.clientConfig.serverPort}连接成功")
                }
                .addDisconnectedListener {
                    // 客户端断开连接,或者连接失败都会回调这里
                    println("MQTT${it.clientConfig.serverHost}:${it.clientConfig.serverPort}连接断开:${it.cause.message},连接状态:${it.clientConfig.state.name}")
                    /*when (it.clientConfig.state) {
                        MqttClientState.CONNECTING -> println("手动连接失败")             // 即主动调用connect时没连接成功
                        MqttClientState.CONNECTING_RECONNECT -> println("自动重连失败")   // 即连接成功后异常断开自动重连时连接失败
                        MqttClientState.CONNECTED -> println("连接正常断开或异常断开")
                        else -> println("连接断开:${it.clientConfig.state.name}")
                    }*/
                }
                .buildAsync()
    
                // 消息监听器设置
                .also {
                    // 接收订阅的消息。publishes必须在subscribe之前调用以确保消息不会丢失,可以在connect之前调用它以便接收前一个会话的消息。
                    it.publishes(MqttGlobalPublishFilter.ALL) { publish: Mqtt3Publish ->
                        println("收到${publish.topic}的消息:${String(publish.payloadAsBytes)}")
                    }
                }
    
        fun connect() {
            disconnect()
            // 断开连接后的client没法再复用,复用的client重新再连接时会收不到离线时的消息。所以每次连接时创建一个新的client。
            val client = createMqttClient().also { this.client = it }
            client.connectWith()
                .cleanSession(false) // false为持久会话,这样离线再上线时还能收到离线时别人推送的消息。
                .keepAlive(60)  // 心跳时间间隔,单位为秒
                .send()
                .whenComplete { ack, e ->
                    if (ack != null) {
                        // 连接成功之后订阅主题
                        println("手动连接成功:$ack")
                        client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send()
                    } else if (e != null) {
                        println("手动连接失败: ${e.message}")
                    }
                }
        }
    
        fun disconnect() {
            client?.let {
                it.disconnect().thenAccept { println("手动断开了连接") }
                client = null
            }
        }
    
        fun subscribe(topic: String = this.topic, qos: MqttQos = MqttQos.EXACTLY_ONCE) {
            val client = this.client ?: return
            client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send()
        }
    
        fun unsubscribe(topic: String = this.topic) {
            val client = this.client ?: return
            client.unsubscribeWith().topicFilter(topic).send()
        }
    
        fun publish(message: String, topic: String = this.topic,  qos: MqttQos = MqttQos.EXACTLY_ONCE, retain: Boolean = false) {
            val client = this.client ?: return
            client
                .publishWith()
                .topic(topic)
                .qos(qos)
                .retain(retain)
                .payload(message.toByteArray())
                .send()
        }
    
        fun clearRetainMessage(topic: String = this.topic) {
            val client = this.client ?: return
            // 发送一条空的retain消息即可清除retain消息
            client.publishWith().topic(topic).retain(true).send()
        }
    
    }
    
  5. Activity调用Mqtt相关功能:MainActivity.kt

    class MainActivity : AppCompatActivity() {
    
        private val testTopic = "topic/hello"
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
        }
    
        fun connect(view: View) = Mqtt.connect()
        fun disconnect(view: View) = Mqtt.disconnect()
        fun subscribe(view: View) = Mqtt.subscribe(testTopic)
        fun unsubscribe(view: View) = Mqtt.unsubscribe(testTopic)
        fun publish(view: View) = Mqtt.publish("How are you?", testTopic)
        fun publishRetain(view: View) = Mqtt.publish("I'm a retain message!", testTopic, retain = true)
        fun clearRetain(view: View) = Mqtt.clearRetainMessage(testTopic)
    
    }
    

    示例完整代码:https://gitee.com/daizhufei/HelloMQTT

    这里MQTT的服务器端我使用的是ActivityMQ,它目前的ActiveMQ Classic最新版本是6.0.1,只支持MQTT3.13.1.1ActiveMQ Artemis的最新版本是2.32.0,支持支持MQTT3.1MQ3.1.1MQTT5.0,官方说明如下:

    • https://activemq.apache.org/components/classic/documentation/mqtt
    • https://activemq.apache.org/components/artemis/documentation/latest/mqtt.html

    随着时间的推移,它支持的版本可能会发生变化。这里截图如下:
    在这里插入图片描述
    在这里插入图片描述

MQTT其他知识总结:

  • 关于MQTT的基础知识,可查看:https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/
  • clientId , 连接的时候用到这个参数,(也叫identifier)应该唯一,可以使用账号做为clientId,比如我们公司的需求是先使用账号密码登录一个Web接口,然后再连接MQTT,但是这个账号是可以在多台设备上同时登录的,所以使用直接使用账号的话会出现重复,可以为每台设备生成一个唯一标识然后持久化存储,然后clientId就可以使用账号和唯一标识组合一起使用。比如:username + uuid,而uuid是持久化保存的。
  • cleanSession , 连接的时候用到这个参数,持久会话设置,如果设置为false,则是持久的,true为非持久,持久的意思是当设备离线后,如果有消息推过来,上线时还能收到。如果是非持久的则收不到离线时的消息。
  • keepAlive,连接的时候用到这个参数,用于设置发送心跳的时间间隔,在客户端和服务器没有交流时,在指定的keepAlive时间到达后会发送心跳给服务器,以证明客户端还活着(也就是说确保连接是正常的)。MQTT是使用TCP的,虽然理论上TCP/IP会在套接字中断时通知您,但在实践中,特别是在移动和卫星链路等情况下,它们经常在空中“伪造”TCP并在每一端放回报头,TCP会话很可能会“黑洞”,即它看起来仍然打开,但只是将您写入的任何内容倾倒到地板上,内容来自:https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/
  • qos 订阅主题和推送消息的时候用到这个参数,有0、1、2,2是最好的质量,现在的手机网络已经比以前好太多了,所以我就选最好的质量了,质量越好需要的流量越多,但是现在的流量已经不像从前那样贵了,所以无所谓。
  • retain 推送消息的时候用到这个参数,如果设置为true,则推送的这条消息会在客户端每次连接时都接收到,比如连接了收到了,然后断开连接,然后再连接上,还是能收到这条消息。如果需要清除这样的消息,则往这个主题再发送一条空的retain为true的消息即可,这样客户端在连接成功时就不会再收到这条消息了。
  • lastWill 简称遗嘱,遗嘱是一条普通的消息,设置遗嘱后,当客户端异常断开时,服务器会给遗嘱指定的主题推送这条消息。一开始我在想,客户端都断开了,怎么推送消息的啊,这是在客户端上线的时候把遗嘱先传给服务器了,所以客户端断线,则服务器来推送这条消息。如果是正常的断开连接,则服务器不会推送遗嘱消息。
  • MQTT测试客户端:MQTTX,目前感觉这个测试的客户端是非常好用的,而且这个网站上也有很多关于MQTT的教程。需要注意的是,新建连接是默认选的MQTT5.0协议,需要选择服务器支持的协议。
  • MQTT入门:https://www.hivemq.com/mqtt/

关于ActiveMQ服务器:

  • 服务器默认是启用了MQTT的,也就是说安装好ActiveMQ之后不需要配置就可以使用客户端来进行MQTT的连接了
  • 默认没有启用MQTT认证,也就是MQTT客户端连接时不需要提供用户名和密码就能连接
  • 默认的MQTT端口是1883
  • 目前的最新版本 ActiveMQ Classic 6.0.1支持 MQTT v3.1v3.1.1
  • 目前的最新版本 ActiveMQ Artemis 2.32.0支持 MQTT v3.1v3.1.1v5.0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1467155.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unet 高阶分割网络实战、多类别分割、迁移学习(deeplab、resnet101等等)

1、前言 Unet 图像分割之前介绍了不少&#xff0c;具体可以参考 图像分割专栏 为了实现多类别的自适应分割&#xff0c;前段时间利用numpy的unique函数实现了一个项目。通过numpy函数将mask的灰度值提取出来&#xff0c;保存在txt文本里&#xff0c;这样txt里面就会有类似0 1…

基于微信小程序的垃圾分类系统,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

Java SpringBoot测试OceanBase

对上篇mysql导入到OceanBase中的数据库进行代码测试&#xff0c;写了个demo包含测试方法&#xff0c;在原mysql库中成功执行&#xff0c;迁移到OceanBase时看是否能不修改业务代码而成功执行测试方法&#xff1a; 代码基于SpringBoot MyBastis测试增删改查、批量新增、多表联…

在项目中应用设计模式的实践指南

目录 ✨✨ 祝屏幕前的您天天开心&#xff0c;每天都有好运相伴。我们一起加油&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 引言 一. 单例模式&#xff08;Singleton Pattern&#xff09; 1、实现单例模式的方式 1…

List集合之UML、特点、遍历方式、迭代器原理、泛型、装拆箱及ArrayList、LinkedList和Vector的区别

目录 ​编辑 一、什么是UML 二、集合框架 三、List集合 1.特点 2.遍历方式 3.删除 4.优化 四、迭代器原理 五、泛型 六、装拆箱 七、ArrayList、LinkedList和Vector的区别 ArrayList和Vector的区别 LinkedList和Vector的区别 一、什么是UML UML&#xff08;Unif…

20个改善编码的Python异常处理技巧,让你的代码更高效

异常处理是写好代码的一个重要的方面&#xff0c;虽然许多开发人员都熟悉基本的try-except块&#xff0c;但是有很多更深入的知识可以使异常处理更高效、更可读和更python化。所以本文将介绍关于Python异常的20个可以显著改善编码的Python异常处理技巧&#xff0c;这些技巧可以…

C/C++内存管理学习【new】

文章目录 一、C/C内存分布二、C语言中动态内存管理方式&#xff1a;malloc/calloc/realloc/free三、C内存管理方式3.1 new/delete操作内置类型3.2 new和delete操作自定义类型四、operator new与operator delete函数五、new和delete的实现原理5.1 内置类型 六、定位new表达式(pl…

天锐绿盾 | 文件数据\资料防泄漏软件 \ 自动智能透明加密保护

怎么防止公司办公终端文件数据资料外泄? 防止公司办公终端文件数据资料外泄是非常重要的&#xff0c;以下是一些有效的措施&#xff1a; 限制访问权限&#xff1a;根据员工的职责和需求&#xff0c;设定文件和数据资料的访问权限。确保只有授权人员才能访问敏感信息。 加密存…

2024图像处理分析与信息工程国际学术会议(IACIPIE2024)

2024图像处理分析与信息工程国际学术会议(IACIPIE2024) 会议简介 2024图像处理分析与信息工程国际学术会议&#xff08;IACIPIE2024&#xff09;将在中国长沙举行。 IACIPIE2024是一个年度会议&#xff0c;探讨图像处理分析和信息工程相关领域的发展和影响&#xff0c;旨在介…

数字孪生低代码平台盘点(一):厂家介绍

特别说明&#xff1a;本文根据网上资料搜集整理而成&#xff0c;排名不分先后&#xff0c;配图是为了更好地阅读体验&#xff0c;并非表明该图为该平台所生产。如有错误之处&#xff0c;请在评论区提出。 一、优锘ChartBuilder 优锘ChartBuilder是一款基于Web的数据可视化工具…

C++的vector容器->基本概念、构造函数、赋值操作、容量和大小、插入和删除、数据存取、互换容器、预留空间

#include<iostream> using namespace std; #include <vector> //vector容器构造 void printVector(vector<int>& v) { for (vector<int>::iterator it v.begin(); it ! v.end(); it) { cout << *it << " "…

挑战杯 基于卷积神经网络的乳腺癌分类 深度学习 医学图像

文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度&#xff0c;召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…

RGB颜色如何转换为十六进制?16进制颜色代码怎么转为RGB颜色值?

我们在调整网站的色彩搭配&#xff0c;或修改图片的时候&#xff0c;偶尔需要用到RGB颜色值&#xff0c;或者16进制颜色代码。 如果我只知道16进制颜色代码想要知道RGB颜色值&#xff0c;那么16进制颜色代码怎么转为RGB颜色值&#xff1f;又或者我知道RGB颜色值想要知道16进制…

ubuntu使用LLVM官方发布的tar.xz来安装Clang编译器

ubuntu系统上的软件相比CentOS更新还是比较快的&#xff0c;但是还是难免有一些软件更新得不那么快&#xff0c;比如LLVM Clang编译器&#xff0c;目前ubuntu 22.04版本最高还只能安装LLVM 15&#xff0c;而LLVM 18 rc版本都出来了。参见https://github.com/llvm/llvm-project/…

Jenkins使用遇到的一些问题

一&#xff1a;插件依赖报错 比如遇到一堆插件报错&#xff0c;不是提示版本对不上&#xff0c;就是启用不了 这样直接把Jenkins升级就行了&#xff0c;比如我这个是命令行启动的&#xff0c;直接把他替换就好了 如果是遇到插件依赖报错&#xff0c;比如A插件异常 则点击这个插…

训练Sora模型,你可能需要这些开源代码,模型,数据集及算力评估

在之前的文章&#xff0c;我们总结了Sora模型上用到的一些核心技术和论文 复刻大模型 Sora 有多难&#xff1f;一张图带你读懂 Sora 的技术路径一文看懂大模型 Sora 技术推演 今天这篇文章来自我们社区讨论交流&#xff0c;我这边整理和总结现有的一些开源代码、模型、数据集…

网页数据的存储--存储为文本文件(TXT、JSON、CSV)

用解析器解析出数据后&#xff0c;接下来就是存储数据了。数据的存储有多种多样&#xff0c;其中最简单的一种是将数据直接保存为文本文件&#xff0c;如TXT、JSON、CSV等。这里就介绍将数据直接保存为文本文件。 目录 一、Python存储数据的方法 1、 文件读取 2、 文件写入…

milvus Delete api写s3的流程

Delete api写s3的流程 milvus版本:v2.3.2 整体架构: Delete 的数据流向 delete相关配置 dataNode:segment:insertBufSize: 16777216 # Max buffer size to flush for a single segment.deleteBufBytes: 67108864 # Max buffer size to flush del for a single channelsyncPe…

Neo4j导入数据之JAVA JDBC

目录结构 前言设置neo4j外部访问代码整理maven 依赖java 代码 参考链接 前言 公司需要获取neo4j数据库内容进行数据筛查&#xff0c;neo4j数据库咱也是头一次基础&#xff0c;辛辛苦苦安装好整理了安装neo4j的步骤&#xff0c;如今又遇到数据不知道怎么创建&#xff0c;关关难…

【wails】(4):使用wails做桌面应用开发,整合chatgpt-web项目做前端,进行本地开发,web端也可以连调,使用websocket实现

1&#xff0c;视频地址 【wails】&#xff08;4&#xff09;&#xff1a;使用wails做桌面应用开发&#xff0c;整合chatgpt-web项目做前端&#xff0c;进行本地开发&#xff0c;web端也可以连调&#xff0c;使用websocket实现 2&#xff0c;演示效果 启动先是报500 错误&#…