Dink进阶之路

news2024/11/25 9:54:21

1、环境变量

cat /etc/profile

#flink需要
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=/etc/hadoop/conf

2、Flink配置

2.1、flink-conf.yaml

jobmanager.rpc.address: node-146
jobmanager.rpc.port: 6123
# 设置jobmanager总内存
jobmanager.memory.process.size: 4096m
# 设置taskmanager的运行总内存
taskmanager.memory.process.size: 4096m
# 设置用户代码运行内存
taskmanager.memory.task.heap.size: 3072m
# 设置flink框架内存
taskmanager.memory.framework.heap.size: 128m
# 设置managed memory内存
taskmanager.memory.managed.size: 128m
# 设置堆外内存
taskmanager.memory.framework.off-heap.size: 128m
# 设置网络缓存
taskmanager.memory.network.max: 128m
# 设置JVM内存
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
classloader.check-leaked-classloader: false
akka.ask.timeout: 50s
web.timeout: 50000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000
state.savepoints.dir: hdfs://hdfs-ha/flink/savepoints/
state.checkpoints.dir: hdfs://hdfs-ha/flink/checkpoints/
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError

jvm相关参数

堆设置
-Xms :初始堆大小
-Xmx :最大堆大小
-XX:NewSize=n :设置年轻代大小
-XX:NewRatio=n: 设置年轻代和年老代的比值。如:为3,表示年轻代与年老代比值为1:3,年轻代占整个年轻代年老代和的1/4
-XX:SurvivorRatio=n :年轻代中Eden区与两个Survivor区的比值。注意Survivor区有两个。如:3,表示Eden:Survivor=3:2,一个Survivor区占整个年轻代的1/5
-XX:MaxPermSize=n :设置持久代大小
收集器设置
-XX:+UseSerialGC :设置串行收集器
-XX:+UseParallelGC :设置并行收集器
-XX:+UseParalledlOldGC :设置并行年老代收集器
-XX:+UseConcMarkSweepGC :设置并发收集器
垃圾回收统计信息
-XX:+PrintHeapAtGC GC的heap详情
-XX:+PrintGCDetails  GC详情
-XX:+PrintGCTimeStamps  打印GC时间信息
-XX:+PrintTenuringDistribution    打印年龄信息等
-XX:+HandlePromotionFailure   老年代分配担保(true  or false)
并行收集器设置
-XX:ParallelGCThreads=n :设置并行收集器收集时使用的CPU数。并行收集线程数。
-XX:MaxGCPauseMillis=n :设置并行收集最大暂停时间
-XX:GCTimeRatio=n :设置垃圾回收时间占程序运行时间的百分比。公式为1/(1+n)
并发收集器设置
-XX:+CMSIncrementalMode :设置为增量模式。适用于单CPU情况。
-XX:ParallelGCThreads=n :设置并发收集器年轻代收集方式为并行收集时,使用的CPU数。并行收集线程数

2.2、masters


node-146:8081

2.2、workers

node-107
node-124
node-131
node-139

2.3、lib

flink-shaded-zookeeper-3.4.14.jar
commons-cli-1.5.0.jar
log4j-slf4j-impl-2.17.1.jar
log4j-core-2.17.1.jar
log4j-api-2.17.1.jar
log4j-1.2-api-2.17.1.jar
flink-json-1.13.6.jar
flink-csv-1.13.6.jar
flink-table_2.12-1.13.6.jar
flink-table-blink_2.12-1.13.6.jar
flink-dist_2.12-1.13.6.jar
flink-connector-jdbc_2.12-1.13.6.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
flink-connector-starrocks-1.2.7_flink-1.13_2.12.jar
hudi-flink1.13-bundle_2.12-0.11.1.jar
mysql-connector-j-8.0.33.jar
flink-sql-connector-kafka_2.12-1.13.6.jar
flink-sql-connector-elasticsearch7_2.12-1.13.6.jar
dlink-client-base-0.7.4.jar
dlink-client-1.13-0.7.4.jar
dlink-common-0.7.4.jar

2.4、分发各节点

for host in {node-107,node-124,node-131,node-139};do scp /usr/bin/tarall root@$host:/data/app/;done

3、dinky配置

3.1、application.yml

url: jdbc:mysql://${MYSQL_ADDR:192.168.0.24:3306}/${MYSQL_DATABASE:dlink}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: ${MYSQL_USERNAME:dlink}
password: ${MYSQL_PASSWORD:Dlink*2023}
driver-class-name: com.mysql.cj.jdbc.Driver

3.2、plugins

/data/app/dlink-release-0.7.4/plugins

antlr-runtime-3.5.2.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
hive-exec-3.1.0.3.1.5.0-152.jar
javax.ws.rs-api-2.1.jar
jersey-common-2.27.jar
jersey-core-1.19.jar
libfb303-0.9.3.jar
mysql-connector-j-8.0.33.jar

3.3、plugins-flink

/data/app/dlink-release-0.7.4/plugins/flink1.13

flink-connector-jdbc_2.12-1.13.6.jar
flink-connector-starrocks-1.2.7_flink-1.13_2.12.jar
flink-csv-1.13.6.jar
flink-dist_2.12-1.13.6.jar
flink-doris-connector-1.13_2.12-1.0.3.jar
flink-json-1.13.6.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-elasticsearch7_2.12-1.13.6.jar
flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar
flink-sql-connector-kafka_2.12-1.13.6.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
flink-table_2.12-1.13.6.jar
flink-table-blink_2.12-1.13.6.jar
hudi-flink1.13-bundle_2.12-0.11.1.jar

3.4、dinky启动

sh auto.sh start 1.13

3.5、上传jar包

3.5.1、创建HDFS目录
# 创建HDFS目录并上传dinky的jar包
sudo -u hdfs hdfs dfs -mkdir -p /dlink/{jar,flink-dist-13}

3.5.2、上传Flink的jar包
sudo -u hdfs hadoop fs -put /data/app/flink-1.13.6/lib /dlink/flink-dist-13
sudo -u hdfs hadoop fs -put /data/app/flink-1.13.6/plugins /dlink/flink-dist-13

3.5.3、上传dinky的jar包
sudo -u hdfs hdfs dfs -put /data/app/dlink-release-0.7.4/jar/dlink-app-1.13-0.7.4-jar-with-dependencies.jar /dlink/jar
sudo -u hdfs hadoop fs -put /data/app/dlink-release-0.7.4/lib/dlink-metadata-* /dlink/flink-dist-13/lib/
sudo -u hdfs hadoop fs -put druid-1.2.8.jar mysql-connector-j-8.0.33.jar /dlink/flink-dist-13/lib/

4、dinky操作

4.1、配置中心

4.1.1、Flink配置

提交 FlinkSQL 的 Jar 文件路径
在这里插入图片描述

4.2、注册中心

4.2.1、Flink实例管理
1、启动FlinkOnSession
bin/yarn-session.sh -jm 1024m -tm 1024m -nm flink-13-session -d

bin/yarn-session.sh -jm 4096 -tm 4096 -qu default -s 4 -nm spider-13-session -d

bin/yarn-session.sh \
-d -nm spider-13-session \
-p 2 \
-Dyarn.application.queue=default \
-Djobmanager.memory.process.size=4096mb \
-Dtaskmanager.memory.process.size=16384mb \
-Dtaskmanager.memory.framework.heap.size=128m \
-Dtaskmanager.memory.task.heap.size=15360m \
-Dtaskmanager.memory.managed.size=128m \
-Dtaskmanager.memory.framework.off-heap.size=128m \
-Dtaskmanager.memory.network.max=128m \
-Dtaskmanager.memory.jvm-metaspace.size=256m \
-Dtaskmanager.memory.jvm-overhead.max=256m \
-Dtaskmanager.numberOfTaskSlots=2

2、集群实例管理——新建

在这里插入图片描述

4、dinky开发

4.1、准备数据

4.1.1、MySQL中建表
 -- MySQL
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

update gmall.orders set price=55.5 where order_id=10001;


4.2、汇总到一个 topic

-- 汇总到一个 topic
-- 当指定 sink.topic 参数时,所有 Change Log 会被写入这一个 topic。

EXECUTE CDCSOURCE cdc_kafka_one WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '192.168.0.122',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'checkpoint' = '3000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'table-name' = 'data\.products,data\.orders',
 'sink.connector'='datastream-kafka',
 'sink.topic'='dlinkcdc',
 'sink.brokers'='node-124:6667,node-131:6667,node-107:6667'
);

在这里插入图片描述

4.3、汇总到多个 topic

-- 当不指定 sink.topic 参数时,所有 Change Log 会被写入对应库表名的 topic。

EXECUTE CDCSOURCE cdc_kafka_mul WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '192.168.0.122',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'checkpoint' = '3000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'table-name' = 'data\.products,data\.orders',
 'sink.connector'='datastream-kafka',
 'sink.brokers'='node-124:6667,node-131:6667,node-107:6667'
);

在这里插入图片描述

4.4、准备数据

4.4.1、MySQL中创建表
--MySQL中创建表s_user

CREATE TABLE `s_user` (
   `id` INT(11) NOT NULL,
   `name` VARCHAR(32) DEFAULT NULL,
   `p_id` INT(2) DEFAULT NULL,
   PRIMARY KEY (`id`)
);

--插入数据:
insert into s_user values(10086,'lm',61),(10010, 'ls',11), (10000,'ll',61);

4.4.2、StarRocks中创建表
CREATE TABLE IF NOT EXISTS tmp.`s_user` (
   `id` int(10) NOT NULL COMMENT "",
   `name` varchar(20) NOT NULL COMMENT "",
   `p_id` INT(2) NULL COMMENT ""
)
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
4.4.3、创建MySQL-To-StarRocks任务
--创建映射至MySQL的映射表source_mysql_suser
create table source_mysql_suser (
   id int,
   name string,
   p_id int,
   primary key (id) not enforced
)with (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.0.122:3306/data',
   'username' = 'root',
   'password' = '123456',
   'table-name' = 's_user'
);


--创建至StarRocks的映射表sink_starrocks_suser:
CREATE TABLE sink_starrocks_suser (
   id INT,
   name STRING,
   p_id INT,
   PRIMARY KEY (id) NOT ENFORCED
)WITH (
   'connector' = 'starrocks',
   'jdbc-url'='jdbc:mysql://192.168.0.106:9030',
   'load-url'='192.168.0.106:8030',
   'database-name' = 'tmp',
   'table-name' = 's_user',
   'username' = 'starrocks',
   'password' = 'StarRocks*2023',
   'sink.buffer-flush.interval-ms' = '5000',
   'sink.properties.column_separator' = '\x01',
   'sink.properties.row_delimiter' = '\x02'
);

--清洗数据并写入StarRocks
insert into sink_starrocks_suser select id,name,p_id from source_mysql_suser where p_id = 61;

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1043072.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MongoDB(二)基础操作 创建、删除,查询等

mongodb有一个特点,如果某个库,库下面没数据(mongodb成集合),该库等于不存在的 mongodb只要创建一个库,在库下写入数据,该库才会生成 mongoshe [-hhost -pxxx] 创建数据库 use 数据库名 # 如果…

阿里巴巴OceanBase介绍

前言 官网地址:https://www.oceanbase.com/ OceanBase是由蚂蚁集团完全自主研发的国产原生分布式数据库,始创于2010年。是全球唯一在 TPC-C 和 TPC-H 测试上都刷新了世界纪录的国产原生分布式数据库。 2010年,创始人阳振坤加入阿里巴巴&…

华为云云耀云服务器L实例评测|华为云云耀云服务器L实例CentOS的存储和备份策略

1 华为云云耀云服务器L实例介绍 华为云云耀云服务器L实例是华为云计算服务中的一种虚拟云服务器,它提供了强大的计算资源,可以在云端运行各种应用程序和服务。 华为云服务器提供了多种实例类型,包括通用型、计算优化型、内存优化型等&#…

数据中心不能“偏科”,AIGC时代算力、存力需协调发展

黄金比例是数学上一种堪称“完美”的比例关系,最早由欧几里得在《几何原本》中进行了系统论述。之后,黄金比例的理念被广泛应用到数学、物理、建筑、农业等多个领域,代表着最合理、最协调的一种情况或者状态。 在数据中心领域,相…

模板的注意事项

目录 swap函数&#xff1a; 模板不支持分离编译 声明和定义分离的好处 swap函数&#xff1a; #include<iostream> using namespace std; template <class T> void swap(T&left, T&right) {T temp right;right left;left temp; } int main() {int a …

Vue比较两个数字大小

实现一个比较两个数字大小的页面&#xff0c;练习Vue实例的创建、数据绑定和事件监听方法&#xff1b; <div id"aa"> <ul> <li> <span>第一个数&#xff1a;</span><input v-model.number"first"/> </li> <…

详解Java执行groovy脚本的两种方式

详解Java执行groovy脚本的两种方式 文章目录 详解Java执行groovy脚本的两种方式介绍记录Java执行groovy脚本的两种invokeFunction:invokeMethod:以下为案例&#xff1a;引入依赖定义脚本内容并执行运行结果&#xff1a;例如把脚本内容定义为这样&#xff1a;执行结果就是这样了…

微盟十年:踩准节奏,持续增长

今年以来&#xff0c;中国SaaS行业经历了资本由炽热到寒冬&#xff0c;行业融资笔数和金额均呈现断崖式下跌&#xff0c;截至7月共发生投融资50笔&#xff0c;与去年相比直接腰斩&#xff1b;投融资金额43.52亿元&#xff0c;与2021年同期的258.2亿元、2022年同期的142.37亿元形…

适合在家做的副业 整理5个,有电脑就行

今天&#xff0c;我们不说别的&#xff0c;整理5个适合个人在家单干的副业。需要电脑&#xff0c;如果你没电脑就不用看了&#xff0c;最后两个&#xff0c;我们也在做&#xff0c;你可以看到最后了解。这些副业&#xff0c;大家多去实践操作&#xff0c;前期&#xff0c;每月三…

单层和多层中的应力和分层控制

引言 类金刚石碳(DLC)膜具有诸如高硬度和低摩擦系数的优异特性&#xff0c;并且在切削工具、金属模具和机器部件中具有应用。不幸的是&#xff0c;它们通常表现出低粘合强度由于高的内部压缩应力&#xff0c;导致从衬底上剥离。英思特已经尝试了各种方法来降低内应力&#xff…

OJ练习第181题——寻找两个正序数组的中位数

寻找两个正序数组的中位数 力扣链接&#xff1a;4. 寻找两个正序数组的中位数 题目描述 给定两个大小分别为 m 和 n 的正序&#xff08;从小到大&#xff09;数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。 算法的时间复杂度应该为 O(log (mn)) 。 示例…

V4L2 Camera 开发

一、什么是V4L2 vl42是video for Linux 2的缩写&#xff0c;是一套Linux内核视频设备的驱动框架&#xff0c;该驱动框架为应用层提供一套统一的操作接口(一系列的ioctl) 假如要进行视频数据采集&#xff0c;大体的步骤如图左侧所示&#xff1a; 打开设备文件/dev/videoX&…

从零学算法(LCR 178)

教学过程中&#xff0c;教练示范一次&#xff0c;学员跟做三次。该过程被混乱剪辑后&#xff0c;记录于数组 actions&#xff0c;其中 actions[i] 表示做出该动作的人员编号。请返回教练的编号。 示例 1&#xff1a; 输入&#xff1a;actions [5, 7, 5, 5] 输出&#xff1a;7 …

java中使用redis2个库并支持Redis哈希表

一个redis实例&#xff0c;默认包含16个库&#xff0c;序号从0到15。在redis命令行中&#xff0c;可以用select 序号来切换。我最近在做的一个项目中&#xff0c;需要使用redis的2个库。一个是由其他子系统写入&#xff0c;web后端&#xff08;java&#xff09;只读取&#xff…

瑞芯微RK3568|SDK开发之Kernel编译

1. Kernel手动编译 1.1 kernel查询帮助 使用./build.sh -h kernel查看kernel的详细编译命令如下所示。 图1.1编译内核 上图表示&#xff0c;单独编译kernel固件分为三步&#xff0c;进入kernel目录&#xff0c;选择默认配置文件&#xff0c;编译镜像。 1.2 kernel…

多家快递公司物流信息批量查询方法及操作说明

在网购时代&#xff0c;快递单号的查询和管理成了一个重要的问题。尤其是对于需要处理大量快递单号的人来说&#xff0c;一个高效、便捷的查询工具至关重要。本文将介绍如何使用“固乔快递查询助手”软件&#xff0c;快速、准确地查询多家不同快递公司的快递单号&#xff0c;并…

最新白皮书:软件定义的硬件打开通往高性能数据加速的大门

在众多行业的数字化转型过程中&#xff0c;基于硬件的数据处理加速是构建高性能、高效率智能系统的关键之处&#xff0c;因而市场上出现了诸如FPGA、GPU和xPU等许多通用或者面向特定应用&#xff08;如NPU&#xff09;的硬件加速器。尽管它们的性能和效率都高于通用处理器&…

基于R的linkET包qcorrplot可视化Mantel test相关性网络热图分析correlation heatmap

写在前面 需求是对瘤胃宏基因组结果鉴定到的差异菌株与表观指标、瘤胃代谢组、血清代谢组、牛奶代谢组中有差异的部分进行关联分析&#xff0c;效果图如下&#xff1a; 数据准备 逗号分隔的csv格式文件&#xff0c;两个表格&#xff0c;一个是每个样本对应的表观指标数据&…

工业交换机常见的故障有哪些?

通常情况下&#xff0c;工业交换机出现故障可以分为两类&#xff1a;软件性能故障和硬件物理故障。软性能故障通常指工业交换机在研发设计阶段出现的问题。 物理层故障主要指交换机本身的硬件故障以及连接交换机的物理线路故障。安防专用工业交换机的交换是根据通信双方传输信…

构建智能客服知识库,优化客户体验不是难题!

在当今快节奏的商业环境中&#xff0c;客户都希望得到及时个性化的支持&#xff0c;拥有一个智能客服知识库对于现代企业至关重要。智能客服知识库是一个集中存储、组织和访问与客户服务互动相关的信息的综合性知识库。它为企业提供了全面的知识来源&#xff0c;使他们能够为客…