【Flume Kafaka实战】Using Kafka with Flume

news2025/1/10 10:56:41

一 目标

在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。

二 实战

2.1 Kafka Sink

第一步,在Cloudera Manager中安装Flume,安装时指定两个Agent。这一步很简单。

第二步,创建一个新Role Group。默认情况下,所有的Agent都处于一个叫Agent Default Group的角色组中,处于同一角色组中的Agent共享相同的配置。但是在我们这个例子中,两个Agent要完成不同的工作,需要不同的配置。所有新建一个Role Group,并把其中一个Agent移到到这个新的Group中,如下图所示。

第三步,分别编辑两个Agent的配置文件,我的第一个Agent名字为file2Kafka,配置文件内容如下。不难看出,这个配置的source就是去tail一个本地文件,然后写入到kafka的消息队列中。

即:Kafka Sink

# Name the components on this agent
file2Kafka.sources = file2Kafka_source
file2Kafka.sinks = file2Kafka_sink
file2Kafka.channels = file2Kafka_channel

# Describe/configure the source
file2Kafka.sources.file2Kafka_source.type = exec
file2Kafka.sources.file2Kafka_source.command = tail -F /home/demo/flume-exec.txt

# Describe the sink
file2Kafka.sinks.file2Kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
# topic前不加kafka
file2Kafka.sinks.file2Kafka_sink.topic = flumetest
file2Kafka.sinks.file2Kafka_sink.kafka.bootstrap.servers= slave1:9092,slave2:9092
file2Kafka.sinks.file2Kafka_sink.kafka.flumeBatchSize= 20

# Use a channel which buffers events in memory
file2Kafka.channels.file2Kafka_channel.type = memory
file2Kafka.channels.file2Kafka_channel.capacity = 1000
file2Kafka.channels.file2Kafka_channel.transactionCapacity = 1000

# Bind the source and sink to the channel
file2Kafka.sources.file2Kafka_source.channels = file2Kafka_channel
file2Kafka.sinks.file2Kafka_sink.channel = file2Kafka_channel

2.2 Kafka Source

第二Agent的名字是kafka2Hdfs,配置文件如下。这个配置的内容就是把Agent1中写到kafka的数据读出来,然后写入到HDFS中。注意hdfs.path这个配置,由于在Cloudera Manager中,Flume知道HDFS相关的配置,所以无需去加入hdfs://my-cluster这样的协议前缀。

# Name the components on this agent
kafka2Hdfs.sources = kafka2Hdfs_source
kafka2Hdfs.sinks = kafka2Hdfs_sink
kafka2Hdfs.channels = kafka2Hdfs_channel

# Describe/configure the source
kafka2Hdfs.sources.kafka2Hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
kafka2Hdfs.sources.kafka2Hdfs_source.batchSize = 10
kafka2Hdfs.sources.kafka2Hdfs_source.batchDurationMillis = 1000
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.topics = flumetest
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.consumer.group.id = flume

# Describe the sink
kafka2Hdfs.sinks.kafka2Hdfs_sink.type = hdfs
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.path = /flume/
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.fileType = DataStream
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.filePrefix=sxt
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollCount=0
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollInterval=0

# Use a channel which buffers events in memory
kafka2Hdfs.channels.kafka2Hdfs_channel.type = memory
kafka2Hdfs.channels.kafka2Hdfs_channel.capacity = 1000
kafka2Hdfs.channels.kafka2Hdfs_channel.transactionCapacity = 100

# Bind the source and sink to the channel
kafka2Hdfs.sources.kafka2Hdfs_source.channels = kafka2Hdfs_channel
kafka2Hdfs.sinks.kafka2Hdfs_sink.channel = kafka2Hdfs_channel

整个配置完成之后,Cloudera Manager中的界面如下图:

在运行中可能会出现一些目录读写的权限问题,需要去修改hdfs中相关目录的权限。比如我的配置中,数据是写到/flume这个目录下的,这个目录我是用root用户去创建的,但flume运行是使用一个叫flume的用户名来运行的,所以用hdfs dfs -chmod 777 /flume把这个目录的读写权限放开了。

这是一个例子,主要演示如何在cloudera manager中把两个flume的agent串联在一起使用。在现实的生产中,如果需要把一个文本数据通过kakfa写入到hdfs中,更合理的做法是使用一个agent,把kafka作为channel来使用。具体可以参考https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html

2.3 Kafka Channel

# Name the components on this agent
kafkaCh.sources = src_1_file
kafkaCh.channels = ch_1_kafka
kafkaCh.sinks = sink_1_hdfs

# Describe/configure the source
kafkaCh.sources.src_1_file.type = exec
kafkaCh.sources.src_1_file.command = tail -F /home/demo/flume-exec.txt

# Define a kafka channel
kafkaCh.channels.ch_1_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
kafkaCh.channels.ch_1_kafka.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafkaCh.channels.ch_1_kafka.kafka.topic = kafka_channel
kafkaCh.channels.ch_1_kafka.kafka.consumer.group.id = flume-consumer

# Describe the sink
kafkaCh.sinks.sink_1_hdfs.type = hdfs
kafkaCh.sinks.sink_1_hdfs.hdfs.path = /flume/kafka/channel
kafkaCh.sinks.sink_1_hdfs.hdfs.fileType = DataStream
kafkaCh.sinks.sink_1_hdfs.hdfs.filePrefix=sxt
kafkaCh.sinks.sink_1_hdfs.hdfs.rollCount=0
kafkaCh.sinks.sink_1_hdfs.hdfs.rollInterval=0

# Bind the source and sink to the channel
kafkaCh.sources.src_1_file.channels = ch_1_kafka
kafkaCh.sinks.sink_1_hdfs.channel = ch_1_kafka

将上面两个Agent放在一个Agent中,用Kafka Channel实现。

注意:hdfs.path 必须存在,且有权限进行操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2177675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java-数据类型与变量

一、字面常量 在我们使用Java进行数据的打印时,经常会用到这个语句: System.out.println("Hello world!"); 而当这个语句运行时,不论如何都会输出"Hello world!",所以"Hello world!"就是字面常量…

哪款宠物空气净化器能有效去除浮毛?希喂、352实测分享

你是否曾经站在家电卖场里,面对琳琅满目的宠物空气净化器产品而感到无所适从?或者在浏览网上商城时,被海量的参数和功能描述搞得头晕眼花?别担心,你不是一个人。在这个科技飞速发展的时代,选择一台既能满足…

Grafana指标汉化

1、Grafana解压 目录 conf 2、找到:defaults.ini 3、打开defaults.ini ,搜索:en-US 4.重新运行 :grafana-server.exe

开放词汇全景分割

开放词汇全景分割是一种先进的计算机视觉任务,它旨在将图像中的每个像素分割并分类到预先定义或未定义的类别中。这与传统的图像分割不同,后者通常仅限于识别有限的、预先定义的对象类别。开放词汇全景分割的目标是识别和处理图像中的任何可能的对象&…

基于Hadoop的微博舆情监测分析系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…

大模型,互联网玩家们的「角斗场」

文/孟永辉 布局大模型,似乎正在成为每一个互联网玩家必然都要去做的事情。无论是以BAT为代表的综合性的大型平台来讲,还是以WPS、携程为代表的专业性的平台们而言,几乎都是如此。一时间,大模型,成为了一个全新的风口。…

9.5K Star,开源在线网盘

Hi,骚年,我是大 G,公众号「GitHub 指北」会推荐 GitHub 上有趣有用的项目,一分钟 get 一个优秀的开源项目,挖掘开源的价值,欢迎关注。 随着云存储的广泛应用,越来越多的人和企业需要一个简单、…

android SELinux权限适配

抓log方法, setenforce 0, 如果不先将selinux设置为permission mode,会导致一个问题。 程序运行的时候遇到权限策略限制(假设 sepolicy 1),程序运行失败。添加权限(sepolicy 1),然后…

Java Web —— 第十天(SpringBoot原理)

SpringBoot框架之所以使用起来更简单更快捷,是因为SpringBoot框架底层提供了两个非常重要的 功能:一个是起步依赖,一个是自动配置。 通过SpringBoot所提供的起步依赖,就可以大大的简化pom文件当中依赖的配置,从而解决…

Python 知识宝库 —— 数据可视化:matplotlib 与 seaborn 的使用技巧

🎬 鸽芷咕:个人主页 🔥 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想,就是为了理想的生活! 博主简介 博主致力于嵌入式、Python、人工智能、C/C领域和各种前沿技术的优质博客分享,用最优质的内容带来最舒适的…

SpringBoot日常:链路追踪skyworking的接入

前言 开发中遇到这样的一个常见,服务之间调用需要链路追踪,并且日志报错时能够及时预警,最重要的一点是不要写太多的侵入式代码,那么我们来捋捋常用的链路追踪组件,有Google的Dapper,阿里的鹰眼&#xff0…

Excel提取数据

Excel提取数据 在使用Excel的过程中,我需要将带有评语的评分的文本内容提取出评分,如下所示 其中分数与文本之间用空格分隔,只有分数的评语会自动靠右对齐,我需要做的就是将B列的评语从中提取出分数放到C列中,以下为实…

卡牌小程序搭建:多样化在线拆卡体验

潮玩市场已经成为备受消费者瞩目的行业,新兴的潮流玩具不断出现,其中卡牌更是备受广大消费者关注,不管是哪个年龄层的消费者,对卡牌都非常情有独钟。在科技的不断发展下,卡牌的玩法与互联网相结合,打造出了…

如何高效删除 MySQL 日志表中的历史数据?实战指南

在处理高并发的物联网平台或者其他日志密集型应用时,数据库中的日志表往往会迅速增长,数据量庞大到数百GB甚至更高,严重影响数据库性能。如何有效管理这些庞大的日志数据,特别是在不影响在线业务的情况下,成为了一项技…

IPD变革中,数据治理是关键

IPD变革中,数据治理是关键 2024-09-29 14:41汉捷咨询 华为轮值董事长徐直军先生在回顾IPD变革时,提到:“华为IPD变革前期,对数据的关注不够,没有系统梳理产品的信息架构和数据标准,也没有对业务流中的数据…

C++的隐式构造函数、隐式转换和explicit关键字

1、隐式的意思是不用告诉它该怎么做&#xff0c;有点类似于自动化的意思 #include <iostream> #include <string>class Entity { private:std::string m_Name;int m_Age; public:Entity(const std::string& name) :m_Name(name),m_Age(-1){}Entity(int age):…

安全、稳定、SLA高达99.9%:Azure OpenAI数据分离与隔离优势

近期有不少客户&#xff0c;由于其开发的系统软件是面向海外以及政企的&#xff0c;又想通过微软Azure OpenAI服务将大模型接入其业务作为优势&#xff0c;因此非常重视服务的安全性和稳定性。 下面将重点介绍微软Azure OpenAI 服务的数据、隐私和安全内容。 稳定&#xff1a;S…

使用kubectl快速查看各个节点的CPU和内存占用量

本文章视频教程地址&#xff1a;https://www.bilibili.com/video/BV1TdxkedE1K 前言 笔者之前写过一篇文章关于在Kubernetes上安装 Prometheus 和 Grafana 监控去查看Kubernetes各个节点的资源占用率&#xff0c;文章地址&#xff1a;https://blog.csdn.net/m0_51510236/arti…

springcloud 面试题

什么是微服务&#xff1f; 本文导图&#xff1a;SpringCloud 梳理-ProcessOn 分布式架构CAP理论 CAP定理是分布式系统中最基础的原则&#xff0c;所以理解和掌握了CAP对系统架构的设计至关重要。分布式架构下所有系统不可能同时满足以下三点&#xff1a;Consisteny&#xff08…

Unity android 接USBCamera

目录 一、前提 1. unity打包android后&#xff0c;链接USB摄像头&#xff0c;需要USB权限。 二、流程 1.Unity导出android工程&#xff0c;Player配置如图&#xff1a; 2.导出android工程 3.在android工程中找到AndroidManifest.xml加入usb权限相关 <?xml version&quo…