如何轻松搭建一套行情回放系统

news2025/1/16 1:08:15

一个量化策略在生产(交易)环境中运行时,实时数据的处理通常是由事件驱动的。为确保研发和生产使用同一套代码,通常在研发阶段需将历史数据,严格按照事件发生的时间顺序进行回放,以此模拟交易环境。在 DolphinDB 中,用户通过 replay 函数可以实现对静态数据的回放,即将历史数据按照时间顺序以“实时数据”的方式注入流数据表中。对相同时间戳的数据还可以指定额外排序列,使数据回放顺序更接近实时交易场景。

在历史数据回放、股票行情回放两篇教程中已经介绍了 DolphinDB 的回放功能,本教程更加侧重于回放功能的工程化实践本教程将介绍如何基于 DolphinDB 分布式数据库、回放功能以及 DolphinDB API 搭建一个行情数据回放服务,该服务支持多个用户同时通过 C++ 、 Python 等客户端提交数据回放请求

1. 基于 DolphinDB 的行情回放服务

本教程实现的行情回放服务基于 3 类国内 A 股行情数据源:逐笔委托数据、逐笔成交数据、Level 2 快照数据,支持以下功能与特性:

  • C++、Python 客户端提交回放请求(指定回放股票列表 、回放日期、回放速率、回放数据源)
  • 多个用户同时回放
  • 多个数据源同时有序回放
  • 在时间有序的基础上支持排序列有序(如:针对逐笔数据中的交易所原始消息记录号排序)
  • 发布回放结束信号
  • 对回放结果订阅消费

1.1 行情回放服务架构

本教程示例 DolphinDB 搭建的行情回放服务架构如下图所示:

行情回放服务架构

  • 行情数据接入:实时行情数据和历史行情数据可以通过 DolphinDB API 或插件存储到 DolphinDB 分布式时序数据库中。
  • 函数模块封装:数据查询和回放过程可以通过 DolphinDB 函数视图封装内置,仅暴露股票列表 、回放日期、回放速率、回放数据源等关键参数给行情服务用户。
  • 行情用户请求:需要进行行情回放的用户可以通过 DolphinDB 客户端软件(如 DolphinDB GUI 工具、DolphinDB VS Code 插件、DolphinDB API 等)调用封装好的回放函数对存储在数据库中的行情数据进行回放,同时,用户还可以在客户端对回放结果进行实时订阅消费。此外,支持多用户并发回放。

1.2 回放服务搭建步骤

本教程示例 DolphinDB 搭建行情回放服务的具体操作步骤如下图所示:

搭建步骤

  • Step 1:服务提供者设计合理分区的数据库表,在 DolphinDB 集成开发环境中执行对应的建库建表、数据导入等脚本,以将历史行情数据存储到 DolphinDB 分布式数据库中作为回放服务的数据源。在第二章将给出本教程涉及的 3 类行情数据的分区存储方案及建库建表脚本。
  • Step 2:服务提供者在 DolphinDB 集成开发环境中将回放过程中的操作封装成函数视图,通过封装使得行情服务用户不需要关心 DolphinDB 回放功能的细节,只需要指定简单的回放参数(股票、日期、回放速率、数据源)即可提交回放请求。在第三章将给全部函数视图的定义脚本。
  • Step 3:服务提供者在外部程序中通过 DolphinDB API 调用上述函数视图实现提交回放的功能。在第四章将给出 API 端提交回放任务的 C++ 实现和 Python 实现。此外,在第四章提交回放的基础上,在第五章将介绍对回放结果的 API 端订阅与消费的代码实现。

在第六章将给出多用户多表回放、多天回放的性能测试结果。最后两章为开发环境配置与总结。

2. 行情数据分区存储方案

本教程的回放服务基于 3 类国内 A 股行情数据源:逐笔成交数据、逐笔委托数据、快照数据,均使用 TSDB 存储引擎存储在 DolphinDB 分布式数据库中。

数据源代码样例中的分区数据库路径代码样例中的表名分区机制排序列建库建表脚本
逐笔委托dfs://Test_orderorderVALUE: 交易日, HASH: [SYMBOL, 25]股票ID,交易时间附录 逐笔委托建库建表脚本
逐笔成交dfs://Test_transactiontransactionVALUE: 交易日, HASH: [SYMBOL, 25]股票ID,交易时间附录 逐笔成交建库建表脚本
快照dfs://Test_snapshotsnapshotVALUE: 交易日, HASH: [SYMBOL, 20]股票ID,交易时间附录 快照建库建表脚本

回放的原理是从数据库中读取需要的行情数据,并根据时间列排序后写入到相应的流数据表。因此,读数据库并排序的性能对回放速度有很大的影响,合理的分区机制将有助于提高数据加载速度。基于用户通常按照日期和股票提交回放请求的特点,设计了上表的分区方案。

此外,本教程的历史数据存储在三节点双副本的 DolphinDB 集群中,集群和副本同样可以提升读取性能,同时可以增加系统的可用性,分区的副本通常是存放在不同的物理节点的,所以一旦某个分区不可用,系统依然可以调用其它副本分区来保证回放服务的正常运转。

附录将提供部分原始数据的 csv 文件(原始行情数据文件)以及对应的示例导入脚本(逐笔委托示例导入脚本、逐笔成交示例导入脚本、快照示例导入脚本),以便读者快速体验搭建本教程所示的行情回放服务。

3. 行情回放自定义函数

本章介绍回放过程中的主要函数功能及其实现,最后将函数封装成视图以便通过 API 等方式调用。

本章的开发工具采用 DolphinDB GUI,完整脚本见附录:行情回放函数。

函数名函数入参函数功能
stkReplaystkList:回放股票列表startDate:回放开始日期endDate:回放结束日期replayRate:回放速率replayUuid:回放用户标识replayName:回放数据源名称行情回放服务主函数
dsTbtimeRS:数据源时间划分startDate:回放开始日期endDate:回放结束日期stkList:回放股票列表replayName:回放数据源名称构造回放数据源
createEndtabName:回放输出表名称sortColumn:相同时间戳时额外排序列名构造回放结束信号
replayJobinputDict:回放输入数据源tabName:回放输出表名称dateDict:回放排序时间列timeDict:回放排序时间列replayRate:回放速率sortColumn:相同时间戳时额外排序列名定义回放任务内容

3.1 stkReplay 函数:行情回放服务主函数

函数定义代码:

def stkReplay(stkList, mutable startDate, mutable endDate, replayRate, replayUuid, replayName)
{
    maxCnt = 50
    returnBody = dict(STRING, STRING)
    startDate = datetimeParse(startDate, "yyyyMMdd")
    endDate = datetimeParse(endDate, "yyyyMMdd") + 1
    sortColumn = "ApplSeqNum"
    if(stkList.size() > maxCnt)
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "超过单次回放股票上限,最大回放上限:" + string(maxCnt)
        return returnBody
    }
    if(size(replayName) != 0)
    { 
        for(name in replayName)
        {
            if(not name in ["snapshot", "order", "transaction"])
            {
                returnBody["errorCode"] = "0"
                returnBody["errorMsg"] = "请输入正确的数据源名称,不能识别的数据源名称:" + name
                return returnBody
            }
        }
    }
    else
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "缺少回放数据源,请输入正确的数据源名称"
        return returnBody
    }
    try 
    {
        if(size(replayName) == 1 && replayName[0] == "snapshot")
        {
            colName = ["timestamp", "biz_type", "biz_data"]
            colType = [TIMESTAMP, SYMBOL, BLOB]
            sortColumn = "NULL"
        }
        else
        {
            colName = ["timestamp", "biz_type", "biz_data", sortColumn]
            colType = [TIMESTAMP, SYMBOL, BLOB, LONG]
        }
        msgTmp = streamTable(10000000:0, colName, colType)
        tabName = "replay_" + replayUuid
        enableTableShareAndPersistence(table=msgTmp, tableName=tabName, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=60, flushMode=0, preCache=1000000)
        
        timeRS = cutPoints(09:30:00.000..15:00:00.000, 23)
        
        inputDict = dict(replayName, each(dsTb{timeRS, startDate, endDate, stkList}, replayName))
        dateDict = dict(replayName, take(`MDDate, replayName.size()))
        timeDict = dict(replayName, take(`MDTime, replayName.size()))
        
        jobId = "replay_" + replayUuid
        jobDesc = "replay stock data"
        submitJob(jobId, jobDesc, replayJob{inputDict, tabName, dateDict, timeDict, replayRate, sortColumn})
        returnBody["errorCode"] = "1"
        returnBody["errorMsg"] = "后台回放成功"
        return returnBody
    }
    catch(ex)
    {
        returnBody["errorCode"] = "0"
        returnBody["errorMsg"] = "回放行情数据异常,异常信息:" + ex
        return returnBody
    }
}

函数功能:

自定义函数 stkReplay 是整个回放的主体函数,用户传入的参数在 stkReplay 里会进行有效性判断及格式处理,可以根据实际需求更改。

首先,用 maxCnt 来控制用户一次回放股票数量的最大上限,本例中设置的是 50 。returnBody 构造了信息字典,返回给用户以提示执行错误或执行成功。回放开始日期 startDate 和回放结束日期 endDate 利用 datetimeParse 函数进行格式处理 。replayRate 是回放速率,replayUuid 是回放表名名称,replayName 是回放数据源列表,sortColumn 是数据源同回放时间戳排序列列名。

当输入参数无误后,便初始化回放结果流表,结果流表为异构流数据表,字段类型为 BLOB 的字段包含了一条原始记录的全部信息,同时结果流表为持久化流表,enableTableShareAndPersistence 函数把流数据表共享并把它持久化到磁盘上,使用持久化流表可以避免内存占用过大。当回放数据源包含逐笔成交(transaction )或逐笔委托(order)时,本例实现了对相同时间戳的逐笔数据按交易所原始消息记录号(ApplSeqNum)进行排序(具体实现见 3.3 replayJob 函数),所以结果流表中必须冗余一列来存放排序列。若回放数据源仅包含快照(snapshot)时,则不需要冗余一列排序列。

定义回放需要的其他参数。inputDict 构造了回放数据源列表字典,利用 each 函数和 部分应用 可以对多个数据源进行简洁的定义。dateDict 和 timeDict 构造了回放数据源时间戳字典。最后通过 submitJob 提交后台回放任务。

3.2 dsTb 函数:构造回放数据源

函数定义代码:

def dsTb(timeRS, startDate, endDate, stkList, replayName)
{
    if(replayName == "snapshot"){
        tab = loadTable("dfs://Test_snapshot", "snapshot")
	}
	else if(replayName == "order") {
		tab = loadTable("dfs://Test_order", "order")
	}
	else if(replayName == "transaction") {
		tab = loadTable("dfs://Test_transaction", "transaction")
	}
	else {
		return NULL
	}
    ds = replayDS(sqlObj=<select * from tab where MDDate>=startDate and MDDate<endDate and HTSCSecurityID in stkList>, dateColumn='MDDate', timeColumn='MDTime', timeRepartitionSchema=timeRS)
    return ds
}

函数功能:

自定义函数 dsTb 返回符合用户回放需求的数据源划分结果。主要是对内置 replayDS 函数进行了进一步的封装,函数首先对用户端输入的 replayName 进行判断,选择从数据库加载对应的表对象。利用 replayDS 对交易日期在 startDate 至 endDate 之间、股票代号在 stkList 内的数据按照 timeRS 进行数据源划分,返回某一个数据源的列表。

timeRS 参数对应 replayDS 函数中的 timeRepartitionSchema 参数,是时间类型的向量,可用于将数据源划分为更小粒度的多个数据源,以保证查询 DFS 表中数据的效率以及控制内存大小。本例在 3.1 stkReplay 函数 中构造了 timeRS 变量,其意为对于一天的数据在有效时间段内平均分为 23 份。

执行如下代码查看 dsTb 函数返回的结果:

timeRS = cutPoints(09:30:00.000..15:00:00.000, 3)
startDate = 2021.12.01
endDate = 2021.12.02
stkList = ['000616.SZ']
replayName = ["order"]
ds = dsTb(timeRS, startDate, endDate, stkList, replayName)

ds 为一个向量,其中每一个元素如下,数据源被划分为多个小的 SQL 查询语句,具体原理参考 replayDS 函数。

DataSource< select [4] * from tab where time(MDTime) < 09:30:00.000,nanotime(MDTime) >= 00:00:00.000000000,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 11:20:00.001,time(MDTime) >= 09:30:00.000,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 13:10:00.001,time(MDTime) >= 11:20:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 15:00:00.001,time(MDTime) >= 13:10:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where nanotime(MDTime) <= 23:59:59.999999999,time(MDTime) >= 15:00:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >

3.3 replayJob 函数:定义回放任务内容

函数定义代码:

def replayJob(inputDict, tabName, dateDict, timeDict, replayRate, sortColumn)
{
    if(sortColumn == "NULL")
    {
        replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23)
    }
    else
    {
        replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23, sortColumns=sortColumn)    
    }
    createEnd(tabName, sortColumn)
}

函数功能:

自定义函数 replayJob 提交用户数据回放并调用 createEnd 函数,这里首先通过内置 replay 函数回放用户需求数据,此处回放模式为 N 对 1 异构回放。replay 函数返回后即表示需要的数据已经全部回放结束,再执行自定义 createEnd 函数以构造结束信号写入回放结果里。调用 createEnd 函数是可选的,其功能是在回放结束时再发布一条特殊的记录,以标识回放结束。

参数 sortColumn 用于指定额外的排序列,如果用户回放的数据源为仅仅包含快照(snapshot),对于这种特殊情况需要指定 replayJob 函数的回放时间戳排序列参数 sortColumn 为 NULL ,则调用内置的 replay 函数时不加入 sortColumn 参数。

3.4 createEnd 函数:构造回放结束信号

函数定义代码:

def createEnd(tabName, sortColumn)
{
    dbName = "dfs://End"
    tbName = "endline"
    if(not existsDatabase(dbName))
    {
        db = database(directory=dbName, partitionType=VALUE, partitionScheme=2023.04.03..2023.04.04)
        endTb = table(2200.01.01T23:59:59.000 as DateTime, `END as point, long(0) as ApplSeqNum)
        endLine = db.createPartitionedTable(table=endTb, tableName=tbName, partitionColumns=`DateTime)
        endLine.append!(endTb)
    }
     
    ds = replayDS(sqlObj=<select * from loadTable(dbName, tbName)>, dateColumn='DateTime', timeColumn='DateTime')
    
    inputEnd = dict(["end"], [ds])
    dateEnd = dict(["end"], [`DateTime])
    timeEnd = dict(["end"], [`DateTime])
    if(sortColumn == "NULL")
    {
        replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1)
    }
    else
    {
        replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1, sortColumns=sortColumn)
    }
}

函数功能:

自定义函数 createEnd 是在回放结束时给用户提供一条回放结束信号,利用了 replay 回放模式中的 N 对 1 异构回放构造回放信号,会往参数 tabName 指定的异构流表中写入一条消息类型列为 end 的记录。为方便后期异构消费解析以及复用,此处为结束信号独立建一个数据库并创建分区表,表内必须有时间列,其他字段可选。并向该表写入一条模拟数据,其数据内容没有任何强制要求。DolphinDB 建库建表相关知识可参考分区数据库 。inputEnd、dateEnd、timeEnd 字典的 key 按需设置为字符串 end,将对应 replay 函数指定的输出表中的第二个字典,即消息类型。

参数 sortColumn 用于指定额外的排序列,如果用户回放的数据源为仅仅包含快照(snapshot),则调用内置的 replay 函数时不加入 sortColumn 参数,结果流表中的结束信号记录如下。

3.5 封装函数视图

将以上函数利用 addFunctionView 封装成函数视图,具体代码如下。 API 端仅需要调用主函数 stkReplay,第四章回放全部基于该函数视图。

addFunctionView(dsTb)
addFunctionView(createEnd)
addFunctionView(replayJob)
addFunctionView(stkReplay)

4. API 提交回放

4.1 C++ API

本例程序运行在 Linux 系统 DolphinDB C++ API 环境下,编译成可执行文件以命令行输入参数执行。完整脚本见附录 C++ 源码。

4.1.1 C++ API 调用回放服务

C++ API 调用回放服务代码如下:

DictionarySP result = conn.run("stkReplay", args);
string errorCode = result->get(Util::createString("errorCode"))->getString();
if (errorCode != "1") 
{
    std::cout << result->getString() << endl;
    return -1;
}

conn 是 DolphinDB C++ API DBConnection 对象, C++ 应用可以通过它在 DolphinDB 服务器上执行脚本和函数并在两者之间双向传递数据。conn 通过 run 方法调用 stkReplay 自定义函数,args 容器内包含了 stkReplay 函数所需的参数。执行结果返回到字典 result 中,可以通过 get 方法来获得 key 为 errorCode 对应的值,如果错误代号不为 1 ,则代表回放执行错误,返回错误信息并结束程序。

多用户进行回放时,需要对用户回放的流表进行命名,为了不重复命名以避免冲突,对每个回放用户生成唯一识别号,会生成形如 “Eq8Jk8Dd0Tw5Ej8D” 的识别字符串,代码如下:

string uuid(int len)
{
    char* str = (char*)malloc(len + 1);
    srand(getpid());
    for (int i = 0; i < len; ++i)
    {
        switch (i % 3)
        {
        case 0:
            str[i] = 'A' + std::rand() % 26;
            break;
        case 1:
            str[i] = 'a' + std::rand() % 26;
            break;
        default:
            str[i] = '0' + std::rand() % 10;
            break;
        }
    }
    str[len] = '\0';
    std::string rst = str;
    free(str);
    return rst;
}

4.2 Python API

本例程序运行在 Windows 系统 DolphinDB Python API 环境下,可以在任何具备该环境的客户端执行。完整脚本见附录 Python 源码。

4.2.1 Python API 调用回放服务

Python API 调用回放服务代码如下:

stk_list = ['000616.SZ','000681.SZ']
start_date = '20211201'
end_date = '20211201'
replay_rate = -1
replay_name = ['snapshot']
s.upload({'stk_list':stk_list, 'start_date':start_date, 'end_date':end_date, 'replay_rate':replay_rate, 'replay_uuid':uuidStr, 'replay_name':replay_name})
s.run("stkReplay(stk_list, start_date, end_date, replay_rate, replay_uuid, replay_name)")

stk_list 是回放股票列表,start_date 是回放开始日期,end_date 是回放结束日期,replay_rate 是回放速率,replay_uuid 是回放流表名称,replay_name 是回放数据源列表。s 是 DolphinDB 会话,Python 应用通过会话在 DolphinDB 服务器上执行脚本和函数以及在两者之间双向传递数据。使用 s 的 upload 方法上传 Python 对象,使用 run 方法执行 stkReplay 函数。

多用户进行回放时,需要对用户回放的流表进行命名,为了不重复命名以避免冲突,对每个回放用户生成唯一识别号,会生成形如“Eq8Jk8Dd0Tw5Ej8D”的识别字符串,代码如下:

def uuid(length):
    str=""
    for i in range(length):
        if(i % 3 == 0):
            str += chr(ord('A') + random.randint(0, os.getpid() + 1) % 26)
        elif(i % 3 == 1):
            str += chr(ord('a') + random.randint(0, os.getpid() + 1) % 26)
        else:
            str += chr(ord('0') + random.randint(0, os.getpid() + 1) % 10)
    return str

uuidStr = uuid(16)

5. API 订阅消费

5.1 C++ API

5.1.1 反序列化器构造

为使异构流数据表反序列化输出,构造输出表结构,代码如下:

DictionarySP snap_full_schema = conn.run("loadTable(\"dfs://Test_snapshot\", \"snapshot\").schema()");
DictionarySP order_full_schema = conn.run("loadTable(\"dfs://Test_order\", \"order\").schema()");
DictionarySP transac_full_schema = conn.run("loadTable(\"dfs://Test_transaction\", \"transaction\").schema()");
DictionarySP end_full_schema = conn.run("loadTable(\"dfs://End\", \"endline\").schema()");

unordered_map<string, DictionarySP> sym2schema;
sym2schema["snapshot"] = snap_full_schema;
sym2schema["order"] = order_full_schema;
sym2schema["transaction"] = transac_full_schema;
sym2schema["end"] = end_full_schema;
StreamDeserializerSP sdsp = new StreamDeserializer(sym2schema);

上述前四行分别是快照、逐笔委托、逐笔成交、结束信号的表结构构造,回放根据需要选择对应的表结构,结束信号表必须有。unordered_map 创建了 key->schema 的映射表,可以将回放数据源和结束信号按照 schema 进行结构解析。通过 StreamDeserializer 对象来构造异构流表反序列化器。

5.1.2 C++ API 订阅回放服务

在 DolphinDB C++ API 中订阅异构流表,代码如下:

int listenport = 10260;
ThreadedClient threadedClient(listenport);
string tableNameUuid = "replay_" + uuidStr;
auto thread = threadedClient.subscribe(hostName, port, myHandler, tableNameUuid, "stkReplay", 0, true, nullptr, true, 500000, 0.001, false, "admin", "123456", sdsp);
std::cout << "Successed to subscribe " + tableNameUuid << endl;
thread->join();

listenport 是单线程客户端的订阅端口号,tableNameUuid 是用户回放的流表名称,通过 threadedClient.subscribe 可以订阅回放的流表,详情用法参考 C++ API 订阅 。thread 指向循环调用 myHandler 的线程的指针,线程在 topic 被取消订阅后会退出。

5.1.3 消费函数构造

调用订阅函数 threadedClient.subscribe 订阅异构流数据表时,在最后一个参数指定相应的流数据反序列化实例 StreamDeserializerSP ,在订阅时会对收到的数据进行反序列化再传递给用户自定义的回调函数 myHandler,用户可以在回调函数内自定义消费,本文仅实现了简单的输出消费,当消息 msg 的标识为 end 时,通过 threadedClient.unsubscribe 取消订阅,代码如下:

long sumcount = 0;
long long starttime = Util::getNanoEpochTime();
auto myHandler = [&](vector<Message> msgs) 
{
    for (auto& msg : msgs) 
    {
        std::cout << msg.getSymbol() << " : " << msg->getString() << endl;
        if(msg.getSymbol() == "end")
        {
            threadedClient.unsubscribe(hostName, port, tableNameUuid,"stkReplay");
        }
        sumcount += msg->get(0)->size();
    }
    long long speed = (Util::getNanoEpochTime() - starttime) / sumcount;
    std::cout << "callback speed: " << speed << "ns" << endl;
};

5.1.4 程序执行

将如上 C++ 程序代码写入 main.cpp 中,在 DolphinDB C++ API 环境下编译成可执行文件 main ,按照如下命令格式输入,回车,即可开始“回放-订阅-消费”过程。

单支股票最大速度回放 “order“ 表一天数据命令:

$ ./main 000616.SZ 20211201 20211201 -1 order

两支(多支)股票最大速度回放三张表一天数据命令:

$ ./main 000616.SZ,000681.SZ 20211201 20211201 -1 snapshot,order,transaction

5.1.5 消费输出

根据 5.1.4 构造的消费函数,两支股票(“000616.SZ” &“000681.SZ”)最大速度回放三张表一天数据输出如下图所示:

消费输出结果

5.2 Python API

5.2.1 反序列化器构造

为使异构流数据表反序列化输出,Python API 通过 streamDeserializer 类来构造异构流表反序列化器,同时定义输出表结构,代码如下:

sd = ddb.streamDeserializer({
    'snapshot':  ["dfs://Test_snapshot", "snapshot"],
    'order': ["dfs://Test_order", "order"],
    'transaction': ["dfs://Test_transaction", "transaction"],
    'end': ["dfs://End", "endline"],
}, s)

上述主体部分分别是快照、逐笔委托、逐笔成交、结束信号的表结构构造,回放根据需要选择对应的表结构,结束信号表必须有。

5.2.2 Python API 订阅回放服务

在 DolphinDB Python API 中订阅异构流表,代码如下:

s.enableStreaming(0)
s.subscribe(host=hostname, port=portname, handler=myHandler, tableName="replay_"+uuidStr, actionName="replay_stock_data", offset=0, resub=False, msgAsTable=False, streamDeserializer=sd, userName="admin", password="123456")
event.wait()

enableStreaming 函数启用流数据功能,函数参数指定开启数据订阅的端口。s 使用 subscribe 函数来订阅 DolphinDB 中的流数据表,详情用法参考 Python API 订阅 。订阅是异步执行的,event.wait() 保持主线程不退出。

5.2.3 消费函数构造

调用订阅函数 s.subscribe 订阅异构流数据表时,在参数指定相应的流数据反序列化实例 streamDeserializer,在订阅时会对收到的数据进行反序列化再传递给用户自定义的回调函数 myHandler,用户可以在回调函数内自定义消费,仅输出消费代码如下:

def myHandler(lst):
    if lst[-1] == "snapshot":
        print("SNAPSHOT: ", lst)
    elif lst[-1] == 'order':
        print("ORDER: ", lst)
    elif lst[-1] == 'transaction':
        print("TRANSACTION: ", lst)
    else:
        print("END: ", lst)
        event.set()

5.2.4 消费输出

根据 5.2.3 构造的消费函数,两支股票(“000616.SZ” &“000681.SZ”)最大速度回放三张表一天数据输出如下图所示:

消费输出结果

6. 性能测试

测试的交易日范围从 2021 年 12 月 1 日至 12 月 9 日共 7 个连续交易日。测试脚本见附录:C++ 测试源码。

6.1 测试服务器配置

CPU 类型Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
逻辑 CPU 总数64
内存503 GB
硬盘SSD
OSCentOS Linux release 7.9.2009 (Core)
DolphinDB Server 版本2.00.9.3 2023.03.29

6.2 50 支深交所股票一天全速并发回放性能测试

测试方法:选取深交所 2021 年 12 月 1 日 50 支交易股票数据,linux 后台同时启动多个第四章 C++ API 提交回放程序,提交多个并发回放任务。

回放耗时=所有回放任务最晚结束时间-所有回放任务最早收到时间
回放速率=所有用户回放数据量总和/回放耗时
平均单个用户回放耗时=(所有回放任务结束时间和-所有回放任务开始时间和)/并发数量
平均单个用户回放速率=单个用户回放数据量/平均单个用户回放耗时
并发数量数据源数据量回放耗时回放速率单个用户回放耗时(平均)单个用户回放速率(平均)
1snapshottransactionorder2,775,129行约2.669s约104w/s约2.669s约104w/s
10snapshottransactionorder27,751,290行约10.161s约273w/s约8.482s约32w/s
20snapshottransactionorder55,502,580行约18.761s约296w/s约16.728s约16w/s
40snapshottransactionorder111,005,160行约35.537s约312w/s约19.091s约14w/s
1snapshottransaction1,416,548行约2.003s约70w/s约2.003s约70w/s
10snapshottransaction14,165,480行约7.155s约198w/s约6.382s约22w/s
20snapshottransaction28,330,960行约13.115s约216w/s约11.436s约12w/s
40snapshottransaction56,661,920行约27.003s约210w/s约13.740s约10w/s
1snapshot194,045行约1.387s约14w/s约1.387s约14w/s
10snapshot1,940,450行约6.428s约30w/s约5.128s约4w/s
20snapshot3,880,900行约11.782s约33w/s约10.539s约2w/s
40snapshot7,761,800行约23.274s约33w/s约12.393s约1w/s

6.3 50 支深交所股票跨天全速回放性能测试

测试方法:选取深交所 50 支交易股票数据,linux 后台启动第四章 C++ API 提交回放程序,提交多天回放任务。

回放耗时=回放任务结束时间-回放任务收到时间
回放速率=回放数据量/回放耗时
数据源数据量回放耗时回放速率
snapshottransactionorder一天:2,775,129行约2.925s约95w/s
snapshottransactionorder一周:17,366,642行约16.393s约106w/s
snapshottransaction一天:1,416,548行约1.912s约74w/s
snapshottransaction一周:8,899,562行约9.142s约97w/s
snapshot一天:194,045行约1.267s约15w/s
snapshot一周:1,279,869行约6.659s约19w/s

7. 开发环境配置

部署 DolphinDB Server

  • Server 版本:2.00.9.3 2023.03.29
  • Server 部署模式:单服务器集群
  • 配置文件:cluster.cfg
maxMemSize=128
maxConnections=5000
workerNum=24
localExecutors=23
webWorkerNum=2
chunkCacheEngineMemSize=16
newValuePartitionPolicy=add
logLevel=INFO
maxLogSize=512
node1.volumes=/ssd/ssd3/pocTSDB/volumes/node1,/ssd/ssd4/pocTSDB/volumes/node1
node2.volumes=/ssd/ssd5/pocTSDB/volumes/node2,/ssd/ssd6/pocTSDB/volumes/node2
node3.volumes=/ssd/ssd7/pocTSDB/volumes/node3,/ssd/ssd8/pocTSDB/volumes/node3
diskIOConcurrencyLevel=0
node1.redoLogDir=/ssd/ssd3/pocTSDB/redoLog/node1
node2.redoLogDir=/ssd/ssd4/pocTSDB/redoLog/node2
node3.redoLogDir=/ssd/ssd5/pocTSDB/redoLog/node3
node1.chunkMetaDir=/ssd/ssd3/pocTSDB/metaDir/chunkMeta/node1
node2.chunkMetaDir=/ssd/ssd4/pocTSDB/metaDir/chunkMeta/node2
node3.chunkMetaDir=/ssd/ssd5/pocTSDB/metaDir/chunkMeta/node3
node1.persistenceDir=/ssd/ssd6/pocTSDB/persistenceDir/node1
node2.persistenceDir=/ssd/ssd7/pocTSDB/persistenceDir/node2
node3.persistenceDir=/ssd/ssd8/pocTSDB/persistenceDir/node3
maxPubConnections=128
subExecutors=24
subThrottle=1
persistenceWorkerNum=1
node1.subPort=8825
node2.subPort=8826
node3.subPort=8827
maxPartitionNumPerQuery=200000
streamingHAMode=raft
streamingRaftGroups=2:node1:node2:node3
node1.streamingHADir=/ssd/ssd6/pocTSDB/streamingHADir/node1
node2.streamingHADir=/ssd/ssd7/pocTSDB/streamingHADir/node2
node3.streamingHADir=/ssd/ssd8/pocTSDB/streamingHADir/node3
TSDBCacheEngineSize=16
TSDBCacheEngineCompression=false
node1.TSDBRedoLogDir=/ssd/ssd3/pocTSDB/TSDBRedoLogDir/node1
node2.TSDBRedoLogDir=/ssd/ssd4/pocTSDB/TSDBRedoLogDir/node2
node3.TSDBRedoLogDir=/ssd/ssd5/pocTSDB/TSDBRedoLogDir/node3
TSDBLevelFileIndexCacheSize=20
TSDBLevelFileIndexCacheInvalidPercent=0.6
lanCluster=0
enableChunkGranularityConfig=true
路径配置参数需要开发人员根据实际环境配置

有关单服务器集群部署教程,请参考:单服务器集群部署。

DolphinDB client 开发环境

  • CPU 类型:Intel(R) Core(TM) i5-11500 @ 2.70GHz 2.71 GHz
  • 逻辑 CPU 总数:12
  • 内存:16GB
  • OS:Windows 11 专业版
  • DolphinDB GUI 版本:1.30.20.1

DolphinDB GUI 安装教程:DolphinDB GUI 使用手册

DolphinDB C++ API 安装

  • C++ API 版本:release200.9
C++ API 版本建议按 DolphinDB server 版本选择,如 2.00.9 版本的 server 安装 release200.9 分支的 API

C++ API 教程:C++ API 教程

DolphinDB Python API 安装

  • Python API 版本:1.30.21.1

Python API 教程:Python API 教程

8. 路线图 (Roadmap)

  • 在后续版本中,replay 函数将支持原速回放,即严格以两条记录的时间戳之差为输出间隔向下游注入数据;
  • 在后续版本中,replay 函数将支持原速倍速回放,即严格以两条记录的时间戳之差为输出间隔进行倍速再向下游注入数据。

9. 总结

本教程基于高性能分布式时序数据库、数据回放工具、异构流数据表、C++ API 订阅消费、Python API 订阅消费等 DolphinDB 特性,给出了行情数据最佳分区存储方案,提供了多用户多支股票多天多表交易所行情数据回放最佳实践方案,意在使用户可以在此基础上快速搭建自己的行情回放系统。

10. 附录

附录中的压缩包包含以下文件:

  • 原始行情数据文件
  • 逐笔委托建库建表脚本
  • 逐笔成交建库建表脚本
  • 快照建库建表脚本
  • 逐笔委托示例导入脚本
  • 逐笔成交示例导入脚本
  • 快照示例导入脚本
  • 行情回放函数
  • C++ 源码
  • Python 源码
  • C++ 测试源码

注意:由于原始行情数据文件较大,故采用分卷压缩方式。解压时:

  1. 下载所有的压缩分卷至同一目录。
  2. 使用 7z 等解压缩软件提取名为 data_pack.zip.001 分卷,得到 data_pack 文件夹。
  3. 在 data_pack 文件夹中,解压 data.zip 即可得到原始行情数据文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/540746.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChatGPT 提问,软件杂项部分

堆内存与栈内存一般分别 有多少 ChatGPT 堆内存和栈内存的大小取决于操作系统和编译器的限制以及程序的运行环境。以下是一些常见的默认大小范围&#xff0c;但请注意这些值可以因环境而异&#xff1a; 栈内存大小&#xff1a; Windows平台&#xff1a;默认情况下&#xff…

vue3 大致总结

一、开发、生产、测试环境的文件编写 需要以VITE开头&#xff01;&#xff01;&#xff01; 输出时&#xff1a;console.log(import.meta.env.VITE_ENV,"------***---------"); 二、路由守卫 1、全局路由守卫beforeEach和afterEach ①全局前置守卫beforeEach ②…

六、达梦8数据库适配记录

达梦数据库适配记录 记录关于我的业务微服务,适配国产达梦数据库的过程,以及遇到的一些错误问题和其解决方案。 目前的项目最初基于Mysql开发,现在要适配到达梦,不要以为迁移任务很easy,但实际过程中还是出现了很多问题。 基 由于达梦是的国产数据库,本身与MySQL数据库…

idea配置阿里云翻译

idea配置阿里云翻译 0前言1开通阿里云机器翻译2配置阿里云AccessKeyidea配置Translation 0前言 使用idea的码农们都应该对Translation这款插件不会陌生了&#xff0c;尤其是英语基础比较薄弱的盆友&#xff0c;在看源码的时候更是会经常使用Translation边翻边看源码。 但是由于…

EW代理工具的使用说明

一、EW介绍 Earthworm&#xff08;EW&#xff09; 是一套便携式的网络穿透工具&#xff0c;具有 SOCKS v5服务架设和端口转发两大核心功能&#xff0c;可在复杂网络环境下完成网络穿透。 该工具能够以“正向”、“反向”、“多级级联”等方式打通一条网络隧道&#xff0c;直达…

基于REST风格的SpringMVC请求路径设置与参数传递

文章目录 1 REST简介2 RESTful入门案例2.1 环境准备2.2 思路分析2.3 修改RESTful风格新增删除传递路径参数 修改根据ID查询查询所有 知识点1&#xff1a;PathVariable 3 RESTful快速开发知识点1&#xff1a;RestController知识点2&#xff1a;GetMapping PostMapping PutMappin…

【STL】

目录 什么是STLSTL定义两大特点两个层次 STL主要构成容器容器概念容器分类vectordequestackqueuelistset/multiset容器map/multimap容器 算法迭代器仿函数适配器空间配置器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插…

选Ubuntu 还是 Fedora ?

提起开发&#xff0c;程序员们更青睐于不同版本的Linux操作系统而不是Windows。 为什么&#xff1f;因为Linux操作起来更安全、快捷&#xff0c;最重要的是&#xff0c;它的发行版本众多。你可以根据需要挑选最适合的那一款。那么&#xff0c;问题来了&#xff0c;到底哪个版本…

开源地质建模GemPy实战

推荐&#xff1a;用 NSDT设计器 快速搭建可编程3D场景。 在设计任何类型的工程结构时&#xff0c;确定地面以下的东西并有效地将其映射出来是首要也是最重要的部分之一。 地下建模带有很大的误差范围&#xff0c;因为即使是我们今天使用的最先进的地下调查方法也无法完全绘制出…

【Linux】信号概述

目录 1、信号概念2、Linux常用信号表3、信号的5种默认处理动作 橙色 1、信号概念 信号是 Linux进程间通信的最古老的方式之一&#xff0c;是事件发生时对进程的通知机制&#xff0c;有时也称之为软件中断&#xff0c;它是在软件层次上对中断机制的一种模拟&#xff0c;是一种…

Vue3(6) Transition

目录 组件 基于CSS的过渡效果 JavaScript钩子 Vue 提供了两个内置组件&#xff0c;可以帮助你制作基于状态变化的过渡和动画&#xff1a; <Transition> 会在一个元素或组件进入和离开 DOM 时应用动画。 <TransitionGroup> 会在一个 v-for 列表中的元素或组件被…

C++模板template

我们现在有几个变量&#xff0c;我们向要实现他们的交换&#xff0c;所以我们现在写了一个swap函数 我们现在可以实现对这两个变量之间的交换&#xff0c; 那么我们有有两个变量需要交换呢&#xff1f;&#xff1f; 我们刚才的Swap函数的参数是int类型的&#xff0c;我们现在的…

ChatGPT 和对话式 AI 的未来:2023 年的进展和应用

人工智能(Artificial Intelligence)在过去一段时间以来以前所未有的速度快速发展。从自动化日常任务到重要提醒的设定,AI以各种方式渗透到我们的生活中。然而,在这个领域中迈出的最重要一步是ChatGPT。 ChatGPT被瑞银(UBS)评为“有史以来增长最快的消费者应用程序”,于…

Cy5.5-PEG-SH近红外荧光PEG试剂 Cyanine5.5-PEG-SH,Thiol-PEG-Cy5.5可用于活体成像

Cy5.5-PEG-SH &#xff0c;Cy5.5聚乙二醇巯基 英文名称&#xff1a;Cy5.5-PEG-SH 中文名称&#xff1a;Cy5.5聚乙二醇巯基 性状: 深蓝色固体或粘性液体&#xff0c;取决于分子量 溶剂&#xff1a;溶于水、 DMSO等常规性有机溶剂 激发/发射波长&#xff1a;684 nm/710 nm …

Windows操作系统重要内容

windows 常用用户&#xff1a; SYSTEM&#xff1a;本地机器上拥有最高权限的用户。&#xff08;为系统核心组件访问文件资源提供权限&#xff09;Administrator&#xff1a;默认系统管理员用户。Guest&#xff1a;只拥有相对较少的权限&#xff0c;默认被禁用。 Windows 常见…

【中医推荐】33部中医书籍,中医医书精品(在线免费阅读),值得珍藏的国粹,涵盖中药、针灸、推拿、按摩、拔罐、气功,食疗等诸多领域

中医诞生于原始社会&#xff0c;春秋战国时期中医理论已基本形成&#xff0c;之后历代均有总结发展。除此之外对汉字文化圈国家影响深远&#xff0c;如日本医学、韩国韩医学、朝鲜高丽医学、越南东医学等都是以中医为基础发展起来的。 中医承载着中国古代人民同疾病作斗争的经…

(华三AC+AP)在华三AC上通过用户mac地址或者IP地址查询在那一台AP下

起因&#xff1a;用户终端的WiFi信号一直不停地断开重连&#xff0c;发现AP的信号消失了&#xff0c;经过检查配置并没有问题&#xff0c;但是在后来发现重启可以让AP恢复使用&#xff0c;但是过一段时间还是会出现这样的问题&#xff0c;因为AP没有备用换下维修&#xff0c;这…

只需浏览器!在线完成Flutter从编程到打包全过程

本文作者&#xff1a;林梓泓 引言 云端 IDE 是基于云的集成开发环境&#xff0c;开发人员可以远程编写运行和调试代码&#xff0c;无需本地安装&#xff0c;仅通过浏览器即可开发软件。 与传统本地开发相比&#xff0c;云端开发环境主要有以下的优势&#xff1a; 快速启动项…

react类组件生命周期基础总结

组件的生命周期是指组件从被创建到挂载到页面中运行起来&#xff0c;再到组件不用时卸载的过程&#xff0c;只有类组件才有生命周期&#xff08;类组件 实例化 函数组件 不需要实例化&#xff09; 生命周期新版本和旧版本的对比图如下&#xff1a; 生命周期&#xff08;constr…

大数据|Spark介绍

前文回顾&#xff1a;Hive和数据仓库 目录 &#x1f4da;为什么会有Spark &#x1f4da;Spark的基本架构和组件 &#x1f407;主要体系结构和组件 &#x1f407;Spark集群的基本结构 &#x1f407;Spark系统的基本结构 &#x1f407;Spark应用程序的基本结构 &#x1f4…