一、赛题理解
1. 赛题名称
AI量化模型预测
2. 赛题理解
本赛事是一个量化金融挑战,旨在通过大数据与机器学习的方法,使用给定的训练集和测试集数据,预测未来中间价的移动方向。参赛者需要理解市场行为的原理,创建量化策略,并利用过去不超过100个数据点的信息,对未来5、10、20、40和60个数据点的中间价进行预测(下跌、不变或上涨)。挑战包含10只股票,79个交易日的数据,其中前64个交易日用于训练,后15个用于测试。数据集包括股票的5档量价、中间价、交易量等信息。
3. 赛题链接2023 iFLYTEK A.I.开发者大赛-讯飞开放平台 (xfyun.cn)https://challenge.xfyun.cn/topic/info?type=quantitative-model4. 评估指标
macro-F1 score,具体公式如下:
二、数据竞赛开发步骤
1. 问题分析
问题分析是竞赛的第一步,它涉及对问题进行定义、目标明确化和数据需求的识别。
2. 数据清洗
在采集数据完后,对数据进行数据清洗,即把采集到的、不适合用来做机器学习训练的数据进行预处理,从而转化为适合机器学习的数据。
3. 数据探索
对清洗后的数据进行可视化和统计分析的过程。通过数据探索,可以深入了解数据的特征、分布、相关性等情况,发现数据之间的模式和趋势,为特征工程和特征筛选提供指导。常见的数据探索方法包括绘制直方图、散点图、箱线图等图形,计算统计指标如均值、方差、相关系数等。
4. 特征工程
根据数据的领域知识和探索结果,对原始数据进行变换、组合、衍生等操作,以创建更有意义、更能表达问题特征的新特征。好的特征工程可以提高模型的性能和泛化能力。常见的特征工程包括特征缩放、编码分类变量、处理时间序列数据、创建多项式特征等。
5. 特征筛选
通过一定的评估方法选择对模型训练最有用的特征,去除冗余或不相关的特征。这样可以降低模型复杂度,加快训练速度,减少过拟合的可能性,并提高模型的解释性。常见的特征筛选方法包括基于统计的方法、基于模型的方法和基于特征重要性的方法。
6. 模型训练
使用清洗、探索和筛选后的数据来训练机器学习模型的过程。根据问题的类型和数据的特点,可以选择合适的机器学习算法和模型架构进行训练。训练过程涉及优化模型参数,使其能够最好地拟合训练数据,并能在未见过的数据上泛化。
7. 模型保存
模型训练完成后,需要将训练得到的模型保存起来,以便在后续部署和使用中使用。模型保存可以包括保存模型权重、参数和架构,以及相关的辅助信息。在实际应用中,保存的模型可以用于预测新数据的结果或作为其他任务的基础。
当然,对于上面的特征工程不是一步到位的,而是不断的通过模型训练的结果进行不断的优化调整,直到最优的特征组合。
三、 baseline详细解读
1. 数据读取
首先使用os模块中的listdir读取训练集的文件列表保存到train_files中,创建一个空的训练数据DataFrame,然后遍历每个训练集文件使用pd.read_csv进行csv文件读取并存入临时的DataFrame:tmp中,同时将当前文件名作为列添加到临时tmp,最后将当前临时DataFrame与主训练数据DataFrame进行合并。测试集与上面训练集一样。
# 定义文件路径
path = 'AI量化模型预测挑战赛公开数据/'
# 读取训练集文件列表
train_files = os.listdir(path+'train')
# 创建一个空的训练数据DataFrame
train_df = pd.DataFrame()
# 遍历每个训练集文件
for filename in tqdm.tqdm(train_files):
# 读取CSV文件并存入临时DataFrame
tmp = pd.read_csv(path+'train/'+filename)
# 将当前文件名作为列添加到临时DataFrame
tmp['file'] = filename
# 将当前临时DataFrame与主训练数据DataFrame进行合并
train_df = pd.concat([train_df, tmp], axis=0, ignore_index=True)
# 读取测试集文件列表
test_files = os.listdir(path+'test')
# 创建一个空的测试数据DataFrame
test_df = pd.DataFrame()
# 遍历每个测试集文件
for filename in tqdm.tqdm(test_files):
# 读取CSV文件并存入临时DataFrame
tmp = pd.read_csv(path+'test/'+filename)
# 将当前文件名作为列添加到临时DataFrame
tmp['file'] = filename
# 将当前临时DataFrame与主测试数据DataFrame进行合并
test_df = pd.concat([test_df, tmp], axis=0, ignore_index=True)
在通过遍历文件目录,再使用pd.read_csv读取文件,文件比较多,因此读取慢。如果把读取后的数据直接保存到本地,再直接加载,那么就会提高读取的速度。python中有一个内置库pickle,可以保存程序中的变量到本地,然后通过pickle加载本地数据就可以实现读取速度优化。
# 训练集和测试集文件保存
# Save 'train_df' to a file using pickle
with open('train_df.pickle', 'wb') as f:
pickle.dump(train_df, f)
# Save 'test_df' to a file using pickle
with open('test_df.pickle', 'wb') as f:
pickle.dump(test_df, f)
# 训练集和测试集文件读取
# Load 'train_df' from the saved file
with open('train_df.pickle', 'rb') as f:
train_df = pickle.load(f)
# Load 'test_df' from the saved file
with open('test_df.pickle', 'rb') as f:
test_df = pickle.load(f)
2. 特征构造
(1)提取时间信息,同时为了保证时间的一致性,故进行排序
# 时间相关特征
train_df['hour'] = train_df['time'].apply(lambda x:int(x.split(':')[0]))
test_df['hour'] = test_df['time'].apply(lambda x:int(x.split(':')[0]))
train_df['minute'] = train_df['time'].apply(lambda x:int(x.split(':')[1]))
test_df['minute'] = test_df['time'].apply(lambda x:int(x.split(':')[1]))
# 为了保证时间顺序的一致性,故进行排序
train_df = train_df.sort_values(['file','time'])
test_df = test_df.sort_values(['file','time'])
(2)计算每个样本的加权平均价格(Weighted Average Price,WAP)作为新的特征,并将其命名为'wap1', 'wap2', 'wap3', 'wap4'。通过对买入价和卖出价以及对应的数量进行加权平均,旨在更好地反映市场交易价格的趋势和情况。
# 对训练集进行特征工程
# 计算买一价和卖一价的加权平均作为新特征'wap1',加权平均的计算方式是:(买一价 * 买一量 + 卖一价 * 卖一量) / (买一量 + 卖一量)
train_df['wap1'] = (train_df['n_bid1'] * train_df['n_bsize1'] + train_df['n_ask1'] * train_df['n_asize1']) / (train_df['n_bsize1'] + train_df['n_asize1'])
# 计算买二价和卖二价的加权平均作为新特征'wap2',加权平均的计算方式同样是:(买二价 * 买二量 + 卖二价 * 卖二量) / (买二量 + 卖二量)
train_df['wap2'] = (train_df['n_bid2'] * train_df['n_bsize2'] + train_df['n_ask2'] * train_df['n_asize2']) / (train_df['n_bsize2'] + train_df['n_asize2'])
# 计算买三价和卖三价的加权平均作为新特征'wap3',加权平均的计算方式同样是:(买三价 * 买三量 + 卖三价 * 卖三量) / (买三量 + 卖三量)
train_df['wap3'] = (train_df['n_bid3'] * train_df['n_bsize3'] + train_df['n_ask3'] * train_df['n_asize3']) / (train_df['n_bsize3'] + train_df['n_asize3'])
# 计算买四价和卖四价的加权平均作为新特征'wap4',加权平均的计算方式同样是:(买四价 * 买四量 + 卖四价 * 卖四量) / (买四量 + 卖四量)
train_df['wap4'] = (train_df['n_bid4'] * train_df['n_bsize4'] + train_df['n_ask4'] * train_df['n_asize4']) / (train_df['n_bsize4'] + train_df['n_asize4'])
# 对测试集进行特征工程,步骤同训练集一样
test_df['wap1'] = (test_df['n_bid1'] * test_df['n_bsize1'] + test_df['n_ask1'] * test_df['n_asize1']) / (test_df['n_bsize1'] + test_df['n_asize1'])
test_df['wap2'] = (test_df['n_bid2'] * test_df['n_bsize2'] + test_df['n_ask2'] * test_df['n_asize2']) / (test_df['n_bsize2'] + test_df['n_asize2'])
test_df['wap3'] = (test_df['n_bid3'] * test_df['n_bsize3'] + test_df['n_ask3'] * test_df['n_asize3']) / (test_df['n_bsize3'] + test_df['n_asize3'])
test_df['wap4'] = (test_df['n_bid4'] * test_df['n_bsize4'] + test_df['n_ask4'] * test_df['n_asize4']) / (test_df['n_bsize4'] + test_df['n_asize4'])
(3)围绕买卖价格和买卖量进行计算
'wap_balance': 该特征计算了'wap1'和'wap2'之间的差值的绝对值。反映出买卖价格之间的平衡情况,即市场在买卖价格上的趋势或震荡程度。
'bid_spread': 该特征计算了买一价和买二价之间的差值。展示买价的变化情况,可能与市场的需求和供应有关。
'ask_spread': 该特征计算了卖一价和卖二价之间的差值。类似于'bid_spread',展示卖价的变化情况。
'total_volume': 该特征计算了买一量、买二量、卖一量和卖二量之和。表示市场的总交易量,可能与市场的活跃度和流动性有关。
'volume_imbalance': 该特征计算了买一量、买二量、卖一量和卖二量之间的差值的绝对值。表示买入和卖出量之间的差异,可能反映了市场上买卖双方的力量对比。
# 对训练集进行当前时间特征的构建
# 计算'wap1'和'wap2'之间的差值的绝对值作为新特征'wap_balance'
train_df['wap_balance'] = abs(train_df['wap1'] - train_df['wap2'])
# 计算买一价和买二价之间的差值作为新特征'bid_spread'
train_df['bid_spread'] = train_df['n_bid1'] - train_df['n_bid2']
# 计算卖一价和卖二价之间的差值作为新特征'ask_spread'
train_df['ask_spread'] = train_df['n_ask1'] - train_df['n_ask2']
# 计算买一量、买二量、卖一量和卖二量之和作为新特征'total_volume'
train_df['total_volume'] = (train_df['n_asize1'] + train_df['n_asize2']) + (train_df['n_bsize1'] + train_df['n_bsize2'])
# 计算买一量、买二量、卖一量和卖二量之间的差值的绝对值作为新特征'volume_imbalance'
train_df['volume_imbalance'] = abs((train_df['n_asize1'] + train_df['n_asize2']) - (train_df['n_bsize1'] + train_df['n_bsize2']))
# 对测试集进行当前时间特征的构建,步骤同训练集一样
test_df['wap_balance'] = abs(test_df['wap1'] - test_df['wap2'])
test_df['bid_spread'] = test_df['n_bid1'] - test_df['n_bid2']
test_df['ask_spread'] = test_df['n_ask1'] - test_df['n_ask2']
test_df['total_volume'] = (test_df['n_asize1'] + test_df['n_asize2']) + (test_df['n_bsize1'] + test_df['n_bsize2'])
test_df['volume_imbalance'] = abs((test_df['n_asize1'] + test_df['n_asize2']) - (test_df['n_bsize1'] + test_df['n_bsize2']))
(4) 历史平移
在训练集和测试集中,对特定的列进行历史平移,通过shift
函数,将当前样本的特征值移动到之前的时间点。对于每个要平移的特征(如'wap1','wap2','wap_balance'等),使用不同的时间偏移量(1、5、10、20、40、60)来创建新特征。例如,'file_wap1_shift1'表示'wap1'特征向前平移一个时间点。
# 历史平移
# 对每个特征进行不同时间偏移量的历史平移,生成新特征
for val in ['wap1','wap2','wap_balance','price_spread','bid_spread','ask_spread','total_volume','volume_imbalance']:
for loc in [1,5,10,20,40,60]:
# 对训练集进行历史平移,按照文件分组后对特征进行时间偏移,shift(loc)表示向前平移loc个时间点
train_df[f'file_{val}_shift{loc}'] = train_df.groupby(['file'])[val].shift(loc)
# 对测试集进行历史平移,步骤同训练集一样
test_df[f'file_{val}_shift{loc}'] = test_df.groupby(['file'])[val].shift(loc)
(5)差分特征
类似于历史平移,对每个要处理的特征进行差分计算。通过diff
函数,计算当前样本与之前时间点的差异,用于观察特征值之间的增长关系。对于每个要处理的特征和不同的时间差(1、5、10、20、40、60),生成新特征。例如,'file_wap1_diff1'表示'wap1'特征与前一个时间点的差异。
# 差分特征
# 对每个特征进行不同时间差的差分计算,生成新特征
for val in ['wap1','wap2','wap_balance','price_spread','bid_spread','ask_spread','total_volume','volume_imbalance']:
for loc in [1,5,10,20,40,60]:
# 对训练集进行差分计算,按照文件分组后对特征进行时间差计算,diff(loc)表示与前loc个时间点的差异
train_df[f'file_{val}_diff{loc}'] = train_df.groupby(['file'])[val].diff(loc)
# 对测试集进行差分计算,步骤同训练集一样
test_df[f'file_{val}_diff{loc}'] = test_df.groupby(['file'])[val].diff(loc)
(6)窗口统计
使用滑动窗口来计算历史信息的统计数据,如均值和标准差。对于每个要处理的特征,分别计算滑动窗口大小为7的均值和标准差。这样可以获取历史信息在窗口内的分布变化信息。例如,'file_wap1_win7_mean'表示'wap1'特征在滑动窗口大小为7的窗口内的均值。
# 窗口统计
# 对每个特征进行滑动窗口统计,生成均值和标准差特征
for val in ['wap1','wap2','wap_balance','price_spread','bid_spread','ask_spread','total_volume','volume_imbalance']:
# 计算滑动窗口大小为7的均值特征,min_periods=3表示窗口内至少包含3个有效值
train_df[f'file_{val}_win7_mean'] = train_df.groupby(['file'])[val].transform(lambda x: x.rolling(window=7, min_periods=3).mean())
# 计算滑动窗口大小为7的标准差特征
train_df[f'file_{val}_win7_std'] = train_df.groupby(['file'])[val].transform(lambda x: x.rolling(window=7, min_periods=3).std())
# 对测试集进行窗口统计,步骤同训练集一样
test_df[f'file_{val}_win7_mean'] = test_df.groupby(['file'])[val].transform(lambda x: x.rolling(window=7, min_periods=3).mean())
test_df[f'file_{val}_win7_std'] = test_df.groupby(['file'])[val].transform(lambda x: x.rolling(window=7, min_periods=3).std())
3. 模型训练
(1)cv_model函数
参数:clf是机器学习模型的类对象,train_x是训练集特征,train_y是训练集标签,test_x是测试集特征,clf_name是模型的名称,seed是随机种子。
返回:函数返回训练集的交叉验证预测结果oof和测试集的预测结果test_predict。
交叉验证:
(2)交叉验证
使用5折交叉验证(folds = 5),将训练数据集分成5个子集,依次用其中4个子集作为训练集,另一个子集作为验证集,进行5轮训练和验证,以得到更稳定的模型评估结果。
(3)模型训练和预测:
对于每一折,根据指定的模型名称(clf_name),选择相应的分类器(LightGBM、XGBoost或CatBoost)。设置分类器的超参数,例如学习率、树的深度、正则化参数等。使用训练数据集(trn_x和trn_y)进行模型训练,并使用验证数据集(val_x)进行验证。使用训练好的模型对验证数据集和测试数据集(test_x)进行预测。
(4)模型融合:
每一折,将验证集的预测结果(val_pred)存储在oof中,用于后续模型融合。
每一折,将测试集的预测结果(test_pred)进行累加,最后除以折数(kf.n_splits)得到测试集的平均预测结果。
(5)F1得分和评估:
每一折,计算验证集预测结果的F1得分,并将其存储在cv_scores列表中。
(6)模型融合和输出:
将三个模型(LightGBM、XGBoost和CatBoost)的测试集预测结果取平均,得到最终的测试集预测结果final_test,用于提交和生成最终的预测结果。
def cv_model(clf, train_x, train_y, test_x, clf_name, seed = 2023):
folds = 5
kf = KFold(n_splits=folds, shuffle=True, random_state=seed)
oof = np.zeros([train_x.shape[0], 3])
test_predict = np.zeros([test_x.shape[0], 3])
cv_scores = []
for i, (train_index, valid_index) in enumerate(kf.split(train_x, train_y)):
print('************************************ {} ************************************'.format(str(i+1)))
trn_x, trn_y, val_x, val_y = train_x.iloc[train_index], train_y[train_index], train_x.iloc[valid_index], train_y[valid_index]
if clf_name == "lgb":
train_matrix = clf.Dataset(trn_x, label=trn_y)
valid_matrix = clf.Dataset(val_x, label=val_y)
params = {
'boosting_type': 'gbdt',
'objective': 'multiclass',
'num_class':3,
'min_child_weight': 6,
'num_leaves': 2 ** 6,
'lambda_l2': 10,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 4,
'learning_rate': 0.35,
'seed': 2023,
'nthread' : 16,
'verbose' : -1,
}
model = clf.train(params, train_matrix, 2000, valid_sets=[train_matrix, valid_matrix],
categorical_feature=[], verbose_eval=1000, early_stopping_rounds=100)
val_pred = model.predict(val_x, num_iteration=model.best_iteration)
test_pred = model.predict(test_x, num_iteration=model.best_iteration)
if clf_name == "xgb":
xgb_params = {
'booster': 'gbtree',
'objective': 'multi:softprob',
'num_class':3,
'max_depth': 5,
'lambda': 10,
'subsample': 0.7,
'colsample_bytree': 0.7,
'colsample_bylevel': 0.7,
'eta': 0.35,
'tree_method': 'hist',
'seed': 520,
'nthread': 16
}
train_matrix = clf.DMatrix(trn_x , label=trn_y)
valid_matrix = clf.DMatrix(val_x , label=val_y)
test_matrix = clf.DMatrix(test_x)
watchlist = [(train_matrix, 'train'),(valid_matrix, 'eval')]
model = clf.train(xgb_params, train_matrix, num_boost_round=2000, evals=watchlist, verbose_eval=1000, early_stopping_rounds=100)
val_pred = model.predict(valid_matrix)
test_pred = model.predict(test_matrix)
if clf_name == "cat":
params = {'learning_rate': 0.35, 'depth': 5, 'bootstrap_type':'Bernoulli','random_seed':2023,
'od_type': 'Iter', 'od_wait': 100, 'random_seed': 11, 'allow_writing_files': False,
'loss_function': 'MultiClass'}
model = clf(iterations=2000, **params)
model.fit(trn_x, trn_y, eval_set=(val_x, val_y),
metric_period=1000,
use_best_model=True,
cat_features=[],
verbose=1)
val_pred = model.predict_proba(val_x)
test_pred = model.predict_proba(test_x)
oof[valid_index] = val_pred
test_predict += test_pred / kf.n_splits
F1_score = f1_score(val_y, np.argmax(val_pred, axis=1), average='macro')
cv_scores.append(F1_score)
print(cv_scores)
return oof, test_predict
# 选择lightgbm模型
lgb_oof, lgb_test = cv_model(lgb, train_df[cols], train_df['label_5'], test_df[cols], 'lgb')
# 选择xgboost模型
xgb_oof, xgb_test = cv_model(xgb, train_df[cols], train_df['label_5'], test_df[cols], 'xgb')
# 选择catboost模型
cat_oof, cat_test = cv_model(CatBoostClassifier, train_df[cols], train_df['label_5'], test_df[cols], 'cat')
# 进行取平均融合
final_test = (lgb_test + xgb_test + cat_test) / 3
注意:在使用进阶方案之后,会发现进阶方案比开始的方案要低的原因:
在进阶方案中新添加了许多的特征工程,而使得特征数量增多,而原来的模型训练中迭代次数很少,这就会导致模型还没有学习到特征而提前终止了模型训练,此时就需要我们自己手动调整迭代次数,可以增大到5000或者更大,有早停处理,不用担心会一直训练下去。当然学习率调得很低的话会导致模型迭代次数增大,如果觉得模型不断的迭代太多了的话可以调大学习率。
四、后续优化方案
1. 补充完整特征工程
由于上面的特征工程---围绕买卖价格和买卖量进行计算时只构建买一卖一和买二卖二相关特征,所以可以先将上面的特征进行完善。
2. 特征筛选
(1)相关性
通过统计单个特征与目标变量之间的相关性,选取相关性较高的特征。
(2)递归特征消除
通过递归地拟合模型并去除对模型性能贡献较小的特征,直到达到所需的特征数量
(3)特征重要性
通过查看模型内部的特征重要性指标
(4)具有空重要性的特征选择
使用目标排列的特征选择过程测试与噪声(混洗目标)拟合时特征重要性分布的实际重要性显着性。
Feature Selection with Null Importances | Kaggle
强烈推荐此方案
3. 特征交叉
选取特征重要性排名靠前进行特征交叉,可以选择排名靠前的特征组合来构建新的特征,从而增强模型的表现。
4. 模型调参
机器学习和深度学习中,通过调整模型的超参数来优化模型性能。超参数是在模型训练之前设置的参数,它们不会在训练过程中自动学习,而需要手动选择。
一般常用的调参方法:
a. 网格搜索(Grid Search):遍历所有超参数的组合,逐一尝试。虽然它是全面搜索,但在超参数空间较大时计算代价较高。
b. 随机搜索(Random Search):随机从超参数空间中抽取组合进行尝试。相比网格搜索,它的计算代价较低,但可能需要更多的尝试次数。
c. 贝叶斯优化(Bayesian Optimization):通过构建超参数组合的概率模型,根据模型对性能的估计来选择最有可能改善性能的组合。
d. 遗传算法(Genetic Algorithms):借鉴自然进化的原理,通过基因交叉和变异等方式逐渐优化超参数组合。
网格搜索一般在数据量很大时不建议使用,会导致迭代的次数很多,而无法跑出最优的参数组合。强烈建议使用贝叶斯优化进行模型调参
5. 特征构造
(1)uuid存在多个数据样本,可以进行mean,max,min,std,skew等数据的统计。
(2)进行业务特征的构造,比如移动平均线(ma),指数加权移动平均线(ema),移动平均线收敛/发散指标(macd),基于随机指标的技术分析指标(kdj),相对强弱指数(rsi)等。
6. stacking集成
stacking是一种分层模型集成框架。以两层为例,第一层由多个基学习器组成,其输入为原始训练集,第二层的模型则是以第一层基学习器的输出作为特征加入训练集进行再训练,从而得到完整的stacking模型。
五、降低内存方法
比赛按照上面的baseline会读取很大的数据内存,因此在此处放入一个降低内存的方法:
通过减少数字列的数据类型并将对象列转换为分类类型来优化 DataFrame 'train_df' 的内存使用量,从而减少内存使用量。
predictor_columns = [col for col in test_df.columns if col not in ['uuid','time','file']]
def reduce_mem_usage(df):
start_mem = df.memory_usage().sum() / 1024**2
print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
for col in tqdm.tqdm(predictor_columns):
col_type = df[col].dtype
if col_type != object:
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
df[col] = df[col].astype(np.int64)
else:
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
else:
df[col] = df[col].astype('category')
end_mem = df.memory_usage().sum() / 1024**2
print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
return df
train_df = reduce_mem_usage(train_df)
科大讯飞AI量化模型预测挑战赛_Baseline - 飞桨AI Studio (baidu.com)