torch分布式通信基础

news2024/11/24 9:45:57

torch分布式通信基础

  • 1. 点到点通信
  • 2. 集群通信

官网文档:WRITING DISTRIBUTED APPLICATIONS WITH PYTORCH

1. 点到点通信

在这里插入图片描述

# 同步,peer-2-peer数据传递
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def test_send_recv_sync(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        dist.send(tensor=tensor, dst=1) # 需要指定dst,发送的目标
    else:
        dist.recv(tensor=tensor, src=0) # 需要指定src,从哪儿接收
    print('Rank ', rank, ' has data ', tensor[0])

# 异步
def test_send_recv_async(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        req = dist.isend(tensor=tensor, dst=1)
    else:
        req = dist.irecv(tensor=tensor, src=0)
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

def init_process(rank, size, backend='gloo'):
    """ 这里初始化分布式环境,设定Master机器以及端口号 """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29598'
    dist.init_process_group(backend, rank=rank, world_size=size)
    #test_send_recv_sync(rank, size)
    test_send_recv_async(rank, size)

if __name__ == "__main__":
    size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

2. 集群通信

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def test_broadcast(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
      tensor += 2
    else:
      tensor += 1
    dist.broadcast(tensor=tensor,src=0) # src指定broad_cast的源
    print("******test_broadcast******")
    print('Rank ', rank, ' has data ', tensor) # 结果都是 2

def test_scatter(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
      tensor_list = [torch.tensor([1.0]), torch.tensor([2.0]), torch.tensor([3.0]), torch.tensor([4.0])]
      dist.scatter(tensor, scatter_list = tensor_list, src = 0)
    else:
      dist.scatter(tensor, scatter_list = [], src = 0)
    print("******test_scatter******")
    print('Rank ', rank, ' has data ', tensor) # 结果是[[1], [2], [3], [4]]

def test_reduce(rank, size):
    tensor = torch.ones(1)
    dist.reduce(tensor=tensor, dst=0) # dst指定哪个进程进行reduce, 默认操作是加法
    print("******test_reduce******")
    print('Rank ', rank, ' has data ', tensor)

def test_all_reduce(rank, size):
    tensor = torch.ones(1)
    dist.all_reduce(tensor=tensor,op=dist.ReduceOp.SUM)
    print("******test_all_reduce******")
    print('Rank ', rank, ' has data ', tensor)  # 结果都是 4

def test_gather(rank, size):
    tensor = torch.ones(1)
    if rank == 0:
      output = [torch.zeros(1) for _ in range(size)]
      dist.gather(tensor, gather_list=output, dst=0)
    else:
      dist.gather(tensor, gather_list=[], dst=0)
    if rank == 0:
      print("******test_gather******")
      print('Rank ', rank, ' has data ', output)  # 结果是 [[1,1,1,1]]

def test_all_gather(rank, size):
    output = [torch.zeros(1) for _ in range(size)]
    tensor = torch.ones(1)
    dist.all_gather(output, tensor)
    print("******test_all_gather******")
    print('Rank ', rank, ' has data ', output)  # 结果都是 [1,1,1,1]

def init_process(rank, size, backend='gloo'):
    """ 这里初始化分布式环境,设定Master机器以及端口号 """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29596'
    dist.init_process_group(backend, rank=rank, world_size=size)
    test_reduce(rank, size)
    test_all_reduce(rank, size)
    test_gather(rank, size)
    test_all_gather(rank, size)
    test_broadcast(rank, size)
    test_scatter(rank, size)

if __name__ == "__main__":
    size = 4
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

需要注意的一点是:
这里面的调用都是同步的,可以理解为,每个进程都调用到通信api时,真正的有效数据传输才开始,然后通信完成之后,代码继续往下跑。实际上有些通信进程并不获取数据,这些进程可能并不会被阻塞。

文档最后,提供了一个简单的类似 DDP 的实现,里面核心的部分就是:
在这里插入图片描述
在这里插入图片描述
这也进一步阐释了DDP的核心逻辑:
反向计算完成之后,汇总梯度信息(求均值),然后再更新参数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/748081.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

热点探测技术架构设计与实践

1. 概述 说到热点问题,首先我们先理解一下什么是热点? 热点通常意义来说,是指在一段时间内,被广泛关注的物品或事件,例如微博热搜,热卖商品,热点新闻,明星直播等等,所以…

领域知识图谱的医生推荐系统:利用BERT+CRF+BiLSTM的医疗实体识别,建立医学知识图谱,建立知识问答系统

项目设计集合(人工智能方向):助力新人快速实战掌握技能、自主完成项目设计升级,提升自身的硬实力(不仅限NLP、知识图谱、计算机视觉等领域):汇总有意义的项目设计集合,助力新人快速实…

Xcode报错--访问keychain,出现弹窗处理方案

情景 访问keychain弹出弹窗&#xff0c;不想人工点击&#xff0c;比如自动化测试中使用keychain中的证书的情况 原因 Mac的保护机制 处理 1、人工&#xff1a;输入Password&#xff0c;点击Allow或者Always Allow 2、命令行处理 security unlock-keychain -p "<…

Spring @RequestMapping 工作原理

Spring RequestMapping 工作原理 配置基础启动类及Controller类 SpringBootApplication public class DemoServiceApplication {public static void main(String[] args) {SpringApplication.run(DemoServiceApplication.class, args);} }RestController public class HelloC…

列表定义状态比较不错的UI写法

<el-table-columnprop"status"label"状态"align"left":formatter"formatTd" ><template slot-scope"scope"><span class"grayStatus" v-if"scope.row.status 1">未开始</span>…

Linux安装最新版的gcc13.1.0编译器,支持c++20、23

Linux安装最新版的gcc13.1.0编译器&#xff0c;支持c20、23 最近在写c20的代码&#xff0c;所以需要升级支持c20及23的编译器&#xff0c;貌似gcc11就已经支持了c20了&#xff0c;但是我这里选择了最新的13.1版本。本文全程实操&#xff0c;上机验证通过。 查看gcc版本 gcc -v…

MySql 高级-0711

3. 查询截取分析 分析 分析&#xff0c;至少跑一天&#xff0c;看看生产的慢 SQL 情况开启慢查询日志&#xff0c;设置阙值&#xff0c;比如超过5秒钟的就是慢SQL&#xff0c;并将它抓取出来。explain慢SQL分析Show Profile运维经理 or DBA 进行SQL 数据库服务器的参数调优 总…

QT-QRegExp和QRegularExpression

1.QRegExp qt5.0版本之前正则表示示类是QRegExp,通过它能够筛选出我们想要的数据,它的构造函数如下所示: QRegExp::QRegExp(const QString &pattern, Qt::CaseSensitivity cs Qt::CaseSensitive, QRegExp::PatternSyntax syntax); 其中QRegExp::PatternSyntax syntax用…

【Leetcode】面试题 02.07. 链表相交

给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表没有交点&#xff0c;返回 null 。 图示两个链表在节点 c1 开始相交&#xff1a; 一看不会&#xff0c;一看答案就懂 我们求出两个链表的长度&#xff0c;并求出两个…

【新版系统架构】第十九章-大数据架构设计理论与实践

大数据处理系统架构 大数据处理系统面临挑战 如何利用信息技术等手段处理非结构化和半结构化数据如何探索大数据复杂性、不确定性特征描述的刻画方法及大数据的系统建模数据异构性与决策异构性的关系对大数据知识发现与管理决策的影响 大数据处理系统架构特征 鲁棒性和容错…

分布式ELK 企业级日志分析系统

一、ELK的相关知识 1.ELK简介 ELK平台是一套完整的日志集中处理解决方案&#xff0c;将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用&#xff0c; 完成更强大的用户对日志的查询、排序、统计需求。 ElasticSearch&#xff1a;是基于Lucene&#xff08;一个全文检…

【JUC进阶】12. 环形缓冲区

目录 1、前言 2、基本概述 2.1、什么是环形缓冲区 2.2、结构刨析 2.3、优点 2.4、缺点 3、如何使用 3.1、定义一个环形缓冲区 3.2、Demo使用 1、前言 上一篇《【JUC进阶】11. BlockingQueue》中介绍到ArrayBlockingQueue&#xff0c;在物理上是一个数组&#xff0c;但…

安科瑞智能母线监控在数据中心的应用

引言&#xff1a; 近年来&#xff0c;随着母线槽在建筑及工厂的配电中越来越广泛&#xff0c;母线槽场景运用的越多&#xff0c;随着数据中心建设的快速发展和更高需求&#xff0c;智能母线系统逐渐被应用于机房的末端配电中&#xff0c;具有电流小、插接方便、智能化程度高等…

一百二十八、Kettle——从Hive增量导入到ClickHouse

一、目标 用Kettle把Hive的DWS层数据增量导入到ClickHouse中 工具版本&#xff1a;Kettle&#xff1a;8.2 Hive:3.1.2 ClickHouse21.9.5.16 全量导入请访问拙作链接 http://t.csdn.cn/Rqvuvhttp://t.csdn.cn/Rqvuv 二、前提准备 &#xff08;一&#xff09;kettl…

如何在 .NET Core 中使用 Azure Key Vaul

Azure Key Vault是一个安全可靠的存储库&#xff0c;用于存储在.NET Core应用程序中使用的令牌、密钥、密码、证书和其他敏感数据。接下来我们讲讲如何在C#中使用它。 在构建.NET Core应用程序时&#xff0c;我们经常使用各种“秘密”&#xff0c;如客户端ID、访问令牌、密码、…

我的第一个java项目

安装了idea软件且本地通过cmd 命令启动了mysql。还安装了java sdk。 总结spring-boot通过resource下的mapping文件下的文件xml语法来增删查改(因为使用了MyBatis&#xff0c;MyBatis 的真正强大在于它的语句映射&#xff0c;这是它的魔力所在。由于它的异常强大&#xff0c;映…

7.12 模型显存/mix-precision

一、完全参考&#xff1a;模型的显存和参数量计算 显存占用模型显存(参数)batch_size每个样本显存(输出和梯度动量) 首先是“运算量”和“参数量”两个概念&#xff1a;参数量&#xff1a;这个比较好理解&#xff0c;例如卷积层中的卷积核c_i*k*k*n_o&#xff0c;其参数量就是相…

CAN转EtherNet/IP网关ethernet/ip协议

JM-EIP-CAN 是自主研发的一款 ETHERNET/IP 从站功能的通讯网关。该产品主要功能是将各种 CAN 总线和 ETHERNET/IP 网络连接起来。 本网关连接到 ETHERNET/IP 总线中做为从站使用&#xff0c;连接到 CAN 总线中根据节点号进行读写。 技术参数 ETHERNET/IP 技术参数 网关做为 E…

【计算机网络】第三章 数据链路层(集线器与交换机)

文章目录 第三章 数据链路层3.8 集线器与交换机总结 第三章 数据链路层 3.8 集线器与交换机 使用 集线器HUB 的以太网在逻辑上仍是一个总线网&#xff0c;各站共享总线资源&#xff0c;使用的还是 CSMA/CD 协议&#xff08;半双工&#xff09;。集线器 只工作在物理层&#xff…

UG\NX二次开发 返回视图中的可见对象UF_VIEW_ask_visible_objects

文章作者:里海 来源网站:https://blog.csdn.net/WangPaiFeiXingYuan 简介: 返回视图中的可见对象UF_VIEW_ask_visible_objects 效果: 代码: #include "me.hpp" using namespace std; //获取view视图的可见对象 //view = NULL_TAG 当前视图 vector<tag_t>…