【Python】FANUC机器人OPC UA通信并记录数据

news2024/11/25 18:24:50

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
    • 总结

引言

 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

机器人仿真

 FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
在这里插入图片描述

环境准备

  • Python 3.5+
  • opcua库:用于实现OPC UA通信
  • logging库:用于记录日志

安装opcua库:

pip install opcua

代码实现

1. 导入库

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. 设置参数

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. 日志配置

def getLogger(filename: str):
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    logger = logging.Logger(filename[:-4].upper(), logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
    fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
    fh.setFormatter(formatter)
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger
    
LOGGER = getLogger(FAUNC_LOG)

4. OPC UA通信

  • 连接到服务器
def connect_to_server(url):
    client = Client(url)
    client.connect()
    return client
  • 获取根节点和对象节点
def get_root_node(client: Client):
    return client.get_root_node()
def get_objects_node(client: Client):
    return client.get_objects_node()
  • 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node: Node, path=""):
    variables = {}
    children: List[Node] = node.get_children()
    for child in children:
        try:
            name: ua.QualifiedName = child.get_browse_name()
            new_path = f"{path}/{name.Name}"
            if child.get_node_class() == ua.NodeClass.Variable:
                value = child.get_value()
                if isinstance(value, list):
                    value = ','.join(str(x) for x in value)
                if isinstance(value, str):
                    value = value.replace('\n', '\\n').replace(',', ' ')
                variables[new_path] = value
            else:
                variables.update(get_variables(child, new_path))
        except Exception as e:
            LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")
    return variables

5. 备份旧CSV文件

def backup_csv_file(filename):
    if not os.path.exists(BACKUP_DIR):
        os.makedirs(BACKUP_DIR)
    if os.path.exists(filename):
        modification_time = os.path.getmtime(filename)
        modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
        new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
        try:
            shutil.move(filename, new_filename)
            LOGGER.info(f"文件已移动到 {new_filename}")
        except Exception as e:
            LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})

6. 主函数

if __name__ == "__main__":
    try:
        client = connect_to_server(SERVER_URL)
        root_node = get_root_node(client)
        objects_node = get_objects_node(client)
        backup_csv_file(CSV_FILENAME)
        with open(CSV_FILENAME, mode='w', newline='') as csvfile:
            num = 0
            while True:
                variables = get_variables(objects_node)
                if num == 1:
                    writer = csv.DictWriter(csvfile, fieldnames=variables.keys())
                    writer.writeheader()
                writer.writerow(variables)
                csvfile.flush()
                num += 1
                LOGGER.info("数据记录:" + str(num))
                time.sleep(1)
    except KeyboardInterrupt:
        print("程序被用户中断")
    finally:
        client.disconnect()

记录数据预览:
在这里插入图片描述

总结

 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


完整代码:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

# OPC UA服务器的URL
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV文件名
CSV_FILENAME = 'fanuc_opcua_data.csv'
# 日志文件
FAUNC_LOG = 'fanuc.log'
# 文件夹 
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

def getLogger(filename: str):
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    logger = logging.Logger(filename[:-4].upper(), logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
    fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
    fh.setFormatter(formatter)
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger
    
LOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):
    """创建客户端实例并连接到服务端"""
    client = Client(url)
    client.connect()
    return client

def get_root_node(client: Client):
    """获取服务器命名空间中的根节点"""
    return client.get_root_node()

def get_objects_node(client: Client):
    """获取服务器的对象节点"""
    return client.get_objects_node()

def get_variables(node: Node, path=""):
    """遍历所有子节点并返回变量节点的路径和数值"""
    variables = {}
    children: List[Node] = node.get_children()
    for child in children:
        try:
            name: ua.QualifiedName = child.get_browse_name()
            new_path = f"{path}/{name.Name}"
            if child.get_node_class() == ua.NodeClass.Variable:
                value = child.get_value()
                if isinstance(value, list):
                    value = ','.join(str(x) for x in value)
                if isinstance(value, str):
                    value = value.replace('\n', '\\n').replace(',', ' ')
                variables[new_path] = value
            else:
                variables.update(get_variables(child, new_path))
        except Exception as e:
            LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")
    return variables

def backup_csv_file(filename):
    """如果CSV文件已存在则备份"""
    if not os.path.exists(BACKUP_DIR):
        os.makedirs(BACKUP_DIR)
    if os.path.exists(filename):
        modification_time = os.path.getmtime(filename)
        modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
        new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
        try:
            shutil.move(filename, new_filename)
            LOGGER.info(f"文件已移动到 {new_filename}")
        except Exception as e:
            LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")
        

if __name__ == "__main__":
    try:
        client = connect_to_server(SERVER_URL)
        root_node = get_root_node(client)
        objects_node = get_objects_node(client)

        backup_csv_file(CSV_FILENAME)

        with open(CSV_FILENAME, mode='w', newline='') as csvfile:
            num = 0

            while True:
                variables = get_variables(objects_node)

                if num == 1:
                    writer = csv.DictWriter(csvfile, fieldnames=variables.keys())
                    writer.writeheader()

                writer.writerow(variables)
                csvfile.flush()
                num += 1
                LOGGER.info("数据记录:" + str(num))
                time.sleep(1)

    except KeyboardInterrupt:
        print("程序被用户中断")
    finally:
        client.disconnect()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1582805.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

4.19号驱动

1. ARM裸机开发和Linux系统开发的异同 相同点:都是对硬件进行操作 不同点: 有无操作系统 是否具备多进程多线程开发 是否可以调用库函数 操作地址是否相同,arm操作物理地址,驱动操作虚拟地址 2. Linux操作系统的层次 应用层…

Redis群集模式

目录 一、集群的作用 二、Redis集群的数据分片 三、集群的工作原理​编辑 四、搭建Redis群集模式 1.准备环境 1.1 首先安装redis 1.2 在etc下创建redis 1.3再在redis中创建redis-cluster/redis600{1..6}文件 1.4 做个for循环 1.5 开启群集功能 1.6启动redis节点 1.…

c语言---预处理详解(详解)

目录 一、预定义符号二、define 定义常量三、define定义宏四、带有副作用的宏参数五、宏替换的规则六、宏函数的对比七、#和##7.1 #运算符7.2 ##运算符 八、命名约定九、#undef十、命令行定义十一、条件编译十二、头文件的包含12.1头⽂件被包含的方式:12.1.1本地文件…

anylabeling使用和安装

源码地址: git clone https://github.com/vietanhdev/anylabeling.git Auto Labeling with Segment Anything Youtube Demo: https://www.youtube.com/watch?v5qVJiYNX5KkDocumentation: https://anylabeling.nrl.ai Features: Image annotation for polygon, r…

德勤:《中国AI智算产业2024年四大趋势》

2023年《数字中国建设整体布局规划》的发布,明确了数字中国是构建数字时代竞争优势的关键支撑,是继移动互联网时代以来经济增长新引擎。当我们谈论数字中国的构建,不仅仅是在讨论一个国家级的技术升级,而是关乎如何利用数字技术来…

StoryImager、Face Morph、Hash3D、DreamView、Magic-Boost、SmartControl

本文首发于公众号:机器感知 StoryImager、Face Morph、Hash3D、DreamView、Magic-Boost、SmartControl Eagle and Finch: RWKV with Matrix-Valued States and Dynamic Recurrence We present Eagle (RWKV-5) and Finch (RWKV-6), sequence models improving upon…

今日arXiv最热大模型论文:Dataverse,针对大模型的开源ETL工具,数据清洗不再难!

引言:大数据时代下的ETL挑战 随着大数据时代的到来,数据处理的规模和复杂性不断增加,尤其是在大语言模型(LLMs)的开发中,对海量数据的需求呈指数级增长。这种所谓的“规模化法则”表明,LLM的性…

Python爬虫之Scrapy框架基础

Scrapy爬虫框架介绍 文档 英文文档中文文档 什么是scrapy 基于twisted搭建的异步爬虫框架. scrapy爬虫框架根据组件化设计理念和丰富的中间件, 使其成为了一个兼具高性能和高扩展的框架 scrapy提供的主要功能 具有优先级功能的调度器去重功能失败后的重试机制并发限制ip使用次…

基于Spring Boot的网上商城购物系统设计与实现

基于Spring Boot的网上商城购物系统设计与实现 开发语言:Java框架:springbootJDK版本:JDK1.8数据库工具:Navicat11开发软件:eclipse/myeclipse/idea 系统部分展示 商品信息界面,在商品信息页面可以查看商…

谷歌(Google)历年编程真题——生命游戏

谷歌历年面试真题——数组和字符串系列真题练习。 生命游戏 根据 百度百科 , 生命游戏 ,简称为 生命 ,是英国数学家约翰何顿康威在 1970 年发明的细胞自动机。 给定一个包含 m n 个格子的面板,每一个格子都可以看成是一个细胞…

Python 全栈体系【四阶】(二十五)

第五章 深度学习 三、计算机视觉基本理论 11. 图像梯度处理 11.1 什么是图像梯度 图像梯度计算的是图像变化的速度。对于图像的边缘部分,其灰度值变化较大,梯度值也较大;相反,对于图像中比较平滑的部分,其灰度值变化…

蓝桥杯复习笔记

文章目录 gridflexhtml表格合并单元格 表单表单元素input类型 select h5文件上传拖拽apiweb Storage css块元素和行内元素转换positionfloat溢出显示隐藏外边距过渡和动画动画变形选择器属性选择伪类选择器 css3边框圆角边框阴影渐变text-overflow与word-wrap jsdom操作documen…

一键下载安装并自动绑定,Xinstall让您的应用推广更高效

在如今的移动互联网时代,应用的下载安装与绑定是用户体验的关键一环。然而,繁琐的操作步骤和复杂的绑定流程往往让用户望而却步,降低了应用的下载和使用率。为了解决这一难题,Xinstall应运而生,为用户提供了一种全新的…

gradio简单搭建——关键词匹配筛选

gradio简单搭建——关键词匹配筛选 界面搭建数据处理过程执行效果展示 上一节使用DataFrame中的apply方法提升了表格数据的筛选效率,本节使用gradio结合apply方法搭建一个关键词匹配筛选的交互界面。 界面搭建 import gradio as gr import pandas as pd from file…

C语言指针—二级指针和指针数组

二级指针和指针数组 二级指针 指针变量也是变量,是变量就有地址,那指针变量的地址存放在哪里? 这就是二级指针 。 int main() {int a 10;int* pa &a;//pa是一个指针变量,同时也是一个一级指针变量*pa 20;//此时解引用pa…

021——搭建TCP网络通信环境(c服务器python客户端)

目录 前言 服务器程序 服务器程序验证过程 客户端程序 前言 驱动开发暂时告一段落了。后面在研究一下OLED和GPS的驱动开发,并且优化前面已经移植过来的这些驱动,我的理念是在封装个逻辑处理层来处理这些驱动程序。server直接操作逻辑处理层的程序。 …

统信UOS(Linux)安装nvm node管理工具

整篇看完再操作,有坑!! 官网 nvm官网 按照官网方式安装,一直报 错 经过不断研究,正确步骤如下 1、下载安装包 可能因为网络安全不能访问github,我是链接热点下载的 wget https://github.com/nvm-sh/…

基于springboot+vue+Mysql的职称评审管理系统

开发语言:Java框架:springbootJDK版本:JDK1.8服务器:tomcat7数据库:mysql 5.7(一定要5.7版本)数据库工具:Navicat11开发软件:eclipse/myeclipse/ideaMaven包:…

Mac安装配置ElasticSearch和Kibana 8.13.2

系统环境:Mac M1 (MacOS Sonoma 14.3.1) 一、准备 从Elasticsearch:官方分布式搜索和分析引擎 | Elastic上下载ElasticSearch和Kibana 笔者下载的是 elasticsearch-8.13.2-darwin-aarch64.tar.gz kibana-8.13.2-darwin-aarch64.tar.gz 并放置到个人…

序列化、反序列化:将对象以字节流的方式,进行写入或读取

序列化:将指定对象,以"字节流"的方式写入一个文件或网络中。 反序列化:从一个文件或网络中,以"字节流"的方式读取到对象。 package com.ztt.Demo01;import java.io.FileNotFoundException; import java.io.Fi…