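Overview
The first version went out in a hurry last July, so it has now been running for almost exactly a year. Looking back, it was fairly crude: it did not consider patterns at all and was just a single pass with a discriminative model.
The past year was clearly not a great one. Most of the results I know of were losses. This strategy did not lose money, so it scrapes a pass. I was also fairly harsh in my assumptions about fills and fees, so the real result should be somewhat better than the number suggests. The most recent 20 orders even made a little.
Overall, a strategy whose profit/loss ratio is close to 1 is not very interesting. A ratio of 2 or 3 is where it starts to get exciting. There are plenty of ideas for improvement, just no concrete entry point. Then I suddenly remembered this:
Some advice from a book
I did not go back to read the original text. The gist is: at the start, don't try to invent something on your own; take the most basic strategies on the market and make variations of them. It is a very down-to-earth suggestion, and I think it is the place to start.
Contents
1 Rough plan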
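Line A: Take some common, basic strategies (moving-average crossover, RSI, MACD, Bollinger Bands, ...), build a baseline version of each, define a parameter grid, let a program backtest them in batch, and pick out suitable candidate strategies.
LineA >> reinforcement learning
Each LineA result is then treated as a bandit arm, and a reinforcement learning framework organises them. So LineA will finally produce:
- 1 Parameter variants of the basic strategies: used as the baseline reference
- 2 A reinforcement learning framework: used as a pilot
- 3 A general backtesting method
LineA's results will go straight into production. I consider the risk of using classic strategies to be low.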
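As a rough illustration of "each LineA result is a bandit", here is a minimal epsilon-greedy sketch. The class name `EpsilonGreedy`, the arm labels and the reward values are all hypothetical; the reward would be something like the net return of each closed order. This is the simplest scheme that fits the description, not the framework planned above.

```python
import random


class EpsilonGreedy:
    """Epsilon-greedy selection over a fixed set of strategy variants (arms)."""

    def __init__(self, arms, epsilon=0.1, seed=None):
        self.arms = list(arms)
        self.epsilon = epsilon
        self.rng = random.Random(seed)
        self.counts = {a: 0 for a in self.arms}
        self.values = {a: 0.0 for a in self.arms}  # running mean reward per arm

    def select(self):
        # Explore with probability epsilon, otherwise exploit the best mean so far.
        if self.rng.random() < self.epsilon:
            return self.rng.choice(self.arms)
        return max(self.arms, key=lambda a: self.values[a])

    def update(self, arm, reward):
        # Incremental mean: new_mean = old_mean + (reward - old_mean) / n
        self.counts[arm] += 1
        n = self.counts[arm]
        self.values[arm] += (reward - self.values[arm]) / n


# Hypothetical usage: two variants identified by made-up labels.
bandit = EpsilonGreedy(["sma_2400_12000", "ema_2400_12000"], epsilon=0.1, seed=0)
arm = bandit.select()
bandit.update(arm, reward=0.012)  # e.g. the npr of the order that variant just closed
```

With a small epsilon most capital follows the variant with the best observed mean, while the others still get sampled occasionally.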
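LineB: architecture improvements + single-model improvements
On the architecture side, adopt a new database and queueing approach so that a large number of tasks can run at the same time. The previous route of Mongo plus Redis Stream was wrong: too slow. Move to ClickHouse, Redis KV and Kafka.
Integrate with the UCS specification so that more dimensions can be plugged in seamlessly in the future.
The most important improvement is to the model objective: what I need is a "big machete" model.
Task scheduling also needs work to get rid of manual steps; I plan to use Airflow for this.
LineB >> genetic algorithm + MPLR (Matrix Parallel LR)
These are for finding the best model: the former ensures the method is right, the latter ensures it is efficient enough.
LineB's final outputs:
- 1 A more adaptive model
- 2 A new set of strategies, expected to have a profit/loss ratio of no less than 1.5
LineC: pattern recognition + generative models + vector database
Use rule-based methods and HMMs for pattern recognition.
The rule-based part comes first: it gives simple, easy-to-understand patterns that serve as a starting point and a baseline.
Next, with HMMs as the bridge, I will gradually shift my attention to probabilistic graphical models, which I believe is the more promising area.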
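2 Simple start
Start with a simple test:
- 1 Reproduce the moving-average strategy - [classic strategy]
- 2 Distribute tasks with RabbitMQ - [simulation]
- 3 Collect results with RabbitMQ
2.1 Moving-average strategy
Large language models are handy for this; overall the result should be about right.
My data is stored in ClickHouse, and it is really fast; it feels barely different from reading straight from memory. About 700,000 rows came back in a single query.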
data = chc._exe_sql('select * from his_hs300_min')
executed in 1.38s, finished 18:24:13 2024-08-03
(711144, 14)
Main strategy logic:
# Periods, in 1-minute bars
t1 = 2400    # 10 trading days
t2 = 5 * t1  # 50 trading days
# Simple moving averages; bfill() fills the warm-up rows with the first computed value
df1['sma_t1'] = df1['close'].rolling(window=t1).mean().bfill()
df1['sma_t2'] = df1['close'].rolling(window=t2).mean().bfill()
# Exponential moving averages
df1['ema_t1'] = df1['close'].ewm(span=t1, adjust=False).mean()
df1['ema_t2'] = df1['close'].ewm(span=t2, adjust=False).mean()
Then the backtest can be run:
# Backtest
import pandas as pd
import tqdm

bt_df = df1
open_orders = []
close_orders = []
for i in tqdm.tqdm(range(len(bt_df))):
    tem_dict = dict(bt_df.iloc[i])
    data_dt = tem_dict['data_dt']
    close = tem_dict['close']
    sma_t1 = tem_dict['sma_t1']
    sma_t2 = tem_dict['sma_t2']
    if len(open_orders) == 0:
        if sma_t1 > sma_t2:  # short MA above long MA (golden cross): open a position
            order_dict = {}
            order_dict['but_dt'] = data_dt
            order_dict['buy_price'] = close
            open_orders.append(order_dict)
            continue
    if len(open_orders):
        if sma_t1 < sma_t2:  # short MA below long MA (death cross): close all positions
            while open_orders:  # avoids popping from a list while iterating over it
                order_dict = open_orders.pop()
                order_dict['sell_dt'] = data_dt
                order_dict['sell_price'] = close
                order_dict['gp'] = order_dict['sell_price'] - order_dict['buy_price']
                close_orders.append(order_dict)

close_df = pd.DataFrame(close_orders)
close_df = close_df.sort_values(['but_dt'])
pd.set_option('display.max_rows', 500)
close_df['gpr'] = close_df['gp'] / close_df['buy_price']  # gross return per order
close_df['npr'] = close_df['gpr'] - 0.005                 # net return after a flat 0.5% cost
close_df['npr'].mean()
close_df['gpr'].mean()
close_df
but_dt buy_price sell_dt sell_price gp gpr npr
0 2012-05-28 09:31:00 2.1105 2012-07-05 11:24:00 2.0335 -0.0770 -0.036484 -0.041484
1 2012-10-17 13:35:00 1.9333 2012-11-08 13:38:00 1.8993 -0.0340 -0.017587 -0.022587
2 2012-12-18 09:35:00 1.9803 2013-03-14 09:44:00 2.1180 0.1377 0.069535 0.064535
3 2013-05-17 13:52:00 2.1617 2013-06-18 14:15:00 2.0467 -0.1150 -0.053199 -0.058199
4 2013-08-14 10:27:00 2.0274 2013-11-04 14:33:00 2.0425 0.0151 0.007448 0.002448
5 2013-11-29 14:11:00 2.0945 2013-12-20 11:15:00 1.9803 -0.1142 -0.054524 -0.059524
6 2014-02-21 14:51:00 1.9367 2014-02-25 13:27:00 1.8853 -0.0514 -0.026540 -0.031540
7 2014-04-11 14:53:00 1.9419 2014-05-05 13:28:00 1.8260 -0.1159 -0.059684 -0.064684
8 2014-06-25 10:20:00 1.8415 2015-02-12 13:59:00 2.9836 1.1421 0.620201 0.615201
9 2015-02-27 09:37:00 3.0928 2015-06-29 14:34:00 3.6731 0.5803 0.187629 0.182629
10 2015-10-22 14:05:00 3.0651 2016-01-07 10:26:00 2.9636 -0.1015 -0.033115 -0.038115
11 2016-03-21 14:55:00 2.8512 2016-05-11 13:48:00 2.7146 -0.1366 -0.047910 -0.052910
12 2016-06-15 14:15:00 2.7525 2016-06-16 14:51:00 2.7323 -0.0202 -0.007339 -0.012339
13 2016-06-30 13:41:00 2.7860 2016-09-26 13:21:00 2.9182 0.1322 0.047452 0.042452
14 2016-10-19 10:18:00 2.9860 2016-12-20 14:13:00 2.9481 -0.0379 -0.012693 -0.017693
...
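Surprisingly, this strategy (SMA) is successful overall: it generated 35 orders over the whole history with an average net return of 1.79% per order, and even the most recent few orders did not lose outrageously.
Overall: 1.79%, 1.21%, -0.8%
Now EMA: it is more aggressive, but its survivability is even worse than SMA's.
This strategy produces variants mainly by changing the two periods, short and long. Beyond that, the results will certainly differ when it is applied to different instruments.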
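To make "variants from two periods" concrete, here is a sketch of the same long-only crossover rule as one function parameterised by the short period, the long period and the average type. It is a vectorised alternative to the row-by-row loop, not the code used in this post. The function name `crossover_trades`, the `fee` parameter and the `buy_dt` column name are my own choices. One deliberate difference: the warm-up rows of the SMA are left as NaN (no trading) instead of being back-filled.

```python
import numpy as np
import pandas as pd


def crossover_trades(close: pd.Series, short_t: int, long_t: int,
                     kind: str = "sma", fee: float = 0.005) -> pd.DataFrame:
    """Long-only crossover: enter when fast > slow, exit when fast < slow."""
    if kind == "sma":
        fast = close.rolling(short_t).mean()
        slow = close.rolling(long_t).mean()
    elif kind == "ema":
        fast = close.ewm(span=short_t, adjust=False).mean()
        slow = close.ewm(span=long_t, adjust=False).mean()
    else:
        raise ValueError(f"unknown kind: {kind}")

    diff = fast - slow
    # 1 = want to hold, 0 = want to be flat, NaN = no opinion (equal or warming up).
    state = pd.Series(np.where(diff > 0, 1.0, np.where(diff < 0, 0.0, np.nan)),
                      index=close.index)
    pos = state.ffill().fillna(0.0).to_numpy()  # carry the last decision forward

    change = np.diff(pos, prepend=0.0)          # +1 on entry bars, -1 on exit bars
    buy_idx = np.flatnonzero(change == 1)
    sell_idx = np.flatnonzero(change == -1)
    n = min(len(buy_idx), len(sell_idx))        # drop a position that is still open
    buy_idx, sell_idx = buy_idx[:n], sell_idx[:n]

    prices = close.to_numpy()
    trades = pd.DataFrame({
        "buy_dt": close.index[buy_idx],
        "buy_price": prices[buy_idx],
        "sell_dt": close.index[sell_idx],
        "sell_price": prices[sell_idx],
    })
    trades["gpr"] = trades["sell_price"] / trades["buy_price"] - 1  # gross return
    trades["npr"] = trades["gpr"] - fee                             # net of a flat cost
    return trades
```

Called as `crossover_trades(df1.set_index('data_dt')['close'], 2400, 12000)`, this should produce a table comparable to `close_df` above, apart from the warm-up handling.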
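Whether it is SMA or EMA, and whatever the short/long parameters, these are the numbers I want from each run:
- 1 Number of orders
- 2 Median holding period in days
- 3 Mean npr
- 4 npr over the last 3 years
- 5 npr over the last 1 year
So any worker only has to fetch a dataset, run the computation for its parameters, and return these results.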
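The five numbers above can be computed from a table of closed orders in a few lines. The sketch below assumes a DataFrame with `buy_dt`, `sell_dt` and `npr` columns (the table in this post spells the first one `but_dt`). The function name `summarise_trades` is hypothetical. "Last N years" is taken as "bought in calendar year `current_year - N` or later", which is one possible reading.

```python
import pandas as pd


def summarise_trades(trades: pd.DataFrame, current_year: int) -> dict:
    """Summarise closed orders into the five metrics a worker reports."""
    buy_dt = pd.to_datetime(trades["buy_dt"])
    sell_dt = pd.to_datetime(trades["sell_dt"])
    hold_days = (sell_dt - buy_dt).dt.days          # whole days each order was held

    last3 = buy_dt.dt.year >= current_year - 3      # bought within the last 3 years
    last1 = buy_dt.dt.year >= current_year - 1      # bought within the last 1 year
    return {
        "deals": len(trades),
        "median_hold_days": float(hold_days.median()),
        "mean_npr": float(trades["npr"].mean()),
        # An empty selection yields NaN rather than raising.
        "last3_mean_npr": float(trades.loc[last3, "npr"].mean()),
        "last1_mean_npr": float(trades.loc[last1, "npr"].mean()),
    }
```

Returning a flat dict keeps the result easy to serialise as a queue message.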
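2.2 Distributing with RabbitMQ
2.2.0 Reusing the existing RabbitMQ service
I previously built a simple pass-through service for RabbitMQ, so the operations can now be wrapped in a small object: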
import requests as req
import pika
import json

class RabbitManager:
    __version__ = 1.0

    def __init__(self, send_url='http://YOURIP:24098/send_workq_message/',
                 durable=True, rabbit='rabbit01',
                 read_ip='YOURIP',
                 read_port=24091,
                 read_user='xxx',
                 read_pwd='xxxx',
                 read_heartbeat=600):
        self.send_url = send_url
        self.durable = durable
        self.rabbit = rabbit
        self.credentials = pika.PlainCredentials(read_user, read_pwd)
        self.connection_para = pika.ConnectionParameters(read_ip, read_port, '/', self.credentials, heartbeat=read_heartbeat)

    # Send a batch of messages through the HTTP pass-through service
    def send_message(self, queue=None, message_list=None):
        para_dict = {}
        para_dict['rabbit'] = self.rabbit
        para_dict['routing_key'] = queue
        para_dict['durable'] = self.durable
        para_dict['message_list'] = message_list
        para_dict['queue'] = queue
        return req.post(self.send_url, json=para_dict).json()

    # Return up to `count` decoded messages; each one is acked as soon as it is read
    def get_message_early_ack(self, queue=None, count=3):
        data_list = []
        connection = pika.BlockingConnection(self.connection_para)
        with connection.channel() as channel:
            # Declare the queue
            channel.queue_declare(queue=queue, durable=self.durable)
            for i in range(count):
                method_frame, header_frame, body = channel.basic_get(queue=queue)
                if body is None:  # queue is empty
                    break
                res_dict = json.loads(body.decode())
                data_list.append(res_dict)
                channel.basic_ack(delivery_tag=method_frame.delivery_tag)
        connection.close()
        return data_list
Send:
rm = RabbitManager()
rm.send_message(queue='hello1', message_list=[{'msg_id': 111, 'content': 'it is me '}])
Consume:
data_list = rm.get_message_early_ack('hello1')
data_list
[{'msg_id': 111, 'content': 'it is me '}]
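As its name says, `get_message_early_ack` acknowledges a message as soon as it is read, so a worker that crashes in the middle of a backtest loses that task. A possible alternative is to ack only after the work succeeds and to put the message back otherwise. The sketch below is written against any channel object exposing pika's `basic_get`, `basic_ack` and `basic_nack`. The function name `consume_with_late_ack` and the `handler` callback are hypothetical and not part of `RabbitManager`.

```python
import json


def consume_with_late_ack(channel, queue, handler, count=1):
    """Pull up to `count` messages; ack each one only after `handler` succeeds."""
    results = []
    for _ in range(count):
        method_frame, _properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method_frame is None:  # queue is empty
            break
        try:
            results.append(handler(json.loads(body.decode())))
        except Exception:
            # Return the message to the queue so another worker can retry it.
            channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)
            raise
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    return results
```

The trade-off is that a task which always fails would be requeued indefinitely, so a real deployment would probably want a retry limit or a dead-letter queue.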
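2.2.1 Sending the tasks to test
First, parameterise a single run.
To keep it simple, compute all the parameter combinations locally (99 × 99 = 9,801 with the ranges below), then push them to the queue for the workers in one go.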
from Basefuncs import *

class ListBatchIterator:
    def __init__(self, some_list, batch_size):
        self.some_list = some_list
        self.batch_size = batch_size

    @staticmethod
    def slice_list_by_batch(list_length, batch_num):
        # Boundaries 0, batch_num, 2*batch_num, ... up to the first one at or past list_length
        batch_list = list(range(0, list_length + batch_num, batch_num))
        res_list = []
        for i in range(len(batch_list) - 1):
            res_list.append((batch_list[i], batch_list[i + 1]))
        return res_list

    def __iter__(self):
        the_slice_list = self.slice_list_by_batch(len(self.some_list), self.batch_size)
        for the_slice in the_slice_list:
            yield self.some_list[the_slice[0]:the_slice[1]]

rm = RabbitManager()
short_t_list = list(range(100, 10000, 100))  # 100, 200, ..., 9900
long_t_list = list(range(600, 60000, 600))   # 600, 1200, ..., 59400
task_list = []
cnt = 0
for i in short_t_list:
    for j in long_t_list:
        tem_dict = {}
        tem_dict['ord_id'] = cnt
        cnt += 1
        tem_dict['short_t'] = i
        tem_dict['long_t'] = j
        task_list.append(tem_dict)
task_list_iter = ListBatchIterator(task_list, 1000)
# Send to the queue, 1000 tasks per request
for tem_list in task_list_iter:
    rm.send_message(queue='smaema', message_list=tem_list)
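The nested loop enumerates every pair, including those where `short_t` is not actually shorter than `long_t` (for example 9900 against 600). Whether to keep those is a judgment call. The sketch below shows one way to drop them with `itertools.product`, along with a plain batching generator. The helper names `build_tasks` and `batched` are hypothetical and not part of the original code.

```python
from itertools import product


def build_tasks(short_ts, long_ts):
    """Enumerate (short_t, long_t) pairs with short_t < long_t as task dicts."""
    pairs = [(s, l) for s, l in product(short_ts, long_ts) if s < l]
    return [{"ord_id": i, "short_t": s, "long_t": l} for i, (s, l) in enumerate(pairs)]


def batched(items, batch_size):
    """Yield consecutive slices of at most `batch_size` items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Same ranges as above; the filter keeps 9,017 of the 9,801 pairs.
tasks = build_tasks(range(100, 10000, 100), range(600, 60000, 600))
```

Each batch from `batched(tasks, 1000)` could then be passed to `rm.send_message` exactly as in the loop above.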
2.2.2 Writing a worker
The worker reads task metadata from the queue, processes it, and sends the output to the result queue.
from Basefuncs import *
from typing import List, Optional
from pydantic import BaseModel

# Data model for the function's arguments
class SMAEMA(BaseModel):
    clickhouse_ip : str = 'xxx'
    clickhouse_db : str = 'xxx'
    clickhouse_table_name : str = 'his_hs300_min'
    clickhouse_select_cols : list = ['shard','part','block','brick','code','amt','close','high','low',
                                     'open', 'data_dt', 'pid', 'ts','vol']
    gfgo_lite_server : str = 'http://xxx:24090/'
    current_year : int = 2024
    # short: [100, 200, ..., 9900], 99 values
    # long:  [600, 1200, ..., 59400], 99 values
    short_t : int
    long_t : int
    ...
# Processing function
import time
import pandas as pd
import tqdm

def processing_sma_ema(clickhouse_ip=None, clickhouse_db=None, clickhouse_table_name=None, clickhouse_select_cols=None, gfgo_lite_server=None, current_year=None, short_t=None, long_t=None):
    m7ch_qtv200 = CHCfg(database=clickhouse_db, host=clickhouse_ip)
    chc = CHClient(**m7ch_qtv200.dict())
    # chc._exe_sql('show tables ')
    ucs = UCS(gfgo_lite_server=gfgo_lite_server)
    tick1 = time.time()
    data = chc._exe_sql('select %s from %s' % (','.join(clickhouse_select_cols), clickhouse_table_name))
    tick2 = time.time()
    t1 = short_t
    t2 = long_t
    # Column names follow the select list, so the two cannot drift apart
    df = pd.DataFrame(data, columns=clickhouse_select_cols)
    df1 = df.sort_values(['data_dt'])
    # bfill ("backward fill") replaces each missing value (NaN) with the next valid
    # value after it. Here it fills the warm-up rows at the start of each rolling
    # mean with the first computed average, so those rows see a value from later data.
    df1['sma_t1'] = df1['close'].rolling(window=t1).mean().bfill()
    df1['sma_t2'] = df1['close'].rolling(window=t2).mean().bfill()
    # Exponential moving averages
    df1['ema_t1'] = df1['close'].ewm(span=t1, adjust=False).mean()
    df1['ema_t2'] = df1['close'].ewm(span=t2, adjust=False).mean()
    # Backtest
    bt_df = df1
    # ----------------------- sma
    open_orders = []
    close_orders = []
    for i in tqdm.tqdm(range(len(bt_df))):
        tem_dict = dict(bt_df.iloc[i])
        data_dt = tem_dict['data_dt']
        close = tem_dict['close']
        sma_t1 = tem_dict['sma_t1']
        sma_t2 = tem_dict['sma_t2']
        if len(open_orders) == 0:
            if sma_t1 > sma_t2:  # golden cross
                order_dict = {}
                order_dict['but_dt'] = data_dt
                order_dict['buy_price'] = close
                open_orders.append(order_dict)
                continue
        if len(open_orders):
            if sma_t1 < sma_t2:  # death cross
                while open_orders:  # avoids popping from a list while iterating over it
                    order_dict = open_orders.pop()
                    order_dict['sell_dt'] = data_dt
                    order_dict['sell_price'] = close
                    order_dict['gp'] = order_dict['sell_price'] - order_dict['buy_price']
                    close_orders.append(order_dict)
    close_df = pd.DataFrame(close_orders)
    close_df = close_df.sort_values(['but_dt'])
    pd.set_option('display.max_rows', 500)
    close_df['gpr'] = close_df['gp'] / close_df['buy_price']
    close_df['npr'] = close_df['gpr'] - 0.005
    close_df['buy_ts'] = ucs.dt_str2num_s(list(close_df['but_dt']))
    close_df['sell_ts'] = ucs.dt_str2num_s(list(close_df['sell_dt']))
    close_df['gap_ts'] = (close_df['sell_ts'] - close_df['buy_ts']) // 86400  # holding period in days
    # ----------------------- sma result
    result_dict = {}
    result_dict['sma.deals'] = len(close_df)
    result_dict['sma.trade_days'] = close_df['gap_ts'].median()
    result_dict['sma.total_mean_npr'] = round(close_df['npr'].mean(), 3)
    sel3 = close_df['but_dt'].apply(lambda x: int(x[:4]) >= current_year - 3)
    result_dict['sma.last3_mean_npr'] = round(close_df[sel3]['npr'].mean(), 3)
    sel1 = close_df['but_dt'].apply(lambda x: int(x[:4]) >= current_year - 1)
    result_dict['sma.last1_mean_npr'] = round(close_df[sel1]['npr'].mean(), 3)
    # ----------------------- ema
    open_orders = []
    close_orders = []
    for i in tqdm.tqdm(range(len(bt_df))):
        tem_dict = dict(bt_df.iloc[i])
        data_dt = tem_dict['data_dt']
        close = tem_dict['close']
        ema_t1 = tem_dict['ema_t1']
        ema_t2 = tem_dict['ema_t2']
        if len(open_orders) == 0:
            if ema_t1 > ema_t2:  # golden cross
                order_dict = {}
                order_dict['but_dt'] = data_dt
                order_dict['buy_price'] = close
                open_orders.append(order_dict)
                continue
        if len(open_orders):
            if ema_t1 < ema_t2:  # death cross
                while open_orders:
                    order_dict = open_orders.pop()
                    order_dict['sell_dt'] = data_dt
                    order_dict['sell_price'] = close
                    order_dict['gp'] = order_dict['sell_price'] - order_dict['buy_price']
                    close_orders.append(order_dict)
    close_df = pd.DataFrame(close_orders)
    close_df = close_df.sort_values(['but_dt'])
    pd.set_option('display.max_rows', 500)
    close_df['gpr'] = close_df['gp'] / close_df['buy_price']
    close_df['npr'] = close_df['gpr'] - 0.005
    close_df['buy_ts'] = ucs.dt_str2num_s(list(close_df['but_dt']))
    close_df['sell_ts'] = ucs.dt_str2num_s(list(close_df['sell_dt']))
    close_df['gap_ts'] = (close_df['sell_ts'] - close_df['buy_ts']) // 86400
    # ----------------------- ema result
    result_dict['ema.deals'] = len(close_df)
    result_dict['ema.trade_days'] = close_df['gap_ts'].median()
    result_dict['ema.total_mean_npr'] = round(close_df['npr'].mean(), 3)
    sel3 = close_df['but_dt'].apply(lambda x: int(x[:4]) >= current_year - 3)
    result_dict['ema.last3_mean_npr'] = round(close_df[sel3]['npr'].mean(), 3)
    sel1 = close_df['but_dt'].apply(lambda x: int(x[:4]) >= current_year - 1)
    result_dict['ema.last1_mean_npr'] = round(close_df[sel1]['npr'].mean(), 3)
    tick3 = time.time()
    result_dict['get_data_duration'] = tick2 - tick1
    result_dict['process_data_duration'] = tick3 - tick2
    result_dict['short_t'] = short_t
    result_dict['long_t'] = long_t
    return result_dict
Entry point:
if __name__ == '__main__':
    rm = RabbitManager()
    # Fetch one task
    data_list = rm.get_message_early_ack('smaema', count=1)
    if len(data_list):
        the_task = data_list[0]
        short_t = the_task['short_t']
        long_t = the_task['long_t']
        smaema = SMAEMA(short_t=short_t, long_t=long_t)
        res_dict = processing_sma_ema(**smaema.dict())
        rm.send_message(queue='smaema_result', message_list=[res_dict])
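2.2.3 Pulling and starting the workers on the compute network
From here it is just a matter of waiting to see how it goes. If the results are good, I can roll this out and run it across a whole batch of instruments.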