C#实现数据采集系统-数据反写
实现步骤
- MQTT订阅,接收消息
- 反写内容写入通信类,添加到写入队列中 链接-消息内容处理和写入通信类队列
- 实现Modbustcp通信写入
具体实现
1.MQTT订阅,接收消息
Mqtt实现采集数据转发
Mqtt控制类增加订阅方法
- 增加一个通用的订阅方法,需要的参数是一个主题和一个委托,将主题跟对应的委托方法对应存储,然后再mqtt中订阅,收到对应的主题消息,然后执行对应的方法。
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{
//订阅
}
然后需要一个键值对用于存储这个关系
private Dictionary<string, Action<string, string>> _topicActions;
订阅方法实现:订阅主题,添加到_topicActions,如果已经连接,则直接订阅,没有连接,则等待连上的时候自动订阅,增加锁来确保订阅成功
/// <summary>
/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
/// </summary>
/// <param name="topic"></param>
/// <param name="topicAction"></param>
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{
lock (_topicActionsLock)
{
if (!_topicActions.ContainsKey(topic))
{
_topicActions.Add(topic, topicAction);
if (_mqttClient.IsConnected)
{
_mqttClient.SubscribeAsync(topic);
}
}
}
}
在连接方法中,添加订阅
public void MqttConnect()
{
while (!_mqttClient.IsConnected)
{
try
{
Console.WriteLine($"正在连接……");
_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Task.Delay(1000).Wait();
Console.WriteLine("连接mqtt服务器失败");
}
}
lock (_topicActionsLock)
{
foreach (var item in _topicActions)
{
_mqttClient.SubscribeAsync(item.Key);
}
}
}
- 添加接收消息事件
//客户端接收消息事件
_mqttClient.ApplicationMessageReceivedAsync +=
MqttClient_ApplicationMessageReceivedAsync;
/// <summary>
/// 接收消息
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
private async Task MqttClient_ApplicationMessageReceivedAsync(
MqttApplicationMessageReceivedEventArgs args
)
{
try
{
Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");
if (_topicActions.ContainsKey(args.ApplicationMessage.Topic))
{
_topicActions[args.ApplicationMessage.Topic]
.Invoke(
args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload)
);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
完整Mqtt代码
public class MqttControllor
{
private MqttConfig _config;
private string _clientId;
MqttClientOptions _clientOptions;
private IMqttClient _mqttClient;
private readonly object _topicActionsLock = new object();
private Dictionary<string, Action<string, string>> _topicActions;
public MqttControllor(MqttConfig config, bool isAutoConnect = true)
{
_topicActions = new Dictionary<string, Action<string, string>>();
_config = config;
_clientId = config.ClientId == "" ? Guid.NewGuid().ToString() : config.ClientId;
MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer(_config.Ip, _config.Port)
.WithCredentials(_config.Username, _config.Password)
.WithClientId(_clientId);
_clientOptions = optionsBuilder.Build();
_mqttClient = new MqttFactory().CreateMqttClient();
// 客户端连接关闭事件
_mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
//客户端接收消息事件
_mqttClient.ApplicationMessageReceivedAsync +=
MqttClient_ApplicationMessageReceivedAsync;
if (isAutoConnect)
{
Task.Run(() =>
{
MqttConnect();
});
}
}
/// <summary>
/// 接收消息
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
private async Task MqttClient_ApplicationMessageReceivedAsync(
MqttApplicationMessageReceivedEventArgs args
)
{
try
{
Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");
if (_topicActions.ContainsKey(args.ApplicationMessage.Topic))
{
_topicActions[args.ApplicationMessage.Topic]
.Invoke(
args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload)
);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
Console.WriteLine($"客户端已断开与服务端的连接……");
//断开重连
_mqttClient = new MqttFactory().CreateMqttClient();
MqttConnect();
return Task.CompletedTask;
}
public void MqttConnect()
{
while (!_mqttClient.IsConnected)
{
try
{
Console.WriteLine($"正在连接……");
_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Task.Delay(1000).Wait();
Console.WriteLine("连接mqtt服务器失败");
}
}
Console.WriteLine($"客户端已连接到服务端……");
//连接成功,订阅主题
lock (_topicActionsLock)
{
foreach (var item in _topicActions)
{
_mqttClient.SubscribeAsync(item.Key);
}
}
}
/// <summary>
/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
/// </summary>
/// <param name="topic"></param>
/// <param name="topicAction"></param>
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{
lock (_topicActionsLock)
{
if (!_topicActions.ContainsKey(topic))
{
_topicActions.Add(topic, topicAction);
if (_mqttClient.IsConnected)
{
_mqttClient.SubscribeAsync(topic);
}
}
}
}
/// <summary>
/// 推送消息
/// </summary>
/// <param name="topic">主题</param>
/// <param name="data">消息内容</param>
/// <param name="qsLevel"></param>
/// <param name="retain"></param>
public void Publish(string topic, string data, int qsLevel = 0, bool retain = false)
{
qsLevel = Math.Clamp(qsLevel, 0, 2);
if (!_mqttClient.IsConnected)
{
throw new Exception("mqtt未连接");
}
var message = new MqttApplicationMessage
{
Topic = topic,
PayloadSegment = Encoding.UTF8.GetBytes(data),
QualityOfServiceLevel = (MqttQualityOfServiceLevel)qsLevel,
Retain = retain // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
};
_mqttClient.PublishAsync(message);
}
}