当你手头有了支持AVX-512(SIMD)的i9-11900K,你最想做什么?
i9-11900K?现在都14代了,谁还用11代的?
但12代以上就没有AVX-512了!
AVX-512有什么特别之处?有了这个硬件支持,现在你可以做一些“超级酷”的事情,比如“同时把16个32位整数加起来”。
还有什么更酷的呢?比如同时把16个32位整数或者8个64位整数,从小到大或者从大到小进行排序。
另外,既然要搞单指令多数据,那能不能把256个32位整数也(几乎)同时排序呢?更多整数能否尽可能快的排序呢?
这就是刚刚研发的筛子排序法,下面具体讲讲这个方法。
先说明,我们不直接使用AVX-512,而是用c/c++编译器提供的intrinsic指令(长得就像函数一样,但在编译的时候会被替换成相应的机器指令),intrinsic这个词是一个形容词,指的就是“固有的,内在的,本身的”。
查找Intel Intrinsic Guide不难发现如下指令:
_mm512_mask_reduce_max_epu32
_mm512_mask_reduce_min_epu32
这两个指令指的是,512位寄存器当成16个32位(无符号)寄存器,找里面那个最大或者最小的32位无符号整数,给出一个mask(掩码),mask为16位整数,对应16个32位整数,如果对应的位为1,则参与最大最小值的查找,如果为0,则忽略(取最小值的时候忽略项被假定为全1即最大值,取最大值的时候忽略项目被假定为全0即最小值)那个对应的整数。
看看具体的说明文档:
unsigned int _mm512_mask_reduce_min_epu32 (__mmask16 k, __m512i a)
Synopsis
unsigned int _mm512_mask_reduce_min_epu32 (__mmask16 k, __m512i a)
#include <immintrin.h>
Instruction: Sequence
CPUID Flags: AVX512FDescription
Reduce the packed unsigned 32-bit integers in a by maximum using mask k. Returns the minimum of all active elements in a.
Operation
DEFINE REDUCE_MIN(src, len) {
IF len == 2
RETURN (src[31:0] < src[63:32] ? src[31:0] : src[63:32])
FI
len := len / 2
FOR j:= 0 to (len-1)
i := j*32
src[i+31:i] := (src[i+31:i] < src[i+32*len+31:i+32*len] ? src[i+31:i] : src[i+32*len+31:i+32*len])
ENDFOR
RETURN REDUCE_MIN(src[32*len-1:0], len)
}
tmp := a
FOR j := 0 to 16
i := j*32
IF k[j]
tmp[i+31:i] := a[i+31:i]
ELSE
tmp[i+31:i] := 0xFFFFFFFF
FI
ENDFOR
dst[31:0] := REDUCE_MIN(tmp, 16)
把这个指令,和比较指令配合起来,我们就可以找到16个32位整数中的最小值,以及那个最小值所在的位置(最小值可能出现多次)。
一条指令就可以找到最小值,但如何找到它的位置呢?我们通过将这个最小值复制16次,平铺在512位寄存器之中,然后再把这个平铺的结果和原16个32位整数的寄存器内容进行相等比较,并获得掩码,也就是说,那些相等的,在结果掩码中会置1,不相等的清0。
__mmask16 _mm512_mask_cmpeq_epi32_mask (__mmask16 k1, __m512i a, __m512i b)
#include <immintrin.h>
Instruction: vpcmpeqd k {k}, zmm, zmm
CPUID Flags: AVX512F
Description
Compare packed 32-bit integers in a and b for equality, and store the results in mask vector k using zeromask k1 (elements are zeroed out when the corresponding mask bit is not set).
FOR j := 0 to 15
i := j*32
IF k1[j]
k[j] := ( a[i+31:i] == b[i+31:i] ) ? 1 : 0
ELSE
k[j] := 0
FI
ENDFOR
k[MAX:16] := 0
可见这个指令也是需要掩码操作的,输入掩码对应位为0的,不进行比较,或者假定不相等。
有了这两个条件,我们就可以求出16个32为整数构成的512位寄存器中,到底那个或者哪几个32位整数是最小的,以及它们的位置:
bool sieve_get_min(__mmask16 mask, __m512i a, uint32_t& _min, __mmask16& _mask_min) {
if (mask != 0) {
_mask_min = _mm512_mask_cmpeq_epi32_mask(mask,a, _mm512_set1_epi32(
_min = _mm512_mask_reduce_min_epu32(mask, a)));
return true;
}
return false;
}
内嵌的这一表达式 _min = _mm512_mask_reduce_min_epu32(mask, a)
先求出最小值,然后调用函数
_mm512_set1_epi32(
_min = _mm512_mask_reduce_min_epu32(mask, a))
将最小值复制16次,铺满整个512位寄存器。
最后调用函数
_mask_min = _mm512_mask_cmpeq_epi32_mask(mask,a, _mm512_set1_epi32(
_min = _mm512_mask_reduce_min_epu32(mask, a)))
获得最小值出现位置对应的掩码。需要注意的是,如果最初的掩码就是0,那就不需要做任何计算,直接返回失败即可。比如_mask_min==0x82,那么最小值出现在16个32位整数构成的数组中下标为1和7(从0开始)的两个位置上。
现在,我们能确定连续16个32位整数中的最小值,以及这些最小值的分布情况。
然后我们如何对这16个32位整数进行排序呢?
上述给出求最小值的方法,求最大值也是一样的,我们把两者一起求出来,写一个函数完成,
bool sieve_get_min_max(__mmask16 mask, __m512i a, uint32_t& _min, uint32_t& _max, __mmask16& _mask_min, __mmask16& _mask_max) {
if (mask != 0) {
_mask_max = _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(
_max = _mm512_mask_reduce_max_epu32(mask, a)));
_mask_min = _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(
_min = _mm512_mask_reduce_min_epu32(mask, a)));
return true;
}
return false;
}
然后呢?如何对16个数进行排序?
首先调用这个函数,求出16个数中最大和最小的,但是要注意,求出的结果,可能具有多个最大值或者最小值。对于这种情况,我们有两个选项,一个是多个值,只选其中一个,另一个选项是多个值一起处理。
如果要只选其中一个,那么我们需要一个函数,把获得的掩码进行裁剪,比如只保留最高位或者只保留最低位,
__mmask16 single_bit(int leading_or_trailing, __mmask16 old_mask, __mmask16 mask) {
if (mask == 0) return mask;
unsigned short lz = __lzcnt16(mask);
unsigned short tz = _tzcnt_u16(mask);
unsigned short pc = __popcnt16(mask);
__mmask16 and_mask = mask & old_mask;
unsigned short ac = __popcnt16(and_mask);
if (ac > 1) {
__mmask16 cover = ~((leading_or_trailing
? ((lz == 0 || lz >= 16) ? mask : (1 << (15 - lz)))
: ((tz == 0 || tz >= 16) ? mask : (1 << tz))
));
mask &= cover;
lz = __lzcnt16(mask);
tz = _tzcnt_u16(mask);
pc = __popcnt16(mask);
}
switch (pc) {
case 0:
//return mask;
break;
case 1:
mask = (lz == 0 || lz >= 16) ? mask : (1 << (15 - lz));
break;
default:
//count of 1 >=2
mask = (leading_or_trailing
? ((lz == 0 || lz >= 16) ? mask : (1 << (15 - lz)))
: ((tz == 0 || tz >= 16) ? mask : (1 << tz))
);
break;
}
return mask;
}
这个函数看上去比较复杂,要实现的就是上述的掩码修正。
有了这个基础,就可以进行最后的处理了:一共16个32位整数,第一次我们取最大和最小值各1一个,分别放在结果的最高和最低下标上(15和0),然后修改掩码,让下一次查找和比较不再考虑已经处理过的两个值,然后进行第二次比较和查找,并把结果放在内层的下标上(14和1),然后是(13和2),依次类推,最后只需要进行8次同样的比较和查找并将数值填入对应的高低位置,就完成了16个数值的比较过程。
代码如下(为了加速sieve_get_min的功能已经被合并在其中了),:
__forceinline __m512i sieve_sort32x16(__m512i a, uint32_t* result = nullptr) {
uint32_t buffer[16] = { 0 };
if (result == nullptr) _mm512_store_epi32(result = buffer, a);
__mmask16 mask = 0xffff;
mask &= ~(
single_bit(0, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[15] = _mm512_mask_reduce_max_epu32(mask, a))))
| single_bit(1, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[0] = _mm512_mask_reduce_min_epu32(mask, a))))
);
mask &= ~(
single_bit(0, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[14] = _mm512_mask_reduce_max_epu32(mask, a))))
| single_bit(1, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[1] = _mm512_mask_reduce_min_epu32(mask, a))))
);
mask &= ~(
single_bit(0, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[13] = _mm512_mask_reduce_max_epu32(mask, a))))
| single_bit(1, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[2] = _mm512_mask_reduce_min_epu32(mask, a))))
);
mask &= ~(
single_bit(0, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[12] = _mm512_mask_reduce_max_epu32(mask, a))))
| single_bit(1, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[3] = _mm512_mask_reduce_min_epu32(mask, a))))
);
mask &= ~(
single_bit(0, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[11] = _mm512_mask_reduce_max_epu32(mask, a))))
| single_bit(1, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[4] = _mm512_mask_reduce_min_epu32(mask, a))))
);
mask &= ~(
single_bit(0, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[10] = _mm512_mask_reduce_max_epu32(mask, a))))
| single_bit(1, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[5] = _mm512_mask_reduce_min_epu32(mask, a))))
);
mask &= ~(
single_bit(0, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[9] = _mm512_mask_reduce_max_epu32(mask, a))))
| single_bit(1, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[6] = _mm512_mask_reduce_min_epu32(mask, a))))
);
mask &= ~(
single_bit(0, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[8] = _mm512_mask_reduce_max_epu32(mask, a))))
| single_bit(1, mask, _mm512_cmpeq_epi32_mask(a, _mm512_set1_epi32(result[7] = _mm512_mask_reduce_min_epu32(mask, a))))
);
return _mm512_loadu_epi32(result);
}
观察代码不难发现,这个算法叫做叫筛子算法,是因为,先前被找到的最大和最小值,不再参与下一次的搜索和比较过程,就像被筛掉了一样。由于整个过程都只是用掩码和位运算来重定向数值的输出,所以并无大量数据交换操作,整个算法过程就像一种由组合逻辑电路实现的数据重排过程,没有分支也没有循环,于是不用分支预测所有数据都能预取,显然跑起来是相当快的。
先预览一下代码,本文未完待续。
GitHub - yyl-20020115/SieveSort: New sort algorithm using Intel SIMD-AVX512New sort algorithm using Intel SIMD-AVX512. Contribute to yyl-20020115/SieveSort development by creating an account on GitHub.https://github.com/yyl-20020115/SieveSort/