8、用户行为数据同步

news2025/1/6 4:17:27

1、 数据通道

用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。

2、 日志消费Flume配置概述

按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。

此处选择KafkaSource、FileChannel、HDFSSink。

关键配置如下:

3 、日志消费Flume配置实操

1)创建Flume配置文件

在hadoop104节点的Flume家目录下创建job目录,在job下创建kafka_to_hdfs_log.conf

[shuidi@hadoop104 flume]$ cd /opt/module/flume/
[shuidi@hadoop104 flume]$ mkdir job 
[shuidi@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf 

 2)配置文件内容如下

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3)FileChannel优化

通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

4)HDFS Sink优化

(1)HDFS存入大量小文件,有什么影响?

元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

(2)HDFS小文件处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:

 ①文件在达到128M时会滚动生成新文件

 ②文件创建超3600秒时会滚动生成新文件

5)编写Flume拦截器

(1)零点漂移问题

(2)在idea里创建名为gmall的项目

(3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(4)在com.atguigu.gmall.flume.interceptor包下创建TimestampInterceptor类

package com.atguigu.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
    //1、获取header和body的数据
    Map<String, String> headers = event.getHeaders();
    String log = new String(event.getBody(), StandardCharsets.UTF_8);

    try {
        //2、将body的数据类型转成jsonObject类型(方便获取数据)
        JSONObject jsonObject = JSONObject.parseObject(log);

        //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

@Override
public List<Event> intercept(List<Event> list) {
    Iterator<Event> iterator = list.iterator();
    while (iterator.hasNext()) {
        Event event = iterator.next();
        if (intercept(event) == null) {
            iterator.remove();
        }
    }
    return list;
}

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

(5)打包

(6)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面。

4、 日志消费Flume测试

1)启动Zookeeper、Kafka、HDFS

2)启动日志采集Flume

[shuidi@hadoop102 ~]$ f1.sh start

3)启动hadoop104的日志消费Flume

[shuidi@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf

4)生成模拟数据

[shuidi@hadoop102 ~]$ lg.sh 

5)观察HDFS是否出现数据

5、 日志消费Flume启停脚本

若上述测试通过,为方便,此处创建一个Flume的启停脚本。

1)在hadoop102节点的/home/shuidi/bin目录下创建脚本f2.sh

[shuidi@hadoop102 bin]$ vim f2.sh

 在脚本中填写如下内容。

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 日志数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 日志数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

2)增加脚本执行权限

[shuidi@hadoop102 bin]$ chmod 777 f2.sh

3)f2启动

[shuidi@hadoop102 bin]$ f2.sh start

4)f2停止

[shuidi@hadoop102 bin]$ f2.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2119950.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cmake编译MQTT-C源码

Windows端编译MQTT-C源码&#xff0c;获取mqttc库&#xff08;动态库与静态库&#xff09;&#xff0c;用于集成到Qt工程中使用mqtt订阅与发布功能。 编译源码与编译出来的mqttc动态库、静态库下载​​​​​​​​​​​​​​https://download.csdn.net/download/qq_38159549…

直播怎么录屏?录屏网页的工具有吗?推荐这3款你千万不要错过~

直播与网页录屏&#xff1a;三款必备录屏软件推荐 为什么要记录直播&#xff1f;直播可以捕捉实时发生事件&#xff0c;是真真实实的one take&#xff0c;更重要的是可以记录直播画面中的实时弹幕、评论区的互动&#xff0c;无论是激动人心的体育赛事、教育课程还是互动性强的连…

C++11 14 17 20 23进化史

C11、C14、C17、C20和C23是C语言标准的不同版本&#xff0c;它们之间在功能、特性和语法上存在一些区别。以下是对这些版本主要区别的概述&#xff1a; C11 C11是C语言的一个重要标准&#xff0c;引入了大量新特性和改进&#xff0c;使C变得更加易用和强大。主要特性包括&…

Qt工程使用MQTT-C库与mqtt服务器数据通信

实现mqtt订阅与发布话题&#xff0c;与mqtt服务器进行数据通信 编译环境&#xff1a;Qt5.15.2 vs2019 需要mqttc库&#xff1a;mqttc.lib, mqttc.dll&#xff08;根据MQTT-C源码编译出来的库&#xff0c;参考cmake编译MQTT-C源码-CSDN博客&#xff09; 一、Qt pro文件编写 …

android kotlin 基础复习 继承 inherit

1、新建文件kt 2、代码&#xff1a; /**用户基类**/ open class Person1(name:String){/**次级构造函数**/constructor(name:String,age:Int):this(name){//初始化println("-------基类次级构造函数---------")println("name:${name},age:${age}")} }/**子…

信息安全工程师(1)计算机网络分类

一、按分布范围分类 广域网&#xff08;WAN&#xff09;&#xff1a; 定义&#xff1a;广域网的任务是提供长距离通信&#xff0c;运送主机所发送的数据。其覆盖范围通常是直径为几十千米到几千千米的区域&#xff0c;因此也被称为远程网。特点&#xff1a;连接广域网的各个结点…

计算机毕业设计 财会信息管理系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

【吊打面试官系列-Redis面试题】怎么理解 Redis 事务?

大家好&#xff0c;我是锋哥。今天分享关于【怎么理解 Redis 事务&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; 怎么理解 Redis 事务&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 1&#xff09;事务是一个单独的隔离操作&#xff1…

【LabVIEW学习篇 - 22】:ActiveX

文章目录 ActiveXActiveX打开Windows颜色选择对话框ActiveX将浏览器嵌入到前面板 ActiveX ActiveX是微软推出的一个开放的技术集的统称&#xff0c;它是很早之前出现的OLE(object linking and Embedding)技术的扩展&#xff0c;它是基于COM(Component Object Model)技术而建立…

meta元素

1&#xff0c;meta元素有4个全局属性 charset 设置文档的编码类型&#xff0c;通常设置为utf-8 <meta charset"utf-8" /> content 配合name或者http-equiv属性使用&#xff0c;为其value、 name 元数据名称(name的值)说明application name当前页所属Web应用系…

进程替换篇

文章目录 目录 前言 1.进程替换概念 2.进程替换的原理 3.进程替换的接口 4.接口功能验证 ①execl接口演示 ②execlp接口演示 ③execle接口演示 ④execv接口验证 5.尝试写一个自己的shell【了解】 前言 你一定见过类似于这样的“黑框框”&#xff0c;这个“黑框框”其实就是…

js 请求api + 解析数据 2个例子

起因&#xff0c; 目的: 补补 js 基础。 例1&#xff0c; 请求天气 api&#xff0c; 天气数据api js 中的 await await 关键字只能在 async 函数内部使用。函数内部可以使用 await&#xff0c;但是在函数外部直接使用 await 是不允许的。 async function fetchWeatherData…

[mysql]最基本的SELECT...FROM结构

第0种&#xff1a;最基本的查询语句 SELECT 字段名&#xff0c;字段名 FROM 表名 SELECT 1&#xff1b; SELECT 11,3*2&#xff1b; FROM SELECT 11,3*2 FROM DUAL&#xff1b;#dual&#xff1a;伪表 我们可以用它来保持一个平衡 这里我们的值不需要在任何一个表里&#xf…

MyBaits的初理解

一.Mybaits的简介 Mybaits就是对JDBC的简化&#xff0c;就是对持久化的实现。 二.基础 需要导的dependencies <dependencies><!-- mybatis依赖 --><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId>&l…

第 5 章多视图几何

本章讲解如何处理多个视图&#xff0c;以及如何利用多个视图的几何关系来恢复照相机位置信息和三维结构。通过在不同视点拍摄的图像&#xff0c;我们可以利用特征匹配来计算出三维场景点以及照相机位置。本章会介绍一些基本的方法&#xff0c;展示一个三维重建的完整例子&#…

获取多行文本内容,去掉首尾空格、去掉空字符串,解析为文本数组

核心代码 // 获取多行文本内容&#xff0c;去掉首尾空格、去掉空字符串&#xff0c;解析为文本数组 getMultiLineTexts(textareaValue) {return textareaValue.split("\n").map((v) > v.split("\t").join("").trim()).filter((v, i, ar) &g…

Qt | ubuntu20.04安装Qt6.5.3并创建一个example完整教程(涉及诸多开发细节,商用慎重)

点击上方"蓝字"关注我们 01、下载 >>> 下载Qt在线安装包 这里采用镜像地址进行下载,避免网络过慢。 镜像地址:http://mirrors.ustc.edu.cn/qtproject/archive/online_installers/4.5/ 选择最新版本下载,如截至目前最新版本为qt-unified-linux-x64-4.5.2…

“探索数字孪生技术:细数其在各行业的实际应用场景“

数字孪生城市是指在数字世界中创建一个同物理实体城市外观一致、行动一致、思想一致的 数字虚拟城市&#xff0c;实现对现实世界的监测、诊断、回溯、预测和决策控制&#xff0c;用于实体城市的规划、建设、 治理和优化等全生命周期管理&#xff0c;提高城市运行效率和市民居住…

内网穿透的应用-Deepin系统安装x11vnc实现任意设备无公网IP远程连接Deepin桌面

文章目录 前言1. 安装x11vnc2. 本地远程连接测试3. Deepin安装Cpolar4. 配置公网远程地址5. 公网远程连接Deepin桌面6. 固定连接公网地址7. 固定公网地址连接测试 前言 本文主要介绍在Deepin系统中安装x11vnc工具&#xff0c;并结合Cpolar内网穿透工具实现任意设备无公网IP也可…

计算机毕业设计Pyhive+Spark招聘可视化 职位薪资预测 招聘推荐系统 招聘大数据 招聘爬虫 大数据毕业设计 Hadoop Scrapy

《SparkHive招聘推荐与预测系统》开题报告 一、引言 随着互联网技术的飞速发展&#xff0c;招聘行业积累了大量的数据&#xff0c;包括职位信息、应聘者信息、企业信息等。这些数据中蕴含着丰富的价值&#xff0c;能够帮助企业和求职者更好地匹配&#xff0c;提高招聘效率。然…