Python 算法交易实验76 QTV200日常推进

news2024/11/27 21:02:20

说明

最近实在太忙, 没太有空推进这个项目,我想还是尽量抽一点点时间推进具体的工程,然后更多的还是用碎片化的时间从整体上对qtv200进行设计完善。有些结构的问题其实是需要理清的,例如:

  • 1 要先基于原始数据进行描述性分析:采用「二阶导」来描述变化趋势
  • 2 先模式后模型:基于易于解释的数据生成模式,这样模式本身也是易于理解的
  • 3 UCS time loop: 按照统一时间轴进行统一的调度。这个非常有用,算是固化下来了。Python一些可能用的到的函数系列130 UCS-Time Brick

内容

1 增加源数据

增加ETF的行情数据跟踪

按照之前设计的方式,增加若干标的的数据跟踪。

按照ETF和执行秒数的差异,进行任务的参数化。本身其实没啥关系,本着不给别人服务器带来不必要的负担,按秒数分开会非常均匀。

...

# 任务19:执行脚本-qtv200 159845 get 
task019 = {}
task019['machine'] = 'm4'
task019['task_id'] = 'task019'
task019['description'] = '执行脚本,在周一到周五,上午9点到下午4点执行,获取 159845 的数据。在秒1执行'
task019['pid'] = '.'.join([task019['machine'],task019['task_id']  ])
task019['job_name'] = 'make_a_request' # 这个是对flask-aps来说的
task019['set_to_status'] = 'running'
task019['running_status'] = ''
task019['start_dt'] = '2024-05-01 00:00:00'
task019['end_dt'] = '2099-06-01 00:00:00'
task019['task_kwargs'] = {'para_dict': 
                                {'url':'http://172.17.0.1:24104/exe_sh/',
                                 'json_data':
                                    {
                                    'the_cmd': 'bash /home/exe_etf_crawl_worker.sh 159845'
                                    }

                                }
                            }
task019['interval_para'] ={'second':'7',
                            'day_of_week':'0-4',
                            'hour':'9-16'}
task019 = TaskTable(**task019)
task019.save()

# 任务20:执行脚本-qtv200 512690 get 
task020 = {}
task020['machine'] = 'm4'
task020['task_id'] = 'task020'
task020['description'] = '执行脚本,在周一到周五,上午9点到下午4点执行,获取 512690 的数据。在秒1执行'
task020['pid'] = '.'.join([task020['machine'],task020['task_id']  ])
task020['job_name'] = 'make_a_request' # 这个是对flask-aps来说的
task020['set_to_status'] = 'running'
task020['running_status'] = ''
task020['start_dt'] = '2024-05-01 00:00:00'
task020['end_dt'] = '2099-06-01 00:00:00'
task020['task_kwargs'] = {'para_dict': 
                                {'url':'http://172.17.0.1:24104/exe_sh/',
                                 'json_data':
                                    {
                                    'the_cmd': 'bash /home/exe_etf_crawl_worker.sh 512690'
                                    }

                                }
                            }
task020['interval_para'] ={'second':'8',
                            'day_of_week':'0-4',
                            'hour':'9-16'}
task020 = TaskTable(**task020)
task020.save()

获取任务并进行发布

def exe_a_task(the_task_obj):
    the_task_fsm = FlaskAPSTask(transitions = transitions, wflask=wf, task_id = the_task_obj.task_id)
    the_task_fsm.action_rule(the_task_obj)
    current_task_status = the_task_fsm.get_a_task_status(task_id = the_task_obj.task_id)
    return the_task_obj.update(set__update_time=get_time_str1(), set__running_status =current_task_status)

tasks = TaskTable.objects(machine='m4')

for the_task_obj in  tasks:
    exe_a_task(the_task_obj)

这样就好了,周一就会自动更新。

2 存量数据

存量数据我从ricequant拿,这里其实涉及到不同数据源的融合问题。行情数据较为简单,我的方法是全量数据抓取后,按同样的方式处理,然后抽几条校验一下。

这不算是严格的数据校验,而严格的方法是我目前没法花时间去研究的。按以往的经验来来看,问题不大。

import pandas as pd
start_date = '2000-01-01'
end_date = '2099-12-31'

# 真实取的是分钟
df = get_price('510300.XSHG',start_date=start_date,end_date=end_date,frequency='1m')
df1 = df.reset_index()
df1.to_csv('510300_all_data_20240706.csv', index=False)

在这里插入图片描述
可以看到 数据下载后进行转换。
一种比较好的方式是用pydantic。这相当于定义了一个数据模型,并尽可能的按设计进行校验和转换。

from typing import List, Tuple, Optional
from pydantic import BaseModel, Field, field_validator

# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')
    return '.'.join(some_dt_str1.split('.')[:4])

'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# Layer1(Dict) 
class MarketData(BaseModel):
    data_dt:str
    open: float
    close:float
    high:float
    low:float
    vol:float
    amt:float

    @property
    def brick(self):
        return dt_str2ucs_blockname(self.data_dt)
    
    @property
    def block(self):
        return self.brick[:self.brick.rfind('.')]
    
    @property
    def part(self):
        return self.block[:self.block.rfind('.')]
    
    @property
    def shard(self):
        return self.part[:self.part.rfind('.')]

    def dict(self):
        data = {}
        data['data_dt']  = self.data_dt
        data['open']  = self.open
        data['close']  = self.close
        data['high']  = self.high
        data['low']  = self.low
        data['vol']  = self.vol
        data['amt']  = self.amt

        data['brick']  = self.brick
        data['block']  = self.block
        data['part']  = self.part
        data['shard']  = self.shard
        return data


md = MarketData(**{'data_dt': '2012-05-28 09:31:00',
 'open': 2.1121,
 'close': 2.1105,
 'high': 2.1146,
 'low': 2.1096,
 'vol': 50330003.0,
 'amt': 128372828.0})

md.dict()
{'data_dt': '2012-05-28 09:31:00',
 'open': 2.1121,
 'close': 2.1105,
 'high': 2.1146,
 'low': 2.1096,
 'vol': 50330003.0,
 'amt': 128372828.0,
 'brick': '2012.05.28.09',
 'block': '2012.05.28',
 'part': '2012.05',
 'shard': '2012'}

不过之前没有按照这种方式实现,所以现在要确认一下当时爬取时的格式,然后输入队列即可。

之前crawl的核心字段映射与转换。

字段变量
行情data_dt,open,close,high,low,vol,amt
分类data_source, code , market
idrec_id
变换vol将转为股的单位
    if is_query_data:
        # ak的变量字典映射
        ak_dict = {}
        ak_dict['时间'] = 'data_dt'
        ak_dict['开盘'] = 'open'
        ak_dict['收盘'] = 'close'
        ak_dict['最高'] = 'high'
        ak_dict['最低'] = 'low'
        ak_dict['成交量'] = 'vol'
        ak_dict['成交额'] = 'amt'

        keep_cols = ['data_dt','open','close','high','low','vol','amt']

        cols = list(df.columns)
        new_cols = [ak_dict.get(x) or x for x in cols ]
        df.columns = new_cols
        df1 = df[keep_cols]
        df1['data_source'] = 'AK'
        df1['code'] = etf_code
        df1['market'] = 'SH'
    
        df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \
                                + '_' + df1['data_dt']

        # 调整股和手
        vol_approximal = df1['amt'] / df1['close']
        maybe_wrong = (vol_approximal / df1['vol']) > 10
        if maybe_wrong.sum() > 0:
            df1['vol'] = df1['vol'] * 100

抽取一条数据比对

AK的数据14:58分的close是3.461, vol 是4116900
在这里插入图片描述
RQ的数据也是这样的,数据一致
在这里插入图片描述
数据的唯一主键pid由etf 和时间戳共同构成,所以不会出现重复数据(即使数据源不同)。

所以接下来,将数据推入队列即可(稍微注意的是,入口队列长度为10万,应对增量是足够的),存量可以控制下断点续传

整理之后的代码

from Basefuncs import *

# 1 read
df  = pd.read_csv('510300_all_20230911.csv')
etf_code = '510300'

# 2 rename
df['data_dt'] = df['datetime'].apply(str)
df['open'] = df['open']
df['close'] = df['close']
df['high'] = df['high']
df['low'] = df['low']
df['vol'] = df['volume']
df['amt'] = df['total_turnover']

keep_cols = ['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt']

df1 = df[keep_cols]

# 3 adjust vol
vol_approximal = df1['amt'] / df1['close']
maybe_wrong = (vol_approximal/df1['vol']) >10

# 4 othercols 
df1['data_source'] = 'RQ'
df1['code'] = etf_code
df1['market'] = 'SH'
df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \
                                + '_' + df1['data_dt']

# 5 Q Operation
qm = QManager(redis_agent_host = 'http://192.168.0.4:24118/',redis_connection_hash = None)
stream_name = 'BUFF.andy.calnet.qtv200.stream_in'

## data slice
data_listofdict = df1.to_dict(orient='records')
data_listofdict2 = slice_list_by_batch2(data_listofdict, 10000)

start_id = 0

# run until complete
for i, some_listofdict in enumerate(data_listofdict2):
    if i >= start_id:
        print('current index : ', i )
        tem_resp = qm.parrallel_write_msg(stream_name, some_listofdict)
        if tem_resp['status'] is False:
            break
        start_id = i 

这样就完成了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1903233.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ROS2】初级:客户端-编写一个简单的服务和客户端(Python)

目标:使用 Python 创建并运行服务节点和客户端节点。 教程级别:初学者 时间:20 分钟 目录 背景 先决条件 任务 1. 创建一个包2. 编写服务节点3. 编写客户端节点4. 构建并运行 摘要 下一步 相关内容 背景 当节点通过服务进行通信时&#xff0c…

【机器学习】机器学习重塑广告营销:精准触达,高效转化的未来之路

📝个人主页🌹:Eternity._ 🌹🌹期待您的关注 🌹🌹 ❀目录 📒1. 引言📙2. 机器学习基础与广告营销的结合🧩机器学习在广告营销中的核心应用领域🌹用…

将大型语言模型模块化打造协作智能体

B UILDING C OOPERATIVE E MBODIED A GENTS MODULARLY WITH L ARGE L ANGUAGE M ODELS 论文链接: https://arxiv.org/abs/2307.02485https://arxiv.org/abs/2307.02485 1.概述 在去中心化控制及多任务环境中,多智能体合作问题因原始感官观察、高昂…

穿梭印度风情记:维乐 Angel Revo Halo坐垫,让每一寸旅程闪耀光辉!

想象骑乘在印度的万花筒世界中,斑斓色彩与悠久历史交织,每一转轮都是对神秘东方的深刻探索。在这样的骑行之旅中,维乐Angel Revo Halo坐垫不仅是你的坐骑上的宝石,更是舒适与探险的完美媒介。    探索印度的色彩与灵魂&#x…

每日一题~oj(贪心)

对于位置 i来说,如果 不选她,那她的贡献是 vali-1 *2,如果选他 ,那么她的贡献是 ai. 每一个数的贡献 是基于前一个数的贡献 来计算的。只要保证这个数的前一个数的贡献是最优的,那么以此类推下去,整体的val…

【项目设计】负载均衡式——Online Judge

负载均衡式——Online Judge😎 前言🙌Online Judge 项目一、项目介绍二、项目技术栈三、项目使用环境四、项目宏观框架五、项目后端服务实现过程1、comm模块设计1.1 Log.hpp实现1.2 Util.hpp实现 2、compiler_server 模块设计2.1compile.hpp文件代码编写…

【QT】容器类控件

目录 概述 Group Box 核心属性 Tab Widget 核心属性 核心信号 核心方法 使用示例: 布局管理器 垂直布局 核心属性 使用示例: 水平布局 核⼼属性 (和 QVBoxLayout 属性是⼀致的) 网格布局 核心属性 使用示例: 示例&#x…

【C++ OpenCV】机器视觉-二值图像和灰度图像的膨胀、腐蚀、开运算、闭运算

原图 结果图 //包含头文件 #include <opencv2/opencv.hpp>//命名空间 using namespace cv; using namespace std;//全局函数声明部分//我的腐蚀运算 Mat Erode(Mat src, Mat Mask, uint32_t x0, uint32_t y0) {uint32_t x 0, y 0;Mat dst(src.rows, src.cols, CV_8U…

设计模式之状态机模式

一、状态机模式介绍 状态机模式&#xff08;State Machine Pattern&#xff09;是一种用于描述对象行为的软件设计模式&#xff0c;属于行为型设计模式。在状态机模式中&#xff0c;对象的行为取决于其内部状态&#xff0c;并且在不同的状态下&#xff0c;对象可能会有不同的行…

RAG 案框架(Qanything、RAGFlow、FastGPT、智谱RAG)对比

各家的技术方案 有道的QAnything 亮点在&#xff1a;rerank RAGFLow 亮点在&#xff1a;数据处理index 智谱AI 亮点在文档解析、切片、query改写及recall模型的微调 FastGPT 优点&#xff1a;灵活性更高 下面分别按照模块比较各框架的却别 功能模块QAnythingRAGFLowFastG…

【手写数据库内核组件】01 解析树的结构,不同类型的数据结构组多层的链表树,抽象类型统一引用格式

不同类型的链表 ​专栏内容&#xff1a; postgresql使用入门基础手写数据库toadb并发编程 个人主页&#xff1a;我的主页 管理社区&#xff1a;开源数据库 座右铭&#xff1a;天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物. 文章目录 不同类型…

Day05-04-持续集成总结

Day05-04-持续集成总结 1. 持续集成2. 代码上线目标项目 1. 持续集成 git 基本使用, 拉取代码,上传代码,分支操作,tag标签 gitlab 用户 用户组 项目 , 备份,https,优化. jenkins 工具平台,运维核心, 自由风格工程,maven风格项目,流水线项目, 流水线(pipeline) mavenpom.xmlta…

Android 10年,35岁,该往哪个方向发力

网上看到个网友发的帖子&#xff0c;觉的这个可能是很多开发人员都会面临和需要思考的问题。 不管怎样&#xff0c; 要对生活保持乐观&#xff0c;生活还是有很多的选择和出路的。 &#xff08;内容来自网络&#xff0c;不代表个人观点&#xff09; 《Android Camera开发入门》…

关闭vue3中脑瘫的ESLine

在创建vue3的时候脑子一抽选了ESLine,然后这傻卵子ESLine老是给我报错 博主用的idea开发前端 ,纯粹是用不惯vscode 关闭idea中的ESLine,这个只是取消红色波浪线, 界面中的显示 第二步,在vue.config.js中添加 lintOnSave: false 到这里就ok了,其他的我试过了一点用没有

专业140+总分420+天津大学815信号与系统考研经验天大电子信息与通信工程,真题,大纲,参考书。

顺利上岸天津大学&#xff0c;专业课815信号与系统140&#xff0c;总分420&#xff0c;总结一些自己的复习经历&#xff0c;希望对于报考天大的同学有些许帮助&#xff0c;少走弯路&#xff0c;顺利上岸。专业课&#xff1a; 815信号与系统&#xff1a;指定教材吴大正&#xf…

飞书 API 2-4:如何使用 API 将数据写入数据表

一、引入 上一篇创建好数据表之后&#xff0c;接下来就是写入数据和对数据的处理。 本文主要探讨数据的插入、更新和删除操作。所有的操作都是基于上一篇&#xff08;飞书 API 2-4&#xff09;创建的数据表进行操作。上面最终的数据表只有 2 个字段&#xff1a;序号和邮箱。序…

(完整音频)DockerHub、OpenAI、GitCode,脱钩时代,我们该如何自处?

本期主播 朱峰&#xff1a;「津津乐道播客网络」创始人&#xff0c;产品及技术专家。&#xff08;微博&#xff1a;zhufengme&#xff09;高春辉&#xff1a;「科技乱炖」主播。“中国互联网站长第一人”&#xff0c;科技、互联网领域的连续创业者。&#xff08;微博&#xff1…

SCI一区TOP|准随机分形搜索算法(QRFS)原理及实现【免费获取Matlab代码】

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献5.代码获取 1.背景 2024年&#xff0c;LA Beltran受到分形几何、低差异序列启发&#xff0c;提出了准随机分形搜索算法&#xff08;Quasi-random Fractal Search, QRFS&#xff09;。 2.算法原理 2.1算法思…

内容营销专家刘鑫炜:网站排名需考虑哪些SEO优化技巧?

网站排名的SEO优化技巧包括&#xff1a; 1. 关键词研究&#xff1a;了解目标受众的搜索关键词&#xff0c;将这些关键词合理地应用在网站的标题、描述、正文和标签中&#xff0c;有助于提高网站排名。 2. 内容优化&#xff1a;创建高质量、有价值的内容&#xff0c;可以吸引搜…

【leetcode52-55图论、56-63回溯】

图论 回溯【56-63】 46.全排列 class Solution:def permute(self, nums):result []self.backtracking(nums, [], [False] * len(nums), result)return resultdef backtracking(self, nums, path, used, result):if len(path) len(nums):result.append(path[:])returnfor i …