Shandong University Machine Learning Lab 9: Decision Trees
- All formulas come from *Machine Learning* by Zhou Zhihua
- The .ipynb source file is available on GitHub

Changelog:
- 2024-05-15: added some Node attributes that record the name of the split feature used at each node; see GitHub for the updated version

Node class
- Constructor: initializes a decision-tree node, including the node ID, feature data, split-feature ID, split threshold, labels, parent-node ID, list of child nodes, and the class assigned to the node
- set_sons: sets the node's list of child nodes
- judge_stop: decides whether to stop growing the tree at this node; the stopping conditions are:
  - all samples belong to the same class
  - all samples take the same value on every attribute
  - the sample set associated with the node is empty
DecisionTree class
- Constructor: initializes the decision tree, including the node list and the node-ID counter
- ent: computes information entropy, which measures dataset purity (the formulas these methods implement are sketched after this list)
- gain_single_feature: for a continuous attribute, searches for the split point that maximizes the information gain
- gain_rate: computes the gain ratio of every feature and selects the feature with the highest gain ratio as the split criterion
- generate_node: recursively generates child nodes, splitting the dataset at each feature's best split point until a stopping condition is met
- train: builds the decision tree starting from the root node
- predict: classifies the given data by traversing the decision tree
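For reference, these are the quantities the methods above implement; the definitions follow chapter 4 of Zhou Zhihua's *Machine Learning*:

```latex
% Information entropy of dataset D over classes k = 1, ..., |Y| (method ent)
\mathrm{Ent}(D) = -\sum_{k=1}^{|\mathcal{Y}|} p_k \log_2 p_k

% Information gain of splitting D on attribute a into subsets D^v
\mathrm{Gain}(D, a) = \mathrm{Ent}(D) - \sum_{v=1}^{V} \frac{|D^v|}{|D|}\,\mathrm{Ent}(D^v)

% Intrinsic value and gain ratio (method gain_rate)
\mathrm{IV}(a) = -\sum_{v=1}^{V} \frac{|D^v|}{|D|} \log_2 \frac{|D^v|}{|D|},
\qquad
\mathrm{Gain\_ratio}(D, a) = \frac{\mathrm{Gain}(D, a)}{\mathrm{IV}(a)}

% Candidate split points for a continuous attribute with sorted distinct values
% a^1 < ... < a^n (gain_single_feature evaluates these midpoints)
T_a = \left\{ \frac{a^i + a^{i+1}}{2} \,\middle|\, 1 \le i \le n - 1 \right\}
```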
Data class
- Constructor: loads the dataset and initializes the features and labels
- get_k_exmaine: splits the shuffled data for K-fold cross-validation and returns K groups of features and labels (an alternative split built on np.array_split is sketched after this list)
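The index arithmetic in get_k_exmaine is easy to get subtly wrong, so here is a minimal cross-check sketch of the same split built on np.array_split; the name k_fold_split is illustrative and not part of the lab code:

```python
import numpy as np

def k_fold_split(data: np.ndarray, k: int):
    """Shuffle the rows and split them into k roughly equal folds.

    Returns two lists of length k: feature groups (all but the last
    column) and label groups (the last column). np.array_split keeps
    every sample even when the row count is not divisible by k.
    """
    data = data.copy()           # do not shuffle the caller's array in place
    np.random.shuffle(data)
    folds = np.array_split(data, k)
    feature_groups = [fold[:, :-1] for fold in folds]
    label_groups = [fold[:, -1:] for fold in folds]
    return feature_groups, label_groups
```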
Overall pipeline
- Data preprocessing: read the data file with the Data class and split it for K-fold cross-validation
- Training and evaluation: for each test fold, train a decision tree on the remaining K-1 folds with the DecisionTree class, predict on the test fold, and compute the accuracy
- Reporting: collect the accuracy of each fold, then compute and print the average accuracy
Code execution flow
- First, the Data class loads the data and splits it into K folds for cross-validation
- Next, for each validation fold, the remaining folds are merged to train the decision-tree model, which is then evaluated on the corresponding test fold
- For each test sample, the tree is traversed to find its predicted class, which is compared against the true label to count correct predictions
- The accuracy of each fold is printed, and the average accuracy over all folds is computed to assess the model's generalization ability
Code and run results
import numpy as np
import matplotlib.pyplot as plt
import math
class Node():
    def __init__(self, id_, features_, feature_id_, divide_, labels_, father_, sons_=None):
        self.divide = divide_          # split threshold on this node's feature
        self.feature_id = feature_id_  # index of the feature used for the split
        self.id = id_
        self.feature = features_       # feature matrix of the samples at this node
        self.labels = labels_
        self.father = father_
        self.sons = sons_ if sons_ is not None else []  # avoid the mutable-default pitfall
        self.clas = 'None'             # 'None' marks an internal (non-leaf) node
    def set_sons(self, sons_):
        self.sons = sons_
    def judge_stop(self):
        labels = np.unique(self.labels)
        # Stop if all samples at this node belong to the same class
        if labels.shape[0] == 1:
            self.clas = labels[0]
            return True
        features = np.unique(self.feature)
        # Stop if all samples take the same value on every attribute;
        # label the node with the majority class
        if features.shape[0] == 1:
            unique_values, counts = np.unique(self.labels, return_counts=True)
            self.clas = unique_values[counts.argmax()]
            return True
        # Stop if the sample set at this node is empty
        # (the book suggests the parent's majority class; a default class is used here)
        if self.feature.shape[0] == 0 or self.feature.shape[1] == 0:
            self.clas = 1
            return True
        return False
class DecisionTree():
    def __init__(self):
        self.tree = []  # flat list of all nodes; a node's sons store indices into this list
        self.id = 0     # node-ID counter
    # Information entropy Ent(D): measures the purity of a label set
    def ent(self, labels):
        labels_s = list(set([labels[i, 0] for i in range(labels.shape[0])]))
        ans = 0
        for label in labels_s:
            num = np.sum(labels == label)
            p = num / labels.shape[0]
            ans -= p * math.log(p, 2)
        return ans
    # For one continuous feature, find the split point that maximizes information
    # gain; candidate thresholds are midpoints of adjacent sorted distinct values
    def gain_single_feature(self, feature, labels):
        origin_ent = self.ent(labels)
        values = np.sort(np.array(list(set(feature))))  # sorted distinct values
        divide_edge = [(values[i] + values[i + 1]) / 2.0 for i in range(values.shape[0] - 1)]
        best_ent = 0
        best_divide = 0
        l1 = l2 = np.array([[]])
        for condition in divide_edge:
            # Partition the labels by the original feature column, which stays
            # index-aligned with labels
            labels1 = np.array([labels[i] for i in range(feature.shape[0]) if feature[i] <= condition])
            labels2 = np.array([labels[i] for i in range(feature.shape[0]) if feature[i] > condition])
            ent1 = self.ent(labels1)
            ent2 = self.ent(labels2)
            # Information gain of this candidate split
            ans = origin_ent - ((labels1.shape[0] / labels.shape[0]) * ent1
                                + (labels2.shape[0] / labels.shape[0]) * ent2)
            if ans >= best_ent:
                best_divide = condition
                l1 = labels1
                l2 = labels2
                best_ent = ans
        return best_divide, l1, l2, best_ent
    # Gain ratio Gain_ratio(D,a) = Gain(D,a) / IV(a); return the index of the
    # feature with the highest ratio together with its best split point
    def gain_rate(self, features, labels):
        gain_rate = 0
        feature_id = -1
        divide = -1
        l = labels.shape[0]
        for id in range(features.shape[1]):
            divide1, labels1, labels2, th_gain = self.gain_single_feature(features[:, id], labels)
            l1 = labels1.shape[0]
            l2 = labels2.shape[0]
            if l1 == 0 or l2 == 0:
                continue  # degenerate split: log2 of zero is undefined
            # Intrinsic value IV(a) of the binary split
            iv = -1 * ((l1 / l) * math.log(l1 / l, 2) + (l2 / l) * math.log(l2 / l, 2))
            if iv != 0:
                rate = th_gain / iv
            else:
                rate = 0
            if rate >= gain_rate:
                gain_rate = rate
                divide = divide1
                feature_id = id
        return feature_id, divide
    def generate_node(self, node: Node):
        # Partition the node's samples by its split threshold
        features1_id = np.array([i for i in range(node.feature.shape[0]) if node.feature[i, node.feature_id] >= node.divide])
        features2_id = np.array([i for i in range(node.feature.shape[0]) if node.feature[i, node.feature_id] < node.divide])
        features1 = node.feature[features1_id]
        features2 = node.feature[features2_id]
        labels1 = node.labels[features1_id]
        labels2 = node.labels[features2_id]
        # Drop the used feature column so each feature is used once per path
        features1 = np.delete(features1, node.feature_id, axis=1)
        features2 = np.delete(features2, node.feature_id, axis=1)
        features_id1, divide1 = self.gain_rate(features1, labels1)
        features_id2, divide2 = self.gain_rate(features2, labels2)
        tmp = 0
        if features_id1 != -1:
            tmp += 1
            node1 = Node(self.id + tmp, features1, features_id1, divide1, labels1, node.id, [])
            self.tree.append(node1)
            node.sons.append(self.id + tmp)  # sons[0] is the >= branch
        if features_id2 != -1:
            tmp += 1
            node2 = Node(self.id + tmp, features2, features_id2, divide2, labels2, node.id, [])
            self.tree.append(node2)
            node.sons.append(self.id + tmp)  # the last son is the < branch
        self.id += tmp
        if tmp == 0:
            # No valid split on either side: make this a leaf with the majority class
            unique_values, counts = np.unique(node.labels, return_counts=True)
            node.clas = unique_values[counts.argmax()]
            return
        for n in [self.tree[i] for i in node.sons]:
            if not n.judge_stop():
                self.generate_node(n)
    def train(self, features, labels):
        # Pick the best first split, then grow the tree recursively from the root
        feature_id, divide = self.gain_rate(features, labels)
        root = Node(0, features, feature_id, divide, labels, -1, [])
        self.tree.append(root)
        self.generate_node(root)
    def predict(self, features):
        re = []
        for feature in features:
            node = self.tree[0]
            # Walk from the root until a leaf (a node with an assigned class)
            while node.clas == 'None':
                th_feature = feature[node.feature_id]
                # Mirror the training-time column deletion so indices stay aligned
                feature = np.delete(feature, node.feature_id, axis=0)
                th_divide = node.divide
                if th_feature < th_divide:
                    node = self.tree[node.sons[-1]]  # last son: the < branch
                else:
                    node = self.tree[node.sons[0]]   # first son: the >= branch
            re.append(node.clas)
        return re
class Data():
    def __init__(self):
        self.data = np.loadtxt('/home/wangxv/Files/hw/ml/lab9/data/ex6Data/ex6Data.txt', delimiter=',')
        self.data_num = self.data.shape[0]
        self.features = self.data[:, :-1]
        self.labels = self.data[:, -1:]
    # Shuffle the data and cut it into k groups for K-fold cross-validation
    def get_k_exmaine(self, k: int):
        num = int(self.data_num / k)
        np.random.shuffle(self.data)
        features = self.data[:, :-1]
        labels = self.data[:, -1:]
        starts = np.linspace(0, self.data_num, k + 1, dtype=int)[:-1]
        # Slice i:i+num (not i+num-1) so no sample inside a fold is dropped
        feature_groups = [features[i:i + num] for i in starts]
        labels_groups = [labels[i:i + num] for i in starts]
        return feature_groups, labels_groups
data = Data()
feature_groups, label_groups = data.get_k_exmaine(10)
rate_set = []
for ind in range(10):
    dt = DecisionTree()
    # Merge the other nine folds into the training set
    features_ = [feature_groups[i] for i in range(10) if i != ind]
    labels_ = [label_groups[i] for i in range(10) if i != ind]
    train_features = np.vstack(features_)
    train_labels = np.vstack(labels_)
    test_features = feature_groups[ind]
    test_labels = label_groups[ind]
    dt.train(train_features, train_labels)
    pred_re = dt.predict(test_features)
    # Count correct predictions on the held-out fold
    right_num = 0
    for i in range(len(pred_re)):
        if pred_re[i] == test_labels[i][0]:
            right_num += 1
    right_rate = right_num / len(pred_re)
    print(str(ind + 1) + ' correct_rate : ' + str(right_rate))
    rate_set.append(right_rate)
print("average_rate : " + str(np.mean(np.array(rate_set))))
1 correct_rate : 0.7930327868852459
2 correct_rate : 0.8032786885245902
3 correct_rate : 0.8012295081967213
4 correct_rate : 0.7848360655737705
5 correct_rate : 0.7889344262295082
6 correct_rate : 0.7909836065573771
7 correct_rate : 0.7827868852459017
8 correct_rate : 0.7766393442622951
9 correct_rate : 0.8012295081967213
10 correct_rate : 0.7725409836065574
average_rate : 0.7895491803278689
The final average accuracy is 0.7895 > 0.78, which meets the requirement in the lab manual.
Visualization
- The custom Node class has to be converted into a format that graphviz can recognize
from graphviz import Digraph
def show_tree(root_node, dot=None):
    if dot is None:
        dot = Digraph(comment='decision_tree')
    # Label each node with its split feature/threshold and its class (set on leaves)
    node_label = f"feature {root_node.feature_id}: {root_node.divide} | class: {root_node.clas}"
    dot.node(str(root_node.id), node_label)
    if root_node.sons:
        for son_id in root_node.sons:
            dot.edge(str(root_node.id), str(son_id))
            show_tree(dt.tree[son_id], dot)
    return dot
root_node = dt.tree[0]
dot = show_tree(root_node)
dot.render('decision_tree', view=True, format='png')
from PIL import Image
Image.open('./decision_tree.png')
- The resulting tree image is a bit too large; bear with it (a depth-limited rendering sketch follows below)
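If the full rendering is unwieldy, a minimal sketch of a depth-limited variant can help; show_tree_depth and its max_depth parameter are hypothetical additions, not part of the original lab code:

```python
from graphviz import Digraph

def show_tree_depth(tree, node, dot=None, depth=0, max_depth=4):
    """Render only the top max_depth levels so the image stays readable."""
    if dot is None:
        dot = Digraph(comment='decision_tree_top')
    dot.node(str(node.id), f"feature {node.feature_id}: {node.divide} | class: {node.clas}")
    if depth < max_depth:
        for son_id in node.sons:
            dot.edge(str(node.id), str(son_id))
            show_tree_depth(tree, tree[son_id], dot, depth + 1, max_depth)
    return dot

# Usage: render just the first few levels of the trained tree
# show_tree_depth(dt.tree, dt.tree[0]).render('decision_tree_top', format='png')
```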