flume kafka channel 应用详解

news2024/10/7 6:45:35

1 官方文档

Documentation -> Flume User Guide

在这里插入图片描述

2 kafka source (消费者)

在这里插入图片描述
在这里插入图片描述
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topics. This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.

Kafka Source 是一个 Apache Kafka 消费者,它从 Kafka 主题中读取消息。 如果您有多个 Kafka 源正在运行,您可以使用相同的消费者组配置它们,这样每个源都将读取一组唯一的主题分区。 目前支持 Kafka 服务器版本 0.10.1.0 或更高版本。 测试已完成至 2.0.1,这是发布时的最高可用版本。

3 kafka sink (生产者)

This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objective is to integrate Flume with Kafka so that pull based processing systems can process the data coming through various Flume sources.

This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.

Required properties are marked in bold font.

这是一个可以将数据发布到 Kafka 主题的 Flume Sink 实现。 目标之一是将 Flume 与 Kafka 集成,以便基于拉取的处理系统可以处理来自各种 Flume 源的数据。
目前支持 Kafka 服务器版本 0.10.1.0 或更高版本。 测试已完成至 2.0.1,这是发布时的最高可用版本。
必需的属性以粗体标记。

4 Kafka Channel

在这里插入图片描述

4.1 第一种使用

With Flume source and sink - it provides a reliable and highly available channel for events
在这里插入图片描述

4.2 第二种使用

With Flume source and interceptor but no sink - it allows writing Flu me events into a Kafka topic, for use by other apps
在这里插入图片描述

4.3 第三种使用

With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr在这里插入图片描述

5 实操

5.1 flume读取数据写入kafka

创建配置文件:


vim file_to_kafka.conf

#定义
a1.sources = r1
a1.channels = c1

#配置sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
#监控目录
a1.sources.r1.filegroups.f1 = /usr/local/mock-log/logs/app.*
#断点续传位置
a1.sources.r1.positionFile = /usr/local/flume-1.9.0/taildir_position.json


#配置channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop4:9092
a1.channels.c1.kafka.topic = topic_log
#flume event 只传body
a1.channels.c1.parseAsFlumeEvent = false


#组装
a1.sources.r1.channels = c1

启动flume:

nohup  bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console >flume.log 2>&1 &

启动kafka

docker exec -it kafka /bin/bash
cd /opt/kafka/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_log

模拟生产日志,查看kafka:
在这里插入图片描述

5.2 flume channel 读取 kafa 消息写入hdfs

创建配置文件:

#定义组件
a1.channels = c1
a1.sinks = k1
#配置channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop4:9092
a1.channels.c1.topic = topic_log
a1.channels.c1.group.id = flume-consumer
a1.channels.c1.parseAsFlumeEvent=false

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /rosh_mall/log/topic_log/%Y-%m-%d
#文件前缀
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#10秒或128M文件写入hdfs
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装
a1.sinks.k1.channel = c1

启动:

#flume 启动agent  名字 a1  配置文件
nohup  bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs.conf -Dflume.root.logger=info,console >flume.log 2>&1 &

查看日志:

tail -f flume.log

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/195649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2.DjangoRestFramework【基于DRF的RESTAPI的序列化使用】

进入Django rest framwork官网就能看到rest_framwork相关的教程; 1.安装rest_framwork pip install djangorestframework pip install markdown # Markdown support for the browsable API. pip install django-filter # Filtering support2.在setting中注册 setting…

QT-线性拟合(自动找直线区域)

最近有个需求,需要对一个S曲线的散点图做线性拟合,百度上线性拟合和曲线拟合公式很多,没什么问题,但需求里面有一个预期就是自动找出直线部分,前面因为其它事情耽搁,一直没有实现,心里多少有点梗…

SpringBoot微服务项目,转发并响应下载请求

在微服务项目中,我经常会碰到从一个微服务项目转发下载请求并实现下载文件的需求,因此在此做一个转发下载的示例。总的下载转发流程如下,我会按照这个流程一一介绍下载流程。 1、客户端的下载请求 这里主要介绍controller层是如何接收客户端…

apple pencil一代平替笔有哪些?平替电容笔推荐

当今社会,高科技推动了数字产品的发展。无论是在工作中,还是在学习中,大的屏幕都能让画面变得更清楚。不管是现在还是未来,Ipad设备都会变成我们每天的一个重要组成部分。如果ipad与一款易于使用的电容笔相结合,将会大…

git-secret:在 Git 存储库中加密和存储密钥(下)

在本篇文章中,将带你了解如何在 Docker 容器中设置git-secret和gpg,通过 Makefile recipe 为不同的场景创建工作流。 Makefile Adjustment 将git-secret和gpg指令添加到 Makefile 中.make/01-00-application-setup.mk: # File: .make/01-0…

C语言基础复习

目录 数组 一维数组 完全初始化int a[5]{1,2,3,4,5}; 不完全初始化int a[5]{1,2} 完全不初始化”,int a[5] 二维数组 完全初始化 不完全初始化 指针 变量的访问方式: 指针变量的定义: 指针变量的赋值: 指针变量的运算…

Spring-Security入门

简介 Security 是 Spring 家族中的一个安全管理框架。相比与另外一个安全框架Shiro,它提供了更丰富的功能,社区资源也比Shiro丰富。 ​ 一般来说中大型的项目都是使用SpringSecurity 来做安全框架。小项目有Shiro的比较多,因为相比与Spring…

ChatGPT - InstructGPT 论文简读

发表于NLP会议:NeurlPS,EMNLP EMNLP: Empirical Methods in Natural Language Processing,自然语言处理中的经验方法NeurlPS: Neural Information Processing Systems,神经信息处理系统ChatGPT: Optimizing Language Models for Dialogue,优化对话的语言模型 ChatGPT:htt…

一文了解编程领域的模版

文章目录模版含义代码模版泛型模版引擎小结🍊在编程领域,模板是一种代码片段,它可以被重复使用,并允许您在保持代码的基本结构不变的情况下,根据需要调整其中的内容。模板通常在构建大型程序或开发一类相关程序时非常有…

Arthas的学习与使用

一、简介 Arthas 是一款线上监控诊断产品,通过全局视角实时查看应用 load、内存、gc、线程的状态信息,并能在不修改应用代码的情况下,对业务问题进行诊断,包括查看方法调用的出入参、异常,监测方法执行耗时&#xff0c…

Maven知识点-反应堆

前言 在一个多模块的Maven项目中,反应堆(Reactor)是指所有模块组成的一个构建结构。对于单模块的项目,反应堆就是该模块本身;但是对于多模块项目来说,反应堆就包含了各模块之间继承和依赖的关系&#xff0…

一篇带你MySQL入门

文章目录1. MySQL概述1.1 数据库相关概念1.2 MySQL数据库1.2.1 版本1.2.2 下载1.2.3 数据模型2. SQL2.1 SQL通用语法2.2 SQL分类2.3 DDL2.3.1 数据库操作2.3.2 表操作2.4 图形化界面工具2.4.1 安装2.4.2 使用2.5 DML2.5.1 添加数据2.5.2 修改数据2.5.3 删除数据2.6 DQL2.6.1 基…

每天一道大厂SQL题【Day04】大数据排序统计

每天一道大厂SQL题【Day04】大数据排序统计 大家好,我是Maynor。相信大家和我一样,都有一个大厂梦,作为一名资深大数据选手,深知SQL重要性,接下来我准备用100天时间,基于大数据岗面试中的经典SQL题&#x…

酒店管理|基于Springboot+Vue前后端分离实现酒店管理系统

作者主页:编程指南针 作者简介:Java领域优质创作者、CSDN博客专家 、掘金特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容:Java项目、毕业设计、简历模板、学习资料、面试题库、技术互助 收藏点赞不迷路 关注作者有好处 文末获取源…

webpack5从入门到精通

前言 webpack是什么? 摘自官网的一段话:webpack 是一个用于现代 JavaScript 应用程序的 静态模块打包工具。当 webpack 处理应用程序时,它会在内部从一个或多个入口点构建一个 依赖图(dependency graph),然后将你项目中所需的每…

[oeasy]python0072_修改字体前景颜色_foreground_color_font

修改颜色 回忆上次内容 m 可以改变字体样式 0-9 之间设置的都是字体效果0 重置为默认1 变亮2 变暗3 斜体4 下划线5 慢闪6 快闪7 前景背景互换8 隐藏9 中划线 叠加效果 \33[1;3moeasy;分割 取消效果 21 取消 122 取消 223 取消 3一直到 290 是全部取消,回到默认 最…

静态链接库与动态链接库

静态链接库与动态链接库的区别 静态链接库:在项目中引用了库函数,编译时链接器会将引用的函数代码或变量,链接到可执行文件里,和可执行程序组装在一起 动态链接库:在编译阶段不参与链接,不会和可执行文件…

【Unity】流式播放远端音频:WAV格式音频篇(一)

先了解一下wav的格式: 参考1:【音频】WAV 格式详解_tyustli的博客-CSDN博客_wav文件格式详解wav 文件支持多种不同的比特率、采样率、多声道音频。WAV 文件格式是 Microsoft 的 RIFF 规范的一个子集,用于存储多媒体文件。RIFF(res…

git-secret:在 Git 存储库中加密和存储密钥(上)

目前市面上已经存在许多较为成熟的密钥管理产品,比如 HashiCorp Vault,AWS Secrets Manager 以及 GCP Secret Manager。由于这些产品需要集成和维护等服务,因此在项目中引入会增加一定成本和开销。阅读本文,将带你了解如何在 Dock…

numpy数值差分

文章目录diffediff1ddiff diff是numpy中用于求差分的函数&#xff0c;函数定义为 diff(a, n1, axis-1, prepend<no value>, append<no value>)其中a为数组&#xff0c;n为差分的阶数&#xff0c;axis为求导对应的坐标轴&#xff0c;默认-1表示最后一个轴。 例如…