Reproducing BERT:
All I can say is I spent the whole morning and afternoon grinding out this code.
BERT input: batch_size * max_len * emb_num, which then gets multiplied by 768 * 768 weight matrices (the @). BERT output comes in two flavors: a 3D character-level feature of shape batch_size * max_len * emb_num (probably a better fit for NER), and a 2D passage-level feature of shape batch_size * emb_num (probably a better fit for something like text classification).
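A toy sketch of those two shapes (the random tensors and the [CLS]-position pooling here are just my own illustration, not part of the code below):

import torch

batch_size, max_len, hidden = 2, 8, 768

# 3D character-level output: one 768-dim vector per token -> [batch_size, max_len, hidden]
token_features = torch.randn(batch_size, max_len, hidden)
print(token_features.shape)   # torch.Size([2, 8, 768])  -> suits NER-style tagging

# 2D passage-level output: keep only the [CLS] position -> [batch_size, hidden]
cls_features = token_features[:, 0, :]
print(cls_features.shape)     # torch.Size([2, 768])     -> suits text classification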
Absolute position encoding: the position embeddings below are learned lookups indexed by the absolute positions 0..max_len-1.
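A minimal standalone sketch of learned absolute position embeddings, the same trick the BertEmbedding class below uses (the sizes here are made up for illustration):

import torch
import torch.nn as nn

max_len, hidden = 16, 768
position_embeddings = nn.Embedding(max_len, hidden)   # one learnable vector per absolute position

pos_idx = torch.arange(max_len)                       # [0, 1, ..., max_len-1]
pos_emb = position_embeddings(pos_idx)                # [max_len, hidden]
print(pos_emb.shape)                                  # torch.Size([16, 768])

word_emb = torch.randn(4, max_len, hidden)            # pretend word embeddings for a batch of 4
print((word_emb + pos_emb).shape)                     # torch.Size([4, 16, 768]) -- broadcasts over the batch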
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import pandas as pd
from sklearn.utils import shuffle
import random
import numpy as np


class BertEmbedding(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        # word / absolute-position / token-type (segment) embeddings, all trainable
        self.word_embeddings = nn.Embedding(config["vocab_size"], config["hidden_size"])
        self.word_embeddings.weight.requires_grad = True
        self.position_embeddings = nn.Embedding(config["max_len"], config["hidden_size"])
        self.position_embeddings.weight.requires_grad = True
        self.token_type_embeddings = nn.Embedding(config["type_vocab_size"], config["hidden_size"])
        self.token_type_embeddings.weight.requires_grad = True
        self.layernorm = nn.LayerNorm(config["hidden_size"])
        self.dropout = nn.Dropout(config["hidden_dropout_pro"])

    def forward(self, batch_index, batch_seg_idx):
        word_emb = self.word_embeddings(batch_index)
        # absolute position ids 0..max_len-1, repeated for the actual batch size
        # (use batch_index.shape[0] instead of config["batch_size"] so the last, smaller batch still works)
        pos_idx = torch.arange(0, self.position_embeddings.weight.data.shape[0])
        pos_idx = pos_idx.repeat(batch_index.shape[0], 1)
        pos_emb = self.position_embeddings(pos_idx)
        token_emb = self.token_type_embeddings(batch_seg_idx)
        emb = word_emb + pos_emb + token_emb
        layer_norm_emb = self.layernorm(emb)
        dropout_emb = self.dropout(layer_norm_emb)
        return dropout_emb


class BertModel(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embedding = BertEmbedding(config)
        # stand-in for the real transformer encoder: a single linear layer for now
        self.bert_layer = nn.Linear(config["hidden_size"], config["hidden_size"])

    def forward(self, batch_index, batch_seg_idx):
        emb = self.embedding(batch_index, batch_seg_idx)
        bert_out1 = self.bert_layer(emb)
        return bert_out1


class Model(nn.Module):
    def __init__(self, config):
        super().__init__()
        # batch_size * max_len * emb_num @ 768 * 768 = batch_size * max_len * emb_num, batch_size * emb_num
        self.bert = BertModel(config)
        self.cls_mask = nn.Linear(config["hidden_size"], config["vocab_size"])  # MLM head
        self.cls_nsp = nn.Linear(config["hidden_size"], 2)                      # NSP head

    def forward(self, batch_index, batch_seg_idx):
        # the two heads are defined but not wired up yet -- losses still to come
        bert_out = self.bert(batch_index, batch_seg_idx)


def get_data(file_path):
    all_data = pd.read_csv(file_path)
    all_data = shuffle(all_data)
    t1 = all_data["text1"].tolist()
    t2 = all_data["text2"].tolist()
    l = all_data["label"].tolist()
    return t1, t2, l


class BertDataset(Dataset):
    def __init__(self, text1, text2, label, max_len, word_2_index):
        assert len(text1) == len(text2) == len(label), "NSP data lengths don't match -- no reproducing anything with this!!!"
        self.text1 = text1
        self.text2 = text2
        self.label = label
        self.max_len = max_len
        self.word_2_index = word_2_index

    def __getitem__(self, index):
        # mask_v[i] holds the original token id at predicted positions, 0 everywhere else
        mask_v = [0] * self.max_len
        text1 = self.text1[index]
        text2 = self.text2[index]
        label = self.label[index]
        # reserve room for [CLS] and [SEP], then split the length budget between the two sentences
        n = int((self.max_len - 4) / 2)
        text1_id = [self.word_2_index.get(i, self.word_2_index["[UNK]"]) for i in text1][:n]
        text2_id = [self.word_2_index.get(i, self.word_2_index["[UNK]"]) for i in text2][:n]
        text_id = [self.word_2_index["[CLS]"]] + text1_id + [self.word_2_index["[SEP]"]] + text2_id + [self.word_2_index["[SEP]"]]
        # segment 0 = first sentence, 1 = second sentence, 2 = padding (hence type_vocab_size = 3)
        segment_id = [0] + [0] * len(text1_id) + [0] + [1] * len(text2_id) + [1] + [2] * (self.max_len - len(text_id))
        text_id = text_id + [self.word_2_index["[PAD]"]] * (self.max_len - len(text_id))
        # MLM: pick ~15% of ordinary tokens; 80% -> [MASK], 10% -> random token, 10% -> keep as-is
        for i, v in enumerate(text_id):
            if v in [self.word_2_index["[PAD]"], self.word_2_index["[SEP]"], self.word_2_index["[UNK]"]]:
                continue
            if random.random() < 0.15:
                r = random.random()
                if r < 0.8:
                    text_id[i] = self.word_2_index["[MASK]"]
                    mask_v[i] = v
                elif r > 0.9:
                    text_id[i] = random.randint(6, len(self.word_2_index) - 1)  # id >= 6, presumably past the special tokens
                    mask_v[i] = v
                else:
                    mask_v[i] = v  # kept unchanged but still predicted, as in the original BERT
        return torch.tensor(text_id), torch.tensor(label), torch.tensor(mask_v), torch.tensor(segment_id)

    def __len__(self):
        return len(self.text1)


if __name__ == "__main__":
    text1, text2, label = get_data("..//self_bert//data//self_task2.csv")
    epoch = 1024
    batch_size = 32
    max_len = 256

    with open("..//self_bert//data//index_2_word.text", "r", encoding="utf-8") as f:
        index_2_word = f.read().split("\n")
    word_2_index = {word: index for index, word in enumerate(index_2_word)}

    config = {
        "epoch": epoch,
        "batch_size": batch_size,
        "max_len": max_len,
        "vocab_size": len(word_2_index),
        "hidden_size": 768,
        "type_vocab_size": 3,
        "hidden_dropout_pro": 0.2,
    }

    train_dataset = BertDataset(text1, text2, label, max_len, word_2_index)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
    model = Model(config)

    for e in range(epoch):
        print(f"here is the {e}th epoch")
        for batch_text_index, batch_text_label, batch_mask_value, batch_segment_id in train_dataloader:
            model(batch_text_index, batch_segment_id)
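Model.forward above still throws the two heads away, so here is a rough sketch of what the rest of a training step might look like once it is finished. The model_forward helper, the AdamW optimizer, the learning rate, and ignore_index=0 (which assumes [PAD] has id 0, so the 0 entries in mask_v are skipped) are all my own guesses at the missing pieces; it reuses model, train_dataloader and config from the script above:

def model_forward(model, batch_index, batch_seg_idx):
    # hypothetical completion of Model.forward: run both heads and return their logits
    bert_out = model.bert(batch_index, batch_seg_idx)   # [batch, max_len, hidden]
    mlm_logits = model.cls_mask(bert_out)               # [batch, max_len, vocab_size]
    nsp_logits = model.cls_nsp(bert_out[:, 0, :])       # [batch, 2], taken from the [CLS] position
    return mlm_logits, nsp_logits

loss_mlm_fn = nn.CrossEntropyLoss(ignore_index=0)       # skip positions that were never masked
loss_nsp_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

for batch_text_index, batch_text_label, batch_mask_value, batch_segment_id in train_dataloader:
    mlm_logits, nsp_logits = model_forward(model, batch_text_index, batch_segment_id)
    loss_mlm = loss_mlm_fn(mlm_logits.reshape(-1, config["vocab_size"]), batch_mask_value.reshape(-1))
    loss_nsp = loss_nsp_fn(nsp_logits, batch_text_label)
    loss = loss_mlm + loss_nsp
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()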
Heading home to read the "flower book" (the Deep Learning textbook); maybe I'll write a bit more code too. Done for today!