💛前情提要💛
本文是传知代码平台
中的相关前沿知识与技术的分享~
接下来我们即将进入一个全新的空间,对技术有一个全新的视角~
本文所涉及所有资源均在传知代码平台可获取
以下的内容一定会让你对AI 赋能时代
有一个颠覆性的认识哦!!!
以下内容干货满满,跟上步伐吧~
📌导航小助手📌
- 💡本章重点
- 🍞一. 概述
- 🍞二. 演示效果
- 🍞三.实现
- 🍞四.网络的训练
- 🫓总结
💡本章重点
- CNN实现脑电信号的情感识别
🍞一. 概述
情绪(或情感)识别(或检测)正日益引起来自多学科背景研究者的关注。情感计算,作为Picart提出的一个新兴研究领域,旨在使计算机系统能够准确地处理、识别和理解人类表达的情感信息,从而实现自然的人机交互(HCI),这是情感计算中的前沿科学问题。作为一种复杂的心理状态,反映在生理行为和生理活动中。过去十年里,研究人员一直致力于通过收集各种生理行为和生理活动中的情感信息来识别情感,例如来自麦克风的声音信号、神经生理活动测量设备的数据、摄像头的视频以及网站的文本等。情感检测研究的核心是利用统计机器学习技术(如分类、回归或聚类)实时或在线识别用户的不同情感状态。本文将基于DEAP数据集,利用CNN方法进行情感识别。
信号是一串时间序列,而由于不可能只通过检测人大脑皮层单一位置的信号来获得大脑活动的全貌,因此一般是多个通道的信号,即多条时间序列。
-
DEAP数据集
DEAP(Dataset for Emotion Analysis using Physiological Signals)数据集在脑电情感识别领域的文献中被广泛使用,它是一个开放和免费的数据集,其中包含从视听刺激中记录的生物信号以及参与实验的个体对其情感状态的主观评估。该数据集由伦敦玛丽皇后大学、特文特大学、日内瓦大学和EPFL的研究人员所采集并且标定。
🍞二. 演示效果
其中红色的波形代表兴奋的情感,蓝色代表平淡的情感,旁边的黑色块内是模型的预测结果,0代表预测对应情绪为平淡,1代表预测对应情绪为兴奋。
🍞三.实现
提取数据
首先,我们从DEAP数据集中得到的数据是关于32个受试者32种不同电影40个不同的信号,因此我们需要对数据集进行预处理。我们下载得到的数据集的格式文件是32个.dat格式的文件,利用python打开这类文件,并观察该数据集的全貌
def getDataFromDeap(filename: str):
data = pickle._Unpickler(open(filename, 'rb'))
data.encoding = 'latin1'
return data.load()
labels = []
deap = []
for i in range(1, 33):
data = getDataFromDeap( "./DEAP/data_preprocessed_python/raw/s" + (str(i) if i >= 10 else '0' + str(i) ) + '.dat')
labels.append(data['labels'])
deap.append(data['data'])
labels = np.array(labels)
deap = np.array(deap)
print(labels.shape)
print(deap.shape)
labels = labels.reshape(1280, 4)
deap = deap.reshape(1280, 40, 8064)
print(labels.shape)
print(deap.shape)
通过输出结果我们可以看到对于每一个数据文件(这代表是一个人测试数据)有32个测试结果,每一个测试结果对应着40条不同的生理信号(其中前32为脑电信号通道),每个测试结果还对应着4个不同的情感标签,用来表示对应的情感特征。 8064代表共有63秒采样时间以及128hz的采样频率。
数据预处理
在分类过程中我们需要提取相应的特征,这个过程便是数据的预处理过程。需要对数据进行哪些处理呢?一个在现有的实验中非常有效的特征是信号的微分熵,即我们需要对每个通道的脑电信号进行成分划分并且得到相应成分的微分熵
也就是通过计算信号(可以理解为一串序列)的方差从而得到
def diffEntropy(signal):
variance = np.var(signal, ddof = 1)
return math.log(2 * math.pi * math.e * variance) / 2
信号的成分常采用波频过滤的方式即常见的alpha, beta, gamma, theta四个不同的频段
其中delat成分由于只与深度睡眠有关故不做考虑。其核心代码为:
def butter_bandpass(lowcut, highcut, fs, order=5):
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = butter(order, [low, high], btype='band')
return b, a
def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
y = lfilter(b, a, data)
return y
提取空间特征
现在我们已经得到了对应通道信号的微分熵,然后简单的按照这些数据进行分类是不靠谱的,因为通道代表大脑皮层,而大脑皮层的各个部位存在着一定的空间位置关系,因此我们需要再提取一个特征,可以更好的被CNN进行捕获,一个常用的特征是ISO的脑电标准图,它将脑电极的区域进行划分,这种划分可以被我们认为是这些脑电极对应的脑电信号存在空间关系,其中的字母分布代表不同的通道,这可以在数据集的官方网站中进行查询。
因此这一步就是把我们得到对应通道的微分熵数据按照上面的位置摆放,得到有4个不同脑电成分的二维数据
核心代码为
class eeged():
...
def convert2d(self, data, h = 9, w = 9):
data_2d = np.zeros([h, w])
data_2d[0] = (0., 0, 0, data[0], 0, data[16], 0, 0, 0)
data_2d[1] = (0., 0, 0, data[1], 0, data[17], 0, 0, 0)
data_2d[2] = (data[3], 0., data[2], 0, data[18], 0, data[19], 0, data[20])
data_2d[3] = (0., data[4], 0, data[5], 0, data[22], 0, data[21], 0)
data_2d[4] = (data[7], 0., data[6], 0, data[23], 0, data[24], 0, data[25])
data_2d[5] = (0., data[8], 0, data[9], 0, data[27], 0, data[26], 0)
data_2d[6] = (data[11], 0., data[10], 0, data[15], 0, data[28], 0, data[29])
data_2d[7] = (0., 0, 0, data[12], 0, data[30], 0, 0, 0)
data_2d[8] = (0., 0, 0, data[13], data[14], data[31], 0, 0, 0)
...
标签处理
前面提到,数据集给我们的标签是4个,在实际过程中根据不同的情感模型,我们可以选择不同的标签进行识别工作,在这里我们只选择’variance’标签进行识别,这个标签代表的是人的情感强度,值越大人越兴奋。原始的标签为1-9的实数,在这里我们简单转为二分类任务,这也是现在研究领域中常用的处理
...
def get_labels(self):
valence_labels = self.label[:, 0] > 5
arousal_labels = self.label[:, 1] > 5
v = []
a = []
for i in range(len(valence_labels)):
for j in range(0, 60):
v.append(valence_labels[i])
a.append(arousal_labels[i])
v = np.array(v, dtype=float)
a = np.array(a, dtype=float)
# print("labels:", v.shape)
return a, v
...
卷积神经网络
现在我们已经提取了脑电信号的特征,在这种数据处理的背景下,仅仅需要一个比较简单的CNN网络就能够得到较为显著的情感识别效果,其网络结构为:
class CNN(nn.Module):
def __init__(self, classes = 2):
super(CNN, self).__init__()
# input: batch, 4, 9, 9 output: batch, 24, 5, 5
self.conv1 = nn.Conv2d(in_channels = 4, out_channels = 24, kernel_size = 5)
self.relu1 = nn.ReLU()
# input: batch, 24, 5, 5 output: batch, 128, 4, 4
self.conv2 = nn.Conv2d(in_channels = 24, out_channels = 128, kernel_size = 2)
self.relu2 = nn.ReLU()
# input: batch, 128, 4, 4 output: batch, 128, 2, 2
self.maxpool = torch.nn.MaxPool2d(kernel_size = (2,2))
self.fc1 = nn.Linear(in_features = 128 * 2 * 2, out_features = 96)
self.relu3 = nn.ReLU()
self.fc2 = nn.Linear(in_features = 96, out_features = classes)
def forward(self, x): # input: 9 X 9 after padding
x = self.conv1(x)
x = self.relu1(x)
x = self.conv2(x)
x = self.relu2(x)
x = self.maxpool(x)
x = x.view(-1, np.prod(x.shape[1:]))
x = self.fc1(x)
x = self.relu3(x)
x = self.fc2(x)
return x
输入的数据为的维度为[batch, 4, 9, 9],后面的9来自于预先处理的二维结构,4来源于我们利用了脑电信号的4个成分。我们的网络结构采用了2层卷积,1层池化和两层全连接层。
🍞四.网络的训练
在训练之前,我们先将数据集分割成训练集和测试集
[x_train, l_train, x_test, l_test] = datasplit.getdata(path = './DEAP/de3d', device = 'cpu', labels = 'valence', splits = [0.6, 0.8], valid = False)
x_test = x_test.to(device)
l_test = l_test.to(device)
l_test = torch.argmax(l_test, dim=1)
其中datasplit.getdata需要做的事情就是将数据集的顺序打乱,然后按照一定的比例进行分割,其核心代码为:
def getdata(path = '../DEAP/de3d', device = 'cpu', labels = 'valence', splits = [0.6, 0.8], valid = False):
train_data = np.empty([0, 4, 9, 9])
test_data = np.empty([0, 4, 9, 9])
train_label = np.empty([0, 2])
test_label = np.empty([0, 2])
if(valid):
valid_data = np.empty([0, 4, 9, 9])
valid_label = np.empty([0, 2])
v, t = [int(2400 * i) for i in splits]
for file in os.listdir(path):
file = sio.loadmat(path + '/' + file)
data = file['data']
random_indices = np.random.permutation(data.shape[0])
data = data[random_indices, :, :, :]
if(valid):
train_data = np.concatenate([train_data, data[:v, :, :, :]])
valid_data = np.concatenate([valid_data, data[v:t, :, :, :]])
test_data = np.concatenate([test_data, data[t:, :, :, :]])
else:
train_data = np.concatenate([train_data, data[:t, :, :, :]])
test_data = np.concatenate([test_data, data[t:, :, :, :]])
label = one_hot(np.int32(file[ labels + '_labels'][0]), 2)
label = label[random_indices, :]
if(valid):
train_label = np.concatenate([train_label, label[:v, :]])
valid_label = np.concatenate([valid_label, label[v:t, :]])
test_label = np.concatenate([test_label, label[t:, :]])
else:
train_label = np.concatenate([train_label, label[:t, :]])
test_label = np.concatenate([test_label, label[t:, :]])
if(valid):
x_train = torch.tensor(train_data, requires_grad=False, dtype=torch.float32, device=device)
l_train = torch.tensor(train_label, requires_grad=False, dtype=torch.float32, device=device)
x_valid = torch.tensor(valid_data, requires_grad=False, dtype=torch.float32, device=device)
l_valid = torch.tensor(valid_label, requires_grad=False, dtype=torch.float32, device=device)
x_test = torch.tensor(test_data, requires_grad=False, dtype=torch.float32, device=device)
l_test = torch.tensor(test_label, requires_grad=False, dtype=torch.float32, device=device)
print( x_train.shape, l_train.shape, x_valid.shape, l_valid.shape, x_test.shape, l_test.shape )
return x_train, l_train, x_valid, x_valid, x_test, l_test
else:
x_train = torch.tensor(train_data, requires_grad=False, dtype=torch.float32, device=device)
l_train = torch.tensor(train_label, requires_grad=False, dtype=torch.float32, device=device)
x_test = torch.tensor(test_data, requires_grad=False, dtype=torch.float32, device=device)
l_test = torch.tensor(test_label, requires_grad=False, dtype=torch.float32, device=device)
print( x_train.shape, l_train.shape, x_test.shape, l_test.shape )
return x_train, l_train, x_test, l_test
上面的代码还实现了需要验证集划分的情况。
现在就可以开始训练了,定义好每一步的训练函数:
def training(model, optimizer, x, y):
criterion = nn.CrossEntropyLoss()
optimizer.zero_grad()
y_pred = model(x)
loss = criterion(y_pred, y)
loss.backward()
optimizer.step()
这里采用较为常用的Adam优化器进行训练:
convnn = model.CNN().to(device)
optimizer = torch.optim.Adam(convnn.parameters())
每一步训练结束我们都输出一次网络在训练集上的结果(只是为了可视化,并不按照这些结果进行调整),并得到最终的训练结果
for i in range(1, epoch + 1):
print(f"Training {i} epoch.... ", end = '')
for data in dataloader:
x, y = data
x = x.to(device)
y = y.to(device)
training(model = convnn, optimizer = optimizer, x = x, y = y)
with torch.no_grad():
l_pre = convnn(x_test)
l_pre = torch.argmax(l_pre, dim=1)
print('Finished! Test acc after training:', end = "")
print((l_test == l_pre).float().mean())
with torch.no_grad():
l_pre = convnn(x_test)
l_pre = torch.argmax(l_pre, dim=1)
print('Test acc after training:')
print((l_test == l_pre).float().mean())
🫓总结
综上,我们基本了解了“一项全新的技术啦” 🍭 ~~
恭喜你的内功又双叒叕得到了提高!!!
感谢你们的阅读😆
后续还会继续更新💓,欢迎持续关注📌哟~
💫如果有错误❌,欢迎指正呀💫
✨如果觉得收获满满,可以点点赞👍支持一下哟~✨
【传知科技 – 了解更多新知识】