DolphinDB 流数据状态函数插件介绍

news2024/11/19 0:46:09

1. 引言

量化金融的研究和实盘中,越来越多的机构需要根据高频的行情数据(L1/L2以及逐笔委托数据)来计算量价因子,每只股票的每一条新数据的注入都会更新该只股票的所有因子值。这些因子通常是有状态的:不仅与当前的多个指标有关,而且与多个指标的历史状态相关。

以国内的股票市场为例,每3秒收到一个快照,每个股票每天4800个快照,计算因子时可能会用到之前若干个快照的数据,甚至之前若干天的数据。绝大多数机构的研发环境系统(例如 Python)与生产环境系统(例如C++)在研发中所使用编程语言的不同,要维护两套代码,是非常沉重的负担。

而 DolphinDB 所提供的响应式状态引擎(Reactive State Engine),可以实现带有状态的高频因子的流计算解决上述问题。针对这些因子,状态引擎做了很多特殊的优化。而且,状态引擎能够将历史数据批量处理(研发阶段)中编写的表达式或函数作为输入进行流式计算,并确保流式计算的结果与批量计算完全一致。只要在历史数据的批量计算中验证正确,即可保证流数据的实时计算正确。这避免了在生产环境中重写研发代码的高额成本,以及维护研发和生产两套代码的负担。

DolphinDB 流数据响应式状态引擎每输入一条数据引擎都将触发一条结果输出,因此输入和输出数据量一致。DolphinDB 针对生产业务中的常见状态函数(滑动窗口函数、累积函数、序列相关函数和 topN 相关函数等)进行特殊优化,大幅提升了这些函数在响应式状态引擎中的计算效率。

响应式状态引擎目前仅支持系统优化过的状态函数。如果 DolphinDB 用户需要使用自定义函数封装复杂的状态函数,可以用 @state 进行声明。但是这种自定义函数仍然具有局限性,相比 C++ 开发的状态函数算子,自定义函数的运行效率较低。并且状态引擎支持的自定义函数仅支持 if-else、赋值、return 等少量语句。在很多应用场景下,仍然需要用 C++ 开发新的函数算子。

随着响应式状态引擎支持的业务越来越多,各种应用场景对状态函数的需求也越来越多样化,如果继续在 DolphinDB 中维护这些状态函数,已经无法适应日益灵活的状态函数需求。因此我们将流数据的状态函数制作成插件,利用插件系统来支持这些多种多样的函数需求。

2. 状态函数插件系统原理简介

2.1 插件的加载

流数据系统中维护了一张哈希表,其中包含了状态函数名称到状态函数 factory 函数的映射。DolphinDB 为插件提供了一个注册流数据状态函数的接口,插件所在的动态链接库中会有一个初始化函数 initialize。当用户通过 loadPlugin 加载插件时,loadPlugin 将检查插件中是否包含了 initialize 函数。如果有,该函数将被调用,插件中包含的状态函数也将被注册到哈希表当中。

2.2 插件的使用

插件当中的流数据状态函数一经加载注册,其使用便与系统自带的其他流数据状态函数没有区别。在创建流数据响应式状态引擎时,引擎会根据函数名,在哈希表中查找对应的 factory 函数并创建状态函数的实例。

但是,因为 DolphinDB 在创建流数据响应式状态引擎的时候可能需要使用和流数据状态函数对应的同名批处理函数做一些诸如类型推导的处理工作,因此插件中也需要实现对应的批处理函数的原型。

3. 状态函数插件的使用

下面以 stateMavg 函数为例,介绍流数据插件的使用。

首先调用 loadPlugin 函数加载插件:

loadPlugin("/path/to/PluginReactiveState.txt")
go

插件导入完成后,调用 listStateFunction(moduleName) 可以获取所有通过插件导入的状态函数及其用法的列表。其中就包含了我们将要使用的 stateMavg 函数:rse::stateMavg

插件中的函数都带有模块名。如果在脚本中想要直接使用插件中的函数(即不需要添加 rse::),除了用 loadPlugin 导入插件以外,还需要导入模块名:

use rse

状态函数导入后,我们就可以利用该函数构造 metric 并创建响应式状态引擎,和普通的响应式状态函数的使用方法相同。

n=10
trade = table(09:00:00 + 1..n as time, take(`A`B, n) as sym, 1..n as val)

outputTable = table(100:0, `sym`time`total, [STRING, SECOND, DOUBLE])

engine = createReactiveStateEngine(name="test", metrics=[<time>, <rse::stateMavg(val, 3)>], dummyTable=trade, outputTable=outputTable, keyColumn=`sym, keepOrder=true)
engine.append!(trade)
select * from outputTable

4. 如何开发状态函数插件

4.1 状态函数接口

对于一个新的 ReactiveState 的实现,首先要继承 ReactiveState 类,并实现下列接口:

  • snapshotState:根据当前的函数状态生成 snapshot
  • restoreState:从 snapshot 中恢复函数的状态
  • append:新增数据并输出一条结果
  • addKeys:新增数据并输出一条结果
  • removeKeys:删除key
  • getMemoryUsed:获取函数当前的内存使用情况

如果函数不支持某些功能(例如 snapshot),相应接口应该抛出异常 RuntimeException

响应式状态函数定义示例:

class MyReactiveState : public ReactiveState {
public:
    MyReactiveState(/* ... */);
	virtual ~MyReactiveState(){}

	virtual IO_ERR snapshotState(const DataOutputStreamSP& out) override {
		throw RuntimeException("not supported");
	}
	virtual IO_ERR restoreState(const DataInputStreamSP& in){
		throw RuntimeException("not supported");
	}
	virtual void append(Heap* heap, const ConstantSP& keyIndex);
	virtual void addKeys(int count);
	virtual void removeKeys(const vector<int>& keyIndices);
	virtual void getMemoryUsed(long long& fixedMemUsed, long long& variableMemUsedPerKey);
private:
    // ...
};

4.2 状态函数接口的实现

下面还是以 stateMavg 函数为例,介绍状态函数的实现。stateMavg 若窗口中 X 的元素个数小于 window,则直接输出 X;否则取输出字段前 window - 1 个值与当前 X 值求平均。

stateMavg 状态函数类的完整定义如下:

class StateMovingAvgReactiveState : public ReactiveState {
public:
	StateMovingAvgReactiveState(int window, int minPeriod, int inputColIndex, int outputColIndex) : window_(window), minPeriods_(minPeriod),
		inputColIndex_(inputColIndex), outputColIndex_(outputColIndex){
		compute_ = std::bind(&StateMovingAvgReactiveState::calculate, this, std::placeholders::_1, std::placeholders::_2);
	}

	virtual IO_ERR snapshotState(const DataOutputStreamSP& out){
		throw RuntimeException("not supported");
	}
	virtual IO_ERR restoreState(const DataInputStreamSP& in){
		throw RuntimeException("not supported");
	}
	virtual void append(Heap* heap, const ConstantSP& keyIndex);
	virtual void addKeys(int count);
	virtual void removeKeys(const vector<int>& keyIndices);
	virtual void getMemoryUsed(long long& fixedMemUsed, long long& variableMemUsedPerKey){
		fixedMemUsed = sizeof(*this);
		variableMemUsedPerKey = sizeof(CircularQueue<char>) + sizeof(double) * window_ + 16;
	}
	
	static ReactiveStateSP createInstance(const vector<ObjectSP>& args, const vector<int>& inputColIndices, const vector<DATA_TYPE>& inputColTypes, const vector<int>& outputColIndices);
private:
	double calculate(int index, double val);

private:
	int window_;
	int inputColIndex_;
	int outputColIndex_;
	vector<double> sums_;
	vector<int> counts_;
	vector<CircularQueue<double>> data_;
};

在这里,我们选择不支持 snapshot 功能。另外,getMemoryUsed 实现起来也很简单,把类中成员变量所占据的内存空间相加即可。因此,本节主要介绍用于数据的输入和输出的 append 函数,以及有新增 key 和删除清理 key 时候的回调函数:addKeys 和 removeKeys

成员变量当中,window_ 是窗口长度参数,inputColIndex_ 和 outputColIndex_ 是引擎提供的输入输出列的编号,而 sums_ 、counts_ 以及 data_ 分别为各个 key 分组当前窗口中的元素的和、数量等数据状态。

当有新增 key 的时候,需要为该 key 初始化状态:

void StateMovingAvgReactiveState::addKeys(int count){
	for(int i=0; i<count; ++i){
		sums_.push_back(0);
		counts_.push_back(0);
		data_.emplace_back(window_);
	}
}

而 key 被清理掉时,对应的状态也应该被删除:

void StateMovingAvgReactiveState::removeKeys(const vector<int>& keyIndices){
	removeElements<double>(sums_, keyIndices);
	removeElements<int>(counts_, keyIndices);
	removeElements<CircularQueue<double>>(data_, keyIndices);
}

然后可以实现计算逻辑:

double StateMovingAvgReactiveState::calculate(int index, double val){
	CircularQueue<double>& queue = data_[index];
    if (val == DBL_NMIN) {
		return val;
	}

	if(LIKELY(queue.size() >= window_)){
		double v = queue.head();
		if(v != DBL_NMIN){
			sums_[index] -= v;
			--counts_[index];
		}
	}
	sums_[index] += val;
	++counts_[index];
	
	double result;
	// 若窗口中 X 的元素个数大于等于 window,则取输出字段前 window - 1 个值与当前 X 值求平均
	if(counts_[index] >= window_)
		result = sums_[index]/counts_[index];
	// 否则直接输出 X
	else
		result = val;
	
	sums_[index] -= val;
	sums_[index] += result;
	queue.push(result);
	return result;
}

最后将数据输入和输出功能封装为 append 函数:

void StateMovingAvgReactiveState::append(Heap* heap, const ConstantSP& keyIndex){
	ConstantSP col = table_->getColumn(inputColIndex_);
	INDEX start = 0;
	INDEX len = keyIndex->size();
	INDEX* pkeyIndex = keyIndex->getIndexArray();
	double buf[Util::BUF_SIZE];
	double bufR[Util::BUF_SIZE];
	while(start < len){
		int count = std::min(len - start, Util::BUF_SIZE);
		col->getDouble(pkeyIndex + start, count, buf);
		for(int i=0; i<count; ++i){
			INDEX curIndex = pkeyIndex[start + i];
			bufR[i] = calculate(curIndex, buf[i]);
		}
		setData<double>(outputColIndex_, pkeyIndex + start, count, bufR);
		start += count;
	}
}

输入数据存储在 table_ 中,可以通过 keyIndex 进行访问。输出数据同理,可以用 keyIndex 作为索引,用 setData 函数输出。

为了提升运行效率,会先把数据以数组的形式取到 buffer 当中再进行处理。如果输入的数据的分组个数大于 BUF_SIZE,则通过 while 循环分段处理。

4.3 实现状态函数 factory

同时,还需要一个 factory 函数用于创建响应式状态函数的实例,其函数签名为:

typedef ReactiveStateSP(*StateFuncFactory)(
    const vector<ObjectSP>& args,
    const vector<int>& inputColIndices,
    const vector<DATA_TYPE>& inputColTypes,
    const vector<int>& outputColIndices);

例如:

ReactiveStateSP createMyReactiveState(const vector<ObjectSP>& args, const vector<int>& inputColIndices, const vector<DATA_TYPE>& inputColTypes, const vector<int>& outputColIndices, SQLContextSP& context, Heap* heap){
	checkOutputColumn(outputColIndices,1,"myReactiveState");
	string funcName = "myReactiveState";
	string syntax = "Usage: genericStateIterate(X, initial, window, func). ";
    // 检查参数的有效性并处理参数
    // ...
	return new MyReactiveState(/* ... */);
}

4.4 在初始化函数中注册状态函数 factory

我们在插件中封装了一个 registerReactiveState 函数,可以直接使用。需要把对 registerReactiveState 函数的调用放入 intialize 函数当中:

ConstantSP initialize(Heap *heap, vector<ConstantSP>& arguments) {
    // ...
    string modname = arguments[0]->getString();
    if (!registerReactiveState(heap, modname, "myReactiveState", (long long)&createMyReactiveState, 1,
                              "(arg1, arg2,[arg3])")) {
        return new Bool(false);
    }
    // ...
}

4.5 创建批处理函数原型并注册

如前面所述,即使只会用到插件中的流处理状态函数,不需要批处理,创建并实现批处理函数仍然是必要的。

例如:

ConstantSP myReactiveState(Heap *heap, vector<ConstantSP> &arguments) {
  ConstantSP result = new Double(0.0);
  // ...
  return result;
}

最后把批处理函数声明为 extern "C"

extern "C" {
  ConstantSP myReactiveState(Heap *heap, vector<ConstantSP> &arguments);
}

并编辑插件的描述文件 PluginReactiveState.txt,添加批处理函数:

rse,libPluginReactiveState.so
…
myReactiveState,myReactiveState,system,2,3,0
…

这样,一个插件就开发完成了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/517173.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

精选博客系列|VMware发布下一代Workspace ONE SaaS平台,性能提升了10倍!

我们很高兴地宣布下一代 Workspace ONE SaaS 平台面世了&#xff01;日前公布的 Workspace ONE 架构的根本变化已经包含了我们最近的一些进展&#xff0c;例如自由式编排器&#xff0c;而且将成为未来 VMware 终端用户计算&#xff08;EUC&#xff09;创新的基石。 现代化的架…

【python】python闭包的详细解读(傻瓜式教学)

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化 &#x1f449;专__注&#x1f448;&#xff1a;专注主流机器人、人工智能等相关领域的开发、…

第八届SkyHackathon环境配置及问答指南

第八届SkyHackathon环境配置提问说明 本届Hackathon以传送带上的箱子检测为目标, 参赛的队伍需要完成: 利用Omniverse Replicator合成训练数据集利用合成的数据集, 训练目标检测模型将检测模型部署在组委会提供的Jetson Xavier NX节点上 其中上述第一和第二步需要参赛的同学…

前端——JS

1.JS的学习内容 JavaScript的组成包含ECMAScript、DOM、BOM。 2.JS的特点 JS是运行在浏览器上的一种脚本语言 【1】Java和JS的区别&#xff1a; 【2】HTML和CSS和JS这三者的关系 3.JS的引入方式 3.1JS的引入方式1 <!DOCTYPE html> <html lang"en"> &…

飞书文档和Cnfluence之间的区别是什么

PingCode 知识库、Confluence 等知识库工具和腾讯文档、WPS、飞书文档到底有什么区别&#xff1f;这是企业团队在找文档管理工具最常见的问题。这两种工具虽然都可以用于组织和共享知识&#xff0c;但它们在功能、使用场景和用户群体方面有很大的区别。那么&#xff0c;它们到底…

亚马逊云科技推出Matter PKI合规指导手册

亚马逊云科技推出Matter公钥基础设施&#xff08;Public Key Infrastructure,PKI&#xff09;合规指导手册&#xff0c;帮助客户使用Amazon Private Certificate Authority&#xff08;Amazon Private CA&#xff09;证书服务构建符合Matter要求的PKI证书体系&#xff0c;加快客…

R语言 | 数据汇总与简单图表制作

目录 一、准备工作 1.1 下载MASS扩展包与crabs对象 1.2 准备与调整系统内建state相关的对象 1.3 准备mtcars对象 二、了解数据的唯一值 三、基础统计知识与R语言 3.1 数据的集中趋势 3.1.1 认识统计学名词——平均数 3.1.2 认识统计学名词——中位数 3.1.3 认识统计学…

Adobe Photoshop 2022版 功能介绍及使用技巧

目录 版本介绍&#xff1a; 使用技巧&#xff1a; 截图展示&#xff1a; 分享 版本介绍&#xff1a; Adobe Photoshop 2022是Adobe公司的一款专业的图像处理软件&#xff0c;它提供了强大的图像处理功能&#xff0c;从色彩调整&#xff0c;图层处理到高级合成等功能。新版…

webhub123 硬件工程师学习和交流、问答各类网站集合

硬件不像软件开源共享的多&#xff0c;硬件的开发和学习相对不自由&#xff0c;能收集到资料有限。为了帮硬件开发工程找到更多大门&#xff0c;我们整理了一些学习、交流的优质网站&#xff0c;收录到 webhub123 硬件工程师学习和交流、问答各类网站集合http://​www.webhub12…

有没有中国版本的ChatGPT?

ChatGPT是一个基于人工智能的聊天机器人&#xff0c;它可以与用户进行自然语言交互。ChatGPT使用了最新的自然语言处理技术&#xff0c;包括深度学习和神经网络&#xff0c;以便更好地理解用户的意图和回答用户的问题。 ChatGPT可以回答各种问题&#xff0c;包括但不限于常见问…

截面空间计量模型(Stata)

截面空间计量模型(Stata) 文章目录 截面空间计量模型(Stata)[toc]1 广义空间自回归模型&#xff08;SAC&#xff09;2 空间误差模型(SEM)3 空间杜宾模型(SDM)4 广义空间嵌套模型(GNS)5 空间(自回归)滞后模型(SAR,SLM)6 空间杜宾误差模型(SDEM) 1 广义空间自回归模型&#xff08…

JUC并发编程14 | ThreadLocal

尚硅谷JUC并发编程&#xff08;100-111&#xff09; ThreadLocal ThreadLocal 使用 ThreadLocal是什么&#xff1f;ThreadLocal 提供线程局部变量。这些变量与正常的变量有所不同&#xff0c;因为每一个线程在访问ThreadLocal实例的时候&#xff08;通过其get或set方法&…

UNIAPP实战项目笔记68 购物车勾选到订单确认

UNIAPP实战项目笔记68 购物车勾选到订单确认 思路 需要用到vuex 页面间传值 案例截图 订单结算页面 购物车页面 确认订单页面 支付页面 代码 前端代码 购物车页面 shopcart.vue <template><view class"shop-cart"><template v-if" list.l…

启扬方案助力智能配送终端,打造智能取件新模式!

随着快递业务的不断发展和智能化程度的提高&#xff0c;智能快递柜已经成为了快递末端配送的新型解决方案&#xff0c;智能快递柜不仅可以提高用户取件的便捷性和安全性&#xff0c;还减少了人工成本&#xff0c;同时提高了快递配送的效率和服务质量&#xff0c;这也使得智能快…

2022年美国大学生数学建模竞赛C题贸易策略解题全过程文档及程序

2022年美国大学生数学建模竞赛 C题 贸易策略 原题再现&#xff1a; 背景:   市场贸易者经常购买和销售股票&#xff0c;目标是最大化他们的总回报。针对每次购买和销售&#xff0c;经常会存在回报提成。两个案例是金子和比特币。   要求:   你们团队被贸易者要求建立一…

Js中的微任务和宏任务

1.前言 任务可以分成两种&#xff0c;一种是同步任务&#xff08;synchronous&#xff09;&#xff0c;另一种是异步&#xff08;asynchronous&#xff09;&#xff0c;异步任务又分为宏任务和微任务。 同步任务&#xff1a;在主线程上排队执行的任务&#xff0c;只有前一个任…

C++11 -- 右值引用和移动语义

文章目录 基本概念左值和左值引用右值和右值引用 右值引用和移动语义的意义和使用场景左值引用与右值引用比较右值引用的特殊场景左值引用的短板右值引用和移动语义 完美转发模板中的&&万能引用完美转发在传参过程中保留原生类型属性完美转发实际中的使用场景 基本概念…

如何用ChatGPT分析品牌舆论传播概况,并给到处理建议?

该场景对应的关键词库&#xff08;25个&#xff09;&#xff1a; 舆论传播、数据分析、主题、事件、时间段、媒体渠道、数据来源、情感分析、关键词提取、主题挖掘、大众集中讨论的话题、讨论关注程度、舆论关注倾向、关联类似事件、聚焦某一种情绪、人群范围、事件涉及群体、谁…

企业电子招标采购系统源码java 版本 Spring Cloud + Spring Boot

项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以及…

程序员的职场,光有技术是不行的,送给每个即将工作的程序员

又是一年五月份&#xff0c;大批量学计算机的学生又要涌入职场了&#xff0c;牛皮的已经早早找到了工作&#xff0c;但不管你技术再牛&#xff0c;在程序员的职场&#xff0c;光有技术是不行的&#xff0c;你还要懂得一些职场的雷坑和上升技巧。 我做了二十多年程序员&#xf…