在Android开发中,封装MQTT工具可以帮助简化与MQTT服务器的通信。MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,常用于物联网(IoT)设备之间的通信。
以下是一个简单的MQTT封装工具示例,使用Eclipse Paho库来实现MQTT客户端功能。
- 添加依赖
首先,在build.gradle文件中添加Paho MQTT库的依赖:
dependencies {
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
- 权限配置
在AndroidManifest.xml中添加必要的权限:
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
- 服务配置
在AndroidManifest.xml中注册Paho的MqttService:
<service android:name="org.eclipse.paho.android.service.MqttService" />
4.MQTT封装工具类支持:
自定义客户端ID:支持用户自定义客户端ID。
遗言消息:支持设置遗言消息,用于客户端异常断开时通知其他客户端。
线程池管理:使用线程池处理MQTT操作,避免阻塞主线程。
灵活的回调机制:支持为每个操作单独设置回调。
连接重试机制:增加连接失败后的重试机制,提升连接稳定性。
日志级别:支持动态调整日志级别,便于调试和生产环境切换。
支持多服务器连接:允许同时连接多个MQTT服务器。
消息缓存策略:增加消息缓存的最大数量和时间限制,避免内存泄漏。
更灵活的重试机制:支持自定义重试次数和重试间隔。
连接状态管理:增加更详细的连接状态(如连接中、已连接、断开中等)。
支持QoS级别配置:允许动态配置发布和订阅的QoS级别。
MQTT工具类
import android.content.Context;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MqttManager {
private static final String TAG = "MqttManager";
private static MqttManager instance;
private Map<String, MqttAndroidClient> mqttClients = new HashMap<>(); // 支持多服务器连接
private Map<String, MqttConnectOptions> mqttConnectOptionsMap = new HashMap<>();
private Map<String, Boolean> connectionStatusMap = new HashMap<>(); // 连接状态
private Map<String, List<MqttMessage>> messageQueueMap = new HashMap<>(); // 消息队列
private Map<String, List<String>> subscribedTopicsMap = new HashMap<>(); // 已订阅的主题
private ExecutorService executorService = Executors.newCachedThreadPool(); // 线程池
private ConnectionStateListener connectionStateListener;
private static final int DEFAULT_QOS = 1;
private static final boolean DEFAULT_RETAINED = false;
private static final int MAX_MESSAGE_QUEUE_SIZE = 100; // 消息队列最大数量
private static final long MESSAGE_QUEUE_TIMEOUT = 60000; // 消息队列超时时间(毫秒)
private static final int MAX_RETRY_COUNT = 3; // 最大重试次数
private static final int RETRY_INTERVAL = 5000; // 重试间隔(毫秒)
// 单例模式
public static synchronized MqttManager getInstance() {
if (instance == null) {
instance = new MqttManager();
}
return instance;
}
private MqttManager() {}
/**
* 初始化MQTT客户端
*/
public void initClient(Context context, String serverUri, String clientId) {
if (mqttClients.containsKey(serverUri)) {
Log.w(TAG, "Client already initialized for server: " + serverUri);
return;
}
MqttAndroidClient client = new MqttAndroidClient(context, serverUri, clientId);
mqttClients.put(serverUri, client);
mqttConnectOptionsMap.put(serverUri, new MqttConnectOptions());
connectionStatusMap.put(serverUri, false);
messageQueueMap.put(serverUri, new ArrayList<>());
subscribedTopicsMap.put(serverUri, new ArrayList<>());
// 设置回调
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
connectionStatusMap.put(serverURI, true);
Log.d(TAG, "MQTT connected: " + serverURI);
if (connectionStateListener != null) {
connectionStateListener.onConnected(serverURI, reconnect);
}
// 连接成功后发送缓存的消息
processMessageQueue(serverURI);
// 重新订阅主题
resubscribeTopics(serverURI);
}
@Override
public void connectionLost(Throwable cause) {
for (Map.Entry<String, MqttAndroidClient> entry : mqttClients.entrySet()) {
if (entry.getValue().equals(client)) {
String serverUri = entry.getKey();
connectionStatusMap.put(serverUri, false);
Log.e(TAG, "MQTT connection lost: " + serverUri, cause);
if (connectionStateListener != null) {
connectionStateListener.onDisconnected(serverUri, cause);
}
// 尝试重新连接
attemptReconnect(serverUri, 0);
break;
}
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "Message arrived: " + new String(message.getPayload()));
if (connectionStateListener != null) {
connectionStateListener.onMessageReceived(topic, message);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d(TAG, "Message delivered");
}
});
}
/**
* 设置连接参数
*/
public void setConnectionOptions(String serverUri, String username, String password) {
MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);
if (options != null) {
options.setUserName(username);
options.setPassword(password.toCharArray());
}
}
/**
* 设置遗言消息
*/
public void setWillMessage(String serverUri, String topic, String message, int qos, boolean retained) {
MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);
if (options != null) {
options.setWill(topic, message.getBytes(), qos, retained);
}
}
/**
* 设置SSL/TLS加密连接
*/
public void setSSL(String serverUri, boolean useSSL) {
MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);
if (options != null && useSSL) {
options.setSocketFactory(new DefaultSSLSocketFactory());
}
}
/**
* 连接到MQTT服务器
*/
public void connect(String serverUri) {
if (connectionStatusMap.get(serverUri)) {
Log.w(TAG, "Already connected to MQTT server: " + serverUri);
return;
}
MqttAndroidClient client = mqttClients.get(serverUri);
MqttConnectOptions options = mqttConnectOptionsMap.get(serverUri);
if (client == null || options == null) {
Log.e(TAG, "Client or options not initialized for server: " + serverUri);
return;
}
executorService.execute(() -> {
try {
client.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "Connected to MQTT server: " + serverUri);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, "Failed to connect to MQTT server: " + serverUri, exception);
attemptReconnect(serverUri, 0);
}
});
} catch (MqttException e) {
Log.e(TAG, "MQTT connection error: " + serverUri, e);
attemptReconnect(serverUri, 0);
}
});
}
/**
* 断开连接
*/
public void disconnect(String serverUri) {
if (!connectionStatusMap.get(serverUri)) {
Log.w(TAG, "Already disconnected from MQTT server: " + serverUri);
return;
}
MqttAndroidClient client = mqttClients.get(serverUri);
if (client == null) {
Log.e(TAG, "Client not initialized for server: " + serverUri);
return;
}
executorService.execute(() -> {
try {
client.disconnect(null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
connectionStatusMap.put(serverUri, false);
Log.d(TAG, "Disconnected from MQTT server: " + serverUri);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, "Failed to disconnect from MQTT server: " + serverUri, exception);
}
});
} catch (MqttException e) {
Log.e(TAG, "MQTT disconnection error: " + serverUri, e);
}
});
}
/**
* 订阅主题
*/
public void subscribe(String serverUri, String topic, int qos, IMqttActionListener listener) {
if (!connectionStatusMap.get(serverUri)) {
Log.w(TAG, "Not connected, subscription will be attempted after connection: " + serverUri);
subscribedTopicsMap.get(serverUri).add(topic); // 缓存主题
return;
}
MqttAndroidClient client = mqttClients.get(serverUri);
if (client == null) {
Log.e(TAG, "Client not initialized for server: " + serverUri);
return;
}
executorService.execute(() -> {
try {
client.subscribe(topic, qos, null, listener);
} catch (MqttException e) {
Log.e(TAG, "MQTT subscription error: " + serverUri, e);
}
});
}
/**
* 发布消息
*/
public void publish(String serverUri, String topic, String message, int qos, boolean retained, IMqttActionListener listener) {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
if (!connectionStatusMap.get(serverUri)) {
Log.w(TAG, "Not connected, message will be queued: " + serverUri);
List<MqttMessage> queue = messageQueueMap.get(serverUri);
if (queue.size() < MAX_MESSAGE_QUEUE_SIZE) {
queue.add(mqttMessage); // 缓存消息
} else {
Log.w(TAG, "Message queue is full, discarding message: " + serverUri);
}
return;
}
MqttAndroidClient client = mqttClients.get(serverUri);
if (client == null) {
Log.e(TAG, "Client not initialized for server: " + serverUri);
return;
}
executorService.execute(() -> {
try {
client.publish(topic, mqttMessage, null, listener);
} catch (MqttException e) {
Log.e(TAG, "MQTT publish error: " + serverUri, e);
}
});
}
/**
* 处理消息队列
*/
private void processMessageQueue(String serverUri) {
List<MqttMessage> queue = messageQueueMap.get(serverUri);
for (MqttMessage message : queue) {
publish(serverUri, "default/topic", new String(message.getPayload()), message.getQos(), message.isRetained(), null);
}
queue.clear();
}
/**
* 重新订阅主题
*/
private void resubscribeTopics(String serverUri) {
List<String> topics = subscribedTopicsMap.get(serverUri);
for (String topic : topics) {
subscribe(serverUri, topic, DEFAULT_QOS, null);
}
}
/**
* 尝试重新连接
*/
private void attemptReconnect(String serverUri, int retryCount) {
if (retryCount >= MAX_RETRY_COUNT) {
Log.w(TAG, "Max retry count reached for server: " + serverUri);
return;
}
executorService.execute(() -> {
try {
Thread.sleep(RETRY_INTERVAL);
connect(serverUri);
} catch (InterruptedException e) {
Log.e(TAG, "Reconnect attempt interrupted: " + serverUri, e);
}
});
}
/**
* 设置连接状态监听器
*/
public void setConnectionStateListener(ConnectionStateListener listener) {
this.connectionStateListener = listener;
}
/**
* 连接状态监听器接口
*/
public interface ConnectionStateListener {
void onConnected(String serverUri, boolean isReconnect);
void onDisconnected(String serverUri, Throwable cause);
void onMessageReceived(String topic, MqttMessage message);
}
}
以下是一个使用 MqttManager 的完整示例。这个示例展示了如何初始化MQTT客户端、连接服务器、订阅主题、发布消息以及处理连接状态和消息回调。
import android.os.Bundle;
import androidx.appcompat.app.AppCompatActivity;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
private static final String SERVER_URI = "tcp://your.mqtt.broker:1883"; // MQTT服务器地址
private static final String CLIENT_ID = "android_client_" + System.currentTimeMillis(); // 客户端ID
private static final String TOPIC = "your/topic"; // 订阅的主题
private static final String WILL_TOPIC = "your/lwt/topic"; // 遗言主题
private static final String WILL_MESSAGE = "Client disconnected"; // 遗言消息
private MqttManager mqttManager;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 初始化MQTT管理器
mqttManager = MqttManager.getInstance();
mqttManager.initClient(this, SERVER_URI, CLIENT_ID);
// 设置连接参数
mqttManager.setConnectionOptions(SERVER_URI, "username", "password");
// 设置遗言消息
mqttManager.setWillMessage(SERVER_URI, WILL_TOPIC, WILL_MESSAGE, 1, true);
// 设置连接状态监听器
mqttManager.setConnectionStateListener(new MqttManager.ConnectionStateListener() {
@Override
public void onConnected(String serverUri, boolean isReconnect) {
Log.d(TAG, "Connected to MQTT server: " + serverUri + ", isReconnect: " + isReconnect);
// 订阅主题
mqttManager.subscribe(SERVER_URI, TOPIC, 1, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "Subscribed to topic: " + TOPIC);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, "Failed to subscribe to topic: " + TOPIC, exception);
}
});
}
@Override
public void onDisconnected(String serverUri, Throwable cause) {
Log.e(TAG, "Disconnected from MQTT server: " + serverUri, cause);
}
@Override
public void onMessageReceived(String topic, MqttMessage message) {
Log.d(TAG, "Received message from topic: " + topic + ", payload: " + new String(message.getPayload()));
}
});
// 连接MQTT服务器
mqttManager.connect(SERVER_URI);
// 发布消息
publishMessage("Hello, MQTT!");
}
/**
* 发布消息
*/
private void publishMessage(String message) {
mqttManager.publish(SERVER_URI, TOPIC, message, 1, false, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "Message published to topic: " + TOPIC);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, "Failed to publish message to topic: " + TOPIC, exception);
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
// 断开连接
mqttManager.disconnect(SERVER_URI);
}
}
示例说明
初始化MQTT客户端:
使用 MqttManager.getInstance() 获取单例实例。
调用 initClient() 初始化MQTT客户端,传入服务器地址和客户端ID。
设置连接参数:
使用 setConnectionOptions() 设置用户名和密码。
设置遗言消息:
使用 setWillMessage() 设置遗言消息,当客户端异常断开时,服务器会发布该消息。
设置连接状态监听器:
实现 ConnectionStateListener 接口,监听连接状态变化和消息到达事件。
连接MQTT服务器:
调用 connect() 连接到MQTT服务器。
订阅主题:
在 onConnected() 回调中订阅主题。
发布消息:
使用 publish() 方法发布消息到指定主题。
断开连接:
在 onDestroy() 中调用 disconnect() 断开连接。
注意事项
服务器地址和主题:
替换 SERVER_URI 和 TOPIC 为实际的MQTT服务器地址和主题。
用户名和密码:
如果MQTT服务器需要认证,请设置正确的用户名和密码。
遗言消息:
遗言消息用于在客户端异常断开时通知其他客户端,确保设置合适的主题和消息内容。
线程安全:
MQTT操作在后台线程中执行,避免阻塞主线程。