第三期【Demo教程】教你使用SeaTunnel把数据从MySQL导到Hive

news2024/12/25 9:12:46

随着数据技术的快速发展,了解并掌握各种工具和技术变得尤为重要。为此,我们准备在Apache SeaTunnel社区发起如何使用连接器的Demo演示计划,邀请所有热爱数据同步技术的同学分享他们的知识和实操经验!

file

我们第三期主题是:如何使用SeaTunnel连接器从MySQL同步到Hive,如果您对此计划感兴趣,也欢迎联系社区运营同学参与Demo录制!无论您是数据工程师、开发者还是技术爱好者,都欢迎您参与并展示您的技术才能。

敲重点敲重点如果你是用户,想看什么同步场景的Demo!请下滑到最底部留言,我们优先出品呼声最高的同步场景Demo!

Demo计划目标

我们的目标是创建一个共享和学习的平台,通过具体的Demo演示和对应的文档帮助社区成员更好地理解和应用各种数据连接器。这些Demo可以帮助新手快速学习,同时也为资深专家提供一个展示创新解决方案的舞台。

src="//player.bilibili.com/player.html?isOutside=true&aid=1855834722&bvid=BV1ks421T7ZV&cid=1586444145&p=1" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true">

如何使用 SeaTunnel进行高效数据同步

关于从MySQL同步到Hive,前段时间也有用户投稿,感兴趣的同学可以搜索看看:

【最佳实践】2个步骤教你从Mysql同步到Hive

如何使用 SeaTunnel 同步 MySQL 数据到 Hive

Mysql Source连接器相关请参考之前的教程: 全方位解读SeaTunnel MySQL CDC连接器:实现数据高效同步的强大工具

需要参考的文档及代码原文链接:https://seatunnel.apache.org/docs/2.3.5/connector-v2/sink/Hive (预计2.3.6版本才能正式使用)

描述

将数据写入到 Hive。

要使用此连接器,您必须确保您的 Spark/Flink 集群已经集成了 Hive。

如果您使用 SeaTunnel Zeta Engine,则需要将 seatunnel-hadoop3-3.1.4-uber.jarhive-exec-3.1.3.jarlibfb303-0.9.3.jar 放置在 $SEATUNNEL_HOME/lib/ 目录下。 :::

关键特性

  • 精确一次

默认情况下,我们使用两阶段提交(2PC)来确保 精确一次

  • 文件格式
    • text
    • csv
    • parquet
    • orc
    • json
  • 压缩编码
    • lzo

选项

名称类型必需默认值
table_namestring-
metastore_uristring-
compress_codecstringnone
hdfs_site_pathstring-
hive_site_pathstring-
hive.hadoop.confMap-
hive.hadoop.conf-pathstring-
krb5_pathstring/etc/krb5.conf
kerberos_principalstring-
kerberos_keytab_pathstring-
abort_drop_partition_metadatabooleantrue
common-options-

table_name [string]

目标 Hive 表的名称,例如:db1.table1。如果源是多模式的,您可以使用 ${database_name}.${table_name} 来生成表名,它会用源中生成的 CatalogTable 的值替换 ${database_name}${table_name}

metastore_uri [string]

Hive Metastore 的 URI。

hdfs_site_path [string]

hdfs-site.xml 的路径,用于加载 namenodes 的高可用配置。

hive_site_path [string]

hive-site.xml 的路径。

hive.hadoop.conf [map]

Hadoop 配置文件中的属性(core-site.xmlhdfs-site.xmlhive-site.xml)。

hive.hadoop.conf-path [string]

core-site.xmlhdfs-site.xmlhive-site.xml 文件的指定加载路径。

krb5_path [string]

krb5.conf 的路径,用于 Kerberos 认证。

kerberos_principal [string]

Kerberos 的 principal。

kerberos_keytab_path [string]

Kerberos 的 keytab 路径。

abort_drop_partition_metadata [list]

决定在中止操作期间是否从 Hive Metastore 中删除分区元数据的标志。

注意:这仅影响 metastore 中的元数据,同步过程中生成的数据将始终被删除。

common options

Sink 插件的常用参数,请参阅 Sink Common Options 获取详细信息。

示例

Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://namenode001:9083"
}

示例 1

我们有一个源表,如下所示:

create table test_hive_source(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE,
  test_array ARRAY<INT>,
  test_map MAP<STRING, FLOAT>,
  test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

我们需要从源表读取数据并写入到另一个表中:

create table test_hive_sink_text_simple(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

作业配置文件如下:

env {
  parallelism = 3
  job.name="test_hive_source_to_hive"
}

source {
  Hive {
    table_name = "test_hive.test_hive_source"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://ctyun7:9083"
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
    }
  }
}

Hive on S3

1、为 EMR 的 Hive 创建 lib 目录

mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

2、从 Maven 中心获取 jar 到 lib 目录

cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

3、从 EMR 环境中复制 jar 到 lib 目录

cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib

4、运行测试用例

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_s3"
    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
    hive.hadoop.conf = {
       bucket="s3://ws-package"
    }
  }
}

Hive on OSS

1、为 EMR 的 Hive 创建 lib 目录

mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

2、从 Maven 中心获取 jar 到 lib 目录

cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

3、从 EMR 环境中复制 jar 到 lib 目录并删除冲突的 jar

cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar

4、运行测试用例

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_oss"
    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
    hive.hadoop.conf-path = "/tmp/hadoop"
    hive.hadoop.conf = {
        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
    }
  }
}

示例 2

我们有多个源表,如下所示:

create table test_1(
)
PARTITIONED BY (xx);

create table test_2(
)
PARTITIONED BY (xx);
...

我们需要从这些源表读取数据并写入到其他表中:

作业配置文件如下:

env {
  # 在这里设置 Flink 配置
  parallelism = 3
  job.name="test_hive_source_to_hive"
}

source {
  Hive {
    tables_configs = [
      {
        table_name = "test_hive.test_1"
        metastore_uri = "thrift://ctyun6:9083"
      },
      {
        table_name = "test_hive.test_2"
        metastore_uri = "thrift://ctyun7:9083"
      }
    ]
  }
}

sink {
  Hive {
    table_name = "${database_name}.${table_name}"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

通过视频教程,我们探讨了如何使用 Apache SeaTunnel 的 Hive Sink Connector 将数据高效地写入 Hive 表。

无论是在本地环境还是云上部署,使用 Hive Sink Connector 都能够帮助企业构建高效、可靠的数据处理流程。希望通过本文的指导,您能更好地理解和应用这一强大的工具,以满足您的数据处理需求。

如果您对本文内容有任何疑问或建议,欢迎在评论区分享您的想法。让我们共同探讨和进步,不断推动数据技术的边界。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1843131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python和OpenCV图像分块之图像边长缩小比率是2

import cv2 import numpy as npimg cv2.imread("F:\\mytupian\\xihuduanqiao.jpg") # 低反光 cv2.imshow(image, img) # # 图像分块 # dst np.zeros(img.shape, img.dtype) ratio 2 #图像边长缩小比率是2&#xff0c;也就是一张图片被分割成四份 height, wi…

Python学习笔记6:pychram相关知识及安装教程,后续需要学习的入门知识

上篇文章说了&#xff0c;今天去公司重新装一下IDE&#xff0c;最后也是把过程这边再记录一下&#xff0c;有需要的可以参考一下。 关于pychram pychram是什么&#xff1f; PyCharm是由JetBrains公司开发的一款流行的Python集成开发环境&#xff08;IDE&#xff09;。它专为…

[创业之路-120] :全程图解:软件研发人员如何从企业的顶层看软件产品研发?

目录 一、企业全局 二、供应链 三、团队管理 四、研发流程IPD 五、软件开发流程 六、项目管理 七、研发管理者的自我修炼 一、企业全局 二、供应链 三、团队管理 四、研发流程IPD 五、软件开发流程 六、项目管理 七、研发管理者的自我修炼

RabbitMQ的部署

一、前言 演示的为RabbitMQ的单机部署&#xff0c;在Centos7虚拟机中使用Docker来安装&#xff0c;需要掌握相应的docker命令 二、下载镜像 启动Docker: systemctl start docker 在线拉取&#xff1a;docker pull docker pull rabbitmq:3-management 三、安装MQ 运行容器&…

华媒舍:8个让你东南亚媒体发稿事半功倍的方法

本文将为您介绍8个方法&#xff0c;可以帮助您在东南亚地区的媒体发稿过程中事半功倍。无论您是一名公关人员、市场营销专家还是普通的新闻工作者&#xff0c;这些方法都将对您极具帮助。 1. 了解目标受众 在东南亚地区发布媒体稿件时&#xff0c;首要的步骤是了解目标受众。不…

Matlab数学建模实战应用:案例3 - 投资组合优化

目录 前言 一、问题分析 二、模型建立 三、Matlab代码实现 完整代码示例 四、模型验证 五、模型应用 实例示范&#xff1a;投资组合优化 步骤 1&#xff1a;导入数据并计算统计量 步骤 2&#xff1a;建立优化模型并求解 步骤 3&#xff1a;绘制有效前沿&#xff08;…

汇聚荣做拼多多口碑怎么样?

汇聚荣做拼多多口碑怎么样?汇聚荣作为拼多多平台上的一个商家&#xff0c;其口碑的好坏直接关联到消费者的购买决策和品牌信誉。在电商平台上&#xff0c;良好的口碑是吸引顾客的重要因素之一&#xff0c;尤其是对于竞争激烈的拼多多平台而言。那么&#xff0c;汇聚荣在拼多多…

iOS APP内存泄漏的问题

iOS APP内存泄漏是指应用程序不再使用内存&#xff0c;但内存却没有被释放&#xff0c;导致应用程序占用过多的内存&#xff0c;甚至崩溃。内存泄漏是iOS开发中常见的问题&#xff0c;会严重影响应用程序的性能和稳定性。北京木奇移动技术有限公司&#xff0c;专业的软件外包开…

《车载以太网通信测试》课程来袭!!!

本课程包含教程和脚本两部分内容。 教程 详细介绍以太网&#xff0c;如何理解TCP/IP协议&#xff0c;CAPL中涉及以太网的代码&#xff0c;以太网测试环境如何搭建&#xff0c;从物理层、链路层、网络层、传输层到应用层多种协议测试点的测试原理和测试方法介绍&#xff0c;中…

【PL理论】(34) 类型系统:不完备性 | 为什么推导树推导失败? | 实现类型系统 | 调整到类型系统 | 思考:强制程序员写类型还是自动推断类型?

&#x1f4ac; 写在前面&#xff1a;回顾我们的目标是为 F- 语言设计一个完备但不完全的类型系统&#xff0c;本章我们探讨的主题是类型系统的完备性。 目录 0x00 类型系统的不完备性 0x01 为什么推导树推导失败&#xff1f; 0x02 实现类型系统 0x03 调整到类型系统 0x04…

Java面试八股之什么是mybatis流式查询

什么是mybatis流式查询 Mybatis流式查询是一种处理大量数据的有效方法&#xff0c;它允许你以低内存消耗的方式来处理查询结果。传统的查询操作会一次性将所有数据加载到内存中&#xff0c;如果数据量非常大&#xff0c;可能会导致OutOfMemoryError&#xff08;OOM&#xff09…

js语法---weakMap和weakSet:弱映射和弱集合

weakMap weakMap是Map的一种&#xff0c;但它有更多的限制&#xff0c; 1. WeakMap 和 Map 的第一个不同点就是&#xff0c;WeakMap 的键必须是对象&#xff0c;不能是原始值(number,string,symbol...) 2. WeakMap 不支持迭代以及 keys()&#xff0c;values() 和 entries() …

JavaScript的学习之旅(6.20)

目录 一、认识三个常见的js代码 二、js写入的第二种方式 三、js里内外部文件 一、认识三个常见的js代码 <script>//写入js位置的第一个地方// 控制浏览器弹出一个警告框alert("这是一个警告");// 在计算机页面输入一个内容&#xff08;写入body中&#xff…

力扣SQL50 至少有5名直接下属的经理 子查询 join 虚拟表

Problem: 570. 至少有5名直接下属的经理 &#x1f468;‍&#x1f3eb; 参考题解 &#x1f37b;子查询 select name from Employee where id in(select managerId from Employee group by managerId having managerId ! null and count(*) > 5);&#x1f37b; join 虚拟…

力扣SQL50 有趣的电影 简单查询

Problem: 620. 有趣的电影 Code select * from cinema where id % 2 1 and description ! boring order by rating desc;

【云岚到家】-day04-1-数据同步方案-Canal-MQ

【云岚到家】-day04-1-数据同步方案-Canal-MQ 1 服务搜索1.1 服务搜索技术方案1.1.1 需求分析1.1.2 技术方案1.1.2.1 使用Elasticsearch进行全文检索1.1.2.2 索引同步方案 1.1.3 CanalMQ1.1.3.1 MySQL主从数据同步1.1.3.2 Canal工作流程1.1.3.3 具体实现方案 1.2 MQ技术方案1.2…

conda创建虚拟环境报错解决

1.报错截图 2.解决办法 查看当前所有虚拟环境 conda env list 解决办法 解决方法 bash conda config --add channels conda-forge conda config --set channel_priority strict conda config --set channel_priority flexible

Java文件/文件夹的新增/删除/递归遍历

获取File对象 这里的字符串可以乱写&#xff0c;但是如果不存在后续的操作也会失败 // 获取抽象的File对象&#xff08;文件或者目录&#xff0c;不一定真实存在&#xff09;File file1 new File("D:\\2_WorkSpace\\qcbyProject\\shixun\\collection-test\\src\\FileTes…

B站广告开户投流是什么政策?要哪些资质?

B站&#xff08;哔哩哔哩&#xff09;作为年轻人喜爱的视频分享社区&#xff0c;其广告价值也日益凸显。为了更好地服务广告主&#xff0c;B站近日对广告开户投流政策进行了更新&#xff0c;云衔科技作为专业的数字营销服务商&#xff0c;也积极响应&#xff0c;为广告主提供一…

【项目实践】Ulike充电牙刷拆解

前言 用了一段时间的充电牙刷&#xff0c;某一次突然没电了&#xff0c;按键也没有反应。无奈只能废弃。最近略微得了些空闲&#xff0c;想着把它拆解看看里面的结构和电路。以下是鼓捣过程记录。 为什么不能直接抽出来&#xff1f; 在网上看到很多拆解视频&#xff0c;都是打开…