前几天有写过一篇 【爬虫】爬取A股数据写入数据库(一),现在继续完善下,将已有数据通过ORM形式批量写入数据库。
2024/05,本文主要内容如下:
- 对东方财富官网进行分析并爬取数据:使用 Python 的 requests 库(pip install requests)模拟 HTTP 请求来获取数据。
- 将爬取的数据通过 sqlalchemy ORM 写入 sqlite 数据库。
- 记录爬取股票的基本信息,如果库中已存在某个股票代码,则进行更新。
- 后续计划:会不断完善,最终目标是做出一个简单的股票查看客户端。
- 本系列所有源码均无偿分享,仅作交流无其他,供大家参考。
python依赖环境如下:
conda create --name xuan_gu python=3.9 --channel https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda remove --name xuan_gu --all  # 仅在需要删除该环境重建时执行,正常安装请跳过此行
conda activate xuan_gu
#pip install PyQt5==5.15.10 -i https://pypi.tuna.tsinghua.edu.cn/simple
#pip install pyqtgraph==0.13.6 -i https://pypi.tuna.tsinghua.edu.cn/simple
#python -m pyqtgraph.examples 查看图形化的pyqtgraph示例
pip install requests==2.31.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pandas==2.2.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install jsonpath==0.8.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install sqlalchemy==2.0.30 -i https://pypi.tuna.tsinghua.edu.cn/simple
1. 对东方财富官网的分析
东方财富网页地址:https://data.eastmoney.com/gdhs/
通过分析网页,发现向 https://datacenter-web.eastmoney.com/api/data/v1/get 发送带有若干查询参数的请求即可获取到相应数据,我们只需不断调试并模拟这类请求即可。分析过程如下图所示:按 F12 调出浏览器调试框,反复尝试:
在这里插入图片描述
2. 爬取数据代码逻辑
如下即爬取数据的可运行代码,复制后直接能跑:
import pandas as pd
from typing import List
import requests
class CustomedSession(requests.Session):
    """A ``requests.Session`` that applies a default timeout to every request."""

    def request(self, *args, **kwargs):
        """Send a request, using a 60 second timeout unless the caller set one.

        All positional and keyword arguments are forwarded unchanged to
        ``requests.Session.request``; only a missing ``timeout`` keyword is
        filled in.
        """
        if 'timeout' not in kwargs:
            kwargs['timeout'] = 60
        return super().request(*args, **kwargs)
# Module-wide HTTP session; every request made through it gets the default
# timeout supplied by CustomedSession.
session = CustomedSession()
# A single adapter (pool_connections=50, pool_maxsize=50, max_retries=5) is
# mounted for both the http:// and https:// prefixes.
adapter = requests.adapters.HTTPAdapter(pool_connections = 50, pool_maxsize = 50, max_retries = 5)
session.mount('http://', adapter)
session.mount('https://', adapter)
# Request URL of the data API
QEURY_URL = 'http://datacenter-web.eastmoney.com/api/data/v1/get'
# HTTP request headers sent with every query
EASTMONEY_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64; Trident/7.0; Touch; rv:11.0) like Gecko',
    'Accept': '*/*',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
    # 'Referer': 'http://quote.eastmoney.com/center/gridlist.html',
}
# Response field filter: maps each API field name to the Chinese column label
# used in the resulting DataFrame. Only these fields are kept, in this order.
RESULT_FIELDS = {
    'SECURITY_CODE': '股票代码',
    'SECURITY_NAME_ABBR': '股票名称',
    'END_DATE': '本次股东户数统计截止日',
    'PRE_END_DATE': '上次股东户数统计截止日',
    'INTERVAL_CHRATE': '区间涨跌幅',
    'AVG_MARKET_CAP': '户均持股市值',
    'AVG_HOLD_NUM': '户均持股数量',
    'TOTAL_MARKET_CAP': '总市值',
    'TOTAL_A_SHARES': '总股本',
    'HOLD_NOTICE_DATE': '公告日期',
    'HOLDER_NUM': '本次股东户数',
    'PRE_HOLDER_NUM': '上次股东户数',
    'HOLDER_NUM_CHANGE': '股东户数增减',
    'HOLDER_NUM_RATIO': '股东户数较上期变化百分比',
    'f2': '最新价',
    'f3': '涨跌幅百分比',
}
def get_latest_holder_number() -> pd.DataFrame:
    """Fetch the latest published shareholder-count changes for the A-share market.

    Pages through the ``RPT_HOLDERNUMLATEST`` report (500 rows per request)
    until the page count reported by the server is exhausted. The result is
    used by callers as the list of all stocks.

    Returns:
        A DataFrame whose columns are the values of ``RESULT_FIELDS`` (in that
        order), with the rows of every fetched page concatenated. The frame is
        empty (columns only) when no page returned any data.
    """
    page_number = 1
    # Placeholder so the first request is always sent; replaced by the real
    # page count from each response.
    page_count = 1
    frames: List[pd.DataFrame] = []
    # A concrete list is a safer column indexer than a dict_values view.
    columns = list(RESULT_FIELDS.values())

    while page_number <= page_count:
        # Query parameters
        query_params = (
            ('sortColumns', 'HOLD_NOTICE_DATE,SECURITY_CODE'),
            ('sortTypes', '-1,-1'),
            ('pageSize', 500),
            ('pageNumber', page_number),
            ('columns', 'SECURITY_CODE,SECURITY_NAME_ABBR,END_DATE,INTERVAL_CHRATE,AVG_MARKET_CAP,AVG_HOLD_NUM,TOTAL_MARKET_CAP,TOTAL_A_SHARES,HOLD_NOTICE_DATE,HOLDER_NUM,PRE_HOLDER_NUM,HOLDER_NUM_CHANGE,HOLDER_NUM_RATIO,END_DATE,PRE_END_DATE'),
            ('quoteColumns', 'f2,f3'),
            ('source', 'WEB'),
            ('client', 'WEB'),
            ('reportName', 'RPT_HOLDERNUMLATEST'),
        )
        response = session.get(QEURY_URL, headers=EASTMONEY_REQUEST_HEADERS, params=query_params)
        result_json = response.json()

        # 'result' may be null or missing (e.g. nothing to return); fall back
        # to an empty dict so the guards below run instead of raising
        # AttributeError on None.
        result = result_json.get('result') or {}
        page_count = result.get('pages')
        print('json len=', len(str(result_json)), 'page count=', page_count, 'page number=', page_number)
        if page_count is None:
            break

        data = result.get('data')
        # Stop on a missing *or* empty page: an empty frame has none of the
        # expected columns and the selection below would raise KeyError.
        if not data:
            break

        frames.append(pd.DataFrame(data).rename(columns=RESULT_FIELDS)[columns])
        page_number += 1

    if not frames:
        return pd.DataFrame(columns=columns)
    return pd.concat(frames, ignore_index=True)
if __name__ == "__main__":
    # Script entry point: fetch all pages and print the combined table.
    print(get_latest_holder_number())
调用如上函数即可:
3. 将爬取的数据通过ORM形式写入数据库
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Index
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.orm import scoped_session
from datetime import datetime
# Declarative base class; every ORM model class inherits from it.
Base = declarative_base()
# Create the engine: SQLite database file a.db in the working directory,
# with SQL statement echoing turned off.
engine = create_engine('sqlite:///a.db', echo=False)
# Session factory bound to the engine.
Session = sessionmaker(bind=engine)
# scoped_session registry: using `session` directly hands the current thread
# its own session object (per SQLAlchemy's default scoped_session behaviour).
session = scoped_session(Session)
# Stock basic-information table
class stock_base_info(Base):
    """ORM model for the ``stock_base_info`` table, one row per stock code."""

    __tablename__ = 'stock_base_info'
    # Stock code; primary key of the table.
    SECURITY_CODE = Column(String, primary_key=True, index=True, nullable=False, comment="股票代码")
    # Stock short name.
    SECURITY_NAME_ABBR = Column(String, nullable=False, comment="股票名称")
    # Total market capitalisation.
    TOTAL_MARKET_CAP = Column(Float, comment="总市值")
    # Total share capital.
    TOTAL_A_SHARES = Column(Float, comment="总股本")
    # Announcement date, stored as text.
    HOLD_NOTICE_DATE = Column(String, comment="公告日期")
    # Shareholder count in the current report.
    HOLDER_NUM = Column(Integer, comment="本次股东户数")
    # Percentage change of the shareholder count versus the previous report, stored as text.
    HOLDER_NUM_RATIO = Column(String, comment="股东户数较上期变化百分比")
    # Shareholder count in the previous report.
    PRE_HOLDER_NUM = Column(Integer, comment="上次股东户数")
    # Stock price, stored as text.
    f2 = Column(String, comment="股票价格")
    # Timestamp set on insert and refreshed on every update of the row.
    last_updated = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment="最后更新时间")
    # NOTE(review): the primary key already makes SECURITY_CODE unique, so
    # index=True plus this explicit unique index look redundant — confirm
    # before changing the schema.
    __table_args__ = (
        Index('idx_SECURITY_CODE_index', SECURITY_CODE, unique=True),
    )
# Create the tables for every model class that inherits from Base
# (runs at import time against the engine defined above).
Base.metadata.create_all(engine)
def insert_or_update_stock_info(net_list):
    """Upsert scraped stock records into ``stock_base_info`` by stock code.

    Records whose ``SECURITY_CODE`` already exists in the table are updated in
    bulk; the rest are inserted in bulk. Rows already in the table that are
    not mentioned in ``net_list`` are left untouched, so their
    ``last_updated`` timestamp is not bumped.

    Args:
        net_list: list of dicts mapping ``stock_base_info`` column names to
            values. Every dict must contain ``'SECURITY_CODE'``. When the same
            code occurs more than once, later entries take precedence.

    Raises:
        KeyError: if an item has no ``'SECURITY_CODE'`` key.
        Exception: any database error is re-raised after the session has been
            rolled back.
    """
    if not net_list:
        return

    try:
        # Only the primary keys are needed to split updates from inserts;
        # loading full ORM objects for the whole table is unnecessary.
        existing_codes = {
            row[0] for row in session.query(stock_base_info.SECURITY_CODE).all()
        }

        to_update, to_insert = {}, {}
        for item in net_list:
            code = item['SECURITY_CODE']
            if code in existing_codes:
                # Merge repeated entries for one code key by key, last wins.
                to_update[code] = {**to_update.get(code, {}), **item}
            else:
                to_insert[code] = item

        if to_update:
            session.bulk_update_mappings(stock_base_info, list(to_update.values()))
        if to_insert:
            session.bulk_insert_mappings(stock_base_info, list(to_insert.values()))
        session.commit()
    except Exception:
        # Leave the shared scoped session usable after a failed write.
        session.rollback()
        raise
if __name__ == "__main__":
    # Nothing to do when run directly; this module is meant to be imported.
    pass
4. 整体逻辑流程
步骤:
- 爬取数据得到返回结果
- 将返回结果组成数组,并写入数据库
- 对于库中已存在的信息根据 股票代码 进行批量更新,对于不存在的进行批量插入
import stock
import db_orm
def update_base_info_db():
    """Scrape the latest stock table and write it to the database.

    Fetches the DataFrame from ``stock.get_latest_holder_number()``, converts
    each row into a dict keyed by database column name, and passes the list to
    ``db_orm.insert_or_update_stock_info``. Nothing is written when the
    DataFrame has no rows.
    """
    data_df = stock.get_latest_holder_number()
    print('获取的股票数量=', data_df.shape)

    # (DataFrame column label, database column name) pairs, in record order.
    label_to_column = (
        ('股票代码', 'SECURITY_CODE'),
        ('股票名称', 'SECURITY_NAME_ABBR'),
        ('总市值', 'TOTAL_MARKET_CAP'),
        ('总股本', 'TOTAL_A_SHARES'),
        ('公告日期', 'HOLD_NOTICE_DATE'),
        ('本次股东户数', 'HOLDER_NUM'),
        ('股东户数较上期变化百分比', 'HOLDER_NUM_RATIO'),
        ('上次股东户数', 'PRE_HOLDER_NUM'),
        ('最新价', 'f2'),
    )

    net_list = []
    for _, row in data_df.iterrows():
        record = {column: row[label] for label, column in label_to_column}
        # The price column is stored as text in the database.
        record['f2'] = str(record['f2'])
        net_list.append(record)

    if net_list:
        db_orm.insert_or_update_stock_info(net_list=net_list)
if __name__ == "__main__":
    # Script entry point: scrape the data and upsert it into the database.
    update_base_info_db()
最终结果保存在 a.db中,例如:
更多内容可关注我,后续源码包均在上面回复下载:
【爬虫】爬取A股数据系列工具