1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume

news2024/11/18 21:25:53

1、数据仓库概念

数据仓库( Data Warehouse ),是为企业制定决策,提供数据支持的。可以帮助企业,改进业务流程、提高产品质量等。
数据仓库的输入数据通常包括:业务数据、用户行为数据和爬虫数据等。
业务数据:就是各行业在处理事务过程中产生的数据。比如用户在电商网站中登录、下单、支付等过程中,需要和网站后台数据库进行增删改查交互,产生的数据就是业务数据。业务数据通常存储在MySQL、Oracle等数据库中。
用户行为数据:用户在使用产品过程中,通过埋点收集与客户端产品交互过程中产生的数据,并发往日志服务器进行保存。比如页面浏览、点击、停留、评论、点赞、收藏等。用户行为数据通常存储在日志文件中。
爬虫数据:通常是通过技术手段获取其他公司网站的数据。
在这里插入图片描述

2、项目需求及架构设计

2.1 项目需求分析

1、采集平台
(1)用户行为数据采集平台
(2)业务数据采集平台

2.2 项目架构

2.2.1 技术选型

技术选型主要考虑的因素:数据量大小、业务需求、行业内经验、技术成熟度、开发维护成本、总成本预算。

  • 数据采集传输:Flume、Kafka、DataX、MaxWell
  • 数据储存:MySQL、HDFS、Hbase、Redis
  • 数据计算:Hive、Spark、Flink
  • 数据查询:Presto、ClickHouse
  • 数据可视化:Superset、Sugar
  • 任务调度:DolphinScheduler
  • 集群监控:Zabbix、Prometheus
2.2.2 系统数据流程设计

在这里插入图片描述

2.2.3 框架版本选型

(1)Apache框架版本

框架版本
Hadoop3.1.3
Zookeeper3.5.7
MySQL5.7.16
Hive3.1.2
Flume1.9.0
Kafka3.0.0
Spark3.0.0
DataX3.0.0
Superset1.3.2
DolphinScheduler2.0.3
MaxWell1.29.2
Flink1.13.0
Redis6.0.8
Hbase2.0.5
ClickHouse20.4.5.36-2
2.2.4 测试集群资源规划设计
服务名称子服务服务器(hadoop102)服务器(hadoop103)服务器(hadoop104)
HDFSNameNode
HDFSDataNode
HDFSSecondaryNameNode
YarnResourceManager
YarnNodeManager
ZookeeperZookeeper Server
Flume(采集日志)Flume
KafkaKafka
Flume(消费kafka日志)Flume
Flume(消费kafka业务)Flume
Hive
MySQLMysQL
DataX
Spark
DolphinSchedulerApiApplicationServer
DolphinSchedulerAlertServer
DolphinSchedulerMasterServer
DolphinSchedulerWorkerServer
DolphinSchedulerLoggerServer
SupersetSuperset
Flink
ClickHouse
Redis
Hbase
服务数总计201112

3、用户行为日志

3.1 用户行为日志概述

用户行为日志的内容,主要包括用户的各项行为信息以及行为所处的环境信息。收集这些信息的主要目的是优化产品和为各项分析统计指标提供数据支撑。收集这些信息的手段通常为埋点。

目前主流的埋点方式,有代码埋点(前端/后端)、可视化埋点、全埋点等。

代码埋点是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据。

可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。

全埋点是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。

3.2 用户行为日志内容

本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录。

3.2.1 页面流量记录

页面浏览记录,记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。

3.2.2 动作记录

动作记录,记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。

3.2.3 曝光记录

曝光记录,记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。

3.2.4 启动记录

启动记录,记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。

3.2.5 错误记录

启动记录,记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。

3.3 用户行为日志格式

我们的日志结构大致可分为两类,一是页面日志,二是启动日志。

3.3.1 页面日志

页面日志,以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录,若干个用户在该页面所做的动作记录,若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{
	"common": {                     -- 环境信息
		"ar": "230000",             -- 地区编码
		"ba": "iPhone",             -- 手机品牌
		"ch": "Appstore",           -- 渠道
		"is_new": "1",              -- 是否首日使用,首次使用的当日,该字段值为1,过了24:00,该字段置为0。
		"md": "iPhone 8",           -- 手机型号
		"mid": "YXfhjAYH6As2z9Iq",  -- 设备id
		"os": "iOS 13.2.9",         -- 操作系统
		"uid": "485",               -- 会员id
		"vc": "v2.1.134"            -- app版本号
	},
	"actions": [{                   -- 动作(事件)
		"action_id": "favor_add",   -- 动作id
		"item": "3",                -- 目标id
		"item_type": "sku_id",      -- 目标类型
		"ts": 1585744376605         -- 动作时间戳
	    }
	],
	"displays": [{                  -- 曝光
			"displayType": "query", -- 曝光类型
			"item": "3",            -- 曝光对象id
			"item_type": "sku_id",  -- 曝光对象类型
			"order": 1,             -- 出现顺序
			"pos_id": 2             -- 曝光位置
		},
		{
			"displayType": "promotion",
			"item": "6",
			"item_type": "sku_id",
			"order": 2,
			"pos_id": 1
		},
		{
			"displayType": "promotion",
			"item": "9",
			"item_type": "sku_id",
			"order": 3,
			"pos_id": 3
		},
		{
			"displayType": "recommend",
			"item": "6",
			"item_type": "sku_id",
			"order": 4,
			"pos_id": 2
		},
		{
			"displayType": "query ",
			"item": "6",
			"item_type": "sku_id",
			"order": 5,
			"pos_id": 1
		}
	],
	"page": {                          -- 页面信息
		"during_time": 7648,           -- 持续时间毫秒
		"item": "3", 	               -- 目标id
		"item_type": "sku_id",         -- 目标类型
		"last_page_id": "login",       -- 上页类型
		"page_id": "good_detail",      -- 页面ID
		"sourceType": "promotion"      -- 来源类型
	},                                 
	"err": {                           --错误
		"error_code": "1234",          --错误码
		"msg": "***********"           --错误信息
	},                                 
	"ts": 1585744374423                --跳入时间戳
}
3.3.2 启动日志

启动日志以启动为单位,及一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{
  "common": {
    "ar": "370000",
    "ba": "Honor",
    "ch": "wandoujia",
    "is_new": "1",
    "md": "Honor 20s",
    "mid": "eQF5boERMJFOujcp",
    "os": "Android 11.0",
    "uid": "76",
    "vc": "v2.1.134"
  },
  "start": {   
    "entry": "icon",         --icon手机图标  notice 通知   install 安装后启动
    "loading_time": 18803,  --启动加载时间
    "open_ad_id": 7,        --广告页ID
    "open_ad_ms": 3449,    -- 广告总共播放时间
    "open_ad_skip_ms": 1989   --  用户跳过广告时点
  },
"err":{                     --错误
"error_code": "1234",      --错误码
    "msg": "***********"       --错误信息
},
  "ts": 1585744304000
}

3.4 服务器和JDK准备

3.4.1 服务器准备
第1章 Hadoop运行环境搭建
1.1 模板虚拟机环境准备
0)安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G

1)hadoop100虚拟机配置要求如下(本文Linux系统全部以CentOS-7.5-x86-1804为例)
(1)使用yum安装需要虚拟机可以正常上网,yum安装前可以先测试下虚拟机联网情况
[root@hadoop100 ~]# ping www.baidu.com
PING www.baidu.com (14.215.177.39) 56(84) bytes of data.
64 bytes from 14.215.177.39 (14.215.177.39): icmp_seq=1 ttl=128 time=8.60 ms
64 bytes from 14.215.177.39 (14.215.177.39): icmp_seq=2 ttl=128 time=7.72 ms
(2)安装epel-release
注:Extra Packages for Enterprise Linux是为“红帽系”的操作系统提供额外的软件包,适用于RHEL、CentOS和Scientific Linux。相当于是一个软件仓库,大多数rpm包在官方 repository 中是找不到的)
[root@hadoop100 ~]# yum install -y epel-release
(3)注意:如果Linux安装的是最小系统版,还需要安装如下工具;如果安装的是Linux桌面标准版,不需要执行如下操作
net-tool:工具包集合,包含ifconfig等命令
[root@hadoop100 ~]# yum install -y net-tools 
vim:编辑器
[root@hadoop100 ~]# yum install -y vim
2)关闭防火墙,关闭防火墙开机自启
[root@hadoop100 ~]# systemctl stop firewalld
[root@hadoop100 ~]# systemctl disable firewalld.service
	注意:在企业开发时,通常单个服务器的防火墙时关闭的。公司整体对外会设置非常安全的防火墙
3)创建atguigu用户,并修改atguigu用户的密码
[root@hadoop100 ~]# useradd atguigu
[root@hadoop100 ~]# passwd atguigu
4)配置atguigu用户具有root权限,方便后期加sudo执行root权限的命令
[root@hadoop100 ~]# vim /etc/sudoers
修改/etc/sudoers文件,在%wheel这行下面添加一行,如下所示:
## Allow root to run any commands anywhere
root    ALL=(ALL)     ALL

## Allows people in group wheel to run all commands
%wheel  ALL=(ALL)       ALL
atguigu   ALL=(ALL)     NOPASSWD:ALL
注意:atguigu这一行不要直接放到root行下面,因为所有用户都属于wheel组,你先配置了atguigu具有免密功能,但是程序执行到%wheel行时,该功能又被覆盖回需要密码。所以atguigu要放到%wheel这行下面。
5)在/opt目录下创建文件夹,并修改所属主和所属组
(1)在/opt目录下创建module、software文件夹
[root@hadoop100 ~]# mkdir /opt/module
[root@hadoop100 ~]# mkdir /opt/software
	(2)修改module、software文件夹的所有者和所属组均为atguigu用户 
[root@hadoop100 ~]# chown atguigu:atguigu /opt/module 
[root@hadoop100 ~]# chown atguigu:atguigu /opt/software
(3)查看module、software文件夹的所有者和所属组
[root@hadoop100 ~]# cd /opt/
[root@hadoop100 opt]# ll
总用量 12
drwxr-xr-x. 2 atguigu atguigu 4096 5月  28 17:18 module
drwxr-xr-x. 2 root    root    4096 9月   7 2017 rh
drwxr-xr-x. 2 atguigu atguigu 4096 5月  28 17:18 software
6)卸载虚拟机自带的JDK
	注意:如果你的虚拟机是最小化安装不需要执行这一步。
[root@hadoop100 ~]# rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps 
rpm -qa:查询所安装的所有rpm软件包
grep -i:忽略大小写
xargs -n1:表示每次只传递一个参数
rpm -e –nodeps:强制卸载软件
7)重启虚拟机
[root@hadoop100 ~]# reboot
1.2 克隆虚拟机
1)利用模板机hadoop100,克隆三台虚拟机:hadoop102、hadoop103、hadoop104
	注意:克隆时,要先关闭hadoop100
2)修改克隆机IP,以下以hadoop102举例说明
(1)修改克隆虚拟机的静态IP
[atguigu@hadoop100 ~]# sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33
改成
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.10.102
PREFIX=24
GATEWAY=192.168.10.2
DNS1=192.168.10.2
(2)查看Linux虚拟机的虚拟网络编辑器,编辑->虚拟网络编辑器->VMnet8


(3)查看Windows系统适配器VMware Network Adapter VMnet8的IP地址

(4)保证Linux系统ifcfg-ens33文件中IP地址、虚拟网络编辑器地址和Windows系统VM8网络IP地址相同。
3)修改克隆机主机名,以下以hadoop102举例说明
	(1)修改主机名称
[atguigu@hadoop100 ~]# sudo vim /etc/hostname
hadoop102
(2)配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts
[atguigu@hadoop100 ~]# sudo vim /etc/hosts
添加如下内容
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
4)重启克隆机hadoop102 
[atguigu@hadoop100 ~]# sudo reboot
5)修改windows的主机映射文件(hosts文件)
(1)如果操作系统是window7,可以直接修改 
	①进入C:\Windows\System32\drivers\etc路径
	②打开hosts文件并添加如下内容,然后保存
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
(2)如果操作系统是window10,先拷贝出来,修改保存以后,再覆盖即可
①进入C:\Windows\System32\drivers\etc路径
②拷贝hosts文件到桌面
③打开桌面hosts文件并添加如下内容
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
④将桌面hosts文件覆盖C:\Windows\System32\drivers\etc路径hosts文件
3.4.2 编写集群分发脚本xsync

1、xsync集群分发脚本
(1)需求:循环复制文件到所有节点的相同目录下
(2)需求分析
①rsync命令原始拷贝:
rsync -av /opt/module root@hadoop103:/opt/
②期望脚本:
xsync要同步的文件名称
③说明:在/home/zhm/bin这个目录下存放的脚本,zhm用户可以在系统任何地方直接执行。
(3)脚本实现
①在用的家目录/home/zhm下创建bin文件夹
②在/home/atguigu/bin目录下创建xsync文件,以便全局调用
在该文件中编写如下代码

#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $@
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done

③修改脚本xsync具有执行权限

3.4.3 SSH无密登录配置

说明:这里面只配置了hadoop102、hadoop103到其他主机的无密登录;因为hadoop102未外配置的是NameNode,hadoop103配置的是ResourceManager,都要求对其他节点无密访问。
1、hadoop102上生成公钥和私钥:

ssh-keygen -t rsa (要cd ~/.ssh)

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
2、将hadoop102公钥拷贝到要免密登录的目标机器上(在.ssh目录下,下面一样)

ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104

3、hadoop103上生成公钥和私钥:

ssh-keygen -t rsa

4、将hadoop103公钥拷贝到要免密登录的目标机器上

ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104
3.4.4 JDK准备

1、卸载现有JDK(3台节点)

sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

(1)rpm -qa:表示查询所有已经安装的软件包
(2)grep -i:表示过滤时不区分大小写
(3)xargs -n1:表示一次获取上次执行结果的一个值
(4)rpm -e --nodeps:表示卸载软件
2、用XShell工具将JDK导入到hadoop102的/opt/software文件夹下面
3、解压JDK到/opt/module目录下

tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

4、配置JDK环境变量
(1)新建/etc/profile.d/my_env.sh文件
添加如下内容,然后保存(:wq)退出

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk
export PATH=$PATH:$JAVA_HOME/bin

(2)让环境变量生效

source /etc/profile.d/my_env.sh

5、测试JDK是否安装成功

java -version

6、分发JDK

xsync /opt/module/jdk

7、分发环境变量配置文件

sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh

8、分别在hadoop103、hadoop104上执行source

3.5 模拟数据

3.5.1 使用说明

1、将application.yml、gmall2020-mock-log-2021-10-10.jar、path.json、logback.xml上传到hadoop102的/opt/module/applog目录下

资源获取
链接:https://pan.baidu.com/s/1d_sumGVy5OxYGHT1HbB7xg
提取码:zhm6

(1)创建applog路径
(2)上传文件到/opt/module/applog目录
2、配置文件
(1)application.yml文件
可以根据需求生成对应日期的用户行为日志。

vim application.yml

修改如下内容

# 外部配置打开
logging.config: "./logback.xml"
#业务日期  注意:并不是Linux系统生成日志的日期,而是生成数据中的时间
mock.date: "2020-06-14"

#模拟数据发送模式
#mock.type: "http"
#mock.type: "kafka"
mock.type: "log"

#http模式下,发送的地址
mock.url: "http://hdp1/applog"

#kafka模式下,发送的地址
mock:
  kafka-server: "hdp1:9092,hdp2:9092,hdp3:9092"
  kafka-topic: "ODS_BASE_LOG"

#启动次数
mock.startup.count: 200
#设备最大值
mock.max.mid: 500000
#会员最大值
mock.max.uid: 100
#商品最大值
mock.max.sku-id: 35
#页面平均访问时间
mock.page.during-time-ms: 20000
#错误概率 百分比
mock.error.rate: 3
#每条日志发送延迟 ms
mock.log.sleep: 10
#商品详情来源  用户查询,商品推广,智能推荐, 促销活动
mock.detail.source-type-rate: "40:25:15:20"
#领取购物券概率
mock.if_get_coupon_rate: 75
#购物券最大id
mock.max.coupon-id: 3
#搜索关键词  
mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"

(2)path.json,该文件用来配置访问路径
(3)logback配置文件
可配置日志生成路径,修改内容如下

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="/opt/module/applog/log" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 将某一个包下日志单独打印日志 -->
    <logger name="com.atgugu.gmall2020.mock.log.util.LogUtil"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error"  >
        <appender-ref ref="console" />
    </root>
</configuration>

(4)生成日志

  • 进入到/opt/module/applog路径,执行以下命令
java -jar gmall2020-mock-log-2021-10-10.jar
  • 在/opt/module/applog/log目录下查看生成日志
3.5.2 集群日志生成脚本

在hadoop102的/home/atguigu目录下创建bin目录,这样脚本可以在服务器的任何目录执行。
1、在/home/atguigu/bin目录下创建脚本lg.sh
2、在脚本中编写如下内容

#!/bin/bash
for i in hadoop102 hadoop103; do
    echo "========== $i =========="
    ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-10-10.jar >/dev/null 2>&1 &"
done 

3、修改脚本执行权限

chmod 777 lg.sh

4、将jar包及配置文件上传至hadoop103的/opt/module/applog/路径
5、启动脚本

lg.sh

6、分别在hadoop102、hadoop103的/opt/module/applog/log目录上查看生成的数据

4、用户行为数据采集模块

4.1 数据通道

用户行为日志数据通道
在这里插入图片描述

4.2 环境准备

4.2.1 集群所有进程查看脚本

1、在/home/atguigu/bin目录下创建脚本jpsall

vim jpsall

2、在脚本中编写如下内容

#! /bin/bash
 
for i in hadoop102 hadoop103 hadoop104
do
    echo --------- $i ----------
    ssh $i jps
done

3、修改脚本执行权限

chmod 777 jpsall

4、启动脚本

xcall
4.2.2 Hadoop安装
4.2.3 Zookeeper安装
4.2.4 Kafka安装
4.2.5 Flume安装

4.3 日志采集Flume

4.3.1 日志采集Flume概述

按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。

此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。

选择TailDirSource和KafkaChannel的原因如下:
1、TailDirSource
TailDirSource相比ExecSource、SpoolingDirectorySource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
2、KafkaChannel
采用Kafka Channel,省去了Sink,提高了效率。

日志采集Flume关键配置如下:
在这里插入图片描述

4.3.2 日志采集Flume配置实操

1、创建Flume配置文件
在hadoop102节点的Flume的job目录下创建file_to_kafka.conf

@hadoop104 flume]$ mkdir job
@hadoop104 flume]$ vim job/file_to_kafka.conf 

2、配置文件内容如下

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = org.zhm.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1

3、编写Flume拦截器
(1)创建Maven工程flume-interceptor
(2)创建包:com.atguigu.gmall.flume.interceptor
(3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(4)在org.zhm.gmall.flume.utils包下创建JSONUtil类

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;

public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true  不是:返回false
* */
    public static boolean isJSONValidate(String log){
        try {
            JSONObject.parseObject(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }
}

(5)在org.zhm.gmall.flume.interceptor包下创建ETLInterceptor类

import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;


import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
		
		//1、获取body当中的数据并转成字符串
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
		//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
        if (JSONUtil.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

(6)打包
在这里插入图片描述

(7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

4.3.3 日志采集Flume测试

1、启动Zookeeper、Kafka集群
2、启动hadoop102的日志采集Flume

 bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console

3、启动一个Kafka的Console-Consumer

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

4、生成模拟数据

lg.sh

5、观察Kafka消费者是否能消费到数据

4.3.4 日志·采集Flume启停脚本

1、分发日志采集Flume配置文件和拦截器
若上述测试通过,需将hadoop102节点的Flume的配置文件和拦截器jar包,向另一台日志服务器发送一份。

scp -r job hadoop103:/opt/module/flume/
[atguigu@hadoop102 flume]$ scp lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop103:/opt/module/flume/lib/

2、方便起见,此处编写一个日志采集Flume进程的启停脚本
在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh

vim f1.sh

在脚本中填写如下内容

#!/bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

3、增加脚本执行权限

 chmod 777 f1.sh

4、f1启动

f1.sh start

5、f1停止

f1.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/691934.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux——使用第三方库链接的方式——动态式

回顾上文&#xff1a; (122条消息) Linux使用第三方库链接的使用方式——静态式_橙予清的zzz~的博客-CSDN博客https://blog.csdn.net/weixin_69283129/article/details/131414804?spm1001.2014.3001.5502 上篇文章中&#xff0c;我讲到了关于链接第三方库作为静态库的使…

股票技术分析方法综述

文章目录 K线均线MACDKDJ和RSIBOLL线趋势理论、支撑位和压力位形态理论量价关系理论道氏理论波浪理论江恩理论缠论自定义指标 K线 K线的组合形态是K线技术分析中的重要部分&#xff0c;包括早晨之星、黄昏之星、红三兵、黑三兵等。 早晨之星&#xff1a;由三根K线组成&#x…

OpenGL 抗锯齿

1.简介 你可以看到&#xff0c;我们只是绘制了一个简单的立方体&#xff0c;你就能注意到它存在锯齿边缘。 可能不是非常明显&#xff0c;但如果你离近仔细观察立方体的边缘&#xff0c;你就应该能够看到锯齿状的图案。如果放大的话&#xff0c;你会看到下面的图案&#xff1a…

家校互动、班级管理系统

最近做了一款使用若依开源框架搭建的一款家校互动、班级管理的平台&#xff0c;采用uniapp作为APP端&#xff0c;原生小程序作为小程序的家长端。

软件测试的概念与过程(软件测试的历史、概念、结构、过程)

软件测试的概念与过程----软件测试的历史 软件测试的历史软件的概念软件的结构软件测试的过程 软件测试的历史 1、早期的的软件开发过程中&#xff0c;将测试“调试”&#xff0c;目的是纠正软件已经知道的故障&#xff0c;常常有开发人员自己去完成这部分工作。 2、1957年&…

使用数据集工具

一.数据集工具介绍 HuggingFace通过API提供了统一的数据集处理工具&#xff0c;它提供的数据集如下所示&#xff1a; 该界面左侧可以根据不同的任务类型、类库、语言、License等来筛选数据集&#xff0c;右侧为具体的数据集列表&#xff0c;其中有经典的glue、super_glue数据集…

Unity | HDRP高清渲染管线学习笔记:材质系统Lit着色器

目录 一、Lit着色器 1. Surface Options 2. Surface Inputs&#xff08;表面输入&#xff09; 3. Transparency Inputs 二、HDRP渲染优先级 我们可以把现实世界中的物体分成不透明物体和透明物体&#xff08;其中包括透明或者半透明&#xff09;。在实时渲染时&#xff0c…

Debian二次开发网关支持Docker+RS485+网口

随着物联网技术的不断发展&#xff0c;瑞芯微边缘计算网关作为一种集成多种接口和功能的智能网关&#xff0c;逐渐成为了物联网领域中的热门产品。本文将详细介绍瑞芯微边缘计算网关的特点和优势&#xff0c;并探讨其在实际应用中的广泛应用。 瑞芯微Linux系统边缘计算网关是一…

【Java】 Java 私有接口方法的使用

本文仅供学习参考&#xff01; 相关教程地址&#xff1a; https://www.baeldung.com/java-interface-private-methods https://www.geeksforgeeks.org/private-methods-java-9-interfaces/ https://www.runoob.com/java/java9-private-interface-methods.html 接口是定义一组方…

java之路—— SpringMVC的常用注解解析以及作用、应用

创作不易&#xff0c;真的希望能给个免费的小 文章目录 1、Controller2、RequestMapping3.GetMapping、PostMapping、PutMapping、DeleteMapping4. RequestParam5.PathVariable6.RequestHeader7.CookieValue8.RequestBody9.ResponseBody10.SessionAttribute11.ControllerAdvice…

二层、三层交换机是什么?有什么区别?

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 作者会持续更新网络知识和python基础知识&#xff0c;期待你的关注 前言 本文将讲解二层交换机和三层交换机是什么&#xff0c;以及他们的区别。 目录 一、二层交换机是什么&#xff1f; 二、二层交换机的主…

本地生活多城市合伙人系统开发

本地生活多城市合伙人项目是一种基于本地生活服务的创业项目&#xff0c;旨在为各个城市的居民提供方便、实惠、高品质的生活服务。该项目通过招募多个城市的合伙人&#xff0c;建立完整的本地生活服务平台和供应链体系&#xff0c;覆盖不同类型的本地生活服务&#xff0c;如餐…

Nginx的Rewrite(地址重定向)

目录 前言 一、Rewrite 跳转场景 二、Rewrite 跳转实现 三、Rewrite实际场景 3.1Nginx跳转需求的实现方式 3.2rewrite放在 server{}&#xff0c;if{}&#xff0c;location{}段中 3.3对域名或参数字符串 四、Rewrite正则表达式 五、Rewrite语法格式 5.1rewrite语法格式…

互联网常见架构接口压测性能分析及调优手段建议

目录 互联网常见架构接口压测性能分析及调优手段建议 1 接口名称: 获取列表 1.1 压测现象:单台tps700多&#xff0c;应用cpu高负载 1.1.1 问题分析: 1.1.2 改进措施: 1.1.3 改进效果: 1.2 压测现象&#xff1a;数据库资源利用率高 1.2.1 问题分析: 1.2.2 改进措施: 1.2.3 改…

SciencePub学术 | 计算机科学类重点SCIEEI征稿中

SciencePub学术 刊源推荐:计算机科学类重点SCIE&EI征稿中&#xff01;信息如下&#xff0c;录满为止&#xff1a; 一、期刊概况&#xff1a; 计算机科学类重点SCIE&EI 【期刊简介】IF&#xff1a;3.0-3.5&#xff0c;JCR 2区&#xff0c;中科院4区&#xff1b; 【检…

使用R绘制气泡图、带有显著性标记的热力图、渐变曲线图

大家好&#xff0c;我是带我去滑雪&#xff01; 一幅精美的科研绘图会有诸多益处&#xff0c;精美的图像可以更好地传达研究结果和数据分析的重要信息。通过使用清晰、直观和易于理解的图像&#xff0c;可以更好地向读者展示研究的发现&#xff0c;有助于读者理解和解释数据。还…

JAVA开发(记一次504 gateway timeout错误排查过程)

一、问题与背景&#xff1a; 最近在发布一个web项目&#xff0c;在测试环境都是可以的&#xff0c;发布到生产环境通过IP访问也是可以的&#xff0c;但是通过域名访问就出现504 gateway timeout。通过postman去测试接口也是一样。ip和端口都可以通&#xff0c;域名却不行&…

如何在矩池云上运行 AI 图像编辑工具 DragGAN

5 月&#xff0c;DragGAN 横空出世&#xff0c;在开源代码尚未公布前&#xff0c;就在Github上斩获近 20000 Star&#xff0c;彼时&#xff0c;页面上只有效果图和一句“Code will be released in June”&#xff0c;然而这也足够带给人们无限期待。 在6月末&#xff0c;在若干…

SpringBoot最多可以处理多少个请求?

SpringBoot最多可以处理多少个请求&#xff1f; SpringBoot夺命连环14问&#xff0c;1天刷完别人半个月的springboot面试内容&#xff0c;比啃书效果好多了&#xff01;_哔哩哔哩_bilibili 最小线程数&#xff1a;最少的厨师的量&#xff0c;饭店人不多的时候的量。 最大线程数…

微信可以自动跟圈的软件有吗?

对于许多人而言&#xff0c;每日发朋友圈已成为必要的任务之一&#xff0c;如同上学时老师检查作业般。但发圈何尝不像是写作业一样呢&#xff1f;有许多小号需发几十上百条朋友圈&#xff0c;令人感到枯燥、浪费时间。再加上我们还有其他事务要处理&#xff0c;例如服装店老板…