Android 消息队列之MQTT的使用:物联网通讯,HTTP太重了,使用MQTT;断网重连、注册、订阅、发送数据和接受数据,实现双向通讯。

news2025/2/24 23:17:20

目录:

  1. 问题
  2. MQTT是什么以及为什么使用
  3. 如何使用:第一阶段、基础功能
  4. 如何使用:第二阶段、增加断网重连
  5. 如何使用:第三阶段、封装

在这里插入图片描述


一、问题

在开发的时候,我们一般都使用Http和后台进行通讯,比如我们是开发物联网的,设备会有很多数据需要频繁发给后台,使用Http来做这件事情,就感觉很重,比如会遇到如下这些问题:

  1. 开发成本:需要后台创建接口,前台去请求。
  2. 连接数过多:在HTTP协议中,每次请求都需要建立一个新的连接,这可能导致连接数过多,特别是在高并发场景下。对于自动售卖机来说,如果同时有大量的用户进行交互,可能会导致服务器资源紧张,影响性能。
  3. 开销较大:HTTP协议的消息头部相对复杂,包含了大量的元数据,这增加了网络传输的开销。
  4. 实时性较差:HTTP协议是基于请求-响应模式的,需要客户端主动发起请求才能获取数据。这导致在实时性要求较高的场景下,HTTP可能无法满足需求。也就是服务器不能主动发数据给客户端。

基于这样的背景,本来想使用Rabbit MQ,但是不能双向通讯,我们选择切换成MQTT。


二、MQTT是什么以及为什么使用

MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅消息协议,它构建于TCP/IP协议之上,为小型设备提供了稳定的网络通讯。MQTT协议设计简单,易于实现,非常适合在物联网(IoT)和移动应用中使用。

你会发现传递的数据量是根据你的内容来决定。

能干吗:

1、实时通讯:MQTT支持异步通讯模式,客户端可以通过订阅主题来接收感兴趣的消息,而不需要主动请求。这使得MQTT非常适合实时通讯和事件驱动的应用场景。
2、低开销:MQTT协议的数据包开销非常小,消息头部仅需2字节,非常适合网络带宽受限或设备资源受限的环境。
3、高可靠性:MQTT支持三种不同的服务质量(QoS)级别,可以根据实际需求选择合适的级别来确保消息的可靠传输。同时,MQTT还具有自动重连机制,能够在网络断开时自动恢复连接。
4、减少连接数:与HTTP相比,MQTT协议只需要客户端与服务器(Broker)建立一次连接就可以进行多次消息发布和订阅,大大减少了网络连接次数和数据传输量。


三、如何使用:第一阶段、基础功能

  1. 如何连接:init方法
  2. 连接后如何订阅:subscribe方法
  3. 如何发送数据,如何接受数据:subscribe方法
/**
 * 测试环境的设备管理系统
 */
class ManageMqtt {

    private var TAG = "MQTT"
    private var client: MqttAndroidClient? = null //mqtt客户端
    private lateinit var options: MqttConnectOptions  //mqtt 的链接信息设置

    @Volatile
    var isMqConnected: Boolean = false


    //初始化,
    fun init(context: Context?) {
        try {
            log("1")

            if (client != null) {
                return
            }
            log("1")
            //MQTT的连接设置
            options = MqttConnectOptions()
            //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.isCleanSession = true
            //重连尝试
            options.isAutomaticReconnect = true
            // 设置超时时间 单位为秒
            options.connectionTimeout = 10
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.keepAliveInterval = 90

            client = MqttAndroidClient(context, "tcp://xxx:xxxx", "名称")//名称
            //设置连接的用户名
            options.userName = "xxx"
            //设置连接的密码
            options.password = "xxx".toCharArray()


            //设置回调
            client?.setCallback(object : MqttCallbackExtended {
                override fun connectComplete(reconnect: Boolean, serverURI: String) {
                    log("已连接mq")
                    isMqConnected = true
                    //连接成功,我们要进行订阅
                    subscribe("xxxx")
                }

                override fun connectionLost(cause: Throwable) {
                    log("已断开mq")
                    isMqConnected = false
                }

                override fun deliveryComplete(token: IMqttDeliveryToken) {
                    //publish后会执行到这里  发布
                    try {
                        log("发送成功:" + token.message.toString())
                    } catch (e: Exception) {
                        e.printStackTrace()
                    }
                }

                override fun messageArrived(topicName: String, message: MqttMessage) {
                    //subscribe后得到的消息会执行到这里面  订阅
                    //topicName 为主题
                    try {
                        //todo 收到消息,要进行一些处理的。
                        log("收到消息:$topicName     $message")
                    } catch (e: Exception) {
                        log("异常:$e")
                    }
                }
            })
            connect()
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    //进行链接
    private fun connect() {
        Thread(connect).start()
//        Schedulers.io().scheduleDirect(connect)
    }

    private val connect = Runnable {
        if (client != null && client!!.isConnected) {
            return@Runnable
        }
        try {
            log("连接Mq............")
            client?.connect(options, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    log("Connection success")
                    //todo 是否连接成功?要重连的。
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    log("Connection failure")
                    //todo 是否连接成功?要重连的。

                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }


    //订阅信息
    fun subscribe(topic: String, qos: Int = 1) {
        try {
            client?.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //发送消息
    fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained
            client?.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "$msg published to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to publish $msg to $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //释放资源
    fun closeMqtt() {
        try {
            if (client != null) {
                client!!.disconnect()
                client = null
            }
        } catch (e: java.lang.Exception) {
            e.printStackTrace()
        }
    }

    //打印log
    private fun log(msg: String) {
        Log.d(TAG, msg)
    }


}


四、如何使用:第二阶段、断网重连

  1. 即使短暂断网,后面自己也还是可以重连恢复。
  2. 如果第一次没有连接上,增加第一次的断网重连
/**
 * 测试环境的设备管理系统
 */
class ManageMqtt {

    private var context: Context? = null
    private var TAG = "MQTT"
    private var client: MqttAndroidClient? = null //mqtt客户端
    private lateinit var options: MqttConnectOptions  //mqtt 的链接信息设置

    @Volatile
    var isMqConnected: Boolean = false


    //初始化,
    fun init(context: Context?) {
        this.context = context
        try {
            log("1")

            if (client != null) {
                return
            }
            log("1")
            //MQTT的连接设置
            options = MqttConnectOptions()
            //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.isCleanSession = true
            //重连尝试
            options.isAutomaticReconnect = true
            // 设置超时时间 单位为秒
            options.connectionTimeout = 10
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.keepAliveInterval = 90

            client = MqttAndroidClient(context, "tcp://xxxx:xxxx", "")//名称
            //设置连接的用户名
            options.userName = "xxx"
            //设置连接的密码
            options.password = "xxx".toCharArray()


            //设置回调
            client?.setCallback(object : MqttCallbackExtended {
                override fun connectComplete(reconnect: Boolean, serverURI: String) {
                    log("已连接mq")
                    isMqConnected = true
                    //连接成功,我们要进行订阅
                    subscribe("xxxx")
                }

                override fun connectionLost(cause: Throwable) {
                    log("已断开mq")
                    isMqConnected = false
                }

                override fun deliveryComplete(token: IMqttDeliveryToken) {
                    //publish后会执行到这里  发布
                    try {
                        log("发送成功:" + token.message.toString())
                    } catch (e: Exception) {
                        e.printStackTrace()
                    }
                }

                override fun messageArrived(topicName: String, message: MqttMessage) {
                    //subscribe后得到的消息会执行到这里面  订阅
                    //topicName 为主题
                    try {
                        //todo 收到消息,要进行一些处理的。 Eventbus
                        log("收到消息:$topicName     $message")
                    } catch (e: Exception) {
                        log("异常:$e")
                    }
                }
            })
            connect()
        } catch (e: Exception) {
            e.printStackTrace()
        }
        val intentFilter = IntentFilter()
        intentFilter.addAction(ConnectivityManager.CONNECTIVITY_ACTION)
        intentFilter.addAction(WifiManager.NETWORK_STATE_CHANGED_ACTION)
        intentFilter.addAction(WifiManager.WIFI_STATE_CHANGED_ACTION)
        intentFilter.addAction(WifiManager.RSSI_CHANGED_ACTION)
        context?.registerReceiver(netWorkBroadCastReciver,intentFilter)
    }

    //进行链接
    private fun connect() {
        Thread(connect).start()
//        Schedulers.io().scheduleDirect(connect)
    }

    private val connect = Runnable {
        if (client != null && client!!.isConnected) {
            return@Runnable
        }
        try {
            log("连接Mq............")
            client?.connect(options, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    log("Connection success")
                    //todo 是否连接成功?要重连的。
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    log("Connection failure")
                    //todo 是否连接成功?要重连的。

                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }


    //订阅信息
    fun subscribe(topic: String, qos: Int = 1) {
        try {
            client?.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //发送消息
    /**
     * @param topic 主题 给这个主题发送消息
     *  @param qos 0最多一次不管是否收到,1最少一次可能会收到多次,2保证收到,且仅一次
     *  @param retained 发布后是否保留,即重新链接时会存在
     *  @param msg 消息
     */
    fun publish(topic: String, msg: String, qos: Int = 0, retained: Boolean = false) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained //发布后是否保留,即重新链接时会存在
            client?.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "$msg published to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to publish $msg to $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //释放资源
    fun closeMqtt() {
        try {
            if (client != null) {
                client!!.disconnect()
                client = null
            }
        } catch (e: java.lang.Exception) {
            e.printStackTrace()
        }
        context?.unregisterReceiver(netWorkBroadCastReciver)
    }

    //打印log
    private fun log(msg: String) {
        Log.d(TAG, msg)
    }


    private var networkState = 100
    //断网重连查询
    fun isNetConnected(context: Context): Boolean {
        Log.d(TAG, "isNetConnected: ")
        val connectivity =
            context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
        if (connectivity != null) {
            val info = connectivity.activeNetworkInfo
            if (info != null) {
                if (info.type == networkState) {
                    return false
                }
                networkState = info.type
                if (info.type == (ConnectivityManager.TYPE_WIFI)) {
                    if (!isMqConnected) {

                        connect()
                    }
                    return true
                } else if (info.type == (ConnectivityManager.TYPE_MOBILE)) {
                    if (!isMqConnected) {
                        connect()
                    }
                    return true
                }
            }
        }

        return false
    }
    
    var netWorkBroadCastReciver = object :BroadcastReceiver() {
        override fun onReceive(context: Context, intent: Intent?) {
            isNetConnected(context)
            log( "NetWorkBroadCastReciver: ")
        }
    }


}

五、如何使用:第三阶段、封装

  1. 尝试封装,其实就是提供比如,注册,取消注册,订阅,发送数据,或者读取数据的方法。后面如何更换MQTT为其他协议,也很方便
/**
 * 对Mqtt操作的进一步封装
 */
@Singleton
class MqttHelper @Inject constructor() {

    @Inject
    lateinit var mqtt: ManageMqtt


    /**
     * 注册
     */
    fun register(context: Context?){
        mqtt.init(context)
    }

    /**
     * 发送数据
     */
    fun sendData(data :String){
        Heartbeat.deviceId?.let { mqtt.publish(it,Gson().toJson(data)) }
    }

    /**
     * 接收数据
     */
    fun data(kind:String,data:String){
        //待定,一般都是通过eventbus来解决。
    }
}

好了,这篇文章就介绍到这里~,我是前期后期,如果你也有相关的问题,也可以在评论区讨论哦,我们下一篇文章再见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253423.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

项目-02-数学学院后台项目开发过程中的问题总结

目录 一、后台(pc端,vue2)1. dialog对话框被黑色蒙层盖住2. 将前端表格导出为word文档3. 在线查看、下载 .docx、.doc、.pdf文档 一、后台(pc端,vue2) 1. dialog对话框被黑色蒙层盖住 问题: d…

大数据实验E5HBase:安装配置,shell 命令和Java API使用

实验目的 熟悉HBase操作常用的shell 命令和Java API使用; 实验要求 掌握HBase的基本操作命令和函数接口的使用; 实验平台 操作系统:Linux(建议Ubuntu16.04或者CentOS 7 以上);Hadoop版本:3…

使用Tomcat搭建简易文件服务器

创建服务器 1. 复制一个tomcat服务器,并命名为file-service(好区分即可) 2.在webapp里面新建一个文件夹 uploadfiles ,用于存储上传的文件 3. 修改conf/service.xml,配置文件服务器的端口与上传文件夹的访问 在Host标签之间加入一个Context标签 docBase"uploa…

【算法】位运算合集

阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 零:位运算基础公式 零:五道基础题 1:位1的个数 2:比…

【NLP高频面题 - LLM架构篇】旋转位置编码RoPE相对正弦位置编码有哪些优势?

【NLP高频面题 - LLM架构篇】旋转位置编码RoPE相对正弦位置编码有哪些优势? 重要性:⭐⭐⭐ 💯 NLP Github 项目: NLP 项目实践:fasterai/nlp-project-practice 介绍:该仓库围绕着 NLP 任务模型的设计、训练…

《Vue零基础教程》(5)计算属性和侦听器好讲解

1 计算属性 1) 什么是计算属性 计算属性就是基于现有属性计算后的属性 2) 计算属性的作用 计算属性用于对原始数据的再次加工 3) 案例 需求 实现如下效果 使用表达式实现 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF…

Narya.ai正在寻找iOS工程师!#Mixlab内推

如果你对AI技术和iOS开发充满热情&#xff0c;这里有一个绝佳的机会加入一家专注于AI应用创新的初创公司。Narya.ai正在招聘iOS工程师&#xff0c;帮助他们开发下一代效率工具&#xff0c;旨在提升用户的日常生活效率与幸福感。 关于Narya.ai&#xff1a; 专注于AI应用层创新&a…

【开源免费】基于SpringBoot+Vue.JS课程答疑系统(JAVA毕业设计)

博主说明&#xff1a;本文项目编号 T 070 &#xff0c;文末自助获取源码 \color{red}{T070&#xff0c;文末自助获取源码} T070&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析…

FPGA实战篇(触摸按键控制LED灯)

1.触摸按键简介 触摸按键主要可分为四大类&#xff1a;电阻式、电容式、红外感应式以及表面声波式。根据其属性的不同&#xff0c;每种触摸按键都有其合适的使用领域。 电阻式触摸按键由多块导电薄膜按照按键的位置印制而成&#xff0c;但由于耐用性较差且维护复杂&#xff0c…

VSCode如何关闭Vite项目本地自启动

某些情况下VSCode打开Vite项目不需要自动启动&#xff0c;那么如何关闭该功能 文件>首选项>设置 搜索vite 将Vite:Auto Start 勾选取消即可

重生之我在异世界学编程之C语言:深入指针篇(上)

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文&#xff08;1&#xff09;内置数…

TypeScript (一)运行环境配置,数据类型,可选类型,联合类型,type与interface,交叉类型,断言as,字面量类型,类型缩小

文章目录 一、认识TS1.1 JS 存在的问题1.2 TS的出现1.3 TS运行环境运行ts的三种方式 1.4 变量声明1.5 类型推断 二、数据类型2.1 JS数据类型(1) 数组Array(2) 对象Object(3) 其他类型 2.2 TS特有数据类型(1) any类型(2) unknown类型(3) void类型(4) never (了解)(5) tuple类型 …

【Leetcode 每日一题】3274. 检查棋盘方格颜色是否相同

问题背景 给你两个字符串 c o o r d i n a t e 1 coordinate1 coordinate1 和 c o o r d i n a t e 2 coordinate2 coordinate2&#xff0c;代表 8 8 8 \times 8 88 国际象棋棋盘上的两个方格的坐标。 以下是棋盘的参考图。 如果这两个方格颜色相同&#xff0c;返回 t …

【Dubbo03】消息队列与微服务之dubbo-admin 二进制与编译安装

实战案例&#xff1a;二进制安装 dubbo-admin 新版用Golang重构&#xff0c;提供了二进制包&#xff0c;可以直接部署 #下载二进制包 [rootubuntu2204 ~]#wget https://github.com/apache/dubbo-admin/releases/download/0.5.0/apache-dubbo-admin-0.5.0-bin-release.tar.gz …

Kylin Server V10 下 Kafka 集群部署

一、ZooKeeper 集群部署 1、主机规划 主机名 IP 地址 myid 10.8.3.35 1 10.8.3.36 2 10.8.3.37 3 2、拓扑结构 3、部署 (1) 下载Zookeeper [root@localhost ~]# cd /usr/local [root@localhost local]# wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-…

redis的应用----缓存

redis的应用----缓存 一、缓存的概念二、使用redis作为缓存2.1使用redis作为缓存的原因2.2缓存机制的访问步骤 三、缓存的更新策略3.1定期更新3.2实时更新3.3淘汰策略 四、缓存常见的问题4.1缓存预热(Cache preheating)4.2缓存穿透(Cache penetration)4.3缓存雪崩(Cache avalan…

用于LiDAR测量的1.58um单芯片MOPA(一)

--翻译自M. Faugeron、M. Krakowski1等人2014年的文章 1.简介 如今&#xff0c;人们对高功率半导体器件的兴趣日益浓厚&#xff0c;这些器件主要用于遥测、激光雷达系统或自由空间通信等应用。与固态激光器相比&#xff0c;半导体器件更紧凑且功耗更低&#xff0c;这在低功率供…

SpringBoot两天

SpringBoot讲义 什么是SpringBoot&#xff1f; Spring Boot是由Pivotal团队提供的全新框架&#xff0c;其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置&#xff0c;从而使开发人员不再需要定义样板化的配置。通过这种方式&#xf…

vue3项目最新eslint9+prettier+husky+stylelint+vscode配置

一、eslint9和prettier通用配置 安装必装插件 ESlint9.x pnpm add eslintlatest -DESlint配置 vue 规则 , typescript解析器 pnpm add eslint-plugin-vue typescript-eslint -DESlint配置 JavaScript 规则 pnpm add eslint/js -D配置所有全局变量 globals pnpm add globa…

LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型 Matlab代码注释清晰。 程序设计 完整程序和数据获取方式&#xff1a;私信博主回复LSTM-CNN-BP-RF-SVM五模型咖喱融合策略混合预测模型&#xff08;Matlab&#…