基于Python 3.9版本演示
一、写在前面
最近看了一篇在Lancet子刊《eClinicalMedicine》上发表的机器学习分类的文章:《Development of a novel dementia risk prediction model in the general population: A large, longitudinal, population-based machine-learning study》。
学到一种叫做“概率校准”的骚操作,顺手利用GPT系统学习学习。
文章中用的技术是:保序回归(Isotonic regression)。
二、保序回归(Isotonic regression)
保序回归(Isotonic Regression)是一种用于拟合数据的非参数回归技术,其目标是在满足单调性约束(即预测值随着输入值单调变化)的前提下,找到一个最佳拟合模型。保序回归在机器学习中的一个重要应用是对分类模型的概率进行校准,以提高预测概率的准确性。
在机器学习分类任务中,模型输出的概率往往需要进行校准以更准确地反映真实情况。校准后的概率更能代表事件实际发生的可能性,这对于后续决策过程非常重要。
保序回归在概率校准中的具体作用如下:
- 概率校准:分类模型(如逻辑回归、决策树等)输出的概率有时会偏离真实概率分布。通过保序回归,可以对这些概率进行校准,使得校准后的概率更接近真实情况。
- 实现方法:校准过程通常包括两个步骤:计算分类器在不同置信水平下的预测概率与实际标签的匹配程度。使用保序回归拟合这些概率与实际标签的关系,从而得到校准后的概率。
- 好处:保序回归的非参数特性使其可以适应多种数据分布,而无需对数据分布进行假设。此外,保序回归保证了输出概率的单调性,有助于提升校准效果。
在不平衡数据中,保序回归在概率校准方面也发挥着重要作用。由于不平衡数据集中的类别分布不均衡,分类模型可能倾向于预测多数类,这会导致输出概率的不准确性。保序回归可以帮助校准这些概率,提高模型在不平衡数据集上的表现。
三、保序回归(Isotonic regression)代码实现
下面,我编一个1比3的不太平衡的数据进行测试,对照组使用不进行校准的LR模型,实验组就是加入校准的LR模型,看看性能能够提高多少?
(1)不进行校准的LR模型(默认参数)
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
dataset = pd.read_csv('8PSMjianmo.csv')
X = dataset.iloc[:, 1:20].values
Y = dataset.iloc[:, 0].values
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size = 0.30, random_state = 666)
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)
from sklearn.linear_model import LogisticRegression
classifier = LogisticRegression(solver='lbfgs')
classifier.fit(X_train, y_train)
y_pred = classifier.predict(X_test)
y_testprba = classifier.predict_proba(X_test)[:,1]
y_trainpred = classifier.predict(X_train)
y_trainprba = classifier.predict_proba(X_train)[:,1]
from sklearn.metrics import confusion_matrix
cm_test = confusion_matrix(y_test, y_pred)
cm_train = confusion_matrix(y_train, y_trainpred)
print(cm_train)
print(cm_test)
#绘画测试集混淆矩阵
classes = list(set(y_test))
classes.sort()
plt.imshow(cm_test, cmap=plt.cm.Blues)
indices = range(len(cm_test))
plt.xticks(indices, classes)
plt.yticks(indices, classes)
plt.colorbar()
plt.xlabel('guess')
plt.ylabel('fact')
for first_index in range(len(cm_test)):
for second_index in range(len(cm_test[first_index])):
plt.text(first_index, second_index, cm_test[first_index][second_index])
plt.show()
#绘画训练集混淆矩阵
classes = list(set(y_train))
classes.sort()
plt.imshow(cm_train, cmap=plt.cm.Blues)
indices = range(len(cm_train))
plt.xticks(indices, classes)
plt.yticks(indices, classes)
plt.colorbar()
plt.xlabel('guess')
plt.ylabel('fact')
for first_index in range(len(cm_train)):
for second_index in range(len(cm_train[first_index])):
plt.text(first_index, second_index, cm_train[first_index][second_index])
plt.show()
import math
from sklearn.metrics import confusion_matrix,roc_auc_score,auc,roc_curve
cm = confusion_matrix(y_test, y_pred)
cm_train = confusion_matrix(y_train, y_trainpred)
# 计算并打印性能参数
def calculate_metrics(cm, y_true, y_pred_prob):
a = cm[0, 0]
b = cm[0, 1]
c = cm[1, 0]
d = cm[1, 1]
acc = (a + d) / (a + b + c + d)
error_rate = 1 - acc
sen = d / (d + c)
sep = a / (a + b)
precision = d / (b + d)
F1 = (2 * precision * sen) / (precision + sen)
MCC = (d * a - b * c) / (np.sqrt((d + b) * (d + c) * (a + b) * (a + c)))
auc_score = roc_auc_score(y_true, y_pred_prob)
metrics = {
"Accuracy": acc,
"Error Rate": error_rate,
"Sensitivity": sen,
"Specificity": sep,
"Precision": precision,
"F1 Score": F1,
"MCC": MCC,
"AUC": auc_score
}
return metrics
metrics_test = calculate_metrics(cm_test, y_test, y_testprba)
metrics_train = calculate_metrics(cm_train, y_train, y_trainprba)
print("Performance Metrics (Test):")
for key, value in metrics_test.items():
print(f"{key}: {value:.4f}")
print("\nPerformance Metrics (Train):")
for key, value in metrics_train.items():
print(f"{key}: {value:.4f}")
结果输出:
记住这些个数字。
(2)进行校准的LR模型(默认参数)
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import confusion_matrix, brier_score_loss
# 读取数据
dataset = pd.read_csv('8PSMjianmo.csv')
X = dataset.iloc[:, 1:20].values
Y = dataset.iloc[:, 0].values
# 分割数据集
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.30, random_state=666)
# 标准化数据
sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)
# 训练逻辑回归模型
classifier = LogisticRegression(solver='lbfgs')
classifier.fit(X_train, y_train)
# 获取训练集和测试集的预测概率
y_train_proba = classifier.predict_proba(X_train)[:, 1]
y_test_proba = classifier.predict_proba(X_test)[:, 1]
# 使用等渗回归进行概率校准
iso_reg = IsotonicRegression(out_of_bounds='clip')
iso_reg.fit(y_train_proba, y_train)
y_train_calibrated = iso_reg.transform(y_train_proba)
y_test_calibrated = iso_reg.transform(y_test_proba)
# 使用校准后的概率进行预测
y_train_pred_calibrated = (y_train_calibrated > 0.5).astype(int)
y_test_pred_calibrated = (y_test_calibrated > 0.5).astype(int)
# 计算混淆矩阵
cm_train = confusion_matrix(y_train, y_train_pred_calibrated)
cm_test = confusion_matrix(y_test, y_test_pred_calibrated)
# 打印结果
print("训练集混淆矩阵:")
print(cm_train)
print("测试集混淆矩阵:")
print(cm_test)
# 计算并打印Brier分数(用于衡量校准效果)
brier_score_train = brier_score_loss(y_train, y_train_calibrated)
brier_score_test = brier_score_loss(y_test, y_test_calibrated)
print("训练集Brier分数:", brier_score_train)
print("测试集Brier分数:", brier_score_test)
#绘画测试集混淆矩阵
classes = list(set(y_test))
classes.sort()
plt.imshow(cm_test, cmap=plt.cm.Blues)
indices = range(len(cm_test))
plt.xticks(indices, classes)
plt.yticks(indices, classes)
plt.colorbar()
plt.xlabel('guess')
plt.ylabel('fact')
for first_index in range(len(cm_test)):
for second_index in range(len(cm_test[first_index])):
plt.text(first_index, second_index, cm_test[first_index][second_index])
plt.show()
#绘画训练集混淆矩阵
classes = list(set(y_train))
classes.sort()
plt.imshow(cm_train, cmap=plt.cm.Blues)
indices = range(len(cm_train))
plt.xticks(indices, classes)
plt.yticks(indices, classes)
plt.colorbar()
plt.xlabel('guess')
plt.ylabel('fact')
for first_index in range(len(cm_train)):
for second_index in range(len(cm_train[first_index])):
plt.text(first_index, second_index, cm_train[first_index][second_index])
plt.show()
import math
from sklearn.metrics import confusion_matrix,roc_auc_score,auc,roc_curve
# 计算各项性能指标
def calculate_metrics(cm, y_true, y_proba):
a = cm[0, 0]
b = cm[0, 1]
c = cm[1, 0]
d = cm[1, 1]
acc = (a + d) / (a + b + c + d)
error_rate = 1 - acc
sen = d / (d + c)
sep = a / (a + b)
precision = d / (b + d)
F1 = (2 * precision * sen) / (precision + sen)
MCC = (d * a - b * c) / (math.sqrt((d + b) * (d + c) * (a + b) * (a + c)))
auc = roc_auc_score(y_true, y_proba)
return acc, error_rate, sen, sep, precision, F1, MCC, auc
# 测试集指标
test_metrics = calculate_metrics(cm_test, y_test, y_test_calibrated)
# 训练集指标
train_metrics = calculate_metrics(cm_train, y_train, y_train_calibrated)
# 打印结果
print("训练集混淆矩阵:")
print(cm_train)
print("测试集混淆矩阵:")
print(cm_test)
print("训练集指标:")
print(f"准确率: {train_metrics[0]:.4f}, 错误率: {train_metrics[1]:.4f}, 敏感度: {train_metrics[2]:.4f}, 特异度: {train_metrics[3]:.4f}, 精确度: {train_metrics[4]:.4f}, F1分数: {train_metrics[5]:.4f}, MCC: {train_metrics[6]:.4f}, AUC: {train_metrics[7]:.4f}")
print("测试集指标:")
print(f"准确率: {test_metrics[0]:.4f}, 错误率: {test_metrics[1]:.4f}, 敏感度: {test_metrics[2]:.4f}, 特异度: {test_metrics[3]:.4f}, 精确度: {test_metrics[4]:.4f}, F1分数: {test_metrics[5]:.4f}, MCC: {test_metrics[6]:.4f}, AUC: {test_metrics[7]:.4f}")
看看结果:
总体来看,作用不是太大呢。训练集的AUC高了0.2,但是验证集的AUC又低了0.2,真能量守恒定律。
四、换个策略
参考那篇文章的策略:采用五折交叉验证来建立和评估模型,其中四折用于训练,一折用于评估,在训练集中,其中三折用于建立LR模型,另一折采用保序回归(Isotonic regression)进行概率校正,在训练集内部采用交叉验证对超参数进行调参。
代码:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split, KFold, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import confusion_matrix, roc_auc_score, brier_score_loss, roc_curve
# 读取数据
dataset = pd.read_csv('8PSMjianmo.csv')
X = dataset.iloc[:, 1:20].values
Y = dataset.iloc[:, 0].values
# 分割数据集
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.30, random_state=666)
# 标准化数据
sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)
# 计算并打印性能参数
def calculate_metrics(cm, y_true, y_proba):
a = cm[0, 0]
b = cm[0, 1]
c = cm[1, 0]
d = cm[1, 1]
acc = (a + d) / (a + b + c + d)
error_rate = 1 - acc
sen = d / (d + c)
sep = a / (a + b)
precision = d / (b + d)
F1 = (2 * precision * sen) / (precision + sen)
MCC = (d * a - b * c) / (np.sqrt((d + b) * (d + c) * (a + b) * (a + c)))
auc = roc_auc_score(y_true, y_proba)
metrics = {
"Accuracy": acc,
"Error Rate": error_rate,
"Sensitivity": sen,
"Specificity": sep,
"Precision": precision,
"F1 Score": F1,
"MCC": MCC,
"AUC": auc
}
return metrics
# 五折交叉验证
kf = KFold(n_splits=5, shuffle=True, random_state=666)
test_metrics_list = []
train_metrics_list = []
best_params = None
for train_index, val_index in kf.split(X_train):
X_train_fold, X_val_fold = X_train[train_index], X_train[val_index]
y_train_fold, y_val_fold = y_train[train_index], y_train[val_index]
# 再次划分训练集和验证集,用于等渗回归
inner_kf = KFold(n_splits=4, shuffle=True, random_state=666)
for inner_train_index, inner_calib_index in inner_kf.split(X_train_fold):
X_inner_train, X_inner_calib = X_train_fold[inner_train_index], X_train_fold[inner_calib_index]
y_inner_train, y_inner_calib = y_train_fold[inner_train_index], y_train_fold[inner_calib_index]
# 超参数调优
param_grid = {
'C': [0.01, 0.1, 1, 10, 100],
'solver': ['liblinear', 'lbfgs']
}
grid_search = GridSearchCV(LogisticRegression(random_state=666, max_iter=10000), param_grid, cv=3, n_jobs=-1, scoring='roc_auc')
grid_search.fit(X_inner_train, y_inner_train)
best_lr = grid_search.best_estimator_
best_params = grid_search.best_params_
# 训练最佳模型
best_lr.fit(X_inner_train, y_inner_train)
y_inner_train_proba = best_lr.predict_proba(X_inner_train)[:, 1]
y_inner_calib_proba = best_lr.predict_proba(X_inner_calib)[:, 1]
# 使用等渗回归进行概率校准
iso_reg = IsotonicRegression(out_of_bounds='clip')
iso_reg.fit(y_inner_calib_proba, y_inner_calib)
y_inner_train_calibrated = iso_reg.transform(y_inner_train_proba)
y_inner_calib_calibrated = iso_reg.transform(y_inner_calib_proba)
# 使用校准后的概率进行预测
y_inner_train_pred_calibrated = (y_inner_train_calibrated > 0.5).astype(int)
y_inner_calib_pred_calibrated = (y_inner_calib_calibrated > 0.5).astype(int)
# 计算混淆矩阵
cm_inner_train = confusion_matrix(y_inner_train, y_inner_train_pred_calibrated)
cm_inner_calib = confusion_matrix(y_inner_calib, y_inner_calib_pred_calibrated)
# 计算性能指标
train_metrics = calculate_metrics(cm_inner_train, y_inner_train, y_inner_train_calibrated)
test_metrics = calculate_metrics(cm_inner_calib, y_inner_calib, y_inner_calib_calibrated)
train_metrics_list.append(train_metrics)
test_metrics_list.append(test_metrics)
# 打印训练集和测试集的性能指标
print("Inner Fold Training Metrics:")
for key, value in train_metrics.items():
print(f"{key}: {value:.4f}")
print("Inner Fold Test Metrics:")
for key, value in test_metrics.items():
print(f"{key}: {value:.4f}")
# 计算并打印整体性能指标
avg_train_metrics = {key: np.mean([metrics[key] for metrics in train_metrics_list]) for key in train_metrics_list[0].keys()}
avg_test_metrics = {key: np.mean([metrics[key] for metrics in test_metrics_list]) for key in test_metrics_list[0].keys()}
print("Average Training Metrics:")
for key, value in avg_train_metrics.items():
print(f"{key}: {value:.4f}")
print("Average Test Metrics:")
for key, value in avg_test_metrics.items():
print(f"{key}: {value:.4f}")
# 打印最优参数
print("Best Logistic Regression Parameters:")
print(best_params)
# 使用校准后的模型预测测试集
best_lr.fit(X_train, y_train)
y_train_proba = best_lr.predict_proba(X_train)[:, 1]
y_test_proba = best_lr.predict_proba(X_test)[:, 1]
y_train_calibrated = iso_reg.transform(y_train_proba)
y_test_calibrated = iso_reg.transform(y_test_proba)
y_train_pred_calibrated = (y_train_calibrated > 0.5).astype(int)
y_test_pred_calibrated = (y_test_calibrated > 0.5).astype(int)
# 计算训练集和测试集混淆矩阵
cm_train = confusion_matrix(y_train, y_train_pred_calibrated)
cm_test = confusion_matrix(y_test, y_test_pred_calibrated)
print("训练集混淆矩阵:")
print(cm_train)
print("测试集混淆矩阵:")
print(cm_test)
# 计算并打印Brier分数(用于衡量校准效果)
brier_score_train = brier_score_loss(y_train, y_train_calibrated)
brier_score_test = brier_score_loss(y_test, y_test_calibrated)
print("训练集Brier分数:", brier_score_train)
print("测试集Brier分数:", brier_score_test)
# 计算训练集和测试集性能指标
train_metrics_final = calculate_metrics(cm_train, y_train, y_train_calibrated)
test_metrics_final = calculate_metrics(cm_test, y_test, y_test_calibrated)
print("Train Set Performance Metrics:")
for key, value in train_metrics_final.items():
print(f"{key}: {value:.4f}")
print("Test Set Performance Metrics:")
for key, value in test_metrics_final.items():
print(f"{key}: {value:.4f}")
# 绘制训练集和测试集ROC曲线
fpr_train, tpr_train, thresholds_train = roc_curve(y_train, y_train_calibrated, pos_label=1, drop_intermediate=False)
fpr_test, tpr_test, thresholds_test = roc_curve(y_test, y_test_calibrated, pos_label=1, drop_intermediate=False)
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot([0, 1], [0, 1], '--', color='navy')
plt.plot(fpr_train, tpr_train, 'k--', label='Mean ROC (area = {0:.4f})'.format(train_metrics_final['AUC']), lw=2, color='darkorange')
plt.xlim([-0.01, 1.01])
plt.ylim([-0.01, 1.01])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for Training Set')
plt.legend(loc="lower right")
plt.subplot(1, 2, 2)
plt.plot([0, 1], [0, 1], '--', color='navy')
plt.plot(fpr_test, tpr_test, 'k--', label='Mean ROC (area = {0:.4f})'.format(test_metrics_final['AUC']), lw=2, color='darkorange')
plt.xlim([-0.01, 1.01])
plt.ylim([-0.01, 1.01])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for Test Set')
plt.legend(loc="lower right")
plt.tight_layout()
plt.show()
输出:
有点用,但不多。
五、最后
各位可以去试一试在其他数据或者在其他机器学习分类模型中使用的效果。
数据不分享啦。