Python 算法交易实验73 QTV200第二步: 数据清洗并写入ClickHouse

news2025/1/9 18:35:47

说明

先检查一下昨天启动的worker是否正常工作,然后做一些简单的清洗,存入clickhouse。

内容

1 检查数据

from Basefuncs import * 
# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')
    return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
target_stream_name = 'xxx'
qm.stream_len(target_stream_name)
2804

获取数据(使用单worker,模式比较简单且性能足够)

data = qm.xrange(target_stream_name)['data']
data_df = pd.DataFrame(data)
keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']
data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])

# 第一次操作,把之前无关的数据删掉
data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']

在这里插入图片描述
向clickhouse发起query,请求每个etf的最大时间,之后要使得新增的数据大于这个时间,另外目标表的字段形如
在这里插入图片描述
这是之前做的设计,因为隔的时间有点久都有点忘了。不过这个设计是合理的,后面会看到。

要做的转换也很简单:

  • 1 将时间字符转为时间戳
  • 2 从日期中分解出shard、part、block和brick

转换段

import time

data_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)

data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])
data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])

data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)

keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
data_df2 =data_df1[keep_cols1]

在这里插入图片描述

今天就到这里吧,明晚接着写。

Go on …

昨天疏忽了,数据不应该直接存库,而是应该整理好之后送到队列。然后由默认的worker将数据搬到clickhouse.

2 存数规则

第二步的输入队列BUFF.xxxstream_in,输出队列BUFF.xxx.stream_out
第一次需要确保对应数据表的存在。clickhouse对数值的要求比较严格,为了避免麻烦,统一设置成Float32。(这样可以用统一的同步worker)。另外clickhouse不支持删除数据,这点倒是比较特别。
在这里插入图片描述
但可以支持全部删除数据(保留数据结构) TRUNCATE table market_data_v2

create_table_sql = '''
CREATE TABLE market_data_v2
(
    data_dt String,
    open Float32,
    close Float32,
    high Float32,
    low Float32,
    vol Float32,
    amt Float32,
    brick String,
    block String,
    part String,
    shard String,
    code String,
    ts Float32,
    pid String
)
ENGINE = MergeTree
ORDER BY (ts )
'''

click_para = gb.getx('sp_global.buffer.lan.xxx.xxx.para')
chc = CHClient(**click_para)
chc._exe_sql(create_table_sql)
chc._exe_sql('show tables')
[('market_data',), ('market_data_v2',)]

etl_worker.py

# 0 记录日志
import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger('MyLogger')
handler = RotatingFileHandler('/var/log/workers.log', maxBytes=1024*1024*100, backupCount=5)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# ---------------------------------------- 设置日志

from Basefuncs import * 
def tuple_list2dict(tuple_list):
    """
    将包含三个元素的tuple列表转换为字典。
    
    参数:
    tuple_list (List[Tuple[K, V1, V2]]): 包含键和两个值的tuple的列表。
    
    返回:
    Dict[K, Tuple[V1, V2]]: 转换后的字典,其中值是包含两个元素的tuple。
    """
    return {key:value1 for key, value1 in tuple_list}

# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')
    return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# ---------------------------------------- 基本函数

# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
source_stream_name ='stream_in'
target_stream_name ='stream_out'
source_stream_len =  qm.stream_len(source_stream_name)
target_stream_len = qm.stream_len(target_stream_name)
print('source',source_stream_len)
print('target', target_stream_len)
# qm.ensure_group(target_stream_name)
cur_dt_str = get_time_str1()
if source_stream_len:
    is_source_recs = True
else:
    is_source_recs = False
    logger.info('%s %s source No Recs' %(cur_dt_str,'etl_worker'))
# 获取数据(使用单worker,模式比较简单且性能足够)

# ---------------------------------------- 队列取数,有数据才执行下面
if is_source_recs:

	
	# ---------------------------------------- 取数,取出消息列表和需要的列
    # worker 30 秒启动一次
    data = qm.xrange(source_stream_name)['data']

    data_df = pd.DataFrame(data)
    msg_id_list = list(data_df['_msg_id'])
    keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']
    data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])

    # 第一次操作,把之前无关的数据删掉
    # data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']

    import time

    data_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)

    data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
    data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
    data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])
    data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])

    data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)

    keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
    data_df2 =data_df1[keep_cols1]
	
	# ------------------------------------- 获取当前数据库已有的数据

    # 获取各code最大值
    click_para = {'database': 'xx',
        'host': '192.168.0.4',
        'name': 'xx',
        'password': 'xx',
        'port': xxx,
        'user': 'xx'}
    chc = CHClient(**click_para)

    '''
    这个 SQL 语句的作用是按照 `code` 分组,并为每个 `code` 找到对应的最新日期(`data_dt`),这个最新日期是基于 `ts` 字段的最大值来确定的。`argMax` 函数在这里用于找到每个分组中 `ts` 值最大时对应的 `data_dt` 值。

    具体来说,`argMax(data_dt, ts)` 会返回每个 `code` 分组中使得 `ts` 达到最大值的 `data_dt` 值。这意味着对于每个 `code`,查询会找到 `ts` 字段的最大值,并返回对应的 `data_dt` 值,即每个 `code` 的最新数据日期。

    最终,这个查询会返回一个结果集,其中包含每个 `code` 以及对应的最新数据日期(`last_data_dt`)。这对于分析每个代码的最新市场数据非常有用。
    '''

    latest_sql = '''
    SELECT
        code,
        argMax(data_dt, ts) AS last_data_dt
    FROM
        market_data_v2
    GROUP BY
        code
    '''
    # 更新时
    latest_date_tuple_list = chc._exe_sql(latest_sql)
    latest_date_dict = tuple_list2dict(latest_date_tuple_list)
	# ------------------------------------- 使用时间进行过滤
    # 筛选新数据
    data_df2['existed_dt'] = data_df2['code'].map(latest_date_dict).fillna('')

    output_sel = data_df2['data_dt'] > data_df2['existed_dt']
    output_df = data_df2[output_sel][keep_cols1]
    output_data_listofdict = output_df.to_dict(orient='records')
    output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)
    for some_data_listofdict in output_data_listofdict2:
        qm.parrallel_write_msg(target_stream_name, some_data_listofdict)

    del_msg = qm.xdel(source_stream_name, msg_id_list)
    logger.info('%s %s del source %s Recs' %(cur_dt_str,'etl_worker',del_msg['data'] ))

将该脚本发布为任务,30秒执行一次同步。

exe_qtv200_etl_worker.sh

#!/bin/bash

# 记录
# sh /home/test_exe.sh com_info_change_pattern running

# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh

#conda init
conda activate base

cd  /home/workers && python3 etl_worker.py

存数成功,后续就自动运行了。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1870780.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

简单粗暴-安装detectron2

文章目录 一、下载detectron2二、修改文件三、安装detectron2参考文献: 其他配置版本: torch1.12.1CUDA:11.6python:3.9 建议以上配置,其他配置未测试,如果CUDA版本不匹配的话,可以多配置不同版…

达梦数据库的系统视图v$locked_object

达梦数据库的系统视图v$locked_object 在达梦数据库(Dameng Database)中,V$LOCKED_OBJECT 视图提供了与数据库中被锁定对象相关的信息。这通常用于监控和诊断数据库中的锁定问题,帮助管理员了解哪些对象被锁定了,以及…

PyCharm2024 for mac Python编辑开发

Mac分享吧 文章目录 效果一、下载软件二、开始安装1、双击运行软件(适合自己的M芯片版或Intel芯片版),将其从左侧拖入右侧文件夹中,等待安装完毕2、应用程序显示软件图标,表示安装成功3、打开访达,点击【文…

自动建立用户练习

一丶编辑文本存放用户名 vim userlist 二丶编辑文本存放需要创建用户的密码 vim passlist 三丶编辑脚本 vim create_user.sh #!bin/bash [ "$#" -lt "2" ] && { #echo error please input userlist anpassli…

postgre事务id用完后,如何解决这个问题

在PG中事务年龄不能超过2^31 (2的31次方2,147,483,648),如果超过了,这条数据就会丢失。 PG中不允许这种情况出现,当事务的年龄离2^31还有1千万的时候,数据库的日志中就会 有如下告警: warning:…

【第一周】认识小程序

目录 认识小程序发展历史发展前景发展优势个人企业/创业 账号申请开发工具下载流程使用说明 协作项目交流收益渠道 认识小程序 发展历史 微信小程序自2016年首次提出以来,经历了快速的发展和完善过程,以下是其主要发展历史节点: 2016年1月…

监控员工电脑的软件有哪些?6款企业必备的电脑监控软件

监控员工电脑的软件在企业管理和网络安全领域扮演着重要角色,它们可以帮助企业提高工作效率,确保数据安全,以及合规性。以下是六款知名的员工电脑监控软件: 1.安企神 - 一个全面的企业级电脑监控和管理解决方案。 2.Work Examine…

从单点到全景:视频汇聚/安防监控EasyCVR全景视频监控技术的演进之路

在当今日新月异的科技浪潮中,安防监控领域的技术发展日新月异,全景摄像机便是这一领域的杰出代表。它以其独特的360度无死角监控能力,为各行各业提供了前所未有的安全保障,成为现代安防体系中的重要组成部分。 一、全景摄像机的技…

个人支付系统实现

基础首页: 订单: 智能售卡系统 基于webmanworkerman开发 禁用函数检查 使用这个脚本检查是否有禁用函数。命令行运行curl -Ss https://www.workerman.net/check | php 如果有提示Function 函数名 may be disabled. Please check disable_functions in …

仓库管理系统13--物资设置

1、添加窗体 2、设计UI界面 注意这个下拉框的绑定&#xff0c;你看到的选项是由displaymember决定&#xff0c;当你选择了哪个选项时&#xff0c;后台绑定这个选项的ID <UserControl x:Class"West.StoreMgr.View.GoodsView"xmlns"http://schemas.microsoft…

群晖系统百度网盘套件卸载之后无法再次安装 ContainerManager项目无法删除

前言 最近重新组了个NAS&#xff0c;在套件迁移的时候遇到个头疼的问题。在用矿神的百度网盘在迁移的时候出错了&#xff0c;于是我自己删掉baiduapp得容器和镜像然后卸载套件。不知道中间出了啥问题&#xff0c;套件是已经卸载了&#xff0c;但是群晖ContainerManager套件中的…

前端实现 海浪(波浪)进度条效果(支持自定义长度;调节速度,2s缓冲结束)

实现海浪进度条 文章目录 实现海浪进度条效果图如下(投入使用的版本)背景和过程一、调试和探索过程(下面都会给出来对应代码)二、类似Element-plus的进度条样式1. CSS的样式如下2. HTML结构如下 二、电涌效果的进度条如下1. CSS的样式如下2. HTML的结构如下:3. JavaScript代码如…

C# 异步编程详解(Task,async/await)

文章目录 1.什么是异步2.Task 产生背景3.Thread(线程) 和 Task(异步)的区别3.1 几个名词3.2 Thread 与 Task 的区别 4.Task API4.1 创建和启动任务4.2 Task 等待、延续和组合4.3 task.Result4.4 Task.Delay() 和 Thread.Sleep() 区别 5.CancellationToken 和 CancellationToken…

基于模型蒸馏的模型加速方案总结

1.简介 1.1目的 在过去的一段时间里&#xff0c;对基于模型蒸馏技术的模型加速方案的方法在多个数据集上进行了一系列的实验。所谓的模型蒸馏技术&#xff0c;简单的来说就是利用一个设计简单的小网络去学习一个设计比较复杂的大网络。特别的有&#xff0c;本次实验针对每一个…

计算机图形学入门18:阴影映射

1.前言 前面几篇关于光栅化的文章中介绍了如何计算物体表面的光照&#xff0c;但是着色并不会进行阴影的计算&#xff0c;阴影需要单独进行处理&#xff0c;目前最常用的阴影计算技术之一就是Shadow Mapping技术&#xff0c;也就是俗称的阴影映射技术。 2.阴影映射 Shadow Map…

压缩pdf文件大小的方法,如何压缩pdf格式的大小

pdf太大怎么压缩&#xff1f;当你需要通过电子邮件发送一个PDF文件&#xff0c;却发现文件太大无法成功发出时&#xff0c;这些情况下&#xff0c;我们都需要找到一种方法来压缩PDF文件&#xff0c;以便更便捷地进行分享和传输。PDF文件的大小通常与其中包含的图片、图形和文本…

【智能算法应用】麻雀搜索算法在物流配送中心选址的应用(无待选配送中心)

目录 1.算法原理2.数学模型3.结果展示4.参考文献5.代码获取 1.算法原理 【智能算法】麻雀搜索算法&#xff08;SSA&#xff09;原理及实现 2.数学模型 模型假设 待定物流配送中心的库存总能满足需求点的需求不考虑从工厂到待定物流配送中心的运输成本不考虑选定区域内待确定…

openinstall拥抱鸿蒙生态,SDK全面适配HarmonyOS NEXT

作为国内领先的App渠道统计与深度链接服务商&#xff0c;openinstall持续推动鸿蒙生态建设&#xff0c;近日正式发布openinstall HarmonyOS SDK&#xff0c;并成功入驻鸿蒙生态伙伴SDK专区&#xff0c;成为华为鸿蒙生态的合作伙伴&#xff0c;为鸿蒙应用开发者带来安全合规、高…

可用的搜索引擎

presearchhttps://presearch.com/yandexhttps://ya.ru

MySQL简介:开源数据库的基石(一)

目录 引言&#xff1a;数据库领域的革新者 一、MySQL的发展历程&#xff1a;从开源先锋到行业领袖 二、MySQL的核心特性&#xff1a;性能、安全与灵活性并重 三、MySQL的应用场景&#xff1a;从Web开发到企业级应用的全面覆盖 四、MySQL在开源数据库中的地位&#xff1a;开…