本文讲解了如何jxTMS的数据采集与处理框架并介绍了如何用来采集数据,整个系列的文章请查看:docker版jxTMS使用指南:4.4版升级内容
docker版本的使用,请查看:docker版jxTMS使用指南
4.0版jxTMS的说明,请查看:4.0版升级内容
4.2版jxTMS的说明,请查看:4.2版升级内容
使用jxTMS搭建一个数据采集系统,相对来说较为简单,因为jxTMS已经内置了一个完整的数据采集处理读取的框架:
所演示的框架图默认是现场配置有数据采集器,采集了数据后通过MQTT发送到MQTT服务器,然后数据采集系统通过MQTT来收取数据。
注:现场数据的读取与推送和现场设备以及数据采集器、DTU等密切相关,所以不放入本框架中进行讨论
上述框架前面已经讲解过,所以本文只做总体的、针对性的解说,大约分为三篇文章。
站点与站点数据的收取
站点主要是用于业务管理上的概念,如开通、关停、加载并配置下属设备等,同时提供对设备进行管理与操作的接口。其在数据采集与处理方面主要有两个功能:
-
统一提供本站现场采集到的设备数据的传送方式
-
对下属各设备处理完毕的数据根据业务需要进行综合处理,如电力监控,就是从下属的配电系统、柴油发电机、储能系统等抽取最核心的监控指标综合在一起进行保存并提供给外界的监控仪表盘等
目前,现场与本站之间多采取MQTT协议进行数据的传输,简单、方便,同时还可提供一定的QoS,关键是支持MQTT的DTU非常多,便于降低现场采集系统的建设成本。
目前,jxTMS默认以站点名为主题来接收站点数据【相关工作在main.py中完成,也就是说,其实这一点不属于数据采集处理框架】,所以通过MQTT收取数据非常简单,就是以主题名来查找站点,找到后将收到的字节流的消息原封不动的递交给该站点。
站点接收到数据后,需要根据本站现场的数据采集器的打包方式,针对性的对字节流做初步的处理以从字节流中提取出设备标识,然后将处理后的数据递交给相应的设备。
目前,有三种站点类型已经预置了相应的初步处理方式:
-
site类型,最基本的站点类,是单设备无采集器直接MQTT推送型站点。site只是将字节流以utf-8进行编码为字符串后,直接递交给自己下属的设备【就一台】即可
-
site_multiDev_push类型,是多设备MQTT推送型站点。该类型默认接收到某设备数据后即将其转换为json格式,并添加dn【设备名】后立即通过MQTT推送
-
site_packet类型,是多设备自定义协议包MQTT推送型站点。该类型默认接收到某设备数据后即将其视为设备类型为名的数据集,然后打包后立即通过MQTT推送。所以将字节流以自定义协议包进行解码,就可得到相应设备的dict形式的数据
注:现场的数据采集器可以定义诸如stateRep【状态报告】之类的系统用数据包来发送其它类型的数据,这些数据一般由站点做针对性处理
在之前的【站点的调整】一文中,我们还介绍有site_multiDev_poll型站点,该类型的站点主要是用于现场对modbus之类的设备进行设备采集时使用,即site_multiDev_poll目前主要考虑用于现场的数据采集器的部署使用,不归入后端数据框架中讨论。
site_multiDev_push代码:
class site_multiDev_push(site):
def __init__(self, type, name, saveDataInterval=15):
super(site_multiDev_push,self).__init__(type,name,saveDataInterval=saveDataInterval)
def receive(self, bsMsg):
s = str(bsMsg,"utf8")
js = json.loads(s)
dn = js.get('dn')
if not dn is None:
d = self.getDev(dn)
if not d is None:
rd = d.receive(js)
if not rd is None:
#默认回调了站点的receiveData函数来完成站点的数据综合与处理
self.receiveData(dn,rd)
而某站点的代码为:
from jx.site import site
class site_xmk(site):
def __init__(self, name):
super(site_xmk,self).__init__('site_xmk',name)
def addDevice(self, d):
#站点用设备的类型做识别关键字
self._allDev[d.type()] = d
def receive(self, bsMsg):
#此站点的设备数据都是ascii码文本
s = str(bsMsg,"utf8")
try:
#波流仪的数据以$PSVSW为识别冠字
offset = s.index('$PSVSW')
d = self._allDev.get('svs_original')
except:
#此站点就两种类型的设备,不是波流仪就是气象仪
d = self._allDev.get('WSx_XDR_original')
#此站点的数据采集模式是两设备直接通过串口周期性自动发送数据
#DTU直接将串口数据透传给MQTT,因此直接递交给相应设备即可
d.receive(s)
#下面三行代码是注册本类型的站点
def _newSite_xmk(name):
return site_xmk(name)
site.register('site_xmk',_newSite_xmk)
站点本身也提供了站点数据的处理、保存:
#如果需要保存数据,则返回保存站点数据的数据对象
def newOrmData(self):
#返回None则不再执行站点数据的保存动作
return None
#本站点的数据综合处理函数,下属社会收到并处理完自己的数据后调用站点的receiveData函数回送给站点
def onReceive(self, dn, data):
#onReceive即用来完成站点本身的数据处理动作,默认不做任何处理
return data,True
#接收下属设备回送的数据,然后完成一系列相关动作
def receiveData(self, dn, data):
#self._data和data并不一致,data有可能是事件通知、有可能是命令等等
rd,self._newData = self.onReceive(dn,data)
if (not self._ormData is None) and self._newData:
self._lock.acquire()
try:
for k in rd:
#将数据保存到自己的_data中,_saveData会自动进行保存
self._data[k] = rd[k]
finally:
self._lock.release()
if self._scheduler_saveData is None:
#初始化时如果指定了saveDataInterval>0,则为周期性保存,否则就实时保存
self._saveData()
参考资料:
jxTMS设计思想
jxTMS编程手册
下面的系列文章讲述了如何用jxTMS开发一个实用的业务功能:
如何用jxTMS开发一个功能
下面的系列文章讲述了jxTMS的一些基本开发能力:
jxTMS的HelloWorld