python实现ModBusTCP协议的client是一件简单的事情,只要通过pymodbus、pyModbusTCP等模块都可以实现,本文采用pymodbus。但要基于ModBusTCP服务端的业务实现特定的client,那得看看服务端是否复杂。前面系列文章,我们学习了对服务端的简单交互,便是得力于服务端简单的业务流程,本文将实现有点复杂的业务流程。
一、业务描述
我们将使用python脚本来实现一个ModBusTCP client,他能够触发设备进行拍照,然后读取拍照情况。
1、ModBusTCP服务端交互流程
大概类似如下交互时序,其中PLC将用我们的脚本代替。
2、控制
根据上述业务,服务器需要有寄存器存储客户端写入的控制位,存储顺序如下。
3、状态
根据上述业务,服务器需要有寄存器存储状态位,让客户端来读取,存储顺序如下。
4、结果
根据上述业务,服务器需要有寄存器存储结果,让客户端来读取,存储顺序如下。
5、 地址空间
(1)控制
类型:HoldingRegisters、Coils
起始地址与寄存器数量:自定义
(2)状态
类型:HoldingRegisters、DiscreteInputs、InputRegisters
起始地址与寄存器数量:自定义
(3)结果
类型:HoldingRegisters、InputRegisters
起始地址与寄存器数量:自定义
二、程序整体设计
class myModBusTCPclient(object):
def __init__(self, obj):
self.obj = obj
# 状态读取后转成二进制,分别对应TriggerReady等状态
def custom_binary(self, num, length=16, ByteSwap=0):
# 将整数转换为二进制字符串
binary_str = bin(num)[2:]
# 计算需要补充的零的个数
zeros_to_add = length - len(binary_str)
# 构造符合规则的二进制字符串
result_str = '0' * zeros_to_add + binary_str
# 翻转二进制,如01转为10,方便后续取值
result_str = result_str[::-1]
if ByteSwap==0:
return result_str
elif ByteSwap==1: # 需要字节交换时
return result_str[8:] + result_str[:8]
else:
raise ValueError("ByteSwap 的值错误!")
# 控制写之前先将TriggerEnable等二进制控制位转成数值
def custom_num(self, binary, length=16, ByteSwap=0):
assert len(binary) == length, "输入的二进制长度不正确!"
binary = binary[::-1] # 翻转二进制,如01转为10,方便后续取值
if ByteSwap==0:
return int(binary, 2)
elif ByteSwap==1: # 需要字节交换时
return int(binary[8:] + binary[:8], 2)
else:
raise ValueError("ByteSwap 的值错误!")
def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):
if addrtype=="HoldingRegisters":
value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)
self.obj.write_registers(address, value, slave=slave)
elif addrtype=="Coils":
...
else:
raise ValueError("ctrl_addrtype的值错误!")
def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):
if addrtype=="HoldingRegisters":
value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)
value_list = [value.registers[i] for i in range(reg_nb)] # 整型列表
print("status HoldingRegisters:", value_list)
print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])
return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]
elif addrtype=="InputRegisters":
...
elif addrtype=="DiscreteInputs":
...
else:
raise ValueError("status_addrtype的值错误!")
def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0):
if addrtype=="HoldingRegisters":
...
elif addrtype=="InputRegisters":
...
else:
raise ValueError("plc_out_addrtype的值错误!")
if __name__ == "__main__":
# Modbus TCP服务器的IP地址和端口号
server_ip = "192.168.1.196"
port = 502
station = 1
# 创建Modbus TCP客户端
MDclient = ModbusTcpClient(server_ip, port)
if MDclient.connect():
myclient = myModBusTCPclient(MDclient)
myclient.ctrl(ByteSwap=1, binary="1100000000000000")
time.sleep(1)
myclient.status(ByteSwap=1)
1、程序结构
上述代码定义了一个名为 myModBusTCPclient
的类,用于与 Modbus TCP 服务器进行通信。下面是对程序结构的分析:
构造函数 __init__
:
接收一个参数 obj
,表示 Modbus TCP 客户端对象。将这个对象存储在实例变量 self.obj
中。
custom_binary
方法:
将给定的整数 num
转换为指定长度 length
的二进制字符串。可选参数 ByteSwap
用于指定是否进行字节交换。如果 ByteSwap
为 1,则进行字节交换,否则不进行。返回构造好的二进制字符串。
custom_num
方法:
接收一个二进制字符串 binary
,根据给定的长度 length
和是否进行字节交换 ByteSwap
将其转换为整数。返回转换得到的整数。
ctrl
方法:
根据给定的地址类型 addrtype
(默认是 "HoldingRegisters")、是否进行字节交换 ByteSwap
、二进制字符串 binary
、Modbus 地址 address
和从站号 slave
,向 Modbus 服务器写入数据。如果地址类型是 "HoldingRegisters",则使用 write_registers
方法写入寄存器。
status
方法:
根据给定的地址类型 addrtype
、是否进行字节交换 ByteSwap
、寄存器地址 reg_addr
、寄存器数量 reg_nb
和从站号 slave
,从 Modbus 服务器读取数据。如果地址类型是 "HoldingRegisters",则使用 read_holding_registers
方法读取寄存器。
plc_out
方法:
根据给定的地址类型 addrtype
和是否进行字节交换 ByteSwap
,执行一些 Modbus 操作。具体操作需要根据地址类型的不同进行扩展。
if __name__ == "__main__":
部分:
在脚本独立运行时进行的操作。创建了一个 Modbus TCP 客户端对象 MDclient
。通过 myModBusTCPclient
类创建了一个自定义的客户端对象 myclient
。调用了 ctrl
方法,向 Modbus 服务器写入数据。等待了一秒钟。调用了 status
方法,从 Modbus 服务器读取数据。
2、不同地址空间的请求
在ModbusTCP中,对于不同的寄存器,请求方式是不一样的,因此需要根据服务端的设置相应更改。
为了满足业务,需要对原有的pymodbus进行封装改造。
为了满足大端序和小端序,需要加入字节交换的操作。