从队列中获取节点名
有时我们需要把任务队列中以压缩形式给出的节点列表(比如cn[8044-8046,8358-8360,8926-8928,9002-9004])展开成完整的节点名称。输入字符串input_str既可以通过命令行参数传入,也可以在脚本中直接写死。
import re
import subprocess
import sys
# Hard-coded node list, used when no command-line argument is supplied.
# A longer example: "cn[7512-7519,7545-7552,7777,8290-8297,8888]"
input_str = "cn[7512-7519]"
if len(sys.argv) > 1:
    # A node list given on the command line overrides the hard-coded one.
    input_str = sys.argv[1]
# Use a regular expression to pull out every "start-end" range or single number.
ranges = re.findall(r"\d+-\d+|\d+", input_str)
node_names = []
for item in ranges:
    if '-' in item:
        first, last = item.split('-')
        # Keep the zero padding of the range start (e.g. 0008-0010 -> cn0008..cn0010);
        # unpadded ranges are expanded exactly as before.
        width = len(first) if first.startswith("0") else 0
        for i in range(int(first), int(last) + 1):
            node_names.append(f"cn{str(i).zfill(width)}")
    else:
        # Single numbers are used verbatim, so their padding is already kept.
        node_names.append(f"cn{item}")
print(node_names)
print(len(node_names))
from util import *
一个技巧:写Python脚本时尽量把功能封装成函数,方便其他脚本调用。比如上面的脚本虽然实现了功能,但不方便被其他脚本调用,可以用函数改写成:
import re
import subprocess
import sys
def get_cn_name(input_str):
    """Expand a compressed node list such as ``cn[7512-7514,7777]``.

    Every ``start-end`` range and every standalone number found in
    ``input_str`` becomes one ``cn<number>`` entry, in order of appearance.
    Zero padding of a range start is preserved (``0008-0010`` yields
    ``cn0008``, ``cn0009``, ``cn0010``), matching how standalone numbers are
    already kept verbatim.

    Args:
        input_str: Text containing the node numbers/ranges; anything that is
            not a digit or a ``-`` between digits is ignored.

    Returns:
        A list of node-name strings; empty when ``input_str`` has no digits.
    """
    # Extract every "start-end" range or single number with a regex.
    node_names = []
    for item in re.findall(r"\d+-\d+|\d+", input_str):
        if '-' in item:
            first, last = item.split('-')
            # zfill(0) is a no-op, so unpadded ranges expand unchanged.
            width = len(first) if first.startswith("0") else 0
            node_names.extend(
                f"cn{str(i).zfill(width)}" for i in range(int(first), int(last) + 1)
            )
        else:
            node_names.append(f"cn{item}")
    return node_names
if __name__ == "__main__":
    # Example inputs:
    #   "cn[7512-7519,7545-7552,7777,8290-8297,8888]"
    #   "cn[7512-7519]"
    if len(sys.argv) < 2:
        # Exit with a readable hint instead of an IndexError traceback.
        sys.exit(f"usage: {sys.argv[0]} 'cn[7512-7519,7777]'")
    input_str = sys.argv[1]
    node_names = get_cn_name(input_str)
    print(node_names)
    print(len(node_names))
这样在其他脚本中使用这个函数,只需要import一下(下面依次是 import 模块 和 from 模块 import 两种写法):
# Import the whole module; the function is then reached through the module name.
import get_cn_name
node_names = get_cn_name.get_cn_name("cn[7512-7519,7545-7552,7777,8290-8297,8888]")
# Show the expanded names and how many there are.
print(node_names)
print(len(node_names))
# Import just the function that is needed, so it can be called without the
# module prefix and nothing else from the module leaks into this namespace.
from get_cn_name import get_cn_name
node_names = get_cn_name("cn[7512-7519,7545-7552,7777,8290-8297,8888]")
print(node_names)
print(len(node_names))
import 关键字:
- 使用 import 关键字可以导入整个模块。例如:import random 将整个 random 模块导入到当前的命名空间中。
- 导入整个模块后,你可以使用模块中定义的函数、类和变量。但是,你需要使用模块名作为前缀来访问这些定义。例如:random.randint(1, 10)。
- 这种方式一般不会引起命名冲突:即使导入的多个模块中存在同名的函数或变量,也可以通过模块名前缀把它们区分开(例如 a.func 和 b.func),代价是每次访问都要多写一个前缀。
from import 关键字:
- 使用 from import 关键字可以从模块中导入特定的内容。例如:from random import randint 只导入 random 模块中的 randint 函数。
- 导入特定内容后,你可以直接使用函数、类或变量的名称,而无需使用模块名作为前缀。例如:randint(1, 10)。
- 这种方式可以减少代码中的冗余,并提供更直接的访问方式。但是,如果导入的内容与当前命名空间中的其他内容发生冲突,可能会导致命名冲突的问题。
随机数
在Python中,你可以使用random模块来生成随机数。random模块提供了各种生成随机数的函数。
生成一个0到1之间的随机浮点数:
import random
# random.random() returns a float in the half-open interval [0.0, 1.0).
random_number = random.random()
print(random_number)
生成一个指定范围内的随机整数:
import random
random_integer = random.randint(1, 10) # random integer between 1 and 10, both ends included
print(random_integer)
从列表或序列中随机选择一个元素:
import random
my_list = [1, 2, 3, 4, 5]
# random.choice() picks one element of the sequence at random.
random_element = random.choice(my_list)
print(random_element)
生成一个指定范围内的随机浮点数:
import random
random_float = random.uniform(0.0, 1.0) # random float between 0.0 and 1.0
print(random_float)
保留两位小数:
import random
random_float = round(random.uniform(0.0, 1.0), 2) # random float between 0.0 and 1.0, rounded to two decimal places
print(random_float)
采样
random.sample()函数用于从给定的序列中随机选出指定数量、互不重复的元素,可以用来生成不重复的随机样本。下面这个程序演示如何不重复地采样NUMA组合和节点对。
import random
# All 64 ordered pairs (a, b) with a and b in 0..7, in the same order as the
# hand-written table this replaces: (0, 0), (0, 1), ..., (7, 7).
numa = [(a, b) for a in range(8) for b in range(8)]

# Draw four distinct pairs and show how to index into the sample.
random_four = random.sample(numa, 4)
print(random_four)
print(random_four[1])
print(random_four[1][0])
print(random_four[1][1])

# Twenty distinct node names cn1..cn20.  The previous hand-typed list ended
# with a second "cn10", so one node could be drawn twice.
cns = [f"cn{i}" for i in range(1, 21)]

# Sampling with k == len(cns) yields a random permutation; neighbouring
# elements are then grouped two by two.  Stopping at len - 1 means an odd
# leftover element is dropped rather than raising IndexError.
shuffled = random.sample(cns, k=len(cns))
random_pairs = [(shuffled[i], shuffled[i + 1]) for i in range(0, len(shuffled) - 1, 2)]
print(random_pairs)
for first, second in random_pairs:
    print(first, second)
程序运行时间计时
import os
import time
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed time; unlike time.time() it is not affected by system clock changes.
start_time = time.perf_counter()
time.sleep(10)  # stand-in for the work being timed
end_time = time.perf_counter()
# Compute the elapsed time
execution_time = end_time - start_time
print("程序执行时间:", execution_time, "秒")
使用oobw计算点对点延迟和带宽
get_oobw只是单纯地对一对节点进行点对点延迟和带宽的测量;get_oobw_mpi把结果放入队列,配合multiprocessing用多个进程并行测量多对节点,节省大量时间。sam_get_min_oobw_mpi在get_oobw_mpi的基础上进行了采样,在全局环境下不需要穷尽所有节点之间的点对点延迟和带宽测量,进一步节省了时间。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#----------------------------------------------------------------------------
# Created By : Wanxin Wang
# Created Date: 03.8.2024
import random
import re
import os
#import matplotlib.pyplot as plt
import numpy as np
import os
import subprocess
from subprocess import PIPE, run
from contextlib import contextmanager
import sys
import time
import multiprocessing
def get_available_vp(info):
    """Return the lowest VP number in 9..55 that is not listed in ``info``.

    ``info`` is the text the callers obtain from
    ``cat /proc/glex/nic0/endpoints``.  The first line is skipped
    (presumably a header -- confirm against real output).  On every other
    line that has at least two whitespace-separated fields, the first field,
    with any ``*`` marker removed, is taken as a VP number already in use.

    Args:
        info: Endpoints listing as a single string.

    Returns:
        The first unused VP number in ``range(9, 56)``, or ``None`` when all
        of them are in use.

    Raises:
        ValueError: If the first field of a considered line is not an integer.
    """
    used_vps = set()
    for line in info.split("\n")[1:]:
        fields = line.split()
        if len(fields) > 1:
            # Strip the '*' flag wherever it sits instead of assuming it is
            # always the first character.
            used_vps.add(int(fields[0].strip("*")))
    for vp in range(9, 56):
        if vp not in used_vps:
            return vp
    return None
def _remote_stdout(host, remote_cmd, errorlog):
    """Run ``remote_cmd`` on ``host`` as root via nss_yhpc_ssh and return its stdout as text.

    The command's stderr is written to the already-open ``errorlog`` file.
    """
    out, _ = subprocess.Popen(
        "/usr/bin/nss_yhpc_ssh root@{} ".format(host) + remote_cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=errorlog,
    ).communicate()
    return out.decode("ascii")


def get_oobw(sender,reciver,reciver_numa,sender_numa):
    """Run one oo_bw test from ``sender`` to ``reciver`` and parse the result.

    Steps: query the NIC id of both nodes, pick a free VP on each with
    ``get_available_vp``, start ``oo_bw_r`` on the receiver in a forked child,
    wait one second, run ``oo_bw_s`` on the sender, and read latency and
    bandwidth from the line at index 21 of the sender's stdout.

    Args:
        sender: Host name of the sending node.
        reciver: Host name of the receiving node.
        reciver_numa: Value for ``numactl --cpunodebind`` on the receiver.
            Note that it comes BEFORE ``sender_numa`` in the signature.
        sender_numa: Value for ``numactl --cpunodebind`` on the sender.

    Returns:
        ``(latency, bandwidth)`` as floats, or ``(0, 0)`` when the expected
        output line is missing or cannot be parsed (the same fallback
        ``get_oobw_mpi`` uses).
    """
    latency = 0
    bandwidth = 0
    # Remove leftovers of a previous run; errors are deliberately discarded.
    os.system("rm ./receiver.lock ./result.txt >/dev/null 2>&1")
    get_nic_id_cmd = "~/auto_oobw-master/get_nic_id"
    get_vp_cmd = "cat /proc/glex/nic0/endpoints"
    # Both log files are opened (and thereby truncated) as before, but are now
    # closed once the remote queries are done, so no handle leaks into the
    # forked child.  Nothing is written to /tmp/out.log.
    with open("/tmp/stderr.log", "wb") as errorlog, open("/tmp/out.log", "wb"):
        sender_nic_id = _remote_stdout(sender, get_nic_id_cmd, errorlog).split("\n")[0]
        reciver_nic_id = _remote_stdout(reciver, get_nic_id_cmd, errorlog).split("\n")[0]
        sender_available_vp = get_available_vp(_remote_stdout(sender, get_vp_cmd, errorlog))
        reciver_available_vp = get_available_vp(_remote_stdout(reciver, get_vp_cmd, errorlog))
    print("starting reciver")
    reciver_test_cmd = "/usr/bin/nss_yhpc_ssh root@{} numactl --cpunodebind={} --localalloc /usr/local/glex/examples/oo_bw_r {} {} {} 0x100000".format(
        reciver, reciver_numa, reciver_available_vp, sender_nic_id, sender_available_vp
    )
    sender_test_cmd = "/usr/bin/nss_yhpc_ssh root@{} numactl --cpunodebind={} --localalloc /usr/local/glex/examples/oo_bw_s {} {} {} 0x100000".format(
        sender, sender_numa, sender_available_vp, reciver_nic_id, reciver_available_vp
    )
    pid = os.fork()
    if pid == 0:
        # Child: run the receiver, then leave with os._exit() so that no
        # SystemExit travels up the caller's stack and inherited stdio buffers
        # and exit handlers are not run a second time.
        try:
            subprocess.run(reciver_test_cmd, shell=True, capture_output=True, text=True)
        finally:
            os._exit(0)
    # NOTE(review): the child is not waited for; a blocking waitpid() here
    # would hang whenever the receiver never terminates.
    time.sleep(1)  # give the receiver a moment to come up
    result = subprocess.run(sender_test_cmd, shell=True, capture_output=True, text=True)
    try:
        # Fields of the result line are split on whitespace runs; index 1 is
        # read as latency and index 2 as bandwidth.
        fields = re.split(r'\s+', result.stdout.split('\n')[21])
        if len(fields) >= 3:
            latency, bandwidth = float(fields[1]), float(fields[2])
    except (IndexError, ValueError):
        # Output too short or not numeric: keep the (0, 0) defaults.
        pass
    return latency, bandwidth
def get_oobw_mpi(sender,reciver,reciver_numa,sender_numa,result_queue):
    """Measure one node pair with ``get_oobw`` and put the result on a queue.

    Intended as the ``target`` of a ``multiprocessing.Process``.  The
    measurement itself is delegated to ``get_oobw`` instead of repeating its
    body here.

    Args:
        sender: Host name of the sending node.
        reciver: Host name of the receiving node.
        reciver_numa: NUMA node for the receiver (comes before ``sender_numa``).
        sender_numa: NUMA node for the sender.
        result_queue: Queue receiving one tuple
            ``(sender, reciver, reciver_numa, sender_numa, latency, bandwidth)``.
            Latency and bandwidth are 0 when the test output is missing or
            cannot be parsed.
    """
    try:
        latency, bandwidth = get_oobw(sender, reciver, reciver_numa, sender_numa)
    except (IndexError, ValueError):
        # A missing or malformed result line is reported as 0/0 rather than
        # killing the worker without a result.
        latency, bandwidth = 0, 0
    result_queue.put((sender, reciver, reciver_numa, sender_numa, latency, bandwidth))
def sam_get_min_oobw_mpi(input_str):
    """Sample node pairs and NUMA combinations and report the lowest bandwidth seen.

    The nodes named by ``input_str`` are shuffled and grouped into disjoint
    pairs.  For each of five randomly sampled NUMA combinations, all pairs
    are measured in parallel, one ``multiprocessing.Process`` running
    ``get_oobw_mpi`` per pair.  Every result is printed, and the measurement
    with the smallest bandwidth is remembered.

    Args:
        input_str: Compressed node list such as
            ``cn[7512-7519,7545-7552,7777]``.  With an odd number of nodes,
            one randomly chosen node is left out instead of raising
            IndexError.

    Returns:
        ``(ml, mb)``: the latency and bandwidth of the lowest-bandwidth
        result.  Both stay at the initial 2000000 when no result arrived.
    """
    start_time = time.time()

    # Expand the compressed list into individual node names.
    node_names = []
    for item in re.findall(r"\d+-\d+|\d+", input_str):
        if '-' in item:
            first, last = item.split('-')
            # Keep zero padding of the range start; zfill(0) is a no-op.
            width = len(first) if first.startswith("0") else 0
            node_names.extend(
                f"cn{str(i).zfill(width)}" for i in range(int(first), int(last) + 1)
            )
        else:
            node_names.append(f"cn{item}")

    # Random permutation, then neighbours two by two.  Stopping at len - 1
    # drops the unpaired last node when the count is odd.
    shuffled = random.sample(node_names, k=len(node_names))
    random_pairs = [(shuffled[i], shuffled[i + 1]) for i in range(0, len(shuffled) - 1, 2)]
    print(random_pairs)

    # All 64 combinations of two values in 0..7.  Each sampled tuple is passed
    # to get_oobw_mpi as (reciver_numa, sender_numa).
    numa = [(a, b) for a in range(8) for b in range(8)]
    sampled_numa = random.sample(numa, 5)

    mb = 2000000
    ml = 2000000
    for reciver_numa, sender_numa in sampled_numa:
        result_queue = multiprocessing.Queue()
        processes = []
        for node_a, node_b in random_pairs:
            process = multiprocessing.Process(
                target=get_oobw_mpi,
                args=(node_a, node_b, reciver_numa, sender_numa, result_queue),
            )
            process.start()
            processes.append(process)
        # NOTE(review): joining before draining relies on each worker's single
        # small result fitting in the queue's pipe buffer.
        for process in processes:
            process.join()
        results = []
        while not result_queue.empty():
            results.append(result_queue.get())
        for node_name1, node_name2, numa1, numa2, latency, bandwidth in results:
            print(node_name1, node_name2, numa1, numa2, latency, bandwidth)
            if bandwidth < mb:
                mb = bandwidth
                ml = latency

    print(ml, mb)
    end_time = time.time()
    # Compute the elapsed time
    execution_time = end_time - start_time
    print("程序执行时间:", execution_time, "秒")
    return ml, mb