1 数据集处理
1.1数据集下载
数据集来源:kaggle,网址:https://www.kaggle.com/,点击进入网站,左侧选择Datasets。
进入后搜索栏搜索关键词bird。此时出现很多数据集可以选择,推荐选择第一个或者第三个。这里以第三个为例进行演示。
点击进入,注册登录账号后,点击Download进行下载。下载后选择第一个压缩包进行解压。
1.2 数据集划分
先观察数据集。进入数据集文件夹,直接选择images进入。
此时我们可以观察到有200个文件夹,代表200个鸟的种类。当我们知道文件结构后需要对其进行划分了。按照常用比例8:1:1划分为训练集、验证集和测试集。
import os
import shutil
import random
# 数据集根目录,这里选择自己的目录
data_dir = './bird200/CUB_200_2011/images'
# 划分后的数据集根目录
output_dir = './bird200_new'
# 创建训练集、验证集和测试集的文件夹
train_dir = os.path.join(output_dir, 'train')
val_dir = os.path.join(output_dir, 'val')
test_dir = os.path.join(output_dir, 'test')
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)
# 遍历每个类别文件夹
for class_name in os.listdir(data_dir):
class_dir = os.path.join(data_dir, class_name)
if os.path.isdir(class_dir):
# 创建训练集、验证集和测试集的类别文件夹
train_class_dir = os.path.join(train_dir, class_name)
val_class_dir = os.path.join(val_dir, class_name)
test_class_dir = os.path.join(test_dir, class_name)
os.makedirs(train_class_dir, exist_ok=True)
os.makedirs(val_class_dir, exist_ok=True)
os.makedirs(test_class_dir, exist_ok=True)
# 获取该类别下的所有文件
files = os.listdir(class_dir)
random.shuffle(files) # 随机打乱文件顺序
# 计算划分的索引
num_files = len(files)
train_size = int(num_files * 0.8)
val_size = int(num_files * 0.1)
# 划分文件
train_files = files[:train_size]
val_files = files[train_size:train_size + val_size]
test_files = files[train_size + val_size:]
# 复制文件到对应的文件夹
for file in train_files:
src_path = os.path.join(class_dir, file)
dst_path = os.path.join(train_class_dir, file)
shutil.copyfile(src_path, dst_path)
for file in val_files:
src_path = os.path.join(class_dir, file)
dst_path = os.path.join(val_class_dir, file)
shutil.copyfile(src_path, dst_path)
for file in test_files:
src_path = os.path.join(class_dir, file)
dst_path = os.path.join(test_class_dir, file)
shutil.copyfile(src_path, dst_path)
print("数据集划分完成!")
2.前期准备工作
2.1 导入需要的库
import warnings
from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
# 导入必要的库
import itertools
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from PIL import Image
from sklearn.metrics import classification_report, f1_score , confusion_matrix
# 导入TensorFlow库
import tensorflow as tf
from tensorflow import keras
from keras.layers import Dense, Dropout , BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers,models,Model
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras import mixed_precision
mixed_precision.set_global_policy('float32')
# 输出TensorFlow版本
print(tf.__version__)
进行测试运行结果:
2.2加载数据
这里进行数据集载入,遍历文件夹读取照片和标签信息。并对其转化格式,使其符合TensorFlow运行。
# 定义数据集路径
dataset = {
"train_data" : "./lty/input/bird-species/train",
"valid_data" : "./lty/input/bird-species/valid",
"test_data" : "./lty/input/bird-species/test"
}
all_data = []
# 遍历数据集文件夹并读取图片和标签信息
for path in dataset.values():
data = {"imgpath": [] , "labels": [] }
category = os.listdir(path)
for folder in category:
folderpath = os.path.join(path , folder)
filelist = os.listdir(folderpath)
for file in filelist:
fpath = os.path.join(folderpath, file)
data["imgpath"].append(fpath)
data["labels"].append(folder)
# 将数据加入列表中
all_data.append(data.copy())
data.clear()
# 将列表转化为DataFrame格式
train_df = pd.DataFrame(all_data[0] , index=range(len(all_data[0]['imgpath'])))
valid_df = pd.DataFrame(all_data[1] , index=range(len(all_data[1]['imgpath'])))
test_df = pd.DataFrame(all_data[2] , index=range(len(all_data[2]['imgpath'])))
# 将标签转化为数字编码
lb = LabelEncoder()
train_df['encoded_labels'] = lb.fit_transform(train_df['labels'])
valid_df['encoded_labels'] = lb.fit_transform(valid_df['labels'])
test_df['encoded_labels'] = lb.fit_transform(test_df['labels'])
进行一个可视化展示,展示每个类别数量以及标签。
# 获取训练集中每个类别的图像数量和标签
train = train_df["labels"].value_counts()
label = train.tolist()
index = train.index.tolist()
# 设置颜色列表
colors = [
"#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd",
"#8c564b", "#e377c2", "#7f7f7f", "#bcbd22", "#17becf",
"#aec7e8", "#ffbb78", "#98df8a", "#ff9896", "#c5b0d5",
"#c49c94", "#f7b6d2", "#c7c7c7", "#dbdb8d", "#9edae5",
"#5254a3", "#6b6ecf", "#bdbdbd", "#8ca252", "#bd9e39",
"#ad494a", "#8c6d31", "#6b6ecf", "#e7ba52", "#ce6dbd",
"#9c9ede", "#cedb9c", "#de9ed6", "#ad494a", "#d6616b",
"#f7f7f7", "#7b4173", "#a55194", "#ce6dbd"
]
# 绘制水平条形图
plt.figure(figsize=(30,30))
plt.title("Training data images count per class",fontsize=38)
plt.xlabel('Number of images', fontsize=35)
plt.ylabel('Classes', fontsize=35)
plt.barh(index,label, color=colors)
plt.grid(True)
plt.show()
展示如图:
2.3检查训练集、验证集和测试集的内容
通过打印输出训练集、验证集和测试集的内容,确保分配没有出现问题。
import os
# 设置训练集文件夹路径
path = "C:/Users/z/lty/bird-species/train"
# 获取train训练集文件夹列表
dirs = os.listdir(path)
# 遍历文件夹列表并打印文件名
for file in dirs:
print(file)
展示如图:
# 打印训练集信息
print("----------Train-------------")
print(train_df[["imgpath", "labels"]].head(5)) # 打印前5行的图像路径和标签
print(train_df.shape) # 打印训练集的形状,即行数和列数
# 打印验证集信息
print("--------Validation----------")
print(valid_df[["imgpath", "labels"]].head(5)) # 打印前5行的图像路径和标签
print(valid_df.shape) # 打印验证集的形状,即行数和列数
# 打印测试集信息
print("----------Test--------------")
print(test_df[["imgpath", "labels"]].head(5)) # 打印前5行的图像路径和标签
print(test_df.shape) # 打印测试集的形状,即行数和列数
展示如图:
随机展示数据集的照片,用于直观体验:
import matplotlib.pyplot as plt
from PIL import Image
# 创建一个大小为15x12的画布
plt.figure(figsize=(15, 12))
# 从验证集中随机选择16个样本,重置索引并逐行处理
for i, row in valid_df.sample(n=16).reset_index().iterrows():
# 在4x4的子图中的第i+1个位置创建一个子图
plt.subplot(4, 4, i+1)
# 获取图像路径
image_path = row['imgpath']
image = Image.open(image_path)
plt.imshow(image)
# 设置子图的标题为标签值
plt.title(row["labels"])
plt.axis('off')
plt.show()
如图所示:
2.4数据预处理
再对数据进行处理,例如进行数据增强等操作。
%%time
BATCH_SIZE = 35
IMAGE_SIZE = (224, 224)
# 导入图像数据生成器 ImageDataGenerator
from keras.preprocessing.image import ImageDataGenerator
# 定义数据增强生成器
generator = ImageDataGenerator(
preprocessing_function=tf.keras.applications.efficientnet.preprocess_input, # 预处理函数
rescale=1./255, # 将像素值缩放到0-1之间
width_shift_range=0.2, # 水平和垂直方向上的随机平移范围
height_shift_range=0.2,
zoom_range=0.2 # 随机缩放图像的范围
)
# 将训练集数据分批生成并进行数据增强
train_images = generator.flow_from_dataframe(
dataframe=train_df, # 使用train_df作为数据源
x_col='imgpath', # 图像路径的列名
y_col='labels', # 标签的列名
target_size=IMAGE_SIZE, # 图像的目标大小
color_mode='rgb', # 图像的颜色通道模式
class_mode='categorical', # 分类模式,输出是一个one-hot编码的向量
batch_size=BATCH_SIZE, # 批次大小
shuffle=True, # 是否打乱数据顺序
seed=42 # 随机种子
)
# 将验证集数据分批生成
val_images = generator.flow_from_dataframe(
dataframe=valid_df, # 使用valid_df作为数据源
x_col='imgpath',
y_col='labels',
target_size=IMAGE_SIZE,
color_mode='rgb',
class_mode='categorical',
batch_size=BATCH_SIZE,
shuffle=False
)
# 将测试集数据分批生成
test_images = generator.flow_from_dataframe(
dataframe=test_df, # 使用test_df作为数据源
x_col='imgpath',
y_col='labels',
target_size=IMAGE_SIZE,
color_mode='rgb',
class_mode='categorical',
batch_size=BATCH_SIZE,
shuffle=False
)
经过这些处理后,照片的大小就是一致的了。最后一步就是调整子图间的空白部分。
labels = [k for k in train_images.class_indices]
sample_images = train_images.__next__()
images = sample_generate[0]
titles = sample_generate[1]
plt.figure(figsize = (15 , 15))
for i in range(20):
plt.subplot(5 , 5 , i+1)
plt.subplots_adjust(hspace = 0.3 , wspace = 0.3)#调整子图之间的空白区域
plt.imshow(images[i])
plt.title(f'Class: {labels[np.argmax(titles[i],axis=0)]}')
plt.axis("off")
运行结果如下:
3.进行模型训练
3.1 模型加载
先进行模型加载,我们可以下载一个适合的预训练模型,再其基础上进行训练和微调可以减少训练时间。
# 加载预训练模型
pretrained_model = tf.keras.applications.EfficientNetB5(
input_shape=(224, 224, 3),
include_top=False, # 不加载或重新初始化顶层(输出层)的参数
weights='imagenet',
pooling='max'
)
# 冻结预训练神经网络的层
for i, layer in enumerate(pretrained_model.layers):
pretrained_model.layers[i].trainable = False
3.2 构建模型
这一步构建我们自己的训练模型。
# 获取类别数
num_classes = len(set(train_images.classes))
# 对数据进行增强
augment = tf.keras.Sequential([
layers.experimental.preprocessing.RandomFlip("horizontal"),
layers.experimental.preprocessing.RandomRotation(0.15),
layers.experimental.preprocessing.RandomZoom(0.12),
layers.experimental.preprocessing.RandomContrast(0.12),
], name='AugmentationLayer')
# 输入层
inputs = layers.Input(shape=(224, 224, 3), name='inputLayer')
x = augment(inputs) # 应用数据增强
pretrain_out = pretrained_model(x, training=False)
# 添加全连接层和激活函数
x = layers.Dense(1024)(pretrain_out)
x = layers.Activation(activation="relu")(x)
x = BatchNormalization()(x)
x = layers.Dropout(0.45)(x)
x = layers.Dense(512)(x)
x = layers.Activation(activation="relu")(x)
x = BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(num_classes)(x)
outputs = layers.Activation(activation="softmax", dtype=tf.float32, name='activationLayer')(x)
# 创建模型
model = Model(inputs=inputs, outputs=outputs)
# 编译模型
model.compile(
optimizer=Adam(0.0005),
loss='categorical_crossentropy',
metrics=['accuracy']
)
# 打印模型结构摘要
print(model.summary())
展示如图,可以查看模型具体参数:
3.3 模型训练
这一步就可以正式训练模型了。在 transfer learning 中,我们可以选择保持预训练模型的一部分或全部参数不变(称为冻结),只对最后几层或某些层进行微调,以适应新任务的特定要求。这样做的原因是预训练模型已经学习到了通用的特征,我们可以认为这些特征对于新任务也是有用的。通过仅微调少量参数,我们可以在较小的数据集上快速训练出具有良好性能的模型。
# 训练模型
history = model.fit(
train_images,
steps_per_epoch=len(train_images),
validation_data=val_images,
validation_steps=len(val_images),
epochs=10,
callbacks=[
# 提前停止回调,如果验证集损失在连续3个epoch中没有改善,则提前停止训练
EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True),
# 学习率调整,在验证集损失没有改善时降低学习率
ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=2, mode='min')
]
)
# 保存模型权重
model.save_weights('./lty/input/bird-species/my_checkpoint')
训练如图所示:
4.结果与评估
4.1 结果展示
绘制训练过程图。
# 定义所需的变量
tr_acc = history.history['accuracy']
tr_loss = history.history['loss']
val_acc = history.history['val_accuracy']
val_loss = history.history['val_loss']
index_loss = np.argmin(val_loss)
val_lowest = val_loss[index_loss]
index_acc = np.argmax(val_acc)
acc_highest = val_acc[index_acc]
Epochs = [i+1 for i in range(len(tr_acc))]
loss_label = f'best epoch= {str(index_loss + 1)}'
acc_label = f'best epoch= {str(index_acc + 1)}'
# 绘制训练历史
plt.figure(figsize= (20, 8))
plt.style.use('fivethirtyeight')
plt.subplot(1, 2, 1)
plt.plot(Epochs, tr_loss, 'r', label= 'Training loss')
plt.plot(Epochs, val_loss, 'g', label= 'Validation loss')
plt.scatter(index_loss + 1, val_lowest, s= 150, c= 'blue', label= loss_label)
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(Epochs, tr_acc, 'r', label= 'Training Accuracy')
plt.plot(Epochs, val_acc, 'g', label= 'Validation Accuracy')
plt.scatter(index_acc + 1 , acc_highest, s= 150, c= 'blue', label= acc_label)
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.tight_layout
plt.show()
展示结果:
4.2 评估
评估模型在当前数据集上的性能,包括测试损失和测试的准确率,并打印分数报告。
results = model.evaluate(test_images, verbose=0)
# 对测试集进行评估,返回测试损失和测试准确率
print(" Test Loss: {:.5f}".format(results[0]))
# 打印测试损失,保留小数点后5位
print("Test Accuracy: {:.2f}%".format(results[1] * 100))
# 打印测试准确率,乘以100后保留两位小数
y_true = test_images.classes # 获取测试集样本的真实标签
y_pred = np.argmax(model.predict(test_images), axis=1)
# 获取模型对测试集样本的预测标签
# 计算并打印F1 Score和分类报告
f1 = f1_score(y_true, y_pred, average='macro') # 计算F1 Score
print("F1 Score:", f1)
print(classification_report(y_true, y_pred, target_names=test_images.class_indices.keys())) # 打印分类报告
展示:
为了获得结果展示,我能可以打印预测结果。这里展示前8条。
# 创建一个字典,将类别索引和对应的类别名称进行关联
classes = dict(zip(test_images.class_indices.values(), test_images.class_indices.keys()))
# 创建一个DataFrame来存储预测结果
Predictions = pd.DataFrame({
"Image Index": list(range(len(test_images.labels))), # 图像索引
"Test Labels": test_images.labels, # 真实标签
"Test Classes": [classes[i] for i in test_images.labels], # 真实类别
"Prediction Labels": y_pred, # 预测标签
"Prediction Classes": [classes[i] for i in y_pred], # 预测类别
"Path": test_images.filenames, # 图像路径
"Prediction Probability": [x for x in np.asarray(tf.reduce_max(model.predict(test_images), axis=1))] # 预测概率
})
Predictions.head(8) # 输出前8行预测结果
展示:
4.3 优化方向
在这里我们展示了最容易出错的样本,用于后续。
plt.figure(figsize=(20, 20))
# 选择分类错误的预测结果中概率最高的20个样本
subset = Predictions[Predictions["Test Labels"] != Predictions["Prediction Labels"]].sort_values("Prediction Probability").tail(20).reset_index()
for i, row in subset.iterrows():
plt.subplot(5, 4, i+1)
image_path = row['Path']
image = Image.open(image_path)
# 显示图像
plt.imshow(image)
# 设置图像标题,包括真实类别和预测类别
plt.title(f'TRUE: {row["Test Classes"]} | PRED: {row["Prediction Classes"]}', fontsize=8)
plt.axis('off')
plt.show()
展示如图:
混淆矩阵(Confusion Matrix)和分类报告(Classification Report)
import numpy as np
from sklearn.metrics import confusion_matrix
import itertools
import matplotlib.pyplot as plt
# 使用模型对测试图片进行预测
preds = model.predict_generator(test_images)
# 找到每个预测结果中概率最高的类别作为预测标签
y_pred = np.argmax(preds, axis=1)
# 获取测试图片的真实标签和对应的类别字典
g_dict = test_images.class_indices
# 创建一个包含所有类别名称的列表
classes = list(g_dict.keys())
# 计算混淆矩阵
cm = confusion_matrix(test_images.classes, y_pred)
plt.figure(figsize=(30, 30))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, cm[i, j], horizontalalignment='center', color='white' if cm[i, j] > thresh else 'black')
plt.tight_layout()
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()
使用matplotlib库绘制了一个大小为30x30的图像窗口,显示了混淆矩阵的热力图。热力图的颜色越深,表示预测结果越准确。图像中的数字表示每个混淆矩阵单元格中的样本数量。图像上方的标题为"Confusion Matrix",颜色条表示每个颜色对应的样本数量范围。x轴标签为预测标签,y轴标签为真实标签。