基于 RocketMQ Connect 构建数据流转处理平台

news2024/10/5 19:14:36

作者:周波,阿里云智能高级开发工程师, Apache RocketMQ Committer

01 从问题中来的 RocketMQ Connect

在电商系统、金融系统及物流系统,我们经常可以看到 RocketMQ 的身影。原因不难理解,随着数字化转型范围的扩大及进程的加快,业务系统的数据也在每日暴增,此时为了保证系统的稳定运行,就需要把运行压力分担出去。RocketMQ 就担任着这样的角色,它的异步消息处理与高并发读写能力,决定了系统底层的重构不会影响上层应用的功能。而 RocketMQ 的另一个优势——可伸缩能力,使系统在面临流量的不确定性时,实现对流量的缓冲处理。此外,RocketMQ 的顺序设计特性使其成为一个天然的排队引擎,例如,三个应用同时对一个后台引擎发起请求,确保不引起“撞车”事故。因此,RocketMQ 被用在异步解耦、削峰填谷以及事务消息等场景中。

但是,数字化转型浪潮也带来了更多用户对数据价值的关注——如何让数据产生更大利用价值?RocketMQ 自身不具备数据分析能力,但是有不少用户希望从 RocketMQ Topic 中获取数据并进行在线或离线的数据分析。然而,使用市面上的数据集成或数据同步工具,将 RocketMQ Topic 数据同步到一些分析系统中虽然是一种可行方案,却会引入新的组件,造成数据同步的链路较长,时延相对较高,用户体验不佳。

举个例子,假设业务场景中使用 OceanBase 作为数据存储,同时希望将这些数据同步到 Elasticsearch 进行全文搜索,有两种可行的数据同步方案。

方案一: 从 OceanBase 中获取数据,写入 Elasticsearch 组件并进行数据同步,在数据源较少时此方案没什么问题,一旦数据增多,开发和维护都非常复杂,此时就要用到第二种方案。

在这里插入图片描述

方案二: 引入消息中间件对上下游进行解藕,这能解决第一种方案的问题,但是一些较为复杂的问题还没有完全解决。比如,如何将数据从源数据同步到目标系统并保证高性能,如果保证同步任务的部分节点挂掉,数据同步依然正常进行,节点恢复依然可以断点续传,同时随着数据管道的增多,如何管理数据管道也变得十分困难。

在这里插入图片描述

总的来说,数据集成过程中的挑战主要有五个。

挑战一: 数据源多,市面上可能有上百个数据源,且各数据源的系统差异较大,实现任意数据源之间的数据同步工作量较大,研发周期很长。

挑战二: 高性能问题,如何高效地从源数据系统同步到目的数据系统,并保障其性能。

挑战三: 高可用问题,即 Failover 能力,当一个节点挂掉是否这个节点的任务就停止了,任务重新启动是否还可以断点续传。

挑战四: 弹性扩缩容能力,根据系统流量动态增加或减少节点数量,既能通过扩容满足高峰期业务,也能在低峰期缩减节点,节省成本。

挑战五: 数据管道的管理运维,随着数据管道的增多,运维监控的数据管道也会变得越来越复杂,如何高效管理监控众多的同步任务。

面对上述挑战 RocketMQ 如何解决?

第一,标准化数据集成 API (Open Messaging Connect API)。在 RocketMQ 生态中增加 Connect 组件,一方面对数据集成过程抽象,抽象标准的数据格式以及描述数据的 Schema,另一方面对同步任务进行抽象,任务的创建、分片都抽象成一套标准化的流程。

第二,基于标准的 API 实现 Connect Runtime。Runtime 提供了集群管理、配置管理、位点管理、负载均衡相关的能力,拥有了这些能力,开发者或者用户就只需要关注数据如何获取或如何写入,从而快速构建数据生态,如与OceanBase、MySQL、Elasticsearc 等快速建立连接,搭建数据集成平台。整个数据集成平台的构建也非常简单,通过 Runtime 提供的 RESTFull API 进行简单调用即可。

第三,提供完善的运维工具,方便管理同步任务,同时提供丰富的 Metrics 信息,方便查看同步任务的 TPS、流量等信息。

02 RocketMQ Connect 两大使用场景

这里为大家整理了 RocketMQ Connect 的两大使用场景。

场景一,RocketMQ 作为中间媒介,可以将上下游数据打通,比如在新旧系统迁移的过程中,如果在业务量不大时使用 MySQL 就可以满足业务需求,而随着业务的增长,MySQL 性能无法满足业务要求时,需要对系统进行升级,选用分布式数据库OceanBase 提升系统性能。

如何将旧系统数据无缝迁移到 OceanBase 中呢?在这个场景中 RocketMQ Connect 就可以发挥作用,RocketMQ Connect 可以构建一个从 MySQL 到 OceanBase 的数据管道,实现数据的平滑迁移。RocketMQ Connect 还可以用在搭建数据湖、搜索引擎、ETL 平台等场景。例如将各个数据源的数据集成到 RocketMQ Topic当中,目标存储只需要对接 Elasticsearch 就可以构建一个搜索平台,目标存储如果是数据湖就可以构建一个数据湖平台。

除此之外,RocketMQ 自身也可以作为一个数据源,将一个 RocketMQ 集群的数据同步到另一个集群,可以构建 RocketMQ 多活容灾能力,这是社区正在孵化的Replicator可以实现的能力。

在这里插入图片描述

场景二,RocketMQ 作为端点。 RocketMQ 的生态中提供了流计算能力组件 -RocketMQ Streams,Connector 将各个存储系统的数据集成到 RocketMQ Topic 当中,下游使用 RocketMQ Streams 流计算的能力就可以构建一个实时的流计算平台。当然也可以配合业务系统的 Service 实现业务系统快速从其它存储统一快速获取数据的能力。

在这里插入图片描述

还可以将 RocketMQ 作为端点的上游,将业务消息发到 Topic 中,使用 Connector 对数据做持久化或转存的操作。

在这里插入图片描述

如此一来,RocketMQ 就具备数据集成能力,可以实现任意任意异构数据源之间的数据同步,同时也具备统一的集群管理、监控能力及配置化搭建数据管道搭建能力,开发者或者用户只需要专注于数据拷贝,简单配置就可以得到一个具备配置化、低代码、低延时、高可用,支持故障处理和动态扩缩容数据集成平台。

那么, RocketMQ Connect 是如何实现的呢?

03 RocketMQ Connect 实现原理

在介绍实现原理前,先来了解两个概念。

概念一,什么是 Connector(连接器)? 它定义数据从哪复制到哪,是从源数据系统读取数据写入RocketMQ,这种是 SourceConnector,或从 RocketMQ 读数据写入到目标系统,这种是 SinkConnector。Connector 决定需要创建任务的数量,从 Worker 接收配置传递给任务。

在这里插入图片描述

概念二,什么是 Task ? Task 是 Connector 任务分片的最小分配单位,是实际将源数据源数据复制到 RocketMQ(SourceTask),或者将数据从 RocketMQ 读出写入到目标系统(SinkTask)真正的执行者,Task 是无状态的,可以动态的启停任务,多个 Task 可以并行执行,Connector 复制数据的并行度主要体现在 Task 上。一个 Task 任务可以理解为一个线程,多个 Task 则以多线程的方式运行。

在这里插入图片描述

通过 Connect 的 API 也可以看到 Connector 和 Task 各自的职责,Connector 实现时就已经确定数据复制的流向,Connector 接收数据源相关的配置,taskClass 获取需要创建的任务类型,通过 taskConfigs 的数量确定任务数量,并且为 Task 分配好配置。Task 拿到配置以后数据源建立连接并获取数据写入到目标存储。通过下面的两张图可以清楚的看到,Connector 和 Task 处理基本流程。

在这里插入图片描述

一个 RocketMQ Connect 集群中会有多个 Connector ,每个 Connector 会对应一个或多个 Task,这些任务运行在 Worker(进程)中。Worker 进程是 Connector 和 Task 运行环境,它提供 RESTFull 能力,接收 HTTP 请求,将获取到的配置传递给 Connector 和Task,它还负责启动 Connector 和 Task,保存 Connector 配置信息,保存 Task 同步数据的位点信息,除此以外,Worker 还提供负载均衡能力,Connect 集群高可用、扩缩容、故障处理主要依赖 Worker 的负责均衡能力实现的。Worker 提供服务的流程如下:

在这里插入图片描述

Worker 提供的服务发现及负载均衡的实现原理如下:

服务发现:

在这里插入图片描述

用过 RocketMQ 的开发者应该知道,它的使用很简单,就是发送和接收消息。消费模式分为集群模式和广播模式两种,集群消费模式下一个 Topic 可以有多个 Consumer 消费消息,任意一个 Consumer 的上线或下线 RocketMQ 服务端都有感知,并且还可以将客户端上下线信息通知给其它节点,利用 RocketMQ 这个特性就实现了 Worker 的服务发现。

配置/Offset同步:

在这里插入图片描述

Connector 的配置 /Offset 信息同步通过每个 Worker 订阅相同的 Topic,不同 Worker 使用不同的 Consumer Group 实现的, Worker 节点可以通过这种方式消费到相同Topic的所有数据,即 Connector 配置 /Offset 信息,这类似于广播消费模式,这种数据同步模式可以保证任何一个 Worker 挂掉,该Worker上的任务依旧可以在存活的 Worker 正常拉起运行 ,并且可以获取到任务对应的 Offset 信息实现断点续传, 这是故障转移以及高可用能力的基础。

负载均衡:

RocketMQ 消费场景中,消费客户端 与Topic Queue 之间有负载均衡能力,Connector 在这一部分也是类似的,只不过它负载均衡的对象不一样,Connector 是 Worker 节点和 Task 之间的负载均衡,与 RocketMQ 客户端负载均衡一样,可以根据使用场景选择不同负载均衡算法。

在这里插入图片描述

上文提到过 RocketMQ Connect 提供 RESTFull API能力。通过 RESTFull AP 可以创建 Connector,管理 Connector 以及查看 Connector 状态,简单列举:

  • POST /connectors/{connector name}
  • GET /connectors/{connector name}/config
  • GET /connectors/{connector name}/status
  • POST /connectors/{connector name}/stop

目前 Connector 支持单机、集群两种部署模式。集群模式至少要有两个节点,才能保证它的高可用。并且集群可以动态增加或者减少,做到了动态控制提升集群性能和节省成本节省的能力。单机模式更多方便了开发者开发测试 Connector 。

如何如何实现一个 Connector 呢? 还是结合一个具体的场景看一看,例如业务数据当前是写入 MySQL 数据库中的,希望将 MySQL 中数据实时同步到数据湖 Hudi 当中。只要实现 MySQL Source Connector 、Hudi Sink Connector 这两个 Connector 即可。

在这里插入图片描述

下面就以 MySQLSource Connector 为例,来看一下具体的如何实现。

实现 Connector 最主要的就是实现两个 API 。第一个是 Connector API ,除了实现它生命周期相关的 API 外,还有任务如何分配,是通过 Topic、Table 还是通过数据库的维度去分。第二个API 是需要创建的 Task,Connector 通过任务分配将相关的配置信息传递给 Task, Task 拿到这些信息,例如数据库账号,密码,IP,端口后就会创建数据库连接,再通过 MySQL 提供的 BINLOG 机智获取到表的数据,将这些数据写到一个阻塞队列中。Task 有个 Poll 方法,实现 Connector 时只要调用到 Poll 方法时可以获取到数据即可,这样 Connector 就基本写完了。然后打包以 Jar 包的形式提供出来,将它加载到 Worker 的节点中。

在这里插入图片描述

创建 Connector 任务后, Worker 中会创建一个或者多个线程,不停的轮询 Poll 方法,从而获取到 MySQL 表中的数据,再通过 RocketMQ Producer 发送到 RocketMQ Broker中,这就是 Connector 从实现到运行的整体过程(见下图)。

在这里插入图片描述

04 RocketMQ Connect 现状与未来

RocketMQ Connect 的发展历程分为三个阶段。

第一阶段:Preview 阶段

RocketMQ Connect 发展的初期也即 Preview 阶段,实现了 Open Messaging Connect API 1.0 版本,基于该版本实现了 RocketMQ Connect Runtime ,同时提供了 10+ Connector 实现(MySQL,Redis,Kafka,Jms,MongoDB……)。在该阶段,RocketMQ Connect 可以简单实现端到端的数据源同步,但功能还不够完善,不支持数据转换,序列化等能力,生态相对还比较贫乏。

第二阶段:1.0 阶段

在 1.0 阶段,Open Messaging Connect API 进行了升级,支持 Schema、Transform,Converter 等能力,在此基础上对 Connect Runtime 也进行了重大升级,对数据转换,序列化做了支持,复杂Schema也做了完善的支持。该阶段的 API、Runtime 能力已经基本完善,在此基础上,还有30+ Connecotor 实现,覆盖了 CDC、JDBC、SFTP、NoSQL、缓存 Redis、HTTP、AMQP、JMS、数据湖、实时数仓、Replicator、等 Connector 实现,还做了 Kafka Connector Adaptor 可以运行 Kafka 生态的 Connector。

第三阶段:2.0 阶段

RocketMQ Connect 当前处于这个阶段,重点发展 Connector 生态,当 RocketMQ 的 Connector 生态达到 100 + 时,RocketMQ 基本上可以与任意的一个数据系统去做连接。

目前 RocketMQ 社区正在和 OceanBase 社区合作,进行 OceanBase 到 RocketMQ Connect 的研发工作,提供 JDBC 和 CDC 两种模式接入模式,后续会在社区中发布,欢迎感兴趣的同学试用。

05 总结

RocketMQ 是一个可靠的数据集成组件,具备分布式、伸缩性、故障容错等能力,可以实现 RocketMQ 与其他数据系统之间的数据流入与流出。通过 RocketMQ Connect 可以实现 CDC,构建数据湖,结合流计算可实现数据价值。

加入 Apache RocketMQ 社区

十年铸剑,Apache RocketMQ 的成长离不开全球 800+ 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。

社区 5.x 版本正在进行着如火如荼的开发,以及 30 +个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学通过以下方式加入社区:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/413281.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux学习笔记——多线程

文章目录补充知识Linux线程概念线程的优点线程的缺点线程异常线程用途多进程的应用场景Linux进程VS线程重新理解进程进程和线程线程共享的进程资源和环境为什么线程切换的成本更低进程和线程的关系Linux线程控制POSIX线程库创建线程线程ID及进程地址空间布局线程终止线程等待线…

pycharm常用的插件

pycharm等IDE 无法编辑bat文件 Chinese Language Pack 汉化插件 vim CodeGlance 右侧地图 将类似于Sublime中的代码小地图嵌入到编辑器窗格中。使用自定义颜色进行语法高亮,同时使用明暗主题。 Rainbow Brackets(必备推荐)收费了 这个插件…

【数据库原理 • 六】数据库备份与恢复

前言 数据库技术是计算机科学技术中发展最快,应用最广的技术之一,它是专门研究如何科学的组织和存储数据,如何高效地获取和处理数据的技术。它已成为各行各业存储数据、管理信息、共享资源和决策支持的最先进,最常用的技术。 当前…

Baumer工业相机堡盟工业相机中预处理相机的特性优势以及行业应用

Baumer工业相机堡盟工业相机如何通过BGAPISDK里显示彩色相机和黑白相机的图像(C#)Baumer工业相机Baumer工业相机的预处理相机的技术背景Baumer工业相机中预处理相机的特性Baumer工业相机中图像压缩相机的特性Baumer工业相机中3D激光相机的特性Baumer工业…

ESP32设备驱动-MAX30102脉搏血氧饱和度和心率监测传感器驱动

MAX30102脉搏血氧饱和度和心率监测传感器驱动 文章目录 MAX30102脉搏血氧饱和度和心率监测传感器驱动1、MAX30102介绍2、硬件准备3、软件准备4、驱动实现1、MAX30102介绍 MAX30102是一款集成脉搏血氧饱和度和心率监测生物传感器模块。 它包括内部 LED、光电探测器、光学元件和…

网页滚动体验,IScroll滚动插件,你安装了类似的滚动页面插件吗

IScroll是一款基于JavaScript的插件,用于在网页中实现平滑滚动效果。 这个插件可以帮助用户创建回到页面顶部和底部的按钮、生成页面导航快照,以及设置滚动时间等功能,从而提升网页的用户体验。 IScroll的特点在于,它能够平滑地…

【15】数据操作reshape、张量的运算

1. N维数组 ① 机器学习用的最多的是N维数组,N维数组是机器学习和神经网络的主要数据结构。 2. 创建数组 ① 创建数组需要:形状、数据类型、元素值。 3. 访问元素 ① 可以根据切片,或者间隔步长访问元素。 ② [::3,::2]是每隔3行、2列访问…

考研数据结构--线性表

线性表 文章目录线性表概述线性表的特点线性表的基本操作线性表的顺序表示概述优缺点操作顺序表的定义顺序表的初始化顺序表的插入顺序表的删除顺序表的查找顺序表的输出顺序表的判空顺序表的销毁main方法测试线性表的链式表示概述优缺点单链表操作单链表的定义单链表的初始化单…

自动化面试题3

1、PLC输入输出如何接线,源性漏型如何看? 所谓“源型输入”,是指电流从模块的公共端流入,从模块的输入通道流出的接线方式。源型输入的公共端作为电源正极(共阳极)。源型输入可以等效为在输入模块外部连接…

C# | 上位机开发新手指南(九)加密算法——RSA

上位机开发新手指南(九)加密算法——RSA 文章目录上位机开发新手指南(九)加密算法——RSA前言RSA的特性非对称性安全性可逆性签名速度较慢密钥管理RSA算法的参数公钥公钥指数e模数n私钥私钥指数d模数n质数p和qdp和dqqInv质数填充方…

性能测试,python 内存分析工具 -memray

Memray是一个由彭博社开发的、开源内存剖析器;开源一个多月,已经收获了超8.4k的star,是名副其实的明星项目。今天我们就给大家来推荐这款python内存分析神器。 Memray可以跟踪python代码、本机扩展模块和python解释器本身中内存分配&#xf…

yolov5-v7.0实例分割快速体验

简介 🚀yolov5-v7.0版本正式发布,本次更新的v7.0则是全面的大版本升级,最主要的功能就是全面集成支持了实例分割,yolov5已经集成检测、分类、分割任务。 前面几篇文章已经介绍过关于Yolov5的一些方面 yolov5目标检测:https://bl…

[计算机图形学]几何:曲线和曲面(前瞻预习/复习回顾)

一、曲线 1.Bzier Curves—贝塞尔曲线 贝塞尔曲线也是一种显式的几何表示方法。贝塞尔曲线定义了一系列的控制点,致使确定满足这些控制点关系的唯一一条曲线:如上图定义的贝塞尔曲线满足 起始点为p0,结束点为p3,起始点的切线方向…

ABB机械臂和RobotStudio编程简介

ABB机械臂和RobotStudio编程简介机械臂ABB机械臂ABB示教器RobotStudio与编程简介RobotStudio简介与安装RobotStudio使用RAPID程序指令机械臂 一种能够进行编程并在自动控制下执行某些操作和 移动作业任务的机械装置。 —— 美国国家标准局(NSB) 一种用于移动各种材料、零部件、…

基于目标级联法的微网群多主体分布式优化调度(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

家政服务小程序实战开发教程017-我的页面未注册功能开发

顾客在家政服务小程序可以在线提交预约信息,预约成功后可以查看订单的进度。我们本篇就来实现一下我的页面的功能。 1 新建页面 进入编辑器,在组件页面区域点击号,创建我的页面 2 未登录页面开发 我的页面的逻辑是如果用户未注册&#…

【MyBatisPlus】一文带你快速上手MyBatisPlus

文章目录MyBatisPlus学习笔记前言1、MyBatisPlus概述2、快速体验3、CRUD接口3.1 Mapper层CRUD接口3.1.1 Insert3.1.2 Delete3.1.3 Update3.1.4 Select3.2 Service层CRUD接口3.2.1 Save3.2.2 Remove3.2.3 Update3.2.4 Get3.3 自定义SQL接口4、常用注解和配置4.1 TableId4.2 Tabl…

【C++】模板进阶(非类型模板参数、类模板的特化和模板的分离编译)

之前我们讲解过模板初阶,没有阅读过的童鞋可以先去阅读之前的博文----->模板初阶 本章我们将针对模板进行进一步的讲解。 目录 (一)非类型模板参数 (二)模板的特化 (1)概念 &#xff0…

我的面试八股(Java集合篇)

Java集合 两个抽象接口派生:一个是Collection接口,存放单一元素;一个是Map接口存放键值对。 Vector为什么是线程安全 简单,因为官方在可能涉及到线程不安全的操作都进行了synchronized操作,就自身源码就给你加了把锁。 Vector…

Stacking:解决机器学习进行多模型组合的实用工具

文章目录1 Stacking原理第一步:生成预测结果第二步:整合预测结果2 使用Python实现Stacking第一步:生成预测结果第二步:整合预测结果借助sklearn实现stacking3 各领域内的一些实际应用在机器学习领域,算法的选择和参数的…