【Python】OPC UA模拟服务器实现

news2024/11/16 16:43:35

目录

      • 服务器模拟
        • 1. 环境准备
        • 2. 服务器设置
        • 3. 服务器初始化
        • 4. 节点操作
        • 5. 读取CSV文件
        • 6. 运行服务器
      • 查看服务器
      • 客户端
      • 总结

 在工业自动化和物联网(IoT)领域,OPC UA(开放平台通信统一架构)已经成为一种广泛采用的数据交换标准。它提供了一种安全、可靠且独立于平台的方式来访问实时数据。在本文中,我们将探讨如何使用Python和OPC UA库来创建一个高效的数据服务器,该服务器能够从CSV文件读取数据,并允许OPC UA客户端访问这些数据。

服务器模拟

1. 环境准备

确保您的Python环境中已安装opcua库。如果未安装,请使用以下命令进行安装:

pip install opcua

导入python库

import argparse
import csv
import time
from typing import List
from opcua import Server, Node, ua
2. 服务器设置

首先,我们定义命令行参数来配置服务器地址、端口和数据源路径。

parser = argparse.ArgumentParser(description="OPCUA模拟服务器")

# 添加参数
parser.add_argument("--ip", "-i", default="0.0.0.0", help="服务器地址")
parser.add_argument("--port", "-p", default=4840, type=int, help="服务器端口")
parser.add_argument("--source", "-s", default="opcua_data.csv", help="数据源路径")
parser.add_argument("--type", "-t", default="fanuc", help="模拟类型")
# 解析命令行参数
args = parser.parse_args()

CSV_FILE = args.source
SERVER_URL = "opc.tcp://" + str(args.ip) + ":" + str(args.port)
SERVER_TYPE: str = args.type

参数:
在这里插入图片描述

3. 服务器初始化

接着,我们初始化OPC UA服务器,并设置命名空间。

# 创建服务器实例
server = Server()
# 设置服务器端口
server.set_endpoint(SERVER_URL)
# 创建一个命名空间
uri = "http://opcua.simulator.com"
idx = server.register_namespace(uri)
4. 节点操作

定义函数来处理节点信息,并添加或更新节点。

# 获取对象节点,它通常是根节点的第一个孩子
objects = server.get_objects_node()
node_cache_dict = {}


def get_node_dict(node_info: str):
    pairs = node_info.split(";")
    if len(pairs) <= 1:
        return {"ns": idx, "s": node_info}
    data = {}
    for pair in pairs:
        key, value = pair.split("=")
        data[key] = value
    return data


def add_node(parent: Node, node_name, value, node_dict: dict, node_type="obj"):
    node_path = node_dict.get("s")
    node_ns = node_dict.get("ns")
    node_id = node_dict.get("i")
    # 检查节点是否已经存在
    children: List[Node] = parent.get_children()
    for child in children:
        browse_name: ua.QualifiedName = child.get_browse_name()
        if browse_name.Name == node_name:
            return child  # 返回已存在的节点

    # 如果节点不存在,创建它
    if node_type == "var":
        if SERVER_TYPE.upper() == "FANUC":
            node_idx = f"ns={node_ns};i={node_id}" if node_id else idx
        else:
            node_idx = f"ns={node_ns};s={node_path}"
        new_node = parent.add_variable(node_idx, node_name, str(value))
    else:
        new_node: Node = parent.add_object(idx, node_name)
        # new_node.set_writable()  # 设置变量为可写
    return new_node


def set_node_value(node_info: str, value):
    node_dict = get_node_dict(node_info)
    node_path: str = node_dict.get("s")
    if node_path.startswith("/Server/"):
        return
    try:
        last_node: Node = node_cache_dict.get(node_path)
        if last_node is None:
            # 递归查找或创建节点
            parent = objects
            parts = node_path.split("/")
            for part in parts[:-1]:  # 遍历除了最后一个节点的所有节点
                if part == "":
                    continue
                parent = add_node(
                    parent, part, value, node_dict
                )  # 创建节点(如果不存在)

            # 设置或更新最后一个节点的值
            last_node_name = parts[-1]
            last_node = add_node(parent, last_node_name, value, node_dict, "var")
            node_cache_dict[node_path] = last_node
        last_node.set_value(str(value))
    except Exception as e:
        print(f"Error setting node value: {e}")
5. 读取CSV文件

使用csv模块读取CSV文件,并更新OPC UA节点的值。

csv.field_size_limit(500 * 1024 * 1024)

try:
    with open(CSV_FILE, "r") as csvfile:
        csvreader = csv.reader(csvfile)
        header = next(csvreader)
        row = next(csvreader)
        for i in range(len(header)):
            set_node_value(header[i], row[i])  # 首次创建节点

        server.start()  # 启动服务器(先创建结构再启动server)
        print(f"Server started at {SERVER_URL}")

        # 每秒读取csv中的一行,更新节点值
        while True:
            try:
                row = next(csvreader)
                for i in range(len(header)):
                    set_node_value(header[i], row[i])
                time.sleep(1)
            except StopIteration:
                csvfile.seek(0)  # 回到文件开头
                next(csvreader)  # 跳过表头
finally:
    server.stop() if server else None
6. 运行服务器

执行脚本,服务器将启动,并开始从CSV文件读取数据更新OPC UA节点。

python opcua_simulator.py --ip 0.0.0.0 --port 4840 --source opcua_data.csv

查看服务器

可以使用 UA Sample Client (下载链接)软件连接到创建的opcua服务器查看结构和数据。
在这里插入图片描述
数据监控:
请添加图片描述

客户端

【Python】OPC UA 服务器与客户端的实现

总结

 本文介绍了如何使用Python和OPC UA库创建一个模拟服务器。通过读取CSV文件,服务器能够动态更新OPC UA节点的值,这对于开发和测试OPC UA客户端应用程序非常有用。您可以按照上述步骤进行实践,并根据具体需求调整代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1605658.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leo赠书活动-24期 【三大层次学习企业架构框架TOGAF】文末送书

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; 赠书活动专栏 ✨特色专栏&#xff1a;…

鸿蒙开发岗突增!它和前端开发到底有哪些区别和联系?

2024年1 月 18 日&#xff0c;鸿蒙 Next 预览版面向开发者正式开放申请。至此&#xff0c;鸿蒙原生应用版图已成型&#xff0c;这个中国自主研发的操作系统&#xff0c;正式走上了独立之路。 有许多的公司都陆续地加入了鸿蒙原生应用开发的队列&#xff0c;从年初宣布的200个应…

网络基础-基于TCP协议的Socket通讯

一、Socket通讯基于TCP协议流程图 UDP 的 Socket 编程相对简单些不在介绍。 二、 服务端程序启动 服务端程序要先跑起来&#xff0c;然后等待客户端的连接和数据。 服务端程序首先调用 socket() 函数&#xff0c;创建网络协议为 IPv4&#xff0c;以及传输协议为 TCP 的…

基于数据库现有表导出为设计文档

1.查询 SELECTCOLUMN_NAME 字段名,COLUMN_COMMENT 字段描述,COLUMN_TYPE 字段类型,false as 是否为主键 FROMINFORMATION_SCHEMA.COLUMNS wheretable_NAME region -- 表名2.查询结果 3.导出为excel

求交错且分母为阶乘的和(java)

import java.util.*; public class APP1{public static void main(String[] args){double sum0.0;int n0;int flag1;int fm1;Scanner reader new Scanner(System.in);System.out.println("请输入n的值&#xff1a;");nreader.nextInt();for(int i0;i<n;i){fm*i; …

【笔试训练】day5

今天的题&#xff0c;最后一题忘公式了&#xff0c;卡了一会推出来了 1、游游的you 思路&#xff1a; 看清题目意思就行&#xff0c;这里的相邻两个o可以重复算&#xff0c;也就是说&#xff0c;“ooo”算2分。 先算you的得分&#xff0c;再算oo 对了&#xff0c;不开long lo…

图神经网络实战——利用节点回归预测网络流量

图神经网络实战——利用节点回归预测网络流量 0. 前言1. 数据集分析2. 实现 GCN 模型执行节点回归3. 模型测试相关链接 0. 前言 在机器学习中&#xff0c;回归指的是对连续值的预测。通常与分类形成鲜明对比&#xff0c;分类的目标是找到正确的类别(即离散值&#xff0c;而非连…

C++_智能指针

文章目录 前言一、智能指针原理二、库支持的智能指针类型1.std::auto_ptr2.std::unique_ptr3.std::shared_ptr4.std::weak_ptr 三、删除器总结 前言 智能指针是一种采用RAII思想来保护申请内存不被泄露的方式来管理我们申请的内存&#xff0c;对于RAII&#xff0c;我们之前也已…

这是刚发布的人形机器人?不,分明是《午夜凶铃》现实版

波士顿动力公司大名鼎鼎的人形机器人Atlas&#xff0c;你一定见识过吧。 Atlas可以像人一样行走、奔跑和攀爬 | 波士顿动力公司 这款用液压系统打造的机器人产品&#xff0c;经过十多年的调试升级&#xff0c;才终于拥有了人类一般灵活的身手。在波士顿动力公司历年来放出的视频…

OpenHarmony UI开发-ohos-svg

简介 ohos-svg是一个SVG图片的解析器和渲染器&#xff0c;解析SVG图片并渲染到页面上。它支持大部分 SVG 1.1 规范&#xff0c;包括基本形状、路径、文本、样式和渐变,它能够渲染大多数标准的 SVG 图像。ohos-svg的优点是性能好、内存占用低。 效果展示 SVG图片解析并绘制: …

第七周学习笔记DAY.4-方法重写与多态

学完本次课程后&#xff0c;你能够&#xff1a; 实现方法重写 深入理解继承相关概念 了解Object类 会使用重写实现多态机制 会使用instanceof运算符 会使用向上转型 会使用向下转型 什么是方法重写 方法的重写或方法的覆盖&#xff08;overriding&#xff09; 1.子类根据…

【STM32CubeIDE 1.15.0】汉化包带路径配置过程

一、IDE软件下载 二、汉化版包路径 三、IDE软件板载汉化包 一、IDE软件下载 ST官网IDE下载链接 二、汉化版包路径 https://mirrors.ustc.edu.cn/eclipse/technology/babel/update-site/ 找不到就到.cn后面一级一级进 三、IDE软件板载汉化包 https://mirrors.ustc.edu…

Jmeter 压测-Jprofiler定位接口相应时间长

1、环境准备 执行压测脚本&#xff0c;分析该接口tps很低&#xff0c;响应时间很长 高频接口在100ms以内&#xff0c;普通接口在200ms以内 2、JProfiler分析响应时间长的方法 ①JProfiler录制数据 压测脚本&#xff0c;执行1-3分钟即可 ②分析接口相应时间长的方法 通过Me…

Django之rest_framework(四)

扩展的视图类介绍 rest_framework提供了几种后端视图(对数据资源进行增删改查)处理流程的实现,如果需要编写的视图属于这几种,则视图可以通过继承相应的扩展类来复用代码,减少自己编写的代码量 官网:3 - Class based views - Django REST framework rest_framework.mixi…

cobaltstrike 流量隐藏

云函数 新建一个云函数&#xff0c;在代码位置进行修改 首先导入 yisiwei.zip 的云函数包 PYTHON # -*- coding: utf8 -*- import json, requests, base64def main_handler(event, context):C2 https://49.xx.xx.xx # 这里可以使用 HTTP、HTTPS~下角标~ path event[path]h…

在Windows 11/10/8中打开计算机管理的几种方法,总有一种适合你

序言 计算机管理是Windows中一个功能强大的工具,允许你管理和监视计算机系统的各个方面。使用“计算机管理”,你可以快速访问“设备管理器”、“磁盘管理”、“本地用户管理”等。本文将向你展示如何在Windows 11/10/8中打开“计算机管理器”。 网上有很多方法可以打开计算…

Spring Security详细学习第一篇

Spring Security 前言Spring Security入门编辑Spring Security底层原理UserDetailsService接口PasswordEncoder接口 认证登录校验密码加密存储退出登录 前言 本文是作者学习三更老师的Spring Security课程所记录的学习心得和笔记知识&#xff0c;希望能帮助到大家 Spring Sec…

单分支:if语句

示例&#xff1a; /*** brief how about if? show you here.* author wenxuanpei* email 15873152445163.com(query for any question here)*/ #define _CRT_SECURE_NO_WARNINGS//support c-library in Microsoft-Visual-Studio #include <stdio.h>#define if_state…

C语言学习/复习23---

一、数据的存储 二、数据类型的介绍 三、整型在内存中的存储 将原码转换为补码。如果数是正数&#xff0c;则补码与原码相同&#xff1b;如果数是负数&#xff0c;则先将原码按位取反&#xff0c;然后加1。将补码转换原补码。如果数是正数&#xff0c;则补码与原码相同&#x…

【笔试强训】双指针的思想!

1.数组中字符串的最小距离 题目链接 解题思路&#xff1a; 小技巧 ✌&#xff1a;标记两个字符串是否被找到&#xff0c;每次找到一个字符串就更新一次答案来保证找到的是最小距离。 实现代码&#xff1a; #include <iostream> using namespace std;int main() {in…