监听串口,接收它们发过来的数据,进行处理。
一、概况
前不久做的一个项目,需要读取水下传感器的数据。这些传感器通过串口与外界交互。我们写了一个java程序,接收传感器传送的数据,同时也下发命令,控制部分传感器。
二、运行环境
(一)硬件环境
串口的话,一般台式机主板有1、2个串口,如果只有2台传感器,那么通过串口线与台式机连接起来,就能直接访问了。
1、串口线
2、串口服务器
但如果有许多传感器,怎么办呢?这时候需要用到串口服务器,比如8口的串口服务器,同时接8个传感器,然后串口服务器通过网线与应用服务器相连。而在应用服务器,通过串口软件,将串口服务器的8个串口映射到本机。这样,就相当于8个传感器直接连到了应用服务器一样,应用服务器上的程序就能直接访问这8个传感器了。
串口服务器连接8个传感器的接口,既可以是串口,也可以是经过转换线后接入网口。
3、应用服务器映射串口服务器的串口
(二)串口软件
1、映射串口服务器的串口到本地
前面说过,要用串口软件将串口服务器的8个串口映射到本地。要达成这个效果,
1)首先要安装串口服务器的驱动程序
2)串口服务器提供的软件,将8个串口映射到本地
以moxa为例:
3)访问串口服务器提供的WEB管理页面,对每个串口的参数,如波特率等进行设置。这种设置,主要是针对传感器,假设每款传感器固定连接到某个串口。
2、模拟数据
使用串口设备似乎有个好处,就是有一些类似串口助手之类的软件,可以模拟向指定串口发送数据,利于测试。
三、代码
我们使用JAVA来接收数据和写入指令。
1、代码结构
因为开始时我们以为只须读取数据就好了,后来才将写入指令的功能加进去,所以代码文件的名字考虑不周,显得不够规范。
代码讲解
1、开始程序
开始程序主要是执行初始化工作,获取服务器中可用的串口。串口是独占的,如果有别的进程在用,那我们的程序就用不了。需要先中止别的进程,释放资源才行。
Receiver.java
public class Receiver implements ApplicationRunner {
private List<PortReader> readerList = new ArrayList<PortReader>();//设备数据接收器,有多个,所以用数组
private List<PortEx> curPortList = new ArrayList<>();//当前活跃端口。用于检查端口变化
private PortSaver portSaver = null;//接收到的数据输出或转储处理器,只有1个,集中处理
public static BlockingQueue<DataRow> queue = new LinkedBlockingQueue<>();//接收到的数据处理队列
@Autowired
@Qualifier("redisService1")
RedisService redisService;
//相关数据库表操作类
@Autowired
PortService portService;
//以下为传感器类
@Autowired
PortCtdService ctdService;
@Autowired
PortCo2Service co2Service;
@Autowired
PortAdcpEquipService adcpEquipService;
@Autowired
PortAdcpCurrentService adcpCurrentService;
@Autowired
PortAdcpWaveService adcpWaveService;
@Autowired
PortAdcpInstrumentService adcpInstrumentService;
@Autowired
PortHydroService hydroService;
public static void main(String[] args) {
Receiver rec = new Receiver();
rec.init();
}
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("start init reader ");
init();
}
@PreDestroy
public void exit() {
System.out.println("Reader go home when system exit");
for (PortReader reader : readerList) {
if (reader != null) {
reader.stopRead();
}
}
}
private void init() {
List<PortEx> ports = getAvailablePorts();//获取系统中可用的端口
go(ports);
portSaver = new PortSaver(queue,
redisService,
ctdService,co2Service, adcpEquipService,adcpCurrentService,adcpWaveService,adcpInstrumentService,hydroService);//将各款传感器类都传入,方便集中处理
portSaver.start();
}
private List<PortEx> getAvailablePorts() {//获取系统中可用的端口
List<PortEx> availablePorts = new ArrayList<>();
List<PortEx> ports = portService.queryByPageEx(null, PageRequest.of(0, 1000));
Enumeration portList = CommPortIdentifier.getPortIdentifiers();
while (portList.hasMoreElements()) {
CommPortIdentifier portId = (CommPortIdentifier) portList.nextElement();
if (portId.getPortType() == CommPortIdentifier.PORT_SERIAL) {
for (PortEx port : ports) {
if (portId.getName().equals(port.getPortId())) {
port.setCommPortIdentifier(portId);
availablePorts.add(port);
break;
}
}
}
}
return availablePorts;
}
private void go(List<PortEx> ports) {
readerList.clear();
curPortList.clear();
for (PortEx port : ports) {
if(!port.getSensorCategory().equals("LIGHT")){//如果是不用写入指令的设备。。。
PortReader read = new PortReader(port.getCommPortIdentifier(), port, queue);
read.setDaemon(true);//设置守护进程
read.start();
readerList.add(read);
curPortList.add(port);
}else{
PortWriter writer = new PortWriter(port.getCommPortIdentifier(), port, DeviceManager.deviceMap);
writer.setDaemon(true);
writer.start();
}
}
}
}
2、串口设备对象类
串口设备对象类,初始化串口,开启串口监听,接收设备信息流,都在这里完成。
Device.java
/**
* 串口设备对象类
* 初始化串口,开启串口监听,接收设备信息流,都在这里完成
*/
public class Device implements SerialPortEventListener {
private PortEx port = null;//自定义的串口实体,包括端口号,波特率等等
private SerialPort serialPort = null;
private CommPortIdentifier portId = null;
private InputStream inputStream;
private OutputStream outputStream;
private BlockingQueue<DataRow> queue = null;
private Map<String,String> _temps = new HashMap<>();//用于暂存接收到的字符串,积攒到完整一条记录后才填入消息队列进行处理
public Device(CommPortIdentifier Id, PortEx p, BlockingQueue<DataRow> queue) {
try {
this.port = p;
this.portId = Id;
this.queue = queue;
serialPort = (SerialPort) portId.open(port.getPortName(), 2000);
//设置波特率、数据位、停止位、检验位
serialPort.setSerialPortParams(port.getBaudRate(),
port.getDataBits(),
port.getStopBits(),
port.getParity());
//获取输入流
inputStream = serialPort.getInputStream();
outputStream = serialPort.getOutputStream();
//设置串口监听
serialPort.addEventListener(this);
//设置开启监听
serialPort.notifyOnDataAvailable(true);
System.out.println("已初始化端口:" + portId.getName());
} catch (PortInUseException e) {
System.out.println(String.format("%s正在使用!", portId.getName()));
} catch (TooManyListenersException e) {
e.printStackTrace();
} catch (UnsupportedCommOperationException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public void open() {
if(serialPort != null) {
System.out.println(String.format("%s打开中...",portId.getName()));
} else {
System.out.println(String.format("%s打开失败",portId.getName()));
}
}
public boolean close() {
try {
serialPort.close();
} catch(Exception ex){
System.err.println(ex.getCause());
return false;
}
try {
inputStream.close();
} catch (Exception ex) {
System.err.println(ex.getCause());
return false;
}
try {
outputStream.close();
} catch (Exception ex) {
System.err.println(ex.getCause());
return false;
}
System.out.println(String.format("%s关闭...%b",port.getPortName(),portId.isCurrentlyOwned()));
return true;
}
/**
* 监听函数
*/
public void serialEvent(SerialPortEvent serialPortEvent) {
switch (serialPortEvent.getEventType()) {
//获取到有效信息
case SerialPortEvent.DATA_AVAILABLE:
on_data_available();
break;
default:
System.out.println("不知所谓");
break;
}
}
public void write(byte[] cmd) {//写入指令到设备
try {
outputStream.write(cmd);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 读取串口信息
*/
private void on_data_available() {
if (inputStream != null) {
try {
int len = inputStream.available();
byte[] readBuffer = new byte[len];
len = inputStream.read(readBuffer);
String txtData = new String(readBuffer, 0, len).trim();
read(txtData);
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void read(String txtData) {
if (txtData != null && txtData.length() > 0) {
String category = port.getSensorCategory();
String keeper = "";
if(_temps.containsKey(category)){
keeper = _temps.get(category);
}
String[] lines = txtData.split("\r");
for(int i = 0;i < lines.length;i++){
String line = lines[i];
keeper += line.trim();
if(DataUtil.iswhole(category,keeper)){
sendIt(category,keeper);//信息处理。。。
keeper = "";
}
if(i < lines.length - 1) {//本line与下一个line之间有换行符。意味着已经结束
keeper = "";
}
_temps.put(category,keeper);
}
}
}
private void sendIt(String category,String line){
try {
DataRow row = new DataRow();
//将数据写入row...
。。。
queue.put(row);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
3、串口设备管理器
读取数据的串口设备管理器,在这里操作Device对象。
PortReader.java
/**
* 读取数据的串口设备管理器
*
*/
public class PortReader extends Thread {
private CommPortIdentifier portId = null;
private PortEx port = null;
private Device device = null;
private BlockingQueue<DataRow> queue = null;
public PortReader(CommPortIdentifier id, PortEx p, BlockingQueue<DataRow> queue) {
super();
this.portId = id;
this.queue = queue;
this.port = p;
}
public void run() {
device = new Device(portId, port, queue);
device.open();
DeviceManager.deviceMap.put(port.getPortId(), device);
}
public void stopRead() {
if(device != null) device.close();
System.out.println(getName() + " stop success when reader go home");
}
public PortEx getPort(){
return this.port;
}
}
4、数据处理类
将从消息队列中接收到的设备数据进行处理。
PortSaver.java
/**
* 将读取到的设备数据输出或转储处理器
*/
public class PortSaver extends Thread {
private BlockingQueue<DataRow> queue = null;
private DataRow dataRow = null;
private RedisService redisService;
//以下为各传感器类
private PortCtdService ctdService;
private PortCo2Service co2Service;
private PortAdcpEquipService adcpEquipService;
private PortAdcpCurrentService adcpCurrentService;
private PortAdcpWaveService adcpWaveService;
private PortAdcpInstrumentService adcpInstrumentService;
private PortHydroService hydroService;
public PortSaver(BlockingQueue<DataRow> queue, RedisService redisService,
PortCtdService ctdService,
PortCo2Service co2Service,
PortAdcpEquipService adcpEquipService,
PortAdcpCurrentService adcpCurrentService,
PortAdcpWaveService adcpWaveService,
PortAdcpInstrumentService adcpInstrumentService,
PortHydroService hydroService) {
super();
this.queue = queue;
this.redisService = redisService;
this.ctdService = ctdService;
this.co2Service = co2Service;
this.adcpEquipService = adcpEquipService;
this.adcpCurrentService = adcpCurrentService;
this.adcpWaveService = adcpWaveService;
this.adcpInstrumentService = adcpInstrumentService;
this.hydroService = hydroService;
}
public void run() {
if (queue != null) {
while (true) {
if (queue.size() > 0) {
try {
dataRow = queue.take();
System.out.println(dataRow.getMessage());
String[] values = dataRow.getMessage().trim().split(",");
String category = dataRow.getCategory();//串口类型
int sensorId = dataRow.getSensorId();
Object obj = DataUtil.getEntity(category, values);
dealIt(category, sensorId, obj);
} catch (Exception ex) {
// TODO Auto-generated catch block
System.err.println(ex.getCause());
}
}
}
}
}
public void stopSave() {
System.out.println(getName() + " stop success when reader go home");
}
private Map<String,PortAdcpCurrent> adcpCurrents = new HashMap<>();
private void dealIt(String category, int sensorId, Object obj) {
if (obj == null) return;
//判断category对应的传感器类型,分别处理其数据
。。。
}
。。。
}
四、小结
我们项目中用到的传感器,发送数据有一定频率,比如每分钟发一笔数据。每笔数据有固定的格式。但是,每笔数据有可能分为好几次发送,断断续续,所以处理时要注意。