MQTT官网:https://mqtt.org/
百度Android MQTT,或者B站上搜索,发现大多使用https://github.com/eclipse/paho.mqtt.android,这是Eclipse的一个Android MQTT客户端实现库,但是我发现这个库在运行到高版本的手机上时报错了,这个库也是N年没有更新的了,而且这个库不支持MQTT5.0的,所以我找了新的库。
在查看MQTT官网的时候,发现关于MQTT的很多介绍是链接到了HiveMQ上面的,不知道它们是什么关系,我发现HiveMQ即有提供MQTT的服务器端,也有提供客户端,而且官方都给他跳转了,那我就用它的库来实现吧!使用了之后才发现,这个库是真的好用啊,封装的非常好,代码写起来特别简洁,响应式编程,支持异步,可以使用Java自带的,也可以使用RxJava或Reactor,HiveMQ的断线自动重连做的也比较好。库地址:https://github.com/hivemq/hivemq-mqtt-client,这个库好像没有限定Android,所以在普通的Java项目中也是可以使用的。android示例代码如下:
-
添加依赖
implementation("com.hivemq:hivemq-mqtt-client:1.3.0")
-
权限声明
<uses-permission android:name="android.permission.INTERNET"/>
可以看到,相比
paho.mqtt.android
,HiveMQ的只需要声明一个互联网权限。 -
界面UI
-
MQTT实现代码:
Mqtt.kt
object Mqtt { private const val clientId = "9527" private const val host = "192.168.1.188" private const val port = 1883 private const val topic = "message/topic" private var client: Mqtt3AsyncClient? = null private fun createMqttClient(): Mqtt3AsyncClient = Mqtt3Client.builder() .identifier(clientId) .serverHost(host) .serverPort(port) // 认证设置 /*.simpleAuth() .username("admin") .password("password".toByteArray()) .applySimpleAuth()*/ // 重连设置 .automaticReconnect() .initialDelay(1, TimeUnit.SECONDS) // 断线1秒后开始自动重连,如果重连还失败,则下次会等时间会按指数增长,比如2秒、4秒、8秒,双倍增长等待时间,但是不会超过最大值,由maxDelay函数来指定最大值。 .maxDelay(32, TimeUnit.SECONDS) // 断线后最多32秒就会自动重连,第5次连会来到32的位置,前面4次已用掉31秒的等待时间了。 .applyAutomaticReconnect() // 连接状态监听器设置 .addConnectedListener { println("MQTT${it.clientConfig.serverHost}:${it.clientConfig.serverPort}连接成功") } .addDisconnectedListener { // 客户端断开连接,或者连接失败都会回调这里 println("MQTT${it.clientConfig.serverHost}:${it.clientConfig.serverPort}连接断开:${it.cause.message},连接状态:${it.clientConfig.state.name}") /*when (it.clientConfig.state) { MqttClientState.CONNECTING -> println("手动连接失败") // 即主动调用connect时没连接成功 MqttClientState.CONNECTING_RECONNECT -> println("自动重连失败") // 即连接成功后异常断开自动重连时连接失败 MqttClientState.CONNECTED -> println("连接正常断开或异常断开") else -> println("连接断开:${it.clientConfig.state.name}") }*/ } .buildAsync() // 消息监听器设置 .also { // 接收订阅的消息。publishes必须在subscribe之前调用以确保消息不会丢失,可以在connect之前调用它以便接收前一个会话的消息。 it.publishes(MqttGlobalPublishFilter.ALL) { publish: Mqtt3Publish -> println("收到${publish.topic}的消息:${String(publish.payloadAsBytes)}") } } fun connect() { disconnect() // 断开连接后的client没法再复用,复用的client重新再连接时会收不到离线时的消息。所以每次连接时创建一个新的client。 val client = createMqttClient().also { this.client = it } client.connectWith() .cleanSession(false) // false为持久会话,这样离线再上线时还能收到离线时别人推送的消息。 .keepAlive(60) // 心跳时间间隔,单位为秒 .send() .whenComplete { ack, e -> if (ack != null) { // 连接成功之后订阅主题 println("手动连接成功:$ack") client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send() } else if (e != null) { println("手动连接失败: ${e.message}") } } } fun disconnect() { client?.let { it.disconnect().thenAccept { println("手动断开了连接") } client = null } } fun subscribe(topic: String = this.topic, qos: MqttQos = MqttQos.EXACTLY_ONCE) { val client = this.client ?: return client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send() } fun unsubscribe(topic: String = this.topic) { val client = this.client ?: return client.unsubscribeWith().topicFilter(topic).send() } fun publish(message: String, topic: String = this.topic, qos: MqttQos = MqttQos.EXACTLY_ONCE, retain: Boolean = false) { val client = this.client ?: return client .publishWith() .topic(topic) .qos(qos) .retain(retain) .payload(message.toByteArray()) .send() } fun clearRetainMessage(topic: String = this.topic) { val client = this.client ?: return // 发送一条空的retain消息即可清除retain消息 client.publishWith().topic(topic).retain(true).send() } }
-
Activity调用Mqtt相关功能:
MainActivity.kt
class MainActivity : AppCompatActivity() { private val testTopic = "topic/hello" override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) } fun connect(view: View) = Mqtt.connect() fun disconnect(view: View) = Mqtt.disconnect() fun subscribe(view: View) = Mqtt.subscribe(testTopic) fun unsubscribe(view: View) = Mqtt.unsubscribe(testTopic) fun publish(view: View) = Mqtt.publish("How are you?", testTopic) fun publishRetain(view: View) = Mqtt.publish("I'm a retain message!", testTopic, retain = true) fun clearRetain(view: View) = Mqtt.clearRetainMessage(testTopic) }
示例完整代码:https://gitee.com/daizhufei/HelloMQTT
这里MQTT的服务器端我使用的是ActivityMQ,它目前的
ActiveMQ Classic
最新版本是6.0.1,只支持MQTT3.1
和3.1.1
,ActiveMQ Artemis
的最新版本是2.32.0,支持支持MQTT3.1
、MQ3.1.1
和MQTT5.0
,官方说明如下:- https://activemq.apache.org/components/classic/documentation/mqtt
- https://activemq.apache.org/components/artemis/documentation/latest/mqtt.html
随着时间的推移,它支持的版本可能会发生变化。这里截图如下:
MQTT其他知识总结:
- 关于MQTT的基础知识,可查看:https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/
clientId
, 连接的时候用到这个参数,(也叫identifier
)应该唯一,可以使用账号做为clientId
,比如我们公司的需求是先使用账号密码登录一个Web接口,然后再连接MQTT,但是这个账号是可以在多台设备上同时登录的,所以使用直接使用账号的话会出现重复,可以为每台设备生成一个唯一标识然后持久化存储,然后clientId
就可以使用账号和唯一标识组合一起使用。比如:username
+uuid
,而uuid
是持久化保存的。cleanSession
, 连接的时候用到这个参数,持久会话设置,如果设置为false
,则是持久的,true
为非持久,持久的意思是当设备离线后,如果有消息推过来,上线时还能收到。如果是非持久的则收不到离线时的消息。keepAlive
,连接的时候用到这个参数,用于设置发送心跳的时间间隔,在客户端和服务器没有交流时,在指定的keepAlive
时间到达后会发送心跳给服务器,以证明客户端还活着(也就是说确保连接是正常的)。MQTT是使用TCP的,虽然理论上TCP/IP会在套接字中断时通知您,但在实践中,特别是在移动和卫星链路等情况下,它们经常在空中“伪造”TCP并在每一端放回报头,TCP会话很可能会“黑洞”,即它看起来仍然打开,但只是将您写入的任何内容倾倒到地板上,内容来自:https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/qos
订阅主题和推送消息的时候用到这个参数,有0、1、2,2是最好的质量,现在的手机网络已经比以前好太多了,所以我就选最好的质量了,质量越好需要的流量越多,但是现在的流量已经不像从前那样贵了,所以无所谓。retain
推送消息的时候用到这个参数,如果设置为true,则推送的这条消息会在客户端每次连接时都接收到,比如连接了收到了,然后断开连接,然后再连接上,还是能收到这条消息。如果需要清除这样的消息,则往这个主题再发送一条空的retain为true的消息即可,这样客户端在连接成功时就不会再收到这条消息了。lastWill
简称遗嘱,遗嘱是一条普通的消息,设置遗嘱后,当客户端异常断开时,服务器会给遗嘱指定的主题推送这条消息。一开始我在想,客户端都断开了,怎么推送消息的啊,这是在客户端上线的时候把遗嘱先传给服务器了,所以客户端断线,则服务器来推送这条消息。如果是正常的断开连接,则服务器不会推送遗嘱消息。- MQTT测试客户端:MQTTX,目前感觉这个测试的客户端是非常好用的,而且这个网站上也有很多关于MQTT的教程。需要注意的是,新建连接是默认选的MQTT5.0协议,需要选择服务器支持的协议。
- MQTT入门:https://www.hivemq.com/mqtt/
关于ActiveMQ服务器:
- 服务器默认是启用了MQTT的,也就是说安装好ActiveMQ之后不需要配置就可以使用客户端来进行MQTT的连接了
- 默认没有启用MQTT认证,也就是MQTT客户端连接时不需要提供用户名和密码就能连接
- 默认的MQTT端口是1883
- 目前的最新版本
ActiveMQ Classic 6.0.1
支持 MQTTv3.1
和v3.1.1
- 目前的最新版本
ActiveMQ Artemis 2.32.0
支持 MQTTv3.1
、v3.1.1
和v5.0