CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark

news2025/1/17 9:05:57

文章目录

  • 1. 整体架构
  • 2. 环境准备
  • 3. 配置全局变量
  • 4. 创建专属工作目录和存储桶
  • 5. 创建 EMR Serverless Execution Role
  • 6. 创建 EMR Serverless Application
  • 7. 提交 Apache Hudi DeltaStreamer CDC 作业
    • 7.1 准备作业描述文件
    • 7.2 提交作业
    • 7.3 监控作业
    • 7.4 错误检索
    • 7.5 停止作业
  • 8. 结果验证
  • 9. 评估与展望

Apache Hudi的DeltaStreamer是一种以近实时方式摄取数据并写入Hudi表的工具类,它简化了流式数据入湖并存储为Hudi表的操作,自 0.10.0 版开始,Hudi又在DeltaStreamer的基础上增加了基于Debezium的CDC数据处理能力,这使得其可以直接将Debezium采集的CDC数据落地成Hudi表,这一功能极大地简化了从源头业务数据库到Hudi数据湖的数据集成工作。本文发表于Apache Hudi公众号,本文地址:https://laurence.blog.csdn.net/article/details/132012360,转载请注明出处!

另一方面,得益于开箱即用和零运维的极致体验,越来越多的云上用户开始拥抱Serverless产品。Amazon云平台上的EMR是一个集成了多款主流大数据工具的计算平台,自6.6.0版本开始,EMR推出了 Serverless版本,开始提供无服务器的Spark运行环境,用户无需维护Hadoop/Spark集群,即可轻松提交Spark作业。

一个是“全配置”的Hudi工具类, 一个是“开箱即用”的Spark运行环境,两者结合在一起,无需编写CDC处理代码,无需构建Spark集群,仅通过一条命令,就可以轻松实现CDC数据入湖,这是一个非常吸引人的技术方案,本文我们就详细介绍一下这一方案的整体架构和实现细节。

1. 整体架构

Apache Huid在 0.10.0版引入的DeltaStreamer CDC是一整条CDC数据处理链路中的末端环节,为了能让大家清楚地理解DeltaStreamer在其中所处的位置和发挥的作用,我们有必要看一下完整架构:

请添加图片描述

①:MySQL是一个业务数据库,是CDC数据的源头;

②:系统使用一个CDC摄取工具实时读取MySQL的binlog,业界主流的CDC摄取工具有:Debezium,Maxwell,FlinkCDC等,在该架构中,选型的是安装了Debezium MySQL Connector的Kafka Connect;

③:现在越来越多的CDC数据摄取方案开始引入Schema Registry用于更好的控制上游业务系统的Schema变更,实现更可控的Schema Evolution。在开源社区,较为主流的产品是Confluent Schema Registry,且目前Hudi的DeltaStreamer也仅支持Confluent这一种Schema Registry,所以该架构选型的也是它。引入Schema Registry之后,Kafka Connect在捕获一条记录时,会先在其本地的Schema Cache中查找是否已经存在对应的Schema,如果有,则直接从本地Cache中获得Schema ID,如果没有,则会将其提交给Schema Registry,由Schema Registry完成该Schema的注册并将生成的Schema ID返回给Kafka Connect,Kafka Connect会基于Schema ID对原始的CDC数据进行封装(序列化):一是将Schema ID添加到消息中,二是如果使用Avro格式传递消息,Kafka Connect会去除Avro消息中的Schema部分,只保留Raw Data,因为Schema信息已缓存在Producer和Consumer本地或可通过Schema Registry一次性获得,没有必要伴随Raw Data传输,这样可以大大减小Avro消息的体积,提升传输效率。这些工作是通过Confluent提供的Avro Converter(io.confluent.connect.avro.AvroConverter)完成的;

④:Kafka Connect将封装好的Avro消息投递给Kafka

⑤:EMR Serverless为DeltaStreamer提供Serverless的Spark运行环境;

⑥:Hudi的DeltaStreamer作为一个Spark作业运行在EMR Serverless环境中,它从Kafka读取到Avro消息后,会使用Confluent提供的Avro反序列化器(io.confluent.kafka.serializers.KafkaAvroDeserializer)解析Avro消息,得到Schema ID和Raw Data,反序列化器同样会先在本地的Schema Cache中根据ID查找对应的Schema,如果找到就根据这个Schema将Raw Data反序列化,如果没有找到,就向Schema Registry请求获取该ID对应的Schema,然后再进行反序列化;

⑦:DeltaStreamer将解析出来的数据写入存放在S3上的Hudi表,如果数据表不存在,会自动创建表并同步到Hive MetaStore中

2. 环境准备

限于篇幅,本文不会介绍①、②、③、④环节的构建工作,读者可以参考以下文档自行构建一套完整的测试环境:

①MySQL:如果仅以测试为目的,建议使用Debezium提供的官方Docker镜像,构建操作可参考其官方文档(下文将给出的操作示例所处理的CDC数据就是自于该MySQL镜像中的inventory数据库);

②Kafka Connect:如果仅以测试为目的,建议使用Confluent提供的官方Docker镜像,构建操作可参考其官方文档,或者使用AWS上托管的Kafka Connct:Amazon MSK Connect。需要提醒的是:Kafka Connect上必须安装Debezium MySQL Connector和Confluent Avro Converter两个插件,因此需要在官方镜像的基础上手动添加这两个插件;

③Confluent Schema Registry:如果仅以测试为目的,建议使用Confluent提供的官方Docker镜像,构建操作可参考其官方文档;

④Kafka:如果仅以测试为目的,建议使用Confluent提供的官方Docker镜像,构建操作可参考其官方文档,或者使用AWS上托管的Kafka:Amazon MSK

完成上述工作后,我们会获得“Confluent Schema Registry”和“Kafka Bootstrap Servers”两项依赖服务的地址,它们是启动DeltaStreamer CDC作业的必要条件,后续会以参数形式传递给DeltaStreamer作业。

3. 配置全局变量

环境准备工作就绪后,就可以着手第⑤、⑥、⑦部分的工作了。本文所有操作全部通过命令完成,以shell脚本形式提供给读者使用,脚本上会标注实操步骤的序号,如果是二选一操作,会使用字母a/b加以标识,部分操作还有示例供读者参考。为了使脚本具有良好的可移植性,我们将与环境相关的依赖项和需要用户自定义的配置项抽离出来,以全局变量的形式集中配置,如果您在自己的环境中执行本文操作,只需修改下面的全局变量即可,不必修改具体命令:

变量说明设定时机
APP_NAME由用户为本应用设定的名称提前设定
APP_S3_HOME由用户为本应用设定的S3专属桶提前设定
APP_LOCAL_HOME由用户为本应用设定的本地工作目录提前设定
SCHEMA_REGISTRY_URL用户环境中的Confluent Schema Registry地址提前设定
KAFKA_BOOTSTRAP_SERVERS用户环境中的Kafka Bootstrap Servers地址提前设定
EMR_SERVERLESS_APP_SUBNET_ID将要创建的EMR Serverless Application所属子网ID提前设定
EMR_SERVERLESS_APP_SECURITY_GROUP_ID将要创建的EMR Serverless Application所属安全组ID提前设定
EMR_SERVERLESS_APP_ID将要创建的EMR Serverless Application的ID过程中产生
EMR_SERVERLESS_EXECUTION_ROLE_ARN将要创建的EMR Serverless Execution Role的ARN过程中产生
EMR_SERVERLESS_JOB_RUN_ID提交EMR Serverless作业后返回的作业ID过程中产生

接下来,我们将进入实操阶段,需要您拥有一个安装了AWS CLI并配置了用户凭证的Linux环境(建议使用Amazon Linux2),通过SSH登录后,先使用命令sudo yum -y install jq安装操作json文件的命令行工具:jq(后续脚本会使用到它),然后将以上全局变量悉数导出(请根据您的AWS账号和本地环境替换命令行中的相应值):

# 实操步骤(1)
export APP_NAME='change-to-your-app-name'
export APP_S3_HOME='change-to-your-app-s3-home'
export APP_LOCAL_HOME='change-to-your-app-local-home'
export SCHEMA_REGISTRY_URL='change-to-your-schema-registry-url'
export KAFKA_BOOTSTRAP_SERVERS='change-to-your-kafka-bootstrap-servers'
export EMR_SERVERLESS_APP_SUBNET_ID='change-to-your-subnet-id'
export EMR_SERVERLESS_APP_SECURITY_GROUP_ID='change-to-your-security-group-id'

以下是一份示例:

# 示例(非实操步骤)
export APP_NAME='apache-hudi-delta-streamer'
export APP_S3_HOME='s3://apache-hudi-delta-streamer'
export APP_LOCAL_HOME='/home/ec2-user/apache-hudi-delta-streamer'
export SCHEMA_REGISTRY_URL='http://localhost:8081'
export KAFKA_BOOTSTRAP_SERVERS='localhost:9092'
export EMR_SERVERLESS_APP_SUBNET_ID='subnet-0a11afe6dbb4df759'
export EMR_SERVERLESS_APP_SECURITY_GROUP_ID='sg-071f18562f41b5804'

至于 EMR_SERVERLESS_APP_IDEMR_SERVERLESS_EXECUTION_ROLE_ARNEMR_SERVERLESS_JOB_RUN_ID 三个变量将在后续的操作过程中产生并导出。

4. 创建专属工作目录和存储桶

作为一项最佳实践,我们先为应用程序(Job)创建一个专属的本地工作目录(即APP_LOCAL_HOME设定的路径)和一个S3存储桶(即APP_S3_HOME设定的桶),应用程序的脚本、配置文件、依赖包、日志以及产生的数据都统一存放在专属目录和存储桶中,这样会便于维护:

# 实操步骤(2)
mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME

5. 创建 EMR Serverless Execution Role

运行EMR Serverless作业需要配置一个IAM Role,这个Role将赋予EMR Serverless作业访问AWS相关资源的权限,我们的DeltaStreamer CDC作业应至少需要分配:

  • 对S3专属桶的读写权限
  • 对Glue Data Catalog的读写权限
  • 对Glue Schema Registry的读写权限

您可以根据EMR Serverless的官方文档手动创建这个Role,然后将其ARN作为变量导出(请根据您的AWS账号环境替换命令行中的相应值):

# 实操步骤(3/a)
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='change-to-your-emr-serverless-execution-role-arn'

以下是一份示例:

# 示例(非实操步骤)
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

考虑到手动创建这个Role较为烦琐,本文提供如下一段脚本,可以在您的AWS账号中创建一个拥有管理员权限的Role:EMR_SERVERLESS_ADMIN,从而帮助您快速完成本节工作(注意:由于该Role具有最高权限,应谨慎使用,完成快速验证后,还是应该在生产环境中配置严格限定权限的专有Execution Role):

# 实操步骤(3/b)
EMR_SERVERLESS_EXECUTION_ROLE_NAME='EMR_SERVERLESS_ADMIN'
cat << EOF > $APP_LOCAL_HOME/assume-role-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "EMRServerlessTrustPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "emr-serverless.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
EOF
jq . $APP_LOCAL_HOME/assume-role-policy.json
export EMR_SERVERLESS_EXECUTION_ROLE_ARN=$(aws iam create-role \
    --no-paginate --no-cli-pager --output text \
    --role-name "$EMR_SERVERLESS_EXECUTION_ROLE_NAME" \
    --assume-role-policy-document file://$APP_LOCAL_HOME/assume-role-policy.json \
    --query Role.Arn)
aws iam attach-role-policy \
    --policy-arn "arn:aws:iam::aws:policy/AdministratorAccess" \
    --role-name "$EMR_SERVERLESS_EXECUTION_ROLE_NAME"

6. 创建 EMR Serverless Application

向EMR Serverless提交作业前,需要先创建一个EMR Serverless Application,这是EMR Serverless中的一个概念,可以理解为一个虚拟的EMR集群。在创建Application时,需要指定EMR的版本,网络配置,集群规模,预热节点等信息。通常,我们仅需如下一条命令就可以完成创建工作:

# 示例(非实操步骤)
aws emr-serverless create-application \
    --name "$APP_NAME" \
    --type "SPARK" \
    --release-label "emr-6.11.0"

但是,这样创建出的Application是没有网络配置的,由于我们的DeltaStreamer CDC作业需要访问位于特定VPC中的Confluent Schema Registry和Kafka Bootstrap Servers,所以必须显式地为Application设定子网和安全组,以确保DeltaStreamer可以连通这两项服务。因此,我们需要使用以下命令创建一个带有特定网络配置的Application:

# 实操步骤(4)
cat << EOF > $APP_LOCAL_HOME/create-application.json
{
    "name":"$APP_NAME",
    "releaseLabel":"emr-6.11.0",
    "type":"SPARK",
    "networkConfiguration":{
        "subnetIds":[
            "$EMR_SERVERLESS_APP_SUBNET_ID"
        ],
        "securityGroupIds":[
            "$EMR_SERVERLESS_APP_SECURITY_GROUP_ID"
        ]
    }
}
EOF
jq . $APP_LOCAL_HOME/create-application.json
export EMR_SERVERLESS_APP_ID=$(aws emr-serverless create-application \
    --no-paginate --no-cli-pager --output text \
    --release-label "emr-6.11.0" --type "SPARK" \
    --cli-input-json file://$APP_LOCAL_HOME/create-application.json \
    --query "applicationId")

7. 提交 Apache Hudi DeltaStreamer CDC 作业

创建好Application就可以提交作业了,Apache Hudi DeltaStreamer CDC是一个较为复杂的作业,配置项非常多,这一点从Hudi官方博客给出的示例中可见一斑,我们要做的是:将使用spark-submit命令提交的作业“翻译”成EMR Serverless的作业。

7.1 准备作业描述文件

使用命令行提交EMR Serverless作业需要提供一个json格式的作业描述文件,通常在spark-submit命令行中配置的参数都会由这个文件来描述。由于DeltaStreamer作业的配置项非常多,限于篇幅,我们无法一一做出解释,您可以将下面的作业描述文件和Hudi官方博客提供的原生Spark作业做一下对比,然后就能相对容易地理解作业描述文件的作用了。

需要注意的是,在执行下面的脚本时,请根据您的AWS账号和本地环境替换脚本中所有的<your-xxx>部分,这些被替换的部分取决于您本地环境中的源头数据库、数据表,Kakfa Topic以及Schema Registry等信息,每换一张表都需要调整相应的值,所以没有被抽离到全局变量中。

此外,该作业其实并不依赖任何第三方Jar包,其使用的Confluent Avro Converter已经集成到了hudi-utilities-bundle.jar中,这里我们特意在配置中声明--conf spark.jars=$(...)(参考示例命令)是为了演示“如何加载三方类库”,供有需要的读者参考。

# 实操步骤(5)
cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
    "name":"apache-hudi-delta-streamer",
    "applicationId":"$EMR_SERVERLESS_APP_ID",
    "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
    "jobDriver":{
        "sparkSubmit":{
        "entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar",
        "entryPointArguments":[
            "--continuous",
            "--enable-sync",
            "--table-type", "COPY_ON_WRITE",
            "--op", "UPSERT",
            "--target-base-path", "<your-table-s3-path>",
            "--target-table", "orders",
            "--min-sync-interval-seconds", "60",
            "--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource",
            "--source-ordering-field", "_event_origin_ts_ms",
            "--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload",
            "--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS",
            "--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL",
            "--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/<your-registry-name>.<your-src-database>.<your-src-table>-value/versions/latest",
            "--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
            "--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=<your-kafka-topic-of-your-table-cdc-message>",
            "--hoodie-conf", "auto.offset.reset=earliest",
            "--hoodie-conf", "hoodie.datasource.write.recordkey.field=<your-table-recordkey-field>",
            "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=<your-table-partitionpath-field>",
            "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
            "--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true",
            "--hoodie-conf", "hoodie.datasource.hive_sync.database=<your-sync-database>",
            "--hoodie-conf", "hoodie.datasource.hive_sync.table==<your-sync-table>",
            "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=<your-table-partition-fields>"
        ],
         "sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=<your-app-dependent-jars>"
        }
   },
   "configurationOverrides":{
        "monitoringConfiguration":{
            "s3MonitoringConfiguration":{
                "logUri":"<your-s3-location-for-emr-logs>"
            }
        }
   }
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json

以下是一份示例:

# 示例(非实操步骤)
cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
    "name":"apache-hudi-delta-streamer",
    "applicationId":"$EMR_SERVERLESS_APP_ID",
    "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
    "jobDriver":{
        "sparkSubmit":{
        "entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar",
        "entryPointArguments":[
            "--continuous",
            "--enable-sync",
            "--table-type", "COPY_ON_WRITE",
            "--op", "UPSERT",
            "--target-base-path", "$APP_S3_HOME/data/mysql-server-3/inventory/orders",
            "--target-table", "orders",
            "--min-sync-interval-seconds", "60",
            "--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource",
            "--source-ordering-field", "_event_origin_ts_ms",
            "--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload",
            "--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS",
            "--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL",
            "--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest",
            "--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
            "--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders",
            "--hoodie-conf", "auto.offset.reset=earliest",
            "--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_number",
            "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=order_date",
            "--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor",
            "--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true",
            "--hoodie-conf", "hoodie.datasource.hive_sync.database=inventory",
            "--hoodie-conf", "hoodie.datasource.hive_sync.table=orders",
            "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=order_date"
        ],
         "sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')"
        }
   },
   "configurationOverrides":{
        "monitoringConfiguration":{
            "s3MonitoringConfiguration":{
                "logUri":"$APP_S3_HOME/logs"
            }
        }
   }
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json

7.2 提交作业

准备好作业描述文件后,就可以正式提交作业了,命令如下:

# 实操步骤(6)
export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
    --no-paginate --no-cli-pager --output text \
    --name apache-hudi-delta-streamer \
    --application-id $EMR_SERVERLESS_APP_ID \
    --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
    --execution-timeout-minutes 0 \
    --cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
    --query jobRunId)

7.3 监控作业

作业提交后,可以在控制台查看作业运行状态。如果想在命令行窗口持续监控作业,可以使用如下脚本:

# 实操步骤(7)
now=$(date +%s)sec
while true; do
    jobStatus=$(aws emr-serverless get-job-run \
                    --no-paginate --no-cli-pager --output text \
                    --application-id $EMR_SERVERLESS_APP_ID \
                    --job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
                    --query jobRun.state)
    if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
        for i in {0..5}; do
            echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
            sleep 1
        done
    else
        echo -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"
        break
    fi
done

7.4 错误检索

作业开始运行后,Spark Driver和Executor会持续生成日志,这些日志存放在配置的$APP_S3_HOME/logs路径下,如果作业失败,可以使用下面的脚本快速检索到错误信息:

# 实操步骤(8)
JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME

7.5 停止作业

DeltaStreamer是一个持续运行的作业,如果需要停止作业,可以使用如下命令:

# 实操步骤(9)
aws emr-serverless cancel-job-run \
    --no-paginate --no-cli-pager\
    --application-id $EMR_SERVERLESS_APP_ID \
    --job-run-id $EMR_SERVERLESS_JOB_RUN_ID

8. 结果验证

作业启动后会自动创建一个数据表,并在指定的S3位置上写入数据,使用如下命令可以查看自动创建的数据表和落地的数据文件:

# 实操步骤(10)
aws s3 ls --recursive <your-table-s3-path>
aws glue get-table --no-paginate --no-cli-pager \
    --database-name <your-sync-database> --name <your-sync-table>
# 示例(非实操步骤)
aws s3 ls --recursive $APP_S3_HOME/data/mysql-server-3/inventory/orders/
aws glue get-table --no-paginate --no-cli-pager \
    --database-name inventory --name orders

9. 评估与展望

本文,我们详细介绍了如何在EMR Serverless上运行Apapche Hudi DeltaStreamer将CDC数据接入到Hudi表中,这是一个主打“零编码”,“零运维”的超轻量解决方案。但是,它的局限性也很明显,那就是:一个DeltaStreamer作业只能接入一张表,这对于动辄就需要接入数百张甚至数千张表的数据湖来说是不实用的,尽管Hudi也提供了用于多表接入的MultiTableDeltaStreamer,但是这个工具类目前的成熟度和完备性还不足以应用于生产。此外,Hudi自0.10.0起针对Kafka Connect提供了Hudi Sink插件(目前也是仅支持单表),为CDC数据接入Hudi数据湖开辟了新的途径,这是值得持续关注的新亮点。

从长远来看,CDC数据入湖并落地为Hudi表是一个非常普遍的需求,迭代并完善包括DeltaStreamer、HoodieMultiTableDeltaStreamer和Kafka Connect Hudi Sink插件在内的多种原生组件在社区的呼声将会越来越强烈,相信伴随着Hudi的蓬勃发展,这些组件将不断成熟起来,并逐步应用到生产环境中。


关于作者:耿立超,架构师,著有 《大数据平台架构与原型实现:数据中台建设实战》一书,多年IT系统开发和架构经验,对大数据、企业级应用架构、SaaS、分布式存储和领域驱动设计有丰富的实践经验,个人技术博客:https://laurence.blog.csdn.net

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/814236.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

仿转转闲鱼链接后台生成

教程&#xff1a;修改数据库账号密码直接使用。 源码带有教程! 下载程序&#xff1a;https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3

TIA博途中通过UDT实现IO地址映射到DB块中的具体方法

TIA博途中通过UDT实现IO地址映射到DB块中的具体方法 如下图所示,打开TIA博途,新建一个项目,添加一个PLC UDT数据类型Iomap,数据类型为Array[0…49] of Byte, 如下图所示,再添加一个全局DB块,在DB块中添加一个变量map,数据类型为Iomap, 如下图所示,在PLC变量表中添加一…

iOS开发-NotificationServiceExtension实现实时音视频呼叫通知响铃与震动

iOS开发-NotificationServiceExtension实现实时音视频呼叫通知响铃与震动 在之前的开发中&#xff0c;遇到了实时音视频呼叫通知&#xff0c;当App未打开或者App在后台时候&#xff0c;需要通知到用户&#xff0c;用户点击通知栏后是否接入实时音视频的视频或者音频通话。 在…

【微服务】springboot整合mongodb使用详解

目录 一、mongodb简介 1.1 什么是mongodb 1.2 mongodb特点 二、mongodb中的核心术语 2.1 mogodb与数据库对比 2.2 mongodb中的核心概念 2.3 与关系数据库的差异 2.3.1 半结构化 2.3.2 支持多级嵌套 2.3.3 关系弱化 三、mongodb 技术优势和应用场景 3.1 mongodb 技术…

redis主从复制哨兵Cluster

目录 前言 一、模式介绍 1.1 主从复制 1.2 哨兵 1.3 集群 二、主从复制 2.1 主从复制的作用 2.2 主从复制流程 2.3 搭建Redis 主从复制 三、Redis 哨兵模式 3.1 哨兵模式原理 3.2 哨兵模式的作用 3.3 哨兵组成结构 3.4 哨兵故障转移机制 3.5 搭建Redis 哨兵模式…

利用频谱仪进行简单的2.4G 频率测试

一、概述 1. 信号源 我们开发2.4G 无线产品的时候&#xff0c;经常需要对产品的无线信号进行测试&#xff0c;以确定精确的频率。在进行频率测试之前&#xff0c;我们的2.4G 射频芯片需要进入单载波模式。 2. 频谱仪 这里选择的是普源的频谱仪。测试范围是 9kHz - 3.2GHz。…

【工具篇】Lombok 介绍及使用(详细教程)

Lombok 介绍及使用 一&#xff0c;Lombok介绍 在 Java 开发中&#xff0c;常常需要编写大量的getter、setter方法、equals和hashCode方法、构造函数等重复且繁琐的代码。 为了减少 Java 代码中的冗余和样板代码&#xff0c;提高代码的可读性和开发效率&#xff0c;就有了Lomb…

数据结构: 线性表(无哨兵位单链表实现)

文章目录 1. 线性表的链式表示: 链表1.1 顺序表的优缺点1.2 链表的概念1.3 链表的优缺点1.4 链表的结构 2. 单链表的定义2.1 单链表的结构体2.2 接口函数 3. 接口函数的实现3.1 动态申请一个结点 (BuySListNode)3.2 单链表打印 (SListPrint)3.3 单链表尾插 (SListPushBack)3.4 …

【雕爷学编程】MicroPython动手做(20)——掌控板之三轴加速度6

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

【LangChain】向量存储(Vector stores)

概要 存储和搜索非结构化数据的最常见方法之一是嵌入它并存储生成的嵌入向量&#xff0c;然后在查询时嵌入非结构化查询并检索与嵌入查询“最相似”的嵌入向量。向量存储负责存储嵌入数据并为您执行向量搜索。 内容 本篇讲述与 VectorStore 相关的基本功能。使用向量存储的一…

【Git】远程仓库的创建、SSH协议克隆、拉取、推送

目录 一、创建远程仓库 二、HTTPS协议克隆仓库 三、SSH协议克隆仓库 四、向远程仓库推送 五、从远程仓库拉取 六、忽略特殊文件 七、配置命令别名 一、创建远程仓库 首先我们可以从GitHub或者Gitee中创建自己的个人仓库 工作台 - Gitee.comhttps://gitee.com/ 二、HTT…

Sestra 实用教程(三)输入与输出

目 录 一、前言二、分析流程三、输入文件3.1 模型与荷载3.2 分析控制数据 四、输出文件五、参考文献 一、前言 SESAM &#xff08;Super Element Structure Analysis Module&#xff09;是由挪威船级社&#xff08;DNV-GL&#xff09;开发的一款有限元分析&#xff08;FEA&…

关于在VS2017中编译Qt项目遇到的问题

关于在VS2017中编译Qt项目遇到的问题 【QT】VS打开QT项目运行不成功 error MSB6006 “cmd.exe”已退出,代码为 2。如何在VS2017里部署的Qt Designer上编辑槽函数 【QT】VS打开QT项目运行不成功 error MSB6006 “cmd.exe”已退出,代码为 2。 链接 如何在VS2017里部署的Qt Design…

【LeetCode】解码方法

这里写目录标题 题目描述算法流程编程代码代码优化 链接: 解码方法 题目描述 算法流程 编程代码 class Solution { public:int numDecodings(string s) {int n s.size();vector<int> dp(n);dp[0] s[0] ! 0;if(n 1) return dp[0];if(s[1] < 9 && s[1] >…

python之编写form表单提交到后端

一、环境配置 我们先去python的框架中下载Flask&#xff0c;具体的配置我给大家找了一篇博客讲解&#xff0c;环境调试没问题后&#xff0c;开始我们form表单提交的过程 Python之flask框架_python flask_【网络星空】的博客-CSDN博客 二、前端代码 在VScode里编写前端的代码为…

移动端加入购物车界面设计

效果图 源码如下 页面设计 <template><div class"container"><!--商品详情 start--><van-image class"goods-item-image" :src"goods.goodsHeadImg"></van-image><div class"goods-price">&…

【安装vue脚手架报错:npm install -g @vue-cli pm ERR! code EINVALIDTAGNAME 】

当我们执行npm install -g vue-cli时候会报错&#xff1a; npm ERR! Invalid tag name “vue-cli” of package “vue-cli”: Tags may not have any characters that encodeURIComponent encodes. npm ERR! A complete log of this run can be found in: /Users/wuwenlu/.npm/…

【MySQL】MySQL索引、事务、用户管理

20岁的男生穷困潦倒&#xff0c;20岁的女生风华正茂&#xff0c;没有人会一直风华正茂&#xff0c;也没有人会一直穷困潦倒… 文章目录 一、MySQL索引特性&#xff08;重点&#xff09;1.磁盘、OS、MySQL&#xff0c;在进行数据IO时三者的关系2.索引的理解3.聚簇索引&#xff0…

仿找靓机链接生成 独立后台管理

教程&#xff1a;修改数据库账号密码直接使用。 源码带有教程! 下载程序&#xff1a;https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3

软考A计划-系统集成项目管理工程师-项目采购管理-下

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 &#x1f449;关于作者 专注于Android/Unity和各种游…