文章目录
- 概述
- VAE代码实现
- 关闭eager execution
- 修改bottlenectk组件
- 修改loss损失函数
- Preprocessline模块实现
- Loader模块
- Padder模块
- LogSpectrogramExtractor模块
- MinMaxNormaliser模块
- Saver模块
- PreprocessPipeLine模块
- 知识补充
- property修饰词
- train训练模块
- load_fsdd模块
- train模块
- 执行效果
- 最终代码
- 总结
概述
- 这部分的代码是在原来的自动编码器上的代码进行修改的,进行了如下的修改
- 对于bottleneck,使用概率分布,替代了原来的点映射
- 修改损失函数为 α \alpha αRMSE +KL
- 声音预处理模块preprocessline
- 模型训练文件train
VAE代码实现
关闭eager execution
- eager execution 的相关知识,链接添加链接描述
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
修改bottlenectk组件
注意
- 这里还是要结合正态分布的两个特征参数,将之映射为特征空间中一个特征点,同时为了保证特征点,关于mean value左右波动,所以需要将原来的协防差矩阵进行转换,右边的 ε \varepsilon ε还是在对应维度下的标准正态中随机抽样的点
上述这段,很重要,至少在代码生成中很重要,因为并没有现成的库给你调用!!下面就是具体的代码
def _add_bottleneck(self,x):
# 9、第九部分进行编写
# 4.1,如果要将自动编码器转为变分自动编码器,需要实现正态分布,并且能够随机从中选取特定的点
""" 首先将数据展平,然后在传入全链接层 """
self._shape_before_bottleneck = K.int_shape(x)[1:] # [batch_size,height,weight,channel],这里只需要后面三个的大小
x = Flatten()(x)
self.mu = Dense(self.latent_space_dim,name = "mu")(x)
self.log_variance = Dense(self.latent_space_dim,name ="log_variance")(x)
def sample_point_from_normal_distribution(args):
mu,log_variance = args
epsilon = K.random_normal(shape = K.shape(self.mu),mean = 0.,
stddev = 1.)
sample_point = mu + K.exp(log_variance / 2) * epsilon
return sample_point
x = Lambda(sample_point_from_normal_distribution,
name = "encoder_output")([self.mu,self.log_variance])
x = Dense(self.latent_space_dim,name = "encoder_output")(x)
return x
修改loss损失函数
- KL散度,用来标准正态分布和正态分布之间的差异
- 对于那种方差和标准分布参数完全不同的分布进行矫正
- 因为只有不断向标准正态分布靠拢,才能不断确保最终的表示空间是零点对称的。
- 同时还能减少类与类之间的空白间隔的出现
-
α
\alpha
α是重建损失函数的权重,
- 太大,最终的效果和自动编码器的效果相同
- 太小,重建的图片和原图一点关系都没有
KL散度损失函数
- 这个函数是用来衡量当前的正态分布和高斯正态分布之间的距离,使得当前的正态分布,不断向标准正态分布进行靠拢
- 具体的数学公式如下,这个损失函数并没有现成的库可以调用,所以需要根据公式进行自定义
具体实现代码如下
# 4.2 将两种损失函数进行综合
def _calculate_combined_loss(self,y_target,y_predict):
""" 两个损失函数进行汇总 """
reconstruction_loss = self._calculate_reconstruction_loss(y_target,y_predict)
kl_loss = self._calculate_kl_loss(y_target,y_predict)
combined_loss = self.reconstruction_weight * reconstruction_loss + kl_loss
return combined_loss
# 4.2 损失函数重建
def _calculate_reconstruction_loss(self,y_target,y_predict):
""" 模型重建损失函数,加上了对应alpha """
error = y_predict - y_target
reconstruction_loss = K.mean(K.square(error),axis = [1,2,3]) # 注意,这里只需要返回除了第一个图片序号的后两个维度
return reconstruction_loss
# 4.2 损失函数重建
def _calculate_kl_loss(self,y_target,y_predict):
""" KL散度,用来衡量当前的正态分布和标准正态分布之间的距离"""
kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) - K.exp(self.log_variance),axis=1)
return kl_loss
Preprocessline模块实现
- 这部分需要一些基础知识,需要自己学一下,这里也会做相关介绍:
- 总的课程
- 快速理解短时傅立叶变换
- 理解声音密度、强度和音色等基础声音特征
预处理流程
- 在代码中,作者将预处理分成了几个模块,并且每一个模块都负责一个功能,然后再用一个preprocess类,按照需求将各个模块进行组装。
- 具体功能如下
- Loader模块:负责读取音频文件,并返回对应的信号序列
- Padder模块:对信号的进行补充,确保每一个信号的长度都是统一长度相同
- LogSpectrogramExtractor模块:从数据序列中,提取出对应的频谱数据,并返回
- MinMaxNormaliser模块:正则化模块,对频谱数据,按照要求,将之隐射到对应的范围之内
- Saver模块:保存模块,将数据进行保存,主要保存两种特征,分别如下
- 数据的最值,这个是用来进行数据恢复的,因为数据还需要从正则化之后的数据恢复到原来的数据
- 提取出来的特征:将提取出来的频谱数据和声音的振幅,以及声音的时间序列进行保存
Loader模块
class Loader:
""" 关于这个函数还是有一些不懂的地方,需要学习一下,相关的音频知识 """
def __init__(self,
sample_rate,
duration,
mono # 这是单声道还是双声道
):
self.sample_rate = sample_rate
self.duration = duration
self.mono = mono
def load(self,file_path):
""" 加载文件,这里加载的是什么东西 """
signal = librosa.load(file_path,
sr = self.sample_rate,
duration = self.duration,
mono = self.mono)[0]
return signal
- 参数说明
- sample_rate:采样频率
- duration:加载的音频时长,默认是完全加载的
- mono:是否将音频转换为单声道
- 返回值
- numpy数组:音频的时间序列,支持多通道
- 采样率
Padder模块
class Padder:
""" padder这个类别是用来对数组进行填补,保证数据的大小一致 """
def __init__(self,mode ="constant"):
self.mode = mode
def left_pad(self,array,num_missing_items):
paded_array = np.pad(array,
(num_missing_items,0),
mode = self.mode)
return paded_array
def right_pad(self, array, num_missing_items):
paded_array = np.pad(array,
( 0,num_missing_items),
mode=self.mode)
return paded_array
- 很常见的就是对数组进行扩充,是的数组能够保证大小一致
LogSpectrogramExtractor模块
class LogSpectrogramExtractor:
""" LogSpectrogram是提取了时间序列中的频谱信息(以DB为单位) """
def __init__(self,frame_size,hop_length):
self.frame_size = frame_size
self.hop_length = hop_length
def extract(self,signal):
stft = librosa.stft(signal,
n_fft = self.frame_size,
hop_length=self.hop_length)[:-1]
spectrogram = np.abs(stft) # 取绝对值,化成频谱图
log_spectrogram = librosa.amplitude_to_db(spectrogram) # 加上分贝图
return log_spectrogram
-
这部分主要是负责提取时域中音频信息的频域信息,主要用了两种方法,具体介绍如下
-
librosa.stft:
- 这部分是用来短时傅立叶变换,返回一个复数矩阵
- 参数说明
- n_fft:进行傅立叶变化的时域窗口
- hop_length:帧移动的跳数,时域窗口的每一次迭代的移动步数
- 返回
- 复数矩阵,这里仅仅不需要最后一个输出,具体执行如下效果图,
-
librosa.amplitude_to_db:
- 将幅度频谱转换为dB标度频谱,就是用分贝表示幅度
- 参数
- 输入幅度
- 返回
- 复数序列,将原来的数据替换为对应的dB,下面是转换之后的db图
- 在网上找了一下,关于这块,这部分介绍的是比较详细的
- 链接
- 一般来说,这里需要了解一下对应的log-spectrogram数据到底是什么样的,有什么特征
MinMaxNormaliser模块
class MinMaxNormaliser:
""" 对数据进行正则化 """
def __init__(self,min_val,max_val):
self.min = min_val
self.max = max_val
def nomalise(self,array):
"""" 将原来的数组映射到min_val,max_val之间,这部分是用来将声音提取出来,提取特征"""
norm_array = (array - array.min) / (array.max - array.min)
norm_array = norm_array * (self.max - self.min) + self.min
return norm_array
def denormalise(self,norm_array,original_min,original_max):
""" 这个是用来还原的,后续生成声音 """
array = (norm_array -self.min) / (self.max - self.min)
array = array * (original_max - original_min) * original_min
return array
- 这部分使用来对数据进行正则化的,我觉得他大部分函数都是自己实现的,可能就是让你学习一下的
- normalise:对数据进行正则化,将原来的数据映射到新的min和max之间
- denormalise:对数据进行还原
Saver模块
class Saver:
""" 保存特征和对应的min和max """
def __init__(self,feature_save_dir,min_max_values_save_dir):
self.feature_save_dir = feature_save_dir # 这是特征保存的路径,和原来的路径并不相同
self.min_max_values_save_dir = min_max_values_save_dir
def save_feature(self,feature,file_path):
""" 经过处理之后的特征 """
save_path = self._generate_save_path(file_path)
np.save(save_path,feature)
def save_min_max_values(self,min_max_values):
""" 保存音频文件对应的最大最小值 """
save_path = os.path.join(self.min_max_values_save_dir,"min_max_values.pkl")
self._save(min_max_values,save_path)
@staticmethod
def _save(data,save_path):
with open(save_path,"wb") as f:
pickle.dump(data,f)
def _generate_save_path(self,file_path):
file_name = os.path.split(file_path)[1]
save_path = os.path.join(self.feature_save_dir,file_name + ".npy")
return save_path
- 这个模块是负责保存提取出来的特征和保存原数据的最值
- min_max_values.pkl
- 将提取出来的数据,获取其最大值和最小值,保存为对应的pkl文件
- 文件名.npy
- 将提取出来的频谱特征保存为对应的npy文件
- min_max_values.pkl
PreprocessPipeLine模块
class PreprocessingPipeline:
""" 将上述的每一个文件经过下述流程处理:
1、加载音频文件
2、对数据进行padding,确保等长
3、从数据中提取出log频谱图
4、将频谱图进行正则花
5、保存频谱图
store the min max values for all the log spectrogram
"""
def __init__(self):
""" 这里并没有写死,考虑到了代码的鲁棒性,可以通用于不同的预处理模块 """
self.padder = None
self.extractor = None
self.normaliser = None
self.saver = None
self._loader = None
self._num_expected_samples = None
self.min_max_value = dict()
# 这部分是用来判定是否需要进行padding的
# 将成员变量定义为padder,判定是否需要进行
@property
def loader(self):
return self._loader
@loader.setter
def loader(self,loader):
self._loader = loader
self._num_expected_samples = int(loader.sample_rate * loader.duration)
def process(self,audio_files_dir):
for root, _, files in os.walk(audio_files_dir):
for file in files:
file_path = os.path.join(root, file)
self._process_file(file_path)
print(f"Processed file {file_path}")
self.saver.save_min_max_values(self.min_max_value)
# 这里还需要保存对应每一个音频文件对应的最大和最小值,便于还原
self.saver.save_min_max_values(self.min_max_value)
def _process_file(self,file_path):
""" 处理单个文件的过程 """
signal = self.loader.load(file_path) # 加载信号
if self._is_padding_necessary(signal): # 判定是否需要进行padding
signal = self._apply_padding(signal)
feature = self.extractor.extract(signal) # 提取特定的特征,这里保留一个通用的函数
norm_feature = self.normaliser.normalise(feature) # 对特征进行正则化
save_path = self.saver.save_feature(norm_feature,file_path)
self._store_min_max_value(save_path,feature.min(),feature.max())
# 4.3 逐步实现上述的方法
def _is_padding_necessary(self,signal):
""" 判定是否需要对这个array进行扩充 """
if len(signal) < self._num_expected_samples:
return True
return False
def _apply_padding(self,signal):
""" 对array进行padding """
padding_signal = self.padder.right_pad(signal,self._num_expected_samples - len(signal))
return padding_signal
def _store_min_max_value(self,save_path,min_value,max_value):
self.min_max_value[save_path] = {
"min" :min_value,
"max":max_value
}
- 数据预处理的完整流程,主要是调用的process方法实现,然后处理单个文件,是调用私有函数_process方法
知识补充
property修饰词
- 参考连接 , property修饰词的说明
- 作用
- 使用property来创建只读属性,将方法转为相同名称的只读属性,防止属性被修改。
train训练模块
- 这部分主要是加载已经提取出来的特征频谱图,同时调用对应的模型进行训练。
load_fsdd模块
def load_fsdd(spectrograms_path):
x_train = []
for root, _, file_names in os.walk(spectrograms_path):
for file_name in file_names:
file_path = os.path.join(root, file_name)
spectrogram = np.load(file_path) # (n_bins, n_frames, 1)
x_train.append(spectrogram)
x_train = np.array(x_train)
# 三个点是省略多个切片的操作,相当于是多个维度,但是省略前三个维度的写法
x_train = x_train[..., np.newaxis] # -> (3000, 256, 64, 1)
return x_train
- 这部分是读取所有的频谱图,但是频谱图并没有加上新一个新的channel列的数据。
- 正常读取的数据是一个二维数组,然后第一列是数据的样本序号,并没有channel.所以才有
x_train[... , np.newaxis]
* numpy.newaxis 方法是 None 的别名,用于 Python 中的数组索引。numpy.newaxis 最直接的用法是在 Python 中为 NumPy 数组添加一个新维度。例如,将一维数组转换为二维数组,将二维数组转换为 3D 数组,等等。
train模块
def train(x_train, learning_rate, batch_size, epochs):
autoencoder = VAE(
input_shape=(256, 64, 1),
conv_filters=(512, 256, 128, 64, 32),
conv_kernels=(3, 3, 3, 3, 3),
conv_strides=(2, 2, 2, 2, (2, 1)),
latent_space_dim=128
)
autoencoder.summary()
autoencoder.compile(learning_rate)
autoencoder.train(x_train, batch_size, epochs)
return autoencoder
- 这个主要是根据以前的模型制定相关参数,并创建对应的模型,然后开始训练
执行效果
最终代码
import os
import numpy as np
from vae import VAE
LEARNING_RATE = 0.0005
BATCH_SIZE = 16
EPOCHS = 150
SPECTROGRAMS_PATH = "/root/PycharmProjects/VAEGenerate/Mycode/fsdd/spectrogram"
def load_fsdd(spectrograms_path):
x_train = []
for root, _, file_names in os.walk(spectrograms_path):
for file_name in file_names:
file_path = os.path.join(root, file_name)
spectrogram = np.load(file_path) # (n_bins, n_frames, 1)
x_train.append(spectrogram)
x_train = np.array(x_train)
print(x_train.shape)
# 三个点是省略多个切片的操作,相当于是多个维度,但是省略前三个维度的写法
x_train = x_train[..., np.newaxis] # -> (3000, 256, 64, 1)
return x_train
def train(x_train, learning_rate, batch_size, epochs):
autoencoder = VAE(
input_shape=(256, 64, 1),
conv_filters=(512, 256, 128, 64, 32),
conv_kernels=(3, 3, 3, 3, 3),
conv_strides=(2, 2, 2, 2, (2, 1)),
latent_space_dim=128
)
autoencoder.summary()
autoencoder.compile(learning_rate)
autoencoder.train(x_train, batch_size, epochs)
return autoencoder
if __name__ == "__main__":
x_train = load_fsdd(SPECTROGRAMS_PATH)
autoencoder = train(x_train, LEARNING_RATE, BATCH_SIZE, EPOCHS)
autoencoder.save("model")
总结
- 这部分还是学到了很多东西,至少对于作者的代码编写习惯更加了解。但是对于短时傅立叶转换那一块尚且了解的不够具体,虽然有对应的库可以实现,但是还是不够具体。