历史高频行情数据存储最佳实践:DolphinDB Array Vector 使用指南

news2024/11/19 5:31:28
越来越多的机构使用 L1/L2 的快照行情数据进行量化金融的研究。作为一个高性能时序数据库,DolphinDB 非常适合存储和处理海量的历史高频行情数据。针对快照数据包含多档位信息的特点,DolphinDB 研发了一种方便、灵活且高效的数据结构——Array Vector,可以显著地简化某些常用的查询与计算代码,提高存储和计算地性能。Array Vector 还可以存储不定长的二维数组,在数据处理、模型应用等方面都具有重要意义。

体验 DolphinDB Array Vector 带来的性能提升!戳链接获取:高频行情存储&计算解决方案。

概念

DolphinDB 提供了一种特殊的数据结构,数组向量(Array Vector),用以存储比如股票的多档报价数据。顾名思义,Array Vector 的每一个元素是一个一维数组(类型完全相同)。这种存储方式不仅可以简化某些查询与计算的代码,在不同列中含有大量重复数据的情况下,还可以提高数据压缩比,提升查询速度。计算上,Array Vector 可以与标量、向量或另一个 Array Vector 进行二元运算,能够方便地实现向量化运算,提升计算性能。

Array Vector 和矩阵(Matrix)都可以组织二维的结构化数据,但两者有很大区别。首先矩阵的每一行长度相同,Array Vector 无这一要求。其次,在 DolphinDB 的实现中,矩阵的存储是列优先,而Array Vector是行优先。根据实现方式的不同,Array Vector 又可以进一步分为 **Fast Array Vector(数组向量)**和 **Columnar Tuple(列式元组)**两种。后续在无特殊注明的情况下,Array Vector 一般指 Fast Array Vector。

Fast Array Vector 的实现有两个常规的向量组成,第二个向量连续存储所有行的数据,第一个向量存储每一行的数据结束的索引。这一种存储方式十分紧凑,读写和计算的效率都很高,但很难改变每一行的长度。为弥补这个不足,DolphinDB 引入了 **Columnar Tuple。**顾名思义,Columnar Tuple 是一个元组,代表一列,其中每一个元素代表一行,但值类型必须保持一致。Columnar Tuple 的元素都是一个独立的对象(Scalar 或 Vector),因此可以方便的更改。

Fast Array Vector 和 Columnar Tuple 的区别

  • Fast Array Vector 目前暂时不支持 SYMBOL 和 STRING 两种类型;Columnar Tuple 支持。
  • 使用 typestr 函数查看数据类型,Fast Array Vector 返回的是 “FAST XXX[] VECTOR”;Columnar Tuple 返回的是 “ANY VECTOR”,所以需要用 isColumnarTuple 函数判断变量类型是否为列式元组。
  • Fast Array Vector 的存储、查询、计算等效率更高,但更新和删除操作效率较低并且每一行的元素长度不能改变;Columnar Tuple 在更新和删除操作上的表现更好且每一行的元素长度允许改变。【上述的更新和删除操作是通过底层的 C++ 代码实现的。目前更新和删除的接口未开放,脚本层面还不支持更新和删除 Array Vector 中的元素。所以一般情况下,推荐用户使用 Fast Array Vector (数组向量)。】

Fast Array Vector 和 Columnar Tuple 的应用场景

  • Fast Array Vector:数值类型的二维数组的存储和计算。比如,Level 2 行情的十档量价和50档委托等。
  • Columnar Tuple:需要频繁更新二维数组元素的场景。比如,响应式状态引擎内部会自动把输入的 Fast Array Vector 的列转化为 Columnar Tuple,以此提高更新引擎状态的效率。这部分的转化是引擎内部自动完成的,对用户是透明的。用户不会感知到这一个过程,只需要向引擎传入 Fast Array Vector 的列即可。

Array Vector 支持的函数和操作

Array Vector 的创建

创建 Array Vector 类型的变量
  • Fast Array Vector (数组向量)

(1)通过 array 或 bigarray 函数定义空的数组向量,并通过 append! 添加数据

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
/* x
[[1,2,3],[4,5],[6,7,8],[9,10]]
*/

(2)通过 fixedLengthArrayVector 将多个向量/元组/矩阵或表拼接成数组向量。

vec = 1 2 3
tp = [4 5 6, 7 8 9]
m =  matrix(10 11 12, 13 14 15, 16 17 18)
tb = table(19 20 21 as v1, 22 23 24 as v2)
x = fixedLengthArrayVector(vec, tp, m, tb)
/* x
[[1,4,7,10,13,16,19,22],[2,5,8,11,14,17,20,23],[3,6,9,12,15,18,21,24]]
*/

(3)通过 arrayVector 将单个向量拆分成数组向量。

x = arrayVector(3 5 8 10, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
/* x
[[1,2,3],[4,5],[6,7,8],[9,10]]
*/
  • Columnar Tuple (列式元组)

通过 setColumnarTuple! 函数将一个普通元组转换成列式元组。

x = [[1,2,3],[4,5],[6,7,8],[9,10]].setColumnarTuple!()
/* x
([1,2,3],[4,5],[6,7,8],[9,10])
*/
创建含有 Array Vector 类型列的表

(1)创建 Array Vector 类型的变量,将其指定为表中的一列。

Fast Array Vector 的变量在表中的列类型是 "XXX[]",比如 "INT[]"、"DOUBLE[]" 等;

Columnar Tuple 的变量在表中为 ”ANY“ 类型。

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
t = table(1 2 3 4 as id, x as newCol1)
update t set newCol2=x
t["newCol3"] = y
/* t
id newCol1 newCol2 newCol3
-- ------- ------- -------
1  [1,2,3] [1,2,3] [1,2,3]
2  [4,5]   [4,5]   [4,5]  
3  [6,7,8] [6,7,8] [6,7,8]
4  [9,10]  [9,10]  [9,10] 
*/

t.schema().colDefs
/*
name    typeString typeInt extra comment
------- ---------- ------- ----- -------
id      INT        4                    
newCol1 INT[]      68                   
newCol2 INT[]      68                   
newCol3 ANY        25   
*/

(2)通过 fixedLengthArrayVector 将表中多列拼成一列。

t = table(1 2 3 4 as id, 1 3 5 6 as v1, 4 7 9 3 as v2)
t = select *, fixedLengthArrayVector(v1, v2) as newCol from t
/* t
id v1 v2 newCol
-- -- -- ------
1  1  4  [1,4] 
2  3  7  [3,7] 
3  5  9  [5,9] 
4  6  3  [6,3] 
*/

(3)通过 toArray + group by 将表中每组的数据组合成 Array Vector。

t = table(1 1 3 4 as id, 1 3 5 6 as v1)
new_t = select toArray(v1) as newV1 from t group by id
/* new_t
id newV1
-- -----
1  [1,3]
3  [5]  
4  [6]  
*/

注意事项

因为 toArray 生成的结果是 Fast Array Vector 类型的数据,Fast Array Vector 暂时不支持 SYMBOL 和 STRING 类型,所以 toArray 函数暂时也不支持对 SYMBOL 和 STRING 的列使用。

在 group by 的时候,如果需要将 SYMBOL 和 STRING 类型的数据组合,建议使用字符串拼接,比如

t = table(1 1 3 4 as group_id, `a1`a2`a3`a4 as name)
new_t = select concat(name, ";") as name from t group by group_id 
/* new_t
group_id name 
-------- -----
1        a1;a2
3        a3   
4        a4        
*/

使用的时候,用 split 函数再将字符串拆分,比如

select *, name.split(";")[0] as name0 from new_t
/*
group_id name  name0
-------- ----- -----
1        a1;a2 a1   
3        a3    a3   
4        a4    a4   
*/

(4)设置 loadText 的 schema 和 arrayDelimiter,从文本文件中读取含 Fast Array Vector 列的表。

用 saveText 存储数据时,Fast Array Vector 的列会自动存储为如下格式:Array Vector 内部用 arrayDelimiter 隔开。

x = array(INT[], 0).append!([1 3 5, 2 7 9])
t = table(1 2 as id, x as value) 
saveText(t, "./test.csv")

"./test.csv" 内的文本内容

id,value
1,"1,3,5"
2,"2,7,9"

针对上面的 csv,读文本数据时,schema 中设置 value 列的类型为 “INT[]”,arrayDelimiter 设置为逗号 “,“。

t = loadText("./test.csv", schema=table(`id`value as name, ["INT", "INT[]"] as type), arrayDelimiter=",")
/* t
id value  
-- -------
1  [1,3,5]
2  [2,7,9]
*/t = loadText("./test.csv", schema=table(`id`value as name, ["INT", "INT[]"] as type), arrayDelimiter=",") /* t id value   -- ------- 1  [1,3,5] 2  [2,7,9] */

Array Vector 的基础操作

访问 Array Vector 中的元素

用户可以通过函数(row、at)的方式访问 Array Vector 的行列;也可以通过下标(“x[index]”)的方式访问 Array Vector 的元素。

当使用下标的方式(“x[index]”)访问 Array Vector 时满足以下规则:

(1)当 index 为标量或者数据对的时候,表示对列操作,比如 index=0 或者 index=0:3;

(2)当 index 为向量的时候,表示对行的操作,比如 index = [1, 2, 3];

(3)当 index 越界时,对应位置的数据会用空值填充;

访问 Array Vector 变量中的行
  • 通过 row 函数读取 Array Vector 中的一行。【只能访问一行,返回一个向量】
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x.row(1)
/*
[4,5]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y.row(1)
/*
[4,5]
*/

// 当 index 越界时,空值填充
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x.row(10)
/*
[,,,]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y.row(10)
/*
[,,,]
*/
  • 通过 x[index] 且 index 为向量的方式访问 Array Vector 的行。【可以访问多行,返回一个和 x 相同类型的 Array Vector】
// 读取一行
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[[1]]
/*
[[4,5]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[[1]]
/*
([4,5])
*/

// 读取多行
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[[1, 2]]
/*
[[4,5],[6,7,8]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[[1, 2]]
/*
([4,5],[6,7,8])
*/

// 当 index 越界时,空值填充
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[[3, 4]]
/*
[[9,10],]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[[3, 4]]
/*
([9,10],)
*/
访问 Array Vector 变量中的列
  • 通过 x[index] 且 index 为标量的方式访问 Array Vector 的一列。【只能访问一列,返回一个向量】
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[1]
/*
[2,5,7,10]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[1]
/*
[2,5,7,10]
*/

// 当 index 越界时,空值填充
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[10]
/*
[,,,]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[10]
/*
[,,,]
*/
  • 通过 x[start:end] 的方式访问 Array Vector 的列。【可以访问多行,返回一个 Array Vector】
// end 不为空
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[0:2]
/*
[[1,2],[4,5],[6,7],[9,10]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[0:2]
/*
[[1,2],[4,5],[6,7],[9,10]]
*/

// end 为空
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[1:]
/*
[[2,3],[5],[7,8],[10]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[1:]
/*
([2,3],[5],[7,8],[10])
*/

// 当 index 越界时,空值填充
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[1:4]
/*
[[2,3,],[5,,],[7,8,],[10,,]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[1:4]
/*
[[2,3,],[5,,],[7,8,],[10,,]]
*/

注意事项

x[start:end] 中必须满足 start >= 0 且 start < end, 否则参数校验时抛出异常。

当 x 是 STRING 或 SYMBOL 类型的 Columnar Tuple,结果返回一个 Columnar Tuple;

当 x 是 Columnar Tuple 且 end == NULL(每一行的长度可能不一致),结果返回一个 Columnar Tuple;

其他情况下,结果都会返回一个 Fast Array Vector。

访问 Array Vector 变量中 r 行 c 列的元素
  • Fast Array Vector 可以通过 x[r, c] 的方式定位一个元素;Columnar Tuple 可以通过 x[c, r] 的方式定位一个元素
r, c = 2, 1
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[r, c]
/*
[7]
*/

y = [[1,2,3],[4,5],[6,7,8],[9,10]].setColumnarTuple!()
y[c, r]
/*
7
*/

注意事项:x[r, c] 中的索引 r 和 c 不能越界,否则参数校验时抛出异常。

  • 先定位行,再定位列
r, c = 2, 1
rows, cols = [1, 2, 3], 0:2

// 用函数先定位某一行,再定位某一列
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x.row(r).at(c)
/*
7
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y.row(r).at(c)
/*
7
*/

// 用下标先定位某一行,再定位某一列
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[[r]][c]
/*
[7]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[[r]][c]
/*
[7]
*/

// 用下标先定位某几行,再定位某几列
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[rows][cols]
/*
[[4,5],[6,7],[9,10]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[rows][cols]
/*
[[4,5],[6,7],[9,10]]
*/

// 当 index 越界时,空值填充
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[2 3 4][1:3]
/*
[[7,8],[10,],[,]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[2 3 4][1:3]
/*
[[7,8],[10,],[,]]
*/
  • 先定位列,再定位行
r, c = 2, 1
rows, cols = [1, 2, 3], 0:2

// 用下标先定位某一列,再定位某一行
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[c][r]
/*
7
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[c][r]
/*
7
*/

// 用下标先定位某几列,再定位某几行
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[cols][rows]
/*
[[4,5],[6,7],[9,10]]
*/

y = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
y[cols][rows]
/*
[[4,5],[6,7],[9,10]]
*/

// 当 index 越界时,空值填充
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x[1:3][2 3 4]
/*
[[7,8],[10,],]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
y[1:3][2 3 4]
/*
[[7,8],[10,],]
*/
访问表中 Array Vector 列的元素

可以通过 x[index] 且 index 为标量或者数据对的方式将表中 Array Vector 列的指定位置的元素取出。

当 index 越界时,指定位置元素用空值填充。

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
t = table(1 2 3 4 as id, x as x, y as y)
new_t = select *, x[2] as x_newCol1, x[1:3] as x_newCol2, y[2] as y_newCol1, y[1:3] as y_newCol2 from t
/* new_t
id x       y       x_newCol1 x_newCol2 y_newCol1 y_newCol2
-- ------- ------- --------- --------- --------- ---------
1  [1,2,3] [1,2,3] 3         [2,3]     3         [2,3]    
2  [4,5]   [4,5]             [5,]                [5,]     
3  [6,7,8] [6,7,8] 8         [7,8]     8         [7,8]    
4  [9,10]  [9,10]            [10,]               [10,]    
*/
Array Vector 插入数据

Array Vector 目前支持末尾增加行的操作,暂时不支持修改和删除元素的操作。

  • 通过 append! 方法向 Array Vector 的变量中插入数据
// 插入一行
x = array(INT[], 0).append!([1 2 3, 4 5 6])
x.append!(7)
x.append!([8 9])
/* x
[[1,2,3],[4,5,6],[7],[8,9]]
*/

y = [1 2 3, 4 5 6].setColumnarTuple!()
y.append!(7)
y.append!([8 9])
/* y
([1,2,3],[4,5,6],7,[8,9])
*/

// 插入多行
x = array(INT[], 0).append!([1 2 3, 4 5 6])
x.append!([7, 8 9])
/* x
[[1,2,3],[4,5,6],[7],[8,9]]
*/

y = [1 2 3, 4 5 6].setColumnarTuple!()
y.append!([7, 8 9])
/* y
([1,2,3],[4,5,6],7,[8,9])
*/
  • 通过 tableInsert 、 append! 等方法向含有 Array Vector 列的表中插入数据
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
t = table(1 2 3 4 as id, x as col1, y as col2)
t.tableInsert(5, 11, 11)
t.tableInsert(6, [12 13 14], [12 13 14])
t.append!(table(7 8 as id, [15 16, 17] as col1, [15 16, 17] as col2))
/* t
id col1       col2      
-- ---------- ----------
1  [1,2,3]    [1,2,3]   
2  [4,5]      [4,5]     
3  [6,7,8]    [6,7,8]   
4  [9,10]     [9,10]    
5  [11]       11        
6  [12,13,14] [12,13,14]
7  [15,16]    [15,16]   
8  [17]       17         
*/
转化 Array Vector 为向量、矩阵

(1)通过 flatten 函数可以将 Array Vector 展开成一维向量 。

x = array(INT[], 0).append!([1 2 3, 4 5 6])
z = flatten(x)
/* z
[1,2,3,4,5,6]
*/

y = [1 2 3, 4 5 6].setColumnarTuple!()
z = flatten(y)
/* z
[1,2,3,4,5,6]
*/

(2)通过 matrix 函数可以将等长的 Array Vector 转化为矩阵 。

x = array(INT[], 0).append!([1 2 3, 4 5 6])
z = matrix(x)
/* z
#0 #1 #2
-- -- --
1  2  3 
4  5  6 
*/

y = [1 2 3, 4 5 6].setColumnarTuple!()
z = matrix(y)
/* z
#0 #1
-- --
1  4 
2  5 
3  6 
*/
对 Fast Array Vector 的每个元素过滤

可以通过 x[cond] 的方式将 Fast Array Vector 所有满足 cond 条件的元素过滤出来。

(该功能 1.30.21 / 2.00.10 后支持)

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
z = x[x>=5]
/* z
[,[5],[6,7,8],[9,10]]
*/

t = table(1 2 3 4 as id, x as x)
new_t = select *, x[x>=5] as newCol from t
/* new_t
id x       newCol 
-- ------- -------
1  [1,2,3] []  
2  [4,5]   [5]    
3  [6,7,8] [6,7,8]
4  [9,10]  [9,10] 
*/

Array Vector 的计算

和标量计算

即把 Array Vector 的每个元素和标量做计算。

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
z = x + 1
/* z
[[2,3,4],[5,6],[7,8,9],[10,11]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
z = y + 1
/* z
([2,3,4],[5,6],[7,8,9],[10,11])
*/

t = table(1 2 3 4 as id, x as x, y as y)
new_t = select *, x + 1 as new_x, y + 1 as new_y from t
/* new_t
id x       y       new_x   new_y  
-- ------- ------- ------- -------
1  [1,2,3] [1,2,3] [2,3,4] [2,3,4]
2  [4,5]   [4,5]   [5,6]   [5,6]  
3  [6,7,8] [6,7,8] [7,8,9] [7,8,9]
4  [9,10]  [9,10]  [10,11] [10,11]
*/
和向量计算

要求向量的长度和 Array Vector 的行数相等。

即把 Array Vector 的每行数据和向量对应位置的元素做计算。

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
z = x * [1, 2, 3, 4]
/* z
[[1,2,3],[8,10],[18,21,24],[36,40]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
z = y * [1, 2, 3, 4]
/* z
([1,2,3],[8,10],[18,21,24],[36,40])
*/

t = table(1 2 3 4 as id, x as x, y as y)
new_t = select *, x*id as new_x, y*id as new_y from t
/* new_t
id x       y       new_x      new_y     
-- ------- ------- ---------- ----------
1  [1,2,3] [1,2,3] [1,2,3]    [1,2,3]   
2  [4,5]   [4,5]   [8,10]     [8,10]    
3  [6,7,8] [6,7,8] [18,21,24] [18,21,24]
4  [9,10]  [9,10]  [36,40]    [36,40]  
*/
和 Array Vector 计算

要求两个 Array Vector 的大小相等。

即把两个 Array Vector 的对应位置的元素做计算。

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
xx = array(INT[], 0).append!([3 1 2, 3 1, 1 2 3, 0 1])
z = pow(x, xx)
/* z
[[1,2,9],[64,5],[6,49,512],[1,10]]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
yy = [3 1 2, 3 1, 1 2 3, 0 1].setColumnarTuple!()
z = pow(y, yy)
/* z
([1,2,9],[64,5],[6,49,512],[1,10])
*/

t = table(1 2 3 4 as id, x as x, xx as xx, y as y, yy as yy)
new_t = select *, pow(x, xx) as new_x, pow(y, yy) as new_y from t
/* new_t
id x       xx      y       yy      new_x      new_y     
-- ------- ------- ------- ------- ---------- ----------
1  [1,2,3] [3,1,2] [1,2,3] [3,1,2] [1,2,9]    [1,2,9]   
2  [4,5]   [3,1]   [4,5]   [3,1]   [64,5]     [64,5]    
3  [6,7,8] [1,2,3] [6,7,8] [1,2,3] [6,49,512] [6,49,512]
4  [9,10]  [0,1]   [9,10]  [0,1]   [1,10]     [1,10]     
*/

注意事项

Fast Array Vector 和 Columnar Tuple 之间不能直接计算。

按行计算

(1)Array Vector 支持 行计算系列(row 系列)。

为了满足用户逐行计算的需求,DolphinDB 设计了 row 系列函数。

row 系列函数以 “rowFunc“ 的格式命名,例如 rowSumrowAlign 等函数。

row 系列函数的输入的参数可以是向量 / 向量元组 / 矩阵 / Array Vector。

  • 单目函数示例:按行求和
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
z = rowSum(x)
/* z
[6,9,21,19]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
z = rowSum(y)
/* z
[6,9,21,19]
*/

t = table(1 2 3 4 as id, x as x, y as y)
new_t = select *, rowSum(x) as new_x, rowSum(y) as new_y from t
/* new_t
id x       y       new_x new_y
-- ------- ------- ----- -----
1  [1,2,3] [1,2,3] 6     6    
2  [4,5]   [4,5]   9     9    
3  [6,7,8] [6,7,8] 21    21   
4  [9,10]  [9,10]  19    19   
*/
  • 双目函数示例(Array Vector 和向量):按行求加权平均
x = array(INT[], 0).append!([1 2 3, 4 5 6, 6 7 8, 9 10 11])
z = rowWavg(x, [1, 1, 2])
/* z
[2.25,5.25,7.25,10.25]
*/

y = [1 2 3, 4 5 6, 6 7 8, 9 10 11].setColumnarTuple!()
z = rowWavg(y, [1, 1, 2])
/* z
[2.25,5.25,7.25,10.25]
*/

t = table(1 2 3 4 as id, x as x, y as y)
new_t = select *, rowWavg(x, [1, 1, 2]) as new_x, rowWavg(y, [1, 1, 2]) as new_y from t
/* new_t
id x         y         new_x new_y
-- --------- --------- ----- -----
1  [1,2,3]   [1,2,3]   2.25  2.25 
2  [4,5,6]   [4,5,6]   5.25  5.25 
3  [6,7,8]   [6,7,8]   7.25  7.25 
4  [9,10,11] [9,10,11] 10.25 10.25
*/
  • 双目函数示例(Array Vector 和 Array Vector):按行求相关系数
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
xx = array(INT[], 0).append!([3 1 2, 3 1, 1 2 3, 0 1])
z = rowCorr(x, xx)
/* z
[-0.5,-1,1,1]
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
yy = [3 1 2, 3 1, 1 2 3, 0 1].setColumnarTuple!()
z = rowCorr(y, yy)
/* z
[-0.5,-1,1,1]
*/

t = table(1 2 3 4 as id, x as x, xx as xx, y as y, yy as yy)
new_t = select *, rowCorr(x, xx) as new_x, rowCorr(y, yy) as new_y from t
/* new_t
id x       xx      y       yy      new_x new_y
-- ------- ------- ------- ------- ----- -----
1  [1,2,3] [3,1,2] [1,2,3] [3,1,2] -0.5  -0.5 
2  [4,5]   [3,1]   [4,5]   [3,1]   -1    -1   
3  [6,7,8] [1,2,3] [6,7,8] [1,2,3] 1     1    
4  [9,10]  [0,1]   [9,10]  [0,1]   1     1       
*/
  • 特殊双目函数示例:按行对齐

针对金融场景的存在的特殊的数据对齐规则,DolphinDB 开发了 rowAlign 和 rowAt 函数。

rowAlign(left, right, how):实现 left 和 right 的数据对齐。输入参数 leftright 是数组向量,表示需要对齐的数据;how 是字符串,表述对齐的方式;最终返回一个长度为 2 的元组,分别表示对齐后数据在原数据中的索引。

rowAt(X, Y):实现按行从 X 中取出 Y 索引的元素。输入参数 X 是矩阵或数组向量;当 Y 是和 X 行数相等的向量时,返回一个与 Y 长度相同的向量;当 Y 是和 X 行数相等的数组向量时,返回一个与 Y 维度相同的数组向量。

下面以 how="bid" 为例,说明具体的 rowAlign 对齐结果。

假设 left 是某个时刻的五档买价,right 是上一时刻的五档买价,都是严格单调减的序列

根据买价的值,按行进行数据对齐

根据 how 指定的对齐规则,保留满足条件的数据。how="bid" 时,最大值为 max(max(left), max(right)) = max(8.99, 9.00) = 9.00;最小值为 max(min(left), min(right)) = max(8.91, 8.95) = 8.95。【下图蓝色部分为删除部分】

获取剩余数据在原来向量中的索引,没有数据的位置用 -1 填充。

下面以上图 left 和 leftIndex 为例,说明具体的 rowAt(left, leftIndex) 取数结果。

left = array(DOUBLE[], 0).append!([9.00 8.98 8.97 8.96 8.95, 8.99 8.97 8.95 8.93 8.91])
right = prev(left)
/* 
left:[[9,8.98,8.97,8.96,8.949999999999999],[8.99,8.97,8.949999999999999,8.929999999999999,8.91]]
right:[,[9,8.98,8.97,8.96,8.949999999999999]]
*/

leftIndex, rightIndex = rowAlign(left, right, how="bid")
/*
leftIndex:[[0,1,2,3,4],[-1,0,-1,1,-1,2]]
rightIndex:[[-1,-1,-1,-1,-1],[0,-1,1,2,3,4]]
*/

leftResult = rowAt(left, leftIndex)
rightResult = rowAt(right, rightIndex)
/*
leftResult:[[9,8.98,8.97,8.96,8.949999999999999],[,8.99,,8.97,,8.949999999999999]]
rightResult:[[,,,,],[9,,8.98,8.97,8.96,8.949999999999999]]
*/

(2)Fast Array Vector 支持调用高阶函数 byRow,对数组向量的每行元素进行计算。

  • 窗口函数示例:求每行的累计和
x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
z = byRow(cumsum, x)
/* z
[[1,3,6],[4,9],[6,13,21],[9,19]]
*/

t = table(1 2 3 4 as id, x as x)
new_t = select *, byRow(cumsum, x) as new_x from t
/* new_t
id x       new_x        
-- ------- ---------
1  [1,2,3] [1,3,6]  
2  [4,5]   [4,9]    
3  [6,7,8] [6,13,21]
4  [9,10]  [9,19]   
*/

注意事项

  • byRow 只支持 Fast Array Vector
  • 使用 byRow 函数时,自定义函数 func 的返回值只能是标量(defg 定义的聚合函数)或者和 Fast Array Vector 每行等长的向量(类似窗口函数)。
// 返回值是标量
defg foo1(v){
    return last(v)-first(v)
}

// 返回值是和 v 等长的向量
def foo2(v){
    return v \ prev(v) - 1
}

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
x1 = byRow(foo1, x)
x2 = byRow(foo2, x)
/*
x1: [2,1,2,1]
x2: [[,1,0.5],[,0.25],[,0.166666666666667,0.142857142857143],[,0.111111111111111]]
*/

t = table(1 2 3 4 as id, x as x)
new_t = select *, byRow(foo1, x) as x1, byRow(foo2, x) as x2 from t
/* new_t
id x       x1 x2                                    
-- ------- -- --------------------------------------
1  [1,2,3] 2  [,1,0.5]                              
2  [4,5]   1  [,0.25]                               
3  [6,7,8] 2  [,0.166666666666667,0.142857142857143]
4  [9,10]  1  [,0.111111111111111]                  
*/

(3)Array Vector 支持调用高阶函数 each 和 loop,对 Array Vector 的每行元素进行计算。

和 byRow 函数使用方式类似,都是将自定义函数作用在 Array Vector 的每一行上。

  • 两者的区别是:
    • byRow 只支持 Fast Array Vector;each 和 loop 支持 Fast Array Vector 和 Columnar Tuple。
    • each 和 loop 的自定义函数 func,返回值没有限制,可以是和每行元素不同长度的向量。自定义函数 func 返回值是向量时,返回的结果是 tuple 类型,对应表中的列是 Columnar Tuple 类型。
    • byRow 的自定义函数 func 返回值是向量时,byRow 返回的结果是 Fast Array Vector 类型,对应表中的列也是 Fast Array Vector 类型。
// 返回值可以是和 v 不等长的向量
def foo3(v){
    return [last(v)-first(v), last(v)+first(v)]
} 

x = array(INT[], 0).append!([1 2 3, 4 5, 6 7 8, 9 10])
z = loop(foo3, x)
/* z
([2,4],[1,9],[2,14],[1,19])
*/

y = [1 2 3, 4 5, 6 7 8, 9 10].setColumnarTuple!()
z = loop(foo3, y)
/* z
([2,4],[1,9],[2,14],[1,19])
*/

t = table(1 2 3 4 as id, x as x, y as y)
new_t = select *, loop(foo3, x) as new_x, loop(foo3, y) as new_y from t
/* new_t
id x       y       new_x  new_y 
-- ------- ------- ------ ------
1  [1,2,3] [1,2,3] [2,4]  [2,4] 
2  [4,5]   [4,5]   [1,9]  [1,9] 
3  [6,7,8] [6,7,8] [2,14] [2,14]
4  [9,10]  [9,10]  [1,19] [1,19]
*/

API 写入

C++ API

Step1:C++ 中创建表对象。指定对应 Array Vector 列的类型为 DT_XX_ARRAY。

(比如下例中的 value 列是 INT 类型的 Array Vector,所以指定为 DT_INT_ARRAY)

int rowNum = 3;
int colNum = 2;
vector<string> colNames = {"id","value"};
vector<DATA_TYPE> colTypes = {DT_INT, DT_INT_ARRAY};
ConstantSP table = Util::createTable(colNames, colTypes, rowNum, rowNum+1);
vector<VectorSP> columnVecs;
columnVecs.reserve(colNum);
for(int i = 0; i < colNum; ++i) {
    columnVecs.emplace_back(table->getColumn(i));
}

Step2:逐行向 C++ 的表对象中添加数据。创建一个 DdbVector 类型的向量,作为 Array Vector 里的一行。

for(int i = 0; i < rowNum; ++i) {
    // 构造 Array Vector 的一行
    DdbVector<int> intVec(0, 10);
    for(int j = 0; j < 3; ++j) {
        intVec.add(i*3+j);
    }
    VectorSP value_row = intVec.createVector(DT_INT);
    columnVecs[0]->setInt(i, i);        // id 列
    columnVecs[1]->set(i, value_row);    // value 列
}

Step3:连接 DolphinDB,上传数据。

(示例中,是通过 upload 方法把表数据 table 上传到 DolphinDB 的内存表 myTable。C++ API 的更多操作可以查看教程:C++ API使用教程)

// 连接 DolphinDB 节点
DBConnection conn;
conn.connect("127.0.0.1", 8848);
conn.login("admin", "123456", false);
// 上传数据到内存表
conn.upload("myTable", table);

Step4:查询数据

string script = "select * from myTable;";
ConstantSP result = conn.run(script);
std::cout<<"------ check data ------"<<std::endl;
std::cout<<result->getString()<<std::endl;

Java API

Step1:连接 DolphinDB,并在DolphinDB 中创建一张维度表,用于接收数据。指定对应列的类型为 XX[]。

(比如下例中的 value 列是 INT 类型的 Array Vector,所以指定为 INT[])

// 连接 DolphinDB 的节点
DBConnection conn = new DBConnection();
boolean success = conn.connect("127.0.0.1", 8848, "admin", "123456");
// 创建维度表
String ddb_script = "dbName = \"dfs://testDB\"\n" +
                "tbName = \"test\"\n" +
                "if(existsDatabase(dbName)) {\n" +
                "    dropDatabase(dbName)\n" +
                "}\n" +
                "db = database(dbName, VALUE, 1 2 3 4, , 'TSDB')\n" +
                "schemaTB = table(1:0, `id`value, [INT, INT[]])\n" +
                "pt = createTable(db, schemaTB, tbName, sortColumns=`id)";
conn.run(ddb_script);

Step2:Java 构造数据,创建表对象。每一列对应一个 List,Array Vector 列是一个类型为 vector 的 List,其中每一行是一个向量。

List<String> colNames = Arrays.asList("id", "value");
List<Vector> cols = new ArrayList<>(6);
int rowNum = 4;
List<Integer> idCol = new ArrayList<>(rowNum);
List<Vector> valueCol = new ArrayList<>(rowNum);    // Array Vector 列
for(int i = 0; i < rowNum; ++i) {
    idCol.add(i + 1);
    List<Integer> valueRow = new ArrayList<>(50);   // Array Vector 中的一行
    for(int j = 0; j < 3; ++j) {
        valueRow.add(i*3 +j);
    }
    valueCol.add(new BasicIntVector(valueRow));
}
cols.add(new BasicIntVector(idCol));
cols.add(new BasicArrayVector(valueCol));
BasicTable tb = new BasicTable(colNames, cols);

Step3:往预先创建的维度表中插入数据。

(示例中,是通过 tableInsert 方法把表数据 tb 上传到 DolphinDB 的维度表 loadTable ('dfs://testDB','test')。Java API 的更多操作可以查看教程:Java API使用教程)

List<Entity> tbArg = new ArrayList<>(1);
tbArg.add(tb);
conn.run("tableInsert{loadTable('dfs://testDB','test')}", tbArg);

Step4:查询数据

BasicTable t;
t = (BasicTable)conn.run("select * from loadTable('dfs://testDB','test')");
System.out.println(t.getString());

Python API

Step1:Python 中创建表对象。

(比如下例中的 value 列是 INT 类型的 Array Vector)

df = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'value': [np.array([1,2,3],dtype=np.int64), np.array([4,5,6],dtype=np.int64), np.array([7,8,9],dtype=np.int64), np.array([10,11,12],dtype=np.int64)]
})

Step2:连接 DolphinDB,上传数据。

(示例中,是通过 table 方法把数据框 df 上传到 DolphinDB 的内存表 myTable。Python API 的更多操作可以查看教程:Python API使用教程)

// 连接 DolphinDB 节点
s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")
// 上传数据到内存表
s.table(data=df, tableAliasName="myTable")

Step3:查询数据

t = s.run("select * from myTable")
print(t)

 

Array Vector 在 level2 快照数据中的应用

快照数据的存储

Array Vector 在存储中的一个典型应用场景就是存储快照十档行情数据。

level 2 的快照数据包含买卖十档价格、十档成交量、十档实际总委托笔数、前 50 笔买卖订单等多种多档数据,并且后续也常常需要将这些数据整体计算处理,所以很适合将多档数据作为整体存储为一列。

  • 多档多列存储

  • Array Vector 存储

以上交所 level 2 的快照数据为例,进行具体的说明。下面是本教程中的存储的分布式表结构。

字段名称数据类型数据说明
SecurityIDSYMBOL证券代码
DateTimeTIMESTAMP日期时间
PreClosePxDOUBLE昨收价
OpenPxDOUBLE开始价
HighPxDOUBLE最高价
LowPxDOUBLE最低价
LastPxDOUBLE最新价
TotalVolumeTradeINT成交总量
TotalValueTradeDOUBLE成交总金额
InstrumentStatusSYMBOL交易状态
BidPriceDOUBLE[]申买十价
BidOrderQtyINT[]申买十量
BidNumOrdersINT[]申买十实际总委托笔数
BidOrdersINT[]申买一前 50 笔订单
OfferPriceDOUBLE[]申卖十价
OfferOrderQtyINT[]申卖十量
OfferNumOrdersINT[]申卖十实际总委托笔数
OfferOrdersINT[]申卖一前 50 笔订单
NumTradesINT成交笔数
IOPVDOUBLEETF 净值估值
TotalBidQtyINT委托买入总量
TotalOfferQtyINT委托卖出总量
WeightedAvgBidPxDOUBLE加权平均委买价格
WeightedAvgOfferPxDOUBLE加权平均委卖价格
TotalBidNumberINT买入总笔数
TotalOfferNumberINT卖出总笔数
BidTradeMaxDurationINT买入成交最大等待时间
OfferTradeMaxDurationINT买入成交最大等待时间
NumBidOrdersINT买方委托价位数
NumOfferOrdersINT卖方委托价位数
WithdrawBuyNumberINT买入撤单笔数
WithdrawBuyAmountINT买入撤单数量
WithdrawBuyMoneyDOUBLE买入撤单金额
WithdrawSellNumberINT卖出撤单笔数
WithdrawSellAmountINT卖出撤单数量
WithdrawSellMoneyDOUBLE卖出撤单金额
ETFBuyNumberINTETF 申购笔数
ETFBuyAmountINTETF 申购数量
ETFBuyMoneyDOUBLEETF 申购金额
ETFSellNumberINTETF 赎回笔数
ETFSellAmountINTETF 赎回数量
ETFSellMoneyDOUBLEETF 赎回金额

其中,买卖十档价格(BidPrice / OfferPrice)、十档成交量(BidOrderQty / OfferOrderQty)、十档实际总委托笔数(BidNumOrders / OfferNumOrders)、前 50 笔买卖订单(BidOrders / OfferOrders)这 8 个字段都采用 Array Vector 的格式存储。在建表时,通过 “DOUBLE[]” 和 “INT[]” 的方式指定列的类型为 Array Vector。下面是对应的建表语句。

dbName = "dfs://SH_TSDB_snapshot_ArrayVector"
tbName = "snapshot"
if(existsDatabase(dbName)){
	dropDatabase(dbName)
}
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 20])
db = database(dbName, COMPO, [db1, db2], , "TSDB")
schemaTable = table(
	array(SYMBOL, 0) as SecurityID,
	array(TIMESTAMP, 0) as DateTime,
	array(DOUBLE, 0) as PreClosePx,
	array(DOUBLE, 0) as OpenPx,
	array(DOUBLE, 0) as HighPx,
	array(DOUBLE, 0) as LowPx,
	array(DOUBLE, 0) as LastPx,
	array(INT, 0) as TotalVolumeTrade,
	array(DOUBLE, 0) as TotalValueTrade,
	array(SYMBOL, 0) as InstrumentStatus,
	array(DOUBLE[], 0) as BidPrice,
	array(INT[], 0) as BidOrderQty,
	array(INT[], 0) as BidNumOrders,
	array(INT[], 0) as BidOrders,
	array(DOUBLE[], 0) as OfferPrice,
	array(INT[], 0) as OfferOrderQty,
	array(INT[], 0) as OfferNumOrders,
	array(INT[], 0) as OfferOrders,
	array(INT, 0) as NumTrades,
	array(DOUBLE, 0) as IOPV,
	array(INT, 0) as TotalBidQty,
	array(INT, 0) as TotalOfferQty,
	array(DOUBLE, 0) as WeightedAvgBidPx,
	array(DOUBLE, 0) as WeightedAvgOfferPx,
	array(INT, 0) as TotalBidNumber,
	array(INT, 0) as TotalOfferNumber,
	array(INT, 0) as BidTradeMaxDuration,
	array(INT, 0) as OfferTradeMaxDuration,
	array(INT, 0) as NumBidOrders,
	array(INT, 0) as NumOfferOrders,
	array(INT, 0) as WithdrawBuyNumber,
	array(INT, 0) as WithdrawBuyAmount,
	array(DOUBLE, 0) as WithdrawBuyMoney,
	array(INT, 0) as WithdrawSellNumber,
	array(INT, 0) as WithdrawSellAmount,
	array(DOUBLE, 0) as WithdrawSellMoney,
	array(INT, 0) as ETFBuyNumber,
	array(INT, 0) as ETFBuyAmount,
	array(DOUBLE, 0) as ETFBuyMoney,
	array(INT, 0) as ETFSellNumber,
	array(INT, 0) as ETFSellAmount,
	array(DOUBLE, 0) as ETFSellMoney
)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)

导入数据时,可以通过 fixedLengthArrayVector 函数将多档的数据合并成一列 Array Vector。下面是对应的导入数据脚本。

 def transform(data){
	t = select SecurityID, DateTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, InstrumentStatus,
		      fixedLengthArrayVector(BidPrice0, BidPrice1, BidPrice2, BidPrice3,  BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as BidPrice,
		      fixedLengthArrayVector(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3,  BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as BidOrderQty,
		      fixedLengthArrayVector(BidNumOrders0, BidNumOrders1, BidNumOrders2, BidNumOrders3,  BidNumOrders4, BidNumOrders5, BidNumOrders6, BidNumOrders7, BidNumOrders8, BidNumOrders9) as BidNumOrders,
		      fixedLengthArrayVector(BidOrders0, BidOrders1, BidOrders2, BidOrders3,  BidOrders4, BidOrders5, BidOrders6, BidOrders7, BidOrders8, BidOrders9, BidOrders10, BidOrders11, BidOrders12, BidOrders13,  BidOrders14, BidOrders15, BidOrders16, BidOrders17, BidOrders18, BidOrders19, BidOrders20, BidOrders21, BidOrders22, BidOrders23,  BidOrders24, BidOrders25, BidOrders26, BidOrders27, BidOrders28, BidOrders29, BidOrders30, BidOrders31, BidOrders32, BidOrders33,  BidOrders34, BidOrders35, BidOrders36, BidOrders37, BidOrders38, BidOrders39, BidOrders40, BidOrders41, BidOrders42, BidOrders43,  BidOrders44, BidOrders45, BidOrders46, BidOrders47, BidOrders48, BidOrders49) as BidOrders,
		      fixedLengthArrayVector(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3,  OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as OfferPrice,
		      fixedLengthArrayVector(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3,  OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as OfferOrderQty,
		      fixedLengthArrayVector(OfferNumOrders0, OfferNumOrders1, OfferNumOrders2, OfferNumOrders3,  OfferNumOrders4, OfferNumOrders5, OfferNumOrders6, OfferNumOrders7, OfferNumOrders8, OfferNumOrders9) as OfferNumOrders,
		      fixedLengthArrayVector(OfferOrders0, OfferOrders1, OfferOrders2, OfferOrders3,  OfferOrders4, OfferOrders5, OfferOrders6, OfferOrders7, OfferOrders8, OfferOrders9, OfferOrders10, OfferOrders11, OfferOrders12, OfferOrders13,  OfferOrders14, OfferOrders15, OfferOrders16, OfferOrders17, OfferOrders18, OfferOrders19, OfferOrders20, OfferOrders21, OfferOrders22, OfferOrders23,  OfferOrders24, OfferOrders25, OfferOrders26, OfferOrders27, OfferOrders28, OfferOrders29, OfferOrders30, OfferOrders31, OfferOrders32, OfferOrders33,  OfferOrders34, OfferOrders35, OfferOrders36, OfferOrders37, OfferOrders38, OfferOrders39, OfferOrders40, OfferOrders41, OfferOrders42, OfferOrders43,  OfferOrders44, OfferOrders45, OfferOrders46, OfferOrders47, OfferOrders48, OfferOrders49) as OfferOrders,
		      NumTrades, IOPV, TotalBidQty, TotalOfferQty, WeightedAvgBidPx, WeightedAvgOfferPx, TotalBidNumber, TotalOfferNumber, BidTradeMaxDuration,OfferTradeMaxDuration, NumBidOrders, NumOfferOrders, WithdrawBuyNumber, WithdrawBuyAmount, WithdrawBuyMoney,WithdrawSellNumber, WithdrawSellAmount, WithdrawSellMoney, ETFBuyNumber, ETFBuyAmount, ETFBuyMoney, ETFSellNumber, ETFSellAmount, ETFSellMoney
		      from data
	return t
}

def loadData(csvDir, dbName, tbName){
	schemaTB = extractTextSchema(csvDir)
	update schemaTB set type = "SYMBOL" where name = "SecurityID"
	loadTextEx(dbHandle=database(dbName), tableName=tbName, partitionColumns=`DateTime`SecurityID, sortColumns=`SecurityID`DateTime, filename=csvDir, schema=schemaTB, transform=transform)
}
// 后台提交导入任务
csvDir = "/home/v2/下载/data/testdata/snapshot_100stocks_multi.csv"
dbName, tbName = "dfs://SH_TSDB_snapshot_ArrayVector", "snapshot"
submitJob("loadData", "load Data", loadData{csvDir, dbName, tbName})
getRecentJobs()

快照数据分组时保留明细

Array Vector 在存储中的另一个典型应用场景就是分组计算时将组内所有明细数据保留。

DolphinDB 提供了函数 toArraytoArray 搭配 group by 语句,可以将 group by 分组的数据存储成数组向量的一行,便于用户直接查看该分组下的所有数据。

比如,将快照数据聚合成 5 分钟 tick 数据并入库,结果表字段包括高开低收、成交量、成交金额、bid0 和ask0 的量价切片。

// 创建存储 5 min tick 数据的分布式库表
def createFiveMinuteBarDB(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db = database(dbName, VALUE, 2021.01.01..2021.12.31, , "TSDB")
	colNames = `SecurityID`DateTime`OpenPx`HighPx`LowPx`LastPx`TotalVolume`TotalValueTrade`BidPrice0`BidOrderQty0`OfferPrice0`OfferOrderQty0
	colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE[], INT[], DOUBLE[], INT[]]
	schemaTable = table(1:0, colNames, colTypes)
	db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}
dbName, tbName = "dfs://fiveMinuteBar", "fiveMinuteBar"
createFiveMinuteBarDB(dbName, tbName)

// 根据快照数计算 5 min tick 数据
t = select first(OpenPx) as OpenPx, max(HighPx) as HighPx, min(LowPx) as LowPx, last(LastPx) as LastPx, last(TotalVolumeTrade) as TotalVolumeTrade, last(TotalValueTrade) as TotalValueTrade, toArray(BidPrice[0]) as BidPrice0,  toArray(BidOrderQty[0]) as BidOrderQty0, toArray(OfferPrice[0]) as OfferPrice0, toArray(OfferOrderQty[0]) as OfferOrderQty0 from loadTable("dfs://SH_TSDB_snapshot_ArrayVector", "snapshot") group by SecurityID, interval(DateTime, 5m, "none") as DateTime map

// 将数据存入数据库
loadTable(dbName, tbName).append!(t)

基于快照数据的高频因子计算

DolphinDB 不仅提供了高速存取时序数据的基本功能,还内置了向量化的多范式编程语言和强大的计算引擎,可高效用于量化金融的因子开发,包括基于历史数据的批量高频因子计算和基于实时 Level 2 行情的流式计算。

本章节以基于快照数据计算高频因子为例,展示 Array Vector 在因子计算中的应用。

净委买增额

根据《量化交易因子挖掘笔记-从限价订单簿(LOB)挖掘高频价量因子 》 中对净委买增额的定义和计算方法,实现的指标公式如下:

其中 bidAmtDifft 表示 t 时刻的委买增额; bidi,t 表示 t 时刻快照数据的第 i 档买方报价;指示函数 I 含义如下:

根据上述公式,结合 DolphinDB 丰富的内置函数(prev 获取上一个 tick 的价格;rowSum 计算 Array Vector 的行和),可以转写出如下代码:

@state
def calculateAmtDiff(bid, ask, bidvol, askvol){
	lastBidPrice = prev(bid[0])		// 上一个 tick 的买一价
	lastAskPrice = prev(ask[0])		// 上一个 tick 的卖一价
	lastBidQty = prev(bidvol[0])		// 上一个 tick 的买一量
	lastAskQty = prev(askvol[0])	// 上一个 tick 的卖一量
	// 求委买增额
	bidAmtDiff = rowSum(bid*bidvol*(bid >= lastBidPrice)) - lastBidPrice*lastBidQty
	// 求委卖增额
	askAmtDiff = rowSum(ask*askvol*(ask <= lastAskPrice)) - lastAskPrice*lastAskQty
	return bidAmtDiff - askAmtDiff
}

用上面的自定义函数,可以快速地实现净委买增额的批计算和流计算。

  • 批计算
snapshot = loadTable("dfs://SH_TSDB_snapshot_ArrayVector", "snapshot")
res1 = select SecurityID, DateTime, calculateAmtDiff(BidPrice, OfferPrice, BidOrderQty, OfferOrderQty) as amtDiff from snapshot context by SecurityID csort DateTime
  • 流计算
// 创建输入输出表
share(streamTable(1:0, snapshot.schema().colDefs.name, snapshot.schema().colDefs.typeString), `snapshotStreamTable)
share(streamTable(1:0, `SecurityID`DateTime`amtDiff, [SYMBOL, TIMESTAMP, DOUBLE]), `res2)
go
// 创建流计算引擎
createReactiveStateEngine(name="calAmtDiffDemo", metrics=<[DateTime, calculateAmtDiff(BidPrice, OfferPrice, BidOrderQty, OfferOrderQty)]>, dummyTable=snapshotStreamTable, outputTable=res2, keyColumn=`SecurityID)
// 创建订阅
subscribeTable(tableName="snapshotStreamTable", actionName="calAmtDiffTest", offset=-1, handler=getStreamEngine("calAmtDiffDemo"), msgAsTable=true)
// 取数据回放,模拟流数据
testData = select * from snapshot where date(DateTime)=2021.12.01 order by DateTime
submitJob("replayData", "replay snapshot data", replay{inputTables=testData, outputTables=snapshotStreamTable, dateColumn=`DateTime, timeColumn=`DateTime, replayRate=1000})
十档净委买增额

根据《DolphinDB 处理 Level 2 行情数据实例》 中 3.1.4 章节,实现的指标公式如下:

其中 level10_Difft 表示 t 时刻的委买增额; bidi,t 表示 t 时刻快照数据的第 i 档买方报价;指示函数 I 含义如下:

DolphinDB 中可以先通过行对齐函数 rowAlign 实现当前十档价格和前一个十档价格进行行对齐;然后通过 rowAt 和 nullFill 函数分别获取对应档位的委托量和价格;最后通过 rowSum 函数计算总的变化额。

@state
def level10_Diff(price, qty, buy){
        prevPrice = price.prev()
        left, right = rowAlign(price, prevPrice, how=iif(buy, "bid", "ask"))
        qtyDiff = (qty.rowAt(left).nullFill(0) - qty.prev().rowAt(right).nullFill(0)) 
        amtDiff = rowSum(nullFill(price.rowAt(left), prevPrice.rowAt(right)) * qtyDiff)
        return amtDiff
}

snapshot = loadTable("dfs://SH_TSDB_snapshot_ArrayVector", "snapshot")
res = select SecurityID, DateTime, level10_Diff(BidPrice, BidOrderQty, true) as level10_Diff from snapshot context by SecurityID csort DateTime

总结

针对不定长度二维数组的存储和计算,DolphinDB 提供了一种特殊的数据形式 —— Array Vector。

本教程针对 Array Vector 的特点、使用方式、注意事项、应用场景等做了一系列的介绍。

DolphinDB 能够存储大量的历史高频行情数据,为其后续的高效因子挖掘和计算提供支持,特别是在 level 2 快照数据的存储和计算上,Array Vector 展现了其简洁、高效、灵活的特点。

附件

snapshot_100stocks_multi.zip

体验 DolphinDB Array Vector 带来的性能提升!戳链接获取:高频行情存储&计算解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1039538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【李沐深度学习笔记】基础优化方法

课程地址和说明 基础优化方法p2 本系列文章是我学习李沐老师深度学习系列课程的学习笔记&#xff0c;可能会对李沐老师上课没讲到的进行补充。 基础优化方法 在讲具体的线性回归实现之前&#xff0c;要先讲一下基础的优化模型的方法 梯度下降 当模型没有显示解&#xff08…

比较身高-第15届蓝桥杯第一次STEMA测评Scratch真题精选

[导读]&#xff1a;超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成&#xff0c;后续会不定期解读蓝桥杯真题&#xff0c;这是Scratch蓝桥杯真题解析第153讲。 第15届蓝桥杯第1次STEMA测评已于2023年8月20日落下帷幕&#xff0c;编程题一共有6题&#xff0c;分别如下&a…

【HUAWEI】trunk和access两种链路模式实例

目录 &#x1f96e;0.写在前面 &#x1f363;基本操作命令 &#x1f363;常见视图命令 &#x1f96e;1、trunkaccess &#x1f363;1.1、拓扑图 &#x1f363;1.2、操作思路 &#x1f363;1.3、配置操作 &#x1f361;1.3.1、LSW1配置 &#x1f361;1.3.2、LSW2配置 &#x1f3…

Android Key/Trust Store研究+ssl证书密钥

前言&#xff1a;软件搞环境涉及到了中间件thal trustzone certificate key&#xff0c;翻译过来是thal信任区域证书密钥 &#xff0c;不明白这是什么&#xff0c;学习一下 ssl证书密钥 SSL密钥是SSL加密通信中的重要组成部分。SSL证书通过加密算法生成&#xff0c;用于保护网…

sgx支持数据库环境配置,编译,debug

环境都编译为debug模式&#xff0c;为了开发&#xff0c;并利用sgx的debugger sgx-gdb进行debug 查看cpu是否支持sgx delldell-Precision-3630-Tower  /nvme  lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte…

AUTOSAR中的Crypto Stack(一)--概述

前面我们聊到了比较多的关于信息安全的概念,以及主流MCU的信息安全方案。但从软件工程师的角度来看,最终这些信息安全的概念都是会从软件来实现;如何设计出一种合理、安全的信息安全软件框架,我们从AUTOSAR的加密栈来分析。 该协议栈主要从以下几个方面来介绍: AUTOSAR中…

蓝桥杯打卡Day15天

文章目录 买不到的数目错误票据 一、买不到的数目OJ链接 本题思路:引理&#xff1a;给定a&#xff0c;b&#xff0c;若dgcd(a,b)>1 ,则一定不能凑出最大数。结论&#xff1a;如果 a,b均是正整数且互质&#xff0c;那么由 axby,x≥0,y≥0 不能凑出的最大数是 ab−a−b。 证…

Bigemap如何查看历史影像

工具 Bigemap gis office地图软件 BIGEMAP GIS Office-全能版 Bigemap APP_卫星地图APP_高清卫星地图APP 很多人都在寻找历史影像图&#xff0c;这块的需求是非常大&#xff0c;历史影像一般可以用于历史地貌的变迁分析&#xff0c;还原以前的生态场景&#xff0c;对范围面积…

深入探讨Vue.js:从基础到高级(最佳实践)

文章目录 Vue.js 基础1. Vue.js 是什么&#xff1f;2. Vue 实例3. 双向数据绑定 Vue 组件1. 什么是 Vue 组件&#xff1f;2. 组件之间的通信 Vue 模板语法1. 插值和指令2. 条件和循环3. 事件绑定和表单输入绑定 Vue 路由1. Vue Router安装和配置&#xff1a;导航&#xff1a; 2…

企业做软文推广的三大错误有哪些?媒介盒子为您解答

软文营销已经成为企业宣传的主要方式&#xff0c;但有很多企业来找媒介盒子咨询&#xff0c;明明花了大量成本来做软文推广&#xff0c;为什么就是没效果呢&#xff1f;小编看了下&#xff0c;发现大部分企业做软文推广效果不明显&#xff0c;基本上犯了三大错误&#xff0c;接…

风向对风力机发电的影响

目录 1. 摘要2. 简介 1. 摘要 随着风力发电机的增大&#xff0c;风向随高度的变化&#xff08;风偏转&#xff09;在入流风场中起到了至关重要的作用。我们使用明尼苏达大学Eolos风能研究站5年的实地数据集来探讨风偏转的特性及其对涡轮性能的影响。风偏转表现出明显的日变化&…

一款值得入手的双节电池1A电流线性充电芯片-YB4028

概述: YB4028 是一款双节串联锂电池充电管理芯片&#xff0c;集成涓流、恒流、恒压三段式线性充电管理&#xff0c;符合锂电池安全充电规范。充电输入耐压高达20V&#xff0c;充电电流高至 10A&#xff0c;可通过片外电阻配置。 YB4028 集成防倒灌电路&#xff0c;输入电压拔…

大数据flink篇之一-基础知识

一、起源 2010至2014年间&#xff0c;由柏林工业大学、柏林洪堡大学和哈索普拉特纳研究所联合发起名Stratosphere的研究项目。2014年4月&#xff0c;项目贡献给Apache基金会&#xff0c;成为孵化项目。更名为Flink2014年12月&#xff0c;成为基金会顶级项目2015年9月&#xff…

MAC word 如何并列排列两张图片

系统&#xff1a;MAC os 参考博客 https://baijiahao.baidu.com/s?id1700824516945958911&wfrspider&forpc 步骤1 新建一个word文档和表格 修改表格属性 去掉自动重调尺寸以适应内容 插入图片 在表格的位置插入对应的图片如下 去除边框 最终结果如下

数据大爆炸:大数据分析如何改变我们的世界

文章目录 大数据分析的基本概念数据的三个V大数据分析的技术 大数据分析在商业中的应用1. 个性化营销2. 风险管理3. 供应链优化4. 客户服务 大数据分析在医疗保健中的应用1. 疾病预测2. 患者治疗3. 医疗设备监控 大数据分析在科学研究中的应用1. 天文学2. 生物学3. 气象学 大数…

mock.js与组件通信之总线的讲解

目录 一Mock.js 1.1简介 1.2 安装配置Mock.js 1.3 mock.js的使用 二. 组件通信之总线 2.1 总线的简介 2.2 总线的使用-以导航栏的收进为例 好啦今天的分享就到这啦&#xff01;&#xff01; 一Mock.js 1.1简介 Mock.js 是一个用于生成随机数据的 JavaScript 库。它可以模拟…

关于vantUI的导航组件tab标签页在ios和安卓中运用遇到的坑

vantTab的默认值 应用场景问题描述原始代码更正代码 应用场景 根据路由传值设置默认tab页&#xff0c;获取不同的数据并进行展示 问题描述 ios可正常按照路由传值默认tab页&#xff0c;安卓始终默认tabList的第一个value值&#xff0c;疑安卓系统中不接受dataMap.tabActive为…

虚拟车衣VR云展厅平台扩大了展览的触达范围

传统展厅主要是以静态陈列的形式来传达内容&#xff0c;主要的展示形式有图片、视频等&#xff0c;具有一定的局限性&#xff0c;体验感较差&#xff0c;客户往往不能深入地了解信息和细节内容。 VR全景看车是通过虚拟现实技术实现逼真的汽车观赏和试乘体验。消费者可以通过智能…

Python图像处理-----几何变换

文章目录 一、图像几何变换理论二、图像平移2.1 使用数学公式的实现方式为:2.2 使用矩阵实现的方式为2.3 使用opencv三、图像缩放3.1 用数学式子表示为公式(a为缩放系数):3.2 用矩阵表示如公式所示:一、图像几何变换理论 图像几何变换不改变图像的像素值,在图像平面上进行像…

Docker ---- network中的命令详解

最近一直在使用docker&#xff0c;记录一些遇到的问题。 问题1&#xff1a;在搭建ealsticsearch与kibana时运行成功后第二次想运行出错了或者访问不了&#xff1f; 因为两个启动的容器是被互相隔离的&#xff0c;没有启用网络的互相通信不了。 问题2&#xff1a;怎么查看自己…