Flume 与 Kafka 整合实战

news2025/1/9 1:17:52

目录

一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

(一)环境准备与配置文件创建

(二)创建主题

(三)测试步骤

二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

(一)编写配置脚本

(二)创建 topic

(三)测试过程

三、应用场景示例 

四、总结


        在大数据处理的生态系统中,Flume 和 Kafka 都是非常重要的组件。Flume 擅长收集、聚合和传输大量的日志数据等,而 Kafka 则是一个高性能的分布式消息队列,能够处理海量的实时数据。将 Flume 和 Kafka 进行整合,可以构建强大的数据处理管道,实现数据的高效采集、传输和处理。本文将详细介绍 Flume 和 Kafka 整合的两种常见方式:Kafka 作为 Source 和 Kafka 作为 Sink。

一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

 

(一)环境准备与配置文件创建

        在 Flume 的 conf 文件夹下,创建一个名为 kafka - memory - logger.conf 的脚本文件。这里需要注意,在实际操作中可能会遇到错误,例如 kafka 的每一批次的读取数量大于了 channel 的容量。这种情况下的解决方案是要么降低 kafka 的每一批次读取的容量,要么提高 channel 的容量。

https://flume.liyifeng.org/#kafka-source

kafka-memory-logger.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = five
a1.sources.r1.kafka.consumer.group.id = qiaodaohu

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 128

 

(二)创建主题

        接着创建一个 topic,名字可以叫做 kafka - flume,当然也可以直接使用以前创建好的主题。

kafka-topics.sh --create --topic kafka-flume --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试步骤

首先启动一个消息生产者,向 topic 中发送消息。

kafka-console-producer.sh --topic kafka-flume --bootstrap-server bigdata01:9092

然后启动 Flume,接收消息并查看 log 日志,这样就可以验证数据是否能够从 Kafka 成功抽取到 Flume 中并进行后续处理。

在flume的flumeconf 文件夹下

flume-ng agent -n a1 -c ../conf -f ./kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

 

 

(一)编写配置脚本

编写一个名为 flume - kafka - sink.conf 的脚本,内容如下:

##a1就是flume agent的名称
## source r1
## channel c1
## sink k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444

# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = five
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里的流程是 netcat(模拟数据源)→ memory(内存通道)→ kafka。

 

(二)创建 topic

使用以下命令创建 topic(flume - kafka):

kafka-topics.sh --create --topic flume-kafka --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试过程

启动 Flume:

flume-ng agent -n a1 -c conf -f $FLUME_HOME/job/flume-kafka-sink.conf -Dflume.root.logger=INFO,console

使用 telnet 命令,向端口发送消息:

yum -y install telnet

telnet bigdata01 44444

在窗口不断地发送文本数据,数据就会被抽取到 Kafka 中。


使用消费者获取 Kafka 数据:

kafka-console-consumer.sh --topic flume-kafka --bootstrap-server bigdata01:9092 --from-beginning

 

三、应用场景示例 

        假定有这样一个场景:Flume 可以抽取不断产生的日志,抽取到的日志数据,发送给 Kafka,Kafka 经过处理,可以展示在页面上,或者进行汇总统计。这样就实现了一定的实时效果,在实际的大数据处理流程中,这种整合方式能够有效地处理海量的实时数据,提高数据处理的效率和可靠性。

四、总结

        通过 Flume 和 Kafka 的整合,我们能够构建更加灵活、高效的数据处理架构,满足不同场景下的大数据处理需求,为后续的数据挖掘、分析等提供坚实的数据基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250492.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

存储服务器一般做是做什么阵列?详细列举一下

存储服务器通常使用 RAID(Redundant Array of Independent Disks) 阵列技术来管理磁盘,以提高数据的性能、可靠性和可用性。所选择的 RAID 类型取决于存储服务器的具体用途和需求,比如性能要求、容量需求、容错能力等。 以下是存…

无人机的起降装置:探索起飞和降落的秘密 !

一、起降系统的运行方式 起飞方式 垂直起飞:小型无人机通常采用垂直起飞方式,利用螺旋桨产生的升力直接从地面升起。这种方式适用于空间有限或需要快速起飞的场景。 跑道起飞:大型无人机或需要较长起飞距离的无人机,可能会采用…

代码随想录day01--数组

两数之和 题目 地址:https://leetcode.cn/problems/two-sum/ 给定一个整数数组 nums 和一个目标值 target,请你在该数组中找出和为目标值的那 两个 整数,并返回他们的数组下标。 你可以假设每种输入只会对应一个答案。但是,数…

Webpack前端工程化进阶系列(二) —— HMR热模块更新(图文+代码)

前言 之前更新过一篇Webpack文章:Webpack入门只看这一篇就够了(图文代码),没想到颇受好评,很快就阅读量就破万了hhh,应读者私信的要求,决定继续更新Webpack进阶系列的文章! 进入今天的主题 —— HMR 热模块替换(HotM…

Flink的双流join理解

如何保证Flink双流Join准确性和及时性、除了窗口join还存在哪些实现方式、究竟如何回答才能完全打动面试官呢。。你将在文中找到答案。 1 引子 1.1 数据库SQL中的JOIN 我们先来看看数据库SQL中的JOIN操作。如下所示的订单查询SQL,通过将订单表的id和订单详情表ord…

【MYSQL数据库相关知识介绍】

MySQL 在我们日常技术中是一个广泛使用的开源关系型数据库管理系统,所以作为测试同学,掌握mysql的相关知识是必不可少的技能之一,所以小编从软件测试的角色出发,来整理一些跟测试相关的知识,希望能够帮助到大家。 一、…

数组和链表OJ题

leetcode用编译器调试的技巧 数组和链表练习题 leetcode/reverse_Link/main.c Hera_Yc/bit_C_学习 - 码云 - 开源中国 1、移除元素 ​​​​​​27. 移除元素 - 力扣(LeetCode) int removeElement(int* nums, int numsSize, int val) {int src 0, …

云服务器架构有什么区别?X86计算、Arm、GPU/FPGA/ASIC和裸金属全解析

阿里云服务器ECS架构有什么区别?X86计算、Arm计算、GPU/FPGA/ASIC、弹性裸金属服务器和高性能计算有什么区别?x86架构是最常见的,CPU采用Intel或AMD处理器;ARM架构具有低功耗的特性,CPU采用Ampere Altra / AltraMax或阿…

泽众TestCenter测试管理工具之案例库,提升测试工作的效率和质量

在当今的软件开发生命周期中,测试管理工具扮演着至关重要的角色。泽众TestCenter测试管理工具(简称TC),作为一款广受好评的测试管理工具,凭借其强大的案例库功能,极大地提升了测试工作的效率和质量。 案例库…

Spring Cloud(Kilburn 2022.0.2版本)系列教程(五) 服务网关(SpringCloud Gateway)

Spring Cloud(Kilburn 2022.0.2版本)系列教程(五) 服务网关(SpringCloud Gateway) 一、服务网关 1.1 什么是网关 在微服务架构中,服务网关是一个至关重要的组件。它作为系统的入口,负责接收客户端的请求,并将这些请求路由到相应的后端服务…

基于单片机的多功能宠物窝的设计

本设计以STM32主控制器为核心芯片,它的组成元件有电机、温度传感器、时钟模块等。温度传感器的作用是采集环境温度的数据,时钟模块的作用是采集时间。将具体数据进行收集以后,主控制器将所有相关数据予以处理,从而将有关信息传递到…

Windows搭建MaskRCNN环境

环境:python3.6 1. 在miniconda上创建虚拟环境 miniconda下载地址:https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda # 创建环境 conda create -n maskrcnn python3.6 # 激活 maskrcnn 环境,后续操作都在这个环境下进行 conda ac…

LLM PPT Translator

LLM PPT Translator 引言Github 地址UI PreviewTranslated Result Samples 引言 周末开发了1个PowerPoint文档翻译工具,上传PowerPoint文档,指定想翻译的目标语言,通过LLM的能力将文档翻译成目标语言的文档。 Github 地址 https://github.…

新质驱动·科东软件受邀出席2024智能网联+低空经济暨第二届湾区汽车T9+N闭门会议

为推进广东省加快发展新质生产力,贯彻落实“百县千镇万村高质量发展工程”,推动韶关市新丰县智能网联新能源汽车、低空经济与数字技术的创新与发展,充分发挥湾区汽车产业链头部企业的带动作用。韶关市指导、珠三角湾区智能网联新能源汽车产业…

Zookeeper选举算法与提案处理概览

共识算法(Consensus Algorithm) 共识算法即在分布式系统中节点达成共识的算法,提高系统在分布式环境下的容错性。 依据系统对故障组件的容错能力可分为: 崩溃容错协议(Crash Fault Tolerant, CFT) : 无恶意行为,如进程崩溃,只要…

实例讲解MATLAB绘图坐标轴标签旋转

在进行绘图时需要在图片上添加上做标轴的标签,但是当数据量比较多时,例如一天24小时的数据,这时把每个小时显示在左边轴的标签上,文字内容放不下,因此需要将坐标轴标签旋转一定的角度,这样可以更好在图形上…

flutter项目AndroidiOS自动打包脚本

从业数年余,开发出身,经数载努力位项目经理,因环境欠佳,终失业.失业达七月有余,几经周转,现又从开发,既回原点亦从始.并非与诸位抢食,仅为糊口,望海涵!因从头开始,所经之处皆为新奇,遂处处留痕以备日后之需. 自动打包脚本原文地址:https://zhuanlan.zhihu.com/p/481472311 转…

免费实用在线AI工具集合 - 加菲工具

免费在线工具-加菲工具 https://orcc.online/ sql格式化 https://orcc.online/tools/sql 时间戳转换 https://orcc.online/tools/timestamp Base64 编码解码 https://orcc.online/tools/base64 URL 编码解码 https://orcc.online/tools/url Hash(MD5/SHA1/SHA256…) 计算 h…

Scala学习记录,统计成绩

统计成绩练习 1.计算每个同学的总分和平均分 2.统计每个科目的平均分 3.列出总分前三名和单科前三名,并保存结果到文件中 解题思路如下: 1.读入txt文件,按行读入 2.处理数据 (1)计算每个同学的总分平均分 import s…

第六届机器人、智能控制与人工智能国际(RICAI 2024)

会议信息 会议时间与地点:2024年12月6-8日,中国南京 会议官网:www.ic-ricai.org (点击了解大会参会等详细内容) 会议简介 第六届机器人、智能控制与人工智能国际学术会议(RICAI 2024)将于20…