本文目录
- 前述
- 一、环境依赖
- 二、数据准备
- 1. 数据加载
- 2. 构建单词表
- 程序解析
- (1)将列表里每个子列表的所有单词合并到一个新列表(没有子列表)中。
- (2)Counter()-- 统计迭代对象各元素出现次数,并按次数从多到少排序
- (3)获取出现频率最高的前 50000 个元素及其个数。
- (4) 建立字典word_dict{ }:存放元素及其索引号
- (5) 建立字典index_dict{ }---{ 索引号:元素 }
- 3. 将英文、中文单词列表转为单词索引列表
- 4. 划分batch
- 三、模型搭建
前述
基础请查看:Transformer基础查看地址!
一、环境依赖
nltk==3.5
numpy==1.18.5
seaborn==0.11.1
matplotlib==3.3.2
psyco==1.6
zhtools==0.0.5
#torch==1.12.1 安装torch时使用下面的命令
pip install torch==1.12.1+cu113 torchvision==0.13.1+cu113 torchaudio==0.12.1 --extra-index-url https://download.pytorch.org/whl/cu113 -i https://pypi.tuna.tsinghua.edu.cn/simple
代码导入包 :
import copy
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import seaborn as sns
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import Counter
from langconv import Converter
from nltk import word_tokenize
from torch.autograd import Variable
二、数据准备
数据集可以去网络上下载,下面的是train.txt文件部分内容,前面为英文,后面为繁体中文,中间以'\t'
隔开。其他数据文件也相同。
这里数据集是英文和繁体中文,所以第一步我们需要将繁体中文变为简体中文。
转换代码如下:
def cht_to_chs(sent):
"""" zh-hans" 是一个语言代码,用于指代中文(汉语)的简体字形式。在国际化和本地化领域,语言代码用于标识特定语言或语言变体。
在这里,"zh" 表示汉语(中文),"hans" 表示简体字形式。因此,"zh-hans" 表示简体中文。"""
sent = Converter("zh-hans").convert(sent)
sent = sent.encode("utf-8")
return sent
1. 数据加载
作用:读取数据路径下的完整句子,将每个句子分割为一个一个的单词,并存到子列表中。返回含有子列表的列表,
"""参数参数path 为数据的路径,如下
train_file= 'nmt/en-cn/train.txt' # 训练集
dev_file= "nmt/en-cn/dev.txt" # 验证集
load_data(train_file)
"""
def load_data(self, path):
"""
读取英文、中文数据
对每条样本分词并构建包含起始符和终止符的单词列表
"""
en = [] #定义英文列表
cn = [] #定义中文列表
with open(path, mode="r", encoding="utf-8") as f: #只读的形式打开文件路径,文件描述符为f。
for line in f.readlines(): #按行读取
sent_en, sent_cn = line.strip().split("\t") #以‘\t’进行分割,前面的赋给sent_en,后面的赋给sent_cn 。
sent_en = sent_en.lower() #将英文转换为小写。
sent_cn = cht_to_chs(sent_cn) #将繁体中文转为简体中文。
""" word_tokenize() 是 NLTK库中的一个函数,用于将文本分词成单词(token)。
它可以将一个句子或文本分解成一个个单词或标点符号,用于处理英文句子"""
sent_en = ["BOS"] + word_tokenize(sent_en) + ["EOS"]
# 中文按字符切分
sent_cn = ["BOS"] + [char for char in sent_cn] + ["EOS"]
en.append(sent_en) #将切割好的英文 存入英文列表。包含['BOS', 'i', 'love', 'you', 'EOS']
cn.append(sent_cn) #将切割好的中文 存入中文列表。
return en, cn #返回两个单词列表
"""
输出列表格式如下:
en = [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
"""
2. 构建单词表
"""输入参数
train_en, train_cn = load_data(train_file)
build_dict(train_en) 这里的输入为单词列表。
输入列表如下:
train_en= [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
"""
PAD = 0 # padding占位符的索引
UNK = 1 # 未登录词标识符的索引
def build_dict(self, sentences, max_words=5e4):
"""
构造分词后的列表数据
构建单词-索引映射(key为单词,value为id值)
"""
# 统计数据集中单词词频
word_count = Counter([word for sent in sentences for word in sent])
# 按词频保留前max_words个单词构建词典
# 添加UNK和PAD两个单词
ls = word_count.most_common(int(max_words))
total_words = len(ls) + 2
word_dict = {word [0]: index + 2 for index, word in enumerate(ls)}
word_dict['UNK'] = UNK
word_dict['PAD'] = PAD
# 构建id2word映射
index_dict = {v: k for k, v in word_dict.items()}
return word_dict, total_words, index_dict
程序解析
(1)将列表里每个子列表的所有单词合并到一个新列表(没有子列表)中。
将sentences里面每句话的每个单词组合形成一个新的列表。
sentences = [
['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,
['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
]
word_list = [word for sent in sentences for word in sent]
"""
另一种写法:
word_list = []
for sent in sentences:
for word in sent:
word_list.append(word)
"""
print(word_list )
"""
输出: ['BOS', 'I', 'love', 'natural', 'language', 'processing', '.', 'EOS', 'BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
"""
(2)Counter()-- 统计迭代对象各元素出现次数,并按次数从多到少排序
from collections import Counter
#Python 中的一个内置数据结构
# 定义一个列表
word_list = ['BOS', 'I', 'love', 'natural', 'language', 'processing', '.', 'EOS', 'BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
# 使用 Counter 统计列表中各元素的出现次数
word_count = Counter(word_list)
print(word_count )
"""
输出: Counter({'BOS': 2, 'language': 2, 'processing': 2, '.': 2, 'EOS': 2, 'I': 1, 'love': 1, 'natural': 1, 'Natural': 1, 'is': 1, 'fascinating': 1})
"""
(3)获取出现频率最高的前 50000 个元素及其个数。
from collections import Counter
word_count = Counter({'BOS': 2, 'language': 2, 'processing': 2, '.': 2, 'EOS': 2, 'I': 1, 'love': 1, 'natural': 1, 'Natural': 1, 'is': 1, 'fascinating': 1})
ls = word_count.most_common(int(5e4))#返回列表中频率最高的元素和它们的计数,按照计数从高到低排序。频率最高的前 50000 个元素。
print(ls)
"""
输出:
[('BOS', 2), ('language', 2), ('processing', 2), ('.', 2), ('EOS', 2), ('I', 1), ('love', 1), ('natural', 1), ('Natural', 1), ('is', 1), ('fascinating', 1)]
"""
(4) 建立字典word_dict{ }:存放元素及其索引号
enumerate(可迭代元素),返回的第一个值为索引,第二个值为元素。
ls = [('BOS', 2), ('language', 2), ('processing', 2), ('.', 2), ('EOS', 2), ('I', 1), ('love', 1), ('natural', 1), ('Natural', 1), ('is', 1), ('fascinating', 1)]
word_dict = {word [0]: index + 2 for index, word in enumerate(ls)}
"""另一种写法:
word_dict = {}
for index, word in enumerate(ls):
word_dict[ word[0] ] = index + 2
print(word_dict)
"""
print(word_dict) #存放元素及其索引号
"""
输出: {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12}
"""
word_dict['UNK'] = 1
word_dict['PAD'] = 0
print(word_dict)
"""
输出:{'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0}
"""
(5) 建立字典index_dict{ }—{ 索引号:元素 }
word_dict= {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0}
index_dict = {v: k for k, v in word_dict.items()}
print(index_dict)
"""
输出:{2: 'BOS', 3: 'language', 4: 'processing', 5: '.', 6: 'EOS', 7: 'I', 8: 'love', 9: 'natural', 10: 'Natural', 11: 'is', 12: 'fascinating', 1: 'UNK', 0: 'PAD'}
"""
3. 将英文、中文单词列表转为单词索引列表
def word2id(self, en, cn, en_dict, cn_dict, sort=True):
"""
将英文、中文单词列表转为单词索引列表
`sort=True`表示以英文语句长度排序,以便按批次填充时,同批次语句填充尽量少
"""
length = len(en)
# 单词映射为索引
out_en_ids = [[en_dict.get(word, UNK) for word in sent] for sent in en]
out_cn_ids = [[cn_dict.get(word, UNK) for word in sent] for sent in cn]
# 按照语句长度排序
def len_argsort(seq):
"""
传入一系列语句数据(分好词的列表形式),
按照语句长度排序后,返回排序后原来各语句在数据中的索引下标
"""
return sorted(range(len(seq)), key=lambda x: len(seq[x]))
# 按相同顺序对中文、英文样本排序
if sort:
# 以英文语句长度排序
sorted_index = len_argsort(out_en_ids)
out_en_ids = [out_en_ids[idx] for idx in sorted_index]
out_cn_ids = [out_cn_ids[idx] for idx in sorted_index]
return out_en_ids, out_cn_ids
4. 划分batch
def split_batch(self, en, cn, batch_size, shuffle=True):
"""
划分批次
`shuffle=True`表示对各批次顺序随机打乱
"""
# 每隔batch_size取一个索引作为后续batch的起始索引
idx_list = np.arange(0, len(en), batch_size)
# 起始索引随机打乱
if shuffle:
np.random.shuffle(idx_list)
# 存放所有批次的语句索引
batch_indexs = []
for idx in idx_list:
"""
形如[array([4, 5, 6, 7]),
array([0, 1, 2, 3]),
array([8, 9, 10, 11]),
...]
"""
# 起始索引最大的批次可能发生越界,要限定其索引
batch_indexs.append(np.arange(idx, min(idx + batch_size, len(en))))
# 构建批次列表
batches = []
for batch_index in batch_indexs:
# 按当前批次的样本索引采样
batch_en = [en[index] for index in batch_index]
batch_cn = [cn[index] for index in batch_index]
# 对当前批次中所有语句填充、对齐长度
# 维度为:batch_size * 当前批次中语句的最大长度
batch_cn = seq_padding(batch_cn)
batch_en = seq_padding(batch_en)
# 将当前批次添加到批次列表
# Batch类用于实现注意力掩码
batches.append(Batch(batch_en, batch_cn))
return batches