写在前面:
该工具更新的股票日数据来自优矿,为了把股票日数据在本地存储一份,这就面临需要定期更新的需求,为此开发了这个工具。
定期更新的股票日数据特征:
1 旧股票日数据(也就是上次更新的数据,假如说10天前更新的日数据)
2 那本次要更新的就是最近这10产生的股票日数据,这10天产生的股票日数据有两种
一种是10天前已经上市的股票,那这十天的日数据需要追加到旧股票日数据的后面
一种是这10天内新上市的股票,那就创建新的文件存储股票日数据
业务过程技术实现逻辑:
1 旧股票数据做什么处理?
==》计算出每个股票最后的日期,对于最后的日期距离现在很远的,说明是退市股票,不予理睬,对日期进行分组,取得股票最多的那个日期
2 上次数据更新时点到当前的时间区间新上市的股票如何计算?
==》从优矿中下载当前上市的所有股票代码,与旧股票数据进行比对,筛出在就股票中没有的股票代码即为新上市的股票
演示工具:
“选择【结果放置目录】” ==》由于处理过程中会产生中间文件,所以需要设置一个目录用来存放程序生成的文件
“选择【未更新旧日数据目录】”==》就是上次更新的日数据存储的目录
“上传【当前最新股票列表文件】”==》从优矿中下载当前上市的股票列表,上传给工具,后面做比对筛出新上市的股票
“生成【未更新数据最后日期列表】”==》遍历旧日数据,取最后一条的日期,用以后面知道哪些股票要从哪个时间节点开始下载日数据
“筛出【要更新的股票列表】”==》两种,一种是非新上市的股票代码列表,一种是新上市的股票代码列表
“将合并股票日数据所在目录”==》从优矿中下载的未复权日数据所存储的目录
代码:
股票数据有五千多个,定期更新业务中多个步骤需要遍历,遍历的语法耗时较长,所以开发时用了多线程,耗时的业务都子线程中运行,避免界面卡死。
import datetime,os,shutil,sys
from threading import Thread
import pandas as pd
from PyQt5 import QtCore,QtWidgets
from typing import Any,Dict,List
'''
股票日数据更新工具
'''
class DailyUpdateWidget(QtWidgets.QWidget):
signal_excute = QtCore.pyqtSignal(object)
def __init__(self):
super().__init__()
self.thread_caculate: Thread = None
self.init_data()
self.init_ui()
self.register_event()
self.progress_init()
pass
def init_data(self):
self.last_date_filename: str = 'last_date.xlsx'
self.daily_pd_column_list: List = ["tradeDate", "preClosePrice", "actPreClosePrice", "openPrice",
"highestPrice", "lowestPrice", "closePrice", "turnoverVol", "turnoverValue",
"dealAmount", "turnoverRate", "accumAdjFactor", "negMarketValue",
"marketValue", "chgPct", "PE", "PE1", "PB", "isOpen", "vwap"]
pass
def init_ui(self):
self.setWindowTitle('股票日数据更新工具')
self.setMinimumHeight(600)
self.setMinimumWidth(600)
self.caculate_progress = QtWidgets.QProgressBar()
self.caculate_status_label = QtWidgets.QLabel()
layout_progress = QtWidgets.QHBoxLayout()
layout_progress.addWidget(self.caculate_progress)
layout_progress.addWidget(self.caculate_status_label)
clear_btn = QtWidgets.QPushButton('清空重选')
clear_btn.clicked.connect(self.clear_btn_clicked)
self.excute_btn = QtWidgets.QPushButton('更新日数据')
self.excute_btn.clicked.connect(self.excute_btn_clicked)
layout_one = QtWidgets.QHBoxLayout()
layout_one.addWidget(clear_btn)
layout_one.addWidget(self.excute_btn)
layout_one.addStretch(1)
results_put_dir_btn = QtWidgets.QPushButton('选择【结果放置目录】')
results_put_dir_btn.clicked.connect(self.results_put_dir_btn_clicked)
self.results_put_dir_lineedit = QtWidgets.QLineEdit()
# self.results_put_dir_lineedit.setReadOnly(True)
old_daily_dir_btn = QtWidgets.QPushButton('选择【未更新旧日数据目录】')
old_daily_dir_btn.clicked.connect(self.old_daily_dir_btn_clicked)
self.old_daily_dir_lineedit = QtWidgets.QLineEdit()
# self.old_daily_dir_lineedit.setReadOnly(True)
generater_old_daily_last_date_btn = QtWidgets.QPushButton('生成【未更新数据最后日期列表】')
generater_old_daily_last_date_btn.clicked.connect(self.generater_old_daily_last_date_btn_clicked)
self.generater_old_daily_last_date_lineedit = QtWidgets.QLineEdit()
# self.generater_old_daily_last_date_lineedit.setReadOnly(True)
now_stock_list_btn = QtWidgets.QPushButton('上传【当前最新股票列表文件】')
now_stock_list_btn.clicked.connect(self.now_stock_list_btn_clicked)
self.now_stock_list_lineedit = QtWidgets.QLineEdit()
# self.now_stock_list_lineedit.setReadOnly(True)
filter_update_stock_btn = QtWidgets.QPushButton('筛出【要更新的股票列表】')
filter_update_stock_btn.clicked.connect(self.filter_update_stock_btn_clicked)
self.filter_update_stock_lineedit = QtWidgets.QLineEdit()
# self.filter_update_stock_lineedit.setReadOnly(True)
new_stock_daily_contact_btn = QtWidgets.QPushButton('将合并股票日数据所在目录')
new_stock_daily_contact_btn.clicked.connect(self.new_stock_daily_contact_btn_clicked)
self.new_stock_daily_dir_lineedit = QtWidgets.QLineEdit()
# self.new_stock_daily_dir_lineedit.setReadOnly(True)
layout_two = QtWidgets.QFormLayout()
layout_two.addRow(results_put_dir_btn,self.results_put_dir_lineedit)
layout_two.addRow(old_daily_dir_btn,self.old_daily_dir_lineedit)
layout_two.addRow(now_stock_list_btn,self.now_stock_list_lineedit)
layout_two.addRow(generater_old_daily_last_date_btn,self.generater_old_daily_last_date_lineedit)
layout_two.addRow(filter_update_stock_btn,self.filter_update_stock_lineedit)
layout_two.addRow(new_stock_daily_contact_btn,self.new_stock_daily_dir_lineedit)
tip_label = QtWidgets.QLabel('操作日志:')
self.log_textedit = QtWidgets.QTextEdit()
layout = QtWidgets.QVBoxLayout()
layout.addLayout(layout_progress)
layout.addLayout(layout_one)
layout.addLayout(layout_two)
layout.addWidget(tip_label)
layout.addWidget(self.log_textedit)
self.setLayout(layout)
pass
def register_event(self):
self.signal_excute.connect(self.process_excute_event)
pass
def process_excute_event(self,data:Dict[str,Any]):
mark_str = data['mark_str']
status = data['status']
if status == 'error':
self.thread_caculate = None
self.progress_finished()
QtWidgets.QMessageBox.information(
self,
'提示',
data['data'],
QtWidgets.QMessageBox.Yes
)
return
if mark_str == 'old_last_date':
if status == 'waiting':
self.write_log(data['data'])
else:
res_data = data['data']
self.generater_old_daily_last_date_lineedit.setText(res_data)
self.thread_caculate = None
self.progress_finished()
self.write_log('生成旧股票日数据最后日期列表完毕')
QtWidgets.QMessageBox.information(
self,
'提示',
'生成旧股票日数据最后日期列表完毕',
QtWidgets.QMessageBox.Yes
)
pass
elif mark_str == 'filter_stockcode':
res_data = data['data']
self.filter_update_stock_lineedit.setText(res_data)
self.thread_caculate = None
self.progress_finished()
self.write_log('筛选待更新股票代码列表完毕')
QtWidgets.QMessageBox.information(
self,
'提示',
'筛选待更新股票代码列表完毕',
QtWidgets.QMessageBox.Yes
)
pass
elif mark_str == 'update_daily_data':
if status == 'waiting':
self.write_log(data['data'])
else:
self.thread_caculate = None
self.progress_finished()
self.write_log('股票日数据更新完毕')
QtWidgets.QMessageBox.information(
self,
'提示',
'股票日数据更新完毕',
QtWidgets.QMessageBox.Yes
)
self.excute_btn.setDisabled(False)
pass
pass
def clear_btn_clicked(self):
self.old_daily_dir_lineedit.setText('')
self.generater_old_daily_last_date_lineedit.setText('')
self.now_stock_list_lineedit.setText('')
self.filter_update_stock_lineedit.setText('')
self.new_stock_daily_dir_lineedit.setText('')
self.log_textedit.clear()
pass
def excute_btn_clicked(self):
new_data_dir = self.new_stock_daily_dir_lineedit.text()
if not new_data_dir:
QtWidgets.QMessageBox.information(
self,
'提示',
'请选择要用于追加的增量日数据所在文件夹',
QtWidgets.QMessageBox.Yes
)
return
results_dir = self.check_results_dir_excuted()
if not results_dir:
return
old_dir = self.old_daily_dir_lineedit.text()
if not old_dir:
QtWidgets.QMessageBox.information(
self,
'提示',
'请选择旧股票数据所在目录',
QtWidgets.QMessageBox.Yes
)
return
self.write_log('开始更新股票日数据。。。')
self.excute_btn.setDisabled(True)
mark_str = 'update_daily_data'
pre_map = {
'old_dir': old_dir,
'results_dir': results_dir,
'new_data_dir':new_data_dir
}
self.start_caculate_thread(mark_str, pre_map)
pass
def results_put_dir_btn_clicked(self):
path = QtWidgets.QFileDialog.getExistingDirectory(
self,
'打开处理结果要放置的文件夹',
'.'
)
if not path:
return
self.results_put_dir_lineedit.setText(path)
pass
def old_daily_dir_btn_clicked(self):
path = QtWidgets.QFileDialog.getExistingDirectory(
self,
'打开当前股票日数据所在文件夹',
'.'
)
if not path:
return
self.old_daily_dir_lineedit.setText(path)
pass
def check_results_dir_excuted(self):
results_dir = self.results_put_dir_lineedit.text()
results_dir = results_dir.strip()
if not results_dir:
QtWidgets.QMessageBox.information(
self,
'提示',
'请先选择结果要放置的目录',
QtWidgets.QMessageBox.Yes
)
return None
return results_dir
def generater_old_daily_last_date_btn_clicked(self):
results_dir = self.check_results_dir_excuted()
if not results_dir:
return
old_dir = self.old_daily_dir_lineedit.text()
old_dir = old_dir.strip()
if not old_dir:
QtWidgets.QMessageBox.information(
self,
'提示',
'请选择旧股票日数据所在的目录',
QtWidgets.QMessageBox.Yes
)
return
self.write_log('开始生成旧数据最后日期列表。。。')
mark_str = 'old_last_date'
pre_map = {
'old_dir':old_dir,
'results_dir':results_dir
}
self.start_caculate_thread(mark_str,pre_map)
pass
def now_stock_list_btn_clicked(self):
path,_ = QtWidgets.QFileDialog.getOpenFileName(
self,
'打开最新股票列表文件',
'.',
'CSV(*.csv)'
)
if not path:
return
self.now_stock_list_lineedit.setText(path)
pass
def filter_update_stock_btn_clicked(self):
results_dir = self.check_results_dir_excuted()
if not results_dir:
return
last_date_file_path = self.generater_old_daily_last_date_lineedit.text()
now_stock_list_file_path = self.now_stock_list_lineedit.text()
if not last_date_file_path or not now_stock_list_file_path:
QtWidgets.QMessageBox.information(
self,
'提示',
'旧股票日数据最后日期列表和最新股票列表必须都存在',
QtWidgets.QMessageBox.Yes
)
return
self.write_log('开始筛出需要更新的股票代码。。。')
mark_str = 'filter_stockcode'
pre_map = {
'results_dir':results_dir,
'last_date_file_path': last_date_file_path,
'now_stock_list_file_path': now_stock_list_file_path
}
self.start_caculate_thread(mark_str, pre_map)
pass
def new_stock_daily_contact_btn_clicked(self):
path = QtWidgets.QFileDialog.getExistingDirectory(
self,
'选择要用于追加的增量日数据所在文件夹',
'.'
)
if not path:
return
self.new_stock_daily_dir_lineedit.setText(path)
pass
def write_log(self,log_str:str):
now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
exist_str = self.log_textedit.toPlainText()
pre_str = f'{now_str}:{log_str}\n{exist_str}'
self.log_textedit.setPlainText(pre_str)
pass
def start_caculate_thread(self,mark_str:str,data:Dict[str,Any]):
if self.thread_caculate:
QtWidgets.QMessageBox.information(
self,
'提示',
'线程正在执行任务,请稍后。。。',
QtWidgets.QMessageBox.Yes
)
return
self.thread_caculate = Thread(
target=self.running_caculate_thread,
args=(
mark_str,data,
)
)
self.thread_caculate.start()
self.progress_busy()
pass
def running_caculate_thread(self,mark_str:str,data:Dict[str,Any]):
if mark_str == 'old_last_date':
old_dir = data['old_dir']
results_dir = data['results_dir']
if not old_dir:
res_map = {
'mark_str':mark_str,
'status':'error',
'data':'请选择待旧股票日数据文件夹'
}
self.signal_excute.emit(res_map)
return
if not results_dir:
res_map = {
'mark_str': mark_str,
'status': 'error',
'data': '请选择结果放置的文件夹'
}
self.signal_excute.emit(res_map)
return
final_file_path = results_dir + os.path.sep + self.last_date_filename
file_list = os.listdir(old_dir)
res_list = []
# tradeDate
i = 1
for file_item in file_list:
if i%500 == 0:
res_map = {
'mark_str': mark_str,
'status': 'waiting',
'data': f'已处理{i}条'
}
self.signal_excute.emit(res_map)
ticker = file_item.split('.')[0]
file_path = old_dir + os.path.sep + file_item
df = pd.read_csv(file_path,encoding='utf-8')
last_date = df.iloc[-1]['tradeDate']
res_list.append({
'ticker':ticker,
'last_date':last_date
})
i += 1
res_df = pd.DataFrame(res_list)
res_df.to_excel(final_file_path,engine='openpyxl')
res_map = {
'mark_str': mark_str,
'status': 'success',
'data': final_file_path
}
self.signal_excute.emit(res_map)
pass
elif mark_str == 'filter_stockcode':
def pd_000(x):
secID = x['secID']
ticker = secID.split('.')[0]
return ticker
results_dir = data['results_dir']
final_input_dir = results_dir + os.path.sep + 'waiting_update_code' + os.path.sep
if not os.path.exists(final_input_dir):
os.mkdir(final_input_dir)
last_date_file_path = data['last_date_file_path']
now_stock_list_file_path = data['now_stock_list_file_path']
last_df = pd.read_excel(last_date_file_path,engine='openpyxl')
now_df = pd.read_csv(now_stock_list_file_path,encoding='utf-8')
now_df['ticker00'] = now_df.apply(pd_000,axis=1)
exist_ticker_list = last_df['ticker'].values.tolist()
add_df = now_df.loc[~now_df['ticker00'].isin(exist_ticker_list)]
if len(add_df)>0:
add_df.to_excel(final_input_dir+'add_codes.xlsx',engine='openpyxl')
df_group = last_df.groupby(by='last_date')
for name,group in df_group:
name_str = name.replace('-','_').replace('/','_').replace(':','_')
input_file_path = final_input_dir + name_str + '.xlsx'
group.to_excel(input_file_path,engine='openpyxl')
pass
res_map = {
'mark_str': mark_str,
'status': 'success',
'data': final_input_dir
}
self.signal_excute.emit(res_map)
pass
elif mark_str == 'update_daily_data':
old_dir = data['old_dir']
results_dir = data['results_dir']
new_data_dir = data['new_data_dir']
final_input_dir = results_dir + os.path.sep + 'new_daily' + os.path.sep
if not os.path.exists(final_input_dir):
os.mkdir(final_input_dir)
pass
new_file_list = os.listdir(new_data_dir)
new_ticker_list = []
i = 1
for file_item in new_file_list:
file_path = new_data_dir + os.path.sep + file_item
df = pd.read_csv(file_path,encoding='utf-8')
df_group = df.groupby(by='secID')
for name,group in df_group:
if i%100 == 0:
res_map = {
'mark_str': mark_str,
'status': 'waiting',
'data': f"数据已处理{i}条"
}
self.signal_excute.emit(res_map)
ticker00 = name.split('.')[0]
new_ticker_list.append(ticker00)
old_file_path = old_dir + os.path.sep + ticker00 + '.csv'
if not os.path.exists(old_file_path):
# 新上市的股票
new_final_path = final_input_dir + ticker00 + '.csv'
node_df = group.loc[:,self.daily_pd_column_list].copy()
node_df.to_csv(new_final_path,encoding='utf-8')
pass
else:
# 做增量更新的股票
# 1 将旧股票文件copy到新目录下
shutil.copy(old_file_path,final_input_dir)
# 2 读取旧股票日数据,并将新数据追加到其后面
old_final_path = final_input_dir + ticker00 + '.csv'
old_df = pd.read_csv(old_final_path,encoding='utf-8')
old_df = old_df.loc[:,self.daily_pd_column_list].copy()
node_df = group.loc[:,self.daily_pd_column_list].copy()
two_df = pd.concat([old_df,node_df])
two_df.to_csv(old_final_path,encoding='utf-8')
pass
i += 1
pass
# 把这次没有更新的股票日数据文件复制到新目录下
res_map = {
'mark_str': mark_str,
'status': 'waiting',
'data': f"开始迁移无需更新的日数据文件"
}
self.signal_excute.emit(res_map)
old_file_list = os.listdir(old_dir)
for file_item in old_file_list:
ticker00 = file_item.split('.')[0]
if ticker00 in new_ticker_list:
continue
old_file_path = old_dir + os.path.sep + ticker00 + '.csv'
shutil.copy(old_file_path, final_input_dir)
res_map = {
'mark_str': mark_str,
'status': 'success',
'data': '日数据更新完毕'
}
self.signal_excute.emit(res_map)
pass
pass
def progress_init(self) -> None:
self.caculate_progress.setValue(0)
self.caculate_status_label.setText('无任务')
def progress_busy(self) -> None:
self.caculate_progress.setRange(0, 0)
self.caculate_status_label.setText('正在执行')
def progress_finished(self) -> None:
self.caculate_progress.setRange(0, 100)
self.caculate_progress.setValue(100)
self.caculate_status_label.setText('执行完毕')
pass
if __name__ == '__main__':
QtCore.QCoreApplication.setAttribute(QtCore.Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
app = QtWidgets.QApplication(sys.argv)
main_window = DailyUpdateWidget()
main_window.show()
app.exec()
pass
股票日数据下载:
旧日数据包(更新至2023-07-17)。后续只分享增量部分,增量的数据自行通过工具同步即可,大概率每周末更新。链接挂在这里。
旧日数据包(更新至2023-07-17)
链接:https://pan.baidu.com/s/1Ez5xA8bI4prlpEiAay019g
提取码:cg30