Python 算法交易实验74 QTV200第二步(改): 数据清洗并写入Mongo

news2025/1/22 22:58:56

说明

之前第二步是打算进入Clickhouse的,实测下来有一些bug
在这里插入图片描述
可以看到有一些分钟数据重复了。简单分析原因:

  • 1 起异步任务时,还是会有两个任务重复的问题,这个在同步情况下是不会出现的
  • 2 数据库没有upsert模式。clickhouse是最近刚应用的库,我还没有完善其操作模式。

解决思路:

  • 1 既然采用了异步,就没有办法去控制其前置的依赖和顺序,否则就会退回到同步状态。而且从效率上,n次异步IO的cpu开销,可能也只相当于1次的同步开销。可以认为,异步是更轻松,但是更’粗心’的工作状态。所以在设计上,如果每次的操作都是“无害”的,那么就没问题。这里的数据同步任务,最重要的是不重不漏,所以只要能够确保数据不重不漏即可。
  • 2 每次负责crawl的worker不直接操作数据库是对的,这可以避免过多的数据库操作开销。在同步结果的队列中,每个周期执行一次Mongo操作是完全没问题的。同步队列中可以有一些冗余的数据,在整合数据时就删除了。剩余的部分,可以直接采用upsert的方式存入。

结论:使用Mongo作为第一个数据节点的持久化。

反思点:

  • 1 对于数据的集成,可能还是Mongo更合适。因为不必事先定义表结构,而且之前做了一些开发,Mongo的操作方式非常完善。擅长在记录级数据的复杂度操作。
  • 2 clickhouse更适合用于在我的数据系统中直接输出的数据,特别是空间数据,按UCS方式规范。擅长在块级别数据的效率操作。

内容

1 目标数据库准备

采用m4.24086

很巧,QTV102的数据也在这里,所以QTV200的数据可以继续放在这里 。

回顾一下WMongo的操作,有好一阵子没用了。

from Basefuncs import *
# analysis 
target_server = 'm4.24086'
# machine_name = 'm4'
machine_name = get_machine_name()

# 在本地建立连接文件,避免每次都向mymeta请求数据。 随主机变化,这里有可能要修改(TryConnectionOnceAndForever)中关于mymeta的连接配置。
try:
    target_w = from_pickle(target_server)
    color_print('【Loading target_w】from pickle')
except:
    w = WMongo('w')
    target_w = w.TryConnectionOnceAndForever(server_name =target_server, current_machine_name = machine_name)
    to_pickle(target_w, target_server)

有一些设计是好的,只要给出目标服务器名称,对象就会自动寻找合适的连接方式(local、lan、wan)来完成连接,对应的连接保存为本地文件。之后可以考虑通过GlobalBuffer来简化判断,还有neo4j来存储和管理关系。

进入队列的字段名,不允许有 _msg_id 字段
Wmongo_v9000.012
设置当前连接 local
>>> Switching To Mymeta
设置当前连接 local
在CN001访问mymeta,通用
当前机器的名称: m4
1.当前使用的MongeAgent:http://172.17.0.1:24011/
2.Tier1:meta, Tier2:servers
3.ConnectionHash:e8d1bc791049988d89465d5ce24d993b
4.FilterDict:{'my_server_pkey': 'm4.24086'}
5.Limits:1
6.Sort:
7.Skip:0
>>> Hit Records
当前机器的局网: my.cn001
【I】目标服务的机器:m4, 目标服务的机器局网:my.cn001
【I】采用local方式连接目标主机
Wmongo_v9000.012
设置当前连接 local
获取已有连接
target connection hash: d35632b63b77f17d4d12808fb707cb1f
data save to pickle:  ./m4.24086.pkl

然后就可以通过对象操作了

target_w.cname_recs()
{'data': {'QTV102': {'log_monitor': 264276,
   'log_sniffer': 792827,
   'log_worker': 264276,
   'stats': 5177,
   'step1_mongo_in': 2895114,
   'step1_mongo_meta': 2895114,
   'step1_mongo_out': 2895114},
  'QTV102_Capital_Data': {'capital_daily': 32787},
  'QTV102_Model_Signal': {'log_monitor': 264277,
   'log_sniffer': 792827,
   'log_worker': 0,
   'stats': 5218,
   'step1_mongo_in': 2436678,
   'step1_mongo_meta': 0,
   'step1_mongo_out': 16860560},
  'QTV102_Strategy': {'strategy_online': 58,
   'trade_orders': 128,
   'trade_strategy': 23},
  'QuantData001': {'log_monitor': 264276,
   'log_sniffer': 792830,
   'log_worker': 264276,
   'stats': 5178,
   'step1_mongo_in': 460297,
   'step1_mongo_meta': 460297,
   'step1_mongo_out': 460297},
  'QuantData_510500': {'log_monitor': 264277,
   'log_sniffer': 792831,
   'log_worker': 264278,
   'stats': 5178,
   'step1_mongo_in': 657702,
   'step1_mongo_meta': 657702,
   'step1_mongo_out': 657702},
  'SmartQuant_512660': {'log_monitor': 264277,
   'log_sniffer': 792830,
   'log_worker': 0,
   'stats': 5219,
   'step1_mongo_in': 460297,
   'step1_mongo_meta': 0,
   'step1_mongo_out': 450098},
  'Strategy_512660': {'capitals': 261,
   'monthly_report': 66,
   'orders': 130,
   'slog': 430056,
   'summary_report': 124,
   'yearly_report': 8},
  'test_for_mongo_engine': {'user': 2}},
 'msg': 'ok',
 'status': True}

很早前随便做的一版,看起来业务效果还是不错的。这部分内容,以后就不必放在mongo,在clickhouse里一个查询就好了。
在这里插入图片描述

仍然(在逻辑上)设置表的结构为:qtv200.market_data,需要的索引有:

  • 1 pid: 主键。这个是确定的主键,对后续的基础操作来说是必须的。
  • 2 UCS(shard、part、block、brick): 管理块级数据的键,在后续的块级任务来说非常重要。
  • 3 code: 业务筛选字段
  • 4 ts: 时间,排序字段

mongo方便之处就在于:当你的逻辑明确了,建立好索引,一切就好了

# 主键 pkey
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'pid')
# UCS
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'shard')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'part')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'block')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'brick')
# 业务
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'code')
# 排序
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'ts')

Out[4]: {'data': {'ts_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}

在这里插入图片描述

改造1:修改获取最大最小值的部分 etl_worker

变的简单了,不需要关心数据库里有什么,只要把当前有重复的pid去掉就可以了

...
    data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)

    keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
    data_df2 =data_df1[keep_cols1].drop_duplicates(['pid'])

    output_df = data_df2
    output_data_listofdict = output_df.to_dict(orient='records')
    output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)
    for some_data_listofdict in output_data_listofdict2:
        qm.parrallel_write_msg(target_stream_name, some_data_listofdict)

在脚本里做相应修改

#conda init
conda activate base

cd  /home/workers && python3 etl_worker_mongo.py

改造2:修改入库的部分 s2mongo

暂时先以脚本方式执行,不固化到接口中。

现在可以采用一些更好的方式来初始化队列。

from Basefuncs import * 
import logging
from logging.handlers import RotatingFileHandler
def get_logger(name , lpath = '/var/log/' ):
    logger = logging.getLogger(name)
    fpath = lpath + name + '.log'
    handler = RotatingFileHandler(fpath , maxBytes=100*1024*1024, backupCount=10)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

logger = get_logger('etf_raw_data')

# IO
machine_host = '192.168.0.4'
source_redis_agent_host = f'http://{machine_host}:24118/'

stream_cfg = StreamCfg(q_max_len = 1000000, batch_size = 10000, redis_agent_host = source_redis_agent_host)
qm = QManager(**stream_cfg.dict())
# qm.info()

# analysis 
target_server = 'm4.24086'
target_w = from_pickle(target_server)
# machine_name = 'm4'
# machine_name = get_machine_name()
# # 在本地建立连接文件,避免每次都向mymeta请求数据。 随主机变化,这里有可能要修改(TryConnectionOnceAndForever)中关于mymeta的连接配置。
# try:
#     target_w = from_pickle(target_server)
#     color_print('【Loading target_w】from pickle')
# except:
#     w = WMongo('w')
#     target_w = w.TryConnectionOnceAndForever(server_name =target_server, current_machine_name = machine_name)
#     to_pickle(target_w, target_server)
# target_w.cname_recs()

# Name
ss_name = 'xxx'
t_tier1 = 'xxx'
t_tier2 = 'xxx'

keep_cols =['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
num_cols = ['open','close','high','low', 'vol','amt','ts']
# Process 
## 1 read source -- 这里本身也可以用pydantic 
ss_data_list = qm.xrange(ss_name)['data']
if len(ss_data_list):
    ss_data_df0 = pd.DataFrame(ss_data_list)
    msg_id_list = list(ss_data_df0['_msg_id'])
    ss_data_df = ss_data_df0[keep_cols].dropna()
    for the_col in num_cols:
        ss_data_df[the_col] = ss_data_df[the_col].apply(float)
    # 写入mongo
    resp = target_w.insert_or_update_with_key(tier1 = t_tier1, tier2 = t_tier2, data_listofdict = ss_data_df.to_dict(orient='records'), key_name ='pid')
    qm.xdel(ss_name,msg_id_list)
    logger.info(get_time_str1() + 'efl_s2mongo insert recs %s' % len(ss_data_df))
else:
    logger.info(get_time_str1() + 'efl_s2mongo insert not recs')

以上,规定了几部分。

  • 1 IO部分。队列和数据的handler现在通过pydantic的对象,可以非常简洁的定义。然后约定好入队列和目标数据库表的必要信息。
  • 2 处理。主要就是将需要保留的字段,以及需要转数值的字段明确。然后就是读取,保留,转换,插入,最后删除。

在测试中,就一次的数据反复插了几次,数据是不会重复的。

在这里插入图片描述
对应的日志可以看到一开始插入过n次,后面加入了定时任务,然后就转入运行了

└─ $ cat /var/log/etf_raw_data.log
2024-06-29 18:38:27efl_s2mongo insert recs 12
2024-06-29 18:40:53efl_s2mongo insert recs 12
2024-06-29 18:41:31efl_s2mongo insert recs 12
2024-06-29 18:41:42efl_s2mongo insert recs 12
2024-06-29 18:43:27efl_s2mongo insert recs 12
2024-06-29 18:44:31efl_s2mongo insert recs 12
2024-06-29 18:46:49efl_s2mongo insert recs 12
2024-06-29 18:47:01efl_s2mongo insert not recs
2024-06-29 18:47:01efl_s2mongo insert not recs
2024-06-29 18:47:31efl_s2mongo insert not recs
2024-06-29 18:48:01efl_s2mongo insert not recs
2024-06-29 18:48:31efl_s2mongo insert not recs
2024-06-29 18:49:01efl_s2mongo insert not recs
2024-06-29 18:49:02efl_s2mongo insert not recs
2024-06-29 18:49:31efl_s2mongo insert not recs
2024-06-29 18:49:32efl_s2mongo insert not recs
2024-06-29 18:50:01efl_s2mongo insert not recs
2024-06-29 18:50:02efl_s2mongo insert not recs
2024-06-29 18:50:31efl_s2mongo insert not recs
2024-06-29 18:50:32efl_s2mongo insert not recs
2024-06-29 18:51:01efl_s2mongo insert not recs
2024-06-29 18:51:02efl_s2mongo insert not recs
2024-06-29 18:51:31efl_s2mongo insert not recs
2024-06-29 18:51:32efl_s2mongo insert not recs
关于定时任务

我偷了个懒,就是把这脚本和etl脚本放在一起。这两个任务被绑在一起串行了。主要是懒的再去定一个定时任务。

└─ $ cat exe_qtv200_etl_worker.sh
#!/bin/bash

# 记录
# sh /home/test_exe.sh com_info_change_pattern running

# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh

#conda init
conda activate base

cd  /home/workers && python3 etl_worker_mongo.py
cd  /home/workers && python3 etf_raw_data_s2mongo.py

对于后续其他的etl,每一个还是应该另起一个任务,这样才能利用异步来确保多个etf的数据及时获取。

【调整完毕】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1876980.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码随想录:链表

文章目录 代码随想录---链表链表基础(创建以及增删查改)设计链表 链表的反转[206. 反转链表](https://leetcode.cn/problems/reverse-linked-list/)递归法迭代法 删除链表倒数第N个结点[19. 删除链表的倒数第 N 个结点](https://leetcode.cn/problems/remove-nth-node-from-end…

3ds Max导出fbx贴图问题简单记录

1.前言 工作中发现3ds Max导出的fbx在其它软件(Autodesk viewer,blender,navisworks,FBXReview等)中丢失了部分贴图,但导出的fbx用3ds Max打开却正常显示。 fbx格式使用范围较广,很多常见的三…

基于MDEV的PCI设备虚拟化DEMO实现

利用周末时间做了一个MDEV虚拟化PCI设备的小试验&#xff0c;简单记录一下&#xff1a; DEMO架构&#xff0c;此图参考了内核文档&#xff1a;Documentation/driver-api/vfio-mediated-device.rst host kernel watchdog pci driver: #include <linux/init.h> #include …

【Java】面试必问之Java常见线上故障排查方案详解

一、问题解析 在软件开发过程中&#xff0c;排查和修复产线问题是每⼀位⼯程师都需要掌握的基本技能。但是在⽣产环境中&#xff0c; 程序代码、硬件、⽹络、协作软件等任⼀因素&#xff0c;都会引发意想不到的问题&#xff0c;所以排查产线问题⽐较困 难&#xff0c;所以问…

关于数据库的ACID几点

首先的话就是关于ACID&#xff0c;最重要的就是原子性了&#xff0c;这是基础。 原子性是指事务包含的所有操作&#xff0c;要么全部完成&#xff0c;要么全部不完成。如果不能保证原子性&#xff0c;可能会出现以下问题&#xff1a; 数据不一致&#xff1a;事务中的部分操作…

QT事件处理及实例(鼠标事件、键盘事件、事件过滤)

这篇文章通过鼠标事件、键盘事件和事件过滤的三个实例介绍事件处理的实现。 鼠标事件及实例 鼠标事件包括鼠标的移动、按下、松开、单击和双击等。 创建一个MouseEvent项目&#xff0c;通过项目介绍如何获得和处理鼠标事件。程序效果如下图所示。 界面布局代码如下&#xff…

【算法训练记录——Day36】

Day36——贪心Ⅳ 1.leetcode_452用最少数量的箭引爆气球2.leetcode_435无重叠区间3.leetcode_763划分字母区间4.leetcode_ 1.leetcode_452用最少数量的箭引爆气球 思路&#xff1a;看了眼题解&#xff0c;局部最优&#xff1a;当气球出现重叠&#xff0c;一起射&#xff0c;所用…

【工具推荐】Nuclei

文章目录 NucleiLinux安装方式Kali安装Windows安装 Nuclei Nuclei 是一款注重于可配置性、可扩展性和易用性的基于模板的快速漏洞验证工具。它使用 Go 语言开发&#xff0c;具有强大的可配置性、可扩展性&#xff0c;并且易于使用。Nuclei 的核心是利用模板&#xff08;表示为简…

多机调度问题

#include<iostream> #include<string> using namespace std; struct work {int time;int number; }; int setwork0(int m,int n,int a[],struct work w[]) {int maxtime0;for(int i1; i<m; i){cout<<i<<"号设备处理作业"<<w[i].num…

AD9026芯片开发实录5-ADRV9026 - FAQ

1. What information should I provide to help speed resolution of my issue?  Please provide as much detail as possible including all of the detail described in the table below 2. What are the key specifications of ADRV9026 chip?  The ADRV9026 is a 4…

kafka学习笔记08

Springboot项目整合spring-kafka依赖包配置 有这种方式&#xff0c;就是可以是把之前test里的配置在这写上&#xff0c;用Bean注解上。 现在来介绍第二种方式&#xff1a; 1.添加kafka依赖&#xff1a; 2.添加kafka配置方式: 编写代码发送消息&#xff1a; 测试&#xff1a; …

ROS2自定义接口Python实现机器人移动

1.创建机器人节点接口 cd chapt3_ws/ ros2 pkg create example_interfaces_rclpy --build-type ament_python --dependencies rclpy example_ros2_interfaces --destination-directory src --node-name example_interfaces_robot_02 --maintainer-name "Joe Chen" …

【STM32】在标准库中使用定时器

1.TIM简介 STM32F407系列控制器有2个高级控制定时器、10个通用定时器和2个基本定时器。通常情况下&#xff0c;先看定时器挂在哪个总线上APB1或者APB2&#xff0c;然后定时器时钟需要在此基础上乘以2。 2.标准库实现定时中断 #ifndef __BSP_TIMER_H #define __BSP_TIMER_H#if…

计算机基础知识——C基础+C指针+char类型

指针 这里讲的很细 https://blog.csdn.net/weixin_43624626/article/details/130715839 内存地址&#xff1a;内存中每个字节单位都有一个编号&#xff08;一般用十六进制表示&#xff09; 存储类型 数据类型 *指针变量名&#xff1b;int *p; //定义了一个指针变量p,指向的数…

自研Eclipse插件的生成及安装和使用

说明&#xff1a; 本处是使用个人自研的Eclipse插件为例&#xff0c;创建了一个菜单式的插件组&#xff0c;插件组下&#xff0c;有一个生成右击Jakarta EE服务端点类后&#xff0c;生成端点对应的Restful客户端。有什么问题&#xff0c;欢迎大家交流&#xff01;&#xff01;…

仓库管理系统11--物资设置

1、添加用户控件 <UserControl x:Class"West.StoreMgr.View.GoodsTypeView"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:mc"http://schemas.openxm…

[小试牛刀-习题练]《计算机组成原理》之计算机系统概述【详解过程】

【计算机系统概述】 1、【冯诺伊曼结构】计算机中数据采用二进制编码表示&#xff0c;其主要原因是&#xff08;D&#xff09; I、二进制运算规则简单II、制造两个稳态的物理器件较为容易III、便于逻辑门电路实现算术运算 A.仅I、Ⅱ B.仅I、Ⅲ C.仅Ⅱ、Ⅲ D. I、Ⅱ、Ⅲ I…

redis 单节点数据如何平滑迁移到集群中

目的 如何把一个redis单节点的数据迁移到 redis集群中 方案&#xff1a; 使用命令redis-cli --cluster import 导入数据至集群 --cluster-from <arg>--cluster-from-user <arg> 数据源用户--cluster-from-pass <arg> 数据源密码--cluster-from-askpass--c…

Linux开发讲课20--- QSPI

SPI 是英语 Serial Peripheral interface 的缩写&#xff0c;顾名思义就是串行外围设备接口&#xff0c;一种高速的&#xff0c;全双工&#xff0c;同步的通信总线&#xff0c;并且在芯片的管脚上只占用四根线&#xff0c;节约了芯片的管脚&#xff0c;为 PCB 的布局上节省空间…

《SpringBoot+Vue》Chapter04 SpringBoot整合Web开发

返回JSON数据 默认实现 依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>在springboot web依赖中加入了jackson-databind作为JSON处理器 创建一个实体类对象…