8、离线数仓同步数据

news2025/1/16 3:57:20

1、 用户行为数据同步

1.1、 数据通道

用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。

1.2、 日志消费Flume配置概述

按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。

此处选择KafkaSource、FileChannel、HDFSSink。

关键配置如下:

1.3、 日志消费Flume配置实操

1)创建Flume配置文件

在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf

[shuidi@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf 

2)配置文件内容如下

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注:配置优化

1)FileChannel优化

通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

2)HDFS Sink优化

(1)HDFS存入大量小文件,有什么影响?

元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

(2)HDFS小文件处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:

(1)文件在达到128M时会滚动生成新文件

(2)文件创建超3600秒时会滚动生成新文件

3)编写Flume拦截器

(1)数据漂移问题

(2)在com.atguigu.gmall.flume.interceptor包下创建TimestampInterceptor类

package com.atguigu.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {
    

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        //1、获取header和body的数据
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        //2、将body的数据类型转成jsonObject类型(方便获取数据)
        JSONObject jsonObject = JSONObject.parseObject(log);

        //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

 (3)重新打包

 (4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面。

1.4、 日志消费Flume测试

1)启动Zookeeper、Kafka集群

2)启动日志采集Flume

[shuidi@hadoop102 ~]$ f1.sh start

3)启动hadoop104的日志消费Flume

[shuidi@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console

4)生成模拟数据

[shuidi@hadoop102 ~]$ lg.sh 

5)观察HDFS是否出现数据

1.5 、日志消费Flume启停脚本

若上述测试通过,为方便,此处创建一个Flume的启停脚本。

1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh

[shuidi@hadoop102 bin]$ vim f2.sh

在脚本中填写如下内容

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 日志数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 日志数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

2)增加脚本执行权限

[shuidi@hadoop102 bin]$ chmod 777 f2.sh

3)f2启动

[shuidi@hadoop102 module]$ f2.sh start

4)f2停止

[shuidi@hadoop102 module]$ f2.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/724321.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

案例挑战——事务传播行为

事务的传播行为 一、背景介绍二、 什么是事物的传播行为常见的事物传播行为mandatorysupportsrequiredrequired_newnestednot supportnever 三、为什么要有事务的传播行为&#xff0c;它是为了解决什么问题&#xff1f;四、如何使用事务的传播行为外围方法没有事务&#xff0c;…

audio标签如何去掉controls属性上的下载和倍速

<audioref"audioPlayer":src"data.url"controlstimeupdate"updateProgress" ></audio> controlslist 属性将帮助浏览器选择在媒体元素上显示的控件。 <audioref"audioPlayer":src"data.url"controlscontrol…

【Hello mysql】 数据库表操作

Mysql专栏&#xff1a;Mysql 本篇博客简介&#xff1a;介绍数据库的表操作 数据库表操作 创建表查看表结构修改表添加列修改列删除列修改表名修改列名 删除表总结 创建表 语法 CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set…

Facebook 商务管理:成为 Facebook 业务经理大师的关键

Facebook 商务管理&#xff1a;成为 Facebook 业务经理大师的关键 尽管社交媒体行业有许多冉冉升起的新星&#xff0c;但Facebook仍然是不败的冠军。Facebook每月活跃用户超过2.85亿&#xff0c;在受欢迎程度方面遥遥领先于同行&#xff0c;它无疑是您业务的绝佳免费营销工具。…

一亿港元的“入场费”?香港合规门槛太高,加密从业者仍选择观望!

6月1日&#xff0c;备受瞩目的香港加密新规正式生效&#xff0c;但靴子落地&#xff0c;市场预期却不及以往&#xff0c;想象中人声鼎沸的讨论并未出现。尽管蔓延的熊市仍负主要责任&#xff0c;但仍有很多因素使从业者观望态度。 合规门槛太高&#xff1f; 纵观圈内&#xff0…

Python编程实现针对回撤的交易策略

在金融交易市场上&#xff0c;回撤是一个常见的现象。因此&#xff0c;对于投资者来说&#xff0c;研究和设计针对回撤的交易策略是非常必要的。本文将介绍如何使用Python编程实现针对回撤的交易策略&#xff0c;以帮助投资者更好地进行交易。 一、回撤分析 在设计针对回撤的…

智能导航:独家互联网网站推荐指南

在数字化时代&#xff0c;人们对于影视娱乐的需求日益增长。而随着高速互联网的普及和技术的进步&#xff0c;极速冲浪成为了探索各种精彩影视作品的主流方式。众多影视网站应运而生&#xff0c;为我们提供了丰富多样的内容&#xff0c;并以便捷的方式满足我们对于电影、剧集和…

带你了解Zabbix的基础概念、Zabbix部署

Zabbix的基础概念 一、Zabbix的概念1、监控软件的作用2、zabbix 是什么&#xff1f;3、zabbix 监控原理 二、Zabbix&#xff08;6.0&#xff09;新特性1、Zabbix&#xff08;6.0&#xff09;的功能2、Zabbix 6.0 功能组件 三、实验&#xff08;部署 zabbix 6.0&#xff09;1、Z…

青岛大学_王卓老师【数据结构与算法】Week03_12_线性表的链式表示和实现12_学习笔记

本文是个人学习笔记&#xff0c;素材来自青岛大学王卓老师的教学视频。 一方面用于学习记录与分享&#xff0c;另一方面是想让更多的人看到这么好的《数据结构与算法》的学习视频。 如有侵权&#xff0c;请留言作删文处理。 课程视频链接&#xff1a; 数据结构与算法基础–…

软件确认测试的依据有哪些?

软件测试是软件开发过程中不可或缺的一环&#xff0c;而软件确认测试则是其中一个重要的阶段。软件确认测试&#xff0c;又称用户验收测试&#xff0c;是软件开发生命周期中的最后一个阶段。它旨在确认软件是否满足用户的需求并符合预期的功能。确认测试侧重于用户的角度&#…

python接口自动化(十六)--参数关联接口后传(详解)

简介 大家对前边的自动化新建任务之后&#xff0c;接着对这个新建任务操作了解之后&#xff0c;希望带小伙伴进一步巩固胜利的果实&#xff0c;夯实基础。因此再在沙场实例演练一下博客园的相关接口。我们用自动化发随笔之后&#xff0c;要想接着对这篇随笔操作&#xff0c;不用…

一文读懂FPC(14)- FPC的挠曲性

FPC系列文章目录 1.什么是FPC 2.什么是R-FPC 3&#xff0c;FPC的基材 4.FPC基材压延铜和电解铜的区别 5&#xff0c;FPC的辅材 6&#xff0c;FPC常见的四种类型 7&#xff0c;FPC的生产流程简介 8&#xff0c;R-FPC的生产流程简介 9&#xff0c;FPC的发展及应用 10&a…

智安网络|新型恶意软件攻击:持续威胁网络安全

当今数字化时代&#xff0c;恶意软件已经成为网络安全领域中的一项巨大威胁。随着技术的不断进步&#xff0c;恶意软件的攻击方式也在不断演变和发展。 以下是一些目前比较常见的新型恶意软件攻击&#xff1a; **1.勒索软件&#xff1a;**勒索软件是一种恶意软件&#xff0c;它…

Flutter基础布局

Column:纵向布局 Column相当于Android原生的LinearLayout线性布局。 主要代码&#xff1a; class MyHomePage extends StatelessWidget {const MyHomePage({Key? key}) : super(key: key);overrideWidget build(BuildContext context) {return Container(width: double.infi…

Kotlin单例模式的一种懒汉模式写法

Kotlin单例模式的一种懒汉模式写法 class MyHelpler {companion object {private val singleHelpler by lazy(mode LazyThreadSafetyMode.SYNCHRONIZED) { MyHelpler() }fun instance() singleHelpler}fun sayHi() {println("fly")} }fun main(args: Array<Stri…

Java代码混淆技术学习

1. ClassFinal 1.1 创建springboot项目 不做过多演示 spring boot版本2.7.8 1.2 maven引入 <plugin><!-- https://gitee.com/roseboy/classfinal --><groupId>net.roseboy</groupId><artifactId>classfinal-maven-plugin</artifactId>&…

【C#】并行编程实战:使用 PLINQ(1)

PLINQ 是语言集成查询&#xff08;Language Integrate Query , LINQ&#xff09;的并行实现&#xff08;P 表示并行&#xff09;。本章将介绍其编程的各个方面以及与之相关的一些优缺点。 PLINQ 介绍 | Microsoft Learn了解如何使用 .NET 中的 PLINQ 并行执行查询。 PLINQ 代表…

人体微生物分布及其与人体的共生

我们知道&#xff0c;人体的皮肤、口腔、肺部、肠道、阴道等都是微生物的栖息地&#xff0c;每个部位都有独特的微生物群组成。微生物群受到基因、饮食、环境和生活方式等多种因素的影响。 当然&#xff0c;人体微生物群的组成也会随着年龄的增长而发生变化。从婴儿期到老年阶段…

什么皮肤微生物群:它是皮肤健康的关键吗?

在我们日常的护肤和美容过程中&#xff0c;我们经常听到关于皮肤的各种话题&#xff0c;从保湿到抗衰老&#xff0c;从痘痘到过敏... 随着科学的不断进步和技术的发展&#xff0c;人们开始逐渐发现&#xff0c;皮肤上隐藏着一个神秘的世界——皮肤微生物群。它在维护我们的皮肤…

CGLIB动态代理详解分析

一、介绍 CGLIB是强大的、高性能的代码生成库&#xff0c;被广泛应用于AOP框架&#xff0c;它底层使用ASM来操作字节码生成新的类&#xff0c;为对象引入间接级别&#xff0c;以控制对象的访问。CGLIB相比于JDK动态代理更加强大&#xff0c;JDK动态代理只能对接口进行代理&…