一、实验内容简介
该实验主要使用频繁模式和关联规则进行数据挖掘,使用Apriori算法和Python语言来编写和设计程序,然后用不同规模的数据集来检验效果,最后分析和探讨实验结果,看其是否达到了理想的效果。
二、算法说明
关联规则是形如 X→Y 的蕴涵表达式,其中 X 和 Y 是不相交的项集,即 X∩Y=∅。关联规则的强度可以用它的支持度(support)和置信度(confidence)来度量。计算公式如下:
支持度:support(A=>B) = P(A∪B),表示 A 和 B 同时出现的概率。
置信度:confidence(A=>B)=support(A∪B) / support(A),表示 A 和 B 同时出现的概率占 A 出现概率的比值。
强关联规则是指达到了最小支持度和最小置信度的关联规则。
Apriori算法作为频繁模式和关联规则中的第一个也是最经典的算法,直至现在它也仍然流行。它利用逐层搜索的迭代方法找出数据库中项集的关系,以形成规则,其过程由连接(类矩阵运算)与剪枝(去掉那些没必要的中间结果)组成。该算法中项集的概念即为项的集合。包含 K 个项的集合为 k 项集。项集出现的频率是包含项集的事务数,称为项集的频率。如果某项集满足最小支持度,则称它为频繁项集。
三、算法分析与设计
了解完算法的基本原理后,现在开始真正实现该算法。首先需要读取最小支持度,读取数据集。这里的数据集可大可小,我用Python中的字典来表示数据集,既方便又不容易出错。然后是频繁模式的存储,这里使用列表,前面k个元素表示具体的事务,最后一个元素表示前面元素的组合在数据集中出现的错误。
运用面向对象的思想,把Apriori算法封装成一个类。首先写构造函数进行初始化:
def __init__(self, support: float, data: dict):
"""
初始化
:param support: 支持度
:param data: 数据集
"""
self.support = support
self.data = data
self.FirstList = [] # 第一轮扫描后的频繁模式
self.FinalList = [] # 最后的频繁模式
self.FinalList_noValue = [] # 最后的频繁模式(不包括频率)
self.allList = [] # 所有的频繁模式
自定义几个列表(存放频繁模式)、初始化支持度和数据集。
接着进行第一轮扫描,把出现次数达到最小支持度要求的1项集先提取出来。具体方式就是通过遍历数据集一个一个找。然后建立一个字典,把每一项的名字作为关键字,再把这一项出现的次数作为值。
def findFirst(self):
dict = {}
for value in self.data.values():
for i in value:
if i in dict.keys():
dict[i] += 1
else:
dict[i] = 1
for key, value in dict.items():
if value >= self.support * len(self.data):
self.FirstList.append(key)
# 进行后面的扫描
self.findMore()
接着进行后续的扫描,后续的扫描的过程大体上相同,于是在findMore函数中就可以使用递归,简化代码。基本思路是将上一次遍历的k-1项集来合成k项集,这里使用集合的并运算,同时控制长度,符合要求后转换成列表进行排序(不排序的话内部会呈乱序排列,因为集合没有顺序),然后再转换成元组(元组才能通过哈希作为字典的关键字)返回。接着跟第一次扫描的思路大体相同,挨个遍历,获得频繁项集。不停地递归下去,直到下一个返回的列表为空,这时这一次获得的就是最大频繁项集。
def findMore(self, k=2):
"""
进行第二次至第k次扫描
"""
# 具体代码实现较长,详见附录
最后得到结果后当然要进行输出,这里也把输出结果简单地封装了一下。主要输出结果有最大频繁项集和支持度最高的频繁项集。
def __str__(self):
# 输出运算符重载,也就是输出最终信息
if self.allList: # 验证是否为空
self.allList.sort(key=lambda l: (l[-1], len(l)), reverse=True)
print("所有的频繁模式按支持度从大到小为" + str(self.allList))
print("极大频繁模式为" + str(self.FinalList))
return '前k个频繁模式按支持度从大到小为' + str(self.allList[0:len(self.FinalList[-1]) - 1])
return '无频繁模式!'
四、测试结果
在写完代码后,最重要的就是进行大量的测试,来分析其正确性与性能。
首先验证正确性。我首先使用了一个很简单的数据集来验证。
先给定0.5的支持度:
经过验证是正确的,再试试0.15的支持度:
再试试0.9的支持度,这个支持度很高,没有任何一个组合能够达到:
程序也成功输出了正确结果。
接下来就要扩大数据集的容量了,这样才能分析算法的性能。这里使用随机变量来模拟大量的数据:
在这里,arr和data2都可以修改,arr可以修改其中的元素来改变权重(如arr2),data2可以修改数量,这里统一使用0.5作为支持度。
首先用100000的数据来测试:
可以看到,一共花了1.7秒。
再使用arr2:
可以看到,牛奶的支持度显著上升。
然后把数据量变为1000000来试试:
一共花了15.58秒。
然后把数据量变为10000000来试试:
一共花了171.1秒,可以看出,它花费的时间与数据量规模大小成正比。
五、分析与探讨
测试完算法后,来分析它的性能,思考还有哪些能改进的地方。首先可以看到,因为Apriori算法每次迭代都要对全部数据进行一次扫描,这显然是比较低效的。同时,该算法使用的数据结构也是比较低级的线性表,跟树、并查集等其他数据结构比起来也显得比较低效。但是,该算法有助于我们理解频繁模式挖掘,同时它也启发了其他的高效算法,是我们学习频繁模式的很好的入门级算法。
因为该算法本身的原因,它的优化有瓶颈限制,最多进行一些语法上的优化和一些小技巧。如果要进行大幅度的性能改进,还要使用其他的高效算法,如FP-Tree算法等。
附录:源代码
"""
输入事务集,给定支持度
输出所有的频繁模式,并按支持度降序排列
输出极大频繁模式,并按支持度排序
输出支持度最大的前k个频繁模式
"""
import time
import itertools
import random
class Apriori:
def __init__(self, support: float, data: dict):
"""
初始化
:param support: 支持度
:param data: 数据集
"""
self.support = support
self.data = data
self.FirstList = [] # 第一轮扫描后的频繁模式
self.FinalList = [] # 最后的频繁模式
self.FinalList_noValue = [] # 最后的频繁模式(不包括频率)
self.allList = [] # 所有的频繁模式
def findFirst(self):
"""
第一次扫描
"""
dict = {}
for value in self.data.values():
for i in value:
if i in dict.keys():
dict[i] += 1
else:
dict[i] = 1
for key, value in dict.items():
if value >= self.support * len(self.data):
self.FirstList.append(key)
# 进行后面的扫描
self.findMore()
def findMore(self, k=2):
"""
进行第二次至第k次扫描
"""
Dict = {}
List_k = []
List_k_noValue = []
# 寻找最大频繁项集
if k == 2:
C = list(itertools.combinations(self.FirstList, k))
else:
C = self.Ck(k)
for i in C:
for value in self.data.values():
if set(i).issubset(set(value)):
if i in Dict:
Dict[i] += 1
else:
Dict[i] = 1
for key, value in Dict.items():
if value >= self.support * len(self.data):
list_temp = list(key)
List_k_noValue.append(list_temp.copy())
list_temp.append(value)
List_k.append(list_temp)
if List_k:
List_k.sort(key=lambda l: l[-1], reverse=True)
self.allList.extend(List_k)
self.FinalList_noValue = List_k_noValue
self.FinalList = List_k
self.findMore(k + 1)
def Ck(self, k):
"""
根据k-1项的频繁项集列表生成k项的候选项集
:return: ck项集
"""
Ck_list = []
for i in range(len(self.FinalList_noValue) - 1):
for j in range(i + 1, len(self.FinalList_noValue) - 1):
s = set(self.FinalList_noValue[i]) | set(self.FinalList_noValue[j])
s = list(s)
s.sort()
if len(s) == k and tuple(s) not in Ck_list:
Ck_list.append(tuple(s))
return Ck_list
def __str__(self):
# 输出运算符重载,也就是输出最终信息
if self.allList: # 验证是否为空
self.allList.sort(key=lambda l: (l[-1], len(l)), reverse=True)
print("所有的频繁模式按支持度从大到小为" + str(self.allList))
print("极大频繁模式为" + str(self.FinalList))
return '前k个频繁模式按支持度从大到小为' + str(self.allList[0:len(self.FinalList[-1]) - 1])
return '无频繁模式!'
if __name__ == '__main__':
try:
support = float(input("请给出最小支持度:"))
if support <= 0 or support >= 1:
raise Exception("范围错误")
except ValueError:
print('输入类型错误')
except Exception as e:
print(e)
else:
data = {
1: ['A', 'B', 'C'],
2: ['A', 'C'],
3: ['A', 'B', 'C', 'D'],
4: ['B', 'C', 'E'],
5: ['A', 'C', 'E'],
6: ['A', 'B', 'C', 'F'],
7: ['A', 'B', 'C']
}
arr = ['牛奶', '面包', '鸡蛋', '馒头', '包子', '饼干']
arr2 = ['牛奶', '面包', '鸡蛋', '馒头', '包子', '饼干','牛奶','牛奶']
data2 = {i: [random.choice(arr) for j in range(10)] for i in range(1000000)}
start = time.time()
A = Apriori(support, data2)
A.findFirst()
print(A)
# 给出运行时间
end = time.time()
print("本次运行一共花了%.4f秒" % (end - start))
finally:
print("程序运行完毕!")