使用MQTTnet(Version=4.3.1.873)库实现多客户端连接多服务端,同时实现断线重连;
如下图所示,开启3个客户端连接3个服务端,当其一个服务端出现异常(服务停止,网络异常无法连接)导致连接断开时,实现每5秒连接一次
MQTT连接服务核心类:业务需求是一个客户端对应的一个MQTT服务,因此按照服务端个数创建对应的客户端连接,实现通过每一个服务端订阅消息,获取数据
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MqttConnectManage
{
public class MqttClientManage
{
public readonly List<IMqttClient> _clients = new List<IMqttClient>();
public readonly string[] _serverAdr;
private List<MqttClientOptions> serverOptions = new List<MqttClientOptions>();
private readonly IMqttNetLogger logger;
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
/// <summary>
/// 日志记录
/// </summary>
/// <param name="logger"></param>
public MqttClientManage(IMqttNetLogger logger)
{
this.logger = logger;
}
/// <summary>
/// 客户端连接信息
/// </summary>
/// <param name="brokerAddress"></param>
/// <param name="port"></param>
/// <param name="clientId"></param>
/// <param name="username"></param>
/// <param name="password"></param>
public void AddServer(MQTTServerModel model)
{
var options = new MqttClientOptionsBuilder()
.WithClientId(model.clientId)
.WithTcpServer(model.brokerAddress, model.port)
.WithCleanSession()
.WithCredentials(model.username, model.password)
.Build();
serverOptions.Add(options);
}
public async Task ConnectAllAsync()
{
var mqttFactory = new MqttFactory();
foreach (var options in serverOptions)
{
var mqttClient = mqttFactory.CreateMqttClient();
//3.连接成功事件
//mqttClient.ConnectedAsync +=Client_ConnectHandler;
mqttClient.ConnectedAsync +=(async e =>
{
Console.WriteLine($"【{options.ClientId}】{DateTime.Now.ToShortDateString()}The MQTT client is connected!【{e.ConnectResult.ResultCode}】");
});
//4.订阅消息回调事件
//mqttClient.ApplicationMessageReceivedAsync+=Client_MessageReceivedHandler;
mqttClient.ApplicationMessageReceivedAsync+=(async e =>
{
Console.WriteLine($"【{DateTime.Now.ToShortDateString()}】[{e.ClientId}]--[{e.ApplicationMessage.Topic}]--Message Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
});
//5.断开连接事件
//mqttClient.DisconnectedAsync +=Client_DisconnectedHandler;
mqttClient.DisconnectedAsync +=(async e =>
{
Console.WriteLine($"【{options.ClientId}】-{DateTime.Now.ToShortDateString()}-连接断开:{e.Reason}-->{(e.Exception==null ? null : e.Exception.Message)}");
Console.WriteLine($"当前客户端数量:{_clients.Count},存活{_clients.Where(n => n.IsConnected == true).ToArray().Length}");
await Task.Delay(TimeSpan.FromSeconds(5));//延迟5s重连
try
{
await mqttClient.ConnectAsync(options, cancellationTokenSource.Token);
// 2、创建订阅
// 2.1创建订阅选项
var subscribeOptions1 = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(f =>
{
f.WithTopic("GF01")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
})
.Build();
// 2.2订阅主题
var subscribeResult1 = await mqttClient.SubscribeAsync(subscribeOptions1, cancellationTokenSource.Token);
// 2.3输出订阅结果
foreach (var result in subscribeResult1.Items)
{
Console.WriteLine($"【订阅主题信息{options.ClientId}】--Subscription Result: Topic = {result.TopicFilter.Topic}, ResultCode = {result.ResultCode}");
}
//初始未连接将重新加入客户端数组中
if (mqttClient.IsConnected && !_clients.Contains(mqttClient))
{
_clients.Add(mqttClient);
}
}
catch
{
Console.WriteLine("重连失败!!");
}
Console.WriteLine($"{DateTime.Now.ToShortDateString()} 当前客户端数量:{_clients.Count},存活{_clients.Where(n => n.IsConnected == true).ToArray().Length}");
});
// 1.连接服务
await mqttClient.ConnectAsync(options, cancellationTokenSource.Token);
// 2、创建订阅
// 2.1创建订阅选项
var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(f =>
{
f.WithTopic("GF01")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
})
.Build();
// 2.2订阅主题
var subscribeResult = await mqttClient.SubscribeAsync(subscribeOptions, cancellationTokenSource.Token);
// 2.3输出订阅结果
foreach (var result in subscribeResult.Items)
{
Console.WriteLine($"【订阅主题信息{options.ClientId}】--Subscription Result: Topic = {result.TopicFilter.Topic}, ResultCode = {result.ResultCode}");
}
_clients.Add(mqttClient);
Console.WriteLine($"{DateTime.Now.ToShortDateString()} 当前客户端数量:{_clients.Count},存活{_clients.Where(n => n.IsConnected == true).ToArray().Length}");
}
}
}
}
连接信息model类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MqttConnectManage
{
public class MQTTServerModel
{
/// <summary>
/// 服务地址
/// </summary>
public string brokerAddress { get; set; }
/// <summary>
/// 端口号
/// </summary>
public int port { get; set; }
/// <summary>
/// 客户端ID
/// </summary>
public string clientId { get; set; }
/// <summary>
/// 登录用户名
/// </summary>
public string username { get; set; }
/// <summary>
/// 登录密码
/// </summary>
public string password { get; set; }
}
}
方法调用
using MQTTnet.Diagnostics;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MqttConnectManage
{
/// <summary>
/// 使用MQTTnet 4.3.1.873实现多客户端连接多个服务,并且实现异常断开重连
/// </summary>
internal class Program
{
static void Main(string[] args)
{
string[] brokerUris = { "broker.emqx.io", "broker.hivemq.com", "127.0.0.1" };
//MqttClientManage clientManager = new MqttClientManage(brokerUris);
//MqttClientConnect mqttClient = new MqttClientConnect(brokerUris);
IMqttNetLogger logger = null;
MqttClientManage mqttClient = new MqttClientManage(logger);
foreach (var item in brokerUris)
{
mqttClient.AddServer(new MQTTServerModel() { brokerAddress =item, clientId =item, port =1883, username =null, password=null });
}
mqttClient.ConnectAllAsync();
Console.ReadKey();
}
}
}