用Python采用Modbus-Tcp的方式读取PLC模块数据

news2024/11/26 14:25:36

使用计算器得到需要的寄存器地址

这里PLC地址是83,对应的程序16进制读取地址是53

实际上由于PLC地址从1开始,所以这里实际地址应该是52,因为计算机从0开始

在这里插入图片描述

使用网络调试助手生成报文

在这里插入图片描述

使用Python中的内置函数int()。以下是将人员卡号’b’3b44’'转换为十进制的示例代码:

card_number = '3b44'
decimal_number = int(card_number, 16)
print(decimal_number)

使用response[-4:]获取了响应数据的后4个字节作为value96。然后,通过struct.unpack(‘>f’, value96)[0]将4字节的二进制字符串解包为单精度浮点数,并将其打印出来。

#实时电量
request = bytes.fromhex("00 20 00 00 00 06 01 03 00 5F 00 02 ")
client_socket.send(request)
response = client_socket.recv(1024)

value96 = response[-4:]
value96 = struct.unpack('>f', value96)[0]
value96=value96*10.00
value96=round(value96,2)
print("实时电量 单精度浮点数: {:.2f}".format(value96))

value40 是一个包含两个字节的字节串,即 b’\x00\x00’,将其转换为二进制,并保留8位。

以下是将字节串转换为二进制并保留8位的示例代码:

value40 = b'\x00\x00'
binary_value = bin(int.from_bytes(value40, byteorder='big'))[2:].zfill(8)
print("状态:", binary_value)

DEMO代码

#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File  : main0523_04.py
# @Author:Romulushe
# @Time    : 2023/5/23 10:38
# @Software: PyCharm
# @Use: PyCharm
import socket
import struct
import time
import binascii

interval = 5
ip_address = ''#根据实际情况自定义
port_number = 502
polling_interval = float(interval)

def print_binary_value(value, name):
    binary_value = bin(int.from_bytes(value, byteorder='big'))[2:].zfill(8)[::-1]
    print(f"{name}: {binary_value}")

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
    try:
        client_socket.settimeout(3)
        client_socket.connect((ip_address, port_number))

        while True:
            try:
                ##电压
                request = bytes.fromhex("00 22 00 00 00 06 01 03 00 53 00 02 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value84 = response[-4:]
                value84= struct.unpack('>f', value84)[0]
                # value84 = value84 * 10.00
                value84 = round(value84, 2)
                print("实时电压 单精度浮点数: {:.2f}".format(value84))

                ##电流
                request = bytes.fromhex("00 24 00 00 00 06 01 03 00 4F 00 02  ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value80 = response[-4:]
                value80 = struct.unpack('>f', value80)[0]
                # value84 = value80 * 10.00
                value80 = round(value80, 2)
                print("实时电流 单精度浮点数: {:.2f}".format(value80))

                ##实时温度1
                request = bytes.fromhex("00 2E 00 00 00 06 01 03 00 57 00 02 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value88 = response[-4:]
                value88 = struct.unpack('>f', value88)[0]
                # value88 = value88 * 10.00
                value88 = round(value88, 2)
                print("实时温度1 单精度浮点数: {:.2f}".format(value88))

                #实时温度2
                request = bytes.fromhex("00 2C 00 00 00 06 01 03 00 5B 00 02 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value92 = response[-4:]
                value92 = struct.unpack('>f', value92)[0]
                # value92 = value92 * 10.00
                value92 = round(value92, 2)
                print("实时温度2 单精度浮点数: {:.2f}".format(value92))

                ##车速
                request = bytes.fromhex("00 31 00 00 00 06 01 03 00 3B 00 01 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                speed= binascii.hexlify(response)[-4:]
                speed = int(speed, 16)
                print("车速:", speed)

                # #实时电量
                request = bytes.fromhex("00 20 00 00 00 06 01 03 00 5F 00 02 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value96 = response[-4:]
                value96 = struct.unpack('>f', value96)[0]
                value96=value96*10.00
                value96=round(value96,2)
                print("实时电量 单精度浮点数: {:.2f}".format(value96))

                #充放电状态

                #超速报警

                #低电量报警

                #温度过高1

                #温度过高2

                #00 18 00 00 00 06 01 03 00 28 00 01
                request = bytes.fromhex("00 33 00 00 00 06 01 03 00 28 00 01 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value40 = response[-2:]
                value40= bin(int.from_bytes(value40, byteorder='big'))[2:].zfill(8)[::-1]
                print("充放电状态:",value40[4])
                print("超速报警:", value40[0])
                print("低电量报警:", value40[2])
                print("温度1过高:", value40[3])
                print("温度2过高:", value40[7])


                #车牌号
                #
                request = bytes.fromhex("00 00 00 00 00 06 01 03 00 1E 00 02 ")  # 车牌号
                client_socket.send(request)
                response = client_socket.recv(1024)

                car_num= binascii.hexlify(response)[-8:-4]
                car_num = int(car_num, 16)
                print("车牌号:", car_num)

                #人员卡号
                request = bytes.fromhex("00 00 00 00 00 06 01 03 00 42 00 02 ")  # 人员卡号
                client_socket.send(request)
                response = client_socket.recv(1024)

                card = binascii.hexlify(response)[-8:-4]
                card = int(card, 16)
                print("人员卡号:", card)

            except socket.timeout:
                print('TIMEOUT ERROR: 服务器未及时响应')
    except Exception as e:
        print('CONNECT ERROR:', e)

DEMO结果

E:\software\python\python.exe E:/projects/Forklift/t2.py

实时电压 单精度浮点数: 48.85

实时电流 单精度浮点数: 0.26

实时温度1 单精度浮点数: 31.10

实时温度2 单精度浮点数: 30.85

车速: 0

实时电量 单精度浮点数: 68.52

充放电状态: 0

超速报警: 0

低电量报警: 0

温度1过高: 0

温度2过高: 0

车牌号: 15172

人员卡号: 10763

其他:地址表

在这里插入图片描述

附带数据库&log存储的代码:

#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File  : main0523_04.py
# @Author:Romulushe
# @Time    : 2023/5/23 10:38
# @Software: PyCharm
# @Use: PyCharm
import socket
import struct
import time
import binascii
import pymysql
import os,sys,datetime,logging

interval = 5
ip_address = ''#根据实际情况自定义
port_number = ''#根据实际情况自定义
polling_interval = float(interval)


base_path = os.path.dirname(os.path.realpath(sys.argv[0]))


def get_log_path():
    return os.path.join(base_path, 'logs')


def cleanup_logs():
    log_path = get_log_path()
    current_time = time.time()
    for file_name in os.listdir(log_path):
        file_path = os.path.join(log_path, file_name)
        if os.path.isfile(file_path):
            creation_time = os.path.getctime(file_path)
            if current_time - creation_time > (3 * 24 * 60 * 60):
                os.remove(file_path)


def configure_logging():
    log_path = get_log_path()
    os.makedirs(log_path, exist_ok=True)
    log_filename = get_log_filename()
    log_file = os.path.join(log_path, log_filename)
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', filename=log_file)


def get_log_filename():
    now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d_%H-%M.log")


def create_new_log():
    log_path = get_log_path()
    log_files = os.listdir(log_path)
    if len(log_files) >= 20:
        oldest_file = min(log_files)
        os.remove(os.path.join(log_path, oldest_file))
    log_filename = get_log_filename()
    log_filepath = os.path.join(log_path, log_filename)
    return log_filepath


def check_log_size(log_filepath):
    log_size = os.path.getsize(log_filepath)
    if log_size > 2 * 1024 * 1024:
        # 创建新的日志文件
        new_log_filepath = create_new_log()
        try:
            shutil.move(log_filepath, new_log_filepath)
            return new_log_filepath
        except PermissionError:
            insert_log(logger, f'{log_filepath} {PermissionError}', log_filepath)
            time.sleep(0.1)
            return log_filepath
    return log_filepath

def insert_log(logger, log_message, log_filepath):
    log_filepath = check_log_size(log_filepath)

    # 创建文件处理器
    file_handler = RotatingFileHandler(log_filepath, maxBytes=2 * 1024 * 1024, backupCount=1)
    file_handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)

    # 添加文件处理器到日志记录器
    logger.addHandler(file_handler)

    try:
        logger.debug(log_message)
    except PermissionError:
        insert_log(logger, f'{log_message} {PermissionError}', log_filepath)
        time.sleep(0.1)  # 延迟0.1秒

    # 移除文件处理器
    logger.removeHandler(file_handler)
# 连接数据库
def connect_database(host, port, user, password, db_name):
    try:
        conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db_name)
        # print("成功连接到数据库")
        return conn
    except pymysql.Error as e:
        insert_log(logger, f'DATABASE CONNECT FAILED', log_filepath)
        # print(f"数据库连接失败: {e}")

# 插入数据
def insert_data(conn, types,table_name, create_time, location_no, parameter_desc, location, param_value):
    try:
        cursor = conn.cursor()
        sql = f"INSERT INTO {table_name} (TYPE, CREATE_TIME, LOCATION_NO, PARAMETER_DESC, LOCATION, PARAMVALUE) " \
              f"VALUES (%s, %s, %s, %s, %s, %s)"
        cursor.execute(sql, (types,create_time, location_no, parameter_desc, location, param_value))
        conn.commit()
        # print(f"数据插入成功: {create_time}, {parameter_desc}: {param_value}")
        cursor.close()
    except pymysql.Error as e:
        insert_log(logger, f'DATABASE INSERT DATA FAILED', log_filepath)
        # print(f"数据插入失败: {e}")


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
    configure_logging()
    cleanup_logs()

    log_filepath = create_new_log()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    try:
        client_socket.settimeout(3)
        client_socket.connect((ip_address, port_number))

        # 连接数据库
        host = ''#根据实际情况自定义
        port = 3306
        user = ''#根据实际情况自定义
        password = ''#根据实际情况自定义
        db_name = ''#根据实际情况自定义
        table_name =''#根据实际情况自定义
        try:
            # 连接数据库
            conn = connect_database(host, port, user, password, db_name)
        except Exception as e:
            print("COON ERROR:",e)

        while True:

            try:
                types ='叉车'
                location ='F2堆02'



                ##电压
                request = bytes.fromhex("00 22 00 00 00 06 01 03 00 53 00 02 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value84 = response[-4:]
                value84= struct.unpack('>f', value84)[0]
                # value84 = value84 * 10.00
                value84 = round(value84, 2)
                print("实时电压 单精度浮点数: {:.2f}".format(value84))

                location_no ='F01-1'
                parameter_desc ='实时电压'
                param_value=value84

                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                ##电流
                request = bytes.fromhex("00 24 00 00 00 06 01 03 00 4F 00 02  ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value80 = response[-4:]
                value80 = struct.unpack('>f', value80)[0]
                # value84 = value80 * 10.00
                value80 = round(value80, 2)
                print("实时电流 单精度浮点数: {:.2f}".format(value80))
                location_no = 'F01-2'
                parameter_desc = '实时电流'
                param_value=value80

                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                ##实时温度1
                request = bytes.fromhex("00 2E 00 00 00 06 01 03 00 57 00 02 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value88 = response[-4:]
                value88 = struct.unpack('>f', value88)[0]
                # value88 = value88 * 10.00
                value88 = round(value88, 2)
                print("实时温度1 单精度浮点数: {:.2f}".format(value88))
                location_no = 'F01-3'
                parameter_desc = '实时温度1'
                param_value = value88

                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

               #实时温度2
                request = bytes.fromhex("00 2C 00 00 00 06 01 03 00 5B 00 02 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value92 = response[-4:]
                value92 = struct.unpack('>f', value92)[0]
                # value92 = value92 * 10.00
                value92 = round(value92, 2)
                print("实时温度2 单精度浮点数: {:.2f}".format(value92))
                location_no = 'F01-4'
                parameter_desc = '实时温度2'
                param_value = value92

                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                ##车速
                request = bytes.fromhex("00 31 00 00 00 06 01 03 00 3B 00 01 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                speed= binascii.hexlify(response)[-4:]
                speed = int(speed, 16)
                print("车速:", speed)
                location_no = 'F01-5'
                parameter_desc = '车速'
                param_value = speed

                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                # #实时电量
                request = bytes.fromhex("00 20 00 00 00 06 01 03 00 5F 00 02 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value96 = response[-4:]
                value96 = struct.unpack('>f', value96)[0]
                value96=value96*10.00
                value96=round(value96,2)
                print("实时电量 单精度浮点数: {:.2f}".format(value96))
                location_no = 'F01-6'
                parameter_desc = '实时电量'
                param_value = value96

                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                #充放电状态

                #超速报警

                #低电量报警

                #温度过高1

                #温度过高2

                #00 18 00 00 00 06 01 03 00 28 00 01
                request = bytes.fromhex("00 33 00 00 00 06 01 03 00 28 00 01 ")
                client_socket.send(request)
                response = client_socket.recv(1024)

                value40 = response[-2:]
                value40= bin(int.from_bytes(value40, byteorder='big'))[2:].zfill(8)[::-1]
                print("充放电状态:",value40[4])
                print("超速报警:", value40[0])
                print("低电量报警:", value40[2])
                print("温度1过高:", value40[3])
                print("温度2过高:", value40[7])
                location_no = 'F01-7'
                parameter_desc = '充放电状态'
                param_value = value40[4]
                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                location_no = 'F01-8'
                parameter_desc = '超速报警'
                param_value = value40[0]
                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                location_no = 'F01-9'
                parameter_desc = '低电量报警'
                param_value = value40[2]
                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                location_no = 'F01-10'
                parameter_desc = '温度1过高'
                param_value = value40[3]
                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                location_no = 'F01-11'
                parameter_desc = '温度2过高'
                param_value = value40[7]
                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                # 车牌号
                #
                request = bytes.fromhex("00 00 00 00 00 06 01 03 00 1E 00 02 ")  # 车牌号
                client_socket.send(request)
                response = client_socket.recv(1024)

                car_num = binascii.hexlify(response)[-8:-4]
                car_num = int(car_num, 16)
                print("车牌号:", car_num)
                location_no = 'F01-12'
                parameter_desc = '车牌号'
                param_value = car_num
                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

                # 人员卡号
                request = bytes.fromhex("00 00 00 00 00 06 01 03 00 42 00 02 ")  # 人员卡号
                client_socket.send(request)
                response = client_socket.recv(1024)

                card = binascii.hexlify(response)[-8:-4]
                card = int(card, 16)
                print("人员卡号:", card)
                location_no = 'F01-13'
                parameter_desc = '人员卡号'
                param_value = card
                insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)

            except socket.timeout:
                print('TIMEOUT ERROR: 服务器未及时响应')
    except Exception as e:
        print('CONNECT ERROR:', e)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/753587.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mac批量提取文件夹的名称,怎么操作?

mac批量提取文件夹的名称,怎么操作?很多小伙伴想知道在mac电脑上可以一键快速批量的将大量文件夹的名提取出来,而不是采用一个一个名称提取的方法,这是一个有利于提高工作效率的办法,这一项技能在网上几乎找不到解决办…

智能感测型静电消除器的原理

感测型静电消除器是一种能够监测和消除静电的装置。静电是由于物体表面积聚了不平衡的电荷而产生的现象,常常会引发电击、火花、物体吸附等问题。 感测型静电消除器通常包含以下几个主要部分: 1. 传感器:用于检测静电电荷的存在和强度。传感…

V4l2-ctl

1 v4l-utils v4l-utils是一种用于处理媒体设备的软件包,它主要包含两个常用工具1: media-ctl:用于配置拓扑结构中各节点的format、大小、链接;操作/dev/medio0节点;获取Camera支持数据格式等。v4l2-ctl:用…

Java面试为啥会越来越难?

最近感慨面试难的人越来越多了,一方面是市场环境,更重要的一方面是企业对Java的人才要求越来越高了。 基本上这样感慨的分为两类人,第一,虽然挂着3、5年经验,但肚子里货少,也没啥拿得出手的项目&#xff0c…

追妹神器,恋爱神器,哄老婆开心,智能机器人每天给你心爱的TA发送早晚安问候

柠檬恋爱助理配置文档 柠檬恋爱助理插件是利用微信测试公众号,实现每天给你的TA发送早晚安等模板消息的一款全自动化插件,恋爱神器、追妹神器。真正做到只需简单配置,就可以实现哄你的TA开心快乐每一天。 项目地址:点击查看 体…

【C语言】-- 一篇带你了解指针数组与数组指针

目录 一、什么是指针数组和数组指针 1. 指针数组:顾名思义,存放指针的数组。 补充(1):指针数组还可以和字符串数组相结合使用 补充(2):二维数组与指针数组的区别 2. 数组指针&am…

UriComponentsBuilder使用发现参数被编码了

前言 开发中,小编在项目中,使用RestTemplate做GET请求,为了优雅的封装参数,使用了UriComponentsBuilder来进行参数封装。直接123将代码写完,远程调用有数据,远程底层solr库,所以拿到数据了&…

Linux的基本使用和web程序部署

注意:本文章不适合C学习者(知识点远远不够),只适合Java学习者,学习简单的Linux命令 1.Linux的背景知识 1.1Linux是什么 Linux是一个操作系统,和Windows是“并列”的关系。经过多年的发展,Lin…

控制台警告:Added non-passive event listener to a scroll-blocking <some> event

页面中的echarts图表缩放后在控制台就会出现以下提醒&#xff0c;虽然只是报黄提醒&#xff0c;但本人强迫症严重,见不得控制台有任何异常...... [Violation] Added non-passive event listener to a scroll-blocking <some> event. Consider marking event handler as …

python+pytest接口自动化之测试函数、测试类/测试方法的封装

目录 前言 测试用例封装的一般规则 测试函数的封装 测试类/方法的封装 示例代码 总结 前言 在pythonpytest 接口自动化系列中&#xff0c;我们之前的文章基本都没有将代码进行封装&#xff0c;但实际编写自动化测试脚本中&#xff0c;我们都需要将测试代码进行封装&#…

并不简单的代理,Dubbo是如何做服务引用的

系列文章目录 【收藏向】从用法到源码&#xff0c;一篇文章让你精通Dubbo的SPI机制 面试Dubbo &#xff0c;却问我和Springcloud有什么区别&#xff1f; 超简单&#xff0c;手把手教你搭建Dubbo工程&#xff08;内附源码&#xff09; Dubbo最核心功能——服务暴露的配置、使用…

2023/07/14 UML图/流程图/泳道图是什么

UML图 UML图中的几种图简介&#xff08;时序图&#xff0c;协作图&#xff0c;状态图&#xff0c;活动图&#xff0c;对象图&#xff09; 泳道图 适合做这种效果&#xff0c;体现角色关系 流程图 定义 绘制要素 开始/结束&#xff1a;用一个椭圆标识&#xff0c;代表流畅的开…

优维EasyOps产品使用最佳实践:Agent存活性监控

优维EasyOps平台内置Agent存活性监控啦&#xff01; Agent作为自动化/监控底层核心组件&#xff0c;它的可用性直接影响了上层功能的使用&#xff0c;故我们会非常关注它的状态。但如果有网络波动、Agent升级或机器故障等都可能导致Agent异常&#xff0c;这时用户希望这种异常…

初级 - 如何搭建一个Java Web项目 - 记录

目录 序言一、使用 Spring Initializr 创建创建一个Java 项目基本框架的方法1. 新建项目时&#xff0c;安装依赖理解Developer Tools 选项 &#xff01;Web 选项 &#xff01; 其他选项具体详情请最下面的参考链接&#xff0c;这里就不一一列举了&#xff0c;只筛选出笔者当前需…

科技政策 | 国家网信办等七部门联合公布《生成式人工智能服务管理暂行办法》

文 | BFT机器人 近日&#xff0c;国家网信办联合国家发展改革委、教育部、科技部、工业和信息化部、公安部、广电总局公布《生成式人工智能服务管理暂行办法》&#xff08;以下称《办法》&#xff09;&#xff0c;自2023年8月15日起施行。国家互联网信息办公室有关负责人表示&a…

零代码编程:用ChatGPT自动登陆微信公众号后台

要实现微信公众号后台自动登陆&#xff0c;可以使用ChatGPT来编写Python代码实现。 微信公众平台账号密码登陆&#xff0c;要先点击“使用账号登录”&#xff0c;源代码是&#xff1a;<a href"javascript:;" class"login__type__container__select-type"…

Python自动化之pytest常用插件

目录 1、失败重跑 pytest-rerunfailures 2、多重校验 pytest-assume 3、设定执行顺序 pytest-ordering 4、用例依赖&#xff08;pytest-dependency&#xff09; 5.分布式测试(pytest-xdist) 6.生成报告&#xff08;pytest-html&#xff09; 1、失败重跑 pytest-rerunfailu…

web 前端 Day 4

盒子模型 <style>div {width: 300px;height: 300px;background-color: pink;padding-left: 4px; 左侧内边距border: 3px solid red;margin: 50px;}</style> padding 内边距 </head> ​ <body> ​<div>cfdaffshydghjgdjdnjjjjjjjjjjjjjjj&l…

springboot网吧管理系统

着科学技术发展&#xff0c;电脑已成为人们生活中必不可少的生活办公工具&#xff0c;在这样的背景下&#xff0c;网络技术被应用到各个方面&#xff0c;为了提高办公生活效率&#xff0c;网络信息技术飞速发展。在这样的背景下人类社会进入了全新的信息化的时代。网吧管理一直…

Jenkins持续集成项目实践 —— 基于Python Selenium自动化测试(二)

上一篇讲了如何搭建jenkins&#xff0c;这篇主要讲&#xff0c;怎么将自动化代码与jenkins衔接起来 jenkins上运行的两种方式&#xff1a; 第一种&#xff0c;在jenkins上面运行本地代码&#xff0c;操作如下: 新建项目&#xff1a;项目名称根据自己项目情况填写并选择自由模…