电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume

news2025/1/11 2:52:13

1、数据仓库概念

数据仓库( Data Warehouse ),是为企业制定决策,提供数据支持的。可以帮助企业,改进业务流程、提高产品质量等。
数据仓库的输入数据通常包括:业务数据、用户行为数据和爬虫数据等。
业务数据:就是各行业在处理事务过程中产生的数据。比如用户在电商网站中登录、下单、支付等过程中,需要和网站后台数据库进行增删改查交互,产生的数据就是业务数据。业务数据通常存储在MySQL、Oracle等数据库中。
用户行为数据:用户在使用产品过程中,通过埋点收集与客户端产品交互过程中产生的数据,并发往日志服务器进行保存。比如页面浏览、点击、停留、评论、点赞、收藏等。用户行为数据通常存储在日志文件中。
爬虫数据:通常是通过技术手段获取其他公司网站的数据。
在这里插入图片描述

2、项目需求及架构设计

2.1 项目需求分析

1、采集平台
(1)用户行为数据采集平台
(2)业务数据采集平台

2.2 项目架构

2.2.1 技术选型

技术选型主要考虑的因素:数据量大小、业务需求、行业内经验、技术成熟度、开发维护成本、总成本预算。

  • 数据采集传输:Flume、Kafka、DataX、MaxWell
  • 数据储存:MySQL、HDFS、Hbase、Redis
  • 数据计算:Hive、Spark、Flink
  • 数据查询:Presto、ClickHouse
  • 数据可视化:Superset、Sugar
  • 任务调度:DolphinScheduler
  • 集群监控:Zabbix、Prometheus
2.2.2 系统数据流程设计

在这里插入图片描述

2.2.3 框架版本选型

(1)Apache框架版本

框架版本
Hadoop3.1.3
Zookeeper3.5.7
MySQL5.7.16
Hive3.1.2
Flume1.9.0
Kafka3.0.0
Spark3.0.0
DataX3.0.0
Superset1.3.2
DolphinScheduler2.0.3
MaxWell1.29.2
Flink1.13.0
Redis6.0.8
Hbase2.0.5
ClickHouse20.4.5.36-2
2.2.4 测试集群资源规划设计
服务名称子服务服务器(hadoop102)服务器(hadoop103)服务器(hadoop104)
HDFSNameNode
HDFSDataNode
HDFSSecondaryNameNode
YarnResourceManager
YarnNodeManager
ZookeeperZookeeper Server
Flume(采集日志)Flume
KafkaKafka
Flume(消费kafka日志)Flume
Flume(消费kafka业务)Flume
Hive
MySQLMysQL
DataX
Spark
DolphinSchedulerApiApplicationServer
DolphinSchedulerAlertServer
DolphinSchedulerMasterServer
DolphinSchedulerWorkerServer
DolphinSchedulerLoggerServer
SupersetSuperset
Flink
ClickHouse
Redis
Hbase
服务数总计201112

3、用户行为日志

3.1 用户行为日志概述

用户行为日志的内容,主要包括用户的各项行为信息以及行为所处的环境信息。收集这些信息的主要目的是优化产品和为各项分析统计指标提供数据支撑。收集这些信息的手段通常为埋点。

目前主流的埋点方式,有代码埋点(前端/后端)、可视化埋点、全埋点等。

代码埋点是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据。

可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。

全埋点是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。

3.2 用户行为日志内容

本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录。

3.2.1 页面流量记录

页面浏览记录,记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。

3.2.2 动作记录

动作记录,记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。

3.2.3 曝光记录

曝光记录,记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。

3.2.4 启动记录

启动记录,记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。

3.2.5 错误记录

启动记录,记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。

3.3 用户行为日志格式

我们的日志结构大致可分为两类,一是页面日志,二是启动日志。

3.3.1 页面日志

页面日志,以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录,若干个用户在该页面所做的动作记录,若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{
	"common": {                     -- 环境信息
		"ar": "230000",             -- 地区编码
		"ba": "iPhone",             -- 手机品牌
		"ch": "Appstore",           -- 渠道
		"is_new": "1",              -- 是否首日使用,首次使用的当日,该字段值为1,过了24:00,该字段置为0。
		"md": "iPhone 8",           -- 手机型号
		"mid": "YXfhjAYH6As2z9Iq",  -- 设备id
		"os": "iOS 13.2.9",         -- 操作系统
		"uid": "485",               -- 会员id
		"vc": "v2.1.134"            -- app版本号
	},
	"actions": [{                   -- 动作(事件)
		"action_id": "favor_add",   -- 动作id
		"item": "3",                -- 目标id
		"item_type": "sku_id",      -- 目标类型
		"ts": 1585744376605         -- 动作时间戳
	    }
	],
	"displays": [{                  -- 曝光
			"displayType": "query", -- 曝光类型
			"item": "3",            -- 曝光对象id
			"item_type": "sku_id",  -- 曝光对象类型
			"order": 1,             -- 出现顺序
			"pos_id": 2             -- 曝光位置
		},
		{
			"displayType": "promotion",
			"item": "6",
			"item_type": "sku_id",
			"order": 2,
			"pos_id": 1
		},
		{
			"displayType": "promotion",
			"item": "9",
			"item_type": "sku_id",
			"order": 3,
			"pos_id": 3
		},
		{
			"displayType": "recommend",
			"item": "6",
			"item_type": "sku_id",
			"order": 4,
			"pos_id": 2
		},
		{
			"displayType": "query ",
			"item": "6",
			"item_type": "sku_id",
			"order": 5,
			"pos_id": 1
		}
	],
	"page": {                          -- 页面信息
		"during_time": 7648,           -- 持续时间毫秒
		"item": "3", 	               -- 目标id
		"item_type": "sku_id",         -- 目标类型
		"last_page_id": "login",       -- 上页类型
		"page_id": "good_detail",      -- 页面ID
		"sourceType": "promotion"      -- 来源类型
	},                                 
	"err": {                           --错误
		"error_code": "1234",          --错误码
		"msg": "***********"           --错误信息
	},                                 
	"ts": 1585744374423                --跳入时间戳
}
3.3.2 启动日志

启动日志以启动为单位,及一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{
  "common": {
    "ar": "370000",
    "ba": "Honor",
    "ch": "wandoujia",
    "is_new": "1",
    "md": "Honor 20s",
    "mid": "eQF5boERMJFOujcp",
    "os": "Android 11.0",
    "uid": "76",
    "vc": "v2.1.134"
  },
  "start": {   
    "entry": "icon",         --icon手机图标  notice 通知   install 安装后启动
    "loading_time": 18803,  --启动加载时间
    "open_ad_id": 7,        --广告页ID
    "open_ad_ms": 3449,    -- 广告总共播放时间
    "open_ad_skip_ms": 1989   --  用户跳过广告时点
  },
"err":{                     --错误
"error_code": "1234",      --错误码
    "msg": "***********"       --错误信息
},
  "ts": 1585744304000
}

3.4 服务器和JDK准备

3.4.1 服务器准备
第1章 Hadoop运行环境搭建
1.1 模板虚拟机环境准备
0)安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G

1)hadoop100虚拟机配置要求如下(本文Linux系统全部以CentOS-7.5-x86-1804为例)
(1)使用yum安装需要虚拟机可以正常上网,yum安装前可以先测试下虚拟机联网情况
[root@hadoop100 ~]# ping www.baidu.com
PING www.baidu.com (14.215.177.39) 56(84) bytes of data.
64 bytes from 14.215.177.39 (14.215.177.39): icmp_seq=1 ttl=128 time=8.60 ms
64 bytes from 14.215.177.39 (14.215.177.39): icmp_seq=2 ttl=128 time=7.72 ms
(2)安装epel-release
注:Extra Packages for Enterprise Linux是为“红帽系”的操作系统提供额外的软件包,适用于RHEL、CentOS和Scientific Linux。相当于是一个软件仓库,大多数rpm包在官方 repository 中是找不到的)
[root@hadoop100 ~]# yum install -y epel-release
(3)注意:如果Linux安装的是最小系统版,还需要安装如下工具;如果安装的是Linux桌面标准版,不需要执行如下操作
net-tool:工具包集合,包含ifconfig等命令
[root@hadoop100 ~]# yum install -y net-tools 
vim:编辑器
[root@hadoop100 ~]# yum install -y vim
2)关闭防火墙,关闭防火墙开机自启
[root@hadoop100 ~]# systemctl stop firewalld
[root@hadoop100 ~]# systemctl disable firewalld.service
	注意:在企业开发时,通常单个服务器的防火墙时关闭的。公司整体对外会设置非常安全的防火墙
3)创建atguigu用户,并修改atguigu用户的密码
[root@hadoop100 ~]# useradd atguigu
[root@hadoop100 ~]# passwd atguigu
4)配置atguigu用户具有root权限,方便后期加sudo执行root权限的命令
[root@hadoop100 ~]# vim /etc/sudoers
修改/etc/sudoers文件,在%wheel这行下面添加一行,如下所示:
## Allow root to run any commands anywhere
root    ALL=(ALL)     ALL

## Allows people in group wheel to run all commands
%wheel  ALL=(ALL)       ALL
atguigu   ALL=(ALL)     NOPASSWD:ALL
注意:atguigu这一行不要直接放到root行下面,因为所有用户都属于wheel组,你先配置了atguigu具有免密功能,但是程序执行到%wheel行时,该功能又被覆盖回需要密码。所以atguigu要放到%wheel这行下面。
5)在/opt目录下创建文件夹,并修改所属主和所属组
(1)在/opt目录下创建module、software文件夹
[root@hadoop100 ~]# mkdir /opt/module
[root@hadoop100 ~]# mkdir /opt/software
	(2)修改module、software文件夹的所有者和所属组均为atguigu用户 
[root@hadoop100 ~]# chown atguigu:atguigu /opt/module 
[root@hadoop100 ~]# chown atguigu:atguigu /opt/software
(3)查看module、software文件夹的所有者和所属组
[root@hadoop100 ~]# cd /opt/
[root@hadoop100 opt]# ll
总用量 12
drwxr-xr-x. 2 atguigu atguigu 4096 5月  28 17:18 module
drwxr-xr-x. 2 root    root    4096 9月   7 2017 rh
drwxr-xr-x. 2 atguigu atguigu 4096 5月  28 17:18 software
6)卸载虚拟机自带的JDK
	注意:如果你的虚拟机是最小化安装不需要执行这一步。
[root@hadoop100 ~]# rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps 
rpm -qa:查询所安装的所有rpm软件包
grep -i:忽略大小写
xargs -n1:表示每次只传递一个参数
rpm -e –nodeps:强制卸载软件
7)重启虚拟机
[root@hadoop100 ~]# reboot
1.2 克隆虚拟机
1)利用模板机hadoop100,克隆三台虚拟机:hadoop102、hadoop103、hadoop104
	注意:克隆时,要先关闭hadoop100
2)修改克隆机IP,以下以hadoop102举例说明
(1)修改克隆虚拟机的静态IP
[atguigu@hadoop100 ~]# sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33
改成
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.10.102
PREFIX=24
GATEWAY=192.168.10.2
DNS1=192.168.10.2
(2)查看Linux虚拟机的虚拟网络编辑器,编辑->虚拟网络编辑器->VMnet8


(3)查看Windows系统适配器VMware Network Adapter VMnet8的IP地址

(4)保证Linux系统ifcfg-ens33文件中IP地址、虚拟网络编辑器地址和Windows系统VM8网络IP地址相同。
3)修改克隆机主机名,以下以hadoop102举例说明
	(1)修改主机名称
[atguigu@hadoop100 ~]# sudo vim /etc/hostname
hadoop102
(2)配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts
[atguigu@hadoop100 ~]# sudo vim /etc/hosts
添加如下内容
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
4)重启克隆机hadoop102 
[atguigu@hadoop100 ~]# sudo reboot
5)修改windows的主机映射文件(hosts文件)
(1)如果操作系统是window7,可以直接修改 
	①进入C:\Windows\System32\drivers\etc路径
	②打开hosts文件并添加如下内容,然后保存
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
(2)如果操作系统是window10,先拷贝出来,修改保存以后,再覆盖即可
①进入C:\Windows\System32\drivers\etc路径
②拷贝hosts文件到桌面
③打开桌面hosts文件并添加如下内容
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
④将桌面hosts文件覆盖C:\Windows\System32\drivers\etc路径hosts文件
3.4.2 编写集群分发脚本xsync

1、xsync集群分发脚本
(1)需求:循环复制文件到所有节点的相同目录下
(2)需求分析
①rsync命令原始拷贝:
rsync -av /opt/module root@hadoop103:/opt/
②期望脚本:
xsync要同步的文件名称
③说明:在/home/zhm/bin这个目录下存放的脚本,zhm用户可以在系统任何地方直接执行。
(3)脚本实现
①在用的家目录/home/zhm下创建bin文件夹
②在/home/atguigu/bin目录下创建xsync文件,以便全局调用
在该文件中编写如下代码

#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $@
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done

③修改脚本xsync具有执行权限

3.4.3 SSH无密登录配置

说明:这里面只配置了hadoop102、hadoop103到其他主机的无密登录;因为hadoop102未外配置的是NameNode,hadoop103配置的是ResourceManager,都要求对其他节点无密访问。
1、hadoop102上生成公钥和私钥:

ssh-keygen -t rsa (要cd ~/.ssh)

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
2、将hadoop102公钥拷贝到要免密登录的目标机器上(在.ssh目录下,下面一样)

ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104

3、hadoop103上生成公钥和私钥:

ssh-keygen -t rsa

4、将hadoop103公钥拷贝到要免密登录的目标机器上

ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104
3.4.4 JDK准备

1、卸载现有JDK(3台节点)

sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

(1)rpm -qa:表示查询所有已经安装的软件包
(2)grep -i:表示过滤时不区分大小写
(3)xargs -n1:表示一次获取上次执行结果的一个值
(4)rpm -e --nodeps:表示卸载软件
2、用XShell工具将JDK导入到hadoop102的/opt/software文件夹下面
3、解压JDK到/opt/module目录下

tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

4、配置JDK环境变量
(1)新建/etc/profile.d/my_env.sh文件
添加如下内容,然后保存(:wq)退出

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk
export PATH=$PATH:$JAVA_HOME/bin

(2)让环境变量生效

source /etc/profile.d/my_env.sh

5、测试JDK是否安装成功

java -version

6、分发JDK

xsync /opt/module/jdk

7、分发环境变量配置文件

sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh

8、分别在hadoop103、hadoop104上执行source

3.5 模拟数据

3.5.1 使用说明

1、将application.yml、gmall2020-mock-log-2021-10-10.jar、path.json、logback.xml上传到hadoop102的/opt/module/applog目录下

资源获取
链接:https://pan.baidu.com/s/1d_sumGVy5OxYGHT1HbB7xg
提取码:zhm6

(1)创建applog路径
(2)上传文件到/opt/module/applog目录
2、配置文件
(1)application.yml文件
可以根据需求生成对应日期的用户行为日志。

vim application.yml

修改如下内容

# 外部配置打开
logging.config: "./logback.xml"
#业务日期  注意:并不是Linux系统生成日志的日期,而是生成数据中的时间
mock.date: "2020-06-14"

#模拟数据发送模式
#mock.type: "http"
#mock.type: "kafka"
mock.type: "log"

#http模式下,发送的地址
mock.url: "http://hdp1/applog"

#kafka模式下,发送的地址
mock:
  kafka-server: "hdp1:9092,hdp2:9092,hdp3:9092"
  kafka-topic: "ODS_BASE_LOG"

#启动次数
mock.startup.count: 200
#设备最大值
mock.max.mid: 500000
#会员最大值
mock.max.uid: 100
#商品最大值
mock.max.sku-id: 35
#页面平均访问时间
mock.page.during-time-ms: 20000
#错误概率 百分比
mock.error.rate: 3
#每条日志发送延迟 ms
mock.log.sleep: 10
#商品详情来源  用户查询,商品推广,智能推荐, 促销活动
mock.detail.source-type-rate: "40:25:15:20"
#领取购物券概率
mock.if_get_coupon_rate: 75
#购物券最大id
mock.max.coupon-id: 3
#搜索关键词  
mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"

(2)path.json,该文件用来配置访问路径
(3)logback配置文件
可配置日志生成路径,修改内容如下

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="/opt/module/applog/log" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 将某一个包下日志单独打印日志 -->
    <logger name="com.atgugu.gmall2020.mock.log.util.LogUtil"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error"  >
        <appender-ref ref="console" />
    </root>
</configuration>

(4)生成日志

  • 进入到/opt/module/applog路径,执行以下命令
java -jar gmall2020-mock-log-2021-10-10.jar
  • 在/opt/module/applog/log目录下查看生成日志
3.5.2 集群日志生成脚本

在hadoop102的/home/atguigu目录下创建bin目录,这样脚本可以在服务器的任何目录执行。
1、在/home/atguigu/bin目录下创建脚本lg.sh
2、在脚本中编写如下内容

#!/bin/bash
for i in hadoop102 hadoop103; do
    echo "========== $i =========="
    ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-10-10.jar >/dev/null 2>&1 &"
done 

3、修改脚本执行权限

chmod 777 lg.sh

4、将jar包及配置文件上传至hadoop103的/opt/module/applog/路径
5、启动脚本

lg.sh

6、分别在hadoop102、hadoop103的/opt/module/applog/log目录上查看生成的数据

4、用户行为数据采集模块

4.1 数据通道

用户行为日志数据通道
在这里插入图片描述

4.2 环境准备

4.2.1 集群所有进程查看脚本

1、在/home/atguigu/bin目录下创建脚本jpsall

vim jpsall

2、在脚本中编写如下内容

#! /bin/bash
 
for i in hadoop102 hadoop103 hadoop104
do
    echo --------- $i ----------
    ssh $i jps
done

3、修改脚本执行权限

chmod 777 jpsall

4、启动脚本

xcall
4.2.2 Hadoop安装
4.2.3 Zookeeper安装
4.2.4 Kafka安装
4.2.5 Flume安装

4.3 日志采集Flume

4.3.1 日志采集Flume概述

按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。

此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。

选择TailDirSource和KafkaChannel的原因如下:
1、TailDirSource
TailDirSource相比ExecSource、SpoolingDirectorySource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
2、KafkaChannel
采用Kafka Channel,省去了Sink,提高了效率。

日志采集Flume关键配置如下:
在这里插入图片描述

4.3.2 日志采集Flume配置实操

1、创建Flume配置文件
在hadoop102节点的Flume的job目录下创建file_to_kafka.conf

@hadoop104 flume]$ mkdir job
@hadoop104 flume]$ vim job/file_to_kafka.conf 

2、配置文件内容如下

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = org.zhm.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1

3、编写Flume拦截器
(1)创建Maven工程flume-interceptor
(2)创建包:com.atguigu.gmall.flume.interceptor
(3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(4)在org.zhm.gmall.flume.utils包下创建JSONUtil类

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;

public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true  不是:返回false
* */
    public static boolean isJSONValidate(String log){
        try {
            JSONObject.parseObject(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }
}

(5)在org.zhm.gmall.flume.interceptor包下创建ETLInterceptor类

import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;


import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
		
		//1、获取body当中的数据并转成字符串
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
		//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
        if (JSONUtil.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

(6)打包
在这里插入图片描述

(7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

4.3.3 日志采集Flume测试

1、启动Zookeeper、Kafka集群
2、启动hadoop102的日志采集Flume

 bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console

3、启动一个Kafka的Console-Consumer

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

4、生成模拟数据

lg.sh

5、观察Kafka消费者是否能消费到数据

4.3.4 日志·采集Flume启停脚本

1、分发日志采集Flume配置文件和拦截器
若上述测试通过,需将hadoop102节点的Flume的配置文件和拦截器jar包,向另一台日志服务器发送一份。

scp -r job hadoop103:/opt/module/flume/
[atguigu@hadoop102 flume]$ scp lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop103:/opt/module/flume/lib/

2、方便起见,此处编写一个日志采集Flume进程的启停脚本
在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh

vim f1.sh

在脚本中填写如下内容

#!/bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

3、增加脚本执行权限

 chmod 777 f1.sh

4、f1启动

f1.sh start

5、f1停止

f1.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/667966.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux信号概念、认识、处理动作 ( 2 ) -【Linux通信架构系列 】

系列文章目录 C技能系列 Linux通信架构系列 C高性能优化编程系列 深入理解软件架构设计系列 高级C并发线程编程 期待你的关注哦&#xff01;&#xff01;&#xff01; 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everything is for the…

适合嵌入式开发的GUI(嵌入式学习)

嵌入式开发的GUI如何选择&#xff1f; 常见的嵌入式GUI开发方法轻量级GUI库优缺点 基于Web技术优缺点 Qt框架优缺点 原生开发优缺点 嵌入式系统的限制 常见的嵌入式GUI开发方法 嵌入式开发中的GUI&#xff08;图形用户界面&#xff09;是指在嵌入式系统中实现图形化的用户界面…

Unity核心7——2D动画

一、序列帧动画 &#xff08;一&#xff09;什么是序列帧动画 ​ 我们最常见的序列帧动画就是我们看的日本动画片&#xff0c;以固定时间间隔按序列切换图片&#xff0c;就是序列帧动画的本质 ​ 当固定时间间隔足够短时&#xff0c;我们肉眼就会认为图片是连续动态的&#…

Pandas数据处理与分析教程:从基础到实战

文章目录 前言什么是Pandas&#xff1f;Pandas的安装和导入数据结构Series&#xff08;案例1&#xff1a;创建Series&#xff09;DataFrame&#xff08;案例2&#xff1a;创建DataFrame&#xff09; 数据读取和写入从CSV文件中读取数据&#xff08;案例3&#xff1a;读取CSV文件…

一起学SF框架系列6.2-模块core-Environment

Environment是集成在容器中的抽象接口&#xff0c;它对应用程序环境的两个关键方面进行建模&#xff1a;配置文件&#xff08;profiles&#xff09;和属性&#xff08;properties&#xff09;。 配置文件&#xff08;profiles&#xff09; 配置文件为核心容器中提供了一种机制…

Redis7---单线程和多线程(一)

目录 一、几个面试题 1.Redis的单线程部分 1.2 Redis所谓的“单线程” 1.3 Redis演进变化 1.3.1 Redis 3.x 单线程时代性能很快的原因 1.3.2 Redis 4.0 之前一直采用单线程的主要原因有三个 2. Redis单线程为什么加了多线程特性 3.Redis 6/7的多线程特性和IO多路复用入…

【系统架构】第二章-计算机系统基础知识(一)

计算机硬件 1、处理器&#xff1a;CISC&#xff08;复杂指令集&#xff09;、RISC&#xff08;精简指令集&#xff09; 2、存储器&#xff1a;按照与处理器的物理距离&#xff1a;片上缓存、片外缓存、主存、外存 3、总线&#xff1a;按照总线在计算机中的位置划分&#xff1a…

大厂流出2023年最新软件测试面试题【全】

1.B/S架构和C/S架构区别 B/S 只需要有操作系统和浏览器就行&#xff0c;可以实现跨平台&#xff0c;客户端零维护&#xff0c;维护成本低&#xff0c;但是个性化能力低&#xff0c;响应速度较慢C/S响应速度快&#xff0c;安全性强&#xff0c;一般应用于局域网中&#xff0c;因…

格雷码转换电路

目录 格雷码转换电路 1、简介 1.2、格雷码转化为二进制码原理如下&#xff1a; 1.3、二进制码转化为格雷码原理如下&#xff1a; 2、实验任务 3、程序设计 3.1、格雷码转换二进制 3.2、二进制转换格雷码 4、仿真测试 5、仿真验证 格雷码转换电路 格雷码&#xff0c;…

推荐一款能够节省办公空间的显示器!

作为一名高校科研人员&#xff0c;课题组师生日常科研工作必备电子设备的维护及更新对于科研进度有着极大影响作用。近日购买了最新一代的戴尔显示器E2424HS。 以下是我的一些真实使用体验&#xff1a; 01 外观高端大气 拆箱前&#xff0c;在检查外包装没有任何破损后&#…

城市道路工程设计技术措施

为在城市道路工程建设中正确执行国家和行业有关法律、标准、规范和规程&#xff0c;提高工程建设质量&#xff0c;特制定《城市道路工程设计技术规程》&#xff08;2011年版&#xff09;专门编的。  本办法主要依据国家和行业有关法规、标准、规范和规程等&#xff0c;参照地…

Spring 是什么框架?

对于一门技术&#xff0c;我们需要从为什么要学、学什么以及怎么学这三个方向入手来学习。那在说Spring这三点之前&#xff0c;我们先看Spring之前要学什么。 Java基础、Java语法进阶、MySQL基础操作、MySQL数据库设计和多表操作、JDBC、Maven基础、MyBatis、HMLCSS、JavaScrip…

字节跳动做了3年软件测试,五月无情被辞,想给划水的兄弟提个醒

前言 先简单交代一下背景吧&#xff0c;某不知名 985 的本硕&#xff0c;20 年毕业加入字节&#xff0c;以“人员优化”的名义无情被裁员&#xff0c;之后跳槽到了有赞&#xff0c;一直从事软件测试的工作。之前没有实习经历&#xff0c;算是5年的工作经验吧。 这5年之间完成…

全网最全,Web测试点详细整理(测试场景举例+常见问题分析)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Web测试检查表 功…

go环境安装配置(windows)

下载go 安装包 官网&#xff1a; https://go.dev/dl/ go语言中文网&#xff1a; https://studygolang.com/dl 下载压缩包&#xff0c;解压 环境变量配置 GOROOT 配置你解压的目录 在Path中追加一条 %GOROOT%\bin 这里go对比JAVA还多了一个配置&#xff1a; GOPATH 配置go以…

原生微信小程序基础-分包加载自定义组件项目全流程

小程序基础-分包加载&&自定义组件 小程序分包加载 小程序分包加载-为什么要分包加载 微信平台对小程序单个包的代码体积限制为 2M&#xff0c;超过 2M 的情况下可以采用分包来解决即使小程序代码体积没有超过 2M 时也可以拆分成多个包来实现按需加载配置文件能忽略的…

远程控制电脑软件全方位测评,远程控制工具哪家强

*本文内容以及测试数据来自“B站” 作者&#xff1a;小李student 先问大家一个问题&#xff0c;你心目中最好的远控软件是哪个? 今天我要测评几个有意思的远控软件&#xff0c;他们分别是todesk 、向日葵、RayLink、parsec、其实我还测试了微软自带的远程控制&#xff0c;但…

用 Optaplanner 实现云资源优化:建模详解

引言 装箱问题是一个经典的计算机科学优化问题,它的目标是将一堆物品尽可能地放入一些容器中,以最小化容器数量或最大化容器利用率。在实际应用中,我们可以在物流、资源调度、计算机集群等领域看到装箱问题的应用。 问题描述 在云计算领域,装箱问题同样非常重要。比如,…

Java中关于字符串常量池的详解!!!

字符串常量池 前言1. 直接赋值法&#xff0c;默认从常量池中取对象2. new一个对象3. 字符串常量池、字符串对象、内部的value引用、具体的字符数组之间的关系4. 手动入池方法&#xff1a;intern方法 前言 Java使用 “ ” 称为字符串常量&#xff0c;为了提高程序的运行速度&am…

2023最新软件测试面试题【1000道题含答案】

1、自动化代码中,用到了哪些设计模式? 单例设计模式 工厂模式PO设计模式数据驱动模式面向接口编程设计模式 2、什么是断言( Assert) ? 断言Assert用于在代码中验证实际结果是不是符合预期结果&#xff0c;如果测试用例执行失败会抛出异常并提供断言日志 3、什么是web自动化测…