FlinkX学习

news2025/1/9 1:45:37

FlinkX学习

FlinkX安装

由于flinkx已经改名chunjun 官网已不存在

(https://gitee.com/lugela/flinkx#flinkx)这里可以看到flinkx的操作文档

1、上传并解压

unzip flinkx-1.10.zip -d /usr/local/soft/

2、配置环境变量

FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH

3、给bin/flinkx这个文件加上执行权限

 chmod +x flinkx

4、修改配置文件,设置运行端口

vim flinkconf/flink-conf.yaml

## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888

启动

命令行参数选项

  • model
    • 描述:执行模式,也就是flink集群的工作模式
      • local: 本地模式
      • standalone: 独立部署模式的flink集群
      • yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
      • yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster"
    • 必选:否
    • 默认值:local
  • job
    • 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
    • 必选:是
    • 默认值:无
  • pluginRoot
    • 描述:插件根目录地址,也就是打包后产生的pluginRoot目录。
    • 必选:是
    • 默认值:无
  • flinkconf
    • 描述:flink配置文件所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/conf
    • 必选:否
    • 默认值:无
  • yarnconf
    • 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
    • 必选:否
    • 默认值:无
  • flinkLibJar
    • 描述:flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib
    • 必选:否
    • 默认值:无
  • confProp
    • 描述:flink相关参数,如{“flink.checkpoint.interval”:200000}
    • 必选:否
    • 默认值:无
  • queue
    • 描述:yarn队列,如default
    • 必选:否
    • 默认值:无
  • pluginLoadMode
    • 描述:yarnPer模式插件加载方式:
      • classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快
      • shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境
    • 必选:否
    • 默认值:classpath

FlinkX概述

FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

FlinkX是一个基于Flink的批流统一体的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行.

image-20220619223533456.png

FlinkX的简单使用

MySQL2HDFS

场景

将mysql Y1数据库下的Student表数据写入HDFS上的指定路径中

参考文档

mysqlreader:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/reader/mysqlreader.md)

hdfswriter:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/writer/hdfswriter.md)

创建mysql2hdfs.json文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "Student"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "Sid > 05 ",
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "path": "hdfs://master:9000/bigdata30/flinkx/out1",
                        "defaultFS": "hdfs://master:9000",
                        "column": [
                            {
                                "name": "col1",
                                "index": 0,
                                "type": "string"
                            },{
                                "name": "col2",
                                "index": 1,
                                "type": "string"
                            },{
                                "name": "col3",
                                "index": 2,
                                "type": "string"
                            },{
                                "name": "col4",
                                "index": 3,
                                "type": "string"
                            }
                        ],
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

运行模式

  • 单机模式:对应Flink集群的单机模式
  • standalone模式:对应Flink集群的分布式模式
  • yarn模式:对应Flink集群的yarn模式
  • yarnPer模式: 对应Flink集群的Per-job模式

运行:

flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

监听日志:

flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件

tail -f nohup.out

通过web界面查看任务运行情况

http://master:8888

hdfs上出现文件:

image.png

查看该文件:

hdfs dfs -cat /bigdata30/flinkx/out1/0.44b7d6c8dcaadcc14ae55fb482f9fb27.0

出现Sid大于05的学生:

image.png

MySQLToHive

hivewrite:(https://github.com/oceanos/flinkx/blob/1.8_release/docs/hivewriter.md)

配置文件:

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "Student"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "Sid > 05 ",
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hivewriter",
                    "parameter": {
                        "jdbcUrl": "jdbc:hive2://master:10000/bigdata30",
                        "username": "",
                        "password": "",
                        "fileType": "text",
                        "fieldDelimiter": ",",
                        "writeMode": "overwrite",
                        "compress": "",
                        "charsetName": "UTF-8",
                        "maxFileSize": 1073741824,
                        "tablesColumn": "{\"Student\":[{\"key\":\"SId\",\"type\":\"string\"},{\"key\":\"Sname\",\"type\":\"string\"},{\"key\":\"Sage\",\"type\":\"string\"},{\"key\":\"Ssex\",\"type\":\"string\"}]}",
                        "defaultFS": "hdfs://master:9000"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

在hive中建表:

CREATE TABLE `bigdata30`.`Student`(
    `SId` STRING,
    `Sname` STRING,
    `Sage` STRING,
    `Ssex` STRING
				)
PARTITIONED BY ( 
  `pt` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',';

启动hiveserver2

启动任务

flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

运行发现报错 无法解决。

翻阅chunjun官网 在hive-sink中发现 只支持hive1.x hive2.x 现hive版本为3.1.2 不支持 猜测报错原因

尝试使用chunjun 解决

MySQLToHBase

场景

将mysql Y1数据库中的Student表数据写入HBase flinkx_Student表中

配置文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "Student"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "Sid > 05 ",
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hbasewriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.property.clientPort": "2181",
                            "hbase.rootdir": "hdfs://master:9000/hbase",
                            "hbase.cluster.distributed": "true",
                            "hbase.zookeeper.quorum": "master,node1,node2",
                            "zookeeper.znode.parent": "/hbase"
                        },
                        "table": "flinkx_Student",
                        "rowkeyColumn": "$(cf1:SId)",
                        "column": [
                            {
                                "name": "cf1:SId",
                                "type": "string"
                            },
                            {
                                "name": "cf1:Sname",
                                "type": "string"
                            },
                            {
                                "name": "cf1:Sage",
                                "type": "string"
                            },
                            {
                                "name": "cf1:Ssex",
                                "type": "string"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

在hbase中创建flinkx_Student表

create 'flinkx_Student','cf1'

启动

flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

hbase中的flinkx_Student表出现数据

image.png

MySQLToMySQL

场景

将mysql Y1数据库中的Student表数据写入datax1数据库中的Student2表中

配置文件 mysql2mysql.json

{
    "job": {
      "content": [
        {
          "reader": {
            "name": "mysqlreader",
            "parameter": {
              "column": [
                {
                  "name": "SId",
                  "type": "string"
                },
                {
                  "name": "Sname",
                  "type": "string"
                },
                {
                  "name": "Sage",
                  "type": "string"
                },
                {
                  "name": "Ssex",
                  "type": "string"
                }
              ],
              "username": "root",
              "password": "123456",
              "connection": [
                {
                  "jdbcUrl": [
                    "jdbc:mysql://master:3306/Y1?useSSL=false"
                  ],
                  "table": [
                    "Student"
                  ]
                }
              ]
            }
          },
          "writer": {
            "name": "mysqlwriter",
            "parameter": {
              "username": "root",
              "password": "123456",
              "connection": [
                {
                  "jdbcUrl": "jdbc:mysql://master:3306/datax1?useSSL=false",
                  "table": [
                    "Student2"
                  ]
                }
              ],
              "writeMode": "insert",
              "column": [
                {
                    "name": "SId",
                    "type": "string"
                  },
                  {
                    "name": "Sname",
                    "type": "string"
                  },
                  {
                    "name": "Sage",
                    "type": "string"
                  },
                  {
                    "name": "Ssex",
                    "type": "string"
                  }
              ]
            }
          }
        }
      ],
      "setting": {
        "speed": {
          "channel": 1,
          "bytes": 0
        }
      }
    }
  }

在mysql datax1数据库中建表:

create table if not exists datax1.Student2(
    SID varchar(10),
    Sname varchar(100),
    Sage varchar(100),
    Ssex varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;

运行:

flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

进入网页查看:

master:8888

image.png

查看Student2表 数据已导入:

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1870898.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PHP-CGI的漏洞(CVE-2024-4577)

通过前两篇文章的铺垫,现在我们可以了解 CVE-2024-4577这个漏洞的原理 漏洞原理 CVE-2024-4577是CVE-2012-1823这个老漏洞的绕过,php cgi的老漏洞至今已经12年,具体可以参考我的另一个文档 简单来说,就是使用cgi模式运行的PHP&…

零拷贝技术(zero copy),DMA,mmap,sendfile

在一些高性能的IO场景下我们经常能听到零拷贝技术,这是个不错的话题。 零拷贝指的是内核态与用户态之间的数据拷贝,而这两个区域的数据拷贝只能依靠CPU,但是CPU最重要的作用应该是运算。 一、DMA的由来 在没有DMA之前,磁盘的IO…

【NPS】哑终端设备如何实现域VLAN动态分配

在【NPS】微软NPS配置802.1x,验证域账号,动态分配VLAN(有线网络续篇)中,已经通过C3PL策略配置实现了802.1x验证没有通过时,自动分配一个Guest VLAN,以确保用户至少能够访问基本的网络服务。问题…

数字时代的文化革命:Facebook的社会影响

随着数字技术的飞速发展和互联网的普及,社交网络如今已成为人们日常生活中不可或缺的一部分。在众多社交平台中,Facebook作为最大的社交网络之一,不仅连接了全球数十亿用户,更深刻影响了人们的社会互动方式、文化认同和信息传播模…

展开说说:Android列表之RecyclerView

RecyclerView 它是从Android5.0出现的全新列表组件,更加强大和灵活。用于显示列表形式 (list) 或者网格形式 (grid) 的数据,替代ListView和GridView成为Android主流的列表组件。可以说Android客户端只要有表格的地方就有RecyclerView。 RecyclerView 内…

【linux】使用vnc连接远程桌面,需要安装tigervnc,并在服务端期待,然后在客户端使用tigervnc-viewer进行连接即可

vnc 远程设置方法 需要服务端安装软件: sudo apt install -y tigervnc-standalone-server# 先配置密码使用: tightvncpasswd启动服务,禁用本机 vncserver -localhost no -geometry 1924x1080 :1客户端安装软件: sudo apt insta…

JavaScript高级程序设计(第四版)--学习记录之基本引用类型

Date Date类型将日期保存为自协调世界时间1970年1月1日午夜至今所经过的毫秒数。 创建日期对象 let now new Date() Date.parse()方法接收一个表示日期的字符串参数,尝试将这个字符串转换为表示该日期的毫秒数。 let time new Date(Date.parse("May 24,2024&…

Jmeter+InfluxDB+Grafana性能测试数据展示

JmeterInfluxDBGrafana提供了一种更好的对Jmeter压测结果的实时监控展示。可以理解为数据源产生的数据加上时间记录并存储,然后使用各种开源图表组件进行展示。实现jmeter报告的更好的可视化展示 1)方便测试结果数据落地以及更好的分析 2)将…

超好用的思维导图—万兴亿图脑图 v10解锁版安装教程 (思维导图软件和头脑风暴工具)

前言 万兴亿图脑图 (Wondershare EdrawMind) 是一款多平台协作思维导图软件和头脑风暴工具,亿图思维导图提供丰富的布局,样式,主题及配色方案,集成拥有数万幅原创思维导图作品的思维导图社区,涵盖教育,职场,自我提升等各大领域精华知识.支持会议演示,多端创作,云端存储,导图分…

BioCLIP:物种图像的基础视觉模型

从无人机到个人手机,各种相机收集的自然世界图像是越来越丰富的生物信息来源。从图像中提取生物相关信息用于科学的计算方法和工具激增,尤其是计算机视觉。然而,其中大多数都是为特定任务设计的,不容易适应或扩展到新的问题、环境…

第30课 绘制原理图——放置网络标签

什么是网络标签? 我们在很多电路图中都能看到,为了让图纸更加简洁,并不是每一根导线都要确确实实地画出来。可以在导线悬空的一端添加一个名称标签,接着在另一根导线的悬空一端添加上一个同名的名称标签,那么就可以让…

1.回溯算法.基础

1.回溯算法 基础知识题目1.组合2.组合-优化3.组合总和|||4.电话号码和字母组合5.组合总和6.组合总和II7.分割回文串8.复原IP地址 基础知识 回溯法也可以叫做回溯搜索法,它是一种搜索的方式。回溯是递归的副产品,只要有递归就会有回溯 因为回溯的本质是穷…

【毛毛虫案例-重力 Objective-C语言】

一、接下来,我们给这个毛毛虫,添加一下重力 1.把我们之前的代码,复制粘贴一份儿,改个名字,叫做:17-毛毛虫案例-重力, 重力的话,实际上,就比较简单了啊,那我们重力的话,去添加的时候,我也要在外面,去添加, 重力的话,叫做啥,UIGravityBehavior,啊, UIGravity…

2024年度临沂市安全文化书画摄影展开幕

人海信息网山东讯 6月27日,2024年度临沂市安全文化书画摄影作品展,在临沂高新区隆重开幕。本次书画摄影展深入贯彻“以人为本,安全发展”的重要思想,立意高远,内涵丰富,思想深邃,承载着健康、幸…

【公开数据集获取】

Open Images Dataset https://www.youtube.com/watch?vdLSFX6Jq-F0

MySQL锁和使用

在MySQL中,锁用于控制并发访问,以保证数据的一致性和完整性。MySQL提供了多种类型的锁,包括表级锁、行级锁和页面级锁。以下是MySQL中各种锁的详细介绍及其使用方法: 1. 表级锁(Table Locks) 表级锁用于锁…

AI绘画Stable Diffusion 超强一键去除图片中的物体,免费使用!

大家好,我是设计师阿威 在生成图像时总有一些不完美的小瑕疵,比如多余的物体或碍眼的水印,它们破坏了图片的美感。但别担心,今天我们将介绍一款神奇的工具——sd-webui-cleaner,它可以帮助我们使用Stable Diffusion轻…

简易深度学习(1)深入分析神经元及多层感知机

一、神经元 单个神经元结构其实可以认为是一个线性回归模型。例如下图中 该神经元输入为三个特征(x1,x2,x3),为了方便理解,大家可以认为每条线上都有一个权重和特征对应(w1,w2&…

62.指针和二维数组(2)

一.指针和二维数组 1.如a是一个二维数组,则数组中的第i行可以看作是一个一维数组,这个一维数组的数组名是a[i]。 2.a[i]代表二维数组中第i行的首个元素的地址,即a[i][0]的地址。 二.进一步思考 二维数组可以看作是数组的数组,本…

算法入门:二分查找及其Java实现

在程序开发中,算法是解决问题的核心。本篇博客将详细讲解一种高效的查找算法——二分查找,并通过Java代码示例帮助你理解其实现和应用。 如果你觉得这篇文章对你有帮助,不要忘记点赞、收藏和关注我,这将是对我最大的支持和鼓励&am…