试验环境
windows10
Anaconda+PyCharm(小白参考文章:https://coderx.com.cn/?p=14)
数据库:
VM中安装MySQL5.7(设置utf8及相应配置优化)
复权
小白参考文章:https://zhuanlan.zhihu.com/p/469820288
数据来源
AKShare官方文档:https://www.akshare.xyz/index.html
AKShare 安装
pip install akshare --upgrade -i https://pypi.org/simple
接口介绍
一、东财实时行情数据
描述:东方财富网-沪深京 A 股-实时行情数据;
接口:stock_zh_a_spot_em;
目标地址:http://quote.eastmoney.com/center/gridlist.html#hs_a_board
限量:单次返回所有沪深京 A 股上市公司的实时行情数据;
一、东财历史行情数据
描述: 东方财富-沪深京 A 股日频率数据; 历史数据按日频率更新, 当日收盘价请在收盘后获取
接口: stock_zh_a_hist;
目标地址:http://quote.eastmoney.com/concept/sh600160.html?from=classic(示例);
限量:单次返回指定沪深京 A 股上市公司、指定周期和指定日期间的历史行情日频率数据;
测试
import akshare as ak
import pandas as pd
import os
### 设置工作路径
mypath=r"E:\PycharmProjects\quantProject"
stock_zh_spot_df = ak.stock_zh_a_spot_em() ## 获取实时数据
stock_zh_spot_data=stock_zh_spot_df[stock_zh_spot_df['名称']!=''] ## 去除名称为空值的数据
codes_names=stock_zh_spot_data[['代码','名称']]
codes_names.to_csv(os.path.join(mypath+'\\'+'code.csv'),encoding='utf_8_sig',index=False) ## 数据导出为csv文件
length=len(codes_names)
all_data = pd.DataFrame([])
for i in range(length):
try:
data_df = ak.stock_zh_a_hist(symbol=codes_names['代码'][i], period="daily", start_date="20230224", adjust="qfq") ## 日度数据,前复权
data_df['stock_id']=codes_names['代码'][i]
all_data=all_data.append(data_df)
except:
KeyError()
all_data.to_csv(os.path.join(mypath + '\\'+ 'All_Data.csv'), encoding='utf_8_sig') ## 数据导出为csv文件
# all_data.to_csv(os.path.join(mypath+'\\'+'All_Data.txt'),sep="\t",index=True) ## 数据导出为txt文件
获取股票列表写入数据库
import akshare as ak
import sqlalchemy
import pandas as pd
def create_mysql_engine():
"""
创建数据库引擎对象
:return: 新创建的数据库引擎对象
"""
# 引擎参数信息
host = '192.168.9.110'
user = 'root'
passwd = 'A_quant88'
port = '3306'
db = 'quant'
# 创建数据库引擎对象
mysql_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),
poolclass=sqlalchemy.pool.NullPool
)
# 如果不存在数据库db_quant则创建
mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))
# 创建连接数据库db_quant的引擎对象
db_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),
poolclass=sqlalchemy.pool.NullPool
)
# 返回引擎对象
return db_engine
def get_stock_codes(update=False):
"""
获取指定日期的A股代码列表
若参数update为False,表示从数据库中读取股票列表
若数据库中不存在股票列表的表,或者update为True,则下载指定日期date的交易股票列表
:param update: 是否更新股票列表,默认为False
:return: A股代码的列表
"""
# 创建数据库引擎对象
engine = create_mysql_engine()
# 数据库中股票代码的表名
table_name = 'stock_codes'
# 数据库中不存在股票代码表,或者需要更新股票代码表
if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:
# 查询股票数据
stock_zh_spot_df = ak.stock_zh_a_spot_em() ## 获取实时数据
stock_zh_spot_data = stock_zh_spot_df[stock_zh_spot_df['名称'] != ''] ## 去除名称为空值的数据
codes_names = stock_zh_spot_data[['代码', '名称']]
# 将股票代码写入数据库
codes_names.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)
# 返回股票列表
return codes_names['代码'].tolist()
# 从数据库中读取股票代码列表
else:
# 待执行的sql语句
sql_cmd = 'SELECT {} FROM {}'.format('代码', table_name)
# 读取sql,返回股票列表
return pd.read_sql(sql=sql_cmd, con=engine)['代码'].tolist()
if __name__ == '__main__':
stock_codes = get_stock_codes()
# print(stock_codes)