背景
上节我们介绍了IB SHARP的工作原理,进一步的,英伟达在Hopper架构机器中引入了第三代NVSwitch,就像机间IB SHARP一样,机内可以通过NVSwitch执行NVLink SHARP,简称nvls,这节我们会介绍下NVLink SHARP如何工作的。
后续为了方便都是以nranks为2举例的,但值得注意的是nranks为2实际上不会用到nvls。
图搜索
// Excerpt of NVLS initialization: the support checks that set comm->nvlsSupport are elided.
ncclResult_t ncclNvlsInit(struct ncclComm* comm) {
...
// nvlsChannels = ncclParamNvlsChannels() clamped to [config.minCTAs, config.maxCTAs].
// Per the surrounding text, this is the number of blocks the NVLS kernels launch with,
// not the number of channels produced by the topology search.
if (comm->nvlsSupport == 1) comm->nvlsChannels = std::max(comm->config.minCTAs, std::min(comm->config.maxCTAs, (int)ncclParamNvlsChannels()));
return ncclSuccess;
}
init过程中主要是判断是否支持,如果支持的话会将nvlsSupport设置为1,然后设置comm->nvlsChannels,这里需要注意下,这个是kernel实际启动的block数,而不是搜索出来的channel数。
然后开始执行搜索过程,nvls的channel和其他算法搜索出来的channel不太一样,我们具体看下
// Configure the NVLS graph search: NVLS pattern, accepting anywhere from 1 to MAXCHANNELS channels.
nvlsGraph.pattern = NCCL_TOPO_PATTERN_NVLS;
nvlsGraph.minChannels = 1;
nvlsGraph.maxChannels = MAXCHANNELS;
// Only search (and log) the NVLS graph when NVLS support was detected during init.
if (comm->nvlsSupport) {
NCCLCHECKGOTO(ncclTopoCompute(comm->topo, &nvlsGraph), ret, fail);
NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, &nvlsGraph), ret, fail);
}
设置pattern为NCCL_TOPO_PATTERN_NVLS,然后开始搜索,通过pattern确定到backToNet和backToFirstRank均为-1。
// Excerpt of the recursive topology search entry point (NET and non-NVLS branches elided).
ncclResult_t ncclTopoSearchRec(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int* time) {
int backToNet, backToFirstRank;
// Derive from the pattern the steps at which the search returns to the NET / to the
// first rank (per the surrounding text, both are -1 for NVLS).
NCCLCHECK(ncclTopoSearchParams(system, graph->pattern, &backToNet, &backToFirstRank));
if (system->nodes[NET].count) {
} else {
if (graph->pattern == NCCL_TOPO_PATTERN_NVLS) {
// NVLS: start the current channel at GPU index graph->nChannels (channel c starts at
// GPU c), with no preceding node (type = index = -1).
NCCLCHECK(ncclTopoSearchTryGpu(system, graph, saveGraph, 0, backToNet, backToFirstRank, 0, time, -1, -1, graph->nChannels));
return ncclSuccess;
}
...
}
return ncclSuccess;
}
此时graph->nChannels为0。
// Try to place GPU g (reached from node type/index) at position `step` of the channel
// currently being built, then recurse. Every state change is undone before returning,
// so the caller can go on to try other alternatives.
ncclResult_t ncclTopoSearchTryGpu(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int step, int backToNet, int backToFirstRank, int forcedOrder, int *time, int type, int index, int g) {
// gpu->used has one bit per channel; this is the bit of the channel being built.
const uint64_t flag = 1ULL<<(graph->nChannels);
struct ncclTopoNode* gpu;
// Follow and account for (multiplier 1) the path to GPU g. gpu is left NULL when the
// path cannot be taken (e.g. not enough bandwidth, per the surrounding text).
NCCLCHECK(ncclTopoFollowPath(system, graph, type, index, GPU, g, 1, &gpu));
if (gpu) {
// Mark the GPU as used by this channel while recursing, then clear the mark.
gpu->used ^= flag;
NCCLCHECK(ncclTopoSearchRecGpu(system, graph, saveGraph, gpu, step, backToNet, backToFirstRank, forcedOrder, time));
gpu->used ^= flag;
// Release the path accounting taken above (multiplier -1).
NCCLCHECK(ncclTopoFollowPath(system, graph, type, index, GPU, g, -1, &gpu));
}
return ncclSuccess;
}
由于type为-1,因此ncclTopoFollowPath直接返回第g个GPU;此时g为graph->nChannels,即0,所以从gpu0开始搜索。
// Excerpt: one recursion step of the GPU search (bodies of most pattern branches elided).
ncclResult_t ncclTopoSearchRecGpu(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, struct ncclTopoNode* gpu, int step, int backToNet, int backToFirstRank, int forcedOrder, int *time) {
// Give up once the search time budget is exhausted.
if ((*time) <= 0) return ncclSuccess;
(*time)--;
int ngpus = system->nodes[GPU].count;
// step == ngpus: the current channel is complete (per the surrounding text, nChannels
// is then incremented and the next channel searched). The elided body must return here:
// ncclTopoSearchTryNvls calls this with gpu == NULL, which is dereferenced below.
if (step == ngpus) {
}
// Record this GPU's rank at position `step` of the current channel.
graph->intra[graph->nChannels*ngpus+step] = gpu->gpu.rank;
// Index of this GPU within the GPU node array.
int g = gpu - system->nodes[GPU].nodes;
if (step == backToNet) {
} else if (graph->pattern == NCCL_TOPO_PATTERN_NVLS) {
// NVLS: instead of walking to a next GPU, check NVSwitch bandwidth for all GPUs; on this
// path only position 0 of the channel (its head) is filled.
NCCLCHECK(ncclTopoSearchTryNvls(system, graph, saveGraph, g, ngpus, time));
} else if (step < system->nodes[GPU].count-1) {
} else if (step == backToFirstRank) {
} else {
}
return ncclSuccess;
}
将0号GPU填到graph->intra,由于pattern为NCCL_TOPO_PATTERN_NVLS,因此直接执行ncclTopoSearchTryNvls。
// NVLS search step for head GPU g. Check that every GPU has enough bandwidth both from and
// to the NVSwitch (GPU g, the head, reserves twice the amount); if so, complete the channel.
// All bandwidth reservations are released again before returning.
ncclResult_t ncclTopoSearchTryNvls(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int g, int ngpus, int *time) {
struct ncclTopoNode* nvs;
struct ncclTopoNode* gpu;
int d0=0; // See if there is enough bandwidth for NVS->GPU traffic
do {
NCCLCHECK(ncclTopoFollowPath(system, graph, NVS, 0, GPU, d0, d0 == g ? 2 : 1, &gpu));
d0++;
} while (gpu && d0 < system->nodes[GPU].count);
// On failure d0 is one past the GPU that failed. Step back so that only the successful
// reservations are released below (assuming a failed ncclTopoFollowPath reserves nothing).
if (gpu == NULL) {
d0--;
} else {
int d1=0; // See if there is enough bandwidth for GPU->NVS traffic
do {
NCCLCHECK(ncclTopoFollowPath(system, graph, GPU, d1, NVS, 0, d1 == g ? 2 : 1, &nvs));
d1++;
} while (nvs && d1 < system->nodes[GPU].count);
if (nvs == NULL) {
d1--;
} else { // Both directions worked. Move on to the next path.
// step = ngpus marks the channel as complete; gpu is passed as NULL.
NCCLCHECK(ncclTopoSearchRecGpu(system, graph, saveGraph, NULL, ngpus, -1, -1, 0, time));
}
// Release the GPU->NVS reservations in reverse order.
while (d1) {
d1--;
NCCLCHECK(ncclTopoFollowPath(system, graph, GPU, d1, NVS, 0, d1 == g ? -2 : -1, &nvs));
}
}
// Release the NVS->GPU reservations in reverse order.
while (d0) {
d0--;
NCCLCHECK(ncclTopoFollowPath(system, graph, NVS, 0, GPU, d0, d0 == g ? -2 : -1, &gpu));
}
return ncclSuccess;
}
这里就是判断带宽是否满足要求,我们先看下实际机器中GPU和NVSwitch的拓扑如图1所示
但是因为NVSwitch对用户来说是透明的,所以NCCL中构建的拓扑实际如下所示:所有GPU都连接到同一个NVS节点(即代码中的NVS 0)。
假设现在的搜索条件中带宽为bw,g为GPU0,那么这里搜索一个channel的逻辑是判断所有GPU节点到NVSwitch的双向带宽是否大于bw,如果大于的话,则减去bw,特殊的是GPU0,需要判断现有链路带宽是否大于2 * bw,这个原因后边会介绍。
然后继续执行ncclTopoSearchRecGpu,注意这里step指定为了ngpus,所以就完成了一个channel的搜索,nChannels变为1,那么下次执行ncclTopoSearchTryGpu的时候将从GPU1开始,重复这一过程直到搜索到了ngpus个channel。以四卡为例,搜索出的channel如下所示(每个channel只有一个GPU,即该channel的head):
0
1
2
3
其他算法的channel表示了节点内的传输顺序,而nvls搜索出来的channel不太一样,比如第一个channel里的0,nccl称为nvlsHead,他用来表示某一段内存的reduce之类的工作由谁来负责,后边我们会看到。
channel连接
// Excerpt: fill in the NVLS fields of every NVLS channel for this rank.
// (nvlsHeads is not used in the visible portion.)
static ncclResult_t connectNvls(struct ncclComm* comm, int* nvlsHeads, struct ncclTopoGraph* nvlsGraph) {
// One head per channel found by the NVLS search.
int nHeads = nvlsGraph->nChannels;
// headRank = index of the search channel whose head (first intra entry) is this rank; -1 if none.
int headRank = -1;
for (int h=0; h<nHeads; h++) {
if (nvlsGraph->intra[h*comm->localRanks] == comm->rank) headRank = h;
}
for (int c=0; c<comm->nvlsChannels; c++) {
struct ncclChannel* channel = comm->channels+c;
channel->nvls.nHeads = nHeads;
// NVLS peers come after the regular ranks in the peer array: head h is peer nRanks+1+h.
for (int h=0; h<nHeads; h++) channel->nvls.up[h] = comm->nRanks+1+h;
// Unused arity slots are marked -1.
for (int h=nHeads; h<NCCL_MAX_NVLS_ARITY; h++) channel->nvls.up[h] = -1;
// down = the NVLS peer of this rank's own head.
channel->nvls.down = comm->nRanks+1+headRank;
channel->nvls.out = -1; // NVLS+SHARP not yet implemented.
channel->nvls.headRank = headRank;
channel->nvls.treeUp = channel->nvls.treeDown[0] = channel->nvls.treeDown[1] = channel->nvls.treeDown[2] = -1;
channel->nvls.node = comm->node;
channel->nvls.nNodes = comm->nNodes;
}
// NOTE: the excerpt is truncated here; the multi-node setup and the function's final
// return are not shown.
if (comm->nNodes == 1) return ncclSuccess;
}
计算headRank,即第几个channel的节点是自己这个rank,然后开始设置所有的nvls channel,这里的up和down用于索引peers,由于从nRanks+1开始才是nvls的链接,所以这里要加上nRanks+1,up是所有的head,down是headRank,其实就是自己。
内存注册
内存注册的整体流程如图4所示,首先通过cuMulticastCreate创建一个multicast对象,图中handle指向这个multicast对象,然后每个GPU通过cuMulticastAddDevice将当前device和这个multicast对象关联起来,然后申请显存,最后通过cuMulticastBindAddr或者cuMulticastBindMem将申请到的显存和handle关联起来。
// Excerpt of ncclNvlsSetup: sizing of the NVLS buffers.
ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
...
// Data area of each connection slot = the SIMPLE protocol buffer size.
size_t buffSize = comm->buffSizes[NCCL_PROTO_SIMPLE];
// Extra space after each data area, holding the head and tail counters.
size_t memSize = NVLS_MEM_ALIGN_SIZE;
// Per head: 2 regions (reduce UC->MC and broadcast MC->UC) x nChannels slots of (data + counters).
size_t nvlsPerRankSize = nChannels * 2 * (buffSize + memSize);
size_t nvlsTotalSize = nvlsPerRankSize * nHeads;
char* shareableHandle = resources->shareableHandle;
// Per the surrounding text, this records the total size, device and local rank count in resources.
NCCLCHECKGOTO(nvlsGetProperties(comm, resources, dev, comm->localRanks, nvlsTotalSize), res, cleanup);
...
}
buffSize为SIMPLE协议的buff大小,memSize用于保存head,tail,然后计算一共需要分配多少内存,nHeads为搜索出来的channel数量,即nRanks,后边会看到为什么内存大小是这样的,然后将内存总大小和localRanks等信息保存到resources。
// Excerpt of ncclNvlsSetup: create the multicast object and share it among the local ranks.
ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
...
if (comm->localRank == 0) {
// Local rank 0 creates the multicast object, then broadcasts its shareable handle (root = local rank 0).
NCCLCHECKGOTO(nvlsGroupCreate(comm, &resources->properties, comm->localRank, comm->localRanks, &resources->mcHandle, shareableHandle), res, cleanup);
NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shareableHandle, NVLS_HANDLE_SIZE), res, cleanup);
} else {
// Other local ranks receive the shareable handle and turn it into their own mcHandle.
NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shareableHandle, NVLS_HANDLE_SIZE), res, cleanup);
NCCLCHECKGOTO(nvlsGroupConnect(comm, shareableHandle, comm->localRankToRank[0], &resources->mcHandle), res, cleanup);
}
...
}
rank0执行nvlsGroupCreate,通过cuMulticastCreate创建一个multicast对象,保存在resources->mcHandle,由于要跨进程共享,所以需要转成shareable handle,这里是直接memcpy的。
// Create the multicast object described by prop, and export it for the other local ranks
// by copying the raw handle bytes into shareableHandle.
// NOTE(review): comm, rank, nranks and the local `size` are unused in this excerpt.
ncclResult_t nvlsGroupCreate(struct ncclComm *comm, CUmulticastObjectProp *prop, int rank, unsigned int nranks, CUmemGenericAllocationHandle *mcHandle, char *shareableHandle) {
size_t size = prop->size;
CUCHECK(cuMulticastCreate(mcHandle, prop));
memcpy(shareableHandle, mcHandle, sizeof(CUmemGenericAllocationHandle));
return ncclSuccess;
}
然后所有rank执行bootstrapIntraNodeBroadcast,rank0将multicast对象的共享handle广播到所有rank的shareableHandle,其他rank在得到shareableHandle之后通过nvlsGroupConnect转成mcHandle。然后通过cuMulticastAddDevice将当前卡bind到mcHandle,这样所有rank就都拿到了mcHandle对应的multicast对象。
// Allocate this GPU's physical memory for the NVLS group and map it at a unicast VA
// (resources->ucBuff). Zero it, then bind it into the multicast object at offset 0.
ncclResult_t nvlsGroupBindMem(struct ncclComm *comm, struct ncclNvlsSharedRes* resources) {
size_t size = resources->size;
size_t granularity;
CUdeviceptr ptr = 0;
// Pinned device memory on resources->dev, exportable with NVLS_CU_MEM_HANDLE_TYPE.
CUmemAllocationProp prop;
memset(&prop, 0, sizeof(prop));
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
prop.location.id = resources->dev;
prop.requestedHandleTypes = NVLS_CU_MEM_HANDLE_TYPE;
// The recommended granularity is recorded in ucGran and also used as the VA alignment below.
CUCHECK(cuMemGetAllocationGranularity(&granularity, &prop, CU_MEM_ALLOC_GRANULARITY_RECOMMENDED));
resources->ucGran = granularity;
// Map a VA for UC memory
CUCHECK(cuMemAddressReserve(&ptr, size, granularity, 0U, 0));
// Alloc local physical mem for this NVLS group
CUCHECK(cuMemCreate(&resources->ucHandle, size, &prop, 0));
CUCHECK(cuMemMap(ptr, size, 0, resources->ucHandle, 0));
// Grant access according to resources->accessDesc, then zero the buffer.
CUCHECK(cuMemSetAccess(ptr, size, &resources->accessDesc, 1));
CUDACHECK(cudaMemset((void*)ptr, 0, size));
resources->ucBuff = (char*)ptr;
// Bind the whole allocation into the multicast object (mcOffset 0, memOffset 0).
CUCHECK(cuMulticastBindMem(resources->mcHandle, 0/*mcOffset*/, resources->ucHandle, 0/*memOffset*/, size, 0/*flags*/));
return ncclSuccess;
}
然后开始分配物理内存并映射到虚拟地址空间,首先预留一段虚拟地址空间到ptr,然后分配物理内存到ucHandle,再将ucHandle指向的物理内存map到ptr,将ptr赋值给ucBuff,如图5所示。
最后通过cuMulticastBindMem将ucHandle对应的物理内存bind到mcHandle。
然后开始执行nvlsGroupMapMem将mcHandle映射到虚拟地址空间。
// Map the multicast object (resources->mcHandle) at a new VA (resources->mcBuff) and grant access to it.
// NOTE(review): the VA is aligned to resources->granularity; nvlsGroupBindMem records the
// unicast granularity separately in resources->ucGran.
ncclResult_t nvlsGroupMapMem(struct ncclComm *comm, struct ncclNvlsSharedRes* resources) {
size_t size = resources->size;
CUdeviceptr ptr = 0;
// Create a VA for the NVLS
CUCHECK(cuMemAddressReserve(&ptr, size, resources->granularity, 0U, 0));
// Map the VA locally
CUCHECK(cuMemMap(ptr, size, 0, resources->mcHandle, 0));
resources->mcBuff = (char*)ptr;
INFO(NCCL_NVLS, "NVLS Mapped MC buffer at %p size %zi", resources->mcBuff, size);
// Having completed the BindMem we can now call SetAccess
// NB: It will block until all ranks have bound to the Group
CUCHECK(cuMemSetAccess((CUdeviceptr)resources->mcBuff, size, &resources->accessDesc, 1));
return ncclSuccess;
}
同样地,预留虚拟地址空间到ptr,然后将mcHandle映射到ptr,保存在mcBuff。此时如图6所示。
此时这块物理内存被映射到了ucBuff和mcBuff,ucBuff是Unicast buffer,对他的访问只会影响到当前device的内存,mcBuff是Multicast buffer,对他的访问将被NVSwitch广播到所有被添加到mcHandle的device。
然后开始将内存记录到各个peer的connection的buff。
// Excerpt of ncclNvlsSetup: point each NVLS peer's connections at slots of ucBuff (this
// GPU's unicast view) and mcBuff (the multicast view at the same offsets).
ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
...
for (int h = 0; h < nHeads; h++) {
// NVLS peers come after the regular ranks: head h is peer nRanks+1+h.
int nvlsPeer = comm->nRanks + 1 + h;
for (int c = 0; c < nChannels; c++) {
struct ncclChannel* channel = comm->channels + c;
char* mem = NULL;
struct ncclChannelPeer* peer = channel->peers[nvlsPeer];
// Reduce UC -> MC
// Slot layout: [buffSize bytes of data][head at +buffSize][tail at +buffSize+memSize/2].
// send[1] uses the unicast slot; recv[0] uses the same slot through the multicast address.
mem = resources->ucBuff + (h * 2 * nChannels + c) * (buffSize + memSize);
peer->send[1].transportComm = &nvlsTransport.send;
peer->send[1].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
peer->send[1].conn.head = (uint64_t*)(mem + buffSize);
peer->send[1].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
mem = resources->mcBuff + (h * 2 * nChannels + c) * (buffSize + memSize);
peer->recv[0].transportComm = &nvlsTransport.recv;
peer->recv[0].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
peer->recv[0].conn.head = (uint64_t*)(mem + buffSize);
peer->recv[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
peer->recv[0].conn.flags |= NCCL_NVLS_MIN_POLL;
// Broadcast MC -> UC
// Second region of head h: send[0] uses the multicast address, recv[1] the unicast slot.
mem = resources->ucBuff + ((h * 2 + 1) * nChannels + c) * (buffSize + memSize);
peer->recv[1].transportComm = &nvlsTransport.recv;
peer->recv[1].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
peer->recv[1].conn.head = (uint64_t*)(mem + buffSize);
peer->recv[1].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
mem = resources->mcBuff + ((h * 2 + 1) * nChannels + c) * (buffSize + memSize);
peer->send[0].transportComm = &nvlsTransport.send;
peer->send[0].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
peer->send[0].conn.head = (uint64_t*)(mem + buffSize);
peer->send[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
peer->send[0].conn.flags |= NCCL_NVLS_MIN_POLL;
// Copy the four host-side connection descriptors into the device-side peer structure
// (asynchronously, on the shared host stream).
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[0], &peer->send[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[0], &peer->recv[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[1], &peer->send[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[1], &peer->recv[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
}
}
...
}
这一过程完成后将如图7所示。
图中黄色的为ucBuff,蓝色的为mcBuff,mcBuff即PTX中的multimem,multimem操作如下:
The multimem.* operations operate on multimem addresses and accesses all of the multiple memory locations which the multimem address points to.
以ld_reduce为例:
multimem.ld_reduce{.ldsem}{.scope}{.ss}.op.type d, [a];
假设GPU0对peer[0]->recv[0].buff执行multimem.ld_reduce。recv[0].buff位于mcBuff,是与peer[0]->send[1].buff(位于ucBuff)偏移相同的multicast地址。这条指令会load GPU0和GPU1各自ucBuff中对应位置的数据,然后执行reduce,结果保存在d。
以ReduceScatter为例介绍一下kernel流程
2.19版本改动较多,所以在看nvls kernel之前,我们先介绍下新版kernel执行的过程。
kernel的转发
首先看下kernel是如何launch的,以及如何一步步执行到对应的proto,algo等对应的device函数中的。
由于有多种api、reduce类型、数据类型、算法和协议,kernel的数量是这些变量的笛卡尔积,所以nccl用generate.py生成这些kernel定义。主要生成两个数组:ncclDevKernelForFunc和ncclDevFuncTable,分别存放global函数和device函数。
// Excerpt: compute the work function index of a collective task and choose the plan's kernel.
static ncclResult_t scheduleCollTasksToPlan(...) {
...
NCCLCHECK(computeColl(&info, &workFuncIndex, &workElem, &proxyOp));
...
// ncclDevKernelForFunc maps a function index to a __global__ kernel. Once the plan has a
// specialized kernel, later tasks do not replace it.
if (!plan->kernelSpecialized) {
plan->kernelFn = ncclDevKernelForFunc[workFuncIndex];
plan->kernelSpecialized = ncclDevKernelForFuncIsSpecialized[workFuncIndex];
}
...
}
enqueue过程通过computeColl计算出workFuncIndex,然后记录下kernelFn为ncclDevKernelForFunc[workFuncIndex]。以ReduceScatter sum为例,得到的workFuncIndex为485,查询ncclDevKernelForFunc[485]为ncclDevKernel_ReduceScatter_Sum_f32_RING_LL,那么launch kernel就会执行这一kernel。这里注意下,我们实际用的是SIMPLE协议,但是这个kernel是LL的,我们看下如何进一步转发的。
// Generated instantiation: the RING/LL ReduceScatter-Sum-f32 kernel, specialized for function id 483.
DEFINE_ncclDevKernel(ReduceScatter_Sum_f32_RING_LL, ncclFuncReduceScatter, FuncSum, float, NCCL_ALGO_RING, NCCL_PROTO_LL, 483)
// Defines a __global__ kernel that runs ncclKernelMain. When a work item's funcIndex equals
// specializedFnId, ncclKernelMain runs the RunWork given here inline; otherwise it dispatches
// through ncclDevFuncTable.
#define DEFINE_ncclDevKernel(suffix, coll, redop, ty, algo, proto, specializedFnId) \
__global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \
ncclKernelMain<specializedFnId, RunWork<coll, ty, redop<ty>, algo, proto>>(comm, channelMask, workHead); \
}
ncclDevKernel_ReduceScatter_Sum_f32_RING_LL这个函数定义如上,specializedFnId为483,然后直接执行ncclKernelMain。
在看ncclKernelMain之前我们先看下现有参数信息存放。
// Per-block kernel state, kept in shared memory and read by all threads of the block.
__shared__ ncclShmemData ncclShmem;
// State of one thread group (threads within a block that execute the same logic): the
// connections it uses and the source/destination pointers of the current slice.
struct ncclShmemGroup {
ncclConnInfo *recvConns[NCCL_MAX_NVLS_ARITY];
ncclConnInfo *sendConns[NCCL_MAX_NVLS_ARITY];
// The +1 leaves room for the user buffer, which goes in front of the peer buffers.
void* srcs[NCCL_MAX_NVLS_ARITY+1];
void* dsts[NCCL_MAX_NVLS_ARITY+1];
union {
unpackGroupShmem unpack;
} devicePlugin;
};
struct ncclShmemData {
struct ncclShmemGroup groups[NCCL_MAX_GROUPS];
// Pre-op scaling arguments per source; index 0 is the local input (set in setDataPtrs).
uint64_t redOpArgs[NCCL_MAX_NVLS_ARITY+1];
// Channel served by this block (chosen at the start of ncclKernelMain).
int channelId;
int aborted;
alignas(16) struct ncclDevComm comm;
alignas(16) struct ncclDevChannel channel;
alignas(16) struct ncclWork work;
alignas(16) union {
unpackShmem unpack;
} devicePlugin;
};
ncclShmem位于共享内存,存储了kernel需要的参数信息,比如channelId,comm,channel等,一个block中的线程都会使用这些信息用于收发数据。之前版本中一个block所有线程的peer是一样的,而新版中不同线程可能会对应不同的peer,比如send/recv,一个block可以收发8个peer,再比如本节介绍的nvls,一个block中不同warp使用流水线的方式完成整体流程,因此引入了数据结构ncclShmemGroup groups,一个group表示执行相同逻辑的线程组,group所需要的conn,srcs,dsts等信息存储在groups中。
// Excerpt of the kernel entry shared by all generated kernels: map this block to a channel.
template<int SpecializedFnId, typename SpecializedRunWork>
__device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
int tid = threadIdx.x;
if (tid < WARP_SIZE) {
// Lane x examines channel x. If channel x is enabled in channelMask, its rank y among the
// enabled channels (popcount of the lower mask bits) is the index of the block serving it.
// As written, only channel ids below WARP_SIZE can be selected.
int x = tid;
if (channelMask & (1ull<<x)) {
int y = __popcll(channelMask & ((1ull<<x)-1));
if (blockIdx.x == y) ncclShmem.channelId = x;
}
...
}
__syncthreads(); // publish ncclShmem.channelId
int channelId = ncclShmem.channelId;
...
}
这段代码确定block和channel的对应关系,即计算当前block应该处理哪个channel:warp 0中的第x个lane检查channel x是否在channelMask中。如果在,就由第y个block处理它,其中y为channelMask中低于x的置位个数。
// Excerpt of ncclKernelMain's work loop: run each work item loaded into shared memory.
template<int SpecializedFnId, typename SpecializedRunWork>
__device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
...
while (true) {
// Notify host that all fifo reads are complete.
...
// Run the kernel's own RunWork inline when the work matches its specialization;
// otherwise look up the device function by funcIndex.
if (0 <= SpecializedFnId && ncclShmem.work.header.funcIndex == (unsigned)SpecializedFnId) {
SpecializedRunWork().run(&ncclShmem.work);
} else {
ncclDevFuncTable[ncclShmem.work.header.funcIndex]();
}
int workIxNext = ncclShmem.work.header.workNext;
__syncthreads();
...
}
...
}
funcIndex为485,而SpecializedFnId为483,因此会再次去ncclDevFuncTable中找funcIndex对应的函数,函数为ncclDevFunc_ReduceScatter_Sum_f32_RING_SIMPLE,这样就找到了需要执行的函数。
// Generated instantiation: device function for RING/SIMPLE ReduceScatter-Sum-f32.
DEFINE_ncclDevFunc(ReduceScatter_Sum_f32_RING_SIMPLE, ncclFuncReduceScatter, FuncSum, float, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE)
// Defines a __device__ function (an ncclDevFuncTable entry, per the surrounding text) that
// runs RunWork on the work item held in shared memory.
#define DEFINE_ncclDevFunc(suffix, coll, redop, ty, algo, proto) \
__device__ void ncclDevFunc_##suffix() { \
RunWork<coll, ty, redop<ty>, algo, proto>().run(&ncclShmem.work); \
}
// Runs every work element packed into one ncclWork for the given collective/type/op/algo/proto.
template<ncclFunc_t Fn, typename T, typename RedOp, int Algo, int Proto>
struct RunWork {
// This __forceinline__ is necessary. The compiler was inserting a function call
// here from the LL ncclKernel.
__device__ __forceinline__ void run(ncclWork *w) {
int wid = threadIdx.x / WARP_SIZE;
// Registered-buffer work stores the larger ncclWorkElemReg entries; choose the first
// element and the stride accordingly.
ncclWorkElem* we = w->header.type == ncclWorkTypeRegColl ? &w->regElems[0].elem : &w->elems[0];
int stride = w->header.type == ncclWorkTypeRegColl ? sizeof(ncclWorkElemReg) : sizeof(ncclWorkElem);
// Walk the elements until the next one would overrun the ncclWork or is unused.
#pragma unroll 1
while ((char*)we + stride <= (char*)(w+1) && we->isUsed) {
// Only the first we->nWarps warps take part in this element.
if (wid < we->nWarps) {
RunWorkElement<Fn, T, RedOp, Algo, Proto>().run(we);
}
we = (ncclWorkElem*)((char*)we + stride);
}
}
};
我们看下这个函数的定义,可以得到:Fn为ncclFuncReduceScatter,T为float,RedOp为FuncSum<float>,algo为NCCL_ALGO_RING,协议为NCCL_PROTO_SIMPLE,然后开始执行runRing。
// Excerpt of the ring ReduceScatter device loop (chunk-size computation elided).
template<typename T, typename RedOp, typename Proto>
__device__ __forceinline__ void runRing(ncclWorkElem *args) {
...
// One grid iteration covers one chunk per channel.
const ssize_t loopSize = nChannels*chunkSize;
// Per-rank element count; the input data for rank r starts at offset r*size.
const ssize_t size = args->count;
// One recv peer (ring->prev) and one send peer (ring->next).
Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
prims(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t realChunkSize;
...
realChunkSize = int(realChunkSize);
// This block's chunk within the current grid iteration.
ssize_t chunkOffset = gridOffset + bid*int(realChunkSize);
/// begin ReduceScatter steps ///
ssize_t offset;
int nelem = min(realChunkSize, size-chunkOffset);
int rankDest;
// step 0: push data to next GPU
// Send (no reduction) the input chunk of ringRanks[nranks-1].
rankDest = ringRanks[nranks-1];
offset = chunkOffset + rankDest * size;
prims.send(offset, nelem);
第一步,对rankDest = ringRanks[nranks-1](即ring中的前一个rank)对应的那块数据执行send,send就是将用户输入中的这块数据发送到下一个rank的buffer。这块数据沿ring传递nranks-1步后,最终在ringRanks[nranks-1]上完成reduce。
// k-2 steps: reduce and copy to next GPU
// Steps 1..nranks-2: reduce the received partial result with this rank's input chunk of
// ringRanks[nranks-j], then forward it to the next rank.
for (int j=2; j<nranks; ++j) {
rankDest = ringRanks[nranks-j];
offset = chunkOffset + rankDest * size;
prims.recvReduceSend(offset, nelem);
}
然后接下来的nranks - 2次步骤,执行recvReduceSend,就是将前一个rank发送到自己buffer中的数据和自己的用户输入数据中对应位置执行reduce,然后发送给下一个rank。
// step k-1: reduce this buffer and data, which will produce the final result
// Last step: this rank's own chunk (ringRanks[0]). Reduce it and write the result to the
// output at chunkOffset, applying the post-op.
rankDest = ringRanks[0];
offset = chunkOffset + rankDest * size;
prims.recvReduceCopy(offset, chunkOffset, nelem, /*postOp=*/true);
}
}
}
最后一次执行recvReduceCopy,就是将前一个rank发送过来的数据和自己用户输入中对应位置执行reduce,并拷贝到用户输出。
primitive初始化
回顾下ReduceScatter中的primitives的构造,recvPeers为ring中的前一个rank,sendPeers为ring中下一个rank,inputBuf和outputBuf为用户执行api提供的输入输出buff,group为默认参数0。redOpArg用于比如mean的操作,会被设置为nranks,在reduceCopy的时候会除以nranks,本例为sum操作,可以忽略redOpArg。
// Construction used by runRing: receive from ring->prev, send to ring->next, input = sendbuff,
// output = recvbuff. The group and connection indices keep their defaults (0).
Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
prims(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);
// Excerpt of the SIMPLE-protocol Primitives constructor (body shown separately).
// stepSize defaults to the SIMPLE buffer size split into NCCL_STEPS slots, counted in elements of T.
__device__ Primitives(
int tid, int nthreads, int const *recvPeers, int const *sendPeers,
void const *inputBuf, void *outputBuf, uint64_t redOpArg, uint8_t group=0,
uint8_t connIndexRecv = 0, uint8_t connIndexSend = 0, struct ncclWorkElem* e = nullptr, int stepSize_=0
):
tid(tid), nthreads(nthreads), tidInBlock(threadIdx.x), group(group),
stepSize(stepSize_ == 0 ? ncclShmem.comm.buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/sizeof(T) : stepSize_) {
}
模板参数中Direct和P2p为0,Fan为FanSymmetric<1>,作用是记录有几个recv和send,此时MaxRecv和MaxSend均为1。
// Fan descriptor with the same maximum number of recv and send peers (MaxArity).
// Only the recv count is stored; nsend() returns the same value.
template<int MaxArity>
struct FanSymmetric {
static constexpr int MaxRecv = MaxArity, MaxSend = MaxArity;
int n;
FanSymmetric() = default;
__device__ FanSymmetric(int nrecv, int nsend): n(nrecv) {
// assert(nrecv == nsend && nrecv <= MaxArity);
// NOTE: nsend is ignored; callers are expected to pass nrecv == nsend (the check is disabled).
}
__device__ int nrecv() const { return n; }
__device__ int nsend() const { return n; }
};
然后继续看初始化过程
// Excerpt of the Primitives constructor body: worker count, peer counts, per-thread roles.
__device__ Primitives(...)
// For send operations, we need an extra warp to overlap the threadfence and the copy
this->nworkers = nthreads - (MaxSend > 0 && nthreads-WARP_SIZE >= 64 ? WARP_SIZE : 0);
// Count the leading valid (!= -1) entries of the peer lists.
int nrecv=0, nsend=0;
while (nrecv < MaxRecv && recvPeers[nrecv] != -1) nrecv++;
while (nsend < MaxSend && sendPeers[nsend] != -1) nsend++;
this->fan = Fan(nrecv, nsend);
// Threads are split into groups of 8; within a group, thread `index` serves peer `index`.
constexpr int ThreadPerSync = 8;
static_assert(MaxSend <= ThreadPerSync && MaxRecv <= ThreadPerSync, "Not enough threads to cover all peers");
int g = tid / ThreadPerSync;
int ng = nthreads / ThreadPerSync;
index = tid % ThreadPerSync;
flags = 0;
// Group 0: wait on recv peers, plus one thread that publishes the input pointer.
// Group 1: wait on send peers, plus one thread that publishes the output pointer.
// Group ng-2: post to recv peers. Group ng-1: post to send peers.
// NOTE(review): this is an else-if chain, so every role is assigned only when ng >= 4.
if (g == 0) {
if (index < nrecv) flags |= RoleWaitRecv;
if (index == nrecv) flags |= RoleInput;
} else if (g == 1) {
if (index < nsend) flags |= RoleWaitSend;
if (index == nsend) flags |= RoleOutput;
} else if (g == ng - 2) {
if (index < nrecv) flags |= RolePostRecv;
} else if (g == ng - 1) {
if (index < nsend) flags |= RolePostSend;
}
...
}
nthreads为执行的总线程数,本例子中等于block中的线程数,nworkers是实际干活的线程数,由于发送的时候需要一个warp执行threadfence,因此实际干活的线程数为nthreads减去一个warp,不过当总的warp数少的时候就不会使用独立的同步warp。记录nsend和nrecv,此时均为1。
然后开始设置每个线程的role,将nthreads按照8个一组分成多个小组。假设一共有n个组(g[0]到g[n-1]),且recvPeer和sendPeer都有两个,那么各个线程的role分配如图8所示。其中WaitRecv表示这个线程负责等待,直到fifo中有数据可以接收;本例中g[0]的thr[0]负责等待第0个recvPeer,thr[1]负责第1个recvPeer。Input线程负责写入用户buff的地址。PostRecv负责在接收到数据后通知recvPeer,g[n-2]的thr[0]负责通知第0个recvPeer,thr[1]负责通知第1个recvPeer。send方向同理。
// Load recv connection `connIndex` of `peer` into this thread's state. Only threads with
// the WaitRecv or PostRecv role do anything.
__device__ __forceinline__ void loadRecvConn(ncclDevChannelPeer *peer, int connIndex, struct ncclWorkElem* e) {
if (flags & (RoleWaitRecv|RolePostRecv)) {
auto *conn = &peer->recv[connIndex];
step = conn->step;
// Round the starting step up to a whole chunk (SlicePerChunk*StepPerSlice steps).
step = roundUp(step, SlicePerChunk*StepPerSlice);
if (flags & RolePostRecv) {
// The poster owns the head counter, which tells the sender which fifo slots are free.
connStepPtr = conn->head;
*connStepPtr = step; // Return credits in case we rounded up.
}
if (flags & RoleWaitRecv) {
ncclShmem.groups[group].recvConns[index] = conn; // WaitRecv role saves since that's who needs it in setDataPtrs()
// NVLS connections carry NCCL_NVLS_MIN_POLL (set in ncclNvlsSetup).
flags |= (conn->flags & NCCL_NVLS_MIN_POLL) ? NvlsMinPolling : 0;
// The waiter polls the tail counter written by the sender and caches its last value.
connStepPtr = conn->tail;
connStepCache = loadStepValue(connStepPtr);
flags |= (conn->offsFifo != nullptr) ? OffsFifoEnabled : 0;
if (Direct) {
...
}
if (flags & OffsFifoEnabled)
connOffsFifoPtr = conn->offsFifo;
// The data fifo of this connection.
connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
}
}
}
只有RoleWaitRecv和RolePostRecv的线程才会执行loadRecvConn,读取step,之前章节中介绍过,step表示在fifo中的位置;RolePostRecv线程负责通知recvPeer,所以需要保存conn中的head指针到connStepPtr,RoleWaitRecv线程负责等待直到fifo中有新的数据,因此需要保存conn中的tail指针到connStepPtr,并将内容cache到connStepCache,以避免频繁的global mem读取,最后将conn->buff,即fifo,记录到connEltsFifo中。
// Load send connection `connIndex` of `peer` into this thread's state. Only threads with
// the WaitSend or PostSend role do anything.
__device__ __forceinline__ void loadSendConn(ncclDevChannelPeer *peer, int connIndex, struct ncclWorkElem* e) {
if (flags & (RoleWaitSend|RolePostSend)) {
auto *conn = &peer->send[connIndex];
step = conn->step;
// Round the starting step up to a whole chunk (SlicePerChunk*StepPerSlice steps).
step = roundUp(step, SlicePerChunk*StepPerSlice);
if (flags & RolePostSend) {
// The poster owns the tail counter, which announces new data to the receiver.
connStepPtr = conn->tail;
connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
}
if (flags & RoleWaitSend) {
ncclShmem.groups[group].sendConns[index] = conn; // WaitSend role saves since that's who needs it in setDataPtrs()
flags |= (conn->flags & NCCL_NVLS_MIN_POLL) ? NvlsMinPolling : 0;
// The waiter polls the receiver's head counter (free slots) and caches its last value.
connStepPtr = conn->head;
connStepCache = loadStepValue(connStepPtr);
flags |= (conn->offsFifo != nullptr) ? OffsFifoEnabled : 0;
if (flags & OffsFifoEnabled)
connOffsFifoPtr = conn->offsFifo;
connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
...
}
}
}
loadSendConn逻辑一样,RolePostSend线程负责通知send peer,因此持有tail指针,RoleWaitSend线程负责等待send peer,因此持有head指针,然后记录fifo。
最后是执行setDataPtrs,设置userBuff为用户的输入输出。
// Excerpt: the thread holding the Input (or Output) role keeps the user input (or output)
// pointer in userBuff. The input thread also records the local pre-op scaling argument.
__device__ void setDataPtrs(void const *inputBuf, void *outputBuf, uint64_t redOpArg, struct ncclWorkElemReg* e) {
if (flags & RoleInput) {
userBuff = (T*)inputBuf;
ncclShmem.redOpArgs[0] = redOpArg; // scaler for local input
}
if (flags & RoleOutput) userBuff = (T*)outputBuf;
...
}
到这里就完成了初始化。
recvReduceSend
// Receive from the recv peers, reduce with the user input at inpIx, and send to the send peers.
// No user output buffer (DstBuf = -1) and no direct recv/send.
__device__ __forceinline__ void recvReduceSend(intptr_t inpIx, int eltN, bool postOp=false) {
genericOp<0, 0, 1, 1, Input, -1>(inpIx, -1, eltN, postOp);
}
// Excerpt (setup part) of the generic SIMPLE-protocol slice loop.
template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
__device__ __forceinline__ void genericOp(
intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp
) {
// Direct recv/send are possible only when this Primitives instance is Direct.
constexpr int DirectRecv = 1 && Direct && DirectRecv1;
constexpr int DirectSend = 1 && Direct && DirectSend1;
// Whether the user buffer takes part as a source / destination.
constexpr int Src = SrcBuf != -1;
constexpr int Dst = DstBuf != -1;
// Negative element counts are treated as empty.
nelem = nelem < 0 ? 0 : nelem;
// Slice size: nelem split over SlicePerChunk slices, rounded up to a multiple of 16
// elements, but at least 1/32 of a nominal slice.
int sliceSize = stepSize*StepPerSlice;
sliceSize = max(divUp(nelem, 16*SlicePerChunk)*16, sliceSize/32);
int slice = 0;
int offset = 0;
...
}
模板参数中Recv表示是否需要执行recv,Send表示是否执行Send,SrcBuf表示输入中是否有用户的src buff,DstBuf表示输出中是否包含了用户的dst buff。然后计算得到DirectRecv和DirectSend为0,Src为1,Dst为0。
// Excerpt: start of the worker loop (only threads tid < nworkers, and only while data remains).
template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
__device__ __forceinline__ void genericOp(
intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp
) {
...
if (tid < nworkers && offset < nelem) {
do {
// The last slice may be shorter.
sliceSize = sliceSize < nelem-offset ? sliceSize : nelem-offset;
// The thread owning the user buffer publishes it as srcs[0] / dsts[0].
if (Src && (flags & (SrcBuf==Input ? RoleInput : RoleOutput)))
ncclShmem.groups[group].srcs[0] = userBuff + srcIx + offset;
if (Dst && (flags & (DstBuf==Input ? RoleInput : RoleOutput)))
ncclShmem.groups[group].dsts[0] = userBuff + dstIx + offset;
// Wait until the peers can provide/accept this slice and publish their fifo pointers.
waitPeer<DirectRecv, DirectSend, Recv, Send, Src, Dst>(srcIx, dstIx, offset, sliceSize);
...
} while (slice < SlicePerChunk && offset < nelem);
}
...
}
2.7.8中负责数据收发的工作线程和负责同步的线程都在一个循环里,会引入很多分支指令影响性能,新版中将逻辑拆分成了两个循环,以提高性能,工作线程执行第一个循环,同步线程执行第二个循环。
RoleInput线程将用户buff填入到srcs[0],然后执行waitPeer,waitPeer函数就是之前的waitSend和waitRecv,会等待直到可以发送和接收数据,并将数据地址填入到srcs和dsts。
// Excerpt: spin until the fifo has room (send side) or data (recv side) for the next slice.
template <int DirectRecv, int DirectSend, int Recv, int Send, int Src, int Dst>
__device__ __forceinline__ void waitPeer(intptr_t srcIx, intptr_t dstIx, int offset, int nelts) {
// When both Recv and Send are enabled, the thread's role decides which side it waits on.
const bool isSendNotRecv = (Send && Recv) ? (flags & RoleWaitSend) : Send;
const bool noRecvWait = DirectRecv && Src && (flags & DirectRead); // no wait when directly reading from remote input
const bool noSendWait = DirectSend && (flags & (DirectRead|DirectWrite)); // no wait in empty send (e.g. directScatter) or direct remote write
if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) ||
((flags & (Send*RoleWaitSend)) && !noSendWait)) {
int spins = 0;
// Sender: the cached head plus NCCL_STEPS fifo slots must reach step+StepPerSlice (room).
// Receiver: the cached tail must reach step+StepPerSlice (data available).
while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) {
connStepCache = loadStepValue(connStepPtr);
if (checkAbort(spins)) break;
}
}
...
}
noRecvWait和noSendWait都为0,RoleWaitSend线程的isSendNotRecv为1,由于持有的connStepPtr是head指针,所以他判断等待的逻辑是如果head指针加上队列容量小于step + StepPerSlice,那么不能执行send,否则会超过队列容量,因此循环等待;而RoleWaitRecv线程的isSendNotRecv为0,由于持有的connStepPtr是tail指针,所以他判断等待逻辑是如果step + StepPerSlice超过了队尾指针,说明队列中已经没有数据了,那么就需要等待。
// Excerpt: after waiting, publish this connection's fifo slot in dsts (send) or srcs (recv).
template <int DirectRecv, int DirectSend, int Recv, int Send, int Src, int Dst>
__device__ __forceinline__ void waitPeer(intptr_t srcIx, intptr_t dstIx, int offset, int nelts) {
...
if (flags & (Recv*RoleWaitRecv | Send*RoleWaitSend)) {
if (isSendNotRecv && (flags & SizesFifoEnabled))
connSizesFifoPtr[step%NCCL_STEPS] = nelts*sizeof(T);
// Peer pointers go after the user-buffer entry when one is present (+Dst / +Src).
void **ptrs = isSendNotRecv ? (ncclShmem.groups[group].dsts + Dst)
: (ncclShmem.groups[group].srcs + Src);
// (Bodies of the offset-fifo and direct branches are elided in this excerpt.)
if (flags & OffsFifoEnabled)
else if (isSendNotRecv && DirectSend) {
} else if (!isSendNotRecv && DirectRecv) {
}
else {
// Default: the slot of the current step in this connection's fifo.
ptrs[index] = connEltsFifo + (step%NCCL_STEPS)*stepSize;
}
// Advance this thread's step past the slice.
step += StepPerSlice;
}
}
然后开始填充srcs和dsts数组,就是将自己持有的fifo对应的slot填进去,然后更新step。所以对于recvReduceSend,srcs[0]为用户buff,srcs[1]为前一个rank的fifo,dsts[0]为下一个rank的fifo。这样就能达到前边描述的作用:接收前一个rank的数据,和用户的输入buff执行reduce,然后发送给下一个rank。
// Excerpt: the data-moving part of the worker loop, plus the loop for the remaining threads.
template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
__device__ __forceinline__ void genericOp(
intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp
) {
...
if (tid < nworkers && offset < nelem) {
do {
...
// Workers only: ensure every worker sees the pointers published by waitPeer.
subBarrier();
// After an abort, move no data.
int workSize = ncclShmem.aborted ? 0 : sliceSize;
// (Direct fast paths elided; the condition on the next line is truncated in this excerpt.)
if (DirectRecv && ncclShmem.groups[group].srcs[0] == ncclShmem.groups[group].dsts[0]
} else if (DirectSend && !DirectRecv && SrcBuf != Input && ncclShmem.groups[group].dsts[Dst] == nullptr) {
} else {
// Number of leading sources that get the pre-op scaling: none unless the user input is a source.
constexpr int PreOpSrcs = SrcBuf != Input ? 0 :
DirectRecv*MaxRecv == NCCL_MAX_DIRECT_ARITY ? (1+NCCL_MAX_DIRECT_ARITY) : 1;
// Reduce all sources and write the result to all destinations.
reduceCopy<Unroll, RedOp, T,
MultimemSrcs, Recv+Src, Recv*MaxRecv+Src,
MultimemDsts, Send+Dst, Send*MaxSend+Dst, PreOpSrcs>
(tid, nworkers, ncclShmem.redOpArgs[0], ncclShmem.redOpArgs, postOp,
Recv*fan.nrecv()+Src, ncclShmem.groups[group].srcs,
Send*fan.nsend()+Dst, ncclShmem.groups[group].dsts,
workSize);
}
barrier(); // This barrier has a counterpart in following loop
postPeer<Recv, Send>(0 < sliceSize);
offset += sliceSize;
slice += 1;
} while (slice < SlicePerChunk && offset < nelem);
}
// Threads not (or no longer) in the worker loop (the sync threads, and workers once the
// data is exhausted) keep matching the barriers and posting for the remaining slices.
while (slice < SlicePerChunk) {
sliceSize = sliceSize < nelem-offset ? sliceSize : nelem-offset;
barrier(); // Has couterpart in preceding worker-only loop.
postPeer<Recv, Send>(0 < sliceSize);
offset += sliceSize;
slice += 1;
}
}
waitPeer完成之后,说明可以执行数据收发了,这里先执行subBarrier(),作用是同步一下所有的工作线程,保证在waitPeer完成之后才进入收发数据的逻辑。然后执行reduceCopy将数据从srcs完成reduce并拷贝到dsts。然后执行barrier(),barrier所有的线程,即工作线程加同步线程,因为同步线程只有在等到数据收发结束才能开始post,然后看下同步线程执行的postPeer。
// Publish the completion of one slice: advance step and store it to head (recv side,
// returning fifo credit) or to tail (send side, announcing new data).
template<int Recv, int Send>
inline __device__ void postPeer(bool dataStored) {
if (flags & (Recv*RolePostRecv | Send*RolePostSend)) {
step += StepPerSlice;
// Order the data stores before the tail update (system scope) when data was written.
if (Send && (flags & RolePostSend) && dataStored) fence_acq_rel_sys();
st_relaxed_sys_global(connStepPtr, step);
}
}
RolePost类线程需要更新step,然后将step写入到connStepPtr。对于RolePostRecv,持有的是head指针,直接写入就好;对于RolePostSend,持有的是tail指针,为了保证先完成数据的写之后再完成post,需要加一个fence。这里使用了acq_rel的屏障,其实这个场景使用release语义就是足够的,不过查了一下PTX,好像没有单独的release语义指令。对于读数据的场景,也是需要配对使用读屏障,但是nccl的实现使用的是volatile,这样可以bypass L1 cache,因此不需要使用屏障。
nvls
ReduceScatter kernel
// Excerpt of the NVLS ReduceScatter kernel: the scatter threads.
if (tid < tidEndScatter) {
// Scatter
using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
// No recv peers; send to every NVLS head (nvls->up). Group 0, connIndexRecv = connIndexSend = 1.
Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
prims(tid, nThreadsScatter, NULL, nvls->up, args->sendbuff, NULL,
args->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1);
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid * chunkSize;
int nelem = min(chunkSize, size - offset);
// For each of the nHeads peers, copy nelem elements taken every `size` elements of the input.
prims.scatter(offset, nvls->nHeads * size, nelem, size, -1, 0);
}
}
scatter线程会通过prim执行scatter操作,sendPeers为up,因此就是所有的rank,inputBuf为用户的输入args->sendbuff,connIndexSend为1,因此load第1个send conn。
// Send-only scatter: peer i receives peerElem elements starting at inpIx + i*peerOffset.
// skip >= 0 moves peers i >= skip forward by peerElem; shift rotates the peer order.
__device__ __forceinline__ void
scatter(intptr_t inpIx, ssize_t totalElem, int peerElem, ssize_t peerOffset, int skip, int shift) {
ScatterGatherOp<0, 0, 0, 1>(inpIx, -1, totalElem, peerElem, peerOffset, skip, shift, /*postOp=*/false);
}
// Excerpt: per-slice scatter (Send) / gather (Recv, elided) across the fan's peers.
template <int DirectRecv1, int DirectSend1, int Recv, int Send>
__device__ __forceinline__ void
ScatterGatherOp(intptr_t inpIx, intptr_t outIx, ssize_t totalElem, int peerElem, ssize_t peerOffset, int skip, int shift, bool postOp) {
constexpr int DirectRecv = 1 && Direct && DirectRecv1;
constexpr int DirectSend = 1 && Direct && DirectSend1;
int offset = 0; // slice offset
int sliceSize = stepSize*StepPerSlice;
int dataSize = max(DIVUP(peerElem, 16*SlicePerChunk)*16, sliceSize/32); // per-peer slice size
// Every thread runs all SlicePerChunk iterations so that the barriers below stay matched.
#pragma unroll
for (int slice=0; slice<SlicePerChunk; ++slice) {
// Elements of this slice for each peer (0 once peerElem is exhausted).
ssize_t realSize = max(0, min(dataSize, peerElem-offset));
bool fenceNeeded = false;
if (tid < nworkers) {
if (Send) {
// Scatter pre-scales data of input buffer only in non-Direct case
constexpr int PreOpSrcs = DirectSend ? 0 : 1;
if (flags & RoleInput) ncclShmem.groups[group].srcs[0] = userBuff + inpIx + offset;
// realSize is not accurate here; but intra-node does not rely on sizes FIFO
waitPeer<0, DirectSend, 0, 1, 1, 0>(0, inpIx, offset, realSize);
subBarrier();
#pragma unroll
// Loop over peers
for (int j=0; j<fan.nsend(); j++) {
int i = (j+shift)%fan.nsend();
// Peer i's data starts i*peerOffset elements into the input.
ssize_t pOffset = i*peerOffset;
// Skip the data I am responsible of reducing myself
if (skip >= 0 && i >= skip) pOffset += peerElem;
void* src0 = (T*)ncclShmem.groups[group].srcs[0] + pOffset;
ssize_t realPeerSize = min(realSize, totalElem-pOffset);
if (realPeerSize > 0 && ncclShmem.groups[group].dsts[i] != nullptr) {
reduceCopy<Unroll, RedOp, T, 0,1,1, 0,1,1, PreOpSrcs>(tid, nworkers, ncclShmem.redOpArgs[0], ncclShmem.redOpArgs, false, 1, &src0, 1, ncclShmem.groups[group].dsts+i, realPeerSize);
// Mark for threadfence at the end
fenceNeeded |= true;
}
}
} else if (Recv) {
}
}
// Presumably ORs fenceNeeded across the block, so the posting threads fence only when some worker stored data.
fenceNeeded = barrierAny(fenceNeeded);
postPeer<Recv, Send>(fenceNeeded);
offset += realSize;
}
}
如图9所示,scatter做的就是将userBuff的数据按照peerOffset的间隔,发送到所有的sendPeer对应的buff,即图中的peer[0]->send[1].buff和peer[1]->send[1].buff。
对于reduce线程,sendPeers为NULL,recvPeers为nvls->down,connIndexRecv为0,因此load第0个recv conn,然后执行recv。
// ReduceScatter, reduce thread group: receives from the single nvls->down peer
// (FanAsymmetric<1, 0>: one recv peer, no send peers) and writes into args->recvbuff.
else if (tid < tidEndReduce) {
// Reduce through NVLS
// The trailing <1, 0> of ProtoSimple appear to be MultimemSrcs/MultimemDsts: the source is
// read through the multicast address (multimem.ld_reduce), the destination is a plain buffer.
using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 0>;
Primitives<T, RedOp, FanAsymmetric<1, 0>, /*Direct=*/0, Proto, 0>
prims(tid - tidEndScatter, nThreadsReduce, &nvls->down, NULL, NULL, args->recvbuff,
args->redOpArg, 3 * Proto::MaxGroupWidth, 0, 0);
// Each block handles one chunkSize piece per iteration, at bid*chunkSize inside the window.
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid * chunkSize;
int nelem = min(chunkSize, size - offset);
prims.recv(offset, nelem);
}
}
recv函数中执行会将dst设置为args->recvbuff,src为自己rank对应的Multicast buffer,如图10所示,执行完成之后,GPU0的recvBuff就拿到了所有卡对应值reduce的结果。
reduceCopy kernel
以ReduceScatter过程为例,我们看下reduceCopy kernel如何同时支持Unicast buffer和Multicast buffer的。
// Reduces nSrcs source buffers element-wise with RedFn and writes the result to each of the nDsts
// destination buffers; nElts elements of type T. MultimemSrcs/MultimemDsts: how many of the
// leading srcs/dsts are multicast (multimem) addresses. MinSrcs..MaxSrcs / MinDsts..MaxDsts: compile-
// time bounds on nSrcs/nDsts. PreOpSrcs: how many leading srcs get the pre-op (preOpArgs) applied.
// thread/nThreads: this thread's index and the participating thread count.
// Only the vectorized fast path is shown; the remainder is elided ("...").
template<int Unroll, typename RedFn, typename T,
int MultimemSrcs, int MinSrcs, int MaxSrcs,
int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
typename IntBytes>
__device__ __forceinline__ void reduceCopy(
int thread, int nThreads,
uint64_t redArg, uint64_t *preOpArgs, bool postOp,
int nSrcs, void **srcPtrs, int nDsts, void **dstPtrs,
IntBytes nElts
) {
int lane = thread%WARP_SIZE;
// If a multimem src is present then our biggest pack size is limited to what
// is supported for this redfn/type.
constexpr int BigPackSize = (MultimemSrcs == 0) ? 16 : LoadMultimem_BigPackSize<RedFn>::BigPackSize;
// Byte progress: already processed / still to process. Updated in place by reduceCopyPacks.
IntBytes nBytesBehind = 0;
IntBytes nBytesAhead = nElts*sizeof(T);
// Wide packs are worthwhile only when a pack holds more than one element.
#if __cpp_if_constexpr
if constexpr (BigPackSize > sizeof(T)) {
#else
if (BigPackSize > sizeof(T)) {
#endif
// Check that all pointers are BigPackSize aligned.
// (BigPackSize + !BigPackSize) keeps the divisor nonzero when BigPackSize is 0 (multimem
// ld_reduce unsupported), so this stays well-formed even without `if constexpr`.
bool aligned = true;
if (lane < nSrcs) aligned &= 0 == cvta_to_global(srcPtrs[lane]) % (BigPackSize + !BigPackSize);
if (lane < nDsts) aligned &= 0 == cvta_to_global(dstPtrs[lane]) % (BigPackSize + !BigPackSize);
// Take the fast path only if every lane's pointer is aligned.
aligned = __all_sync(~0u, aligned);
if (aligned) {
// Whole hunks with the requested Unroll...
reduceCopyPacks<RedFn, T, Unroll, BigPackSize,
MultimemSrcs, MinSrcs, MaxSrcs, MultimemDsts, MinDsts, MaxDsts, PreOpSrcs>
(nThreads, /*&*/thread, redArg, preOpArgs, postOp,
nSrcs, srcPtrs, nDsts, dstPtrs, /*&*/nBytesBehind, /*&*/nBytesAhead);
if (nBytesAhead == 0) return;
// ...then leftover whole packs with Unroll=1 (which may run partial hunks).
reduceCopyPacks<RedFn, T, /*Unroll=*/1, BigPackSize,
MultimemSrcs, MinSrcs, MaxSrcs, MultimemDsts, MinDsts, MaxDsts, PreOpSrcs>
(nThreads, /*&*/thread, redArg, preOpArgs, postOp,
nSrcs, srcPtrs, nDsts, dstPtrs, /*&*/nBytesBehind, /*&*/nBytesAhead);
if (nBytesAhead == 0) return;
}
}
...
}
模板参数中,MultimemSrcs表示有几个输入是multimem,MultimemDsts表示有几个输出是multimem,入参中thread为tid,nThreads为总的线程数,共有nSrcs个输入,地址位于srcPtrs,共有nDsts个输出,存放于dstPtrs,nElts为元素个数,函数的作用就是将所有的src reduce,然后存到所有的dst。
nBytesBehind表示已经处理过了多少数据,nBytesAhead表示还有多少数据未处理。
然后开始看是否可以用向量化指令,BigPackSize就是load/store指令的粒度:如果输入为非Multimem,那么尝试使用16字节,即128位;如果是multimem,则需要根据Func和数据类型判断,本例中为FuncSum且数据类型为float,因此也是16字节。
// Compile-time trait: the widest pack, in bytes, that a single multimem.ld_reduce may load
// for reduction Fn over its element type; 0 means multimem.ld_reduce is not supported.
template<typename Fn>
struct LoadMultimem_BigPackSize {
  using T = typename Fn::EltType;
  // Reduction families this trait recognizes.
  static constexpr bool IsSum = std::is_same<Fn, FuncSum<T>>::value
                             || std::is_same<Fn, FuncPreMulSum<T>>::value
                             || std::is_same<Fn, FuncSumPostDiv<T>>::value;
  static constexpr bool IsMinMax = std::is_same<Fn, FuncMinMax<T>>::value;
  static constexpr bool IsFloat = IsFloatingPoint<T>::value;
  // integer sum/min/max : one element per pack, only for elements of at least 4 bytes
  // floating sum        : 16 bytes for elements narrower than 8 bytes, otherwise 8
  // floating min/max    : 16 bytes, 2-byte elements only
  static constexpr int BigPackSize =
      !IsFloat ? (((IsSum || IsMinMax) && sizeof(T) >= 4) ? int(sizeof(T)) : 0)
    : IsSum    ? (sizeof(T) < 8 ? 16 : 8)
    : (IsMinMax && sizeof(T) == 2) ? 16
    : /*multimem.ld_reduce not supported:*/ 0;
};
使用向量化的前提是所有输入输出地址都按BigPackSize对齐,下面以对齐的情况为例看下reduceCopyPacks的逻辑。
// Processes as much of the remaining range as fits in whole BytePerPack-byte packs, Unroll packs
// per thread per iteration, and advances the in/out counters nBytesBehind / nBytesAhead past
// the bytes it consumed. The other parameters mirror reduceCopy. (Body excerpted.)
template<typename RedFn, typename T, int Unroll, int BytePerPack,
int MultimemSrcs, int MinSrcs, int MaxSrcs,
int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
typename IntBytes>
__device__ __forceinline__ void reduceCopyPacks(
int nThreads, int &thread,
uint64_t redArg, uint64_t *preOpArgs, bool postOp,
int nSrcs, void **srcPtrs, int nDsts, void **dstPtrs,
IntBytes &nBytesBehind, IntBytes &nBytesAhead
) {
// A hunk is the amount of contiguous data a warp consumes per loop iteration
// assuming all threads partake.
constexpr int BytePerHunk = Unroll*WARP_SIZE*BytePerPack;
int nWarps = nThreads/WARP_SIZE;
int warp = thread/WARP_SIZE;
int lane = thread%WARP_SIZE;
...
}
BytePerPack就是BigPackSize,即16字节,假设Unroll为4,那么一个warp的访存模式如图11,一个蓝框的长度为32个16字节,BytePerHunk为一个warp一次性处理的连续数据长度,即4个蓝框,warp里的第一个线程会访问箭头指向的4个蓝框里的第一个16字节。
然后初始化各个线程的初始位置
// reduceCopyPacks excerpt: per-thread starting position and pointer setup.
__device__ __forceinline__ void reduceCopyPacks(...) {
// This thread's initial position.
// Warp W starts at hunk W; within it, lane L owns the L-th pack of each WARP_SIZE-pack row.
IntBytes threadBytesBehind = nBytesBehind + (warp*BytePerHunk + lane*BytePerPack);
IntBytes threadBytesAhead = nBytesAhead - (warp*BytePerHunk + lane*BytePerPack);
// Number of hunks to be consumed over all warps.
// (BytePerHunk + !BytePerHunk) only keeps the divisor nonzero.
IntBytes nHunksAhead = nBytesAhead/(BytePerHunk + !BytePerHunk);
// Advance collective position.
nBytesBehind += nHunksAhead*BytePerHunk;
nBytesAhead -= nHunksAhead*BytePerHunk;
if (Unroll==1 && BytePerPack <= nBytesAhead) {
// Only Unroll=1 can do partial hunks (where not all threads partake).
nHunksAhead += 1;
// The partial hunk consumes every remaining whole pack; only the sub-pack tail is left over.
nBytesBehind += nBytesAhead - (nBytesAhead%(BytePerPack + !BytePerPack));
nBytesAhead = nBytesAhead%(BytePerPack + !BytePerPack);
}
// Count hunks relative to this warp's own starting hunk.
nHunksAhead -= warp;
RedFn redFn(redArg);
// Addresses of the compile-time-known srcs/dsts, pre-offset to this thread's start.
// (X + !X) keeps each array size at least 1 when X is 0.
uintptr_t minSrcs[MinSrcs + !MinSrcs];
uintptr_t minDsts[MinDsts + !MinDsts];
#pragma unroll
for (int s=0; s < MinSrcs; s++)
minSrcs[s] = cvta_to_global(srcPtrs[s]) + threadBytesBehind;
#pragma unroll
for (int d=0; d < MinDsts; d++)
minDsts[d] = cvta_to_global(dstPtrs[d]) + threadBytesBehind;
...
}
threadBytesBehind即当前线程的起始位置,threadBytesAhead为从该起始位置开始还剩余的数据量,然后将前MinSrcs个src和前MinDsts个dst指针(加上当前线程的起始偏移)分别记录到minSrcs和minDsts。
// reduceCopyPacks excerpt: main loop, loading source 0 into the accumulators.
__device__ __forceinline__ void reduceCopyPacks(...) {
...
// Unroll==1: loop while this thread still has a whole pack ahead (partial hunks allowed);
// otherwise: loop while this warp still has whole hunks to process.
while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
// One accumulator pack per unrolled step.
BytePack<BytePerPack> acc[Unroll];
// Source 0 initializes acc; the braces limit preFn to this load.
{ RedFn preFn(0 < PreOpSrcs ? preOpArgs[0] : 0);
#pragma unroll Unroll
for (int u=0; u < Unroll; u++) {
if (0 < MultimemSrcs) {
// applyLoadMultimem uses relaxed semantics for same reason we use volatile below.
acc[u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[0]);
} else {
// Use volatile loads in case credits are polled for with volatile (instead of acquire).
acc[u] = ld_volatile_global<BytePerPack>(minSrcs[0]);
if (0 < PreOpSrcs) acc[u] = applyPreOp(preFn, acc[u]);
}
// Step to this lane's pack in the next WARP_SIZE-pack row of the hunk.
minSrcs[0] += WARP_SIZE*BytePerPack;
}
}
...
}
BytePack在这个场景下就是16字节,由一个union描述,acc用于存储reduce结果。
// 16-byte BytePack: one 128-bit value viewable as bytes, 16/32/64-bit lanes, two 8-byte halves
// (Apply_Reduce recurses through `half`), or a ulong2. alignas(16) presumably so a pack maps to
// one 128-bit vector load/store (see the .v2.b64 accessors).
// NOTE: member order matters — brace-initialization of a union initializes its first member.
template<>
union alignas(16) BytePack<16> {
BytePack<8> half[2];
uint8_t u8[16];
uint16_t u16[8];
uint32_t u32[4];
uint64_t u64[2];
ulong2 ul2, native;
};
然后开始初始化acc:如果输入中没有multimem,那么会通过ld_volatile_global将当前线程在第一个蓝框中对应的128b(16字节)load到acc[0],然后循环Unroll次,将所有蓝框中对应的数据load到acc。下面看下ld_volatile_global。
// Generates the 16-byte specializations ld_<space>, ld_volatile_<space> and st_<space> for one
// PTX state space. A BytePack<16> is moved as its two 64-bit halves (u64[0], u64[1]) with a
// single vector instruction (ld/st .v2.b64). addr_cxx_ty is the C++ address type and
// addr_reg_ty the inline-asm constraint letter for it (stringized by #addr_reg_ty).
// Only the store declares a "memory" clobber.
#define DEFINE_ld_st_16__space(space, addr_cxx_ty, addr_reg_ty) \
template<> \
__device__ __forceinline__ BytePack<16> ld_##space<16>(addr_cxx_ty addr) { \
BytePack<16> ans; \
asm("ld." #space ".v2.b64 {%0,%1}, [%2];" : "=l"(ans.u64[0]), "=l"(ans.u64[1]) : #addr_reg_ty(addr)); \
return ans; \
} \
template<> \
__device__ __forceinline__ BytePack<16> ld_volatile_##space<16>(addr_cxx_ty addr) { \
BytePack<16> ans; \
asm("ld.volatile." #space ".v2.b64 {%0,%1}, [%2];" : "=l"(ans.u64[0]), "=l"(ans.u64[1]) : #addr_reg_ty(addr)); \
return ans; \
} \
template<> \
__device__ __forceinline__ void st_##space<16>(addr_cxx_ty addr, BytePack<16> value) { \
asm("st." #space ".v2.b64 [%0], {%1,%2};" :: #addr_reg_ty(addr), "l"(value.u64[0]), "l"(value.u64[1]) : "memory"); \
}
// Instantiate for the global space with uintptr_t addresses passed in 64-bit ("l") registers.
DEFINE_ld_st_16__space(global, uintptr_t, l)
BytePerPack为16,因此会执行ld_volatile_global<16>,其实就是ld.volatile.global.v2.b64将128b load到ans的u64[0]和u64[1]。
而当输入中包括了multimem之后,会通过applyLoadMultimem将数据load并reduce然后存储到acc;st_global<16>就是将BytePack的u64[0]和u64[1]通过st.global.v2.b64存储到addr。
#define SIZEOF_BytePack_field_u32 4
#define PTX_REG_BytePack_field_u32 "r"
DEFINE_Apply_LoadMultimem_sum_v4(float, f32, u32)
#define DEFINE_Apply_LoadMultimem_sum_v4(T, ptx_ty, pack_field) \
template<> \
struct Apply_LoadMultimem<FuncSum<T>, 4*(SIZEOF_BytePack_field_##pack_field)> { \
static constexpr int PackSize = 4*(SIZEOF_BytePack_field_##pack_field); \
__device__ static BytePack<PackSize> load(FuncSum<T> fn, uintptr_t addr) { \
BytePack<PackSize> ans; \
asm("multimem.ld_reduce.relaxed.sys.global.add.v4." #ptx_ty " {%0,%1,%2,%3}, [%4];" \
: "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[0]), \
"=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[1]), \
"=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[2]), \
"=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[3]) \
: "l"(addr)); \
return ans; \
} \
};
可以看到这里使用的是multimem.ld_reduce.relaxed.sys.global.add.v4.f32,它对所有peer上对应地址的4个float分别执行求和reduce,并将结果存到acc。
完成第一个src的读取之后,继续读取其他的src到tmp,然后通过Apply_Reduce执行reduce操作,Apply_Reduce其实就是对4个float执行elementwise的求和。
// reduceCopyPacks excerpt: folding sources 1..nSrcs-1 into the accumulators.
__device__ __forceinline__ void reduceCopyPacks(...) {
...
while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
...
// Compile-time-known sources 1..MinSrcs-1 (pointers already in minSrcs).
#pragma unroll (MinSrcs-1 + !(MinSrcs-1))
for (int s=1; s < MinSrcs; s++) {
  BytePack<BytePerPack> tmp[Unroll];
  RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
  #pragma unroll Unroll
  for (int u=0; u < Unroll; u++) {
    if (s < MultimemSrcs) {
      // applyLoadMultimem uses relaxed semantics for same reason we use volatile below.
      // Load into tmp, not acc: acc already holds the partial result of sources 0..s-1 and is
      // combined with tmp below. (The NCCL original assigns acc[u] here, which would discard
      // that partial result and reduce an uninitialized tmp[u]; it is only reachable when
      // MultimemSrcs > 1.)
      tmp[u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[s]);
    } else {
      // Use volatile loads in case credits are polled for with volatile (instead of acquire).
      tmp[u] = ld_volatile_global<BytePerPack>(minSrcs[s]);
    }
    minSrcs[s] += WARP_SIZE*BytePerPack;
  }
  #pragma unroll Unroll
  for (int u=0; u < Unroll; u++) {
    if (s < PreOpSrcs) tmp[u] = applyPreOp(preFn, tmp[u]);
    acc[u] = applyReduce(redFn, acc[u], tmp[u]);
  }
}
// Runtime-counted extra sources (MinSrcs..min(MaxSrcs, nSrcs)-1): plain volatile loads,
// addresses computed from srcPtrs on the fly.
for (int s=MinSrcs; (MinSrcs < MaxSrcs) && (s < MaxSrcs) && (s < nSrcs); s++) {
  uintptr_t src = cvta_to_global(srcPtrs[s]) + threadBytesBehind;
  BytePack<BytePerPack> tmp[Unroll];
  RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
  #pragma unroll Unroll
  for (int u=0; u < Unroll; u++) {
    // Use volatile loads in case credits are polled for with volatile (instead of acquire).
    tmp[u] = ld_volatile_global<BytePerPack>(src);
    src += WARP_SIZE*BytePerPack;
  }
  #pragma unroll Unroll
  for (int u=0; u < Unroll; u++) {
    if (s < PreOpSrcs) tmp[u] = applyPreOp(preFn, tmp[u]);
    acc[u] = applyReduce(redFn, acc[u], tmp[u]);
  }
}
...
}
...
}
// Element-wise reduction of two packs. The generic case splits each pack into its two halves
// and recurses until a single element (EltPerPack == 1) is left.
template<typename Fn, int EltPerPack>
struct Apply_Reduce {
  template<int Size>
  __device__ static BytePack<Size> reduce(Fn fn, BytePack<Size> a, BytePack<Size> b) {
    using HalfReduce = Apply_Reduce<Fn, EltPerPack/2>;
    #pragma unroll
    for (int h = 0; h < 2; h++) {
      a.half[h] = HalfReduce::reduce(fn, a.half[h], b.half[h]);
    }
    return a;
  }
};

// Base case for sums: unpack one element from each pack, add them as T, and repack.
template<typename T>
struct Apply_Reduce<FuncSum<T>, /*EltPerPack=*/1> {
  __device__ static BytePack<sizeof(T)> reduce(FuncSum<T> fn, BytePack<sizeof(T)> a, BytePack<sizeof(T)> b) {
    const T lhs = fromPack<T>(a);
    const T rhs = fromPack<T>(b);
    return toPack<T>(lhs + rhs);
  }
};
到这里就拿到了所有输入reduce的结果,然后开始存储到所有的输出dst。
// reduceCopyPacks excerpt: writing the reduced accumulators to every destination.
__device__ __forceinline__ void reduceCopyPacks(...) {
...
while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
...
// Compile-time-known destinations: the leading MultimemDsts are multicast addresses and use
// multimem_st_global; the rest use a plain st_global.
#pragma unroll (MinDsts + !MinDsts)
for (int d=0; d < MinDsts; d++) {
#pragma unroll Unroll
for (int u=0; u < Unroll; u++) {
if (d < MultimemDsts) {
multimem_st_global(minDsts[d], acc[u]);
} else {
st_global<BytePerPack>(minDsts[d], acc[u]);
}
minDsts[d] += WARP_SIZE*BytePerPack;
}
}
// Runtime-counted extra destinations (MinDsts..min(MaxDsts, nDsts)-1): always plain st_global.
for (int d=MinDsts; (MinDsts < MaxDsts) && (d < MaxDsts) && (d < nDsts); d++) {
uintptr_t dst = cvta_to_global(dstPtrs[d]) + threadBytesBehind;
#pragma unroll Unroll
for (int u=0; u < Unroll; u++) {
st_global<BytePerPack>(dst, acc[u]);
dst += WARP_SIZE*BytePerPack;
}
}
}
...
}
到这里就完成了reduceCopy的过程,这里再提一下nvls场景的head,tail等flag都是mcBuff,我们看下waitPeer中如何判断是否需要等待的。
// Reads a connection step counter (head/tail) advanced by the peer side.
// With NvlsMinPolling (NVLS, where the counter lives in the multicast buffer), one
// multimem.ld_reduce ... .min.u64 returns the minimum step over all peers with acquire semantics,
// so progress waits for the slowest peer. That path is compiled only for sm_90+ with CUDA >= 12.1;
// otherwise a plain volatile load is used.
inline __device__ uint64_t loadStepValue(uint64_t* ptr) {
#if __CUDA_ARCH__ >= 900 && CUDART_VERSION >= 12010
if (flags & NvlsMinPolling) {
uint64_t ans;
asm("multimem.ld_reduce.acquire.sys.global.min.u64 %0, [%1];" : "=l"(ans) : "l"(cvta_to_global(ptr)));
return ans;
}
#endif
// volatile is faster than acquire but not as correct. Make sure reduceCopy
// loads data using volatile so it doesn't see stale data in L1.
return ld_volatile_global(ptr);
}
当为nvls场景的时候,flags会有NvlsMinPolling,这里使用multimem.ld_reduce读取所有peer的step并取min,因此只有当所有peer都准备好数据时才会接收数据;这里使用acquire语义,和postPeer的release配对,以保证内存序。
然后看下postPeer
// Publishes this slice's progress: threads holding a post role for an active direction advance
// `step` by StepPerSlice and store it to connStepPtr. On the send side, when data was actually
// stored, a system-scope acq_rel fence orders those data stores before the step update.
template<int Recv, int Send>
inline __device__ void postPeer(bool dataStored) {
  const int postRoles = Recv*RolePostRecv | Send*RolePostSend;
  if ((flags & postRoles) == 0) return;

  step += StepPerSlice;
  const bool mustFence = Send && (flags & RolePostSend) && dataStored;
  if (mustFence) fence_acq_rel_sys();
  st_relaxed_sys_global(connStepPtr, step);
}
这里会使用非multimem的指令去写mcBuff,PTX手册中说这是一种未定义的行为,但是官方说对于写操作是可以的。
AllReduce
allreduce kernel主要有三种线程组,scatter线程逻辑如下
// AllReduce, scatter thread group: sends args->sendbuff to the nvls->up peers
// (FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>: no recv peers, up to NCCL_MAX_NVLS_ARITY send peers).
using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
prims(tid, nThreadsScatter, NULL, nvls->up, args->sendbuff, NULL,
args->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1);
// Each block covers nvls->nHeads consecutive chunks per iteration.
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid * nvls->nHeads * chunkSize;
// nelem is forced to 0 when args->regUsed — presumably the registered user buffer makes this copy unnecessary; confirm.
int nelem = args->regUsed ? 0 : min(nvls->nHeads * chunkSize, size - offset);
// peerElem = peerOffset = chunkSize; skip = -1 (no peer skipped); shift = 0 (natural peer order).
prims.scatter(offset, nelem, chunkSize, chunkSize, -1, 0);
}
和ring allreduce一样,nvls一次循环大小为nranks * chunkSize,即变量nelem或者loopSize,scatter线程负责将sendbuff中的数据发送到nvls->up,connIndexSend为1,因此使用第一个send conn。执行完之后如图12所示。
reduce线程组逻辑如下所示。
// AllReduce, reduce/broadcast thread group; runs only on ranks with nvls->headRank != -1.
// Only the !hasOut branch is shown (hasOut presumably signals an extra output stage, e.g. inter-node; confirm).
else if (tid < tidEndReduce && nvls->headRank != -1) {
if (!hasOut) {
// Reduce, broadcast through NVLS
// MultimemSrcs = MultimemDsts = 1: multimem.ld_reduce reads the sum over all peers, and
// multimem.st writes the result back through the multicast address to every peer.
using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 1>;
Primitives<T, RedOp, FanSymmetric<1>, /*Direct=*/1, Proto, 0>
prims(tid - tidEndGather, nThreadsReduce, &nvls->down, &nvls->down, NULL, NULL,
args->redOpArg, 2 * Proto::MaxGroupWidth, 0, 0, args);
// This head handles only its own chunk: index (bid * nHeads + headRank) in the chunk grid.
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + (bid * nvls->nHeads + nvls->headRank) * chunkSize;
int nelem = min(chunkSize, size - offset);
prims.directRecvDirectSend(offset, offset, nelem);
}
}
...
}
这里MultimemSrcs和MultimemDsts为1,使用第0个conn,因此执行directRecvDirectSend之后效果如图13,GPU 0执行流程如黄色箭头,将两卡的浅黄数据块reduce后得到深黄数据块,然后通过multimem.st将数据广播到两卡,同理GPU 1为绿色箭头,将两卡浅蓝数据块reduce后得到深蓝数据块,此时两卡都有了全局数据。
最后是gather线程,负责将全局数据拷贝到recvbuff。
} else if (tid < tidEndGather) {
// Gather
// Receives from the nvls->up peers (FanAsymmetric<NCCL_MAX_NVLS_ARITY, 0>: recv only) into
// args->recvbuff, using the same offsets and chunk layout as the scatter group.
using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
Primitives<T, RedOp, FanAsymmetric<NCCL_MAX_NVLS_ARITY, 0>, /*Direct=*/0, Proto, 0>
prims(tid - tidEndScatter, nThreadsGather, nvls->up, NULL, NULL, args->recvbuff,
args->redOpArg, 1 * Proto::MaxGroupWidth, 1, 1);
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid * nvls->nHeads * chunkSize;
// As in scatter, nelem is 0 when args->regUsed (presumably registered buffers skip the copy; confirm).
int nelem = args->regUsed ? 0 :min(nvls->nHeads * chunkSize, size - offset);
prims.gather(offset, nelem, chunkSize, chunkSize, -1, 0);
}
gather流程就是ScatterGatherOp执行recv分支,不再赘述,执行后如图14,到这里就完成了allreduce。