时序数据的内存服务

news2025/1/22 13:09:24

说明

既要坚定锻炼成熟架构的道路,也要在合理的范围内重塑设计

计算时序数据的特征,少不了“Rolling”类的操作。过去,直接采用pandas进行rolling,效率很不错,但是在实战应用时不太行。

反思下来:离线的操作拓展困难,很多都是一次性的

后来开发了ADBS,通过Mongo和Redis,在数据的持久化和吞吐上是没问题的。但是,对于全量的历史回滚计算遇到了问题。

ADBS的架构将问题简化到了一个时隙,大大简化了逻辑,开发完成也即生产完成,这点很好;但是在大量的全量计算中,Mongo库还是碰到了“大频次”访问的问题。目前我关注的时隙大约是60万,也就是说各Worker会频繁的发出数据查询请求。最终优化下来,处理的时间大约要1.5天。实时计算方面倒没什么问题,(每分钟计算300时隙 – 这不就是 TPS = 5吗?)

之前纠结的点在于使用硬件去堆高能力,而不去做设计上优化。但今天转念一想,不能轴啊,毕竟可以用很小的代价来实现的。

内容

1 问题本质

可使用内存服务的点在于:

  • 1 数据不算很大,60万行数据,甚至占不了几百兆(内存装得下)
  • 2 查询的条件很简单,就是其实和终止时隙(不需要额外的算法开销)
  • 3 数据重复被取的次数会非常多(1个时隙可能会重复取数千次)
  • 4 应用面广(假设会经常的增改新的指标)
  • 5 几乎没有修改成本(只要在worker的取数环节改为向这个服务取数即可),可能需要给worker一个参数让它切换

TPS = 5 ,那么一天计算约40万次计算;因为我把时隙定在分钟,所以应该是TPM=300。

从吞吐的指标上看,至少要能达到TPS = 100或者 TPM = 6000这个水平。这样处理速度快20倍,不到1个小时就能跑完全量。

2 方案

构造一个服务,服务的主要目的是将这种滚动式的数据查询从数据库中挪到内存中

服务的构造上,在这个场景用Tornado是最合适的,不过我不想花时间去搞;还是要用Flask搭建,未来有时间了,还是应该把Tornado的模板搞一搞,又轻又快。

服务能够提速的原因在于:

  • 1 读写只在内存,比硬盘能快几十倍
  • 2 不需要数据库执行通用的条件筛选(可能从百毫秒级别,降到毫秒或者微秒级)

顺带的能稍微节约一下硬盘的耐久度消耗(虽然有时候我也暗搓搓的希望硬盘用坏了可以买新的,哈哈)。

服务最大的开销可能就是将结果进行序列化,然后在网络(局网)中传输了。

其实如果是非滚动的计算,当前的架构是完全可以满足的,所以新的服务应该只是这个特殊场景的补充,用最小化的功能来进行设计。

  • 之所以不使用redis,是因为还要进行筛选(不仅仅是kv),没那么方便

2.1 取数接口

按 random_seed ,从某个库拖取区间数据

接口的入参需要指定:

  • 1 数据库服务名称:服务会自动进行合适的连接,然后将表中的所有数据下载

接口服务在拉取数据后存入服务的内存变量(字典):

  • 1 键值即为数据库服务名称
  • 2 返回数据的条数
  • 3 返回数据对象(df)所占的内存空间

在不使用时,通过重启容器来释放内存

2.2 返数接口

和原来查表同样的入参

一般来说,通过一组变量作为联合主键来进行数据的筛选,然后通过时隙的比较来进行取数,之后将结果作为listofdict返回。

3 实施一

采用了Tornado做网络服务,的确是比Flask强很多。

单核情况下,T的处理大约是120ms~150ms, F的处理 一般在200ms。所以T的TPS是7,而F的TPS是5。

在概念上,我有两个认知盲点或者错误:

  • 1 这个取数不是IO密集,是CPU密集操作
  • 2 进程间数据不共享

取数的时候,如果直接从Mongo取,那么需要数据库进行查询,这个会耗用CPU;当采用内存处理时,从python的对象转为json传输,也要耗用CPU。可能后者的耗用还会大一些。

因为现在都是在使用MA进行数据操作,MA为了标准化操作,也会进行转换,所以我觉得效率都消耗在这上面。

速度的确提高了很多,本以为这就是这个故事的结果,没想到又反转了。

实施一的问题

tornado在多核运行时出现了内存暴增的问题。我估计在多核状态下,可能还会有很多进程会被随机创建,然后每个进程会去获取静态数据(是的,肯定不是十个核拿十次,而是有多少进程多少次)。所以多核的时候,内存方案会有问题。我想以前碰到的一些(其他人)的工程上内存爆掉的问题,可能有一部分是类似这样的。python因为天生是单进程的,所以不太会体会到。

我觉得,如果要多核操作,一定还是要面向数据库的,还是转向redis找解决方案

4 实施二

擦,突然发现其实我已经有了答案,灯下黑啊。

我现在往Redis的Stream不就是存取数据,然后分发执行吗?效率很高啊。只是有一个地方需要改变,就是消息的id用我的时隙号替代就可以了…

甚至再往前一步,所有的实时数据可以一直往这个队列加啊,之前都是什么鬼方案,哈哈。

在python里使用redis操作,原来我就是没有使用id这个关键字。
在这里插入图片描述

所以只要在现有的redis agent里加一个接口就行(10分钟都不用),完全不影响其他的操作。这样,就把这个问题重新简化为了IO问题。之前也看过,这60多万的内存占用应该也就200多M。

要点

  • 1 在RedisAgent中添加一个接口,可以方便的生成一个队列(主要是为了方便)
  • 2 在Worker中不通过Agent,直接访问Redis数据库

将mongo中的数据都搬到redis(60多万条数据,耗时7分钟左右)


100%|██████████| 115/115 [06:49<00:00,  3.57s/it]

from configs_base import redis_agent_host,project_name, worker01_config,cur_w

tier1 = 'MyQuantBaseStep1Signals'
tier2 = 'step1_mongo_in'


min_slot = cur_w.minmax(tier1 = tier1, tier2 = tier2, minmax='min', attrname='data_slot')['data']
max_slot = cur_w.minmax(tier1 = tier1, tier2 = tier2, minmax='max', attrname='data_slot')['data']
print(min_slot)
print(max_slot)

slot_tuple_list = slice_list_by_batch1(int(min_slot),int(max_slot)+1,50000)

redis_q_name = 'BUFF.%s.%s' %(tier1, tier2)
import tqdm 
for tem_tuple in tqdm.tqdm(slot_tuple_list):
    min_time_slot, max_time_slot = tem_tuple
    recs = cur_w.query_recs(tier1 = tier1, tier2 = tier2,
        filter_dict = {'$and':[{'market':market,
            'code':code, 'data_slot':{'$gte':min_time_slot ,'$lt':max_time_slot}}]},
            limits= (max_time_slot - min_time_slot) * 10,silent=True
            )['data']
    recs_df = pd.DataFrame(recs)
    recs_df['_data_source_ranking'] = recs_df['data_source'].map(data_source_ranking).fillna(999)
    recs_df1 = recs_df.sort_values(['_data_source_ranking']).drop_duplicates(['market','code','data_slot'])


    tem_msg_id_list
    keep_cols = [x for x in recs_df1.columns if not x.startswith('_')]
    recs_df1 = recs_df1.sort_values(['data_slot'])
    tem_msg_dict_list = recs_df1[keep_cols].to_dict(orient='records')
    tem_msg_id_list = list(recs_df1['data_slot'].apply(int))

    req.post(redis_agent_host + 'batch_add_msg_with_id/',
            json={'stream_name':redis_q_name,
                    'msg_dict_list':tem_msg_dict_list, 
                    'msg_id_list': tem_msg_id_list,
                'maxlen':10000000}).json()

Worker从Redis取数

import redis
lq = redis.Redis()

def flat_kvlist2dict(some_list):
    tem_dict = {}
    for i in range(0,len(some_list),2):
        k = some_list[i]
        v = some_list[i+1]
        tem_dict[k] = v
    return tem_dict

%%timeit 
stream_name = redis_q_name
start_id = 28001000
end_id = 28023010
count = 100000
cmd = 'xrange %s %s %s count %s' %(stream_name,start_id,end_id,count)
print(cmd)

res = lq.execute_command(cmd)
res_df = pd.DataFrame(res)
msg_body_df = pd.DataFrame(res_df[1].apply(flat_kvlist2dict).tolist())

每次的取数大约20多毫秒(TPS ~ 50),看起来不再需要考虑序列化的问题(一来一去,每次全量减少120万次的json序列化工作)
在这里插入图片描述

再之后修改Worker的取数方式就好了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/434273.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux --- 软件安装、项目部署

一、软件安装 1.1、软件安装方式 在Linux系统中&#xff0c;安装软件的方式主要有四种&#xff0c;这四种安装方式的特点如下&#xff1a; 1.2、安装JDK 上述我们介绍了Linux系统软件安装的四种形式&#xff0c;接下来我们就通过第一种(二进制发布包)形式来安装 JDK。 JDK…

文案优化技巧,批量文案改写工具

在当今竞争激烈的市场中&#xff0c;一篇优秀的文案可以吸引更多的潜在客户&#xff0c;提高转化率&#xff0c;带来更多的收益。然而&#xff0c;写出优秀的文案有时是一项具有挑战性的任务。许多人不得不花费大量的时间和精力来编辑和重写它们&#xff0c;这不仅耗时费力&…

具有柔性结构的孤岛直流微电网的分级控制(Malab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️❤️&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f4cb;&#x1f4cb;&#x1f4cb;本文目录如下&#xff1a;⛳️⛳️⛳️ 目录 1 概述 2 数学…

PICO 4 Pro:加入眼动和面部追踪,VR体验乐趣加倍

VR产品的体验在最近几年得到长足的进展&#xff0c;其中有几个重要的关键点。2019-2020年&#xff0c;VR一体机超越PC VR成为主流&#xff0c;便携性和综合体验做到了极佳的均衡。到了2022年&#xff0c;Pancake光学、彩色VST透视、眼动追踪、面部追踪等技术开始落地&#xff0…

MB510 3BSE002540R1在机器视觉工业领域最基本的应用

​ MB510 3BSE002540R1在机器视觉工业领域最基本的应用 大家都说人类感知外界信息的80%是通过眼睛获得的&#xff0c;图像包含的信息量是最巨大的。那么机器视觉技术的出现&#xff0c;就是为机器设备安上了感知外界的眼睛&#xff0c;使机器具有像人一样的视觉功能&#xff0c…

京东淘宝天猫户外服饰行业数据分析(电商数据查询软件)

户外运动越来越火&#xff0c;甚至还形成了一种独有的穿衣风格——“户外穿搭风”。 冲锋衣、工装裤、工装裙、口袋马甲、渔夫帽等都是这两年在这种户外穿搭风潮席卷之下爆红的产物。无论是在京东还是淘宝天猫&#xff0c;这类服饰的销售表现都比较出色。 京东数据&#xff1a;…

Spark大数据处理学习笔记(3.1)掌握RDD的创建

该文章主要为完成实训任务&#xff0c;详细实现过程及结果见【http://t.csdn.cn/oT0of】 文章目录 一、准备工作1.1 准备文件1.1.1 准备本地系统文件1.1.2 启动HDFS服务1.1.3 上传文件到HDFS 1.2 启动Spark Shell1.2.1 启动Spark服务1.2.2 启动Spark Shell 二、创建RDD2.1 通过…

Java语言背景介绍 及 语言跨平台原理

01_Java语言背景介绍 Java语言的三个版本&#xff1a; ●Java SE ● Java ME ●Java EE Java SE&#xff1a; Java语言的&#xff08;标准版&#xff09;&#xff0c;用于桌面应用的开发&#xff0c;是其他两个版本的基础。 桌面应用&#xff1a;GUI程序&#xff0c;是采…

科普丨关于 A/B 测试的十问十答

你想知道的&#xff0c;都在这里&#xff01;本文是神策数据「十问十答」科普系列文章的第二期&#xff0c;围绕 A/B 测试展开。 1 Q&#xff1a;什么是 A/B 测试&#xff1f; A&#xff1a;A/B 测试作为互联网企业的核心增长手段之一&#xff0c;其价值已在实际应用中被多次验…

ERTEC200P-2 PROFINET设备完全开发手册(6-2)

6.2 诊断与报警实验 首先确认固件为 App1_STANDARD, 将宏定义改为&#xff1a; #define EXAMPL_DEV_CONFIG_VERSION 1 参照第6节的内容&#xff0c;编译和调试固件&#xff0c;并在TIA Portal 中建立RT项目。启动固件后&#xff0c;TIA Portal 切换到在线&#xff0c;可以看…

springboot+vue人职匹配推荐系统(源码+文档)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的人职匹配推荐系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 目前有各类成品java毕设&#xff0c;需要请看文…

《离散数学导学》精炼——第9章(函数)

学习是一个长久而艰苦的过程&#xff0c;但不学习则更艰苦。 文章目录 引言正文函数的定义全函数覆盖运算函数的性质&#xff08;重点&#xff09;单射满射双射 递归函数良定义 引言 笔者一直觉得在计算机这一学科的学习中&#xff0c;离散数学是极为重要的知识基础。离散化的…

node中模块化

目录 模块化概念模块化规范Node.js 中模块的分类加载模块 模块作用域module对象module.exports 对象exports 对象 Node.js中的模块化规范 模块化概念 模块化是指解决一个复杂问题时&#xff0c;自顶向下逐层把系统划分成若干模块的过程。对于整个系统来说&#xff0c;模块是可…

SpringBoot的Filter过滤器结合JWT实现登录验证

概念&#xff1a;Filter 过滤器&#xff0c;是 JavaWeb 三大组件(Servlet、Filter、Listener)之一。 过滤器可以把对资源的请求拦截下来&#xff0c;从而实现一些特殊的功能。 过滤器一般完成一些通用的操作&#xff0c;比如&#xff1a;登录校验、统一编码处理、敏感字符处理等…

看完这篇文章你就彻底懂啦{保姆级讲解}-----(LeetCode刷题27移除元素) 2023.4.18

目录 前言算法题&#xff08;LeetCode 27.移除元素&#xff09;—&#xff08;保姆级别讲解&#xff09;分析题目算法思想&#xff08;重要&#xff09;暴力解法代码&#xff1a;双指针法&#xff08;快慢指针法&#xff09;代码&#xff1a;反思 结束语 前言 本文章一部分内容…

UDP - C/S模型

由于UDP不需要维护连接&#xff0c;程序逻辑简单了很多&#xff0c;但是UDP协议是不可靠的&#xff0c;保证通讯可靠性的机制需要在应用层实现。 通信函数 ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,struct sockaddr *src_addr, socklen_t *addrlen); 参…

无人机动力系统优化测试平台-15kg级-Flight Stand 15

产品简介 通过Flight Stand 15测试台对电机和螺旋桨的拉力&#xff0c;扭矩&#xff0c;转速&#xff0c;电流&#xff0c;电压&#xff0c;温度&#xff0c;空速&#xff0c;螺旋桨效率和电机效率的测量帮助您精准地描述和评估其性能参数&#xff0c;这是我们五年多来的无人机…

[JAVASE]初识Java:数据类型与变量

CSDN的各位友友们你们好,今天千泽为大家带来的是 [JAVASE]初识Java&#xff1a;数据类型与变量、运算符, 接下来让我们一起了解一下吧! 如果对您有帮助的话希望能够得到您的支持和关注,我会持续更新 数据类型与变量 数据类型 在Java中数据类型分为基本数据类型与引用数据类…

15.2 矩阵链乘法

1.代码 public class MatrixChainMultiplication {public static void main(String[] args) { // 在该代码中&#xff0c;我们首先创建了两个n * n的矩阵m和s&#xff0c;分别用于记录最优值和分割点。 其中m 矩阵 通过i j 来显示在i到j的矩阵链中最优解 // // …

JavaSE/异常

博客制作不易&#xff0c;欢迎各位点赞&#x1f44d;收藏⭐关注 前言 在使用Java编写代码时&#xff0c;我们难免会遇到数组越界、运行超时、栈溢出等异常问题。所以如果我们熟练掌握异常的定义和使用&#xff0c;这对我们学习Java有很大的帮助。 一、异常的定义 程序执行过程…