目录
写在前面:
下载日数据
下载“新增合约”日数据
下载“待更新合约”日数据
日数据文件
“选择日数据所在目录”按钮点击
“执行”按钮点击
sqlite3代码
按钮点击后执行的代码
子线程代码
写在前面:
本文默认已经创建了项目,如果不知道如何创建一个空项目的,请参看以下两篇博文
PyQt5将项目搬到一个新的虚拟环境中
https://blog.csdn.net/m0_37967652/article/details/122625280
python_PyQt5开发工具结构基础
https://blog.csdn.net/m0_37967652/article/details/131969032
前序:
【期货日数据维护与使用_日数据维护_sqlite3数据库创建】博文
【期货日数据维护与使用_日数据维护_界面代码】博文
【期货日数据维护与使用_日数据维护_合约更新】博文
下载日数据
下载“新增合约”日数据
“新增合约”列表下载后在 new.json 中
在优矿中执行的代码
ticker_list = ["fu2501", "lu2501", "sc2501"]
df = DataAPI.MktFutdGet(secID=u"",ticker=ticker_list,tradeDate=u"",beginDate=u"",endDate=u"",exchangeCD="",field=u"",pandas="1")
df.to_csv('daily_20240106_00.csv',encoding='utf-8')
下载“待更新合约”日数据
“待更新合约”列表下载后在 {日期}.json,可能会有多个{日期}.json
在优矿中执行的代码
ticker_list = ["AP401", "AP403", "AP404", "AP405", ...]
df = DataAPI.MktFutdGet(secID=u"",ticker=ticker_list,tradeDate=u"",beginDate=u"20240103",endDate=u"",exchangeCD="",field=u"",pandas="1")
df.to_csv('daily_20240106_11.csv',encoding='utf-8')
日数据文件
日数据字段列表:
['secID','ticker','exchangeCD','secShortName','tradeDate','contractObject','contractMark','preSettlePrice','preClosePrice','openPrice','highestPrice','lowestPrice','closePrice','settlePrice','turnoverVol','turnoverValue','openInt','CHG','CHG1','CHGPct','mainCon','smainCon','limitUpPrice','limitDownPrice']
想知道字段含义,可以自行到优矿网站查看
“选择日数据所在目录”按钮点击
def choice_daily_dir_btn_clicked(self):
path = QtWidgets.QFileDialog.getExistingDirectory(
self,
'选择日数据所在目录',
SQLITE_FROM_DIR
)
if not path:
return
self.choice_daily_dir_lineedit.setText(path)
pass
“执行”按钮点击
更新日数据的过程在子线程中进行,任务表示定义为 self.mark_str_step_two 常量,常量定义在 init_data()方法中
更新日数据逻辑:
1 将下载所得的日数据文件合并到一个 pd.DataFrame 中
2 从 t_symbol_basemsg 中获取到 日数据包含的合约名的品种、交割年份信息
3 将日数据追加到合约的csv文件,如果是新增的合约,创建新的csv文件;同时也把日数据插入到 t_last30_daily
4 更新 t_online_symbol 数据
5 计算主力合约,按品种逐一进行计算,并更新csv文件
5.1 主力合约换月规则参照文化财经的换月规则
5.2 从 t_last30_daily 获取某品种最近的日数据
5.3 如果某日只有一个合约,该合约为主力合约
5.4 计算某日 “成交量最大” 和 “持仓量最大” 的合约
5.5 如果“成交量最大”和“持仓量最大”的合约是同一个,并且与之前的主力合约也是同一个,那主力合约延续,不切换
5.6 如果“成交量最大”和“持仓量最大”的合约是同一个,但与之前的主力合约不是同一个,那预备下一个交易日主力合约切换为今日同时“成交量最大”和“持仓量最大”的合约,到下一个交易日,如果预备切换的合约依然是同时“成交量最大”和“持仓量最大”,切换,如果不是,不切换,主力合约延续
5.7 如果“成交量最大”和“持仓量最大”的合约不是同一个,那只要“成交量最大”是之前的主力合约,或“持仓量最大”是之前的主力合约,主力合约延续,不切换
5.8 如果“成交量最大”和“持仓量最大”的合约不是同一个,且都不是之前的主力合约,那主力合约取“成交量最大”的合约作为新的主力合约,当日切换
sqlite3代码
t_symbol_basemsg 查询日数据合约对应的合约信息
def query_pro_ticker_deliYear_from_symbol_basemsg_by_query(tickers:List,deliYear:int):
conn = sqlite3.connect(YOUKUANG_DB_NAME)
c = conn.cursor()
# s
ticker_str = '\',\''.join(tickers)
ticker_str = '\'' + ticker_str + '\''
sql_str = '''
select product_code,ticker,deliYear from t_symbol_basemsg where ticker in ({tickers}) and deliYear>={deliYear}
'''.format(tickers=ticker_str,deliYear=deliYear)
c.execute(sql_str)
res_list = c.fetchall()
# e
conn.commit()
conn.close()
return res_list
t_last30_daily 批量插入数据
def batch_insert_last30_daily(pre_list: List):
conn = sqlite3.connect(YOUKUANG_DB_NAME)
c = conn.cursor()
# s
sql_str = '''
insert into t_last30_daily values (?,?,?,?,?,?,?,?,?,?,?,?)
'''
c.executemany(sql_str,pre_list)
# e
conn.commit()
conn.close()
pass
t_online_symbol 清空表格数据
def delete_all_data_of_online_symbol():
conn = sqlite3.connect(YOUKUANG_DB_NAME)
c = conn.cursor()
# s
sql_str = '''
delete from t_online_symbol
'''
c.execute(sql_str)
# e
conn.commit()
conn.close()
pass
t_online_symbol 批量插入数据
def batch_insert_online_symbol(pre_list:List):
conn = sqlite3.connect(YOUKUANG_DB_NAME)
c = conn.cursor()
# s
sql_str = '''
insert into t_online_symbol values (?,?,?,?)
'''
c.executemany(sql_str, pre_list)
# e
conn.commit()
conn.close()
pass
t_product 查询所有品种
def query_all_product_codes_of_product():
conn = sqlite3.connect(YOUKUANG_DB_NAME)
c = conn.cursor()
# s
sql_str = '''
select code from t_product
'''
c.execute(sql_str)
res_list = c.fetchall()
# e
conn.commit()
conn.close()
res_list00 = []
for item in res_list:
res_list00.append(item[0])
return res_list00
t_last30_daily 查询某品种最近的日数据
def query_daily_by_pro_in_last30_daily(product_code:str):
conn = sqlite3.connect(YOUKUANG_DB_NAME)
c = conn.cursor()
# s
product_code_str = '\'' + product_code + '\''
sql_str = '''
select product_code,ticker,deliYear,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,settlePrice,turnoverVol,turnoverValue,openInt from t_last30_daily where product_code={product_code}
'''.format(product_code=product_code_str)
c.execute(sql_str)
res_list = c.fetchall()
# e
conn.commit()
conn.close()
return res_list
t_main_symbol 新增品种就插入,已有品种如果主力合约切换就更新
def input_main_symbol(product_code:str,ticker:str,deliYear:int,start_date:str=None):
conn = sqlite3.connect(YOUKUANG_DB_NAME)
c = conn.cursor()
# s
product_code_str = '\''+product_code+'\''
sql_str_query = '''
select * from t_main_symbol where product_code={product_code}
'''.format(product_code=product_code_str)
c.execute(sql_str_query)
res_one = c.fetchone()
if len(res_one)<=0:
# 新增品种插入
pre_one = [product_code,ticker,deliYear,start_date]
one_insert_main_symbol(pre_one)
pass
else:
# 已有品种修改
update_main_symbol(product_code,ticker,deliYear,start_date)
pass
# e
conn.commit()
conn.close()
pass
按钮点击后执行的代码
def excute_step_two_btn_clicked(self):
dir_path = self.choice_daily_dir_lineedit.text()
if len(dir_path) <= 0:
QtWidgets.QMessageBox.information(
self,
'提示',
'请选择日数据所在目录',
QtWidgets.QMessageBox.Yes
)
return
pre_map = {
'dir_path': dir_path,
'cur_deliYear': self.deliYear_spinbox.value()
}
self.start_caculate_thread(self.mark_str_step_two, pre_map)
pass
子线程代码
子线程中任务名为 self.mark_str_step_two 的执行代码,于 running_caculate_thread 方法中对应任务名下
dir_path = data['dir_path']
cur_deliYear = data['cur_deliYear']
file_list = os.listdir(dir_path)
if len(file_list)<=0:
self.thread_out_log('Error Error 没有日数据文件')
pre_map = {
'mark_str':self.mark_str_error,
'data':'Error Error 没有日数据文件'
}
self.signal_excute.emit(pre_map)
return
# 将下载的日数据合并到同一个df中
df = pd.DataFrame()
for item in file_list:
file_path = dir_path + os.path.sep + item
df_one = pd.read_csv(file_path,encoding='utf-8')
if not df_one.columns.isin(self.d_csv_column_list).any():
self.thread_out_log(f"{item},文件不是日数据文件")
continue
df = pd.concat([df,df_one])
pass
if len(df)<=0:
self.thread_out_log('Error Error 没有要更新的日数据')
pre_map = {
'mark_str': self.mark_str_error,
'data': 'Error Error 没有要更新的日数据'
}
self.signal_excute.emit(pre_map)
return
pre_add_tickers = df['ticker'].unique()
ticker_list = sqlite_tool.query_pro_ticker_deliYear_from_symbol_basemsg_by_query(pre_add_tickers,cur_deliYear)
ticker_map = {}
for item in ticker_list:
ticker_map[item[1]] = item
pass
df['o_date'] = pd.to_datetime(df['tradeDate'])
online_symbol_list = []
df_group = df.groupby(by='ticker',as_index=False)
self.thread_out_log('开始逐一处理每个合约:')
for name,group in df_group:
self.thread_out_log(f'开始追加 {name}')
# 将新数据添加到 t_last30_daily
pre_new_df = group.loc[:,self.d_csv_column_list].copy()
pre_new_df['product_code'] = ticker_map[name][0]
pre_new_df['ticker'] = ticker_map[name][1]
pre_new_df['deliYear'] = ticker_map[name][2]
pre_new_list = pre_new_df.loc[:,self.last30_daily_column_list].values.tolist()
sqlite_tool.batch_insert_last30_daily(pre_new_list)
# 将新数据更新到csv
csv_file_name = f"{name}_{ticker_map[name][2]}.csv"
csv_file_path = YOUKUANG_D_DIR + csv_file_name
if not os.path.exists(csv_file_path):
exist_df = pd.DataFrame()
else:
df_one = pd.read_csv(csv_file_path,encoding='utf-8')
df_one['o_date'] = pd.to_datetime(df_one['tradeDate'])
exist_df = df_one.loc[:,self.pre_d_csv_column_list].copy()
pass
new_df = group.loc[:,self.pre_d_csv_column_list].copy()
if len(exist_df)>0:
exist_df.sort_values(by='o_date',ascending=True,inplace=True)
new_df = new_df.loc[new_df['o_date']>exist_df.iloc[-1]['o_date']].copy()
pass
if len(new_df)>0:
pre_save_df = pd.concat([exist_df,new_df])
pre_save_df.sort_values(by='o_date',ascending=True,inplace=True)
pre_save_df = pre_save_df.loc[:,self.d_csv_column_list].copy()
pre_save_df.to_csv(csv_file_path,encoding='utf-8')
online_symbol_list.append(
[ticker_map[name][0],name,ticker_map[name][2],pre_save_df.iloc[-1]['tradeDate']]
)
pass
pass
# 删除 t_online_symbol 表格,加入新数据
self.thread_out_log('t_online_symbol表格更新')
sqlite_tool.delete_all_data_of_online_symbol()
sqlite_tool.batch_insert_online_symbol(online_symbol_list)
self.thread_out_log('------------- 开始计算主力合约:')
product_code_list = sqlite_tool.query_all_product_codes_of_product()
for pro_code in product_code_list:
self.thread_out_log(f'产品:{pro_code}')
csv_main_file_path = YOUKUANG_MAIN_DIR + pro_code + '.csv'
if not os.path.exists(csv_main_file_path):
# 新增品种
cur_main_ticker = None
cur_main_deliYear = None
last_one_tradeDate = None
pass
else:
df_one = pd.read_csv(csv_main_file_path,encoding='utf-8')
df_one['o_date'] = pd.to_datetime(df_one['tradeDate'])
df_one.sort_values(by='o_date',ascending=True,inplace=True)
cur_main_ticker = df_one.iloc[-1]['ticker']
cur_main_deliYear = df_one.iloc[-1]['deliYear']
last_one_tradeDate = df_one.iloc[-1]['tradeDate']
pass
# 从 t_last30_daily 中计算追加这几天的主力合约
daily_list = sqlite_tool.query_daily_by_pro_in_last30_daily(pro_code)
df_two = pd.DataFrame(columns=self.pre_main_csv_column_list,data=daily_list)
df_two['o_date'] = pd.to_datetime(df_two['tradeDate'])
if last_one_tradeDate:
df_two = df_two.loc[df_two['o_date']>last_one_tradeDate].copy()
df_two.dropna(inplace=True)
if len(df_two) <= 0:
self.thread_out_log(f'{pro_code},没有可追加的主力数据')
continue
df_two['row_i'] = [i for i in range(len(df_two))]
df_two_group = df_two.groupby(by='o_date',as_index=False)
df_main_new = pd.DataFrame()
next_change_yeah = False
pre_next_main_ticker = None
pre_next_main_deliYear = None
for two_name,group in df_two_group:
if len(group)<=1:
# 当天该品种只有一个合约,那当天主力合约就为该合约
df_main_new = pd.concat([df_main_new,group.iloc[[0]]])
sqlite_tool.input_main_symbol(pro_code, group.iloc[0]['ticker'], group.iloc[0]['deliYear'],
group.iloc[0]['tradeDate'])
cur_main_ticker = group.iloc[0]['ticker']
cur_main_deliYear = group.iloc[0]['deliYear']
pass
else:
# 计算每日最大成交量和最大持仓量
df_vol = group.sort_values(by='turnoverVol', ascending=False)
df_inte = group.sort_values(by='openInt', ascending=False)
if df_vol.iloc[0]['row_i'] == df_inte.iloc[0]['row_i']:
# 当日成交量最大和持仓量最大 为同一个合约
if next_change_yeah:
# 前一个交易日满足 【主力合约要被切换,下一个交易日切换为新的合约】条件,
# 检查今日新的合约是否依然是成交量和持仓量最大,如果不是,不切换新合约
if pre_next_main_ticker == df_vol.iloc[0]['ticker'] and pre_next_main_deliYear == \
df_vol.iloc[0]['deliYear']:
# 切换新合约
df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
sqlite_tool.input_main_symbol(pro_code, df_vol.iloc[0]['ticker'], df_vol.iloc[0]['deliYear'],
df_vol.iloc[0]['tradeDate'])
cur_main_ticker = pre_next_main_ticker
cur_main_deliYear = pre_next_main_deliYear
next_change_yeah = False
pass
else:
# 撤销昨日的 【主力合约要被切换,下一个交易日切换为新的合约】
next_change_yeah = False
# ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 start
pre_next_main_ticker = df_vol.iloc[0]['ticker']
pre_next_main_deliYear = df_vol.iloc[0]['deliYear']
if pre_next_main_ticker == cur_main_ticker and pre_next_main_deliYear == cur_main_deliYear:
# 主力合约没有切换,延续使用
df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
pass
else:
# 主力合约要被切换,下一个交易日切换为新的合约
next_change_yeah = True
cur_df = group.loc[(group['ticker'] == cur_main_ticker) & (
group['deliYear'] == cur_main_deliYear)].copy()
df_main_new = pd.concat([df_main_new, cur_df])
pass
# ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 end
pass
pass
else:
# ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 start
pre_next_main_ticker = df_vol.iloc[0]['ticker']
pre_next_main_deliYear = df_vol.iloc[0]['deliYear']
if pre_next_main_ticker == cur_main_ticker and pre_next_main_deliYear == cur_main_deliYear:
# 主力合约没有切换,延续使用
df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
pass
else:
# 主力合约要被切换,下一个交易日切换为新的合约
next_change_yeah = True
cur_df = group.loc[
(group['ticker'] == cur_main_ticker) & (
group['deliYear'] == cur_main_deliYear)].copy()
df_main_new = pd.concat([df_main_new, cur_df])
pass
# ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 end
pass
else:
# 当日成交量最大 和 持仓量最大 为不同的合约
# 成交量最大 或 持仓量最大,这两者中有一个的合约是当前主力合约,该主力合约就延续
if df_vol.iloc[0]['ticker'] == cur_main_ticker and df_vol.iloc[0][
'deliYear'] == cur_main_deliYear:
df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
elif df_inte.iloc[0]['ticker'] == cur_main_ticker and df_inte.iloc[0][
'deliYear'] == cur_main_deliYear:
df_main_new = pd.concat([df_main_new, df_inte.iloc[[0]]])
else:
# 当日成交量最大 和 持仓量最大 都不是当前主力合约,将主力合约切换为成交量最大的合约
df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])
sqlite_tool.input_main_symbol(pro_code, df_vol.iloc[0]['ticker'], df_vol.iloc[0]['deliYear'],
df_vol.iloc[0]['tradeDate'])
cur_main_ticker = df_vol.iloc[0]['ticker']
cur_main_deliYear = df_vol.iloc[0]['deliYear']
pass
pass
pass
pass
if len(df_main_new)<=0:
self.thread_out_log(f'{pro_code},没有新增主力合约数据')
pass
else:
df_main_new = df_main_new.loc[:,self.main_csv_column_list].copy()
if not os.path.exists(csv_main_file_path):
df_main_new.to_csv(csv_main_file_path, encoding='utf-8')
pass
else:
df_one = df_one.loc[:, self.main_csv_column_list].copy()
df_main_new00 = pd.concat([df_one, df_main_new])
df_main_new00 = df_main_new00.loc[:, self.main_csv_column_list].copy()
df_main_new00.to_csv(csv_main_file_path, encoding='utf-8')
pass
pass
pass
pre_map = {
'mark_str': self.mark_str_step_two,
'data':None
}
self.signal_excute.emit(pre_map)
pass