目录
1 unordered 系列关联式容器
2 哈希介绍
3 闭散列哈希
4 哈希桶
5 封装实现unordered系列set和map
6 位图
7 哈希切割
8 布隆过滤器
1 unordered 系列关联式容器
在学习哈希结构实现之前,我们先学习一下哈希在库里面的一些使用unordered_set 和unorderen_map。这两个容器其实与set和map在功能上几乎相同的,也是为了查找而生的,但是为什么有了set和map之后还要设计出这样的unordered系列的两个容器呢?我们的set和map是用红黑树来实现的,而在早期,确实认为红黑树实现的set和map这两个搜索容器就能够满足我们的需求了,但后续发展过程中,还是发现有些场景用红黑树不够方便,效率不够高,所以在C++11又将哈希结构的搜索容器unordered_set和unorderen_map添加到了stl标准库中。为什么取名为unordered(无序)呢?因为在早期还没有使用哈希结构的时候,人们使用的都是set和map,由于他们的底层是红黑树,那么数据中序遍历出来就是有序的,而哈希结构实现的搜索容器数据遍历出来并不是有序的,为了和有序的set和map做区分,所以干脆就叫做无序的set和map了。
unordered系列的set和map底层也是用的同一个哈希结构的模板,具体实现我们在后面会讲,由于底层结构采用的是哈希桶的结构,所以这两个容器的迭代器丝毫不支持反向迭代器的,只有单项迭代器。
那么既然有了红黑树的set和map,还是要搞一个哈希的set和map,那就说明了用哈希结构实现这两个容器肯定是有它的独特的优势在里面的。我们在真正学习哈希之前可以先来测试一下他们的性能。
测试效率我们可以从插入的效率和查找的效率这两方面来看,而测试的数据我们也可以像红黑树和AVL树的测试一样提供三种测试用例,分别是有序,带有大量重复的随机数和少量重复的随机数。
首先是有序数据的插入和查找,这应该算是搜索树结构的最好的情况了,因为他的插入之后每次都只需要左旋就行了。
在有序的数据中,他们的插入效率是差不多的,但是查找的效率还是 哈希结构的set更胜一筹
再看一下重复较多的数据的插入和查找,这里我们就只测十万个数据,因为随机数总共就只能生成三万多个,数据多了之后重复的数据过多,意义不大
当数据是一些重复较多的随机数时,不管是插入效率还是查找效率,红黑树结构的set都落后于哈希结构的set。
最后测试一下少量重复的随机数的插入和删除
二者的插入效率倒是都差不多,但是查找效率哈希结构的set还是完胜红黑树结构的set。
综合而言,在插入数据方面,哈希结构和红黑树结构差距不大,但是查找方面大部分场景哈希结构的效率还是要比搜索树结构快,等学习了哈希的底层之后我们就能知道,他的查找的时间复杂度已经是一个接近O(1)的效率了。
map的测试就不做了,底层结构是一样的,效率按理来说也是和set的对比一样。同时哈希的map也支持map的方括号重载这样的重要的接口。
但是并不是说红黑树就完全比不上哈希了,我们说过,哈希的set和map遍历出来并不一定是有序的,而map和set则是有序的,这就是哈希的结构带来的一些问题,虽然说不上是缺点,这两种结构各有各的应用场景吗,但是对比下来,在查找和搜索的场景下,哈希结构还是要由于红黑树的,但是在需要数据有序的场景下,哈希结构则并不能很好的支持。
2 哈希介绍
哈希其实本质上就是一种映射,映射的意思就是把你的key值跟对应数据的存储的位置建立一个关联关系,那么只要知道key值,就能根据固定的算法求出该数据所在的位置。
我们以前玩过的映射比如说 计数排序或者计数的数组,这些都是简单的哈希映射,他们适合数据范围小,分布集中的数据,但是如果数据十分离散,这种简易的映射就不能是满足需求了。
我们上面所说的简易的哈希映射,我们使用的是直接定址法,就是取关键字或者数据的某个线性函数作为散列地址(散列就是哈希表,底层是数组),也就是 hash(key)=a key+b 来得到的地址,这种定址方法很简单,但是要求我们事先求出数据的分布范围,也就是要求出a和b。
而还有一种定址方法就是除留余数法,顾名思义,就是用key值与某个数相除的余数作为数据的位置,具体的做法就是: 我们设散列的允许的地址数为 m(也就是一共有m个空间),取一个不大于但是最接近或者等于 m 的质数 p 作为除数,这时候每个key所对应的地址就是hash(key)=key%p。
采用这种定址法,这时候每个数据的位置就跟数据范围无关了, 而是跟自身key值以及除数p有关,当然我们一般就是直接把 m 当作 p 来用,而不追求p一定是质数。
但是这种方法有一个问题,就是不同的 key 值取模出来的地址可能相同,这在我们的直接定址法中是不会出现的。
当不同的key使用除留余数法得出来的地址相同,我们称他们冲突,一般叫做哈希冲突或者哈希碰撞
这时候有两种解决方法:
1 闭散列---- 开放定址法,采用这种方法,如果一个数据使用key求出来的地址已经被别的数据占用了,那么该数据就去找一个其他的位置。找位置有两种方法,一种是线性探测法,线性探测法就是在求出来的地址处逐个往后找,知道某一个位置为空,就存进去,找位置的时候有可能探测到了数组的结尾,这时候无非就是回到数组开头继续往后找。 第二种方法就是二次探测法,这种方法不是一个一个位置探测,而是采用 i 的平方来跳跃式的探测,而且在使用 i 的平方探测时,是一次往后一次往前 这样来回探测,比如先探测原位置的后面第 1^2 的位置,再探测原位置的前面第 2^2的位置,接着再探测原位置的后面第 3^2的位置,以此类推,这种方法的优点就是冲突的数据不会过于集中,导致冲突一大片数据。
线性探测法如图:一共十个空间,那么 hash(key)=key%10
二次探测:
我么可以发现,线性探测的话,如果在某一个范围内冲突严重的话,那么会把该范围及其附近的映射关系全部打乱,而二次探测则是会尽量避免选址在冲突的区域附近,防止冲突集中在同一个区域。
2 开散列---- 哈希桶/拉链法
这种方法避免冲突的做法是: 我们要插入的的数据使用hash函数求出key对应的位置之后,不是直接存放到散列中(哈希表中),而是使用一个单链表挂在映射的位置。这样一来,如果不同的key求出来映射的位置相同,那么也不会发生冲突,直接再新建一个节点挂在该位置的链表上就行了。
在我们学习单链表的时候,就说过单链表一般不单独用来存储数据,而是用作其他复杂数据结构的子结构,哈希桶这里就是一种应用 。
我们把挂在哈希表上
的单链表称为桶
库里面的unordered_set和map就是用桶结构来实现的
那么这样一来,哈希表或者散列里面存的就不是数据了,而是链表的头指针。
同时,我们也发现,上面的链表的插入顺序采用的是头插,因为单链表的头插的效率更高,尾插的话还需要找尾节点再插入。
也正是由于unordered_set和unordered_map的底层是哈希桶,导致他们的迭代器只能加加,是一个单项迭代器。
同时,哈希桶结构也会有一个问题就是,当一个位置的冲突很多时,那么挂的单链表就会很长,效率就会下降。
那么为了减少冲突,两种方案都有对应的扩容的逻辑,是否扩容则是由一个叫做负载因子的值决定的。
负载因子=存的数据个数 / 哈希表的空间个数
当我们的负载因子很小时,说明存的数据个数要比开的哈希表的空间少得多,那么这时候空间的利用率不是很高,但是插入数据时发生冲突的概率就会很小。
当我们的负载因子很大也就是接近 1 时,说明存的数据个数接近开的哈希表的空间的总个数了,说明空间利用率很高,但是新插入数据的时候发生冲突的概率很大。
于是我们就需要将哈希表的负载因子控制在一定范围内,也就是需要一个标定的最大负载因子,当哈希表的负载因子大于等于最大负载因子时,哈希表就需要扩容,至于扩容的逻辑我们在实现的时候再讲。
3 闭散列哈希
闭散列哈希就是哈希表中直接存数据,冲突的话就采用线性探测或者二次探测去找新的位置存储,那么它实现起来就不难。
首先哈希表应该要是一个数组,我么可以直接封装 vector 来实现哈希表。
那么我们首先要搞清楚一个概念: 哈希表的空间和数据的个数分别对应vector的哪个值?
我们哈希表的插入是开好的空间,然后进行数据的插入。它实际上并不是一个push的过程,而是相当于一个覆盖,也就是把 vector 中为空的项覆盖为我们的数据。
那么就显而易见了,vector的size就是我们哈希表开好的空间 m ,那么哈希表中存的数据的个数我们就无从得知了,所以我们在哈希中要用一个变量来保存数据的个数。
同时由于我们既要实现插入,也要实现删除,那么删除是惰性删除,还是真正的把空间释放呢? 当然是惰性删除,在哈希表中删除一个数据不能影响整个表,只是将该数据的位置标记为删除,因为如果要释放空间的话就只能整个vector都释放,要不就需要挪动数据,而挪动数据则会影响我们的映射关系,导致映射关系全部移动了,所以我们必须采用惰性删除, 等待被新的数据映射进来然后覆盖。
那么我们如何标识每一个位置是存有数据、无数据、还是曾经有数据但是被删除了呢?还是说无数据和删除归为一类呢?都可以,那么我们就直接使用两个状态,无数据和有数据来标识。
//状态位
enum Flag
{
EMPTY,
EXIST,
};
//数据
template<typename K,typename V>
struct Data
{
K _key;
V _val;
Flag _flag=EMPTY; //默认生成的构造,会把flag初始化为空
};
//哈希
template<typename K,typename V>
class Hash
{
typedef Data<K, V> Data;
private:
vector<Data> _table;
size_t _n;
};
首先插入数据就是先求出映射的位置,然后看是否有冲突,视情况要不要探测。那么基础的insert的逻辑如下
void insert(const K& key, const V& val)
{
//首先求出key映射的位置
size_t index = key % _table.size();
//采用线性探测
while (_table[index % _table.size()]._flag != EMPTY)//向后探测直到空
{
index++;
}
_table[index % _table.size()]._key = key;
_table[index % _table.size()]._val = val;
_table[index % _table.size()]._flag = EXIST;
++_n; //计数器加加
}
但是,在上面的这段代码中,我们的程序是可能会陷入死循环的,因为我们并没有控制负载因子,可能会导致我们的哈希表的所有位置都有数据,而导致while循环无法结束。
同时,为了减少冲突的概率,我们是要保持哈希表中时刻都有一部分空间是空余出来的,为了防止后续的插入数据发生哈希冲突,也就是需要采用以空间换时间的策略。那么我们可以把负载因子控制在 0.7 左右,也就是每次插入之前我们需要判断一下负载因子是否达到了上限,如果达到了,我们就需要扩容。
那么现在的问题就是扩容的逻辑了,扩容我们是直接对vector进行resize吗? 当然不是,因为扩容之后,所有的数据都要重新计算映射的位置,扩容就是为了减少冲突的,而在扩容之后,原本冲突的数据扩完容之后可能就不再冲突了,而原本不冲突的数据扩容之后还是不冲突。如图:
那么扩容我们就需要再开一个哈希表,然后将旧表的数据一个一个插入到新的表中,最后交换两个表就行了。于此同时,我们就需要考虑另外一个问题了,就是初始情况,我们的表的大小是0,所以我们最好也判断一下size是否为0,给一个初始值。
template<typename K,typename V>
struct KeyOfData
{
K operator()(Data<K, V>& d) { return d._key; }
};
if (_table.size() == 0) //初始
_table.resize(INIT_SIZE);
if (_n >= _table.size() * 0.7) //负载因子超过0.7,需要扩容
{
vector<Data> newtable;
newtable.resize(_table.size()*2);
//遍历旧表,将数据插入到新表中
for (auto& d : _table)
{
if (d._flag != EMPTY)
{
//那么这里我们还需要一个将key从data中提取出来的仿函数
size_t index = KeyOfData()(d) % newtable.size();
while (newtable[index % newtable.size()]._flag != EMPTY)
{
index++;
}
newtable[index % newtable.size()] = d;
}
}
//最后swap一下
_table.swap(newtable);
}
那么我么可以拿上面的案例来测试一下插入的逻辑是否正确。
扩容之前:
扩容之后:
目前看来我们的插入的逻辑是没有问题的。但是其实我们还忽略了一个最重要的点,就是相同的key不能同时存在,所以我们还需要判断key是否已经存在,那么由此导致我们的insert实际上是需要一个返回值来反馈是否插入成功的。
修改后的代码如下:
bool insert(const K& key, const V& val)
{
if (_table.size() == 0) //初始
_table.resize(INIT_SIZE);
if (_n >= _table.size() * 0.7) //负载因子超过0.7,需要扩容
{
vector<Data> newtable;
newtable.resize(_table.size()*2);
//遍历旧表,将数据插入到新表中
for (auto& d : _table)
{
if (d._flag != EMPTY)
{
//那么这里我们还需要一个将key从data中提取出来的仿函数
size_t index = KeyOfData()(d) % newtable.size();
while (newtable[index % newtable.size()]._flag != EMPTY)
{
index++;
}
newtable[index % newtable.size()] = d;
}
}
//最后swap一下
_table.swap(newtable);
}
//首先求出key映射的位置
size_t index = key % _table.size();
//采用线性探测
while (_table[index % _table.size()]._flag != EMPTY)//向后探测直到空
{
if (KeyOfData()(_table[index % _table.size()]) == key)
return false;
index++;
}
_table[index % _table.size()]._key = key;
_table[index % _table.size()]._val = val;
_table[index % _table.size()]._flag = EXIST;
++_n; //计数器加加
return true;
}
那么接下来就是删除的逻辑,删除是使用惰性删除。但是删除的时候,当我们发现映射出来的位置不是我们自己的值时,我们不能直接返回false,因为它可能是因为冲突了然后向后探测找了新的位置。比如下面的场景,我们要删除7,7映射到的位置就是下标为7,但是由于17已经存到了该位置上,所以7就往后存了。那么,删除的逻辑就应该是先找到key值对应的位置,然后不断往后遍历,直到某一个位置的key值就是我们要删除的key值,或者遍历到空了,就说明后面也没有由于冲突被放进来的值了。那么初步的逻辑就是这样的
bool erase(const K& key)
{
size_t index = key % _table.size();
while (_table[index % _table.size()]._flag != EMPTY)
{
if (_table[index % _table.size()]._key == key) //找到了
{
_table[index % _table.size()]._flag = EMPTY; //状态置为空
_n--;
return true;
}
index++;
}
//走到这里说明表中没有该数据
return false;
}
但是真的就这么简单吗?
如上面的图,当我们删除了7之后,如果我们还要使用erase(6)来删除6这个数据呢?那么当我们从6号位置往后遍历,遍历到 8 好下标就停下来了,这不就出问题了?
那么问题出在哪了呢?这是由于我们的 7 号位置本来是放了数据的,而数据 6 则线性探测一直探测到了7的后面才找到空位置来存放 。 但是当数据 7 被删除之后,我们把7的位置的状态设置为了空,这是不合理的。
所以在哈希表中,我们删除某个数据并不是把该位置置为空,而是要做一种特殊的标记,因此,我们使用两种状态是搞不定这里的哈希表的,我们必须要需要加上第三种状态,也就是删除,有了删除状态之后,就算我们遇到了删除状态,我们也不能停下来,而是要遇到空状态才能停下来。
我们需要把删除状态加上
那么就修改成了下面的的代码
bool erase(const K& key)
{
size_t index = key % _table.size();
while (_table[index % _table.size()]._flag != EMPTY)
{
if (_table[index % _table.size()]._flag==EXIST
&&KeyOfData()(_table[index % _table.size()]) == key) //找到了
{
_table[index % _table.size()]._flag = DELETE;
--_n;
return true;
}
index++;
}
//走到这里说明表中没有该数据
return false;
}
但是其实还没有,我们的这段程序还是有一个bug,就是,有一种可能,我们的哈希表中所有的位置都曾放过值,只不过不是同一时间都存在,而是某些位置已经被删除了,然后有放进了一些数据到EMPTY的位置,也就是说,哈希表中的所有数据不是EXIST就是DELETE,那么我们上面的while循环就会死循环下去,那么我们要怎么防止这种情况呢? 很简单,在循环的过程中加一个计数器,只要计数器的值等于哈希表的大小了,就说明已经找了一轮了,那么也就没有继续找下去的必要了,直接break然后返回false。
所以最终的删除的代码如下:
bool erase(const K& key)
{
size_t index = key % _table.size();
int cnt = ; 0
while (_table[index % _table.size()]._flag != EMPTY)
{
if (cnt == _table.size())
break; //防止死循环
if (_table[index % _table.size()]._flag==EXIST
&&KeyOfData()(_table[index % _table.size()]) == key) //找到了
{
_table[index % _table.size()]._flag = DELETE;
--_n;
return true;
}
++cnt;
index++;
}
//走到这里说明表中没有该数据
return false;
}
查找的逻辑逻辑也是和删除类似的,查找其实就是删除的一个子过程罢了,我们直接把代码复制粘贴一下就行了。
当然我们也可以修改一下返回值,把找到的数据的地址返回去。
Data* find(const K& key)
{
size_t index = key % _table.size();
int cnt = 0;
while (_table[index % _table.size()]._flag != EMPTY)
{
if (cnt == _table.size())
break; //防止死循环
if (_table[index % _table.size()]._flag == EXIST
&& KeyOfData()(_table[index % _table.size()]) == key) //找到了
{
return &_table[index % _table.size()];
}
++cnt;
index++;
}
//走到这里说明表中没有该数据
return nullptr;
}
我们可以使用一个特殊的案例来测试一下删除的逻辑,所有的位置都放过数据,最后在把所有的数据删除。
Myhash::Hash<int, int> hs;
int arr[] = {1,11,21,31,41,51,61};
for (auto e : arr)
hs.insert(e,e);
for (auto e : arr)
hs.erase(e);
hs.insert(8, 8);
hs.insert(9, 9);
hs.insert(0, 0);
hs.erase(0);
hs.erase(9);
hs.erase(8);
插入前七个数据之后
删除前七个数据之后
再插入剩余三个EMPTY位置
删除另外三个数据:
所以我们的逻辑是没有问题的。
目前我们实现的闭散列哈希还有很多地方没有完成,比如我们存储的如果是string或者字符串类型的数据,我们现实生活中查找的大多数的数据的都是 字符串类型的,所以我们至少要支持字符串类型的存储。
但是这里就有一个很大的问题,字符串如何进行取模操作求出映射的位置?我们的取模操作是建立在传过来的key是整型的前提下的,但是如果传的是字符串,那么我们是无法进行取模操作的,俺么如此一来,我们就需要一个算法能够将字符串依据他的内容得出一个整型值来进行取模映射,同时这个算法必须要是稳定的,也就是传过来相同的字符串,必须返回的是相同的整型。
我们最容易想到的就是拿字符串的第一个字符的ASCII码值来作为整型取模映射,但是这么简单的操作,会导致之后冲突的概率非常大,只要是首字符相同的字符串就是冲突的,这也太扯了。
那么第二种思路就是把字符串中所有的字符的ASCII码值加起来的到的结果来进行取模映射,虽然可能会存在溢出的风险,但是就算溢出了发生截断也无所谓,反正相同的字符串得到的结果是相同的,同时由于我们的取模操作,就算加起来的和再大也不影响他能够映射在哈希表中。那么这里有一个问题?我们的汉字也是字符吗?是,一个汉字一般就是由两个字符来表示的,但是也有可能不是,取决于编码,但是无论如何汉字我们也是能转换为整型的。
同时为了支持泛型编程,那么我们的将key转换为一个整型的算法也需要用一个仿函数来作为模板参数。由于我们现实中一般都是拿整形或者字符串作key,所以我们就考虑这两种情况就够了。
对应我们的库里面也是一样的,需要一个从Value 中提取出key的仿函数,也需要一个将key转换得到一个整型的仿函数。
//一般的类型都是整型,直接返回就行了
template<typename K>
struct HashKey
{
size_t operator()(K key) { return key; }
};
//模板特化对string类型进行处理
template<>
struct HashKey<string>
{
size_t operator()(string str)
{
size_t sum = 0;
for (auto ch : str)
sum += ch;
return sum;
}
};
那么我们上面的代码中凡是要求位置的代码都要换成这样的
size_t index =HashKey()(key) % _table.size();
Myhash::Hash<string, int,Myhash::HashKey<string>> hs;
vector<string> vstr;
vstr.push_back("abc");
vstr.push_back("ab");
vstr.push_back("ac");
vstr.push_back("ad");
vstr.push_back("aaa");
vstr.push_back("adb");
for (auto s : vstr)
hs.insert(s,0);
那么目前我们的哈希表也能够进行字符串的映射了。
写完简单的闭散列哈希的实现,我们就能知道一些问题,比如我们用红黑树实现的map和set是通过比较大小来确定位置的,这就要求我们需要有一个提取key的仿函数以及key要支持比较大小。而我们的哈希则是一种映射,他是通过哈希函数来确定位置的,这就要求我们的key需要能够转换成整型,同时我们也需要一个从val中提取key的仿函数。
最后,闭散列哈希的最大的问题就是哈希冲突的问题,这一点在线性探测尤为突出。但是就算我们将探测方式换成二次探测,也只是让冲突的数据没有挨着存,冲突的数据最终还是要去占用别人的位置,并没有从根本上解决问题。所以我们的闭散列的哈希也就不往后实现了,因为没有必要了,我们实现以下插入和删除主要是为了熟悉一下哈希是怎么映射的,重点还是要实现哈希桶
4 哈希桶
因为闭散列的哈希容易冲突,所以我们就需要一种能够解决哈希冲突的结构来映射,于是我们就有了哈希桶。
哈希桶实现哈希的话,哈希表里面存储的就应该是Data*的指针了,同时初始化的时候就都初始化为nullptr就行了,在哈系统的实现中,我们还是要一个变量来保存存进去的数据的个数,因为哈希桶的实现虽然发生冲突不是去占别人的位置,而是挂在链表上,但是可能会出现同一个位置冲突很多,也就是某一个链表很长,这样时会影响他的效率的,所以我们还是需要控制负载因子,不过哈希桶实现的话负载因子就可以稍微放宽一点了,比如最大不超过1就行。
那么首先还是定义他的结构:
template<typename Data>
struct ListNode
{
ListNode()
:_next(nullptr)
,_kv(kv)
{}
Data _kv;
ListNode<Data>* _next;
};
template<typename K>
struct Hash
{
size_t operator()(const K& key) { return key; }
};
template<>
struct Hash<string>
{
size_t operator()(const string& str)
{
size_t sum = 0;
for (auto ch : str)
sum += ch;
return sum;
}
};
template<typename K,typename V>
struct KeyOfVal
{
K& operator()(const pair<K, V>& p) { return p.first; }
};
template<typename K, typename V, typename Hash=Hash<K>, typename KeyOfVal=KeyOfVal<K,V>>
class HashBucket
{
typedef ListNode<pair<K,V>> Node;
public:
private:
vector<Node*> _table;
size_t _n=0;
};
然后完成插入函数
基础的插入的逻辑如下
bool insert(pair<K, V>kv)
{
//判断是否需要扩容
if (_table.size() == 0)
{
_table.resize(INIT_SIZE,nullptr);
}
//扩容逻辑 ... ...
size_t index = Hash()(KeyOfVal()(kv))%_table.size(); //先求映射的位置
//先看对应的链表中是否已经存在该key
Node* cur = _table[index];
while (cur)
{
if (KeyOfVal()(kv) == KeyOfVal()(cur->_kv))
return false;
cur = cur->_next;
}
//头插到对应的链表
Node* newnode = new Node(kv);
newnode->_next = _table[index];
_table[index] = newnode;
_n++;
return true;
}
那么接下来我们要完成的就是扩容,扩容其实很简单,有很多种方式,其中一种再创建一个哈希表,对其进行扩容之后,复用我们的insert。
if (_n == _table.size())
{
//1 复用哈希的insert
HashBucket hs;
hs._table.resize(2 * _table.size(),nullptr);
for (int i = 0; i < _table.size(); ++i)
{
Node* cur = _table[i];
while (cur)
{
hs.insert(cur->_kv);
cur = cur->_next;
}
}
//最后交换两个哈希的表
_table.swap(hs._table);
}
我们要用这种方法的话就先要实现哈希的析构函数,因为交换之后,原来的表的节点都还在,除了函数的作用域这个局部的hs就要被销毁。而由于我们是采用的链表的形式来存储的,所以我们必须自己实现一个哈希的析构函数,析构函数的逻辑就和上面的一模一样,只不过在删除之前要保存next。
~HashBucket()
{
for (int i = 0; i < _table.size(); ++i)
{
while (_table[i])
{
Node* next = _table[i]->_next;
delete _table[i];
_table[i] = next;
}
}
}
但是这样做有一点过于偷懒了,其实我们还有更好的方法来完成扩容。
我们旧表中的节点虽然可能需要重新映射到其他的位置,挂到其他的位置,但是我们的节点还是可以继续利用的,我么可以把节点中的数据拿出来求映射后的下标,然后直接挂上去,而不是这样在新表中insert,insert虽然代码量少,但是他不仅需要new节点,最后还需要对旧表节点一个一个释放,浪费了效率。
直接挪节点的扩容:
//2 挪节点
vector<Node*> newtable(_table.size()*2,nullptr);
for (int i = 0; i < _table.size(); ++i)
{
Node* cur = _table[i];
while (cur)
{
size_t index = Hash()(KeyOfVal()(cur->_kv));
Node* next = cur->_next;
cur->_next = newtable[index];
newtable[index] = cur;
cur = next;
}
_table[i] = nullptr; //最后一定要置空,防止节点被析构
}
_table.swap(newtable);
然后就是删除的逻辑,删除没什么好说的,就是单链表的删除,同时要判断删除的是不是头节点,如果是的话还需要修改哈希表。
bool erase(const K& key)
{
size_t index = Hash()(key) % _table.size();
Node* cur = _table[index];
Node* prev = nullptr;
while (cur)
{
if (key == KeyOfVal()(cur->_kv))
break;
prev = cur;
cur = cur->_next;
}
//如果cur为空,说明根本就没有这个数据
if (!cur)
return false;
//如果prev为空,那就说明cur是头节点
if (!prev)
{
_table[index] = cur->_next;
delete cur;
--_n;
return true;
}
else //说明cur不是头节点
{
prev->_next = cur->_next;
delete cur;
--_n;
return true;
}
}
最后还有拷贝,拷贝则需要深拷贝来支持,包括链表中的节点的顺序我们都不能改变,不过这里难度也不是很大了,我们一个一个拷贝就行了。resize + 遍历尾插.
HashBucket(const HashBucket& hs)
{
_table.resize(hs._table.size(),nullptr);
for (int i = 0; i < _table.size(); ++i)
{
Node* cur = hs._table[i];
Node* tail = _table[i];
while (cur)
{
if (tail == nullptr) //第一次插入
{
_table[i] = new Node(cur->_kv);
tail = _table[i];
}
else
{
tail->_next = new Node(cur->_kv);
tail = tail->_next;
}
cur = cur->_next;
++_n;
}
}
}
然后就是赋值重载,我们还是可以直接复用拷贝构造来完成。
HashBucket operator=(HashBucket hs)
{
_table.swap(hs._table);
}
最后还是需要实现一下find,find我们返回数据的指针就行了。
pair<K, V>* find(const K& key)
{
size_t index = Hash()(key) % _table.size();
Node* cur = _table[index];
while (cur)
{
if (key == KeyOfVal()(cur->_kv))
return &cur->_kv;
cur = cur->_next;
}
return nullptr;
}
我们实现的哈希都是采用初始大小为10,扩容为2被扩容的,但是有些地方觉得哈希表的大小开素数个更好,取模取出来不容易冲突。比如我们的stl库里面就是采用的这种方法,初始的大小为53,然后每次扩容都是去找离当前容量的2被最近的一个素数,于是就形成了下面的这张素数表,那么每次扩容的时候都是去找当前容量的下一个素数。虽然我们偷懒没有采取这种方案,但是还是可以稍微了解学习一下的。同时我们看到他的最大的容量不是前一个的二倍了,而是四字节无符号整型的最大值,毕竟哈希表中要存的是指针,那么就算42亿个表项,光vector就有16G了,一般我们也用不到这么大。
还有一点就是,我们日常生活中经常的用的字符串存到哈希表中,我们自己设计的那个字符串转整型的算法还是有些潦草,我们其实有很多著名的字符串转整型的Hash函数,可以在网上查阅一下学习他们是怎么做的。下面是一个有关的链接
https://www.cnblogs.com/uvsjoh/archive/2012/03/27/2420120.html
常见hash函数的对比
5 封装实现unordered系列set和map
和红黑树类似,unordered系列的set和map也是使用一个哈希表模板来实现的,这样一来我们就需要对我们的模板参数以及仿函数等作出一些修改。
首先,哈希表是最底层的实现,那么我们的仿函数等就不需要给缺省值,就算要给缺省值,也是在set和map这一层给,然后传给hash表,其次我们要存的数据的类型也可以通过set和map这一层传给hash。
那么set的顶层结构设计:
template<typename K>
struct Hash
{
size_t operator()(const K& key) { return key; }
};
template<>
struct Hash<string>
{
size_t operator()(const string& str)
{
size_t sum = 0;
for (auto ch : str)
sum += ch;
return sum;
}
};
template<typename K>
struct KeyOfVal
{
//set的val就是一个key
const K& operator()(const K& key) { return key; }
};
template<typename K, typename Hash = Hash<K>, typename KeyOfVal = KeyOfVal<K>>
class set
{
public:
private:
Hashbucket::HashBucket<K, K, Hash, KeyOfVal> _hs;
};
map的顶层设计:
template<typename K>
struct Hash
{
size_t operator()(const K& key) { return key; }
};
template<>
struct Hash<string>
{
size_t operator()(const string& str)
{
size_t sum = 0;
for (auto ch : str)
sum += ch;
return sum;
}
};
template<typename K, typename Data>
struct KeyOfVal
{
const K& operator()(const Data& p) { return p.first; }
};
template<typename K, typename V, typename Hash = Hash<K>, typename KeyOfVal = KeyOfVal<K, pair<const K,V>>>
class map
{
public:
private:
Hashbucket::HashBucket<K, pair<const K,V>, Hash, KeyOfVal> _hs;
};
外部如果我们想要用map或者set来存储一些自定义类型的值作为key,我们需要写一个将我们的自定义类型的key转换为整型的仿函数通过模板传参。
那么set和map的insert、erase、find等接口就都是直接复用hash的就行了。拷贝构造和析构等我们就不需要在set和map这一层实现了,因为编译器自动生成的就会去调用hash的拷贝和析构。
set:
bool insert(const K& key)
{
return _hs.insert(key);
}
bool erase(const K& key)
{
return _hs.erase(key);
}
K* find(const K& key)
{
return _hs.find(key);
}
map:
bool insert(const pair<K,V>&data )
{
return _hs.insert(data);
}
bool erase(const K& key)
{
return _hs.erase(key);
}
pair<K,V>* find(const K& key)
{
return _hs.find(key);
}
那么set和map的最基础的功能就实现了,我们目前看来也没什么问题。
接下来就是进一步的封装,比如封装迭代器。
但是我们的 hash 还没有实现迭代器,那么哈希的迭代器要怎么实现呢?
按照遍历顺序,哈西的遍历首先是遍历哈希表,遍历哈希表的时候每一个链表也要从前往后遍历。那么这时候就会出现一个问题,那就是当我们遍历到某一个桶的结尾时,这时候再加加该怎么走呢?
按逻辑来说就是要指向哈希表的下一个桶的头节点,但是如果我们的迭代器中只有16这个节点的指针,怎么才能找到下一个桶呢?这是一个很大的问题。
这里有两种解决方案,一种就是把每一个桶的尾节点和下一个桶的头节点链接起来,如图:
如果是这种实现的话,那么我们的迭代器的加加就很简单了,只需要不断找next就行了。
但是要实现这种结构的代价是很大很大的。 比如我们在 3 位置再插入一个节点,那么就不能只管 3 位置的哈希桶,还需要找到上一个哈希桶的尾节点,连接起来。
插入还不算复杂。但是删除节点的时候,那就需要判断很多条件,比如要删除的节点时链表的头节点,那么就需要去找到上一个桶的尾节点来连接到删除节点的下一个节点。如果删除的节点是桶的尾节点,那么就要找到下一个桶的头节点和删除节点的上一个节点进行连接。 如果这个删除的节点既是头节点又是尾节点,也就是要删除的节点是这个单链表中的唯一的节点,那这时候要找到上一个桶的尾节点和下一个桶的头节点来连接上。对于我们目前来说,将hash实现成这种方法的成本太高了,要将原来的删除和插入的逻辑推倒重写一遍,所以我们不采用这种方法。
而第二种方法就很简单了,我么可以将哈希对象的指针存在迭代器中,那么每次迭代器加加之前我们就判断一下他是不是这个单链表的尾节点,如果是尾节点,就在哈希表中去找下一个节点的指针。这种是最简单的方法,但是实际上会有一些小问题,一会就知道了。
那么我们先实现一下基础的迭代器
迭代器的结构
//哈希桶迭代器
template<typename K, typename Data, typename Hash, typename KeyOfVal, typename Ref, typename Ptr>
class _hs_iterator
{
typedef ListNode<Data> Node;
typedef typename HashBucket<K, Data, Hash, KeyOfVal> HashBucket;
private:
Node* _node;
const HashBucket* _hs;
};
这时候就有一个问题了,我们在迭代器中引用了HashBucket类,在HashBucket中引用了迭代器这个类,他们两个属于互相引用了,那么按照先后顺序就肯定有一个类的声明在另一个类的后面,那么就会找不到声明。所以我们需要一个模板的前置声明来解决问题,也就是将类模板先放一份声明在迭代器的前面,告诉迭代器后面我们会定义这个类。
那么接下来就是实现迭代器的加加了,按照我们上面的逻辑来实现就行了。
_hs_iterator operator++() //前置
{
if (_node->next) //不为空,说明还不是尾节点
{
_node = _node->_next;
}
else //尾节点
{
//首先求出当前桶在哈希表中的下一个位置
size_t index = Hash()(KeyOfVal()(_node->_data)) % _hs->_table.size()+1;
for (; index < _hs->_table.size(); ++index)
{
if (_hs->_table[index] != nullptr)
break;
}
if (index < _hs->_table.size())
_node = _hs->_table[index];
else
_node = nullptr;
}
return *this;
}
_hs_iterator operator++(int ) //后置
{
_hs_iterator ret = *this;
if (_node->next) //不为空,说明还不是尾节点
{
_node = _node->_next;
}
else //尾节点
{
//首先求出当前桶在哈希表中的位置
size_t index = Hash()(KeyOfVal()(_node->_data)) % _hs->_table.size();
for (; index < _hs->_table.size(); ++index)
{
if (_hs->_table[index] != nullptr)
break;
}
if (index < _hs->_table.size())
_node = _hs->_table[index];
else
_node = nullptr;
}
return ret;
}
!=以及构造函数
bool operator!=(const _hs_iterator& it)
{
return _node != it._node;
}
_hs_iterator(Node* node,const HashBucket*hs)
:_node(node),_hs(hs)
{}
Ref operator*()
{
return _node->_data;
}
Ptr operator->()
{
return &_node->_data;
}
那么接下来就是实现hash的begin和end了。
iterator begin()
{
//先找到第一个位置
Node* node = nullptr;
int i = 0;
for (; i < _table.size(); ++i)
{
if (_table[i])
{
node = _table[i];
break;
}
}
return iterator(node,this);
}
iterator end()
{
return iterator(nullptr,this);
}
那么目前我们就实现了set和map的普通迭代器了。测试一下
接下来就是const迭代器了。
const迭代器我们首先想到的就是复用普通迭代器的模板来实现,那么可以这样么?我们不妨试一下,这时候会有下面的这一段报错
问题肯定是出在了const迭代器上,他的begin和end无法转换到指定的类型,这个报错我们可以思考一下。
这其实很好理解,因为我们的迭代器的两个变量是Node* 类型和 HashBucket 类型的,但是如果我们const的set对象去调用begin的话,那么_hs 就是const被修饰的,那么调用的就是const版本的begin,而const版本的begin我们传过去的this就是const的HashBucket*类型,,Node*也是const的,可是我们却使用HashBucket* 的成员来接收了,所以存在权限扩大的嫌疑。那么要怎么解决呢?难道我们在迭代器的成员变量声明中加上const?这也是不能的,因为我们的普通迭代器是需要支持通过迭代器修改val的。那么库里面是怎么做的呢?
库里面的普通迭代器和const迭代器用的是不同的模板,而不是用一个模板来实现两个迭代器。那么我们也是需要怎么做的。
const迭代器如下:
template<typename K, typename Data, typename Hash, typename KeyOfVal, typename Ref, typename Ptr>
class _hs_const_iterator
{
typedef ListNode<Data> Node;
typedef typename HashBucket<K, Data, Hash, KeyOfVal> HashBucket;
public:
_hs_const_iterator operator++() //前置
{
if (_node->_next) //不为空,说明还不是尾节点
{
_node = _node->_next;
}
else //尾节点
{
//首先求出当前桶在哈希表中的位置
size_t index = Hash()(KeyOfVal()(_node->_data)) % _hs->_table.size() + 1;
for (; index < _hs->_table.size(); ++index)
{
if (_hs->_table[index] != nullptr)
break;
}
if (index < _hs->_table.size())
_node = _hs->_table[index];
else
_node = nullptr;
}
return *this;
}
_hs_const_iterator operator++(int) //后置
{
_hs_iterator ret = *this;
if (_node->next) //不为空,说明还不是尾节点
{
_node = _node->_next;
}
else //尾节点
{
//首先求出当前桶在哈希表中的位置
size_t index = Hash()(KeyOfVal()(_node->_data)) % _hs->_table.size();
for (; index < _hs->_table.size(); ++index)
{
if (_hs->_table[index] != nullptr)
break;
}
if (index < _hs->_table.size())
_node = _hs->_table[index];
else
_node = nullptr;
}
return ret;
}
bool operator!=(const _hs_const_iterator& it)
{
return _node != it._node;
}
_hs_const_iterator(const Node* node, const HashBucket* hs)
:_node(node), _hs(hs)
{}
Ref operator*()
{
return _node->_data;
}
Ptr operator->()
{
return &_node->_data;
}
const Node* _node;
const HashBucket* _hs;
};
测试const迭代器:
其实,set的迭代器只需要有const迭代器就行了,set的普通迭代器和const迭代器都是hash的const迭代器。
实现完迭代器之后,还可以实现以下方括号的重载,主要就是重载insert然后对insert的返回值进行封装,那么这里就不实现了,如果想要参考,可以去上一篇的红黑树的set和map中对照一下就行了。
6 位图
我们的不管是哈希还是红黑数,我们都无法解决海量数据的查找的问题,比如给出以下场景:
给40亿个不重复的无符号整数,没排过序。再给一个无符号整数,判断该证书是否出现在这40亿个数中
对于这样的问题,我们首先想到的肯定是用set容器来判断,但是呢,不管是我们的红黑树还是哈希的set,每个整型数据就占四个字节了,再加上一个用于链接的指针(哈希只需要一个,红黑树甚至需要两个),那么每个数据要存储到set中至少都需要8个字节的空间了。那么40亿个数,我们就需要大约 32G 内存,所以我们使用set容器来完成是不太现实的,即便你的内存真的有这么大,用这种方式也真的是浪费了,40亿个数据插入和扩容旋转都慢死了。
对于这种只用于判断在或者不在,只有两种状态的情景,我们根本就用不上4或者8个字节,我们只需要一个比特位就能够用于标记某个数据在或者不在的两种状态,那么就算我们把所有的unsigned int 的数据都标记一遍,那么也只需要512M内存就够了,因为每个数据只需要为他预留一个比特位的标记位。
怎么标记呢?
我们用直接定址法,将对应的数据映射到对应的比特位中,把对应的比特位置为1,那么解决上面的问题就很简单了,我们只需要将每个数据都在位图中进行标记,然后要查找的数转换为比特位的位置去看该比特位是不是1就行了。这种结构就叫做位图结构。
但是我们是不支持按比特位来申请数组的,最小单位都是字节,那么有关系吗?没有。
就假设我们开的是char类型的数组来作为位图结构,那么我们可以自己规定好,比如我们的数据0-7,这把个数映射到第一个字节上,按照从低位到高位以此表示0-7.
那么第二个字节就以此从低位到高位映射8-15
以此类推,我们就只需要512M的空间就能映射42亿多个数据。
那么这个结构我们就很好定义了只需要一个char数组就行了,但是这个结构有一个要求,由于我们使用的是一个静态的数组,所以在使用该结构时需要将要标记的数据个数通过模板参数传过来,当然也可以使用动态数组,但是也需要事先给好数据的个数,避免后续扩容,也可以直接使用vector来封装.
那么结构定义如下:
template<size_t N>
class bitset
{
public:
bitset()
{
size_t cnt = N / 8 + 1; //N个数据就需要开N/8+1个字节,防止开N/8个字节不够
_arr.resize(cnt,0);
}
private:
vector<char> _arr;
};
而我们的位图结构只需要三个接口,set(标记),reset(取消标记)以及test(检测)。
首先实现标记,第一步我们需要知道该数据要映射到第几个字节,然后需要直到映射到该字节的从低到高多少位,最后把该位置1就行了。
void set(size_t key)
{
size_t index = key / 8; //第几个字节
size_t bit = key % 8;//第几个位
arr[index] |= (1 << bit);
}
而reset也是一样的思路。
void reset(size_t key)
{
size_t index = key / 8; //第几个字节
size_t bit = key % 8;//第几个位
arr[index] &= ~(1 << bit);
}
最后就是test
bool test(size_t key)
{
size_t index = key / 8; //第几个字节
size_t bit = key % 8;//第几个位
return arr[index] >> bit & 1;
}
这样就完成了一个基础的位图结构了,查找插入取消的效率都是O(1)。
但是位图有一些缺陷,首先就是只能映射整型的数据,第二就是她只有两种状态,只能知道在或者不在,无法知道出现了多少次。
那么如果有下面的场景,我们该如何解决?
给定一百一个整数,找出只出现一次的整数
这个题看着好像跟上面的差不多,但是这里多了一种状态,我们上面的位图是只能表示出现了和未出现,但是这里需要三个状态:未出现,出现一次,出现一次以上。
其实也很简单,三种状态我们也就只需要两个比特位就能完成。也就是说,对于所有的整数,每个整数我们只需要用两个比特位来表示它未出现,只出现一次和出现一次以上这三种状态。那么就算是所有的整形数据,我们也只需要一个G就能解决。
那么我们这么让两个比特位标记一个整数呢?难道按照上面的位图再重新设计一个结构,每个数据用连续的两个比特位来标记?可以是可以,也不是很复杂,就是求下标的时候变成 / 4 ,然后她的第一个比特位就是低到高的第 (N%4)*2 ,但是我们想说的是,没必要重新设计一下。我们只需要复用上面的位图就行了。
每个数据需要两个比特位很简单啊,我们只需要开两个一个比特位的位图,然后用一个位图的相应位置表示第一个比特位,第二个位图的相应位置表示第二个比特位,这样一来我们给还能够直接复用上面的接口,那么话不多说,直接梭哈。
//双比特位位图
template<size_t N>
class double_bit_set
{
public:
void set(size_t key)
{
if (s1.test(key)) //说明第一个比特位为1了,那么就把第二个比特位也改成1,用 1 1 表示出现一次以上
{
s2.set(key);
}
else //说明第一个比特位为0,那么把第一个比特位变为1,用 1 0 表示出现一次
{
s1.set(key);
}
}
void reset(size_t key)
{
s1.reset(key);
s2.reset(key);
}
bool test(size_t key)
{
return s1.test(key) && !s2.test(key); //第一个比特位为1,第二个比特位不为1才是只出现一次
}
private:
bitset<N> s1;
bitset<N> s2;
};
其实我们这里的reset接口已经不需要存在了。
那么还有的场景需要我们统计出现0次,出现1次,出现2次和出现2次以上的,这时候我们还是只需要两个比特位就能表示,至于那种状态用什么样的值来表示,全看你自己的定义。
最后还有这样的场景
给两个文件,分别有100一个整数,只有1G内存,如何找到两个文件的交集
注意,我们这里的交集不需要算重复的数据。
那么这个也很好做了,只需要将其中一个文件的所有数据用一个位图来标记,然后依次取出第二个文件的数据来比对,如果为1,就将该位置为0(交集中相同的数字只统计一次),然后放到一个存放交集数据的文件中。
7 哈希切割
给一个超过100G大小的日志文件,文件中存放的是ip地址,如何找到出现次数最多的ip地址?
ip其实就是一个字符串,比如 255.255.255.255 ,那么字符串我们如何统计呢?用位图能统计吗?不能,首先第一步我们位图只能统计整形,对于这种字符串类型,就算用一定的算法转换为了整型来标记,但是字符串之间是可能冲突的,也就是不同的字符串通过算法转换出来一个相同的整型,那么就会映射到一个相同的位置,这样就不确定了。同时,位图只能用于只有少数的状态的情景,而这里的题意明显是一个位置需要统计很多次的,所以位图无法完成。
这种问题就是一个kv模型,但是由于数据量太大,导致放不到内存中,我们无法使用map来解决,那这时候要怎么办呢?
首先我们要做的第一件事肯定是要将100G的文件拆分成一个一个的小文件的,总之我们要想办法让每个文件的数据量小到能放进内存中使用map来统计每个ip出现的次数,然后使用一个变量保存该文件出现最多的ip以及出现的次数,再将map的数据clear,然后打开下一个文件继续重复以上过程,看是否需要更新最大的次数和出现次数最多的ip这两个变量,直到所有的文件都统计完,那么保留的就是出现次数最多的ip了。
我们怎么拆分文件呢?
首先想到的肯定就是平均拆分,但是平均拆分的话,我们就无法保证最终结果的准确性了,因为这样统计出来只能表示在该小文件中的某个出现次数最多的ip,并不能代表这个ip总共出现的次数,因为这个ip或者该小文件中其他的ip可能在别的文件中也出现了很多次,所以平均切分出来的统计结果是不准确的。
我们切分时要保证所有的相同的ip必须在一个文件中,只有这样使用map统计出来的次数才是该ip的总次数。那么我们就需要用哈希切分。什么是哈希切分呢?我们有100G数据,要将其切分为多个小文件,同时相同的字符串必须在一个文件中,那么我们可以使用哈希函数来将字符串转换出一个整型,在以该整形来映射到对应的小文件中。比如转换出来的整型是 key ,而我们总共要划分出100 个小文件,对于转换出来整型为 key 的字符串就放到 第 key%100 号文件中。
这样一边切分下来之后,我们能保证相同的ip的字符串一定在同一个文件中。但是这并不能说明这个文件中只有这一个ip,因为还有其他的ip也可能转换成整形之后取模求出来的也是一样的值,那么可能还会存在一个文件中。
这样划分出来之后,文件就小了,我们就能通过map来统计每一个ip出现的次数了,但是也并不是所有的文件都能够统计出来,因为有可能划分出来之后小文件还是很大,那么现在对于没个小文件而言就有三种情况:
1 文件不是很大,所有的 ip 能存在map中来统计次数
2 文件虽然很大,但是文件中的大多数ip都是同一类,或者说文件中的ip的种类不多,还是能够在内存中用map来统计次数
3 文件很大,同时文件中不同的ip很多,也就是key值很多,那么这时候无法全部放到内存中,这时候需要继续进行哈希切分,继续进行哈希切分的时候,我们要换一个哈希函数来进行,不能再用上一次切分所使用的哈希函数。
那么如何区分这三种情况呢?
很简单,不管怎么样,我们都不断插入到 map 中,然后捕获异常来判断map是否存的下,我们可以看一下insert函数的描述
如果该文件每次插入都是成功的,那么说明是第一种或者第二种情况,这时候都不需要继续划分。而当我们捕获到异常了,就说明是第三种情况,map中存不下了,那么这时候我们也不继续往map中存了,将map清空,然后对该文件继续哈希切割。
8 布隆过滤器
布隆过滤器是对比位图而存在的。我们说了位图只能够能够标记整数是否存在,但是无法准确标记字符串是否存在,还是那个原因,字符串转换成整形之后是有可能冲突的,同时,还是因为字符串由于有各种排列组合,那么就注定了字符串的数量是无限的,那么如果数据多了之后,就算使用哈希函数将字符串转换成整型然后映射到位图中,出现冲突的概率还是很大的。同时,我们也不好确定我们要开的位图的大小,因为我们实现是不知道每个字符串转换出来的整数的key的值,那么也就是说我们上面的位图解决不了字符串的是否存在的逻辑判断,我们整型能用位图判断是因为整形是不会发生冲突的,而字符串则很有可能发生冲突。
但是当数据量很大的时候,我们也只能是用位图结构来进行标记映射了。
同时,不管是否发生冲突,我们的位图都能够用来判断某个字符串 不存在 ,因为该字符串所映射的比特位为0,就代表该字符串必定不存在。而如果该字符串映射的比特位为1,我们不能确保该字符串就存在,因为可能是别的字符串与该字符串映射的位置冲突了。
既然不存在这个结果是准确的,存在的结果不一定准确,可能存在误判,那么我们能不能想办法降低这个误判率呢?
于是有一个大佬就想出来一个降低误判率的方法:
每一个字符串不止映射一个位置,而是使用多个不同的哈希函数,将该字符串映射到多个位置。
那么这样一来,在我们的位图中,不存在的判断还是准确的,而准确的判断的正确率也会有所提升,误判率会下降。因为我们判断该字符串存在是要去判断这个字符串映射出来的多个位置是否都为1,如果都为1我们才认为它存在,有一个不为1,我们就认为该字符串不存在,这样就大大降低了误判率。
当然这也只能降低误判率,而不能完全消除误判的几率。
那么我们如何确定一个值要映射几个位置呢?
难道是映射的位置越多越好?当然不是,当每一个值映射的位置过多的时候,那么我们标记和搜索的效率就会降低,最主要的是空间浪费会很大。
我们还是需要选一个合适的 N ,让每一个值映射 N 个位置,也就是使用N个哈希函数,同时,为了能够映射,我们使用哈希函数转换出来的整形还是需要取模之后再去映射的。 那么我们开多少个空间呢?比如我们有 M个数据,每个数据映射 N 个位置,那么我们的空间的数量开多少合适呢?难道就开MN个比特位?这样当然不好,就好比我们的闭散列哈希表,都要将负载因子控制在0.7左右,避免效率太低。
布隆过滤器所需要的空间有一个公式
如果我们空间足够的话,我们可以控制布隆过滤器的空间使用率为0.5左右,这样误判率更低,同时空间利用率也还行。 但是一般来说控制在 0.7 左右就行了。
那么我们就可以用位图来实现一个简单的布隆过滤器
template<size_t N>
class BloomFilter
{
public:
void set(const string& str)
{
for (int i = 0; i < 5; ++i)
{
size_t key = Hash[i](str); //Hash[] 是一个函数指针数组,保存的是哈希函数的地址
_bs.set(key%cnt);
}
}
void test(const string& str)
{
bool ret = true;
for (int i = 0; i < 5; ++i)
{
size_t key = Hash[i](str); //Hash[] 是一个函数指针数组,保存的是哈希函数的地址
ret & = _bs.test(key % cnt);
}
return ret;
}
private:
size_t _cnt = N * 5 * 10 / 7;
bitset<N * 5 * 10 / 7> _bs; //假设我们每个值映射 5 个位置
};
对于布隆过滤器而言,不能设计出reset,因为会影响其他的数据的映射。
如果场景需要,确实要设计出reset,那么我们的布隆过滤器就不能只有一个位图结构来判断在不在,还需要能够保存该位被置1的次数,也就是每一个位置都需要需要一个计数器,那么reset的时候就将每一个映射的位置的计数器减减就行了。
那么布隆过滤器适合什么样的场景呢?
适合于不需要一定确定的场景,或者说只需要准确判断其中一个状态,另一个状态不要求准确。比如我们注册账号的时候昵称的判重,我们就可以设置一个布隆过滤器来判重,我们要的是该昵称不存在这个结果准确,这样就能保证不会出现重名。 而对于判断出该昵称已经存在这个结果,虽然实际上可能改昵称并不存在,但是不影响。
我们要利用的就是布隆过滤器能够准确判断出数据不在位图中的这个特性。
最后有下面的这样一个场景:
给出两个文件,分别由100亿个query(查询指令,我们理解为字符串就行),我们只有1G内存,如何找到两个文件的交集? 分别给出近似算法和精确算法
近似算法很简单,使用布隆过滤器就行了,不过我们要控制好开的空间的总个数,不能超过1G,我们就使用接近一个G的内存,那么大概能有七八十亿个比特位供我们映射。我们将其中一个文件的字符串全部通过多个哈希函数映射到多个位置,然后再不断取另一个文件的数据来test就行了,这样就能得出近似结果。
而如果要得出精确结果,那么就要分别将两个大文件进行哈希切分,分成多个小文件,当然两个文件在进行切分的的时候需要使用同一个哈希函数来转换。Ai小文件加载到map中,然后读取Bi小文件来find,看能否找到交集。如果map存不下,那么还是需要继续哈希切分,换一个函数进行哈希切分,假如Ai文件的数据放不进内存,那么不管Bi能不能放进去,我们都直接再使用哈希函数对Ai和Bi进行切分。