07用户行为日志数据采集

news2024/11/15 3:39:56

用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。
在这里插入图片描述
按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。
此处选择KafkaSource、FileChannel、HDFSSink。关键配置如下:
在这里插入图片描述

日志消费者 Flume 实操

  1. 在hadoop101 节点的Flume 的 job目录下创建 kafka_to_hdfs_log.conf,内容如下
    配置注释:

    • FileChannel优化:配置 dataDirsk可以通过逗号分隔指向多个路径,每个路径对应不同硬盘,可以增加吞吐量。
    • 新增checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
    #定义组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    #配置source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092
    a1.sources.r1.kafka.topics=topic_log
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.TimestampInterceptor$Builder
    
    #配置channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    #配置sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log
    a1.sinks.k1.hdfs.round = false
    
    
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    #控制输出文件类型
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    
    #组装 
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  2. HDFS Sink 优化

    • HDFS存入大量小文件,有什么影响?
      元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命。
    • HDFS小文件处理。
      官方默认三个参数配置写入HDFS 后会产生小文件: hdfs.rollInterval, hdfs.rollSize, hdfs.rollCount。
      本次配置的参数为hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0。意味着文件在达到128M时会滚动生成新文件,或者文件超过 3600 秒会生成新文件。
  3. 编写 Flume 拦截器

    • 解决问题
      在这里插入图片描述
    • 在com.logan.gmall.flume.interceptor包下创建TimestampInterceptor类
    package com.logan.gmall.flume.interceptor;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    
    public class TimestampInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
            // 获取header和body数据
            Map<String, String> headers = event.getHeaders();
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
    
            // 将body转换成JsonObject类型
            JSONObject jsonObject = JSONObject.parseObject(body);
    
            // 将header中的timestamp时间转换成body中的timestamp(解决数据漂移问题)
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            for (Event event : list) {
                intercept(event);
            }
            return list;
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new TimestampInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
        @Override
        public void close() {
    
        }
    }
    
    
  4. 将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下

启动测试

  1. 启动 Zookeeper、Kafka 集群
  2. 启动 hadoop101 的消费Flume
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
  1. 生成模拟数据[logan@hadoop101 ~]$ vim /opt/module/applog/log/app.2023-12-14.log
{"common":{"ar":"110000","ba":"vivo","ch":"oppo","is_new":"0","md":"vivo iqoo3","mid":"mid_70997","os":"Android 11.0","uid":"776","vc":"v2.1.134"},"start":{"entry":"icon","loading_time":11968,"open_ad_id":16,"open_ad_ms":7891,"open_ad_skip_ms":0},"ts":1672503309000}
{"common":{"ar":"110000","ba":"vivo","ch":"oppo","is_new":"0","md":"vivo iqoo3","mid":"mid_70997","os":"Android 11.0","uid":"776","vc":"v2.1.134"},"displays":[{"display_type":"activity","item":"2","item_type":"activity_id","order":1,"pos_id":1},{"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":1},{"display_type":"query","item":"9","item_type":"sku_id","order":3,"pos_id":1},{"display_type":"query","item":"18","item_type":"sku_id","order":4,"pos_id":4},{"display_type":"promotion","item":"35","item_type":"sku_id","order":5,"pos_id":4},{"display_type":"query","item":"35","item_type":"sku_id","order":6,"pos_id":4},{"display_type":"recommend","item":"13","item_type":"sku_id","order":7,"pos_id":5}],"page":{"during_time":14287,"page_id":"home"},"ts":1672503309000}
  1. 检查HFDS是否生成数据
  2. 当 HDFS 生成数据后,增加[logan@hadoop101 bin]$ vim f2.sh
#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop101 日志数据flume-------"
        ssh hadoop101 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop101 日志数据flume-------"
        ssh hadoop101 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
  1. 最终 HDFS 文件
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1311577.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cfa一级考生复习经验分享系列(三)

从总成绩可以看出&#xff0c;位于90%水平之上&#xff0c;且置信区间全体均高于90%线。 从各科目成绩可以看出&#xff0c;所有科目均位于90%线上或高于90%线&#xff0c;其中&#xff0c;另类与衍生、公司金额、经济学、权益投资、固定收益、财报分析表现较好&#xff0c;目测…

多架构容器镜像构建实战

最近在一个国产化项目中遇到了这样一个场景&#xff0c;在同一个 Kubernetes 集群中的节点是混合架构的&#xff0c;也就是说&#xff0c;其中某些节点的 CPU 架构是 x86 的&#xff0c;而另一些节点是 ARM 的。为了让我们的镜像在这样的环境下运行&#xff0c;一种最简单的做法…

双端队列和优先级队列

文章目录 前言dequedeque底层设计迭代器设计 priority仿函数数组中的第k个最大元素优先级队列模拟实现pushpop调整仿函数存储自定义类型 前言 今天要介绍比较特殊的结构&#xff0c;双端队列。 还有一个适配器&#xff0c;优先级队列。 deque 栈的默认容器用了一个deque的东西…

案例课7——百度智能客服

1.公司介绍 百度智能客服是百度智能云推出的将AI技术赋能企业客服业务的一揽子解决方案。该方案基于百度世界先进的语音技术、自然语言理解技术、知识图谱等构建完备的一体化产品方案&#xff0c;结合各行业头部客户丰富的运营经验&#xff0c;持续深耕机场服务、电力调度等场…

【普中】基于51单片机简易计算器显示设计( proteus仿真+程序+设计报告+实物演示+讲解视频)

目录标题 &#x1f4df;1. 主要功能&#xff1a;&#x1f4df;2. 讲解视频&#xff1a;&#x1f4df;3. 设计说明书(报告)&#x1f4df;4. 仿真&#x1f4df;5. 实物烧录和现象&#x1f4df;6. 程序代码&#x1f4df;7. 设计资料内容清单 【普中开发板】基于51单片机简易计算器…

日志框架Log4j、JUL、JCL、Slf4j、Logback、Log4j2

为什么程序需要记录日志 我们不可能实时的24小时对系统进行人工监控&#xff0c;那么如果程序出现异常错误时要如何排查呢&#xff1f;并且系统在运行时做了哪些事情我们又从何得知呢&#xff1f;这个时候日志这个概念就出现了&#xff0c;日志的出现对系统监控和异常分析起着…

Jenkins 添加节点报错

报错日志 Error: A JNI error has occurred, please check your installation and try again Exception in thread "main" java.lang.UnsupportedClassVersionError: hudson/remoting/Launcher has been compiled by a more recent version of the Java Runtime (cl…

react+datav+echarts实现可视化数据大屏

&#x1f4d3;最近有点闲&#xff0c;就学习了下react&#xff0c;没想到就把react学完了&#xff0c;觉得还不错&#xff0c;就打算出一把reactdatav的简易版可视化数据大屏供大家做个参考。 &#x1f4d3;效果如下 1下载必要的框架 &#x1f4d3; react路由 npm install re…

Rancher中使用promtail+loki+grafna收集k8s日志并展示

Rancher中使用promtail+loki+grafna收集k8s日志并展示 根据应用需求和日志数量级别选择对应的日志收集、过滤和展示方式,当日志量不太大,又想简单集中管理查看日志时,可使用promtail+loki+grafna的方式。本文找那个loki和grafana外置在了k8s集群之外。 1、添加Chart Repo …

js解析.shp文件

效果图 原理与源码 本文采用的是shapefile.js工具 这里是他的npm地址 https://www.npmjs.com/package/shapefile 这是他的unpkg地址&#xff0c;可以点开查看源码 https://unpkg.com/shapefile0.6.6/dist/shapefile.js 这个最关键的核心问题是如何用这个工具&#xff0c;网上…

[开源更新] 企业级身份管理和访问管理系统、为数字身份安全赋能

一、系统简介 名称&#xff1a;JNPF权限管理系统 JNPF 权限管理系统可用于管理企业内员工账号、权限、身份认证、应用访问等&#xff0c;可整合部署在本地或云端的内部办公系统、业务系统及第三方 SaaS 系统的所有身份&#xff0c;实现一个账号打通所有应用的服务。其有如下几…

【Docker】WSL 2 上的 Docker 搭建和入门

▒ 目录 ▒ &#x1f6eb; 导读开发环境 1️⃣ 安装安装Docker Desktop for Windows 2️⃣ 环境配置3️⃣ hello world第一次运行再次运行分析总结 &#x1f4d6; 参考资料 &#x1f6eb; 导读 开发环境 版本号描述文章日期2023-12-14操作系统Win11 - 22H222621.2715WSL2 C:…

60.Sentinel源码分析

Sentinel源码分析 1.Sentinel的基本概念 Sentinel实现限流、隔离、降级、熔断等功能&#xff0c;本质要做的就是两件事情&#xff1a; 统计数据&#xff1a;统计某个资源的访问数据&#xff08;QPS、RT等信息&#xff09; 规则判断&#xff1a;判断限流规则、隔离规则、降级规…

单片机——通信协议(FPGA+c语言应用之spi协议解析篇)

引言 串行外设接口(SPI)是微控制器和外围IC&#xff08;如传感器、ADC、DAC、移位寄存器、SRAM等&#xff09;之间使用最广泛的接口之一。本文先简要说明SPI接口&#xff0c;然后介绍ADI公司支持SPI的模拟开关与多路转换器&#xff0c;以及它们如何帮助减少系统电路板设计中的数…

在接口实现类中,加不加@Override的区别

最近的软件构造实验经常需要设计接口&#xff0c;我们知道Override注解是告诉编译器&#xff0c;下面的方法是重写父类的方法&#xff0c;那么单纯实现接口的方法需不需要加Override呢&#xff1f; 定义一个类实现接口&#xff0c;使用idea时&#xff0c;声明implements之后会…

风速预测(二)基于Pytorch的EMD-LSTM模型

目录 前言 1 风速数据EMD分解与可视化 1.1 导入数据 1.2 EMD分解 2 数据集制作与预处理 2.1 先划分数据集&#xff0c;按照8&#xff1a;2划分训练集和测试集 2.2 设置滑动窗口大小为7&#xff0c;制作数据集 3 基于Pytorch的EMD-LSTM模型预测 3.1 数据加载&#xff0…

HTTP 404错误:页面未找到,如何解决

在互联网上浏览时&#xff0c;偶尔会遇到“HTTP 404错误&#xff1a;页面未找到”的提示。这通常意味着用户尝试访问的网页不存在或无法找到。本文将探讨HTTP 404错误的原因以及如何解决这个问题。 一、HTTP 404错误的原因 HTTP 404错误可能是由多种原因引起的。以下是一些常…

IDEA鼠标滚轮缩放字体大小的快捷键

IDEA 如果你想改变鼠标滚轮缩放字体大小的快捷键&#xff0c;可以按以下步骤进行操作&#xff1a; 打开 IntelliJ IDEA 编辑器。选择菜单栏的 “File” -> “Settings”。在弹出的对话框中&#xff0c;选择 File | Settings | Editor | General

IntelliJ IDEA2023学习教程

详细介绍idea开发工具及使用技巧 1. 2023版安装1.1删除老版本1.2 下载及安装 3.快捷技巧4. 创建各种model 1. 2023版安装 1.1删除老版本 如果以前装有idea需要先删除&#xff0c;以避免冲突&#xff0c;在idea安装目录/bin/Uninstall.exe双击1.2 下载及安装 最新版本 https:/…

JavaEE:多线程(1):线程是啥?怎么创建和操作?

进程的问题 本质上来说&#xff0c;进程可以解决并发编程的问题 但是有些情况下进程表现不尽如人意 1. 如果请求很多&#xff0c;需要频繁的创建和销毁进程的时候&#xff0c;此时使用多进程编程&#xff0c;系统开销就会很大 2. 一个进程刚刚启动的时候&#xff0c;需要把…