Python 算法交易实验81 QTV200日常推进-重新实验SMA/EMA/RSI

news2024/12/28 4:50:52

说明

本次实验考虑两个点:

  • 1 按照上一篇谈到的业务目标进行反推,有针对性的寻找策略
  • 2 worker增加计算的指标,重新计算之前的实验

内容

工具方面,感觉rabbitmq还是太慢了。看了下,rabbitmq主要还是面向可靠和灵活路由的。目前我的需求虽然不是很需要速度,但是这吞吐太低了实在难受。特别是从结果队列里取数,一条一条的。

终于可以接着继续写了,这周还是很忙,不过成果不错。比较开心小伙伴帮我搞定了kafka,我这个java绝缘体真的是只要搞java就碰到坑。在kafka的过程中,对性能比较关注,实测下来发现json序列化的开销实在是很大。在消息的传输过程中,为了通用性,我对数据都进行了json传输,时间占比在整体80%以上。

与实验相关的是:这次打算开多个worker快速跑数,一个本能是想利用缓存–Redis。后来一试发现比clickhouse慢了一个量级,前后一想就对上了—序列化太费了。

如果数据是结构化的,尽量直接使用结构化数据库

另外正好最近在转向ORM方式,突然发现实验要保存的数据通过这种方式更加合适。

1 结果对象

先定义结果的数据模型,然后使用mysql进行全局存储。

以下按照之前的计划,生成了短、中、长期指标。

from Basefuncs import * 

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Index
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetime
import shortuuid
def get_shortuuid(charset=None):
    """
    生成一个简洁的唯一标识符(短 UUID)。

    参数:
    charset (str, optional): 自定义的字符集。如果未提供,将使用默认字符集。

    返回:
    str: 生成的短 UUID。
    """
    if charset:
        su = shortuuid.ShortUUID(charset=charset)
        return su.uuid()
    else:
        return shortuuid.uuid()

m7_24013_url = f"mysql+pymysql://USER:PASSWD@IP:PORT/mydb"

# from urllib.parse import quote_plus
# the_passed = quote_plus('!@#*')
# # 创建数据库引擎
m7_engine = create_engine(m7_24013_url)

# 创建基类
Base = declarative_base()

# 定义数据类型
class SMAEMABacktest(Base):
    __tablename__  = 'smaema_backtest'
    id = Column(Integer, primary_key=True,autoincrement=True)
    short_uuid = Column(String(50), default = lambda: get_shortuuid())
    strategy_name = Column(String(50))
    long_t = Column(Integer)
    short_t = Column(Integer)

    deals = Column(Integer)
    # 交易时间
    median_trade_days = Column(Integer)

    # 单利润率
    total_mean_npr = Column(Float)
    last3_mean_npr = Column(Float)
    last1_mean_npr = Column(Float)
    # 盈亏比
    total_win_loss_ratio = Column(Float)
    last3_win_loss_ratio = Column(Float)
    last1_win_loss_ratio = Column(Float)
    # 胜率
    total_win_rate = Column(Float)
    last3_win_rate = Column(Float)
    last1_win_rate = Column(Float)  

    get_data_duration = Column(Float)
    process_data_duration = Column(Float)
    create_time = Column(DateTime, default=lambda: datetime.now())
    
    #def __init__(self,**kwargs):
        # self.short_uuid = get_shortuuid()
        # for key, value in kwargs.items():
        #     if key in ['create_time'] and isinstance(value, str):
        #         value = datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        #     setattr(self, key, value)

# 创建表
Base.metadata.create_all(m7_engine)

开始读取数据, 70万条数据1.15秒从数据库取出(如果用redis我记得大约是8秒)

short_t = 100
long_t = 200

# sma
sma_result = SMAEMABacktest(strategy_name ='sma', long_t = long_t, short_t = short_t)
# ema 
ema_result = SMAEMABacktest(strategy_name ='ema', long_t = long_t, short_t = short_t)

ch_cfg = CHCfg(database=clickhouse_db, host = clickhouse_ip )
chc =  CHClient(**ch_cfg.dict())

query_sql = f'select {clickhouse_select_cols_str} from {clickhouse_table_name}'
tick1 = time.time()
data = chc._exe_sql(query_sql)
tick2 = time.time()
print('getting data %.2f' % (tick2 - tick1 ))
getting data 1.15

主程序如下: 当worker开始执行时去队列取数,然后把结果写到数据库里。

大体上是这样的效果:
在这里插入图片描述

2 处理逻辑

def process(input_smaema = None, Session = Session):
    gfgo_lite_server = input_smaema.gfgo_lite_server
    short_t = input_smaema.short_t
    long_t = input_smaema.long_t
    clickhouse_db = input_smaema.clickhouse_db
    clickhouse_ip = input_smaema.clickhouse_ip
    clickhouse_table_name = input_smaema.clickhouse_table_name
    clickhouse_select_cols = input_smaema.clickhouse_select_cols
    clickhouse_select_cols_str = input_smaema.clickhouse_select_cols_str

    ucs = UCS(gfgo_lite_server =gfgo_lite_server)
    # Base对象不是pydantic允许属性为空
    # sma
    sma_result = SMAEMABacktest(strategy_name ='sma', long_t = long_t, short_t = short_t)
    # ema 
    ema_result = SMAEMABacktest(strategy_name ='ema', long_t = long_t, short_t = short_t)

    ch_cfg = CHCfg(database=clickhouse_db, host = clickhouse_ip )
    chc =  CHClient(**ch_cfg.dict())

    query_sql = f'select {clickhouse_select_cols_str} from {clickhouse_table_name}'
    tick1 = time.time()
    data = chc._exe_sql(query_sql)
    tick2 = time.time()
    print('getting data %.2f' % (tick2 - tick1 ))

    t1 = short_t
    t2 = long_t

    df = pd.DataFrame(data, columns = ['shard','part','block','brick','code','amt','close','high','low',
    'open', 'data_dt', 'pid', 'ts','vol'])

    df1 = df.sort_values(['data_dt'])
    '''
    bfill 是 "backward fill" 的缩写,是一种处理缺失值的方法。在数据处理中,当数据序列中存在缺失值(NaN,即 "Not a Number")时,bfill 方法会用该缺失值后面的有效值来填充这个缺失值。
    具体来说,bfill 方法从序列的末尾开始向前查找,找到第一个非缺失值,并用这个值来填充前面的缺失值。这个过程会一直进行,直到所有的缺失值都被填充完毕或者到达序列的开头。
    '''
    df1['sma_t1'] = df1['close'].rolling(window=t1).mean().bfill()
    df1['sma_t2'] = df1['close'].rolling(window=t2).mean().bfill()

    # 计算指数移动平均线
    df1['ema_t1'] = df1['close'].ewm(span=t1, adjust=False).mean()
    df1['ema_t2'] = df1['close'].ewm(span=t2, adjust=False).mean()

    # 回测
    bt_df = df1 

    import tqdm
    # -----------------------  sma
    open_orders = []
    close_orders = []
    for i in tqdm.tqdm(range(len(bt_df))):
        tem_dict = dict(bt_df.iloc[i])
        data_dt = tem_dict['data_dt']
        close = tem_dict['close']
        sma_t1 = tem_dict['sma_t1']
        sma_t2 = tem_dict['sma_t2']

        if len(open_orders) == 0:
            if sma_t1 >  sma_t2:# 金叉
                order_dict =  {}
                order_dict['but_dt'] = data_dt
                order_dict['buy_price'] = close
                open_orders.append(order_dict)
                continue
                
        if len(open_orders):
            if  sma_t1< sma_t2: # 死叉
                for  item in reversed(open_orders):
                    order_dict = open_orders.pop()
                    order_dict['sell_dt'] = data_dt
                    order_dict['sell_price'] = close
                    order_dict['gp'] = order_dict['sell_price'] - order_dict['buy_price']
                    close_orders.append(order_dict)


    tick3 = time.time()


    close_df = pd.DataFrame(close_orders)
    sma_result = cal_kpis(close_df = close_df, some_obj = sma_result)
    sma_result.get_data_duration = round(tick2 -tick1 , 3)
    sma_result.process_data_duration = round(tick3 -tick2 , 3)

    with Session() as session:
        session.add(sma_result)
        session.commit()


    # ========================= ema 

    tick4 = time.time()
    open_orders = []
    close_orders = []
    for i in tqdm.tqdm(range(len(bt_df))):
        tem_dict = dict(bt_df.iloc[i])
        data_dt = tem_dict['data_dt']
        close = tem_dict['close']
        ema_t1 = tem_dict['ema_t1']
        ema_t2 = tem_dict['ema_t2']

        if len(open_orders) == 0:
            if ema_t1 >  ema_t2:# 金叉
                order_dict =  {}
                order_dict['but_dt'] = data_dt
                order_dict['buy_price'] = close
                open_orders.append(order_dict)
                continue
                
        if len(open_orders):
            if  ema_t1< ema_t2: # 死叉
                for  item in reversed(open_orders):
                    order_dict = open_orders.pop()
                    order_dict['sell_dt'] = data_dt
                    order_dict['sell_price'] = close
                    order_dict['gp'] = order_dict['sell_price'] - order_dict['buy_price']
                    close_orders.append(order_dict)

    tick5 = time.time()
    close_df = pd.DataFrame(close_orders)
    close_df = pd.DataFrame(close_orders)
    ema_result = cal_kpis(close_df = close_df, some_obj = ema_result)
    ema_result.get_data_duration = round(tick2 -tick1 , 3)
    ema_result.process_data_duration = round(tick5 -tick4 , 3)

    with Session() as session:
        session.add(ema_result)
        session.commit()

    print('done')


if __name__ =='__main__':
    rm = RabbitManager()
    # 获取1个数据
    data_list = rm.get_message_early_ack('smaema', count=1)
    if len(data_list):
        the_task = data_list[0]

        short_t = the_task['short_t']
        long_t = the_task['long_t']

        input_smaema = InputSMAEMA_Para(short_t = short_t ,long_t = long_t)
        process(input_smaema)

3 调度

最近也在研究一些简单又可靠的调度方法。

  • 1 cron: 简单是简单了,就是有很多缺陷。首先是只能到分钟级,另外没有任务的控制,反正到点就触发。这样如果是长耗时任务,很可能发太多撑爆内存。
  • 2 nohup + for(while) : 简单,但是不好控制worker的数量。
  • 3 nohup + apscheduler : 这个可以有。最初是打算docker + aps的,但那样要么都执行docker worker,要么就要管理挂载,比较麻烦。未来模式成熟了,可以考虑同时保留 systemd + aps 和docker + aps 两种简单的调度模式并存。
from datetime import datetime
import os 
from apscheduler.schedulers.blocking import BlockingScheduler

def exe_sh(cmd = None):
    os.system(cmd)

# 后台启动命令 nohup python3 aps.py >/dev/null 2>&1 &

if __name__ == '__main__':
    sche1 = BlockingScheduler()
    # sche1.add_job(exe_sh,'interval', seconds=1, kwargs ={'cmd':'python3 ./main_handler/main.py'})
    sche1.add_job(exe_sh,'interval', seconds=10, 
    kwargs ={'cmd':'python3 qtv200_0004_sma_ema_clickhouse.py'},
    max_instances=20,coalesce=True)
    print('[S] starting inteverl')
    sche1.start()

我启动了周期任务,10秒一次,最多允许20个任务实例,且超时任务最后会合并(为1次)。看到这种方法是满核调用的。
在这里插入图片描述

在这里插入图片描述
有意思的是,我问了下deepseek,如果没有骗我的话,aps默认是线程池。
在这里插入图片描述
一般来说,python是单进程~单线程的。如果在一个脚本内,一般来说不会突破一个核的限制。现在在调度模式下一个线程实际上触发了多个线程的工作,某种程度上说就绕过了这个限制。

4 结果

还挺理想的,8个核 ~ 20 worker,在睡一觉之后,大约完成了 10%的任务。我打算在另一个位置也启动一下worker。另外注意到租用机的数据读取时间是2秒,比我的主机慢了一倍。
在这里插入图片描述

总体上,本次实验应该算是成功了:

指标计算的提升:

  • 1 增加了盈亏比
  • 2 增加了胜率

架构和工具方面的提升:

  • 1 使用clickhouse,增强了worker处理速度
  • 2 使用ORM,灵活存储
  • 3 使用APS调度

下一步:

  • 1 对结果进行数据分析,例如聚类,以发现规律
  • 2 准备展开更多的标的的计算,暂时先考虑SMA/EMA策略
  • 3 将选出的策略与强化学习框架对接
  • 4 准备前端投射,实盘操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2047637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【软件测试】软件系统测试方案(Word原件)

1. 引言 1.1. 编写目的 1.2. 项目背景 1.3. 读者对象 1.4. 参考资料 1.5. 术语与缩略语 2. 测试策略 2.1. 测试完成标准 2.2. 测试类型 2.2.1. 功能测试 2.2.2. 性能测试 2.2.3. 安全性与访问控制测试 2.3. 测试工具 3. 测试技术 4. 测试资源 4.1. 人员安排 4.2. 测试环境 4.2.…

Openstack 与 Ceph集群搭建(上): 规划与准备

文章目录 写在前面网络架构节点规划软件版本避坑指南 基础配置1. host配置2. 修改hostname名称3. 确保root账号能登录系统4. 配置NTP5. 配置免密登录 写在前面 近期将进行三节点的Openstack、Ceph集群混合部署&#xff0c;本人将详细记录该过程。在此之前&#xff0c;本文为Op…

逆向开发LabVIEW程序的操作与注意事项(无源代码)

1. 概述与准备工作 当手头没有源代码&#xff0c;只有LabVIEW编译后的可执行程序时&#xff0c;逆向开发的难度和复杂性大大增加。需要用到的工具、方法和策略也会有所不同。逆向工程的目标是在没有源代码的情况下重建或理解该程序的功能、结构和行为。涉及CameraLink通讯的程…

Android大脑--systemserver进程

用心坚持输出易读、有趣、有深度、高质量、体系化的技术文章&#xff0c;技术文章也可以有温度。 本文摘要 系统native进程的文章就先告一段落了&#xff0c;从这篇文章开始写Java层的文章&#xff0c;本文同样延续自述的方式来介绍systemserver进程&#xff0c;通过本文您将…

day34-nginx常用模块

## 0. 网络面试题 网络面试题: TCP三次握手 TCP四次挥手 DNS解析流程 OSI七层模型 抓包工具 tcpdump RAID级别区别 开机启动流程 如何实现不同的网段之间通信(路由器) ip route add 192.168.1.0 255.255.255.0 下一跳的地址或者接口 探测服务器开启了哪些端口(无法登录服务器…

嵌入式开发如何看芯片数据手册

不管什么芯片手册&#xff0c;它再怎么写得天花乱坠&#xff0c;本质也只是芯片的使用说明书而已。而说明书一个最显著的特点就是必须尽可能地使用通俗易懂的语句&#xff0c;向使用者交代清楚该产品的特点、功能以及使用方法。 以TMP423为例&#xff0c;这是一个测量温度的芯…

【密码学】密钥管理:①基本概念和密钥生成

密钥管理是处理密钥从产生到最终销毁的整个过程的有关问题&#xff0c;包括系统的初始化及密钥的产生、存储、备份与恢复、装入、分配、保护、更新、控制、丢失、撤销和销毁等内容。 一、密钥管理技术诞生的背景 随着计算机网络的普及和发展&#xff0c;数据传输和存储的安全问…

蓝牙音视频远程控制协议(AVRCP) command跟response介绍

零.声明 本专栏文章我们会以连载的方式持续更新&#xff0c;本专栏计划更新内容如下&#xff1a; 第一篇:蓝牙综合介绍 &#xff0c;主要介绍蓝牙的一些概念&#xff0c;产生背景&#xff0c;发展轨迹&#xff0c;市面蓝牙介绍&#xff0c;以及蓝牙开发板介绍。 第二篇:Trans…

智慧运维:数据中心可视化管理平台

图扑智慧运维数据中心可视化管理平台&#xff0c;实时监控与数据分析&#xff0c;优化资源分配&#xff0c;提升运维效率&#xff0c;确保数据中心的安全稳定运行。

Linux进程间通信——匿名管道

文章目录 进程间通信管道匿名管道匿名管道使用 进程间通信 进程设计的特点之一就是独立性&#xff0c;要避免其他东西影响自身的数据 但有时候我们需要共享数据或者传递信息&#xff0c;传统的父子进程也只能父进程传递给子进程信息 因此进程间通信还是很必要的&#xff0c;…

Apollo9.0 PNC源码学习之Planning模块—— Lattice规划(三):静态障碍物与动态障碍物ST图构建

参考文章: (1)Apollo6.0代码Lattice算法详解——Part4:计算障碍物ST/SL图 (2)自动驾驶规划理论与实践Lattice算法详解 1 计算障碍物ST/SL图 计算障碍物ST/SL图主要函数关系图: // 通过预测得到障碍物list auto ptr_prediction_querier = std::make_shared<Predict…

2024新型数字政府综合解决方案(五)

新型数字政府综合解决方案通过集成人工智能、大数据、区块链和云计算技术&#xff0c;打造了一个智能化、透明化和高效的政务服务平台&#xff0c;旨在提升政府服务的响应速度、处理效率和数据安全性。该方案实现了跨部门的数据共享与实时更新&#xff0c;通过智能化的流程自动…

Waterfox vG6.0.8 官方版下载和安装步骤(一款响应速度非常快的浏览器)

前言 Waterfox 水狐浏览器&#xff0c;从字面上我们可以轻松的了解该款浏览器的一些特点。Waterfox是通过Mozilla官方认证的纯64位版火狐浏览器&#xff0c;而Waterfox 10采用Firefox 10官方源码编译而成&#xff0c;改进了大内存和64位计算的细节&#xff0c;在64位Windows系…

用Python读取Excel数据在PPT中的创建图表

可视化数据已成为提高演示文稿专业度的关键因素之一。使用Python从Excel读取数据并在PowerPoint幻灯片中创建图表不仅能够极大地简化图表创建过程&#xff0c;还能确保数据的准确性和图表的即时性。通过Python这一桥梁&#xff0c;我们可以轻松实现数据自动化处理和图表生成&am…

MyBatis全解

目录 一&#xff0c; MyBatis 概述 1.1-介绍 MyBatis 的历史和发展 1.2-MyBatis 的特点和优势 1.3-MyBatis 与 JDBC 的对比 1.4-MyBatis 与其他 ORM 框架的对比 二&#xff0c; 快速入门 2.1-环境搭建 2.2-第一个 MyBatis 应用程序 2.3-配置文件详解 (mybatis-config.…

软件需求设计分析报告(Word原件)

第1章 序言 第2章 引言 2.1 项目概述 2.1.1 项目背景 2.1.2 项目目标 2.2 编写目的 2.3 文档约定 2.4 预期读者及阅读建议 第3章 技术要求 3.1 软件开发要求 3.1.1 接口要求 3.1.2 系统专有技术 3.1.3 查询功能 3.1.4 数据安全 3.1.5 可靠性要求 3.1.6 稳定性要求 3.1.7 安全性…

练习:python条件语句、循环语句和函数的综合运用

需求描述&#xff1a; 期望输出效果&#xff1a; 练习成果&#xff1a; #简单的银行业务流程 many 50000 def main_menu():print("----------主菜单----------"f"\n{name}您好&#xff0c;欢迎来到ATM&#xff0c;请选择操作&#xff1a;""\n查询余…

鼠标手势软件,效率办公必备!移动鼠标即可执行命令

鼠标手势软件是一种通过在屏幕上绘制特定手势来触发预设操作或命令的工具&#xff0c;它能够极大地提高用户的操作效率&#xff0c;特别是在进行重复性工作时尤为明显。这类软件通常支持多种手势操作&#xff0c;如拖拽、双击、滚动等&#xff0c;并允许用户自定义手势以适应个…

【Linux】系列入门摘抄笔记-8-权限管理chmod/chown

Linux操作系统中文件的基本权限由9个字符组成&#xff0c;分别为属主、属组和其他用户&#xff0c;用于规定是否对文件有读、写和执行权限。 文件/目录的权限与归属 目录列表中&#xff0c;有9列 第一列&#xff1a;文件类型与权限&#xff08;共10个字符&#xff0c;分为四组…

RAG完整构建流程-从入门到放弃

RAG完整构建流程 LLM模型缺陷&#xff1a; ​ 知识是有局限性的(缺少垂直领域/非公开知识/数据安全) ​ 知识实时性(训练周期长、成本高) ​ 幻觉问题(模型生成的问题) ​ 方法&#xff1a;Retrieval-Augmented Generation&#xff08;RAG&#xff09; ​ 检索&#xff1…