用时序数据库 DolphinDB 实现地震波形的分析预警

news2024/11/15 22:27:56

1. 绪论

波形数据的存储与实时流处理是地震预警、地震速报、地震烈度速报、震源机制解等数字地震台网综合处理系统的前提,合理的存储方案与高效的实时流处理架构能极大地节约存储成本、降低响应延时、方便震源分析。

本篇教程会为有该方面需求的客户提供一个基于 DolphinDB 的地震波形数据存储及实时流处理的解决方案,助力用户降低存储成本、提升效率。

1.1 行业背景

国家地震台网、 区域地震台网和流动地震台网等组成的中国数字地震观测网络系统每天不间断地产生地震波行数据。该数据的原始形态是方便数据处理的,但为了进行归档和交换,必须进行统一的数据格式化,现使用的国际通用交换格式为 SEED。 国家测震台网数据备份中心实时接收存储的地震波形数据始于2007年8月,至今数据量大约有 100TB 左右。随着地震台站数量和强震数据的不断增多,可以预见今后的测震数据增速会越来越快。

现今大部分测震数据是以 SEED 文件格式或存储于盘阵内,或存储于光盘、移动硬盘等离线介质内,辅以数十张描述这些数据文件的关系型表,这些表遵从《中国数字测震台网数据规范》。有数据分析需求的研究人员通过网络等方式进行数据申请,数据服务方按照请求进行波形数据的截取和打包,并发送给研究员。

传统模式下的地震波形数据存储较为分散,一次能处理的数据长度有限,分析应用的流程冗长,因此并不能有效支撑大规模的数据查询、分析、计算和可视化服务。

1.2 业务挑战

总体而言,地震波形数据的处理可以简化成数据采集、数据清洗和数据可视化展示三个部分。其中的数据采集一般通过部署在各地的地震计完成。地震计 7X24 小时不间断地记录和测量当地的地面运动数据,然后通过网络传输到台网中心,由数据处理部门以人工交互或自动化方式处理数据,监测全国和全球的地震活动情况。

图1 地震波形数据流向

目前地震行业中数据采集相关工作比较完善,基本上也形成了统一的标准,但是在数据处理与数据分析模块中,还未形成统一的解决方案。随着国家对防震减灾的要求越来越高,用于数据采集的地震计数量将会呈数量级增长,并且随着物联网的发展,将会有更多数据接入用于地震监测。因此,原有在单台机器上运行的地震数据处理软件(如地震自动测定软件)的数据处理压力会越来越大。同时,海量数据的汇入,对数据存储、管理、回溯等都提出了新的要求。现如今,对地震数据的处理与分析面临以下几大难点:

  • 文件存储:以 MiniSeed 文件形式单机存储,横向扩展性差,当数据量大时,扩展成为一个难题
  • 离线分析:从服务器上下载 MiniSeed 文件,再通过专业的地震分析软件将其解析成结构化数据进行分析,过程繁琐,效率低下
  • 实时处理与展示:目前无法实现实时计算、实时展示,特别是异常检测等时效性强的需求,无法满足

针对上述问题,目前业界的普遍做法是建立云架构数据中心, 它基于分布式文件系统的海量存储能力和分布式计算能力, 整合数据采集工具和 SQL 交互引擎、列式数据存储引擎, 提供对海量数据的采集、存储、处理分析和应用展示等系列功能,但也存在以下几个问题:

  • 存储成本:将 MiniSeed 文件解析成结构化数据存入数据库体积变大,造成硬件资源成本急剧增加
  • 数据导入/导出功能:结构化数据存入数据库后,按需将数据打包成 MiniSeed 文件难度大,如一个台站所有通道一天的数据打包下载速度极慢,无法为业务人员提供数据支持
  • 外部订阅功能:以接口形式获取数据速度极慢,无法为后续其他系统提供数据支撑

2. DolphinDB解决方案

上述地震波形数据在存储与分析模块时面对的问题,可由 DolphinDB 在存储、流计算上支持的以下特性,以及通过这些特性的组合使用得以解决:

  • 高效存储方式:数据分区列式存储。针对地震数据可采用时间+ id 的方式组合分区来达成灵活分区存储,对于不同分区数据可以多线程并行操作
  • 高效压缩算法:支持 lz4、delta 压缩算法,大大提升压缩率,节约硬件资源
  • 高效排序算法:sortColumn 对分区内数据排序存储,快速定位数据在分区中的位置
  • 强大的流数据计算引擎:毫秒级别计算响应延时

同时 DolphinDB 良好的生态也为 MiniSeed 文件导入、导出以及二次开发提供了极大的便利:

  • 插件支持:自研 MiniSeed 插件,支持 MiniSeed 文件的导入与导出功能,避免了调用第三方工具导入/导出文件的繁琐过程
  • 丰富的机器学习支持:内置一系列常用的机器学习算法,例如最小二乘回归、随机森林、K-平均等,同时还提供了一些插件,方便地用 DolphinDB 的脚本语言调用第三方库进行机器学习,如 XGBoost 插件
  • 丰富的 API 生态:支持 Python, Java, C++, Go 等 API,助力二次开发

以某台网中心为例:每个通道的采样频率为每10毫秒采集一条监测记录,包含时间戳、sid 和采样值三个字段,并以通道为单位、每 3s 将采集到的监测记录打包成 MiniSeed 文件发送到台网中心。有以下需求:

  • 结构化分布式存储:MiniSeed 文件解析成结构化数据并入库存储
  • 低资源投入:控制结构化数据入库存储的体积
  • 实时计算和异常检测:采样数据实时计算、异常检测
  • 低延迟快速响应:任意通道历史数据查询、事件关联查询
  • 数据导入/导出:MiniSeed 文件快速导入、历史数据导出成 MiniSeed 文件
  • 二次开发:能够通过接口获取数据

DolphinDB解决方案架构图如下:

架构说明如下:

  • MiniSeed 文件解析:DolphinDB 通过自研的 MiniSeed 插件可以将 MiniSeed 文件解析为结构化数据,解析结果包含两部分内容,一部分是采样信息,包含采样时间、采样值、MiniSeed 文件块 id;另一部分是 MiniSeed 文件块信息,包含文件块 id、采样时间、解析时间、解析数据总条数、MiniSeed 采样频率等。
  • 实时流接入:流数据表是 DolphinDB 设计专门用于应对实时流数据存储与计算的内存表。具备吞吐量大,低延迟的优点,支持持久化,支持高可用。
  • 流数据发布、订阅与消费:DolphinDB 流数据模块采用发布-订阅-消费的模式。流数据首先注入流数据表中,通过流数据表来发布数据,数据节点或者第三方的应用可以通过 DolphinDB 脚本或 API 来订阅及消费流数据。
  • 分布式存储:DolphinDB 是一款高性能分布式时序数据库,采取了列式存储、数据分区、分布式文件系统、并行处理、多版本并发控制 (MVCC)、数据压缩等技术,在海量数据的查询、计算以及批量加载的性能上具有显著优势。
  • 异常检测引擎:在实时数据流计算场景下,计算要求高效和即时,DolphinDB 精心研发了适合流计算场景的引擎,系统内部采用了增量计算,优化了实时计算的性能。对于异常检测引擎,使用者只需通过简单的参数配置即可实现复杂的规则设计、窗口检测。
  • 可视化展示:DolphinDB 开发了 Grafana 数据源插件 dolphindb-datasource,让用户在 Grafana 面板 dashboard 上通过编写查询脚本、订阅流数据表的方式,与 DolphinDB 进行交互 (基于 WebSocket),实现 DolphinDB 时序数据的可视化。

3. 存储方案设计

存储方案与数据的写入、查询、更新、删除常见操作密切相关,合理的存储方案能极大地提升效率,并且在一定程度上节约硬件资源。因此,在实施整个解决方案之前,需先设计合理的的存储方案。常见的地震数据表结构如表1所示:

字段名称数据类型字段描述
tsTIMESTAMP采样时间
dataINT采样值
netSTRING台网编号
staSTRING台站编号
locSTRING位置编号
chnSTRING通道

表1:常见地震数据表结构

3.1 DolphinDB 分区方案设计

进行历史数据查询时,通常根据台网、台站、位置、通道等条件筛选数据,若要能够快速地查询出历史数据,那么在设计分布式数据库的分区方案时,经常使用查询条件所对应的字段就要被设计为分区列进行值分区,这样在查询时能够进行剪枝处理,精确命中所需的数据。

但由于地震数据查询场景中数据过滤条件过多,按照上述方式设计分区方案,会有多个分区列,导致分区数量过多,降低检索效率。因此,一张表存储所有字段的建表方式会降低查询效率。

考虑到在实际生产环境中,台网、台站、位置、通道等基本信息的取值是固定的并且取值范围较小,因此可以设计维度表存储这些基本信息,然后再建立一张分区表,存储时间、采样值等信息。这种表结构不仅能提升写入、查询、更新、删除等操作的性能,还能避免数据重复,节约磁盘资源。基于此,重新设计的分区表结构与维度表结构如表2、表3所示。

字段名称DolphinDB 数据类型字段描述是否为分区列是否为sortColumn压缩算法
idINT由台网、台站、位置、通道四个字段1对1映射而得,该字段就能表示台网、台站、位置、通道等信息delta
tsTIMESTAMP采样时间delta
dataINT采样值delta

表2 分区表结构

字段名称DolphinDB 数据类型字段描述
idINT同表2中的id
netSYMBOL台网编号
staSYMBOL台站编号
locSYMBOL位置编号
chnSYMBOL通道
tagidSYMBOLmseed 文件块的 sid

表3 维度表结构

3.2 压缩算法设计

对数据进行压缩时,不仅要考虑压缩率,还要兼顾解压速度。DolphinDB 内置 lz4 和 delta 两种压缩算法,根据不同类型的数据采用合适的压缩算法能极大地提升压缩率,降低存储成本。

对于 SHORT, INT, LONG, DECIMAL, 时间或日期类型数据,采用 delta 算法压缩能提升压缩率的同时保证解压速度,其余类型的数据一般建议选择 lz4 算法。

根据上述规则,分区表中的三个列均采用 delta 算法。

4. 环境准备

本章主要展示如何使用 MiniSeed 插件模拟生成数据并解析入库,如果读者已经有数据,可酌情忽略第4.3和第4.4模拟数据章节。

4.1 加载插件

转到 MiniSeed 教程 dolphindb/DolphinDBPlugin,在 bin 目录下,找到需要的插件版本并下载。通过以下方式加载插件:

 loadPlugin("./plugins/mseed/PluginMseed.txt");

MiniSeed 插件提供了 read、write、parse、parseStream 等接口,为 MiniSeed 文件的读、写、解析提供了强大的帮助,用户只需加载 MiniSeed 插件,就能快速地调用这些接口。

4.2 分布式数据库创建与流数据表持久化

根据3.1节,存储采样历史数据的数据库设计方案如下:

  • 选择 TSDB 存储引擎,采用组合分区,一级分区按日期进行值分区,二级分区按设备 Id 进行值分区,每个分区存储一个 id 一天的历史数据
  • 分区列为 ts 和 id,索引列为 id 和 ts。 ts, id, value 列均采用 delta 压缩算法,每个分区内 sortKey 相同的数据全部保留

通过以下方式创建分布式数据库和分区表:

  //创建储实时数据的分布式数据库和分区表
if( existsDatabase("dfs://real") ){ dropDatabase("dfs://real") }
create database "dfs://real" partitioned by VALUE(2023.03.01..2023.03.10), VALUE(1..3900), engine='TSDB'
create table "dfs://real"."realData"(
	id INT[compress="delta"],
	ts TIMESTAMP[compress="delta"],
	value INT[compress="delta"]
)
partitioned by ts, id,
sortColumns=[`id, `ts],
keepDuplicates=ALL

  //创建存储时延计算结果的分布式数据库和分区表
if(existsDatabase("dfs://delay")){dropDatabase("dfs://delay")}
create database "dfs://delay" partitioned by VALUE(2023.03.01..2023.03.10), HASH([INT, 10]), engine='TSDB'	
create table "dfs://delay"."delayData"(
	id INT[compress="delta"],
	tagid SYMBOL,
	startTime TIMESTAMP[compress="delta"],
	receivedTime TIMESTAMP[compress="delta"],
	delay INT[compress="delta"]
)
partitioned by startTime, id,
sortColumns=[`id, `startTime],
keepDuplicates=ALL			

  //创建存储台网、台站、位置、通道等基础信息的维度表 
if(existsTable("dfs://real","tagInfo")){ dropTable(database("dfs://real"),"tagInfo") }
create table "dfs://real"."tagInfo"(
	id INT[compress="delta"],
	net SYMBOL,
	sta SYMBOL,
	loc SYMBOL,
	chn SYMBOL,
	tagid SYMBOL
)
sortColumns=[`id]

库表创建后,可以先行模拟台网、台站、位置、通道等基础信息的维度表数据,示例代码如下:

// 导入台网、台站、位置、通道等基础信息的维度表信息数据
net = ["ZJ","YN","XZ","XJ","TJ"]
sta = ["A0001","A0002","A0003","A0004","A0005","A0006","B0001","B0002","B0003","C0001"]
tmp = `EIE`EIN`EIZ
netList = stretch(net,150)
staList = take(stretch(sta,30),150)
locList = take(`40,150)
chn = take(tmp,150)
colt =   array(STRING)
for(i in 0..(chn.size()-1)){
	colt.append!( chn[i].split()[0] + "_" + chn[i].split()[1] + "_" +chn[i].split()[2] )
}
tagid = "XFDSN:"+netList+"_"+staList+"_"+locList+"_"+colt
t = table(1..150 as id,netList as net,staList as sta,locList as loc,chn,tagid)
pt = loadTable("dfs://real","tagInfo")
pt.append!(t)

流数据表用于接收实时流数据以及进行时延计算,定义四张流数据表,分别用于接收采样信息、MiniSeed 文件信息、时延计算结果以及异常告警信息:

  • dataStream:接收采样数据,其结果有三个走向,分别是存入分布式数据库、接受外部 API 的订阅、实时展示
  • metaStream:保存 MiniSeed 插件解析文件的元数据(如文件的数据行数、解析的时间、文件的时间等),对元数据进行时延计算后将结果注入到 delay 结果表中
  • delayStream:时延计算结果表
  • abnormalStream:异常告警信息结果表

上述流数据表创建及持久化方式如下:

  //删除流表及流数据引擎
unsubscribeTable(tableName = `dataStream,actionName = `append_data_into_dfs)
unsubscribeTable(tableName = `dataStream,actionName = `abnormalDetect)
unsubscribeTable(tableName = `metaStream,actionName = `calculate_delay)
unsubscribeTable(tableName = `delayStream,actionName = `append_delay_into_dfs)	
try{ dropStreamTable(`metaStream) }catch(ex){ print(ex) }
try{ dropStreamTable(`dataStream) }catch(ex){ print(ex) }
try{ dropStreamTable(`delayStream) }catch(ex){ print(ex) }
try{ dropStreamEngine(`engine) }catch(ex){ print(ex) }
try{ dropStreamTable(`abnormalStream) }catch(ex){ print(ex) }	

  //创建建立接收实时流数据的流数据表
st1 = streamTable(1000000:0,`id`tagid`startTime`receivedTime`actualCount`expectedCount`sampleRate,[INT,SYMBOL,TIMESTAMP,TIMESTAMP,INT,INT,DOUBLE])
st2 = streamTable(1000000:0,`id`ts`data,[INT,TIMESTAMP,INT])
st3 = streamTable(1000000:0,`id`tagid`startTime`receivedTime`delay,[INT,SYMBOL,TIMESTAMP,TIMESTAMP,INT])
st4 = streamTable(10000:0,`Time`id`anomalyType`anomalyString,[TIMESTAMP,STRING,INT,STRING])
enableTableShareAndPersistence(table=st1, tableName=`metaStream, asynWrite=true, compress=true, cacheSize=2000000, preCache = 100000)
enableTableShareAndPersistence(table=st2, tableName=`dataStream, asynWrite=true, compress=true, cacheSize=2000000000, preCache = 100000)
enableTableShareAndPersistence(table=st3, tableName=`delayStream, asynWrite=true, compress=true, cacheSize=2000000, preCache = 100000)
enableTableShareAndPersistence(table=st4, tableName=`abnormalStream, asynWrite=true, compress=true, cacheSize=2000000, preCache = 100000)

dataStream 表结构见表2,metaStream、delayStream、abnormalStream 表结构分别见表4、表5、表6。

字段名称DolphinDB 数据类型含义
idINT同表2中的id
tagidSYMBOL同维度表中的tagid
startTimeTIMESTAMPMiniSeed文件块第一条数据采样时间
receivedTimeTIMESTAMP解析时间
actualCountINT解析数据量
expectedCountINTMiniSeed包头指定的采样值个数
sampleRateDOUBLEMiniSeed采样频率

表4 metaStream 表结构

字段名称DolphinDB 数据类型含义
idINT同表2中的 id
tagidSYMBOL同维度表中的 tagid
startTimeTIMESTAMPMiniSeed 文件块第一条数据采样时间
receivedTimeTIMESTAMP解析时间
delayINTMiniSeed 文件块采样时间与解析时间差值

表5 delayStream 表结构

字段名称DolphinDB 数据类型含义
TimeTIMESTAMP告警时间戳
idINT同表2中的 id
anomalyTypeINT异常的类型,即异常指标的表达式在 metrics 中的下标
anomalyStringSTRING异常的内容

表6 abnormalStream 表结构

4.3 MiniSeed 实时流模拟

调用 MiniSeed 插件的 write 函数向 MiniSeed 文件写入采样数据,模拟地震计源源不断地生成 MiniSeed 实时流过程。以下代码以 submitJob 提交任务的形式模拟通道 “ZJ_A0001_40_E_I_E” 一直往 Miniseed 文件写入实时流数据的场景。

def writeMseed(){
	/*
	 * Description:
	 * 	此函数用于模拟实际生产环境中,每3s向台网中心发送 MiniSeed 数据包的过程,即每隔3s向 aimPath 路径上的 MiniSeed 文件写数据
	 */
	sid = "XFDSN:ZJ_A0001_40_E_I_E"
	aimPath = "../streamMiniSeed/ZJ_A0001_40_E_I_E.20000101.mseed"
	sampleRate = 100
	cnt = 0
	do{
		startTime = now()
		mseed::write(aimPath, sid, startTime, sampleRate, rand(-3000..500,300))
		sleep(3000)
		cnt += 1
	}while(cnt < 40)	
}
	

mseedStreamJobid = submitJob("wrtieMseedStream","wrtieMseedStream",witeMseed) //提交后台任务
/*
 * 使用以下方式取消实时写入任务
 * cancelJob(mseedStreamJobid)
 */

4.4 MiniSeed 历史数据模拟

调用 MiniSeed 插件的 write 函数模拟历史数据。以下代码以 submitJob 的方式提交了模拟2023.03.02所有通道1天内波形数据的任务。

def writeMseed(filePath,sid,startTime,sampleRate,valueList){
	/*
	 * Description:
	 * 	此函数用于将一段连续的采样值写入到 MiniSeed 文件
	 * Input:
	 * 	filePath:STRING MiniSeed 文件存储路径
	 * 	sid:STRING 文件块id 
	 * 	startTime:TIMESTAMP 开始时间
	 * 	sampleRate:DOUBLE 采样频率
	 * 	valueList:INT VECTOR 采样值向量
	 */
	mseed::write(filePath, sid, startTime, sampleRate, take(valueList,8639999))
}

def parallelWrite(){
	/*
	 * Description:
	 * 	此函数用于多线程 写 MiniSeed 文件
	 */
	realDbName,dtName = "dfs://real","tagInfo"
	filePathList = exec "../miniSeed/"+net+"."+sta+"."+loc+"."+chn+"."+"20230302.mseed" from loadTable(realDbName,dtName)
	sidList = exec tagid from loadTable(realDbName,dtName)
	startTime = 2023.03.02T00:00:00.000
	sampleRate = 100
	ploop(writeMseed{,,startTime,sampleRate, rand(-3000..500,300)},filePathList,sidList)
}

submitJob("writeMseed","writeMseed",parallelWrite)

4.5 MiniSeed 历史数据解析入库

调用 parseStream 函数多线程解析 MiniSeed 文件,该接口返回一个包含三个键值对的字典 ret:ret[`data]、 ret[`size] 和 ret[`metaData],其中,ret[`data]为采样数据,ret[`metaData]为 MiniSeed 文件块信息。以下代码以 submitJob 的方式提交了解析所有通道 2023.03.02 1天内波形数据的任务。

def parseAndImportIntoDfs(realDbName,realTbNname,dtName,tagInfo,fileParse){
	/*
	 * Description:
	 * 	此函数用于解析 miniSeed 文件,并将结构化数据存入分布式数据库
	 * Input:
	 * 	realDbName,realTbNname,dtNam:STRING 
	 * 	tagInfo:dict 键为台网、台站、位置通道拼接起的字符串,值为对应的 id
	 * 	fileParse:STRING 解析的miniSeed文件存储路径
	 */
	ret = mseed::parseStream(file(fileParse).readBytes(30000000))
	data = ret[`data]
	data.replaceColumn!(`id,tagInfo[data.id])
	delete from data where id = NULL
	pt = loadTable(realDbName,realTbNname)
	pt.append!(data)	 	
}

def parallelInToDfs(realDbName,realTbNname,dtName,tagInfo,filePathList){
	/*
	 * Description:
	 * 	此函数用于并行解析 MiniSeed 文件,并存入分布式数据库
	 * Input:
	 * 	realDbName,realTbNname,dtName:均为 STRING 类型常量,分别代表分布式数据库名、分区表名、维度表名
	 * 	tagInfo:dict 键为由台网、台站、位置、通道组成的字符串,值为对应的id
	 * 	filePathList:STRING VECTOR 需要解析的MiniSeed文件路径向量
	 */
	ploop(parseAndImportIntoDfs{realDbName,realTbNname,dtName,tagInfo,},filePathList)
}

realDbName,realTbNname,dtName = "dfs://real","realData","tagInfo"
filePathList = exec "../miniSeed/"+net+"."+sta+"."+loc+"."+chn+"."+"20230302.mseed" from loadTable(realDbName,dtName)
filePathListGroup = cut(filePathList,48)
d = exec tagid,id from loadTable(realDbName,dtName)
tagInfo=dict(d[`tagid],d[`id])
for(i in 0..(filePathListGroup.size()-1)){
	submitJobEx("parse_mseed_into_dfs"+string(i),"parse_mseed_into_dfs"+string(i),i%10,48,parallelInToDfs,realDbName,realTbNname,dtName,tagInfo,filePathListGroup[i])
}	

将历史数据解析入库并强制刷盘后,观察磁盘空间占用,其磁盘空间占用大小为1.23GB,观察原始 MiniSeed 文件大小为3.12GB,由此可见,MiniSeed 文件解压后存储为结构化数据的体积大大减少了。

5. 数据接入与查询

本章内容包括:

  • 实时流数据解析
  • 实时流数据存储、时延计算、异常告警
  • 历史数据查询与导出

5.1 实时流数据解析

调用 MiniSeed 插件的 parseStream 函数解析4.3节模拟的 MiniSeed 实时流,结果注入 dataStream、metaStream。dataStream、metaStream 接收到的数据如表7、表8所示

idtsdata
12023.03.09T12:33:00.984-539
12023.03.09T12:33:00.994-1250
12023.03.09T12:33:01.004761
12023.03.09T12:33:01.0141835
12023.03.09T12:33:01.024-2811
………………

表7 dataStream 表示例

idtagidstartTimereceiveTimeactualCountsampleRate
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:00.9842023.03.09T12:33:03.995300100.00
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:03.9852023.03.09T12:33:06.998300100.00
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:06.9852023.03.09T12:33:10.014300100.00
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:09.9862023.03.09T12:33:13.007300100.00
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:12.9862023.03.09T12:33:15.991300100.00
………………………………

表8 metaStream 示例

以下代码以 submitJob 的方式提交了解析波形实时流数据的任务,streamSimulate 函数代码见附录7.2:

def streamSimulate(meta,data,tagInfo){
	/*
	 * Description:
	 * 	此函数用于接收 MiniSeed 实时流文件,并将其解析为结构化数据,存入流数据表中   
	 * Input:
	 * 	meta:stream TABLE   存放 meta 数据的流表   
	 * 	data:stream TABLE  存放采样数据的流表  
	 * 	tagInfo:dict   tagid 和 id 映射的字典     
	 */
	ret = mseed::parseStream(file("../streamMiniSeed/ZJ_A0001_40_E_I_E.20000101.mseed").readBytes(30000000))
	data_num_row = ret[`data].size()
	meta_num_row = ret[`metaData].size()
	do{
		ret = mseed::parseStream(file("../streamMiniSeed/ZJ_A0001_40_E_I_E.20000101.mseed").readBytes(30000000))
		if(ret[`data].size() > data_num_row){
			now_data_num_row = ret[`data].size()
			now_meta_num_row = ret[`metaData].size()
			qdata = ret[`data][data_num_row:now_data_num_row]
			qmeta= ret[`metaData][meta_num_row:now_meta_num_row]
			data_num_row = now_data_num_row
			meta_num_row = now_meta_num_row
			tmp = tagInfo[qmeta.id]
			qdata.replaceColumn!(`id,tagInfo[qdata.id])
			qmeta = select tmp as ids ,* from qmeta
			delete from qdata where id = NULL
			delete from qmeta where ids = NULL
			objByName(data).append!(qdata)
			objByName(meta).append!(qmeta)
		}
	}while(true)
}

meta,data= "metaStream","dataStream"
d = exec tagid,id from loadTable("dfs://real","tagInfo")
tagInfo=dict(d[`tagid],d[`id])
submitJob("streamSimulate","streamSimulate",streamSimulate,meta,data,tagInfo)

5.2 实时流数据存储、时延计算、异常告警

5.2.1 实时流数据存储

通过流数据表的发布-订阅功能可实现对流数据的处理。对于采样数据,通过内置函数 subscribeTable 订阅 dataStream,实现入库存储。subscribeTable 函数调用方式如下:

subscribeTable([server],tableName,[actionName],[offset=-1],handler,[msgAsTable=false],[batchSize=0],
[throttle=1],[hash=-1],[reconnect=false],[filter],[persistOffset=false],[timeTrigger=false],
[handlerNeedMsgId=false],[raftGroup],[userId=””],[password=””])

5.2.2 时延计算

对于时延计算,首先,定义时延计算函数 calDelay

def calDelay(msg){
	/*
	 * msg:数据流
	 * delayTableName:时延结果流数据表
	 */
	t = select first(id) as id,first(tagid) as tagid,last(startTime) as startTime,receivedTime,last((receivedTime+8*60*60*1000+3000)-(startTime+(actualCount-1)*10)) as delay from msg group by receivedTime
	objByName(`delayStream).append!(t) 	
}

其计算规则如下:

receivedTime-(startTime+(actualCount)*10)

举例说明:receivedTime 为"2023.03.09T12:33:03.995",startTime 为"2023.03.09T12:33:00.984",actualCount 为300,则该 MiniSeed 文件块的时延为2023.03.09T12:33:03.995-(2023.03.09T12:33:00.984+300*10) = 11ms。

然后,再通过 subscribeTable 函数订阅 metaStream,将计算结果注入 delayStream,实现时延计算。时延计算结果示例如表9所示:

idtagidstartTimereceiveTimedelay
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:00.9842023.03.09T12:33:03.99511
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:03.9852023.03.09T12:33:06.99813
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:06.9852023.03.09T12:33:10.01429
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:09.9862023.03.09T12:33:13.00722
1XFDSN:AH_ANQ_00_B_H_E2023.03.09T12:33:12.9862023.03.09T12:33:15.9915
…………………………

表9 delayStream 示例

5.2.3 异常告警

DolphinDB 内置异常告警引擎 createAnomalyDetectionEngine,其调用方式如下:

createAnomalyDetectionEngine(name,metrics,dummyTable,outputTable,timeColumn,[keyColumn],[windowSize], 
[step],[garbageSize],[roundTime=true],[snapshotDir],[snapshotIntervalInMsgCount],[raftGroup])

对于异常告警,首先根据 createAnomalyDetectionEngine 函数定义异常告警引擎 engine,其告警规则为:

[max(data)>= 300,min(data)<=-2500]

即,3秒长度的时间窗口内,若采样数据最大值大于300或采样数据最小值小于-2500,则告警。

然后通过 subscribeTable 函数订阅 dataStream,将告警结果注入 abnormalStream,实现异常告警。异常告警结果示例如表10所示:

TimeidanomalyTypeanomalyString
2023.03.31T10:01:33.00010max(data) >= 300
2023.03.31T10:01:51.00010max(data) >= 300
2023.03.31T10:01:51.00011min(data) <= -2500
……………………

表10 abnormalStream 示例

在 abnormalStream 中,若同一个 id 在同一个时间戳内出现了两条数据,则说明,在3s内,同时出现了波形数据的最大值大于300、最小值小于-2500的现象。

实时流数据存储、时延计算、异常告警代码如下所示:

//实时流数据入库存储
subscribeTable(tableName=`dataStream, actionName="append_data_into_dfs", offset=0, handler=loadTable("dfs://real","realData"), msgAsTable=true,batchSize=100000, throttle=1, reconnect=true)
//时延计算
subscribeTable(tableName=`metaStream, actionName="calculate_delay", offset=0, handler=calDelay, msgAsTable=true,batchSize=100000, throttle=1, reconnect=true)
//时延计算结果入库存储
subscribeTable(tableName=`delayStream , actionName="append_delay_into_dfs", offset=0, handler=loadTable("dfs://delay","delayData"), msgAsTable=true,batchSize=100000, throttle=1, reconnect=true)
//建立异常告警引擎,设置告警规则
engine = createAnomalyDetectionEngine(name="engine", metrics=<[max(data)>= 300,min(data)<=-2500]>, dummyTable=objByName(`dataStream), outputTable=objByName(`abnormalStream), timeColumn=`ts, keyColumn=`id, windowSize =3000 , step = 3000)
//订阅实时流数据,进行异常告警
subscribeTable(tableName="dataStream", actionName="abnormalDetect", offset=0, handler=append!{engine}, msgAsTable=true)	

5.3 历史数据查询与导出

历史数据查询与导出测试 case 如下:

  • case1:某个台网、某个台站一天所有通道的历史观测数据降采样查询,精确到分钟
  • case2:某个事件发生时刻前2分钟、后11分钟历史观测数据查询,关联台站为20-30个
  • case3:某个台网、某个台站一天内所有通道数据导出为 MiniSeed 文件
  • case4:Java API 查询,某个台网、某个台站一天所有通道的历史观测数据

性能测试结果如表11所示:

场景执行10次耗时(毫秒)平均耗时(毫秒)结果集数据量(条)结果集大小
case17,5747574,320118KB
case22,14521414,040384KB
case319,2371,92325,920,000692MB
case435,2713,52725,920,000692MB

表11 查询与导出性能测试结果

查询 case1-3 代码如下所示:

netAim,staAim,locAim = `ZJ,`A0003, `40  //查询case的查询条件
// case1
timer(10){
	aim  = exec id from loadTable("dfs://real","tagInfo") where net = netAim and  sta = staAim and loc = locAim
	t = select avg(value)  as sample_value from loadTable("dfs://real","realData") where ts >= 2023.03.02T00:00:00.000 and ts < 2023.03.03T00:00:00.000 and id in  aim   group by bar(ts,60000),id
}

// case2
timer(10){
	aim = exec id from loadTable("dfs://real","tagInfo") where net = netAim
	t = select *  from loadTable("dfs://real","realData") where ts >= 2023.03.02T00:08:00.000 and ts <= 2023.03.02T00:21:00.000 and id in aim		 	
}

// case3
timer {
	aim = exec id from loadTable("dfs://real","tagInfo") where net = netAim and sta = staAim
	t = select * from loadTable("dfs://real","realData") where ts >= 2023.03.02T00:00:00.000 and ts < 2023.03.03T00:00:00.000 and id in aim
	t = select ts,loc,net,sta,chn,value,tagid from ej(loadTable("dfs://real","tagInfo"),t,`id) // 导出的结果集数据
	
	sidList = exec distinct tagid from t // 涉及的所有sid
	startTimeTable = select min(ts) as startTime from t group by tagid order by tagid desc  // 文件开始采样时间
	dataList = [] // 涉及的所有采样值
	for(i in 0..(sidList.size()-1)){
		tmpValue = exec value from t where tagid = sidList[i]
		dataList.append!(tmpValue)
	}
	
	aimFilePath= "<YOURDIR>/"+sidList+'.20230302.mseed' // 目标文件
	sampleRate = 100.00 // 采样频率,表示1秒钟有100个采集点,即10ms采集一次
	ploop(mseed::write{,,,sampleRate,},aimFilePath,sidListid,startTimeTable[`startTime],dataList) // 使用mseed::write函数写入文件,使用ploop并行加速
}

5.4 可视化展示

结构化实时流数据可视化展示通过 grafana 实现。在 grafana 中,可通过编写简单的查询语句,便能获取需要展示的数据。grafana 中执行以下查询语句对采样数据实时流进行可视化展示:

//波形实时流数据可视化展示
real_db_name,dt_name = "dfs://real","tagInfo"
t = select avg(data)  as value from dataStream group by bar(ts,6000) as ts, id
select t.value from t left join loadTable(realDbName,dtName) rt on t.id = rt.id  pivot by t.ts,rt.tagid

图3 波形数据

执行以下查询语句对时延数据进行可视化展示:

//时延流数据可视化展示
select delay from delayStream pivot by startTime,tagid

图4 时延数据

6. 总结

DolphinDB 在地震行业的运用远远不止波形数据的存储与查询。正如前文所说,DolphinDB 具有非常完善的产品生态,在强大的生态支持下,DolphinDB 能够支持更加复杂的波形数据过滤与深度学习预警,限于篇幅,本文不作过多介绍。感兴趣的用户可查阅浙江智臾科技有限公司/Tutorials_CN 作更加深入细致的了解。

7. 附录

7.1 参考文献

  • 《大数据架构下的地震波形数据分析应用浅析》
  • 《数字地震台网综合处理系统通用架构研究与设计》
  • 《地震台网智慧服务平台的开发与应用》
  • DolphinDB 流数据教程 streaming_tutorial.md · 浙江智臾科技有限公司/Tutorials_CN - Gitee
  • DolphinDB 分布式存储教程 database.md · dolphindb/Tutorials_CN - Gitee
  • DolphinDB-datasource 插件教程 README.zh.md · dolphindb/grafana-datasource - Gitee

7.2 脚本运行顺序说明

用户可按照行文顺序运行文章中所列出的脚本复现解决方案,此外为方便用户复现,本节对文章中所涉及的脚本整理成文件,用户只需下载并执行附件压缩包中1~8的 txt 文件,然后按照如下所示脚本调用文件中的函数即可复现本文的解决方案:

login(`admin,`123456)
undef(all);
clearAllCache();
go;

/***
 * ==============================
 * 在运行此脚本此之前,需要做以下事情:
 * 	1.建立与server同级的目录  miniSeed
 * 	2.建立与server同级的目录  streamMiniSeed
 * 	3.建立与server同级的目录  outputMiniSeed
 * 	4.在./server/plugins/目录下建立mseed目录,并将libPluginMseed.so文件和PluginMseed.txt文件放置在改目录下
 * 	5. 运行脚本1-8
 * ==============================
 */



netAim,staAim,locAim = `ZJ,`A0003, `40  //查询case的查询条件


  //step1:加载插件、创建分布式库表、流数据表持久化
mainPrepare()

  //step2:MiniSeed实时流数据模拟与解析
mainStreamSimulate()

  //step3:实时流入库、时延计算及异常告警
mainBuidSubscribe()

  //step4:MiniSeed历史数据模拟( 耗时1min左右,该步骤后台作业完成后再执行step5)
mainHistoryMiniSeedSimulate()


  //step5:MiniSeed历史数据入库( 耗时5min左右,该步骤后台作业完成后再执行step6和step7)
mainHistoryMiniSeedParse()


  //step6:磁盘空间占用统计(单位:GB)
mainDiskUsage()

  //step7:查询case1-case4
queryRecord =  mainQueryTest(netAim,staAim,locAim)

  //step8:查询case5
outputRecord = mainOutputMiniSeed(netAim,staAim)

测试脚本压缩包:test_scripts.zip

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/593421.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何在不丢失格式的情况下将 Excel 转换为 PDF?

Microsoft Excel 因其专业的数据或信息显示能力而被广泛采用。但是&#xff0c;它在兼容性上有一个恼人的缺点。 首先&#xff0c;Excel只能在Windows和MacOS上使用&#xff0c;Linux或其他操作系统的用户将无法使用&#xff1b;其次&#xff0c;即使您有一个可以打开和阅读 E…

PERMA幸福模型

PERMA幸福模型 由“积极心理学之父”马丁塞利格曼&#xff08;Martin Seligman&#xff09;在《持续的幸福 》一书中提出。 模型介绍 PERMA指的是创造充实、快乐和有意义的生活所必需的五大要素&#xff0c;它们是: 正向情绪Positive emotions 积极情绪是快乐人生的一个重要…

三种不同的存储芯片性能比较

为了进行性能比较&#xff0c;使用了三种不同的存储芯片&#xff0c;即Everspin EM064LX 64Mib STT‐MRAM、Micron MT25Q 128Mib NOR闪存和Micron MT29F 1Gib SLC NAND闪存。 该基准测试在STM32H755ZI上运行&#xff0c;Cortex‐M7内核的时钟频率为240MHz。Quad SPI总线用于1‐…

安泰功率放大器ATA-8202在EMC电磁兼容测试中的具体应用

作为电子及电器产品质量衡量指标之一的电磁兼容性&#xff08;EMC&#xff09;测试&#xff0c;它的测试正确与否直接关系到产品自身使用的可靠性及安全性&#xff0c;并且它还有可能对于同系统中的其他部分造成影响&#xff0c;不良的电磁兼容性会直接导致系统无法正常运转&am…

GIS数据转换器坐标生成功能

​很多时候我们会拿到一些没有坐标的图片或栅格数据&#xff0c;我们如何给它赋予坐标呢&#xff1f;下面介绍一种为这些数据生成坐标的方法。使用工具是GIS数据转换器-栅格-V4.0。 GIS数据转换器坐标生成功能 使用图片是这样的&#xff0c;网上找的不知道哪位大神画的三国时期…

【毕业季征文】追光人,终将光芒万丈!

追光人&#xff0c;终将光芒万丈&#xff01; 写在最前面最后从0到1 我的二进制探索之路一、从0到1&#xff0c;破茧成蝶二、从0到2n&#xff0c;全面发展三、从0到∞&#xff0c;逐光而行 参考 写在最前面 参与活动#毕业季征文|未来可期# 岁月不居&#xff0c;时节如流。转眼…

朋友轻松拿下字节27K的offer,羡慕了....

最近有朋友去字节面试&#xff0c;面试前后进行了20天左右&#xff0c;包含4轮电话面试、1轮笔试、1轮主管视频面试、1轮hr视频面试。 据他所说&#xff0c;80%的人都会栽在第一轮面试&#xff0c;要不是他面试前做足准备&#xff0c;估计都坚持不完后面几轮面试。 其实&…

C++ 范围for循环

1. 在C98中&#xff0c;不同的容器和数组&#xff0c;遍历的方法不尽相同&#xff0c;写法不统一&#xff0c;也不够简洁&#xff0c;而C11基于范围的 for循环以统一、简洁的方式来遍历容器和数组&#xff0c;用起来更方便了。 C 容器都有哪些 简单介绍&#xff1a; //STL C标…

7分钟的面试,我太难了.....

干了两年外包&#xff0c;本来想出来正儿八经找个互联网公司上班&#xff0c;没想到算法死在另一家厂子。 自从加入这家外包公司&#xff0c;每天都在加班&#xff0c;钱倒是给的不少&#xff0c;所以也就忍了。没想到11月一纸通知&#xff0c;所有人不许加班&#xff0c;薪资…

leetcode-最大二叉树(java)

leetcode654题--最大二叉树 leetcode654 - 最大二叉树解题思路代码演示二叉树专题 leetcode654 - 最大二叉树 leetcode654 - 最大二叉树 原题链接 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点&#xff0c;其值为 nums 中…

幻读与不可重复读的区分

幻读 VS 不可重复读 幻读重点在于数据是否存在。原本不存在的数据却真实的存在了&#xff0c;这便是幻读。 在同一个事务中&#xff0c;第一次读取到结果集和第二次读取到的结果集不同。引起幻读的原因在于另一个事务进行了INSERT操作。 不可重复读重点在于数…

中国人民大学与加拿大女王大学金融硕士——跟5月说再见,期待新的精彩

岁月清浅&#xff0c;时光无言。5月的风即将吹来6月的的绚烂&#xff0c;在这个美好的季节&#xff0c;你有新的期盼了吗&#xff1f;在职的你&#xff0c;是否需要再学习呢&#xff0c;中国人民大学与加拿大女王大学金融硕士项目为你提供在职读研的平台&#xff0c;在这里开启…

CMU 15-445 Project #0 - C++ Primer

Project #0 - C Primer 一、题目链接二、准备工作1.项目构建2.代码测试3.代码格式化4.压缩与提交 三、部分实现 一、题目链接 二、准备工作 以下操作在题目文档中均有提及&#xff0c;这里进行简要整理。 1.项目构建 首先需要从远程仓库克隆项目文件&#xff0c;由于该仓库会…

GB28181——实时视频显示

一、主要实现功能 1、支持四路设备接入 2、实时视频分屏显示 3、接入设备信息存储 二、待完善功能&#xff08;现阶段&#xff09; 1、设备的删除 &#xff08;已更新&#xff09; 2、语音对讲 &#xff08;暂时不处理&#xff09; 3、视频输出通道的实时修改 &#xff08;已更…

Nginx详细解说

1 什么是Nginx Nginx是lgor Sysoev为俄罗斯访问量第二的rambler.ru站点设计开发的。从2004年发布至今&#xff0c;凭借开源的力量&#xff0c;已经接近成熟与完善。 Nginx功能丰富&#xff0c;可作为HTTP服务器&#xff0c;也可作为反向代理服务器&#xff0c;邮件服务器。支…

科技云报道:大模型的中场战事,深入垂直行业腹地

科技云报道原创。 自从OpenAI于2022年11月推出ChatGPT后&#xff0c;一场波及全球科技界的“AI海啸”就此爆发。 自今年以来&#xff0c;国内已有超过30家企业入局大模型赛道。从百度“文心一言”、阿里“通义千问”的发布&#xff0c;到网易“玉言”、科大讯飞“星火”、昆仑…

Centos7匹配Locust+prometheus+grafana性能监控平台

一、安装Loust 之前已写过&#xff0c;这里忽略一万字。。。 Locust初次体验【解决webUI没数据】_Vikesgao的博客-CSDN博客 二、安装prometheus Prometheus 是一个开源的服务监控系统和时序数据库&#xff0c;其提供了通用的数据模型和快捷数据采集、存储和查询接口。它的核心…

LLM:LLaMA模型和微调的Alpaca模型

待写 LLaMA模型 论文原文:https://arxiv.org/abs/2302.13971v1 预训练数据 模型架构 模型就是用的transformer的decoder,所以在结构上它与GPT是非常类似的,只是有一些细节需要注意一下。 1、RMS Pre-Norm 2、SwiGLU激活函数 3、RoPE旋转位置编码 Alpaca模型 [Stanford …

cuda编程学习——CUDA全局内存性能优化(八)

前言 参考资料&#xff1a; 高升博客 《CUDA C编程权威指南》 以及 CUDA官方文档 CUDA编程&#xff1a;基础与实践 樊哲勇 文章所有代码可在我的GitHub获得&#xff0c;后续会慢慢更新 文章、讲解视频同步更新公众《AI知识物语》&#xff0c;B站&#xff1a;出门吃三碗饭 …

Python3数据分析与挖掘建模(8)检验

1. 假设检验 1.1 概述 假设检验是一种统计推断方法&#xff0c;用于对一个或多个总体参数提出关于其取值的假设&#xff0c;并根据样本数据对这些假设进行检验。假设检验的目的是根据样本数据提供统计上的证据&#xff0c;以便对总体参数的假设进行接受或拒绝。 在假设检验中…