Flume系列:Flume Source使用

news2025/1/15 18:06:55

目录

Apache Hadoop生态-目录汇总-持续更新

 1:taildir source

2:kafka source

3:exec source(tail -F)

4:netcat source(采集端口)

5:spoolDir读取目录文件(文件更新不同步)


Apache Hadoop生态-目录汇总-持续更新

系统环境:centos7

Java环境:Java8

 1:taildir source

# 1:定义组件
file_flume_kafka.sources = r1
file_flume_kafka.channels = c1
file_flume_kafka.sinks = k1

# 2:定义source
file_flume_kafka.sources.r1.type = TAILDIR
file_flume_kafka.sources.r1.positionFile = /usr/local/flume-1.9.0/project_v4/tail_dir.json
file_flume_kafka.sources.r1.fileSuffix = .COMPLETED
file_flume_kafka.sources.r1.filegroups = f1
file_flume_kafka.sources.r1.filegroups.f1 = /log/app.*.log
### 多个文件夹写法
#file_flume_kafka.sources.r1.filegroups = f1 f2
#file_flume_kafka.sources.r1.filegroups.f1 = /log/app.*.log
#file_flume_kafka.sources.r1.filegroups.f2 = /log2/app.*.log.*

 ## 定义source拦截器(ETL数据清洗,判断数据是否完整)
file_flume_kafka.sources.r1.interceptors = i1
file_flume_kafka.sources.r1.interceptors.i1.type = com.wester.flume.interceptor.ETLInterceptor$Builder

# 3:定义channel
....
这里主要介绍sources顾这里省略,到channel模块查看写法

# 4:定义sink
....
这里主要介绍sources顾这里省略,到sink模块查看写法

# 5:定义关联关系
file_flume_kafka.sources.r1.channels = c1
file_flume_kafka.sinks.k1.channel = c1

2:kafka source

# 1:定义组件
kafka_flume_hdfs.sources = r1
kafka_flume_hdfs.channels = c1
kafka_flume_hdfs.sinks = k1

# 2:定义source
kafka_flume_hdfs.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
kafka_flume_hdfs.sources.r1.kafka.bootstrap.servers = 192.168.5.103:9092,192.168.5.87:9092,192.168.5.114:9092
kafka_flume_hdfs.sources.r1.kafka.topics = project_v4_topic_log
kafka_flume_hdfs.sources.r1.batchSize = 5000
kafka_flume_hdfs.sources.r1.batchDurationMillis = 2000
#从头开始消费-非实时场景常使用
kafka_flume_hdfs.sources.r1.kafka.consumer.auto.offset.reset = earliest
  ## 配置时间连接器(解决零点漂移问题)
kafka_flume_hdfs.sources.r1.interceptors = i1
kafka_flume_hdfs.sources.r1.interceptors.i1.type = com.wester.flume.interceptor.TimeStampInterceptor$Builder

# 3:定义channel
....
这里主要介绍sources顾这里省略,到channel模块查看写法

# 4:定义sink
....
这里主要介绍sources顾这里省略,到sink模块查看写法

# 5:定义关联关系
kafka_flume_hdfs.sources.r1.channels = c1
kafka_flume_hdfs.sinks.k1.channel = c1

3:exec source(tail -F)

exec 即 execute 执行的意思。表示执行Linux 命令来读取文件

# 1:定义组件
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# 2:定义source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /logs/app.log

# 3:定义channel
....
这里主要介绍sources顾这里省略,到channel模块查看写法

# 4:定义sink
....
这里主要介绍sources顾这里省略,到sink模块查看写法

# 5:定义关联关系
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

4:netcat source(采集端口)

# 1:定义组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2:定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.1.100
a1.sources.r1.port = 44444

# 3:定义channel
....
这里主要介绍sources顾这里省略,到channel模块查看写法

# 4:定义sink
....
这里主要介绍sources顾这里省略,到sink模块查看写法

# 5:定义关联关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

netcat使用

(1)安装 netcat 工具
$ sudo yum install -y nc

(2)判断 44444 端口是否被占用
$ sudo netstat -nlp | grep 44444

(3)使用 netcat 工具向本机的 44444 端口发送内容
$ nc localhost 44444
abcd
这边输入内容

5:spoolDir读取目录文件(文件更新不同步)

# 1:定义组件
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# 2:定义source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /upload  # 同步的文件夹
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# 3:定义channel
....
这里主要介绍sources顾这里省略,到channel模块查看写法

# 4:定义sink
....
这里主要介绍sources顾这里省略,到sink模块查看写法

# 5:定义关联关系
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/428689.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Axios请求(对ajax的二次封装)——Axios API、Axios实例、请求配置、Axios响应结构

axios起步——介绍和使用基本用例post请求场景复现核心干货axios APIaxios(config)axios(url[,config])请求方式别名axios实例创建一个axios实例axios.create([config])实例方法axios请求配置axios响应结构场景复现 最近学习与前端相关的小程序时,接触了异步请求ap…

医学图像配准 (Medical Image Registration)

目录 Classification Transformation Registration Algorithms Landmark Based Surfaced Based Voxel Intensity Based Information Theory Based Registration using basis functions Registration using splines Other Physics Based Registration Optimization V…

OctoClock CDA 2990

CDA 2990 CDA 2990为时钟和PPS分发设备,支持外部一路时钟和PPS输入,最高支持8路时钟和PPS输出。同时CDA 2990可选配带GPS模块版本,可外接GPS天线,支持通过GPS锁定时钟和PPS信号输出。CDA 2990主要用于多台USRP设备进行同步。 CDA…

C++之 继承 (inheritance)

目录 启示 一、基本语法 二、继承的方式 三种: 公共基础 / 保护继承 / 私有继承 三、继承中的对象模型 ①父类中所有非静态成员属性都会继承给子类 ②而私有成员属性同样继承过去,但是被编译器隐藏,因此无法访问 四、继承中构造和析…

2023年,送你一份最新的后端架构师知识图谱

这是一个能让你成为架构师的文章,请耐心读完。 为什么写这个 前几天心血来潮搜了下《后端架构师》的技术图谱。发现最新最火的文章更新时间还停留在5年前。最新的技术体系并没有罗列在内。而且文章的颗粒度特别细,是从数据结构和常用算法开始的。这是典…

【加餐 2】Tab 标签页管理

【加餐 2】Tab 标签页管理 对于管理系统,经常需要开启多个标签页,但是每次都需要手动去关闭,很麻烦,所以就有了这个功能,可以一键关闭所有标签页,或者关闭除当前标签页外的所有标签页,对于重要的标签页,可以进行固定至前列,方便下次快速打开。 一、实现效果 实现效…

论文浅尝 | 利用知识图谱增强的Transformer进行跨领域方面抽取

笔记整理:沈小力,东南大学硕士,研究方向为知识图谱链接:https://dl.acm.org/doi/pdf/10.1145/3511808.3557275动机情感分析是自然语言处理的基础任务,它包含介绍了细粒度情感分析中的一个常见任务——基于方面的情感分…

【CSS】鼠标移动到元素上方显示 / 移出盒子范围隐藏案例 ( 子绝父相 | 显示隐藏元素对象 | 鼠标经过样式设置 | 半透明遮罩设置 )

文章目录一、鼠标移动到元素上方显示 / 移出盒子范围隐藏案例要点分析1、子绝父相2、显示隐藏元素对象3、鼠标经过样式设置4、半透明遮罩设置二、代码示例一、鼠标移动到元素上方显示 / 移出盒子范围隐藏案例要点分析 1、子绝父相 这里要 在一个 div 盒子上方套一层遮罩 , 遮罩…

【原理图专题】OrCAD Capture 设计规则(DRC)检查

在原理图设计完成后,需要进行DRC检查,DRC检查能协助工程师快速检查原理图的物理、电气规则是否正确,能快速定位错误和原因。 DRC检查从Capture 工具栏中如下图红框所示的图标中Design Rules Check进入 进入后将打开DRC窗口,有四个选项卡。分别是Design Rules Options、Elec…

带你了解攻击与防护相关知识

目录 一、攻击篇 1.什么是恶意软件? 2.恶意软件有哪些特征? 3. 恶意软件的可分为那几类? 4. 恶意软件的免杀技术有哪些? 5. 反病毒技术有哪些? 6. 反病毒网关的工作原理是什么&#xff1f…

java计时器

在 Java中,我们有一个重要的概念:同步和异步。同步就是 Java中的线程安全,异步就是 Java中的线程非安全。 在使用 JVM时,我们一般都是用 start ()方法启动一个线程,然后设置时间,比如…

Android开发中,自定义注解的两种应用方式

java注解在Android开发中主要有两种使用方式;一种是在程序运行期间获取类的信息进行反射调用;另一种是使用注解处理,在编译期间生成相关代码,然后在运行期间通过调用这些代码来实现相关功能。 我们先了解一下注解的分类和关键字 …

前端实用js dom合集

1. 整个网页变为灰色主题,最外层加css样式:filter:grayscale(1) 黑色主题:filter:invert(1) 2.js剪辑视频片段制作gif动图: 效果:点击开始就开始录制,点击结束右边显示生成的gif动图 生成g…

Python程序异常处理

一、什么是异常 异常就是程序运行时发生错误的信号,在程序由于某些原因出现错误的时候,若程序没有处理它,则会抛出异常,程序也的运行也会随之终止; 程序异常带来的问题: 1.程序终止,无法运行…

浙大数据结构(1)

开始学习数据结构(拖了好久终于开干了) 来自【浙江大学】数据结构(合149讲)陈越 何钦铭 Be a Fighter and Keep Fighting!!! 数据结构(data structure)定义 是计算机中存储,组织数据的方法。通常情况下,精心选择的数据结构可以带…

Chapter7-吞吐量优先的使用场景

7.1 在 Broker 端进行消息过滤 在 Broker 端进行消息过滤,可以减少无效消息发送到 Consumer ,少占用网络带宽从而提高吞吐量。 Broker 端有三种方式进行消息过滤 。 7.1.1 消息的 Tag 和 Key 对一个应用来说,尽可能只用一个 Topic &#xff…

【数据结构学习3】线性表-链表、单链表

目录链式存储结构链表概念头结点的意义单链表的定义和表示单链表的基本操作链式存储结构 链表概念 概念 结点在存储器中的位置是任意的,即逻辑上相邻的数据元素在物理上不一定相邻。线性表的链式表示又称为非顺序映像或链式映像用一组物理位置任意的存储单元来存…

三公经费用泛微全过程数字化管理,使用有记录,付款有依据

公开透明是现代财政制度的重要准则和基本特征。组织要以公开、透明、科学的预算制度确定财政支出,贯穿预算编制、执行、监督全过程。 组织常见的费用管理——“三公”经费,通常指因公出国(境)费、公务用车购置及运行费、公务接待…

富士康转移3000亿产能,iPhone的印度产能倍增,不再“赏饭吃”

日前消息指今年三月份印度的iPhone产量已经是去年的四倍之多,占比将近7%,显示出苹果和富士康都在加速提升印度的iPhone产能,凸显出他们的决心,这对中国制造业将带来深远影响。一、富士康对中国制造影响巨大2021年的数据…

leetcode每日一题:数组篇(1/2)

😚一个不甘平凡的普通人,日更算法学习和打卡,期待您的关注和认可,陪您一起学习打卡!!!😘😘😘 🤗专栏:每日算法学习 💬个人…