9.2、增量表数据同步

news2024/11/24 12:56:16

1、数据通道

2、Flume配置

1)Flume配置概述

Flume需要将Kafka中topic_db主题的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channel选用FileChannel。

需要注意的是, HDFSSink需要将不同mysql业务表的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:

具体数据示例如下:

2)Flume配置实操

(1)创建Flume配置文件

在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf

[shuidi@hadoop104 flume]$ mkdir job
[shuidi@hadoop104 flume]$ vim job/kafka_to_hdfs_db.conf 

 (2)配置文件内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

(3)编写Flume拦截器

①新建一个Maven项目,并在pom.xml文件中加入如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

②在com.atguigu.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类

package com.atguigu.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);         
        JSONObject jsonObject = JSONObject.parseObject(log);         
        Long ts = jsonObject.getLong("ts");         //Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒         
        String timeMills = String.valueOf(ts * 1000);         
        String tableName = jsonObject.getString("table");         
        headers.put("timestamp", timeMills);         
        headers.put("tableName", tableName);        
        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {


        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor ();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

③重新打包

 ④将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下

[shuidi@hadoop102 lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

3)通道测试

(1)启动Zookeeper、Kafka集群

(2)启动hadoop104的Flume

[shuidi@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=info,console

(3)生成模拟数据

[shuidi@hadoop102 bin]$ cd /opt/module/db_log/
[shuidi@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 

(4)观察HDFS上的目标路径是否有数据出现

若HDFS上的目标路径已有增量表的数据出现了,就证明数据通道已经打通。

(5)数据目标路径的日期说明

仔细观察,会发现目标路径中的日期,并非模拟数据的业务日期,而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值,是数据的变动日期。而真实场景下,数据的业务日期与变动日期应当是一致的。

4)编写Flume启停脚本

为方便使用,此处编写一个Flume的启停脚本

(1)在hadoop102节点的/home/shuidi/bin目录下创建脚本f3.sh

[shuidi@hadoop102 bin]$ vim f3.sh

在脚本中填写如下内容

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9"
;;
esac

(2)增加脚本执行权限

[shuidi@hadoop102 bin]$ chmod 777 f3.sh

(3)f3启动

[shuidi@hadoop102 module]$ f3.sh start

(4)f3停止

[shuidi@hadoop102 module]$ f3.sh stop

3、Maxwell配置

1)Maxwell时间戳问题

此处为了模拟真实环境,对Maxwell源码进行了改动,增加了一个参数mock_date,该参数的作用就是指定Maxwell输出JSON字符串的ts时间戳的日期,接下来进行测试。

①修改Maxwell配置文件config.properties,增加mock_date参数,如下

log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092

#kafka topic配置
kafka_topic=topic_db

#注:该参数仅在maxwell教学版中存在,修改该参数后重启Maxwell才可生效
mock_date=2020-06-14

# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

 

注:该参数仅供学习使用,修改该参数后重启Maxwell才可生效。

②重启Maxwell

[shuidi@hadoop102 bin]$ mxw.sh restart

③重新生成模拟数据

[shuidi@hadoop102 bin]$ cd /opt/module/db_log/
[shuidi@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 

④观察HDFS目标路径日期是否正常

4、增量表首日全量同步

通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。

1)在~/bin目录创建mysql_to_kafka_inc_init.sh

[shuidi@hadoop102 bin]$ vim mysql_to_kafka_inc_init.sh

脚本内容如下

#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac

2)为mysql_to_kafka_inc_init.sh增加执行权限

[shuidi@hadoop102 bin]$ chmod 777 ~/bin/mysql_to_kafka_inc_init.sh

3)测试同步脚本

(1)清理历史数据

为方便查看结果,现将HDFS上之前同步的增量表数据删除

[shuidi@hadoop102 ~]$ hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

(2)执行同步脚本

[shuidi@hadoop102 bin]$ mysql_to_kafka_inc_init.sh all 

4)检查同步结果

观察HDFS上是否重新出现增量表数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/721527.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023.7.4 Dataloader切分

一、 如果文件夹路径是 path/to/folder with spaces/&#xff0c;使用以下方式输入 path/to/folder\ with\ spaces/或者使用引号包裹路径&#xff1a; "path/to/folder with spaces/"这样可以确保命令行正确解析文件夹路径&#xff0c;并将空格作为路径的一部分进…

ADB自动化测试框架

一、介绍 adb的全称为Android Debug Bridge&#xff0c;就是起到调试桥的作用&#xff0c;利用adb工具的前提是在手机上打开usb调试&#xff0c;然后通过数据线连接电脑。在电脑上使用命令模式来操作手机&#xff1a;重启、进入recovery、进入fastboot、推送文件功能等。简单来…

Intellij IDEA 初学入门图文教程(八) —— IDEA 在提交代码时 Performing Code Analysis 卡死

在使用 IDEA 开发过程中&#xff0c;提交代码时常常会在碰到代码中的 JS 文件时卡死&#xff0c;进度框上显示 Performing Code Analysis&#xff0c;如图&#xff1a; 原因是 IDEA 工具默认提交代码时&#xff0c;分析代码功能是打开的&#xff0c;需要通过配置关闭下就可以了…

Linux高性能网络编程:TCP底层的收发过程

今天探索高性能网络编程&#xff0c;但是我觉得在谈系统API之前可以先讲一些Linux底层的收发包过程&#xff0c;如下这是一个简单的socket编程代码&#xff1a; int main() {... fd socket(AF_INET, SOCKET_STREAM, 0);bind(fd, ...);listen(fd, ...);// 如何建立连接...afd …

冒泡排序法(优化与实例演示)

冒泡排序法 冒泡排序法基本介绍 冒泡排序是一种简单而经典的排序算法&#xff0c;它的原理是通过不断比较相邻元素的大小并交换位置&#xff0c;将较大&#xff08;或较小&#xff09;的元素逐渐“冒泡”到数组的末尾。这个过程持续进行多轮&#xff0c;直到整个数组按照顺序…

【Zabbix 6.0 监控系统安装和部署】

目录 一、Zabbix 介绍1、zabbix 是什么&#xff1f;2、zabbix 监控原理&#xff08;重点&#xff09;3、Zabbix 6.0 新特性4、Zabbix 6.0 功能组件1、Zabbix Server2、数据库3、Web 界面4、Zabbix Agent5、Zabbix Proxy6、Java Gateway 二、Zabbix 6.0 部署1、部署 zabbix 服务…

idea goland 插件 struct to struct

go-struct-to-struct idea goland 插件。实现自动生成 struct 间 转换代码。 https://plugins.jetbrains.com/plugin/22196-struct-to-struct/ IntelliJ plugin that Automatically generate two struct transformations through function declarations Usage define func …

【怎么实现多组输入之EOF】

C语言怎么实现多组输入之EOF C语言之EOF介绍1、什么是EOF&#xff1f;2、EOF的用法3、EOF的扩展3.1、scanf返回值之EOF3.2、scanf函数的返回值有以下几种情况 4、如何是实现多组输入&#xff1f;4.1、多组输入---- 常规写法例程14.2、多组输入---- 实现多组输入的打印例程24.3、…

不想被卷的程序员们,应该学什么?

我真的好像感慨一下&#xff0c;这个世界真的给计算机应届生留活路了吗&#xff1f; 看着周围的同学&#xff0c;打算搞前端、JAVA、C、C的&#xff0c;一个两个去跑去应聘。你以为是00后整治职场&#xff1f; 真相是主打一个卑微&#xff1a;现阶段以学习为主&#xff08;工…

探寻日本区块链游戏的未来潜力

日本的区块链游戏 日本是全球范围内游戏市场人均利润最高的国家之一。其中&#xff0c;《My Crypto Heroes》的首次公售金额达到了 16,000 ETH。 关键要点&#xff1a; 日本具有强大的游戏基础&#xff0c;使其成为加密游戏发展的理想地区。 日本流行的加密货币游戏包括《My…

Python中jsonpath库使用,及与xpath语法区别

jsonpath库使用 pip install jsonpath 基本语法 JSONPath语法元素和对应XPath元素的对比

Work20230705

//main.c #include "uart4.h" extern void printf(const char *fmt, ...); void delay_ms(int ms) {int i,j;for(i 0; i < ms;i)for (j 0; j < 1800; j); }int main() {while(1){//将获取到的字符1发送到终端//hal_put_char(hal_get_char()1);hal_put_string…

POSTGRESQL SQL 执行用 IN 还是 EXISTS 还是 ANY

开头还是介绍一下群&#xff0c;如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请联系 liuaustin3 &#xff0c;在新加的朋友会分到3群&#xff08;共…

【后端面经-计算机基础】HTTP和TCP的区别

【后端面经-计算机基础】HTTP和TCP的区别 文章目录 【后端面经-计算机基础】HTTP和TCP的区别1. OSI七层模型和相关协议2. TCP协议2.1 特点&#xff1a;2.2 报文格式2.3 三次握手和四次挥手 3. HTTP协议3.1 特点3.2 报文格式3.2 https和http 4. HTTP vs TCP5. 面试模拟参考资料 …

全网最牛,python接口自动化测试-接口sign签名(实战撸码)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 一般公司对外的接…

groupkfold 报错:raise keyerror(f“{not foud} not in index“)

【1】使用groupkfold 的时候出现以上报错&#xff1a;索引错误&#xff0c;groups的索引和x y 的不对应 【2】源代码&#xff1a; 【3】进行修改&#xff1a; 可以成功索引&#xff01;&#xff01;&#xff01;

tomcat下上传html

html 最基本结构服务器xshelltomcat 下载是否可以访问到服务器上传html html 最基本结构 .html 后缀名 <!DOCTYPE HTML> <html><head><meta charset"utf-8"> <title>2306</title></head><body>大家好&#xff01;…

C++图形开发(7):能进行抛物线运动且触墙能反弹的小球

今天来实现一下触墙能反弹的小球、 我们之前所实现的都只是小球的上下&#xff0c;也就是y轴方向的运动&#xff08;详见&#xff1a;C图形开发&#xff08;6&#xff09;&#xff1a;落下后能弹起的小球&#xff09;&#xff0c;那么要使小球能够呈抛物线状运动&#xff0c;我…

Failed to start connector [Connector[HTTP/1.1-8080]]

1、解决Web server failed to start. Port 8080 was already in use 2、SpringBoot启动报错:“Error starting ApplicationContext. To display the conditions report re-run your application with ‘debug’ enabled.” 3、Failed to start end point associated with Proto…

015-从零搭建微服务-远程调用(一)

写在最前 如果这个项目让你有所收获&#xff0c;记得 Star 关注哦&#xff0c;这对我是非常不错的鼓励与支持。 源码地址&#xff08;后端&#xff09;&#xff1a;https://gitee.com/csps/mingyue 源码地址&#xff08;前端&#xff09;&#xff1a;https://gitee.com/csps…