前几天有写过两篇:
【爬虫】爬取A股数据写入数据库(二)
【爬虫】爬取A股数据写入数据库(一)
现在继续完善,分析及爬取股票的历史K线数据通过ORM形式批量写入数据库。
2024/05,本文主要内容如下:
- 对东方财富官网进行分析,并作数据爬取,使用python,使用pip install requests 模拟http数据请求,获取数据。
- 将爬取的数据写入通过 sqlalchemy ORM 写入 sqlite数据库。
- 记录爬取股票的基本信息,如果库中已存在某个股票代码,则进行更新。
- 后续计划:会不断完善,最终目标是做出一个简单的股票查看客户端。
- 本系列所有源码均无偿分享,仅作交流无其他,供大家参考。
python依赖环境如下:
pip install requests==2.31.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pandas==2.2.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install jsonpath==0.8.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install sqlalchemy==2.0.30 -i https://pypi.tuna.tsinghua.edu.cn/simple
1. 对东方财富官网历史K线数据分析
网页地址:https://quote.eastmoney.com/sz002224.html?jump_to_web=true#fullScreenChart
通过分析网页,发现https://push2his.eastmoney.com/api/qt/stock/kline/get?请求后面带着一些参数即可以获取到相应数据,我们不断调试,模拟这类请求即可。分析过程如下图所示,F12调出调试框,不断尝试:
2. 爬取数据代码逻辑
如下即爬取数据的可运行代码,复制后直接能跑:
import pandas as pd
from typing import List
import requests
from jsonpath import jsonpath
class CustomedSession(requests.Session):
def request(self, *args, **kwargs):
kwargs.setdefault('timeout', 60)
return super(CustomedSession, self).request(*args, **kwargs)
session = CustomedSession()
adapter = requests.adapters.HTTPAdapter(pool_connections = 50, pool_maxsize = 50, max_retries = 5)
session.mount('http://', adapter)
session.mount('https://', adapter)
# 请求地址
QEURY_URL = 'http://push2his.eastmoney.com/api/qt/stock/kline/get'
# HTTP 请求头
EASTMONEY_REQUEST_HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64; Trident/7.0; Touch; rv:11.0) like Gecko',
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
}
"""
获取单只股票的历史K线数据
"""
def get_k_history_data(
stock_codes: str, # 股票代码
beg: str = '19000101', # 开始日期,19000101,表示 1900年1月1日
end: str = '20500101', # 结束日期
klt: int = 101, # 行情之间的时间间隔 1、5、15、30、60分钟; 101:日; 102:周; 103:月
fqt: int = 1, # 复权方式,0 不复权 1 前复权 2 后复权
):
try:
# 生成东方财富专用的secid
if stock_codes[:3] == '000': # 沪市指数
secid = f'1.{stock_codes}'
elif stock_codes[:3] == '399': # 深证指数
secid = f'0.{stock_codes}'
if stock_codes[0] != '6': # 沪市股票
secid = f'0.{stock_codes}'
else:
secid = f'1.{stock_codes}' # 深市股票
EASTMONEY_KLINE_FIELDS = {'f51': '日期', 'f52': '开盘', 'f53': '收盘', 'f54': '最高', 'f55': '最低',
'f56': '成交量', 'f57': '成交额', 'f58': '振幅', 'f59': '涨跌幅', 'f60': '涨跌额', 'f61': '换手率',}
fields = list(EASTMONEY_KLINE_FIELDS.keys())
# columns = list(EASTMONEY_KLINE_FIELDS.values())
fields2 = ",".join(fields)
params = (
('fields1', 'f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13'),
('fields2', fields2),
('beg', beg),
('end', end),
('rtntype', '6'),
('secid', secid),
('klt', f'{klt}'),
('fqt', f'{fqt}'),
)
code = secid.split('.')[-1]
json_response = session.get(QEURY_URL, headers=EASTMONEY_REQUEST_HEADERS, params=params, verify=False).json()
data_list = []
klines: List[str] = jsonpath(json_response, '$..klines[:]')
if not klines:
return data_list
name = json_response['data']['name']
rows = [kline.split(',') for kline in klines]
# 0 1 2 3 4 5 6 7 8 9 10
# 日期, 开盘, 收盘, 最高, 最低, 成交量, 成交额, 振幅, 涨跌幅, 涨跌额, 换手率
# 2024-05-08, 4.89, 4.82, 4.91, 4.80, 61811, 29955564.00, 2.25, -1.23, -0.06, 0.98
# data_list = [{'code': '002224', 'name': '三力士', 'time': '2024-05-08', 'info': '0,1,2,3,4,5,6,7,8,9,10'}]
for row in rows:
time, open, close, high, low, vol, quota, mm, change, range, tun = row
line_str = f'{open},{close},{high},{low},{vol},{quota},{mm},{change},{range},{tun}'
data_list.append({'id': None,'code': code, 'name': name, 'time': time, 'info': line_str})
return data_list
except Exception as e:
print('get_k_history_data error-----------------------', str(e))
return data_list
if __name__ == "__main__":
data = get_k_history_data(stock_codes='002224', beg='20240507', end='20500101')
print('----', data)
3. 将爬取的数据通过ORM形式写入数据库
数据库表设计:
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Index, Table
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemy.schema import UniqueConstraint
from datetime import datetime
# 声明一个基类,所有的ORM类都将继承自这个基类
DBBase = declarative_base()
# 创建引擎
engine = create_engine('sqlite:///a.db', echo=False)
# 绑定引擎
Session = sessionmaker(bind=engine)
# 创建数据库链接池,直接使用session即可为当前线程拿出一个链接对象conn
db_session = scoped_session(Session)
'''
股票K线信息表
0 1 2 3 4 5 6 7 8 9 10
日期, 开盘, 收盘, 最高, 最低, 成交量, 成交额, 振幅, 涨跌幅, 涨跌额, 换手率
2024-05-08, 4.89, 4.82, 4.91, 4.80, 61811, 29955564.00, 2.25, -1.23, -0.06, 0.98
data_list = [{'code': '002224', 'name': '三力士', 'time': '2024-05-08', 'info': '1,2,3,4,5,6,7,8,9,10'}]
'''
class tb_k(DBBase):
__tablename__ = 'tb_k'
id = Column(Integer, primary_key=True, autoincrement=True)
code = Column(String, nullable=False, comment="股票代码")
name = Column(String, comment="股票名称")
time = Column(String, comment="时间")
info = Column(String, comment="开盘,收盘,最高,最低,成交量,成交额,振幅,涨跌幅,涨跌额,换手率")
__table_args__ = (
Index('unique_index', 'code', 'time', unique=True),
)
# 创建表, 创建所有class xx(DBBase)
DBBase.metadata.create_all(engine)
写入数据库的逻辑:
# 查询某个股票最近更新K线的日期
def query_latast_K_data(code):
result = db_session.query(tb_k).filter(tb_k.code==code).order_by(desc(tb_k.time)).first()
if result is None:
return '19000101'
return str(result.time).replace('-','')
# 批量插入或更新某只股票的历史K线数据
def insert_or_update_stock_k(data_list):
if len(data_list) <= 0:
return
try:
db_session.bulk_insert_mappings(tb_k, data_list)
db_session.commit()
except Exception as e:
print('insert_or_update_stock_k error=', str(e))
4. 整体逻辑流程
步骤:
- 输入某个股票代码爬取该股票的历史K线数据
- 将返回结果组成数组,批量写入数据库
- 每次写入前,会根据该股票代码,查询最新的同步日期,从该日期开始进行追加同步
# 更新某个股票的最新日K线数据到数据库
def update_k_info_db(code='002224'):
# 根据 code 查询库中已存在的某个股票日K线数据的最近日期,作为开始日期,向后获取
beg_time = db_orm.query_latast_K_data(code)
data_list = stock.get_k_history_data(stock_codes=code, beg=beg_time, end='20500101')
if len(data_list) > 0:
db_orm.insert_or_update_stock_k(data_list)
if __name__ == "__main__":
update_base_info_db()
最终结果保存在 a.db中,例如:
更多内容可关注我,后续源码包均在上面回复下载:
【爬虫】爬取A股数据系列工具