离线数仓同步数据3

news2025/1/8 4:21:47

业务数据_增量表数据同步

  • 1)Flume配置概述
  • 2)Flume配置实操
  • 3)通道测试
  • 4)编写Flume启停脚本

在这里插入图片描述

1)Flume配置概述

Flume需要将Kafka中topic_db主题的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channel选用FileChannel。
需要注意的是, HDFSSink需要将不同mysql业务表的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:

2)Flume配置实操

1)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf
[atguigu@hadoop104 flume]$ mkdir job
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_db.conf 
(2)配置文件内容如下
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

(3)编写Flume拦截器
新建一个Maven项目,并在pom.xml文件中加入如下配置
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在com.atguigu.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类
package com.atguigu.gmall.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class TimestampAndTableNameInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);

 		JSONObject jsonObject = JSONObject.parseObject(log);

 		Long ts = jsonObject.getLong("ts");
 		//Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒
 		String timeMills = String.valueOf(ts * 1000);

 		String tableName = jsonObject.getString("table");

 		headers.put("timestamp", timeMills);
 		headers.put("tableName", tableName);
		return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor ();
        }

        @Override
        public void configure(Context context) {

        }
    }
}






重新打包
将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下
[atguigu@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

3)通道测试

1)启动Zookeeper、Kafka集群
(2)启动hadoop104的Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=info,console
(3)生成模拟数据
[atguigu@hadoop102 bin]$ cd /opt/module/db_log/
[atguigu@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 
(4)观察HDFS上的目标路径是否有数据出现
若HDFS上的目标路径已有增量表的数据出现了,就证明数据通道已经打通。
(5)数据目标路径的日期说明
仔细观察,会发现目标路径中的日期,并非模拟数据的业务日期,而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值,是数据的变动日期。而真实场景下,数据的业务日期与变动日期应当是一致的。

4)编写Flume启停脚本

为方便使用,此处编写一个Flume的启停脚本
(1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f3.sh
[atguigu@hadoop102 bin]$ vim f3.sh
	在脚本中填写如下内容
#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
(2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f3.sh
(3)f3启动
[atguigu@hadoop102 module]$ f3.sh start
(4)f3停止
[atguigu@hadoop102 module]$ f3.sh stop



2.2.6.3 Maxwell配置
1)Maxwell时间戳问题

此处为了模拟真实环境,对Maxwell源码进行了改动,增加了一个参数mock_date,该参数的作用就是指定Maxwell输出JSON字符串的ts时间戳的日期,接下来进行测试。
修改Maxwell配置文件config.properties,增加mock_date参数,如下
log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092

#kafka topic配置
kafka_topic=topic_db

#注:该参数仅在maxwell教学版中存在,修改该参数后重启Maxwell才可生效
mock_date=2020-06-14

# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
注:该参数仅供学习使用,修改该参数后重启Maxwell才可生效。
重启Maxwell
[atguigu@hadoop102 bin]$ mxw.sh restart
重新生成模拟数据
[atguigu@hadoop102 bin]$ cd /opt/module/db_log/
[atguigu@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 
观察HDFS目标路径日期是否正常


2.2.6.4 增量表首日全量同步

通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。
1)在~/bin目录创建mysql_to_kafka_inc_init.sh
[atguigu@hadoop102 bin]$ vim mysql_to_kafka_inc_init.sh
脚本内容如下
#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac
2)为mysql_to_kafka_inc_init.sh增加执行权限
[atguigu@hadoop102 bin]$ chmod 777 ~/bin/mysql_to_kafka_inc_init.sh

3)测试同步脚本
(1)清理历史数据
为方便查看结果,现将HDFS上之前同步的增量表数据删除
[atguigu@hadoop102 ~]$ hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print KaTeX parse error: Expected 'EOF', got '}' at position 2: 8}̲' | xargs hadoo… mysql_to_kafka_inc_init.sh all
4)检查同步结果
观察HDFS上是否重新出现增量表数据。


2.3 采集通道启动/停止脚本
1)在/home/atguigu/bin目录下创建脚本cluster.sh
[atguigu@hadoop102 bin]$ vim cluster.sh
	在脚本中填写如下内容
#!/bin/bash

case $1 in
"start"){
        echo ================== 启动 集群 ==================

        #启动 Zookeeper集群
        zk.sh start

        #启动 Hadoop集群
        hdp.sh start

        #启动 Kafka采集集群
        kf.sh start

        #启动采集 Flume
        f1.sh start

#启动日志消费 Flume
        f2.sh start

#启动业务消费 Flume
        f3.sh start

#启动 maxwell
        mxw.sh start

        };;
"stop"){
        echo ================== 停止 集群 ==================

#停止 Maxwell
        mxw.sh stop

#停止 业务消费Flume
        f3.sh stop

#停止 日志消费Flume
        f2.sh stop

#停止 日志采集Flume
        f1.sh stop

        #停止 Kafka采集集群
        kf.sh stop

        #停止 Hadoop集群
        hdp.sh stop

        #停止 Zookeeper集群
        zk.sh stop

};;
esac


2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 cluster.sh
3)cluster集群启动脚本
[atguigu@hadoop102 module]$ cluster.sh start
4)cluster集群停止脚本
[atguigu@hadoop102 module]$ cluster.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/982044.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql索引条件下推 、 count(*)、count(1)、IN 、exists等

索引下推 Index Condition Pushdown(ICP)&#xff0c;是一种在存储引擎层使用索引过滤数据的优化方式 如果没有ICP&#xff0c;存储引擎会遍历以定位基表中的行&#xff0c;并将他们返回给mysql服务器&#xff0c;有mysql 服务器评估where 后面的条件是否保留行。启用ICP&…

Debian11安装PostgreSQL+PostGIS+pgRouting ,链接Navicat

船新版本&#xff0c;遵循官网教程 1 准备一个Debian11系统2 从官网安装Postgres2.1 安装Postgres2.2 修改Postgres密码2.3 配置Postgres远程访问 3 安装Postgis、pgRouting4 链接Navicat 1 准备一个Debian11系统 2 从官网安装Postgres 2.1 安装Postgres 1 进入Postgre的官网…

GE IC693CPU374CPU模块

处理能力&#xff1a;IC693CPU374 CPU 模块通常具有高性能的处理器&#xff0c;用于执行复杂的控制逻辑和数据处理任务。 内存容量&#xff1a;它通常具有内置的RAM和Flash存储器&#xff0c;用于存储控制程序、数据和配置信息。 多通信接口&#xff1a;该模块通常具有多个通…

集合的笔记

集合 包装类 泛型类有一个不幸的限制, 不能使用基本类型作为类型参数, 解决办法是使用这个包装类, 每一种基本类型都有对应的包装类 基本类型和它们对应的包装类型之间的转换是自动进行的。在调用 add 方法时&#xff0c;个值为 42 的 Integer 类对象在一种被称为自动装箱的过…

redis(0)-安装实操

1.基本概念 key-value型数据库&#xff0c;秒10万级查询。 2.计算向数据移动 3.安装步骤 3.1总体流程 //源码目录&#xff1a;/home/ftp/redis5 src //安装目录&#xff1a;make install /opt/tang/redis5/bin 只是一些bin文件 //make install 只是把bin 复制到某个路…

数据结构与算法学习(day1)——简化版桶排序

文章目录 前言本章目标简化版桶排序题目一题目二 前言 &#xff08;1&#xff09;我是一个大三的学生&#xff08;准确来说应该是准大三&#xff0c;因为明天才报名哈哈哈&#xff09;。 &#xff08;2&#xff09;最近就想每天闲着没事也刷些C语言习题来锻炼下编程水平&#x…

如何选择靠谱的全景平台?VR全景加盟从哪方面对比?

VR全景行业经过近几年的发展&#xff0c;已经逐渐普及开来&#xff0c;线下各个行业都有实体商家开始引入VR全景去做营销宣传推广了。不少老板也意识到线上线下双渠道的重要性&#xff0c;而VR全景的存在就刚好满足各行各业的需求&#xff0c;从这一点不难看出&#xff0c;VR全…

协议定制 + Json序列化反序列化

文章目录 协议定制 Json序列化反序列化1. 再谈 "协议"1.1 结构化数据1.2 序列化和反序列化 2. 网络版计算器2.1 服务端2.2 协议定制(1) 网络发送和读取的正确理解(2) 协议定制的问题 2.3 客户端2.4 代码 3. Json实现序列化反序列化3.1 简单介绍3.2 使用 协议定制 J…

《低代码指南》——AI低代码维格云技术与部署架构说明

#整体技术架构说明 维格云概念上由两个部分组成:工作台(workbench)和数据表(datasheet)。 工作台(workbench)维护系统集群节点、组织和用户数据,提供文件夹、表格、仪表盘、表单、镜像、审计、权限服务等功能。 数据表(datasheet)为多个协作成员提供实时协作,以便…

变电站自动化监控系统

力安科技变电站自动化监控系统是以箱式变电站为管理对象&#xff0c;加装箱变网关&#xff0c;在完成箱变智能化改造的基础上&#xff0c;依托电易云&#xff0c;构建一体化智慧箱变及运维系统。智能箱式变电站被广泛应用于住宅小区、城市公用变压器、工厂、商场、机场、电站等…

Java语法中一些需要注意的点(仅用于个人学习)

1.当字符串和其他类型相加时&#xff0c;基本都是字符串&#xff0c;这与运算顺序有关。 2.Java中用ctrl d 来结束循环输入。 3.nextLine() 遇到空格不会结束。 4.方法重载 4.1. 方法名必须相同 4.2. 参数列表必须不同(参数的个数不同、参数的类型不同、类型的次序必须不…

【大数据之Kafka】八、Kafka Broker之生产经验

1 节点服役和退役 1.1 服役新节点 新节点准备&#xff1a; &#xff08;1&#xff09;关闭hadoop104&#xff0c;并右键执行克隆操作。 &#xff08;2&#xff09;开启hadoop105&#xff0c;并修改 IP 地址为105。 vim /etc/sysconfig/network-scripts/ifcfg/ens33&#xff…

Edge官方鼠标手势

前言 日期&#xff1a;2023年8月 Edge浏览器目前已自带官方的鼠标手势功能&#xff0c;若要使用首先将浏览器更新至最新版&#xff0c;下文介绍使用方法。 官方鼠标手势 前提 更新Edge至最新版&#xff0c;并关闭其它鼠标手势扩展。 开启鼠标手势 打开Edge浏览器的设置&…

Linux c++开发-04-让Hello World更像一个工程

外层CMakeLists.txt src中的CMakeLists.txt ADD_EXECUTABLE(hello main.cpp)main.cpp 然后 cd build cmake … make ./bin/hello

TCPIP协议学习

TCP协议 连接导向&#xff1a;TCP是一种面向连接的协议&#xff0c;这意味着通信的两端在建立通信之前必须通过握手过程建立连接。握手过程包括三次握手&#xff0c;其中客户端向服务器发送连接请求&#xff0c;服务器回复确认&#xff0c;最后客户端再次确认连接。这样建立的连…

基于springboot+vue的校园失物招领系统-前后端分离(内含文档+源码+教程)

近年来&#xff0c;信息化管理行业的不断兴起&#xff0c;使得人们的日常生活越来越离不开计算机和互联网技术。首先&#xff0c;根据收集到的用户需求分析&#xff0c;对设计系统有一个初步的认识与了解&#xff0c;确定校园失物招领网站的总体功能模块。然后&#xff0c;详细…

InstructPix2Pix(CVPR2023)-图像编辑论文解读

文章目录 1.摘要2.背景3.算法3.1 生成多模态训练集3.1.1生成指令及成对caption3.1.2 依据成对的caption生成成对的图像 3.2 InstructPix2Pix 4.实验结果4.1基线比较4.2消融实验 5.结论 论文&#xff1a; 《InstructPix2Pix: Learning to Follow Image Editing Instructions》 …

什么是系统集成项目管理工程师,证书难考吗?

系统集成项目管理工程师&#xff0c;属于软考三个级别中的 “中级”&#xff0c;相当于中级职称。 系统集成项目管理程师是工信部和人社部举办的软考中新增开的一门考试。软考全称全国计算机技术与 软件专业技术资格&#xff08;水平&#xff09;考试&#xff0c;这门新开的系…

Nacos配置文件更新+热更新+多环境配置共享+集群搭建

对服务配置文件 场景&#xff1a; 如果多个服务对应的配置文件都需要更改时&#xff0c;可以利用配置管理&#xff0c;方便对配置文件进行更新&#xff0c;而且是在本地配置前先读取nacos的配置文件&#xff0c;优先级大于本地配置文件 配置步骤 1.首先在Nacos中的配置列表中增…

0-5V转4-20mA电路

本设计采用运放与三极管做二线制恒流源电路 分析&#xff1a; Va2*V- (1) (2) (3) 联立&#xff08;2&#xff09;&#xff08;3&#xff09;得&#xff1a; &#xff08;5&#xff09; 由于 &#xff08;6&#xff09; …