这篇文章说实在的,写的差强人意。
实质性内容是两个现有方法的拼凑!
讲的模模糊糊!对于复现代码不太友好!
撸一点,通读一遍 ,再撸一点,通读一遍~~~
"""
注意:使用了训练集索引。
"""
import xlwt
import xlrd
import numpy as np
import pandas as pd
from pathlib import Path
from copy import deepcopy
from sklearn.preprocessing import StandardScaler
from time import time
from sklearn.metrics.pairwise import pairwise_distances
from numpy.linalg import inv
from sklearn.metrics import accuracy_score, mean_absolute_error, f1_score, mutual_info_score
from sklearn.neighbors import NearestNeighbors
np.seterr(divide='ignore',invalid='ignore')
class AL_ALBL():
def __init__(self,X, y, labeled, budget, X_test, y_test):
self.X = X
self.y = y
self.nSample, self.nDim = X.shape
self.labels = sorted(np.unique(self.y))
self.nClass = len(self.labels)
self.M = np.array([[(i - j) ** 2 for i in range(self.nClass)] for j in range(self.nClass)])
self.X_test = X_test
self.y_test = y_test
self.labeled = list(deepcopy(labeled))
self.dist_matrix = pairwise_distances(X=self.X, metric='euclidean')
self.K = -1.0 * self.dist_matrix # 无标记样本池对应的核矩阵
self.K_test_pool = -1.0 * pairwise_distances(X=self.X_test, Y=self.X, metric='euclidean')
# -------------------------------------------------
self.budgetLeft = deepcopy(budget)
self.c = 0.01
self.unlabeled = [i for i in range(self.nSample)]
self.model_initial()
# ------------------------------
self._nstrategies = 2
self._delta = 0.1
self._w = np.ones(self._nstrategies)
self._pmin = 1.0 / (self._nstrategies * 10.0)
self._start = True
self._aw = np.zeros(self.nSample)
self._aw[self.labeled] = 1.0
self._s_idx = None
# -------------------------------
self._pmin = 1.0 / (self._nstrategies * 10.0)
self.PS = np.array([0.5,0.5])
self.phi = np.zeros((2, self.nSample))
self.Q = np.zeros(self.nSample)
self.T = deepcopy(budget)
self.hat_y_matrix = np.zeros((self.T, self.nSample))
self.Wt = np.zeros((self.T, self.nSample))
self.reward = 0.0
self._nT = 1/ (self.nSample * self.T)
# -------------------------------
self.ACClist = []
self.MZElist = []
self.MAElist = []
self.F1list = []
self.MIlist = []
self.ALC_ACC = 0.0
self.ALC_MZE = 0.0
self.ALC_MAE = 0.0
self.ALC_F1 = 0.0
self.ALC_MI = 0.0
self.Redundancy = 0.0
# -------------------------------
def model_initial(self):
self.T_labeled = self.M[self.y[self.labeled],:]
self.K_labeled = self.K[np.ix_(self.labeled, self.labeled)]
self.K_labeled_inv = inv(self.c * np.eye(len(self.labeled)) + self.K_labeled)
self.Beta = self.K_labeled_inv @ self.T_labeled
# -----------------------------
for idx in self.labeled:
self.unlabeled.remove(idx)
return self
def Block_Matrix_Inverse(self, A11_inv, A12, A21, A22):
n = A11_inv.shape[0]
m = A22.shape[0]
M = np.zeros((m+n, m+n))
B22 = inv(A22 - A21 @ A11_inv @ A12)
B12 = -A11_inv @ A12 @ B22
M[n:,n:] = B22
M[:n,:n] = A11_inv - B12 @ (A21 @ A11_inv)
M[:n,n:] = B12
M[n:,:n] = -B22 @ A21 @ A11_inv
return M
def model_incremental_train(self, new_ids):
A12 = self.K[np.ix_(self.labeled, new_ids)]
A22 = self.K[np.ix_(new_ids, new_ids)] + self.c * np.eye(len(new_ids))
K_bar_inv = self.Block_Matrix_Inverse(A11_inv=self.K_labeled_inv, A12=A12, A21=A12.T, A22=A22)
T_bar = np.vstack((self.T_labeled, self.M[self.y[new_ids],:]))
Beta_bar = K_bar_inv @ T_bar
# --------------------------
self.K_labeled_inv = K_bar_inv
self.T_labeled = T_bar
self.Beta = Beta_bar
return self
def tmp_incremental_train(self, tmp_idx):
A12 = self.K[np.ix_(self.labeled, [tmp_idx])]
A22 = self.K[np.ix_([tmp_idx], [tmp_idx])] + self.c * np.eye(1)
K_bar_inv = self.Block_Matrix_Inverse(A11_inv=self.K_labeled_inv, A12=A12, A21=A12.T, A22=A22)
return K_bar_inv
def predict_proba(self, ids):
K_test_labeled = self.K[np.ix_(ids, self.labeled)]
output = K_test_labeled @ self.Beta
predictions = np.linalg.norm(output[:, None] - self.M, axis=2, ord=1)
predictions = -predictions
predictions = np.exp(predictions)
predictions_sum = np.sum(predictions, axis=1, keepdims=True)
proba_matrix = predictions / predictions_sum
return proba_matrix
def predict(self, ids):
K_test_labeled = self.K[np.ix_(ids, self.labeled)]
output = K_test_labeled @ self.Beta
predictions = np.argmin(np.linalg.norm(output[:, None] - self.M, axis=2, ord=1), axis=1)
return predictions
def get_EC(self, ids):
K_test_labeled = self.K[np.ix_(ids, self.labeled)]
output = K_test_labeled @ self.Beta
ids_norm1 = np.linalg.norm(output[:, None] - self.M, axis=2, ord=1)
predictions = -1.0 * deepcopy(ids_norm1)
predictions = np.exp(predictions)
predictions_sum = np.sum(predictions, axis=1, keepdims=True)
proba_matrix = predictions / predictions_sum
return np.sum(ids_norm1 * proba_matrix, axis=1)
def evaluation(self):
output = self.K_test_pool[:,self.labeled] @ self.Beta
y_hat = np.argmin(np.linalg.norm(output[:, None] - self.M, axis=2, ord=1), axis=1)
self.ACClist.append(accuracy_score(self.y_test, y_hat))
self.MZElist.append(1-accuracy_score(self.y_test, y_hat))
self.MAElist.append(mean_absolute_error(self.y_test, y_hat))
self.F1list.append(f1_score(self.y_test, y_hat, average='macro'))
self.MIlist.append(mutual_info_score(labels_true=self.y_test, labels_pred=y_hat))
self.ALC_ACC += self.ACClist[-1]
self.ALC_MZE += self.MZElist[-1]
self.ALC_MAE += self.MAElist[-1]
self.ALC_F1 += self.F1list[-1]
self.ALC_MI += self.MIlist[-1]
def SoftMax(self,value_list):
exp_values = np.exp(value_list)
sum_exp_values = np.sum(exp_values)
return exp_values / sum_exp_values
def select(self):
self.evaluation()
t = 0 #迭代次数
while self.budgetLeft > 0:
if not self._start:
self._w[self._s_idx] *= np.exp(self._pmin / 2.0 * (self.reward + 1.0 / self.last_p * np.sqrt(np.log(self._nstrategies / self._delta) / self._nstrategies)))
self._start = False
W = self._w.sum()
p = (1.0 - self._nstrategies * self._pmin) * self._w / W + self._pmin
s_idx = np.random.choice(np.arange(self._nstrategies), p=p)
tar_idx = None
if s_idx == 0:
print("Div")
# ----------------Diversity sampling criterion
dist_D_L = self.dist_matrix[np.ix_(np.arange(self.nSample), self.labeled)]
Div = np.min(dist_D_L, axis=1)
tar_idx = np.argmax(Div)
elif s_idx == 1:
print("Expected misclassification cost")
# ----------------Least Confidence criterion
EC = self.get_EC(ids=np.arange(self.nSample))
tar_idx = np.argmax(EC)
# proba_matrix = self.predict_proba(ids=np.arange(self.nSample))
# proba_max = np.max(proba_matrix, axis=1)
# tar_idx = np.argmin(proba_max)
# ==========================================
self.last_p = p[s_idx]
# ==========================================
if tar_idx in self.labeled:
"""不用更新模型"""
"""计算奖励"""
hat_y = self.predict(ids=[tar_idx])
if hat_y == self.y[tar_idx]:
self.reward += self._nT / p[s_idx]
elif tar_idx not in self.labeled:
"""更新模型"""
self.model_incremental_train(new_ids=[tar_idx])
self.unlabeled.remove(tar_idx)
self.labeled.append(tar_idx)
self.budgetLeft -= 1
self.evaluation()
"""计算奖励"""
hat_y = self.predict(ids=[tar_idx])
if hat_y == self.y[tar_idx]:
self.reward += self._nT / p[s_idx] # the IW-ACC
# -----------------------------------------------
t += 1 #迭代次数加一
neigh = NearestNeighbors(n_neighbors=1)
neigh.fit(X=self.X[self.labeled])
self.Redundancy = (1/np.mean(neigh.kneighbors()[0].flatten()))
if __name__ == '__main__':
# name_list = ["Balance-scale","Toy","Cleveland","Knowledge","Glass",
# "Melanoma","Housing-5bin","Housing-10bin","Car"]
name_list = ["Student","Balance-scale","Newthyroid","CTGs","Knowledge","Car","Nursery",
"Toy","Melanoma","Eucalyptus","Glass","Obesity1","stock-10bin","Computer-10bin"]
class results():
def __init__(self):
self.ACCList = []
self.MZEList = []
self.MAEList = []
self.F1List = []
self.MIList = []
self.ALC_ACC = []
self.ALC_MZE = []
self.ALC_MAE = []
self.ALC_F1 = []
self.ALC_MI = []
self.Redun = []
class stores():
def __init__(self):
self.num_labeled_mean = []
self.num_labeled_std = []
#-----------------------
self.ACCList_mean = []
self.ACCList_std = []
#-----------------------
self.MZEList_mean = []
self.MZEList_std = []
# -----------------
self.MAEList_mean = []
self.MAEList_std = []
# -----------------
self.F1List_mean = []
self.F1List_std = []
# -----------------
self.MIList_mean = []
self.MIList_std = []
# -----------------
self.ALC_ACC_mean = []
self.ALC_ACC_std = []
# -----------------
self.ALC_MZE_mean = []
self.ALC_MZE_std = []
# -----------------
self.ALC_MAE_mean = []
self.ALC_MAE_std = []
# -----------------
self.ALC_F1_mean = []
self.ALC_F1_std = []
# -----------------
self.ALC_MI_mean = []
self.ALC_MI_std = []
# -----------------
self.ALC_ACC_list = []
self.ALC_MZE_list = []
self.ALC_MAE_list = []
self.ALC_F1_list = []
self.ALC_MI_list = []
# -----------------
self.Redun_list = []#TODO
self.Redun_mean = []#TODO
self.Redun_std = []#TODO
for name in name_list:
print("########################{}".format(name))
data_path = Path("D:\Chapter1\DataSet")
partition_path = Path(r"D:\Chapter1\Partition")
"""--------------read the whole data--------------------"""
read_data_path = data_path.joinpath(name + ".csv")
data = np.array(pd.read_csv(read_data_path, header=None))
X = np.asarray(data[:, :-1], np.float64)
scaler = StandardScaler()
X = scaler.fit_transform(X)
y = data[:, -1]
y -= y.min()
dist_matrix = pairwise_distances(X=X, metric="euclidean")
nClass = len(np.unique(y))
nSample = len(y)
Budget = 25 * nClass
"""--------read the partitions--------"""
read_partition_path = str(partition_path.joinpath(name + ".xls"))
book_partition = xlrd.open_workbook(read_partition_path)
workbook = xlwt.Workbook()
count = 0
# --------------------------------------
RESULT = results()
STORE = stores()
# --------------------------------------
for SN in book_partition.sheet_names():
print("================{}".format(SN))
S_Time = time()
train_ids = []
test_ids = []
labeled = []
table_partition = book_partition.sheet_by_name(SN)
for idx in table_partition.col_values(0):
if isinstance(idx,float):
train_ids.append(int(idx))
for idx in table_partition.col_values(1):
if isinstance(idx,float):
test_ids.append(int(idx))
for idx in table_partition.col_values(2):
if isinstance(idx,float):
labeled.append(int(idx))
X_train = X[train_ids]
y_train = y[train_ids].astype(np.int32)
X_test = X[test_ids]
y_test = y[test_ids]
model = AL_ALBL(X=X_train, y=y_train, labeled=labeled, budget=Budget, X_test=X_test, y_test=y_test)
model.select()
RESULT.ACCList.append(model.ACClist)
RESULT.MZEList.append(model.MZElist)
RESULT.MAEList.append(model.MAElist)
RESULT.F1List.append(model.F1list)
RESULT.MIList.append(model.MIlist)
RESULT.ALC_ACC.append(model.ALC_ACC)
RESULT.ALC_MZE.append(model.ALC_MZE)
RESULT.ALC_MAE.append(model.ALC_MAE)
RESULT.ALC_F1.append(model.ALC_F1)
RESULT.ALC_MI.append(model.ALC_MI)
RESULT.Redun.append(model.Redundancy) # TODO
print("SN===",SN, "time:",time()-S_Time)
STORE.ACCList_mean = np.mean(RESULT.ACCList, axis=0)
STORE.ACCList_std = np.std(RESULT.ACCList, axis=0)
STORE.MZEList_mean = np.mean(RESULT.MZEList, axis=0)
STORE.MZEList_std = np.std(RESULT.MZEList, axis=0)
STORE.MAEList_mean = np.mean(RESULT.MAEList, axis=0)
STORE.MAEList_std = np.std(RESULT.MAEList, axis=0)
STORE.F1List_mean = np.mean(RESULT.F1List, axis=0)
STORE.F1List_std = np.std(RESULT.F1List, axis=0)
STORE.MIList_mean = np.mean(RESULT.MIList, axis=0)
STORE.MIList_std = np.std(RESULT.MIList, axis=0)
STORE.ALC_ACC_mean = np.mean(RESULT.ALC_ACC)
STORE.ALC_ACC_std = np.std(RESULT.ALC_ACC)
STORE.ALC_MZE_mean = np.mean(RESULT.ALC_MZE)
STORE.ALC_MZE_std = np.std(RESULT.ALC_MZE)
STORE.ALC_MAE_mean = np.mean(RESULT.ALC_MAE)
STORE.ALC_MAE_std = np.std(RESULT.ALC_MAE)
STORE.ALC_F1_mean = np.mean(RESULT.ALC_F1)
STORE.ALC_F1_std = np.std(RESULT.ALC_F1)
STORE.ALC_MI_mean = np.mean(RESULT.ALC_MI)
STORE.ALC_MI_std = np.std(RESULT.ALC_MI)
STORE.ALC_ACC_list = RESULT.ALC_ACC
STORE.ALC_MZE_list = RESULT.ALC_MZE
STORE.ALC_MAE_list = RESULT.ALC_MAE
STORE.ALC_F1_list = RESULT.ALC_F1
STORE.ALC_MI_list = RESULT.ALC_MI
STORE.Redun_list = RESULT.Redun # TODO
STORE.Redun_mean = np.mean(RESULT.Redun)# TODO
STORE.Redun_std = np.std(RESULT.Redun)# TODO
sheet_names = ["ACC","MZE","MAE","F1","MI",
"ALC_ACC_list","ALC_MZE_list","ALC_MAE_list","ALC_F1_list","ALC_MI_list",
"ALC_ACC", "ALC_MZE","ALC_MAE", "ALC_F1", "ALC_MI",
"Redun_list","Redun"]
workbook = xlwt.Workbook()
for sn in sheet_names:
print("sn::",sn)
sheet = workbook.add_sheet(sn)
n_col = len(STORE.MZEList_mean)
if sn == "ACC":
sheet.write(0, 0, sn)
for j in range(1,n_col + 1):
sheet.write(j,0,STORE.ACCList_mean[j - 1])
sheet.write(j,1,STORE.ACCList_std[j - 1])
elif sn == "MZE":
sheet.write(0, 0, sn)
for j in range(1,n_col + 1):
sheet.write(j,0,STORE.MZEList_mean[j - 1])
sheet.write(j,1,STORE.MZEList_std[j - 1])
elif sn == "MAE":
sheet.write(0, 0, sn)
for j in range(1,n_col + 1):
sheet.write(j,0,STORE.MAEList_mean[j - 1])
sheet.write(j,1,STORE.MAEList_std[j - 1])
elif sn == "F1":
sheet.write(0, 0, sn)
for j in range(1,n_col + 1):
sheet.write(j,0,STORE.F1List_mean[j - 1])
sheet.write(j,1,STORE.F1List_std[j - 1])
elif sn == "MI":
sheet.write(0, 0, sn)
for j in range(1,n_col + 1):
sheet.write(j,0,STORE.MIList_mean[j - 1])
sheet.write(j,1,STORE.MIList_std[j - 1])
# ---------------------------------------------------
elif sn == "ALC_ACC_list":
sheet.write(0, 0, sn)
for j in range(1,len(STORE.ALC_ACC_list) + 1):
sheet.write(j,0,STORE.ALC_ACC_list[j - 1])
elif sn == "ALC_MZE_list":
sheet.write(0, 0, sn)
for j in range(1,len(STORE.ALC_MZE_list) + 1):
sheet.write(j,0,STORE.ALC_MZE_list[j - 1])
elif sn == "ALC_MAE_list":
sheet.write(0, 0, sn)
for j in range(1,len(STORE.ALC_MAE_list) + 1):
sheet.write(j,0,STORE.ALC_MAE_list[j - 1])
elif sn == "ALC_F1_list":
sheet.write(0, 0, sn)
for j in range(1,len(STORE.ALC_F1_list) + 1):
sheet.write(j,0,STORE.ALC_F1_list[j - 1])
elif sn == "ALC_MI_list":
sheet.write(0, 0, sn)
for j in range(1,len(STORE.ALC_MI_list) + 1):
sheet.write(j,0,STORE.ALC_MI_list[j - 1])
# -----------------
elif sn == "ALC_ACC":
sheet.write(0, 0, sn)
sheet.write(1, 0, STORE.ALC_ACC_mean)
sheet.write(2, 0, STORE.ALC_ACC_std)
elif sn == "ALC_MZE":
sheet.write(0, 0, sn)
sheet.write(1, 0, STORE.ALC_MZE_mean)
sheet.write(2, 0, STORE.ALC_MZE_std)
elif sn == "ALC_MAE":
sheet.write(0, 0, sn)
sheet.write(1, 0, STORE.ALC_MAE_mean)
sheet.write(2, 0, STORE.ALC_MAE_std)
elif sn == "ALC_F1":
sheet.write(0, 0, sn)
sheet.write(1, 0, STORE.ALC_F1_mean)
sheet.write(2, 0, STORE.ALC_F1_std)
elif sn == "ALC_MI":
sheet.write(0, 0, sn)
sheet.write(1, 0, STORE.ALC_MI_mean)
sheet.write(2, 0, STORE.ALC_MI_std)
elif sn == "Redun_list":
sheet.write(0, 0, sn)
for j in range(1,len(STORE.Redun_list) + 1):
sheet.write(j,0,STORE.Redun_list[j - 1])
elif sn == "Redun":
sheet.write(0, 0, sn)
sheet.write(1, 0, STORE.Redun_mean)
sheet.write(2, 0, STORE.Redun_std)
save_path = Path(r"D:\Chapter1\ALresult\ALBL")
save_path = str(save_path.joinpath(name + ".xls"))
workbook.save(save_path)