大数据项目之电商数仓、实时数仓同步数据、离线数仓同步数据、用户行为数据同步、日志消费Flume配置实操、日志消费Flume测试、日志消费Flume启停脚本

news2025/2/23 5:24:26

文章目录

  • 8. 实时数仓同步数据
  • 9. 离线数仓同步数据
    • 9.1 用户行为数据同步
      • 9.1.1 数据通道
        • 9.1.1.1 用户行为数据通道
      • 9.1.2 日志消费Flume配置概述
        • 9.1.2.1 日志消费Flume关键配置
      • 9.1.3 日志消费Flume配置实操
        • 9.1.3.1 创建Flume配置文件
        • 9.1.3.2 配置文件内容如下
          • 9.1.3.2.1 配置优化
            • 9.1.3.2.1.1 FileChannel优化
            • 9.1.3.2.1.2 HDFS Sink优化
        • 9.1.3.3 编写Flume拦截器
          • 9.1.3.3.1 数据漂移问题
          • 9.1.3.3.2 在com.summer.gmall.flume.interceptor包下创建TimestampInterceptor类
          • 9.1.3.3.3 重新打包
          • 9.1.3.3.4 需要先将打好的包放入到hadoop104的/opt/module/flume-1.9.0/lib文件夹下面
      • 9.1.4 日志消费Flume测试
        • 9.1.4.1 启动Zookeeper、Kafka集群
        • 9.1.4.2 启动日志采集Flume
        • 9.1.4.3 启动hadoop104的日志消费Flume
        • 9.1.4.4 生成模拟数据
        • 9.1.4.5 观察HDFS是否出现数据
      • 9.1.5 日志消费Flume启停脚本
        • 9.1.5.1 在hadoop102节点的/home/summer/bin目录下创建脚本f2.sh
        • 9.1.5.2 增加脚本执行权限
        • 9.1.5.3 f2启动
        • 9.1.5.4 f2停止

8. 实时数仓同步数据

  实时数仓由Flink源源不断从Kafka当中读数据计算,所以不需要手动同步数据到实时数仓。

9. 离线数仓同步数据

9.1 用户行为数据同步

9.1.1 数据通道

  用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。

9.1.1.1 用户行为数据通道

在这里插入图片描述

9.1.2 日志消费Flume配置概述

  按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。
此处选择KafkaSource、FileChannel、HDFSSink。
关键配置如下:

9.1.2.1 日志消费Flume关键配置

在这里插入图片描述

9.1.3 日志消费Flume配置实操

9.1.3.1 创建Flume配置文件

在这里插入图片描述
先将里面的文件删除。

9.1.3.2 配置文件内容如下

# 定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1


# 配置source1
# 连接Kafka集群
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092  
# 消费者从topic_log读数据
a1.sources.r1.kafka.topics = topic_log 
# 给消费者设置一个唯一标识,也就是消费者组ID,建议配置和业务一样的,如果用默认的,有多个消费者,则会有的消费者可以消费到数据,有些消费者消费不了数据
a1.sources.r1.kafka.consumer.group.id = topic_log 
# 一次最大传输多少数据,数据满了就传输
a1.sources.r1.batchSize = 2000   
# 多久传输一次,时间到了就传输,不管数据到2000了没有,时间一到就开始传输数据。
a1.sources.r1.batchDurationMillis = 1000
# 拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.summer.gmall.flume.interceptor.TimestampInterceptor$Builder


# 配置channel
a1.channels.c1.type = file
# 这个是Flume的第一次备份路径
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1  
# 打开二次Flume的二次备份,一般情况这个都是关闭状态,要是想打开的话改为true即可,然后配置backupCheckpointDir 的路径,这个路径要和第一次的路径放在不同的磁盘上
a1.channels.c1.useDualCheckpoints = false
# 这个是Flume的第二次备份路径
# a1.channels.c1.backupCheckpointDir = /opt/module/flume-1.9.0/checkpoint/behavior2
# 是flume的多目录存储,可以将服务器存在多个磁盘上,目前我只有一个磁盘,因此只能配置一个磁盘。和Hadoop的datadirs是一个意思。 
 a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1  
 # file channel写入文件,这个文件最大可以容纳2G内容
 a1.channels.c1.maxFileSize = 2146435071
# 是file channel条数的限制,最大是1000000 
 a1.channels.c1.capacity = 1000000
# 等一会,如果还没有足够的空间,还是会回滚
 a1.channels.c1.keep-alive = 3

 
# 配置sink
 a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
# 如果想配置6个小时落一次盘的话,可以将round打开,将hdfs.roundValue这个参数设置为6,hdfs.roundUnit这个参数设置为hour
# 并且a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d/%h要多加一个%h
a1.sinks.k1.hdfs.round = false

# 解决数据漂移问题
# 多少秒后滚动一次
a1.sinks.k1.hdfs.rollInterval = 10
# 文件大小超过设置值后滚动一次
a1.sinks.k1.hdfs.rollSize = 134217728
# event的条数,当设置多少条后,就是多少条后滚动一次
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在这里插入图片描述

hdfs.roundfalseShould the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue1Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnitsecondThe unit of the round down value - second, minute or hour.

在这里插入图片描述

9.1.3.2.1 配置优化
9.1.3.2.1.1 FileChannel优化

  通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

9.1.3.2.1.2 HDFS Sink优化

1.HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
2.HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件

9.1.3.3 编写Flume拦截器

9.1.3.3.1 数据漂移问题

在这里插入图片描述

9.1.3.3.2 在com.summer.gmall.flume.interceptor包下创建TimestampInterceptor类

在这里插入图片描述

package com.summer.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * @author Redamancy
 * @create 2022-10-29 19:54
 */
public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {
        
    }

    @Override
    public Event intercept(Event event) {
        //1 获取header和body的数据
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        
        //2 将body的数据类型转成jsonObject类型(方便获取数据)
        JSONObject jsonObject = JSONObject.parseObject(log);
        
        //3 header中timestamp时间字段替换成日志生产的时间戳(解决数据漂移问题)
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

9.1.3.3.3 重新打包

在这里插入图片描述
先clean再package

9.1.3.3.4 需要先将打好的包放入到hadoop104的/opt/module/flume-1.9.0/lib文件夹下面

需要先在这个目录下面看看有没有这个名字的jar包,如果有则先删除rm,然后再将写的jar上传到这个目录下面,如果不先删除该jar包,则上传的jar名字后面会加个.0,jar包也不会运行起来。

在这里插入图片描述
在这里插入图片描述

9.1.4 日志消费Flume测试

9.1.4.1 启动Zookeeper、Kafka集群

在这里插入图片描述

9.1.4.2 启动日志采集Flume

[summer@hadoop102 module]$ f1.sh start

在这里插入图片描述

9.1.4.3 启动hadoop104的日志消费Flume

[summer@hadoop104 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console

在这里插入图片描述

9.1.4.4 生成模拟数据

[summer@hadoop102 ~]$ lg.sh

在这里插入图片描述

9.1.4.5 观察HDFS是否出现数据

生成模拟数据之前
在这里插入图片描述生成模拟数据之后
在这里插入图片描述

在这里插入图片描述

9.1.5 日志消费Flume启停脚本

  若上述测试通过,为方便,此处创建一个Flume的启停脚本。

9.1.5.1 在hadoop102节点的/home/summer/bin目录下创建脚本f2.sh

[summer@hadoop102 bin]$ vim f2.sh

在脚本中填写如下内容
在这里插入图片描述

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 下游flume-------"
        ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 下游flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

9.1.5.2 增加脚本执行权限

[summer@hadoop102 bin]$ chmod 777 f2.sh 

9.1.5.3 f2启动

[summer@hadoop102 bin]$ f2.sh start

9.1.5.4 f2停止

[summer@hadoop102 bin]$ f2.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/28864.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Arcpy新增随机高程点、空间插值及批量制图

&#xff08;1&#xff09;在“地质调查点基础数据表.xls”中图幅范围内增加200个随机位置的高程点。构建一个shape文件&#xff0c;采用自定义工具的模式&#xff0c;参数有两个&#xff1a;一个是让用户选择excel文件&#xff0c;一个让用户指定新生成的文件名。 &#xff08…

五子棋小游戏——Java

文章目录一、内容简介&#xff1a;二、基本流程三、具体步骤1.菜单栏2.创建棋盘并初始化为空格(1)定义行数、列数为常量(2)定义棋盘(3)给棋盘添加坐标并初始化棋盘为空格3.打印棋盘4.玩家落子5.判断输赢四、代码实现五、效果展示一、内容简介&#xff1a; 五子棋小游戏是我们日…

网络工程SSM毕设项目 计算机毕业设计【源码+论文】

文章目录前言 题目1 : 基于SSM的游戏攻略资讯补丁售卖商城 <br /> 题目2 : 基于SSM的疫情期间医院门诊网站 <br /> 题目3 : 基于SSM的在线课堂学习设计与实现<br /> 题目4 : 基于SSM的大学生兼职信息系统 <br /> 题目5 : 基于SSM的大学生社团管理系统 …

2022 云原生编程挑战赛圆满收官,见证冠军战队的诞生

11 月 3 日&#xff0c;天池大赛第三届云原生编程挑战赛在杭州云栖大会圆满收官。三大赛道18大战队手历经 3 个月激烈的角逐&#xff0c;终于交上了满意的答卷&#xff0c;同时也捧回了属于他们的荣耀奖杯。 云原生编程挑战赛发起人王荣刚在开场分享中提到&#xff0c;“在阿里…

【无标题】后来,我认为王阳明比尼采,叔本华都高明

悲欣交集 ——灵遁者 虽然我是个写作者&#xff0c;但我还是希望无苦难可以诉说。可事与愿违&#xff0c;我的笔下总有忧伤&#xff0c;也许我天生忧郁。 我觉得现在比以往任何时候&#xff0c;都更能体验和接触苦难。打开新闻&#xff0c;打开抖音&#xff0c;苦难就扑面而…

SpringBoot 整合 Shiro 权限框架

目录Shiro概述Shiro介绍基本功能Shiro架构SpringBoot整合Shiro环境搭建登录、授权、角色认证实现自定义实现 RealmShiro配置类controller代码权限异常处理多个 realm 的认证策略设置会话管理获得session方式Shiro概述 Shiro介绍 Apache Shiro 是一个功能强大且易于使用的 Jav…

力扣(LeetCode)42. 接雨水(C++)

栈 明确目标——计算接雨水的总量。 可以想到一层一层的接雨水。和算法结合&#xff0c;介绍思想 &#xff1a; 遍历柱子&#xff0c;栈 stkstkstk 维护降序高度的柱子&#xff0c;如果出现升序&#xff0c;说明形成凹槽&#xff0c;计算凹槽能接的雨水&#xff0c;加入答案。…

Java强软弱虚引用和ThreadLocal工作原理(一)

一、概述 本篇文章先引入java的四种引用在android开发中的使用&#xff0c;然后结合弱引用来理解ThreadLocal的工作原理。 二、JVM名词介绍 在提出四种引用之前&#xff0c;我们先提前说一下 Java运行时数据区域 虚拟机栈 堆 垃圾回收机制 这四个概念。 2.1 java运行时数据…

freeswitch通过limit限制cps

概述 freeswitch在业务开发中有极大的便利性&#xff0c;因为fs内部实现了很多小功能&#xff0c;这些小功能组合在一起&#xff0c;通过拨号计划就可以实现很多常见的业务功能。 在voip云平台的开发中&#xff0c;我们经常会碰到资源的限制&#xff0c;有外部线路资源方面的…

Linux环境下安装并使用使用Git命令实现文件上传

⭐️前面的话⭐️ 本篇文章将介绍在Linux环境下安装Git并使用Git实现代码上传到gitee&#xff0c;上传操作的核心就是三把斧&#xff0c;一是add&#xff0c;二是commit&#xff0c;三是push&#xff0c;此外还会简单介绍一下.gitignore配置文件的作用。 &#x1f4d2;博客主页…

【broadcast-service】一个轻量级Python发布订阅者框架

本文节选至本人博客&#xff1a;https://www.blog.zeeland.cn/archives/broadcast-service-description Introduction 前两天在Python最佳实践-构建自己的第三方库文章中介绍了自己构建的一个轻量级的Python发布订阅者框架&#xff0c;今天来简单介绍一下。 项目地址&#xf…

不同的量化交易软件速度差距大吗?

哪家券商的软件交易速度快&#xff1f;那个平台有极速柜台系统&#xff1f;成为了一个热门的话题&#xff0c;我来说下我的看法。其实呢&#xff0c;大部分的主流券商速度都是差不多的&#xff0c;否则的话&#xff0c;那速度有差距大家肯定都会冲向最快的那一家了。极速柜台系…

查看mysql的版本

1. mysql --version linux下使用命令&#xff1a; mysql --version 2. mysql -V 没有连接到MySQL服务器&#xff0c;就想查看MySQL的版本。打开cmd&#xff0c;切换至mysql的bin目录&#xff0c;运行下面的命令即可&#xff1a; 2.1 mysql -V e:\mysql\bin> mysql -V mys…

k8s编程operator——client-go中的informer

文章目录1、介绍1.1 简单使用1.2 List & Watch1.3 informer简介2、store2.1 ThreadSafeMap建立索引&#xff1a;threadSafeMap源码分析&#xff1a;2.2 Indexer2.3 DeltaFIFO3、reflector3.1 Reflector的定义3.2 Reflector的创建3.3 Reflector的循环执行3.4 List操作3.5 Wa…

JAVA 之 Spring框架学习 1:Springの初体验 IOC DI 注入 案例

Spring技术是JavaEE开发必备技能&#xff0c;企业开发技术选型命中率>90% 专业角度 简化开发&#xff0c;降低企业级开发的复杂性 框架整合&#xff0c;高效整合其他技术&#xff0c;提高企业级应用开发与运行效率 1.学习Spring框架设计思想 2.学习基础操作&#xff0c;思…

数据结构之选择排序(堆排序)

选择排序 选择排序分为两种一个是堆排序 一个是简单选择排序 简单选择排序 就是从头到尾扫描一遍待排序元素找出最小的 最小的之前的数的往后一位&#xff0c;第一个空间空出来 把最小的元素存入 然后从第二个空间开始变为待排序元素 最后一个元素不用处理 代码实现 算法性…

【python】根据自定义曲线函数进行拟合

【参考】 官网 curve_fit示例与评估&#xff1a;拟合curve_fit使用矫正的R^2评估非线性模型&#xff1a;拟合评估其他&#xff1a; curve_fit()实现任意形式的曲线拟合-csdn拟合优度r^2-csdn 官网示例 拟合函数&#xff1a; f(x)ae−bxcf(x)ae^{-bx}cf(x)ae−bxc import m…

Git——IDEA集成Git(详细)

目录 一、配置Git忽略文件 1.1 为什么忽略&#xff1f; 1.2 怎么忽略&#xff1f; 二. IDEA定位Git程序&#xff08;准备环境&#xff09; 三、IDEA操作Git 3.1 初始化Git本地库、添加暂存区、提交本地库 3.2 切换版本 3.3 创建分支 3.4 切换分支 3.5 合并分支 3.5.1…

存储模块 --- Cache

Cache 高速缓冲存储器 内存一般采用SDRAM芯片&#xff0c;对内存的访问肯定是不及CPU的速度的&#xff0c;通常说内存访问要比CPU的速度慢的多。也就是说内存拖后腿了。 CPU访问内存并不是完全随机的。 在某个时间段内&#xff0c;CPU总是访问当前内存地址的相邻内存地址&…

数理统计笔记5:参数估计-区间估计

引言 数理统计笔记的第5篇先介绍了参数估计的区间估计。包括单总体均值、单总体比例、单总体方差以及两总体均值之差、两总体方差之比几种常见的情况。 引言总体均值的置信区间&#xff08;σ2\sigma^2σ2已知&#xff09;-Z法总体均值的置信区间&#xff08;σ2\sigma^2σ2未知…