时隔一个多月,终于想起来写大数据算法基础的实验报告,主要是快截止了,hh
这两天加急把这个报告写完了~
接下来,写一写证明过程(参考书籍:高等教育出版社《数据科学与工程算法基础》)主要代码以及总结体会o(* ̄▽ ̄*)ブ
本次实验主要设计三块内容,分别是水库抽样算法(当水库大小为1时),水库抽样算法(当水库大小为k>1时)以及分布式水库抽样算法
水库抽样算法
主要证明过程
主要Python代码
水库抽样算法(返回一个)
import random
def sampling_single(stream):
reservoir = stream[0]
i = 1
for i, item in enumerate(stream):
j = random.randint(0, i)
if j < 1:
reservoir = item
return reservoir
F = [i for i in range(100)]
H = sampling_single(F)
print(f"Randomly sampled element: {H}")
水库抽样算法(返回多个)
import random
def reservoir_sampling(stream, k):
reservoir = []
for i, item in enumerate(stream):
if i < k:
reservoir.append(item)
else:
j = random.randint(0, i)
if j < k:
reservoir[j] = item
return reservoir
data_stream = [i for i in range(100)]
sampled_data = reservoir_sampling(data_stream, 10)
分布式水库抽样算法
主要证明过程
一个Hadoop任务Sample由 n 个 Map 组成,其中每个 Map 都接受到一个数据流 Substream,当这些数据无法完全保存在内存时,如何随机地抽取一个含有 k 条记录的样本(每条记录被抽中的概率相同),于是,这就引出了分布式水库抽样算法(分层水库抽样 + 重抽样 = 分布式水库抽样算法)
先在每个 Map 上独立运行水库抽样算法,之后对 n 个子样本就行重抽样,获得满足要求的最终结果。
主要 Python 代码
import random
def reservoir_sampling(stream, k):
reservoir = []
for i, item in enumerate(stream):
if i < k:
reservoir.append(item)
else:
j = random.randint(0, i)
if j < k:
reservoir[j] = item
return reservoir
def distributed_sampling(n, k, stream):
N = []
F = []
H = []
for i in range(n):
F.append(reservoir_sampling(stream, k))
N.append(len(F[i]))
total_N = sum(N)
for j in range(k):
p = random.random()
m = 0
cumulative_N = 0
while cumulative_N < p * total_N :
cumulative_N += N[m]
m += 1
H.append(random.choice(F[m-1]))
return H
n = 15
k = 10
data_stream = [i for i in range(100)]
H = distributed_sampling(n, k, data_stream)
print("Final Sample H:", H)
总结
水库抽样技术归根到底就是在总体容量未知的情况下,仅通过单遍扫描数据集便能生成等概率抽样集合的一种均匀抽样技术。
代码或许很简单,但是其中的数学知识以及思想方法是很值得学习的!