从 OceanBase 迁移数据到 DolphinDB

news2025/1/29 13:54:46

OceanBase 是一款金融级分布式关系数据库,具有数据强一致、高可用、高性能、在线扩展、高度兼容 SQL标准和主流关系数据库、低成本等特点,但是其学习成本较高,且缺乏金融计算函数以及流式增量计算的功能。

DolphinDB 是一款国产的高性能分布式时序数据库产品。支持 SQL 和使用类 Python 的语法来处理数据,学习成本很低。并且 DolphinDB 提供了 1500 多个函数,对于复杂的数据处理场景有很强的表达能力,极大地降低了用户开发成本。同时 DolphinDB 还具有强大的流式增量计算能力,能够满足用户实时数据的处理需求。

本文旨在为有从 OceanBase 迁移至 DolphinDB 需求的用户提供一份简洁明了的参考,整体框架如下:

1. 应用需求

很多之前使用 OceanBase 的用户不可避免地需要将数据迁移同步至 DolphinDB。DolphinDB 提供了多种灵活的数据同步方法,来帮助用户方便地把海量数据从多个数据源进行全量同步或增量同步。本文中的实践案例基于该需求,提供了将逐笔成交数据从 OceanBase 迁移至 DolphinDB 的高性能解决方案。

现有 2021.01.04 一天的逐笔成交数据,存储于 OceanBase。其部分数据示例如下:

SecurityIDTradeTimeTradePriceTradeQtyTradeAmountBuyNoSellNoTradeIndexChannelNoTradeBSFlagBizIndex
6000352021.01.04T09:32:59.0002.91002905749885398712666136B802050
6000352021.01.04T09:25:00.0002.91290084391778113657581876N45204
6000352021.01.04T09:25:00.0002.9126007566177811371081886N45205
6000352021.01.04T09:32:59.0002.91002905749885398722666146B802051
6000352021.01.04T09:25:00.0002.915001455177811876281896N45206
6000352021.01.04T09:32:59.0002.91002905749885398732666156B802052
6000352021.01.04T09:30:02.0002.910029018941205774447176S252209
6000352021.01.04T09:32:59.0002.91002905749885398802666166B802053
6000352021.01.04T09:30:02.0002.910029018941209284461066S256679
6000352021.01.04T09:32:59.0002.92005805749885398842666176B802054

2. 实现方法

从 OceanBase 迁移数据到 DolphinDB 的方法有以下三种:

  • MySQL 插件

MySQL 插件是 DolphinDB 提供的用于导入 MySQL 数据的插件,同样也适用于 OceanBase 的 MySQL 模式。MySQL 插件配合 DolphinDB 脚本使用,与服务器在同一个进程空间内运行,能高效地完成 OceanBase 数据到 DolphinDB 的数据写入。

MySQL 插件提供如下函数,函数的具体使用请参考 DolphinDB MySQL Plugin

  1. mysql::connect(host, port, user, password, db)
  2. mysql::showTables(connection)
  3. mysql::extractSchema(connection, tableName)
  4. mysql::load(connection, table_or_query, [schema], [startRow], [rowNum], [allowEmptyTable])
  5. mysql::loadEx(connection, dbHandle,tableName,partitionColumns,table_or_query,[schema],[startRow],[rowNum],[transform])
  • ODBC 插件

ODBC(Open Database Connectivity) 插件是 DolphinDB 提供的通过 ODBC 接口访问 支持 ODBC 协议开源产品。其使用方式与 MySQL 插件类似,本文不再赘述,感兴趣的读者可参考 ODBC 插件使用指南。

  • DataX 驱动

DataX 是可扩展的数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,理论上 DataX 框架可以支持任意数据源类型的数据同步工作。

DolphinDB 提供基于 DataXReader 和 DataXWriter 的开源驱动。DolphinDBWriter 插件实现了向 DolphinDB 写入数据,使用 DataX 的现有 reader 插件结合 DolphinDBWriter 插件,即可实现从不同数据源向 DolphinDB 同步数据。用户可以在 Java 项目中包含 DataX 的驱动包,开发从 OceanBase 数据源到 DolphinDB 的数据迁移软件。

DataX 驱动的开发基于 Java SDK,支持高可用。

实现途径数据写入效率高可用
MySQL 插件不支持
DataX 驱动支持

3. 迁移案例与操作步骤

3.1 在 DolphinDB 创建表

针对上面的测试数据,我们需要在 DolphinDB 里创建对应的库表,用于存储迁移过来的数据。对于实际的数据,需要综合考虑被迁移数据的字段、类型、数据量,在 DolphinDB 是否需要分区,分区方案,使用 OLAP还是 TSDB 引擎等情况,去设计建库建表方案。一些数据存储库表设计实践,可以参考 DolphinDB 数据库分区教程

本例建表文件 createTable.dos 内容如下:

def createTick(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db1 = database(, VALUE, 2020.01.01..2021.01.01)
	db2 = database(, HASH, [SYMBOL, 10])
	db = database(dbName, COMPO, [db1, db2], , "TSDB")
	db = database(dbName)
	name = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNo`SellNo`ChannelNo`TradeIndex`TradeBSFlag`BizIndex
	type = `SYMBOL`TIMESTAMP`DOUBLE`INT`DOUBLE`INT`INT`INT`INT`SYMBOL`INT
	schemaTable = table(1:0, name, type)
	db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns=`SecurityID`TradeTime, keepDuplicates=ALL)
}

dbName="dfs://TSDB_tick"
tbName="tick"
createTick(dbName, tbName)

从 OceanBase 迁移到 DolphinDB 的数据字段映射关系如下表:

OceanBase 字段含义OceanBase 字段OceanBase 数据类型DolphinDB 字段含义DolphinDB 字段DolphinDB 数据类型
证券代码SecurityIDvarchar(10)证券代码SecurityIDSYMBOL
交易时间TradeTimetimestamp交易时间TradeTimeTIMESTAMP
交易价格TradePricedouble交易价格TradePriceDOUBLE
交易数量TradeQtyint(11)交易数量TradeQtyINT
交易金额TradeAmountdouble交易金额TradeAmountDOUBLE
买方委托索引BuyNoint(11)买方委托索引BuyNoINT
卖方委托索引SellNoint(11)卖方委托索引SellNoINT
成交编号TradeIndexint(11)成交编号TradeIndexINT
频道代码ChannelNoint(11)频道代码ChannelNoINT
成交方向TradeBSFlagvarchar(20)成交方向TradeBSFlagSYMBOL
业务序列号BizIndexint(11)业务序列号BizIndexINT

3.2 通过 MySQL 插件迁移

3.2.1 安装 MySQL 插件

MySQL 插件的安装参考 DolphinDB MySQL Plugin

3.2.2 同步数据

1. 运行以下命令加载 MySQL 插件

loadPlugin("ServerPath/plugins/mysql/PluginMySQL.txt")

2. 运行以下命令建立与 OceanBase 的连接

conn = mysql::connect(`127.0.0.1,2881,`root,`123456,`db1)

3. 运行以下命令开始同步数据

mysql::loadEx(conn, database('dfs://TSDB_tick'), `tick, `TradeTime`SecurityID,"tick")

数据共 27211975 条,同步数据耗时约52秒。

4. 同步增量数据

如需实现增量同步,只需将mysql::loadEX中的源数据表名替换为查询语句,并借助 DolphinDB 提供的scheduleJob函数设置定时任务,即可实现增量同步,示例如下,每天 00:05 同步前一天数据:

def scheduleLoad(){
  sqlQuery = "select * from tick where date(TradeTime) =  '" +temporalFormat(today()-1, 'y-MM-dd') +"' ;"
  mysql::loadEx(conn, database('dfs://TSDB_tick'), `tick, `TradeTime`SecurityID,sqlQuery)
}
scheduleJob(jobId=`test, jobDesc="test",jobFunc=scheduleLoad,scheduleTime=00:05m,startDate=2023.04.04, endDate=2024.01.01, frequency='D')

注:为防止节点重启时定时任务解析失败,预先在配置文件里添加 preloadModules=plugins::mysql

3.3 通过 DataX 驱动迁移

3.3.1 部署 DataX

从 DataX 下载地址 下载 DataX 压缩包后,解压至自定义目录。

3.3.2 部署 DataX-DolphinDBWriter 插件

将 DataX-DolphinDBWriter 中源码的 ./dist/dolphindbwriter 目录下所有内容拷贝到 DataX/plugin/writer 目录下。

3.3.3 执行 DataX 任务

1. 根据实际环境配置json文件。详情参考:#DataX DolphinDBWriter插件配置项,配置 json 文件

配置文件 OceanBase_tick.json 的具体内容如下,并将 json 文件置于自定义目录下,本教程中方放置于 datax-writer-master/ddb_script/ 目录下。

{
    "job": {
        "setting": {
            "speed": {
                "channel":1
            }
        },
        "content": [
            {
                "writer": {
                    "parameter": {
                        "dbPath": "dfs://TSDB_tick",
                        "tableName": "tick",
                        "userId": "admin",
                        "pwd": "123456",
                        "host": "127.0.0.1",
                        "batchSize": 200000,
                        "table": [
                            {
                                "type": "DT_SYMBOL",
                                "name": "SecurityID"
                            },
                            {
                                "type": "DT_TIMESTAMP",
                                "name": "TradeTime"
                            },
                            {
                                "type": "DT_DOUBLE",
                                "name": "TradePrice"
                            },
                            {
                                "type": "DT_INT",
                                "name": "TradeQty"
                            },
                            {
                                "type": "DT_DOUBLE",
                                "name": "TradeAmount"
                            },
                            {
                                "type": "DT_INT",
                                "name": "BuyNo"
                            },
                            {
                                "type": "DT_INT",
                                "name": "SellNo"
                            },
                            {
                                "type": "DT_INT",
                                "name": "TradeIndex"
                            },
                            {
                                "type": "DT_INT",
                                "name": "ChannelNo"
                            },
                            {
                                "type": "DT_SYMBOL",
                                "name": "TradeBSFlag"
                            },
                            {
                                "type": "DT_INT",
                                "name": "BizIndex"
                            }
                        ],
                        "port": 8800
                    },
                    "name": "dolphindbwriter"
                },
                "reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "batchSize":10000,
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "tick"
                                ],
                                "jdbcUrl": [
                                    "jdbc:oceanbase://127.0.0.1:2883/db1"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

2. Linux 终端中执行以下命令以执行 DataX 任务

cd ./dataX/bin/
python datax.py ../../datax-writer-master/ddb_script/ocean.json

3. 查看 DataX 同步结果

任务启动时刻                    : 2023-04-03 14:58:52
任务结束时刻                    : 2023-04-03 15:00:52
任务总计耗时                    :                120s
任务平均流量                    :           12.32MB/s
记录写入速度                    :         226766rec/s
读出记录总数                    :            27211975
读写失败总数                    :                   0

4. 同步增量数据

使用 DataX 同步增量数据,可在 ocean.json 的 ”reader“ 中增加 "where" 条件对数据日期进行筛选,如此每次执行同步任务时至同步 where 条件过滤后的数据,以同步前一天的数据为例,示例如下:

"reader": {
    "name": "oceanbasev10reader",
    "parameter": {
        "username": "root",
        "password": "123456",
        "batchSize":10000,
        "column": [
            "*"
        ],
        "connection": [
            {
                "table": [
                    "tick"
                ],
                "jdbcUrl": [
                    "jdbc:oceanbase://127.0.0.1:2883/db1"
                ]
            }
        ],
        "where":"date(TradeTime) = (SELECT DATE_ADD(CURDATE(), INTERVAL -1 DAY))"
    }
}

4. 基准性能

分别使用 MySQL 插件和 DataX 驱动进行数据迁移, 数据量 2721 万条,迁移耗时对比如下表所示:

MySQL插件DataX
52s120s

综上,MySQL 插件与 DataX 均能实现 将 OceanBase 中数据迁移到 DolphinDB中,但是各有优缺点:

  • MySQL 插件性能较好,适合批量数据的导入,但是运维管理不便。
  • DataX 导入数据较慢,适合千万级别以下数据集导入,但是其日志追踪,可扩展性以及管理比较方便。

用户可以根据自己数据量的大小以及工程化的便捷性进行选择导入方式,同时,由于篇幅有限,涉及到DolphinDB 和 DataX 框架的一些其它操作未能更进一步展示,用户在使用过程中需要按照实际情况进行调整。也欢迎大家对本教程中可能存在的纰漏和缺陷批评指正。

附录

DataX DolphinDB-Writer 配置项

配置项是否必须数据类型默认值描述
hoststringServer Host
portintServer Port
userIdstringDolphinDB 用户名导入分布式库时,必须要有权限的用户才能操作,否则会返回
pwdstringDolphinDB 用户密码
dbPathstring需要写入的目标分布式库名称,比如"dfs://MYDB"。
tableNamestring目标数据表名称
batchSizeint10000000datax每次写入dolphindb的批次记录数
table写入表的字段集合,具体参考后续table项配置详解
saveFunctionNamestring自定义数据处理函数。若未指定此配置,插件在接收到reader的数据后,会将数据提交到DolphinDB并通过tableInsert函数写入指定库表;如果定义此参数,则会用指定函数替换tableInsert函数。
saveFunctionDefstring数据入库自定义函数。此函数指 用dolphindb 脚本来实现的数据入库过程。 此函数必须接受三个参数:dfsPath(分布式库路径), tbName(数据表名), data(从datax导入的数据,table格式)

table 配置详解

table 用于配置写入表的字段集合。内部结构为

 {"name": "columnName", "type": "DT_STRING", "isKeyField":true}

请注意此处列定义的顺序,需要与原表提取的列顺序完全一致。

  • name :字段名称。
  • isKeyField:是否唯一键值,可以允许组合唯一键。本属性用于数据更新场景,用于确认更新数据的主键,若无更新数据的场景,无需设置。
  • type 枚举值以及对应 DolphinDB 数据类型如下
DolphinDB 类型配置值
DOUBLEDT_DOUBLE
FLOATDT_FLOAT
BOOLDT_BOOL
DATEDT_DATE
MONTHDT_MONTH
DATETIMEDT_DATETIME
TIMEDT_TIME
SECONDDT_SECOND
TIMESTAMPDT_TIMESTAMP
NANOTIMEDT_NANOTIME
NANOTIMETAMPDT_NANOTIMETAMP
INTDT_INT
LONGDT_LONG
UUIDDT_UUID
SHORTDT_SHORT
STRINGDT_STRING
SYMBOLDT_SYMBOL

完整代码及测试数据

  • DataX: OceanBase_tick.json
  • DolphinDB: mysql插件导入数据.dos, createTable.dos
  • 模拟产生数据: genTickCsv.dos

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/572549.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Micro-python Socket 支持 ROS2 topic 框架 (一)

消息Topic ROS2官方文档 Topic官方介绍 是各节点之间的信息交流媒介,可以实现一对一,一对多,多对一,多对多的信息交流,如图所示 (一)使用工具打开消息流图 打开rqt_graph(注意其…

如何利用IDEA将Git分支代码回退到指定历史版本

一、背景 作为一名后端开发,相信大家一定遇到过这样的情景,代码开发人员过多,并且开发分支过多,导致代码版本管理困难,这样就难免遇到一些代码合并出错,比如,当我提交了本次修改到本地和远程分…

Spring Boot注解@Async与线程池的配置

目录 使用异步注解创建异步任务 Async注解 使用Demo 线程池配置 Spring Boot默认用于异步任务线程池配置 线程池配置 线程池隔离 为什么需要线程池隔离? 线程池隔离实现Demo 线程池配置: 异步任务: 测试demo 参考内容: 使…

动态优化会议地点

前言 在现在快节奏的工作节奏下,大家的活动范围越来越广,但是出行成本也相应提高。在集体会面的时候,如何选择合适的地点成为了一个棘手的问题。本文将介绍如何通过动态优化选择会议地点,以达到平均交通成本最低的目标。 动态优化…

【操作系统真象还原】第4章:保护模式入门(4.4~4.5节)

目录 4.4 处理器微架构简介 4.4.1 流水线 4.4.2 乱序执行 4.4.3 缓存 4.4.4 分支预测 4.5 使用远跳转指令清空流水线,更新段描述符缓冲寄存器 4.6 保护模式之内存段的保护 4.6.1 向段寄存器加载选择子时的保护 4.6.2 代码段和数据段的保护 4.6.3 栈段的保…

CentOS7 网络配置

在Linux系统下 查询CentOS7的ip地址 输入ip查询命名 ip addr 也可以输入 ifconfig查看ip,但此命令会出现3个条目, centos的ip地址是ens33条目中的inet值。 输入命令: ip addr 结果如下: 使用: ifconfig 命令查询结果如下: 发现 ens33 没有 inet 这个属性…

国内半导体分立器件逐步向高端应用市场推进,未来可期

分立器件行业概况 半导体分立器件是半导体产业的基础及核心领域之一,其具有应用领域广阔、高成品率、特殊器件不可替代等特性。 从市场需求看,分立器件受益于物联网、可穿戴设备、智能家居、健康护理、安防电子、新能源汽车、智能电网、5G通信射频等市…

如何在华为OD机试中获得满分?Java实现【云短信平台优惠活动】一文详解!

✅创作者:陈书予 🎉个人主页:陈书予的个人主页 🍁陈书予的个人社区,欢迎你的加入: 陈书予的社区 🌟专栏地址: Java华为OD机试真题(2022&2023) 文章目录 1. 题目描述2. 输入描述3. 输出描述4. Java算法源码5. 测试6.解题思路1. 题目描述 某云短信厂商,为庆祝国…

如何建立自己的微信小程序,做一个微信小程序大概多少钱?

如今,小程序的功能越来越强大,也越来越受欢迎,它不仅能帮助企业和商家做推广,还能给他们带来很多好处。所以,很多企业都开始建立自己的小程序。但是对于如何建立自己的微信小程序,以及做一个微信小程序大概…

Zemax Lumerical | 二维光栅出瞳扩展系统优化

简介 本文提出并演示了一种以二维光栅耦出的光瞳扩展(EPE)系统优化和公差分析的仿真方法。 在这个工作流程中,我们将使用3个软件进行不同的工作 ,以实现优化系统的大目标。首先,我们使用 Lumerical 构建光栅模型并使用…

基于C++的网盘系统项目开发教程

项目资源下载 基于C的网盘系统项目源码CSDN下载地址基于C的网盘系统项目源码GitHub下载地址 项目简介 本项目基于C开发,整个项目采用C/S架构,使用Sqlite3数据库存储用户信息,本地磁盘存储用户文件,使用Socket进行客户端和服务器之…

智能指针: share_ptr(共享智能指针)

智能指针 c中不像java自带垃圾回收机制,必须释放掉分配的内存,否则机会造成内存泄漏。因此c11加入了智能指针。智能指针是存储指向动态分配(堆)对象指针的类,用于生存期的控制,能够确保在离开指针所在作用…

Selenium4自动化框架(超级详细)

目录 Selenium4 安装Selenium 安装浏览器驱动 实战案例 导入模块及浏览器驱动 导入模块 启动驱动 定位元素 id、name、class定位 tag_name定位 xpath定位 css选择器定位 link_text、partial_link_text定位 其他定位 定位一组元素 执行操作 浏览器操作 获取信息…

1. 自然语言处理NLP-数据预处理

NLP任务预处理的流程包括: 收集语料库、文本清洗、分词、去掉停用词、标准化和特征提取等。 (1)收集语料库 (2)清洗数据 eg:删除所有不相关的字符,例如非字母数字字母 (3&#xff09…

Java Servlet相关面试题

一、什么是servlet? Servlet是运行在java服务器中的小型Java程序。 作用:接收用户请求,并对请求作出处理,将处理结果相应给客户端。 Servlet是JavaWeb三大组件(Servlet、过滤器,监听器 )之一…

C++学习day--12 循环

第 1 节: 需求分析、项目实现——重复验证 项目实现&#xff1a; #include <iostream> #include <Windows.h> #include <string> using namespace std; int main(void) { string name; string pwd; while (1) { system("cls"); std::cout <…

Windows编辑开发中的内聚性、内聚类型、耦合性和耦合类型

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天总结一下Windows编辑开发中的内聚性、内聚类型、耦合性和耦合类型。 软件设计的基本原则是信息隐蔽性与模块独立性。 模块设计目标是高内聚&#xff0c;低耦合。 然后记住下面这张神图&#xff0c;一张…

Mybatis之MetaObject

在mybatis中&#xff0c;ResultSetHandler在收集JDBC返回的结果后需要转换成对应的Bean对象&#xff0c;其实映射的原理基本大家都能想到使用的时候java中的反射机制&#xff0c;但是在Mybatis中&#xff0c;提供了一个更加强大的对象&#xff0c;就是MetaObject&#xff0c;使…

Python - 面向对象编程 - 实例方法、静态方法、类方法

实例方法 在类中定义的方法默认都是实例方法&#xff0c;前面几篇文章已经大量使用到实例方法 实例方法栗子 class PoloBlog:def __init__(self, name, age):print("自动调用构造方法")self.name nameself.age agedef test(self):print("一个实例方法&…

阿里P8写出的《深入理解Java虚拟机》最新版,轻松学会JVM底层

前言 Java是目前用户最多、使用范围最广的软件开发技术&#xff0c;Java的技术体系主要由支撑Java程序运行的虚拟机、提供各开发领域接口支持的Java类库、Java编程语言及许许多多的第三E方Java框架(如Spring、 MyBatis等) 构成。在国内&#xff0c;有关Java类库API、Java语言语…