打卡
目录
打卡
任务说明
流程
数据准备与加载
加载预训练词向量(分词)
数据集预处理
模型构建
Embedding
RNN(循环神经网络) + LSTM
全连接层
损失函数与优化器
训练逻辑
评估指标和逻辑
模型训练与保存
模型加载与测试
自定义输入测试
代码
任务说明
使用MindSpore实现一个基于RNN网络的情感分类模型
流程
数据准备与加载
1、从 https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/aclImdb_v1.tar.gz 下载数据集。注意,可用tqdm
库对下载百分比进行可视化、用IO的方式可安全地下载临时文件,而后保存至指定的路径并返回。如下,是下载的数据集展示。
2、将IMDB数据集加载至内存并构造为迭代对象后,使用 mindspore.dataset
提供的Generatordataset
接口加载数据集迭代对象,并进行下一步的数据处理,例子如下,其中 IMDBData 类是 IMDB 数据集加载器,imdb_train
是构建的一个
对象。Generatordataset
import mindspore.dataset as ds
def load_imdb(imdb_path):
imdb_train = ds.GeneratorDataset(
IMDBData(imdb_path, "train"),
column_names=["text", "label"],
shuffle=True,
num_samples=10000)
imdb_test = ds.GeneratorDataset(
IMDBData(imdb_path, "test"),
column_names=["text", "label"],
shuffle=False)
return imdb_train, imdb_test
imdb_train, imdb_test = load_imdb(imdb_path)
加载预训练词向量(分词)
Glove( Global Vectors for Word Representation ) 词向量作为Embedding,是一种无监督学习算法。从 'https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/glove.6B.zip' 下载数据集。如下图所示。
预训练词向量是对输入单词的数值化表示,通过nn.Embedding
层,采用查表的方式,输入单词对应词表中的index,获得对应的表达向量。
由于数据集中可能存在词表没有覆盖的单词,因此需要加入<unk>
标记符;同时由于输入长度的不一致,在打包为一个batch时需要将短的文本进行填充,因此需要加入<pad>
标记符。 完成后的词表长度为原词表长度+2。mindspore.dataset.text.Vocab 用于创建用于训练NLP模型的Vocab,Vocab是数据集中可能出现的所有Token的集合,保存了各Token与其ID之间的映射关系,其中的函数 from_list(word_list, special_tokens=None, special_first=True) 从给定Token列表创建Vocab, special_tokens 表示追加到Vocab中的Token列表;tokens_to_ids(tokens) 查找指定Token对应的ID。
示例代码如下,根据输出,对应的词表大小 400002 ,向量长度为100。
import zipfile
import numpy as np
def load_glove(glove_path):
glove_100d_path = os.path.join(cache_dir, 'glove.6B.100d.txt')
if not os.path.exists(glove_100d_path):
glove_zip = zipfile.ZipFile(glove_path)
glove_zip.extractall(cache_dir)
embeddings = []
tokens = []
with open(glove_100d_path, encoding='utf-8') as gf:
for glove in gf:
word, embedding = glove.split(maxsplit=1)
tokens.append(word)
embeddings.append(np.fromstring(embedding, dtype=np.float32, sep=' '))
# 添加 <unk>, <pad> 两个特殊占位符对应的embedding
embeddings.append(np.random.rand(100))
embeddings.append(np.zeros((100,), np.float32))
vocab = ds.text.Vocab.from_list(tokens, special_tokens=["<unk>", "<pad>"], special_first=False)
embeddings = np.array(embeddings).astype(np.float32)
return vocab, embeddings
glove_path = download('glove.6B.zip', 'https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/glove.6B.zip')
vocab, embeddings = load_glove(glove_path) #
print(len(vocab.vocab())) # 400002
print(np.shape(embeddings)) ## (400002, 100) 比原始文件多两行
idx = vocab.tokens_to_ids('the')
embedding = embeddings[idx]
print(f"idx={idx}, embedding={embedding}")
代码运行结果例子。
数据集预处理
- 通过Vocab将所有的 Token 处理为index id。
- 将文本序列统一长度,不足的使用
<pad>
补齐,超出的进行截断。
- 首先针对token 到 index id 的查表操作,使用
mindspore.dataset.text.Lookup(vocab, unknown_token=None, data_type=mstype.int32)
接口,将前文构造的词表加载,并指定unknown_token
。 - 其次为文本序列统一长度操作,使用
dataset.transforms.PadEnd(pad_shape, pad_value=None)
接口,此接口定义最大长度和补齐值(pad_value
),这里取最大长度为500,填充值对应词表中<pad>
的 index id。 - 由于后续模型训练的需要,同时要将
label
数据转为float32格式。 - 接着,手动将IMDB数据集分割为训练和验证两部分,比例取0.7, 0.3。
-
最后,通过
batch(batch_size, drop_remainder=False, num_parallel_workers=None, **kwargs)
接口指定数据集的 batch 大小,,并设置是否丢弃无法被batch size整除的剩余数据。
代码例子
import mindspore as ms
# 根据词表,将分词标记(token)映射到其索引值(id)。
lookup_op = ds.text.Lookup(
vocab, # 词表对象,用于存储分词和索引的映射。
unknown_token='<unk>' # 备用词汇,用于要查找的单词不在词汇表时进行替换。 如果单词不在词汇表中,则查找结果将替换为 unknown_token 的值。 如果单词不在词汇表中,且未指定 unknown_token ,将抛出运行时错误。默认值: None ,不指定该参数。
)
# 对输入Tensor进行填充,要求 pad_shape 与输入Tensor的维度保持一致。
pad_op = ds.transforms.PadEnd(
[500], ## 指定填充的shape。设置为较小的维数时该维度的元素将被截断。
pad_value=vocab.tokens_to_ids('<pad>') ## 用于填充的值。默认 None ,表示不指定填充值。 当指定为默认值,输入Tensor为数值型时默认填充 0 ,输入Tensor为字符型时填充空字符串。
)
type_cast_op = ds.transforms.TypeCast(ms.float32)
imdb_train = imdb_train.map(operations=[lookup_op, pad_op], input_columns=['text'])
imdb_train = imdb_train.map(operations=[type_cast_op], input_columns=['label'])
imdb_test = imdb_test.map(operations=[lookup_op, pad_op], input_columns=['text'])
imdb_test = imdb_test.map(operations=[type_cast_op], input_columns=['label'])
imdb_train, imdb_valid = imdb_train.split([0.7, 0.3])
imdb_train = imdb_train.batch(64, drop_remainder=True)
imdb_valid = imdb_valid.batch(64, drop_remainder=True)
模型构建
- 结构:nn.Embedding -> nn.RNN -> nn.Dense
- 其中,
nn.Embedding
层加载Glove词向量,RNN 层做特征提取,nn.Dense 层
将特征转化为与分类数量相同的size,用于后续进行模型优化训练。 - 这里使用能够一定程度规避RNN梯度消失问题的变种LSTM(Long short-term memory)做特征提取层。
Embedding
mindspore.nn.Embedding(vocab_size, embedding_size, use_one_hot=False, embedding_table='normal', dtype=mstype.float32, padding_idx=None)
用于存储词向量并使用索引进行检索,根据输入Tensor中的id,从 embedding_table 中查询对应的 embedding 向量。当输入为id组成的序列时,输出为对应embedding向量构成的矩阵。当 use_one_hot 等于True时,x的类型必须是mindpore.int32。
-
vocab_size (int) - 词典的大小。如上文,对应的词表大小 400002 。
-
embedding_size (int) - 每个嵌入向量的大小。如上文,向量长度为100。
-
use_one_hot (bool) - 指定是否使用one-hot形式。默认值:
False
。 -
embedding_table (Union[Tensor, str, Initializer, numbers.Number]) - embedding_table的初始化方法。当指定为字符串,字符串取值请参见类 mindspore.common.initializer 。默认值:
"normal"
。 -
dtype (mindspore.dtype) - x的数据类型。默认值:
mstype.float32
。 -
padding_idx (int, None) - 将 padding_idx 对应索引所输出的嵌入向量用零填充。默认值:
None
。该功能已停用。
RNN(循环神经网络) + LSTM
循环神经网络(Recurrent Neural Network, RNN)是一类以序列(sequence)数据为输入,在序列的演进方向进行递归(recursion)且所有节点(循环单元)按链式连接的神经网络。
RNN的结构拆解:
RNN单个Cell的结构简单,因此也造成了梯度消失(Gradient Vanishing)问题,具体表现为RNN网络在序列较长时,在序列尾部已经基本丢失了序列首部的信息。为了克服这一问题,LSTM(Long short-term memory)被提出,通过门控机制(Gating Mechanism)来控制信息流在每个循环步中的留存和丢弃。选择LSTM变种而不是经典的RNN做特征提取,来规避梯度消失问题,可以获得更好的模型效果。
mindspore.nn.LSTM(*args, **kwargs)
长短期记忆(LSTM)网络,根据输入序列和给定的初始状态计算输出序列和最终状态。在LSTM模型中,有两条管道连接两个连续的Cell,一条是Cell状态管道,另一条是隐藏状态管道。将两个连续的时间节点表示为 t−1 和 t。指定在 t 时刻输入 , t-1 时刻的隐藏状态 和Cell状态 。
t 时刻的Cell状态 和隐藏状态 使用门控机制计算得到。
输入门 计算出候选值。遗忘门 决定是否让 学到的信息通过或部分通过。
输出门 决定哪些信息输出。
候选Cell状态 是用当前输入计算的。
最后,使用遗忘门、输入门、输出门计算得到当前时刻的Cell状态 和隐藏状态 。
如下公式,𝜎 是sigmoid激活函数, ∗ 是乘积。 𝑊, 𝑏 是公式中输出和输入之间的可学习权重。例如, 、 是用于从输入 𝑥 转换为 𝑖 的权重和偏置。
MindSpore中的LSTM隐藏了整个循环神经网络在序列时间步(Time step)上的循环(同pyTorch),送入输入序列、初始状态,即可获得每个时间步的隐藏状态(hidden state)拼接而成的矩阵,以及最后一个时间步对应的隐状态。我们使用最后的一个时间步的隐藏状态作为输入句子的编码特征,送入下一层。LSTM 公式为:
全连接层
全连接层,即 nn.Dense (in_channels, out_channels, weight_init=None, bias_init=None, has_bias=True, activation=None, dtype=mstype.float32)
将特征维度变换为二分类所需的维度1,经过Dense层后的输出即为模型预测结果。
其中公式为 outputs = activation(X * kernel + bias) ,activation 是激活函数,kernel 是权重矩阵,bias 是偏置向量。
模型构建的示例代码如下:
import math
import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore.common.initializer import Uniform, HeUniform
class RNN(nn.Cell):
def __init__(self, embeddings,
hidden_dim,
output_dim,
n_layers,
bidirectional,
pad_idx):
super().__init__()
vocab_size, embedding_dim = embeddings.shape
self.embedding = nn.Embedding(
vocab_size,
embedding_dim,
embedding_table=ms.Tensor(embeddings),
padding_idx=pad_idx)
self.rnn = nn.LSTM(embedding_dim,
hidden_dim,
num_layers=n_layers,
bidirectional=bidirectional,
batch_first=True)
weight_init = HeUniform(math.sqrt(5))
bias_init = Uniform(1 / math.sqrt(hidden_dim * 2))
self.fc = nn.Dense(hidden_dim * 2,
output_dim,
weight_init=weight_init,
bias_init=bias_init)
def construct(self, inputs):
embedded = self.embedding(inputs)
_, (hidden, _) = self.rnn(embedded)
hidden = ops.concat((hidden[-2, :, :], hidden[-1, :, :]), axis=1)
output = self.fc(hidden)
return output
损失函数与优化器
针对本节情感分类问题的特性,即预测Positive或Negative的二分类问题,选择nn.BCEWithLogitsLoss(reduction='mean', weight=None, pos_weight=None)
(二分类交叉熵损失函数)。
训练逻辑
一般训练逻辑分为一下步骤:
- 读取一个Batch的数据;
- 送入网络,进行正向计算和反向传播,更新权重;
- 返回loss。
grad_fn = mindspore.value_and_grad(forward_fn, None, optimizer.parameters) 生成求导函数,用于计算给定函数的正向计算结果和梯度。
评估指标和逻辑
模型评估:使用模型的预测结果和测试集的正确标签进行对比,求出预测的准确率。
由于IMDB的情感分类为二分类问题,对预测值直接进行四舍五入即可获得分类标签(0或1),然后判断是否与正确标签相等即可。下面为二分类准确率计算函数实现:
def binary_accuracy(preds, y):
"""
计算每个batch的准确率
"""
# 对预测值进行四舍五入
rounded_preds = np.around(ops.sigmoid(preds).asnumpy())
correct = (rounded_preds == y).astype(np.float32)
acc = correct.sum() / len(correct)
return acc
模型评估逻辑设计步骤:
- 读取一个Batch的数据;
- 送入网络,进行正向计算,获得预测结果;
- 计算准确率。
def evaluate(model, test_dataset, criterion, epoch=0):
total = test_dataset.get_dataset_size()
epoch_loss = 0
epoch_acc = 0
step_total = 0
## 在进行evaluate前,通过model.set_train(False)将模型置为评估状态,此时Dropout不生效。
model.set_train(False)
with tqdm(total=total) as t:
# 使用tqdm进行loss和过程的可视化。
t.set_description('Epoch %i' % epoch)
for i in test_dataset.create_tuple_iterator():
## 进行evaluate时,使用的模型是不包含损失函数和优化器的网络主体
predictions = model(i[0])
loss = criterion(predictions, i[1])
epoch_loss += loss.asnumpy()
acc = binary_accuracy(predictions, i[1])
epoch_acc += acc
step_total += 1
t.set_postfix(loss=epoch_loss/step_total, acc=epoch_acc/step_total)
t.update(1)
return epoch_loss / total
模型训练与保存
模型训练,设置5轮。同时维护一个用于保存最优模型的变量best_valid_loss
,根据每一轮评估的loss值,取loss值最小的轮次,将模型进行保存。为节省用例运行时长,此处num_epochs设置为3 。
num_epochs = 3
best_valid_loss = float('inf')
ckpt_file_name = os.path.join(cache_dir, 'sentiment-analysis.ckpt')
for epoch in range(num_epochs):
train_one_epoch(model, imdb_train, epoch)
valid_loss = evaluate(model, imdb_valid, loss_fn, epoch)
if valid_loss < best_valid_loss:
best_valid_loss = valid_loss
ms.save_checkpoint(model, ckpt_file_name)
模型加载与测试
加载已保存的最优模型(即checkpoint),供后续测试使用。
直接使用MindSpore提供的Checkpoint加载和网络权重加载接口:1.将保存的模型Checkpoint加载到内存中,2.将Checkpoint加载至模型。
param_dict = ms.load_checkpoint(ckpt_file_name)
ms.load_param_into_net(model, param_dict)
## 对测试集打batch,然后使用evaluate方法进行评估,得到模型在测试集上的效果。
imdb_test = imdb_test.batch(64)
evaluate(model, imdb_test, loss_fn)
如下测试集效果,一般,有空了可以调一调训练参数。比如LSTM层数、学习率等。
自定义输入测试
输入一句评价,获得评价的情感分类.
score_map = {
1: "Positive",
0: "Negative"
}
def predict_sentiment(model, vocab, sentence):
model.set_train(False)
tokenized = sentence.lower().split()
indexed = vocab.tokens_to_ids(tokenized)
tensor = ms.Tensor(indexed, ms.int32)
tensor = tensor.expand_dims(0)
prediction = model(tensor)
return score_map[int(np.round(ops.sigmoid(prediction).asnumpy()))]
predict_sentiment(model, vocab, "This film is terrible")
predict_sentiment(model, vocab, "This film is great")
predict_sentiment(model, vocab, "This movie is not good, but i like it")
代码
import os
import shutil
import requests
import tempfile
from tqdm import tqdm
from typing import IO
from pathlib import Path
import re
import six
import string
import tarfile
import mindspore.dataset as ds
import zipfile
import numpy as np
import mindspore as ms
# 指定保存路径为 `home_path/.mindspore_examples`
cache_dir = Path.home() / '.mindspore_examples'
def http_get(url: str, temp_file: IO):
"""使用requests库下载数据,并使用tqdm库进行流程可视化"""
req = requests.get(url, stream=True)
content_length = req.headers.get('Content-Length')
total = int(content_length) if content_length is not None else None
progress = tqdm(unit='B', total=total)
for chunk in req.iter_content(chunk_size=1024):
if chunk:
progress.update(len(chunk))
temp_file.write(chunk)
progress.close()
def download(file_name: str, url: str):
"""下载数据并存为指定名称"""
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
cache_path = os.path.join(cache_dir, file_name)
cache_exist = os.path.exists(cache_path)
if not cache_exist:
with tempfile.NamedTemporaryFile() as temp_file:
http_get(url, temp_file)
temp_file.flush()
temp_file.seek(0)
with open(cache_path, 'wb') as cache_file:
shutil.copyfileobj(temp_file, cache_file)
return cache_path
class IMDBData():
"""IMDB数据集加载器
加载IMDB数据集并处理为一个Python迭代对象。
"""
# label_map是一个类属性,它是一个字典,将影评的情感标签映射为数值("pos"代表正面评价,映射为1;"neg"代表负面评价,映射为0)。
label_map = {
"pos": 1,
"neg": 0
}
def __init__(self, path, mode="train"):
# 构造函数接受两个参数:path(数据集的路径)和mode(模式,默认为"train",可能还有"test"等)。
# 初始化实例变量mode和path,以及两个空列表docs和labels用于存储文档内容和对应的标签。然后调用_load方法加载正面和负面评价的数据。
self.mode = mode
self.path = path
self.docs, self.labels = [], []
self._load("pos")
self._load("neg")
def _load(self, label):
pattern = re.compile(r"aclImdb/{}/{}/.*\.txt$".format(self.mode, label))
# 将数据加载至内存
with tarfile.open(self.path) as tarf:
tf = tarf.next()
while tf is not None:
if bool(pattern.match(tf.name)):
# 对文本进行分词、去除标点和特殊字符、小写处理
self.docs.append(str(tarf.extractfile(tf).read().rstrip(six.b("\n\r"))
.translate(None, six.b(string.punctuation)).lower()).split())
self.labels.append([self.label_map[label]])
tf = tarf.next()
def __getitem__(self, idx):
return self.docs[idx], self.labels[idx]
def __len__(self):
return len(self.docs)
def load_imdb(imdb_path):
imdb_train = ds.GeneratorDataset(IMDBData(imdb_path, "train"), column_names=["text", "label"], shuffle=True, num_samples=10000)
imdb_test = ds.GeneratorDataset(IMDBData(imdb_path, "test"), column_names=["text", "label"], shuffle=False)
return imdb_train, imdb_test
def load_glove(glove_path):
glove_100d_path = os.path.join(cache_dir, 'glove.6B.100d.txt')
if not os.path.exists(glove_100d_path):
glove_zip = zipfile.ZipFile(glove_path)
glove_zip.extractall(cache_dir)
embeddings = []
tokens = []
with open(glove_100d_path, encoding='utf-8') as gf:
for glove in gf:
word, embedding = glove.split(maxsplit=1)
tokens.append(word)
embeddings.append(np.fromstring(embedding, dtype=np.float32, sep=' '))
# 添加 <unk>, <pad> 两个特殊占位符对应的embedding
embeddings.append(np.random.rand(100))
embeddings.append(np.zeros((100,), np.float32))
vocab = ds.text.Vocab.from_list(tokens, special_tokens=["<unk>", "<pad>"], special_first=False)
embeddings = np.array(embeddings).astype(np.float32)
return vocab, embeddings
imdb_path = download('aclImdb_v1.tar.gz', 'https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/aclImdb_v1.tar.gz')
glove_path = download('glove.6B.zip', 'https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/glove.6B.zip')
imdb_train = IMDBData(imdb_path, 'train')
print(f"train dataset len: {len(imdb_train)}") ### pos + neg = 25000 === train/test
imdb_train, imdb_test = load_imdb(imdb_path) ## imdb_train 是构建的一个 mindspore.dataset.Generatordataset 对象。
vocab, embeddings = load_glove(glove_path)
print(len(vocab.vocab())) ## 400002, <mindspore.dataset.text.utils.Vocab object at 0xfffe9a9e38b0>
print(np.shape(embeddings)) ## (400002, 100)
## 例子
idx = vocab.tokens_to_ids('the')
embedding = embeddings[idx]
print(f"idx={idx}, embedding={embedding}, len embedding={len(embedding)}")
idx = vocab.tokens_to_ids('it')
embedding = embeddings[idx]
print(f"idx={idx}, embedding={embedding}, len embedding={len(embedding)}")
# 数据预处理
lookup_op = ds.text.Lookup(vocab, unknown_token='<unk>')
pad_op = ds.transforms.PadEnd([500], pad_value=vocab.tokens_to_ids('<pad>'))
type_cast_op = ds.transforms.TypeCast(ms.float32)
imdb_train = imdb_train.map(operations=[lookup_op, pad_op], input_columns=['text'])
imdb_train = imdb_train.map(operations=[type_cast_op], input_columns=['label'])
imdb_test = imdb_test.map(operations=[lookup_op, pad_op], input_columns=['text'])
imdb_test = imdb_test.map(operations=[type_cast_op], input_columns=['label'])
imdb_train, imdb_valid = imdb_train.split([0.7, 0.3])
print(f"len imdb_train = ", len(imdb_train))
imdb_train = imdb_train.batch(64, drop_remainder=True)
imdb_valid = imdb_valid.batch(64, drop_remainder=True)
print(f"len imdb_train = ", len(imdb_train) * 64)
###############################################################################3
## model construct
import math
import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore.common.initializer import Uniform, HeUniform
class RNN(nn.Cell):
def __init__(self, embeddings, hidden_dim, output_dim, n_layers,
bidirectional, pad_idx):
super().__init__()
vocab_size, embedding_dim = embeddings.shape
self.embedding = nn.Embedding(vocab_size, embedding_dim, embedding_table=ms.Tensor(embeddings), padding_idx=pad_idx)
self.rnn = nn.LSTM(embedding_dim, ## 100
hidden_dim, ##
num_layers=n_layers,
bidirectional=bidirectional,
batch_first=True)
weight_init = HeUniform(math.sqrt(5))
bias_init = Uniform(1 / math.sqrt(hidden_dim * 2))
self.fc = nn.Dense(hidden_dim * 2, output_dim, weight_init=weight_init, bias_init=bias_init)
def construct(self, inputs):
embedded = self.embedding(inputs)
_, (hidden, _) = self.rnn(embedded)
hidden = ops.concat((hidden[-2, :, :], hidden[-1, :, :]), axis=1)
output = self.fc(hidden)
return output
def forward_fn(data, label):
logits = model(data)
loss = loss_fn(logits, label)
return loss
def train_step(data, label):
loss, grads = grad_fn(data, label)
optimizer(grads)
return loss
def train_one_epoch(model, train_dataset, epoch=0):
model.set_train()
total = train_dataset.get_dataset_size()
loss_total = 0
step_total = 0
with tqdm(total=total) as t:
t.set_description('Epoch %i' % epoch)
for i in train_dataset.create_tuple_iterator():
loss = train_step(*i)
loss_total += loss.asnumpy()
step_total += 1
t.set_postfix(loss=loss_total/step_total)
t.update(1)
def binary_accuracy(preds, y):
"""
二分类准确率计算函数
计算每个batch的准确率
"""
# 对预测值进行四舍五入
rounded_preds = np.around(ops.sigmoid(preds).asnumpy())
correct = (rounded_preds == y).astype(np.float32)
acc = correct.sum() / len(correct)
return acc
def evaluate(model, test_dataset, criterion, epoch=0):
total = test_dataset.get_dataset_size()
epoch_loss = 0
epoch_acc = 0
step_total = 0
## 在进行evaluate前,需要通过model.set_train(False)将模型置为评估状态,此时Dropout不生效。
model.set_train(False)
with tqdm(total=total) as t:
# 使用tqdm进行loss和过程的可视化。
t.set_description('Epoch %i' % epoch)
for i in test_dataset.create_tuple_iterator():
## 进行evaluate时,使用的模型是不包含损失函数和优化器的网络主体
predictions = model(i[0])
loss = criterion(predictions, i[1])
epoch_loss += loss.asnumpy()
acc = binary_accuracy(predictions, i[1])
epoch_acc += acc
step_total += 1
t.set_postfix(loss=epoch_loss/step_total, acc=epoch_acc/step_total)
t.update(1)
return epoch_loss / total
hidden_size = 256 ## 输入size
output_size = 1 ## 输出size
num_layers = 2 ## 层级
bidirectional = True
lr = 0.001 ## 学习率
pad_idx = vocab.tokens_to_ids('<pad>') ## tokens_to_ids(tokens) 查找指定Token对应的ID。
model = RNN(embeddings, hidden_size, output_size, num_layers, bidirectional, pad_idx)
loss_fn = nn.BCEWithLogitsLoss(reduction='mean')
optimizer = nn.Adam(model.trainable_params(), learning_rate=lr)
print("model = ", model)
print("loss_fn = ", loss_fn)
print("optimizer = ", optimizer)
## 生成求导函数,用于计算给定函数的正向计算结果和梯度。
grad_fn = ms.value_and_grad(forward_fn, None, optimizer.parameters)
### 模型训练,设置5轮。同时维护一个用于保存最优模型的变量best_valid_loss,根据每一轮评估的loss值,取loss值最小的轮次,将模型进行保存。
num_epochs = 5
best_valid_loss = float('inf')
ckpt_file_name = os.path.join(cache_dir, 'sentiment-analysis.ckpt')
for epoch in range(num_epochs):
train_one_epoch(model, imdb_train, epoch)
valid_loss = evaluate(model, imdb_valid, loss_fn, epoch)
if valid_loss < best_valid_loss:
best_valid_loss = valid_loss
ms.save_checkpoint(model, ckpt_file_name)
## 加载已保存的最优模型(即checkpoint),供后续测试使用。
param_dict = ms.load_checkpoint(ckpt_file_name)
ms.load_param_into_net(model, param_dict)
## 对测试集打batch,然后使用evaluate方法进行评估,得到模型在测试集上的效果。
imdb_test = imdb_test.batch(64)
evaluate(model, imdb_test, loss_fn)
################################自定义输入测试
## 输入一句评价,获得评价的情感分类.
score_map = {
1: "Positive",
0: "Negative"
}
def predict_sentiment(model, vocab, sentence):
model.set_train(False)
tokenized = sentence.lower().split()
indexed = vocab.tokens_to_ids(tokenized)
tensor = ms.Tensor(indexed, ms.int32)
tensor = tensor.expand_dims(0)
prediction = model(tensor)
print(f"prediction={prediction}")
return score_map[int(np.round(ops.sigmoid(prediction).asnumpy()))]
predict_sentiment(model, vocab, "This film is terrible")
predict_sentiment(model, vocab, "This film is great")
predict_sentiment(model, vocab, "This movie is not good, but i like it")