Overview
I've been too busy lately to push this project forward much. The plan is to still squeeze out a little time for the concrete engineering work, and otherwise use fragmented time to refine the overall design of qtv200. Some structural questions really do need to be sorted out, for example:
- 1 Start with descriptive analysis of the raw data: use the "second derivative" to describe the trend of change (a minimal sketch follows this list)
- 2 Patterns before models: generate patterns from the data in a way that is easy to interpret, so the patterns themselves are easy to understand
- 3 UCS time loop: schedule everything uniformly along a unified time axis. This has proven very useful and is essentially settled; see the earlier post 「Python一些可能用的到的函数系列130 UCS-Time Brick」
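To make point 1 concrete, here is a minimal sketch (entirely my own illustration; the function and column names are assumptions, not qtv200 code) that uses discrete first and second differences of a close-price series as stand-ins for the first and second derivatives:

import pandas as pd

# Minimal sketch of the "second derivative" description (assumed names,
# not qtv200 code): discrete differences approximate the derivatives.
def describe_trend(close: pd.Series) -> pd.DataFrame:
    d1 = close.diff()   # first difference: direction of change
    d2 = d1.diff()      # second difference: acceleration of change
    state = ((d1 > 0).map({True: 'up', False: 'down'})
             + '_'
             + (d2 > 0).map({True: 'accel', False: 'decel'}))
    return pd.DataFrame({'close': close, 'd1': d1, 'd2': d2, 'state': state})

# e.g. a rising, accelerating stretch is labelled 'up_accel'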
Content
1 Adding Source Data
Add ETF market-data tracking
Following the previously designed approach, add data tracking for several more instruments.
Tasks are parameterized by the ETF code and the execution second. The second offset itself carries no meaning, but in the spirit of not placing unnecessary load on someone else's server, staggering requests across different seconds spreads them out very evenly (a factory sketch that generates such tasks follows the two task definitions below).
...
# Task 19: execution script - qtv200 159845 get
task019 = {}
task019['machine'] = 'm4'
task019['task_id'] = 'task019'
task019['description'] = 'Execution script: runs Monday to Friday, 9:00 to 16:00, fetching data for 159845. Executes at second 7.'
task019['pid'] = '.'.join([task019['machine'],task019['task_id'] ])
task019['job_name'] = 'make_a_request' # the job name as seen by flask-aps
task019['set_to_status'] = 'running'
task019['running_status'] = ''
task019['start_dt'] = '2024-05-01 00:00:00'
task019['end_dt'] = '2099-06-01 00:00:00'
task019['task_kwargs'] = {'para_dict':
{'url':'http://172.17.0.1:24104/exe_sh/',
'json_data':
{
'the_cmd': 'bash /home/exe_etf_crawl_worker.sh 159845'
}
}
}
task019['interval_para'] = {'second': '7',
                            'day_of_week': '0-4',
                            'hour': '9-16'}
task019 = TaskTable(**task019)
task019.save()
# Task 20: execution script - qtv200 512690 get
task020 = {}
task020['machine'] = 'm4'
task020['task_id'] = 'task020'
task020['description'] = 'Execution script: runs Monday to Friday, 9:00 to 16:00, fetching data for 512690. Executes at second 8.'
task020['pid'] = '.'.join([task020['machine'],task020['task_id'] ])
task020['job_name'] = 'make_a_request' # the job name as seen by flask-aps
task020['set_to_status'] = 'running'
task020['running_status'] = ''
task020['start_dt'] = '2024-05-01 00:00:00'
task020['end_dt'] = '2099-06-01 00:00:00'
task020['task_kwargs'] = {'para_dict':
{'url':'http://172.17.0.1:24104/exe_sh/',
'json_data':
{
'the_cmd': 'bash /home/exe_etf_crawl_worker.sh 512690'
}
}
}
task020['interval_para'] = {'second': '8',
                            'day_of_week': '0-4',
                            'hour': '9-16'}
task020 = TaskTable(**task020)
task020.save()
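The two task definitions above differ only in the ETF code and the second offset. A small factory function removes the duplication; the sketch below is my own and not existing qtv200 code, and assumes TaskTable accepts exactly the fields shown above:

# Hypothetical factory for the task dicts above (a sketch, not existing
# qtv200 code): each ETF gets its own second offset so requests are
# spread evenly and the remote server is never hit in bursts.
def make_etf_task(task_no: int, etf_code: str, second: int) -> dict:
    task = {}
    task['machine'] = 'm4'
    task['task_id'] = 'task%03d' % task_no
    task['description'] = ('Execution script: runs Monday to Friday, 9:00 to 16:00, '
                           'fetching data for %s. Executes at second %s.' % (etf_code, second))
    task['pid'] = '.'.join([task['machine'], task['task_id']])
    task['job_name'] = 'make_a_request'
    task['set_to_status'] = 'running'
    task['running_status'] = ''
    task['start_dt'] = '2024-05-01 00:00:00'
    task['end_dt'] = '2099-06-01 00:00:00'
    task['task_kwargs'] = {'para_dict': {
        'url': 'http://172.17.0.1:24104/exe_sh/',
        'json_data': {'the_cmd': 'bash /home/exe_etf_crawl_worker.sh %s' % etf_code},
    }}
    task['interval_para'] = {'second': str(second), 'day_of_week': '0-4', 'hour': '9-16'}
    return task

# usage: recreates tasks 19 and 20 as defined above
for no, code, sec in [(19, '159845', 7), (20, '512690', 8)]:
    TaskTable(**make_etf_task(no, code, sec)).save()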
Fetch the tasks and publish them
def exe_a_task(the_task_obj):
    # drive the task through the flask-aps finite state machine
    the_task_fsm = FlaskAPSTask(transitions=transitions, wflask=wf, task_id=the_task_obj.task_id)
    the_task_fsm.action_rule(the_task_obj)
    # read back the resulting status and persist it on the task record
    current_task_status = the_task_fsm.get_a_task_status(task_id=the_task_obj.task_id)
    return the_task_obj.update(set__update_time=get_time_str1(), set__running_status=current_task_status)

tasks = TaskTable.objects(machine='m4')
for the_task_obj in tasks:
    exe_a_task(the_task_obj)
That's it; the data will start updating automatically on Monday.
2 Historical Data
I pull the historical data from ricequant, which raises the question of merging different data sources. For market data this is fairly simple: my approach is to fetch the full dataset, process it the same way as the incremental data, and then spot-check a few rows.
This is not rigorous data validation; a rigorous method is something I cannot spare the time to research right now, but past experience suggests the risk is small.
import pandas as pd

start_date = '2000-01-01'
end_date = '2099-12-31'
# get_price is RiceQuant's research-environment API;
# frequency='1m' actually fetches minute bars
df = get_price('510300.XSHG', start_date=start_date, end_date=end_date, frequency='1m')
df1 = df.reset_index()
df1.to_csv('510300_all_data_20240706.csv', index=False)
After downloading, the data needs to be converted.
A good way to do this is with pydantic: it amounts to defining a data model and, as far as possible, validating and converting according to the design.
from typing import List, Tuple, Optional
from pydantic import BaseModel, Field, field_validator
# convert an ordinary datetime string to a UCS block name
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1 = some_dt_str.replace('-', '.').replace(' ', '.').replace(':', '.')
    return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# Layer1(Dict)
class MarketData(BaseModel):
    data_dt: str
    open: float
    close: float
    high: float
    low: float
    vol: float
    amt: float

    @property
    def brick(self):
        # hour-level UCS name, e.g. '2012.05.28.09'
        return dt_str2ucs_blockname(self.data_dt)

    @property
    def block(self):
        # day level, e.g. '2012.05.28'
        return self.brick[:self.brick.rfind('.')]

    @property
    def part(self):
        # month level, e.g. '2012.05'
        return self.block[:self.block.rfind('.')]

    @property
    def shard(self):
        # year level, e.g. '2012'
        return self.part[:self.part.rfind('.')]

    def dict(self):
        data = {}
        data['data_dt'] = self.data_dt
        data['open'] = self.open
        data['close'] = self.close
        data['high'] = self.high
        data['low'] = self.low
        data['vol'] = self.vol
        data['amt'] = self.amt
        data['brick'] = self.brick
        data['block'] = self.block
        data['part'] = self.part
        data['shard'] = self.shard
        return data
md = MarketData(**{'data_dt': '2012-05-28 09:31:00',
'open': 2.1121,
'close': 2.1105,
'high': 2.1146,
'low': 2.1096,
'vol': 50330003.0,
'amt': 128372828.0})
md.dict()
{'data_dt': '2012-05-28 09:31:00',
'open': 2.1121,
'close': 2.1105,
'high': 2.1146,
'low': 2.1096,
'vol': 50330003.0,
'amt': 128372828.0,
'brick': '2012.05.28.09',
'block': '2012.05.28',
'part': '2012.05',
'shard': '2012'}
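Note that field_validator is imported above but never used. As an illustration only (my own sketch, not the original implementation), it could enforce the datetime format that dt_str2ucs_blockname expects:

from datetime import datetime
from pydantic import field_validator

# Sketch only: a stricter variant of MarketData that rejects malformed
# timestamps, showing what the imported field_validator could be used for.
class StrictMarketData(MarketData):
    @field_validator('data_dt')
    @classmethod
    def check_data_dt(cls, v: str) -> str:
        # raises ValueError (surfacing as a pydantic ValidationError) on bad input
        datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
        return v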
However, the earlier crawler was not implemented this way, so I need to confirm the format used at crawl time and then feed the records into the queue.
Core field mapping and conversion from the earlier crawl:
| Field group | Variables |
|---|---|
| Market data | data_dt, open, close, high, low, vol, amt |
| Classification | data_source, code, market |
| ID | rec_id |
| Conversion | vol is converted to units of shares |
if is_query_data:
    # map akshare's Chinese column names to our variable names
    ak_dict = {}
    ak_dict['时间'] = 'data_dt'
    ak_dict['开盘'] = 'open'
    ak_dict['收盘'] = 'close'
    ak_dict['最高'] = 'high'
    ak_dict['最低'] = 'low'
    ak_dict['成交量'] = 'vol'
    ak_dict['成交额'] = 'amt'

    keep_cols = ['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt']
    cols = list(df.columns)
    new_cols = [ak_dict.get(x) or x for x in cols]
    df.columns = new_cols
    df1 = df[keep_cols]
    df1['data_source'] = 'AK'
    df1['code'] = etf_code
    df1['market'] = 'SH'
    df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \
                    + '_' + df1['data_dt']

    # adjust shares vs lots: amt / close approximates vol in shares;
    # if that is more than 10x the recorded vol, vol is in lots (1 lot = 100 shares)
    vol_approximal = df1['amt'] / df1['close']
    maybe_wrong = (vol_approximal / df1['vol']) > 10
    if maybe_wrong.sum() > 0:
        df1['vol'] = df1['vol'] * 100
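To see why the 10x threshold works, take the sample bar from the MarketData example above: amt = 128,372,828 and close = 2.1105 give vol_approximal ≈ 60.8M shares. Against the recorded vol of 50,330,003 shares the ratio is about 1.2, so nothing changes; had vol been recorded in lots (503,300), the ratio would be about 120, well past 10, triggering the ×100 correction.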
Pull one record to compare:
AK's 14:58 bar has close 3.461 and vol 4116900.
RQ shows the same values, so the two sources agree.
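For reference, a minimal version of that spot check might look like this (df_ak and df_rq are assumed names for the two processed frames; the bar's date is omitted in the original notes, so the key is left as a placeholder):

# Sketch of the spot check, assuming df_ak and df_rq both carry the
# unified columns built above.
key = 'YYYY-MM-DD 14:58:00'  # fill in the bar's actual date
print(df_ak.loc[df_ak['data_dt'] == key, ['close', 'vol']])
print(df_rq.loc[df_rq['data_dt'] == key, ['close', 'vol']])
# both should show close 3.461 and vol 4116900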
The unique primary key (rec_id) is built jointly from the ETF code and the timestamp, so duplicate records cannot appear (even when the data sources differ).
So the next step is simply to push the data into the queue. One thing to note: the entry queue holds 100,000 messages, which is more than enough for incremental updates; for the historical backfill, a resumable checkpoint keeps the load under control.
The cleaned-up code:
from Basefuncs import *

# 1 read
df = pd.read_csv('510300_all_20230911.csv')
etf_code = '510300'

# 2 rename to the unified schema
df['data_dt'] = df['datetime'].apply(str)
df['vol'] = df['volume']
df['amt'] = df['total_turnover']
keep_cols = ['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt']
df1 = df[keep_cols]

# 3 adjust vol: if amt/close is more than 10x the recorded vol,
# vol is in lots and must be converted to shares (same rule as the AK block)
vol_approximal = df1['amt'] / df1['close']
maybe_wrong = (vol_approximal / df1['vol']) > 10
if maybe_wrong.sum() > 0:
    df1['vol'] = df1['vol'] * 100

# 4 other columns
df1['data_source'] = 'RQ'
df1['code'] = etf_code
df1['market'] = 'SH'
df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \
                + '_' + df1['data_dt']

# 5 queue operation
qm = QManager(redis_agent_host='http://192.168.0.4:24118/', redis_connection_hash=None)
stream_name = 'BUFF.andy.calnet.qtv200.stream_in'
## data slice: batches of 10,000 records
data_listofdict = df1.to_dict(orient='records')
data_listofdict2 = slice_list_by_batch2(data_listofdict, 10000)
start_id = 0  # on interruption, set to the last printed index to resume
# run until complete
for i, some_listofdict in enumerate(data_listofdict2):
    if i >= start_id:
        print('current index : ', i)
        tem_resp = qm.parrallel_write_msg(stream_name, some_listofdict)
        if tem_resp['status'] is False:
            break
    start_id = i
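slice_list_by_batch2 and QManager come from the project's own Basefuncs package. Assuming slice_list_by_batch2 simply chunks a list into consecutive batches (an assumption; the real helper may differ in signature or return type), an equivalent stand-in would be:

# Stand-in for Basefuncs.slice_list_by_batch2 (assumed semantics):
# chunk a list into consecutive batches of at most batch_size items.
def slice_list_by_batch2(data, batch_size):
    return [data[i:i + batch_size] for i in range(0, len(data), batch_size)]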
With that, the historical backfill is complete.