[Zombodb那些事]Zombodb与ElasticSearch的Bulk通信

news2025/1/13 6:36:32

Zombodb与ElasticSearch的Bulk通信

0.前言

Zombodb是一个PostgreSQL插件,使用rust编写,支持pg14以下版本。Zombodb可以允许PostgreSQL查询ElasticSearch中的内容。本篇为《Zombodb那些事》第一篇,后面将更新其他部分内容。

Zombodb会在pg数据库上创建Zombodb索引,当插入/删除/更新数据时在pg上执行的时候到底经历了什么过程呢?

例如:往foo表中插入一条记录,zombodb与es之间的通信是什么?

insert into foo (id) values (1);

再比如:更新foo表中的一条记录,zombodb与es之间的通信是什么?

update foo set id = id where id = 1;

同理,删除又做了什么呢?

当插入的数据比较大的时候,Zombodb是如何防止OOM?如何保证高性能的请求?又如何保证在用户取消执行SQL时,ES与数据库中的数据能够保持一致?

为了回答这些问题,便有了这篇文章。

15cdda097a8ae4c8f9ce935875a47d86.png

bdf1598d366303fedd96abbd78ea84e4.png

1.Bulk实现概要

在Zombodb中,bulk.rs实现了往ES发送Bulk请求,并处理应答。

Bulk内核层面,有三个结构体、一个枚举。

  • BulkRequestCommand Bulk请求命令枚举类型

  • ElasticsearchBulkRequest ES Bulk请求结构体

  • Handler 处理器结构体

  • BulkReceiver Bulk请求命令序列化Buffer结构体

首先来看Bulk请求命令:

// 伪代码
pub enum BulkRequestCommand<'a> {
    Insert {ctid: u64, ...}
   Update {...}
   ...
}

15d45a0402d221960f620324a1c2c680.png

ES Bulk请求实现机制为委托设计模式,通过使用内部的handler处理器实现,而ES Bulk结构体本身是对外提供接口,例如:插入/更新/删除等操作。

308e6766f508f80342f503f2b91fc338.png

b8751f5f3a17009a86afbcd20824b8e5.png

ES Bulk本身内容详细的来说:

  • 处理器

    • 是ES Bulk请求的处理器,在内部会创建发送端与接收端通道,ES Bulk请求结构体会把前面准备好的Bulk请求命令通过发送端通道发出去,同时创建多个线程,每个线程会通过接收端通道从通道中循环读取每一个Bulk请求命令,随后发给ES,对ES的应答进行处理。

      当然处理器的工作是比较复杂的,这里只是简单的说了一下工作原理,后续会详细阐述。

  • ES对象

    • ES对象会在后面的内核分析中讲解这一块内容,本节不做额外说明,简单理解为给ES直接打交道的对象,例如:给es发送mapping请求、setting请求、刷新请求等等。

    • 在这里ES对象主要的作用在于:刷新索引(refresh操作)、给ES发送请求。

  • 刷新

    ---immediate策略
    CREATE INDEX idxtest 
    ON test 
    USING zombodb ((test.*)) 
    WITH (url='http://localhost:9200/', 
        refresh_interval='immediate');
    ---other策略,zombodb会把5s传递给索引设置项_settings
    CREATE INDEX idxtest 
    ON test 
    USING zombodb ((test.*)) 
    WITH (url='http://localhost:9200/', 
        refresh_interval='5s');
    • 对应ES的refresh操作,此处为布尔值,表示是否进行刷新,如果进行刷新,会获取用户在创建索引时的刷新策略,此处有三个值:immediate、async、other。表示立刻刷新、异步刷新、不做刷新操作(这个策略下ES会在后台自动刷新)。refresh_interval是一个GUC值,默认是-1,跟immediate等价,下面是两个例子。

  • 并发相关

queue_size通道容量并没有使用,估计是Zombodb的bug,上游传递的是10_000,代码里面多处写了这个字面量,估计是没替换成这个变量。

concurrency并发度,从ES的shards、CPU核心数、用户指定的并发度取最小值。

CREATE INDEX idxevents ON events USING zombodb ((events)) WITH (bulk_concurrency=2);

batch_size内存中请求命令字节最大限制,在给ES发起bulk请求之前,会从BulkReceiver(是一个reader)读取数据,由于数据可能比较大,例如:插入一篇非常多文字的文章,Zombodb实现了std::io::Read ,在读取的时候可以拿到每次读取字节的偏移量,下次接着读。但是,BulkReceiver中会有个接收端通道,每次从对象中缓存的请求命令进行读取。如果取不出来,则从handler的接收端通道读取,此时可能会出现OOM的问题,batch_size是用来控制每次读取的最大限制。

error_receiver为接收请求错误的通道,处理器handle在遇到错误时,会通过handler的error_sender发送错误信息,error_receiver则会从通道中接收错误信息。

2.处理器Handler

在上面一小节提到ES Bulk对外的接口会通过调用内部的handler来完成相应的功能,因此最核心的内容为handler的相关操作,一起来看看handler里面包含了什么内容,又是怎么实现并发控制、ES请求。

在Zombodb内部会将请求划分为:

d5c41def9ee120fc2c349bd6b4cab065.png

  • 非延迟请求

insert into foo (id) values (1);

这种请求比较简单,handler直接收到insert的command后,此时调用queue_command(false),将请求命令发送出去,接收端收到之后根据请求过来的command序列化到buffer中,发送给ES,处理应答结果。

  • 延迟请求

update foo set id = id where id = 1;

延迟请求比较有意思了,简单来说就是在更新期间,Zombodb推迟对 ES BulkRequestCommand 进行排队,直到通过索引 AM API 看到相应的插入命令。

一个update语句,可以被拆解为两步骤:

1.update触发器触发bulk的update逻辑,此时会记录下当前ctid,将update请求放到handler的prior_update选项中。

2.执行索引数据插入,此时会触发bulk的insert逻辑,从handler中获取到prior_update放入insert的请求中。把该请求通过发送端发送出去(通过调用queue_command(false))),接收端收到后会判断是一个insert请求,此时会将请求放到set中,由于之前记录了prior_update,因此会把请求放入延迟插入请求命令数组中(调用queue_command_ex(is_deferred=true)),再最后进行处理。

handler中最核心的工作是通过queue_command来实现的,queue_command会去调用queue_command_ex,只不过queue_command处理非延迟请求,queue_command_ex可以通过是否延迟请求参数控制。

pub fn queue_command(
    &mut self,
    command: BulkRequestCommand<'static>,
) -> Result<(), crossbeam_channel::SendError<BulkRequestCommand<'static>>> {
    self.queue_command_ex(command, false)
}

pub fn queue_command_ex(is_deferred: bool)
}

queue_command_ex中操作如下:

  • 非延迟插入,获取当前事务id,将xid放入Zombodb执行器(后续文章说明)的数组中,通过执行器的es对象发起transaction_in_progress请求命令,回调queue_command。

  • 插入请求会放入延迟插入通道中。

  • bulk发送通道发送请求命令,此时支持超时重试、执行sql时的ctrl+c中断响应。

  • 创建bulk接收通道的线程,用来将刚才的请求对象序列化ES Json字节流,ES应答包处理。

  • 文档数加1。

以一个实际插入为例,此时为非延迟插入请求。

insert into foo (id) values (1);

9afba7263caf05baf18f79713b477b42.png

第一次通道中放入事务正在运行的命令:

TransactionInProgress {
    xid: 856,
}

第二次通道中放入实际插入的数据命令:

Insert {
    prior_update: None,
    ctid: 1,
    cmin: 1,
    cmax: 1,
    xmin: 865,
    // xmax: u64, 此时没 xmax
    builder: JsonBuilder<'a>, // 插入的一条记录json格式
}

第三次通道中放入事务提交的命令

TransactionCommitted {
    xid: 865,
}

那如果换成延迟插入请求:

b4bbbb91ff6f280b67a8dcfb68384728.png

update foo set id = id where id = 1;

第一次通道中放入事务正在运行的命令:

TransactionInProgress {
    xid: 866,
}

第二次通道中放入更新命令:这里是由更新触发触发器调用。

Update {
    ctid: 1,
    cmax: 1,
    xmax: 866,
}

第三次通道中放入实际插入的数据命令:

Insert {
    prior_update: None,
    ctid: 2,
    cmin: 1,
    cmax: 1,
    xmin: 866,
    // xmax: u64, 此时没 xmax
    builder: JsonBuilder<'a>, // 插入的一条记录json格式
}

第四次通道中放入事务提交的命令:

TransactionCommitted {
    xid: 866,
}

28d3d4eb5d47da9076449aad40baeab2.png

handler中有如下成员,下面列的比较详细,就不多赘述了,延迟插入的实现是通过prior_update、in_flight、deferred这三个变量来实现。handler最本质的工作是将所有延迟插入的请求放入到deferred数组中,将创建的多个线程放入线程数组中,最后由ElasticsearchBulkRequest的finish递归处理:

  • 延迟请求,调用queue_command_ex(true),递归调用finish,获取处理的总文档数与请求成功数

  • 非延迟请求,直接调用wait_for_completion,等待所有线程完成,获取处理的总文档数与请求成功数。

当然在finish中还会去根据用户是否传递刷新索引选项来决定用何种策略去refresh ES。

110cf65e99334984473abd65fbad364c.png

3.序列化请求命令

在前面我们知道handler会创建通道,会把请求命令通过发送端发送出去,接收端收到进行处理,那么如何处理的?处理了哪些东西?

这就引入了BulkReciever结构,接收端通道得到的Bulk请求命令是一个枚举类型,并不是一个真正的数据,而给ES的请求必须是json格式,同时为了高效的传输数据,防止rust oom的问题,引入了这么一层抽象。

通过BulkReciever实现std::io::Read trait,根据不同的请求枚举命令,序列化出不同的json结构,这里的细节是放入了字节流数组中,读取的时候按照偏移量进行读取。

f0998b0a16ebf4cf9ed9998c515b7b5b.png

以最复杂的插入请求为例,由于上面提到的延迟请求更新问题,这里需要判断是否有延迟插入,如果有,先序列化一下,随后再序列化当前插入请求命令。

还是以上述的插入与更新为例,非延迟插入:

insert into foo (id) values (1);

c9419a806a3eb57ce72f0e68debaf7a8.png

延迟插入:

update foo set id = id where id = 1;

870876621cfadb4f557258d0c5b446fa.png

上面两图中的json串为BulkReciever所做的核心逻辑,根据不同的请求命令生成对应的Json串。

4.一些细节

实际实现层面,Zombodb解决了诸多问题,例如:性能问题、delete问题、延迟插入问题等等。

4.1 延迟插入问题

延迟插入的内容在前面也详细的阐述,它解决了HOT所带来的问题。相关issue:

https://github.com/zombodb/zombodb/issues/618

https://github.com/zombodb/zombodb/issues/759

MR:https://github.com/zombodb/zombodb/commit/7ce4bb42bf71cf855d14ebe02e8f9656adf78d4b

delete问题在上面759中有涉及到,复现SQL为:

BEGIN;
INSERT INTO cats VALUES (1, 'foo');
UPDATE cats SET value = 'bar' WHERE id = 1;
DELETE FROM cats WHERE id = 1;
COMMIT;

由于早期的delete实现是直接把update请求放入通道中就完事了,但是如果在同一个事务中先Update再Delete,则会存在update es失败问题,这个问题主要的原因在于Zombodb做了延迟插入,例如:delete之前的update会先update再insert,在insert会被延迟,由于事务还未提交, es中是没有数据的,所以碰到delete的时候,直接更新就会update es失败。因此,现在ES Bulk请求的delete实现为:

if self.handler.in_flight.contains(&ctid) {
    self.handler.deferred.push(command);
    Ok(())
} else {
    self.handler.queue_command(command)
}

4.2 性能问题

索引性能优化:例如:ES请求时从reader中进行数据读取,queue_size设置等的代码被合入主干的MR为:

https://github.com/zombodb/zombodb/pull/684

在这个MR中提到了很多ES插入的优化点,例如:batch_size、queue_size、bluk容量等等,其实就是前面讲的内容。

在给ES发送请求后得到的回包也是经过cbor经过压缩,可以看到解析是由serde_cbor来做的,同时传递给es的url的format也是cbor。

{}/_bulk?format=cbor&filter_path={}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/159447.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智能文字识别技术推动彝文识别弘扬中华文化

前言 谈起图像识别自己颇有感触&#xff0c;因为之前的两段工作经历都和图像识别密切相关&#xff1b;之前一家公司的主营业务就是将历史上珍贵文献进行数字化&#xff1b;上家公司自己负责图像识别模块相关的工作&#xff1b;不但使用了第三方平台产品而且进行了自建&#xff…

设计模式相关内容介绍—UML

统一建模语言(Unified ModelingLanguage&#xff0c;UML)是用来设计软件的可视化建模语言。它的特点是简单、统一、图形化、能表达软件设计中的动态与静态信息。 UML从目标系统的不同角度出发&#xff0c;定义了用例图、类图、对象图、状态图、活动图、时序图、协作图、构件图、…

经过2022年这大环境,我学会了如何管理我的领导

2022年这大环境&#xff0c;可以说是我干软件开发这些年来&#xff0c;经历的最残酷的一年&#xff0c;所以做为职场软件开发一员的我&#xff0c;不得不修炼一下真本事。 很多时候不是你不努力&#xff0c;不是你连mysql连的不溜&#xff0c;不是你布局页面布局的不精细&#…

16.Isaac教程--Codelets详解

Codelets详解 ISAAC教程合集地址: https://blog.csdn.net/kunhe0512/category_12163211.html 文章目录Codelets详解Codelets 和 tick接收消息传输消息方便的 ToProto/FromProto 函数配置参数应用程序 JSON子图姿态组件是机器人应用程序的基本构建块。 Isaac SDK 包含可在您的应…

「数据结构详解·九」图的初步

「数据结构详解一」树的初步「数据结构详解二」二叉树的初步「数据结构详解三」栈「数据结构详解四」队列「数据结构详解五」链表「数据结构详解六」哈希表「数据结构详解七」并查集的初步「数据结构详解八」带权并查集 & 扩展域并查集「数据结构详解九」图的初步 注意&…

基于JavaWEB SSM SpringBoot婚纱影楼摄影预约网站设计和实现

基于JavaWEB SSM SpringBoot婚纱影楼摄影预约网站设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末…

天宝营养冲刺深交所IPO:业绩明显波动,深创投是股东

撰稿|汤汤 来源|贝多财经 近日&#xff0c;贝多财经发现&#xff0c;天宝动物营养科技股份有限公司&#xff08;下称“天宝营养”&#xff09;递交预披露更新招股书&#xff0c;准备在深圳证券交易所主板上市&#xff0c;红塔证券为其独家保荐人。本次冲刺上市&#xff0c;天…

《Protein Actions Principles and Modeling》-《蛋白质作用原理和建模》中文分享(16)

​《Protein Actions Principles and Modeling》-《蛋白质作用原理和建模》 本人能力有限&#xff0c;如果错误欢迎批评指正。 第四章&#xff1a;Protein Binding Leads to Biological Actions &#xff08;蛋白质的结合会产生生物作用&#xff09; -在变构中&#xff0c;…

大神推荐,这几个电脑实用技巧,让你电脑用起来更加流畅舒服

电脑在我们的日常生活中&#xff0c;往往承担着“办公学习”的作用&#xff01;所以我们应该掌握哪些常用、好用的电脑使用技巧呢&#xff1f;今天就给大家分享下&#xff0c;我日常在使用电脑过程中&#xff0c;经常会使用到的几个电脑使用技巧&#xff01;第一&#xff1a;快…

基于FPGA的UDP 通信(三)

目录 引言 设计框图 UDP接收模块 设计源码 TEST BENCH 仿真结果 引言 前文链接&#xff1a; 基于FPGA的UDP 通信&#xff08;一&#xff09; 基于FPGA的UDP 通信&#xff08;二&#xff09; 本文基于FPGA设计千兆以太网通信模块&#xff1a;FPGA接收上位机数据。后续…

端到端的传输协议

&#xff08;一&#xff09;如何在一条物理链路上进行有效和可靠的数据传输 ——数据链路层传输协议 &#xff08;1&#xff09;标识高层送下来的数据块的起止、特定内容&#xff08;例如校验比特&#xff09;的位置 ——组帧技术 &#xff08;2&#xff09;如何发现传输中的错…

数据结构---线性表

刘佳瑜*&#xff0c;王越 *, 黄扬* , 张钊* (淮北师范大学计算机科学与技术学院&#xff0c;安徽 淮北) *These authors contributed to the work equllly and should be regarded as co-first authors. &#x1f31e;欢迎来到数据结构的世界 &#x1f308;博客主页&#xff1…

回溯法--最大团问题

问题描述什么是最大团&#xff1f;最大团的定义&#xff1f;完全图&#xff1a;如果无向图中的任何一对顶点之间都有一条边&#xff0c;这种无向图称为完全图。完全子图&#xff1a;给定无向图G(V,E)。如果U⊆V&#xff0c;且对任意u&#xff0c;v⊆U 有(u&#xff0c;v) ⊆ E&…

ZigBee 3.0实战教程-Silicon Labs EFR32+EmberZnet-5-02:串口发送数据-hello world

【源码、文档、软件、硬件、技术交流、技术支持&#xff0c;入口见文末】 【所有相关IDE、SDK和例程源码均可从群文件免费获取&#xff0c;免安装&#xff0c;解压即用】 持续更新中&#xff0c;欢迎关注&#xff01; 前面《ZigBee 3.0实战教程-Silicon Labs EFR32EmberZnet-5…

90 后学霸博士 8 年进击战:用机器学习为化工研究叠 BUFF

本文首发自微信公众号&#xff1a;HyperAI超神经 内容一览&#xff1a;ScienceAI 作为近两年的技术热点&#xff0c;引起了业界广泛关注和讨论。本文将围绕 ScienceAdvances 的一篇论文&#xff0c;介绍如何利用机器学习&#xff0c;对燃煤电厂的胺排放量进行预测。 关键词&…

初始化一个GCP项目并用gcloud访问操作

1 简介 谷歌云GCP&#xff08;Google Cloud Platform&#xff09;是由Google提供的云平台&#xff0c;还是为用户提供了许多免费的产品&#xff0c;还是可以尝试一下的。对于学习或者小项目&#xff0c;都可以使用。 2 创建一个新项目 要使用GCP&#xff0c;我们需要创建一个…

【日常业务开发】常用JSON库API

【日常业务开发】常用JSON库APIGsonJava 对象转 Json字符串(序列化)Json字符串转Java 对象(反序列化)FastJsonJava 对象转 Json字符串(序列化)Json字符串转Java 对象(反序列化)JacksonJava 对象转 Json字符串(序列化)Json字符串转Java 对象(反序列化)Json 字符串内容反序列化为…

计算机网络各层设备故障及可行的解决方案

计算机网络分层 我们采用某五层模型进行研究 根据有关资料的统计&#xff0c;网络发生故障具体分布为&#xff1a; 应用层占3%&#xff1b; 表示层占7%&#xff1b; 会话层占8%&#xff1b; 传输层占10%&#xff1b; 网络层占12%&#xff1b; 数据链路层占25%&#xff…

大数据开发-Linux操作

目录1.1 touch:创建空文件1.2 mv :move 剪切粘贴--重命名和移动功能1.3 cat命名&#xff1a;查看文件内容1.4 cp &#xff1a;copy 复制粘贴1.5 ps、kill、ifconfig、clear1.6 回顾&#xff1a;1.1 touch:创建空文件 touch a.txt b.txt — 创建空文件a.txt与b.txt touch /root…

数据结构与算法(二)——递归算法

目录 前言 递归算法 1、什么是递归算法 2、核心理念 3、代码演示 4、问题探讨&#xff1a;只递不归会怎样&#xff1f; 5、小结 递归实例&#xff1a;汉诺塔问题 1、故事引入 2、移动盘子的逻辑 3、N个盘子的移动分析 4、代码实现N个盘子的移动 5、汉诺塔移动次数计…