2023_Spark_实验二十九:Flume配置KafkaSink

news2024/11/25 4:52:24

实验目的:掌握Flume采集数据发送到Kafka的方法

实验方法:通过配置Flume的KafkaSink采集数据到Kafka中

实验步骤:

一、明确日志采集方式

一般Flume采集日志source有两种方式:

1.Exec类型的Source

可以将命令产生的输出作为源,如:

a1.sources.r1.type = exec

a1.sources.r1.command = ping 10.3.1.227 //此处输入命令

2.Spooling Directory类型的 Source

将指定的文件加入到“自动搜集 ”目录中。flume会持续监听这个目录,把文件当做source来处理。注意:一旦文件被放到“自动收集”目录中后,便不能修改,如果修改,flume会报错。此外,也不能有重名的文件,如果有,flume也会报错。

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/work/data

向指定的文件目录下传送一个日志文件,发现flume的控制台打印相关的信息;此外,待收集的文件,会追加一个后缀:completed,表示已处理完。

3.确定采集策略:

采用exec方式采集数据

如果采用spooldir的方式来监控log文件夹,flume会采集log数据,flume会不断修改文件名,导致重复。

所以使用exec命令行的方式,通过tail -F *.log命令比较好!

注意: -F根据文件名进行追踪,并保持重试,即该文件被删除或改名后,如果再次创建相同的文件名,会继续追踪。 而-f根据文件的nodeid即文件描述符进行追踪,当文件改名或被删除,追踪停止 。

二、配置KafkaSink

Flume版本多,网上教程多,版本之间不兼容,推荐大家以Flume官网为准。

Exec Source

Kafka Sink

三、配置Flume配置文件

1. 拷贝一份配置文件模板

cp flume-conf.properties.template kafka.conf

2. 编辑kafka.conf

kafka.conf编辑内容如下

# 定义a2配置文件中每个组件的名称
a2.sources = execSrc
a2.channels = memoryChannel
a2.sinks = loggerSink

# 配置source组件
# For each one of the sources, the type is defined
a2.sources.execSrc.type = exec
a2.sources.execSrc.command = tail -F /home/hadoop/scripts/realtime/realdata.log

# 配置sink组件
# Each sink's type must be defined
a2.sinks.loggerSink.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.loggerSink.kafka.topic = RealDataTopic
a2.sinks.loggerSink.kafka.bootstrap.servers = hd1:9092
a2.sinks.loggerSink.kafka.flumeBatchSize = 20
a2.sinks.loggerSink.kafka.producer.acks = 1
a2.sinks.loggerSink.kafka.producer.linger.ms = 1
a2.sinks.loggerSink.kafka.producer.compression.type = snappy

# 配置缓存方式
# Each channel's type is defined.
a2.channels.memoryChannel.type = memory
a2.channels.memoryChannel.capacity = 1000
a2.channels.memoryChannel.transactionCapacity = 100

# 配置source channel sink之间的连接关系
# The channel can be defined as follows.
a2.sources.execSrc.channels = memoryChannel
a2.sinks.loggerSink.channels = memoryChannel

3. 启动测试

/opt/module/apache-flume-1.9.0-bin/bin/flume-ng agent -c conf -f /opt/module/apache-flume-1.9.0-bin/conf/kafka.conf -n a2 -Dflume.root.logger=INFO,console

实验结果:配置kafkaSink成功,配置source为exec读取shell脚本模拟产生的实时数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1321408.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HarmonyOS4.0从零开始的开发教程18后台代理提醒

HarmonyOS(十六)后台代理提醒 简介 随着生活节奏的加快,我们有时会忘记一些重要的事情或日子,所以提醒功能必不可少。应用可能需要在指定的时刻,向用户发送一些业务提醒通知。例如购物类应用,希望在指定时…

搭建Eureka服务

搭建Eureka服务 文章目录 搭建Eureka服务搭建EurekaServer注册user-service注册多个实例 在order-service中完成服务拉取和负载均衡 搭建EurekaServer <dependency><!--eureka服务器--><groupId>org.springframework.cloud</groupId><artifactId>…

QUIC在零信任解决方案的落地实践

一 前言 ZTNA为以“网络为中心”的传统企业体系架构向以“身份为中心”的新型企业安全体系架构转变&#xff0c;提供解决方案。随着传统网络边界不断弱化&#xff0c;企业SaaS规模化日益增多&#xff0c;给终端安全访问接入创造了多元化的空间。其中BYOD办公方式尤为突出&#…

什么是uniapp?如何开发uniapp?

大家好&#xff01;我是咕噜铁蛋&#xff01;随着移动应用市场的持续发展&#xff0c;开发者们面临着不断增长的需求和多样化的平台选择。在这个背景下&#xff0c;UniApp应运而生&#xff0c;成为一种跨平台开发框架&#xff0c;为开发者提供了一种高效、简便的方式来开发移动…

前端框架的单文件组件(Single File Component)

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

机器学习之线性回归(Linear Regression)

概念 线性回归(Linear Regression)是机器学习中的一种基本的监督学习算法,用于建立输入变量(特征)与输出变量(目标)之间的线性关系。它假设输入变量与输出变量之间存在线性关系,并试图找到最佳拟合线来描述这种关系。 在简单线性回归中,只涉及两个变量:一个是自变量…

【赠书活动】OpenCV4工业缺陷检测的六种方法

文章目录 前言机器视觉缺陷检测工业上常见缺陷检测方法延伸阅读推荐语 赠书活动 前言 随着工业制造的发展&#xff0c;对产品质量的要求越来越高。工业缺陷检测是确保产品质量的重要环节&#xff0c;而计算机视觉技术的应用能够有效提升工业缺陷检测的效率和精度。 OpenCV是一…

AWS解决方案架构师学习与备考

系列文章目录 送书第一期 《用户画像&#xff1a;平台构建与业务实践》 送书活动之抽奖工具的打造 《获取博客评论用户抽取幸运中奖者》 送书第二期 《Spring Cloud Alibaba核心技术与实战案例》 送书第三期 《深入浅出Java虚拟机》 送书第四期 《AI时代项目经理成长之道》 …

Vue 项目中使用 debugger 在 chrome 谷歌浏览器中失效以及 console.log 指向去了 vue.js 代码

问题 今天在代码里面输出 console.log 信息直接指向了 vue.js&#xff0c;并且代码里面写了 debgger 也不生效 解决 f12 找到浏览器的这个设置图标 找到这个 ignore list 的 custom exclusion rules 取消掉 /node_modules/|/bower_components/ 这样就正常了

Neural Network——神经网络

1.feature reusing——特征复用 1.1 什么是特征复用 回顾我们之前所学习的模型&#xff0c;本质上都是基于线性回归&#xff0c;但却都可以运用于非线性相关的数据&#xff0c;包括使用了如下方法 增加更多的特征产生新的特征&#xff08;多项式回归&#xff09;核函数 在本身…

服务器数据恢复-raid5故障导致上层分区无法访问的数据恢复案例

服务器数据恢复环境&故障&#xff1a; 一台服务器上3块硬盘组建了一组raid5磁盘阵列。服务器运行过程中有一块硬盘的指示灯变为红色&#xff0c;raid5磁盘阵列出现故障&#xff0c;服务器上层操作系统的分区无法识别。 服务器数据恢复过程&#xff1a; 1、将故障服务器上磁…

【数据结构】模式匹配之KMP算法与Bug日志—C/C++实现

​&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《数据结构奇遇记》&#x1f516;墨香寄清辞&#xff1a;墨痕寄壮志&#xff0c;星辰梦未满。 通幽径心凝意&#xff0c;剑指苍穹势如山。 目录 &#x1f31e;1. 模式匹配的基本概念…

工作:三菱PLC程序开发流程总结

工作&#xff1a;三菱PLC程序开发流程总结 一、程序流程图 程序流程图是逻辑思维与动作流程的检查图&#xff0c;是保证逻辑思维合理的前提&#xff0c;写代码丢失方向可从程序流程图重新整理&#xff0c;程序流程图非常重要。 二、组态配置 组态配置是将所用到的基板和模块…

网络编程二

前言 在上一篇关于网络协议的博客中&#xff0c;我们简单概括了网络套接字中的UDP协议&#xff0c;本篇博客我们将继续学习分享关于网络套接字中另一个协议&#xff0c;TCP网络协议 一、UDP和TCP协议区别是什么&#xff1f; 二者之间的区别如下 &#x1f517;UDP的主要特点 …

Mysql查询使用group_concat函数后,如果查询无结果,仍会返回一条空数据

1、在查询中使用了group_concat 函数&#xff0c;简单例子如下&#xff1a; select GROUP_CONCAT(recordid) from s_au_user where username 121212此sql查询一个username 为121212的数据&#xff0c;当然肯定是查询不到的&#xff0c;理论上应该返回0条结果&#xff0c;但是…

Python操作Word

Python操作Word 一、Word简介二、向Word写入内容2.1 导入模块2.2 创建doc文档对象2.3 添加段落2.4 添加列表2.5 添加图片2.6 保存文件 三、读取Word内容四、批量生成Word文件 一、Word简介 ​ 在日常工作中&#xff0c;有很多简单重复的劳动其实完全可以交给Python程序&#x…

Camtasia2024下载安装使用教程汇总

Camtasia Studio2024提供了强大的屏幕录像(Camtasia Recorder)、视频剪辑和编辑(Camtasi Studio)、视频菜单制作(Camtasia MenuMaker)等功能&#xff0c;界面简洁明晰、操作方便快捷。使用Camtasia Studio官方用户可以方便地进行屏幕操作的录制和配音、视频的剪辑和过场动画、添…

晚期食管癌肿瘤治疗线程分类

文章目录 1、肿瘤治疗的线数1.1 基础概念1.2 线程定义1.3 如何计算治疗线数 2 食管癌治疗指南2.1 食管癌诊疗指南2.1 CSCO 本文前半部分主要来源于参考文件1&#xff0c;其余部分来源于官方指南。无原创内容&#xff0c;全部为摘要。 1、肿瘤治疗的线数 1.1 基础概念 抗肿瘤药…

【Hive】——DML

1 Load&#xff08;加载数据&#xff09; 1.1 概述 1.2 语法 LOAD DATA [LOCAL] INPATH filepath [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1val1, partcol2val2 ...)]LOAD DATA [LOCAL] INPATH filepath [OVERWRITE] INTO TABLE tablename [PARTITION (partcol…

ABAP与HANA集成 2:ABAP调用HANA存储过程或SQL语句

作者 idan lian 如需转载备注出处 需求 虽然是做BW模块&#xff0c;但是最近项目上种种&#xff0c;都需要给ABAP人员或者前台用户提供能供他们使用的表&#xff0c;就稍微研究了下ABAP和HANA的集成问题&#xff0c;因为我们BW更擅长的还是HANA&#xff0c;而且HANA的运行效…