【分布式通信】NPKit,NCCL的Profiling工具

news2024/12/21 18:58:23

NPKit介绍

NPKit (Networking Profiling Kit) is a profiling framework designed for popular collective communication libraries (CCLs), including Microsoft MSCCL, NVIDIA NCCL and AMD RCCL.
It enables users to insert customized profiling events into different CCL components, especially into giant GPU kernels.
These events are then automatically placed onto a unified timeline in Google Trace Event Format, which users can then leverage trace viewer to understand CCLs’ workflow and performance.

以NCCL为例,如何使用?

Usage

  1. NCCL 2.17.1-1版本,将文件夹下的 npkit-for-nccl-2.17.1-1.diff 添加到你的nccl源文件中。

  2. NPKit只有在CPU和GPU没以后overlap的时候使用,所以 NPKIT_FLAGS 也要遵从这个规则。同时 npkit_launcher.sh里面的参数也要对应正确。

  3. nccl_testnpkit_runner.sh对应参数正确. 仅支持每个线程有1个GPU, 因此nccl_test运行参数记得是 -g 1

  4. 运行bash npkit_launcher.sh.

  5. 生成文件 npkit_event_trace.json ,可以用谷歌浏览器打开看。在浏览器那一栏输入chrome://tracing, 然后打开对应文件即可。

在这里插入图片描述

import argparse
import os
import json

from queue import Queue

def parse_npkit_event_header(npkit_event_header_path):
    npkit_event_def = {'id_to_type': {}, 'type_to_id': {}}
    with open(npkit_event_header_path, 'r') as f:
        lines = [x.strip() for x in f.readlines() if len(x.strip()) != 0]
        line_idx = 0
        while line_idx < len(lines):
            if lines[line_idx].startswith('#define NPKIT_EVENT_'):
                fields = lines[line_idx].split()
                if len(fields) == 3:
                    event_type = fields[1]
                    event_id = int(fields[2], 0)
                    npkit_event_def['type_to_id'][event_type] = event_id
                    npkit_event_def['id_to_type'][event_id] = event_type
            line_idx += 1
    return npkit_event_def

def parse_gpu_clock_scale(gpu_clock_file_path):
    with open(gpu_clock_file_path, 'r') as f:
        freq_in_khz = f.read()
        return float(freq_in_khz) * 1e3 / 1e6

def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path):
    with open(cpu_clock_num_file_path, 'r') as f:
        num = float(f.read())
    with open(cpu_clock_den_file_path, 'r') as f:
        den = float(f.read())
    return den / num / 1e6

def parse_gpu_event(event_bytes):
    return {
        'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False),
        'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False),
        'rsvd': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False),
        'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False)
    }

def parse_cpu_event(event_bytes):
    return {
        'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False),
        'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False),
        'slot': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False),
        'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False)
    }

def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale):
    gpu_event_file_path = os.path.join(npkit_dump_dir, 'gpu_events_rank_%d_buf_%d' % (rank, buf_idx))
    raw_event_size = 16
    curr_cpu_base_time = None
    curr_gpu_base_time = None
    gpu_events = []
    event_type_to_seq = {}
    with open(gpu_event_file_path, 'rb') as f:
        raw_content = f.read()
        raw_content_size = len(raw_content)
        raw_content_idx = 0
        while raw_content_idx < raw_content_size:
            parsed_gpu_event = parse_gpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])
            if npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_CPU':
                curr_cpu_base_time = parsed_gpu_event['timestamp'] / cpu_clock_scale
                curr_gpu_base_time = None
            elif npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_GPU':
                if curr_gpu_base_time is None:
                    curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale
            else:
                if curr_gpu_base_time is None:
                    curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale
                event_type = npkit_event_def['id_to_type'][parsed_gpu_event['id']]
                phase = 'B' if event_type.endswith('_ENTRY') else 'E'
                gpu_events.append({
                    'ph': phase,
                    'ts': curr_cpu_base_time + parsed_gpu_event['timestamp'] / gpu_clock_scale - curr_gpu_base_time,
                    'pid': rank,
                    'tid': buf_idx + 1
                })
                if phase == 'B':
                    if event_type not in event_type_to_seq:
                        event_type_to_seq[event_type] = 0
                    gpu_events[-1].update({
                        'name': event_type,
                        'cat': 'GPU',
                        'args': {
                            'rank': rank,
                            'buf_idx': buf_idx,
                            'seq': event_type_to_seq[event_type],
                            'rsvd_0': parsed_gpu_event['rsvd'],
                            'size_0': parsed_gpu_event['size']
                        }
                    })
                    event_type_to_seq[event_type] += 1
                else:
                    gpu_events[-1]['args'] = {'size': parsed_gpu_event['size'], 'rsvd': parsed_gpu_event['rsvd']}
                    delta_time = gpu_events[-1]['ts'] - gpu_events[-2]['ts']
                    gpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else gpu_events[-1]['args']['size'] / delta_time / 1e3
            raw_content_idx += raw_event_size
    return gpu_events

def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale):
    cpu_event_file_path = os.path.join(npkit_dump_dir, 'cpu_events_rank_%d_channel_%d' % (rank, channel))
    raw_event_size = 16
    cpu_events = []
    event_type_to_seq = {}

    fiber_is_usable = []
    fiber_open_ts = []
    slot_to_fiber_id = {}
    channel_shift = 1000

    with open(cpu_event_file_path, 'rb') as f:
        raw_content = f.read()
        raw_content_size = len(raw_content)
        raw_content_idx = 0
        while raw_content_idx < raw_content_size:
            parsed_cpu_event = parse_cpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])
            event_type = npkit_event_def['id_to_type'][parsed_cpu_event['id']]
            phase = 'B' if event_type.endswith('_ENTRY') else 'E'
            cpu_events.append({
                'ph': phase,
                'ts': parsed_cpu_event['timestamp'] / cpu_clock_scale,
                'pid': rank
            })
            slot = parsed_cpu_event['slot']
            if phase == 'B':
                # Open fiber event
                fiber_id = 0
                while fiber_id < len(fiber_is_usable):
                    if fiber_is_usable[fiber_id]:
                        break
                    fiber_id += 1
                if fiber_id == len(fiber_is_usable):
                    fiber_is_usable.append(True)
                    fiber_open_ts.append(0.0)
                slot_to_fiber_id[slot] = fiber_id
                fiber_open_ts[fiber_id] = cpu_events[-1]['ts']
                fiber_is_usable[fiber_id] = False

                if event_type not in event_type_to_seq:
                    event_type_to_seq[event_type] = 0
                cpu_events[-1].update({
                    'name': event_type,
                    'cat': 'CPU',
                    'args': {
                        'rank': rank,
                        'channel': channel,
                        'slot': parsed_cpu_event['slot'],
                        'seq': event_type_to_seq[event_type],
                        'size_0': parsed_cpu_event['size']
                    }
                })
                event_type_to_seq[event_type] += 1
            else:
                # Close fiber event
                fiber_id = slot_to_fiber_id[slot]
                slot_to_fiber_id.pop(slot)
                last_ts = fiber_open_ts[fiber_id]
                fiber_is_usable[fiber_id] = True

                delta_time = max(0.001, cpu_events[-1]['ts'] - last_ts)
                cpu_events[-1]['args'] = {'size': parsed_cpu_event['size']}
                cpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else cpu_events[-1]['args']['size'] / delta_time / 1e3

            cpu_events[-1]['tid'] = fiber_id + (channel + 1) * channel_shift

            raw_content_idx += raw_event_size
    return cpu_events

def convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def):
    files_in_dump_dir = next(os.walk(npkit_dump_dir))[2]
    gpu_event_files = [x for x in files_in_dump_dir if x.startswith('gpu_events_rank_')]
    cpu_event_files = [x for x in files_in_dump_dir if x.startswith('cpu_events_rank_')]

    ranks = list(set([int(x.split('_rank_')[1].split('_')[0]) for x in gpu_event_files]))
    buf_indices = list(set([int(x.split('_buf_')[1].split('_')[0]) for x in gpu_event_files]))
    channels = list(set([int(x.split('_channel_')[1].split('_')[0]) for x in cpu_event_files]))

    trace = {'traceEvents': []}

    for rank in ranks:
        cpu_clock_den_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_den_rank_%d' % rank)
        cpu_clock_num_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_num_rank_%d' % rank)
        cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path)

        gpu_clock_file_path = os.path.join(npkit_dump_dir, 'gpu_clock_rate_rank_%d' % rank)
        gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path)

        for buf_idx in buf_indices:
            gpu_events = parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale)
            trace['traceEvents'].extend(gpu_events)

        for channel in channels:
            cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale)
            trace['traceEvents'].extend(cpu_events)

    trace['traceEvents'].sort(key=lambda x : x['ts'])
    trace['displayTimeUnit'] = 'ns'

    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, 'npkit_event_trace.json'), 'w') as f:
        json.dump(trace, f)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--npkit_dump_dir', type=str, required=True, help='NPKit dump directory.')
    parser.add_argument('--npkit_event_header_path', type=str, required=True, help='Path to npkit_event.h.')
    parser.add_argument('--output_dir', type=str, required=True, help='Path to output directory.')
    args = parser.parse_args()

    npkit_event_def = parse_npkit_event_header(args.npkit_event_header_path)
    convert_npkit_dump_to_trace(args.npkit_dump_dir, args.output_dir, npkit_event_def)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1625289.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JavaScriptThreejs】判断路径在二维平面上投影的方向顺逆时针

原理分析 可以将路径每个连续的两点向量叉乘相加&#xff0c;根据正负性判断路径顺逆时针性 当我们计算两个向量的叉积时&#xff0c;结果是一个新的向量&#xff0c;其方向垂直于这两个向量所在的平面&#xff0c;并且其大小与这两个向量构成的平行四边形的面积成正比。这个新…

Android 组件提供的状态保存(saveInstanceState)与恢复(restoreInstanceState)

在Android的组件Activity中&#xff0c;有这样一对方法: onSaveInstanceeState 和 onRestoreInstanceState 这两对方法&#xff0c;可以让我在Activiy被异常销毁时&#xff0c;保存状态&#xff1b;以及在Activity重建时&#xff0c;恢复状态。 比如&#xff1a;当我们在输入…

MATLAB 2024a软件下载安装教程

1-首先下载Matlab&#xff0c;以下迅雷云链接&#xff0c;里面有全版本的matlab&#xff0c;根据自己的需要下载即可&#xff0c;建议下载最新版的&#xff0c;功能会更多&#xff0c;当然内存也会更大。 迅雷云盘迅雷云盘https://pan.xunlei.com/s/VNgH_6VFav8Kas-tRfxAb3XOA…

Linux I2C(二) - I2C软硬件架构

1&#xff0c;I2C的总线拓扑 2&#xff0c;I2C S/W topology linux kernel I2C framework使用如下的软件拓扑抽象I2C硬件&#xff08;我们可以一起领会一下其中的“设备模型”思想&#xff09;&#xff1a; 1&#xff09;platform bus&#xff08;/sys/bus/platform&#xff0…

Oracle导出导入dmp等文件类型的多表数据的常用方法、遇见的常见问题和解决办法(exp无效sql???)

使用PLSQL执行导出表数据的时候有两种方法 1、使用Oracle命令【imp--exp】【impdp--expdp】 但是如果你的本机没有安装有Oracle数据库&#xff0c;使用的instant client远程连接服务器上的Oracle数据库时候&#xff0c;你没有Oracle数据库带有的exp.exe、imp.exe等扩展文件&a…

如何高效跟进项目进度?试试禅道几个功能

禅道提供了一系列功能和工具&#xff0c;可实现项目进度的有效管理和跟进&#xff0c;极大提升项目管理效率。禅道中的项目进度来源于迭代进度&#xff0c;迭代的进度又来源于任务的消耗和剩余工时&#xff0c;可通过以下功能有效跟进项目进展。 一、燃尽图 在禅道里&#xf…

机器学习day1

一、人工智能三大概念 人工智能三大概念 人工智能&#xff08;AI&#xff09;、机器学习&#xff08;ML&#xff09;和深度学习&#xff08;DL&#xff09; 人工智能&#xff1a;人工智能是研究计算代理的合成和分析的领域。人工智能是使用计算机来模拟&#xff0c;而不是人类…

【办公类-22-14】周计划系列(5-5)“周计划-05 周计划表格内教案部分“节日”清空改成“节日“” (2024年调整版本)Win32

背景需求&#xff1a; 本学期19周&#xff0c;用了近10周的时间&#xff0c;终于把周计划教案部分的内容补全了&#xff08;把所有教案、反思的文字都撑满一个单元格&#xff09;&#xff0c; 一、原始教案 二、新模板内的教案 三、手动添加文字后的样式&#xff08;修改教案…

庐山研习班上介绍的25个LINUX工具

从2013年的第一届算起&#xff0c;庐山研习班走过十余个年头&#xff0c;办了十几次了。但每一次&#xff0c;都有很多不一样。即使是相同的主题&#xff0c;也有很大差异。 今年春季的庐山研习班是在上个周末。周四晚上我和大部分同学都到了五老峰脚下的训练基地。 除了周六下…

【C++ STL序列容器】list 双向链表

文章目录 【 1. 基本原理 】【 2. list 的创建 】2.1 创建1个空的 list2.2 创建一个包含 n 个元素的 list&#xff08;默认值&#xff09;2.3 创建一个包含 n 个元素的 list&#xff08;赋初值&#xff09;2.4 通过1个 list 初始化另一个 list2.5 拷贝其他类型容器的指定元素创…

HNCTF 2022 week1 题解

自由才是生活主旋律。 [HNCTF 2022 Week1] Interesting_include <?php //WEB手要懂得搜索 //flag in ./flag.phpif(isset($_GET[filter])){$file $_GET[filter];if(!preg_match("/flag/i", $file)){die("error");}include($file); }else{highlight_…

CentOS7安装并配置Yearning并实现无公网IP远程SQL审核与数据查询

目录 ​编辑 前言 1. Linux 部署Yearning 2. 本地访问Yearning 3. Linux 安装cpolar 4. 配置Yearning公网访问地址 5. 公网远程访问Yearning管理界面 6. 固定Yearning公网地址 结语 前言 作者简介&#xff1a; 懒大王敲代码&#xff0c;计算机专业应届生 今天给大家聊聊…

Docker 的数据管理 端口映射 容器互联 镜像的创建

目录 概念 概念 管理 Docker 容器中数据主要有两种方式&#xff1a;数据卷&#xff08;Data Volumes&#xff09;和数据卷容器&#xff08;DataVolumes Containers&#xff09;。总结&#xff1a;因为容器数据是临时保存的为了安全&#xff0c;就要让数据保持持久化。 1&#…

qt QTreeWidget 学习

树形控件的节点可以有多层、多个子节点&#xff0c; 如果将子节点全部展开&#xff0c;那么每一行都是一个数据条目。QTreeWidgetItem 比较特殊&#xff0c;一个条目内部可以有多列数据信息&#xff0c;相当于表格控件一整行的表格单元集成为一个条目。 默认情况下&#xff0c;…

Methoxy-PEG-PLGA,mPEG-PLGA是一种可生物降解的两亲性嵌段共聚物

【试剂详情】 英文名称 mPEG-PLGA&#xff0c;Methoxy-PEG-Poly(lactide-co-glycolide)&#xff0c;Methoxy-PEG-PLGA&#xff0c; mPEG-Poly(lactide-co-glycolide) 中文名称 聚乙二醇单甲醚聚乳酸&#xff0c;乙醇酸两嵌段共聚物 外观性状 由分子量决定&#xff0c;液体…

调试记录 Flash 芯片 GD25LQ128ESIG 的程序烧录问题

1. 烧录工具 工具型号&#xff1a; VS4000P 2. 烧录问题 1. 烧录器选择烧录型号过程中没有看见 Flash 芯片 GD25LQ128ESIG 的型号。其中有GD25Q128E &#xff0c;但是三个选项的封装不对。 3. 解决过程 1. 尝试别的类型的芯片型号烧录。 A.GD25LQ80E(SOP8_200) B.GD25LQ64E(SOP…

IDEA 2024.1 配置 AspectJ环境

最近Java课设在学习AspectJ&#xff0c;做PPT顺便写一个博客 下载包 首先去AspectJ官网下载一个JAR包并安装 安装完最后可以按照他的建议配置一下 然后找到AspectJ的安装位置的lib目录&#xff0c;把三个包拷到自己项目中的lib目录下 由于最新版的IDEA已经不支持AspectJ了 所…

(八)Servlet教程——创建Web项目以及Servlet的实现

1. 打开Idea编辑器 2. 点击界面上的“新建项目”按钮 3. 设置好项目名称和位置 应用服务器选择之前设置好的Tomcat服务器 构建系统默认选择Maven 4. 点击“下一步”按钮 5. 点击“完成”按钮&#xff0c;Idea就创建好了项目&#xff0c;创建完成后的目录结构如下图所示 6. 此…

脉冲电源的直流斩波板设计总结(RC缓冲电路,输出电容选值)

IC的RC缓冲 总结一下过去电加工所的直流斩波板问题 1&#xff1a;电流突变问题 在独立式电火花脉冲电源里面&#xff0c;用电阻去限制电流&#xff0c;从而抑制当极间突变时的电流突变。 在非独立式的脉冲电源里面&#xff0c;电流平时是稳定在循环电感里面&#xff0c;当击…

ESLlint重大更新后,使用旧版ESLint搭配Prettier的配置方式

概要 就在前几天&#xff0c;ESLint迎来了一次重大更新&#xff0c;9.0.0版本&#xff0c;根据官方文档介绍&#xff0c;使用新版的先决条件是Node.js版本必须是18.18.0、20.9.0&#xff0c;或者是>21.1.0的版本&#xff0c;新版ESLint将不再直接支持以下旧版配置(非扁平化…