[XOA Code]Test-suite-RFC2544

news2024/11/23 13:51:20

Testsuite-RFC2544,微信公众号也在间断的更新中,博客码码字,节奏老是踩不对,没什么条理

难道要不破不立,要全部推倒了重新来吗,....5555

XOA-2544测试

XOA API RFC2544测试

Dataset.py 

from typing import Any, List, Tuple, Dict
from pydantic import BaseModel, validator
from .utils import exceptions, constants as const
from .model.m_test_config import TestConfigModel
from .model.m_test_type_config import TestTypesConfiguration
from .model.m_port_config import PortConfiguration
from .model.m_protocol_segment import ProtocolSegmentProfileConfig


PortConfType = List[PortConfiguration]


class PluginModel2544(BaseModel):  # Main Model
    test_configuration: TestConfigModel
    protocol_segments: List[ProtocolSegmentProfileConfig]
    ports_configuration: PortConfType
    test_types_configuration: TestTypesConfiguration

    def set_ports_rx_tx_type(self) -> None:
        direction = self.test_configuration.topology_config.direction
        for port_config in self.ports_configuration:
            if port_config.is_loop:
                continue
            elif direction == const.TrafficDirection.EAST_TO_WEST:
                if port_config.port_group.is_east:
                    port_config.set_rx_port(False)
                elif port_config.port_group.is_west:
                    port_config.set_tx_port(False)
            elif direction == const.TrafficDirection.WEST_TO_EAST:
                if port_config.port_group.is_east:
                    port_config.set_tx_port(False)
                elif port_config.port_group.is_west:
                    port_config.set_rx_port(False)

    def set_profile(self) -> None:
        for port_config in self.ports_configuration:
            profile_id = port_config.protocol_segment_profile_id
            profile = [i for i in self.protocol_segments if i.id == profile_id][0]
            port_config.set_profile(profile.copy(deep=True))

    def __init__(self, **data: Dict[str, Any]) -> None:
        super().__init__(**data)
        self.set_ports_rx_tx_type()

        self.check_port_groups_and_peers()
        self.set_profile()

    @validator("ports_configuration", always=True)
    def check_ip_properties(cls, v: "PortConfType", values) -> "PortConfType":
        pro_map = {v.id: v.protocol_version for v in values['protocol_segments']}
        for i, port_config in enumerate(v):
            if port_config.protocol_segment_profile_id not in pro_map:
                raise exceptions.PSPMissing()
            if (
                pro_map[port_config.protocol_segment_profile_id].is_l3
                and (not port_config.ip_address or port_config.ip_address.address.is_empty)
            ):
                raise exceptions.IPAddressMissing()
        return v

    @validator("ports_configuration", always=True)
    def check_port_count(
        cls, v: "PortConfType", values: Dict[str, Any]
    ) -> "PortConfType":
        require_ports = 2
        if "test_configuration" in values:
            topology: const.TestTopology = values[
                "test_configuration"
            ].topology_config.topology
            if topology.is_pair_topology:
                require_ports = 1
            if len(v) < require_ports:
                raise exceptions.PortConfigNotEnough(require_ports)
        return v

    def check_port_groups_and_peers(self) -> None:
        topology = self.test_configuration.topology_config.topology
        ports_in_east = ports_in_west = 0
        uses_port_peer = topology.is_pair_topology
        for port_config in self.ports_configuration:
            if not topology.is_mesh_topology:
                ports_in_east, ports_in_west = self.count_port_group(
                    port_config, uses_port_peer, ports_in_east, ports_in_west
                )
            if uses_port_peer:
                self.check_port_peer(port_config, self.ports_configuration)
        if not topology.is_mesh_topology:
            for i, group in (ports_in_east, "East"), (ports_in_west, "West"):
                if not i:
                    raise exceptions.PortGroupError(group)

    @validator("ports_configuration", always=True)
    def check_modifier_mode_and_segments(
        cls, v: "PortConfType", values: Dict[str, Any]
    ) -> "PortConfType":
        if "test_configuration" in values:
            flow_creation_type = values[
                "test_configuration"
            ].test_execution_config.flow_creation_config.flow_creation_type
            for port_config in v:
                if (
                    not flow_creation_type.is_stream_based
                ) and port_config.profile.protocol_version.is_l3:
                    raise exceptions.ModifierBasedNotSupportL3()
        return v

    @validator("ports_configuration", always=True)
    def check_port_group(
        cls, v: "PortConfiguration", values: Dict[str, Any]
    ) -> "PortConfiguration":
        if "ports_configuration" in values and "test_configuration" in values:
            for k, p in values["ports_configuration"].items():
                if (
                    p.port_group == const.PortGroup.UNDEFINED
                    and not values[
                        "test_configuration"
                    ].topology_config.topology.is_mesh_topology
                ):
                    raise exceptions.PortGroupNeeded()
        return v

    @validator("test_types_configuration", always=True)
    def check_test_type_enable(
        cls, v: "TestTypesConfiguration"
    ) -> "TestTypesConfiguration":
        if not any(
            {
                v.throughput_test.enabled,
                v.latency_test.enabled,
                v.frame_loss_rate_test.enabled,
                v.back_to_back_test.enabled,
            }
        ):
            raise exceptions.TestTypesError()
        return v

    @validator("test_types_configuration", always=True)
    def check_result_scope(
        cls, v: "TestTypesConfiguration", values: Dict[str, Any]
    ) -> "TestTypesConfiguration":
        if "test_configuration" not in values:
            return v
        if (
            v.throughput_test.enabled
            and v.throughput_test.rate_iteration_options.result_scope
            == const.RateResultScopeType.PER_SOURCE_PORT
            and not values[
                "test_configuration"
            ].test_execution_config.flow_creation_config.flow_creation_type.is_stream_based
        ):
            raise exceptions.ModifierBasedNotSupportPerPortResult()
        return v

    @staticmethod
    def count_port_group(
        port_config: "PortConfiguration",
        uses_port_peer: bool,
        ports_in_east: int,
        ports_in_west: int,
    ) -> Tuple[int, int]:
        if port_config.port_group.is_east:
            ports_in_east += 1
            if uses_port_peer and port_config.is_loop:
                ports_in_west += 1

        elif port_config.port_group.is_west:
            ports_in_west += 1
            if uses_port_peer and port_config.is_loop:
                ports_in_east += 1

        return ports_in_east, ports_in_west

    @staticmethod
    def check_port_peer(
        port_config: "PortConfiguration",
        ports_configuration: List["PortConfiguration"],
    ) -> None:
        peer_slot = port_config.peer_slot
        if peer_slot is None or peer_slot >= len(ports_configuration):
            raise exceptions.PortPeerNeeded()
        peer_config = ports_configuration[peer_slot]
        if not port_config.is_pair(peer_config) or not peer_config.is_pair(port_config):
            raise exceptions.PortPeerInconsistent()

Entry.py

from xoa_core.types import PluginAbstract
from typing import TYPE_CHECKING, List
from .plugin.config_checkers import check_test_type_config
from .plugin.tc_base import TestCaseProcessor
from .plugin.test_resource import ResourceManager
from .plugin.test_config import TestConfigData
from .plugin.test_type_config import get_available_test_type_config, AllTestTypeConfig
if TYPE_CHECKING:
    from .dataset import PluginModel2544


class TestSuite2544(PluginAbstract["PluginModel2544"]):
    def prepare(self) -> None:
        self.tpld_id = 0
        self.mac_learned = False
        self.iteration: int = 1
        self.__test_conf = TestConfigData(self.cfg.test_configuration)
        self.resources = ResourceManager(
            self.testers,
            self.cfg.ports_configuration,
            self.port_identities,
            self.__test_conf,
            self.xoa_out,
        )
        self._test_type_conf: List["AllTestTypeConfig"] = get_available_test_type_config(self.cfg.test_types_configuration) 
        self.tc = TestCaseProcessor(self.resources, self.__test_conf, self._test_type_conf, self.state_conditions, self.xoa_out)

    async def __pre_test(self) -> None:
        """ check config and configure ports and streams"""
        check_test_type_config(self._test_type_conf)
        await self.resources.init_resource(
            self.cfg.test_types_configuration.latency_test.latency_mode,
        )

    async def __do_test(self) -> None:
        """ configure tests and run traffic """
        await self.tc.start()

    async def __post_test(self) -> None:
        """ after test should release resource """
        # TODO: wait for callback exception catch
        await self.resources.free()

    async def start(self) -> None:
        await self.__pre_test()
        await self.__do_test()
        await self.__post_test()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1543615.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3.Spring Bean

3.1 Bean的配置 Spring可以看作一个大型工厂&#xff0c;生产和管理Spring容器中的bean。如何使用这个工厂生产和管理bean&#xff0c;需要开发者将bean配置在Spring的配置文件中。Spring框架支持XML和Properties两种格式的配置文件&#xff0c;在实际开发中&#xff0c;常用X…

天府锋巢直播产业基地科学城核心区域

天府锋巢直播产业基地位于天府新区科学城板块&#xff0c;地理位置优越&#xff0c;交通便利&#xff0c;是集直播电商、创新创业、人才培养等多功能于一体的现代化成都直播基地。这里汇聚了众多优秀的直播电商企业和创业团队&#xff0c;为直播电商行业的发展注入了强大的动力…

算法打卡day27|贪心算法篇01|Leetcode 455.分发饼干、376. 摆动序列、53. 最大子序和

贪心算法理论基础 定义 贪心的本质是选择每一阶段的局部最优&#xff0c;从而达到全局最优。 例如&#xff0c;有一堆不同数值的钞票&#xff0c;可以拿走十张&#xff0c;如果想达到最大的金额可以指定每次拿最大的&#xff0c;最终结果就是拿走最大数额的钱。 每次拿最大的就…

混合像元分解:Matlab如何帮助揭示地表组成?

光谱和图像是人们观察世界的两种方式&#xff0c;高光谱遥感通过“图谱合一”的技术创新将两者结合起来&#xff0c;大大提高了人们对客观世界的认知能力&#xff0c;本来在宽波段遥感中不可探测的物质&#xff0c;在高光谱遥感中能被探测。以高光谱遥感为核心&#xff0c;构建…

分享一道DFS常见题目 C++实现路径之谜

题目描述&#xff1a;路径之谜 小明冒充X星球的骑士&#xff0c;进入了一个奇怪的城堡。 城堡里边什么都没有&#xff0c;只有方形石头铺成的地面。 假设城堡地面是 n x n 个方格。【如图1.png】所示。 按习俗&#xff0c;骑士要从西北角走到东南角。 可以横向或纵向移动&…

3个新变化!2024年国家高新技术企业认定攻略

根据《党和国家机构改革方案》和《党中央、国务院议事协调机构优化调整方案》&#xff0c;经报党中央、国务院批准&#xff0c;现将工业和信息化部职责、机构、编制调整&#xff0c;2024年由工信部管理国家高新技术企业认定工作。 总的来说&#xff0c;通过对政策的研究和解读…

Leetcode 76 最小覆盖子串 java版

官网链接&#xff1a; . - 力扣&#xff08;LeetCode&#xff09; 1. 问题&#xff1a; 给你一个字符串 s 、一个字符串 t 。返回 s 中涵盖 t 所有字符的最小子串。如果 s 中不存在涵盖 t 所有字符的子串&#xff0c;则返回空字符串 "" 。 注意&#xff1a; 对于 …

langchian入门四:LLM+Agents代理=贾维斯?让大模型拥有三头六臂

什么是Agent 在日常生活中,不难发现,chatgpt通过文本输入进行处理后返回的也是文本内容,就像是一个只有头的人,能听能思考能说话,但是无法行动.而Agent是一种能够自主决策、采取行动以达到某种目标的实体。被解释为"智能体"或者"代理". 代理的核心思想是…

Java全栈课程之Linux———基本属性

一、看懂文件属性 Linux系统是一种典型的多用户系统&#xff0c;不同的用户处于不同的地位&#xff0c;拥有不同的权限。为了保护系统的安全性&#xff0c;Linux系统对不同的用户访问同一文件&#xff08;包括目录文件&#xff09;的权限做了不同的规定。 在Linux中我们可以使…

Mysql数据库——数据备份与恢复

目录 一、数据备份的重要性 二、数据库备份的分类 1.从物理与逻辑的角度分类 2.从数据库的备份策略角度&#xff0c;备份可分为 2.1完全备份 2.2差异备份 2.3增量备份 2.4总结 三、常见的备份方法 四、Mysql数据库完全备份 1.完全备份定义 2.优缺点 3.数据库完全备…

代码随想录算法训练营第25天|LeetCode106.中序和后序遍历构造二叉树、LeetCode105.中序和先序遍历构造二叉树

代码随想录算法训练营第25天|LeetCode106.中序和后序遍历构造二叉树、LeetCode105.中序和先序遍历构造二叉树 1、LeetCode106.中序和后序遍历构造二叉树 106. 从中序与后序遍历序列构造二叉树 - 力扣&#xff08;LeetCode&#xff09; 知道理论怎么求&#xff0c;但是太久没写…

Adaptive Partitioning

qnx开源代码 GitHub - vocho/openqnx: mirror of git://git.code.sf.net/p/monartis/openqnx http://www.qnx.com/developers/docs/7.0.0/#com.qnx.doc.adaptivepartitioning.userguide/topic/about_howtouseguide_.html ap是对进程和线程集合分配最小的系统资源&#xff0c;目…

基于nodejs+vue宿舍管理系统python-flask-django-php

随着信息时代的来临&#xff0c;过去的传统管理方式缺点逐渐暴露&#xff0c;对过去的传统管理方式的缺点进行分析&#xff0c;采取计算机方式构建宿舍管理系统。本文通过课题背景、课题目的及意义相关技术&#xff0c;提出了一种楼宇信息、宿舍信息、宿舍安排、缺勤信息等于一…

OceanBase中NOT EXISTS是否需要被改写

作者简介 张瑞远&#xff0c;曾经从事银行、证券数仓设计、开发、优化类工作&#xff0c;现主要从事电信级IT系统及数据库的规划设计、架构设计、运维实施、运维服务、故障处理、性能优化等工作。 持有Orale OCM,MySQL OCP及国产代表数据库认证。 获得的专业技能与认证包括 Oce…

直播预告丨困气排气解决新方案--毅速金属3D打印随形透气钢

您是否也遇到过这些问题 模具困气造成产品出现注塑瑕疵&#xff0c;但复杂的产品形状导致无法开排气槽 常规透气钢需要拆镶件导致工件强度下降 某些工件部分不接受分模线区域无法拆镶件无法使用常规透气钢 面对越来越复杂的产品和结构&#xff0c;越来越多需要透气、保压、…

全网最新网络安全自学路线,最详细没有之一!!!

在各大平台搜的网安学习路线都太粗略了。。。。看不下去了&#xff01; 我把自己整理的系统学习路线&#xff0c;拿出来跟大家分享了&#xff01; 建议的学习顺序&#xff1a; 一、网络安全学习普法&#xff08;心里有个数&#xff0c;要进去坐几年&#xff01;&#xff09; 1…

Spring 面试——restcontroller/requestmapping

RestController Controller ResponseBody Controller&#xff1a;包含Component&#xff0c;把当前类声明成为一个 bean ResponseBody&#xff1a;表示方法返回的结果直接作为 HTTP 响应的内容&#xff0c;不是返回视图 3.RequestMapping注解的基本用法_哔哩哔哩_bilibili

Linux文件系列:磁盘,文件系统,软硬链接

Linux文件系列:磁盘,文件系统,软硬链接 一.磁盘相关知识1.磁盘机械构成2.磁盘物理存储3.磁盘逻辑存储1.LBA地址2.磁盘的分区和分组 二.文件系统和inode1.inode结构体2.文件系统1.Super Block(超级块)2.Group Descriptor Table(块组描述表GDT)3.inode Table4.Data Blocks5.Block…

如何本地部署Imagewheel并实现无公网IP远程连接打造个人云图床

文章目录 1.前言2. Imagewheel网站搭建2.1. Imagewheel下载和安装2.2. Imagewheel网页测试2.3.cpolar的安装和注册 3.本地网页发布3.1.Cpolar临时数据隧道3.2.Cpolar稳定隧道&#xff08;云端设置&#xff09;3.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 4.公网访问测…

域名SSL证书怎么获取?

获取域名证书的步骤如下&#xff1a; 选择认证机构&#xff1a;域名证书必须从受信任的认证机构(CA)中申请&#xff0c;如JoySSL、GeoTrust、、Thawte等。收集信息&#xff1a;在申请域名证书之前&#xff0c;需要准备一些证明信息&#xff0c;如域名认证等。创建CSR&#xff…