系统功能升级-多设备采集
数据采集系统在网络环境下性能足够,可以实现一对多采集,因此需要支持多个设备同时采集。本节开发多设备采集功能。
修改多设备配置
设备配置
- 将 `DeviceLink` 改成 `List<DeviceLink>` 集合 `DeviceLinks`
- 删掉 `Points`,将点位集合放到对应的设备中去
/// <summary>
/// Root configuration object of the acquisition service; it is what the
/// configuration file is deserialized into.
/// </summary>
public class DaqOption
{
    // Single-device layout, replaced by DeviceLinks below:
    // public DeviceLink DeviceLink { get; set; }

    /// <summary>
    /// Devices to acquire from. Starts out as an empty list so that a
    /// configuration without a "DeviceLinks" entry does not leave it null.
    /// </summary>
    public List<DeviceLink> DeviceLinks { get; set; } = new List<DeviceLink>();

    /// <summary>MQTT settings.</summary>
    public MqttConfig MqttConfig { get; set; }

    /// <summary>Service behaviour settings.</summary>
    public ServiceConfig ServiceConfig { get; set; }

    // The flat point list was removed; points now live in DeviceLink.Points:
    // public List<RegisterPoint> Points { get; set; }

    public DaqOption() { }
}
/// <summary>
/// Connection settings of one device together with the points read from it.
/// </summary>
public class DeviceLink
{
    /// <summary>Device identifier; points refer to it through their DeviceId.</summary>
    public string UID { get; set; }

    public string Ip { get; set; }

    public int Port { get; set; }

    public int SlaveId { get; set; } = 1;

    /// <summary>
    /// Acquisition period in seconds (for example, 200 ms is written as 0.2).
    /// </summary>
    public double AcqTimeSpan { get; set; }

    /// <summary>
    /// Points that belong to this device (added for multi-device support).
    /// Always starts as an empty list.
    /// </summary>
    public List<RegisterPoint> Points { get; set; } = new List<RegisterPoint>();

    public DeviceLink()
    {
    }
}
同时配置文件也跟着变化:`DeviceLinks` 改成数组形式,下面是两个设备的配置
{
"DeviceLinks": [
{
"UID": "device01",
"Ip": "127.0.0.1",
"Port": 502,
"SlaveId": 1,
"AcqTimeSpan": 1
},
{
"UID": "device02",
"Ip": "127.0.0.1",
"Port": 503,
"SlaveId": 1,
"AcqTimeSpan": 1
}
],
"MqttConfig": {
//...
},
"ServiceConfig": {
//...
}
}
数据点位配置
在 `RegisterPoint` 中增加设备 Id 属性(`DeviceId`),用于区分该数据点位属于哪个设备
public class RegisterPoint
{
//...
/// <summary>
/// UID of the device this point belongs to. The configuration loader compares it
/// with DeviceLink.UID to attach the point to the matching device.
/// </summary>
public string DeviceId { get; set; }
}
配置加载
在 `DaqOptionBuilder` 中修改配置加载,主要涉及多设备配置的加载和点位的加载。多设备配置按上面的方式修改后,会由反序列化自动加载。
点位加载时需要读入新加的设备 Id,然后根据设备 Id 在 `DeviceLinks` 中查找对应的设备,并将点位加入该设备的点位集合 `Points` 中。
/// <summary>
/// Builds a <see cref="DaqOption"/> from the Config.json and AllPoint.csv files
/// located in the configured directory.
/// </summary>
public class DaqOptionBuilder
{
    //...

    /// <summary>
    /// Deserializes Config.json, then reads AllPoint.csv and attaches every point
    /// to the device whose UID equals the point's device id (7th CSV column).
    /// </summary>
    /// <returns>The loaded options with each device's Points filled in.</returns>
    /// <exception cref="InvalidOperationException">Config.json deserializes to null.</exception>
    /// <exception cref="FormatException">A CSV row has too few columns or a non-numeric address/length.</exception>
    public DaqOption Build()
    {
        // Path.Combine instead of a hard-coded "\\" so the separator fits the host OS.
        var configJson = File.ReadAllText(Path.Combine(_path, "Config.json"));
        var option = JsonSerializer.Deserialize<DaqOption>(configJson)
            ?? throw new InvalidOperationException("Config.json does not contain a configuration object.");
        option.DeviceLinks ??= new List<DeviceLink>();

        var lines = File.ReadAllLines(Path.Combine(_path, "AllPoint.csv"));

        // Start at index 1: the first line is skipped (treated as the header row).
        // An empty file simply yields no points.
        for (var index = 1; index < lines.Length; index++)
        {
            var line = lines[index];
            if (string.IsNullOrWhiteSpace(line))
            {
                continue; // tolerate blank / trailing lines
            }

            var values = line.Split(',');
            if (values.Length < 7)
            {
                throw new FormatException(
                    $"AllPoint.csv line {index + 1}: expected at least 7 columns but found {values.Length}.");
            }

            var point = new RegisterPoint
            {
                UID = values[0],
                Name = values[1],
                Type = Type.GetType("System." + values[2]),
                RegisterType = values[3],
                Address = int.Parse(values[4]),
                Length = int.Parse(values[5]),
                DeviceId = values[6] // device the point belongs to
            };

            // Find the owning device and add the point to it.
            var device = option.DeviceLinks.FirstOrDefault(x => x.UID == point.DeviceId);
            if (device is null)
            {
                // Report instead of silently dropping the point.
                Console.WriteLine(
                    $"AllPoint.csv line {index + 1}: point '{point.UID}' refers to unknown device '{point.DeviceId}' and was skipped.");
                continue;
            }

            device.Points ??= new List<RegisterPoint>();
            device.Points.Add(point);
        }

        return option;
    }
}
主服务修改
多设备属性修改
修改 `DaqService` 服务类:将原来的设备对象 `_deviceLink` 和点位集合 `_points` 改成设备集合 `_deviceLinks`,服务类中用到它们的地方同步修改
public class DaqService
{
// Before (single device):
// private DeviceLink _deviceLink;
// private List<RegisterPoint> _points;
// After (multiple devices): each DeviceLink carries its own Points.
private List<DeviceLink> _deviceLinks;
public DaqService(DaqOption option)
{
//...
// Changed: keep the whole device list from the options.
_deviceLinks = option.DeviceLinks;
//...
}
}
功能修改
在启动和定时推送中,都改成循环 `_deviceLinks` 来实现功能
/// <summary>
/// Opens the MQTT connection, starts monitoring every configured device and,
/// when scheduled pushing is enabled, starts the push timer.
/// </summary>
public void Start()
{
    // Create and connect the MQTT controller first: the ValueUpdated handler
    // publishes through mqttControllor, so it must exist before any device
    // can raise an update.
    mqttControllor = new MqttControllor(_option.MqttConfig);
    mqttControllor.MqttConnect();

    foreach (var link in _deviceLinks)
    {
        var modbusTcp = new ModbusTcp(link);
        modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;
        modbusTcp.DoMitor();
    }

    if (_serviceConfig.IsPushScheduled)
    {
        _timer.Start();
    }
}
/// <summary>
/// Scheduled push: on every timer tick, publishes one message per device that
/// contains the current value of each of the device's points.
/// </summary>
private void Timer_Elapsed(object? sender, ElapsedEventArgs e)
{
    foreach (var device in _deviceLinks)
    {
        try
        {
            var snapshot = new DeviceMessage { DeviceId = device.UID };

            foreach (var registerPoint in device.Points)
            {
                Console.WriteLine($"Point:{registerPoint.UID}-->Value:{registerPoint.Value}");
                snapshot.Data.Add(registerPoint.UID, registerPoint.Value);
            }

            var payload = JsonSerializer.Serialize(snapshot);
            var topic = $"Device/{device.UID}/Time";
            mqttControllor.Publish(topic, payload);
        }
        catch (Exception ex)
        {
            // A failure for one device is logged; the remaining devices are still processed.
            Console.WriteLine(ex.Message);
        }
    }
}
在变化推送中,可以直接使用点位中的 `DeviceId` 替代原设备 ID(`_deviceLink.UID`)
/// <summary>
/// Event handler for a point value update. When push-on-change is enabled,
/// publishes the new value under the point's own device id and echoes it to the console.
/// </summary>
/// <param name="point">The point that was updated.</param>
/// <param name="value">The value reported with the update.</param>
private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)
{
    if (!_serviceConfig.IsPushChanged)
    {
        return;
    }

    try
    {
        var message = new DeviceMessage { DeviceId = point.DeviceId };
        message.Data.Add(point.UID, value);

        var payload = JsonSerializer.Serialize(message);
        // Publish immediately after acquisition.
        mqttControllor.Publish($"Device/{point.DeviceId}/Update", payload);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }

    Console.WriteLine($"Point:{point.UID}-->Value:{value}");
}
完整的服务代码
/// <summary>
/// Acquisition service: monitors every configured device and pushes point values
/// over MQTT, either on a timer, on change, or both, depending on the service configuration.
/// </summary>
public class DaqService
{
    private readonly DaqOption _option;
    private readonly List<DeviceLink> _deviceLinks;
    private readonly ServiceConfig _serviceConfig;
    MqttControllor mqttControllor { get; set; }

    // Only created when scheduled pushing is enabled at construction time.
    private readonly Timer _timer;

    /// <summary>
    /// Stores the options and, when scheduled pushing is enabled, prepares the push timer.
    /// </summary>
    /// <param name="option">Loaded configuration; its device list and service settings are used.</param>
    public DaqService(DaqOption option)
    {
        _option = option;
        _deviceLinks = option.DeviceLinks;
        _serviceConfig = option.ServiceConfig;
        if (_serviceConfig.IsPushScheduled)
        {
            // PushTimeSpan is multiplied by 1000 to obtain the timer interval.
            _timer = new Timer(_serviceConfig.PushTimeSpan * 1000);
            _timer.Elapsed += Timer_Elapsed;
        }
    }

    /// <summary>
    /// Opens the MQTT connection, starts monitoring every configured device and,
    /// when scheduled pushing is enabled, starts the push timer.
    /// </summary>
    public void Start()
    {
        // Create and connect the MQTT controller first: the ValueUpdated handler
        // publishes through mqttControllor, so it must exist before any device
        // can raise an update.
        mqttControllor = new MqttControllor(_option.MqttConfig);
        mqttControllor.MqttConnect();

        foreach (var item in _deviceLinks)
        {
            ModbusTcp modbusTcp = new ModbusTcp(item);
            modbusTcp.ValueUpdated += ModbusTcp_ValueUpdated;
            modbusTcp.DoMitor();
        }

        if (_serviceConfig.IsPushScheduled)
        {
            _timer.Start();
        }
    }

    /// <summary>
    /// Scheduled push: on every timer tick, publishes one message per device that
    /// contains the current value of each of the device's points.
    /// </summary>
    private void Timer_Elapsed(object? sender, ElapsedEventArgs e)
    {
        foreach (var link in _deviceLinks)
        {
            try
            {
                DeviceMessage deviceMessage = new DeviceMessage() { DeviceId = link.UID };
                foreach (var point in link.Points)
                {
                    Console.WriteLine($"Point:{point.UID}-->Value:{point.Value}");
                    deviceMessage.Data.Add(point.UID, point.Value);
                }
                var str1 = JsonSerializer.Serialize(deviceMessage);
                mqttControllor.Publish("Device/" + link.UID + "/Time", str1);
            }
            catch (Exception ex)
            {
                // A failure for one device is logged; the remaining devices are still processed.
                Console.WriteLine(ex.Message);
            }
        }
    }

    /// <summary>
    /// Event handler for a point value update. When push-on-change is enabled,
    /// publishes the new value under the point's own device id and echoes it to the console.
    /// </summary>
    /// <param name="point">The point that was updated.</param>
    /// <param name="value">The value reported with the update.</param>
    private void ModbusTcp_ValueUpdated(RegisterPoint point, object value)
    {
        if (_serviceConfig.IsPushChanged)
        {
            try
            {
                DeviceMessage device = new DeviceMessage { DeviceId = point.DeviceId };
                device.Data.Add(point.UID, value);
                var data = JsonSerializer.Serialize(device);
                mqttControllor.Publish($"Device/{point.DeviceId}/Update", data); // publish immediately after acquisition
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            Console.WriteLine($"Point:{point.UID}-->Value:{value}");
        }
    }
}