目录
- 前言
- 总体设计
- 系统整体结构图
- 系统流程图
- 运行环境
- Python 环境
- 服务器环境
- 模块实现
- 1. 构造数据集
- 2. 识别网络
- 3. 命名实体纠错
- 4. 检索问题类别
- 5. 查询结果
- 系统测试
- 1. 命名实体识别网络测试
- 2. 知识图谱问答系统整体测试
- 工程源代码下载
- 其它资料下载
前言
这个项目充分利用了Google的Bert模型,这是一种基于Attention的大规模语料预训练模型,以及LSTM命名实体识别网络。项目的目标是设计一套通用的问答系统处理逻辑,以实现智能问答任务。
首先,我们采用了Bert模型,这是一种在自然语言处理领域非常强大的预训练模型。它具备对上下文的深刻理解和信息抽取能力,有助于理解复杂的自然语言问题。
接着,我们构建了一个LSTM命名实体识别网络。这个网络可以识别文本中的命名实体,例如人名、地名、组织机构等。这对于问答系统的准确性非常重要,因为它有助于识别问题中提到的实体,并提供相关信息。
在项目的设计中,我们着重考虑了通用的处理逻辑,使得问答系统能够适应各种不同领域和主题的问题。这个通用逻辑包括问题解析、文本理解、命名实体识别、答案生成等步骤。
最终,我们成功地实现了一个智能问答系统,它可以接受用户提出的问题,并基于Bert模型和LSTM命名实体识别网络,理解问题并提供精确的答案。这个系统的通用性使得它在多个领域和应用中都具有广泛的潜力,从解答常见问题到处理专业领域的知识查询。
总体设计
本部分包括系统整体结构图和系统流程图。
系统整体结构图
系统整体结构如图所示。
系统流程图
系统流程如图所示。
Neo4j数据库流程如图所示。
运行环境
本部分包括 Python 环境和服务器环境。
Python 环境
需要Python 3.7及以上配置,在Windows环境下载Anaconda完成Python所需的配置,下载地址为https://www.anaconda.com/,也可下载虚拟机在Linux环境下运行代码。TensorFlow 1.0, NumPy, py-Levenshtein, jieba, Scikit-learn 依据根目录文件requirement.txt下载。
pip install -r requirement.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
服务器环境
Mac/Windows 10用户可直接从终端通过SSH (Secure Shell)访问服务器。Windows 7用户可安装OpenSSH访问。
OpenSSH是SSH协议的免费开源实现,可以进行远程控制,或在计算机之间传送文件。实现此功能的传统方式,会使用明文传送密码。缺点:Telnet(终端仿真协议)、RCP、FTP、 Login、 RSH不安全。
OpenSSH提供了服务器端后台程序和客户端工具,用来加密远程控制和文件传输过程中的数据,并由此代替原来的类似服务。下 载地址为https://www.mls-software.com/opensshd.html,下载后按照默认完成安装即可。打开cmd命令窗口即可远程操作,如图所示。
模块实现
本项目包括5个模块:构造数据集、识别网络、命名实体纠错、检索问题类别、查询结果,下面分别给出各模块的功能介绍及相关代码。
1. 构造数据集
数据是从北京邮电大学图书馆网站爬取,主要包含教师的电话、研究方向、性别,以及课程的学分、开设学期等信息。通过循环语句按照中文习惯将爬取的信息构造为问句的形式,并对构造的语句进行标注,无用实体标记为0,将有用实体分为三类: TEA (老师)、COU (课程)、DIR (研究方向)。标注方式为实体开头B——实体类别标注,非实体开头为I——实体类别标注,训练集数据如图所示。
加载训练集相关代码如下:
def _read_data(cls, input_file):
#读取数据集文件
with codecs.open(input_file,'r',encoding='utf-8') as f:
lines = []
words = []
labels = []
for line in f:
contends = line.strip()
tokens = contends.split('\t')
if len(tokens) == 2:
words.append(tokens[0])
labels.append(tokens[1])
else:
if len(contends) == 0:
l=''.join([label for label in labels if len(label) > 0])
w = ' '.join([word for word in words if len(word) > 0])
lines.append([l, w])
words = []
labels = []
continue
if contends.startswith("-DOCSTART-"):
words.append('')
continue
return lines
#读取训练集
def get_train_examples(self, data_dir):
return self._create_example(
self._read_data(os.path.join(data_dir, "train.txt")), "train"
)
#读取验证集
def get_dev_examples(self, data_dir):
return self._create_example(
self._read_data(os.path.join(data_dir,"dev.txt")),"dev"
)
#读取测试集
def get_test_examples(self, data_dir):
return self._create_example(
self._read_data(os.path.join(data_dir, "test.txt")), "test")
2. 识别网络
使用Google的Bert,调用LSTM模型代码,加以修改,进行训练。
def train_ner(): #定义训练
import os
from bert_base.train.train_helper import get_args_parser
from bert_base.train.bert_lstm_ner import train
args = get_args_parser()
if True:
import sys
param_str = '\n'.join(['%20s = %s' % (k, v) for k, v in sorted(vars(args).items())])
print('usage: %s\n%20s %s\n%s\n%s\n' % (' '.join(sys.argv), 'ARG', 'VALUE', '_' * 50, param_str))
print(args)
os.environ['CUDA_VISIBLE_DEVICES'] = args.device_map
train(args=args)
#数据处理代码
def convert_single_example(ex_index, example, label_list, max_seq_length, tokenizer, output_dir, mode):
#将一个样本进行分析,字和标签转化为ID,结构化到输入特征对象中
label_map = {}
#1表示从1开始对标签进行索引化
for (i, label) in enumerate(label_list, 1):
label_map[label] = i
#保存label->index 的映射
if not os.path.exists(os.path.join(output_dir, 'label2id.pkl')):
with codecs.open(os.path.join(output_dir,'label2id.pkl'),'wb')as w:
pickle.dump(label_map, w)
textlist = example.text.split(' ')
labellist = example.label.split(' ')
tokens = []
labels = []
for i, word in enumerate(textlist):
#分词,不在BERT的vocab.txt中,则进行WordPiece处理,分字可替换为list(input)
token = tokenizer.tokenize(word)
tokens.extend(token)
label_1 = labellist[i]
for m in range(len(token)):
if m == 0:
labels.append(label_1)
else: #一般不会出现else分支
labels.append("X")
#tokens = tokenizer.tokenize(example.text)
#序列截断
if len(tokens) >= max_seq_length - 1:
tokens = tokens[0:(max_seq_length - 2)]
#-2的原因是因为序列需要加一个句首和句尾标志
labels = labels[0:(max_seq_length - 2)]
ntokens = []
segment_ids = []
label_ids = []
ntokens.append("[CLS]") #句子开始设置CLS标志
segment_ids.append(0)
#append("O") or append("[CLS]") not sure!
label_ids.append(label_map["[CLS]"])
#O或者CLS会减少标签个数,但句首和句尾使用不同的标志标注
for i, token in enumerate(tokens):
ntokens.append(token)
segment_ids.append(0)
label_ids.append(label_map[labels[i]])
ntokens.append("[SEP]") #句尾添加[SEP]标志
segment_ids.append(0)
#append("O") or append("[SEP]") not sure!
label_ids.append(label_map["[SEP]"])
input_ids = tokenizer.convert_tokens_to_ids(ntokens)
#将序列中的字(ntokens)转化为ID形式
input_mask = [1] * len(input_ids)
#label_mask = [1] * len(input_ids)
#使用padding
while len(input_ids) < max_seq_length:
input_ids.append(0)
input_mask.append(0)
segment_ids.append(0)
label_ids.append(0)
ntokens.append("**NULL**")
#label_mask.append(0)
#print(len(input_ids))
assert len(input_ids) == max_seq_length
assert len(input_mask) == max_seq_length
assert len(segment_ids) == max_seq_length
assert len(label_ids) == max_seq_length
#assert len(label_mask) == max_seq_length
#打印部分样本数据信息
if ex_index < 5:
tf.logging.info("*** Example ***")
tf.logging.info("guid: %s" % (example.guid))
tf.logging.info("tokens: %s" % " ".join(
[tokenization.printable_text(x) for x in tokens]))
tf.logging.info("input_ids:%s"% " ".join([str(x) for x in input_ids]))
tf.logging.info("input_mask: %s" % " ".join([str(x) for x in input_mask]))
tf.logging.info("segment_ids: %s" % " ".join([str(x) for x in segment_ids]))
tf.logging.info("label_ids: %s" % " ".join([str(x) for x in label_ids]))
# tf.logging.info("label_mask: %s" % " ".join([str(x) for x in label_mask]))
#结构化为一个类
feature = InputFeatures(
input_ids=input_ids,
input_mask=input_mask,
segment_ids=segment_ids,
label_ids=label_ids,
#label_mask = label_mask
)
#mode='test'的时候才有效
write_tokens(ntokens, output_dir, mode)
return feature
3. 命名实体纠错
对识别到的课程实体进行纠错,依据为course.txt
中存储的所有课程全称,采用最短编辑距离匹配法与包含法相结合。
class Select_course:
def __init__(self):
self.f = csv.reader(open('QA/dict/course.txt','r'))
self.course_name = [i[0].strip() for i in self.f]
self.led = 3
self.limit_num = 10
self.select_word = []
self.is_same = False
self.have_same_length = False
self.input_word = ''
self.is_include = False
#print(self.course_name)
#print('列表创建完毕....')
#包含搜索
def select_first(self, input_word):
self.select_word = []
self.is_same = False
self.is_include = False
self.have_same_length = False
self.input_word = input_word
if input_word in self.course_name:
self.is_same = True
self.select_word.append(input_word)
if self.is_same == False:
for i in self.course_name:
mark = True
for one_word in input_word:
if not one_word in i:
mark = False
if mark:
self.select_word.append(i)
if len(self.select_word) != 0:
self.is_include = True
#print('第一轮筛选:')
#print(self.select_word)
#模糊搜索
def select_second(self):
self.led = 3
if self.is_same or self.is_include:
return
for name in self.course_name:
ed = ls.distance(self.input_word, name)
if ed <= self.led:
self.led = ed
self.select_word.append(name)
select_word_copy1 = copy.deepcopy(self.select_word)
for name in select_word_copy1:
ed = ls.distance(self.input_word, name)
if ed > self.led:
self.select_word.remove(name)
if ed == self.led and len(name) == len(self.input_word):
self.hava_same_length = True
#print('第二轮筛选:')
#print(self.select_word)
对识别到的老师实体进行纠错,依据为teacher.csv中存储的所有老师姓名全称,基于最短编辑距离匹配法,并使纠错逻辑符合用户输入错误姓名的规律。
class Select_name:
def __init__(self): #定义初始化
self.f = csv.reader(open('QA/dict/teacher.csv','r'))
self.teacher_name = [i[0] for i in self.f]
self.led = 3
self.limit_num = 10
self.select_word = []
self.have_same_length = False
self.is_same = False
self.input_word = ''
#print(self.teacher_name)
#print('列表创建完毕....')
def select_first(self, input_word): #定义首选
self.select_word = []
self.have_same_length = False
self.is_same = False
self.input_word = input_word
if input_word in self.teacher_name:
self.is_same = True
self.select_word.append(input_word)
if self.is_same == False:
for name in self.teacher_name:
ed = ls.distance(self.input_word, name)
if ed <= self.led:
self.led = ed
self.select_word.append(name)
select_word_copy1 = copy.deepcopy(self.select_word)
for name in select_word_copy1:
ed = ls.distance(self.input_word, name)
if ed > self.led:
self.select_word.remove(name)
if ed == self.led and len(name) == len(self.input_word):
self.hava_same_length = True
#print('第一轮筛选:')
#print(self.select_word)
return
def select_second3(self): #定义后续筛选
if self.is_same == True or len(self.input_word) != 3:
return
select_word_copy2 = copy.deepcopy(self.select_word)
if self.hava_same_length:
for name in select_word_copy2:
if len(self.input_word)!=len(name):
self.select_word.remove(name)
#print('第二轮筛选:')
#print(self.select_word)
def select_third3(self):
if self.is_same == True or len(self.input_word) != 3:
return
select_word_copy3 = copy.deepcopy(self.select_word)
self.select_word = []
for name in select_word_copy3:
if name[0] == self.input_word[0] and name[2] == self.input_word[2]:
self.select_word.append(name)
for name in select_word_copy3:
if not(name[0]==self.input_word[0]and name[2]== self.input_word[2]):
self.select_word.append(name)
#print('第三轮筛选:')
#print(self.select_word)
def limit_name_num(self):
while(len(self.select_word)>self.limit_num):
self.select_word.pop()
#print('列表大小限制:')
#print(self.select_word)
4. 检索问题类别
以下为三个类别的关键词列表:
self.direction_qwds
= [“ 做什么”“干什么”“专长”“专攻”“兴趣”“方向”“方面”“研究”“科研”]
self.location_qwds
= [“地址”“地点”“地方”“在哪”去哪”“到哪”找到”“办公室”“位置”“见到”]
self.telephone_qwds
= [“ 座机”“固话”“电话”“号码”“联系”]
通过识别到的实体类别和检索到的关键词进行问题分类,相关代码如下:
if self.check_words(self.direction_qwds,question)and('teacher' in types): question_type = 'teacher_direction'
question_types.append(question_type)
if self.check_words(self.location_qwds, question)and ('teacher' in types): question_type = 'teacher_location'
question_types.append(question_type)
if self.check_words(self.telephone_qwds,question)and ('teacher' in types): question_type = 'teacher_telephone'
question_types.append(question_type)
5. 查询结果
根据识别到的具体问题类别,将问句翻译成数据库查询语句,相关代码如下:
if final_question_type == 'teacher_direction':
sql = "MATCH (m:Teacher) where m.name = '{0}' return m.name, m.research_direction".format(i)
if final_question_type == 'teacher_location':
sql = "MATCH (m:Teacher) where m.name = '{0}' return m.name, m.office_location".format(i)
if final_question_type == 'teacher_telephone':
sql = "MATCH (m:Teacher) where m.name = '{0}' return m.name, m.telephone".format(i)
#连接数据库
def __init__(self):
self.g = Graph(
"http://10.3.55.50:7474/browser",
user="********",
password="********")
self.num_limit = 30
#查询结果并返回编写的模版答案语句
def search_main(self, sqls, final_question_types):
final_answers = []
temp_data = []
data = []
for i in sqls:
for one_sql in i:
temp_data.append(self.g.run(one_sql).data()[0])
#print(temp_data)
data.append(temp_data)
temp_data = []
#print(data)
temp_answer = []
answer = []
for i in zip(final_question_types, data):
for one_type_and_data in zip(i[0],i[1]):
temp_answer.append(self.answer_prettify(one_type_and_data[0],one_type_and_data[1]))
answer.append(temp_answer)
temp_answer = []
return answer
重复询问以剔除错误的备选,例如,识别到用户输入的老师姓名为王红,但查询到北京邮电大学没有王红,存在王春红、王小红,此时重复询问用户以确定唯一实体对象。
ask_again = ''
final_question_types = []
for i in zip(tags, pre_words):
#print(i)
if len(i[1]) == 1:
final_question_types.append(classifier.classify(text, i[0]))
final_words.append(i[1][0])
if len(i[1]) > 1:
print('>1')
if i[0] == 'teacher':
ask_again = '请问您要询问的是哪个老师的信息:{0}'.format(','.join(i[1]))
if i[0] == 'course':
ask_again = '请问您要询问的是哪门课程的信息:{0}'.format(','.join(i[1]))
#print(ask_again)
answer_again = input(ask_again)
final_words.append(answer_again)
final_question_types.append(classifier.classify(text, i[0]))
系统测试
本部分包括命名实体识别网络测试和知识图谱问答系统整体测试。
1. 命名实体识别网络测试
输入常用问句,从测试结果可知,测试基本能实现老师、课程实体的识别,模型训练效果如图所示。
2. 知识图谱问答系统整体测试
输入常用问句,从问答系统返回的答案可知,系统运行状态良好,基本能回答用户提出的问题,效果如图所示。
工程源代码下载
详见本人博客资源下载页
其它资料下载
如果大家想继续了解人工智能相关学习路线和知识体系,欢迎大家翻阅我的另外一篇博客《重磅 | 完备的人工智能AI 学习——基础知识学习路线,所有资料免关注免套路直接网盘下载》
这篇博客参考了Github知名开源平台,AI技术平台以及相关领域专家:Datawhale,ApacheCN,AI有道和黄海广博士等约有近100G相关资料,希望能帮助到所有小伙伴们。