目录
- 摘要
- Abstract
- 1 LSTM模型实战
- 1.1 数据处理
- 1.2 LSTM模型的搭建
- 1.3 数据的预测和可视化
- 2 transformer(上)
- 2.1 Transformer 结构
- 2.2 Transformer 编码器
- 总结
摘要
本周的工作内容主要分为两个部分,第一部分是使用LSTM模型预测股票市场数据的涨幅趋势,LSTM通过其独特的结构和机制,克服了传统RNN在时序预测任务中的局限性,这使得LSTM在各种时序数据处理场景中有更为优秀的表现。第二部分的学习内容是transformer,本周主要学习了transformer的基本架构和编码器的工作原理。
Abstract
This week’s work mainly consists of two parts. The first part is to use LSTM model to predict the upward trend of stock market data. LSTM overcomes the limitations of traditional RNN in time series prediction tasks through its unique structure and mechanism, which makes LSTM perform better in various time series data processing scenarios. The second part of the learning content is a small part of transformer, mainly involving the basic architecture of transformer and the working principle of encoder.
1 LSTM模型实战
在之前的学习基础之上,本周通过Python的Keras库构建LSTM模型,旨在预测时间序列中的步骤和序列在股票市场数据的应用。
实验过程如下:
模型用到的库有Keras 、NumPy 、Matplotlib等
导入代码如下所示:
import numpy as np
import datetime as dt
from numpy import newaxis
from core.utils import Timer
from keras.layers import Dense, Activation, Dropout, LSTM
from keras.models import Sequential, load_model
from keras.callbacks import EarlyStopping, ModelCheckpoint
import matplotlib.pyplot as plt
1.1 数据处理
DataLoader 是一个数据处理工具,能够从 CSV 文件中读取数据,生成训练和测试数据窗口,并在必要时对数据进行归一化。这些功能为 LSTM 模型的训练和评估提供了便利,确保模型可以处理时间序列数据。代码如下所示:
class DataLoader():
"""A class for loading and transforming data for the lstm model"""
def __init__(self, filename, split, cols):
dataframe = pd.read_csv(filename)
i_split = int(len(dataframe) * split)
self.data_train = dataframe.get(cols).values[:i_split]
self.data_test = dataframe.get(cols).values[i_split:]
self.len_train = len(self.data_train)
self.len_test = len(self.data_test)
self.len_train_windows = None
def get_test_data(self, seq_len, normalise):
'''
Create x, y test data windows
Warning: batch method, not generative, make sure you have enough memory to
load data, otherwise reduce size of the training split.
'''
data_windows = []
for i in range(self.len_test - seq_len):
data_windows.append(self.data_test[i:i+seq_len])
data_windows = np.array(data_windows).astype(float)
data_windows = self.normalise_windows(data_windows, single_window=False) if normalise else data_windows
x = data_windows[:, :-1]
y = data_windows[:, -1, [0]]
return x,y
def get_train_data(self, seq_len, normalise):
'''
Create x, y train data windows
Warning: batch method, not generative, make sure you have enough memory to
load data, otherwise use generate_training_window() method.
'''
data_x = []
data_y = []
for i in range(self.len_train - seq_len):
x, y = self._next_window(i, seq_len, normalise)
data_x.append(x)
data_y.append(y)
return np.array(data_x), np.array(data_y)
def generate_train_batch(self, seq_len, batch_size, normalise):
'''Yield a generator of training data from filename on given list of cols split for train/test'''
i = 0
while i < (self.len_train - seq_len):
x_batch = []
y_batch = []
for b in range(batch_size):
if i >= (self.len_train - seq_len):
# stop-condition for a smaller final batch if data doesn't divide evenly
yield np.array(x_batch), np.array(y_batch)
i = 0
x, y = self._next_window(i, seq_len, normalise)
x_batch.append(x)
y_batch.append(y)
i += 1
yield np.array(x_batch), np.array(y_batch)
def _next_window(self, i, seq_len, normalise):
'''Generates the next data window from the given index location i'''
window = self.data_train[i:i+seq_len]
window = self.normalise_windows(window, single_window=True)[0] if normalise else window
x = window[:-1]
y = window[-1, [0]]
return x, y
def normalise_windows(self, window_data, single_window=False):
'''Normalise window with a base value of zero'''
normalised_data = []
window_data = [window_data] if single_window else window_data
for window in window_data:
normalised_window = []
for col_i in range(window.shape[1]):
normalised_col = [((float(p) / float(window[0, col_i])) - 1) for p in window[:, col_i]]
normalised_window.append(normalised_col)
normalised_window = np.array(normalised_window).T # reshape and transpose array back into original multidimensional format
normalised_data.append(normalised_window)
return np.array(normalised_data)
1.2 LSTM模型的搭建
Model 类封装了构建、训练和预测 LSTM 模型的功能,支持多种训练方式和预测方法。它能够处理不同层的添加、模型的编译和训练,并提供了灵活的预测方法,适用于时间序列数据的建模。
代码如下所示:
class Model():
"""A class for an building and inferencing an lstm model"""
def __init__(self):
self.model = Sequential()
def load_model(self, filepath):
print('[Model] Loading model from file %s' % filepath)
self.model = load_model(filepath)
def build_model(self, configs):
timer = Timer()
timer.start()
for layer in configs['model']['layers']:
neurons = layer['neurons'] if 'neurons' in layer else None
dropout_rate = layer['rate'] if 'rate' in layer else None
activation = layer['activation'] if 'activation' in layer else None
return_seq = layer['return_seq'] if 'return_seq' in layer else None
input_timesteps = layer['input_timesteps'] if 'input_timesteps' in layer else None
input_dim = layer['input_dim'] if 'input_dim' in layer else None
if layer['type'] == 'dense':
self.model.add(Dense(neurons, activation=activation))
if layer['type'] == 'lstm':
self.model.add(LSTM(neurons, input_shape=(input_timesteps, input_dim), return_sequences=return_seq))
if layer['type'] == 'dropout':
self.model.add(Dropout(dropout_rate))
self.model.compile(loss=configs['model']['loss'], optimizer=configs['model']['optimizer'])
print('[Model] Model Compiled')
timer.stop()
def train(self, x, y, epochs, batch_size, save_dir):
timer = Timer()
timer.start()
print('[Model] Training Started')
print('[Model] %s epochs, %s batch size' % (epochs, batch_size))
save_fname = os.path.join(save_dir, '%s-e%s.h5' % (dt.datetime.now().strftime('%d%m%Y-%H%M%S'), str(epochs)))
callbacks = [
EarlyStopping(monitor='val_loss', patience=2),
ModelCheckpoint(filepath=save_fname, monitor='val_loss', save_best_only=True)
]
self.model.fit(
x,
y,
epochs=epochs,
batch_size=batch_size,
callbacks=callbacks
)
self.model.save(save_fname)
print('[Model] Training Completed. Model saved as %s' % save_fname)
timer.stop()
def train_generator(self, data_gen, epochs, batch_size, steps_per_epoch, save_dir):
timer = Timer()
timer.start()
print('[Model] Training Started')
print('[Model] %s epochs, %s batch size, %s batches per epoch' % (epochs, batch_size, steps_per_epoch))
save_fname = os.path.join(save_dir, '%s-e%s.h5' % (dt.datetime.now().strftime('%d%m%Y-%H%M%S'), str(epochs)))
callbacks = [
ModelCheckpoint(filepath=save_fname, monitor='loss', save_best_only=True)
]
self.model.fit_generator(
data_gen,
steps_per_epoch=steps_per_epoch,
epochs=epochs,
callbacks=callbacks,
workers=1
)
print('[Model] Training Completed. Model saved as %s' % save_fname)
timer.stop()
def predict_point_by_point(self, data):
#Predict each timestep given the last sequence of true data, in effect only predicting 1 step ahead each time
print('[Model] Predicting Point-by-Point...')
predicted = self.model.predict(data)
predicted = np.reshape(predicted, (predicted.size,))
return predicted
def predict_sequences_multiple(self, data, window_size, prediction_len):
#Predict sequence of 50 steps before shifting prediction run forward by 50 steps
print('[Model] Predicting Sequences Multiple...')
prediction_seqs = []
for i in range(int(len(data)/prediction_len)):
curr_frame = data[i*prediction_len]
predicted = []
for j in range(prediction_len):
predicted.append(self.model.predict(curr_frame[newaxis,:,:])[0,0])
curr_frame = curr_frame[1:]
curr_frame = np.insert(curr_frame, [window_size-2], predicted[-1], axis=0)
prediction_seqs.append(predicted)
return prediction_seqs
def predict_sequence_full(self, data, window_size):
#Shift the window by 1 new prediction each time, re-run predictions on new window
print('[Model] Predicting Sequences Full...')
curr_frame = data[0]
predicted = []
for i in range(len(data)):
predicted.append(self.model.predict(curr_frame[newaxis,:,:])[0,0])
curr_frame = curr_frame[1:]
curr_frame = np.insert(curr_frame, [window_size-2], predicted[-1], axis=0)
return predicted
以其中train方法为例介绍:
def train(self, x, y, epochs, batch_size, save_dir):
timer = Timer()
timer.start()
print('[Model] Training Started')
print('[Model] %s epochs, %s batch size' % (epochs, batch_size))
train 方法接收训练数据 x 和 y,训练的轮数 epochs,批量大小 batch_size 和保存模型的目录 save_dir。
1.3 数据的预测和可视化
绘图函数plot_results_multiple:
def plot_results_multiple(predicted_data, true_data, prediction_len):
fig = plt.figure(facecolor='white')
ax = fig.add_subplot(111)
ax.plot(true_data, label='True Data')
for i, data in enumerate(predicted_data):
padding = [None for p in range(i * prediction_len)]
plt.plot(padding + data, label='Prediction')
plt.legend()
plt.show()
plot_results_multiple的功能是绘制多组预测数据与真实数据的对比图。首先绘制真实数据。对于每组预测数据,先创建填充(padding),以便将每组预测数据在图上正确对齐。通过plt.plot绘制每组预测数据,并使用图例标识。最后显示绘制的图形。
主函数main:
def main():
configs = json.load(open('config.json', 'r'))
if not os.path.exists(configs['model']['save_dir']): os.makedirs(configs['model']['save_dir'])
data = DataLoader(
os.path.join('data', configs['data']['filename']),
configs['data']['train_test_split'],
configs['data']['columns']
)
model = Model()
model.build_model(configs)
x, y = data.get_train_data(
seq_len=configs['data']['sequence_length'],
normalise=configs['data']['normalise']
)
主函数,用于执行整个模型的训练与预测流程。从config.json中加载配置文件,获取模型保存目录和数据相关信息。如果保存目录不存在,则创建该目录。使用DataLoader类加载数据,包括文件路径、训练测试划分比例及需要的列。创建模型实例并构建模型。从数据集中获取训练数据(x为特征,y为标签)。
模型训练与预测
steps_per_epoch = math.ceil((data.len_train - configs['data']['sequence_length']) / configs['training']['batch_size'])
model.train_generator(
data_gen=data.generate_train_batch(
seq_len=configs['data']['sequence_length'],
batch_size=configs['training']['batch_size'],
normalise=configs['data']['normalise']
),
epochs=configs['training']['epochs'],
batch_size=configs['training']['batch_size'],
steps_per_epoch=steps_per_epoch,
save_dir=configs['model']['save_dir']
)
x_test, y_test = data.get_test_data(
seq_len=configs['data']['sequence_length'],
normalise=configs['data']['normalise']
)
predictions = model.predict_sequences_multiple(x_test, configs['data']['sequence_length'], configs['data']['sequence_length'])
plot_results_multiple(predictions, y_test, configs['data']['sequence_length'])
该部分的代码代码则是实现了基于生成器的训练方式,适用于数据量较大的情况。计算每个epoch的步骤数(steps_per_epoch),使用生成器训练模型。data.generate_train_batch生成训练数据的批次。从数据集中获取测试数据(x_test和y_test)。使用模型进行多序列预测。调用plot_results_multiple函数绘制预测结果与真实结果的对比图。
模型的训练过程如下所示:
模型的预测结果如下所示:
可以看到,预训模型预测的趋势与实际股票趋势比较吻合!
2 transformer(上)
Transformer最初由Google Brain团队在2017年提出。它的主要创新点在于完全基于自注意力机制(self-attention)来处理序列数据,而不依赖于传统的递归神经网络(RNN)或卷积神经网络(CNN)。Transformer在自然语言处理(NLP)领域取得了显著的成功,成为了许多先进模型(如BERT、GPT等)的基础。
2.1 Transformer 结构
一般的序列到序列模型会分成编码器和解码器。编码器负责处理输入的序列,再把处理好的结果“丢”给解码器,由解码器决定要输出的序列。序列到序列典型的模型就是 Transformer。
transformer的基本架构如下图所示,后续我们会根据其架构对transformer进行逐步的学习。
2.2 Transformer 编码器
编码器输入一排向量,输出另外一排向量。Transformer的编码器使用的是自注意力,输入一排向量,输出另外一个同样长度的向量。
编码器里面会分成很多的块(block),每一个块都是输入一排向量,输出一排向量,最后一个块会输出最终的向量序列。Transformer 的编码器的每个块并不是神经网络的一层,在每个块里面,输入一排向量后做自注意力,考虑整个序列的信息,输出另外一排向量。接下来这排向量会“丢”到全连接网络网络里面,输出另外一排向量,这一排向量就是块的输出,每个块的工作原理如下图所示。
事实上在原来的 Transformer 里面做的事情是更复杂的。Transformer 里面加入了残差连接(residual connection)的设计。
Residual connection(残差连接): 是一种在深度神经网络中使用的技术,旨在解决深层网络训练中的梯度消失问题。它的核心思想是通过将输入直接添加到输出中,从而使模型能够学习到输入与输出之间的残差(或差异),而不是学习完整的映射。
一个block的运作过程如下:
(1)向量 b 输入到自注意力层后得到向量 a
(2)输出向量 a 加上其输入向量 b 得到新的输出。得到残差的结果
(3)做层归一化(layer normalization),将输出作为FC的输入。
(4)将FC的输入加上其的输出,得到残差的结果
(5)将得到的残差的结果再做一次层归一化得到最终的输出
层归一化不需要考虑批量的信息,而批量归一化需要考虑批量的信息。层归一化输入一个向量,输出另外一个向量,会计算输入向量的平均值和标准差。
批量归一化是对不同样本不同特征的同一个维度去计算均值跟标准差,但层归一化是对同一个特征、同一个样本里面不同的维度去计算均值跟标准差,接着做个归一化。
编码器的详细工作原理如上图所示
总结
经过本周的学习,我对LSTM模型有了进一步的理解,但对模型训练方面还不够熟悉,有许多地方还存在不明白的地方,后续还需要对模型的实际运作过程进行学习,除此在transformer编码器的学习过程中,发现对于过去的一些知识存在遗忘现象,例如Embedding和Multi-head attention,在下周的学习中我会对相关知识进行复习。