【2024】Datawhale AI夏令营 Task2笔记——Baseline代码详细解读
本文对可完成赛事“逻辑推理赛道:复杂推理能力评估”初赛的baseline代码进行详细解读,该baseline代码由Datawhale AI夏令营提供,核心内容是调用灵积模型服务平台的大语言模型对测试集的题目进行推理。
🔴注意:
1、在进行代码解读时,为了简化代码、不影响代码理解,本文对baseline代码进行了一定调整。与Datawhale AI夏令营提供的代码相比,本文解读的代码中删除了重复导入模块的代码、实际未被使用的变量定义的代码、实际未被使用的函数定义的代码。
2、本文仅涉及代码解读,不涉及代码运行环境的配置问题。
一、环境配置
1.1 安装必要的第三方库
包括灵积模型服务SDK——dashcope和日志记录SDK——loguru。
!pip install scipy openai tiktoken retry dashscope loguru
dashcope:dashscope是阿里云提供的模型服务灵积的英文名称,是一个开箱即用的模型服务API库,后续用于设置个人的灵积模型服务API KEY和调用具体模型完成推理任务。
loguru:Loguru是Python中一个流行的日志库,它提供了强大的日志记录功能,使得开发人员能够轻松地跟踪和调试代码。
1.2 为代码运行提供必要的工具和环境配置
导入日志处理、多线程、api请求等相关模块,设置日志记录配置,指定后续用于推理的大语言模型的名称。
from multiprocessing import Process, Manager ## 提供进程创建和管理的支持
import json ## 用于解析和生成json数据
import os ## 提供操作系统接口功能,如文件和目录操作
from pprint import pprint ## 用于美观打印数据结构
import re ## 正则表达式的操作模块
from tqdm import tqdm ## 用于显示进度条
import random ## 用于提供生成随机数的功能
import uuid ## 用于生成唯一标识符
import openai ## OpenAI API客户端
import tiktoken
import numpy as np
import requests ## 用于发送Https请求
from retry import retry ## 用于失败重试
from scipy import sparse
from http import HTTPStatus ## 用于提供Http状态码
import dashscope ## 用于使用灵积模型服务
from concurrent.futures import ThreadPoolExecutor, as_completed ## 提供线程和进程池执行功能
from loguru import logger ## 用于日志记录
import time
'''
设置日志记录的配置
'''
logger.remove() # 移除默认的控制台输出
logger.add("logs/app_{time:YYYY-MM-DD}.log", level="INFO", rotation="00:00", retention="10 days", compression="zip") ## 将日志输出到文件,文件名格式为logs/app_YYYY-MM-DD.log,日志级别为INFO。日志文件每日轮换,并保留最近10天的日志。旧日志文件压缩为 zip 格式。
'''
指定后续用于推理的大语言模型的名称
'''
MODEL_NAME = 'qwen1.5-1.8b-chat' # 使用MODEL_NAME变量指定后续推理使用的大模型,此模型由灵积模型服务提供,需要是在DashScope上可查询(https://dashscope.console.aliyun.com/billing)、使用的大模型的名称。此处指定的是qwen1.5-1.8b-chat。
1.3 设置模型服务灵积的API-KEY
此份代码使用灵积模型服务上的大语言模型进行推理,因此需要先设置模型服务灵积的API-KEY,为后续调用灵积模型服务上的大语言模型做准备。
dashscope.api_key="sk-" # 这里需要替换为在Task 1中申请过的API-KEY字符串
二、大语言模型的推理答案生成
2.1 组建能够促使大语言模型推理的prompt(定义函数)
定义函数get_prompt
,将需要大语言模型推理的问题相关的信息,包括问题背景(probelm
)、问题(question
)及问题对应的选项(options
)进行整理,形成促使大语言模型进行推理的prompt模版。
def get_prompt(problem, question, options):
'''
函数接收的options是一个列表,里面包含着问题对应的选项的字符串内容,列表中的每个元素对应一个选项。下面一行代码将options列表中的元素整理成一个长度更长字符串,包含所有选项的信息
'''
options = '\n'.join(f"{'ABCDEFG'[i]}. {o}" for i, o in enumerate(options))
'''
prompt模板的格式。在具体使用时,此模板将使用变量的内容代替模板中变量名称的部分
'''
prompt = f"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。请逐步分析问题并在最后一行输出答案,最后一行的格式为"答案是:A"。题目如下:
### 题目:
{problem}
### 问题:
{question}
{options}
"""
# print(prompt)
return prompt
此处提供一个示例,展示待推理的问题相关的信息经过函数get_prompt
处理前后的不同:
处理前:
{
'problem':
'有一群人和一些食物类型。下列是关于这些个体和食物的已知信息:\n\n1. 鸡肉是一种食物。\n2. 苹果是一种食物。\n3. 如果X吃了Y,且X活着,则Y是一种食物。\n4. Bill存活。\n5. Bill吃了花生。\n6. John吃所有食物。\n7. Sue吃所有Bill吃的食物。\n8. John喜欢所有食物。\n\n根据以上信息,回答以下选择题:',
'questions': [{
'question': '选择题 1:\n谁喜欢吃花生?',
'options': ['Bill', 'Sue', 'John', 'None of the above']
}
],
'id': 'round1_test_data_000'
}
处理后(即变量prompt
的内容):
你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。请逐步分析问题并在最后一行输出答案,最后一行的格式为"答案是:A"。题目如下:
###题目:
有一群人和一些食物类型。下列是关于这些个体和食物的已知信息:
鸡肉是一种食物。
苹果是一种食物。
如果X吃了Y,且X活着,则Y是一种食物。
Bill存活。
Bill吃了花生。
John吃所有食物。
Sue吃所有Bill吃的食物。
John喜欢所有食物。
根据以上信息,回答以下选择题:
###问题:
选择题 1:
谁喜欢吃花生?
A. Bill
B. Sue
C. John
D. None of the above
2.2 【🔴关键】调用灵积模型服务平台的大语言模型进行推理(定义函数)
定义函数call_qwen_api
,通过Dashscope API调用其平台上的指定模型进行推理并返回结果(及具体的结果文本)。
def call_qwen_api(MODEL_NAME, query): # 采用dashscope的api调用模型推理,通过http传输的json封装返回结果
'''
将待查询的内容(query)(前面经过处理的prompt)以固定的数据格式装入消息(messages)
'''
messages = [
{'role': 'user', 'content': query}
]
'''
将用于推理的大语言模型的名称(MODEL_NAME)、已整理好格式的待查询的内容(messages)和返回结果的格式('message')作为参数,传入dashscope的api——dashscope.Generation.call()函数,生成结果(response)。
注意:
1、response是一个结构复杂的对象,包含了API响应状态信息、结果文本等内容,并不是直接的文本结果本身;
2、dashscope.Generation.call()函数使用了@retry(delay=3, tries=3)装饰器,这意味着在调用 API失败时,函数会自动重试最多3次,每次重试间隔3秒
'''
response = dashscope.Generation.call(
MODEL_NAME,
messages=messages,
result_format='message', # set the result is message format.
)
## 检查API的响应状态码,如果状态码正常,即为HTTPStatus.OK,则提取并返回生成的文本内容。如果状态码不正常,即不是HTTPStatus.OK,则打印错误信息并抛出异常
if response.status_code == HTTPStatus.OK: ## API响应正常,可提取并返回生成的文本内容。文本内容在response['output']['choices'][0]['message']['content']中显示
# print(response)
return response['output']['choices'][0]['message']['content']
else: ## API响应异常,打印错误信息,抛出异常
print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
response.request_id, response.status_code,
response.code, response.message
))
raise Exception()
此处提供一个示例,展示调用大语言模型后得到的结果(即变量result
的内容):
{
"status_code": 200,
"request_id": "18d43478-2aec-92be-8840-cd3bb08009cf",
"code": "",
"message": "",
"output": {
"text": null,
"finish_reason": null,
"choices": [
{
"finish_reason": "stop",
"message": {
"role": "assistant",
"content": "我们可以通过分析给出的信息来解答这个问题。\n\n首先,我们知道Bill吃了花生(根据第5条信息),并且Bill还活着(根据第4条信息)。根据第3条规则,“如果X吃了Y,且X活着,则Y是一种食物”,我们可以推断出花生是一种食物。\n\n接下来,我们知道John吃所有食物(根据第6条信息),并且John喜欢所有食物(根据第8条信息)。既然花生是一种食物,那么John应该也吃花生,并且喜欢花生。\n\nSue吃所有Bill吃的食物(根据第7条信息),既然Bill吃了花生,那么Sue也应该吃花生。\n\n所以,喜欢吃花生的人有Bill、Sue和John。\n\n答案是:C. John\n\n但是,题目要求选择一个选项,而实际上Bill、Sue和John都喜欢吃花生。然而,在给出的选项中,只有John被明确列出作为喜欢吃花生的人(选项C)。因此,虽然Bill和Sue也喜欢吃花生,但根据题目要求,正确答案应当是包含在选项中的那个,即John。\n\n答案是:C. John"
}
}
]
},
"usage": {
"input_tokens": 210,
"output_tokens": 223,
"total_tokens": 433
}
}
2.3 若灵积模型服务调用出错,重新尝试调用(定义函数)
由于API调用可能出错,为了保证每个推理问题都被处理,令API调用出错时重试:当某个推理问题因API调用出错而不能完成推理时,使用函数 api_retry
,在重新调用call_qwen_api
函数至多若干次(由下端代码中的max_retries
指定),并在调用失败时进行重试。
def api_retry(MODEL_NAME, query):
max_retries = 5 ## 最大重试次数
retry_delay = 60 # in seconds ## 重试间隔时间,设为60秒
attempts = 0 ## 记录当前重试次数,初始值为0
while attempts < max_retries: ## 在未达到最大重试次数前,执行以下判断和操作
try: ## 尝试重新调用call_qwen_api()并返回结果
return call_qwen_api(MODEL_NAME, query)
except Exception as e: ## 重试但仍失败时,执行以下判断和操作
attempts += 1 ## 更新当前重试次数
if attempts < max_retries: ## 当前未达到最大重试次数,可继续重试
logger.warning(f"Attempt {attempts} failed for text: {query}. Retrying in {retry_delay} seconds...") ## 在日志中记录重试失败的信息
time.sleep(retry_delay) ## 休眠一段时间再进入循环
else: ## 当前已达到最大重试次数,在日志中记录重试多次仍失败的信息,抛出异常
logger.error(f"All {max_retries} attempts failed for text: {query}. Error: {e}")
raise
2.4 从大语言模型生成的结果中提取答案选项(定义函数)
由于大语言模型返回的文本内容包含较多信息,并不只是直接明了的选项字符,因此定义函数extract
,用于从大语言模型返回的文本内容(对应函数参数input_text
)中提取答案。
# 这里使用extract抽取模获得抽取的结果
def extract(input_text):
ans_pattern = re.compile(r"答案是:(.)", re.S) ## 定义一个正则表达式模式的编译规则,该规则匹配字符串"答案是:"后面的一个字符(包括换行符),其中re.S标志使得点号.能够匹配所有字符,包括换行符
problems = ans_pattern.findall(input_text) ## 在input_text中查找所有匹配的内容,并将结果存储在列表problems中
# print(problems)
if(problems == ''): ## 如果没有匹配到内容,返回'A',即选择'A'作为问题的答案
return 'A'
return problems[0] ## 选择匹配到的第一个内容作为问题的答案
2.5 使用多线程同时推理多个问题(定义函数)
定义一个名为process_datas
的函数,用于并发处理一组数据,多线程执行api_retry
函数并对结果进行处理和提取。
def process_datas(datas,MODEL_NAME):
'''
datas:list,存放着所有待推理的问题相关信息,其中每个元素对应一个问题
MODEL_NAME:str,用于推理的大语言模型的名称
'''
results = [] ## 用于存储所有补充了问题答案后的问题相关信息
## 定义线程池,使用16个线程
with ThreadPoolExecutor(max_workers=16) as executor:
future_data = {} ## 记录线程对象的信息,包括该线程处理的问题相关信息、问题id
lens = 0 ## 记录处理的问题(一个问题背景下可能有多个问题,这里是指问题而不是问题背景)/函数api_retry的数量
'''
提交线程
'''
for data in tqdm(datas, desc="Submitting tasks", total=len(datas)): ## data是一个具体问题的信息;使用tqdm展示进度条
problem = data['problem'] ## 从当前具体问题的信息中提取问题背景
for id, question in enumerate(data['questions']): ## 从当前具体问题的信息中提取问题及选项。一个问题背景下可能有多个问题需要推理,因此此处设置循环,每个问题都单独组装一个prompt
prompt = get_prompt(
problem,
question['question'], ## 问题
question['options'], ## 问题选项
)
future = executor.submit(api_retry, MODEL_NAME, prompt) ## 将MODEL_NAME和prompt作为函数api_retry的参数,将函数api_retry提交进入线程池,等待处理。future是一个线程对象
future_data[future] = (data,id) ## 在future_data字典中,以线程对象为键,该线程对象对应的问题相关信息、问题id为值
time.sleep(0.6) # 控制每0.5秒提交一个任务 ## 这里的提交是指提交一个api_retry函数处理的线程
lens += 1
'''
处理多线程任务
'''
for future in tqdm(as_completed(future_data), total=lens, desc="Processing tasks"): ## future是一个线程对象
## 取出线程对象中的问题数据及对应的问题id
data = future_data[future][0]
problem_id = future_data[future][1]
try:
res = future.result() ## 使用线程对象的result()方法获取线程对象对应的函数的运行结果,即api_retry()函数的运行结果,是大语言模型的推理回答文本
extract_response = extract(res) ## 从推理回答文本中提取具体的回答,即选项字符
data['questions'][problem_id]['answer'] = extract_response ## 将大语言模型回答的选项加入此问题信息中,作为此问题的相关信息的一部分
results.append(data) ## 将补充了问题答案后的问题相关信息存储到results列表中
except Exception as e: ## 如果无法获取大语言模型的推理回答(一般是因为api_retry函数执行过程中API调用异常),在日志中记录出错信息
logger.error(f"Failed to process text: {data}. Error: {e}")
return results
2.6 读取待推理的问题相关信息并进行推理(定义函数)
定义函数main
,用于从输入文件读取数据、进行推理,并将结果返回。
def main(ifn):
'''
ifn:input filename的缩写,表示输入文件(后面在调用main函数时会传入'round1_test_data.jsonl'作为ifn,此字符串代表的文件包含待测试的问题相关信息)
'''
if os.path.exists(ofn):
pass
data = []
'''
按行读取数据
'''
with open(ifn) as reader: ## 打开输入文件
for line in reader: ## 逐行读取ifn中的内容,每一行对应一个问题的相关信息
sample = json.loads(line)
data.append(sample) ## 将ifn中的问题逐个存放到data列表中
datas = data ## 此时datas列表存放着所有待推理的问题
'''
多线程处理数据
'''
return_list = process_datas(datas,MODEL_NAME)
print(len(return_list)) ## 打印处理的问题的数目(一个问题背景下可能有多个问题)
print("All tasks finished!")
return return_list
此处提供一个示例,展示读取文件后datas
的内容:
假设ifn
代表的文件中有如下内容:
则datas
的内容如下:
读取文件主要是将问题相关信息从“在文件中存储”转换为“在列表中存储”,方便后续读取问题相关信息。
2.7 正式启动推理
先提供一个推理回答示例,运行extract
函数并打印结果,验证提取答案选项的代码是否正确。随后调用 main
函数,启动推理。
if __name__ == '__main__':
'''
先提供一个推理回答示例,验证提取答案选项的代码是否正确
'''
a = extract("""根据欧几里得算法,逐步解析计算两个数6和7的最大公约数(gcd)的步骤如下:
1. 判断6和7是否相等:不相等。
2. 判断6和7大小关系,7 > 6,所以用更大的数7减去较小的数6得到结果1。
3. 现在计算6和1的最大公约数。
4. 6 > 1,根据算法用更大的数6减去较小的数1得到结果5。
5. 再计算5和1的最大公约数。
6. 5 > 1,用5减去1得到结果4。
7. 再计算4和1的最大公约数。
8. 4 > 1,用4减去1得到结果3。
9. 再计算3和1的最大公约数。
10. 3 > 1,用3减去1得到结果2。
11. 再计算2和1的最大公约数。
12. 2 > 1,用2减去1得到结果1。
13. 最后计算1和1的最大公约数,两数相等,gcd即为这两个数,也就是1。
因此,6和7的最大公约数是1。
答案是:C.""")
print(a)
'''
调用主函数,对文件round1_test_data.jsonl中的问题进行推理
'''
return_list = main('round1_test_data.jsonl')
三、整理、补充与结果文件生成
3.1 整理推理回答情况
整理推理回答情况包括两个部分:
- 判断问题回答是否完整,即检查推理后问题是否包含答案选项;
- 去除被重复处理的问题相关信息;
- 整理问题相关信息的顺序。
1、定义函数has_complete_answer
,检查推理后问题是否包含答案选项,包含则回答完整,不包含则回答不完整。
def has_complete_answer(questions):
# 这里假设完整答案的判断逻辑是:每个question都有一个'answer'键
for question in questions:
if 'answer' not in question:
return False
return True
此处提供一个示例,展示questions
的内容:
[{"question": "选择题 1:\n谁喜欢吃花生?", "options": ["Bill", "Sue", "John", "None of the above"], "answer": 'A'}]
正常来讲,经过大语言模型推理(call_qwen_api
函数)及答案提取(extract
函数)后,questions
包含问题背景下所有具体问题及选项,同时还应当含有answer
键及其值。一般都是完整的,除非该问题在进行推理时API调用异常。
2、定义函数filter_problems
,去除被重复处理的问题相关信息。若一个问题背景被多次推理,以最后一次推理的结果为准。
def filter_problems(data):
result = [] ## 用于存储已回答完整的问题相关信息
problem_set = set() ## 用于存储被完整回答的问题背景
for item in data: ## 每个item表示包含由推理得到的答案选项在内的问题相关信息
problem = item['problem']
if problem in problem_set: ## 如果当前问题背景已经被回答过,执行下方操作
for existing_item in result: ## 遍历result列表,找到与当前问题背景相同的问题相关信息记录,在检查当前问题被完整回答的情况下,更新替换这个问题背景下的问题相关信息记录
if existing_item['problem'] == problem:
if has_complete_answer(item['questions']):
existing_item['questions'] = item['questions']
existing_item['id'] = item['id']
break
else: ## 如果当前问题背景未被回答过,且其下所有问题回答完整,将此问题的相关信息添加到result列表,将此问题背景添加到problem_set集合
if has_complete_answer(item['questions']):
result.append(item)
problem_set.add(problem)
return result
此处提供一个示例,展示item
的内容(与2.6的示例相比,增加了answer
键及其值):
{"problem": "有一群人和一些食物类型。下列是关于这些个体和食物的已知信息:\n\n1. 鸡肉是一种食物。\n2. 苹果是一种食物。\n3. 如果X吃了Y,且X活着,则Y是一种食物。\n4. Bill存活。\n5. Bill吃了花生。\n6. John吃所有食物。\n7. Sue吃所有Bill吃的食物。\n8. John喜欢所有食物。\n\n根据以上信息,回答以下选择题:", "questions": [{"question": "选择题 1:\n谁喜欢吃花生?", "options": ["Bill", "Sue", "John", "None of the above"], "answer": 'A'}], "id": "round1_test_data_000"}
3、启动整理,然后将问题相关信息按照序号进行排序。
return_list = filter_problems(return_list)
sorted_data = sorted(return_list, key=lambda x: int(str(x['id'])[-3:])) ## 表示选择每个问题相关信息中id键的值的后三位作为问题的序号,根据序号对问题相关信息进行排序
print(sorted_data)
3.2 补充未被推理回答的问题
对于在前面的过程中由于API调用异常而未能回答完整的题目,需要补充答案选项。
1、定义函数find_missing_ids
,将未被完整回答的题目的序号按顺序存储在列表中并返回。
def find_missing_ids(dict_list):
# 提取所有序号
extracted_ids = {int(d['id'][-3:]) for d in dict_list}
# 创建0-500的序号集合
all_ids = set(range(500))
# 找出缺失的序号 ## 因为测试集'round1_test_data.jsonl'文件中共有500个问题背景,因此这样可以找出未被完整回答的题号
missing_ids = all_ids - extracted_ids
return sorted(missing_ids)
2、从前面的sorted_data
中找出缺失的序号,这些缺失的序号对应着未被完整回答的题目的序号。
# 示例字典列表
dict_list = sorted_data ## 根据题目序号排序后的问题相关信息(含答案选项)
# 找出缺失的序号
missing_ids = find_missing_ids(dict_list)
print("缺失的序号:", missing_ids)
len(missing_ids)
3、对于在前面未被完整回答的题目,将答案选项统一设置为’A’,补充答案后将该问题相关信息添加到sorted_data
中。所有题目都被完整回答后再次对问题相关信息进行排序。
data = []
with open('round1_test_data.jsonl') as reader:
for id, line in enumerate(reader):
if (id in missing_ids): ## 对于在前面未被完整回答的题目,将答案选项统一设置为'A'
sample = json.loads(line)
for question in sample['questions']:
question['answer'] = 'A'
sorted_data.append(sample)
sorted_data = sorted(sorted_data, key=lambda x: int(str(x['id'])[-3:])) ## 重新进行排序
3.3 结果文件生成
将sorted_data
列表中的数据写进结果文件upload.jsonl
中。
with open('upload.jsonl', 'w') as writer:
for sample in sorted_data:
writer.write(json.dumps(sample, ensure_ascii=False))
writer.write('\n')
将upload.jsonl文件上传到比赛平台即可获得测评分数。
四、总结
总体而言,代码的执行逻辑如下图所示(该图由Datawhale AI夏令营提供):
各函数之间的调用关系如下图所示: