原创文章第119篇,专注“个人成长与财富自由、世界运作的逻辑, AI量化投资”。
继续强化学习应用于金融投资。
我们的AI量化平台,针对传统规则量化策略,进行了“积木式”的拆分,这种拆分的好处,就是最大化复用代码逻辑,这样开发策略又快且不容易出错。
针对强化学习环境,我们也打算这么做。看到有些平台,股票一个环境,加密货币一个环境,期货又是另一个环境,甚至把数据源处理都耦合到环境中,这是不对的。维护起来特别麻烦且容易出错。
01 强化学习环境的构成
一个完整的金融强化学习环境,包括数据(通常是OHLC以及特征数据)、交易动作(做多,平仓或者权重),回测系统(回测系统能够对交易动作做出反馈),激励指标(收益率、夏普比等)。
当下很多开源包在实现这个环境,存在一些问题。它们大篇幅做数据特征处理,实现了简单的量化回测,激励指标比较单一。这些环境扩展性不好,更谈不上接入实盘。
前文我们对传统量化与机器学习量化实现了统一的框架,数据管理及自动化标注,接入到backtrader的回测系统中。
同样,我们也希望可以最大化复用之前的成果,把强化学习环境也整合到一起,我们就不必关心数据处理,回测系统等环节的处理。
然而,backtrader本身为是传统量化而生,它的run直接就遍历了整个时间序列,而强化学习是由环境来调用回测引擎,一步一步往前走。
对于backtrader要进行一些改造。
一、重写_run_next
Backtrader在执行run的时间,在_runnext函数里整个顺序进行遍历。我们重写这个函数,只做第一步初始化的运算。
def _runnext(self, runstrats): ''' Actual implementation of run in full next mode. All objects have its ``next`` method invoke on each data arrival ''' self.runstrats_container = runstrats self._init_run() def _init_run(self): datas = sorted(self.datas, key=lambda x: (x._timeframe, x._compression)) datas1 = datas[1:] data0 = datas[0] d0ret = True rsonly = [i for i, x in enumerate(datas) if x.resampling and not x.replaying] onlyresample = len(datas) == len(rsonly) noresample = not rsonly clonecount = sum(d._clone for d in datas) ldatas = len(datas) ldatas_noclones = ldatas - clonecount dt0 = date2num(datetime.datetime.max) - 2 # default at max self.bt_state_container = {"datas": datas, "datas1": datas1, "data0": data0, "d0ret": d0ret, "rsonly": rsonly, "onlyresample": onlyresample, "noresample": noresample, "ldatas_noclones": ldatas_noclones, "dt0": dt0, }
二、提供一个step单步执行函数
这个函数的代码大多可以从cerebro里查到,这里就是展开描述。
def _step(self, runstrats, datas, datas1, data0, d0ret, rsonly, onlyresample, noresample, ldatas_noclones, dt0): # if any has live data in the buffer, no data will wait anything newqcheck = not any(d.haslivedata() for d in datas) if not newqcheck: # If no data has reached the live status or all, wait for # the next incoming data livecount = sum(d._laststatus == d.LIVE for d in datas) newqcheck = not livecount or livecount == ldatas_noclones lastret = False # Notify anything from the store even before moving datas # because datas may not move due to an error reported by the store self._storenotify() if self._event_stop: # stop if requested return True self._datanotify() if self._event_stop: # stop if requested return True # record starting time and tell feeds to discount the elapsed time # from the qcheck value drets = [] qstart = datetime.datetime.utcnow() for d in datas: qlapse = datetime.datetime.utcnow() - qstart d.do_qcheck(newqcheck, qlapse.total_seconds()) drets.append(d.next(ticks=False)) d0ret = any((dret for dret in drets)) if not d0ret and any((dret is None for dret in drets)): d0ret = None if d0ret: dts = [] for i, ret in enumerate(drets): dts.append(datas[i].datetime[0] if ret else None) # Get index to minimum datetime if onlyresample or noresample: dt0 = min((d for d in dts if d is not None)) else: dt0 = min((d for i, d in enumerate(dts) if d is not None and i not in rsonly)) dmaster = datas[dts.index(dt0)] # and timemaster self._dtmaster = dmaster.num2date(dt0) self._udtmaster = num2date(dt0) # slen = len(runstrats[0]) # Try to get something for those that didn't return for i, ret in enumerate(drets): if ret: # dts already contains a valid datetime for this i continue # try to get a data by checking with a master d = datas[i] d._check(forcedata=dmaster) # check to force output if d.next(datamaster=dmaster, ticks=False): # retry dts[i] = d.datetime[0] # good -> store # self._plotfillers2[i].append(slen) # mark as fill else: # self._plotfillers[i].append(slen) # mark as empty pass # make sure only those at dmaster level end up delivering for i, dti in enumerate(dts): if dti is not None: di = datas[i] rpi = False and di.replaying # to check behavior if dti > dt0: if not rpi: # must see all ticks ... di.rewind() # cannot deliver yet # self._plotfillers[i].append(slen) elif not di.replaying: # Replay forces tick fill, else force here di._tick_fill(force=True) # self._plotfillers2[i].append(slen) # mark as fill elif d0ret is None: # meant for things like live feeds which may not produce a bar # at the moment but need the loop to run for notifications and # getting resample and others to produce timely bars for data in datas: data._check() else: lastret = data0._last() for data in datas1: lastret += data._last(datamaster=data0) if not lastret: # Only go extra round if something was changed by "lasts" return True # return somethin signaling the end # Datas may have generated a new notification after next self._datanotify() if self._event_stop: # stop if requested return True if d0ret or lastret: # if any bar, check timers before broker self._check_timers(runstrats, dt0, cheat=True) if self.p.cheat_on_open: for strat in runstrats: strat._next_open() if self._event_stop: # stop if requested return True self._brokernotify() if self._event_stop: # stop if requested return True if d0ret or lastret: # bars produced by data or filters self._check_timers(runstrats, dt0, cheat=False) for strat in runstrats: strat._next() if self._event_stop: # stop if requested return True self._next_writers(runstrats) self.bt_state_container = {"datas": datas, "datas1": datas1, "data0": data0, "d0ret": d0ret, "rsonly": rsonly, "onlyresample": onlyresample, "noresample": noresample, "ldatas_noclones": ldatas_noclones, "dt0": dt0, } return False
如此,我们的backtrader就可以实现单步循环,为我们的环境提供回测能力了。
一些思考:
疫情还在持续,不知要多久,不知以何种方式结束。
目前大家看到的,可预见到的,肯定不是大家所期待的。
我们很多时候决定不了什么,只能耐心等待,保护好自己,积蓄能力。
三年后,也许这些都是故事 ,也许一个超级疫苗就研发出来,从此人间皆安。
我们要做的事情是,如果再有类似的“黑天鹅”事件发生,我们是否不那么被动,有更多的选择的权利?!
小时候,受到不公正的待遇,私下默默努力。但现代毕竟不是武侠里的快意恩愁。我们能做的事情是升级自己的环境,有能力远离那些不喜欢的人,不喜欢的事。尽管不好的事情哪里都有,但越往上走,会越发文明,越发自由。
努力的意义是自由,财务自由就是你能离得开职场,进而不受地点的约束,而同样可以过上有品质的生活,你就是自由的。
创业九死一生,若成会带来大自由,不成则更加不自由。读书,写作也许是很好的一条路。读书,写作是在疫情封控这样的情境下都可以做的事情。
代码细节,请前往星球微信群交流。
强化学习框架stable-baseline3以及pandas datareader
ETF轮动+RSRS择时,加上卡曼滤波:年化48.41%,夏普比1.89