文章目录
- 1.闭包和装饰器:函数里return就是闭包
- 2.解析eeprom:如下是二进制文件,C8是一个字节
- 3.json/configparser/optparse:json.dumps(将字典转化为字符串,将json信息写进文件),json.loads(将字符串转化为字典)
- 4.map:[]列表 , ()元组 , {}集合 , {a:b}字典。for else中break后不走else
- 5.walk/split/getattr/bin:bin转为二进制0b...
- 6.sol/mmc:emmc_info_cmd = "mmc cid read /sys/block/mmcblk0/device",size_cmd = "cat /sys/block/mmcblk0/size"
- 5.gpio/pcie:lspci,re.compile
1.闭包和装饰器:函数里return就是闭包
如下打印第一行是:我在~。
2.解析eeprom:如下是二进制文件,C8是一个字节
如下5x4+4+2x4=32。
# tlv_tool.py
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import sys
import struct
import os
import time
import json
import subprocess
import binascii
from os.path import dirname, abspath
# a.py
# import json, os
# print(json.__file__) # /usr/lib/python3.8/json/__init__.py
# print(os.path.abspath(__file__)) # /home/y/utool/a.py
# print(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # /home/y
sys.path.append(dirname(dirname(abspath(__file__)))) #当前tlv_tool.py文件所在绝对路径的文件夹的上级文件夹
Project_path = dirname(abspath(__file__)) #获取所在路径文件夹
Conf_path = '{}/tlv_conf.json'.format(Project_path)
def ParseJson(filename=Conf_path):
with open(filename, 'r') as f:
jdata = json.load(f)
return jdata
tlv_json = ParseJson()
tlv_reg_path = tlv_json['tlv_reg_path']
tlv_eeprom_path = tlv_json['tlv_eeprom_path']
def run_command(cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
if err and proc.returncode != 0:
return err
return out.decode()
# def write_reg(fpath, reg, data):
# if os.getcwd() != fpath:
# if os.path.exists(fpath) == False or os.chdir(fpath) == False:
# return 'file Not exists or file Not accessible'
# if run_command('echo {} {} > setreg'.format(reg, data)) != 0:
# return 'write fail'
# return 0
# def read_reg(fpath, reg):
# if os.getcwd() != fpath:
# if os.path.exists(fpath) == False or os.chdir(fpath) == False:
# return 'file Not exists or file Not accessible'
# if run_command('echo {} > getreg'.format(reg)) != '':
# return 'echo reg fail'
# return run_command('cat getreg').strip()
def crc2hex(crc):
return '0x{:0>8X}'.format(binascii.crc32(crc)) #取crc32的八位数据 %x返回16进制,摘要算法
def unlock_tlv_eeprom(path,reg,data = '0x0'): # unlock_tlv_eeprom(tlv_reg_path,'0x06') #0x06寄存器eeprom写保护
if os.getcwd() != path:
if os.path.exists(path) == False or os.chdir(path) == False:
return 'file Not exists or file Not accessible'
if run_command('echo {} {} > setreg'.format(reg, data)) != 0:
return 'write fail'
if run_command('echo {} > getreg'.format(reg)) != '':
return 'echo reg fail'
output = run_command('cat getreg').strip()
print("set reg {} value: {}".format(reg,output))
if output == data:
print('Unlock tlv eeprom success!') # 0x0
return True
print('Unlock tlv eeprom fail!')
return False
def tlv_read_eeprom(path,length=None,offset=None): # tlv_read_eeprom(tlv_eeprom_path,length)
with open('{}eeprom'.format(path),mode = 'rb') as f:
return f.read(length)
def tlv_read_length():
with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb') as f:
'''
tlv_header_length = char[8] + u8 + u16
'''
tlv_header_length = 11
tlv_signature , tlv_version , tlv_totallen = struct.unpack("!8sBH", f.read(tlv_header_length)) #读11字节
return tlv_header_length + tlv_totallen
def tlv_dump(args): #读出eeprom信息写到tlv_dump.bin文件
length = tlv_read_length()
data = tlv_read_eeprom(tlv_eeprom_path,length)
try:
file_name = args[2]
except IndexError:
file_name = 'tlv_dump.bin'
with open(file_name,mode = 'wb') as f:
# print(data)
# print(len(data))
f.write(data)
f.flush()
print("Dump tlv data finish!")
tlv_type_dirt = {
0x21 : 'Product Name',
0x22 : 'Part Number',
0x23 : 'Serial Number',
0x24 : 'Base MAC Address',
0x25 : 'Manufacture Date',
0x26 : 'Device Version',
0x27 : 'Label Revision',
0x28 : 'Platform Name',
0x29 : 'ONIE Version',
0x2A : 'MAC Address Size',
0x2B : 'Manufacturer',
0x2C : 'Country Code',
0x2D : 'Vendor Name',
0x2E : 'Diag Version',
0x2F : 'Service Tag',
0xFD : 'Vendor Extension',
0xFE : 'CRC-32'
}
def tlv_write(args):
with open(args[2],mode = 'rb') as f:
data = f.read()
if unlock_tlv_eeprom(tlv_reg_path,'0x06'):
with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb+') as f:
f.write(data)
f.flush()
print("Write tlv data finish!")
def tlv_read(args):
if len(args) == 3 and '.bin' in args[2]:
path = args[2]
else:
path = '{}eeprom'.format(tlv_eeprom_path)
with open(path,mode = 'rb') as f:
'''
tlv_header_length = char[8] + u8 + u16
'''
tlv_header_length = 11
tlv_signature , tlv_version , tlv_totallen = struct.unpack("!8sBH", f.read(tlv_header_length))
print("TlvInfo Header:")
print(" Id String : {}".format(tlv_signature.decode().strip()))
print(" Version : {}".format(tlv_version))
print(" Total Length : {}".format(tlv_totallen))
'''
data_header_length = u8 + u8
'''
print("TLV Name Code Len Value")
print("--------------------------------")
data_header_length = 2
length_cnt = 0
stored_crc = None
while length_cnt < tlv_totallen :
data_type , data_length = struct.unpack("!BB", f.read(data_header_length))
length_cnt += data_header_length + data_length
if (data_type in tlv_type_dirt.keys()):
pass
else:
continue
if tlv_type_dirt[data_type] == 'Base MAC Address':
if data_length == 6:
mac0,mac1,mac2,mac3,mac4,mac5 = struct.unpack('!BBBBBB', f.read(data_length))
data = "{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}:{:0>2X}".format(mac0,mac1,mac2,mac3,mac4,mac5)
else:
print('Base MAC Address Error Length {}!'.format(data_length))
exit
elif tlv_type_dirt[data_type] == 'Device Version':
if data_length == 1:
data = struct.unpack('!B', f.read(data_length))
data = data[0]
else:
print('Device Version Error Length {}!'.format(data_length))
exit
elif tlv_type_dirt[data_type] == 'MAC Address Size':
if data_length == 2:
byte0,byte1 = struct.unpack('!BB', f.read(data_length))
data = (byte0 << 8) | byte1
else:
print('MAC Address Size Error Length {}!'.format(data_length))
exit
elif tlv_type_dirt[data_type] == 'Vendor Extension':
if data_length > 0:
vendor_cmd = '!'
for index in range(data_length):
vendor_cmd += "B"
byte_tuple = struct.unpack(vendor_cmd, f.read(data_length))
data = ''
for byte in byte_tuple:
data += '0x{:0>2X} '.format(byte)
else:
data = 'NULL'
elif tlv_type_dirt[data_type] == 'CRC-32':
if data_length == 4:
crc0,crc1,crc2,crc3 = struct.unpack('!BBBB', f.read(data_length))
data = "0x{:0>2X}{:0>2X}{:0>2X}{:0>2X}".format(crc0,crc1,crc2,crc3)
stored_crc = eval(data)
else:
print('CRC Error Length {}!'.format(data_length))
exit
else:
if data_length > 0:
data = struct.unpack('!{}s'.format(data_length), f.read(data_length))
data = data[0].decode().strip()
else:
# data = None
# data = ''
data = 'NULL'
print("{:<16} 0x{:x} {:>3} : {}".format(tlv_type_dirt[data_type],data_type,data_length,data))
if stored_crc != None:
f.seek(0,0)
crc_data = f.read(tlv_header_length + tlv_totallen - 4)
# print(crc_data)
calc_crc = eval(crc2hex(crc_data))
if calc_crc == stored_crc:
print('Checksum is valid')
else:
print('Checksum is invalid')
else:
print('No Checksum Data')
def tlv_update(args):
try:
file_name = args[2]
except IndexError:
file_name = None
tlv_data = tlv_json['tlv_data']
'''
tlv_header_length = char[8] + u8 + u16
'''
tlv_signature = tlv_json['tlv_signature'].encode()
if len(tlv_signature) > 8:
print('tlv signature length out off range !!!')
exit
while len(tlv_signature) < 8:
tlv_signature += b'\x00'
tlv_version = tlv_json['tlv_version'].to_bytes(1,'big')
tlv_totallen = 0x00
data_list = [] #一般都在for上面定义一个空列表
for item in tlv_type_dirt:
title = tlv_type_dirt[item].replace(' ','_')
if (title in tlv_data.keys()):
pass
else:
if tlv_type_dirt[item] == 'Manufacture Date':
pass
else:
continue
if tlv_type_dirt[item] == 'Base MAC Address':
mac_list = tlv_data[title].split(':')
mac_code = b''
for mac_item in mac_list:
mac_code += eval('0x{}'.format(mac_item)).to_bytes(1,'big')
data = mac_code
elif tlv_type_dirt[item] == 'Manufacture Date':
data = time.strftime("%d-%m-%Y %H:%M:%S", time.localtime()).encode()
elif tlv_type_dirt[item] == 'Device Version':
data = tlv_data[title].to_bytes(1,'big')
elif tlv_type_dirt[item] == 'MAC Address Size':
data = tlv_data[title].to_bytes(2,'big')
elif tlv_type_dirt[item] == 'Vendor Extension':
vendor_list = tlv_data[title].split()
vendor_code = b''
for vendor_item in vendor_list:
vendor_code += eval(vendor_item).to_bytes(1,'big')
data = vendor_code
else:
data = tlv_data[title]
if data != 'None' and data != '':
data = data.encode()
else:
data = b''
type_code = item.to_bytes(1,'big')
len_code = len(data).to_bytes(1,'big')
data_str = type_code + len_code + data
data_list.append(data_str) ##################
for item in data_list:
tlv_totallen += len(item)
'''
tlv_totallen add crc data str length
'''
tlv_totallen = (tlv_totallen + 6).to_bytes(2,'big')
tlv_final_data = tlv_signature + tlv_version + tlv_totallen
for data_item in data_list:
tlv_final_data += data_item
'''
add crc data header and length
'''
tlv_final_data += (0xFE).to_bytes(1,'big') + (0x04).to_bytes(1,'big')
'''
add crc data
'''
crc_data = eval(crc2hex(tlv_final_data)).to_bytes(4,'big')
tlv_final_data += crc_data
# print(tlv_final_data)
if file_name == None:
if unlock_tlv_eeprom(tlv_reg_path,'0x84'):
with open('{}eeprom'.format(tlv_eeprom_path),mode = 'rb+') as f:
f.write(tlv_final_data)
f.flush()
print("update tlv data finish!")
else:
with open(file_name,mode = 'wb') as f:
f.write(tlv_final_data)
f.flush()
print("update tlv data bin finish!")
def read_help(args):
print('usage: {} [OPTIONS]'.format(args[0]))
print('Options are:')
print(' -w --write write tlv bin to eeprom')
print(' -d --dump dump tlv bin from eeprom')
print(' -r --read read tlv data from eeprom')
print(' -u --update update tlv data to eeprom or create new tlv bin')
print(' -h --help Display this help text and exit')
print('Example:')
print(' --write <bin name>')
print(' --dump <bin name>')
print(' --update <bin name>')
print(' just create tlv bin , but not update tlv data')
print(' --update')
print(' just update tlv data , but not create tlv bin')
print(' --read')
op = {
'--write' : tlv_write,
'--dump' : tlv_dump,
'--read' : tlv_read,
'--update' : tlv_update,
'--help' : read_help,
'-w' : tlv_write,
'-d' : tlv_dump,
'-r' : tlv_read,
'-u' : tlv_update,
'-h' : read_help,
}
if __name__ == '__main__':
if len(sys.argv) == 1:
read_help(sys.argv)
else:
op.get(sys.argv[1], read_help)(sys.argv)
// tlv_conf.json
{
"tlv_reg_path" : "/sys/bus/platform/devices/hq_switch/cpld_lc/",
"tlv_eeprom_path" : "/sys/bus/i2c/devices/0-0056/",
"tlv_signature": "TlvInfo",
"tlv_version": 1,
"tlv_data": {
"Product_Name": "ZSR551",
"Part_Number": "ZSR551",
"Serial_Number": "SN number",
"Base_MAC_Address": "00:E0:EC:00:00:00",
"Device_Version": 1,
"Label_Revision": "ZSR551",
"Platform_Name": "x86_64-ZSR551-r0",
"ONIE_Version": "0.0.1",
"MAC_Address_Size": 260,
"Manufacturer": "None",
"Country_Code": "CHN",
"Vendor_Name": "None",
"Diag_Version": "1.4.0",
"Service_Tag": "LB",
"Vendor_Extension": "0x00 0x00 0x00"
}
}
3.json/configparser/optparse:json.dumps(将字典转化为字符串,将json信息写进文件),json.loads(将字符串转化为字典)
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import json
# dict1 = {"age": "12"} # <type 'dict'>
# json_info = json.dumps(dict1)
# print("通过json.dumps()函数处理:json_info的类型:"+str(type(json_info))) # <type 'str'>
# print(json_info) # {"age": "12"}
json_info = '{"age": "12"}' # <type 'str'>
dict1 = json.loads(json_info)
print("通过json.loads()函数处理:dict1的类型:"+str(type(dict1))) # <type 'dict'>
print(dict1) # {u'age': u'12'} # u:unicode中文显示
# json_info = "{'age': '12'}"
# file = open('1.json','w')
# json.dump(json_info,file) #生成1.json里面内容为 "{'age': '12'}"
win下.ini多。
# coding=UTF-8<code>
import os
import json
def d():
info_list = ['a','b','c','d','e','f','g','h','i']
split_list = [info_list[i:i+4] for i in range(0, len(info_list), 4)]
print(split_list) # [['a', 'b', 'c', 'd'], ['e', 'f', 'g', 'h'], ['i']]
x = set('runoob')
print(x) # set(['b', 'r', 'u', 'o', 'n'])
print(len(x)) # 5
i2clist={
'2':'port 1 SFP',
'3':'port 2 SFP',
'4':'port 3 SFP',
'5':'port 4 SFP'
}
def e():
for i in i2clist:
print(i) # 3 2 5 4
def search_dir_by_name(name, dir):
result = []
try:
files = os.listdir(dir)
for file in files:
if name in file:
result.append(os.path.join(dir, file))
except Exception as e:
pass
return result
dir = "/sys/block/"
spect = "sd"
ssdpath = []
# /sys/block/sd* remove=1 sda fdisk | grep sda 有/dev/sda , 没有的话只有sda, 拼接/dev/sda
if __name__ == '__main__':
result = search_dir_by_name(spect, dir)
#print(result) #['/sys/block/sdd', '/sys/block/sdb', '/sys/block/sde', '/sys/block/sdc', '/sys/block/sda']
for item in result:
with open(os.path.join(item, "removable"), 'r') as fd:
#print(item) # /sys/block/sde
#print(os.path.join(item, "removable")) # /sys/block/sde/removable
#print(fd.read()) #0或1 # SyntaxError: Non-ASCII character '\xe6' 加#coding=UTF-8<code>
value = fd.read()
if value.strip() == "0": # found ssd #去除1
ssd_disk = "/dev/" + os.path.basename(item) #ssd_disk:/dev/sda .../dev/sdd
ssdpath.append(ssd_disk)
4.map:[]列表 , ()元组 , {}集合 , {a:b}字典。for else中break后不走else
python中None,False,空字符串"",0,空列表[],空字典{},空元组()都相当于False。
cpu和switch连着,中间有一个nic卡(mellanox或e810),switch连着外面的光模块。fpga一端连着cpu,一端连着switch,fpga的eeprom存指令控制switch。
前面板前36个是loopback(光模块插DAC线或光纤,lookpack内部TX和RX接在一起),37-64网口是fpga和nic卡, 61和62->mellax 100G(100G无4个lan), e810 25G。
traffic:外部loocpack,cpu发包,cpu收包。
fpga 100g : cpu - fpga - switch - loopback - switch - fpga。
fpga 25g : fpga - switch。
layout只fpga,1个fpga有8个pcie设备(pf),1个pcie设备有16个通道。
如下25G有4个lan。
def stop_all(self, bdf_lst, bar, port, channel) 。如下self. 默认第一个参数有self。
5.walk/split/getattr/bin:bin转为二进制0b…
# b.py
import os
import json
import sys
def seek_file(path, name):
fullpath = None
for root, dirs, files in os.walk(path):
if name in files:
fullpath = '{0}/{1}'.format(root, name)
break
return fullpath
if len(sys.argv)==3:
print(sys.argv[1],sys.argv[2])
path=seek_file(sys.argv[1],sys.argv[2])
print(path)
else:
print(len(sys.argv),'err')
root@bmc-oob:~# python b.py /sys/bus/i2c/devices/26-0048 in3_input
/sys/bus/i2c/devices/26-0048 in3_input
/sys/bus/i2c/devices/26-0048/hwmon/hwmon4/in3_input
def get_baseboard_sku():
try:
proc = subprocess.Popen([IPMITOOL_PATH, "raw", "0x3a", "0x10", "9", "0x1a", "1", "0x12"],
stdout=subprocess.PIPE,
shell=False,
stderr=subprocess.STDOUT) # proc返回20
stdout = proc.communicate()[0] # b' 20\n'
proc.wait()
sku_str = stdout.decode() # 20
sku_bin = bin(int("0x" + sku_str.strip(), 16))[-5:] # 0x20(十六进制 )= 32(十进制 ),bin(32) = 0b100000
sku_dict = {
"00001" : "ns35x",
"00010" : "ns34x",
"00100" : "ns33x",
"01000" : "ns32x",
"10000" : "ns31x"
}
sku = sku_dict.get(sku_bin, "")
except OSError as e:
raise OSError("Cannot detect platform")
return sku
6.sol/mmc:emmc_info_cmd = “mmc cid read /sys/block/mmcblk0/device”,size_cmd = “cat /sys/block/mmcblk0/size”
def check_bmc_sol_test(self, also_print_console=False):
self.logger.log_info("[RUN SOL TEST]:", also_print_console)
expect_list = [
'yes/no',
'password:',
'Password:',
'localhost login:',
'bmc-oob login:',
'bmc-oob. login:',
'root@bmc-oob:~#',
'root@localhost:~#',
]
remote = pit_util_common.remote(
self.logger, self.fail_reason, self.bmc_login["usrname"],
self.bmc_login["ip"], self.bmc_login["password"])
ret = remote.connect(also_print_console)
if ret != E.OK:
return ret
output = remote.command("sol.sh", time=10, expect_list=['-----------------------------------'])
self.logger.log_info(output, also_print_console)
output1 = remote.command('\x15\x12\x14\x32', current=True, expect_list=expect_list)
self.logger.log_info(output1, also_print_console)
output2 = remote.command('\r\n', current=True, expect_list=expect_list)
self.logger.log_info(output2, also_print_console)
if "localhost login" in output1 or "localhost login" in output2:
self.logger.log_info("Switch to CPU Pass ", also_print_console)
else:
ret = E.EFAIL
self.fail_reason.append("Switch to CPU Fail")
remote.disconnect()
return ret
output1 = remote.command('\x15\x12\x14\x33', current=True, expect_list=expect_list)
self.logger.log_info(output1, also_print_console)
output2 = remote.command('\r\n', current=True, expect_list=expect_list)
self.logger.log_info(output2, also_print_console)
if "bmc-oob" in output1 or "bmc-oob" in output2:
self.logger.log_info("Switch to BMC Pass ", also_print_console)
ret = E.OK
else:
ret = E.EFAIL
self.fail_reason.append("Switch to BMC Fail")
remote.disconnect()
if ret != E.OK:
self.logger.log_err("FAIL!", also_print_console)
else:
self.logger.log_info("PASS.", also_print_console)
return ret
def bmc_ping_cpu_test(self, also_print_console=False):
count = 5
ping_cmd = "ping %s -c %d | grep received" % (
self.bmc_login["ip"], count)
status, output = run_command(ping_cmd)
self.logger.log_info(output, also_print_console)
if output.find(" 0% packet loss") > 0:
ret = E.OK
else:
self.fail_reason.append("cpu ping server lost packages")
ret = E.EMGMT11002
return ret
bmc_ddr_name_cmd = "cat /proc/meminfo | awk '{print $1}'"
bmc_emmc_show_cmd = "fdisk -l | grep /dev/mmcblk0"
bmc_emmc_stress_test_cmd = "touch /var/log/emmc.test ; \
echo '==emmc read stress test==' ; \
(time dd if=/dev/mmcblk0 of=/dev/null bs=1M count=10 && sync) >> /var/log/emmc.log ; echo ' ' ; \
echo '==emmc write stress test==' ; \
(time dd if=/dev/zero of=/var/log/emmc.test bs=1M count=10 && sync) >> /var/log/emmc.log ; \
rm /var/log/emmc.test"
def str_to_hex(self, s): # 把字符串转化为对应的16进制串
return ' '.join([hex(ord(c)).replace('0x', '') for c in s])
def hex_to_str(self, s):
return ''.join([chr(i) for i in [int(b, 16) for b in s.split(' ')]])
# def str2bin(s): # bin二进制
# return ' '.join([bin(ord(c)).replace('0b', '') for c in s])
# def bin2str(s):
# return ''.join([chr(i) for i in [int(b, 2) for b in s.split(' ')]])
def replace_char(self, string, char, index):
string = list(string)
string[index] = char
return ''.join(string)
def modify_profile(self, also_print_console=True):
ret, Before_mac = self.get_mac_address_from_tlv_eeprom(also_print_console)
if ret != E.OK:
return ret
After_mac = Before_mac # 00E0EC000000
for i in range(11, -1, -1): # 从11到0 (竖着打印) , 12个(相当于索引)
last = After_mac[i] # 从右向左单个遍历
if last == "F":
After_mac = self.replace_char(After_mac, "0", i)
continue
else:
last = self.str_to_hex(last) # 字符0转为ascii码48,再转为十六进制0x30即30
last = int(last) + int(self.offset["offset"]["bmc_offset"])
last = str(last)
last = self.hex_to_str(last) # 函数里有split,所以last必须为str类型
After_mac = self.replace_char(After_mac, last, i) # After_mac = "EFFFFFFFFFFF" , 因为下行break,只打印出一行F00000000000
break
sign = 0
os.chdir(self.hwsku_path)
with open('fru.ini', 'r') as f:
lines = []
for line in f.readlines():
if line != '\n':
lines.append(line)
f.close()
with open('fru.ini', 'w') as f:
for line in lines:
if "bmc_base_mac_address=" in line:
if sign == 1:
f.write('%s' % line)
continue
line = "bmc_base_mac_address="+After_mac
f.write('%s\n' %line)
sign = 1
else:
f.write('%s' %line)
return After_mac
5.gpio/pcie:lspci,re.compile
# "gpio":{
# "gpio504":{ # cpu的gpio504
# "pin":"GPIO8", # cpld的gpio8
# "net":"CTC7132_CPLD_JTAG_EN" # cpld的gpio8的备注作用:使能
# },
# "gpio505":{
# "pin":"GPIO9",
# "net":"CTC7132_CPLD_JTAG_TCK_R" # 时钟
# },
# "gpio506":{
# "pin":"GPIO10",
# "net":"CTC7132_CPLD_JTAG_TMS_R"
# },
# "gpio507":{
# "pin":"GPIO11",
# "net":"CTC7132_CPLD_JTAG_TDI_R" # 数据输入
# },
# "gpio508":{
# "pin":"GPIO12",
# "net":"CTC7132_CPLD_JTAG_TDO" # 数据输出
# }
# }
self.gpio_map = self.platform_cfg_json['gpio']
def _read_gpio_pin_value(self, gpio_value_file):
value = -1
try:
with open(gpio_value_file, "r") as f:
value = f.read()
except IOError as e:
self.logger.log_err(str(e))
return value
def gpio_pin_check(self, also_print_console=True):
if not self.gpio_map:
err = "gpio conf is null!"
self.fail_reason.append(err)
self.logger.log_err(err, also_print_console)
return E.EEXIST
self.logger.log_info("[GPIO PIN CHECK]:", also_print_console)
header = ["GPIO", "Pin", "Net", "Status"]
status_tbl = []
ret = E.OK
for gpio, desc in self.gpio_map.items():
if not os.path.exists("/sys/class/gpio/{}".format(gpio)):
run_command("echo {} > /sys/class/gpio/export".format(gpio.split("gpio")[1]))
# find . -name export
# echo 505 > export , gpio文件夹下显示gpio505文件夹 , echo 505 > unexport , gpio505文件夹消失
pin_name = desc['pin']
net_name = desc['net']
gpio_value_file = os.path.join("/sys/class/gpio", gpio, "value") # gpio_value_file : /sys/class/gpio/gpio504/value
value = self._read_gpio_pin_value(gpio_value_file)
if value < 0:
ret = E.EGPIO13001
status = "Not_OK"
err = "{} read failed!".format(gpio)
self.fail_reason.append(err)
else:
status = "OK"
status_tbl.append([gpio, pin_name, net_name, status])
# root@localhost:~# lspci -nnn -s 07:00.0 (07:00.0是fpga的bus号,fpga是pcie设备)
# 07:00.0 Memory controller [0580]: Xilinx Corporation Device [10ee:7021]
# root@localhost:~# lspci -vvv -s 07:00.0 (devmem 0xfb800000 8)
# Region 0: Memory at fb800000 (32-bit, non-prefetchable) [size=4M]
"pcie":{
"X552":{
"width":"x1",
"speed":"2.5GT/s",
"num":4
}
},
"fpga":{
"FPGA1":{
"num":1,
"path":"/sys/bus/platform/devices/hq_switch/FPGA",
"register":"scratch"
}
},
"fpga_info":{
"fpga_test_count": 3
},
GLOBAL_VALUE1 = '0x5A'
GLOBAL_VALUE2 = '0xA5'
class PCIETC(TestCaseCommon): #同 lpc_tc
def __init__(self, index, logger, platform_cfg_file, case_cfg_file=None):
MODULE_NAME = "pcie_tc"
TestCaseCommon.__init__(self, index, MODULE_NAME,
logger, platform_cfg_file, case_cfg_file)
self.pcie_devices = None # default
self.fpga_devices = None # default
try:
if self.platform_cfg_json and 'pcie' in self.platform_cfg_json.keys():
self.pcie_devices = self.platform_cfg_json['pcie']
if self.platform_cfg_json and 'fpga' in self.platform_cfg_json.keys():
self.fpga_devices = self.platform_cfg_json['fpga']
if self.platform_cfg_json and "fpga_info" in self.platform_cfg_json.keys():
self.fpga_info = self.platform_cfg_json["fpga_info"]
except Exception as e:
self.logger.log_err(str(e), True)
def _get_bus_dev_func(self, device):
bus_dev_func = []
ret, out = run_command("lspci | grep {}".format(device))
for line in out.splitlines():
if re.search("[0-9a-f]{2}\:[0-9a-f]{2}\.[0-9a-f]{1}", line):
bus_dev_func.append(re.findall(
"[0-9a-f]{2}\:[0-9a-f]{2}\.[0-9a-f]{1}", line)[0])
else:
bus_dev_func.append(None)
return bus_dev_func
def _get_device_conf(self, busid):
ret, output = run_command(
"lspci -s {} -vvv | grep -i LnkSta | grep Speed".format(busid))
if ret or output == '':
return '', ''
else:
speed = output.strip(" \t\n").split(
',')[0].split('Speed')[1].strip()
width = output.strip(" \t\n").split(
',')[1].split('Width')[1].strip()
return speed, width
def check_pcie_device(self, also_print_console=False):
header = ['Device', 'Bus', 'Width', 'Speed']
ret = E.OK
status_tbl = []
if self.pcie_devices:
for device in self.pcie_devices.keys():
conf_width = self.pcie_devices[device]["width"]
conf_speed = self.pcie_devices[device]["speed"]
dev_num = int(self.pcie_devices[device]["num"])
busid_list = self._get_bus_dev_func(device)
if not busid_list:
self.fail_reason.append("{} not found".format(device))
ret = E.EPCI9001
status_tbl.append([device, 'None', 'None', 'None'])
else:
if len(busid_list) != dev_num:
self.logger.log_err("{} number expect {}, real {}".format(
device, dev_num, len(busid_list)))
self.fail_reason.append(
"{} number mismatch".format(device))
ret = E.EPCI9001
for busid in busid_list:
speed, width = self._get_device_conf(busid) ########
if conf_width != width:
self.fail_reason.append(
"{} width not matched".format(device))
ret = E.EPCI9005
if conf_speed != speed:
self.fail_reason.append(
"{} speed not matched".format(device))
ret = E.EPCI9004
status_tbl.append([device, busid, width, speed])
def fpga_device_rw_test(self, also_print_console=False):
self.logger.log_info("[FPGA RW TEST]:", also_print_console)
ret = E.OK
if self.fpga_devices:
for device in self.fpga_devices.keys():
try:
device_conf = self.fpga_devices.get(device)
driver_path = device_conf['path']
reg_file = os.path.join(
driver_path, device_conf['register'])
with open(reg_file, 'w') as f:
f.write(GLOBAL_VALUE1)
with open(reg_file, 'r') as f:
value = f.read().strip()
self.logger.log_info("{} write {}, read {}".format(
device, GLOBAL_VALUE1, value), also_print_console)
if int(value, 16) != int(GLOBAL_VALUE1, 16):
ret = E.EPCI9003
self.fail_reason.append("{} rw fail".format(device))
with open(reg_file, 'w') as f:
f.write(GLOBAL_VALUE2)
with open(reg_file, 'r') as f:
value = f.read().strip()
self.logger.log_info("{} write {}, read {}".format(
device, GLOBAL_VALUE2, value), also_print_console)
if int(value, 16) != int(GLOBAL_VALUE2, 16):
ret = E.EPCI9003
self.fail_reason.append("{} rw fail".format(device))
except Exception as e:
self.logger.log_err(str(e), True)
self.fail_reason.append(str(e))
ret = E.EPCI9002
def run_test(self, *argv):
ret = self.check_pcie_device(True)
if ret != E.OK:
return ret
for i in range(self.fpga_info["fpga_test_count"]):
ret = self.fpga_device_rw_test(True)
if ret != E.OK:
self.logger.log_err(
"pcie r/w loop {} failed!".format(i + 1), True)
return ret
return ret
ethtool 网卡名 :查看网卡速率,全双工
ethtool -s ethX [speed 10|100|1000] [duplex half|full] [autoneg on|off]
设置网口速率 10/100/1000M 设置网口半/全双工 设置网口是否自协商
test_help_list = [
"Please check if psu status is correct",
"1.correct",
"2.wrong",]
for item in self.test_help_list:
self.logger.log_info(item, True)
choice = str(input("Please input 1 or 2: "))
value = re.compile(r'^[-+]?[0-9]+$')
result = value.match(choice)
if result:
if int(choice) == 1:
elif int(choice) == 2:
else:
def search_speed(self, speed):
time.sleep(5)
sta, out = run_command("ethtool eth0")
pattern = "\s+Speed: (.*)\s+"
result = re.findall(pattern, out)[0]
if result != speed:
return False
else:
return True
def rj45_led_test(self, also_print_console=False):
ret = E.EFAIL
self.logger.log_info("[RJ45 LED BLINK TEST]:", also_print_console)
speed_list = ["10", "100", "1000"]
for speed in speed_list:
cmd = "ethtool -s eth0 autoneg off speed {} duplex full"
if speed == "1000":
cmd = "ethtool -s eth0 autoneg on speed {} duplex full".format(
speed)
else:
cmd = "ethtool -s eth0 autoneg off speed {} duplex full".format(
speed)
run_command(cmd)
if not self.search_speed("{}Mb/s".format(speed)):
ret = E.EFAIL
self.fail_reason("set eth0 speed fail.")
self.logger.log_err("FAIL.", also_print_console)
return ret
self.logger.log_info(
"current eth0 speed is {}Mb/s, start ping test...".format(speed), also_print_console)
run_command("ping -c 20 -I eth0 {}".format(self.external_ip))
choice = input(
"please confirm rj45 led whether blink(1 is pass, 2 is fail):")
if int(choice) == 2:
ret = E.ELED14010
self.fail_reason.append("rj45 led fault")
self.logger.log_err("FAIL.", also_print_console)
return ret
elif int(choice) == 1:
continue