试验环境
windows10
Anaconda+PyCharm(小白参考文章:https://coderx.com.cn/?p=14)
VM中安装MySQL5.7(设置utf8及相应配置优化)
关于复权
小白参考文章:https://zhuanlan.zhihu.com/p/469820288
数据来源
AKShare官方文档:https://www.akshare.xyz/index.html
接口介绍
一、东财实时行情数据
描述:东方财富网-沪深京 A 股-实时行情数据;
接口:stock_zh_a_spot_em;
目标地址:http://quote.eastmoney.com/center/gridlist.html#hs_a_board
限量:单次返回所有沪深京 A 股上市公司的实时行情数据;描述: 东方财富-沪深京 A 股日频率数据; 历史数据按日频率更新, 当日收盘价请在收盘后获取
接口: stock_zh_a_hist;
目标地址:http://quote.eastmoney.com/concept/sh603777.html?from=classic(示例);
限量:单次返回指定沪深京 A 股上市公司、指定周期和指定日期间的历史行情日频率数据;
测试
# -*- coding: utf-8 -*-
# 按 Shift+F10 执行或将其替换为您的代码。
# 按 双击 Shift 在所有地方搜索类、文件、工具窗口、操作和设置。
### 导包
import akshare as ak
import pandas as pd
import os
### 设置工作路径
mypath=r"E:\PycharmProjects\pythonProject"
stock_zh_spot_df = ak.stock_zh_a_spot_em() ## 获取实时数据
stock_zh_spot_data=stock_zh_spot_df[stock_zh_spot_df['名称']!=''] ## 去除名称为空值的数据
codes_names=stock_zh_spot_data[['代码','名称']]
codes_names.to_csv(os.path.join(mypath+'\\'+'111.csv'),encoding='utf_8_sig') ## 数据导出为csv文件
print(codes_names)
length=len(codes_names)
all_data = pd.DataFrame([])
for i in range(length):
try:
data_df = ak.stock_zh_a_hist(symbol=codes_names['代码'][i], period="daily", start_date="20230224", adjust="qfq") ## 日度数据,前复权
data_df['stock_id']=codes_names['代码'][i]
all_data=all_data.append(data_df)
# print(all_data)
except:
KeyError()
all_data.to_csv(os.path.join(mypath + '\\'+ 'All_Data.csv'), encoding='utf_8_sig') ## 数据导出为csv文件
# all_data.to_csv(os.path.join(mypath+'\\'+'All_Data.txt'),sep="\t",index=True) ## 数据导出为txt文件
获取股票列表写入数据库
import akshare as ak
import sqlalchemy
import pandas as pd
def create_mysql_engine():
"""
创建数据库引擎对象
:return: 新创建的数据库引擎对象
"""
# 引擎参数信息
host = '192.168.9.110'
user = 'root'
passwd = 'A_quant88'
port = '3306'
db = 'quant'
# 创建数据库引擎对象
mysql_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),
poolclass=sqlalchemy.pool.NullPool
)
# 如果不存在数据库db_quant则创建
mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))
# 创建连接数据库db_quant的引擎对象
db_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),
poolclass=sqlalchemy.pool.NullPool
)
# 返回引擎对象
return db_engine
def get_stock_codes(date=None, update=False):
"""
获取指定日期的A股代码列表
若参数update为False,表示从数据库中读取股票列表
若数据库中不存在股票列表的表,或者update为True,则下载指定日期date的交易股票列表
若参数date为空,则返回最近1个交易日的A股代码列表
若参数date不为空,且为交易日,则返回date当日的A股代码列表
若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出
:param date: 日期,默认为None
:param update: 是否更新股票列表,默认为False
:return: A股代码的列表
"""
# 创建数据库引擎对象
engine = create_mysql_engine()
# 数据库中股票代码的表名
table_name = 'stock_codes'
# 数据库中不存在股票代码表,或者需要更新股票代码表
if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:
# 查询股票数据
stock_zh_spot_df = ak.stock_zh_a_spot_em() ## 获取实时数据
stock_zh_spot_data = stock_zh_spot_df[stock_zh_spot_df['名称'] != ''] ## 去除名称为空值的数据
codes_names = stock_zh_spot_data[['代码', '名称']]
print(codes_names)
# 将股票代码写入数据库
codes_names.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)
# 返回股票列表
return codes_names['代码'].tolist()
# 从数据库中读取股票代码列表
else:
# 待执行的sql语句
sql_cmd = 'SELECT {} FROM {}'.format('代码', table_name)
# 读取sql,返回股票列表
return pd.read_sql(sql=sql_cmd, con=engine)['代码'].tolist()
if __name__ == '__main__':
stock_codes = get_stock_codes()
# print(stock_codes)
获取股票历史数据
import akshare as ak
import sqlalchemy
import datetime
import pandas as pd
def create_mysql_engine():
"""
创建数据库引擎对象
:return: 新创建的数据库引擎对象
"""
# 引擎参数信息
host = '192.168.9.110'
user = 'root'
passwd = 'A_quant88'
port = '3306'
db = 'quant'
# 创建数据库引擎对象
mysql_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),
poolclass=sqlalchemy.pool.NullPool
)
# 如果不存在数据库db_quant则创建
mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))
# 创建连接数据库db_quant的引擎对象
db_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),
poolclass=sqlalchemy.pool.NullPool
)
# 返回引擎对象
return db_engine
def get_stock_codes(date=None, update=False):
"""
获取指定日期的A股代码列表
若参数update为False,表示从数据库中读取股票列表
若数据库中不存在股票列表的表,或者update为True,则下载指定日期date的交易股票列表
若参数date为空,则返回最近1个交易日的A股代码列表
若参数date不为空,且为交易日,则返回date当日的A股代码列表
若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出
:param date: 日期,默认为None
:param update: 是否更新股票列表,默认为False
:return: A股代码的列表
"""
# 创建数据库引擎对象
engine = create_mysql_engine()
# 数据库中股票代码的表名
table_name = 'stock_codes'
# 数据库中不存在股票代码表,或者需要更新股票代码表
if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:
# 查询股票数据
stock_zh_spot_df = ak.stock_zh_a_spot_em() ## 获取实时数据
stock_zh_spot_data = stock_zh_spot_df[stock_zh_spot_df['名称'] != ''] ## 去除名称为空值的数据
codes_names = stock_zh_spot_data[['代码', '名称']]
print(codes_names)
# 将股票代码写入数据库
codes_names.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)
# 返回股票列表
return codes_names['代码'].tolist()
# 从数据库中读取股票代码列表
else:
# 待执行的sql语句
sql_cmd = 'SELECT {} FROM {}'.format('代码', table_name)
# 读取sql,返回股票列表
return pd.read_sql(sql=sql_cmd, con=engine)['代码'].tolist()
def create_data(stock_codes, period = "daily",start_date = '20230214', end_date = datetime.date.today().strftime('%Y%m%d'),
adj = 'hfq'):
"""
下载指定日期内,指定股票的日线数据
:param stock_codes: 待下载数据的股票代码
:param from_date: 日线开始日期1990-12-19
:param to_date: 日线结束日期
:param adjustflag: 复权选项 1:后复权 2:前复权 3:不复权 默认为前复权
:return: None
"""
# 下载股票循环
for code in stock_codes:
print('正在下载{}...'.format(code))
# 下载日线数据
data_df = ak.stock_zh_a_hist(symbol=code, period=period, start_date=start_date, end_date=end_date,
adjust=adj) ## 日度数据,后复权
print(data_df)
if __name__ == '__main__':
stock_codes = get_stock_codes()
create_data(stock_codes)
# print(stock_codes)
多线程获取股票历史数据
import akshare as ak
import sqlalchemy
import datetime
import multiprocessing
import pandas as pd
def create_mysql_engine():
"""
创建数据库引擎对象
:return: 新创建的数据库引擎对象
"""
# 引擎参数信息
host = '192.168.9.110'
user = 'root'
passwd = 'A_quant88'
port = '3306'
db = 'quant'
# 创建数据库引擎对象
mysql_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),
poolclass=sqlalchemy.pool.NullPool
)
# 如果不存在数据库db_quant则创建
mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))
# 创建连接数据库db_quant的引擎对象
db_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),
poolclass=sqlalchemy.pool.NullPool
)
# 返回引擎对象
return db_engine
def get_stock_codes(date=None, update=False):
"""
获取指定日期的A股代码列表
若参数update为False,表示从数据库中读取股票列表
若数据库中不存在股票列表的表,或者update为True,则下载指定日期date的交易股票列表
若参数date为空,则返回最近1个交易日的A股代码列表
若参数date不为空,且为交易日,则返回date当日的A股代码列表
若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出
:param date: 日期,默认为None
:param update: 是否更新股票列表,默认为False
:return: A股代码的列表
"""
# 创建数据库引擎对象
engine = create_mysql_engine()
# 数据库中股票代码的表名
table_name = 'stock_codes'
# 数据库中不存在股票代码表,或者需要更新股票代码表
if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:
# 查询股票数据
stock_zh_spot_df = ak.stock_zh_a_spot_em() ## 获取实时数据
stock_zh_spot_data = stock_zh_spot_df[stock_zh_spot_df['名称'] != ''] ## 去除名称为空值的数据
codes_names = stock_zh_spot_data[['代码', '名称']]
# print(codes_names)
# 将股票代码写入数据库
codes_names.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)
# 返回股票列表
return codes_names['代码'].tolist()
# 从数据库中读取股票代码列表
else:
# 待执行的sql语句
sql_cmd = 'SELECT {} FROM {}'.format('代码', table_name)
# 读取sql,返回股票列表
return pd.read_sql(sql=sql_cmd, con=engine)['代码'].tolist()
def create_data(stock_codes, period = "daily",start_date = '20230224', end_date = datetime.date.today().strftime('%Y%m%d'),
adj = 'hfq'):
"""
下载指定日期内,指定股票的日线数据
:param stock_codes: 待下载数据的股票代码
:param from_date: 日线开始日期1990-12-19
:param to_date: 日线结束日期
:param adjustflag: 复权选项 1:后复权 2:前复权 3:不复权 默认为前复权
:return: None
"""
# 下载股票循环
for index,code in enumerate(stock_codes):
print('({}/{})正在创建{}...'.format(index + 1, len(stock_codes), code))
try:
data_df = ak.stock_zh_a_hist(symbol=code, period=period, start_date=start_date, end_date=end_date,
adjust=adj) ## 日度数据,后复权
print(data_df)
if data_df.empty:
continue
except Exception as e:
print(e)
# 将数值数据转为float型,便于后续处理
convert_list = ['开盘','收盘','最高','最低','成交量','成交额','振幅','涨跌幅','涨跌额','换手率']
data_df[convert_list] = data_df[convert_list].astype(float)
def get_code_group(process_num, stock_codes):
"""
获取代码分组,用于多进程计算,每个进程处理一组股票
:param process_num: 进程数
:param stock_codes: 待处理的股票代码
:return: 分组后的股票代码列表,列表的每个元素为一组股票代码的列表
"""
# 创建空的分组
code_group = [[] for i in range(process_num)]
# 按余数为每个分组分配股票
for index, code in enumerate(stock_codes):
code_group[index % process_num].append(code)
return code_group
def multiprocessing_func(func, args):
"""
多进程调用函数
:param func: 函数名
:param args: func的参数,类型为元组,第0个元素为进程数,第1个元素为股票代码列表
:return: 包含各子进程返回对象的列表
"""
# 用于保存各子进程返回对象的列表
results = []
# 创建进程池
with multiprocessing.Pool(processes=args[0]) as pool:
# 多进程异步计算
for codes in get_code_group(args[0], args[1]):
results.append(pool.apply_async(func, args=(codes, *args[2:],)))
# 阻止后续任务提交到进程池
pool.close()
# 等待所有进程结束
pool.join()
return results
def create_data_mp(stock_codes, process_num=6,
period = "daily",start_date='20230224', end_date=datetime.date.today().strftime('%Y%m%d'), adj='hfq'):
"""
使用多进程创建指定日期内,指定股票的日线数据,计算扩展因子
:param stock_codes: 待创建数据的股票代码
:param process_num: 进程数
:param from_date: 日线开始日期
:param to_date: 日线结束日期
:param adjustflag: 复权选项 1:后复权 2:前复权 3:不复权 默认为前复权
:return: None
"""
multiprocessing_func(create_data, (process_num, stock_codes, period,start_date, end_date, adj))
if __name__ == '__main__':
stock_codes = get_stock_codes()
# print(stock_codes)
# create_data(stock_codes)
create_data_mp(stock_codes)
# print(stock_codes)
多线程获取股票历史数据录入数据库
import akshare as ak
import sqlalchemy
import datetime
import multiprocessing
import pandas as pd
def create_mysql_engine():
"""
创建数据库引擎对象
:return: 新创建的数据库引擎对象
"""
# 引擎参数信息
host = '192.168.9.110'
user = 'root'
passwd = 'A_quant88'
port = '3306'
db = 'quant'
# 创建数据库引擎对象
mysql_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),
poolclass=sqlalchemy.pool.NullPool
)
# 如果不存在数据库db_quant则创建
mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))
# 创建连接数据库db_quant的引擎对象
db_engine = sqlalchemy.create_engine(
'mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),
poolclass=sqlalchemy.pool.NullPool
)
# 返回引擎对象
return db_engine
def get_stock_codes(date=None, update=False):
"""
获取指定日期的A股代码列表
若参数update为False,表示从数据库中读取股票列表
若数据库中不存在股票列表的表,或者update为True,则下载指定日期date的交易股票列表
若参数date为空,则返回最近1个交易日的A股代码列表
若参数date不为空,且为交易日,则返回date当日的A股代码列表
若参数date不为空,但不为交易日,则打印提示非交易日信息,程序退出
:param date: 日期,默认为None
:param update: 是否更新股票列表,默认为False
:return: A股代码的列表
"""
# 创建数据库引擎对象
engine = create_mysql_engine()
# 数据库中股票代码的表名
table_name = 'stock_codes'
# 数据库中不存在股票代码表,或者需要更新股票代码表
if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:
# 查询股票数据
stock_zh_spot_df = ak.stock_zh_a_spot_em() ## 获取实时数据
stock_zh_spot_data = stock_zh_spot_df[stock_zh_spot_df['名称'] != ''] ## 去除名称为空值的数据
codes_names = stock_zh_spot_data[['代码', '名称']]
# print(codes_names)
# 将股票代码写入数据库
codes_names.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)
# 返回股票列表
return codes_names['代码'].tolist()
# 从数据库中读取股票代码列表
else:
# 待执行的sql语句
sql_cmd = 'SELECT {} FROM {}'.format('代码', table_name)
# 读取sql,返回股票列表
return pd.read_sql(sql=sql_cmd, con=engine)['代码'].tolist()
def create_data(stock_codes, period = "daily",start_date = '20230223', end_date = datetime.date.today().strftime('%Y%m%d'),
adj = 'hfq'):
"""
下载指定日期内,指定股票的日线数据
:param stock_codes: 待下载数据的股票代码
:param from_date: 日线开始日期1990-12-19
:param to_date: 日线结束日期
:param adjustflag: 复权选项 1:后复权 2:前复权 3:不复权 默认为前复权
:return: None
"""
# 创建数据库引擎对象
engine = create_mysql_engine()
# 下载股票循环
for index,code in enumerate(stock_codes):
print('({}/{})正在创建{}...'.format(index + 1, len(stock_codes), code))
try:
data_df = ak.stock_zh_a_hist(symbol=code, period=period, start_date=start_date, end_date=end_date,
adjust=adj) ## 日度数据,后复权
convert_list = ['开盘', '收盘', '最高', '最低', '成交量', '成交额', '振幅', '涨跌幅', '涨跌额', '换手率']
data_df[convert_list] = data_df[convert_list].astype(float)
# 写入数据库
table_name = '{}'.format(code)
data_df.to_sql(name=table_name, con=engine, if_exists='replace', index=True, index_label='id')
if data_df.empty:
continue
except Exception as e:
print(e)
# 将数值数据转为float型,便于后续处理
def get_code_group(process_num, stock_codes):
"""
获取代码分组,用于多进程计算,每个进程处理一组股票
:param process_num: 进程数
:param stock_codes: 待处理的股票代码
:return: 分组后的股票代码列表,列表的每个元素为一组股票代码的列表
"""
# 创建空的分组
code_group = [[] for i in range(process_num)]
# 按余数为每个分组分配股票
for index, code in enumerate(stock_codes):
code_group[index % process_num].append(code)
return code_group
def multiprocessing_func(func, args):
"""
多进程调用函数
:param func: 函数名
:param args: func的参数,类型为元组,第0个元素为进程数,第1个元素为股票代码列表
:return: 包含各子进程返回对象的列表
"""
# 用于保存各子进程返回对象的列表
results = []
# 创建进程池
with multiprocessing.Pool(processes=args[0]) as pool:
# 多进程异步计算
for codes in get_code_group(args[0], args[1]):
results.append(pool.apply_async(func, args=(codes, *args[2:],)))
# 阻止后续任务提交到进程池
pool.close()
# 等待所有进程结束
pool.join()
return results
def create_data_mp(stock_codes, process_num=6,
period = "daily",start_date='20230223', end_date=datetime.date.today().strftime('%Y%m%d'), adj='hfq'):
"""
使用多进程创建指定日期内,指定股票的日线数据,计算扩展因子
:param stock_codes: 待创建数据的股票代码
:param process_num: 进程数
:param from_date: 日线开始日期
:param to_date: 日线结束日期
:param adjustflag: 复权选项 1:后复权 2:前复权 3:不复权 默认为前复权
:return: None
"""
multiprocessing_func(create_data, (process_num, stock_codes, period,start_date, end_date, adj))
if __name__ == '__main__':
stock_codes = get_stock_codes()
create_data_mp(stock_codes)
# print(stock_codes)