Python 算法交易实验72 QTV200第一步: 获取原始数据并存入队列

news2025/1/11 12:43:16

说明

最近的数据流往前进了一步,我觉得基本可以开始同步的推进QTV200了。上次规划了整体的数据流,现在开始第一步。

内容

1 结构位置

这是上次的总体图:
在这里插入图片描述
以下是这次要实现的一小部分:

在这里插入图片描述
从结构上,这个是整体数据流的起点,系统因为这些不断 运行的数据才开始“动”了起来,可以称为源点。

2 规范与约束

源点是基于每分钟的节拍从外界读取数据,这部分目前我没用用付费接口(数据的需求量很小),所以基于自律(类似与吃自助餐)的原则,增加一些规范与约束。

  • 我获取的数据不会多,可以约束在60个ETF之内。
  • 每次请求只会查询当前时刻的前10分钟数据(数据少),每只ETF一天最多有60610 ~ 3600条数据
  • 每次请求10条的目的是为了防止某个时隙程序中断或者失效,通过冗余的数据可以在10分钟之内的终端内无缝恢复(从这角度,用某个云服务器做这件事比较合适,下一版考虑)
  • 任务按照秒进行划分,每秒最多提交6只ETF的请求,数据请求总量为60条
  • 周末一定不会发起查询请求

这样可以确保非必要不请求数据,即使请求数据,请求也被均匀分摊,每次的请求量非常之小(环保)

3 工具与方法

通过FLask-APS执行秒级的任务调度,通过Flask-Celery实现各ETF的异步抓取,确保时效的同时,减少CPU开销。(同步方式会独占一个核,很浪费的)。

有的时候倒也不纯粹是为了节约这点计算成本,而是总体成本。设想,一开始只跟踪3~4个ETF,同步状态下并发,可能抢占4个核一小会,还不至于出现卡顿(主机有32核)。但是如果跟踪60个ETF,那么整个机器就会因为这个原因处于卡顿状态,那就真的很没必要。

即使是现阶段,QTV102与Mongo通信的时候更新少量数据,但是是同步状态的,都让我的CPU负载处于一个很奇怪的状态。
在这里插入图片描述
虽然看着很满,其实我知道很多是处于浪费的状态的。

所以,如果用合适的方法进行调度,那么即使是60个ETF,甚至是600个ETF可能单核都足够了。这种效率的提升是很夸张的。目前在IO密集处理这块可以做到的提升最大。未来QTV200实现后,应该会把QTV102整个卸载掉。

3.1 worker

对于每一个ETF来说,处理的流程是相同的。所以,可以先做一个worker,然后调用的时候按不通的ETF代码进行参数化就可以了。

etf_crawl_worker.py

# 0 记录日志
import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger('MyLogger')
handler = RotatingFileHandler('/var/log/workers.log', maxBytes=1024*1024*100, backupCount=5)
logger.addHandler(handler)
logger.setLevel(logging.INFO)


# 1 允许传入一个参数
import argparse
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    # 制程名
    parser.add_argument('--etf_code')
    parser.add_argument('--assigned_work_dt')
    # 准备解析参数
    args = parser.parse_args()

    res_dict = {}
    res_dict['etf_code'] = args.etf_code
    res_dict['assigned_work_dt'] = args.assigned_work_dt
    return res_dict

arg_dict = get_arg()

etf_code = arg_dict['etf_code']
assigned_work_dt = arg_dict.get('assigned_work_dt')

from Basefuncs import * 
import time

# 2 判断是否是可执行时间
if assigned_work_dt is None:
    ts = time.time()
else:
    if assigned_work_dt.strip() == '':
        ts = time.time()
    else:
        ts = inverse_time_str(assigned_work_dt)

cur_dt_str = get_time_str1(ts)
cur_time = cur_dt_str.split()[-1]
 
morning_start = '09:25:00'
morning_end = '11:41:00'
afternoon_start = '12:55:00'
afternoon_end = '15:11:00'

is_moring_work = False 
is_afternoon_work = False
if cur_time >= morning_start and cur_time < morning_end:
    is_moring_work = True 
if cur_time >= afternoon_start and cur_time < afternoon_end:
    is_afternoon_work = True 

is_work_time = is_moring_work or is_afternoon_work


if is_work_time:
    start_dt = get_time_str1( (ts//60)*60 - 600 )
    end_dt = get_time_str1( ts + 60 )

    # 目标队列设置
    qm = QManager(redis_agent_host = 'http://192.168.0.4:24118/',redis_connection_hash = None)
    # qm.info()

    # 3 执行
    # etf_code = '510300'
    import akshare as ak 
    para_dict  ={}
    para_dict['symbol'] = etf_code
    para_dict['period'] = "1"
    para_dict['adjust'] = ''
    para_dict['start_date'] = start_dt
    para_dict['end_date'] = end_dt
    # 如果时间段不对,那么就是空
    df = ak.fund_etf_hist_min_em(**para_dict)

    # 是否获取到了数据
    is_query_data = True if len(df) else False 

    if is_query_data:
        # ak的变量字典映射
        ak_dict = {}
        ak_dict['时间'] = 'data_dt'
        ak_dict['开盘'] = 'open'
        ak_dict['收盘'] = 'close'
        ak_dict['最高'] = 'high'
        ak_dict['最低'] = 'low'
        ak_dict['成交量'] = 'vol'
        ak_dict['成交额'] = 'amt'

        keep_cols = ['data_dt','open','close','high','low','vol','amt']

        cols = list(df.columns)
        new_cols = [ak_dict.get(x) or x for x in cols ]
        df.columns = new_cols
        df1 = df[keep_cols]
        df1['data_source'] = 'AK'
        df1['code'] = etf_code
        df1['market'] = 'SH'
    
        df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \
                                + '_' + df1['data_dt']

        # 调整股和手
        vol_approximal = df1['amt'] / df1['close']
        maybe_wrong = (vol_approximal / df1['vol']) > 10
        if maybe_wrong.sum() > 0:
            df1['vol'] = df1['vol'] * 100

        stream_name = 'YOURS.stream_in'
        # 写入结果队列
        data_listofdict = df1.to_dict(orient='records')
        resp = qm.parrallel_write_msg(stream_name, data_listofdict)
        logger.info('%s %s 【%s】' % (cur_dt_str,'etf_crawl_worker',resp['msg'] ))   

    else:
        logger.info('%s %s 【未获取到数据】' % (cur_dt_str,'etf_crawl_worker'))   

else:
    logger.info('%s %s 【不在工作时间】' % (cur_dt_str,'etf_crawl_worker'))        

worker分为几部分:

  • 1 设定使用rotate日志,记录每次执行的效果
  • 2 get_arg 获取调用时传入的关键字参数
  • 3 判断是否在工作时间。默认情况,使用当前时间;也可接受使用指定的时间
  • 4 如果是在工作时间,那么推算对应时间的前10分钟和后1分钟,作为参数发起请求
  • 5 获取数据后,还有一个判断交易量是手还是股的小逻辑
  • 6 处理完成后,推入队列,然后记录日志,worker执行完毕

两种调用方法:

  • 1 实时获取
python3 etf_crawl_worker.py --etf_code=510300
  • 2 指定历史时间的获取
python3 etf_crawl_worker.py --etf_code=510300 --assigned_work_dt='2024-06-21 10:00:00'

获取日志查看,第三条数据是因为指定了工作时间。

/var/log/workers.log

└─ $ cat workers.log
2024-06-23 20:27:07 etf_crawl_worker 【不在工作时间】
2024-06-23 20:40:41 etf_crawl_worker 【不在工作时间】
2024-06-23 20:57:48 etf_crawl_worker 【ok,add 12 of 12  messages】

3.2 shell

exe_etf_crawl_worker.sh
chmod +x exe_etf_crawl_worker.sh

本来想让脚本可以接受第二个参数的,但是似乎隔空传带空格的参数很麻烦,另外实时调用(脚本)的时候没有必要再筛选时间了。

#!/bin/bash

# 记录
# sh /home/test_exe.sh com_info_change_pattern running

# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh

#conda init
conda activate base

cd  /home/workers && python3 etf_crawl_worker.py --etf_code=$1

3.3 flask_celery

将flask_celery升级为可执行脚本的版本

In [1]: import requests as req
   ...: param_dict = {'the_cmd': 'bash /home/test_exe.sh'}
   ...: resp = req.post('http://127.0.0.1:24104/exe_sh/',json = param_dict )

In [2]: resp
Out[2]: <Response [200]>

└─ $ cat tem.log
2024-06-23 22:16:25 - 脚本执行

3.4 将任务发布为flask_aps任务

任务参数如下

# 任务6:执行脚本-qtv200 510300 get 
task006 = {}
task006['machine'] = 'm4'
task006['task_id'] = 'task006'
task006['description'] = '执行脚本,在周一到周五,上午9点到下午4点执行,获取510300的数据。在秒0执行'
task006['pid'] = '.'.join([task006['machine'],task006['task_id']  ])
task006['job_name'] = 'make_a_request' # 这个是对flask-aps来说的
task006['set_to_status'] = 'running'
task006['running_status'] = ''
task006['start_dt'] = '2024-05-01 00:00:00'
task006['end_dt'] = '2099-06-01 00:00:00'
task006['task_kwargs'] = {'para_dict': 
                                {'url':'http://172.17.0.1:24104/exe_sh/',
                                 'json_data':
                                    {
                                    'the_cmd': 'bash /home/exe_etf_crawl_worker.sh 510300'
                                    }

                                }
                            }
task006['interval_para'] ={'second':'0',
                            'day_of_week':'0-4',
                            'hour':'9-16'}
task006 = TaskTable(**task006)
task006.save()

ok,明天等着看吧

In [11]: the_task_obj = TaskTable.objects(machine='m4',task_id ='task006').first()
    ...: exe_a_task(the_task_obj)
set to status running  current state  init
Publish a task
Out[11]: 1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1854614.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2025秋招NLP算法面试真题(二)-史上最全Transformer面试题:灵魂20问帮你彻底搞定Transformer

简单介绍 之前的20个问题的文章在这里&#xff1a; https://zhuanlan.zhihu.com/p/148656446 其实这20个问题不是让大家背答案&#xff0c;而是为了帮助大家梳理 transformer的相关知识点&#xff0c;所以你注意看会发现我的问题也是有某种顺序的。 本文涉及到的代码可以在…

sudo 权限之危险的 bash 命令

文章目录 [toc]事出有因干就完事了创建用户配置 sudo 权限sudo 验证使用 bash 命令执行 chmod 命令使用 bash 命令执行删根 事出有因 使用普通用户安装 tidb 时&#xff0c;发现报错了&#xff0c;报错内容如下&#xff1a; ERROR SSHCommand {"host": "…

green bamboo snake

green bamboo snake 【竹叶青蛇】 为什么写这个呢&#xff0c;因为回县城听说邻居有人被蛇咬伤&#xff0c;虽然不足以危及生命&#xff0c;严重的送去市里了。 1&#xff09;这种经常都是一动不动&#xff0c;会躲在草地、菜地的菜叶里面、果树上、有时候会到民房大厅休息&a…

嵌入式系统中的加解密签名

笔者来了解一下嵌入式系统中的加解密 1、背景与名词解释 笔者最近在做安全升级相关的模块&#xff0c;碰到了一些相关的概念和一些应用场景&#xff0c;特来学习记录一下。 1.1 名词解释 对称加密&#xff1a;对称加密是一种加密方法&#xff0c;使用相同的密钥&#xff08;…

如何搭建饥荒服务器

《饥荒》是由Klei Entertainment开发的一款动作冒险类求生游戏&#xff0c;于2013年4月23日在PC上发行&#xff0c;2015年7月9日在iOS发布口袋版。游戏讲述的是关于一名科学家被恶魔传送到了一个神秘的世界&#xff0c;玩家将在这个异世界生存并逃出这个异世界的故事。《饥荒》…

力扣SQL50 求关注者的数量 分组计数

Problem: 1729. 求关注者的数量 Code select user_id, count(1) followers_count from Followers group by user_id order by user_id;

stm32学习笔记---GPIO输入(代码部分)按键控制LED/光敏传感器控制蜂鸣器

目录 第一个代码&#xff1a;按键控制LED 模块化程序 LED驱动程序 GPIO的四个读取函数 GPIO_ReadInputDataBit GPIO_ReadInputData GPIO_ReadOutputDataBit GPIO_ReadOutputData Key驱动程序 第二个代码&#xff1a;光敏传感器控制蜂鸣器 蜂鸣器驱动代码 光敏传感器…

【内存管理】页面分配机制

前言 Linux内核中是如何分配出页面的&#xff0c;如果我们站在CPU的角度去看这个问题&#xff0c;CPU能分配出来的页面是以物理页面为单位的。也就是我们计算机中常讲的分页机制。本文就看下Linux内核是如何管理&#xff0c;释放和分配这些物理页面的。 伙伴算法 伙伴系统的…

K8s部署高可用Jenkins

小伙伴们大家好呀&#xff01;断更了近一个月&#xff0c;XiXi去学习了一下K8s和Jenkins的相关技术。学习内容有些庞杂&#xff0c;近一个月的时间里我只学会了一些皮毛&#xff0c;更多的内容还需要后面不断学习&#xff0c;不断积累。最主要的是云主机真得很贵&#xff0c;为…

C++ | Leetcode C++题解之第155题最小栈

题目&#xff1a; 题解&#xff1a; class MinStack {stack<int> x_stack;stack<int> min_stack; public:MinStack() {min_stack.push(INT_MAX);}void push(int x) {x_stack.push(x);min_stack.push(min(min_stack.top(), x));}void pop() {x_stack.pop();min_sta…

多物理场仿真对新能源汽车用电机优化分析 衡祖仿真

1、问题所在 为了改善空气质量&#xff0c;减少环境污染&#xff0c;减少对石油的依赖&#xff0c;降低能源安全风险&#xff0c;国家大力倡导发展新能源汽车&#xff0c;大量新能源车企应运而生&#xff0c;竞争日趋激烈。使用经济效率较高的电机对于增强企业市场竞争力非常重…

常用加密算法之 RSA 简介及应用

引言 相关博文&#xff1a; Spring Boot 开发 – 常用加密算法简介&#xff08;一&#xff09;常用加密算法之 SM4 简介及应用 一、RSA算法简介 RSA &#xff08;Rivest-Shamir-Adleman&#xff09; 算法是一种非对称加密技术&#xff0c;由Ron Rivest、Adi Shamir和Leonar…

本地离线模型搭建指南-中文大语言模型底座选择依据

搭建一个本地中文大语言模型&#xff08;LLM&#xff09;涉及多个关键步骤&#xff0c;从选择模型底座&#xff0c;到运行机器和框架&#xff0c;再到具体的架构实现和训练方式。以下是一个详细的指南&#xff0c;帮助你从零开始构建和运行一个中文大语言模型。 本地离线模型搭…

spdlog生产者消费者模式

spdlog生产者消费者模式 spdlog提供了异步模式&#xff0c;显示的创建async_logger, 配合环形队列实现的消息队列和线程池实现了异步模式。异步logger提交日志信息和自身指针&#xff0c; 任务线程从消息队列中取出消息后执行对应的sink和flush动作。 1. 环形队列 1.1 环形队…

独角兽品牌獭崎酱酒:高性价比的酱香之选

在酱香型白酒领域中&#xff0c;獭崎酱酒以其独特的品牌定位和高性价比迅速崛起&#xff0c;成为市场上备受关注的独角兽品牌。作为贵州茅台镇的一款新秀酱香酒&#xff0c;獭崎酱酒不仅传承了百年酿造工艺&#xff0c;还以创新的商业模式和亲民的价格赢得了广大消费者的青睐。…

双指针算法——部分OJ题详解

目录 关于双指针算法&#xff1a; 1&#xff0c;对撞指针 2&#xff0c;快慢指针 部分OJ题详解 283.移动零 1089.复写零 202.快乐数 11.盛水最多的容器 611.有效三角形的个数 剑指offer 57.和为s的两个数字 15.三数之和 18.四数之和 关于双指针算法&#xff1a; …

硬盘数据恢复软件,推荐5种适合你的方法来恢复硬盘数据

硬盘数据恢复软件&#xff0c;作为解决数据丢失问题的关键工具&#xff0c;帮助用户在重要文件丢失时迅速找回数据。本教程介绍5种恢复实用硬盘数据方法&#xff0c;适应不同类型和严重程度的数据损坏情况。 文章摘要&#xff1a; 一. 硬盘数据恢复软件 二. 数据恢复原理 三. …

ThinkPHP:查询数据库数据之后,更改查询数据的字段名称

一、原始查询数据 含有字段item_no&#xff0c;lot_num&#xff0c;position $data[brushed] db::table(wip_station_transaction) ->where([wip_entity_name>$wip_entity_name,line_code>$line_code,]) ->field([item_no, lot_num, position]) ->select(); …

React18中各种Hooks用法总结( 内附案例讲解)

React中各种Hooks用法总结 内附案例讲解 一、useState useState 是一个 React Hook&#xff0c;它允许你向组件添加一个 状态变量。 import React, { FC, memo, useState } from react import { MainContainer } from ./style interface IProps {children?: React.ReactNo…

上新:NFTScan 正式上线 Bitcoin-brc20 浏览器!

近日&#xff0c;NFTScan 团队正式对外发布了 Bitcoin-brc20 浏览器&#xff0c;将为 Bitcoin 生态的 NFT 开发者和用户提供简洁高效的 NFT 数据搜索查询服务。作为比特币生态中最火热的标准之一&#xff0c;brc20 也吸引着广泛的关注。洞悉其巨大潜力&#xff0c;NFTScan 对 b…