Datawhale AI夏令营 AI+逻辑推理 Task2总结
一、大语言模型解题方案介绍
1.1 大模型推理介绍
推理是建立在训练完成的基础上,将训练好的模型应用于新的、未见过的数据,模型利用先前学到的规律进行预测、分类和生成新内容,使得AI在实际应用中能够做出更有意义的决策。
1.2 大模型推理实现最常用方法——提示工程(Prompt Engineering)
提示工程是一门较新的学科,关注提示词开发和优化,帮助用户将大语言模型用于各场景和研究领域。研究人员可利用提示工程来提升大语言模型处理复杂任务场景的能力,如问答和算术推理能力。开发人员可通过提示工程设计、研发强大的工程技术,实现和大语言模型或其他生态工具的高效接轨。
提示工程不仅仅是关于设计和研发提示词。它包含了与大语言模型交互和研发的各种技能和技术。提示工程在实现和大语言模型交互、对接,以及理解大语言模型能力方面都起着重要作用。用户可以通过提示工程来提高大语言模型的安全性,也可以赋能大语言模型,比如借助专业领域知识和外部工具来增强大语言模型能力。
本赛题中,通过数据处理,将问题字典转化为MD格式的prompt,这就是让大语言模型能理解并作分析的秘诀。
二、Baseline整体代码介绍
整体代码主要包括答案生成和纠错与结果文件生成两大模块。
答案生成部分包括大模型的处理函数、大模型返回结果抽取、多线程处理及答案生成的启动,代码核心是大模型部分。
纠错与结果生成部分存在的目的是由于目前使用了API调用在线开源大模型,因为网络、模型能力等原因会导致有一些结果会出现缺失。(比如大模型回答时,没有明确给出ABCD的结果,而返回的空值。也有时因为网络retry模块机会使用结束后,依然没有提取到结果会跳过某个问题。)
2.1 环境配置
导入需要的环境,包括日志处理、多线程、API请求等相关库引入。
import json
import os
from pprint import pprint
import re
from tqdm import tqdm
import random
import uuid
import openai
import tiktoken
import json
import numpy as np
import requests
from retry import retry
from scipy import sparse
from http import HTTPStatus
import dashscope
from concurrent.futures import ThreadPoolExecutor, as_completed
from loguru import logger
import json
import time
from tqdm import tqdm
logger.remove() # 移除默认的控制台输出
logger.add("logs/app_{time:YYYY-MM-DD}.log", level="INFO", rotation="00:00", retention="10 days", compression="zip")
MODEL_NAME = 'qwen2-7b-instruct'
2.2 答案生成部分
def call_qwen_api(MODEL_NAME, query):
# 这里采用dashscope的api调用模型推理,通过http传输的json封装返回结果
messages = [{'role': 'user', 'content': query}]
response = dashscope.Generation.call(
MODEL_NAME,
messages=messages,
result_format='message', # set the result is message format.
)
if response.status_code == HTTPStatus.OK:
# print(response)
return response['output']['choices'][0]['message']['content']
else:
print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
response.request_id, response.status_code,
response.code, response.message
))
raise Exception()
call_qwen_api
函数就是通过输入模型名称、prompt,完成大模型api的调用。
def api_retry(MODEL_NAME, query):
# 最大尝试次数
max_retries = 5
# 再次尝试等待时间
retry_delay = 60 # in seconds
attempts = 0
while attempts < max_retries:
try:
return call_qwen_api(MODEL_NAME, query)
except Exception as e:
attempts += 1
if attempts < max_retries:
logger.warning(f"Attempt {attempts} failed for text: {query}. Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
logger.error(f"All {max_retries} attempts failed for text: {query}. Error: {e}")
raise
api_retry
函数是当大模型调用API时可能会导致出错中断的问题,为了保证每个问题都被大模型处理过,我们需要设置一个反复尝试的函数。
def get_prompt(problem, question, options):
options = '\n'.join(f"{'ABCDEFG'[i]}. {o}" for i, o in enumerate(options))
prompt = f"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。请逐步分析问题并在最后一行输出答案,最后一行的格式为"答案是:A"。题目如下:
### 题目:
{problem}
### 问题:
{question}
{options}
"""
# print(prompt)
return prompt
get_prompt
是prompt的模板函数,通过字符串处理的方式拼接完整的prompt
# 这里使用extract抽取模获得抽取的结果
def extract(input_text):
# re.compile()将字符串形式的正则表达式编译为Pattern模式对象,第二个参数是匹配模式
ans_pattern = re.compile(r"答案是:(.)", re.S)
problems = ans_pattern.findall(input_text)
# print(problems)
if(problems == ''):
return 'A'
return problems[0]
通过抽取函数可以将大语言模型生成的结果抽取成答案对应的选项,这里的匹配原则和prompt呼应。我们可以看到prompt要求【最后一行的格式为"答案是:A"】这样的规范,那么我们采用正则表达式re.compile方法匹配到答案对应的选项。当我们匹配为空时,我们默认选"A"。
def process_datas(datas,MODEL_NAME):
results = []
# 定义线程池 选择16线程
with ThreadPoolExecutor(max_workers=16) as executor:
# 使用future_data 存储每个线程的数据
future_data = {}
lasttask = ''
lastmark = 0
# lens记录了调用api的次数,也就是每个问题背景下的所有子问题之和
lens = 0
# 进入多线程任务
# 这里每个data下是一个问题背景,其中包含多个子问题
for data in tqdm(datas, desc="Submitting tasks", total=len(datas)):
problem = data['problem']
# 用enumerate方法每次循环得到问题的序号id和实际的问题
for id, question in enumerate(data['questions']):
prompt = get_prompt(problem,
question['question'],
question['options'],
)
# 送入线程池等待处理,使用api_retry,向api_retry传入MODEL_NAME,prompt参数
future = executor.submit(api_retry, MODEL_NAME, prompt)
# 每个线程存储对应的json问题数据以及问题序号id,方便定位出执行的是哪个子问题
future_data[future] = (data, id)
time.sleep(0.6) # 控制每0.6秒提交一个任务
lens += 1
# 处理多线程任务
for future in tqdm(as_completed(future_data), total=lens, desc="Processing tasks"):
# print('data',data)
# 取出每个线程中的字典数据及对应的问题id
data = future_data[future][0]
problem_id = future_data[future][1]
try:
# 获取api运行结果
res = future.result()
# 抽取大语言模型返回结果
extract_response = extract(res)
# print('res',extract_response)
# 装入answer字段
data['questions'][problem_id]['answer'] = extract_response
# 在结果列表中新增数据字典
results.append(data)
# print('data',data)
except Exception as e:
logger.error(f"Failed to process text: {data}. Error: {e}")
return results
多线程处理数据
def main(ifn, ofn):
if os.path.exists(ofn):
pass
data = []
# 按行读取数据
with open(ifn) as reader:
for line in reader:
sample = json.loads(line)
data.append(sample)
datas = data
# print(data)
# 均匀地分成多个数据集
return_list = process_datas(datas,MODEL_NAME)
print(len(return_list))
print("All tasks finished!")
return return_list
if __name__ == '__main__':
a = extract("""根据欧几里得算法,逐步解析计算两个数6和7的最大公约数(gcd)的步骤如下:
1. 判断6和7是否相等:不相等。
2. 判断6和7大小关系,7 > 6,所以用更大的数7减去较小的数6得到结果1。
3. 现在计算6和1的最大公约数。
4. 6 > 1,根据算法用更大的数6减去较小的数1得到结果5。
5. 再计算5和1的最大公约数。
6. 5 > 1,用5减去1得到结果4。
7. 再计算4和1的最大公约数。
8. 4 > 1,用4减去1得到结果3。
9. 再计算3和1的最大公约数。
10. 3 > 1,用3减去1得到结果2。
11. 再计算2和1的最大公约数。
12. 2 > 1,用2减去1得到结果1。
13. 最后计算1和1的最大公约数,两数相等,gcd即为这两个数,也就是1。
因此,6和7的最大公约数是1。
答案是:C.""")
print(a)
# 调用主函数
return_list = main('round1_test_data.jsonl', 'upload.jsonl')
启动函数
2.3 纠错与结果文件生成部分
def has_complete_answer(questions):
# 这里假设完整答案的判断逻辑是:每个question都有一个'answer'键
for question in questions:
if 'answer' not in question:
return False
return True
def filter_problems(data):
result = []
problem_set = set()
for item in data:
# print('处理的item' ,item)
problem = item['problem']
if problem in problem_set:
# 找到已存在的字典
for existing_item in result:
if existing_item['problem'] == problem:
# 如果当前字典有完整答案,替换已存在的字典
if has_complete_answer(item['questions']):
existing_item['questions'] = item['questions']
existing_item['id'] = item['id']
break
else:
# 如果当前字典有完整答案,添加到结果列表
if has_complete_answer(item['questions']):
result.append(item)
problem_set.add(problem)
return result
return_list
return_list = filter_problems(return_list)
sorted_data = sorted(return_list, key=lambda x: int(str(x['id'])[-3:]))
print(sorted_data)
将一个问题背景下的所有问题存入同一个字典,并按id序号排序。
def find_missing_ids(dict_list):
# 提取所有序号
extracted_ids = {int(d['id'][-3:]) for d in dict_list}
# 创建0-500的序号集合
all_ids = set(range(500))
# 找出缺失的序号
missing_ids = all_ids - extracted_ids
return sorted(missing_ids)
# 示例字典列表
dict_list = sorted_data
# 找出缺失的序号
missing_ids = find_missing_ids(dict_list)
print("缺失的序号:", missing_ids)
len(missing_ids)
纠错函数
data = []
with open('round1_test_data.jsonl') as reader:
for id,line in enumerate(reader):
if(id in missing_ids):
sample = json.loads(line)
for question in sample['questions']:
question['answer'] = 'A'
sorted_data.append(sample)
sorted_data = sorted(sorted_data, key=lambda x: int(str(x['id'])[-3:]))
补错函数,针对空缺的列表我们进行补错,让每个answer字段默认填充为A。
with open('upload.jsonl', 'w') as writer:
for sample in sorted_data:
writer.write(json.dumps(sample, ensure_ascii=False))
writer.write('\n')
生成结果文件并存储
三、Baseline改进
- 原baseline中针对空缺的列表补错时,只是将每个answer字段默认填充为A,为了提高准确率,这里把它送入多线程函数再处理一遍
- 改进prompt