目录:
- 问题
- MQTT是什么以及为什么使用
- 如何使用:第一阶段、基础功能
- 如何使用:第二阶段、增加断网重连
- 如何使用:第三阶段、封装
一、问题
在开发的时候,我们一般都使用Http和后台进行通讯,比如我们是开发物联网的,设备会有很多数据需要频繁发给后台,使用Http来做这件事情,就感觉很重,比如会遇到如下这些问题:
- 开发成本:需要后台创建接口,前台去请求。
- 连接数过多:在HTTP协议中,每次请求都需要建立一个新的连接,这可能导致连接数过多,特别是在高并发场景下。对于自动售卖机来说,如果同时有大量的用户进行交互,可能会导致服务器资源紧张,影响性能。
- 开销较大:HTTP协议的消息头部相对复杂,包含了大量的元数据,这增加了网络传输的开销。
- 实时性较差:HTTP协议是基于请求-响应模式的,需要客户端主动发起请求才能获取数据。这导致在实时性要求较高的场景下,HTTP可能无法满足需求。也就是服务器不能主动发数据给客户端。
基于这样的背景,本来想使用Rabbit MQ,但是不能双向通讯,我们选择切换成MQTT。
二、MQTT是什么以及为什么使用
MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅消息协议,它构建于TCP/IP协议之上,为小型设备提供了稳定的网络通讯。MQTT协议设计简单,易于实现,非常适合在物联网(IoT)和移动应用中使用。
你会发现传递的数据量是根据你的内容来决定。
能干吗:
1、实时通讯:MQTT支持异步通讯模式,客户端可以通过订阅主题来接收感兴趣的消息,而不需要主动请求。这使得MQTT非常适合实时通讯和事件驱动的应用场景。
2、低开销:MQTT协议的数据包开销非常小,消息头部仅需2字节,非常适合网络带宽受限或设备资源受限的环境。
3、高可靠性:MQTT支持三种不同的服务质量(QoS)级别,可以根据实际需求选择合适的级别来确保消息的可靠传输。同时,MQTT还具有自动重连机制,能够在网络断开时自动恢复连接。
4、减少连接数:与HTTP相比,MQTT协议只需要客户端与服务器(Broker)建立一次连接就可以进行多次消息发布和订阅,大大减少了网络连接次数和数据传输量。
三、如何使用:第一阶段、基础功能
- 如何连接:init方法
- 连接后如何订阅:subscribe方法
- 如何发送数据,如何接受数据:subscribe方法
/**
* 测试环境的设备管理系统
*/
class ManageMqtt {
private var TAG = "MQTT"
private var client: MqttAndroidClient? = null //mqtt客户端
private lateinit var options: MqttConnectOptions //mqtt 的链接信息设置
@Volatile
var isMqConnected: Boolean = false
//初始化,
fun init(context: Context?) {
try {
log("1")
if (client != null) {
return
}
log("1")
//MQTT的连接设置
options = MqttConnectOptions()
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.isCleanSession = true
//重连尝试
options.isAutomaticReconnect = true
// 设置超时时间 单位为秒
options.connectionTimeout = 10
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.keepAliveInterval = 90
client = MqttAndroidClient(context, "tcp://xxx:xxxx", "名称")//名称
//设置连接的用户名
options.userName = "xxx"
//设置连接的密码
options.password = "xxx".toCharArray()
//设置回调
client?.setCallback(object : MqttCallbackExtended {
override fun connectComplete(reconnect: Boolean, serverURI: String) {
log("已连接mq")
isMqConnected = true
//连接成功,我们要进行订阅
subscribe("xxxx")
}
override fun connectionLost(cause: Throwable) {
log("已断开mq")
isMqConnected = false
}
override fun deliveryComplete(token: IMqttDeliveryToken) {
//publish后会执行到这里 发布
try {
log("发送成功:" + token.message.toString())
} catch (e: Exception) {
e.printStackTrace()
}
}
override fun messageArrived(topicName: String, message: MqttMessage) {
//subscribe后得到的消息会执行到这里面 订阅
//topicName 为主题
try {
//todo 收到消息,要进行一些处理的。
log("收到消息:$topicName $message")
} catch (e: Exception) {
log("异常:$e")
}
}
})
connect()
} catch (e: Exception) {
e.printStackTrace()
}
}
//进行链接
private fun connect() {
Thread(connect).start()
// Schedulers.io().scheduleDirect(connect)
}
private val connect = Runnable {
if (client != null && client!!.isConnected) {
return@Runnable
}
try {
log("连接Mq............")
client?.connect(options, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
log("Connection success")
//todo 是否连接成功?要重连的。
}
override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
log("Connection failure")
//todo 是否连接成功?要重连的。
}
})
} catch (e: Exception) {
e.printStackTrace()
}
}
//订阅信息
fun subscribe(topic: String, qos: Int = 1) {
try {
client?.subscribe(topic, qos, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "Subscribed to $topic")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Failed to subscribe $topic")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
//发送消息
fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
try {
val message = MqttMessage()
message.payload = msg.toByteArray()
message.qos = qos
message.isRetained = retained
client?.publish(topic, message, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "$msg published to $topic")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Failed to publish $msg to $topic")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
//释放资源
fun closeMqtt() {
try {
if (client != null) {
client!!.disconnect()
client = null
}
} catch (e: java.lang.Exception) {
e.printStackTrace()
}
}
//打印log
private fun log(msg: String) {
Log.d(TAG, msg)
}
}
四、如何使用:第二阶段、断网重连
- 即使短暂断网,后面自己也还是可以重连恢复。
- 如果第一次没有连接上,增加第一次的断网重连
/**
* 测试环境的设备管理系统
*/
class ManageMqtt {
private var context: Context? = null
private var TAG = "MQTT"
private var client: MqttAndroidClient? = null //mqtt客户端
private lateinit var options: MqttConnectOptions //mqtt 的链接信息设置
@Volatile
var isMqConnected: Boolean = false
//初始化,
fun init(context: Context?) {
this.context = context
try {
log("1")
if (client != null) {
return
}
log("1")
//MQTT的连接设置
options = MqttConnectOptions()
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.isCleanSession = true
//重连尝试
options.isAutomaticReconnect = true
// 设置超时时间 单位为秒
options.connectionTimeout = 10
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.keepAliveInterval = 90
client = MqttAndroidClient(context, "tcp://xxxx:xxxx", "")//名称
//设置连接的用户名
options.userName = "xxx"
//设置连接的密码
options.password = "xxx".toCharArray()
//设置回调
client?.setCallback(object : MqttCallbackExtended {
override fun connectComplete(reconnect: Boolean, serverURI: String) {
log("已连接mq")
isMqConnected = true
//连接成功,我们要进行订阅
subscribe("xxxx")
}
override fun connectionLost(cause: Throwable) {
log("已断开mq")
isMqConnected = false
}
override fun deliveryComplete(token: IMqttDeliveryToken) {
//publish后会执行到这里 发布
try {
log("发送成功:" + token.message.toString())
} catch (e: Exception) {
e.printStackTrace()
}
}
override fun messageArrived(topicName: String, message: MqttMessage) {
//subscribe后得到的消息会执行到这里面 订阅
//topicName 为主题
try {
//todo 收到消息,要进行一些处理的。 Eventbus
log("收到消息:$topicName $message")
} catch (e: Exception) {
log("异常:$e")
}
}
})
connect()
} catch (e: Exception) {
e.printStackTrace()
}
val intentFilter = IntentFilter()
intentFilter.addAction(ConnectivityManager.CONNECTIVITY_ACTION)
intentFilter.addAction(WifiManager.NETWORK_STATE_CHANGED_ACTION)
intentFilter.addAction(WifiManager.WIFI_STATE_CHANGED_ACTION)
intentFilter.addAction(WifiManager.RSSI_CHANGED_ACTION)
context?.registerReceiver(netWorkBroadCastReciver,intentFilter)
}
//进行链接
private fun connect() {
Thread(connect).start()
// Schedulers.io().scheduleDirect(connect)
}
private val connect = Runnable {
if (client != null && client!!.isConnected) {
return@Runnable
}
try {
log("连接Mq............")
client?.connect(options, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
log("Connection success")
//todo 是否连接成功?要重连的。
}
override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
log("Connection failure")
//todo 是否连接成功?要重连的。
}
})
} catch (e: Exception) {
e.printStackTrace()
}
}
//订阅信息
fun subscribe(topic: String, qos: Int = 1) {
try {
client?.subscribe(topic, qos, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "Subscribed to $topic")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Failed to subscribe $topic")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
//发送消息
/**
* @param topic 主题 给这个主题发送消息
* @param qos 0最多一次不管是否收到,1最少一次可能会收到多次,2保证收到,且仅一次
* @param retained 发布后是否保留,即重新链接时会存在
* @param msg 消息
*/
fun publish(topic: String, msg: String, qos: Int = 0, retained: Boolean = false) {
try {
val message = MqttMessage()
message.payload = msg.toByteArray()
message.qos = qos
message.isRetained = retained //发布后是否保留,即重新链接时会存在
client?.publish(topic, message, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "$msg published to $topic")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Failed to publish $msg to $topic")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
//释放资源
fun closeMqtt() {
try {
if (client != null) {
client!!.disconnect()
client = null
}
} catch (e: java.lang.Exception) {
e.printStackTrace()
}
context?.unregisterReceiver(netWorkBroadCastReciver)
}
//打印log
private fun log(msg: String) {
Log.d(TAG, msg)
}
private var networkState = 100
//断网重连查询
fun isNetConnected(context: Context): Boolean {
Log.d(TAG, "isNetConnected: ")
val connectivity =
context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
if (connectivity != null) {
val info = connectivity.activeNetworkInfo
if (info != null) {
if (info.type == networkState) {
return false
}
networkState = info.type
if (info.type == (ConnectivityManager.TYPE_WIFI)) {
if (!isMqConnected) {
connect()
}
return true
} else if (info.type == (ConnectivityManager.TYPE_MOBILE)) {
if (!isMqConnected) {
connect()
}
return true
}
}
}
return false
}
var netWorkBroadCastReciver = object :BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent?) {
isNetConnected(context)
log( "NetWorkBroadCastReciver: ")
}
}
}
五、如何使用:第三阶段、封装
- 尝试封装,其实就是提供比如,注册,取消注册,订阅,发送数据,或者读取数据的方法。后面如何更换MQTT为其他协议,也很方便
/**
* 对Mqtt操作的进一步封装
*/
@Singleton
class MqttHelper @Inject constructor() {
@Inject
lateinit var mqtt: ManageMqtt
/**
* 注册
*/
fun register(context: Context?){
mqtt.init(context)
}
/**
* 发送数据
*/
fun sendData(data :String){
Heartbeat.deviceId?.let { mqtt.publish(it,Gson().toJson(data)) }
}
/**
* 接收数据
*/
fun data(kind:String,data:String){
//待定,一般都是通过eventbus来解决。
}
}
好了,这篇文章就介绍到这里~,我是前期后期,如果你也有相关的问题,也可以在评论区讨论哦,我们下一篇文章再见。