赛题流程
初赛数据集为逻辑推理数据,其中训练集中包含500条训练数据,测试集中包含500条测试数据。每个问题包括若干子问题,每个子问题为单项选择题,选项不定(最多5个)。目标是为每个子问题选择一个正确答案。推理答案基于闭世界假设(closed-world assumption),即未观测事实或者无法推断的事实为假。
具体的,每条训练数据包含 content
, questions
字段,其中content
是题干,questions
为具体的子问题。questions
是一个子问题列表,每个子问题包括options
和answer
字段,其中options
是一个列表,包含具体的选项,按照ABCDE顺序排列,answer
是标准答案。
具体的,我们来看一个例子,选自测试集中的round1_test_data_000
,即第一条:
{
'problem':
'有一群人和一些食物类型。下列是关于这些个体和食物的已知信息:\n\n1. 鸡肉是一种食物。\n2. 苹果是一种食物。\n3. 如果X吃了Y,且X活着,则Y是一种食物。\n4. Bill存活。\n5. Bill吃了花生。\n6. John吃所有食物。\n7. Sue吃所有Bill吃的食物。\n8. John喜欢所有食物。\n\n根据以上信息,回答以下选择题:',
'questions': [
{
'question': '选择题 1:\n谁喜欢吃花生?',
'options': ['Bill', 'Sue', 'John', 'None of the above']
}
], 'id': 'round1_test_data_000'
}
这里需要通过数据处理,将上面的问题字典转化为MD格式的prompt。这就是我们让大语言模型能理解并做分析的秘诀,转化结果为:
你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。请逐步分析问题并在最后一行输出答案,最后一行的格式为"答案是:A"。题目如下:
###题目:
有一群人和一些食物类型。下列是关于这些个体和食物的已知信息:
鸡肉是一种食物。
苹果是一种食物。
如果X吃了Y,且X活着,则Y是一种食物。
Bill存活。
Bill吃了花生。
John吃所有食物。
Sue吃所有Bill吃的食物。
John喜欢所有食物。
根据以上信息,回答以下选择题:
###问题:
选择题 1:
谁喜欢吃花生?
A. Bill
B. Sue
C. John
D. None of the above
结合上面的内容,调用Qwen2-1.5b-instruct
的内容返回为:
{
"status_code": 200,
"request_id": "18d43478-2aec-92be-8840-cd3bb08009cf",
"code": "",
"message": "",
"output": {
"text": null,
"finish_reason": null,
"choices": [
{
"finish_reason": "stop",
"message": {
"role": "assistant",
"content": "我们可以通过分析给出的信息来解答这个问题。\n\n首先,我们知道Bill吃了花生(根据第5条信息),并且Bill还活着(根据第4条信息)。根据第3条规则,“如果X吃了Y,且X活着,则Y是一种食物”,我们可以推断出花生是一种食物。\n\n接下来,我们知道John吃所有食物(根据第6条信息),并且John喜欢所有食物(根据第8条信息)。既然花生是一种食物,那么John应该也吃花生,并且喜欢花生。\n\nSue吃所有Bill吃的食物(根据第7条信息),既然Bill吃了花生,那么Sue也应该吃花生。\n\n所以,喜欢吃花生的人有Bill、Sue和John。\n\n答案是:C. John\n\n但是,题目要求选择一个选项,而实际上Bill、Sue和John都喜欢吃花生。然而,在给出的选项中,只有John被明确列出作为喜欢吃花生的人(选项C)。因此,虽然Bill和Sue也喜欢吃花生,但根据题目要求,正确答案应当是包含在选项中的那个,即John。\n\n答案是:C. John"
}
}
]
},
"usage": {
"input_tokens": 210,
"output_tokens": 223,
"total_tokens": 433
}
}
所以问题进一步变为了优化output['choices']['message']['content']
对应value的值,我们需要对其进行后处理,最终生成比赛官方要求的结果:
{'id': 'round1_test_data_000',
'questions': [{'answer': 'A'}, {'answer': 'D'}, ...], # 顺序与子问题对应
}
baseline介绍
主流程
上述举例描述的流程,画出要写代码的流程图如下所示:
大模型配置
环境导入
首先导入相应的api包:
from multiprocessing import Process, Manager
import json
import os
from pprint import pprint
import re
from tqdm import tqdm
import random, uuid
import openai
import tiktoken
import json
import numpy as np
import requests
from retry import retry
from scipy import sparse
#from rank_bm25 import BM25Okapi
#import jieba
from http import HTTPStatus
import dashscope
from concurrent.futures import ThreadPoolExecutor, as_completed
from loguru import logger
import json
import time
from tqdm import tqdm
logger.remove() # 移除默认的控制台输出
logger.add("logs/app_{time:YYYY-MM-DD}.log", level="INFO", rotation="00:00", retention="10 days", compression="zip")
MODEL_NAME = 'qwen1.5-1.8b-chat'
设置重连API
因为是调用的阿里云服务,所以设置重连请求API:
def api_retry(MODEL_NAME, query):
max_retries = 5
retry_delay = 60 # in seconds
attempts = 0
while attempts < max_retries:
try:
return call_qwen_api(MODEL_NAME, query)
except Exception as e:
attempts += 1
if attempts < max_retries:
logger.warning(f"Attempt {attempts} failed for text: {query}. Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
logger.error(f"All {max_retries} attempts failed for text: {query}. Error: {e}")
raise
def call_qwen_api(MODEL_NAME, query):
# 这里采用dashscope的api调用模型推理,通过http传输的json封装返回结果
messages = [
{'role': 'user', 'content': query}]
response = dashscope.Generation.call(
MODEL_NAME,
messages=messages,
result_format='message', # set the result is message format.
)
if response.status_code == HTTPStatus.OK:
# print(response)
return response['output']['choices'][0]['message']['content']
else:
print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
response.request_id, response.status_code,
response.code, response.message
))
raise Exception()
答案生成部分
prompt模板函数
get_prompt的模版函数,通过字符串处理的方式拼接完整的prompt:
# 这里定义了prompt推理模版
def get_prompt(problem, question, options):
options = '\n'.join(f"{'ABCDEFG'[i]}. {o}" for i, o in enumerate(options))
prompt = f"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。请逐步分析问题并在最后一行输出答案,最后一行的格式为"答案是:A"。题目如下:
### 题目:
{problem}
### 问题:
{question}
{options}
"""
# print(prompt)
return prompt
### 抽取函数 通过抽取函数可以将大语言模型生成的结果抽取成答案对应的选项,这里的匹配原则和prompt呼应。我们可以看到prompt要求【最后一行的格式为"答案是:A"】这样的规范,那么我们采用正则表达式re.compile方法匹配到答案对应的选项。当我们匹配为空时,我们默认选"A"。
def extract(input_text):
# 这里使用extract抽取模获得抽取的结果
ans_pattern = re.compile(r"答案是:(.)", re.S)
problems = ans_pattern.findall(input_text)
# print(problems)
if(problems == ''):
return 'A'
return problems[0]
多线程处理
这里开了16个worker,因为测试集是500条,其中question里包含了子问题,所以总共加起来大概会有1328次问答结果,又考虑阿里云api的并发限制,所以加个睡眠时间确保更顺利的出来结果。
def process_datas(datas,MODEL_NAME):
results = []
# 定义线程池 选择16线程
with ThreadPoolExecutor(max_workers=16) as executor:
# 这里我们使用future_data 存储每个线程的数据
future_data = {}
# 这里的lens记录了调用api的次数,也就是我们每个问题背景下的所有子问题之和。
lens = 0
# 送入多线程任务
# 这里每个data下是一个问题背景,其中包含多个子问题。
for data in tqdm(datas, desc="Submitting tasks", total=len(datas)):
problem = data['problem']
# 这里面我们用enumerate方法每次循环得到问题的序号id和实际的问题。
for id,question in enumerate(data['questions']):
prompt = get_prompt(problem,
question['question'],
question['options'],
)
# 这里送入线程池等待处理,使用api_retry,向api_retry传入MODEL_NAME, prompt参数
future = executor.submit(api_retry, MODEL_NAME, prompt)
# 每个线程我们存储对应的json问题数据以及问题序号id,这样我们就能定位出执行的是哪个子问题
future_data[future] = (data,id)
time.sleep(0.6) # 控制每0.6秒提交一个任务 防止接口超过并发数
lens += 1
# 处理多线程任务
for future in tqdm(as_completed(future_data), total=lens, desc="Processing tasks"):
# print('data',data)
# 取出每个线程中的字典数据及对应的问题id
data = future_data[future][0]
problem_id = future_data[future][1]
try:
# 获取api运行结果
res = future.result()
# 抽取大语言模型返回结果
extract_response = extract(res)
# print('res',extract_response)
# 装入answer字段
data['questions'][problem_id]['answer'] = extract_response
# 在结果列表中新增数据字典
results.append(data)
# print('data',data)
except Exception as e:
logger.error(f"Failed to process text: {data}. Error: {e}")
return results
答案生成主函数
def main(ifn, ofn):
if os.path.exists(ofn):
pass
data = []
# 按行读取数据
with open(ifn) as reader:
for line in reader:
sample = json.loads(line)
data.append(sample)
datas = data
# print(data)
# 均匀地分成多个数据集
return_list = process_datas(datas,MODEL_NAME)
print(len(return_list))
print("All tasks finished!")
return return_list
生成结果为:
Submitting tasks: 0%| | 2/500 [00:02<10:52, 1.31s/it]
{"status_code": 200, "request_id": "97b8fbae-528c-9ecd-9370-0f2c167843b0", "code": "", "message": "", "output": {"text": null, "finish_reason": null, "choices": [{"finish_reason": "stop", "message": {"role": "assistant", "content": "分析已知信息:\n- 我们知道鸡肉和苹果是食物。\n- Bill存活并吃了花生。\n- John吃所有食物,因此他吃了鸡肉、苹果和花生。\n- Sue吃所有Bill吃的食物,所以她吃了鸡肉、苹果和花生。\n- John喜欢所有食物,并且已经吃了这些食物。\n\n根据以上信息,我们可以得出结论:\n- Bill吃了花生。\n- Sue也吃了花生,因为她吃所有Bill吃的食物。\n- John吃了花生,因为他吃所有食物。\n- 所以,至少Bill、Sue和John都吃了花生。\n\n答案是:D. None of the above"}}]}, "usage": {"input_tokens": 210, "output_tokens": 128, "total_tokens": 338}}
Submitting tasks: 1%| | 3/500 [00:03<08:09, 1.01it/s]
......
纠错与结果文件生成
去重与排序
将一个问题背景下的所有问题存入同一个字典,并按id序号排序。
def has_complete_answer(questions):
# 这里假设完整答案的判断逻辑是:每个question都有一个'answer'键
for question in questions:
if 'answer' not in question:
return False
return True
def filter_problems(data):
result = []
problem_set = set()
for item in data:
# print('处理的item' ,item)
problem = item['problem']
if problem in problem_set:
# 找到已存在的字典
for existing_item in result:
if existing_item['problem'] == problem:
# 如果当前字典有完整答案,替换已存在的字典
if has_complete_answer(item['questions']):
existing_item['questions'] = item['questions']
existing_item['id'] = item['id']
break
else:
# 如果当前字典有完整答案,添加到结果列表
if has_complete_answer(item['questions']):
result.append(item)
problem_set.add(problem)
return result
return_list = filter_problems(return_list)
# 排序工作 通过id字段后三位代表序号
sorted_data = sorted(return_list, key=lambda x: int(str(x['id'])[-3:]))
print(sorted_data)
"""
"""
纠错
def find_missing_ids(dict_list):
# 提取所有序号
extracted_ids = {int(d['id'][-3:]) for d in dict_list}
# 创建0-500的序号集合
all_ids = set(range(500))
# 找出缺失的序号
missing_ids = all_ids - extracted_ids
return sorted(missing_ids)
# 示例字典列表
dict_list = sorted_data
# 找出缺失的序号
missing_ids = find_missing_ids(dict_list)
print("缺失的序号:", missing_ids)
len(missing_ids)
"""
缺失的序号: [33, 67, 93, 95, 183, 216, 229, 267, 309, 361]
"""
错误的题号可以回溯整个过程,分析哪部分出了问题。而这里为了简单起见,针对缺失的这十个题号直接补错。
补错
针对空缺的列表我们进行补错,让每个answer字段默认填充为A,当然如果这种补错机制大家觉得不满意可以再送入多线程函数处理一边。
data = []
with open('round1_test_data.jsonl') as reader:
for id,line in enumerate(reader):
if(id in missing_ids):
sample = json.loads(line)
for question in sample['questions']:
question['answer'] = 'A'
sorted_data.append(sample)
sorted_data = sorted(sorted_data, key=lambda x: int(str(x['id'])[-3:]))
生成比赛提交文件
with open('upload.jsonl', 'w') as writer:
for sample in sorted_data:
writer.write(json.dumps(sample, ensure_ascii=False))
writer.write('\n')
比赛部分结果
根据我看到的一些对照实验,大致的结果如下表所示,其中我跑过的是GPT-4o-mini
、Qwen2-7B-Instruct
和 Qwen1.5-1.8b-chat
,其中最后的1.8b,结果很稳定,是0.3494,而一直到7月底,阿里云官方对该api调用都是免费,往7B开始,就需要收费,在多线程处理小节中,我提到总共需要调用api为1328次,7B的费用大概在0.8元左右,即跑一次,gpt4omini的话大概是0.3美金左右。
模型名称 | 线上分数范围 |
---|---|
GPT-4o | 0.82以上 |
GPT-4o-mini | 0.76 - 0.78 |
Qwen2-72B-Instruct | 0.8左右 |
Qwen2-7B-Instruct | 0.65 - 0.66 |
Qwen1.5-1.8b-chat | 0.3494 |
reference
https://datawhaler.feishu.cn/wiki/CvNRwdXDHimxJskZaArcvYqDnIc