【python5】闭包/装饰器,

news2025/1/9 15:06:46

文章目录

  • 1.闭包和装饰器:函数里return就是闭包
  • 2.解析eeprom:如下是二进制文件,C8是一个字节
  • 3.json/configparser/optparse:json.dumps(将字典转化为字符串,将json信息写进文件),json.loads(将字符串转化为字典)
  • 4.map:[]列表 , ()元组 , {}集合 , {a:b}字典。for else中break后不走else
  • 5.walk/split/getattr/bin:bin转为二进制0b...
  • 6.sol/mmc:emmc_info_cmd = "mmc cid read /sys/block/mmcblk0/device",size_cmd = "cat /sys/block/mmcblk0/size"
  • 5.gpio/pcie:lspci,re.compile


1.闭包和装饰器:函数里return就是闭包

在这里插入图片描述
如下打印第一行是:我在~。
在这里插入图片描述
在这里插入图片描述

2.解析eeprom:如下是二进制文件,C8是一个字节

在这里插入图片描述
在这里插入图片描述
如下5x4+4+2x4=32。
在这里插入图片描述

# tlv_tool.py
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import sys
import struct
import os
import time
import json
import subprocess
import binascii
from os.path import dirname, abspath

# a.py
# import json, os
# print(json.__file__) # /usr/lib/python3.8/json/__init__.py
# print(os.path.abspath(__file__)) # /home/y/utool/a.py
# print(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # /home/y

sys.path.append(dirname(dirname(abspath(__file__))))  #当前tlv_tool.py文件所在绝对路径的文件夹的上级文件夹
Project_path = dirname(abspath(__file__))  #获取所在路径文件夹
Conf_path = '{}/tlv_conf.json'.format(Project_path)

def ParseJson(filename=Conf_path):
    with open(filename, 'r') as f:
        jdata = json.load(f)
    return jdata
tlv_json = ParseJson()
tlv_reg_path = tlv_json['tlv_reg_path']
tlv_eeprom_path = tlv_json['tlv_eeprom_path']

def run_command(cmd):
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    if err and proc.returncode != 0:
        return err
    return out.decode()

# def write_reg(fpath, reg, data):
#     if os.getcwd() != fpath:
#         if os.path.exists(fpath) == False or os.chdir(fpath) == False:
#             return 'file Not exists or file Not accessible'
#     if run_command('echo {} {} > setreg'.format(reg, data)) != 0:
#         return 'write fail'
#     return 0

# def read_reg(fpath, reg):
#     if os.getcwd() != fpath:
#         if os.path.exists(fpath) == False or os.chdir(fpath) == False:
#             return 'file Not exists or file Not accessible'
#     if run_command('echo {} > getreg'.format(reg)) != '':
#         return 'echo reg fail'
#     return run_command('cat getreg').strip()

def crc2hex(crc):
    return '0x{:0>8X}'.format(binascii.crc32(crc))    #取crc32的八位数据 %x返回16进制,摘要算法

def unlock_tlv_eeprom(path,reg,data = '0x0'):   # unlock_tlv_eeprom(tlv_reg_path,'0x06')  #0x06寄存器eeprom写保护
    if os.getcwd() != path:
        if os.path.exists(path) == False or os.chdir(path) == False:
            return 'file Not exists or file Not accessible'
    if run_command('echo {} {} > setreg'.format(reg, data)) != 0:
        return 'write fail'
    if run_command('echo {} > getreg'.format(reg)) != '':
        return 'echo reg fail'
    output =  run_command('cat getreg').strip()
    print("set reg {} value: {}".format(reg,output))
    if output == data:
        print('Unlock tlv eeprom success!')  # 0x0
        return True
    print('Unlock tlv eeprom fail!')
    return False

def tlv_read_eeprom(path,length=None,offset=None):  # tlv_read_eeprom(tlv_eeprom_path,length)
    with open('{}eeprom'.format(path),mode = 'rb') as f:
        return f.read(length)

def tlv_read_length():
    with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb') as f:
        '''
            tlv_header_length = char[8] + u8 + u16
        '''
        tlv_header_length = 11
        tlv_signature , tlv_version , tlv_totallen = struct.unpack("!8sBH", f.read(tlv_header_length))  #读11字节
    return tlv_header_length + tlv_totallen

def tlv_dump(args):  #读出eeprom信息写到tlv_dump.bin文件
    length = tlv_read_length()
    data = tlv_read_eeprom(tlv_eeprom_path,length)
    try:
        file_name = args[2]
    except IndexError:
        file_name = 'tlv_dump.bin'
    with open(file_name,mode = 'wb') as f:
        # print(data)
        # print(len(data))
        f.write(data) 
        f.flush()    
    print("Dump tlv data finish!")

tlv_type_dirt = {
    0x21 : 'Product Name', 
    0x22 : 'Part Number',   
    0x23 : 'Serial Number', 
    0x24 : 'Base MAC Address', 
    0x25 : 'Manufacture Date',
    0x26 : 'Device Version', 
    0x27 : 'Label Revision', 
    0x28 : 'Platform Name', 
    0x29 : 'ONIE Version', 
    0x2A : 'MAC Address Size', 
    0x2B : 'Manufacturer', 
    0x2C : 'Country Code',
    0x2D : 'Vendor Name',
    0x2E : 'Diag Version',
    0x2F : 'Service Tag',
    0xFD : 'Vendor Extension',
    0xFE : 'CRC-32'
}

def tlv_write(args):
    with open(args[2],mode = 'rb') as f:
       data = f.read()
    if unlock_tlv_eeprom(tlv_reg_path,'0x06'):
        with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb+') as f:
            f.write(data)
            f.flush()
            print("Write tlv data finish!")

def tlv_read(args):
    if len(args) == 3 and '.bin' in args[2]:
        path = args[2]
    else:
        path = '{}eeprom'.format(tlv_eeprom_path)
    with open(path,mode = 'rb') as f:
        '''
            tlv_header_length = char[8] + u8 + u16
        '''
        tlv_header_length = 11
        tlv_signature , tlv_version , tlv_totallen = struct.unpack("!8sBH", f.read(tlv_header_length))
        print("TlvInfo Header:")
        print("    Id String    : {}".format(tlv_signature.decode().strip()))
        print("    Version      : {}".format(tlv_version))
        print("    Total Length : {}".format(tlv_totallen))
        '''
            data_header_length = u8 + u8
        '''
        print("TLV Name         Code Len  Value")
        print("--------------------------------")
        data_header_length = 2
        length_cnt = 0
        stored_crc = None
        while length_cnt < tlv_totallen :
            data_type , data_length = struct.unpack("!BB", f.read(data_header_length))
            length_cnt += data_header_length + data_length
            if (data_type in tlv_type_dirt.keys()):
                pass
            else:
                continue

            if tlv_type_dirt[data_type] == 'Base MAC Address':
                if data_length == 6:
                    mac0,mac1,mac2,mac3,mac4,mac5 = struct.unpack('!BBBBBB', f.read(data_length))
                    data = "{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}".format(mac0,mac1,mac2,mac3,mac4,mac5)
                else:
                    print('Base MAC Address Error Length {}!'.format(data_length))
                    exit

            elif tlv_type_dirt[data_type] == 'Device Version':
                if data_length == 1:
                    data = struct.unpack('!B', f.read(data_length))
                    data = data[0]
                else:
                    print('Device Version Error Length {}!'.format(data_length))
                    exit

            elif tlv_type_dirt[data_type] == 'MAC Address Size':
                if data_length == 2:
                    byte0,byte1 = struct.unpack('!BB', f.read(data_length))
                    data = (byte0 << 8) | byte1
                else:
                    print('MAC Address Size Error Length {}!'.format(data_length))
                    exit

            elif tlv_type_dirt[data_type] == 'Vendor Extension':
                if data_length > 0:
                    vendor_cmd = '!'
                    for index in range(data_length):
                       vendor_cmd += "B"
                    byte_tuple = struct.unpack(vendor_cmd, f.read(data_length))
                    data = ''
                    for byte in byte_tuple:
                        data += '0x{:0>2X} '.format(byte)
                else:
                    data = 'NULL'
                  
            elif tlv_type_dirt[data_type] == 'CRC-32':
                if data_length == 4:
                    crc0,crc1,crc2,crc3 = struct.unpack('!BBBB', f.read(data_length))
                    data = "0x{:0>2X}{:0>2X}{:0>2X}{:0>2X}".format(crc0,crc1,crc2,crc3)
                    stored_crc = eval(data)
                else:
                    print('CRC Error Length {}!'.format(data_length))
                    exit
            else:
                if data_length > 0:
                    data = struct.unpack('!{}s'.format(data_length), f.read(data_length))
                    data = data[0].decode().strip()
                else:
                    # data = None
                    # data = ''
                    data = 'NULL'

            print("{:<16} 0x{:x} {:>3} : {}".format(tlv_type_dirt[data_type],data_type,data_length,data))
        
        if stored_crc != None:
            f.seek(0,0)
            crc_data = f.read(tlv_header_length + tlv_totallen - 4)
            # print(crc_data)
            calc_crc = eval(crc2hex(crc_data))
            if calc_crc == stored_crc:
                print('Checksum is valid')
            else:
                print('Checksum is invalid')
        else:
            print('No Checksum Data')

def tlv_update(args):
    try:
        file_name = args[2]
    except IndexError:
        file_name = None
    tlv_data = tlv_json['tlv_data']
    '''
        tlv_header_length = char[8] + u8 + u16
    '''
    tlv_signature = tlv_json['tlv_signature'].encode()
    if len(tlv_signature) > 8:
        print('tlv signature length out off range !!!')
        exit
    while len(tlv_signature) < 8:
        tlv_signature += b'\x00'
    tlv_version = tlv_json['tlv_version'].to_bytes(1,'big')
    tlv_totallen = 0x00

    data_list = []      #一般都在for上面定义一个空列表
    for item in tlv_type_dirt:
        title = tlv_type_dirt[item].replace(' ','_')
        if (title in tlv_data.keys()):
            pass
        else:
            if tlv_type_dirt[item] == 'Manufacture Date':
                pass
            else:
                continue
        if tlv_type_dirt[item] == 'Base MAC Address':
            mac_list = tlv_data[title].split(':')
            mac_code = b''
            for mac_item in mac_list:
                mac_code += eval('0x{}'.format(mac_item)).to_bytes(1,'big')
            data = mac_code
        elif tlv_type_dirt[item] == 'Manufacture Date':
            data = time.strftime("%d-%m-%Y %H:%M:%S", time.localtime()).encode()
        elif tlv_type_dirt[item] == 'Device Version':
            data = tlv_data[title].to_bytes(1,'big')
        elif tlv_type_dirt[item] == 'MAC Address Size':
            data = tlv_data[title].to_bytes(2,'big')
        elif tlv_type_dirt[item] == 'Vendor Extension':
            vendor_list = tlv_data[title].split()
            vendor_code = b''
            for vendor_item in vendor_list:
                vendor_code += eval(vendor_item).to_bytes(1,'big')
            data = vendor_code
        else:
            data = tlv_data[title]
            if data != 'None' and data != '':
                data = data.encode()
            else:
                data = b''
        type_code = item.to_bytes(1,'big')
        len_code = len(data).to_bytes(1,'big')
        data_str = type_code + len_code + data
        data_list.append(data_str)  ##################

    for item in data_list:
        tlv_totallen += len(item)
    '''
        tlv_totallen add crc data str length
    '''
    tlv_totallen = (tlv_totallen + 6).to_bytes(2,'big')
    tlv_final_data = tlv_signature + tlv_version + tlv_totallen
    for data_item in data_list:
        tlv_final_data += data_item
    '''
        add crc data header and length
    '''
    tlv_final_data += (0xFE).to_bytes(1,'big') + (0x04).to_bytes(1,'big')
    '''
        add crc data
    '''
    crc_data = eval(crc2hex(tlv_final_data)).to_bytes(4,'big')

    tlv_final_data += crc_data
    # print(tlv_final_data)

    if file_name == None:
        if unlock_tlv_eeprom(tlv_reg_path,'0x84'):
            with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb+') as f:
                f.write(tlv_final_data)
                f.flush()
                print("update tlv data finish!")
    else:
        with open(file_name,mode = 'wb') as f:
            f.write(tlv_final_data) 
            f.flush()
        print("update tlv data bin finish!")

def read_help(args):
    print('usage: {} [OPTIONS]'.format(args[0]))
    print('Options are:')
    print('     -w      --write      write tlv bin to eeprom')
    print('     -d      --dump       dump tlv bin from eeprom')
    print('     -r      --read       read  tlv data from eeprom')
    print('     -u      --update     update tlv data to eeprom or create new tlv bin')
    print('     -h      --help       Display this help text and exit')
    print('Example:')
    print('             --write  <bin name>')
    print('             --dump   <bin name>')
    print('             --update <bin name>')
    print('                  just create tlv bin , but not update tlv data')
    print('             --update')
    print('                  just update tlv data , but not create tlv bin')
    print('             --read')

op = {
    '--write'  : tlv_write,
    '--dump'   : tlv_dump,
    '--read'   : tlv_read,
    '--update' : tlv_update,
    '--help'   : read_help,
    '-w'       : tlv_write,
    '-d'       : tlv_dump,
    '-r'       : tlv_read,
    '-u'       : tlv_update,
    '-h'       : read_help,
}

if __name__ == '__main__':
    if len(sys.argv) == 1:
        read_help(sys.argv)
    else:        
        op.get(sys.argv[1], read_help)(sys.argv)
// tlv_conf.json
{
    "tlv_reg_path" : "/sys/bus/platform/devices/hq_switch/cpld_lc/",
    "tlv_eeprom_path" : "/sys/bus/i2c/devices/0-0056/",
    "tlv_signature": "TlvInfo",
    "tlv_version": 1, 
    "tlv_data": {
        "Product_Name":     "ZSR551", 
        "Part_Number":      "ZSR551",   
        "Serial_Number":    "SN number", 
        "Base_MAC_Address": "00:E0:EC:00:00:00", 
        "Device_Version":   1, 
        "Label_Revision":   "ZSR551", 
        "Platform_Name":    "x86_64-ZSR551-r0", 
        "ONIE_Version":     "0.0.1", 
        "MAC_Address_Size": 260, 
        "Manufacturer":     "None", 
        "Country_Code":     "CHN",
        "Vendor_Name":      "None",
        "Diag_Version":     "1.4.0",
        "Service_Tag":      "LB",
        "Vendor_Extension": "0x00 0x00 0x00"
    }
}

在这里插入图片描述

3.json/configparser/optparse:json.dumps(将字典转化为字符串,将json信息写进文件),json.loads(将字符串转化为字典)

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import json

# dict1 = {"age": "12"}                                                      # <type 'dict'>
# json_info = json.dumps(dict1)     
# print("通过json.dumps()函数处理:json_info的类型:"+str(type(json_info)))  # <type 'str'>
# print(json_info)                                                           # {"age": "12"}

json_info = '{"age": "12"}' 									    # <type 'str'>
dict1 = json.loads(json_info)    
print("通过json.loads()函数处理:dict1的类型:"+str(type(dict1)))   # <type 'dict'>
print(dict1)         											    # {u'age': u'12'}  # u:unicode中文显示

# json_info = "{'age': '12'}"
# file = open('1.json','w') 
# json.dump(json_info,file)   #生成1.json里面内容为 "{'age': '12'}"

在这里插入图片描述
在这里插入图片描述
win下.ini多。
在这里插入图片描述
在这里插入图片描述

# coding=UTF-8<code>
import os
import json

def d():
    info_list = ['a','b','c','d','e','f','g','h','i']
    split_list = [info_list[i:i+4] for i in range(0, len(info_list), 4)]
    print(split_list)  # [['a', 'b', 'c', 'd'], ['e', 'f', 'g', 'h'], ['i']]
    x = set('runoob')
    print(x)   # set(['b', 'r', 'u', 'o', 'n'])
    print(len(x)) # 5

i2clist={
    '2':'port 1 SFP',
    '3':'port 2 SFP',
    '4':'port 3 SFP',
    '5':'port 4 SFP'
}
def e():
    for i in i2clist:
        print(i) # 3 2 5 4

def search_dir_by_name(name, dir):
    result = []
    try:
        files = os.listdir(dir)
        for file in files:
            if name in file:
                result.append(os.path.join(dir, file))
    except Exception as e:
        pass
    return result

dir = "/sys/block/"
spect = "sd"
ssdpath = []
# /sys/block/sd*   remove=1  sda  fdisk | grep sda 有/dev/sda , 没有的话只有sda, 拼接/dev/sda
if __name__ == '__main__':
    result = search_dir_by_name(spect, dir)
    #print(result)    #['/sys/block/sdd', '/sys/block/sdb', '/sys/block/sde', '/sys/block/sdc', '/sys/block/sda']
    for item in result:
        with open(os.path.join(item, "removable"), 'r') as fd:
            #print(item)  # /sys/block/sde
            #print(os.path.join(item, "removable"))  # /sys/block/sde/removable
            #print(fd.read()) #0或1  # SyntaxError: Non-ASCII character '\xe6' 加#coding=UTF-8<code>
            value = fd.read()
            if value.strip() == "0":  # found ssd  #去除1
                ssd_disk = "/dev/" + os.path.basename(item)  #ssd_disk:/dev/sda .../dev/sdd
                ssdpath.append(ssd_disk)

4.map:[]列表 , ()元组 , {}集合 , {a:b}字典。for else中break后不走else

python中None,False,空字符串"",0,空列表[],空字典{},空元组()都相当于False。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
cpu和switch连着,中间有一个nic卡(mellanox或e810),switch连着外面的光模块。fpga一端连着cpu,一端连着switch,fpga的eeprom存指令控制switch。

前面板前36个是loopback(光模块插DAC线或光纤,lookpack内部TX和RX接在一起),37-64网口是fpga和nic卡, 6162->mellax 100G(100G无4个lan), e810 25G。

traffic:外部loocpack,cpu发包,cpu收包。
fpga 100g : cpu - fpga - switch - loopback - switch - fpga。
fpga 25g : fpga - switch。

layout只fpga,1个fpga有8个pcie设备(pf),1个pcie设备有16个通道。

如下25G有4个lan。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
def stop_all(self, bdf_lst, bar, port, channel) 。如下self. 默认第一个参数有self。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5.walk/split/getattr/bin:bin转为二进制0b…

# b.py
import os
import json
import sys

def seek_file(path, name):
    fullpath = None
    for root, dirs, files in os.walk(path):
        if name in files:
            fullpath = '{0}/{1}'.format(root, name)
            break
    return fullpath

if len(sys.argv)==3:
    print(sys.argv[1],sys.argv[2])
    path=seek_file(sys.argv[1],sys.argv[2])
    print(path)
else:
    print(len(sys.argv),'err')

root@bmc-oob:~# python b.py /sys/bus/i2c/devices/26-0048 in3_input
/sys/bus/i2c/devices/26-0048 in3_input
/sys/bus/i2c/devices/26-0048/hwmon/hwmon4/in3_input

在这里插入图片描述

def get_baseboard_sku():
    try:
        proc = subprocess.Popen([IPMITOOL_PATH, "raw", "0x3a", "0x10", "9", "0x1a", "1", "0x12"],
                                stdout=subprocess.PIPE,
                                shell=False,
                                stderr=subprocess.STDOUT) # proc返回20
        stdout = proc.communicate()[0]   # b' 20\n'
        proc.wait()
        sku_str = stdout.decode()  # 20
        sku_bin = bin(int("0x" + sku_str.strip(), 16))[-5:]    # 0x20(十六进制 )= 32(十进制 ),bin(32) = 0b100000
        sku_dict = {
            "00001" : "ns35x",
            "00010" : "ns34x",
            "00100" : "ns33x",
            "01000" : "ns32x",
            "10000" : "ns31x"
        }
        sku = sku_dict.get(sku_bin, "")
    except OSError as e:
        raise OSError("Cannot detect platform")
    return sku

6.sol/mmc:emmc_info_cmd = “mmc cid read /sys/block/mmcblk0/device”,size_cmd = “cat /sys/block/mmcblk0/size”

def check_bmc_sol_test(self, also_print_console=False):
    self.logger.log_info("[RUN SOL TEST]:", also_print_console)
    expect_list = [
        'yes/no',
        'password:',
        'Password:',
        'localhost login:',
        'bmc-oob login:',
        'bmc-oob. login:',
        'root@bmc-oob:~#',
        'root@localhost:~#',
    ]
    remote = pit_util_common.remote(
        self.logger, self.fail_reason, self.bmc_login["usrname"],
        self.bmc_login["ip"], self.bmc_login["password"])
    ret = remote.connect(also_print_console)
    if ret != E.OK:
        return ret
    output = remote.command("sol.sh", time=10, expect_list=['-----------------------------------'])
    self.logger.log_info(output, also_print_console)
    output1 = remote.command('\x15\x12\x14\x32', current=True, expect_list=expect_list)
    self.logger.log_info(output1, also_print_console)
    output2 = remote.command('\r\n', current=True, expect_list=expect_list)
    self.logger.log_info(output2, also_print_console)
    if "localhost login" in output1 or "localhost login" in output2:
        self.logger.log_info("Switch to CPU Pass ", also_print_console)
    else:
        ret = E.EFAIL
        self.fail_reason.append("Switch to CPU Fail")
        remote.disconnect()
        return ret
    output1 = remote.command('\x15\x12\x14\x33', current=True, expect_list=expect_list)
    self.logger.log_info(output1, also_print_console)
    output2 = remote.command('\r\n', current=True, expect_list=expect_list)
    self.logger.log_info(output2, also_print_console)
    if "bmc-oob" in output1 or "bmc-oob" in output2:
        self.logger.log_info("Switch to BMC Pass ", also_print_console)
        ret = E.OK
    else:
        ret = E.EFAIL
        self.fail_reason.append("Switch to BMC Fail")
    remote.disconnect()
    if ret != E.OK:
        self.logger.log_err("FAIL!", also_print_console)
    else:
        self.logger.log_info("PASS.", also_print_console)
    return ret

def bmc_ping_cpu_test(self, also_print_console=False):
    count = 5
    ping_cmd = "ping %s -c %d | grep received" % (
        self.bmc_login["ip"], count)
    status, output = run_command(ping_cmd)
    self.logger.log_info(output, also_print_console)
    if output.find(" 0% packet loss") > 0:
        ret = E.OK
    else:
        self.fail_reason.append("cpu ping server lost packages")
        ret = E.EMGMT11002
    return ret

bmc_ddr_name_cmd = "cat /proc/meminfo | awk '{print $1}'"
bmc_emmc_show_cmd = "fdisk -l | grep /dev/mmcblk0"
bmc_emmc_stress_test_cmd = "touch /var/log/emmc.test ; \
    echo '==emmc read stress test==' ; \
    (time dd if=/dev/mmcblk0 of=/dev/null bs=1M count=10 && sync) >> /var/log/emmc.log ; echo ' ' ; \
    echo '==emmc write stress test==' ; \
    (time dd if=/dev/zero of=/var/log/emmc.test bs=1M count=10 && sync) >> /var/log/emmc.log ; \
    rm /var/log/emmc.test"

def str_to_hex(self, s):    # 把字符串转化为对应的16进制串
    return ' '.join([hex(ord(c)).replace('0x', '') for c in s])

def hex_to_str(self, s):
    return ''.join([chr(i) for i in [int(b, 16) for b in s.split(' ')]])

# def str2bin(s):   # bin二进制
#     return ' '.join([bin(ord(c)).replace('0b', '') for c in s])

# def bin2str(s):
#     return ''.join([chr(i) for i in [int(b, 2) for b in s.split(' ')]])

def replace_char(self, string, char, index):
    string = list(string)
    string[index] = char
    return ''.join(string)

def modify_profile(self, also_print_console=True):
    ret, Before_mac = self.get_mac_address_from_tlv_eeprom(also_print_console)
    if ret != E.OK:
        return ret
    After_mac = Before_mac    # 00E0EC000000
    for i in range(11, -1, -1): # 从11到0 (竖着打印) , 12个(相当于索引)
        last = After_mac[i]  # 从右向左单个遍历
        if last == "F":
            After_mac = self.replace_char(After_mac, "0", i)
            continue
        else:
            last = self.str_to_hex(last)  # 字符0转为ascii码48,再转为十六进制0x30即30
            last = int(last) + int(self.offset["offset"]["bmc_offset"])
            last = str(last)
            last = self.hex_to_str(last)  # 函数里有split,所以last必须为str类型
            After_mac = self.replace_char(After_mac, last, i) # After_mac = "EFFFFFFFFFFF" , 因为下行break,只打印出一行F00000000000
            break
    sign = 0
    os.chdir(self.hwsku_path)
    with open('fru.ini', 'r') as f:
        lines = []
        for line in f.readlines():
            if line != '\n':
                lines.append(line)
        f.close()
    with open('fru.ini', 'w') as f:
        for line in lines:
            if "bmc_base_mac_address=" in line:
                if sign == 1:
                    f.write('%s' % line)
                    continue
                line = "bmc_base_mac_address="+After_mac
                f.write('%s\n' %line)
                sign = 1
            else:
                f.write('%s' %line)
    return After_mac

5.gpio/pcie:lspci,re.compile

    # "gpio":{
    #     "gpio504":{           			    # cpu的gpio504
    #         "pin":"GPIO8",                    # cpld的gpio8
    #         "net":"CTC7132_CPLD_JTAG_EN"      # cpld的gpio8的备注作用:使能
    #     },
    #     "gpio505":{
    #         "pin":"GPIO9",
    #         "net":"CTC7132_CPLD_JTAG_TCK_R"    # 时钟
    #     },
    #     "gpio506":{
    #         "pin":"GPIO10",
    #         "net":"CTC7132_CPLD_JTAG_TMS_R"
    #     },
    #     "gpio507":{
    #         "pin":"GPIO11",
    #         "net":"CTC7132_CPLD_JTAG_TDI_R"    # 数据输入
    #     },
    #     "gpio508":{
    #         "pin":"GPIO12",
    #         "net":"CTC7132_CPLD_JTAG_TDO"      # 数据输出
    #     }
    # }
	self.gpio_map = self.platform_cfg_json['gpio']
    def _read_gpio_pin_value(self, gpio_value_file):
        value = -1
        try:
            with open(gpio_value_file, "r") as f:
                value = f.read()
        except IOError as e:
            self.logger.log_err(str(e))
        return value

    def gpio_pin_check(self, also_print_console=True):
        if not self.gpio_map:
            err = "gpio conf is null!"
            self.fail_reason.append(err)
            self.logger.log_err(err, also_print_console)
            return E.EEXIST

        self.logger.log_info("[GPIO PIN CHECK]:", also_print_console)
        header = ["GPIO", "Pin", "Net", "Status"]
        status_tbl = []
        ret = E.OK
        for gpio, desc in self.gpio_map.items():
            if not os.path.exists("/sys/class/gpio/{}".format(gpio)):
                run_command("echo {} > /sys/class/gpio/export".format(gpio.split("gpio")[1]))
            # find . -name export
            # echo 505 > export , gpio文件夹下显示gpio505文件夹 , echo 505 > unexport , gpio505文件夹消失
            pin_name = desc['pin']
            net_name = desc['net']
            gpio_value_file = os.path.join("/sys/class/gpio", gpio, "value")   # gpio_value_file : /sys/class/gpio/gpio504/value
            value = self._read_gpio_pin_value(gpio_value_file)
            if value < 0:
                ret = E.EGPIO13001
                status = "Not_OK"
                err = "{} read failed!".format(gpio)
                self.fail_reason.append(err)
            else:
                status = "OK"
            status_tbl.append([gpio, pin_name, net_name, status])
# root@localhost:~# lspci -nnn -s 07:00.0    (07:00.0是fpga的bus号,fpga是pcie设备)
# 07:00.0 Memory controller [0580]: Xilinx Corporation Device [10ee:7021]

# root@localhost:~# lspci -vvv -s 07:00.0    (devmem 0xfb800000 8)
# Region 0: Memory at fb800000 (32-bit, non-prefetchable) [size=4M]

"pcie":{
    "X552":{
        "width":"x1",
        "speed":"2.5GT/s",
        "num":4
        }
},
"fpga":{ 
    "FPGA1":{
        "num":1,  
        "path":"/sys/bus/platform/devices/hq_switch/FPGA",
        "register":"scratch"
    }
},
"fpga_info":{
    "fpga_test_count": 3
},

GLOBAL_VALUE1 = '0x5A'
GLOBAL_VALUE2 = '0xA5'
class PCIETC(TestCaseCommon):  #同 lpc_tc
    def __init__(self, index, logger, platform_cfg_file, case_cfg_file=None):
        MODULE_NAME = "pcie_tc"
        TestCaseCommon.__init__(self, index, MODULE_NAME,
                                logger, platform_cfg_file, case_cfg_file)
        self.pcie_devices = None         # default
        self.fpga_devices = None         # default
        try:
            if self.platform_cfg_json and 'pcie' in self.platform_cfg_json.keys():
                self.pcie_devices = self.platform_cfg_json['pcie']
            if self.platform_cfg_json and 'fpga' in self.platform_cfg_json.keys():
                self.fpga_devices = self.platform_cfg_json['fpga']
            if self.platform_cfg_json and "fpga_info" in self.platform_cfg_json.keys():
                self.fpga_info = self.platform_cfg_json["fpga_info"]
        except Exception as e:
            self.logger.log_err(str(e), True)

    def _get_bus_dev_func(self, device):
        bus_dev_func = []
        ret, out = run_command("lspci | grep {}".format(device))
        for line in out.splitlines():
            if re.search("[0-9a-f]{2}\:[0-9a-f]{2}\.[0-9a-f]{1}", line):
                bus_dev_func.append(re.findall(
                    "[0-9a-f]{2}\:[0-9a-f]{2}\.[0-9a-f]{1}", line)[0])
            else:
                bus_dev_func.append(None)
        return bus_dev_func

    def _get_device_conf(self, busid):
        ret, output = run_command(
            "lspci -s {} -vvv | grep -i LnkSta | grep Speed".format(busid))
        if ret or output == '':
            return '', ''
        else:
            speed = output.strip(" \t\n").split(
                ',')[0].split('Speed')[1].strip()
            width = output.strip(" \t\n").split(
                ',')[1].split('Width')[1].strip()
            return speed, width

    def check_pcie_device(self, also_print_console=False):
        header = ['Device', 'Bus', 'Width', 'Speed']
        ret = E.OK
        status_tbl = []
        if self.pcie_devices:
            for device in self.pcie_devices.keys():
                conf_width = self.pcie_devices[device]["width"]
                conf_speed = self.pcie_devices[device]["speed"]
                dev_num = int(self.pcie_devices[device]["num"])
                busid_list = self._get_bus_dev_func(device)
                if not busid_list:
                    self.fail_reason.append("{} not found".format(device))
                    ret = E.EPCI9001
                    status_tbl.append([device, 'None', 'None', 'None'])
                else:
                    if len(busid_list) != dev_num:
                        self.logger.log_err("{} number expect {}, real {}".format(
                            device, dev_num, len(busid_list)))
                        self.fail_reason.append(
                            "{} number mismatch".format(device))
                        ret = E.EPCI9001
                    for busid in busid_list:
                        speed, width = self._get_device_conf(busid)  ########
                        if conf_width != width:
                            self.fail_reason.append(
                                "{} width not matched".format(device))
                            ret = E.EPCI9005
                        if conf_speed != speed:
                            self.fail_reason.append(
                                "{} speed not matched".format(device))
                            ret = E.EPCI9004
                        status_tbl.append([device, busid, width, speed])

    def fpga_device_rw_test(self, also_print_console=False):
        self.logger.log_info("[FPGA RW TEST]:", also_print_console)
        ret = E.OK
        if self.fpga_devices:
            for device in self.fpga_devices.keys():
                try:
                    device_conf = self.fpga_devices.get(device)
                    driver_path = device_conf['path']
                    reg_file = os.path.join(
                        driver_path, device_conf['register'])
                    with open(reg_file, 'w') as f:
                        f.write(GLOBAL_VALUE1)
                    with open(reg_file, 'r') as f:
                        value = f.read().strip()
                    self.logger.log_info("{} write {}, read {}".format(
                        device, GLOBAL_VALUE1, value), also_print_console)
                    if int(value, 16) != int(GLOBAL_VALUE1, 16):
                        ret = E.EPCI9003
                        self.fail_reason.append("{} rw fail".format(device))

                    with open(reg_file, 'w') as f:
                        f.write(GLOBAL_VALUE2)
                    with open(reg_file, 'r') as f:
                        value = f.read().strip()
                    self.logger.log_info("{} write {}, read {}".format(
                        device, GLOBAL_VALUE2, value), also_print_console)
                    if int(value, 16) != int(GLOBAL_VALUE2, 16):
                        ret = E.EPCI9003
                        self.fail_reason.append("{} rw fail".format(device))
                except Exception as e:
                    self.logger.log_err(str(e), True)
                    self.fail_reason.append(str(e))
                    ret = E.EPCI9002

    def run_test(self, *argv):
        ret = self.check_pcie_device(True)
        if ret != E.OK:
            return ret
        for i in range(self.fpga_info["fpga_test_count"]):
            ret = self.fpga_device_rw_test(True)
            if ret != E.OK:
                self.logger.log_err(
                    "pcie r/w loop {} failed!".format(i + 1), True)
                return ret
        return ret
ethtool 网卡名 :查看网卡速率,全双工
ethtool -s ethX [speed 10|100|1000] [duplex half|full]  [autoneg on|off]        
设置网口速率     10/100/1000M      设置网口半/全双工    设置网口是否自协商

test_help_list = [
    "Please check if psu status is correct",
    "1.correct",
    "2.wrong",]
for item in self.test_help_list:
    self.logger.log_info(item, True)
choice = str(input("Please input 1 or 2: "))
value = re.compile(r'^[-+]?[0-9]+$')
result = value.match(choice)
if result:
    if int(choice) == 1:
    elif int(choice) == 2:
    else:

    def search_speed(self, speed):
        time.sleep(5)
        sta, out = run_command("ethtool eth0")
        pattern = "\s+Speed: (.*)\s+"
        result = re.findall(pattern, out)[0]
        if result != speed:
            return False
        else:
            return True

    def rj45_led_test(self, also_print_console=False):
        ret = E.EFAIL
        self.logger.log_info("[RJ45 LED BLINK TEST]:", also_print_console)
        speed_list = ["10", "100", "1000"]
        for speed in speed_list:
            cmd = "ethtool -s eth0 autoneg off speed {} duplex full"
            if speed == "1000":
                cmd = "ethtool -s eth0 autoneg on speed {} duplex full".format(
                    speed)
            else:
                cmd = "ethtool -s eth0 autoneg off speed {} duplex full".format(
                    speed)
            run_command(cmd)
            if not self.search_speed("{}Mb/s".format(speed)):
                ret = E.EFAIL
                self.fail_reason("set eth0 speed fail.")
                self.logger.log_err("FAIL.", also_print_console)
                return ret
            self.logger.log_info(
                "current eth0 speed is {}Mb/s, start ping test...".format(speed), also_print_console)
            run_command("ping -c 20 -I eth0 {}".format(self.external_ip))
            choice = input(
                "please confirm rj45 led whether blink(1 is pass, 2 is fail):")
            if int(choice) == 2:
                ret = E.ELED14010
                self.fail_reason.append("rj45 led fault")
                self.logger.log_err("FAIL.", also_print_console)
                return ret
            elif int(choice) == 1:
                continue

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1444779.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PLC在物联网中位置—承上启下,与上位机下位机的关联。

谈到物联网&#xff0c;就绕不开PLC&#xff0c;本文着重介绍PLC的定义、与单片机的区分&#xff0c;价值、物联网中的位置&#xff0c;以及和上位机、下位机的关联&#xff0c;让友友们对PLC有个全面的认知。 一、什么是PLC PLC是可编程逻辑控制器&#xff08;Programmable L…

Java:字符集、IO流 --黑马笔记

一、字符集 1.1 字符集的来历 我们知道计算机是美国人发明的&#xff0c;由于计算机能够处理的数据只能是0和1组成的二进制数据&#xff0c;为了让计算机能够处理字符&#xff0c;于是美国人就把他们会用到的每一个字符进行了编码&#xff08;所谓编码&#xff0c;就是为一个…

python-自动化篇-终极工具-用GUI自动控制键盘和鼠标-pyautogui

文章目录 用GUI自动控制键盘和鼠标pyautogui 模块鼠标屏幕位置——移动地图——pyautogui.size鼠标位置——自身定位——pyautogui.position()移动鼠标——pyautogui.moveTo拖动鼠标滚动鼠标 键盘按下键盘释放键盘 开始与结束通过注销关闭所有程序 用GUI自动控制键盘和鼠标 在…

2024 CKS 题库 | 4、RBAC - RoleBinding

CKS 题库 4、RBAC - RoleBinding Context 绑定到 Pod 的 ServiceAccount 的 Role 授予过度宽松的权限。完成以下项目以减少权限集。 Task 一个名为 web-pod 的现有 Pod 已在 namespace db 中运行。 编辑绑定到 Pod 的 ServiceAccount service-account-web 的现有 Role&#…

shell脚本之文件处理命令及字符切片处理

目录 一、文件处理工具 1、tr命令 1.1 转换字符 1.2 压缩字符及删除字符 2、seq命令 3、cut命令 ​4、tac命令 5、rev命令 6、sort命令 ​​​​​7、uniq命令 ​8、echo命令 9、date命令 二、字符串切片处理 1、取字符串的长度 2、跳过字符串最前边的字符 3、…

利用Python和pandas库进行股票技术分析:移动平均线和MACD指标

利用Python和pandas库进行股票技术分析&#xff1a;移动平均线和MACD指标 介绍准备工作数据准备计算移动平均线计算MACD指标结果展示完整代码演示 介绍 在股票市场中&#xff0c;技术分析是一种常用的方法&#xff0c;它通过对股票价格和交易量等历史数据的分析&#xff0c;来…

LeetCode Python - 9.回文数

文章目录 题目答案运行结果 题目 给你一个整数 x &#xff0c;如果 x 是一个回文整数&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 回文数是指正序&#xff08;从左向右&#xff09;和倒序&#xff08;从右向左&#xff09;读都是一样的整数。 例如&am…

Python基础语法(内置Python, pycharm配置方式)

一.工具安装与配置 1.Python解释器的安装 官网网址:https://www.python.org/ 选择downloads即可(Windows用户点击Windows, 苹果用户点击macOS) 找到最新版本, 并选择 Download Windows installer (64-bit) 下载完成后可在得到一个安装包进行安装(安装时间较长) 安装完成后…

Stable Diffusion 模型下载:DreamShaper(梦想塑造者)

文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十 下载地址 模型介绍 DreamShaper 是一个分格多样的大模型&#xff0c;可以生成写实、原画、2.5D 等多种图片&#xff0c;能生成很棒的人像和风景图。 条目内容类型大模型基础模型SD 1…

C++多态:定义、实现及原理/继承关系中的虚函数表

目录​​​​​​​ 一、多态的定义及实现 1.1多态的概念​​​​​​​ 1.2多态的构成条件 1.3virtual虚函数 1.4虚函数的重写 二、override和final 三、抽象类 3.1概念 3.2接口继承和实现继承 四、多态的原理 4.1虚函数表 4.2 多态的原理 4.3动态绑定与静态绑定…

自动化AD域枚举和漏洞检测脚本

linWinPwn 是一个 bash 脚本&#xff0c;可自动执行许多 Active Directory 枚举和漏洞检查。该脚本基于很多现有工具实现其功能&#xff0c;其中包括&#xff1a;impacket、bloodhound、netexec、enum4linux-ng、ldapdomaindump、lsassy、smbmap、kerbrute、adidnsdump、certip…

华为OD机试 - 最长子字符串的长度(一) (Python C C++ JavaGo JS PHP)

题目描述 给定一个字符串s&#xff0c;将其视为环形&#xff0c;要求找出其中出现偶数次的最长子字符串的长度。 输入描述 输入一个字符串s。 输出描述 输出一个整数&#xff0c;表示出现偶数次的最长子字符串的长度。 示例 解析题目 本题要求在给定的字符串中找出出现偶…

OpenCV入门:图像处理的基石

在数字图像处理领域&#xff0c;OpenCV&#xff08;开源计算机视觉库&#xff09;是一个不可或缺的工具。它包含了一系列强大的算法和函数&#xff0c;使得开发者可以轻松地处理图像和视频数据。本文将带你走进OpenCV的世界&#xff0c;了解其基本概念和常见应用。 1. OpenCV简…

Java 集合、迭代器

Java 集合框架主要包括两种类型的容器&#xff0c;一种是集合&#xff08;Collection&#xff09;&#xff0c;存储一个元素集合&#xff0c;另一种是图&#xff08;Map&#xff09;&#xff0c;存储键/值对映射。Collection 接口又有 3 种子类型&#xff0c;List、Set 和 Queu…

【EAI 015】CLIPort: What and Where Pathways for Robotic Manipulation

论文标题&#xff1a;CLIPort: What and Where Pathways for Robotic Manipulation 论文作者&#xff1a;Mohit Shridhar1, Lucas Manuelli, Dieter Fox1 作者单位&#xff1a;University of Washington, NVIDIA 论文原文&#xff1a;https://arxiv.org/abs/2109.12098 论文出处…

sheng的学习笔记-docker部署springboot

部署文章目录&#xff1a;目录 docker部署&#xff0c;原理&#xff0c;命令&#xff0c;可以参考&#xff1a;docker原理图&#xff0c;部署&#xff0c;命令 目录 将springboot部署到docker中 遇到过的问题&#xff1a; pom配置 操作步骤 生成jar 构建镜像 查看镜像d…

C语言之预处理详解

目录 1. 预定义符号2. #define定义常量3. #define定义宏练习 4. 带有副作用的宏参数5. 宏替换的规则6. 宏函数的对比宏和函数的一个对比 7. #和###运算符##运算符 8. 命名约定9. #undef10. 命令行定义11. 条件编译常见的条件编译 12. 头文件的包含头文件的包含方式库文件包含嵌…

移动端web开发布局

目录 flex布局&#xff1a; flex布局父项常见属性&#xff1a; flex布局子项常见属性&#xff1a; REM适配布局&#xff1a; 响应式布局&#xff1a; flex布局&#xff1a; 需要先给父类盒子设置display&#xff1a;flex flex是flexiblebox的缩写&#xff0c;意为"弹…

【DC渗透系列】DC-4靶场

主机发现 arp-scan -l┌──(root㉿kali)-[~] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:6b:ed:27, IPv4: 192.168.100.251 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.100.1 00:50:56:c0:00:08 …

【人工智能教育】“奇幻森林里的决战:小明‘剑’指期末,勇闯试卷迷宫

在智慧校园的奇幻乐园中&#xff0c;教育的故事不再局限于传统的粉笔与黑板&#xff0c;而是跃然于光影之间&#xff0c;流淌于数据之海。小明和他的同学们正是这个新世界的探险者&#xff0c;他们手握名为“智能辅导助手”的魔法棒&#xff0c;勇闯知识的迷宫。每当他们在力学…