背景
由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题:
- 关闭连接后NioEventLoop没有被释放,最终导致OOM
- 设计思想是一个设备一个连接,而不是一个网关一个连接
- 连接断开后客户端无从感知
前两个问题的解决方案可参考上一篇文章;最后一个问题虽然可以通过isConnected()方法获取连接状态,但连接断开后该状态并不会更新,只能在代码中自行实现失败重连。
所以为了解决以上问题,我打算重新封装一个Modbus组件。
步骤
代码如下所示,目前只分享modbus-core相关的代码。
- modbus-core:实现设备读写指令的下发以及应答。
- modbus-app:实现通用的可灵活配置的modbus设备接入层,通过更新配置信息即可快速引入新设备,无需手写代码重启应用。
为了快速实现modbus组件封装,这里引入了Vert.x框架(基于事件驱动+异步,官网:https://vertx.io),而不是直接使用原生的Netty框架。
引入jar包
<!-- 目前我这里引入最新的版本(4.4.4) -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
工具类
ByteUtil
package com.bho.modbus.utils;
import java.nio.ByteBuffer;
public class ByteUtil {

    /**
     * Renders a byte array as an upper-case hexadecimal string, two digits per byte.
     *
     * @param bytes bytes to render; must not be null
     * @return hex text such as {@code "0AFF"}; an empty string for an empty array
     */
    public static String bytesToHexString(byte[] bytes) {
        // Every byte produces exactly two characters, so size the builder up front.
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xFF & b);
            if (hex.length() < 2) {
                sb.append('0');
            }
            sb.append(hex.toUpperCase());
        }
        return sb.toString();
    }

    /**
     * Returns a slice of the 4-byte big-endian representation of an int.
     *
     * @param data   value to encode
     * @param offset index (0-3) of the first big-endian byte to return; 0 is the most significant byte
     * @param len    number of bytes to return; {@code offset + len} must not exceed 4
     * @return a new array of {@code len} bytes
     * @throws IllegalArgumentException if the requested slice is outside the 4 encoded bytes
     */
    public static byte[] intToBytes(int data, int offset, int len) {
        if (offset < 0 || len < 0 || len > Integer.BYTES - offset) {
            throw new IllegalArgumentException(
                    "Slice out of range for a 4-byte int: offset=" + offset + ", len=" + len);
        }
        byte[] encoded = ByteBuffer.allocate(Integer.BYTES).putInt(data).array();
        byte[] dest = new byte[len];
        System.arraycopy(encoded, offset, dest, 0, len);
        return dest;
    }

    /**
     * Decodes up to four big-endian bytes into a non-sign-extended int.
     *
     * @param bytes  source array
     * @param offset index of the first (most significant) byte to read
     * @param len    number of bytes to read, 0-4
     * @return the decoded value; for {@code len < 4} the missing high bytes are zero
     * @throws IllegalArgumentException if {@code len} is not in 0-4 or the range lies outside {@code bytes}
     */
    public static int bytesToInt(byte[] bytes, int offset, int len) {
        if (len < 0 || len > Integer.BYTES || offset < 0 || len > bytes.length - offset) {
            throw new IllegalArgumentException(
                    "Range out of bounds: offset=" + offset + ", len=" + len + ", length=" + bytes.length);
        }
        int value = 0;
        for (int i = offset; i < offset + len; i++) {
            // Mask to avoid sign extension of bytes >= 0x80.
            value = (value << 8) | (bytes[i] & 0xFF);
        }
        return value;
    }
}
Crc16
package com.bho.modbus.utils;
public class Crc16 {

    /**
     * Computes the CRC-16 used by Modbus RTU: register preset to 0xFFFF,
     * bytes folded in low-byte first, reflected polynomial 0xA001.
     *
     * @param arr_buff bytes to checksum
     * @return two bytes: index 0 holds the low byte of the CRC, index 1 the high byte
     */
    public static byte[] getCrc16(byte[] arr_buff) {
        int register = 0xFFFF;
        for (byte current : arr_buff) {
            // XOR the incoming byte into the low 8 bits of the CRC register.
            register ^= (current & 0xFF);
            for (int bit = 0; bit < 8; bit++) {
                // Remember the bit that is about to be shifted out.
                boolean carry = (register & 0x0001) != 0;
                register >>= 1;
                if (carry) {
                    register ^= 0xA001;
                }
            }
        }
        return intToBytes(register);
    }

    /** Splits the low 16 bits of {@code value} into {low byte, high byte}. */
    private static byte[] intToBytes(int value) {
        byte low = (byte) (value & 0xFF);
        byte high = (byte) ((value >> 8) & 0xFF);
        return new byte[] {low, high};
    }
}
实体类
ModbusMode
目前只实现了以下两种通信方式,可根据自己需求加入其它通信方式。
package com.bho.modbus.model;
import com.bho.modbus.utils.ByteUtil;
import com.bho.modbus.utils.Crc16;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.nio.ByteOrder;
import java.util.List;
import lombok.extern.log4j.Log4j2;
@Log4j2
public enum ModbusMode {
/**
* 【事务ID(2) + 协议标识(2) + 数据长度(2)】 + 从机地址(1) + 功能码(1) + 数据区(N)
*/
TCP,
/**
* 从机地址(1) + 功能码(1) + 数据区(N) + 【校验码(2)】
*
*/
RTU,
;
public ByteToMessageDecoder getDecoder() {
if (this == ModbusMode.TCP) {
return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 4,
2, 0, 6, true);
}
if (this == ModbusMode.RTU){
return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 2,
1, 2, 0, true);
}
return null;
}
public byte[] readData(byte[] bytes) {
int len = bytes.length;
if (this == ModbusMode.RTU) {
byte[] tempArr = new byte[len - 2];
System.arraycopy(bytes, 0, tempArr, 0, tempArr.length);
byte[] crc16 = Crc16.getCrc16(tempArr);
if (crc16[0] != bytes[len -2] || crc16[1] != bytes[len - 1]) {
log.error("Modbus receive illegal data:{}", ByteUtil.bytesToHexString(bytes));
return null;
}
if (log.isDebugEnabled()) {
log.debug("read data:{}", ByteUtil.bytesToHexString(tempArr));
}
return tempArr;
}
if (this == ModbusMode.TCP) {
if (log.isDebugEnabled()) {
log.debug("read data:{}", ByteUtil.bytesToHexString(bytes));
}
return bytes;
}
return null;
}
public byte[] writeData(byte[] bytes) {
if (log.isDebugEnabled()) {
log.debug("write data:{}",ByteUtil.bytesToHexString(bytes));
}
int len = bytes.length;
if (this == ModbusMode.RTU) {
byte[] crc16 = Crc16.getCrc16(bytes);
byte[] tempArr = new byte[len + 2];
System.arraycopy(bytes, 0, tempArr, 0, len);
tempArr[len] = crc16[0];
tempArr[len + 1] = crc16[1];
return tempArr;
}
if (this == ModbusMode.TCP) {
byte[] tempArr = new byte[len + 6];
tempArr[1] = 0x01;
byte[] lenBytes = ByteUtil.intToBytes(len, 2, 2);
tempArr[4] = lenBytes[0];
tempArr[5] = lenBytes[1];
System.arraycopy(bytes, 0, tempArr, 6, len);
return tempArr;
}
return null;
}
}
ModbusFunc
功能码
package com.bho.modbus.model;
/**
 * Common Modbus function codes.
 *
 * <p>Exception codes returned in error responses:
 * 01 illegal function code, 02 illegal register address, 03 illegal data value, 04 slave fault.
 * An error response carries the request's function code plus 0x80, followed by one
 * exception-code byte (0x01-0x04).
 */
public enum ModbusFunc {
    /**
     * Read contiguous coils.
     * Request: function 0x01 (1 byte), start address 0x0000-0xffff (2 bytes),
     * coil count 0x0001-0x07d0 / 2000 (2 bytes).
     * Response: function 0x01, byte count N (coil count / 8, plus 1 if there is a remainder),
     * N bytes of coil status.
     * Error response: function 0x81, exception code.
     */
    READ_COILS((byte) 0x01),
    /** Read discrete coil status; same layout as {@link #READ_COILS}. */
    READ_DISCRETE_COILS((byte) 0x02),
    /**
     * Read holding registers.
     * Request: function 0x03 (1 byte), start address 0x0000-0xffff (2 bytes),
     * register count 0x0001-0x007d / 125 (2 bytes).
     * Response: function 0x03, byte count 2N (N = register count), 2N bytes of register values.
     * Error response: function 0x83, exception code.
     */
    READ_HOLDING_REGISTERS((byte) 0x03),
    /** Read input registers; same layout as {@link #READ_HOLDING_REGISTERS}. */
    READ_INPUT_REGISTERS((byte) 0x04),
    /**
     * Write a single coil.
     * Request: function 0x05 (1 byte), address 0x0000-0xffff (2 bytes),
     * coil state 0x0000 or 0xff00 (2 bytes).
     * Response: echo of the request.
     * Error response: function 0x85, exception code.
     */
    WRITE_SINGLE_COILS((byte) 0x05),
    /**
     * Write a single holding register.
     * Request: function 0x06 (1 byte), address 0x0000-0xffff (2 bytes),
     * register value 0x0000-0xffff (2 bytes).
     * Response: echo of the request.
     * Error response: function 0x86, exception code.
     */
    WRITE_SINGLE_HOLDING_REGISTERS((byte) 0x06),
    /**
     * Write multiple holding registers.
     * Request: function 0x10 (1 byte), start address 0x0000-0xffff (2 bytes),
     * register count 0x0001-0x007b / 123 (2 bytes), byte count 2N (1 byte), 2N bytes of values.
     * Response: function 0x10, start address, register count.
     * Error response: function 0x90, exception code.
     */
    WRITE_MULTI_HOLDING_REGISTERS((byte) 0x10),
    /**
     * Write multiple coils.
     * Request: function 0x0F (1 byte), start address 0x0000-0xffff (2 bytes),
     * coil count 0x0001-0x07b0 / 1968 (2 bytes), byte count N (coil count / 8, plus 1 if
     * there is a remainder), N bytes of coil status.
     * Response: function 0x0F, start address, coil count.
     * Error response: function 0x8F, exception code.
     */
    WRITE_MULTI_COILS((byte) 0x0F);

    /** Function code as sent on the wire. */
    private final byte code;

    ModbusFunc(byte code) {
        this.code = code;
    }

    /**
     * @return the one-byte function code of this constant
     */
    public byte getFunc() {
        return code;
    }
}
ModbusParamConfig
下发指令参数配置信息
package com.bho.modbus.model;
import lombok.Data;
/**
 * Configuration of one parameter (data point) that can be read from or written to a device.
 */
@Data
public class ModbusParamConfig {

    /** Kind of Modbus storage the parameter lives in. */
    public enum RegisterType {
        COIL,
        HOLDING_REGISTER,
        INPUT_REGISTER
    }

    /** Type the raw register value is converted to. */
    public enum DataType {
        BOOL,
        FLOAT,
        INT
    }

    /** Register type. */
    private RegisterType registerType;
    /** Register address. */
    private int registerAddress;
    /** Parameter name. */
    private String name;
    /** Parameter data type. */
    private DataType dataType;
    /** Divisor applied to the raw value. */
    private int numberSplit;
}
SendCmdTask
下发指令任务
package com.bho.modbus.model;
import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.Data;
import java.util.List;
@Data
public class SendCmdTask {
private List<ModbusParamConfig> paramConfigs;//参数列表
private JSONObject reqParam;//请求参数 写数据必填
private Boolean isWrite;//是否是写数据
private Integer slaveId;//从机ID
private Integer reqTimeout;//请求超时时间(秒)
private Promise<JSONObject> promise;
private Long timerId;
public SendCmdTask(Vertx vertx, List<ModbusParamConfig> paramConfigs, JSONObject reqParam, Boolean isWrite, Integer slaveId, Integer reqTimeout) {
this.paramConfigs = paramConfigs;
this.reqParam = reqParam;
this.isWrite = isWrite;
this.slaveId = slaveId;
this.reqTimeout = Math.max(reqTimeout, 5);
Promise<JSONObject> promise = Promise.promise();
this.promise = promise;
this.timerId = vertx.setTimer(reqTimeout * 1000, hh -> promise.tryFail("Request timeout"));
}
}
核心类
package com.bho.modbus.core;
import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.SendCmdTask;
import com.bho.modbus.model.ModbusFunc;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.utils.ByteUtil;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import lombok.extern.log4j.Log4j2;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@Log4j2
public class ModbusConnection {
private String ip;//从机IP
private Integer port;//从机端口
private AtomicBoolean isAlive;//从机是否在线
private ModbusMode mode;//通讯模式
private NetSocket netSocket;//客户端连接
private boolean isInitiativeClose;//是否是主动关闭连接
private Long failRetryTimerId;//失败重试定时器ID
private Integer failRetryIntervalSecond;//连接断开后重连间隔时间
private Integer reqTimeoutSecond = 1;//请求超时时间
private Long queueTimerId;//队列定时器
private ConcurrentLinkedQueue<SendCmdTask> writeQueue;//写队列 优先写
private ConcurrentLinkedQueue<SendCmdTask> readQueue;//读队列
private Map<String, Promise<byte[]>> promiseMap;
private Vertx vertx;
/**
 * Creates a connection object (not yet connected) and starts polling the task queues.
 *
 * @param vertx                   Vert.x instance used for networking and timers
 * @param ip                      slave/gateway IP
 * @param port                    slave/gateway port
 * @param failRetryIntervalSecond delay in seconds before reconnecting after a failure
 * @param mode                    framing mode
 */
public ModbusConnection(Vertx vertx, String ip, Integer port, Integer failRetryIntervalSecond, ModbusMode mode) {
    // Collaborators and endpoint settings.
    this.vertx = vertx;
    this.mode = mode;
    this.ip = ip;
    this.port = port;
    this.failRetryIntervalSecond = failRetryIntervalSecond;

    // Mutable runtime state.
    this.promiseMap = new ConcurrentHashMap<>();
    this.readQueue = new ConcurrentLinkedQueue<>();
    this.writeQueue = new ConcurrentLinkedQueue<>();
    this.isAlive = new AtomicBoolean(false);

    // Kick off the queue consumer with its initial delay.
    consumerTaskQueue(true);
}
/**
 * Opens the connection and installs the frame decoder, the response dispatcher and the
 * close handler. If the connection attempt fails, or the socket is later closed by the
 * peer, a reconnect is scheduled after {@code failRetryIntervalSecond} seconds unless
 * {@link #close()} was called.
 *
 * @return a future completed with {@code true} once connected, or failed with the
 *         connect error message
 */
public Future<Boolean> connect() {
    NetClient netClient = vertx.createNetClient();
    return vertx.executeBlocking(b -> {
        netClient.connect(port, ip)
                .onSuccess(socket -> {
                    log.info("Modbus connect success, ip:{}, port:{}", ip, port);
                    netSocket = socket;
                    isAlive.set(true);
                    b.tryComplete(true);
                    NetSocketImpl netSocketImpl = (NetSocketImpl) socket;
                    netSocketImpl.channelHandlerContext().pipeline().addFirst(mode.getDecoder());
                    socket.handler(this::dispatchResponse);
                    socket.closeHandler(h -> {
                        // The link is gone: make isActive() reflect it and release this
                        // attempt's client, since every reconnect creates a new one.
                        isAlive.set(false);
                        netClient.close();
                        if (!isInitiativeClose) {
                            log.error("Modbus connect close, ip:{}, port:{}", ip, port);
                            failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, hh -> connect());
                        } else {
                            log.info("Modbus connect close, ip:{}, port:{}", ip, port);
                        }
                    });
                }).onFailure(err -> {
                    log.error("Modbus connect fail, ip:{}, port:{}, msg:{}", ip, port, err.getMessage());
                    isAlive.set(false);
                    netClient.close();
                    b.tryFail(err.getMessage());
                    // Do not keep retrying once the owner asked for the connection to be closed.
                    if (!isInitiativeClose) {
                        failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, h -> connect());
                    }
                });
    });
}

/**
 * Routes one decoded frame to the promise registered under "slaveId_functionCode".
 * A normal response completes the promise with the bytes after slave id and function code;
 * an exception response (function code + 0x80) fails it with a readable message.
 * Frames with no matching pending request are dropped.
 */
private void dispatchResponse(Buffer buf) {
    byte[] bytes = mode.readData(buf.getBytes());
    if (bytes == null || bytes.length < 2) {
        return;
    }
    int slaveId = ByteUtil.bytesToInt(bytes, 0, 1);
    int funcNo = ByteUtil.bytesToInt(bytes, 1, 1);
    Promise<byte[]> pending = promiseMap.get(String.format("%s_%s", slaveId, funcNo));
    if (pending != null) {
        byte[] content = new byte[bytes.length - 2];
        System.arraycopy(bytes, 2, content, 0, content.length);
        pending.tryComplete(content);
        return;
    }
    Promise<byte[]> failed = promiseMap.get(String.format("%s_%s", slaveId, funcNo - 128));
    if (failed == null) {
        return;
    }
    if (bytes.length < 3) {
        failed.tryFail("Malformed exception response");
        return;
    }
    int code = ByteUtil.bytesToInt(bytes, 2, 1);
    switch (code) {
        case 1:
            failed.tryFail("Illegal function code");
            break;
        case 2:
            failed.tryFail("Illegal register address");
            break;
        case 3:
            failed.tryFail("Illegal data value");
            break;
        case 4:
            failed.tryFail("Slave fault");
            break;
        default:
            // Fail right away instead of letting the request run into its timeout.
            failed.tryFail("Modbus exception code " + code);
            break;
    }
}
/**
 * Reports the current value of the online flag.
 *
 * @return {@code true} when the flag marks the slave as online
 */
public boolean isActive() {
    final boolean online = isAlive.get();
    return online;
}
/**
 * Closes the connection on purpose: marks the close as intentional, marks the
 * connection offline, cancels the reconnect and queue-polling timers and closes the
 * socket if one exists.
 */
public void close() {
    isInitiativeClose = true;
    // Without this isActive() would keep reporting true after an explicit close.
    isAlive.set(false);
    if (failRetryTimerId != null) {
        vertx.cancelTimer(failRetryTimerId);
    }
    if (queueTimerId != null) {
        vertx.cancelTimer(queueTimerId);
    }
    if (netSocket != null) {
        netSocket.close();
    }
}
/**
 * Enqueues a read or write task for serial execution; write tasks are consumed first.
 * For parallel execution executeTask can be called directly instead of queueing.
 *
 * @param task the read/write task
 * @return the task's own promise, settled when the task finishes or times out
 */
public Promise<JSONObject> offerTask(SendCmdTask task) {
    ConcurrentLinkedQueue<SendCmdTask> target = task.getIsWrite() ? writeQueue : readQueue;
    target.offer(task);
    return task.getPromise();
}
/**
 * Consumes the task queues one task at a time, write tasks first.
 * When both queues are empty it re-arms a 500 ms timer; otherwise the next task is
 * started as soon as the current task's promise is settled.
 *
 * @param delayFlag {@code true} to wait 500 ms before looking at the queues
 */
private void consumerTaskQueue(boolean delayFlag) {
    if (delayFlag) {
        queueTimerId = vertx.setTimer(500, id -> consumerTaskQueue(false));
        return;
    }
    SendCmdTask next;
    while (true) {
        next = writeQueue.poll();
        if (next == null) {
            next = readQueue.poll();
        }
        if (next == null) {
            consumerTaskQueue(true);
            return;
        }
        if (!next.getPromise().future().isComplete()) {
            break;
        }
        // The task was already settled while waiting (typically by its timeout timer).
        // Executing it anyway would send a command nobody is waiting for, and its
        // completed future would start the following task concurrently.
    }
    SendCmdTask sendCmdTask = next;
    sendCmdTask.getPromise().future().onComplete(h -> consumerTaskQueue(false));
    executeTask(sendCmdTask);
}
/**
 * Runs one task now: cancels the task's queue-timeout timer, dispatches to the read or
 * write path and mirrors the outcome onto the task's promise.
 *
 * <p>Because the timeout timer is cancelled first, every failure must reach the promise;
 * otherwise the task would never settle and the serial queue would stop advancing.
 *
 * @param sendCmdTask the task to execute
 * @return a future that settles when the task does
 */
private Future<Void> executeTask(SendCmdTask sendCmdTask) {
    vertx.cancelTimer(sendCmdTask.getTimerId());
    Future<JSONObject> future;
    try {
        List<ModbusParamConfig> configs = sendCmdTask.getParamConfigs();
        if (configs == null || configs.isEmpty()) {
            future = Future.failedFuture("No parameters configured");
        } else if (sendCmdTask.getIsWrite()) {
            future = executeWrite(sendCmdTask.getReqParam(), configs, sendCmdTask.getSlaveId());
        } else {
            future = executeQuery(configs, sendCmdTask.getSlaveId());
        }
    } catch (RuntimeException e) {
        // Boundary catch: turn a synchronous failure into a failed task instead of a stalled queue.
        log.error("Modbus executeTask fail, ip:{}, port:{}, msg:{}", ip, port, e.getMessage());
        future = Future.failedFuture(e);
    }
    return future.onSuccess(res -> sendCmdTask.getPromise().tryComplete(res))
            .onFailure(err -> sendCmdTask.getPromise().tryFail(err)).map(o -> null);
}
/**
 * Writes values to the slave.
 * If the parameters can be merged (see isMergeSendCmd) a single multi-write command is
 * sent; otherwise one command per parameter is sent, each starting after the previous
 * one has finished, whether it succeeded or failed.
 *
 * @param reqParam     values to write, keyed by parameter name
 * @param paramConfigs parameter configurations
 * @param slaveId      slave id
 * @return merged path: the request values on success; per-parameter path: the result
 *         assembled by commonReplyResult; fails with "Gateway offline" when not connected
 */
private Future<JSONObject> executeWrite(JSONObject reqParam, List<ModbusParamConfig> paramConfigs, Integer slaveId) {
    if (!isActive()) {
        return Future.failedFuture("Gateway offline");
    }
    if (!isMergeSendCmd(paramConfigs)) {
        // One command per parameter, chained so that they run strictly one after another.
        List<Future<Object>> results = new ArrayList<>();
        Future chain = Future.succeededFuture();
        for (ModbusParamConfig config : paramConfigs) {
            ModbusParamConfig.RegisterType type = config.getRegisterType();
            Promise<byte[]> reply = Promise.promise();
            chain = chain.compose(
                    ok -> singleExecuteWrite(slaveId, reqParam, reply, type, config),
                    ko -> singleExecuteWrite(slaveId, reqParam, reply, type, config));
            results.add(chain);
        }
        return commonReplyResult(results, paramConfigs);
    }
    // Merged path: one command starting at the first parameter's address.
    ModbusParamConfig first = paramConfigs.get(0);
    int startAddress = first.getRegisterAddress();
    ModbusParamConfig.RegisterType type = first.getRegisterType();
    Promise<byte[]> reply = Promise.promise();
    List<String> names = new ArrayList<>();
    for (ModbusParamConfig config : paramConfigs) {
        names.add(config.getName());
    }
    return vertx.executeBlocking(h -> {
        Buffer command = getWriteCmd(startAddress, slaveId, reqParam, names, type, reply);
        netSocket.write(command);
        reply.future()
                .onSuccess(ignored -> h.complete(reqParam))
                .onFailure(err -> {
                    log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());
                    h.tryFail(err.getMessage());
                });
    });
}
/**
 * Sends a write command for exactly one parameter.
 *
 * @param slaveId      slave id
 * @param reqParam     values to write, keyed by parameter name
 * @param promise      promise registered for the slave's reply
 * @param registerType register type of the parameter
 * @param paramConfig  configuration of the parameter
 * @return a future completed with the written value, or failed with the error message
 */
private Future<Object> singleExecuteWrite(int slaveId, JSONObject reqParam, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {
    return vertx.executeBlocking(h -> {
        List<String> onlyKey = Collections.singletonList(paramConfig.getName());
        Buffer command = getWriteCmd(paramConfig.getRegisterAddress(), slaveId, reqParam, onlyKey, registerType, promise);
        netSocket.write(command);
        Future<byte[]> reply = promise.future();
        reply.onSuccess(ignored -> h.tryComplete(reqParam.get(paramConfig.getName())));
        reply.onFailure(err -> {
            log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",
                    ip, port, slaveId, paramConfig.getName(), err.getMessage());
            h.tryFail(err.getMessage());
        });
    });
}
/**
 * Reads values from the slave.
 * If the parameters can be merged (see isMergeSendCmd) one read command covering all of
 * them is sent and the reply is split per parameter; otherwise one command per parameter
 * is sent, each starting after the previous one has finished.
 *
 * @param paramConfigs parameter configurations
 * @param slaveId      slave id
 * @return a future with the values keyed by parameter name; fails with "Gateway offline"
 *         when not connected, or with an error message if the request or the decoding fails
 */
private Future<JSONObject> executeQuery(List<ModbusParamConfig> paramConfigs, Integer slaveId) {
    if (!isActive()) {
        return Future.failedFuture("Gateway offline");
    }
    boolean isMerge = isMergeSendCmd(paramConfigs);
    if (isMerge) {
        int registerAddress = paramConfigs.get(0).getRegisterAddress();
        ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();
        int num = paramConfigs.size();
        Promise<byte[]> promise = Promise.promise();
        Buffer buffer = getQueryCmd(registerAddress, num, slaveId, registerType, promise);
        return vertx.executeBlocking(h -> {
            netSocket.write(buffer);
            promise.future().onSuccess(buf -> {
                // buf[0] is the byte count, the data starts at buf[1].
                try {
                    JSONObject jsonObject = new JSONObject();
                    for (int i = 0; i < paramConfigs.size(); i++) {
                        ModbusParamConfig paramConfig = paramConfigs.get(i);
                        switch (registerType) {
                            case COIL: {
                                // Coil i is bit (i % 8) of data byte (i / 8).
                                int mask = 1 << (i % 8);
                                jsonObject.put(paramConfig.getName(), (mask & buf[i / 8 + 1]) == mask);
                                break;
                            }
                            case INPUT_REGISTER:
                            case HOLDING_REGISTER:
                                jsonObject.put(paramConfig.getName(), getValue(ByteUtil.bytesToInt(buf, i * 2 + 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));
                                break;
                        }
                    }
                    h.tryComplete(jsonObject);
                } catch (RuntimeException e) {
                    // A reply shorter than requested must fail the request; an exception
                    // escaping this handler would leave it pending forever.
                    log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, e.getMessage());
                    h.tryFail("Malformed response: " + e);
                }
            }).onFailure(err -> {
                log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());
                h.tryFail(err.getMessage());
            });
        });
    }
    List<Future<Object>> futures = new ArrayList<>();
    Future blockingFuture = Future.succeededFuture();
    for (int i = 0; i < paramConfigs.size(); i++) {
        ModbusParamConfig paramConfig = paramConfigs.get(i);
        ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();
        Promise<byte[]> promise = Promise.promise();
        // Run the next read after the previous one, regardless of its outcome.
        blockingFuture = blockingFuture.compose(suc -> singleExecuteQuery(slaveId, promise, registerType, paramConfig),
                err -> singleExecuteQuery(slaveId, promise, registerType, paramConfig));
        futures.add(blockingFuture);
    }
    return commonReplyResult(futures, paramConfigs);
}
/**
 * Sends a read command for exactly one parameter and decodes the reply.
 *
 * @param slaveId      slave id
 * @param promise      promise registered for the slave's reply
 * @param registerType register type of the parameter
 * @param paramConfig  configuration of the parameter
 * @return a future with a Boolean for coils or the converted register value otherwise;
 *         failed with an error message if the request or the decoding fails
 */
private Future<Object> singleExecuteQuery(int slaveId, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {
    return vertx.executeBlocking(h -> {
        Buffer buffer = getQueryCmd(paramConfig.getRegisterAddress(), 1, slaveId, paramConfig.getRegisterType(), promise);
        netSocket.write(buffer);
        promise.future().onSuccess(buf -> {
            // buf[0] is the byte count, the data starts at buf[1].
            try {
                switch (registerType) {
                    case COIL:
                        // Only bit 0 carries the requested coil; ignore the padding bits.
                        h.tryComplete((buf[1] & 0x01) == 1);
                        break;
                    case INPUT_REGISTER:
                    case HOLDING_REGISTER:
                        h.tryComplete(getValue(ByteUtil.bytesToInt(buf, 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));
                        break;
                    default:
                        h.tryFail("Unsupported register type: " + registerType);
                        break;
                }
            } catch (RuntimeException e) {
                // A short reply or missing configuration must fail the request; an exception
                // escaping this handler would leave it pending forever.
                log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",
                        ip, port, slaveId, paramConfig.getName(), e.getMessage());
                h.tryFail("Malformed response: " + e);
            }
        }).onFailure(err -> {
            log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",
                    ip, port, slaveId, paramConfig.getName(), err.getMessage());
            h.tryFail(err.getMessage());
        });
    });
}
/**
 * Decides whether the parameters can be sent as a single command: there must be more
 * than one entry, all of the same register type, with addresses increasing by exactly 1.
 *
 * @param paramConfigs parameter configurations, in the order they will be sent
 * @return {@code true} if one merged command can be used
 */
private boolean isMergeSendCmd(List<ModbusParamConfig> paramConfigs) {
    if (paramConfigs.size() == 1) {
        return false;
    }
    ModbusParamConfig head = paramConfigs.get(0);
    ModbusParamConfig.RegisterType expectedType = head.getRegisterType();
    int previousAddress = head.getRegisterAddress();
    for (ModbusParamConfig current : paramConfigs.subList(1, paramConfigs.size())) {
        int address = current.getRegisterAddress();
        boolean contiguous = address - previousAddress == 1;
        if (!contiguous || current.getRegisterType() != expectedType) {
            return false;
        }
        previousAddress = address;
    }
    return true;
}
/**
 * Builds a read command and registers the promise that will receive its reply.
 * The promise is stored under "slaveId_functionCode", fails with "Request timeout" after
 * {@code reqTimeoutSecond} seconds, and is unregistered once it settles.
 *
 * @param startPos     address of the first item to read
 * @param num          number of items to read
 * @param slaveId      slave id
 * @param registerType register type, which selects the function code
 * @param promise      promise to register for the reply
 * @return the framed command, ready to be written to the socket
 */
private Buffer getQueryCmd(int startPos, int num, int slaveId, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {
    // Request layout: slave id, function code, start address (2 bytes), quantity (2 bytes).
    byte[] request = new byte[6];
    request[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];
    switch (registerType) {
        case COIL:
            request[1] = ModbusFunc.READ_COILS.getFunc();
            break;
        case HOLDING_REGISTER:
            request[1] = ModbusFunc.READ_HOLDING_REGISTERS.getFunc();
            break;
        case INPUT_REGISTER:
            request[1] = ModbusFunc.READ_INPUT_REGISTERS.getFunc();
            break;
    }
    byte[] addressBytes = ByteUtil.intToBytes(startPos, 0, 4);
    byte[] quantityBytes = ByteUtil.intToBytes(num, 0, 4);
    System.arraycopy(addressBytes, 2, request, 2, 2);
    System.arraycopy(quantityBytes, 2, request, 4, 2);

    Integer func = ByteUtil.bytesToInt(request, 1, 1);
    String key = String.format("%s_%s", slaveId, func);

    Buffer command = new BufferImpl();
    command.appendBytes(mode.writeData(request));

    promiseMap.put(key, promise);
    long timeoutTimer = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));
    promise.future().onComplete(done -> {
        promiseMap.remove(key);
        vertx.cancelTimer(timeoutTimer);
    });
    return command;
}
/**
 * Builds a write command and registers the promise that will receive its reply.
 * One key produces a single-coil (0x05) or single-register (0x06) write; several keys
 * produce a multi-coil (0x0F) or multi-register (0x10) write starting at {@code startPos}.
 * The promise is stored under "slaveId_functionCode", fails with "Request timeout" after
 * {@code reqTimeoutSecond} seconds, and is unregistered once it settles.
 *
 * @param startPos     address of the first item to write
 * @param slaveId      slave id
 * @param reqParam     values to write, keyed by parameter name
 * @param keys         parameter names, in address order
 * @param registerType register type; only COIL and HOLDING_REGISTER are writable
 * @param promise      promise to register for the reply
 * @return the framed command, ready to be written to the socket
 * @throws IllegalArgumentException if {@code registerType} is not writable
 */
private Buffer getWriteCmd(int startPos, int slaveId, JSONObject reqParam,
        List<String> keys, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {
    boolean coil = registerType == ModbusParamConfig.RegisterType.COIL;
    boolean holding = registerType == ModbusParamConfig.RegisterType.HOLDING_REGISTER;
    if (!coil && !holding) {
        // Previously such a request went out with function code 0 and ran into the timeout.
        throw new IllegalArgumentException("Register type is not writable: " + registerType);
    }
    boolean single = keys.size() == 1;
    int coilBytes = (keys.size() + 7) / 8;
    // Single write: id + func + address(2) + value(2).
    // Multi write: id + func + address(2) + quantity(2) + byte count(1) + payload.
    int len = single ? 6 : 7 + (holding ? keys.size() * 2 : coilBytes);
    byte[] bytes = new byte[len];
    bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];
    byte[] startPosBytes = ByteUtil.intToBytes(startPos, 2, 2);
    bytes[2] = startPosBytes[0];
    bytes[3] = startPosBytes[1];
    if (single) {
        if (coil) {
            bytes[1] = ModbusFunc.WRITE_SINGLE_COILS.getFunc();
            boolean value = reqParam.getBoolean(keys.get(0));
            // 0xFF00 switches the coil on, 0x0000 switches it off.
            bytes[4] = value ? (byte) 0xFF : 0x00;
            bytes[5] = 0x00;
        } else {
            bytes[1] = ModbusFunc.WRITE_SINGLE_HOLDING_REGISTERS.getFunc();
            byte[] dataArr = ByteUtil.intToBytes(reqParam.getInteger(keys.get(0)), 2, 2);
            bytes[4] = dataArr[0];
            bytes[5] = dataArr[1];
        }
    } else {
        byte[] dataNum = ByteUtil.intToBytes(keys.size(), 2, 2);
        bytes[4] = dataNum[0];
        bytes[5] = dataNum[1];
        if (coil) {
            bytes[1] = ModbusFunc.WRITE_MULTI_COILS.getFunc();
            bytes[6] = ByteUtil.intToBytes(coilBytes, 3, 1)[0];
            // Coil i is bit (i % 8) of payload byte (i / 8); unused bits stay 0.
            for (int i = 0; i < keys.size(); i++) {
                boolean on = reqParam.getBoolean(keys.get(i));
                if (on) {
                    bytes[7 + i / 8] |= (byte) (1 << (i % 8));
                }
            }
        } else {
            bytes[1] = ModbusFunc.WRITE_MULTI_HOLDING_REGISTERS.getFunc();
            bytes[6] = ByteUtil.intToBytes(keys.size() * 2, 3, 1)[0];
            for (int i = 0; i < keys.size(); i++) {
                byte[] dataArr = ByteUtil.intToBytes(reqParam.getInteger(keys.get(i)), 2, 2);
                bytes[i * 2 + 7] = dataArr[0];
                bytes[i * 2 + 8] = dataArr[1];
            }
        }
    }
    Integer func = ByteUtil.bytesToInt(bytes, 1, 1);
    String key = String.format("%s_%s", slaveId, func);
    Buffer buffer = new BufferImpl();
    buffer.appendBytes(mode.writeData(bytes));
    promiseMap.put(key, promise);
    long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));
    promise.future().onComplete(res -> {
        promiseMap.remove(key);
        vertx.cancelTimer(timeId);
    });
    return buffer;
}
/**
 * Waits for all per-parameter futures and folds them into one result.
 * Succeeds with the values of the parameters that succeeded if there is at least one;
 * otherwise fails with the error message recorded for the first parameter.
 *
 * @param futures      one future per parameter, in the same order as {@code paramConfigs}
 * @param paramConfigs parameter configurations
 * @return the combined result
 */
private Future<JSONObject> commonReplyResult(List<Future<Object >> futures, List<ModbusParamConfig> paramConfigs) {
    return vertx.executeBlocking(b -> Future.join(futures).onComplete(joined -> {
        JSONObject succeeded = new JSONObject();
        JSONObject failed = new JSONObject();
        int index = 0;
        for (ModbusParamConfig config : paramConfigs) {
            Future<Object> outcome = futures.get(index++);
            if (outcome.succeeded()) {
                succeeded.put(config.getName(), outcome.result());
            } else {
                failed.put(config.getName(), outcome.cause().getMessage());
            }
        }
        if (succeeded.isEmpty()) {
            b.tryFail(failed.getString(paramConfigs.get(0).getName()));
        } else {
            b.tryComplete(succeeded);
        }
    }));
}
/**
 * Converts a raw register value by dividing it by {@code numberSplit}.
 *
 * @param value       raw register value
 * @param numberSplit divisor; 1 means "no scaling", and 0 (the value an int field has
 *                    when the configuration omits it) is treated the same way
 * @param dataType    target type; INT rounds to the nearest integer, anything else
 *                    (including null) yields the float quotient
 * @return an Integer when unscaled or INT, otherwise a Float
 */
private Object getValue(int value, int numberSplit, ModbusParamConfig.DataType dataType) {
    if (numberSplit == 1 || numberSplit == 0) {
        // Dividing by 0 would produce Infinity (or Integer.MAX_VALUE after rounding).
        return value;
    }
    Float temp = value * 1f / numberSplit;
    if (dataType == ModbusParamConfig.DataType.INT) {
        return Math.round(temp);
    }
    return temp;
}
}
测试
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.core.ModbusConnection;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.model.SendCmdTask;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;
import java.util.List;
@Log4j2
public class TestModbus {
public static final String READ_DATA = "[" +
" {" +
" \"name\": \"a\"," +
" \"registerType\": \"HOLDING_REGISTER\"," +
" \"registerAddress\": 504," +
" \"dataType\": \"FLOAT\"," +
" \"numberSplit\": 10" +
" }," +
" {" +
" \"name\": \"b\"," +
" \"registerType\": \"HOLDING_REGISTER\"," +
" \"registerAddress\": 505," +
" \"dataType\": \"FLOAT\"," +
" \"numberSplit\": 10" +
" }," +
" {" +
" \"name\": \"c\"," +
" \"registerType\": \"HOLDING_REGISTER\"," +
" \"registerAddress\": 506," +
" \"dataType\": \"FLOAT\"," +
" \"numberSplit\": 10" +
" }," +
" {" +
" \"name\": \"d\"," +
" \"registerType\": \"HOLDING_REGISTER\"," +
" \"registerAddress\": 507," +
" \"dataType\": \"INT\"," +
" \"numberSplit\": 1" +
" }," +
" {" +
" \"name\": \"e\"," +
" \"registerType\": \"HOLDING_REGISTER\"," +
" \"registerAddress\": 508," +
" \"dataType\": \"INT\"," +
" \"numberSplit\": 1" +
" }]";
private static final String WRITE_DATA = "[" +
" {" +
" \"name\": \"do0\"," +
" \"registerType\": \"COIL\"," +
" \"registerAddress\": 20," +
" \"dataType\": \"BOOL\"," +
" \"numberSplit\": 1" +
" }" +
" ,{" +
" \"name\": \"do1\"," +
" \"registerType\": \"COIL\"," +
" \"registerAddress\": 21," +
" \"dataType\": \"BOOL\"," +
" \"numberSplit\": 1" +
" }" +
"]";
/**
 * Demo entry point: runs the read example. Call testWriteData() instead to try the
 * write example.
 */
public static void main(String[] args) {
    testReadData();
}
/**
 * Write example: connects to 127.0.0.1:502 in TCP mode, then queues a read of the two
 * coils followed by a write that sets both to false, logging each outcome.
 */
private static void testWriteData() {
    Vertx vertx = Vertx.vertx();
    ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);
    Future<Boolean> connectFuture = connection.connect();

    List<ModbusParamConfig> coilConfigs = JSONArray.parseArray(WRITE_DATA, ModbusParamConfig.class);
    JSONObject values = new JSONObject();
    values.put("do0", false);
    values.put("do1", false);

    connectFuture.onComplete(con -> {
        if (!connectFuture.succeeded()) {
            System.err.println("gateway offline");
            return;
        }
        SendCmdTask readTask = new SendCmdTask(vertx, coilConfigs, null, false, 21, 10);
        connection.offerTask(readTask).future()
                .onSuccess(suc -> log.info("read:" + suc))
                .onFailure(err -> System.err.println(err.getMessage()));

        SendCmdTask writeTask = new SendCmdTask(vertx, coilConfigs, values, true, 21, 10);
        connection.offerTask(writeTask).future()
                .onSuccess(suc -> log.info("write:" + suc))
                .onFailure(err -> System.err.println(err.getMessage()));
    });
}
/**
 * Read example: connects to 127.0.0.1:502 in TCP mode and, if the connection is active,
 * queues one read of the five holding registers described by READ_DATA and logs the result.
 */
private static void testReadData() {
    Vertx vertx = Vertx.vertx();
    ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);
    List<ModbusParamConfig> registerConfigs = JSONArray.parseArray(READ_DATA, ModbusParamConfig.class);
    Future<Boolean> connectFuture = connection.connect();
    connectFuture.onComplete(con -> {
        if (!connection.isActive()) {
            System.err.println("gateway offline");
            return;
        }
        SendCmdTask readTask = new SendCmdTask(vertx, registerConfigs, null, false, 2, 10);
        Promise<JSONObject> outcome = connection.offerTask(readTask);
        outcome.future()
                .onSuccess(suc -> log.info(suc))
                .onFailure(err -> System.err.println(err.getMessage()));
    });
}
}
运行结果如下:
其实这两个读写示例如果是一个网关可以共用一个Modbus连接。
modbus-app配置参数
格式如下:
{
"readable": {
"devType01": {
"ReportData": [
{
"name" : "xxx",
"registerType" : "COIL",
"registerAddress" : 1,
"dataType" : "BOOL",
"numberSplit" : 1
}
]
},
"devType02": {
"ReportData": [
{
"name" : "a",
"registerType" : "HOLDING_REGISTER",
"registerAddress" : 1,
"dataType" : "INT",
"numberSplit" : 1
},
{
"name" : "b",
"registerType" : "HOLDING_REGISTER",
"registerAddress" : 2,
"dataType" : "INT",
"numberSplit" : 10
},
{
"name": "c",
"registerType": "",
"dataType": "FLOAT",
"mbScript": "(a*10000+b)/10"
}
]
}
},
"writable": {
"devType01": {
"Control": [
{
"name": "operation",
"registerType": "COIL",
"registerAddress": 21,
"dataType": "BOOL",
"numberSplit": 1
}
]
}
},
"readDataPeriods": [
{
"period" : 60,
"deviceTypes": ["devType01"]
},
{
"period" : 600,
"deviceTypes": ["devType02","devType03"]
}
]
}
具体怎么实现这边就不过多讲解了…
结束
不保证代码正确,我这边只是大概实现了一下,仅供参考。若有问题,请批评指出,我会虚心接受并积极修复问题。