[数仓]埋点数据接入

news2024/12/29 8:36:33

第40个视频的1:03:31

一、采集flume

日志服务器:将日志采集到本地,共有两个日志服务器,因此要安装两台flume,每个flume采集其所在服务器上的日志

source:taildir source

可以实时的读取文件中的数据,支持断点续传

1、flle_to_kafka.conf

文件存于:在flume目录下创建一个job目录

#定义组件
a1.sources=r1
a1.chennls=c1

#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroup.f1=/opt/module/aoolog/log/app.* 
#监控这个文件夹下以app.开头的文件,日志是按天滚动的,一天一个文件,文件格式:app.yyyy-MM-dd.log
a1.sources.r1.positionFile=/opt/module/flume/taildir_position.json   #设置断点续传位置的
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = 包名.ETLInterceptor$Builder #配置拦截器

#配置channels
a1.chennls.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.chennls.c1.kafka.bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092 
#往哪个kafka集群写数据
a1.chennls.c1.kafka.topic=topic_log #写到哪个topic
a1.chennls.c1.parseAsFlumeEvent=false 
#为false将数据以正常格式写入kafka
#为true(默认)将数据以Flume Event(header+body)的形式写入kafka。这样读出来的时候也是Event


#组装
a1.sources.r1.channels=c1

2、拦截器

判断json串是否完整

3、启动

nohup bin/flume-ng agent -n a1 -c conf/ -f job/flle_to_kafka.conf -Dflume.root.logger=info,console 1>out 2>&1 &

二、消费flume

#定义组件
a1.channels=c1
a1.sinks=k1

#配置source
a1.sources=r1
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sources.r1.kafka.topics=topic_log  #有多个用,分割
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize=2000 #生产环境一般为2000
a1.sources.r1.batchDurationMillis=  
#如果2000条迟迟没到.需要时间控制,单位毫秒.1000毫秒=1S
#生产环境,看多长时间生成2000条数据(上面设置的值),1天一亿条,约1000条/s,这里可以写2s
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=包名.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/checkpoint/behaviour1
#file chennel的索引维护在内存中,会备份到磁盘中,这个备份的磁盘目录就是checkpointDir
a1.channels.c1.useDualCheckpoints=false 
#默认为false,如果设置为true,则会将checkpointDir再备份一份,担心一份不安全,可以用二次备份.需要再配置一个目录
a1.channels.c1.dataDirs=/opt/module/flume/data//behaviour1  #可以将数据存储在一个服务器的多个磁盘上
a1.channels.c1.maxFileSize=2146435071  #将数据写到文件,设置该文件的最大值,默认为2G
a1.channels.c1.capacity=1000000  #file channel 容量的设置,默认为100万条
a1.channels.c1.keep-alive=3 #默认3  
#如果已经达到file channel的容量100万条,再批写入15条,此时写入不成功,数据会回滚,重新从kafka读取,浪费了性能.
#设置keep-alive,flume会等sink写入一会,这样file channel也许就有空间了,keep-alive用于设置等待时长

#配置sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/user/hive/warehouse/ods.db/ods_flow_ph/dt=%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=log
a1.sinks.k1.hdfs.round=false  #true则下支持用配置设置滚动时间,类似crontab定时那个
a1.sinks.k1.hdfs.rollInterval=5 #需要设置成5分钟
a1.sinks.k1.hdfs.rollSize=134217720 #128M
a1.sinks.k1.hdfs.rollCount=110000 #

#控制输出类型
a1.sinks.k1.hdfs.fileType=CompressedStream
a1.sinks.k1.hdfs.codeC=lzo

#组装
a1.sources.r1.channels=c1
a1.sinks.channel=c1

  1. 时间拦截器

在采集kafka的时候数据写到kafka channel的时间为23:59:59,消费kafka source读的时候时间已经漂移了。

拦截器:复制一下

2、a1.sinks.k1.hdfs.path=/user/hive/warehouse/ods.db/ods_flow_ph/dt=%Y-%m-%d目录下的中的小文件问题

正常应该每个文件控制在128M

小文件的危害:1.namenode元数据空间不足,2每个小文件都会对应一个map task,占用大量的内存

这里讲的是用

a1.sinks.k1.hdfs.rollInterval=5 #需要设置成5分钟

a1.sinks.k1.hdfs.rollSize=134217720 #128M

a1.sinks.k1.hdfs.rollCount=110000 #

参数调节

三、fllume的kafka channel

kafka source就是消费者

kafka sink就是kafka的生产者

kafka channel 将数据以event(header+body)的形式存储,

这样读的时候,读出来的是event,

在kafka channel中设置 parseAsFlume=false则会以正常格式存储,不封装成Event,但是咱们得代码需要header,在拦截器中使用,所以不能这么设置。

用这个channel必须有kafka集群

将数据存储到kafka 的topic里,写到partition 的leader中。

2.我们需要先把数据写入kafka,因为还有实时数仓。

kafka channel的使用方式:可以是

  1. source + channel + sink

  1. source +拦截器 +kafka channel (没有sink)

  1. channel channel +sink (没有source)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/196943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

详解Linux中网络文件系统

目录 前言 一、samba服务简介 1、windos如何共享文件 2、在linux中访问共享文件 二、samba基本信息 三、samba的安装与启用 1.服务端 2.客户端 3.服务启用 四、建立samba服务共享目录 五、samba用户的建立 六、 samba用户访问加目录 七、samba的访问控制 八、 sa…

Web(十一)

Request 1. request对象和response对象的原理 1. request和response对象是由服务器创建的。我们来使用它们 2. request对象是来获取请求消息,response对象是来设置响应消息 2. request对象继承体系结构: ServletRequest -- 接…

Kafka知识概况

Kafka知识概况Kafka简介Kafka 生产者Kafka BrokerKafka 消费者Kafka-Eagle 监控Kafka-Kraft 模式集成 SpringBootKafka简介 消息队列简介: 目 前企业中比较常见的消息队列产 品主 要有 Kafka、ActiveMQ 、RabbitMQ 、RocketMQ 等。在大数据场景主要采用 Kafka 作为…

【ElasticSearch8.X】学习笔记(一)

【ElasticSearch8.X】学习笔记一、8.x与7.x的对比二、安装elk8.x2.1、下载2.2、集群规划2.3、安装2.4、配置环境2.5、修改配置文件2.6、启动2.5、安装其他结点三、Kibana 安装3.1、下载3.2、配置环境3.2、修改配置文件3.4、启动一、8.x与7.x的对比 减少内存堆使用,…

JavaScript 进阶--charater2

系列文章目录 提示: JavaScript进阶笔记 ,希望各位看官可以高抬小手一键三连 上一章测试题 答案在最后给出 文章目录系列文章目录前言一、深入对象1.1创建对象三种方式1. 利用对象字面量创建对象2.利用new Object 创建对象3. 利用构造函数创建对象1.2 构…

设计师百度百科词条创建怎么做?

设计分为平面设计、空间设计、工业设计、珠宝设计、游戏设计、家具设计、建筑设计、室内设计、景观设计、服装设计、网页设计、系统设计、剧场设计、动漫设计、品牌设计、造型设计、三维设计师、杂志封面设计师、包装设计师、形象设计师等领域。 设计师是一个提供创意的工作&a…

Hive(8):Hive内、外部表

关键字:EXTERNAL 1 什么是内部表 内部表(Internal table)也称为被Hive拥有和管理的托管表(Managed table)。 默认情况下创建的表就是内部表,Hive拥有该表的结构和文件。换句话说,Hive完全管理…

Hue(2):Hue 的安装

1 上传解压安装包 Hue 的安装支持多种方式,包括 rpm 包的方式进行安装、tar.gz 包的方式进行安装以及 cloudera manager 的方式来进行安装等,我们这里使用 tar.gz 包的方式来进行安装。 Hue 的压缩包的下载地址: http://archive.cloudera.…

SSH远程登录RaspberryPi命令行响应缓慢问题

SSH远程登录RaspberryPi命令行响应缓慢问题1. 问题2. 分析3. 解决3.1 去掉PAM部分鉴权模块3.2 去掉sshd的DNS设置3.3 无线WiFi信号优化方法一:ifconfig操作方法二:内核自动检测4. 结果5. 补充资料5.1 [排除wifi网卡功率自管理问题](https://raspberrypi.…

LeetCode刷题---21.合并两个有序链表(双指针)

文章目录一、编程题:19. 删除链表的倒数第 N 个结点(双指针-快慢指针)1.题目描述2.示例1:3.示例2:4.示例3:5.提示:二、解题思路1.思路2.复杂度分析:3.算法图解三、代码实现总结一、编…

【Linux】调试器 gdb 及 ‘\r‘ 的使用

目录 前言 gdb 断点 打断点 查看、删除断点 断点使能 调试 显示数据 其他指令 ‘\r’的使用 行缓冲区 小程序 前言 🥑在 Linux 下我们可以通过 gcc 进行编译,但与 vs 相比若想对代码进行调试,我们还需要学会使用调试器 gdb 。 &am…

Elastic Job学习笔记

目标: 第一章:概述 1、理解任务调度的概念 2、理解分布式任务调度的概念 3、能够说出Elastic-Job是什么 第二章:Elastic-Job快速入门 1、能够搭建Elastic-Job快速入门工程环境 2、能够编写Elastic-Job快速入门的程序 3、理解Elastic-Job整体架…

Studio 3T怎么用mysql语句执行查询

目录说明说明 mongo图形界面 Studio 3T怎么执行mongo的原生语句进行查询 就先说到这\color{#008B8B}{ 就先说到这}就先说到这 在下Apollo\color{#008B8B}{在下Apollo}在下Apollo 一个爱分享Java、生活的小人物,\color{#008B8B}{一个爱分享Java、生活的小人物&…

【前端】Vue项目:旅游App-(20)home:点击跳转至带参数的动态路由

文章目录目标过程与代码详情页detailhome中设置点击跳转效果总代码修改或添加的文件router/indexdetailhome-content参考本项目博客总结:【前端】Vue项目:旅游App-博客总结 目标 点击热门精选的item跳转至对应详情页: 详情页: 路…

什么是HTTPDNS?HTTPDNS有哪些作用?

近几年来,HTTPDNS技术大火,很多大的网站都开始部署自己的HTTDNS服务器,那么什么是HTTPDNS,HTTPDNS和传统的DNS技术相比有哪些区别?HTTPDNS又有哪些作用呢?针对这些问题,本文中科三方做下简单介绍…

【学习】Linux 系统 文件权限表示

学习内容描述:Linux 系统 文件权限格式 重点知识: Linux 系统 文件权限格式是10位,格式例如:-rw-rw-rw-,表示文件所有者、所属组、其他用户都具有读和写的权限 。 (1)第0位确定文件类型 其中: …

计算机组成原理 第三章笔记

视频网址 仅仅是笔记记录,若有错误请指出。 零碎的 存储器的分类 磁表面存储器:磁盘,磁带磁芯存储器半导体存储器 RAM ROM光存储器 看下面这个思维导图 存储器的性能指标 存储容量:存储字数字长单位成本: 每位价格总成本/总容量存储速度:数…

Android Studio Linux系统模拟器启动异常

一、报错:/dev/kvm device permission denied 1、首先检查/dec/kvm的所属关系 命令:ls -al /dev/kvm 可以看到,属于root用户,组也是root用户 2、查看当前用户是否在root组 命令:grep root /etc/group 显然是没有的…

C语言学习笔记-数组

C 数组 C 语言支持数组数据结构,它可以存储一个固定大小的相同类型元素的顺序集合。数组是用来存储一系列数据,但它往往被认为是一系列相同类型的变量。 数组的声明并不是声明一个个单独的变量,比如 runoob0、runoob1、…、runoob99&#x…

Project3:Ants Vs. SomeBees

Ants Vs. SomeBees1. 前言2. Phase 1:Basic gameplay3. Phase 2:More Ants!4. Phase 3: Water and Might5. 测试结果1. 前言 本项目是 CS 61A 的第三个项目,要求是实现一个类似于植物大战僵尸的游戏,这里 Ants 就相当…